
尚硅谷大数据技术之高频面试题
High-Frequency Big Data Technology Interview Questions from Shang Silicon Valley (Atguigu)

—————————————————————————————

尚硅谷大数据技术之高频面试题
High-Frequency Big Data Technology Interview Questions from Shang Silicon Valley (Atguigu)

(作者:尚硅谷研究院)
(Author: Shang Silicon Valley Research Institute)

版本:V9.2.0
Version: V9.2.0

目录

第1章 核心技术12
Chapter 1 Core Technologies 12

1.1 Linux&Shell12

1.1.1 Linux常用高级命令12
1.1.1 Common Linux Advanced Commands 12

1.1.2 Shell常用工具及写过的脚本12
1.1.2 Common Shell Tools and Scripts Written 12

1.1.3 Shell中单引号和双引号区别13
1.1.3 Differences between single and double quotation marks in shells 13

1.2 Hadoop13

1.2.1 Hadoop常用端口号13
1.2.1 Common Hadoop port number 13

1.2.2 HDFS读流程和写流程14
1.2.2 HDFS read and write flows 14

1.2.3 HDFS小文件处理15
1.2.3 HDFS Small File Handling 15

1.2.4 HDFS的NameNode内存15

1.2.5 Shuffle及优化16

1.2.6 Yarn工作机制17
1.2.6 How Yarn works 17

1.2.7 Yarn调度器17
1.2.7 Yarn scheduler 17

1.2.8 HDFS块大小18
1.2.8 HDFS block size 18

1.2.9 Hadoop脑裂原因及解决办法?19
1.2.9 What are the causes and solutions of Hadoop split-brain? 19

1.3 Zookeeper19

1.3.1 常用命令19
1.3.1 Common commands 19

1.3.2 选举机制19
1.3.2 Electoral mechanisms 19

1.3.3 Zookeeper符合法则中哪两个?21
1.3.3 Which two of the Zookeeper rules meet? 21

1.3.4 Zookeeper脑裂21

1.3.5 Zookeeper用来干嘛了21
1.3.5 What is Zookeeper used for 21

1.4 Flume21

1.4.1 Flume组成,Put事务,Take事务21
1.4.1 Flume Composition, Put Transaction, Take Transaction 21

1.4.2 Flume拦截器23
1.4.2 Flume interceptor 23

1.4.3 Flume Channel选择器24

1.4.4 Flume监控器24
1.4.4 Flume Monitor 24

1.4.5 Flume采集数据会丢失吗?24
1.4.5 Will the data collected by Flume be lost? 24

1.4.6 Flume如何提高吞吐量24
1.4.6 How Flume can increase throughput 24

1.5 Kafka24

1.5.1 Kafka架构24

1.5.2 Kafka生产端分区分配策略26
1.5.2 Kafka Production-side Partition Allocation Policy 26

1.5.3 Kafka丢不丢数据27
1.5.3 Does Kafka lose data 27

1.5.4 Kafka的ISR副本同步队列28
1.5.4 ISR replica synchronization queue for Kafka 28

1.5.5 Kafka数据重复28
1.5.5 Kafka data duplication 28

1.5.6 Kafka如何保证数据有序or怎么解决乱序29
1.5.6 How Kafka keeps data in order or solves disorder 29

1.5.7 Kafka分区Leader选举规则31
1.5.7 Kafka Partition Leader Election Rules 31

1.5.8 Kafka中AR的顺序31
1.5.8 Order of AR in Kafka 31

1.5.9 Kafka日志保存时间32
1.5.9 Kafka log retention time 32

1.5.10 Kafka过期数据清理32
1.5.10 Kafka obsolete data cleanup 32

1.5.11 Kafka为什么能高效读写数据33
1.5.11 Why Kafka reads and writes data efficiently 33

1.5.12 自动创建主题34
1.5.12 Automatic topic creation 34

1.5.13 副本数设定34
1.5.13 Number of copies set 34

1.5.14 Kakfa分区数34
1.5.14 Number of Kakfa partitions 34

1.5.15 Kafka增加分区35
1.5.15 Kafka increases partition 35

1.5.16 Kafka中多少个Topic35

1.5.17 Kafka消费者是拉取数据还是推送数据35
1.5.17 Kafka Whether Consumers Pull or Push Data 35

1.5.18 Kafka消费端分区分配策略36
1.5.18 Kafka Consumer Partition Allocation Policy 36

1.5.19 消费者再平衡的条件37
1.5.19 Conditions for consumer rebalancing 37

1.5.20 指定Offset消费37
1.5.20 Specify Offset Consumption 37

1.5.21 指定时间消费38
1.5.21 Consumption at specified times 38

1.5.22 Kafka监控38

1.5.23 Kafka数据积压38
1.5.23 Kafka data backlog 38

1.5.24 如何提升吞吐量39
1.5.24 How to improve throughput 39

1.5.25 Kafka中数据量计算40
1.5.25 Calculation of data volume in Kafka 40

1.5.26 Kafka如何压测?40
1.5.26 How does Kafka pressure test? 40

1.5.27 磁盘选择42
1.5.27 Disk Selection 42

1.5.28 内存选择42
1.5.28 Memory Selection 42

1.5.29 CPU选择43

1.5.30 网络选择43
1.5.30 Network Selection 43

1.5.31 Kafka挂掉44

1.5.32 Kafka的机器数量44
1.5.32 Number of machines in Kafka 44

1.5.33 服役新节点退役旧节点44
1.5.33 Commissioning New Nodes and Decommissioning Old Nodes 44

1.5.34 Kafka单条日志传输大小44
1.5.34 Kafka Single Log Transfer Size 44

1.5.35 Kafka参数优化45
1.5.35 Kafka parameter optimization 45

1.6 Hive46

1.6.1 Hive的架构46
1.6.1 Hive architecture 46

1.6.2 HQL转换为MR流程46
1.6.2 HQL to MR Flow 46

1.6.3 Hive和数据库比较47
1.6.3 Hive vs. Database Comparison 47

1.6.4 内部表和外部表48
1.6.4 Internal and external tables 48

1.6.5 系统函数48
1.6.5 System functions 48

1.6.6 自定义UDF、UDTF函数49
1.6.6 Custom UDF, UDTF functions 49

1.6.7 窗口函数50
1.6.7 Window functions 50

1.6.8 Hive优化52

1.6.9 Hive解决数据倾斜方法59
1.6.9 Hive Solutions for Data Skew 59

1.6.10 Hive的数据中含有字段的分隔符怎么处理?64
1.6.10 How to handle field delimiters appearing in Hive data? 64

1.6.11 MySQL元数据备份64
1.6.11 MySQL Metadata Backup 64

1.6.12 如何创建二级分区表?65
1.6.12 How do I create a secondary partition table? 65

1.6.13 Union与Union all区别65

1.7 Datax65

1.7.1 DataX与Sqoop区别65

1.7.2 速度控制66
1.7.2 Speed control 66

1.7.3 内存调整66
1.7.3 Memory Adjustments 66

1.7.4 空值处理66
1.7.4 Handling of null values 66

1.7.5 配置文件生成脚本67
1.7.5 Profile generation scripts 67

1.7.6 DataX一天导入多少数据67
1.7.6 DataX How much data is imported in a day 67

1.7.7 Datax如何实现增量同步68
1.7.7 Datax How to achieve incremental synchronization 68

1.8 Maxwell68

1.8.1 Maxwell与Canal、FlinkCDC的对比68

1.8.2 Maxwell好处68

1.8.3 Maxwell底层原理69
1.8.3 Maxwell's underlying principles 69

1.8.4 全量同步速度如何69
1.8.4 How fast is full synchronization 69

1.8.5 Maxwell数据重复问题69
1.8.5 Maxwell Data Duplication Problem 69

1.9 DolphinScheduler调度器69

1.9.1 每天集群运行多少指标?69
1.9.1 How many metrics does the cluster run per day? 69

1.9.2 任务挂了怎么办?69
1.9.2 What if a task fails? 69

1.9.3DS挂了怎么办?69
1.9.3 What happens when DS dies? 69

1.10 Spark Core & SQL70

1.10.1 Spark运行模式70
1.10.1 Spark Operating Mode 70

1.10.2 Spark常用端口号70
1.10.2 Spark Common Port Number 70

1.10.3 RDD五大属性70
1.10.3 RDD Five Attributes 70

1.10.4 RDD弹性体现在哪里71
1.10.4 Where is RDD Resilience Reflected 71

1.10.5 Spark的转换算子(8个)71
1.10.5 Spark's transformation operators (8) 71

1.10.6 Spark的行动算子(5个)72
1.10.6 Spark's Action Operators (5) 72

1.10.7 map和mapPartitions区别72

1.10.8 Repartition和Coalesce区别72

1.10.9 reduceByKey与groupByKey的区别73

1.10.10 Spark中的血缘73
1.10.10 Lineage in Spark 73

1.10.11 Spark任务的划分73
1.10.11 Division of Spark Tasks 73

1.10.12 SparkSQL中RDD、DataFrame、DataSet三者的转换及区别74
1.10.12 Conversion and Difference of RDD, DataFrame and DataSet in SparkSQL 74

1.10.13 Hive on Spark和Spark on Hive区别74

1.10.14 Spark内核源码(重点)74
1.10.14 Spark kernel source code (focus) 74

1.10.15 Spark统一内存模型76
1.10.15 Spark Unified Memory Model 76

1.10.16 Spark为什么比MR快?77
1.10.16 Why is Spark faster than MR? 77

1.10.17 Spark Shuffle和Hadoop Shuffle区别?78

1.10.18 Spark提交作业参数(重点)78
1.10.18 Spark Submission Job Parameters (Key) 78

1.10.19 Spark任务使用什么进行提交,JavaEE界面还是脚本79
1.10.19 What do Spark tasks use for submission, Java EE interface or script 79

1.10.20 请列举会引起Shuffle过程的Spark算子,并简述功能。79
1.10.20 List the Spark operators that give rise to Shuffle processes and describe their functions briefly. 79

1.10.21 Spark操作数据库时,如何减少Spark运行中的数据库连接数?79
1.10.21 How do I reduce the number of database connections Spark has running when I'm working with a database? 79

1.10.22 Spark数据倾斜79
1.10.22 Spark Data Skew 79

1.10.23 Spark3.0新特性79

1.12 Flink80

1.12.1 Flink基础架构组成?80
1.12.1 Flink Infrastructure Composition? 80

1.12.2Flink和Spark Streaming的区别?80

1.12.3 Flink提交作业流程及核心概念81
1.12.3 Flink Submission Workflow and Core Concepts 81

1.12.4 Flink的部署模式及区别?82
1.12.4 Flink deployment patterns and differences? 82

1.12.5 Flink任务的并行度优先级设置?资源一般如何配置?82
1.12.5 Parallel Priority Setting for Flink Tasks? How are resources generally allocated? 82

1.12.6 Flink的三种时间语义83
1.12.6 Flink's three temporal semantics 83

1.12.7 你对Watermark的认识83
1.12.7 What you know about Watermark 83

1.12.8 Watermark多并行度下的传递、生成原理83
1.12.8 Watermark Transmission and Generation Principle under Multi-parallelism 83

1.12.9 Flink怎么处理乱序和迟到数据?84
1.12.9 How does Flink handle out-of-order and late data? 84

1.12.10 说说Flink中的窗口(分类、生命周期、触发、划分)85
1.12.10 Talk about windows in Flink (classification, lifecycle, trigger, partition) 85

1.12.11 Flink的keyby怎么实现的分区?分区、分组的区别是什么?86
1.12.11 How does Flink's keyBy implement partitioning? What is the difference between partitioning and grouping? 86

1.12.12 Flink的Interval Join的实现原理?Join不上的怎么办?86
1.12.12 How does Flink's Interval Join work? What about records that fail to join? 86

1.12.13 介绍一下Flink的状态编程、状态机制?86
1.12.13 Introduce Flink's state programming and state mechanism? 86

1.12.14 Flink如何实现端到端一致性?87
1.12.14 How does Flink achieve end-to-end consistency? 87

1.12.15 分布式异步快照原理88
1.12.15 Principles of Distributed Asynchronous Snapshots 88

1.12.16 Checkpoint的参数怎么设置的?88
1.12.16 How are Checkpoint parameters set? 88

1.12.17 Barrier对齐和不对齐的区别88
1.12.17 Difference between Barrier alignment and misalignment 88

1.12.18 Flink内存模型(重点)89
1.12.18 Flink Memory Model (Key) 89

1.12.19 Flink常见的维表Join方案89
1.12.19 Flink Common Dimension Tables Join Scheme 89

1.12.20 Flink的上下文对象理解89
1.12.20 Context Object Understanding for Flink 89

1.12.21 Flink网络调优-缓冲消胀机制90
1.12.21 Flink Network Tuning: Buffer Debloating Mechanism 90

1.12.22 FlinkCDC锁表问题90
1.12.22 FlinkCDC lock table problem 90

1.13 HBase90

1.13.1 HBase存储结构90
1.13.1 HBase Storage Structure 90

1.13.2 HBase的写流程92
1.13.2 HBase writing flow 92

1.13.3 HBase的读流程93
1.13.3 HBase reading flow 93

1.13.4 HBase的合并94
1.13.4 HBase Compaction 94

1.13.5 RowKey设计原则94
1.13.5 RowKey Design Principles 94

1.13.6 RowKey如何设计94
1.13.6 How RowKey is designed 94

1.13.7 HBase二级索引原理95
1.13.7 HBase secondary indexing principle 95

1.14 Clickhouse95

1.14.1 Clickhouse的优势95

1.14.2 Clickhouse的引擎95

1.14.3 Flink写入Clickhouse怎么保证一致性?96
1.14.3 Flink writes Clickhouse How to guarantee consistency? 96

1.14.4 Clickhouse存储多少数据?几张表?96
1.14.4 How much data does Clickhouse store? How many tables? 96

1.14.5 Clickhouse使用本地表还是分布式表96
1.14.5 Clickhouse Using Local or Distributed Tables 96

1.14.6 Clickhouse的物化视图96
1.14.6 Materialized Views of Clickhouse 96

1.14.7 Clickhouse的优化97

1.14.8 Clickhouse的新特性Projection97

1.14.9 Cilckhouse的索引、底层存储97
1.14.9 Indexing and underlying storage for Cilckhouse 97

1.15 Doris99

1.15.1 Doris中的三种模型及对比?99
1.15.1 The three models in Doris and how they compare? 99

1.15.2 Doris的分区分桶怎么理解,怎么划分字段99
1.15.2 How to understand the partition and bucket of Doris, how to divide the field 99

1.15.3 生产中节点多少个,FE,BE 那个对于CPU和内存的消耗大99
1.15.3 How many nodes are in production, and which of FE and BE consumes more CPU and memory 99

1.15.4 Doris使用过程中遇到过哪些问题?99
1.15.4 What problems have you encountered with Doris? 99

1.15.5 Doris跨库查询,关联MySQL有使用过吗99
1.15.5 Have you used Doris cross-database queries joining MySQL 99

1.15.6 Doris的roll up和物化视图区别99
1.15.6 Difference between roll up and materialized view of Doris 99

1.15.7 Doris的前缀索引99
1.15.7 Prefix index of Doris 99

1.16 可视化报表工具99
1.16 Visual reporting tools 99

1.17 JavaSE100

1.17.1 并发编程100
1.17.1 Concurrent programming 100

1.17.2 如何创建线程池100
1.17.2 How to create a thread pool 100

1.17.3 ThreadPoolExecutor构造函数参数解析101
1.17.3 Parsing ThreadPoolExecutor constructor parameters 101

1.17.4 线程的生命周期101
1.17.4 Life cycle of threads 101

1.17.5 notify和notifyall区别101

1.17.6 集合101
1.17.6 Collections 101

1.17.7 列举线程安全的Map集合101
1.17.7 Listing Thread-Safe Map Collections 101

1.17.8 StringBuffer和StringBuilder的区别101

1.17.9 HashMap和HashTable的区别101

1.17.10 HashMap的底层原理102
1.17.10 The underlying principle of HashMap 102

1.17.11 项目中使用过的设计模式103
1.17.11 Design patterns used in projects 103

1.18 MySQL104

1.18.1 SQL执行顺序104
1.18.1 SQL Execution Order 104

1.18.2 TRUNCATE 、DROP、DELETE区别104

1.18.3 MyISAM与InnoDB的区别104

1.18.4 MySQL四种索引104
1.18.4 MySQL Four Indexes 104

1.18.5 MySQL的事务105

1.18.6 MySQL事务隔离级别105
MySQL Transaction Isolation Level 105

1.18.7 MyISAM与InnoDB对比105

1.18.8 B树和B+树对比106
1.18.8 Comparison of B trees and B+ trees

1.19 Redis106

1.19.1 Redis缓存穿透、缓存雪崩、缓存击穿106
1.19.1 Redis cache penetration, cache avalanche, cache breakdown 106

1.19.2 Redis哨兵模式107
1.19.2 Redis Sentinel Mode 107

1.19.3 Redis数据类型107
1.19.3 Redis data types 107

1.19.4 热数据通过什么样的方式导入Redis108
1.19.4 How hot data is imported into Redis 108

1.19.5 Redis的存储模式RDB,AOF108
1.19.5 Redis storage mode RDB, AOF 108

1.19.6 Redis存储的是k-v类型,为什么还会有Hash?108
1.19.6 Redis stores k-v type, why is there a Hash? 108

1.20 JVM108

第2章 离线数仓项目108
Chapter 2 Offline Data Warehouse Project 108

2.1 提高自信108
2.1 Increased Confidence 108

2.2 为什么做这个项目109
2.2 Why did you do this project? 109

2.3 数仓概念109
2.3 Data Warehouse Concepts 109

2.4 项目架构110
2.4 Project Architecture 110

2.5 框架版本选型110
2.5 Frame Version Selection 110

2.6 服务器选型112
2.6 Server selection 112

2.7 集群规模112
2.7 Cluster Size 112

2.8 人员配置参考115
2.8 Staffing Reference 115

2.8.1 整体架构115
2.8.1 Overall structure 115

2.8.2 你的的职级等级及晋升规则115
2.8.2 Your rank and promotion rules 115

2.8.3 人员配置参考116
2.8.3 Staffing Reference 116

2.9 从0-1搭建项目,你需要做什么?117
2.9 Building a project from 0-1, what do you need to do? 117

2.10 数仓建模准备118
2.10 Warehouse modeling preparation 118

2.11 数仓建模119
2.11 Warehouse modeling 119

2.12 数仓每层做了哪些事122
2.12 What does each layer of the data warehouse do 122

2.13 数据量124
2.13 Volume of data 124

2.14 项目中遇到哪些问题?(*****)125
2.14 What problems were encountered in the project?(*****) 125

2.15 离线---业务126
2.15 Offline---Business 126

2.15.1 SKU和SPU126

2.15.2 订单表跟订单详情表区别?126
2.15.2 What is the difference between an order form and an order details form? 126

2.15.3 上卷和下钻126
2.15.3 Roll-up and Drill-down 126

2.15.4 TOB和TOC解释127

2.15.5 流转G复活指标127
2.15.5 Circulation G Revival Indicator 127

2.15.6 活动的话,数据量会增加多少?怎么解决?128
2.15.6 During a promotion, how much does data volume increase? How do you handle it? 128

2.15.7 哪个商品卖的好?128
2.15.7 Which product sells well? 128

2.15.8 数据仓库每天跑多少张表,大概什么时候运行,运行多久?128
2.15.8 How many tables does the data warehouse run per day, when and how long does it run? 128

2.15.9 哪张表数据量最大129
2.15.9 Which table has the largest amount of data 129

2.15.10 哪张表最费时间,有没有优化130
2.15.10 Which table is the most time-consuming and optimized? 130

2.15.11 并发峰值多少?大概哪个时间点?131
2.15.11 How many concurrent peaks? About what time? 131

2.15.12 分析过最难的指标131
2.15.12 The most difficult indicators analyzed 131

2.15.13 数仓中使用的哪种文件存储格式131
2.15.13 Which file storage format is used in Warehouse 131

2.15.14 数仓当中数据多久删除一次131
2.15.14 How often is data deleted in the warehouse 131

2.15.15 Mysql业务库中某张表发生变化,数仓中表需要做什么改变131
2.15.15 A table in Mysql business library has changed. What changes need to be made to the table in data warehouse 131

2.15.16 50多张表关联,如何进行性能调优131
2.15.16 How to tune performance when joining 50+ tables 131

2.15.17 拉链表的退链如何实现131
2.15.17 How to roll back a closed chain in a zipper (slowly changing) table 131

2.15.18 离线数仓如何补数132
2.15.18 How an offline data warehouse backfills data 132

2.15.19 当ADS计算完,如何判断指标是正确的132
2.15.19 When ADS is calculated, how to determine whether the indicator is correct 132

2.15.20 ADS层指标计算错误,如何解决132
2.15.20 ADS layer index calculation error, how to solve 132

2.15.21 产品给新指标,该如何开发132
2.15.21 How do you develop a new metric given by the product team 132

2.15.22 新出指标,原有建模无法实现,如何操作133
2.15.22 A new metric cannot be produced by the existing model: how do you proceed 133

2.15.23 和哪些部门沟通,以及沟通什么内容133
2.15.23 Which departments do you communicate with, and about what 133

2.15.24 你们的需求和指标都是谁给的133
2.15.24 Who gives you your requirements and metrics 133

2.15.25 任务跑起来之后,整个集群资源占用比例133
2.15.25 After the task runs, the resource consumption ratio of the whole cluster is 133

2.15.26 业务场景:时间跨度比较大,数据模型的数据怎么更新的,例如:借款,使用一年,再还款,这个数据时间跨度大,在处理的时候怎么处理133
2.15.26 Business scenario: The time span is relatively large. How to update the data of the data model, for example: borrow money, use it for one year, and then repay it. The time span of this data is large. How to deal with it when processing 133

2.15.27 数据倾斜场景除了group by 和join外,还有哪些场景133
2.15.27 Besides group by and join, what other scenarios cause data skew 133

2.15.28 你的公司方向是电子商务,自营的还是提货平台?你们会有自己的商品吗?134
2.15.28 Is your company oriented towards e-commerce, self-employed or delivery platform? Do you have your own merchandise? 134

2.15.29 ods事实表中订单状态会发生变化,你们是通过什么方式去监测数据变化的134
2.15.29 Order status in the ODS fact table changes: how do you monitor the data changes 134

2.15.30 用户域你们构建了哪些事实表?登录事实表有哪些核心字段和指标?用户交易域连接起来有哪些表?134
2.15.30 User Domain What fact tables have you constructed? What are the core fields and metrics of the login fact table? What tables are there for linking user transaction domains? 134

2.15.31 当天订单没有闭环结束的数据量?135
2.15.31 What is the daily volume of orders that have not closed by the end of the day? 135

2.15.32 你们维度数据要做ETL吗?除了用户信息脱敏?没有做其他ETL吗136
2.15.32 Do you want ETL for dimensional data? Besides user information desensitization? No other ETL? 136

2.15.33 怎么做加密,加密数据要用怎么办,我讲的md5,他问我md5怎么做恢复136
2.15.33 How is data encrypted, and how is encrypted data used? I mentioned MD5, and the interviewer asked how MD5 can be recovered 136

2.15.34 真实项目流程137
2.15.34 Real Project Flow 137

2.15.35 指标的口径怎么统一的(离线这边口径变了,实时这边怎么去获取的口径)137
2.15.35 How to unify the caliber of indicators (the caliber changed offline, how to obtain the caliber in real time) 137

2.15.36 表生命周期管理怎么做的?138
2.15.36 How is table lifecycle management done? 138

2.15.37 如果上游数据链路非常的多,层级也非常的深,再知道处理链路和表的血缘的情况下,下游数据出现波动怎么处理?139
2.15.37 When there are many upstream data links with deep hierarchies, and the processing links and table lineage are known, how do you handle fluctuations in downstream data? 139

2.15.38 十亿条数据要一次查询多行用什么数据库比较好?139
2.15.38 For one billion rows of data where multiple rows are queried at once, which database is a better choice? 139

2.16 埋点139
2.16 Event Tracking 139

第3章 实时数仓项目141
Chapter 3 Real-Time Data Warehouse Project 141

3.1 为什么做这个项目141
3.1 Why did you do this project 141

3.2 项目架构141
3.2 Project Architecture 141

3.3 框架版本选型141
3.3 Frame Version Selection 141

3.4 服务器选型141
3.4 Server selection 141

3.5 集群规模142
3.5 Cluster Size 142

3.6 项目建模142
3.6 Project Modeling142

3.7 数据量144
3.7 Volume of data 144

3.7.1 数据分层数据量144
3.7.1 Data Stratification Data Volume 144

3.7.2 实时组件存储数据量145
3.7.2 Real-time Component Storage Data Volume 145

3.7.3 实时QPS峰值数据量145
3.7.3 Real-time QPS peak data volume 145

3.8 项目中遇到哪些问题及如何解决?145
3.8 What problems were encountered in the project and how to solve them? 145

3.8.1 业务数据采集框架选型问题145
3.8.1 Selection of Business Data Acquisition Framework 145

3.8.2 项目中哪里用到状态编程,状态是如何存储的,怎么解决大状态问题146
3.8.2 Where is state programming used in the project, how is state stored, and how are large states handled 146

3.8.3 项目中哪里遇到了反压,造成的危害,定位解决(*重点*)146
3.8.3 Where the project encountered back pressure, resulting in harm, positioning solution (* key *) 146

3.8.4 数据倾斜问题如何解决(****重点***)147
3.8.4 How to solve the data skew problem (**** Focus **) 147

3.8.5 数据如何保证一致性问题148
3.8.5 How is data consistency ensured 148

3.8.6 FlinkSQL性能比较慢如何优化148
3.8.6 FlinkSQL performance is slow how to optimize 148

3.8.7 Kafka分区动态增加,Flink监控不到新分区数据导致数据丢失148
3.8.7 Kafka partition dynamically increases, Flink fails to monitor new partition data resulting in data loss 148

3.8.9 Kafka某个分区没有数据,导致下游水位线无法抬升,窗口无法关闭计算148
3.8.9 A Kafka partition with no data keeps the downstream watermark from advancing, so windows cannot close for computation 148

3.8.10 Hbase的rowkey设计不合理导致的数据热点问题148
3.8.10 Data Hotspots Caused by Irrational Rowkey Design of Hbase 148

3.8.11 Redis和HBase的数据不一致问题148
3.8.11 Data inconsistency between Redis and HBase

3.8.12 双流join关联不上如何解决149
3.8.12 How to handle records that cannot be matched in a dual-stream join 149

3.9 生产经验150
3.9 Production experience 150

3.9.1 Flink任务提交使用那种模式,为何选用这种模式150
3.9.1 Which mode does Flink task submission use and why? 150

3.9.2 Flink任务提交参数,JobManager和TaskManager分别给多少150
3.9.2 Flink submission parameters: how much memory is given to the JobManager and TaskManager 150

3.9.3 Flink任务并行度如何设置150
3.9.3 How to set the parallelism of Flink tasks 150

3.9.4 项目中Flink作业Checkpoint参数如何设置150
3.9.4 How to set Flink Checkpoint parameter in project 150

3.9.5 迟到数据如何解决150
3.9.5 How to solve the problem of late data 150

3.9.6 实时数仓延迟多少151
3.9.6 What is the latency of the real-time data warehouse 151

3.9.7 项目开发多久,维护了多久151
3.9.7 How long has the project been developed and maintained 151

3.9.8 如何处理缓存冷启动问题151
3.9.8 How to handle cache cold start problems

3.9.9 如何处理动态分流冷启动问题(主流数据先到,丢失数据怎么处理)151
3.9.9 How to deal with dynamic shunt cold start problem (mainstream data arrives first, how to deal with lost data) 151

3.9.10 代码升级,修改代码,如何上线151
3.9.10 Code upgrade, code modification, how to go online 151

3.9.11 如果现在做了5个Checkpoint,Flink Job挂掉之后想恢复到第三次Checkpoint保存的状态上,如何操作151
3.9.11 If 5 Checkpoints are made now, Flink Job hangs and wants to restore to the state saved by the third Checkpoint, how to operate 151

3.9.12 需要使用flink记录一群人,从北京出发到上海,记录出发时间和到达时间,同时要显示每个人用时多久,需要实时显示,如果让你来做,你怎么设计?152
3.9.12 Need to use flink to record a group of people, from Beijing to Shanghai, record departure time and arrival time, at the same time to show how long each person takes, need real-time display, if let you do, how do you design? 152

3.9.13 flink内部的数据质量和数据的时效怎么把控的152
3.9.13 Flink internal data quality and data timeliness how to control 152

3.9.14 实时任务问题(延迟)怎么排查152
3.9.14 How to troubleshoot real-time task problems (delays) 152

3.9.15 维度数据查询并发量152
3.9.15 Dimension Data Query Concurrent Volume 152

3.9.16 Prometheus+Grafana是自己搭的吗,监控哪些指标152
3.9.16 Is Prometheus+Grafana built by itself and which indicators are monitored 152

3.9.17 怎样在不停止任务的情况下改flink参数153
3.9.17 How to change flink parameters without stopping the task 153

3.9.18 hbase中有表,里面的1月份到3月份的数据我不要了,我需要删除它(彻底删除),要怎么做153
3.9.18 There is a table in hbase. I don't want the data from January to March in it. I need to delete it (delete it completely). How to do 153

3.9.19 如果flink程序的数据倾斜是偶然出现的,可能白天可能晚上突然出现,然后几个月都没有出现,没办法复现,怎么解决?153
3.9.19 If the data skew of flink program appears accidentally, it may suddenly appear during the day or at night, and then it does not appear for several months. There is no way to reproduce it. How to solve it? 153

3.9.20 维度数据改变之后,如何保证新join的维度数据是正确的数据153
3.9.20 How to ensure that the dimension data of the new join is correct after the dimension data is changed

3.10 实时---业务153
3.10 Real-time--business 153

3.10.1 数据采集到ODS层153
3.10.1 Data Acquisition to ODS Layer 153

3.10.2 ODS层154

3.10.3 DWD+DIM层154

3.10.4 DWS层155

3.10.5 ADS层157

第4章 数据考评平台项目158
Chapter 4 Data Evaluation Platform Project 158

4.1项目背景158
4.1 Background of the project 158

4.1.1 为什么做数据治理158
4.1.1 Why Data Governance 158

4.1.2 数据治理概念158
4.1.2 Data Governance Concepts

4.1.3 数据治理考评平台做的是什么158
4.1.3 What does the Data Governance Assessment Platform do?

4.1.4 考评指标158
4.1.4 Evaluation indicators 158

4.2 技术架构159
4.2 Technical architecture 159

4.3 项目实现了哪些功能159
4.3 What functions are implemented by the project 159

4.3.1 元数据的加载与处理及各表数据的页面接口159
4.3.1 Loading and processing of metadata and page interfaces for table data 159

4.3.2 数据治理考评链路(**核心**)159
4.3.2 Data Governance Assessment Link (** Core **) 159

4.3.3 数据治理考评结果核算160
4.3.3 Data Governance Assessment Results Accounting 160

4.3.4 可视化治理考评提供数据接口160
4.3.4 Visual governance assessment provides data interface 160

4.4 项目中的问题/及优化161
4.4 Problems in the project/optimization 161

4.4.1 计算hdfs路径数据量大小、最后修改访问时间161
4.4.1 Calculate hdfs path data size, last modified access time 161

4.4.2 考评器作用是什么?161
4.4.2 What is the role of the evaluator? 161

4.4.3 稍微难度考评器实现思路161
4.4.3 Slightly difficult evaluator implementation ideas 161

4.4.4 利用多线程优化考评计算161
4.4.4 Using multithreading to optimize evaluation calculations 161

4.4.5 实现过哪些指标161
4.4.5 What indicators have been achieved

第4章 用户画像项目162
Chapter 4 User Profile Project 162

4.1 画像系统主要做了哪些事162
4.1 What does the profile system mainly do 162

4.2 项目整体架构162
4.2 Overall project structure 162

4.3 讲一下标签计算的调度过程163
4.3 Let's talk about the scheduling process of label calculation 163.

4.4 整个标签的批处理过程163
4.4 Batch process for whole label 163

4.5 你们的画像平台有哪些功能 ?163
4.5 What are the functions of your portrait platform? 163

4.6 是否做过Web应用开发,实现了什么功能163
4.6 Have you done Web application development and what functions have been implemented? 163

4.7 画像平台的上下游164
4.7 upstream and downstream of the portrait platform 164

4.8 BitMap原理,及为什么可以提高性能164
4.8 BitMap Principle and Why It Can Improve Performance 164

第5章 数据湖项目164
Chapter 5 Data Lake Project 164

5.1 数据湖与数据仓库对比164
5.1 Data Lake vs. Data Warehouse 164

5.2 为什么做这个项目?解决了什么痛点?164
5.2 Why do this project? What pain points were solved? 164

5.3 项目架构165
5.3 Project Architecture 165

5.4 业务165
5.4 Business 165

5.5 优化or遇到的问题怎么解决165
5.5 Optimizations, and how problems encountered were solved 165

第6章 测试&上线流程166
Chapter 6 Testing & Release Process 166

6.1 测试相关166
6.1 Test related 166

6.1.1 公司有多少台测试服务器?166
6.1.1 How many test servers does the company have? 166

6.1.2 测试服务器配置?166
6.1.2 Test Server Configuration? 166

6.1.3 测试数据哪来的?167
6.1.3 Where did the test data come from? 167

6.1.4 如何保证写的SQL正确性(重点)167
6.1.4 How to ensure the correctness of SQL writing (emphasis) 167

6.1.5 测试之后如何上线?167
6.1.5 How to go online after testing? 167

6.1.6 A/B测试了解167
6.1.6 A/B Test Understanding 167

6.2 项目实际工作流程169
6.2 Project actual workflow 169

6.3 项目当前版本号是多少?多久升级一次版本171
6.3 What is the current version number of the project? How often is the version upgraded 171

6.4 项目中实现一个需求大概多长时间171
6.4 How long does it take to implement a requirement in a project? 171

6.5 项目开发中每天做什么事171
6.5 What do you do every day in project development? 171

第7章 数据治理172
Chapter 7 Data Governance 172

7.1 元数据管理172
7.1 Metadata management 172

7.2 数据质量监控173
7.2 Data quality monitoring 173

7.2.1 监控原则173
7.2.1 Principles of surveillance 173

7.2.2 数据质量实现174
7.2.2 Data Quality Realization 174

7.2.3 实现数据质量监控,你具体怎么做,详细说?174
7.2.3 Data quality monitoring, how do you do it, in detail? 174

7.3 权限管理(Ranger)175
7.3 Authority Management (Ranger) 175

7.4 用户认证(Kerberos)175
7.4 User authentication (Kerberos) 175

7.5 数据治理176
7.5 Data Governance 176

第8章 中台178
Chapter 8 Middle Platform 178

8.1 什么是中台?179
8.1 What is a middle platform? 179

8.2 各家中台180
8.2 Middle platforms at various companies 180

8.3 中台具体划分180
8.3 Specific divisions of the middle platform 180

8.4 中台使用场景181
8.4 Middle platform use cases 181

8.5 中台的痛点182
8.5 Pain points of the middle platform 182

第9章 算法题(LeetCode)182
Chapter 9 Algorithm Questions (LeetCode) 182

9.1 时间复杂度、空间复杂度理解182
9.1 Time complexity, spatial complexity understanding 182

9.2 常见算法求解思想182
9.2 Common algorithmic problem-solving approaches 182

9.3 基本算法183
9.3 Basic algorithm 183

9.3.1 冒泡排序183
9.3.1 Bubble Sorting 183

9.3.2 快速排序183
9.3.2 Quick Sorting 183

9.3.3 归并排序184
9.3.3 Merged sorting 184

9.3.4 遍历二叉树185
9.3.4 Traversing Binary Trees 185

9.3.5 二分查找185
9.3.5 Binary search 185

9.4 小青蛙跳台阶186
9.4 Little Frog Jumping Steps 186

9.5 最长回文子串186
9.5 Longest palindrome substring 186

9.6 数字字符转化成IP186
9.6 Digital characters converted to IP 186

9.7 最大公约数187
9.7 Maximum common divisor 187

9.8 链表反转187
9.8 List inversion 187

9.9 数组寻找峰值187
9.9 Array Looking for Peak 187

第10章 场景题187
Chapter 10 Scenario Questions 187

10.1 手写Flink的UV187

10.2 Flink的分组TopN187

10.3 Spark的分组TopN187

10.4 如何快速从40亿条数据中快速判断,数据123是否存在187
10.4 How to quickly determine from 4 billion pieces of data whether data 123 exists 187

10.5 给你100G数据,1G内存,如何排序?187
10.5 Give you 100 gigabytes of data, 1 gigabyte of memory, how to sort? 187

10.6 公平调度器容器集中在同一个服务器上?187
10.6 Fair scheduler containers centralized on the same server? 187

10.7 匹马赛跑,1个赛道,每次5匹进行比赛,无法对每次比赛计时,但知道每次比赛结果的先后顺序,最少赛多少次可以找出前三名?188
10.7 A horse race, 1 track, 5 horses at a time, unable to time each race, but know the order of each race results, at least how many races can find the top three? 188

10.8 给定一个点、一条线、一个三角形、一个有向无环图,请用java面向对象的思想进行建模188
10.8 Given a point, a line, a triangle, and a directed acyclic graph, model it using java object-oriented thinking.

10.9 现场出了一道sql题,让说出sql的优化,优化后效率提升了多少188
10.9 An SQL question was given on the spot: describe the optimization you made and how much efficiency improved afterwards 188

第11章 HQL场景题188
Chapter 11 HQL Scenario Questions 188

第12章 面试说明188
Chapter 12 Interview Notes 188

12.1 面试过程最关键的是什么?188
12.1 What is the most important part of the interview process? 188

12.2 面试时该怎么说?188
12.2 What should I say during the interview? 188

12.3 面试技巧189
12.3 Interview skills 189

12.3.1 六个常见问题189
12.3.1 Six common questions 189

12.3.2 两个注意事项190
12.3.2 Two considerations 190

12.3.3 自我介绍190
12.3.3 Self-introduction 190

第1章 核心技术
Chapter 1 Core Technologies

1.1 Linux&Shell

1.1.1 Linux常用高级命令
1.1.1 Linux Common Advanced Commands

序号

命令

命令解释
command interpretation

1

top

实时显示系统中各个进程的资源占用状况(CPU、内存和执行时间)
Real-time display of resource usage (CPU, memory, and execution time) of various processes in the system

2

jmap -heap 进程号
jmap -heap process number

查看某个进程内存
View a process memory

3

free -m

查看系统内存使用情况
View system memory usage

4

ps -ef

查看进程
viewing process

5

netstat -tunlp | grep 端口号

查看端口占用情况
View port occupancy

6

du -sh 路径*
du -sh path *

查看路径下的磁盘使用情况
View disk usage under path

例如:$ du -sh /opt/*
For example: $ du -sh /opt/*

7

df -h

查看磁盘存储情况
View disk storage

1.1.2 Shell常用工具及写过的脚本
1.1.2 Shell Common Tools and Written Scripts

1)awk、sed、cut、sort

2)用Shell写过哪些脚本
2) What scripts have been written in Shell?

(1)集群启动,分发脚本
(1) Cluster startup, distribution script

#!/bin/bash

case $1 in

"start")

for i in hadoop102 hadoop103 hadoop104

do

ssh $i "绝对路径"
ssh $i "absolute path"

done

;;

"stop")

;;

esac

2)数仓层级内部的导入:ods->dwd->dws ->ads
(2) Import within warehouse hierarchy: ods->dwd->dws ->ads

①#!/bin/bash

②定义变量 APP=gmall
② Define variable APP=gmall

③获取时间
③ Acquisition time

传入 按照传入时间
If a date is passed in, use the passed-in date

不传 T+1
If no date is passed in, default to T+1 (the previous day)

④sql="

先按照当前天 写sql => 遇到时间 $do_date 遇到表 {$APP}.
First write the SQL for the current day; wherever a date appears use $do_date, and prefix every table with ${APP}.

自定义函数 UDF UDTF {$APP}.
Custom functions (UDF/UDTF) are also referenced with the ${APP}. prefix.

"

⑤执行sql
Execution of SQL
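
Pulled together, a minimal runnable sketch of such a layer-import script (a sketch only, assuming Hive is on the PATH; the gmall database and the ods/dwd table names below are placeholders rather than the real project tables):

#!/bin/bash
# Minimal sketch of an ods->dwd import script following the outline above.
# Assumptions: hive is installed and on the PATH; gmall and the
# ods_example/dwd_example tables are placeholders.
APP=gmall

# Use the passed-in date if given, otherwise default to T+1 (yesterday).
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d '-1 day' +%F)
fi

sql="
insert overwrite table ${APP}.dwd_example partition (dt='$do_date')
select * from ${APP}.ods_example where dt='$do_date';
"

hive -e "$sql"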

1.1.3 Shell中单引号和双引号区别
1.1.3 Shell Single Quotation and Double Quotation Difference

1)在/home/atguigu/bin创建一个test.sh文件
1) Create a test.sh file at/home/atguigu/bin

[atguigu@hadoop102 bin]$ vim test.sh

文件中添加如下内容
Add the following to the file

#!/bin/bash

do_date=$1

echo '$do_date'

echo "$do_date"

echo "'$do_date'"

echo '"$do_date"'

echo `date`

2)查看执行结果
2) View execution results

[atguigu@hadoop102 bin]$ test.sh 2022-02-10

$do_date

2022-02-10

'2022-02-10'

"$do_date"

2022年 05月 02日 星期四 21:02:08 CST
Thursday May 2nd, 2022 21:02:08 CST

3)总结:
3) Summary:

(1)单引号不取变量值
(1) Single quotes do not expand the variable value

(2)双引号取变量值
(2) Double quotes take variable values

(3)反引号`,执行引号中命令
(3) Back quotes `, execute the command in quotes

(4)双引号内部嵌套单引号,取出变量值
(4) Single quotes nested inside double quotes: the variable value is expanded

(5)单引号内部嵌套双引号,不取出变量值
(5) Double quotes nested inside single quotes: the variable value is not expanded

1.2 Hadoop

1.2.1 Hadoop常用端口号
1.2.1 Common port numbers for Hadoop

hadoop2.x

hadoop3.x

访问HDFS端口
Access HDFS port

50070

9870

访问MR执行情况端口
Access MR Performance Port

8088

8088

历史服务器
history server

19888

19888

客户端访问集群端口
Client Access Cluster Port

9000

8020

1.2.2 HDFS读流程写流程
1.2.2 HDFS Read and Write Flow

注意:HDFS写入流程时候,某台dataNode挂掉如何运行?
Note: During the HDFS write flow, what happens if a DataNode goes down?

当DataNode突然挂掉了,客户端接收不到这个DataNode发送的ack确认,客户端会通知NameNode,NameNode检查并确认该块的副本与规定的不符,NameNode会通知闲置的DataNode去复制副本,并将挂掉的DataNode作下线处理。挂掉的DataNode节点恢复后, 删除该节点中曾经拷贝的不完整副本数据。
When a DataNode suddenly goes down, the client stops receiving acks from that DataNode and notifies the NameNode. The NameNode checks and finds that the block's replica count no longer matches the required number, so it instructs an idle DataNode to replicate the block and marks the failed DataNode as offline. After the failed DataNode recovers, the incomplete replica data previously copied onto it is deleted.

1.2.3 HDFS小文件处理
1.2.3 HDFS Small File Handling

1)会有什么影响
1) What impact will it have?

(1)存储层面
(1) Storage level

1个文件块,占用namenode多大内存150字节
One file block occupies about 150 bytes of NameNode memory

128G能存储多少文件块? 128 g* 1024m*1024kb*1024byte/150字节 = 9.1亿文件块
How many file blocks can 128 GB hold? 128 × 1024 × 1024 × 1024 bytes / 150 bytes ≈ 910 million file blocks

(2)计算层面
(2) Calculation level

每个小文件都会起到一个MapTask,1个MapTask默认内存1G。浪费资源。
Each small file spawns its own MapTask, and a MapTask defaults to 1 GB of memory. This wastes resources.

2)怎么解决
2) How to solve

(1)采用har归档方式,将小文件归档
(1) Archive the small files using HAR (Hadoop Archive); see the example command after this list

(2)采用CombineTextInputFormat

(3)自己写一个MR程序将产生的小文件合并成一个大文件。如果是Hive或者Spark有merge功能自动帮助我们合并。
(3) Write your own MR program to merge the small files generated into one large file. If it's Hive or Spark, there's a merge feature that automatically helps us merge.

(4)有小文件场景开启JVM重用;如果没有小文件,不要开启JVM重用,因为会一直占用使用到的Task卡槽,直到任务完成才释放
(4) Enable JVM reuse when there are many small files; if there are no small files, do not enable it, because the reused JVM keeps occupying its task slot and only releases it when the whole job finishes

JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间。
JVM reuse enables JVM instances to be reused N times in the same job, and the value of N can be configured in Hadoop's mapred-site.xml file. Usually between 10-20.

<property>

<name>mapreduce.job.jvm.numtasks</name>

<value>10</value>

<description>How many tasks to run per jvm,if set to -1 ,there is no limit</description>

</property>
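
A hedged example of the HAR archiving approach from solution (1) above; the HDFS paths and archive name are placeholders:

hadoop archive -archiveName input.har -p /user/atguigu/input /user/atguigu/output
# list the files inside the archive through the har:// scheme
hadoop fs -ls har:///user/atguigu/output/input.har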

1.2.4 HDFS的NameNode内存

1)Hadoop2.x系列,配置NameNode默认2000m
1) Hadoop 2.x series, configuration NameNode default 2000m

2)Hadoop3.x系列,配置NameNode内存是动态分配的
2) Hadoop 3.x series, configuration NameNode memory is dynamically allocated

NameNode内存最小值1G,每增加100万个文件block,增加1G内存。
NameNode memory minimum of 1G, each increase of 1 million file blocks, an increase of 1G memory.

1.2.5 Shuffle及优化
1.2.5 Shuffle and optimization

1.2.6 Yarn工作机制
1.2.6 Yarn working mechanism

1.2.7 Yarn调度器
1.2.7 Yarn Scheduler

1)Hadoop调度器主要分为三类
1) Hadoop schedulers fall into three main categories

FIFO、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
FIFO, Capacity Scheduler, and Fair Scheduler.

Apache默认的资源调度器是容量调度器
Apache's default resource scheduler is Capacity Scheduler.

CDH默认的资源调度器是公平调度器。
The default resource scheduler for CDH is the fair scheduler.

2)区别
2) Difference

FIFO调度器:支持单队列 、先进先出 生产环境不会用。
FIFO scheduler: a single queue, first in first out; not used in production environments.

容量调度器:支持多队列。队列资源分配,优先选择资源占用率最低的队列分配资源;作业资源分配,按照作业的优先级和提交时间顺序分配资源;容器资源分配,本地原则(同一节点/同一机架/不同节点不同机架)。
Capacity Scheduler: Supports multiple queues. Queue resource allocation: priority is given to the queue with the lowest resource occupancy rate; job resource allocation: resources are allocated according to job priority and submission time order; container resource allocation: local principle (same node/same rack/different nodes and different racks)

公平调度器:支持多队列,保证每个任务公平享有队列资源资源不够时可以按照缺额分配。
Fair scheduler: Support multiple queues to ensure that each task has equal access to queue resources. When resources are insufficient, they can be allocated according to the shortfall.

3)在生产环境下怎么选择?
3) How to choose in the production environment?

大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK。
Large companies: choose the fair scheduler if high concurrency is required, provided the server performance is sufficient.

中小公司,集群服务器资源不太充裕选择容量。
Small and medium-sized companies with limited cluster resources choose the capacity scheduler.

4)在生产环境怎么创建队列?
4) How do you create queues in a production environment?

(1)调度器默认就1个default队列,不能满足生产要求。
(1) The scheduler defaults to one default queue, which cannot meet the production requirements.

2)按照部门:业务部门1、业务部门2。
(2) According to departments: business department 1, business department 2.

3)按照业务模块:登录注册、购物车、下单。
(3) According to the business module: login registration, shopping cart, order.

5)创建多队列的好处?
5) What are the benefits of creating multiple queues?

(1)因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。
(1) To guard against an employee carelessly writing recursive or infinite-loop code that exhausts all cluster resources.

(2)实现任务的降级使用,特殊时期保证重要的任务队列资源充足。
(2) Implement the degraded use of tasks, and ensure that important task queue resources are sufficient in special periods.

业务部门1(重要)=》业务部门2(比较重要)=》下单(一般)=》购物车(一般)=》登录注册(次要)
Business department 1 (important) => Business department 2 (fairly important) => Order placement (normal) => Shopping cart (normal) => Login/registration (minor)
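
Tying back to the queue-creation discussion in 4) above, a hedged capacity-scheduler.xml sketch that splits the root queue into department queues; the queue names and capacity percentages are illustrative only:

<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,dept1,dept2</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.dept1.capacity</name>
    <value>30</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.dept2.capacity</name>
    <value>30</value>
</property>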

1.2.8 HDFS块大小
1.2.8 HDFS Block Size

1)块大小
1) Block size

1.x 64m

2.x 3.x 128m

本地 32m
Local 32m

企业 128m 256m 512m
Enterprise 128m / 256m / 512m

2)块大小决定因素
2) Block Size Determinants

磁盘读写速度
disk read/write speed

普通的机械硬盘 100m/s => 128m
Ordinary mechanical hard disk 100m/s => 128m

固态硬盘普通的 300m/s => 256m
Solid state drive ordinary 300m/s => 256m

内存镜像 500-600m/s => 512m
Memory Mirrors 500-600m/s => 512m

1.2.9 Hadoop脑裂原因及解决办法?
1.2.9 Causes of and Solutions to Hadoop Split-Brain

1)出现脑裂的原因
1) Causes of split-brain

Leader出现故障,系统开始改朝换代,当Follower完成全部工作并且成为Leader后,原Leader又复活了(它的故障可能是暂时断开或系统暂时变慢,不能及时响应,但其NameNode进程还在),并且由于某种原因它对应的ZKFC并没有把它设置为Standby,所以原Leader还认为自己是Leader,客户端向它发出的请求仍会响应,于是脑裂就发生了。
The Leader fails and the system starts a failover. After a Follower takes over all the work and becomes the new Leader, the original Leader comes back to life (its failure may have been a temporary disconnect or a temporary slowdown so that it could not respond in time, but its NameNode process is still alive). For some reason its corresponding ZKFC did not switch it to Standby, so the original Leader still believes it is the Leader and continues to answer client requests, and split-brain occurs.

2)Hadoop通常不会出现脑裂。
2) Hadoop normally does not experience split-brain.

如果出现脑裂,意味着多个Namenode数据不一致,此时只能选择保留其中一个的数据。例如:现在有三台Namenode,分别为nn1nn2nn3,出现脑裂,想要保留nn1的数据,步骤为:
If split-brain does occur, it means the data of multiple NameNodes is inconsistent, and only one copy can be kept. For example, with three NameNodes nn1, nn2 and nn3, if split-brain occurs and you want to keep nn1's data, the steps are:

(1)关闭nn2和nn3
(1) Close nn2 and nn3

(2)在nn2和nn3节点重新执行数据同步命令:hdfs namenode -bootstrapStandby
(2) Re-execute the data synchronization command at nn2 and nn3 nodes: hdfs namenode -bootstrapStandby

(3)重新启动nn2和nn3
(3) Restart nn2 and nn3

1.3 Zookeeper

1.3.1 常用命令
1.3.1 Common commands

ls、get、create、delete、deleteall

1.3.2 选举机制
1.3.2 Electoral mechanisms

半数机制(过半机制):2n + 1,安装奇数台。
Majority (more than half) mechanism: deploy an odd number of servers, 2n + 1.

10服务器:3台。
10 servers: 3.

20台服务器:5台。
20 servers: 5.

100台服务器:11台。
100 servers: 11.

台数多,好处:提高可靠性;坏处:影响通信延时。
More servers: better reliability, but higher communication latency.

1.3.3 Zookeeper符合法则中哪两个?
1.3.3 Which two principles does Zookeeper conform to?

1.3.4 Zookeeper脑裂

Zookeeper采用过半选举机制,防止了脑裂。
Zookeeper's majority-based election mechanism prevents split-brain.

1.3.5 Zookeeper用来干嘛了
1.3.5 What is Zookeeper Used For?

(1)作为HA的协调者:如 HDFS的HA、YARN的HA。
(1) As HA coordinator: such as HA of HDFS, HA of YARN.

(2)被组件依赖:如Kafka、HBase、CK。
(2) Dependent on components: such as Kafka, HBase, CK.

1.4 Flume

1.4.1 Flume组成,Put事务,Take事务
1.4.1 Flume Composition, Put Transaction, Take Transaction

1)Taildir Source

(1)断点续传、多目录
(1) Breakpoint continuation, multi-directory

(2)taildir底层原理
(2) taildir underlying principle

3Taildir挂了怎么办?
(3) What if Taildir dies?

不会丢数:断点续传
No loss: breakpoint resume

重复数据:有可能
Duplicate data: Possible

(4)存在的问题及解决方案
(4) Existing problems and solutions

①问题:
1 Question:

新文件判断条件 = iNode值 + 绝对路径(包含文件名)
New file judgment condition = iNode value + absolute path (including file name)

日志框架凌晨修改了文件名称=》导致会再次重读一次昨天产生的数据
The logging framework renames the file in the early morning => the data produced yesterday is re-read once more

②解决:
② Solution:

方案一:建议生成的文件名称为带日期的。同时配置日志生成框架为不更名的;
Option 1: generate file names that already contain the date, and configure the logging framework not to rename files;

方案二:修改TaildirSource源码,只按照iNode值去确定文件
Option 2: modify the TaildirSource source code so that files are identified by iNode value only

修改源码视频地址:
Modify source video address:

https://www.bilibili.com/video/BV1wf4y1G7EQ?p=14&vd_source=891aa1a363111d4914eb12ace2e039af

2)file channel /memory channel/kafka channel

(1)File Channel

数据存储于磁盘,优势:可靠性高;劣势:传输速度低
Data is stored on disk, advantages: high reliability; disadvantages: low transmission speed

默认容量:100万个event
Default capacity: 1 million events

注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
Note: FileChannel can increase Flume throughput by configuring dataDirs to point to multiple paths, each path corresponding to a different hard disk.

(2)Memory Channel

数据存储于内存,优势:传输速度快;劣势:可靠性差
Data storage in memory, advantages: fast transmission speed; disadvantages: poor reliability

默认容量:100个event
Default capacity: 100 events

(3)Kafka Channel

数据存储于Kafka,基于磁盘;
Data stored in Kafka, disk-based;

优势:可靠性高;
Advantages: high reliability;

传输速度快 Kafka Channel 大于Memory Channel + Kafka Sink 原因省去了Sink阶段
Fast: Kafka Channel is faster than Memory Channel + Kafka Sink because the Sink stage is skipped

4)生产环境如何选择
(4) How to choose the production environment

如果下一级是Kafka,优先选择Kafka Channel。
If the next level is Kafka, Kafka Channel is preferred.

如果是金融、对钱要求准确的公司,选择File Channel。
For finance or other companies that need money-accurate data, choose File Channel.

如果就是普通的日志,通常可以选择Memory Channel。
If it is a normal log, you can usually choose Memory Channel.

每天丢几百万数据 pb级 亿万富翁,掉1块钱会捡?
Losing a few million records a day out of PB-scale data is like a billionaire dropping one yuan: would he bother to pick it up?

3)HDFS Sink

(1)时间(半个小时) or 大小128m 且 设置Event个数等于0,该值默认10
(1) Roll by time (half an hour) or by size (128 MB), and set the event-count roll to 0 (the default is 10)

具体参数:hdfs.rollInterval=1800,hdfs.rollSize=134217728 hdfs.rollCount=0

4)事务
4) Transactions

Source到Channel是Put事务
Source to Channel is the Put transaction

Channel到Sink是Take事务
Channel to Sink is the Take transaction

1.4.2 Flume拦截器
1.4.2 Flume interceptor

1拦截器注意事项
1) Interceptor precautions

1)时间戳拦截器:主要是解决零点漂移问题
(1) Timestamp interceptor: mainly to solve the zero drift problem

2)自定义拦截器步骤
2) Custom interceptor steps

(1)实现 Interceptor

(2)重写四个方法
(2) Rewrite four methods

initialize 初始化
initialize

public Event intercept(Event event) 处理单个Event
public Event intercept(Event event): processes a single Event

public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
public List<Event> intercept(List<Event> events): processes a batch of Events; inside it, intercept(Event event) is called for each one

close方法
close method

(3)静态内部类,实现Interceptor.Builder
(3) Static inner class, implement Interceptor.Builder

3)拦截器可以不用吗?
3) Can we do without interceptors?

时间戳拦截器建议使用。如果不用需要采用延迟15-20分钟处理数据的方式,比较麻烦。
The timestamp interceptor is recommended. Without it, you would have to delay processing the data by 15-20 minutes, which is more troublesome.
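
A minimal Java skeleton of the custom interceptor steps described in 2) above (the class name and the ETL logic are placeholders; a real project adds its own checks in intercept):

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.Iterator;
import java.util.List;

public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // initialization work, if any
    }

    @Override
    public Event intercept(Event event) {
        // process a single Event; return null to drop it
        // (placeholder: real ETL/validation logic goes here)
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // process a batch by delegating to intercept(Event)
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {
        // release resources, if any
    }

    // static inner Builder referenced in the Flume agent configuration
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}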

1.4.3 Flume Channel选择器

Replicating:默认选择器。功能:将数据发往下一级所有通道。
Replicating: Default selector. Function: Send data to all channels at the next level.

Multiplexing:选择性发往指定通道。
Multiplexing: selective routing to specified channels.

1.4.4 Flume监控器
1.4.4 Flume Monitor

1)监控到异常现象
1) Monitoring abnormal phenomenon

采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。主要是内存不够导致的。
Using the Ganglia monitor, if the number of puts Flume attempts is far greater than the number that eventually succeed, Flume is running poorly, usually because of insufficient memory.

2)解决办法?
2) The solution?

(1)自身:默认内存是20m,考虑增加flume内存,在flume-env.sh配置文件中修改flume内存为 4-6g
(1) Fix Flume itself: the default heap is only 20 MB; consider increasing it by setting the Flume heap to 4-6 GB in the flume-env.sh configuration file

(2)找朋友:增加服务器台数
(2) Find friends: increase the number of servers

搞活动 618 =》增加服务器 =》用完再退出
For big promotions such as June 18 => add servers => release them after the promotion ends

日志服务器配置:8-16g内存、磁盘8T
Log server configuration: 8-16g memory, 8T disk

1.4.5 Flume采集数据会丢失吗?
1.4.5 Will Flume Lose Data During Collection?

如果是kafka channel 或者FileChannel不会丢失数据,数据存储可以存储在磁盘中。
Kafka Channel and File Channel do not lose data, because the data is persisted on disk.

如果是MemoryChannel有可能丢。
With Memory Channel, data may be lost.

1.4.6 Flume如何提高吞吐量
1.4.6 How Flume Increases Throughput

调整taildir sourcebatchSize大小可以控制吞吐量,默认大小100个Event。
Adjust the batchSize of taildir source to control throughput, the default size is 100 Events.

吞吐量的瓶颈一般是网络带宽。
The bottleneck in throughput is usually network bandwidth.
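
A hedged snippet of the corresponding Flume agent configuration for raising the batch size (the agent and source names a1/r1 are placeholders):

a1.sources.r1.type = TAILDIR
a1.sources.r1.batchSize = 500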

1.5 Kafka

1.5.1 Kafka架构

生产者、Broker、消费者、Zookeeper。
Producer, Broker, Consumer, Zookeeper.

注意:Zookeeper中保存Broker id和controller等信息,但是没有生产者信息。
Note: Zookeeper stores Broker id and controller information, but no producer information.

1.5.2 Kafka生产端分区分配策略
1.5.2 Kafka production partition allocation strategy

Kafka官方为我们实现了三种Partitioner(分区器),分别是DefaultPartitioner(当未指定分区器时候所使用的默认分区器)、UniformStickyPartitioner、RoundRobinPartitioner。
Kafka ships with three Partitioners: DefaultPartitioner (used when no partitioner is specified), UniformStickyPartitioner, and RoundRobinPartitioner.

1)DefaultPartitioner默认分区器
1) DefaultPartition

下图说明了默认分区器的分区分配策略:
The following figure illustrates the partition allocation policy for the default partitioner:

2)UniformStickyPartitioner纯粹的粘性分区器
2) UniformStickyPartitioner Pure sticky partitioner

(1)如果指定了分区号,则会按照指定的分区号进行分配
(1) If the partition number is specified, it will be allocated according to the specified partition number.

(2)若没有指定分区好,,则使用粘性分区器
(2) If no partition is specified, use sticky partitioner

3)RoundRobinPartitioner轮询分区器
3) RoundRobinPartitioner (round-robin partitioner)

(1)如果在消息中指定了分区则使用指定分区。
(1) If a partition is specified in the message, the specified partition is used.

(2)如果未指定分区,都会将消息轮询每个分区,将数据平均分配到每个分区中。
(2) If no partition is specified, the message polls each partition, distributing the data evenly among each partition.

4)自定义分区器
4) Custom Partitioner

自定义分区策略:可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口,重写 partition 方法来达到自定义分区效果。
Custom partitioning policies: Custom partitioning can be achieved by implementing the org.apache.kafka.clients.producer.Partitioner interface and overriding the partition method.

例如我们想要实现随机分配,只需要以下代码:
For example, if we want to implement random allocation, we only need the following code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
First compute the total number of partitions for the topic, then randomly return an integer between 0 (inclusive) and that number (exclusive).

在项目中,如果希望把MySQL中某张表的数据发送到一个分区。可以以表名为key进行发送。
In a project, if you want all the data of one MySQL table to land in the same partition, send the messages with the table name as the key.
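
For completeness, a hedged, self-contained version of the random partitioner sketched above (the class name is a placeholder); to use it, set partitioner.class to its fully qualified name in the producer configuration:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class RandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // pick one of the topic's partitions at random
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return ThreadLocalRandom.current().nextInt(partitions.size());
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}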

1.5.3 Kafka丢不丢数据
1.5.3 Does Kafka Lose Data

1)Producer角度
1) Producer perspective

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=0: the producer does not wait for any acknowledgement after sending; poor reliability, high efficiency.

acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=1: the Leader acknowledges after receiving the data; medium reliability, medium efficiency.

acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follower应答,可靠性高,效率低;
acks=-1: the Leader and all Followers in the ISR queue acknowledge the data; high reliability, low efficiency.

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
In the production environment, acks=0 is rarely used;acks=1, generally used to transmit ordinary logs, allowing individual data to be lost;acks=-1, generally used to transmit data related to money, and for scenarios with high reliability requirements.

2)Broker角度
2) Broker angle

副本数大于等于2。
The number of copies is greater than or equal to 2.

min.insync.replicas大于等于2。
min.insync.replicas is greater than or equal to 2.

1.5.4 Kafka的ISR副本同步队列
1.5.4 ISR replica synchronization queue for Kafka

ISR(In-Sync Replicas),副本同步队列。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s
ISR (In-Sync Replicas), replica synchronization queue. If a Follower does not send a communication request or synchronization data to the Leader for a long time, the Follower will be kicked out of the ISR. This time threshold is set by the replica.lag.time.max.ms parameter and defaults to 30s.

任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
Any dimension exceeding the threshold will remove the Followers from the ISR and store them in the OSR (Outof-Sync Replicas) list. The newly added Followers will also be stored in the OSR first.

Kafka分区中的所有副本统称为AR = ISR + OSR
All copies in the Kafka partition are collectively referred to as AR = ISR + OSR

1.5.5 Kafka数据重复
1.5.5 Duplicate Kafka data

去重 = 幂等性 + 事务
Deduplication = idempotent + transaction

1)幂等性原理
1) The idempotent principle

2)幂等性配置参数
2) Idempotent configuration parameters

参数名称
Parameter name

描述
Description

enable.idempotence

是否开启幂等性,默认true,表示开启幂等性。
Whether idempotent is enabled, default true, means idempotent is enabled.

max.in.flight.requests.per.connection

1.0.X版本前,需设置为1,1.0.X之后,小于等于5
Before version 1.0.X, it needs to be set to 1. After version 1.0.X, it should be less than or equal to 5.

retries

失败重试次数,需要大于0
Failed retry times, greater than 0

acks

需要设置为all
Need to be set to all

3Kafka的事务一共有如下5个API
3) Kafka transactions have five APIs as follows

// 1初始化事务
// 1 Initialize transactions

void initTransactions();

// 2开启事务
// 2 Open transaction

void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
// 3 Commit consumed offsets within transactions (primarily for consumers)

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,

String consumerGroupId) throws ProducerFencedException;

// 4提交事务
// 4 Submission of affairs

void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
// 5 Discard transaction (similar to rollback transaction)

void abortTransaction() throws ProducerFencedException;
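
A minimal Java sketch of how these five calls are typically combined in a producer (the bootstrap server, transactional.id and topic name are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TransactionProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        // a unique transactional.id is required to use transactions
        props.put("transactional.id", "tx-demo-01");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("first", "message-" + i));
            }
            producer.commitTransaction();
        } catch (Exception e) {
            // abort so that none of the messages in this transaction become visible
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}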

4)总结
4) Summary

(1)生产者角度
1) Producer's perspective

acks设置为-1 (acks=-1)。
acks is set to-1 (acks=-1).

幂等性(enable.idempotence = true) + 事务
Idempotence (enable.idempotence = true) + transactions

(2)broker服务端角度
(2) Broker-side perspective

分区副本大于等于2(--replication-factor 2)。
Partition replica count greater than or equal to 2 (--replication-factor 2).

ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。
The minimum number of copies of a response in an ISR is greater than or equal to 2 (min.insync.replicas = 2).

(3)消费者
(3) Consumers

事务 + 手动提交offset(enable.auto.commit = false)。
Transaction + manual commit offset (enable.auto.commit = false).

消费者输出的目的地必须支持事务(MySQLKafka)。
The destination of consumer output must support transactions (MySQL, Kafka).

1.5.6 Kafka如何保证数据有序or怎么解决乱序
1.5.6 Kafka How to keep data in order or how to solve disorder

1)Kafka 最多只保证单分区内的消息是有序的,所以如果要保证业务全局严格有序,就要设置 Topic 为单分区。
1) Kafka only ensures that messages in a single partition are ordered at most, so if you want to ensure strict order in the global business, you must set Topic to a single partition.

2)如何保证单分区内数据有序?
2) How to ensure that the data in a single partition is in order?

注:幂等机制保证数据有序的原理如下:
Note: The idempotent mechanism ensures that data is ordered as follows:
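
As a hedged sketch (assuming a Kafka 1.0+ Java client), the producer settings that are commonly combined to keep messages within a single partition in order:

import java.util.Properties;

public class OrderedProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        // idempotence plus at most 5 in-flight requests per connection keeps a partition in order
        props.put("enable.idempotence", "true");
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("acks", "all");
        props.put("retries", "2147483647");
        return props;
    }
}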

1.5.7 Kafka分区Leader选举规则
1.5.7 Kafka Divisional Leader Election Rules

ISR中存活为前提,按照AR中排在前面的优先。例如AR[1,0,2],ISR[1,0,2],那么Leader就会按照1、0、2的顺序轮询。
The replica must be alive in the ISR, and priority follows the order in AR. For example, with AR [1,0,2] and ISR [1,0,2], the Leader is chosen by polling in the order 1, 0, 2.

1.5.8 Kafka中AR的顺序
1.5.8 Order of AR in Kafka

如果Kafka服务器只有4个节点,那么设置Kafka的分区数大于服务器台数,在Kafka底层如何分配存储副本呢?
If Kafka server has only 4 nodes, then set the number of partitions of Kafka to be greater than the number of servers. How to allocate storage replicas at the bottom of Kafka?

1)创建16分区,3个副本
1) Create 16 partitions, 3 copies

(1)创建一个新的Topic,名称为second。
(1) Create a new Topic named second.

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second

(2)查看分区和副本情况。
(2) Check the partition and copy situation.

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second

Topic: second4   Partition: 0    Leader: 0   Replicas: 0,1,2   Isr: 0,1,2

Topic: second4   Partition: 1    Leader: 1   Replicas: 1,2,3   Isr: 1,2,3

Topic: second4   Partition: 2    Leader: 2   Replicas: 2,3,0   Isr: 2,3,0

Topic: second4   Partition: 3    Leader: 3   Replicas: 3,0,1   Isr: 3,0,1

Topic: second4   Partition: 4    Leader: 0   Replicas: 0,2,3   Isr: 0,2,3

Topic: second4   Partition: 5    Leader: 1   Replicas: 1,3,0   Isr: 1,3,0

Topic: second4   Partition: 6    Leader: 2   Replicas: 2,0,1   Isr: 2,0,1

Topic: second4   Partition: 7    Leader: 3   Replicas: 3,1,2   Isr: 3,1,2

Topic: second4   Partition: 8    Leader: 0   Replicas: 0,3,1   Isr: 0,3,1

Topic: second4   Partition: 9    Leader: 1   Replicas: 1,0,2   Isr: 1,0,2

Topic: second4   Partition: 10   Leader: 2   Replicas: 2,1,3   Isr: 2,1,3

Topic: second4   Partition: 11   Leader: 3   Replicas: 3,2,0   Isr: 3,2,0

Topic: second4   Partition: 12   Leader: 0   Replicas: 0,1,2   Isr: 0,1,2

Topic: second4   Partition: 13   Leader: 1   Replicas: 1,2,3   Isr: 1,2,3

Topic: second4   Partition: 14   Leader: 2   Replicas: 2,3,0   Isr: 2,3,0

Topic: second4   Partition: 15   Leader: 3   Replicas: 3,0,1   Isr: 3,0,1

1.5.9 Kafka日志保存时间
1.5.9 Kafka log retention time

默认保存7天;生产环境建议3天。
7 days by default; 3 days recommended for production environment.

1.5.10 Kafka过期数据清理
1.5.10 Kafka obsolete data cleanup

日志清理的策略只有delete和compact两种
There are only two log cleaning strategies: delete and compact.

1delete日志删除:将过期数据删除
1) delete log: delete expired data

log.cleanup.policy = delete ,所有数据启用删除策略
log.cleanup.policy = delete, all data enable delete policy

1基于时间:默认打开以segment中所有记录中的最大时间戳作为该文件时间戳。
(1) Based on time: open by default. Take the largest timestamp of all records in the segment as the timestamp of the file.

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment
(2) Based on size: off by default. Exceeds the set total log size and deletes the oldest segment.

log.retention.bytes,默认等于-1,表示无穷大。
log.retention.bytes, which defaults to-1 for infinity.

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?
Thinking: If part of the data in a segment is expired and part is not expired, what should be done?

2compact日志压缩
2) Compact log compression

1.5.11 Kafka为什么能高效读写数据
1.5.11 Why Kafka reads and writes data efficiently

1Kafka本身是分布式集群,可以采用分区技术,并行度高
1) Kafka itself is a distributed cluster, which can adopt partition technology with high parallelism.

2)读数据采用稀疏索引,可以快速定位要消费的数据
2) Read data using sparse index, you can quickly locate the data to be consumed

3)顺序写磁盘
3) Sequential write disk

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
Kafka producer production data, to be written to the log file, the writing process is appended to the end of the file, for sequential writing. Official website data shows that the same disk, sequential write can reach 600M/s, while random write only 100K/s. This has to do with the mechanics of the disk. Sequential writing is faster because it saves a lot of head addressing time.

4)页缓存 + 零拷贝技术
4) Page cache + zero-copy technology

1.5.12 自动创建主题
1.5.12 Automatic Theme Creation

如果Broker端配置参数auto.create.topics.enable设置为true(默认值是true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。
If the Broker side configuration parameter auto.create.topics.enable is set to true (the default is true), then when a producer sends a message to an uncreated topic, a topic with num.partitions (the default is 1) and a replica factor of default.replication.factor (the default is 1) is automatically created. In addition, when a consumer starts reading messages from an unknown topic, or when any client sends metadata requests to an unknown topic, a corresponding topic is automatically created. This unexpected way of creating themes increases the difficulty of theme management and maintenance. Production environments recommend setting this parameter to false.

(1)向一个没有提前创建five主题发送数据
(1) Send data to the topic five, which has not been created in advance

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic five

>hello world

(2)查看five主题的详情
(2) View the details of the five topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five

1.5.13 副本设定
1.5.13 Setting the Number of Replicas

一般我们设置成2个或3个,很多企业设置为2个
Usually we set it to 2 or 3, and many companies set it to 2.

副本的优势:提高可靠性;副本劣势:增加了网络IO传输。
Duplication advantage: improved reliability; duplication disadvantage: increased network IO transmission.

1.5.14 Kakfa分区数
1.5.14 Number of Kakfa partitions

(1)创建一个只有1个分区的Topic
(1) Create a Topic with only one partition.

(2)测试这个TopicProducer吞吐量和Consumer吞吐量。
(2) Test the Producer throughput and Consumer throughput of this Topic.

(3)假设他们的值分别是TpTc,单位可以是MB/s
(3) Assuming that their values are Tp and Tc, the unit can be MB/s.

(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / minTpTc)。
(4) Then assuming that the total target throughput is Tt, then the number of partitions = Tt / min (Tp, Tc).

例如:Producer吞吐量 = 20m/sConsumer吞吐量 = 50m/s,期望吞吐量100m/s
For example: Producer throughput = 20m/s;Consumer throughput = 50m/s, expected throughput 100m/s;

分区数 = 100 / 20 = 5分区
Number of partitions = 100 / 20 = 5 partitions

分区数一般设置为:3-10
The number of partitions is generally set to 3-10

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。
The number of partitions is not the more the better, nor the less the better. It is necessary to build a cluster, carry out pressure testing, and then flexibly adjust the number of partitions.

1.5.15 Kafka增加分区
1.5.15 Kafka adds partitions

1)可以通过命令行的方式增加分区,但是分区数只能增加,不能减少。
1) Partitions can be increased by command line, but the number of partitions can only be increased, not decreased.

2)为什么分区数只能增加,不能减少?
2) Why can the number of partitions only increase and not decrease?

(1)按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。
(1) According to Kafka's existing code logic, this function can be implemented completely, but it will also make the complexity of the code increase sharply.

(2)实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?
(2) There are many factors to consider to implement this function, such as how to deal with messages in deleted partitions.

如果随着分区一起消失则消息的可靠性得不到保障;
If the partition disappears with it, the reliability of the message is not guaranteed;

如果需要保留则又需要考虑如何保留,直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;
If the messages need to be retained, you have to decide how: if they are simply appended to the tail of an existing partition, message timestamps will no longer increase monotonically, which affects components such as Spark and Flink that rely on message timestamps (event time);

如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?
If they are scattered into the existing partitions instead, the internal data replication will consume a lot of resources when the message volume is large, and how would the availability of the topic be guaranteed during the replication?

同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。
At the same time, sequential problems, transactional problems, and state machine switching between partitions and replicas have to be faced.

(3)反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
(3) In contrast, the benefit of such a feature is very low. If it is really needed, you can simply create a new topic with fewer partitions and copy the messages from the existing topic into it according to whatever logic you need.

1.5.16 Kafka多少个Topic
1.5.16 How many topics in Kafka

ODS层:2个
ODS layer: 2

DWD层:20
DWD layers: 20

1.5.17 Kafka消费者是拉取数据还是推送数据
1.5.17 Does the Kafka consumer pull or push data

拉取数据。
Pull data.

1.5.18 Kafka消费端分区分配策略
1.5.18 Kafka Consumer Partition Allocation Policy

粘性分区:
Sticky partition assignment:

该分区分配算法是最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:
This partition assignment strategy is the most complex one. It can be set through the partition.assignment.strategy parameter and was introduced in version 0.11. Its purpose is to make as few changes as possible to the previous assignment when a new assignment is performed. It mainly pursues the following two goals:

(1)Topic Partition 的分配要尽量均衡。
(1) The distribution of Topic Partition should be balanced as much as possible.

(2)当 Rebalance 发生时,尽量与上一次分配结果保持一致。
(2) When Rebalance occurs, try to keep consistent with the previous allocation result.

注意:当两个目标发生冲突的时候,优先保证第一个目标,这样可以使分配更加均匀,其中第一个目标是3种分配策略都尽量去尝试完成的,而第二个目标才是该算法的精髓所在。
Note: when the two goals conflict, the first goal takes priority, which keeps the assignment more even. The first goal is something all three assignment strategies try to achieve, while the second goal is the essence of this algorithm.
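
As a quick illustration, the following is a minimal sketch (a code fragment, not a full program; the broker address and group id reuse the illustrative values used elsewhere in this document) of switching the consumer to the sticky assignor via partition.assignment.strategy:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Use the sticky assignor instead of the default assignment strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.StickyAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);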

1.5.19 消费者再平衡的条件
1.5.19 Conditions for consumer rebalancing

1)Rebalance 的触发条件有三种
1) There are three trigger conditions for Rebalance.

(1)当Consumer Group 组成员数量发生变化(主动加入、主动离组或者故障下线等)。
(1) When the number of members in the consumer group changes (a member actively joins, actively leaves, or goes offline due to a failure, etc.).

(2)当订阅主题的数量或者分区发生变化。
(2) When the number of subscription topics or partitions changes.

2)消费者故障下线的情况
2) When a consumer goes offline due to failure

参数名称
Parameter name

描述
Description

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
Connection timeout between Kafka consumer and coordinator, default 45s. Beyond this value, the consumer is removed and the consumer group performs rebalancing.

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
The maximum time a consumer may spend processing a batch of messages; the default is 5 minutes. If this value is exceeded, the consumer is removed and the consumer group rebalances.

3)主动加入消费者组
3) Consumers actively joining the group

在现有消费者组中增加消费者,也会触发Kafka再平衡。注意,如果下游是Flink,Flink会自己维护offset,不会触发Kafka再平衡。
Adding a consumer to an existing consumer group also triggers a Kafka rebalance. Note that if the downstream is Flink, Flink maintains offsets itself and does not trigger a Kafka rebalance.

1.5.20 指定Offset消费
1.5.20 Specify Offset Consumption

可以在任意offset处消费数据。
Data can be consumed at any offset.

kafkaConsumer.seek(new TopicPartition(topic, 0), 1000); // seek takes a TopicPartition and an offset
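
A minimal runnable sketch with a recent Java client (the topic name first, group id test, and offset 1000 are illustrative values): seek can only be called once the partition assignment is known, so the sketch waits for the assignment before seeking.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        // The assignment is only known after the consumer has joined the group
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.isEmpty()) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }

        // Seek every assigned partition to offset 1000, then poll and consume as usual
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 1000);
        }
    }
}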

1.5.21 指定时间消费
1.5.21 Specified time consumption

可以通过时间来消费数据。
Data can also be consumed starting from a specified point in time.

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);

kafkaConsumer.offsetsForTimes(timestampToSearch);
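
The fragment above only looks up offsets by timestamp. A hedged sketch of the remaining step (continuing the same fragment, i.e. reusing the kafkaConsumer, topicPartition, and timestampToSearch variables; needs java.util.Map and org.apache.kafka.clients.consumer.OffsetAndTimestamp) converts the returned offsets into actual seeks:

// Continuation of the snippet above: resolve timestamps to offsets, then seek
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
    if (offsetAndTimestamp != null) {
        // Seek to the first offset whose timestamp is >= the requested time (one day ago)
        kafkaConsumer.seek(entry.getKey(), offsetAndTimestamp.offset());
    }
}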

1.5.22 Kafka监控

公司自己开发的监控器
The company developed its own monitor.

开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle。
Open source monitors: KafkaManager, KafkaMonitor, KafkaEagle.

1.5.23 Kafka数据积压
1.5.23 Kafka data backlog

1)发现数据积压
1) Discovery of data backlog

通过Kafka的监控器Eagle,可以看到消费lag,就是积压情况:
Through Kafka's monitor Eagle, you can see the consumption lag, which is the backlog:

2)解决
2) Solutions

(1)消费者消费能力不足
(1) Insufficient consumption capacity on the consumer side

可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
① Consider increasing the number of partitions of the topic and, at the same time, increasing the number of consumers in the consumer group so that the number of consumers = the number of partitions. (Both are required; neither alone is enough.)

增加分区数;
Increase the number of partitions;

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

提高每批次拉取的数量,提高单个消费者的消费能力。
② Increase the number of pulls per batch and improve the consumption ability of individual consumers.

参数名称
Parameter name

描述
Description

fetch.max.bytes

默认值:52428800(50m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m),仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes(broker config)或max.message.bytes(topic config)影响。
Default: 52428800 (50m). The maximum number of bytes of a batch of messages the consumer fetches from the server. If a batch on the server side is larger than this value (50m), that batch can still be pulled back, so this is not an absolute maximum. The batch size is affected by message.max.bytes (broker config) or max.message.bytes (topic config).

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条
The maximum number of records returned by a single poll; the default is 500.

(2)消费者处理能力不行
(2) The consumer's processing capacity is insufficient

①消费者,调整fetch.max.bytes大小,默认是50m。
① On the consumer, adjust fetch.max.bytes; the default is 50m.

②消费者,调整max.poll.records大小,默认是500条。
② On the consumer, adjust max.poll.records; the default is 500.

如果下游是Spark、Flink等计算引擎,消费到数据之后还要进行计算分析处理,当处理能力跟不上消费能力时,会导致背压的出现,从而使消费的速率下降。
If the downstream is a compute engine such as Spark or Flink, the data still has to be analyzed and processed after it is consumed. When the processing capacity cannot keep up with the consumption capacity, backpressure appears and the consumption rate drops.

需要对计算性能进行调优(看Spark、Flink优化)。
Compute performance needs to be tuned (see Spark, Flink optimization).

(3)消息积压后如何处理
(3) How to deal with the backlog of messages

某时刻,突然开始积压消息且持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题。
At some point, the backlog suddenly starts and continues to rise. This situation requires you to find the cause of the backlog in a short time and quickly solve the problem.

导致消息积压突然增加,只有两种:发送变快了或者消费变慢了。
There are only two ways message backlogs can suddenly increase: sending faster or consuming slower.

假如赶上大促或者抢购时,短时间内不太可能优化消费端的代码来提升消费性能,此时唯一的办法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,只能降级一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,保证重要业务服务正常。
If this happens during a big promotion or flash sale, it is unlikely that the consumer code can be optimized in a short time to improve consumption performance. The only option then is to scale out the number of consumer instances to raise the overall consumption capacity. If there are not enough server resources to scale out quickly, you can only degrade some unimportant business and reduce the amount of data the senders produce, keeping the system running at a minimum level so that the important services remain healthy.

假如通过内部监控到消费变慢了,需要你检查消费实例,分析一下是什么原因导致消费变慢?
If internal monitoring shows that consumption has slowed down, you need to inspect the consumer instances and analyze what is causing the slowdown.

优先查看日志是否有大量的消费错误。
① Prioritize checking logs to see if there are a lot of consumption errors.

此时如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程卡在哪里「触发死锁或者卡在某些等待资源」。
② If there are no errors, dump the thread stacks to see where the consumer threads are stuck (a deadlock, or waiting on some resource).

1.5.24 如何提升吞吐量
1.5.24 How to improve throughput

如何提升吞吐量?
How to improve throughput?

1)提升生产吞吐量
1) Increase production throughput

(1)buffer.memory:发送消息的缓冲区大小,默认值是32m,可以增加到64m
(1) buffer.memory: the size of the send buffer; the default is 32m and it can be increased to 64m.

(2)batch.size:默认是16k。如果batch设置太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(2) batch.size: the default is 16k. If the batch is too small, there will be frequent network requests and throughput drops; if it is too large, a message may wait a long time before being sent, increasing network latency.

(3)linger.ms,这个值默认是0,意思就是消息必须立即被发送。一般设置一个5-100毫秒。如果linger.ms设置的太小,会导致频繁网络请求,吞吐量下降;如果linger.ms太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(3) linger.ms: the default is 0, meaning messages must be sent immediately. It is usually set to 5-100 ms. If linger.ms is too small, there will be frequent network requests and throughput drops; if it is too long, a message may wait a long time before being sent, increasing network latency.

(4)compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的CPU开销。
(4) compression.type: default is none, no compression, but you can also use lz4 compression, efficiency is good, compression can reduce data volume, improve throughput, but will increase the CPU overhead of the producer side.

2)增加分区
2) Increase the number of partitions

3)消费者提高吞吐量
3) Increase consumer throughput

(1)调整fetch.max.bytes大小,默认是50m。
(1) Adjust the size of fetch.max.bytes, the default is 50m.

(2)调整max.poll.records大小,默认是500条。
(2) Adjust the size of max.poll.records, the default is 500.
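
A minimal sketch (code fragments with illustrative values, assuming the standard Java client) applying the producer-side parameters from 1) and the consumer-side parameters from 3) above:

// Producer side (imports: java.util.Properties, ProducerConfig, KafkaProducer, StringSerializer)
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // buffer.memory: 32m -> 64m
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);           // batch.size: 16k -> 32k
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);                    // linger.ms: 0 -> 5 ms
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");         // compression.type
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// Consumer side (imports: ConsumerConfig, KafkaConsumer, StringDeserializer)
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024); // fetch.max.bytes: 50m -> 100m (illustrative)
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);             // max.poll.records: 500 -> 1000 (illustrative)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);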

1.5.25 Kafka中数据量计算
1.5.25 Calculation of data volume in Kafka

每天总数据量100g,每天产生1亿日志,10000万/24/60/60=1150条/秒钟
Total data volume per day 100g, generating 100 million logs per day, 100 million/24/60/60=1150 logs per second

平均每秒钟:1150
Average per second: 1,150

低谷每秒钟:50条
Low point per second: 50

高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条
Peak per second: 1150 *(2-20 times)= 2300 - 23000

每条日志大小:0.5k - 2k(取1k)
Size of each log: 0.5k - 2k (take 1k)

每秒多少数据量:2.0M - 20MB
How much data per second: 2.0M - 20MB

1.5.26 Kafka如何压测?
1.5.26 How does Kafka pressure test?

用Kafka官方自带的脚本,对Kafka进行压测。
Use the scripts bundled with Kafka to stress test Kafka.

生产者压测:kafka-producer-perf-test.sh
Producer stress test: kafka-producer-perf-test.sh

消费者压测:kafka-consumer-perf-test.sh
Consumer stress test: kafka-consumer-perf-test.sh

1)Kafka Producer压力测试
1) Kafka Producer Pressure Test

(1)创建一个test Topic,设置为3个分区3个副本
(1) Create a test topic with 3 partitions and 3 replicas

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test

(2)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下
(2) These two scripts are located in the /opt/module/kafka/bin directory. Let's test them.

[atguigu@hadoop105 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0

参数说明:
Parameter Description:

record-size是一条信息有多大,单位是字节,本次测试设置为1k。
record-size is the size of each message in bytes; this test sets it to 1k.

num-records是总共发送多少条信息,本次测试设置为100万条。
num-records is the total number of messages sent, and this test is set to 1 million.

throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。本次实验设置为每秒钟1万条。
throughput is the number of messages per second. Setting it to -1 means no rate limit, producing data as fast as possible, which measures the producer's maximum throughput. This test sets it to 10,000 messages per second.

producer-props 后面可以配置生产者相关参数,batch.size配置为16k
producer-props is followed by producer-related parameters; here batch.size is set to 16k.

输出结果:
Outputs:

ap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0

37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.

。。。 。。。

33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.

1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th.

3)调整batch.size大小
(3) Adjust batch.size

4)调整linger.ms时间
(4) Adjust linger.ms time

5)调整压缩方式
(5) Adjust the compression method

6)调整缓存大小
(6) Adjust cache size

2)Kafka Consumer压力测试
2) Kafka Consumer Pressure Test

1)修改/opt/module/kafka/config/consumer.properties文件中的一次拉取条数为500
(1) Modify the number of items pulled at one time in the/opt/module/kafka/config/consumer.properties file to 500

max.poll.records=500

2)消费100万条日志进行压测
(2) Consumption of 1 million logs for pressure testing

[atguigu@hadoop105 kafka]$ bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties

参数说明:
Parameter Description:

--bootstrap-server指定Kafka集群地址
--bootstrap-server Specifies the Kafka cluster address

--topic 指定topic的名称
--topic Specifies the name of the topic

--messages 总共要消费的消息个数。本次实验100万条。
--messages: the total number of messages to consume; 1 million in this test.

输出结果:
Outputs:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418

3)一次拉取条数为2000
(3) Set the number of records pulled per poll to 2000

4)调整fetch.max.bytes大小为100m
(4) Adjust fetch.max.bytes to 100m

1.5.27 磁盘选择
1.5.27 Disk Selection

kafka底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。
At the bottom layer Kafka mainly performs sequential writes, and the sequential write speed of SSDs and mechanical hard disks is similar.

建议选择普通的机械硬盘。
It is recommended to choose ordinary mechanical hard disk.

每天总数据量:1亿条 * 1k ≈ 100g
Total data volume per day: 100 million * 1k ≈ 100g

100g * 副本2 * 保存时间3天 / 0.7 ≈ 1T
100g * copy 2 * storage time 3 days/ 0.7 ≈ 1T

建议三台服务器硬盘总大小,大于等于1T。
It is recommended that the total hard disk size of three servers be greater than or equal to 1T.

1.5.28 内存选择
1.5.28 Memory Selection

Kafka内存组成:堆内存 + 页缓存
Kafka memory composition: heap memory + page cache

1)Kafka堆内存建议每个节点:10g ~ 15g
1) Recommended Kafka heap memory per node: 10g ~ 15g

kafka-server-start.sh中修改
Modify it in kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi

(1)查看Kafka进程号
(1) Check the Kafka process number

[atguigu@hadoop102 kafka]$ jps

2321 Kafka

5255 Jps

1931 QuorumPeerMain

2)根据Kafka进程号,查看Kafka的GC情况
(2) According to Kafka process number, check Kafka GC situation

[atguigu@hadoop102 kafka]$ jstat -gc 2321 1s 10

S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT GCT

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

参数说明:
Parameter Description:

YGC:年轻代垃圾回收次数;
YGC: Young generation garbage collection times;

3)根据Kafka进程号,查看Kafka的堆内存
(3) According to Kafka process number, check Kafka heap memory

[atguigu@hadoop102 kafka]$ jmap -heap 2321

… …

Heap Usage:

G1 Heap:

regions = 2048

capacity = 2147483648 (2048.0MB)

used = 246367744 (234.95458984375MB)

free = 1901115904 (1813.04541015625MB)

11.472392082214355% used

2)页缓存:
2) Page cache:

页缓存是Linux系统服务器的内存。我们只需要保证1个segment(1g)中25%的数据在内存中就好。
The page cache is the memory of the Linux system server. We only need to ensure that 25% of the data in 1 segment (1g) is in memory.

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如10个分区,页缓存大小=(10 * 1g * 25%)/ 3 1g
Page cache size per node =(number of partitions * 1g * 25%)/number of nodes. For example, 10 partitions, page cache size =(10 * 1g * 25%)/ 3 ≈ 1g

建议服务器内存大于等于11G。
Recommended server memory is greater than or equal to 11G.

1.5.29 CPU选择
1.5.29 CPU Selection

1)默认配置
1) Default configuration

num.io.threads = 8 负责写磁盘的线程数。
num.io.threads = 8 Number of threads responsible for writing to disk.

num.replica.fetchers = 1 副本拉取线程数。
num.replica.fetchers = 1 Number of replica pull threads.

num.network.threads = 3 数据传输线程数。
num.network.threads = 3 Number of data transfer threads.

2)建议配置
2) Recommended configuration

此外还有后台的一些其他线程,比如清理数据线程,Controller负责感知和管控整个集群的线程等等,这样算,每个Broker都会有上百个线程存在。根据经验,4核CPU处理几十个线程在高峰期会打满,8核勉强够用,而且再考虑到集群上还要运行其他的服务,所以部署Kafka的服务器一般建议在16核以上可以应对一两百个线程的工作,如果条件允许,给到24核甚至32核就更好。
In addition, there are other background threads, such as the data cleanup threads and the Controller threads that sense and manage the whole cluster, so each broker ends up with hundreds of threads. From experience, a 4-core CPU handling dozens of threads will be saturated at peak times, and 8 cores are barely enough. Considering that other services also run on the cluster, it is generally recommended that servers deploying Kafka have at least 16 cores to handle one or two hundred threads; 24 or even 32 cores is better if conditions permit.

num.io.threads = 16 负责写磁盘的线程数。
num.io.threads = 16 Number of threads responsible for writing to disk.

num.replica.fetchers = 2 副本拉取线程数。
num.replica.fetchers = 2 Number of replica pull threads.

num.network.threads = 6 数据传输线程数。
num.network.threads = 6 Number of data transfer threads.

服务器建议购买 32核CPU
It is recommended to buy servers with 32-core CPUs.

1.5.30 网络选择
1.5.30 Network Selection

网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。
Network bandwidth = peak throughput ≈ 20MB/s, select Gigabit NIC.

100Mbps单位是bit;10M/s单位是byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s
The unit of 100Mbps is bits; the unit of 10M/s is bytes; 1 byte = 8 bits, so 100Mbps / 8 = 12.5M/s.

一般百兆的网卡(100Mbps=12.5m/s)、千兆的网卡(1000Mbps=125m/s)、万兆的网卡(10000Mbps=1250m/s)。
Typical NICs: 100 Mbps (=12.5m/s), Gigabit (1000Mbps=125m/s), and 10 Gigabit (10000Mbps=1250m/s).

通常选用千兆或者是万兆网卡。
Usually choose gigabit or 10 gigabit network card.

1.5.31 Kafka挂掉

在生产环境中,如果某个Kafka节点挂掉。
In a production environment, what should you do if a Kafka node goes down?

正常处理办法:
The standard way to handle it:

(1)先看日志,尝试重新启动一下,如果能启动正常,那直接解决。
(1) Check the logs first and try restarting the node; if it starts normally, the problem is solved.

(2)如果重启不行,检查内存、CPU、网络带宽。调优=》调优不行增加资源
(2) If restarting does not help, check memory, CPU, and network bandwidth. Tune first; if tuning is not enough, add resources.

(3)如果将Kafka整个节点误删除,如果副本数大于等于2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。
(3) If an entire Kafka node is deleted by mistake and the replication factor is at least 2, you can commission a new node in the same way a new node is normally brought into service and then run load balancing (partition reassignment).

1.5.32 Kafka的机器数量
1.5.32 Number of machines in Kafka

1.5.33 服役新节点退役旧节点
1.5.33 Commissioning new nodes and decommissioning old nodes

可以通过bin/kafka-reassign-partitions.sh脚本服役和退役节点。
Nodes can be commissioned and decommissioned with the bin/kafka-reassign-partitions.sh script.

1.5.34 Kafka单条日志传输大小
1.5.34 Kafka Single Log Transfer Size

Kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中,常常会出现一条消息大于1M,如果不对Kafka进行配置。则会出现生产者无法将消息推送到Kafka或消费者无法去消费Kafka里面的数据,这时我们就要对Kafka进行以下配置:server.properties
By default Kafka limits a single message body to 1M. In our application scenarios, messages larger than 1M often occur; if Kafka is not configured for this, producers cannot push such messages to Kafka, or consumers cannot consume the data in Kafka. In that case we need to apply the following Kafka configuration (server.properties):

参数名称
Parameter name

描述
Description

message.max.bytes

默认1m,Broker端接收每个批次消息最大值。
Default 1m; the maximum size of a message batch the broker will accept.

max.request.size

默认1m,生产者发往Broker每个请求消息最大值。针对Topic级别设置消息体的大小。
Default 1m; the maximum size of each request message the producer sends to the broker (a producer-side limit on the message body size).

replica.fetch.max.bytes

默认1m,副本同步数据,每个批次消息最大值。
Default 1m; the maximum batch size for replica data synchronization.

fetch.max.bytes

默认值:52428800(50m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m),仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes(broker config)或max.message.bytes(topic config)影响。
Default: 52428800 (50m). The maximum number of bytes of a batch of messages the consumer fetches from the server. If a batch on the server side is larger than this value (50m), that batch can still be pulled back, so this is not an absolute maximum. The batch size is affected by message.max.bytes (broker config) or max.message.bytes (topic config).

1.5.35 Kafka参数优化
1.5.35 Kafka parameter optimization

重点调优参数:
Key tuning parameters:

(1)buffer.memory 32m

(2)batch.size:16k

(3)linger.ms默认0 调整 5-100ms

(4)compression.type采用压缩 snappy

5)消费者端调整fetch.max.bytes大小,默认是50m。
(5) The consumer adjusts the fetch.max.bytes size, which defaults to 50m.

6)消费者端调整max.poll.records大小,默认是500条。
(6) The consumer adjusts the size of max.poll.records, the default is 500.

(7)单条日志大小:message.max.bytes、max.request.size、replica.fetch.max.bytes适当调整2-10m

(8)Kafka堆内存建议每个节点:10g ~ 15g
(8) Kafka heap memory recommendations per node: 10g ~ 15g

kafka-server-start.sh中修改
Modify it in kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi

9)增加CPU核数
(9) Increase CPU core count

num.io.threads = 8 负责写磁盘的线程数
num.io.threads = 8 Number of threads responsible for writing to disk

num.replica.fetchers = 1 副本拉取线程数
num.replica.fetchers = 1 Number of replica pull threads

num.network.threads = 3 数据传输线程数
num.network.threads = 3 Number of data transfer threads

(10)日志保存时间log.retention.hours 3
(10) Log retention time log.retention.hours 3 days

(11)副本数,调整为2
(11) Number of copies, adjusted to 2

1.6 Hive

1.6.1 Hive的架构
1.6.1 Hive's architecture

1.6.2 HQL转换为MR流程
1.6.2 HQL to MR Process

(1)解析器(SQLParser):将SQL字符串转换成抽象语法树(AST)
(1) Parser (SQLParser): converts the SQL string into an abstract syntax tree (AST)

(2)语义分析器(Semantic Analyzer):将AST进一步抽象为QueryBlock(可以理解为一个子查询划分成一个QueryBlock)
(2) Semantic Analyzer: AST is further abstracted into QueryBlock (can be understood as a subquery divided into a QueryBlock)

(3)逻辑计划生成器(Logical Plan Gen):由QueryBlock生成逻辑计划
(3) Logical Plan Generator (Logical Plan Gen): generates a logical plan from the QueryBlock

(4)逻辑优化器(Logical Optimizer):对逻辑计划进行优化
(4) Logical Optimizer: optimizes the logical plan

(5)物理计划生成器(Physical Plan Gen):根据优化后的逻辑计划生成物理计划
(5) Physical Plan Generator (Physical Plan Gen): generates a physical plan from the optimized logical plan

(6)物理优化器(Physical Optimizer):对物理计划进行优化
(6) Physical Optimizer: optimizes the physical plan

(7)执行器(Execution):执行该计划,得到查询结果并返回给客户端
(7) Executor (Execution): executes the plan, obtains the query result, and returns it to the client

1.6.3 Hive和数据库比较
1.6.3 Hive vs. Database Comparison

Hive 和数据库除了拥有类似的查询语言,再无类似之处。
Hive and databases have nothing in common except a similar query language.

1)数据存储位置
1) Data storage location

Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
Hive is stored in HDFS. Databases store data in block devices or local file systems.

2)数据更新
2) Data update

Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的
Hive does not recommend rewriting data. The data in the database usually needs to be modified frequently.

3)执行延迟
3) Implementation delay

Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
Hive has higher execution latency. Database execution latency is low. Of course, this is conditional, that is, the data size is small, when the data size is large enough to exceed the processing capacity of the database, Hive's parallel computing can obviously show its advantages.

4)数据规模
4) Data size

Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
Hive supports very large-scale data computations; databases can support smaller data scales.

1.6.4 内部表和外部表
1.6.4 Internal and external tables

元数据、原始数据
Metadata, raw data

1)删除数据时
1) When deleting data

内部表:元数据、原始数据,全删除
Internal tables: metadata, raw data, delete all

外部表:元数据 只删除
External tables: metadata deleted only

2)在公司生产环境下,什么时候创建内部表,什么时候创建外部表?
2) In a company production environment, when are internal tables created and when are external tables created?

在公司中绝大多数场景都是外部表。
Most scenarios in a company are external tables.

自己使用的临时表,才会创建内部表;
Internal tables are created only when temporary tables are used by oneself;

1.6.5 系统函数
1.6.5 System functions

1)数值函数
1) numerical function

(1)round:四舍五入;(2)ceil:向上取整;(3)floor:向下取整
(1) round: rounding; (2) ceil: round up; (3) floor: round down

2)字符串函数
2) String functions

(1)substring:截取字符串;(2)replace:替换;(3)regexp_replace:正则替换
(1) substring: intercept string;(2) replace: replace;(3) regexp_replace: regular replacement

(4)regexp:正则匹配;(5)repeat:重复字符串;(6)split:字符串切割
(4) regexp: regular matching;(5) repeat: repeated string;(6) split: string cutting

(7)nvl:替换null值;(8)concat:拼接字符串;
(7) nvl: replace null value;(8) concat: concatenate string;

(9)concat_ws:以指定分隔符拼接字符串或者字符串数组;
(9) concat_ws: concatenates strings or arrays of strings with specified delimiters;

(10)get_json_object:解析JSON字符串
(10) get_json_object: Parse JSON string

3)日期函数
3) Date function

(1)unix_timestamp:返回当前或指定时间的时间戳
unix_timestamp: Returns the timestamp of the current or specified time

(2)from_unixtime:转化UNIX时间戳(从 1970-01-01 00:00:00 UTC 到指定时间的秒数)到当前时区的时间格式
(2) from_unixtime: converts the UNIX timestamp (seconds from 1970-01-01 00:00:00 UTC to the specified time) to the time format of the current time zone

(3)current_date:当前日期
(3) current_date: current date

(4)current_timestamp:当前的日期加时间,并且精确的毫秒
(4) current_timestamp: the current date plus time, and accurate milliseconds

(5)month:获取日期中的月;(6)day:获取日期中的日
(5) month: month of the acquisition date;(6) day: day of the acquisition date

(7)datediff:两个日期相差的天数(结束日期减去开始日期的天数)
datediff: the number of days between two dates (the end date minus the start date)

(8)date_add:日期加天数;(9)date_sub:日期减天数
(8) date_add: date plus days;(9) date_sub: date minus days

(10)date_format:将标准日期解析成指定格式字符串
(10) date_format: Parses a standard date into a string in a specified format

4)流程控制函数
4) Process control function

(1)case when:条件判断函数
(1) case when: conditional judgment function

(2)if:条件判断,类似于Java中三元运算符
(2) if: conditional judgment, similar to the ternary operator in Java

5)集合函数
5) Set function

(1)array:声明array集合
(1) array: declared array collection

(2)map:创建map集合
(2) Map: Create a map collection

(3)named_struct:声明struct的属性和值
(3) named_struct: Declares the attributes and values of struct

(4)size:集合中元素的个数
size: the number of elements in the collection

(5)map_keys:返回map中的key
(5) map_keys: returns the key in the map

(6)map_values:返回map中的value
(6) map_values: returns the values in the map

(7)array_contains:判断array中是否包含某个元素
array_contains: determines whether an array contains an element

(8)sort_array:将array中的元素排序
sort_array: sort elements in an array

6)聚合函数
6) Aggregate function

(1)collect_list:收集并形成list集合,结果不去重
(1) collect_list: collect and form a list collection, the result is not repeated

(2)collect_set:收集并形成set集合,结果去重
(2) collect_set: collect and form a set, and remove the duplicate results

1.6.6 自定义UDF、UDTF函数
1.6.6 Custom UDF, UDTF Functions

1)在项目中是否自定义过UDF、UDTF函数以及用他们处理了什么问题及自定义步骤?
1) Have UDF and UDTF functions been customized in the project, and what problems have been solved with them, and custom steps?

(1)目前项目中逻辑不是特别复杂就没有用自定义UDF和UDTF
(1) At present, if the logic in the project is not particularly complex, custom UDF and UDTF are not used.

(2)自定义UDF:继承GenericUDF,重写核心方法evaluate(以及initialize、getDisplayString)
(2) Custom UDF: extend GenericUDF and override the core method evaluate (plus initialize and getDisplayString); a sketch is given at the end of this section

(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
(3) Custom UDTF: extend GenericUDTF and override 3 methods: initialize (define the output column names and types), process (emit results via forward(result)), and close

2)企业中一般什么场景下使用UDF/UDTF
2) In what scenarios are UDF/UDTF typically used in companies?

(1)因为自定义函数,可以将自定函数内部任意计算过程打印输出,方便调试。
(1) With a custom function, you can print any intermediate result inside the function, which makes debugging easier.

(2)引入第三方jar包时,也需要。
(2) When introducing third-party jar packages, it is also required.

3)广告数仓中解析IP/UA使用Hive的自定义函数
3) In the advertising data warehouse, IP/UA parsing uses Hive custom functions

解析IP:可以调用接口/ 使用离线的IP2Region数据库
IP parsing: call an API, or use the offline IP2Region database

解析UA:正则/ Hutool解析UA的工具类
UA parsing: regular expressions, or Hutool's UA parsing utility class
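
Tying back to the custom steps in 1) above, here is a minimal sketch of a custom UDF (an assumed example that returns the length of a string; the class name and function name are illustrative, not from the original project). It extends GenericUDF and overrides initialize, evaluate, and getDisplayString:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyStringLength extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the argument count and declare that the function returns an int
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("my_len takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Return 0 for null input, otherwise the string length
        Object arg = arguments[0].get();
        if (arg == null) {
            return 0;
        }
        return arg.toString().length();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "my_len(" + children[0] + ")";
    }
}

After packaging it into a jar, it would typically be registered with something like add jar /path/to/xxx.jar; and create temporary function my_len as 'com.example.hive.udf.MyStringLength'; (the path and package name here are placeholders).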

1.6.7 窗口函数
1.6.7 Window functions

一般在场景题中出现手写:分组TopN、行转列、列转行。
Hand-written SQL usually appears in scenario questions: grouped TopN, rows to columns, and columns to rows.

按照功能,常用窗口可划分为如下几类:聚合函数、跨行取值函数、排名函数。
By function, common window functions can be divided into the following categories: aggregation functions, cross-row value functions, and ranking functions.

1)聚合函数
1) aggregation function

max:最大值。
Max: Maximum value.

min:最小值。
min: Minimum value.

sum:求和。
sum: sum.

avg:平均值。
AVG: Average.

count:计数。
Count: Counting.

2)跨行取值函数
2) Cross-row value functions

(1)lead和lag

注:lag和lead函数不支持自定义窗口。
Note: The lag and lead functions do not support custom windows.

(2)first_value和last_value

3)排名函数
3) Ranking function

注:rank 、dense_rank、row_number不支持自定义窗口。
Note: rank, dense_rank, row_number do not support custom windows.

1.6.8 Hive优化
1.6.8 Hive optimization

1.6.8.1 分组聚合
1.6.8.1 Group Aggregation

一个分组聚合的查询语句,默认是通过一个MapReduce Job完成的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
A grouped aggregate query statement, by default, is completed through a MapReduce Job. The Map terminal is responsible for reading data, partitioning the data according to grouping fields, sending the data to the Reduce terminal through Shuffle, and completing the final aggregation operation of each group of data at the Reduce terminal.

分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个Hash Table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至Reduce端,完成最终的聚合。
The optimization of grouping aggregation mainly revolves around reducing the amount of Shuffle data, and the specific method is map-side aggregation. Map-side aggregation is to maintain a Hash Table on the map side, use it to complete partial aggregation, and then send the results of partial aggregation to the Reduce side according to the grouping fields to complete the final aggregation.

相关参数如下:
The relevant parameters are as follows:

--启用map-side聚合,默认是true
--Enable map-side aggregation, default is true

set hive.map.aggr=true;

--用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。
--Used to detect whether the source table data is suitable for map-side aggregation. The detection method is as follows: map-side aggregation is carried out on several pieces of data at first; if the ratio of the number of pieces after aggregation to the number of pieces before aggregation is less than the value, the table is considered suitable for map-side aggregation; otherwise, the table data is considered not suitable for map-side aggregation, and subsequent data will not be map-side aggregated.

set hive.map.aggr.hash.min.reduction=0.5;

--用于检测源表是否适合map-side聚合的条数。
--Number of entries used to detect whether the source table fits into map-side aggregation.

set hive.groupby.mapaggr.checkinterval=100000;

--map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。
--The maximum fraction of the map task heap memory that the hash table used for map-side aggregation may occupy; if exceeded, the hash table is flushed once.

set hive.map.aggr.hash.force.flush.memory.threshold=0.9;

1.6.8.2 Map Join

Hive中默认最稳定的Join算法是Common Join。其通过一个MapReduce Job完成一个Join操作。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
The most stable Join algorithm in Hive is Common Join by default. It completes a Join operation with a MapReduce Job. The Map end is responsible for reading the data of the table required for the Join operation, partitioning it according to the associated fields, and sending it to the Reduce end through Shuffle. The data with the same key completes the final Join operation at the Reduce end.

优化Join的最为常用的手段就是Map Join,其可通过两个只有Map阶段的Job完成一个join操作。第一个Job会读取小表数据,将其制作为Hash Table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。
The most common way to optimize Join is Map Join, which can complete a join operation through two jobs with only Map phases. The first Job reads the small table data, makes it into a Hash Table, and uploads it to Hadoop distributed cache (essentially HDFS). The second Job reads the small table data from the distributed cache and caches it in the memory of the Map Task, then scans the large table data, so that the association operation can be completed on the map side.

注:由于Map Join需要缓存整个小表的数据,故只适用于大表Join小表的场景。
Note: since Map Join needs to cache the entire small table, it is only applicable to the scenario of a large table joining a small table.

相关参数如下:
The relevant parameters are as follows:

--启动Map Join自动转换
--Start Map Join automatic conversion

set hive.auto.convert.join=true;

--开启无条件转Map Join
--Open Unconditional Map Join

set hive.auto.convert.join.noconditionaltask=true;

--无条件转Map Join小表阈值,默认值10M,推荐设置为Map Task总内存的三分之一到二分之一
--Small-table size threshold for unconditional conversion to Map Join; the default is 10M; it is recommended to set it to one third to one half of the Map Task's total memory

set hive.auto.convert.join.noconditionaltask.size=10000000;

1.6.8.3 SMB Map Join

上节提到,Map Join只适用于大表Join小表的场景。若想提高大表Join大表的计算效率,可使用Sort Merge Bucket Map Join。
As mentioned in the previous section, Map Join is only applicable to scenarios where large tables Join small tables. To improve the computational efficiency of large table Join large table, use Sort Merge Bucket Map Join.

需要注意的是SMB Map Join有如下要求:
SMB Map Join has the following requirements:

(1)参与Join的表均为分桶表,且分桶字段为Join的关联字段。
(1) All tables participating in Join are bucket tables, and bucket fields are associated fields of Join.

(2)两表分桶数呈倍数关系。
(2) The bucket counts of the two tables are multiples of each other.

(3)数据在分桶内是按关联字段有序的。
(3) The data is ordered in buckets according to associated fields.

SMB Join的核心原理如下:只要保证了上述三点要求的前两点,就能保证参与Join的两张表的分桶之间具有明确的关联关系,因此就可以在两表的分桶间进行Join操作了。
The core principle of SMB Join is as follows: As long as the first two points of the above three requirements are guaranteed, there can be a clear association between the buckets of the two tables participating in Join, so you can perform Join operation between the buckets of the two tables.

若能保证第三点,也就是参与Join的数据是有序的,这样就能使用数据库中常用的Join算法之一——Sort Merge Join了,Merge Join原理如下:
If you can ensure that the third point, that is, the data participating in the Join is ordered, you can use one of the Join algorithms commonly used in the database-Sort Merge Join. The principle of Merge Join is as follows:

在满足了上述三点要求之后,就能使用SMB Map Join了。
After meeting the above three requirements, you can use SMB Map Join.

由于SMB Map Join无需构建Hash Table也无需缓存小表数据,故其对内存要求很低。适用于大表Join大表的场景。
SMB Map Join has very low memory requirements because it neither builds a Hash Table nor caches small-table data. It is suitable for scenarios where a large table joins another large table.

1.6.8.4 Reduce并行度
1.6.8.4 Reduce parallelism

Reduce端的并行度,也就是Reduce个数,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算。
The parallelism of the Reduce side, that is, the number of Reduces, can be specified by the user himself, or Hive can estimate it according to the file size input by the MR Job.

Reduce端的并行度的相关参数如下:
The relevant parameters of parallelism on the Reduce side are as follows:

--指定Reduce端并行度,默认值为-1,表示用户未指定
--Specifies the reduce-side parallelism, the default value is-1, indicating that the user does not specify

set mapreduce.job.reduces;

--Reduce端并行度最大值
--Reduce maximum parallelism

set hive.exec.reducers.max;

--单个Reduce Task计算的数据量,用于估算Reduce并行度
--The amount of data for a single Reduce Task computation, used to estimate Reduce parallelism

set hive.exec.reducers.bytes.per.reducer;

Reduce端并行度的确定逻辑如下:
The logic for determining reduce-side parallelism is as follows:

若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive自行估算Reduce并行度,估算逻辑如下:
If the value of the specified parameter mapreduce.job.reduces is a non-negative integer, the Reduce parallelism is the specified value. Otherwise, Hive estimates the Reduce parallelism by itself. The estimation logic is as follows:

假设Job输入的文件大小为totalInputBytes
Assuming the file size of Job input is totalInputBytes

参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer。
The value of the parameter hive.exec.reducers.bytes.per.reducer is bytesPerReducer.

参数hive.exec.reducers.max的值为maxReducers。
The value of the parameter hive.exec.reducers.max is maxReducers.

则Reduce端的并行度为:
Then the parallelism of the Reduce side is:

min(ceil(totalInputBytes / bytesPerReducer), maxReducers)
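
For example, assuming (purely for illustration) totalInputBytes = 100 GB, bytesPerReducer = 256 MB, and maxReducers = 1009, the estimated parallelism would be min(ceil(100 GB / 256 MB), 1009) = min(400, 1009) = 400 Reduce tasks.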

根据上述描述,可以看出,Hive自行估算Reduce并行度时,是以整个MR Job输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定Reduce并行度了。
From the above description, it can be seen that Hive estimates the Reduce parallelism by itself based on the file size of the entire MR Job input. Therefore, in some cases, the estimated parallelism may not be accurate, and the user needs to specify the Reduce parallelism according to the actual situation.

需要说明的是:若使用Tez或者是Spark引擎,Hive可根据计算统计信息(Statistics)估算Reduce并行度,其估算的结果相对更加准确。
It should be noted that if Tez or Spark engine is used, Hive can estimate the Reduce parallelism based on computational statistics, and the estimated result is relatively accurate.

1.6.8.5 小文件合并
1.6.8.5 Small file merge

若Hive的Reduce并行度设置不合理,或者估算不合理,就可能导致计算结果出现大量的小文件。该问题可由小文件合并任务解决。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
If Hive's Reduce parallelism setting is unreasonable, or the estimation is unreasonable, it may lead to a large number of small files in the calculation result. This problem can be solved by the small file merge task. The principle is to judge according to the average size of the output file of the calculation task. If the condition is met, an additional task will be started separately for merging.

相关参数为:
The relevant parameters are:

--开启合并map only任务输出的小文件
--Merge small files produced by map-only tasks

set hive.merge.mapfiles=true;

--开启合并map reduce任务输出的小文件
--Merge small files produced by map-reduce tasks

set hive.merge.mapredfiles=true;

--合并后的文件大小
--Target file size after merging

set hive.merge.size.per.task=256000000;

--触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
--Threshold value for triggering small file merge task. If the average file size output by a calculation task is lower than this value, merge will be triggered.

set hive.merge.smallfiles.avgsize=16000000;

1.6.8.6 谓词下推
1.6.8.6 Predicate Push Down

谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。开启谓词下推优化后,无需调整SQL语句,Hive就会自动将过滤操作尽可能的前移动。
Predicate pushdown refers to moving the filtering operation forward as much as possible to reduce the amount of data in subsequent calculation steps. When predicate pushdown optimization is enabled, Hive automatically moves filtering operations as far forward as possible without adjusting SQL statements.

相关参数为:
The relevant parameters are:

--是否启动谓词下推(predicate pushdown)优化
--Whether to start predicate pushdown optimization

set hive.optimize.ppd = true;

1.6.8.7 并行执行
1.6.8.7 Parallel Execution

Hive会将一个SQL语句转化成一个或者多个Stage,每个Stage对应一个MR Job。默认情况下,Hive同时只会执行一个Stage。但是SQL语句可能会包含多个Stage,但这多个Stage可能并非完全互相依赖,也就是说有些Stage是可以并行执行的。此处提到的并行执行就是指这些Stage的并行执行。相关参数如下:
Hive transforms an SQL statement into one or more stages, one MR Job per Stage. By default, Hive performs only one Stage at a time. However, a SQL statement may contain multiple Stages, but these Stages may not be completely interdependent, meaning that some Stages can be executed in parallel. Parallel execution is referred to here as parallel execution of these stages. The relevant parameters are as follows:

--启用并行执行优化,默认是关闭的
--Enable parallel execution optimization, default is off

set hive.exec.parallel=true;  

--同一个sql允许最大并行度,默认为8
--Maximum parallelism allowed for the same sql, default is 8

set hive.exec.parallel.thread.number=8;

1.6.8.8 CBO优化
1.6.8.8 CBO Optimization

CBO是指Cost based Optimizer,即基于计算成本的优化。
CBO stands for Cost Based Optimizer, i.e. optimization based on computational costs.

在Hive中,计算成本模型考虑到了:数据的行数、CPU、本地IO、HDFS IO、网络IO等方面。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在Hive的MR引擎下主要用于Join的优化,例如多表Join的Join顺序。
In Hive, the computational cost model takes into account: rows of data, CPU, local IO, HDFS IO, network IO, etc. Hive calculates the computational cost of different execution plans for the same SQL statement and selects the execution plan with the lowest cost. At present, CBO is mainly used for Join optimization under Hive's MR engine, such as the Join order of multi-table Join.

相关参数为:
The relevant parameters are:

--是否启用cbo优化
--whether to enable cbo optimization

set hive.cbo.enable=true;

1.6.8.9 列式存储
1.6.8.9 Column Storage

采用ORC列式存储加快查询速度。
ORC column storage is adopted to speed up query.

id name age

1 zs 18

2 lishi 19

行:1 zs 18 2 lishi 19

列:1 2 zs lishi 18 19

select name from user

1.6.8.10 压缩
1.6.8.10 Compression

压缩减少磁盘IO:因为Hive底层计算引擎默认是MR,可以在Map输出端采用Snappy压缩。
Compression reduces disk IO: Because Hive's underlying compute engine defaults to MR, Snappy compression can be used on the Map output.

Map(Snappy ) Reduce

1.6.8.11 分区和分桶
1.6.8.11 Partitions and buckets

(1)创建分区表 防止后续全表扫描
(1) Create partitioned tables to prevent subsequent full table scans

(2)创建分桶表 对未知的复杂的数据进行提前采样
(2) Create bucket tables to sample unknown complex data in advance

1.6.8.12 更换引擎
1.6.8.12 Engine Replacement

1)MR/Tez/Spark区别:
1) Differences between MR, Tez, and Spark:

MR引擎:多Job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。
MR engine: multiple jobs chained together, disk-based, with many places where intermediate data is written to disk. It is slow, but it will always produce a result. Generally used for weekly, monthly, and yearly metrics.

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘 DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。
Spark engine: data is also spilled to disk during Shuffle, but not every operator needs a Shuffle; in particular, in multi-operator pipelines the intermediate results do not hit disk (DAG, directed acyclic graph). It balances reliability and efficiency. Generally used for daily metrics.

2)Tez引擎的优点
2) Advantages of Tez Engine

(1)使用DAG描述任务,可以减少MR中不必要的中间节点,从而减少磁盘IO和网络IO。
(1) Using DAG to describe tasks reduces unnecessary intermediate nodes in MR, thereby reducing disk IO and network IO.

(2)可更好的利用集群资源,例如Container重用、根据集群资源计算初始任务的并行度等。
(2) Cluster resources can be better utilized, such as Container reuse, parallelism of initial tasks calculated according to cluster resources, etc.

(3)可在任务运行时,根据具体数据量,动态的调整后续任务的并行度。
(3) The parallelism of subsequent tasks can be dynamically adjusted according to the specific data amount when the task is running.

1.6.8.13 几十张表join 如何优化
1.6.8.13 How to optimize a join of dozens of tables

(1)减少join的表数量:不影响业务前提,可以考虑将一些表进行预处理和合并,从而减少join操作。
(1) Reduce the number of join tables: Without affecting the business premise, you can consider preprocessing and merging some tables to reduce the join operation.

(2)使用Map Join:将小表加载到内存中,从而避免了Reduce操作,提高了性能。通过设置hive.auto.convert.join为true来启用自动Map Join。
(2) Use Map Join: Load small tables into memory, thus avoiding the Reduce operation and improving performance. Enable automatic Map Join by setting hive.auto.convert.join to true.

(3)使用Bucketed Map Join:通过设置hive.optimize.bucketmapjoin为true来启用Bucketed Map Join。

(4)使用Sort Merge Join:这种方式在Map阶段完成排序,从而减少了Reduce阶段的计算量。通过设置hive.auto.convert.sortmerge.join为true来启用。
(4) Sort Merge Join: This method completes the sorting in the Map phase, thus reducing the amount of computation in the Reduce phase. Enable by setting hive.auto.convert.sortmerge.join to true.

(5)控制Reduce任务数量:通过合理设置hive.exec.reducers.bytes.per.reducer和mapreduce.job.reduces参数来控制Reduce任务的数量。
(5) Control the number of Reduce tasks: Control the number of Reduce tasks by setting hive.exec.reducers.bytes.per.reducer and mapreduce.job.reduces.

(6)过滤不需要的数据:join操作之前,尽量过滤掉不需要的数据,从而提高性能。
(6) Filter unwanted data: Before joining operations, try to filter out unwanted data to improve performance.

(7)选择合适的join顺序:将小表放在前面可以减少中间结果的数据量,提高性能。
(7) Choose the right join order: Putting small tables in front can reduce the amount of data in the middle result and improve performance.

(8)使用分区:可以考虑使用分区技术。只需要读取与查询条件匹配的分区数据,从而减少数据量和计算量。
(8) Use partitioning: consider partitioning the tables; only the partitions that match the query conditions need to be read, which reduces the data volume and computation.

(9)使用压缩:通过对数据进行压缩,可以减少磁盘和网络IO,提高性能。注意选择合适的压缩格式和压缩级别。
(9) Use compression: By compressing data, you can reduce disk and network IO and improve performance. Pay attention to choosing the appropriate compression format and compression level.

(10)调整Hive配置参数:根据集群的硬件资源和实际需求,合理调整Hive的配置参数,如内存、CPU、IO等,以提高性能。
(10) Adjust Hive configuration parameters: According to the hardware resources and actual needs of the cluster, reasonably adjust Hive configuration parameters, such as memory, CPU, IO, etc., to improve performance.

1.6.9 Hive解决数据倾斜方法
1.6.9 Hive Solutions for Data Skew

数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。以下为生产环境中数据倾斜的现象:
The data skew problem usually means that the data involved in the computation is unevenly distributed: the volume of one key or a few keys far exceeds the others, so in the shuffle stage a large amount of data with the same key is sent to the same Reduce, whose running time then far exceeds that of the other Reduces and becomes the bottleneck of the whole job. The following is what data skew looks like in a production environment:

Hive中的数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍在上述两种场景下的优化思路。
Data skew in Hive often occurs in the scenarios of grouping aggregation and join operations. The optimization ideas in the above two scenarios are described below.

1)分组聚合导致的数据倾斜
1) Data skew caused by grouping aggregation

前文提到过,Hive中的分组聚合是由一个MapReduce Job完成的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。若group by分组字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜。
As mentioned earlier, packet aggregation in Hive is done by a MapReduce Job. The Map terminal is responsible for reading data, partitioning the data according to grouping fields, sending the data to the Reduce terminal through Shuffle, and completing the final aggregation operation of each group of data at the Reduce terminal. If the values of the group by field are unevenly distributed, it may cause a large number of the same keys to enter the same Reduce, resulting in data skew.

由分组聚合导致的数据倾斜问题,有如下解决思路:
The data skew problem caused by grouping aggregation can be solved as follows:

(1)判断倾斜的值是否为null
(1) Determine whether the value of inclination is null

若倾斜的值为null,可考虑最终结果是否需要这部分数据,若不需要,只要提前将null过滤掉,就能解决问题。若需要保留这部分数据,考虑以下思路。
If the skewed value is null, consider whether the final result needs this part of the data. If not, simply filter out the nulls in advance and the problem is solved. If the data must be kept, consider the following approaches.

(2)Map-Side聚合
(2) Map-side aggregation
开启Map-Side聚合后,数据会先在Map端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map端聚合能完全屏蔽数据倾斜问题。
When Map-side aggregation is enabled, the data is first partially aggregated on the Map side. Even if the original data is skewed, after this initial aggregation on the Map side the data sent to Reduce is no longer skewed. In the best case, Map-side aggregation can completely mask the data skew problem.

相关参数如下:
The relevant parameters are as follows:

set hive.map.aggr=true;

set hive.map.aggr.hash.min.reduction=0.5;

set hive.groupby.mapaggr.checkinterval=100000;

set hive.map.aggr.hash.force.flush.memory.threshold=0.9;

(3)Skew-GroupBy优化
(3) Skew-GroupBy optimization
Skew-GroupBy是Hive提供的一个专门用来解决分组聚合导致的数据倾斜问题的方案。其原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,并完成部分聚合,第二个MR按照分组字段分区,完成最终聚合。
Skew-GroupBy is a solution provided by Hive to solve the data skew problem caused by grouping aggregation. The principle is to start two MR tasks. The first MR is partitioned according to random numbers, sending data to Reduce and completing partial aggregation. The second MR is partitioned according to grouping fields to complete final aggregation.

相关参数如下:
The relevant parameters are as follows:

--启用分组聚合数据倾斜优化
--Enable group aggregation data skew optimization

set hive.groupby.skewindata=true;

2)Join导致的数据倾斜
2) Data skew caused by Join

若Join操作使用的是Common Join算法,就会通过一个MapReduce Job完成计算。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
If the Join operation uses the Common Join algorithm, the computation is done via a MapReduce Job. The Map end is responsible for reading the data of the table required for the Join operation, partitioning it according to the associated fields, and sending it to the Reduce end through Shuffle. The data with the same key completes the final Join operation at the Reduce end.

如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
If the values of the associated fields are unevenly distributed, it may cause a large number of the same keys to enter the same Reduce, resulting in data skew problems.

Join导致的数据倾斜问题,有如下解决思路:
The data skew problem caused by Join can be solved as follows:

(1)Map Join

使用Map Join算法,Join操作仅在Map端就能完成,没有Shuffle操作,没有Reduce阶段,自然不会产生Reduce端的数据倾斜。该方案适用于大表Join小表时发生数据倾斜的场景。
Using Map Join algorithm, Join operation can be completed only on Map side, there is no Shuffle operation, there is no Reduce stage, and naturally there will be no data skew on Reduce side. This scenario is suitable for scenarios where data skew occurs when a large table joins a small table.

相关参数如下:
The relevant parameters are as follows:

set hive.auto.convert.join=true;

set hive.auto.convert.join.noconditionaltask=true;

set hive.auto.convert.join.noconditionaltask.size=10000000;

(2)Skew Join

若参与Join的两表均为大表,Map Join就难以应对了。此时可考虑Skew Join,其核心原理是:为倾斜的大key单独启动一个Map Join任务进行计算,其余key进行正常的Common Join。原理图如下:
If both tables participating in the Join are large, Map Join can hardly cope. In this case consider Skew Join. Its core principle is to launch a separate Map Join task for the skewed large keys, while the remaining keys go through a normal Common Join. The schematic is as follows:

相关参数如下:
The relevant parameters are as follows:

--启用skew join优化
--Enable skew join optimization

set hive.optimize.skewjoin=true;

--触发skew join的阈值,若某个key的行数超过该参数值,则触发
--Threshold for triggering skew join. Trigger if the number of rows in a key exceeds the parameter value.

set hive.skewjoin.key=100000;

(3)调整SQL语句
(3) Adjust the SQL statement

若参与Join的两表均为大表,其中一张表的数据是倾斜的,此时也可通过以下方式对SQL语句进行相应的调整。
If the two tables participating in the Join are both large tables, and the data of one of the tables is skewed, you can also adjust the SQL statement accordingly in the following ways.

假设原始SQL语句如下:A,B两表均为大表,且其中一张表的数据是倾斜的。
Suppose the original SQL statement is as follows: A, B are both large tables, and one of the tables has skewed data.

hive (default)>

select

*

from A

join B

on A.id=B.id;

Join过程如下:
The process of joining is as follows:

图中1001为倾斜的大key,可以看到,其被发往了同一个Reduce进行处理。
In the figure, 1001 is the skewed large key; as can be seen, all of its records are sent to the same Reduce for processing.

调整之后的SQL语句执行计划如下图所示:
The SQL statement execution plan after adjustment is shown in the following figure:

调整SQL语句如下:
Adjust SQL statements as follows:

hive (default)>

select

*

from(

select --打散操作
select --scatter operation

concat(id,'_',cast(rand()*2 as int)+1) id,

value

from A

)ta

join(

select --扩容操作
select --expansion operation

concat(id,'_',1) id,

value

from B

union all

select

concat(id,'_',2) id,

value

from B

)tb

on ta.id=tb.id;

1.6.10 Hive的数据中含有字段的分隔符怎么处理?
1.6.10 How to handle Hive data that contains the field delimiter?

Hive 默认的字段分隔符为Ascii码的控制符\001(^A),建表的时候用fields terminated by '\001'。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和JavaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束
Hive's default field delimiter is the ASCII control character \001 (^A); use fields terminated by '\001' when creating the table. Note: if \t or \001 is used as the delimiter, the data passed in from front-end tracking and the JavaEE backend must not contain that delimiter, which is enforced through coding standards.

一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。通常采用Sqoop和DataX在同步数据时预处理。
Once the incoming data contains the delimiter, it needs to be escaped or replaced at the previous stage (ETL). This preprocessing is usually done with Sqoop or DataX during data synchronization.

id name age

1 zs 18

2 li分隔符si 19
2 li separator si 19

1.6.11 MySQL元数据备份
1.6.11 MySQL Metadata Backup

元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)。
Metadata backup (critical: if the metadata is corrupted, the entire cluster may stop working; at least two copies should be backed up to other servers after midnight every day).

(1)MySQL备份数据脚本(建议每天定时执行一次备份元数据)
(1) MySQL data backup script (it is recommended to back up the metadata once a day on a schedule)

#!/bin/bash

#常量设置
#Constant settings

MYSQL_HOST='hadoop102'

MYSQL_USER='root'

MYSQL_PASSWORD='000000'

# 备份目录,需提前创建
#Backup directory, need to be created in advance

BACKUP_DIR='/root/mysql-backup'

# 备份天数,超过这个值,最旧的备份会被删除
#backup days, beyond which the oldest backup will be deleted

FILE_ROLL_COUNT='7'

# 备份MySQL数据库
#Backup MySQL database

[ -d "${BACKUP_DIR}" ] || exit 1

mysqldump \

--all-databases \

--opt \

--single-transaction \

--source-data=2 \

--default-character-set=utf8 \

-h"${MYSQL_HOST}" \

-u"${MYSQL_USER}" \

-p"${MYSQL_PASSWORD}" | gzip > "${BACKUP_DIR}/$(date +%F).gz"

if [ "$(ls "${BACKUP_DIR}" | wc -l )" -gt "${FILE_ROLL_COUNT}" ]

then

ls "${BACKUP_DIR}" | sort |sed -n 1p | xargs -I {} -n1 rm -rf "${BACKUP_DIR}"/{}

fi

(2)MySQL恢复数据脚本
(2) MySQL data recovery script

#!/bin/bash

#常量设置
#Constant settings

MYSQL_HOST='hadoop102'

MYSQL_USER='root'

MYSQL_PASSWORD='000000'

BACKUP_DIR='/root/mysql-backup'

# 恢复指定日期,不指定就恢复最新数据
#Restore specified date, restore latest data without specifying

RESTORE_DATE=''

[ "${RESTORE_DATE}" ] && BACKUP_FILE="${RESTORE_DATE}.gz" || BACKUP_FILE="$(ls ${BACKUP_DIR} | sort -r | sed -n 1p)"

gunzip "${BACKUP_DIR}/${BACKUP_FILE}" --stdout | mysql \

-h"${MYSQL_HOST}" \

-u"${MYSQL_USER}" \

-p"${MYSQL_PASSWORD}"

1.6.12 如何创建二级分区表?
1.6.12 How do I create a secondary partition table?

create table dept_partition2(

deptno int, -- 部门编号
deptno int, --department number

dname string -- 部门名称
dname string --Department name

)

partitioned by (day string, hour string)

row format delimited fields terminated by '\t';

1.6.13 Union和Union all区别
1.6.13 Difference between Union and Union all

(1)union会将联合的结果集去重
(1) union will deduplicate the result set of the union

(2)union all不会对结果集去重
(2) union all does not deduplicate the result set

1.7 Datax

1.7.1 DataX与Sqoop区别

1)DataX与Sqoop都是主要用于离线系统中批量同步数据处理场景。
1) DataX and Sqoop are mainly used for batch synchronous data processing scenarios in offline systems.

2)DataX和Sqoop区别如下:
2) DataX and Sqoop differ as follows:

(1)DataX底层是单进程多线程;Sqoop底层是4个Map;
(1) DataX bottom layer is single process multithread;Sqoop bottom layer is 4 maps;

(2)数据量大的场景优先考虑Sqoop分布式同步;数据量小的场景优先考虑DataX,完全基于内存;DataX数据量大,可以使用多个DataX实例,每个实例负责一部分(手动划分)。
(2) For large data volumes, Sqoop's distributed synchronization is preferred; for small data volumes, DataX is preferred because it is purely memory-based. When DataX has to handle a large volume, multiple DataX instances can be used, each responsible for a portion of the data (divided manually).

(3)Sqoop是为Hadoop而生的,对Hadoop相关组件兼容性比较好;Datax是插件化开发,支持的Source和Sink更多一些。
(3) Sqoop is born for Hadoop and has good compatibility with Hadoop-related components;Datax is plug-in development and supports more Source and Sink.

(4)Sqoop目前官方不在升级维护;DataX目前阿里在升级维护
(4) Sqoop is no longer officially maintained; DataX is still being upgraded and maintained by Alibaba.

(5)关于运行日志与统计信息,DataX更丰富,Sqoop基于Yarn不容易采集
(5) DataX provides richer run logs and statistics; since Sqoop runs on Yarn, its logs and statistics are harder to collect.

1.7.2 速度控制
1.7.2 Speed control

1)关键优化参数如下:
1) The key optimization parameters are as follows:

参数
Parameter

说明
Description

job.setting.speed.channel

总并发数
Total concurrency (number of channels)

job.setting.speed.record

总record限速
Total record speed limit

job.setting.speed.byte

总byte限速
Total byte speed limit

core.transport.channel.speed.record

单个channel的record限速,默认值为10000(10000条/s)
Record limit for a single channel; default 10000 (10,000 records/s)

core.transport.channel.speed.byte

单个channel的byte限速,默认值1024*1024(1M/s)
Byte limit for a single channel; default 1024*1024 (1 MB/s)

2)生效优先级:
2) Order in which the settings take effect:

(1)全局Byte限速 / 单Channel Byte限速
(1) Global Byte Speed Limit/Single Channel Byte Speed Limit

(2)全局Record限速 / Channel Record限速
(2) Global Record Speed Limit/Single Channel Record Speed Limit

两个都设置,取结果小的
If both are configured, the smaller of the two computed results takes effect.

(3)上面都没设置,总Channel数的设置生效
(3) If neither of the above is set, the configured total number of channels takes effect.

3)项目配置
3) Project configuration

只设置 总channel数=5,基本可以跑满网卡带宽。
Setting only the total channel count to 5 is usually enough to saturate the NIC bandwidth.
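A minimal sketch of the corresponding fragment in the DataX job JSON (only the speed block is shown; the reader/writer parts of the job are omitted):

{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            }
        }
    }
}

The record and byte limits listed above would go in the same speed block if needed.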

1.7.3 内存调整
1.7.3 Memory Adjustments

建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
It is recommended to set the memory to 4 GB or 8 GB; this can be adjusted to the actual workload.

调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
There are two ways to adjust the JVM -Xms/-Xmx parameters: modify the datax.py script directly, or pass the parameters at startup, as follows:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

1.7.4 空值处理
1.7.4 Handling of null values

1)MySQL(null) => Hive (\N) 要求Hive建表语句
1) MySQL (null) => Hive (\N): requires handling in the Hive table creation statement

解决该问题的方案有两个:
There are two solutions to this problem:

(1)修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,可参考https://blog.csdn.net/u010834071/article/details/105506580
(1) Modify the source code of the DataX HDFS Writer and add logic for a custom null-value storage format; see https://blog.csdn.net/u010834071/article/details/105506580.

(2)在Hive中建表时指定null值存储格式为空字符串(''),例如:
(2) When creating the table in Hive, specify that null values are stored as the empty string (''), for example:

DROP TABLE IF EXISTS base_province;

CREATE EXTERNAL TABLE base_province

(

`id` STRING COMMENT '编号',

`name` STRING COMMENT '省份名称',

`region_id` STRING COMMENT '地区ID',

`area_code` STRING COMMENT '地区编码',

`iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
`iso_code` STRING COMMENT 'Old ISO-3166-2 code for visualization',

`iso_3166_2` STRING COMMENT '新版ISO-3166-2编码,供可视化使用'
`iso_3166_2` STRING COMMENT 'New ISO-3166-2 code for visualization'

) COMMENT '省份表'
) COMMENT 'Province table'

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

NULL DEFINED AS ''

LOCATION '/base_province/';

2)Hive(\N) => MySQL(null)

"reader": {

"name": "hdfsreader",

"parameter": {

"defaultFS": "hdfs://hadoop102:8020",

"path": "/base_province",

"column": [

"*"

],

"fileType": "text",

"compress": "gzip",

"encoding": "UTF-8",

"nullFormat": "\\N",

"fieldDelimiter": "\t",

}

}

1.7.5 配置文件生成脚本
1.7.5 Configuration File Generation Script

(1)一个表一个配置,如果有几千张表,怎么编写的配置?
(1) Each table needs its own configuration file; if there are thousands of tables, how are the configuration files written?

(2)脚本使用说明
(2) Script usage

python gen_import_config.py -d database -t table
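A hedged sketch of how generation could be driven for every table in a database; it assumes the gen_import_config.py script accepts -d and -t exactly as shown above, and the database name is illustrative (the MySQL host and credentials reuse the values from the backup script earlier):

#!/bin/bash
# Generate one DataX configuration file per table in the given database.
DATABASE='gmall'    # illustrative database name
TABLES=$(mysql -hhadoop102 -uroot -p000000 -N -e \
    "select table_name from information_schema.tables where table_schema='${DATABASE}'")
for TABLE in ${TABLES}
do
    python gen_import_config.py -d "${DATABASE}" -t "${TABLE}"
done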

1.7.6 DataX一天导入多少数据
1.7.6 How much data does DataX import in a day

1)全量同步的表如下
1) Tables synchronized in full are as follows

活动表、优惠规则表、优惠卷表、SKU平台属性表、SKU销售属性表
Activity table, Offer rule table, Coupon table, SKU platform attribute table, SKU sales attribute table

SPU商品表(1-2万)、SKU商品表(10-20万)、品牌表、商品一级分类、商品二级分类、商品三级分类
SPU product table (10,000-20,000 rows), SKU product table (100,000-200,000 rows), brand table, and the level-1, level-2 and level-3 product category tables

省份表、地区表
Province table, region table

编码字典表
coding dictionary table

以上全部加一起30万条,约等于300m。
All of the above add up to about 300,000 rows, roughly 300 MB.

加购表(每天增量20万、全量100万 =》1g)
Cart-add table (about 200,000 new rows per day, 1,000,000 rows in full => about 1 GB)

所以Datax每天全量同步的数据1-2g左右。
So DataX synchronizes roughly 1-2 GB of data in full every day.

注意:金融、保险(平安 、民生银行),只有业务数据数据量大一些。
Note: only in finance and insurance (e.g. Ping An, Minsheng Bank) is the business data volume noticeably larger.

2)增量同步的表如下
2) Tables synchronized incrementally are as follows

加购表(20万)、订单表(10万)、订单详情表(15万)、订单状态表、支付表(9万)、退单表(1000)、退款表(1000)
Cart-add table (200,000), order table (100,000), order detail table (150,000), order status table, payment table (90,000), order cancellation table (1,000), refund table (1,000)

订单明细优惠卷关联表、优惠卷领用表
Order Details Coupon Association Table, Coupon Collection Table

商品评论表、收藏表
Product review table, collection table

用户表、订单明细活动关联表
User Table, Order Details Activity Association Table

增量数据每天1-2g
Incremental data amounts to about 1-2 GB per day.

1.7.7 Datax如何实现增量同步
1.7.7 How DataX implements incremental synchronization

获取今天新增和变化的数据:通过sql过滤,创建时间是今天或者操作时间等于今天。
Fetch today's new and changed rows by filtering with SQL: select rows whose creation time is today or whose last-operation time is today.
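A hedged sketch of such a filter (the table and column names and the literal date are illustrative; in practice the date would be the ETL date parameter):

select *
from order_info
where date_format(create_time, '%Y-%m-%d') = '2024-06-24'
   or date_format(operate_time, '%Y-%m-%d') = '2024-06-24';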

1.8 Maxwell

1.8.1 Maxwell与Canal、FlinkCDC的对比
1.8.1 Comparison of Maxwell, Canal and FlinkCDC

1)FlinkCDC、Maxwell、Canal都是主要用于实时系统中实时数据同步处理场景。
1) FlinkCDC, Maxwell and Canal are all mainly used for real-time data synchronization in real-time systems.

FlinkCDC、Maxwell、Canal对比:
Comparison of FlinkCDC, Maxwell and Canal:

SQL与数据条数关系:FlinkCDC是SQL影响几条出现几条;Maxwell是SQL影响几条出现几条;Canal只有一整条(后续可能需要炸开)
Relationship between one SQL statement and emitted records: FlinkCDC emits one record per affected row; Maxwell emits one record per affected row; Canal emits a single record for the whole statement (it may need to be split downstream).

数据初始化功能(同步全量数据):FlinkCDC有(支持多库多表同时做);Maxwell有(单表)
Data initialization (full sync): FlinkCDC: yes (multiple databases and tables at once); Maxwell: yes (single table).

断点续传功能:FlinkCDC有(放在CK);Maxwell有(存在MySQL);Canal有(本地)
Breakpoint resume: FlinkCDC: yes (state kept in checkpoints); Maxwell: yes (position kept in MySQL); Canal: yes (position kept locally).

1.8.2 Maxwell好处
1.8.2 Advantages of Maxwell

支持断点续传。
Supports resuming from a breakpoint.

全量初始化同步。
Supports full initial synchronization (bootstrap).

自动根据库名和表名把数据发往Kafka的对应主题。
Automatically sends data to the corresponding Kafka topic based on the database and table names.
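As a hedged illustration of the full-initialization feature, a bootstrap run might look like the following (the install path, config file, and the database/table names are assumptions, not from the original text):

/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties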

1.8.3 Maxwell底层原理
1.8.3 Maxwell's underlying principles

MySQL主从复制。
MySQL master-slave replication: Maxwell registers itself as a MySQL replica and reads the binlog.

1.8.4 全量同步速度如何
1.8.4 How fast is full synchronization

同步速度慢,全量同步建议采用Sqoop或者DataX
Maxwell's full synchronization is slow; Sqoop or DataX is recommended for full loads.

1.8.5 Maxwell数据重复问题
1.8.5 Maxwell data duplication problem

同步历史数据时,bootstrap会扫描所有数据。
When synchronizing historical data, bootstrap scans all data.

同时maxwell会监听binlog变化。
Maxwell also listens for binlog changes.

例如:用bootstrap同步历史数据库时,历史数据库中新插入一条数据,这时bootstrap扫描到,maxwell进程也监控到了,这时就会出现数据重复问题。
For example, while bootstrap is synchronizing a historical database, a new row is inserted into that database; the row is both scanned by bootstrap and captured by the Maxwell binlog listener, so the data is duplicated.

1.9 DolphinScheduler调度器
1.9 DolphinScheduler (scheduler)

1.3.9版本,支持邮件、企业微信。
Version 1.3.9 supports email and WeCom (enterprise WeChat) alerts.

2.0.3版本,支持的报警信息更全一些,配置更容易。
Version 2.0.3 supports more complete alerting options and is easier to configure.

3.0.0以上版本,支持数据质量监控。
Versions 3.0.0 and above support data quality monitoring.

1.9.1 每天集群运行多少指标?
1.9.1 How many metrics does the cluster run per day?

每天跑100多个指标,有活动时跑200个左右。
More than 100 metrics are computed every day, and around 200 during promotional events.

1.9.2 任务挂了怎么办?
1.9.2 What if a task fails?

(1)运行成功或者失败都会发邮件、发钉钉、集成自动打电话。
(1) Whether a job succeeds or fails, notifications are sent by email and DingTalk, and automated phone calls can be integrated.

(2)最主要的解决方案就是,看日志,解决问题。
(2) The main approach is to read the logs and fix the problem.

(3)报警网站睿象云,http://www.onealert.com/
(3) Alerting platform OneAlert (睿象云): http://www.onealert.com/

(4)双11和618活动需要24小时值班
(4) The Double 11 and 618 promotions require 24-hour on-call coverage.

1.9.3 DS挂了怎么办?
1.9.3 What if DolphinScheduler (DS) itself goes down?

看日志报错原因:能直接重启就直接重启;资源不够就增加资源再重启。
Check the logs for the cause of the error: if a simple restart fixes it, restart directly; if resources are insufficient, add resources and then restart.

1.10 Spark Core & SQL

1.10.1 Spark运行模式
1.10.1 Spark operating mode

(1)Local运行在一台机器上。测试用。
(1) Local: runs on a single machine; used for testing.

(2)Standalone是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。
(2) Standalone: Spark's own scheduling system; used when very high cluster performance is required. Rarely used in China.

(3)Yarn:采用Hadoop的资源调度器 国内大量使用。
(3) Yarn: uses Hadoop's resource scheduler; widely used in China.

Yarn-client模式:Driver运行在Client上(不在AM里)
Yarn-client mode: the Driver runs on the client (not in the AM).

Yarn-cluster模式:Driver在AM上
Yarn-cluster mode: the Driver runs in the AM (ApplicationMaster).

(4)Mesos:国内很少使用
(4) Mesos: rarely used in China.

(5)K8S:趋势,但是目前不成熟,需要的配置信息太多。
(5) K8S: the future trend, but not yet mature; it requires too much configuration.

1.10.2 Spark常用端口号
1.10.2 Common port numbers for Spark

(1)4040 spark-shell任务端口
(1) 4040 spark-shell task port

(2)7077 内部通讯端口。类比Hadoop的8020/9000
(2) 7077: internal communication port, analogous to Hadoop's 8020/9000.

(3)8080 查看任务执行情况端口。 类比Hadoop的8088
(3) 8080: Web UI port for viewing job execution status, analogous to Hadoop's 8088.

(4)18080 历史服务器。类比Hadoop的19888
(4) 18080: history server, analogous to Hadoop's 19888.

注意:由于Spark只负责计算,所以并没有Hadoop中存储数据的端口9870/50070。
Note: since Spark is only responsible for computation, it has no equivalent of Hadoop's data-storage ports 9870/50070.

1.10.3 RDD五大属性
1.10.3 The Five Properties of an RDD

(1)一组分区列表(a list of partitions)
(2)作用在每个分区上的计算函数(a compute function for each partition)
(3)对其他RDD的依赖关系(a list of dependencies on other RDDs)
(4)可选:针对KV类型RDD的分区器(optionally, a Partitioner for key-value RDDs)
(5)可选:每个分区的优先计算位置(optionally, preferred locations for computing each partition)

1.10.4 RDD弹性体现在哪里
1.10.4 Where does RDD's resilience show?

主要表现为存储弹性、计算弹性、任务(Task、Stage)弹性、数据位置弹性,具体如下:
It mainly shows up as storage elasticity, computation elasticity, task (Task/Stage) elasticity and data-location elasticity, as follows:

(1)自动进行内存和磁盘切换
(1) Automatic memory and disk switching

(2)基于lineage的高效容错
(2) Efficient fault tolerance based on lineage

(3)Task如果失败会特定次数的重试
(3) A failed Task is automatically retried a configurable number of times

(4)Stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
(4) A failed Stage is automatically retried a configurable number of times, and only the failed partitions are recomputed

(5)Checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
(5) Checkpoint (each RDD operation produces a new RDD; if the lineage chain is long and recomputation is expensive, write the data to disk) and persist (reuse data cached in memory or on disk), i.e. checkpointing and persistence

(6)数据调度弹性:DAG Task 和资源管理无关
(6) Data-scheduling elasticity: DAG Task scheduling is independent of resource management

(7)数据分片的高度弹性repartion
(7) Highly elastic repartitioning of data (repartition)

1.10.5 Spark的转换算子(8个)
1.10.5 Spark transformation operators (8)

1)单Value
1) Single Value

(1)map

(2)mapPartitions

(3)mapPartitionsWithIndex

(4)flatMap

(5)groupBy

(6)filter

(7)distinct

(8)coalesce

(9)repartition

(10)sortBy

2)双Value
2) Double Value

(1)intersection

(2)union

(3)subtract

(4)zip

3)Key-Value

(1)partitionBy

(2)reduceByKey

(3)groupByKey

(4)sortByKey

(5)mapValues

(6)join

1.10.6 Spark的行动算子(5个)
1.10.6 Spark's Action Operators (5)

(1)reduce

(2)coll