
Shang Silicon Valley High-Frequency Interview Questions on Big Data Technology

(Author: Shang Silicon Valley Research Institute)

Version: V9.2.0


第1章 核心技术12
Chapter 1 Core Technologies 12

1.1 Linux&Shell12

1.1.1 Linux常用高级命令12
1.1.1 Common Linux Advanced Commands 12

1.1.2 Shell常用工具及写过的脚本12
1.1.2 Common Shell Tools and Scripts Written 12

1.1.3 Shell中单引号和双引号区别13
1.1.3 Differences between single and double quotation marks in shells 13

1.2 Hadoop13

1.2.1 Hadoop常用端口号13
1.2.1 Common Hadoop port number 13

1.2.2 HDFS读流程和写流程14
1.2.2 HDFS read and write flows 14

1.2.3 HDFS小文件处理15
1.2.3 HDFS Small File Handling 15

1.2.4 HDFS的NameNode内存15
1.2.4 NameNode Memory in HDFS 15

1.2.5 Shuffle及优化16
1.2.5 Shuffle and Its Optimization 16

1.2.6 Yarn工作机制17
1.2.6 How Yarn works 17

1.2.7 Yarn调度器17
1.2.7 Yarn scheduler 17

1.2.8 HDFS块大小18
1.2.8 HDFS block size 18

1.2.9 Hadoop脑裂原因及解决办法?19
1.2.9 What are the causes and solutions of Hadoop split-brain? 19

1.3 Zookeeper19

1.3.1 常用命令19
1.3.1 Common commands 19

1.3.2 选举机制19
1.3.2 Leader Election Mechanism 19

1.3.3 Zookeeper符合法则中哪两个?21
1.3.3 Which two of the CAP properties does Zookeeper satisfy? 21

1.3.4 Zookeeper脑裂21
1.3.4 Zookeeper Split-Brain 21

1.3.5 Zookeeper用来干嘛了21
1.3.5 What is Zookeeper used for 21

1.4 Flume21

1.4.1 Flume组成,Put事务,Take事务21
1.4.1 Flume Composition, Put Transaction, Take Transaction 21

1.4.2 Flume拦截器23
1.4.2 Flume interceptor 23

1.4.3 Flume Channel选择器24

1.4.4 Flume监控器24
1.4.4 Flume Monitor 24

1.4.5 Flume采集数据会丢失吗?24
1.4.5 Will the data collected by Flume be lost? 24

1.4.6 Flume如何提高吞吐量24
1.4.6 How Flume can increase throughput 24

1.5 Kafka24

1.5.1 Kafka架构24
1.5.1 Kafka Architecture 24

1.5.2 Kafka生产端分区分配策略26
1.5.2 Kafka Production-side Partition Allocation Policy 26

1.5.3 Kafka丢不丢数据27
1.5.3 Does Kafka Lose Data? 27

1.5.4 Kafka的ISR副本同步队列28
1.5.4 ISR replica synchronization queue for Kafka 28

1.5.5 Kafka数据重复28
1.5.5 Kafka data duplication 28

1.5.6 Kafka如何保证数据有序or怎么解决乱序29
1.5.6 How Kafka keeps data in order or solves disorder 29

1.5.7 Kafka分区Leader选举规则31
1.5.7 Kafka Partition Leader Election Rules 31

1.5.8 Kafka中AR的顺序31
1.5.8 Order of AR in Kafka 31

1.5.9 Kafka日志保存时间32
1.5.9 Kafka log retention time 32

1.5.10 Kafka过期数据清理32
1.5.10 Kafka obsolete data cleanup 32

1.5.11 Kafka为什么能高效读写数据33
1.5.11 Why Kafka reads and writes data efficiently 33

1.5.12 自动创建主题34
1.5.12 Automatic topic creation 34

1.5.13 副本数设定34
1.5.13 Setting the Number of Replicas 34

1.5.14 Kafka分区数34
1.5.14 Number of Kafka Partitions 34

1.5.15 Kafka增加分区35
1.5.15 Kafka increases partition 35

1.5.16 Kafka中多少个Topic35

1.5.17 Kafka消费者是拉取数据还是推送数据35
1.5.17 Do Kafka Consumers Pull or Push Data? 35

1.5.18 Kafka消费端分区分配策略36
1.5.18 Kafka Consumer Partition Allocation Policy 36

1.5.19 消费者再平衡的条件37
1.5.19 Conditions for consumer rebalancing 37

1.5.20 指定Offset消费37
1.5.20 Specify Offset Consumption 37

1.5.21 指定时间消费38
1.5.21 Consumption at specified times 38

1.5.22 Kafka监控38
1.5.22 Kafka Monitoring 38

1.5.23 Kafka数据积压38
1.5.23 Kafka data backlog 38

1.5.24 如何提升吞吐量39
1.5.24 How to improve throughput 39

1.5.25 Kafka中数据量计算40
1.5.25 Calculation of data volume in Kafka 40

1.5.26 Kafka如何压测?40
1.5.26 How to Stress-Test Kafka? 40

1.5.27 磁盘选择42
1.5.27 Disk Selection 42

1.5.28 内存选择42
1.5.28 Memory Selection 42

1.5.29 CPU选择43
1.5.29 CPU Selection 43

1.5.30 网络选择43
1.5.30 Network Selection 43

1.5.31 Kafka挂掉44
1.5.31 Kafka Goes Down 44

1.5.32 Kafka的机器数量44
1.5.32 Number of machines in Kafka 44

1.5.33 服役新节点退役旧节点44
1.5.33 Commissioning New Nodes and Decommissioning Old Nodes 44

1.5.34 Kafka单条日志传输大小44
1.5.34 Kafka Single Log Transfer Size 44

1.5.35 Kafka参数优化45
1.5.35 Kafka parameter optimization 45

1.6 Hive46

1.6.1 Hive的架构46
1.6.1 Hive architecture 46

1.6.2 HQL转换为MR流程46
1.6.2 HQL to MR Flow 46

1.6.3 Hive和数据库比较47
1.6.3 Hive vs. Database Comparison 47

1.6.4 内部表和外部表48
1.6.4 Internal and external tables 48

1.6.5 系统函数48
1.6.5 System functions 48

1.6.6 自定义UDF、UDTF函数49
1.6.6 Custom UDF, UDTF functions 49

1.6.7 窗口函数50
1.6.7 Window functions 50

1.6.8 Hive优化52
1.6.8 Hive Optimization 52

1.6.9 Hive解决数据倾斜方法59
1.6.9 Hive Solutions for Data Skew 59

1.6.10 Hive的数据中含有字段的分隔符怎么处理?64
1.6.10 How to handle field delimiters appearing in Hive data? 64

1.6.11 MySQL元数据备份64
1.6.11 MySQL Metadata Backup 64

1.6.12 如何创建二级分区表?65
1.6.12 How do I create a secondary partition table? 65

1.6.13 Union与Union all区别65
1.6.13 Difference Between Union and Union All 65

1.7 Datax65

1.7.1 DataX与Sqoop区别65
1.7.1 Difference Between DataX and Sqoop 65

1.7.2 速度控制66
1.7.2 Speed control 66

1.7.3 内存调整66
1.7.3 Memory Adjustments 66

1.7.4 空值处理66
1.7.4 Handling of null values 66

1.7.5 配置文件生成脚本67
1.7.5 Profile generation scripts 67

1.7.6 DataX一天导入多少数据67
1.7.6 How Much Data Does DataX Import per Day 67

1.7.7 Datax如何实现增量同步68
1.7.7 How DataX Implements Incremental Synchronization 68

1.8 Maxwell68

1.8.1 Maxwell与Canal、FlinkCDC的对比68
1.8.1 Maxwell vs. Canal and FlinkCDC 68

1.8.2 Maxwell好处68
1.8.2 Advantages of Maxwell 68

1.8.3 Maxwell底层原理69
1.8.3 Maxwell's underlying principles 69

1.8.4 全量同步速度如何69
1.8.4 How Fast Is Full Synchronization 69

1.8.5 Maxwell数据重复问题69
1.8.5 Maxwell Data Duplication Problem 69

1.9 DolphinScheduler调度器69

1.9.1 每天集群运行多少指标?69
1.9.1 How many metrics does the cluster run per day? 69

1.9.2 任务挂了怎么办?69
1.9.2 What If a Task Fails? 69

1.9.3 What happens when DS dies? 69

1.10 Spark Core & SQL70

1.10.1 Spark运行模式70
1.10.1 Spark Operating Mode 70

1.10.2 Spark常用端口号70
1.10.2 Spark Common Port Number 70

1.10.3 RDD五大属性70
1.10.3 RDD Five Attributes 70

1.10.4 RDD弹性体现在哪里71
1.10.4 Where is RDD Resilience Reflected 71

1.10.5 Spark的转换算子(8个)71
1.10.5 Spark's transformation operators (8) 71

1.10.6 Spark的行动算子(5个)72
1.10.6 Spark's Action Operators (5) 72

1.10.7 map和mapPartitions区别72
1.10.7 Difference Between map and mapPartitions 72

1.10.8 Repartition和Coalesce区别72
1.10.8 Difference Between Repartition and Coalesce 72

1.10.9 reduceByKey与groupByKey的区别73
1.10.9 Difference Between reduceByKey and groupByKey 73

1.10.10 Spark中的血缘73
1.10.10 Lineage in Spark 73

1.10.11 Spark任务的划分73
1.10.11 Division of Spark Tasks 73

1.10.12 SparkSQL中RDD、DataFrame、DataSet三者的转换及区别74
1.10.12 Conversion and Difference of RDD, DataFrame and DataSet in SparkSQL 74

1.10.13 Hive on Spark和Spark on Hive区别74
1.10.13 Difference Between Hive on Spark and Spark on Hive 74

1.10.14 Spark内核源码(重点)74
1.10.14 Spark kernel source code (focus) 74

1.10.15 Spark统一内存模型76
1.10.15 Spark Unified Memory Model 76

1.10.16 Spark为什么比MR快?77
1.10.16 Why is Spark faster than MR? 77

1.10.17 Spark Shuffle和Hadoop Shuffle区别?78
1.10.17 Difference Between Spark Shuffle and Hadoop Shuffle? 78

1.10.18 Spark提交作业参数(重点)78
1.10.18 Spark Submission Job Parameters (Key) 78

1.10.19 Spark任务使用什么进行提交,JavaEE界面还是脚本79
1.10.19 What do Spark tasks use for submission, Java EE interface or script 79

1.10.20 请列举会引起Shuffle过程的Spark算子,并简述功能。79
1.10.20 List the Spark operators that give rise to Shuffle processes and describe their functions briefly. 79

1.10.21 Spark操作数据库时,如何减少Spark运行中的数据库连接数?79
1.10.21 How do I reduce the number of database connections Spark has running when I'm working with a database? 79

1.10.22 Spark数据倾斜79
1.10.22 Spark Data Skew 79

1.10.23 Spark3.0新特性79
1.10.23 New Features in Spark 3.0 79

1.12 Flink80

1.12.1 Flink基础架构组成?80
1.12.1 Flink Infrastructure Composition? 80

1.12.2 Flink和Spark Streaming的区别?80
1.12.2 Difference Between Flink and Spark Streaming? 80

1.12.3 Flink提交作业流程及核心概念81
1.12.3 Flink Submission Workflow and Core Concepts 81

1.12.4 Flink的部署模式及区别?82
1.12.4 Flink deployment patterns and differences? 82

1.12.5 Flink任务的并行度优先级设置?资源一般如何配置?82
1.12.5 Parallel Priority Setting for Flink Tasks? How are resources generally allocated? 82

1.12.6 Flink的三种时间语义83
1.12.6 Flink's three temporal semantics 83

1.12.7 你对Watermark的认识83
1.12.7 What you know about Watermark 83

1.12.8 Watermark多并行度下的传递、生成原理83
1.12.8 Watermark Transmission and Generation Principle under Multi-parallelism 83

1.12.9 Flink怎么处理乱序和迟到数据?84
1.12.9 How does Flink handle out-of-order and late data? 84

1.12.10 说说Flink中的窗口(分类、生命周期、触发、划分)85
1.12.10 Talk about windows in Flink (classification, lifecycle, trigger, partition) 85

1.12.11 Flink的keyby怎么实现的分区?分区、分组的区别是什么?86
1.12.11 How does Flink's keyBy implement partitioning? What is the difference between partitioning and grouping? 86

1.12.12 Flink的Interval Join的实现原理?Join不上的怎么办?86
1.12.12 How does Flink's Interval Join work? What about records that fail to join? 86

1.12.13 介绍一下Flink的状态编程、状态机制?86
1.12.13 Introduce Flink's state programming and state mechanism? 86

1.12.14 Flink如何实现端到端一致性?87
1.12.14 How does Flink achieve end-to-end consistency? 87

1.12.15 分布式异步快照原理88
1.12.15 Principles of Distributed Asynchronous Snapshots 88

1.12.16 Checkpoint的参数怎么设置的?88
1.12.16 How are Checkpoint parameters set? 88

1.12.17 Barrier对齐和不对齐的区别88
1.12.17 Difference between Barrier alignment and misalignment 88

1.12.18 Flink内存模型(重点)89
1.12.18 Flink Memory Model (Key) 89

1.12.19 Flink常见的维表Join方案89
1.12.19 Flink Common Dimension Tables Join Scheme 89

1.12.20 Flink的上下文对象理解89
1.12.20 Context Object Understanding for Flink 89

1.12.21 Flink网络调优-缓冲消胀机制90
1.12.21 Flink Network Tuning-Buffering and Deinflation Mechanism 90

1.12.22 FlinkCDC锁表问题90
1.12.22 FlinkCDC lock table problem 90

1.13 HBase90

1.13.1 HBase存储结构90
1.13.1 HBase Storage Structure 90

1.13.2 HBase的写流程92
1.13.2 HBase writing flow 92

1.13.3 HBase的读流程93
1.13.3 HBase reading flow 93

1.13.4 HBase的合并94
1.13.4 Consolidation of HBase 94

1.13.5 RowKey设计原则94
1.13.5 RowKey Design Principles 94

1.13.6 RowKey如何设计94
1.13.6 How RowKey is designed 94

1.13.7 HBase二级索引原理95
1.13.7 HBase secondary indexing principle 95

1.14 Clickhouse95

1.14.1 Clickhouse的优势95
1.14.1 Advantages of Clickhouse 95

1.14.2 Clickhouse的引擎95
1.14.2 Clickhouse Engines 95

1.14.3 Flink写入Clickhouse怎么保证一致性?96
1.14.3 Flink writes Clickhouse How to guarantee consistency? 96

1.14.4 Clickhouse存储多少数据?几张表?96
1.14.4 How much data does Clickhouse store? How many tables? 96

1.14.5 Clickhouse使用本地表还是分布式表96
1.14.5 Clickhouse Using Local or Distributed Tables 96

1.14.6 Clickhouse的物化视图96
1.14.6 Materialized Views of Clickhouse 96

1.14.7 Clickhouse的优化97
1.14.7 Clickhouse Optimization 97

1.14.8 Clickhouse的新特性Projection97
1.14.8 Projection, a New Feature of Clickhouse 97

1.14.9 Clickhouse的索引、底层存储97
1.14.9 Indexing and Underlying Storage in Clickhouse 97

1.15 Doris99

1.15.1 Doris中的三种模型及对比?99
1.15.1 The three models in Doris and how they compare? 99

1.15.2 Doris的分区分桶怎么理解,怎么划分字段99
1.15.2 How to understand the partition and bucket of Doris, how to divide the field 99

1.15.3 生产中节点多少个,FE,BE 那个对于CPU和内存的消耗大99
1.15.3 How many nodes in production, and which of FE and BE consumes more CPU and memory 99

1.15.4 Doris使用过程中遇到过哪些问题?99
1.15.4 What problems have you encountered with Doris? 99

1.15.5 Doris跨库查询,关联MySQL有使用过吗99
1.15.5 Have you used Doris cross-database queries joining MySQL 99

1.15.6 Doris的roll up和物化视图区别99
1.15.6 Difference between roll up and materialized view of Doris 99

1.15.7 Doris的前缀索引99
1.15.7 Prefix index of Doris 99

1.16 可视化报表工具99
1.16 Visual reporting tools 99

1.17 JavaSE100

1.17.1 并发编程100
1.17.1 Concurrent programming 100

1.17.2 如何创建线程池100
1.17.2 How to create a thread pool 100

1.17.3 ThreadPoolExecutor构造函数参数解析101
1.17.3 Parsing ThreadPoolExecutor constructor parameters 101

1.17.4 线程的生命周期101
1.17.4 Life cycle of threads 101

1.17.5 notify和notifyall区别101
1.17.5 Difference Between notify and notifyAll 101

1.17.6 集合101
1.17.6 Collections 101

1.17.7 列举线程安全的Map集合101
1.17.7 Listing Thread-Safe Map Collections 101

1.17.8 StringBuffer和StringBuilder的区别101
1.17.8 Difference Between StringBuffer and StringBuilder 101

1.17.9 HashMap和HashTable的区别101
1.17.9 Difference Between HashMap and Hashtable 101

1.17.10 HashMap的底层原理102
1.17.10 The underlying principle of HashMap 102

1.17.11 项目中使用过的设计模式103
1.17.11 Design patterns used in projects 103

1.18 MySQL104

1.18.1 SQL执行顺序104
1.18.1 SQL Execution Order 104


1.18.3 MyISAM与InnoDB的区别104
1.18.3 Difference Between MyISAM and InnoDB 104

1.18.4 MySQL四种索引104
1.18.4 MySQL Four Indexes 104

1.18.5 MySQL的事务105
1.18.5 MySQL Transactions 105

1.18.6 MySQL事务隔离级别105
1.18.6 MySQL Transaction Isolation Levels 105

1.18.7 MyISAM与InnoDB对比105
1.18.7 MyISAM vs. InnoDB 105

1.18.8 B树和B+树对比106
1.18.8 Comparison of B-Trees and B+ Trees 106

1.19 Redis106

1.19.1 Redis缓存穿透、缓存雪崩、缓存击穿106
1.19.1 Redis cache penetration, cache avalanche, cache breakdown 106

1.19.2 Redis哨兵模式107
1.19.2 Redis Sentinel Mode 107

1.19.3 Redis数据类型107
1.19.3 Redis data types 107

1.19.4 热数据通过什么样的方式导入Redis108
1.19.4 How hot data is imported into Redis 108

1.19.5 Redis的存储模式RDB,AOF108
1.19.5 Redis storage mode RDB, AOF 108

1.19.6 Redis存储的是k-v类型,为什么还会有Hash?108
1.19.6 Redis stores k-v type, why is there a Hash? 108

1.20 JVM108

第2章 离线数仓项目108
Chapter 2 Offline Data Warehouse Project 108

2.1 提高自信108
2.1 Increased Confidence 108

2.2 为什么做这个项目109
2.2 Why did you do this project 109

2.3 数仓概念109
2.3 Data Warehouse Concepts 109

2.4 项目架构110
2.4 Project Architecture 110

2.5 框架版本选型110
2.5 Frame Version Selection 110

2.6 服务器选型112
2.6 Server selection 112

2.7 集群规模112
2.7 Cluster Size 112

2.8 人员配置参考115
2.8 Staffing Reference 115

2.8.1 整体架构115
2.8.1 Overall structure 115

2.8.2 你的的职级等级及晋升规则115
2.8.2 Your rank and promotion rules 115

2.8.3 人员配置参考116
2.8.3 Staffing Reference 116

2.9 从0-1搭建项目,你需要做什么?117
2.9 Building a project from 0-1, what do you need to do? 117

2.10 数仓建模准备118
2.10 Warehouse modeling preparation 118

2.11 数仓建模119
2.11 Warehouse modeling 119

2.12 数仓每层做了哪些事122
2.12 What is done at each layer of the warehouse 122

2.13 数据量124
2.13 Volume of data 124

2.14 项目中遇到哪些问题?(*****)125
2.14 What problems were encountered in the project?(*****) 125

2.15 离线---业务126
2.15 Offline---Business 126

2.15.1 SKU和SPU126
2.15.1 SKU and SPU 126

2.15.2 订单表跟订单详情表区别?126
2.15.2 What is the difference between the order table and the order detail table? 126

2.15.3 上卷和下钻126
2.15.3 Roll-Up and Drill-Down 126

2.15.4 TOB和TOC解释127
2.15.4 Explanation of ToB and ToC 127

2.15.5 流转G复活指标127
2.15.5 Circulation G Revival Indicator 127

2.15.6 活动的话,数据量会增加多少?怎么解决?128
2.15.6 How much will data volume increase during a promotion? How to handle it? 128

2.15.7 哪个商品卖的好?128
2.15.7 Which product sells well? 128

2.15.8 数据仓库每天跑多少张表,大概什么时候运行,运行多久?128
2.15.8 How many tables does the data warehouse run per day, when and how long does it run? 128

2.15.9 哪张表数据量最大129
2.15.9 Which table has the largest amount of data 129

2.15.10 哪张表最费时间,有没有优化130
2.15.10 Which table takes the most time, and has it been optimized 130

2.15.11 并发峰值多少?大概哪个时间点?131
2.15.11 How many concurrent peaks? About what time? 131

2.15.12 分析过最难的指标131
2.15.12 The most difficult indicators analyzed 131

2.15.13 数仓中使用的哪种文件存储格式131
2.15.13 Which file storage format is used in Warehouse 131

2.15.14 数仓当中数据多久删除一次131
2.15.14 How often is data deleted in the warehouse 131

2.15.15 Mysql业务库中某张表发生变化,数仓中表需要做什么改变131
2.15.15 A table in Mysql business library has changed. What changes need to be made to the table in data warehouse 131

2.15.16 50多张表关联,如何进行性能调优131
2.15.16 How to tune performance when joining 50+ tables 131

2.15.17 拉链表的退链如何实现131
2.15.17 How to roll back a chain in a zipper (SCD) table 131

2.15.18 离线数仓如何补数132
2.15.18 How to backfill data in the offline warehouse 132

2.15.19 当ADS计算完,如何判断指标是正确的132
2.15.19 After ADS metrics are computed, how to verify they are correct 132

2.15.20 ADS层指标计算错误,如何解决132
2.15.20 How to fix an incorrectly computed ADS-layer metric 132

2.15.21 产品给新指标,该如何开发132
2.15.21 How to develop a new metric requested by the product team 132

2.15.22 新出指标,原有建模无法实现,如何操作133
2.15.22 What to do when a new metric cannot be produced by the existing model 133

2.15.23 和哪些部门沟通,以及沟通什么内容133
2.15.23 Which departments do you communicate with, and about what 133

2.15.24 你们的需求和指标都是谁给的133
2.15.24 Who provides your requirements and metrics 133

2.15.25 任务跑起来之后,整个集群资源占用比例133
2.15.25 What proportion of cluster resources do tasks occupy while running 133

2.15.26 业务场景:时间跨度比较大,数据模型的数据怎么更新的,例如:借款,使用一年,再还款,这个数据时间跨度大,在处理的时候怎么处理133
2.15.26 Business scenario: data with a long time span (e.g., a loan is taken out, held for a year, then repaid); how is such data updated in the data model 133

2.15.27 数据倾斜场景除了group by 和join外,还有哪些场景133
2.15.27 Besides group by and join, what other scenarios cause data skew 133

2.15.28 你的公司方向是电子商务,自营的还是提货平台?你们会有自己的商品吗?134
2.15.28 Is your company's e-commerce business self-operated or a marketplace platform? Do you have your own merchandise? 134

2.15.29 ods事实表中订单状态会发生变化,你们是通过什么方式去监测数据变化的134
2.15.29 Order status changes in the ODS fact table; how do you detect the data changes 134

2.15.30 用户域你们构建了哪些事实表?登录事实表有哪些核心字段和指标?用户交易域连接起来有哪些表?134
2.15.30 User Domain What fact tables have you constructed? What are the core fields and metrics of the login fact table? What tables are there for linking user transaction domains? 134

2.15.31 当天订单没有闭环结束的数据量?135
2.15.31 What is the volume of orders not closed out on the same day? 135

2.15.32 你们维度数据要做ETL吗?除了用户信息脱敏?没有做其他ETL吗136
2.15.32 Do you do ETL on dimension data? Anything besides desensitizing user information? 136

2.15.33 怎么做加密,加密数据要用怎么办,我讲的md5,他问我md5怎么做恢复136
2.15.33 How is encryption done and how is encrypted data used? I mentioned MD5, and the interviewer asked how MD5 can be reversed 136

2.15.34 真实项目流程137
2.15.34 Real Project Flow 137

2.15.35 指标的口径怎么统一的(离线这边口径变了,实时这边怎么去获取的口径)137
2.15.35 How are metric definitions kept consistent (if the offline definition changes, how does real-time pick it up) 137

2.15.36 表生命周期管理怎么做的?138
2.15.36 How is table lifecycle management done? 138

2.15.37 如果上游数据链路非常的多,层级也非常的深,再知道处理链路和表的血缘的情况下,下游数据出现波动怎么处理?139
2.15.37 With many upstream data links and deep hierarchies, given knowledge of the processing links and table lineage, how to handle fluctuations in downstream data? 139

2.15.38 十亿条数据要一次查询多行用什么数据库比较好?139
2.15.38 With a billion rows, which database is best for querying many rows at once? 139

2.16 埋点139
2.16 Event Tracking 139

第3章 实时数仓项目141
Chapter 3 Real-Time Data Warehouse Project 141

3.1 为什么做这个项目141
3.1 Why did you do this project 141

3.2 项目架构141
3.2 Project Architecture 141

3.3 框架版本选型141
3.3 Frame Version Selection 141

3.4 服务器选型141
3.4 Server selection 141

3.5 集群规模142
3.5 Cluster Size 142

3.6 项目建模142
3.6 Project Modeling 142

3.7 数据量144
3.7 Volume of data 144

3.7.1 数据分层数据量144
3.7.1 Data Stratification Data Volume 144

3.7.2 实时组件存储数据量145
3.7.2 Real-time Component Storage Data Volume 145

3.7.3 实时QPS峰值数据量145
3.7.3 Real-time QPS peak data volume 145

3.8 项目中遇到哪些问题及如何解决?145
3.8 What problems were encountered in the project and how to solve them? 145

3.8.1 业务数据采集框架选型问题145
3.8.1 Selection of Business Data Acquisition Framework 145

3.8.2 项目中哪里用到状态编程,状态是如何存储的,怎么解决大状态问题146
3.8.2 Where state programming is used in the project, how state is stored, and how to solve large-state problems 146

3.8.3 项目中哪里遇到了反压,造成的危害,定位解决(*重点*)146
3.8.3 Where backpressure occurred in the project, the harm it caused, and how it was located and resolved (*key*) 146

3.8.4 数据倾斜问题如何解决(****重点***)147
3.8.4 How to solve the data skew problem (**** Focus **) 147

3.8.5 数据如何保证一致性问题148
3.8.5 How to ensure data consistency 148

3.8.6 FlinkSQL性能比较慢如何优化148
3.8.6 FlinkSQL performance is slow how to optimize 148

3.8.7 Kafka分区动态增加,Flink监控不到新分区数据导致数据丢失148
3.8.7 Kafka partition dynamically increases, Flink fails to monitor new partition data resulting in data loss 148

3.8.9 Kafka某个分区没有数据,导致下游水位线无法抬升,窗口无法关闭计算148
3.8.9 A Kafka partition has no data, so the downstream watermark cannot advance and windows cannot fire 148

3.8.10 Hbase的rowkey设计不合理导致的数据热点问题148
3.8.10 Data Hotspots Caused by Irrational Rowkey Design of Hbase 148

3.8.11 Redis和HBase的数据不一致问题148
3.8.11 Data inconsistency between Redis and HBase 148

3.8.12 双流join关联不上如何解决149
3.8.12 How to handle records that fail to match in a dual-stream join 149

3.9 生产经验150
3.9 Production experience 150

3.9.1 Flink任务提交使用那种模式,为何选用这种模式150
3.9.1 Which mode does Flink task submission use and why? 150

3.9.2 Flink任务提交参数,JobManager和TaskManager分别给多少150
3.9.2 Flink task submission parameters: how much memory for JobManager and TaskManager 150

3.9.3 Flink任务并行度如何设置150
3.9.3 How to set Flink task parallelism 150

3.9.4 项目中Flink作业Checkpoint参数如何设置150
3.9.4 How to set Flink Checkpoint parameter in project 150

3.9.5 迟到数据如何解决150
3.9.5 How to solve the problem of late data 150

3.9.6 实时数仓延迟多少151
3.9.6 What is the latency of the real-time warehouse 151

3.9.7 项目开发多久,维护了多久151
3.9.7 How long has the project been developed and maintained 151

3.9.8 如何处理缓存冷启动问题151
3.9.8 How to handle cache cold-start problems 151

3.9.9 如何处理动态分流冷启动问题(主流数据先到,丢失数据怎么处理)151
3.9.9 How to handle the dynamic-routing cold-start problem (main-stream data arrives first; how to handle the lost data) 151

3.9.10 代码升级,修改代码,如何上线151
3.9.10 Code upgrade, code modification, how to go online 151

3.9.11 如果现在做了5个Checkpoint,Flink Job挂掉之后想恢复到第三次Checkpoint保存的状态上,如何操作151
3.9.11 If 5 Checkpoints are made now, Flink Job hangs and wants to restore to the state saved by the third Checkpoint, how to operate 151

3.9.12 需要使用flink记录一群人,从北京出发到上海,记录出发时间和到达时间,同时要显示每个人用时多久,需要实时显示,如果让你来做,你怎么设计?152
3.9.12 Use Flink to track a group of people traveling from Beijing to Shanghai: record departure and arrival times and display each person's elapsed time in real time. How would you design it? 152

3.9.13 flink内部的数据质量和数据的时效怎么把控的152
3.9.13 How data quality and timeliness are controlled inside Flink 152

3.9.14 实时任务问题(延迟)怎么排查152
3.9.14 How to troubleshoot real-time task problems (delays) 152

3.9.15 维度数据查询并发量152
3.9.15 Dimension Data Query Concurrent Volume 152

3.9.16 Prometheus+Grafana是自己搭的吗,监控哪些指标152
3.9.16 Did you set up Prometheus+Grafana yourselves, and which metrics are monitored 152

3.9.17 怎样在不停止任务的情况下改flink参数153
3.9.17 How to change flink parameters without stopping the task 153

3.9.18 hbase中有表,里面的1月份到3月份的数据我不要了,我需要删除它(彻底删除),要怎么做153
3.9.18 An HBase table contains January to March data that is no longer needed; how to delete it completely 153

3.9.19 如果flink程序的数据倾斜是偶然出现的,可能白天可能晚上突然出现,然后几个月都没有出现,没办法复现,怎么解决?153
3.9.19 If data skew in a Flink program appears only occasionally (day or night), then not for months, and cannot be reproduced, how to solve it? 153

3.9.20 维度数据改变之后,如何保证新join的维度数据是正确的数据153
3.9.20 After dimension data changes, how to ensure newly joined dimension data is correct 153

3.10 实时---业务153
3.10 Real-time--business 153

3.10.1 数据采集到ODS层153
3.10.1 Data Acquisition to ODS Layer 153

3.10.2 ODS层154
3.10.2 ODS Layer 154

3.10.3 DWD+DIM层154
3.10.3 DWD and DIM Layers 154

3.10.4 DWS层155
3.10.4 DWS Layer 155

3.10.5 ADS层157
3.10.5 ADS Layer 157

第4章 数据考评平台项目158
Chapter 4 Data Governance Assessment Platform Project 158

4.1 Background of the project 158

4.1.1 为什么做数据治理158
4.1.1 Why Data Governance 158

4.1.2 数据治理概念158
4.1.2 Data Governance Concepts 158

4.1.3 数据治理考评平台做的是什么158
4.1.3 What does the data governance assessment platform do 158

4.1.4 考评指标158
4.1.4 Evaluation indicators 158

4.2 技术架构159
4.2 Technical architecture 159

4.3 项目实现了哪些功能159
4.3 What functions are implemented by the project 159

4.3.1 元数据的加载与处理及各表数据的页面接口159
4.3.1 Loading and processing of metadata and page interfaces for table data 159

4.3.2 数据治理考评链路(**核心**)159
4.3.2 Data Governance Assessment Link (** Core **) 159

4.3.3 数据治理考评结果核算160
4.3.3 Data Governance Assessment Results Accounting 160

4.3.4 可视化治理考评提供数据接口160
4.3.4 Visual governance assessment provides data interface 160

4.4 项目中的问题/及优化161
4.4 Problems in the project/optimization 161

4.4.1 计算hdfs路径数据量大小、最后修改访问时间161
4.4.1 Calculate hdfs path data size, last modified access time 161

4.4.2 考评器作用是什么?161
4.4.2 What is the role of the evaluator? 161

4.4.3 稍微难度考评器实现思路161
4.4.3 Slightly difficult evaluator implementation ideas 161

4.4.4 利用多线程优化考评计算161
4.4.4 Using multithreading to optimize evaluation calculations 161

4.4.5 实现过哪些指标161
4.4.5 What metrics have been implemented 161

第4章 用户画像项目162
Chapter 4 User Profile Project 162

4.1 画像系统主要做了哪些事162
4.1 What does the profile system mainly do 162

4.2 项目整体架构162
4.2 Overall project structure 162

4.3 讲一下标签计算的调度过程163
4.3 Describe the scheduling process for tag computation 163

4.4 整个标签的批处理过程163
4.4 Batch process for whole label 163

4.5 你们的画像平台有哪些功能 ?163
4.5 What are the functions of your profile platform? 163

4.6 是否做过Web应用开发,实现了什么功能163
4.6 Have you done Web application development and what functions have been implemented? 163

4.7 画像平台的上下游164
4.7 Upstream and downstream of the profile platform 164

4.8 BitMap原理,及为什么可以提高性能164
4.8 BitMap Principle and Why It Can Improve Performance 164

第5章 数据湖项目164
Chapter 5 Data Lake Project 164

5.1 数据湖与数据仓库对比164
5.1 Data Lake vs. Data Warehouse 164

5.2 为什么做这个项目?解决了什么痛点?164
5.2 Why do this project? What pain points were solved? 164

5.3 项目架构165
5.3 Project Architecture 165

5.4 业务165
5.4 Business 165

5.5 优化or遇到的问题怎么解决165
5.5 Optimizations and how problems encountered were solved 165

第6章 测试&上线流程166
Chapter 6 Testing & Release Process 166

6.1 测试相关166
6.1 Test related 166

6.1.1 公司有多少台测试服务器?166
6.1.1 How many test servers does the company have? 166

6.1.2 测试服务器配置?166
6.1.2 Test Server Configuration? 166

6.1.3 测试数据哪来的?167
6.1.3 Where did the test data come from? 167

6.1.4 如何保证写的SQL正确性(重点)167
6.1.4 How to ensure the correctness of SQL writing (emphasis) 167

6.1.5 测试之后如何上线?167
6.1.5 How to go online after testing? 167

6.1.6 A/B测试了解167
6.1.6 A/B Test Understanding 167

6.2 项目实际工作流程169
6.2 Project actual workflow 169

6.3 项目当前版本号是多少?多久升级一次版本171
6.3 What is the current version number of the project? How often is a new version released 171

6.4 项目中实现一个需求大概多长时间171
6.4 How long does it take to implement a requirement in a project? 171

6.5 项目开发中每天做什么事171
6.5 What do you do every day in project development? 171

第7章 数据治理172
Chapter 7 Data Governance 172

7.1 元数据管理172
7.1 Metadata management 172

7.2 数据质量监控173
7.2 Data quality monitoring 173

7.2.1 监控原则173
7.2.1 Principles of surveillance 173

7.2.2 数据质量实现174
7.2.2 Data Quality Realization 174

7.2.3 实现数据质量监控,你具体怎么做,详细说?174
7.2.3 Data quality monitoring, how do you do it, in detail? 174

7.3 权限管理(Ranger)175
7.3 Authority Management (Ranger) 175

7.4 用户认证(Kerberos)175
7.4 User authentication (Kerberos) 175

7.5 数据治理176
7.5 Data Governance 176

第8章 中台178
Chapter 8 Data Middle Platform 178

8.1 什么是中台?179
8.1 What is a middle platform? 179

8.2 各家中台180
8.2 Middle platforms at various companies 180

8.3 中台具体划分180
8.3 Detailed breakdown of the middle platform 180

8.4 中台使用场景181
8.4 Middle platform usage scenarios 181

8.5 中台的痛点182
8.5 Pain points of the middle platform 182

第9章 算法题(LeetCode)182
Chapter 9 Algorithm Questions (LeetCode) 182

9.1 时间复杂度、空间复杂度理解182
9.1 Understanding time complexity and space complexity 182

9.2 常见算法求解思想182
9.2 Common algorithmic problem-solving approaches 182

9.3 基本算法183
9.3 Basic algorithm 183

9.3.1 冒泡排序183
9.3.1 Bubble Sorting 183

9.3.2 快速排序183
9.3.2 Quick Sorting 183

9.3.3 归并排序184
9.3.3 Merge Sort 184

9.3.4 遍历二叉树185
9.3.4 Traversing Binary Trees 185

9.3.5 二分查找185
9.3.5 Binary search 185

9.4 小青蛙跳台阶186
9.4 Little Frog Jumping Steps 186

9.5 最长回文子串186
9.5 Longest palindrome substring 186

9.6 数字字符转化成IP186
9.6 Digital characters converted to IP 186

9.7 最大公约数187
9.7 Greatest common divisor 187

9.8 链表反转187
9.8 Linked list reversal 187

9.9 数组寻找峰值187
9.9 Finding a peak element in an array 187

第10章 场景题187
Chapter 10 Scenario Questions 187

10.1 手写Flink的UV187
10.1 Handwritten UV Computation in Flink 187

10.2 Flink的分组TopN187
10.2 Grouped TopN in Flink 187

10.3 Spark的分组TopN187
10.3 Grouped TopN in Spark 187

10.4 如何快速从40亿条数据中快速判断,数据123是否存在187
10.4 How to quickly determine whether the value 123 exists among 4 billion records 187

10.5 给你100G数据,1G内存,如何排序?187
10.5 Given 100 GB of data and 1 GB of memory, how do you sort it? 187

10.6 公平调度器容器集中在同一个服务器上?187
10.6 Why are fair-scheduler containers concentrated on the same server? 187

10.7 匹马赛跑,1个赛道,每次5匹进行比赛,无法对每次比赛计时,但知道每次比赛结果的先后顺序,最少赛多少次可以找出前三名?188
10.7 Horse racing: one track, 5 horses per race; races cannot be timed, but each race's finishing order is known. What is the minimum number of races to find the top three? 188

10.8 给定一个点、一条线、一个三角形、一个有向无环图,请用java面向对象的思想进行建模188
10.8 Given a point, a line, a triangle, and a directed acyclic graph, model them using Java object-oriented thinking 188

10.9 现场出了一道sql题,让说出sql的优化,优化后效率提升了多少188
10.9 An SQL question given on the spot: describe the optimization and how much efficiency improved afterwards 188

第11章 HQL场景题188
Chapter 11 HQL Scenario Questions 188

第12章 面试说明188
Chapter 12 Interview Guidance 188

12.1 面试过程最关键的是什么?188
12.1 What is the most important part of the interview process? 188

12.2 面试时该怎么说?188
12.2 What should I say during the interview? 188

12.3 面试技巧189
12.3 Interview skills 189

12.3.1 六个常见问题189
12.3.1 Six common questions 189

12.3.2 两个注意事项190
12.3.2 Two considerations 190

12.3.3 自我介绍190
12.3.3 Self-introduction 190

第1章 核心技术
Chapter 1 Core Technologies

1.1 Linux&Shell

1.1.1 Linux常用高级命令
1.1.1 Linux Common Advanced Commands



Command and description:

top: displays resource usage (CPU, memory, running time) of system processes in real time

jmap -heap <pid>: view the heap memory of a process

free -m: view system memory usage

ps -ef: view processes

netstat -tunlp | grep <port>: view port occupancy

du -sh <path>/*: view disk usage under a path, for example: $ du -sh /opt/*

df -h: view disk space usage

1.1.2 Shell常用工具及写过的脚本
1.1.2 Shell Common Tools and Written Scripts


1) Commonly used Shell tools: awk, sed, cut, sort

2) What scripts have been written in Shell?

(1) Cluster startup, distribution script


case $1 in


for i in hadoop102 hadoop103 hadoop104


ssh $i "absolute path"
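Assembled into a complete script, the case/for/ssh fragments above might look like the sketch below. The hadoop102 to hadoop104 hostnames come from the text; the daemon path is an assumption, and run() only prints the ssh command (a dry run) so the control flow can be checked without a real cluster:

```shell
#!/bin/bash
# Cluster start/stop dispatcher built from the case/for/ssh fragments.
# run() echoes the command instead of executing it; replace echo with
# a real ssh invocation in production.
run() { echo "ssh $1 $2"; }

cluster_cmd() {
    case $1 in
    "start")
        for i in hadoop102 hadoop103 hadoop104; do
            run "$i" "/opt/module/hadoop/sbin/hadoop-daemon.sh start datanode"
        done ;;
    "stop")
        for i in hadoop102 hadoop103 hadoop104; do
            run "$i" "/opt/module/hadoop/sbin/hadoop-daemon.sh stop datanode"
        done ;;
    *)  echo "usage: cluster_cmd start|stop" ;;
    esac
}

cluster_cmd start
```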






(2) Imports between warehouse layers: ods -> dwd -> dws -> ads

② Define the variable APP=gmall

③ Get the date

If a date is passed in: use the passed-in date

If not passed: default to the previous day (T+1)

Write the SQL for the current day first; when parameterizing, wherever a date appears use $do_date, and wherever a table appears qualify it with ${APP}.

Custom UDF and UDTF functions are also qualified with ${APP}.

Execute the SQL
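The date-handling step described above can be sketched as a script. The gmall database name appears in the text; the table names and the SQL itself are illustrative assumptions:

```shell
#!/bin/bash
APP=gmall

# Use the first argument as the business date if one is given;
# otherwise default to yesterday (T+1 processing of the previous day).
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d '-1 day' +%F)
fi

# Dates become $do_date and tables are qualified as ${APP}.table
# (dwd_example/dws_example are made-up names for illustration).
sql="insert overwrite table ${APP}.dws_example
     select * from ${APP}.dwd_example where dt='$do_date';"
echo "$sql"   # on a real cluster this would be: hive -e "$sql"
```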

1.1.3 Shell中单引号和双引号区别
1.1.3 Shell Single Quotation and Double Quotation Difference

1) Create a file in /home/atguigu/bin

[atguigu@hadoop102 bin]$ vim

Add the following to the file:

#!/bin/bash
do_date=$1

echo '$do_date'
echo "$do_date"
echo "'$do_date'"
echo '"$do_date"'
echo `date`

2) View the execution results, running the script with the argument 2022-02-10:

[atguigu@hadoop102 bin]$ 2022-02-10
$do_date
2022-02-10
'2022-02-10'
"$do_date"
2022年 05月 02日 星期四 21:02:08 CST
(Thursday, May 2, 2022 21:02:08 CST)

3) Summary:

(1) Single quotes do not substitute variable values

(2) Double quotes substitute variable values

(3) Backquotes ` execute the command inside the quotes

(4) Single quotes nested inside double quotes (double quotes outermost): the variable value is substituted

(5) Double quotes nested inside single quotes (single quotes outermost): the variable value is not substituted
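These quoting rules can be verified directly; a minimal bash demo:

```shell
#!/bin/bash
do_date=2022-02-10

echo '$do_date'     # single quotes: prints the literal text $do_date
echo "$do_date"     # double quotes: prints 2022-02-10
echo "'$do_date'"   # double quotes outermost: prints '2022-02-10'
echo '"$do_date"'   # single quotes outermost: prints "$do_date"
```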

1.2 Hadoop

1.2.1 Hadoop常用端口号
1.2.1 Common port numbers for Hadoop



Access HDFS (NameNode web UI) port: 50070 in Hadoop 2.x, 9870 in Hadoop 3.x

View MR task performance (Yarn ResourceManager web UI) port: 8088

History server port: 19888

Client access to the cluster (NameNode RPC) port: 8020 / 9000 in 2.x; 8020 / 9000 / 9820 in 3.x



1.2.2 HDFS读流程写流程
1.2.2 HDFS Read and Write Flow

Note: during the HDFS write process, what happens if a DataNode goes down?

当DataNode突然挂掉了,客户端接收不到这个DataNode发送的ack确认,客户端会通知NameNode,NameNode检查并确认该块的副本与规定的不符,NameNode会通知闲置的DataNode去复制副本,并将挂掉的DataNode作下线处理。挂掉的DataNode节点恢复后, 删除该节点中曾经拷贝的不完整副本数据。
If a DataNode suddenly goes down, the client stops receiving that DataNode's acks and notifies the NameNode. The NameNode checks and finds that the block's replica count no longer matches the configured replication factor, so it instructs an idle DataNode to copy the replica and marks the failed DataNode as offline. After the failed DataNode recovers, the incomplete replica data previously copied onto it is deleted.

1.2.3 HDFS小文件处理
1.2.3 HDFS Small File Handling

1) What impact will it have?

(1) Storage level

Each file block occupies about 150 bytes of NameNode memory.

128G能存储多少文件块? 128 g* 1024m*1024kb*1024byte/150字节 = 9.1亿文件块
How many file blocks can 128 GB of memory reference? 128 × 1024 × 1024 × 1024 bytes ÷ 150 bytes ≈ 910 million file blocks

(2) Calculation level

Each small file spawns its own MapTask, and a MapTask defaults to 1 GB of memory, which wastes resources.

2) How to solve

(1) Archive small files with har (Hadoop Archive)

(2) Use CombineTextInputFormat so that many small files are combined into a single input split

(3) Write your own MR program to merge the generated small files into one large file. In Hive or Spark there is a merge feature that does this automatically.

(4) Enable JVM reuse when there are many small files; if there are no small files, do not enable it, because a reused JVM holds its task slot until the whole job completes.

JVM reuse lets JVM instances be reused up to N times within the same job; N is configured in Hadoop's mapred-site.xml, usually 10 to 20.




<property>
    <name>mapreduce.job.jvm.numtasks</name>
    <value>10</value>
    <description>How many tasks to run per jvm,if set to -1 ,there is no limit</description>
</property>
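The 128 GB memory estimate above can be checked with quick shell arithmetic:

```shell
#!/bin/bash
# Each file block costs roughly 150 bytes of NameNode heap, so a 128 GB
# heap can reference about 0.9 billion blocks.
heap_bytes=$((128 * 1024 * 1024 * 1024))
blocks=$((heap_bytes / 150))
echo "$blocks blocks"   # 916259689, i.e. roughly 0.91 billion
```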


1.2.4 HDFS的NameNode内存

1) In the Hadoop 2.x series, the NameNode heap defaults to 2000m.

2) In the Hadoop 3.x series, NameNode memory is allocated dynamically based on the machine's memory.

The NameNode heap should be at least 1 GB, plus another 1 GB for every additional 1 million file blocks.

1.2.5 Shuffle及优化
1.2.5 Shuffle and optimization

1.2.6 Yarn工作机制
1.2.6 Yarn working mechanism

1.2.7 Yarn调度器
1.2.7 Yarn Scheduler

1) Hadoop schedulers fall into three main categories

FIFO、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
FIFO, Capacity Scheduler, and Fair Scheduler.

Apache's default resource scheduler is Capacity Scheduler.

The default resource scheduler for CDH is the fair scheduler.

2) Difference

FIFO scheduler: single queue, first in first out. Not used in production.

Capacity Scheduler: supports multiple queues. Queue resource allocation: the queue with the lowest resource occupancy gets resources first. Job resource allocation: by job priority and submission time. Container resource allocation: locality principle (same node / same rack / different node and rack).

Fair Scheduler: supports multiple queues and gives every job a fair share of queue resources; when resources are insufficient, they are allocated according to each job's deficit.

3) How to choose in production?

Large companies: if concurrency requirements are high, choose the Fair Scheduler, provided server performance can keep up.

Small and medium-sized companies with limited cluster resources: choose the Capacity Scheduler.

4) How do you create queues in a production environment?

(1) The scheduler defaults to one default queue, which cannot meet the production requirements.

(2) According to departments: business department 1, business department 2.

(3) According to the business module: login registration, shopping cart, order.

5) What are the benefits of creating multiple queues?

(1) It prevents a careless employee's runaway job (for example, unbounded recursion) from exhausting all cluster resources.

(2) It supports task degradation: in special periods, important task queues are guaranteed sufficient resources.

Business Unit 1 (important) => Business Unit 2 (fairly important) => Order (normal) => Shopping Cart (normal) => Login/Registration (minor)

1.2.8 HDFS Block Size

1) Block size

Hadoop 1.x: 64 MB

Hadoop 2.x / 3.x: 128 MB

Local mode: 32 MB

Enterprise: 128 MB, 256 MB, or 512 MB

2) What determines block size

Disk read/write speed:

Ordinary mechanical hard disk, ~100 MB/s => 128 MB block

Ordinary solid-state drive, ~300 MB/s => 256 MB block

Memory image, 500-600 MB/s => 512 MB block
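A hypothetical Python heuristic (not HDFS code) capturing the rule of thumb above, where the block size tracks roughly one second of disk transfer:

```python
def pick_block_size_mb(disk_mb_per_s, candidates=(32, 64, 128, 256, 512)):
    """Pick the candidate block size closest to what the disk can
    transfer in about one second (e.g. 100 MB/s -> 128 MB)."""
    return min(candidates, key=lambda size: abs(size - disk_mb_per_s))

print(pick_block_size_mb(100))  # 128 (mechanical disk)
print(pick_block_size_mb(300))  # 256 (SSD)
print(pick_block_size_mb(550))  # 512 (memory image)
```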

1.2.9 Hadoop Split-Brain: Causes and Solutions

1) Causes of split-brain

The Leader fails and, after failover, a Follower finishes taking over as the new Leader. Then the original Leader comes back (it may have been briefly disconnected or too slow to respond, but its NameNode process is still alive). If for some reason its ZKFC fails to set it to Standby, the old Leader still believes it is the Leader and keeps responding to client requests: a split-brain.

2) Hadoop usually does not exhibit split-brain.

If split-brain does occur, multiple NameNodes hold inconsistent data and only one copy can be kept. For example, with three NameNodes nn1, nn2 and nn3 in a split-brain, to keep nn1's data:

(1) Shut down nn2 and nn3.

(2) On the nn2 and nn3 nodes, re-run the data synchronization command: hdfs namenode -bootstrapStandby

(3) Restart nn2 and nn3.

1.3 Zookeeper

1.3.1 Common Commands

1.3.2 Election Mechanism

Majority (quorum) mechanism: 2n + 1; install an odd number of servers.

For a cluster of 10 servers: install 3 ZooKeeper nodes.

20 servers: 5 ZooKeeper nodes.

100 servers: 11 ZooKeeper nodes.

More ZooKeeper nodes: better reliability, but higher communication latency.
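The majority rule can be illustrated with a small Python sketch; it also shows why odd node counts are preferred (4 nodes tolerate no more failures than 3):

```python
def quorum(n_servers):
    """A ZooKeeper ensemble needs strictly more than half of its servers."""
    return n_servers // 2 + 1

def tolerated_failures(n_servers):
    """Failures survivable while still keeping a quorum."""
    return n_servers - quorum(n_servers)

print(quorum(3), tolerated_failures(3))  # 2 1
print(quorum(4), tolerated_failures(4))  # 3 1  (same tolerance as 3 nodes)
print(quorum(5), tolerated_failures(5))  # 3 2
```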

1.3.3 Which Two of the CAP Properties Does ZooKeeper Satisfy?

1.3.4 ZooKeeper Split-Brain

ZooKeeper uses a majority (quorum) voting mechanism to prevent split-brain.

1.3.5 What Is ZooKeeper Used For?

(1) As an HA coordinator: e.g., HDFS HA and YARN HA.

(2) As a dependency of components such as Kafka, HBase, and ClickHouse (CK).

1.4 Flume

1.4.1 Flume Composition; Put and Take Transactions

1) Taildir Source

(1) Supports resumable collection (breakpoint resume) and multiple directories.

(2) Underlying principle of Taildir

(3) What if the Taildir Source dies?

No data loss: breakpoint resume.

Duplicate data: possible.

(4) Existing problems and solutions

① Problem:

A file is treated as new based on its iNode value + absolute path (including the file name).

If the logging framework renames files around midnight, yesterday's data gets re-read.

② Solution:

Option 1: generate file names that carry the date, and configure the logging framework not to rename files.

Option 2: modify the TaildirSource source code to identify files by iNode value only.


2) File Channel / Memory Channel / Kafka Channel

(1) File Channel

Data is stored on disk. Advantage: high reliability; disadvantage: slow transfer.

Default capacity: 1 million events

Note: a FileChannel's throughput can be improved by configuring dataDirs with multiple paths, each on a different hard disk.

(2) Memory Channel

Data is stored in memory. Advantage: fast transfer; disadvantage: poor reliability.

Default capacity: 100 events

(3) Kafka Channel

Data is stored in Kafka, i.e., on disk.

Advantage: high reliability.

Fast: Kafka Channel beats Memory Channel + Kafka Sink because it skips the Sink stage.

(4) How to choose in production

If the next hop is Kafka, prefer Kafka Channel.

For finance and other money-critical businesses, choose File Channel.

For ordinary logs, Memory Channel is usually fine.

Losing a few million records a day out of PB-scale data is like a billionaire dropping one yuan: would he bother to pick it up?

3) HDFS Sink

(1) Roll by time (half an hour) or by size (128 MB), and set the event-count roll to 0 (its default is 10).

Parameters: hdfs.rollInterval=1800, hdfs.rollSize=134217728, hdfs.rollCount=0

4) Transactions

Source to Channel is a Put transaction; Channel to Sink is a Take transaction.

1.4.2 Flume Interceptors

1) Interceptor notes

(1) Timestamp interceptor: mainly solves the zero-point (midnight) drift problem.

2) Steps to write a custom interceptor

(1) Implement the Interceptor interface.

(2) Override four methods:

initialize: initialization

public Event intercept(Event event): process a single Event

public List<Event> intercept(List<Event> events): process multiple Events; it calls intercept(Event event)

close: cleanup

(3) Add a static inner class that implements Interceptor.Builder.

3) Is an interceptor necessary?

The timestamp interceptor is recommended; without it, you must delay processing by 15-20 minutes, which is troublesome.

1.4.3 Flume Channel Selectors

Replicating: the default selector; it sends data to every next-hop Channel.

Multiplexing: routes selectively to the specified Channels.

1.4.4 Flume Monitoring

1) Abnormal phenomena under monitoring

With Ganglia monitoring, if the number of puts Flume attempts far exceeds the number that succeed, Flume is running poorly, usually because memory is insufficient.

2) Solutions

(1) Flume itself: the default memory is 20 MB; consider raising it to 4-6 GB in the flume-env.sh configuration file.

(2) Ask for reinforcements: add servers.

For big promotions such as June 18: add servers, then release them when the event ends.

Log server configuration: 8-16 GB memory, 8 TB disk.

1.4.5 Will Flume Lose Collected Data?

Kafka Channel and File Channel do not lose data, because the data can be persisted to disk.

Memory Channel may lose data.

1.4.6 How Flume Increases Throughput

Adjust the batchSize of the Taildir Source to control throughput; the default is 100 events.

The throughput bottleneck is usually network bandwidth.

1.5 Kafka

1.5.1 Kafka Architecture

Producer, Broker, Consumer, ZooKeeper.

Note: ZooKeeper stores broker ids, controller information, and so on, but no producer information.

1.5.2 Kafka Producer Partition Assignment Strategy

Kafka ships with three partitioners: DefaultPartitioner (used when no partitioner is specified), UniformStickyPartitioner, and RoundRobinPartitioner.

1) DefaultPartitioner

The following figure illustrates the partition allocation policy for the default partitioner:

2) UniformStickyPartitioner, the pure sticky partitioner

(1) If a partition number is specified, that partition is used.

(2) If no partition is specified, the sticky partitioner is used.

3) RoundRobinPartitioner

(1) If a partition is specified in the message, that partition is used.

(2) If no partition is specified, messages are distributed to the partitions in turn, spreading the data evenly.
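A minimal Python sketch (not Kafka's implementation) of the round-robin idea, counting how keyless messages spread over partitions:

```python
def round_robin_counts(num_messages, num_partitions):
    """Distribute messages to partitions in turn and count per partition."""
    counts = [0] * num_partitions
    for i in range(num_messages):
        counts[i % num_partitions] += 1
    return counts

print(round_robin_counts(10, 3))  # [4, 3, 3]: a near-even spread
```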

4) Custom partitioner

Custom partitioning: implement the org.apache.kafka.clients.producer.Partitioner interface and override its partition method.

For example, random assignment only needs the following inside partition():

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return ThreadLocalRandom.current().nextInt(partitions.size());

This looks up the topic's partition count and returns a random non-negative integer below it.

In a project, to send all rows of one MySQL table to the same partition, use the table name as the message key.

1.5.3 Does Kafka Lose Data?

1) Producer's perspective

acks=0: the producer sends data without waiting for any acknowledgement. Poor reliability, high efficiency.

acks=1: the producer waits for the Leader's acknowledgement. Medium reliability, medium efficiency.

acks=-1: the producer waits for the Leader and every Follower in the ISR queue to acknowledge. High reliability, low efficiency.

In production, acks=0 is rarely used. acks=1 is used for ordinary logs, where losing the odd record is acceptable. acks=-1 is used for money-related data and other scenarios with high reliability requirements.

2) Broker's perspective

The replication factor is at least 2, and min.insync.replicas is at least 2.


1.5.4 Kafka's ISR Replica Synchronization Queue

ISR (In-Sync Replicas) is the replica synchronization queue. If a Follower fails to send fetch requests or sync data to the Leader for too long, it is kicked out of the ISR. The time threshold is set by the replica.lag.time.max.ms parameter and defaults to 30 s.

Exceeding the threshold in any dimension removes the Follower from the ISR into the OSR (Out-of-Sync Replicas) list; newly added Followers also start out in the OSR.

All replicas of a Kafka partition are collectively the AR (Assigned Replicas): AR = ISR + OSR
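The eviction rule can be sketched in Python (illustrative, not broker code):

```python
def in_isr(last_caught_up_ms, now_ms, replica_lag_time_max_ms=30_000):
    """A follower stays in the ISR only if it caught up with the leader
    within replica.lag.time.max.ms (default 30 s)."""
    return now_ms - last_caught_up_ms <= replica_lag_time_max_ms

print(in_isr(0, 10_000))  # True: caught up 10 s ago
print(in_isr(0, 40_000))  # False: lagging for 40 s, moved to the OSR
```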

1.5.5 Kafka Data Duplication

Deduplication = idempotence + transactions

1) The idempotence principle

2) Idempotence configuration parameters

enable.idempotence: whether idempotence is enabled; defaults to true (enabled). must be set to 1 before version 1.0.x and to at most 5 afterwards.

retries: the failed-retry count; must be greater than 0.

acks: must be set to all.

3) Kafka transactions expose the following five APIs:

// 1. Initialize transactions
void initTransactions();

// 2. Begin a transaction
void beginTransaction() throws ProducerFencedException;

// 3. Commit consumed offsets within the transaction (mainly for consumers)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4. Commit the transaction
void commitTransaction() throws ProducerFencedException;

// 5. Abort the transaction (similar to a rollback)
void abortTransaction() throws ProducerFencedException;

4) Summary

(1) Producer's perspective

Set acks to -1 (acks=-1).

Idempotence (enable.idempotence = true) + transactions.

(2) Broker's perspective

Partition replication factor at least 2 (--replication-factor 2).

Minimum number of acknowledging replicas in the ISR at least 2 (min.insync.replicas = 2).

(3) Consumer's perspective

Transactions + manual offset commits ( = false).

The consumer's output destination must support transactions (e.g., MySQL, Kafka).

1.5.6 How Kafka Keeps Data in Order (and Handles Disorder)

1) Kafka only guarantees ordering within a single partition, so strict global ordering for a business requires a Topic with a single partition.

2) How is data kept in order within a single partition?

Note: the idempotence mechanism guarantees ordering within a partition.

1.5.7 Kafka Partition Leader Election Rules

A replica must be alive in the ISR; among those, the one ranked earliest in the AR takes priority. For example, with AR [1,0,2] and ISR [1,0,2], Leaders are chosen by polling in the order 1, 0, 2.

1.5.8 Order of the AR in Kafka

If the Kafka cluster has only 4 nodes and a topic has more partitions than servers, how does Kafka assign the replicas?

1) Create 16 partitions with 3 replicas

(1) Create a new Topic named second.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second

(2) Check the partition and replica assignment.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --describe --topic second

Topic: second  Partition: 0   Leader: 0  Replicas: 0,1,2  Isr: 0,1,2

Topic: second  Partition: 1   Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

Topic: second  Partition: 2   Leader: 2  Replicas: 2,3,0  Isr: 2,3,0

Topic: second  Partition: 3   Leader: 3  Replicas: 3,0,1  Isr: 3,0,1

Topic: second  Partition: 4   Leader: 0  Replicas: 0,2,3  Isr: 0,2,3

Topic: second  Partition: 5   Leader: 1  Replicas: 1,3,0  Isr: 1,3,0

Topic: second  Partition: 6   Leader: 2  Replicas: 2,0,1  Isr: 2,0,1

Topic: second  Partition: 7   Leader: 3  Replicas: 3,1,2  Isr: 3,1,2

Topic: second  Partition: 8   Leader: 0  Replicas: 0,3,1  Isr: 0,3,1

Topic: second  Partition: 9   Leader: 1  Replicas: 1,0,2  Isr: 1,0,2

Topic: second  Partition: 10  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3

Topic: second  Partition: 11  Leader: 3  Replicas: 3,2,0  Isr: 3,2,0

Topic: second  Partition: 12  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2

Topic: second  Partition: 13  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

Topic: second  Partition: 14  Leader: 2  Replicas: 2,3,0  Isr: 2,3,0

Topic: second  Partition: 15  Leader: 3  Replicas: 3,0,1  Isr: 3,0,1
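The pattern in the listing above can be reproduced with a Python sketch of Kafka-style round-robin replica placement. Note this is an illustration: real Kafka randomizes the starting broker and the initial shift, which are fixed at 0 here to match the output:

```python
def assign_replicas(num_partitions, num_brokers, replication_factor):
    """Leaders rotate across brokers; the follower shift grows by one
    each time the assignment wraps around all brokers."""
    assignment = []
    shift = -1
    for p in range(num_partitions):
        if p % num_brokers == 0:
            shift += 1  # new round: push followers one broker further away
        first = p % num_brokers  # the leader replica
        replicas = [first]
        for j in range(replication_factor - 1):
            offset = 1 + (shift + j) % (num_brokers - 1)
            replicas.append((first + offset) % num_brokers)
        assignment.append(replicas)
    return assignment

for p, r in enumerate(assign_replicas(16, 4, 3)):
    print(p, r)  # partition 0 -> [0, 1, 2], partition 4 -> [0, 2, 3], ...
```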

1.5.9 Kafka Log Retention Time

7 days by default; 3 days recommended for production.

1.5.10 Kafka Expired Data Cleanup

There are only two log cleanup policies: delete and compact.

1) delete: remove expired data

log.cleanup.policy = delete applies the delete policy to all data.

(1) By time: enabled by default. A segment's timestamp is the largest timestamp among all its records.

(2) By size: disabled by default. When the total log size exceeds the configured limit, the oldest segments are deleted.

log.retention.bytes defaults to -1, meaning unlimited.

Thinking: if part of a segment's data is expired and part is not, what happens?

2) compact: log compaction
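The by-size delete policy described in 1) can be sketched in Python (illustrative, not broker code):

```python
def segments_to_delete(segment_sizes, retention_bytes):
    """Drop the oldest segments (front of the list) until the remaining
    log fits within log.retention.bytes."""
    total = sum(segment_sizes)
    deleted = []
    i = 0
    while total > retention_bytes and i < len(segment_sizes):
        deleted.append(i)
        total -= segment_sizes[i]
        i += 1
    return deleted

# four 1 GB segments, keep at most 2.5 GB -> the two oldest go
print(segments_to_delete([1 << 30] * 4, int(2.5 * (1 << 30))))  # [0, 1]
```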

1.5.11 Why Kafka Reads and Writes Data Efficiently

1) Kafka itself is a distributed cluster and uses partitioning, so parallelism is high.

2) Reads use a sparse index, which quickly locates the data to consume.

3) Sequential disk writes

Kafka producers append data to the end of log files: sequential writes. Official figures show that on the same disk sequential writes reach 600 MB/s while random writes reach only 100 KB/s. This is down to disk mechanics; sequential writes save most of the head-seek time.

4) Page cache + zero-copy technology

1.5.12 Automatic Topic Creation

If the broker-side parameter auto.create.topics.enable is set to true (the default), then when a producer sends a message to a topic that does not exist, a topic is automatically created with num.partitions partitions (default 1) and replication factor default.replication.factor (default 1). A topic is also created automatically when a consumer starts reading from an unknown topic, or when any client requests metadata for one. Such unexpected topic creation makes topic management and maintenance harder, so production environments should set this parameter to false.

(1) Send data to a topic named five that has not been created in advance

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --topic five

>hello world

(2) View the details of topic five

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --describe --topic five

1.5.13 Replica Count

Usually we set it to 2 or 3; many companies use 2.

More replicas: better reliability, but more network IO traffic.

1.5.14 Number of Kafka Partitions

(1) Create a Topic with a single partition.

(2) Measure the Producer throughput and Consumer throughput of this Topic.

(3) Suppose the values are Tp and Tc, in MB/s.

(4) Then, given a total target throughput Tt, the number of partitions = Tt / min(Tp, Tc).

Example: producer throughput = 20 MB/s, consumer throughput = 50 MB/s, target throughput = 100 MB/s:

Number of partitions = 100 / 20 = 5

The number of partitions is generally set to 3-10.

Neither more nor fewer partitions is inherently better; build the cluster, run stress tests, and then adjust the partition count.
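The rule of thumb above as a one-line Python helper (illustrative):

```python
import math

def partition_count(target_mbps, producer_mbps, consumer_mbps):
    """partitions = Tt / min(Tp, Tc), rounded up to a whole partition."""
    return math.ceil(target_mbps / min(producer_mbps, consumer_mbps))

print(partition_count(100, 20, 50))  # 5
```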

1.5.15 Increasing Kafka Partitions

1) Partitions can be added from the command line, but the count can only be increased, never decreased.

2) Why can the partition count only be increased, never decreased?

(1) Kafka's existing code could implement decreasing, but it would sharply increase code complexity.

(2) Many factors would have to be considered, such as what to do with the deleted partitions' messages.

If they simply vanish with the partition, message reliability is not guaranteed.

If they must be kept, how? Appending them to the end of an existing partition breaks timestamp monotonicity, which affects components such as Spark and Flink that depend on message timestamps (event time).

If they are scattered into existing partitions, the internal data replication consumes significant resources when message volume is high, and how would the topic's availability be guaranteed during the copy?

At the same time, ordering, transactions, and partition/replica state-machine switching would all have to be handled.

(3) By contrast, the benefit of this feature is very low. If you really need it, create a new topic with fewer partitions and copy the existing topic's messages into it according to your own logic.

1.5.16 How Many Topics in Kafka

ODS layer: 2

DWD layer: 20

1.5.17 Do Kafka Consumers Pull or Push Data?

They pull data.

1.5.18 Kafka Consumer Partition Assignment Strategy

Sticky partitioning:

This is the most complex assignment algorithm, configured via the partition.assignment.strategy parameter and introduced in version 0.11. Its purpose is to change as little as possible from the previous assignment when a new assignment runs. It pursues two goals:

(1) Topic partitions should be distributed as evenly as possible.

(2) When a rebalance occurs, the result should stay as close as possible to the previous assignment.

Note: when the two goals conflict, the first takes priority. Even distribution is what all three strategies attempt, while the second goal is the essence of the sticky algorithm.

1.5.19 Conditions for Consumer Rebalancing

1) Rebalance has three trigger conditions.

(1) The membership of the consumer group changes (a member actively joins, actively leaves, or goes offline due to failure).

(2) The number of subscribed topics or partitions changes.

2) When a consumer goes offline due to failure: (default 45 s): the connection timeout between a Kafka consumer and the coordinator. Beyond this value the consumer is removed and the group rebalances. (default 5 min): the maximum time a consumer may take to process a batch of messages. Beyond this value the consumer is removed and the group rebalances.

3) Actively joining a consumer group

Adding consumers to an existing group also triggers a Kafka rebalance. Note: if the downstream is Flink, Flink maintains its own offsets and does not trigger Kafka rebalancing.

1.5.20 Consuming from a Specified Offset

Data can be consumed starting from any offset:

kafkaConsumer.seek(topicPartition, 1000);

1.5.21 Consuming from a Specified Time

Data can be consumed starting from a point in time by mapping timestamps to offsets:

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);

Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

1.5.22 Kafka Monitoring

The company developed its own monitor.

Open-source monitors: KafkaManager, KafkaMonitor, KafkaEagle.

1.5.23 Kafka Data Backlog

1) Detecting a backlog

Kafka's Eagle monitor shows the consumption lag, which is the backlog.

2) Solutions

(1) Insufficient consumption capacity

① Consider increasing the Topic's partition count and, at the same time, the number of consumers in the group, keeping consumers = partitions (both changes are required).

Increase the number of partitions;

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

② Increase the number of pulls per batch and improve the consumption ability of individual consumers.

fetch.max.bytes (default 52428800, i.e., 50 MB): the maximum number of bytes the consumer fetches from the server in one batch. If a server-side batch is larger than this value it can still be pulled back, so this is not an absolute maximum; the batch size is also bounded by message.max.bytes (broker config) or max.message.bytes (topic config).

max.poll.records (default 500): the maximum number of messages returned by one pull.

(2) Consumer processing is too slow

① Consumers, adjust the size of fetch.max.bytes, the default is 50m.

② Consumers, adjust the size of max.poll.records, the default is 500.

If the downstream is a compute engine such as Spark or Flink that analyzes the data after consuming it, backpressure occurs when processing cannot keep up, which in turn lowers the consumption rate.

Compute performance needs to be tuned (see Spark, Flink optimization).

(3) How to deal with the backlog of messages

At some point the backlog suddenly appears and keeps rising; you need to find its cause and fix the problem quickly.

A message backlog can only grow suddenly in two ways: sending got faster or consuming got slower.

If it coincides with a big promotion or flash sale, you are unlikely to improve consumer performance through code changes in a short time; the only option is to scale out consumer instances to raise overall consumption capacity. If there are not enough servers to scale out in time, downgrade some unimportant services to reduce the send rate, so the system keeps working and important services stay healthy.

If internal monitoring shows consumption slowing down, inspect the consumer instances and analyze what caused the slowdown.

① First check the logs for large numbers of consumption errors.

② If there are no errors, print stack traces to see where the consumer threads are stuck (e.g., deadlocked or waiting on some resource).

1.5.24 How to Improve Throughput

1) Increase producer throughput

(1) buffer.memory: the send buffer size; default 32 MB, can be raised to 64 MB.

(2) batch.size: default 16 KB. Too small causes frequent network requests and lower throughput; too large makes messages wait a long time before being sent, increasing latency.

(3) default 0, meaning messages are sent immediately. Usually set to 5-100 ms. Too small causes frequent network requests and lower throughput; too long makes a message wait a long time before being sent, increasing latency.

(4) compression.type: default none (no compression). lz4 compression works well: it reduces data volume and raises throughput, at the cost of extra producer-side CPU.

2) Add partitions

3) Increase consumer throughput

(1) Adjust fetch.max.bytes; the default is 50 MB.

(2) Adjust max.poll.records; the default is 500.

1.5.25 Estimating Data Volume in Kafka

Total data per day: 100 GB, about 100 million log lines. 100,000,000 / 24 / 60 / 60 ≈ 1150 lines per second.

Average per second: 1150 lines

Off-peak per second: 50 lines

Peak per second: 1150 × (2 to 20) ≈ 2300 to 23,000 lines

Size of each log line: 0.5 KB to 2 KB (take 1 KB)

Data volume per second: about 2 MB to 20 MB
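The estimate above as a quick Python check:

```python
def per_second_rate(logs_per_day):
    """Average logs per second: daily count / 86400 seconds."""
    return logs_per_day / (24 * 60 * 60)

rate = per_second_rate(100_000_000)
print(round(rate))       # 1157, close to the ~1150/s used above
print(round(rate * 20))  # 23148, the 20x peak upper bound
```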

1.5.26 How to Stress-Test Kafka?

Use Kafka's official scripts to stress-test Kafka.

1) Kafka producer stress test

(1) Create a test Topic with 3 partitions and 3 replicas.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test

(2) The test scripts are located in the /opt/module/kafka/bin directory. Run the test:

[atguigu@hadoop105 kafka]$ bin/ --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384

Parameter Description:

record-size: the size of each message in bytes; set to 1 KB for this test.

num-records: the total number of messages to send; set to 1 million for this test.

throughput: messages per second. Set to -1 it means unthrottled, producing as fast as possible to measure the producer's maximum throughput. Set to 10,000 per second for this test.

producer-props: producer parameters; batch.size is configured to 16 KB here.



37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.

... ...

33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.

1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th.

(3) Adjust batch.size

(4) Adjust time

(5) Adjust the compression method

(6) Adjust cache size

2) Kafka consumer stress test

(1) Modify the number of records pulled at one time in the /opt/module/kafka/config/ file to 500.

(2) Consume 1 million logs for the stress test.

[atguigu@hadoop105 kafka]$ bin/ --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/

Parameter Description:

--bootstrap-server Specifies the Kafka cluster address

--topic: the name of the topic.

--messages: the total number of messages to consume; 1 million in this test.


start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418

(3) Set the number of records pulled at one time to 2000.

(4) Adjust fetch.max.bytes to 100 MB.

1.5.27 Disk Selection

Kafka's writes are mostly sequential at the bottom layer, and solid-state and mechanical hard disks have similar sequential write speeds.

Ordinary mechanical hard disks are therefore recommended.

Total data per day: 100 million lines × 1 KB ≈ 100 GB

100 GB × 2 replicas × 3 days retention / 0.7 ≈ 1 TB

It is recommended that the total hard disk size of three servers be greater than or equal to 1T.
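The sizing rule above as a Python helper (the 0.7 divisor keeps roughly 30% disk headroom):

```python
def disk_needed_gb(daily_gb, replicas, retention_days, fill_ratio=0.7):
    """Raw data x replicas x retention days, divided by a target
    disk fill ratio."""
    return daily_gb * replicas * retention_days / fill_ratio

print(round(disk_needed_gb(100, 2, 3)))  # 857 GB, so provision >= 1 TB
```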

1.5.28 Memory Selection

Kafka memory = heap memory + page cache

1) Kafka heap memory: 10-15 GB per node is recommended.


if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi


(1) Check the Kafka process number

[atguigu@hadoop102 kafka]$ jps

2321 Kafka

5255 Jps

1931 QuorumPeerMain

(2) Check Kafka's GC status using the process id

[atguigu@hadoop102 kafka]$ jstat -gc 2321 1s 10


0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

Parameter Description:

YGC: number of young-generation garbage collections;

(3) Using the Kafka process ID, check Kafka's heap memory

[atguigu@hadoop102 kafka]$ jmap -heap 2321

… …

Heap Usage:

G1 Heap:

regions = 2048

capacity = 2147483648 (2048.0MB)

used = 246367744 (234.95458984375MB)

free = 1901115904 (1813.04541015625MB)

11.472392082214355% used

2) Page cache:

The page cache is the Linux server's own memory. We only need to ensure that 25% of the data of one segment (1g) is in memory.

Page cache size per node = (number of partitions * 1g * 25%) / number of nodes. For example, with 10 partitions: page cache size = (10 * 1g * 25%) / 3 ≈ 1g.

Recommended server memory is greater than or equal to 11G.
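The memory estimate above, as a sketch (the heap size and the 25% cached fraction are the recommendations from this section):

```python
# Per-node Kafka memory = heap + page cache.
# Page cache per node = partitions * segment_gb * 25% / nodes.
def kafka_node_memory_gb(partitions, nodes, heap_gb=10, segment_gb=1, cached=0.25):
    page_cache_gb = partitions * segment_gb * cached / nodes
    return heap_gb + page_cache_gb

# 10 partitions over 3 nodes with a 10g heap -> a bit under 11 GB per node
print(round(kafka_node_memory_gb(10, 3), 2))  # 10.83
```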

1.5.29 CPU Selection

1) Default configuration

num.io.threads = 8: number of threads writing to disk.

num.replica.fetchers = 1: number of replica fetch threads.

num.network.threads = 3: number of data transfer threads.

2) Recommended configuration

num.io.threads = 16: number of threads writing to disk.

num.replica.fetchers = 2: number of replica fetch threads.

num.network.threads = 6: number of data transfer threads.

In addition, there are other background threads, such as log-cleanup threads and the Controller threads that sense and control the whole cluster, so each Broker ends up running hundreds of threads. Experience shows that a 4-core CPU is saturated by dozens of threads at peak times and 8 cores are barely enough; considering that other services also run on the cluster, Kafka servers are generally deployed with more than 16 cores to handle one or two hundred threads. If conditions permit, 24 or even 32 cores are better.

It is recommended to purchase servers with 32-core CPUs.

1.5.30 Network Selection

Network bandwidth = peak throughput ≈ 20 MB/s, so a Gigabit NIC is sufficient.

Mbps is a bit rate and MB/s a byte rate; 1 byte = 8 bits, so 100 Mbps / 8 = 12.5 MB/s.

Typical NICs: 100 Mbps (12.5 MB/s), Gigabit (1000 Mbps = 125 MB/s), 10 Gigabit (10000 Mbps = 1250 MB/s).

Usually choose a Gigabit or 10 Gigabit NIC.
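The bit/byte conversion above is easy to get wrong in interviews; a one-line sketch:

```python
# NIC line rates are quoted in bits per second, throughput in bytes per second;
# 1 byte = 8 bits, so MB/s = Mbps / 8.
def mbps_to_mb_per_s(mbps):
    return mbps / 8

print(mbps_to_mb_per_s(100))    # 12.5   -> 100 Mbps NIC
print(mbps_to_mb_per_s(1000))   # 125.0  -> Gigabit NIC
print(mbps_to_mb_per_s(10000))  # 1250.0 -> 10 Gigabit NIC
```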

1.5.31 Kafka Node Failure

How should a failed Kafka node be handled in a production environment?

General approach:

(1) Check the logs first and try to restart the node; if it starts normally, the problem is solved.

(2) If restarting does not work, check memory, CPU, and network bandwidth: tune first, and if tuning is not enough, add resources.

(3) If an entire Kafka node's data was deleted by mistake and the replication factor is at least 2, bring a new node into service the same way a new node is commissioned, then run load balancing.

1.5.32 Number of Kafka Machines

1.5.33 Commissioning New Nodes and Decommissioning Old Nodes

Nodes can be commissioned and decommissioned with the bin/ script.

1.5.34 Kafka Single-Message Transfer Size

Kafka's default maximum message size is 1M, but in our application scenario there are often messages larger than 1M. If Kafka is not configured for this, producers cannot push such messages to Kafka, or consumers cannot consume them, so Kafka must be configured as follows:

Parameter names and descriptions:

message.max.bytes: default 1m, the maximum message size the Broker accepts.

max.request.size: default 1m, the maximum size of each request the producer sends to the Broker.

max.message.bytes: default 1m, the topic-level maximum message size.

replica.fetch.max.bytes: default 1m, the maximum per-batch message size for replica synchronization.

fetch.max.bytes: default 52428800 (50m), the maximum number of bytes the consumer fetches from the server in one batch. If a server-side batch is larger than this value, it can still be pulled back, so this is not an absolute maximum. The batch size is also affected by message.max.bytes (broker config) and max.message.bytes (topic config).

1.5.35 Kafka参数优化
1.5.35 Kafka parameter optimization

Key tuning parameters:

(1) buffer.memory: 32m

(3) linger.ms: default 0, adjust to 5-100 ms

(4) compression.type: use snappy compression

(5) The consumer adjusts fetch.max.bytes, default 50m.

(6) The consumer adjusts max.poll.records, default 500.

(8) Kafka heap memory recommended per node: 10g ~ 15g


if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi


(9) Increase CPU core count

num.io.threads = 8: number of threads writing to disk

num.replica.fetchers = 1: number of replica fetch threads

num.network.threads = 3: number of data transfer threads

(10) Log retention time: log.retention.hours, adjusted to 3 days

(11) Number of copies, adjusted to 2

1.6 Hive

1.6.1 Hive's Architecture

1.6.2 HQL-to-MR Conversion Process

(1) SQL Parser: converts the SQL string into an abstract syntax tree (AST)

(2) Semantic Analyzer: further abstracts the AST into QueryBlocks (roughly, one subquery per QueryBlock)

(3) Logical Plan Generator: generates a logical plan from the QueryBlocks

(4) Logical Optimizer: optimizes the logical plan

(5) Physical Plan Generator: generates a physical plan from the optimized logical plan

(6) Physical Optimizer: optimizes the physical plan

(7) Execution: executes the plan and returns the query result to the client

1.6.3 Hive vs. Databases

Hive and databases have little in common beyond a similar query language.

1) Data storage location

Hive stores data in HDFS; databases store data on block devices or in local file systems.

2) Data update

Hive discourages rewriting data; data in a database usually needs frequent modification.

3) Execution latency

Hive has high execution latency; databases have low execution latency. This holds only while the data size is small: once the data size exceeds the database's processing capacity, Hive's parallel computation clearly shows its advantage.

4) Data size

Hive supports very large-scale data computation; databases support smaller data scales.

1.6.4 Internal and External Tables

The difference concerns metadata and raw data.

1) When deleting data

Internal table: both the metadata and the raw data are deleted.

External table: only the metadata is deleted.

2) In a company production environment, when are internal tables created and when are external tables created?

Most tables in a company are external tables.

Internal tables are created only as temporary tables for one's own use.

1.6.5 System Functions

1) Numeric functions

(1) round: rounding

2) String functions

(1) substring: intercept string;(2) replace: replace;(3) regexp_replace: regular replacement

(4) regexp: regular matching;(5) repeat: repeated string;(6) split: string cutting

(7) nvl: replace null value;(8) concat: concatenate string;

(9) concat_ws: concatenates strings or arrays of strings with specified delimiters;

(10) get_json_object: Parse JSON string

3) Date function

(1) unix_timestamp: returns the timestamp of the current or a specified time

(2) from_unixtime: converts a UNIX timestamp (seconds from 1970-01-01 00:00:00 UTC to the specified time) to the time format of the current time zone

(3) current_date: current date

(4) current_timestamp: the current date plus time, and accurate milliseconds

(5) month: gets the month of a date; (6) day: gets the day of a date

(7) datediff: number of days between two dates (end date minus start date)

(8) date_add: date plus days;(9) date_sub: date minus days

(10) date_format: Parses a standard date into a string in a specified format

4) Process control function

(1) case when: conditional judgment function

(2) if: conditional judgment, similar to the ternary operator in Java

5) Set function

(1) array: declared array collection

(2) Map: Create a map collection

(3) named_struct: Declares the attributes and values of struct

(4) size: number of elements in a collection

(5) map_keys: returns the key in the map


array_contains: determines whether an array contains an element

sort_array: sort elements in an array

6) Aggregate function

(1) collect_list: collects values into a list; duplicates are kept

(2) collect_set: collects values into a set; duplicates are removed

1.6.6 Custom UDF and UDTF Functions

1) Have you customized UDF and UDTF functions in a project, what problems did they solve, and what are the steps to define them?

(1) Currently, since the logic in the project is not especially complex, custom UDFs and UDTFs are not used.

(2) Custom UDF: extend GenericUDF and override the core method evaluate.

(3) Custom UDTF: extend GenericUDTF and override three methods: initialize (define the output column names and types), process (emit results via forward(result)), and close.

2) Under what circumstances are UDF/UDTF typically used in enterprises?

(1) With a custom function, any intermediate value inside it can be printed out for debugging.

(2) They are also needed when introducing third-party jar packages.

3) Parsing IP/UA in the advertising data warehouse uses Hive custom functions

Parsing IP: call an API, or use the offline IP2Region database

Parsing UA: regular expressions, or Hutool's UA-parsing utility class

1.6.7 Window Functions

Hand-written SQL usually appears in scenario questions: top-N per group, rows to columns, columns to rows.

By function, common window functions fall into the following categories: aggregate functions, cross-row value functions, and ranking functions.

1) aggregation function

max: maximum value.

min: minimum value.

sum: summation.

avg: average.

count: count.

2) Cross-line value function


Note: The lag and lead functions do not support custom windows.


3) Ranking function

Note: rank, dense_rank, and row_number do not support custom window frames.
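The semantics of the cross-row value and ranking functions can be sketched in plain Python over a single ordered partition (None stands in for SQL NULL; this is an illustration, not Hive's implementation):

```python
# lag/lead/row_number over one ordered partition, sketched in plain Python.
def lag(rows, offset=1):
    return [rows[i - offset] if i - offset >= 0 else None for i in range(len(rows))]

def lead(rows, offset=1):
    return [rows[i + offset] if i + offset < len(rows) else None for i in range(len(rows))]

def row_number(rows):
    return list(range(1, len(rows) + 1))

sales = [10, 20, 20, 30]
print(lag(sales))         # [None, 10, 20, 20]
print(lead(sales))        # [20, 20, 30, None]
print(row_number(sales))  # [1, 2, 3, 4]
```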

1.6.8 Hive Optimization

1) Group Aggregation

A grouped aggregation query is, by default, completed by a single MapReduce Job. The Map side reads the data and partitions it by the grouping fields; Shuffle sends it to the Reduce side, where each group completes its final aggregation.

Optimizing grouped aggregation centers on reducing the amount of shuffled data, specifically through map-side aggregation: maintain a hash table on the map side and use it to aggregate partially, then send the partial results, partitioned by the grouping fields, to the Reduce side for the final aggregation.
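A minimal sketch of map-side aggregation (plain Python dictionaries stand in for the map-side hash table; this is an illustration, not Hive's actual code):

```python
# Map-side (partial) aggregation: each mapper keeps a hash table of partial
# sums, so only one record per key (not per input row) is shuffled.
from collections import defaultdict

def map_side_aggregate(rows):
    partial = defaultdict(int)       # the in-memory hash table
    for key, value in rows:
        partial[key] += value        # partial aggregation on the map side
    return partial                   # this, not the raw rows, is shuffled

def reduce_side_merge(partials):
    final = defaultdict(int)
    for partial in partials:         # merge partial results from all mappers
        for key, value in partial.items():
            final[key] += value
    return dict(final)

mapper1 = map_side_aggregate([("a", 1), ("a", 1), ("b", 2)])
mapper2 = map_side_aggregate([("a", 3), ("b", 1)])
print(reduce_side_merge([mapper1, mapper2]))  # {'a': 5, 'b': 3}
```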

The relevant parameters are as follows:

--Enable map-side aggregation, default is true
set hive.map.aggr=true;

--Ratio used to detect whether the source table suits map-side aggregation: the first rows are aggregated on the map side; if (rows after aggregation / rows before aggregation) is below this value, the table is considered suitable, otherwise subsequent data is not map-side aggregated
set hive.map.aggr.hash.min.reduction=0.5;

--Number of rows used for the above check
set hive.groupby.mapaggr.checkinterval=100000;

--Maximum fraction of map-task heap memory the map-side aggregation hash table may occupy; if exceeded, the hash table is flushed
set hive.map.aggr.hash.percentmemory=0.5;

2) Map Join

Hive's most stable Join algorithm, and the default, is Common Join. It completes a Join with one MapReduce Job: the Map side reads the data of the joined tables and partitions it by the join key; Shuffle sends it to the Reduce side, where rows with the same key complete the final Join.

The most common Join optimization is Map Join, which completes a join with two map-only Jobs. The first Job reads the small table, builds a hash table from it, and uploads it to the Hadoop distributed cache (essentially, to HDFS). The second Job reads the small-table hash table from the distributed cache into the Map Task's memory, then scans the large table, so the join completes on the map side.

Note: Map Join must cache the entire small table, so it only applies to joining a large table with a small table.
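The two-job Map Join flow can be sketched as follows (plain Python; the dictionary plays the role of the distributed-cache hash table):

```python
# Map Join sketch: build a hash table from the small table once, then each
# "map task" probes it while scanning its slice of the large table -- no shuffle.
def build_hash_table(small_table):      # job 1: small table -> hash table
    return {row_id: payload for row_id, payload in small_table}

def map_join(large_slice, hash_table):  # job 2: probe while scanning
    return [(row_id, big, hash_table[row_id])
            for row_id, big in large_slice if row_id in hash_table]

dim = [(1, "beijing"), (2, "shanghai")]                    # small table
facts = [(1, "order_a"), (2, "order_b"), (3, "order_c")]   # large table
ht = build_hash_table(dim)
print(map_join(facts, ht))  # [(1, 'order_a', 'beijing'), (2, 'order_b', 'shanghai')]
```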

The relevant parameters are as follows:

--Enable automatic conversion to Map Join
set hive.auto.convert.join=true;

--Enable unconditional conversion to Map Join
set hive.auto.convert.join.noconditionaltask=true;

--Small-table size threshold for unconditional Map Join, default 10M; one third to one half of the total Map Task memory is recommended

set hive.auto.convert.join.noconditionaltask.size;

3) SMB Map Join

As mentioned in the previous section, Map Join only applies to joining a large table with a small table. To join two large tables efficiently, use Sort Merge Bucket (SMB) Map Join.

Note that SMB Map Join has the following requirements:

(1) All tables participating in the Join are bucketed tables, and the bucket field is the join key.

(2) The bucket counts of the two tables are multiples of each other.

(3) Within each bucket, the data is sorted by the join key.

The core principle of SMB Join: as long as the first two requirements hold, there is a definite correspondence between the buckets of the two tables participating in the Join, so the Join can be performed bucket against bucket.

If the third requirement also holds, that is, the data participating in the Join is sorted, then one of the classic database Join algorithms, Sort Merge Join, can be used. The principle of the merge step is as follows:
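A minimal sketch of the merge step, assuming both inputs are already sorted by the join key (duplicate keys are omitted for brevity):

```python
# Sort Merge Join sketch: both inputs are sorted by the join key, so a single
# forward pass with two cursors joins them.
def sort_merge_join(left, right):   # left/right: sorted lists of (key, value)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk == rk:
            out.append((lk, left[i][1], right[j][1]))
            i += 1
            j += 1
        elif lk < rk:
            i += 1                  # advance the cursor with the smaller key
        else:
            j += 1
    return out

a = [(1, "x"), (3, "y"), (5, "z")]
b = [(3, "p"), (4, "q"), (5, "r")]
print(sort_merge_join(a, b))  # [(3, 'y', 'p'), (5, 'z', 'r')]
```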

Once the three requirements above are met, SMB Map Join can be used.

Since SMB Map Join needs neither to build a hash table nor to cache small-table data, its memory requirements are low. It suits joining a large table with another large table.

4) Reduce Parallelism

The parallelism of the Reduce side, i.e. the number of Reduce tasks, can be specified by the user, or estimated by Hive from the size of the MR Job's input files.

The relevant parameters of parallelism on the Reduce side are as follows:

--Specifies the Reduce-side parallelism; the default value is -1, meaning the user does not specify it

set mapreduce.job.reduces;

--Reduce maximum parallelism

set hive.exec.reducers.max;

--Amount of data a single Reduce Task processes, used to estimate Reduce parallelism

set hive.exec.reducers.bytes.per.reducer;

The logic for determining reduce-side parallelism is as follows:

If the value of the specified parameter mapreduce.job.reduces is a non-negative integer, the Reduce parallelism is the specified value. Otherwise, Hive estimates the Reduce parallelism by itself. The estimation logic is as follows:

Assume the total size of the Job's input files is totalInputBytes, and let

bytesPerReducer = hive.exec.reducers.bytes.per.reducer
maxReducers = hive.exec.reducers.max

Then the Reduce-side parallelism is:

numReduceTasks = min( ceil( totalInputBytes / bytesPerReducer ), maxReducers )
As described above, when Hive estimates the Reduce parallelism itself, it bases the estimate on the size of the whole MR Job's input. In some cases the estimate may therefore be inaccurate, and the user then needs to specify the Reduce parallelism according to the actual situation.
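The estimation logic above, sketched in Python (the default values 256 MB and 1009 are assumptions; check hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max in your Hive version):

```python
# Hive's Reduce-parallelism estimate: either the user-set value, or
# ceil(totalInputBytes / bytesPerReducer) capped at maxReducers.
import math

def estimate_reducers(total_input_bytes, user_set=-1,
                      bytes_per_reducer=256 * 1024 * 1024,  # hive.exec.reducers.bytes.per.reducer
                      max_reducers=1009):                   # hive.exec.reducers.max
    if user_set >= 0:                                       # mapreduce.job.reduces
        return user_set
    return min(math.ceil(total_input_bytes / bytes_per_reducer), max_reducers)

print(estimate_reducers(10 * 1024**3))              # 10 GB / 256 MB = 40
print(estimate_reducers(10 * 1024**3, user_set=5))  # user override -> 5
```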

Note that with the Tez or Spark engine, Hive can estimate the Reduce parallelism from computed statistics, and the estimate is relatively accurate.

5) Small File Merge

If Hive's Reduce parallelism setting is unreasonable, or the estimation is unreasonable, it may lead to a large number of small files in the calculation result. This problem can be solved by the small file merge task. The principle is to judge according to the average size of the output file of the calculation task. If the condition is met, an additional task will be started separately for merging.

The relevant parameters are:

--Merge small files output by map-only tasks

set hive.merge.mapfiles=true;

--Merge small files output by map-reduce tasks

set hive.merge.mapredfiles=true;

--Combined file size

set hive.merge.size.per.task=256000000;

--Threshold value for triggering small file merge task. If the average file size output by a calculation task is lower than this value, merge will be triggered.

set hive.merge.smallfiles.avgsize=16000000;

6) Predicate Pushdown

Predicate pushdown means moving filter operations as early as possible to reduce the amount of data in subsequent computation steps. With predicate pushdown optimization enabled, Hive automatically moves filters as far forward as possible without any change to the SQL statement.

The relevant parameters are:

--Whether to enable predicate pushdown optimization
set hive.optimize.ppd=true;

7) Parallel Execution

Hive converts a SQL statement into one or more Stages, each Stage corresponding to one MR Job. By default, Hive executes only one Stage at a time; however, a SQL statement may contain multiple Stages that are not fully interdependent, meaning some Stages can run in parallel. Parallel execution here refers to running those Stages concurrently. The relevant parameters are as follows:

--Enable parallel execution optimization; default is off
set hive.exec.parallel=true;

--Maximum parallelism allowed for one SQL statement; default is 8
set hive.exec.parallel.thread.number=8;

8) CBO Optimization

CBO stands for Cost-Based Optimizer, i.e. optimization based on computational cost.

In Hive, the cost model takes into account row counts, CPU, local IO, HDFS IO, network IO, and so on. Hive computes the cost of different execution plans for the same SQL statement and selects the cheapest plan. Under Hive's MR engine, CBO is currently used mainly for Join optimization, for example the join order of a multi-table Join.

The relevant parameters are:

--Whether to enable CBO optimization
set hive.cbo.enable=true;

9) Columnar Storage

ORC columnar storage is adopted to speed up queries.

id name age
1 zs 18
2 lishi 19

Row layout: 1 zs 18 2 lishi 19
Column layout: 1 2 zs lishi 18 19

A query such as select name from user then reads only the name column.

10) Compression

Compression reduces disk IO: because Hive's underlying compute engine defaults to MR, Snappy compression can be enabled on the Map output.

Map (Snappy-compressed output) → Reduce

11) Partitions and Buckets

(1) Create partitioned tables to avoid subsequent full table scans.

(2) Create bucketed tables to sample unknown, complex data in advance.

12) Engine Replacement


MR engine: multiple jobs chained together, disk-based, with many points where data is written to disk. Slow, but it will always produce a result. Generally used for weekly, monthly, and yearly indicators.

Spark engine: it also spills to disk during Shuffle, but not all operators require Shuffle; in particular, multi-operator pipelines keep intermediate results off disk (DAG, directed acyclic graph). It balances reliability and efficiency. Generally used for daily indicators.

2) Advantages of the Tez engine

(1) Using a DAG to describe tasks removes unnecessary intermediate nodes of MR, reducing disk IO and network IO.

(2) Cluster resources are better utilized, e.g. Container reuse and computing initial task parallelism from cluster resources.

(3) The parallelism of downstream tasks can be adjusted dynamically at runtime according to the actual data volume.

13) How to Optimize a Join of Dozens of Tables

(1) Reduce the number of joined tables: without affecting the business, consider preprocessing and merging some tables to reduce join operations.

(2) Use Map Join: load small tables into memory, avoiding the Reduce phase and improving performance. Enable automatic Map Join by setting hive.auto.convert.join to true.

(3) Use Bucketed Map Join: enable it by setting hive.optimize.bucketmapjoin to true.

(4) Use Sort Merge Join: sorting is completed in the Map phase, reducing the computation in the Reduce phase.

(5) Control the number of Reduce tasks with hive.exec.reducers.bytes.per.reducer and mapreduce.job.reduces.

(6) Filter out unneeded data before the join to improve performance.

(7) Choose a suitable join order: putting small tables first reduces the size of intermediate results and improves performance.

(8) Use partitioning: only the partitions matching the query criteria need to be read, reducing data volume and computation.

(9) Use compression to reduce disk and network IO; choose an appropriate compression format and level.

(10) Tune Hive configuration parameters (memory, CPU, IO, etc.) according to cluster hardware and actual needs.

1.6.9 How Hive Handles Data Skew

Data skew usually means the data involved in a computation is unevenly distributed: one key or a few keys carry far more data than the others, so in the shuffle stage a large amount of same-key data is sent to one Reduce task, which then takes far longer than the others and becomes the bottleneck of the whole job.

Data skew in Hive often occurs in the scenarios of grouping aggregation and join operations. The optimization ideas in the above two scenarios are described below.

1) Data skew caused by grouping aggregation

As mentioned earlier, grouped aggregation in Hive is done by one MapReduce Job. The Map side reads the data and partitions it by the grouping fields; Shuffle sends it to the Reduce side, where each group completes its final aggregation. If the values of the group by field are unevenly distributed, a large number of identical keys may enter the same Reduce, causing data skew.

The data skew problem caused by grouping aggregation can be solved as follows:

(1) Check whether the skewed value is null

If the skewed value is null, consider whether the final result needs that part of the data. If not, filtering out the nulls in advance solves the problem. If the data must be kept, consider the following ideas.

(2) Enable map-side aggregation

With map-side aggregation enabled, data is partially aggregated on the Map side first. Even if the original data is skewed, after this initial aggregation the data sent to Reduce is no longer skewed. In the best case, map-side aggregation completely masks the data skew problem.

The relevant parameters are as follows:



set hive.groupby.mapaggr.checkinterval=100000;



(3) Skew-GroupBy optimization

Skew-GroupBy is Hive's solution for data skew caused by grouped aggregation. It starts two MR jobs: the first partitions by random numbers, sending data to Reduce for partial aggregation; the second partitions by the grouping fields to complete the final aggregation.

The relevant parameters are as follows:

--Enable group aggregation data skew optimization

set hive.groupby.skewindata=true;
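The two-job Skew-GroupBy idea can be sketched as a two-stage aggregation (the salt count and seed are arbitrary illustration values):

```python
# Skew-GroupBy sketch: stage 1 partitions by (key, random salt) and partially
# aggregates; stage 2 strips the salt and finishes the aggregation.
import random
from collections import defaultdict

def stage1_partial(rows, salts=2, seed=42):
    rng = random.Random(seed)
    partial = defaultdict(int)
    for key, value in rows:
        partial[(key, rng.randrange(salts))] += value  # salting spreads hot keys
    return partial

def stage2_final(partial):
    final = defaultdict(int)
    for (key, _salt), value in partial.items():
        final[key] += value                            # regroup by the real key
    return dict(final)

rows = [("hot", 1)] * 6 + [("cold", 1)]
print(stage2_final(stage1_partial(rows)))  # {'hot': 6, 'cold': 1}
```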

2) Data skew caused by Join

If the Join operation uses the Common Join algorithm, the computation is done by one MapReduce Job. The Map side reads the data of the joined tables and partitions it by the join key; Shuffle sends it to the Reduce side, where rows with the same key complete the final Join.

If the values of the associated fields are unevenly distributed, it may cause a large number of the same keys to enter the same Reduce, resulting in data skew problems.

The data skew problem caused by Join can be solved as follows:

(1) Map Join

With the Map Join algorithm, the Join completes entirely on the Map side: there is no Shuffle and no Reduce stage, so no Reduce-side data skew can occur. This scheme applies to data skew when joining a large table with a small table.

The relevant parameters are as follows:




(2) Skew Join

If both tables participating in the Join are large, Map Join cannot cope. In that case, consider Skew Join: its core principle is to start a separate Map Join task for each skewed large key, while the remaining keys go through the normal Common Join.

The relevant parameters are as follows:

--Enable skew join optimization

set hive.optimize.skewjoin=true;

--Threshold for triggering skew join: triggered if a key's row count exceeds this value

set hive.skewjoin.key=100000;

3) Adjust SQL statements

If the two tables participating in the Join are both large tables, and the data of one of the tables is skewed, you can also adjust the SQL statement accordingly in the following ways.

Suppose the original SQL statement is as follows, where A and B are both large tables and one of them has skewed data:

hive (default)>



from A

join B


The process of joining is as follows:

Here 1001 is the skewed large key; all of its rows are sent to the same Reduce for processing.

The SQL statement execution plan after adjustment is shown in the following figure:

Adjust SQL statements as follows:

hive (default)>




select -- scatter operation

concat(id,'_',cast(rand()*2 as int)) id,


from A



select -- expansion operation

concat(id,'_',1) id,


from B

union all


concat(id,'_',2) id,


from B
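The scatter/expand adjustment can be sketched outside of SQL (a hypothetical two-salt version; shuffle_join stands in for the Common Join):

```python
# Salted join sketch: the skewed table A gets a random suffix per row; table B
# is duplicated once per suffix, so every salted key still finds its match and
# the hot key is spread across 2 reducers instead of 1.
import random

def scatter(a_rows, salts=2, seed=7):
    rng = random.Random(seed)
    return [(f"{k}_{rng.randrange(salts)}", v) for k, v in a_rows]

def expand(b_rows, salts=2):
    return [(f"{k}_{i}", v) for k, v in b_rows for i in range(salts)]

def shuffle_join(a_rows, b_rows):          # stand-in for the Common Join
    b_index = {}
    for k, v in b_rows:
        b_index.setdefault(k, []).append(v)
    return [(k, av, bv) for k, av in a_rows for bv in b_index.get(k, [])]

a = [("1001", "a1"), ("1001", "a2"), ("1001", "a3")]   # skewed key 1001
b = [("1001", "b1")]
joined = shuffle_join(scatter(a), expand(b))
print(sorted((k.split("_")[0], av, bv) for k, av, bv in joined))
```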



1.6.10 How to handle Hive data that contains the field delimiter?

Hive's default field delimiter is the ASCII control character \001 (^A); use fields terminated by '\001' when creating tables. Note: if \t or \001 is used as the delimiter, the data passed in from front-end tracking and the JavaEE backend must not contain that character, which is enforced through coding standards.

Once the transmitted data contains delimiters, escape or substitution (ETL) is required in the previous level of data. Sqoop and DataX are usually used to preprocess data when synchronizing.

id name age

1 zs 18

2 li分隔符si 19
2 li separator si 19
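If ETL preprocessing is used, a delimiter occurring inside a field can be replaced before loading. A minimal Python sketch (the substitute character is a hypothetical choice):

```python
FIELD_SEP = "\001"   # Hive's default field delimiter (^A)
SUBSTITUTE = "|"     # hypothetical safe replacement character

def clean_field(value: str) -> str:
    # Replace any delimiter occurrence inside a field so the row
    # still splits into the expected number of columns in Hive.
    return value.replace(FIELD_SEP, SUBSTITUTE)

row = ["2", "li\001si", "19"]          # the middle field contains ^A
line = FIELD_SEP.join(clean_field(f) for f in row)
print(line.split(FIELD_SEP))           # ['2', 'li|si', '19']
```

Without the cleaning step, this row would split into four columns instead of three.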

1.6.11 MySQL元数据备份
MySQL Metadata Backup

Metadata backup (a key point: if this data is damaged, the whole cluster may stop running, so back up at least two copies to other servers every day after midnight).

(1) MySQL backup data script (it is recommended to back up the metadata regularly, once a day)

#!/bin/bash

# 备份目录,需提前创建
# Backup directory, must be created in advance
BACKUP_DIR=/root/mysql-backup

# 备份天数,超过这个值,最旧的备份会被删除
# Retention count; beyond this value the oldest backup is deleted
FILE_ROLL_COUNT=7

# 连接信息(以下取值为占位示例)
# Connection info (the values below are placeholder examples)
MYSQL_HOST=hadoop102
MYSQL_USER=root
MYSQL_PASSWORD=000000

# 备份MySQL数据库
# Back up the MySQL databases
[ -d "${BACKUP_DIR}" ] || exit 1

mysqldump \
--all-databases \
--opt \
--single-transaction \
--source-data=2 \
--default-character-set=utf8 \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}" | gzip > "${BACKUP_DIR}/$(date +%F).gz"

# 超出保留数量时删除最旧的备份
# Delete the oldest backup once the retention count is exceeded
if [ "$(ls "${BACKUP_DIR}" | wc -l)" -gt "${FILE_ROLL_COUNT}" ]
then
    ls "${BACKUP_DIR}" | sort | sed -n 1p | xargs -I {} -n1 rm -rf "${BACKUP_DIR}"/{}
fi


(2) MySQL recovery data script

#!/bin/bash

# 连接信息(以下取值为占位示例)
# Connection info (the values below are placeholder examples)
BACKUP_DIR=/root/mysql-backup
MYSQL_HOST=hadoop102
MYSQL_USER=root
MYSQL_PASSWORD=000000

# 恢复指定日期,不指定就恢复最新数据
# Restore the specified date; if none is given, restore the latest backup
RESTORE_DATE=$1

[ "${RESTORE_DATE}" ] && BACKUP_FILE="${RESTORE_DATE}.gz" || BACKUP_FILE="$(ls ${BACKUP_DIR} | sort -r | sed -n 1p)"

gunzip "${BACKUP_DIR}/${BACKUP_FILE}" --stdout | mysql \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}"

1.6.12 如何创建二级分区表?
1.6.12 How do I create a secondary partition table?

create table dept_partition2(
    deptno int,    -- 部门编号 (department number)
    dname string   -- 部门名称 (department name)
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';

1.6.13 Union与Union all区别
1.6.13 Difference between Union and Union all

(1)union会对结果集去重
(1) union deduplicates the combined result set

(2)union all不会对结果集去重
(2) union all does not deduplicate the result set
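The difference can be illustrated with a quick Python simulation, with lists standing in for result sets:

```python
a = [1, 2, 3]
b = [2, 3, 4]

# UNION: combine, then remove duplicate rows.
union = sorted(set(a) | set(b))

# UNION ALL: concatenate and keep all duplicates.
union_all = a + b

print(union)      # [1, 2, 3, 4]
print(union_all)  # [1, 2, 3, 2, 3, 4]
```

Because union all skips the deduplication step, it is also cheaper to execute when duplicates are acceptable.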

1.7 DataX

1.7.1 DataX与Sqoop区别
1.7.1 Differences between DataX and Sqoop

1) DataX and Sqoop are both mainly used for batch data synchronization scenarios in offline systems.

2) DataX and Sqoop differ as follows:

(1) DataX's bottom layer is a single process with multiple threads; Sqoop's bottom layer is MapReduce (4 map tasks by default).

(2) For large data volumes, Sqoop's distributed synchronization is preferred; for small data volumes, DataX, which runs entirely in memory, is preferred. When DataX must handle a large volume, multiple DataX instances can be started, each responsible for a portion of the data (manually divided).

(3) Sqoop was born for Hadoop and has good compatibility with Hadoop-related components; DataX uses plug-in development and supports more Sources and Sinks.

(4) Sqoop is no longer officially upgraded or maintained; DataX is currently upgraded and maintained by Alibaba.

(5) Regarding run logs and statistics, DataX is richer; Sqoop runs on Yarn, so its logs are not easy to collect.

1.7.2 速度控制
1.7.2 Speed control

1) The key tuning parameters are as follows:

job.setting.speed.channel: 总并发数 (total concurrency)

job.setting.speed.record: 总record限速 (total record speed limit)

job.setting.speed.byte: 总byte限速 (total byte speed limit)

core.transport.channel.speed.record: 单个channel的record限速,默认值为10000(10000条/s) (record speed limit of a single channel, default 10000 records/s)

core.transport.channel.speed.byte: 单个channel的byte限速,默认值1024*1024(1M/s) (byte speed limit of a single channel, default 1024*1024, i.e. 1 MB/s)

2) Priority:

(1)全局Byte限速 / 单Channel Byte限速 = 按Byte计算的Channel数
(1) Global byte speed limit / per-channel byte speed limit = channel count derived from bytes

(2)全局Record限速 / 单Channel Record限速 = 按Record计算的Channel数
(2) Global record speed limit / per-channel record speed limit = channel count derived from records

若两者都设置,取结果较小的那个
If both are set, the smaller result is taken.

(3)若以上均未设置,则总channel数的配置生效
(3) If none of the above is set, the configured total channel count takes effect.
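The "take the smaller result" priority rule can be sketched as follows. This is a simplified illustration of the logic described above, not DataX's actual source code:

```python
def effective_channels(total_byte=None, total_record=None, total_channel=None,
                       channel_byte=1024 * 1024, channel_record=10000):
    # Each global limit divided by the per-channel limit yields a
    # candidate channel count; if both limits are set, the smaller
    # one wins. With no speed limits, the explicit channel count is used.
    candidates = []
    if total_byte is not None:
        candidates.append(total_byte // channel_byte)
    if total_record is not None:
        candidates.append(total_record // channel_record)
    return min(candidates) if candidates else total_channel

# Byte limit allows 10 channels, record limit allows 5 -> 5 channels.
print(effective_channels(total_byte=10 * 1024 * 1024, total_record=50000))  # 5
# No speed limits set -> the configured total channel count takes effect.
print(effective_channels(total_channel=5))  # 5
```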

3) Project configuration

只设置 总channel数=5,基本可以跑满网卡带宽。
Setting only total channels = 5 is usually enough to saturate the NIC bandwidth.

1.7.3 内存调整
1.7.3 Memory Adjustments

It is recommended to set the memory to 4 GB or 8 GB, adjusted according to the actual situation.

调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
There are two ways to adjust the JVM xms/xmx parameters: one is to edit the datax.py script directly; the other is to add the corresponding parameters at startup, as follows:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

1.7.4 空值处理
1.7.4 Handling of null values

1)MySQL(null) => Hive (\N) 要求Hive建表语句
1) MySQL (null) => Hive (\N): requires support in the Hive table creation statement

There are two solutions to this problem:

(1)修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,可参考
(1) Modify the source code of the DataX HDFS Writer and add logic for a custom null-value storage format.

(2) When creating the table in Hive, specify that null values are stored as the empty string (''), for example:

DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
    `id` STRING COMMENT '编号 (id)',
    `name` STRING COMMENT '省份名称 (province name)',
    `region_id` STRING COMMENT '地区ID (region id)',
    `area_code` STRING COMMENT '地区编码 (area code)',
    `iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用 (old ISO-3166-2 code, for visualization)',
    `iso_3166_2` STRING COMMENT '新版ISO-3166-2编码,供可视化使用 (new ISO-3166-2 code, for visualization)'
) COMMENT '省份表 (province table)'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';

2) Hive (\N) => MySQL (null): set nullFormat in the HDFS Reader configuration, as below:

"reader": {

"name": "hdfsreader",

"parameter": {

"defaultFS": "hdfs://hadoop102:8020",

"path": "/base_province",

"column": [



"fileType": "text",

"compress": "gzip",

"encoding": "UTF-8",

"nullFormat": "\\N",

"fieldDelimiter": "\t",


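The effect of nullFormat can be simulated: a field whose text equals the configured null marker becomes a real NULL. A minimal Python sketch:

```python
NULL_FORMAT = "\\N"  # the two characters backslash and N, as Hive writes NULL

def parse_line(line: str, sep: str = "\t"):
    # Mimic the HDFS Reader's nullFormat handling in spirit: fields
    # equal to the marker become real NULLs (None), which the MySQL
    # writer side would then store as null.
    return [None if f == NULL_FORMAT else f
            for f in line.rstrip("\n").split(sep)]

print(parse_line("1\t\\N\t18"))  # ['1', None, '18']
```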

1.7.5 配置文件生成脚本
1.7.5 Configuration File Generation Script

(1) One configuration file per table: if there are thousands of tables, how should the configurations be written? Generate them with a script.

(2) Description of script usage

python -d database -t table

1.7.6 DataX一天导入多少数据
1.7.6 How much data DataX imports per day

1) The fully synchronized tables are as follows

Activity table, Offer rule table, Coupon table, SKU platform attribute table, SKU sales attribute table

SPU commodity table (100,000 - 200,000), SKU commodity table (100,000 - 200,000), brand table, commodity primary classification, commodity secondary classification, commodity tertiary classification

Province table, region table

coding dictionary table

All of the above add up to about 300,000 rows, roughly 300 MB.

加购表(每天增量20万、全量100万 =》1g)
Cart table (daily increment of 200,000 rows, full volume of 1,000,000 => about 1 GB)

Therefore, DataX synchronizes about 1-2 GB of data per day.

注意:金融、保险(平安 、民生银行),只有业务数据数据量大一些。
Note: in finance and insurance (Ping An, Minsheng Bank), only the business data volume is somewhat larger.

2) The incrementally synchronized tables are as follows

Cart table (200,000), order table (100,000), order detail table (150,000), order status table, payment table (90,000), return table (1,000), refund table (1,000)

Order Details Coupon Association Table, Coupon Collection Table

Product review table, collection table

User Table, Order Details Activity Association Table

Incremental data 1-2g per day

1.7.7 Datax如何实现增量同步
1.7.7 How DataX implements incremental synchronization

Fetch today's new and changed data: filter with SQL for rows that were created today or modified today.
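A minimal sketch of building such a filter; the column names create_time and operate_time are assumptions for illustration:

```python
def incremental_filter(dt: str) -> str:
    # Select rows created today OR modified today; both column
    # names are hypothetical examples of audit columns.
    return (f"where date_format(create_time, '%Y-%m-%d') = '{dt}' "
            f"or date_format(operate_time, '%Y-%m-%d') = '{dt}'")

print(incremental_filter("2024-06-25"))
```

The generated clause is then embedded in the reader's query so only the day's delta is pulled.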

1.8 Maxwell

1.8.1 Maxwell与Canal、FlinkCDC的对比
1.8.1 Comparison of Maxwell with Canal and FlinkCDC

1) FlinkCDC, Maxwell and Canal are mainly used for real-time data synchronization processing scenarios in real-time systems.




SQL与数据条数关系 (relationship between one SQL statement and the number of output records):
FlinkCDC: SQL影响几条就出现几条 (one output record per affected row)
Maxwell: SQL影响几条就出现几条 (one output record per affected row)
Canal: 只有一条整体 (a single whole entry, which may need to be split downstream)

数据初始化功能(同步全量数据) (data initialization, i.e. full synchronization):
FlinkCDC: 有,支持多库多表同时操作 (yes, multiple databases and tables at once)
Maxwell: 有,单表 (yes, single table)
Canal: 无 (no)

断点续传功能 (breakpoint resume):
FlinkCDC: 有,基于Checkpoint (yes, based on checkpoints)
Maxwell: 有,存于MySQL (yes, position stored in MySQL)
Canal: 有,存于本地磁盘 (yes, position stored on local disk)

1.8.2 Maxwell好处
1.8.2 Advantages of Maxwell

Supports breakpoint resume.

Supports full initialization synchronization (bootstrap).

Automatically sends data to the corresponding Kafka topic based on the database name and table name.

1.8.3 Maxwell底层原理
1.8.3 Maxwell's underlying principles

MySQL master-slave replication.

1.8.4 全量同步速度如何
1.8.4 How fast is full synchronization

Full synchronization is slow; Sqoop or DataX is recommended for full synchronization instead.

1.8.5 Maxwell数据重复问题
1.8.5 Maxwell data duplication problem

When synchronizing historical data, bootstrap scans all existing rows.

Meanwhile, Maxwell keeps listening for binlog changes.

For example, while bootstrap is synchronizing the historical database, a new row is inserted into it. The bootstrap scan picks the row up and the Maxwell binlog listener also captures it, so the row appears twice, causing the data duplication problem.
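One common downstream remedy is to deduplicate by primary key, keeping the record with the latest timestamp. A minimal Python sketch (the field names are hypothetical):

```python
def dedup_latest(events):
    # Keep only the newest event per primary key (e.g. in a batch
    # layer or keyed state); bootstrap/binlog duplicates collapse
    # into a single record.
    latest = {}
    for e in events:
        k = e["id"]
        if k not in latest or e["ts"] >= latest[k]["ts"]:
            latest[k] = e
    return sorted(latest.values(), key=lambda e: e["id"])

events = [
    {"id": 1, "ts": 100, "src": "bootstrap"},
    {"id": 1, "ts": 101, "src": "binlog"},    # duplicate of id 1
    {"id": 2, "ts": 100, "src": "bootstrap"},
]
print([e["src"] for e in dedup_latest(events)])  # ['binlog', 'bootstrap']
```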

1.9 DolphinScheduler调度器
1.9 DolphinScheduler Scheduler

Version 1.3.9: supports email and Enterprise WeChat alerts.

Version 2.0.3: more complete alarm information and easier configuration.

Version 3.0.0 and above: support data quality monitoring.

1.9.1 每天集群运行多少指标?
1.9.1 How many metrics does the cluster run per day?

More than 100 metrics are run every day, and about 200 during promotional activities.

1.9.2 任务挂了怎么办?
1.9.2 What if a task fails?

(1) Alert notifications are sent by email, DingTalk, and an integrated automated phone call, whether the job succeeds or fails.

(2) The main solution is to read the logs and fix the problem.

(3) Alert platform: Ruixiang Cloud (睿象云).

(4) The Double 11 and 618 promotions require 24-hour on-call duty.

1.9.3 What happens if DolphinScheduler goes down?

Check the logs for the error cause: restart it directly; if resources are insufficient, increase the resources and then restart.

1.10 Spark Core & SQL

1.10.1 Spark运行模式
1.10.1 Spark operating mode

(1) Local: runs on a single machine; used for testing.

(2)Standalone是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。
(2) Standalone: Spark's own scheduling system, used when cluster performance requirements are very high. Rarely used in China.

(3)Yarn:采用Hadoop的资源调度器 国内大量使用。
(3) Yarn: uses Hadoop's resource scheduler; widely used in China.

Yarn-client mode: the Driver runs on the Client (not in the AM).

Yarn-cluster mode: the Driver runs inside the ApplicationMaster (AM).

(4) Mesos: rarely used in China.

(5) K8S: the trend, but not yet mature; too much configuration is required.

1.10.2 Spark常用端口号
1.10.2 Common port numbers for Spark

(1)4040 spark-shell任务端口
(1) 4040 spark-shell task port

(2)7077 内部通讯端口。类比Hadoop的8020/9000
(2) 7077: internal communication port, analogous to Hadoop's 8020/9000.

(3)8080 查看任务执行情况端口。 类比Hadoop的8088
(3) 8080: port for viewing task execution status, analogous to Hadoop's 8088.

(4)18080 历史服务器。类比Hadoop的19888
(4) 18080: history server, analogous to Hadoop's 19888.

Note: since Spark is only responsible for computing, it has no counterpart to Hadoop's 9870/50070 data-storage ports.

1.10.3 RDD五大属性
1.10.3 The Five Attributes of RDD

(1) A list of partitions (分区列表)

(2) A function for computing each partition (计算每个分区的函数)

(3) A list of dependencies on other RDDs (对其他RDD的依赖列表)

(4) Optionally, a Partitioner for key-value RDDs (可选:KV类型RDD的分区器)

(5) Optionally, a list of preferred locations for computing each partition (可选:每个分区的优先计算位置)

1.10.4 RDD弹性体现在哪里
1.10.4 Where RDD's Resilience Is Reflected

It mainly manifests as storage elasticity, computation elasticity, task elasticity, and data-location elasticity, as follows:

(1) Automatic memory and disk switching

(2) Efficient fault tolerance based on lineages

(3) Task will retry a certain number of times if it fails

(4) If a Stage fails, it is automatically retried a certain number of times, and only the failed partitions are recomputed.

(5)Checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
(5) Checkpoint [each RDD operation produces a new RDD; if the lineage chain is long and the computation heavy, write the data to disk] and persist [reuse data cached in memory or on disk] (checkpointing and persistence)

(6)数据调度弹性:DAG Task 和资源管理无关
(6) Data scheduling elasticity: DAG task scheduling is independent of resource management.

(7) Highly elastic repartitioning of data.

1.10.5 Spark的转换算子(8个)
1.10.5 Spark Transformation Operators (8)



1.10.6 Spark的行动算子(5个)
1.10.6 Spark's Action Operators (5)