Chapter 1: Core Technologies

1.1 Linux & Shell

1.1.1 Common Advanced Linux Commands



Command                         Description
top                             Real-time display of resource usage (CPU, memory, and execution time) of each process in the system
jmap -heap <process ID>         View the heap memory of a Java process
free -m                         View system memory usage
ps -ef                          View processes
netstat -tunlp | grep <port>    View port occupancy
du -sh <path>/*                 View disk usage under a path, e.g. $ du -sh /opt/*
df -h                           View disk storage

1.1.2 Common Shell Tools and Scripts You Have Written


2) What scripts have you written in Shell?

(1) Cluster start/stop scripts and file-distribution scripts


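#!/bin/bash
# Skeleton: "absolute path" stands for the script to run on each host;
# add a "stop") branch with the same structure
case $1 in
"start")
    for i in hadoop102 hadoop103 hadoop104
    do
        ssh $i "absolute path"
    done
;;
esac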






(2) Loading data between data warehouse layers: ods -> dwd -> dws -> ads


② Define a variable: APP=gmall

③ Get the date to process:

If a date is passed in, use the passed-in date.

If no date is passed in, use T+1 (that is, process the previous day's data).


Write the SQL for the current day first, then replace every date with $do_date and prefix every table with ${APP}.

Custom functions (UDF, UDTF) also need the ${APP}. prefix.


Execute the SQL.

1.1.3 Difference Between Single and Double Quotes in Shell

1) Create a script file in /home/atguigu/bin

[atguigu@hadoop102 bin]$ vim

Add the following to the file



echo '$do_date'

echo "$do_date"

echo "'$do_date'"

echo '"$do_date"'

echo `date`

2) View execution results

[atguigu@hadoop102 bin]$ 2022-02-10





Thursday May 2nd, 2022 21:02:08 CST

3) Summary:

(1) Single quotes do not expand variables.

(2) Double quotes expand variables.

(3) Backquotes (`) execute the command inside them.

(4) Single quotes nested inside double quotes: the variable is expanded.

(5) Double quotes nested inside single quotes: the variable is not expanded.

1.2 Hadoop

1.2.1 Common Hadoop Port Numbers



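Purpose                                        Hadoop 2.x      Hadoop 3.x
HDFS NameNode web UI                           50070           9870
YARN web UI (MapReduce job status)             8088            8088
JobHistory server web UI                       19888           19888
Client access to the cluster (NameNode RPC)    8020 / 9000     8020 / 9000 / 9820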



1.2.2 HDFS Read and Write Flow

Note: What happens if a DataNode fails during an HDFS write?

If a DataNode suddenly fails, the client stops receiving its ack and notifies the NameNode. The NameNode confirms that the block's replica count no longer matches the configured replication factor, instructs an idle DataNode to create a new replica, and takes the failed DataNode offline. When the failed DataNode recovers, the incomplete replica data it had written is deleted.

1.2.3 Handling Small Files in HDFS

1) What impact do small files have?

(1) Storage (NameNode memory)

Each file block occupies about 150 bytes of NameNode memory.

How many blocks can 128 GB of NameNode memory hold? 128 GB × 1024 MB/GB × 1024 KB/MB × 1024 B/KB ÷ 150 B ≈ 916 million blocks.
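The storage arithmetic above can be checked with a few lines of Java. This is a minimal sketch that assumes the text's figure of about 150 bytes of NameNode memory per block and 1 GB = 1024³ bytes; the class and method names are illustrative.

```java
// Rough NameNode capacity estimate (illustrative sketch).
// Assumption from the text: each block object costs about 150 bytes of NameNode heap.
class NameNodeCapacity {
    static final long BYTES_PER_BLOCK = 150L;

    // How many block objects fit into heapGb gigabytes of NameNode memory.
    static long maxBlocks(long heapGb) {
        long heapBytes = heapGb * 1024L * 1024L * 1024L;
        return heapBytes / BYTES_PER_BLOCK;
    }
}
```

NameNodeCapacity.maxBlocks(128) returns 916259689. The limit counts blocks, not bytes, so a 1 KB file costs the NameNode roughly as much memory as a full 128 MB block, which is why small files waste NameNode memory.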

(2) Computation

Each small file gets its own MapTask, and each MapTask uses 1 GB of memory by default, which wastes resources.

2) Solutions

(1) Archive small files into HAR (Hadoop Archive) files.


(3) Write an MR program that merges the generated small files into one large file. Hive and Spark have built-in merge features that do this automatically.

(4) Enable JVM reuse when there are many small files; do not enable it otherwise, because a reused JVM holds its task slot until the job finishes.

JVM reuse lets a JVM instance be reused N times within the same job. N is configured in Hadoop's mapred-site.xml and is usually 10-20.




<property>
    <name>mapreduce.job.jvm.numtasks</name>
    <value>10</value>
    <description>How many tasks to run per JVM. If set to -1, there is no limit.</description>
</property>


1.2.4 HDFS NameNode Memory

1) Hadoop 2.x: NameNode memory is configured, with a default of 2000 MB.

2) Hadoop 3.x: NameNode memory is allocated dynamically.

The minimum NameNode memory is 1 GB; add 1 GB for every additional 1 million file blocks.

1.2.5 Shuffle and Its Optimization

1.2.6 YARN Working Mechanism

1.2.7 YARN Schedulers

1) Hadoop has three main types of schedulers:

FIFO, Capacity Scheduler, and Fair Scheduler.

The default scheduler in Apache Hadoop is the Capacity Scheduler.

The default scheduler in CDH is the Fair Scheduler.

2) Differences

FIFO scheduler: supports a single queue and is first in, first out; it is not used in production.

Capacity Scheduler: supports multiple queues. Queue resource allocation: the queue with the lowest resource usage ratio is served first. Job resource allocation: by job priority, then by submission time. Container resource allocation: data locality (same node, then same rack, then different nodes on different racks).

Fair Scheduler: supports multiple queues and ensures that every job in a queue gets a fair share of the queue's resources. When resources are insufficient, they are allocated according to each job's shortfall.
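The Capacity Scheduler's queue-level and job-level ordering rules can be modeled in a few lines. This is a simplified sketch, not YARN's implementation: it assumes resources are a single number and that a larger priority value means a more important job; the class names are hypothetical and container locality is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of two Capacity Scheduler ordering rules (hypothetical classes, not YARN code).
class CapacityOrderSketch {
    static class Queue {
        final String name;
        final double used;      // resources currently used by the queue
        final double capacity;  // resources guaranteed to the queue
        Queue(String name, double used, double capacity) {
            this.name = name;
            this.used = used;
            this.capacity = capacity;
        }
        double usageRatio() { return used / capacity; }
    }

    static class Job {
        final String id;
        final int priority;     // assumption: larger value = higher priority
        final long submitTime;  // submission timestamp in ms
        Job(String id, int priority, long submitTime) {
            this.id = id;
            this.priority = priority;
            this.submitTime = submitTime;
        }
    }

    // Queue-level rule: offer resources to the queue with the lowest used/capacity ratio first.
    static Queue pickQueue(List<Queue> queues) {
        return queues.stream()
                .min(Comparator.comparingDouble(Queue::usageRatio))
                .orElseThrow();
    }

    // Job-level rule: higher priority first; ties broken by earlier submission time.
    static List<Job> orderJobs(List<Job> jobs) {
        List<Job> sorted = new ArrayList<>(jobs);
        sorted.sort(Comparator.comparingInt((Job j) -> j.priority).reversed()
                .thenComparingLong(j -> j.submitTime));
        return sorted;
    }
}
```

With queue a at 30/50 (60%) and queue b at 10/40 (25%), pickQueue chooses b even though a has more capacity in absolute terms.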

3) Which scheduler should be chosen in production?

Large companies: if concurrency requirements are high, choose the Fair Scheduler; this requires reasonably powerful servers.

Small and medium-sized companies whose cluster resources are limited: choose the Capacity Scheduler.

4) How are queues created in production?

(1) The scheduler provides only one default queue, which does not meet production requirements.

(2) By department: business department 1, business department 2.

(3) By business module: login/registration, shopping cart, orders.

5) What are the benefits of creating multiple queues?

(1) It prevents an employee from accidentally writing recursive or infinite-loop code that exhausts all cluster resources.

(2) It allows tasks to be degraded: in special periods, the queues of important tasks are guaranteed sufficient resources.

Business department 1 (important) > Business department 2 (fairly important) > Orders (normal) > Shopping cart (normal) > Login/registration (secondary)

1.2.8 HDFS Block Size

1) Block size

Hadoop 1.x: 64 MB

Hadoop 2.x and 3.x: 128 MB

Local environment: 32 MB

Enterprise: 128 MB, 256 MB, or 512 MB

2) What determines the block size

Disk read/write speed:

Ordinary mechanical hard disk, about 100 MB/s => 128 MB

Ordinary solid-state drive, about 300 MB/s => 256 MB

Memory mirroring, about 500-600 MB/s => 512 MB
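The mapping above can be reproduced with a simple rule of thumb: take about one second's worth of sequential transfer (disk speed × 1 s) and round to the nearest power of two. The one-second target is an assumption (it follows the common guideline that seek time should be around 1% of transfer time), not a Hadoop setting, and the class name is hypothetical.

```java
// Rule-of-thumb block size: ~1 s of sequential transfer, rounded to the nearest power of two (assumption).
class BlockSizeSketch {
    static long suggestBlockSizeMb(long diskMbPerSec) {
        if (diskMbPerSec <= 0) {
            throw new IllegalArgumentException("disk speed must be positive");
        }
        long lower = Long.highestOneBit(diskMbPerSec);           // largest power of two <= speed
        long upper = (lower == diskMbPerSec) ? lower : lower << 1; // smallest power of two >= speed
        // Pick whichever power of two is closer; ties round up.
        return (diskMbPerSec - lower) < (upper - diskMbPerSec) ? lower : upper;
    }
}
```

For 100, 300, and 550 MB/s this yields 128, 256, and 512 MB, matching the values listed above.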

1.2.9 Hadoop Split-Brain: Causes and Solutions

1) Causes of split-brain

The Leader fails and a failover begins. After the Follower has finished taking over and become the new Leader, the original Leader comes back. (Its failure may have been a temporary network disconnection or a temporary slowdown that kept it from responding in time, while its NameNode process was still running.) If for some reason its ZKFC did not switch it to Standby, the original Leader still believes it is the Leader and keeps answering client requests sent to it, and split-brain occurs.

2) Split-brain is rare in Hadoop.

If it does occur, the data on the NameNodes is inconsistent and only one copy can be kept. For example, with three NameNodes nn1, nn2, and nn3, the steps to keep nn1's data are:

(1) Stop nn2 and nn3.

(2) On nn2 and nn3, re-run the data synchronization command: hdfs namenode -bootstrapStandby

(3) Restart nn2 and nn3.

1.3 ZooKeeper

1.3.1 Common Commands


1.3.2 Election Mechanism

Majority (quorum) rule: deploy an odd number of servers, 2n + 1.

10 servers: deploy 3 ZooKeeper nodes.

20 servers: deploy 5.

100 servers: deploy 11.

More ZooKeeper nodes: advantage, higher reliability; disadvantage, higher communication latency.

1.3.3 Which Two CAP Properties Does ZooKeeper Satisfy?

1.3.4 ZooKeeper Split-Brain

ZooKeeper prevents split-brain with its majority (quorum) voting mechanism.
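The majority rule behind both the 2n + 1 sizing and split-brain prevention reduces to simple arithmetic. A minimal Java sketch (names are illustrative):

```java
// Majority (quorum) arithmetic for a ZooKeeper ensemble of n servers.
class QuorumSketch {
    // Smallest strict majority of n servers.
    static int quorumSize(int n) {
        return n / 2 + 1;
    }

    // How many servers can fail while a majority is still reachable.
    static int toleratedFailures(int n) {
        return (n - 1) / 2;
    }

    // A partition of the ensemble can elect a leader only if it holds a strict majority.
    static boolean canElectLeader(int ensembleSize, int reachableServers) {
        return reachableServers >= quorumSize(ensembleSize);
    }
}
```

If a 5-server ensemble is split 3/2, only the 3-server side can elect a leader, so two leaders cannot coexist. A 4-server ensemble tolerates only 1 failure, the same as 3 servers, which is why odd sizes are preferred.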

1.3.5 What Is ZooKeeper Used For?

(1) As an HA coordinator, e.g. for HDFS HA and YARN HA.

(2) As a dependency of other components, e.g. Kafka, HBase, and ClickHouse (CK).

1.4 Flume

1.4.1 Flume Components, Put Transaction, and Take Transaction

1) Taildir Source

(1) Supports resuming from a breakpoint and monitoring multiple directories.

(2) How Taildir works internally

(3) What happens if Taildir fails?

No data loss: it resumes from the recorded breakpoint.

Duplicate data: possible.

(4) Known problem and solutions

① Problem:

Taildir identifies a new file by iNode value + absolute path (including the file name).

If the logging framework renames the log file in the early morning, Taildir treats it as a new file and reads yesterday's data again.

② Solutions:

Option 1: Include the date in generated log file names, and configure the logging framework not to rename files.

Option 2: Modify the TaildirSource source code so that files are identified by iNode value only.
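Why a rename causes re-reading, and why Option 2 fixes it, can be shown with a toy model of position tracking. This is not Flume's TaildirSource code; the class is hypothetical, and the inode and path values are passed in rather than read from the file system.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how a tail-style source remembers read positions (not Flume's TaildirSource).
// keyByInodeOnly = false mimics "inode + absolute path"; true mimics Option 2.
class TailPositionSketch {
    private final Map<String, Long> positions = new HashMap<>();
    private final boolean keyByInodeOnly;

    TailPositionSketch(boolean keyByInodeOnly) {
        this.keyByInodeOnly = keyByInodeOnly;
    }

    private String key(long inode, String path) {
        return keyByInodeOnly ? Long.toString(inode) : inode + ":" + path;
    }

    // Offset from which reading resumes; an unknown key is treated as a new file read from 0.
    long resumeOffset(long inode, String path) {
        return positions.getOrDefault(key(inode, path), 0L);
    }

    void recordRead(long inode, String path, long newOffset) {
        positions.put(key(inode, path), newOffset);
    }
}
```

A rename keeps the inode but changes the path, so with the inode + path key the renamed file resumes at offset 0 and is read again, while the inode-only key resumes where reading stopped.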


2) File Channel / Memory Channel / Kafka Channel

(1) File Channel

Data is stored on disk. Advantage: high reliability; disadvantage: slower transfer.

Default capacity: 1 million events.

Note: File Channel throughput can be increased by configuring dataDirs with multiple paths, each on a different disk.

(2) Memory Channel

Data is stored in memory. Advantage: fast transfer; disadvantage: poor reliability.

Default capacity: 100 events.

(3) Kafka Channel

Data is stored in Kafka, on disk.

Advantage: high reliability.

Fast transfer: Kafka Channel is faster than Memory Channel + Kafka Sink because it eliminates the Sink stage.

(4) How to choose in production

If the next hop is Kafka, prefer Kafka Channel.

For financial companies, or others that require monetary data to be exact, choose File Channel.

For ordinary logs, Memory Channel is usually enough.

Losing a few million records a day out of petabyte-scale data is like a billionaire dropping a dollar: would they bother to pick it up?

3)HDFS Sink

(1) Roll files by time (half an hour) or by size (128 MB), and set the event-count trigger to 0 (its default is 10).

Specific parameters: hdfs.rollInterval=1800, hdfs.rollSize=134217728, hdfs.rollCount=0
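How the three roll settings combine can be sketched as follows. Per the Flume documentation, a value of 0 disables the corresponding trigger; the class itself is a hypothetical model, not the HDFS sink's code.

```java
// Hypothetical model of how the three HDFS sink roll triggers combine; 0 disables a trigger.
class HdfsRollSketch {
    final long rollIntervalSec;
    final long rollSizeBytes;
    final long rollCount;

    HdfsRollSketch(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    // The current file is closed and a new one opened as soon as any enabled trigger fires.
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        boolean byTime = rollIntervalSec > 0 && secondsOpen >= rollIntervalSec;
        boolean bySize = rollSizeBytes > 0 && bytesWritten >= rollSizeBytes;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }
}
```

With the default rollCount of 10, a file would be closed after every 10 events, producing a flood of small HDFS files, which is why the count trigger is set to 0 here.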

4) Transactions

Source to Channel is the Put transaction.

Channel to Sink is the Take transaction.
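A toy model of the two transactions, assuming a bounded in-memory channel. The class and method names are hypothetical; the doPut/doTake/doCommit/doRollback comments only loosely mirror the steps Flume's channel transactions go through.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a bounded channel with put/take transactions (not Flume's API).
class ChannelTxSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;

    ChannelTxSketch(int capacity) {
        this.capacity = capacity;
    }

    int size() {
        return queue.size();
    }

    // Put transaction (Source -> Channel): events are staged in putList and become visible
    // only on commit; if the channel lacks room, nothing is added and the caller can retry.
    boolean putAll(List<String> events) {
        List<String> putList = new ArrayList<>(events);      // doPut
        if (queue.size() + putList.size() > capacity) {
            return false;                                    // doRollback: discard putList
        }
        queue.addAll(putList);                               // doCommit
        return true;
    }

    // Take transaction (Channel -> Sink): events move to takeList; if the sink fails,
    // rollback returns them to the head of the channel in their original order.
    List<String> take(int batchSize, boolean sinkSucceeds) {
        List<String> takeList = new ArrayList<>();
        while (takeList.size() < batchSize && !queue.isEmpty()) {
            takeList.add(queue.pollFirst());                 // doTake
        }
        if (!sinkSucceeds) {
            for (int i = takeList.size() - 1; i >= 0; i--) {
                queue.addFirst(takeList.get(i));             // doRollback
            }
            return List.of();
        }
        return takeList;                                     // doCommit
    }
}
```

Because a failed take puts the events back instead of dropping them, a retry can deliver the same events twice, which matches the "no loss, possible duplicates" behavior described for Taildir above.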

1.4.2 Flume Interceptors

1) Notes on interceptors

(1) Timestamp interceptor: mainly solves the midnight-drift problem, where events produced shortly before midnight land in the next day's directory.

2) Steps for a custom interceptor

(1) Implement the Interceptor interface.

(2) Override four methods:

initialize: initialization

public Event intercept(Event event): processes a single Event

public List<Event> intercept(List<Event> events): processes multiple Events by calling intercept(Event event) for each one

close: cleanup

(3) Write a static inner class that implements Interceptor.Builder.
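The three steps above can be sketched without the Flume dependency by using minimal stand-in interfaces. SimpleEvent and SimpleInterceptor are hypothetical stand-ins shaped like Flume's Event and Interceptor, and the "ts" field in the log body is an assumed log format. The interceptor copies the event time into the "timestamp" header, which is the header the HDFS sink reads for its date escape sequences.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-ins shaped like Flume's Event and Interceptor (not the org.apache.flume types).
interface SimpleEvent {
    Map<String, String> getHeaders();
    byte[] getBody();
}

interface SimpleInterceptor {
    void initialize();
    SimpleEvent intercept(SimpleEvent event);
    List<SimpleEvent> intercept(List<SimpleEvent> events);
    void close();

    interface Builder {
        SimpleInterceptor build();
    }
}

class SimpleEventImpl implements SimpleEvent {
    private final Map<String, String> headers;
    private final byte[] body;

    SimpleEventImpl(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }

    public Map<String, String> getHeaders() { return headers; }
    public byte[] getBody() { return body; }
}

// Step (1): implement the interceptor interface.
class TimestampInterceptorSketch implements SimpleInterceptor {
    // Assumed log format: a JSON line containing "ts": <epoch millis>.
    private static final Pattern TS = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    // Step (2): the four methods.
    @Override
    public void initialize() { }

    @Override
    public SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Matcher m = TS.matcher(body);
        if (m.find()) {
            event.getHeaders().put("timestamp", m.group(1));  // event time, not arrival time
        }
        return event;
    }

    @Override
    public List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> out = new ArrayList<>(events.size());
        for (SimpleEvent e : events) {
            SimpleEvent processed = intercept(e);  // reuse the single-event logic
            if (processed != null) {
                out.add(processed);
            }
        }
        return out;
    }

    @Override
    public void close() { }

    // Step (3): static inner class implementing the Builder.
    public static class Builder implements SimpleInterceptor.Builder {
        @Override
        public SimpleInterceptor build() {
            return new TimestampInterceptorSketch();
        }
    }
}
```

In a real Flume agent the Builder's fully qualified class name is what the agent configuration refers to; here it is simply called directly.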

3) Can the interceptor be omitted?

Using the timestamp interceptor is recommended. Without it, downstream processing has to be delayed by 15-20 minutes, which is more troublesome.

1.4.3 Flume Channel Selectors

Replicating: the default selector; sends every event to all downstream channels.

Multiplexing: routes events selectively to specified channels.
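The two selector behaviors can be contrasted in a small sketch. SelectorSketch is a hypothetical helper, not Flume's ChannelSelector API; the header name, mapping, and defaults play the roles of Flume's selector.header, selector.mapping.<value>, and selector.default settings.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the two selector behaviors (not Flume's ChannelSelector API).
class SelectorSketch {
    // Replicating: every event is copied to every channel.
    static List<String> replicating(List<String> allChannels) {
        return allChannels;
    }

    // Multiplexing: look up one header value in a mapping; missing or unmapped values use the defaults.
    static List<String> multiplexing(Map<String, String> headers, String headerName,
                                     Map<String, List<String>> mapping, List<String> defaultChannels) {
        String value = headers.get(headerName);
        if (value == null || !mapping.containsKey(value)) {
            return defaultChannels;
        }
        return mapping.get(value);
    }
}
```

For example, with mapping start -> [c1] and event -> [c2], an event whose "type" header is start goes only to c1, and an event without the header goes to the default channels.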

1.4.4 Flume Monitoring

1) Abnormal symptoms

When monitoring with Ganglia, if Flume's attempted commits far outnumber its successful commits, Flume is performing poorly, mainly because of insufficient memory.

2) Solutions

(1) Tune Flume itself: the default memory is 20 MB; consider increasing Flume's memory to 4-6 GB in flume-env.sh.

(2) Scale out: add more servers.

For sales events such as 618, add servers temporarily and release them after the event.

Log server configuration: 8-16 GB memory, 8 TB disk.

1.4.5 Does Flume Lose Data During Collection?

Kafka Channel and File Channel do not lose data, because the data is stored on disk.

Memory Channel may lose data.

1.4.6 How to Increase Flume Throughput

Adjusting the batchSize of the Taildir source controls throughput; the default is 100 events.

The bottleneck in throughput is usually network bandwidth.

1.5 Kafka

1.5.1 Kafka Architecture

Producer, Broker, Consumer, ZooKeeper.

Note: ZooKeeper stores broker IDs, controller information, and so on, but no producer information.

1.5.2 Kafka Producer Partitioning Strategies

Kafka provides three partitioners out of the box: DefaultPartitioner (used when no partitioner is specified), UniformStickyPartitioner, and RoundRobinPartitioner.

1) DefaultPartitioner

The following figure illustrates the partitioning policy of the default partitioner:

2) UniformStickyPartitioner (pure sticky partitioner)

(1) If a partition is specified, the record is sent to that partition.

(2) If no partition is specified, the sticky partitioner is used, whether or not the record has a key.

3) RoundRobinPartitioner

(1) If a partition is specified in the record, that partition is used.

(2) If no partition is specified, records go to each partition in turn, spreading data evenly across partitions.
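The three built-in strategies can be compared side by side in a simplified sketch. It is not Kafka's code: Kafka hashes the serialized key with murmur2, while String.hashCode() stands in here, so real partition numbers will differ. The sticky partition is passed in rather than chosen, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketches of the three built-in strategies (String.hashCode() stands in for murmur2).
class PartitionerSketch {
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Default: explicit partition, else hash(key), else the current sticky partition.
    static int defaultPartition(Integer explicit, String key, int numPartitions, int stickyPartition) {
        if (explicit != null) {
            return explicit;
        }
        if (key != null) {
            return toPositive(key.hashCode()) % numPartitions;
        }
        return stickyPartition;  // keyless records stick to one partition until its batch is done
    }

    // UniformSticky: ignores the key entirely.
    static int uniformStickyPartition(Integer explicit, int stickyPartition) {
        return explicit != null ? explicit : stickyPartition;
    }

    // RoundRobin: cycles through partitions for records without an explicit partition.
    static class RoundRobin {
        private final AtomicInteger counter = new AtomicInteger(0);

        int partition(Integer explicit, int numPartitions) {
            if (explicit != null) {
                return explicit;
            }
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
    }
}
```

Using a table name as the key relies on the key-hash branch: every record with the same key lands in the same partition.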

4) Custom Partitioner

Custom partitioning: implement the org.apache.kafka.clients.producer.Partitioner interface and override its partition method.

For example, random assignment needs only the following code:

// Body of partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

This gets the topic's partitions and returns a random integer from 0 up to (but not including) the partition count.

In a project, to send all data from one MySQL table to the same partition, use the table name as the key.

1.5.3 Does Kafka Lose Data?

1) Producer side

acks=0: the producer sends data without waiting for any acknowledgment; poor reliability, high efficiency.

acks=1: the producer waits for the Leader's acknowledgment; medium reliability, medium efficiency.

acks=-1: the producer waits for acknowledgments from the Leader and all Followers in the ISR; high reliability, low efficiency.

In production, acks=0 is rarely used. acks=1 is generally used for ordinary logs, where losing an occasional record is acceptable. acks=-1 is generally used for money-related data and other scenarios with high reliability requirements.

2) Broker side

The replication factor should be at least 2.


1.5.4 Kafka ISR (In-Sync Replicas)

ISR (In-Sync Replicas) is the set of replicas in sync with the Leader. If a Follower does not send a request to, or sync data from, the Leader for too long, it is removed from the ISR. The time threshold is set by replica.lag.time.max.ms and defaults to 30 s.

A Follower that exceeds this threshold is removed from the ISR and placed in the OSR (Out-of-Sync Replicas) list; newly added Followers also start in the OSR.

All replicas of a Kafka partition are collectively called AR: AR = ISR + OSR.
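The ISR/OSR split can be sketched as a pure function. The input map of each follower's last caught-up time is an assumed stand-in for the state the leader tracks; the 30 s constant is the replica.lag.time.max.ms default, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: split a partition's replicas (AR) into ISR and OSR using replica.lag.time.max.ms.
class IsrSketch {
    static final long REPLICA_LAG_TIME_MAX_MS = 30_000L;

    // lastCaughtUpMs: per follower broker id, the last time it was fully caught up (assumed input).
    static List<Integer> isr(List<Integer> ar, int leader, Map<Integer, Long> lastCaughtUpMs, long nowMs) {
        List<Integer> inSync = new ArrayList<>();
        for (int replica : ar) {
            if (replica == leader) {
                inSync.add(replica);  // the leader is always in its own ISR
                continue;
            }
            Long caughtUp = lastCaughtUpMs.get(replica);
            // A follower with no record yet (newly added) stays out, i.e. it starts in the OSR.
            if (caughtUp != null && nowMs - caughtUp <= REPLICA_LAG_TIME_MAX_MS) {
                inSync.add(replica);
            }
        }
        return inSync;
    }

    // OSR = AR - ISR
    static List<Integer> osr(List<Integer> ar, List<Integer> isr) {
        List<Integer> outOfSync = new ArrayList<>(ar);
        outOfSync.removeAll(isr);
        return outOfSync;
    }
}
```

With AR [1,0,2], leader 1, follower 0 caught up 5 s ago and follower 2 caught up 40 s ago, the ISR is [1,0] and the OSR is [2].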

1.5.5 Duplicate Data in Kafka

Deduplication = idempotence + transactions

1) How idempotence works

2) Idempotence configuration parameters

Parameter                                  Description
enable.idempotence                         Whether idempotence is enabled; default true.
max.in.flight.requests.per.connection      Must be 1 before version 1.0.X; after 1.0.X, must be less than or equal to 5.
retries                                    Number of retries on failure; must be greater than 0.
acks                                       Must be set to all.

3) Kafka transactions provide the following five APIs:

// 1 Initialize transactions
void initTransactions();

// 2 Begin a transaction
void beginTransaction() throws ProducerFencedException;

// 3 Commit consumed offsets within the transaction (mainly for consumers)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4 Commit the transaction
void commitTransaction() throws ProducerFencedException;

// 5 Abort the transaction (similar to a rollback)
void abortTransaction() throws ProducerFencedException;

4) Summary

(1) Producer side

Set acks to -1 (acks=-1).

Idempotence (enable.idempotence = true) + transactions.

(2) Broker side

Partition replication factor of at least 2 (--replication-factor 2).

Minimum number of in-sync replicas that must acknowledge a write: at least 2 (min.insync.replicas = 2).

(3) Consumer side

Transactions + manual offset commits (enable.auto.commit = false).

The consumer's output destination must support transactions (e.g., MySQL, Kafka).

1.5.6 How Kafka Keeps Data Ordered (or Handles Out-of-Order Data)

1) Kafka guarantees ordering only within a single partition, so strict global ordering for a business requires a topic with a single partition.

2) How is ordering guaranteed within a single partition?

Note: the idempotence mechanism guarantees ordering as follows:
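Idempotence rests on sequence numbers: the producer has a producer ID (PID) and numbers its batches per partition, and the broker checks each number before appending. The sketch below is a simplified model of that check under this description. The real broker keeps metadata for the last 5 batches per producer and partition (which is why max.in.flight.requests.per.connection may be at most 5), while this version tracks only the last sequence number; the names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified broker-side view of the idempotent producer: for each (producerId, partition)
// the broker remembers the last sequence number it appended.
class SequenceCheckSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result append(long producerId, int partition, int sequence) {
        String key = producerId + "-" + partition;
        int expected = lastSeq.getOrDefault(key, -1) + 1;
        if (sequence < expected) {
            return Result.DUPLICATE;      // already written (e.g. a retry): drop it
        }
        if (sequence > expected) {
            return Result.OUT_OF_ORDER;   // a gap: reject so the producer resends in order
        }
        lastSeq.put(key, sequence);
        return Result.APPENDED;
    }
}
```

Rejecting duplicates removes retry duplicates, and rejecting gaps keeps batches in sequence order even when several requests are in flight.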

1.5.7 Kafka Partition Leader Election Rules

A replica must be alive in the ISR; among such replicas, the one that appears first in the AR is preferred. For example, with AR [1,0,2] and ISR [1,0,2], the Leader is tried in the order 1, 0, 2.
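The rule amounts to scanning the AR in order and taking the first replica that is also in the ISR. A minimal sketch with hypothetical names:

```java
import java.util.List;
import java.util.Optional;

class LeaderElectionSketch {
    // The first replica in AR order that is also in the ISR (and therefore alive) becomes the leader.
    static Optional<Integer> electLeader(List<Integer> ar, List<Integer> isr) {
        for (Integer replica : ar) {
            if (isr.contains(replica)) {
                return Optional.of(replica);
            }
        }
        // No eligible replica: with unclean.leader.election.enable=false (the default),
        // the partition stays without a leader.
        return Optional.empty();
    }
}
```

If broker 1 leaves the ISR, the leader becomes 0; if only 2 remains in the ISR, it becomes 2.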

1.5.8 Replica (AR) Order in Kafka

If a Kafka cluster has only 4 nodes and the number of partitions is set higher than the number of servers, how does Kafka distribute the replicas?

1) Create 16 partitions with 3 replicas

(1) Create a new topic named second.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second

(2) Check the partitions and replicas.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --describe --topic second

Topic: second  Partition: 0   Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Topic: second  Partition: 1   Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
Topic: second  Partition: 2   Leader: 2  Replicas: 2,3,0  Isr: 2,3,0
Topic: second  Partition: 3   Leader: 3  Replicas: 3,0,1  Isr: 3,0,1
Topic: second  Partition: 4   Leader: 0  Replicas: 0,2,3  Isr: 0,2,3
Topic: second  Partition: 5   Leader: 1  Replicas: 1,3,0  Isr: 1,3,0
Topic: second  Partition: 6   Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
Topic: second  Partition: 7   Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
Topic: second  Partition: 8   Leader: 0  Replicas: 0,3,1  Isr: 0,3,1
Topic: second  Partition: 9   Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
Topic: second  Partition: 10  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
Topic: second  Partition: 11  Leader: 3  Replicas: 3,2,0  Isr: 3,2,0
Topic: second  Partition: 12  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Topic: second  Partition: 13  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
Topic: second  Partition: 14  Leader: 2  Replicas: 2,3,0  Isr: 2,3,0
Topic: second  Partition: 15  Leader: 3  Replicas: 3,0,1  Isr: 3,0,1
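The listing follows a regular pattern that can be reproduced with a sketch modeled on Kafka's rack-unaware replica assignment. In Kafka the starting broker index and the initial replica shift are chosen at random; fixing both to 0 reproduces the output above. Treat this as an approximation of Kafka's logic, with hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch modeled on Kafka's rack-unaware replica assignment (an approximation, not Kafka's source).
class ReplicaAssignmentSketch {
    static List<List<Integer>> assign(int numPartitions, int replicationFactor, List<Integer> brokers,
                                      int startIndex, int initialShift) {
        int n = brokers.size();
        if (replicationFactor < 1 || replicationFactor > n) {
            throw new IllegalArgumentException("replication factor must be between 1 and the broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        int shift = initialShift;
        for (int p = 0; p < numPartitions; p++) {
            // After every full pass over the brokers, widen the gap between leader and followers.
            if (p > 0 && p % n == 0) {
                shift++;
            }
            int first = (p + startIndex) % n;          // leader: first replica in the AR
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(first));
            for (int j = 0; j < replicationFactor - 1; j++) {
                int gap = 1 + (shift + j) % (n - 1);   // never 0, so no broker repeats
                replicas.add(brokers.get((first + gap) % n));
            }
            assignment.add(replicas);
        }
        return assignment;
    }
}
```

Because the leader rotates through the brokers and the follower gap grows after each full pass, each of the 4 brokers leads exactly 4 of the 16 partitions and the follower combinations vary.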

1.5.9 Kafka Log Retention Time

The default is 7 days; 3 days is recommended in production.

1.5.10 Cleaning Up Expired Data in Kafka

Kafka has two log cleanup policies: delete and compact.

1) delete: delete expired data

log.cleanup.policy = delete: the delete policy applies to all data.

(1) Time-based: enabled by default. The largest timestamp among all records in a segment is used as the segment's timestamp.

(2) Size-based: disabled by default. When the total log size exceeds the configured limit, the oldest segment is deleted.

log.retention.bytes, default -1 (unlimited).

Question: what happens if some of the data in a segment has expired and some has not?
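Because a segment's timestamp is its largest record timestamp, a partly expired segment is kept whole until its newest record expires. A minimal sketch of that rule (illustrative names, not Kafka's log cleaner code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of time-based retention at segment granularity (illustrative, not Kafka's code).
class RetentionSketch {
    static final long RETENTION_MS = 7L * 24 * 60 * 60 * 1000;   // 7 days, the default above

    // Each long[] holds the record timestamps of one segment; returns indexes of deletable segments.
    static List<Integer> deletableSegments(List<long[]> segments, long nowMs) {
        List<Integer> deletable = new ArrayList<>();
        for (int i = 0; i < segments.size(); i++) {
            long[] timestamps = segments.get(i);
            if (timestamps.length == 0) {
                continue;   // nothing to judge in this toy model
            }
            long newest = Long.MIN_VALUE;
            for (long ts : timestamps) {
                newest = Math.max(newest, ts);
            }
            // The segment's timestamp is its largest record timestamp.
            if (nowMs - newest > RETENTION_MS) {
                deletable.add(i);
            }
        }
        return deletable;
    }
}
```

On day 10, a segment whose records span days 1-2 is deleted, while a segment spanning days 2-5 is kept even though its oldest record is already past retention.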

2) compact: log compaction

1.5.11 Why Kafka Reads and Writes Data Efficiently

1) Kafka is a distributed cluster and uses partitioning to achieve high parallelism.

2) Reads use a sparse index, so the data to be consumed can be located quickly.

3) Sequential disk writes

When a Kafka producer writes data to the log file, it always appends to the end of the file, which is a sequential write. According to the official website, on the same disk sequential writes can reach 600 MB/s, while random writes reach only 100 KB/s. This is due to the mechanics of the disk: sequential writes are faster because they save a great deal of head seek time.

4) Page cache + zero-copy
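The sparse index in item 2 can be sketched with a sorted map. Only some offsets get an index entry (Kafka adds one roughly every log.index.interval.bytes, 4 KB by default); a lookup finds the greatest indexed offset not above the target and scans forward from its file position. The class is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of a sparse offset index: offset -> byte position in the segment file, for some offsets only.
class SparseIndexSketch {
    private final TreeMap<Long, Long> offsetToPosition = new TreeMap<>();

    void addEntry(long offset, long filePosition) {
        offsetToPosition.put(offset, filePosition);
    }

    // File position to start scanning from, or 0 if the target precedes every entry.
    long startPosition(long targetOffset) {
        Map.Entry<Long, Long> floor = offsetToPosition.floorEntry(targetOffset);
        return floor == null ? 0L : floor.getValue();
    }
}
```

With entries for offsets 0, 100, and 200, a read of offset 150 starts scanning at the position recorded for offset 100 instead of at the beginning of the file.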

1.5.12 Automatic Topic Creation

If the broker parameter auto.create.topics.enable is true (the default), then when a producer sends a message to a topic that does not exist, Kafka automatically creates the topic with num.partitions partitions (default 1) and a replication factor of default.replication.factor (default 1). A topic is also created automatically when a consumer starts reading from an unknown topic, or when any client sends a metadata request for an unknown topic. Creating topics in this unplanned way makes topic management and maintenance harder, so setting this parameter to false is recommended in production.

(1) Send data to a topic named five that has not been created in advance.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --topic five

>hello world

(2) View the details of topic five.

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --describe --topic five

1.5.13 Number of Replicas

Usually 2 or 3; many companies use 2.

More replicas: advantage, higher reliability; disadvantage, more network I/O.

1.5.14 Number of Kafka Partitions

(1) Create a topic with only one partition.

(2) Measure the producer throughput and consumer throughput of this topic.

(3) Call these values Tp and Tc, for example in MB/s.

(4) If the total target throughput is Tt, then the number of partitions = Tt / min(Tp, Tc).

For example: producer throughput = 20 MB/s, consumer throughput = 50 MB/s, target throughput = 100 MB/s.

Number of partitions = 100 / 20 = 5.

The number of partitions is usually set to 3-10.

More partitions are not necessarily better, and neither are fewer. Build the cluster, run stress tests, and then adjust the number of partitions as needed.
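The sizing formula translates directly into code. Rounding up is an assumption (the example in the text divides evenly); it ensures a fractional result still meets the target. The class name is hypothetical.

```java
// Partitions = Tt / min(Tp, Tc), rounded up (rounding is an assumption, not part of the text).
class PartitionCountSketch {
    static int partitions(double targetMbps, double producerMbps, double consumerMbps) {
        double perPartition = Math.min(producerMbps, consumerMbps);  // the slower side limits each partition
        return (int) Math.ceil(targetMbps / perPartition);
    }
}
```

partitions(100, 20, 50) gives 5 as in the example; a 110 MB/s target would need 6.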

1.5.15 Adding Partitions in Kafka

1) Partitions can be added from the command line, but the partition count can only be increased, never decreased.

2) Why can the partition count only increase and never decrease?

(1) Kafka's existing code could implement this, but it would sharply increase the complexity of the code.

(2) Implementing it would require handling many issues, such as what to do with the messages in deleted partitions.

If the messages disappear along with the partition, message reliability cannot be guaranteed.

If they must be kept, how? Appending them to the end of existing partitions would break the monotonic increase of message timestamps, which would affect components such as Spark and Flink that rely on message timestamps (event time).

If they are scattered across existing partitions, copying the data internally becomes resource-intensive when message volumes are high, and it is unclear how the topic's availability could be guaranteed during the copy.

Ordering, transactions, and state-machine transitions between partitions and replicas would also have to be addressed.

(3) Conversely, the benefit of this feature is very low. If you really need it, you can create a new topic with fewer partitions and copy the messages from the existing topic according to your own logic.

1.5.16 Number of Kafka Topics

ODS layer: 2

DWD layer: 20

1.5.17 Do Kafka Consumers Pull or Push Data?

Pull data.

1.5.18 Kafka Consumer Partition Assignment Strategies

Sticky assignment:

This is the most complex assignment algorithm. It is set through the partition.assignment.strategy parameter and was introduced in version 0.11. Its purpose is to change the previous assignment as little as possible when reassigning. It mainly pursues two goals:

(1) Topic partitions should be distributed as evenly as possible.

(2) When a rebalance occurs, the new assignment should stay as close as possible to the previous one.

Note: when the two goals conflict, the first takes priority so that the distribution stays as even as possible. The first goal is one that all three assignment strategies try to meet; the second is the essence of this algorithm.
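The interplay of the two goals can be illustrated with a greatly simplified sketch. This is not Kafka's StickyAssignor algorithm: surviving consumers keep what they already own, orphaned partitions go to the least-loaded consumer, and partitions then move from the most-loaded to the least-loaded consumer until counts differ by at most one. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Greatly simplified illustration of the two sticky goals (not Kafka's StickyAssignor).
class StickySketch {
    private static final Comparator<List<Integer>> BY_SIZE =
            Comparator.comparingInt((List<Integer> l) -> l.size());

    static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> previous,
                                                List<String> consumers, List<Integer> partitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        // Goal 2: surviving consumers start from what they owned before.
        for (String c : consumers) {
            result.put(c, new ArrayList<>(previous.getOrDefault(c, List.of())));
        }
        Set<Integer> owned = new HashSet<>();
        for (List<Integer> ps : result.values()) {
            owned.addAll(ps);
        }
        // Orphaned partitions (e.g. from a departed consumer) go to the least-loaded consumer.
        for (int p : partitions) {
            if (!owned.contains(p)) {
                leastLoaded(result).add(p);
            }
        }
        // Goal 1 wins on conflict: move partitions until counts differ by at most one.
        while (true) {
            List<Integer> most = Collections.max(result.values(), BY_SIZE);
            List<Integer> least = leastLoaded(result);
            if (most.size() - least.size() <= 1) {
                break;
            }
            least.add(most.remove(most.size() - 1));
        }
        return result;
    }

    private static List<Integer> leastLoaded(Map<String, List<Integer>> m) {
        return Collections.min(m.values(), BY_SIZE);
    }
}
```

When consumer c3 leaves a group where c1, c2, and c3 each held two partitions, c1 and c2 keep their original partitions and each picks up one of c3's.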

1.5.19 Conditions That Trigger a Consumer Rebalance

1) A rebalance is triggered under three conditions:

(1) The number of members in the consumer group changes (a consumer joins, leaves, or fails and goes offline).

(2) The subscribed topics change.

(3) The number of partitions of a subscribed topic changes.

2) A consumer fails and goes offline

Parameter                Description
session.timeout.ms       Timeout of the session between a consumer and the coordinator; default 45 s. If exceeded, the consumer is removed and the group rebalances.
max.poll.interval.ms     Maximum time a consumer may spend processing messages between polls; default 5 minutes. If exceeded, the consumer is removed and the group rebalances.
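The two failure conditions combine as a simple check. This is a simplified model with hypothetical names: in the real client, a session timeout is detected by the coordinator from missing heartbeats, while exceeding max.poll.interval.ms makes the consumer leave the group itself.

```java
// Sketch of the two conditions above (defaults: session.timeout.ms = 45 s, max.poll.interval.ms = 5 min).
class ConsumerLivenessSketch {
    static final long SESSION_TIMEOUT_MS = 45_000L;
    static final long MAX_POLL_INTERVAL_MS = 300_000L;

    // True if the consumer would be removed from the group, triggering a rebalance.
    static boolean shouldEvict(long nowMs, long lastHeartbeatMs, long lastPollMs) {
        boolean sessionExpired = nowMs - lastHeartbeatMs > SESSION_TIMEOUT_MS;
        boolean pollTooSlow = nowMs - lastPollMs > MAX_POLL_INTERVAL_MS;
        return sessionExpired || pollTooSlow;
    }
}
```

A consumer that keeps heartbeating but spends more than 5 minutes processing one batch is still removed, which is why slow processing can cause repeated rebalances.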

3) A consumer actively joins the group

Adding a consumer to an existing consumer group also triggers a Kafka rebalance. Note that if the downstream is Flink, Flink maintains its own offsets and does not trigger a Kafka rebalance.

1.5.20 Consuming from a Specified Offset

Data can be consumed starting from any offset, for example kafkaConsumer.seek(topicPartition, 1000);

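1.5.21 Consuming from a Specified Time

Data can be consumed starting from a point in time, for example one day ago:

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
// Find the first offset at or after that time (kafkaConsumer is the consumer instance);
// the map value is null if no such record exists
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
kafkaConsumer.seek(topicPartition, offsets.get(topicPartition).offset());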


1.5.22 Kafka Monitoring

Our company developed its own monitoring tool.

Open source monitors: KafkaManager, KafkaMonitor, KafkaEagle.

1.5.23 Kafka Data Backlog

1) Detecting a backlog

The Kafka monitor Eagle shows the consumer lag, i.e. the backlog:
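What a monitor reports as lag can be sketched as follows: per partition, the log-end offset minus the consumer group's committed offset, summed over partitions. Treating a partition with no committed offset as starting from 0 is an assumption (real behavior depends on auto.offset.reset), and the names are hypothetical.

```java
import java.util.Map;

// Lag per partition = log-end offset - committed offset; the backlog is the sum over partitions.
class LagSketch {
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            // Assumption: no committed offset counts as 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed);
        }
        return total;
    }
}
```

A lag that keeps growing across samples, rather than its absolute value, is the usual sign that consumption cannot keep up.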

2) Solutions

(1) Insufficient consumption capacity

① Consider increasing the topic's partition count and, at the same time, the number of consumers in the group, so that the number of consumers equals the number of partitions. (Both are required.)

Increase the number of partitions:

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

② Increase the number of records pulled per batch to improve each consumer's consumption capacity.

Parameter           Description
fetch.max.bytes     Default 52428800 (50 MB). Maximum number of bytes the consumer fetches from the server in one batch. If a single batch on the server is larger than 50 MB, it can still be fetched, so this is not an absolute maximum. Batch size is affected by message.max.bytes (broker config) or max.message.bytes (topic config).
max.poll.records    Maximum number of records returned by one poll; default 500.

(2) Insufficient consumer processing capacity

① On the consumer side, adjust fetch.max.bytes (default 50 MB).

② On the consumer side, adjust max.poll.records (default 500).

If the downstream is a compute engine such as Spark or Flink, data is analyzed after being consumed. When processing cannot keep up with consumption, backpressure occurs and the consumption rate drops.

Tune the compute performance (see Spark and Flink optimization).

(3) Handling a sudden message backlog

Sometimes the backlog suddenly starts to grow and keeps rising. You then need to find the cause quickly and fix the problem.

A message backlog can suddenly grow for only two reasons: messages are being sent faster, or consumed more slowly.

During a big promotion or flash sale, it is unrealistic to optimize consumer code for better performance in a short time. The only option is to add consumer instances to increase overall consumption capacity. If there are not enough servers to scale out quickly, the only remaining option is to degrade unimportant services and reduce the volume of data sent, so that at least the system keeps running and important services stay healthy.

If internal monitoring shows that consumption has slowed down, inspect the consumer instances and analyze why:

① First check the logs for large numbers of consumption errors.

② If there are no errors, print the thread stacks to see where the consumer thread is stuck, for example in a deadlock or waiting on some resource.

1.5.24 How to Improve Throughput

1) Increase producer throughput

(1) buffer.memory: size of the send buffer; default 32 MB, can be increased to 64 MB.

(2) batch.size: default 16 KB. If the batch is too small, network requests become frequent and throughput drops; if it is too large, messages wait a long time before being sent, increasing latency.

(3) linger.ms: default 0, meaning messages are sent immediately. Usually set to 5-100 ms. If linger.ms is too small, network requests become frequent and throughput drops; if it is too large, messages wait a long time before being sent, increasing latency.

(4) compression.type: default none (no compression). lz4 compression works well: it reduces the data volume and improves throughput, at the cost of more CPU on the producer side.

2) Add partitions

3) Increase consumer throughput

(1) Adjust fetch.max.bytes (default 50 MB).

(2) Adjust max.poll.records (default 500).

1.5.25 Calculating data volume in Kafka

Total data volume per day: 100 GB, or about 100 million logs per day; 100 million / 24 / 60 / 60 ≈ 1,150 logs per second.

Average: about 1,150 logs per second

Trough: about 50 logs per second

Peak: 1,150 × (2 to 20) = 2,300 to 23,000 logs per second

Size of each log: 0.5 KB to 2 KB (use 1 KB)

Data per second at peak: about 2 MB to 20 MB
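The same arithmetic as a small Python helper, handy for re-running the estimate with different assumptions. The function name and its inputs (logs per day, peak multipliers, record size in KB) are illustrative; 1 MB is taken as 1024 KB.

```python
def kafka_rates(logs_per_day, peak_multipliers, record_kb):
    """Average rate, peak rates (records/s) and peak throughput (MB/s)."""
    avg = logs_per_day / (24 * 60 * 60)                       # records per second
    peak_lo, peak_hi = (avg * m for m in peak_multipliers)
    mb_lo, mb_hi = (p * record_kb / 1024 for p in (peak_lo, peak_hi))
    return avg, (peak_lo, peak_hi), (mb_lo, mb_hi)


avg, peaks, mb = kafka_rates(100_000_000, (2, 20), 1)
print(f"avg ≈ {avg:.0f}/s, peak ≈ {peaks[0]:.0f}-{peaks[1]:.0f}/s, "
      f"≈ {mb[0]:.1f}-{mb[1]:.1f} MB/s")
```

Without the text's rounding this gives about 1,157 logs/s on average and roughly 2.3-22.6 MB/s at peak, consistent with the 2-20 MB/s range above.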

1.5.26 How to stress-test Kafka?

Use Kafka's official scripts for the stress test.



1) Kafka producer stress test

(1) Create a topic named test with 3 partitions and 3 replicas

[atguigu@hadoop102 kafka]$ bin/ --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test

(2) The stress-test scripts are located in the /opt/module/kafka/bin directory. Run the producer test:

[atguigu@hadoop105 kafka]$ bin/ --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384

Parameter description:

record-size: the size of each message in bytes; this test uses 1 KB (1024 bytes).

num-records: the total number of messages to send; this test sends 1 million.

throughput: the number of messages per second. Setting it to -1 disables throttling so data is produced as fast as possible, which measures the producer's maximum throughput. This test uses 10,000 messages per second.

producer-props: producer parameters can be configured after this flag; here batch.size is set to 16 KB.


Test output:

37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.

... ...

33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.

1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th.
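To compare runs while tuning, the progress and summary lines printed by the producer test can be parsed. This Python sketch assumes exactly the line format shown in the output above; `parse_perf_line` and the dictionary keys are hypothetical names.

```python
import re

# Matches e.g. "37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, ..."
SUMMARY_RE = re.compile(
    r"(?P<records>\d+) records sent, "
    r"(?P<records_per_sec>[\d.]+) records/sec \((?P<mb_per_sec>[\d.]+) MB/sec\), "
    r"(?P<avg_ms>[\d.]+) ms avg latency, (?P<max_ms>[\d.]+) ms max latency"
)
# Matches the percentile tail of the final line, e.g. "1335 ms 50th"
PERCENTILE_RE = re.compile(r"(\d+) ms ([\d.]+)th")


def parse_perf_line(line):
    """Parse one producer perf-test progress or summary line into a dict."""
    match = SUMMARY_RE.search(line)
    if match is None:
        raise ValueError(f"not a perf-test line: {line!r}")
    result = {name: float(value) for name, value in match.groupdict().items()}
    result["records"] = int(match.group("records"))
    result["percentiles_ms"] = {
        pct: int(ms) for ms, pct in PERCENTILE_RE.findall(line)
    }
    return result


final = parse_perf_line(
    "1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), "
    "1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, "
    "4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th."
)
print(final["mb_per_sec"], final["percentiles_ms"]["99.9"])
```

On the final summary line it reports 1,000,000 records at 8.97 MB/s with a 99.9th-percentile latency of 5,030 ms. The achieved 9,180 records/s falls short of the 10,000 records/s target, which suggests the producer settings are worth adjusting.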

(3) Adjust batch.size

(4) Adjust linger.ms

(5) Adjust the compression method

(6) Adjust the buffer size (buffer.memory)

2) Kafka consumer stress test

(1) In the /opt/module/kafka/config/ file, set the number of records pulled per poll to 500


(2) Consume 1 million messages for the stress test

[atguigu@hadoop105 kafka]$ bin/ --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/

Parameter description:

--bootstrap-server: the Kafka cluster address

--topic: the name of the topic

--messages: the total number of messages to consume; 1 million in this test.


start.time, end.time,, MB.sec,, nMsg.sec,,, fetch.MB.sec, fetch.nMsg.sec

2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418
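A quick way to sanity-check the consumer result is to recompute the rates from the timestamps. This Python sketch assumes the data line's fields are, in order: start time, end time, MB consumed, MB/s, messages consumed, messages/s, followed by further timing and fetch-rate fields; `recompute_rates` is a hypothetical helper.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S:%f"  # e.g. 2022-01-20 09:58:26:171


def recompute_rates(line):
    """Return (MB/s, messages/s) recomputed from a consumer perf-test data line.

    Assumed field order: start time, end time, MB consumed, MB/s,
    messages consumed, messages/s, then further timing fields.
    """
    fields = [field.strip() for field in line.split(",")]
    start = datetime.strptime(fields[0], TIME_FORMAT)
    end = datetime.strptime(fields[1], TIME_FORMAT)
    seconds = (end - start).total_seconds()
    return float(fields[2]) / seconds, int(fields[4]) / seconds


line = ("2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, "
        "1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418")
print(recompute_rates(line))
```

For the line above the run took 7.15 s, and the recomputed values (about 136.65 MB/s and 139,925 messages/s) match the reported MB.sec and nMsg.sec.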

(3) Increase the number of records pulled per poll to 2000

(4) Increase fetch.max.bytes to 100 MB

1.5.27 Disk selection

Kafka's storage layer mainly performs sequential writes, and for sequential writes solid-state drives and mechanical hard disks perform similarly.

Ordinary mechanical hard disks are therefore recommended.

Total data volume per day: 100 million × 1 KB ≈ 100 GB

100 GB × 2 replicas × 3 days of retention / 0.7 ≈ 1 TB (dividing by 0.7 keeps disk usage at about 70%)

The total disk capacity of the three servers should be at least 1 TB.
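The disk estimate as a Python function. The 70% usage ceiling corresponds to the division by 0.7 above; the function name and parameters are illustrative.

```python
def kafka_disk_gb(records_per_day, record_kb, replicas, retention_days,
                  max_usage=0.7):
    """Cluster-wide disk (GB) needed so that usage stays at or below max_usage."""
    daily_gb = records_per_day * record_kb / 1024 / 1024   # KB -> MB -> GB
    return daily_gb * replicas * retention_days / max_usage


print(round(kafka_disk_gb(100_000_000, 1, replicas=2, retention_days=3)))
```

Using 1 KB = 1024 bytes gives about 817 GB (857 GB if one day is rounded up to 100 GB), so 1 TB across the cluster leaves some room.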

1.5.28 Memory selection

Kafka memory consists of heap memory plus the page cache.

1) Recommended Kafka heap per node: 10 GB to 15 GB


# Use a 10 GB heap unless KAFKA_HEAP_OPTS is already set
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi


(1) Check the Kafka process ID

[atguigu@hadoop102 kafka]$ jps

2321 Kafka

5255 Jps

1931 QuorumPeerMain

(2) Check Kafka's GC activity using its process ID

[atguigu@hadoop102 kafka]$ jstat -gc 2321 1s 10


0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

Parameter description:

YGC: number of young-generation garbage collections;

(3) Check Kafka's heap memory using its process ID

[atguigu@hadoop102 kafka]$ jmap -heap 2321

… …

Heap Usage:

G1 Heap:

regions = 2048

capacity = 2147483648 (2048.0MB)

used = 246367744 (234.95458984375MB)

free = 1901115904 (1813.04541015625MB)

11.472392082214355% used

2) Page cache:

The page cache lives in the Linux server's memory. It is enough to keep 25% of one segment (1 GB) of each partition in memory.

Page cache size per node = (number of partitions × 1 GB × 25%) / number of nodes. For example, with 10 partitions: (10 × 1 GB × 25%) / 3 ≈ 1 GB

Recommended server memory: at least 11 GB (10 GB heap plus about 1 GB of page cache).
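The page-cache rule and the resulting memory recommendation, as a Python sketch. It assumes 1 GB segments and that 25% of one segment per partition should stay cached, as stated above; the helper names are hypothetical.

```python
import math


def page_cache_gb(partitions, nodes, segment_gb=1.0, hot_fraction=0.25):
    """Page cache per node that keeps hot_fraction of one segment per partition cached."""
    return partitions * segment_gb * hot_fraction / nodes


def node_memory_gb(heap_gb, partitions, nodes):
    """Heap plus page cache per node, rounded up to a whole GB."""
    return math.ceil(heap_gb + page_cache_gb(partitions, nodes))


print(page_cache_gb(10, 3), node_memory_gb(10, 10, 3))
```

For 10 partitions on 3 nodes this gives about 0.83 GB of page cache per node, and a 10 GB heap plus that cache rounds up to 11 GB.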

1.5.29 CPU selection

1) Default configuration

I/O threads = 8: the number of threads responsible for writing to disk.

num.replica.fetchers = 1: the number of replica fetch threads.

num.network.threads = 3: the number of data transfer threads.

2) Recommended configuration

Besides these, each broker runs other background threads, such as log-cleanup threads and the Controller threads that monitor and control the whole cluster, so a single broker can have hundreds of threads. From experience, a 4-core CPU is saturated by a few dozen threads at peak, and 8 cores are barely enough. Since other services also run on the cluster, Kafka servers are generally given 16 or more cores so they can handle one to two hundred threads; if conditions permit, 24 or even 32 cores are better.

I/O threads = 16: the number of threads responsible for writing to disk.

num.replica.fetchers = 2: the number of replica fetch threads.

num.network.threads = 6: the number of data transfer threads.

Recommendation: servers with 32-core CPUs.

1.5.30 Network selection

Network bandwidth = peak throughput ≈ 20 MB/s, so a Gigabit NIC is sufficient.

100 Mbps is measured in bits, while 10 MB/s is measured in bytes: 1 byte = 8 bits, so 100 Mbps / 8 = 12.5 MB/s.

Common NICs: 100 Mbps (12.5 MB/s), Gigabit (1000 Mbps = 125 MB/s), and 10 Gigabit (10000 Mbps = 1250 MB/s).


Gigabit or 10 Gigabit NICs are the usual choice.
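Because NIC speeds are quoted in bits per second while Kafka throughput is measured in bytes per second, a small Python helper makes the conversion explicit. It assumes only the three common NIC speeds listed above and leaves no headroom for replication traffic or other services.

```python
NIC_MBPS = (100, 1000, 10_000)   # 100 Mbps, Gigabit, 10 Gigabit


def mbps_to_mb_per_sec(mbps):
    """Network speeds are quoted in bits per second; 1 byte = 8 bits."""
    return mbps / 8


def smallest_nic(peak_mb_per_sec):
    """Smallest common NIC whose byte rate covers the peak throughput."""
    for nic in NIC_MBPS:
        if mbps_to_mb_per_sec(nic) >= peak_mb_per_sec:
            return nic
    raise ValueError("peak exceeds a 10 Gigabit NIC")


print(mbps_to_mb_per_sec(100), smallest_nic(20))
```

A 20 MB/s peak exceeds the 12.5 MB/s of a 100 Mbps card, so the smallest fit is a Gigabit NIC (125 MB/s).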

1.5.31 When Kafka goes down

If a Kafka node fails in the production environment, the usual handling is:

(1) Check the logs first and try restarting the node. If it starts normally, the problem is solved.

(2) If restarting does not help, check memory, CPU, and network bandwidth. Tune first; if tuning does not help, add resources.

(3) If an entire Kafka node was deleted by mistake and the replication factor is at least 2, commission a replacement node the same way as any new node, then rebalance the load across the cluster.

1.5.32 Number of Kafka machines

1.5.33 Commissioning new nodes and decommissioning old nodes

Nodes can be commissioned and decommissioned with the bin/ script.

1.5.34 Size of a single Kafka message

By default, Kafka limits the message body to 1 MB, but in our scenario messages larger than 1 MB are common. Without additional configuration, producers cannot push such messages to Kafka, or consumers cannot consume them, so Kafka must be configured as follows:

Parameters:



Default 1 MB: the maximum size of a message the broker accepts.


Default 1 MB: the maximum size of each request the producer sends to the broker. Sets the message body size at the Topic level.


Default 1 MB: the maximum size of each batch when replicas synchronize data.


Default 52428800 (50 MB): the maximum number of bytes the consumer fetches from the server in one batch. If a server-side batch is larger than this value (50 MB), it can still be fetched, so this is not an absolute maximum. The batch size is governed by message.max.bytes (broker config) or max.message.bytes (topic config).

1.5.35 Kafka parameter optimization

Key tuning parameters:

(1) buffer.memory: 32 MB


(3) linger.ms: default 0, adjust to 5-100 ms

(4) compression.type: use compression, e.g. snappy

(5) On the consumer, adjust fetch.max.bytes (default 50 MB).

(6) On the consumer, adjust max.poll.records (default 500).


(8) Recommended Kafka heap per node: 10 GB to 15 GB


# Use a 10 GB heap unless KAFKA_HEAP_OPTS is already set
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi


(9) Increase the number of CPU cores (default thread settings shown):

I/O threads = 8: the number of threads responsible for writing to disk

num.replica.fetchers = 1: the number of replica fetch threads

num.network.threads = 3: the number of data transfer threads

(10) Log retention: log.retention.hours, 3 days

(11) Number of replicas: set to 2

1.6 Hive

1.6.1 Hive architecture

1.6.2 How HQL is converted into MapReduce

(1) SQL Parser: converts the SQL string into an abstract syntax tree (AST)

(2) Semantic Analyzer: further abstracts the AST into QueryBlocks (roughly, each subquery becomes one QueryBlock)

(3) Logical Plan Gen: generates the logical plan from the QueryBlocks

(4) Logical Optimizer: optimizes the logical plan

(5) Physical Plan Gen: generates the physical plan from the optimized logical plan

(6) Physical Optimizer: optimizes the physical plan

(7) Execution: executes the plan, obtains the query result, and returns it to the client

1.6.3 Hive vs. databases

Apart from a similar query language, Hive and databases have nothing else in common.

1) Data storage location

Hive stores data in HDFS; databases store data on block devices or the local file system.

2) Data updates

Rewriting data is not recommended in Hive, whereas data in a database usually needs to be modified frequently.

3) Execution latency

Hive has high execution latency and databases have low latency. This only holds while the data volume is small: once the data grows beyond what a database can handle, Hive's parallel computation clearly pays off.

4) Data size

Hive supports computation over very large datasets; databases support comparatively small ones.

1.6.4 Internal and external tables

A table consists of metadata and raw data.

1) When a table is dropped

Internal tables: both the metadata and the raw data are deleted.

External tables: only the metadata is deleted.

2) In a production environment, when should internal tables be created and when external tables?

Most tables in a company are external tables.

Internal tables are created only for temporary tables used by oneself.

1.6.5 System functions

1) Numeric functions

(1) round: rounds a number

2) String functions

(1) substring: extracts part of a string; (2) replace: replaces text; (3) regexp_replace: regex replacement

(4) regexp: regex matching; (5) repeat: repeats a string; (6) split: splits a string

(7) nvl: replaces null values; (8) concat: concatenates strings;

(9) concat_ws: joins strings or string arrays with a specified delimiter;

(10) get_json_object: parses a JSON string

3) Date functions

(1) unix_timestamp: returns the timestamp of the current or a specified time

(2) from_unixtime: converts a UNIX timestamp (seconds since 1970-01-01 00:00:00 UTC) to a time string in the current time zone

(3) current_date: current date

(4) current_timestamp: current date and time, accurate to milliseconds

(5) month: month of a date; (6) day: day of a date

(7) datediff: number of days between two dates (end date minus start date)

(8) date_add: adds days to a date; (9) date_sub: subtracts days from a date

(10) date_format: formats a standard date as a string in the specified format

4) Flow control functions

(1) case when: conditional expression

(2) if: conditional expression, similar to the ternary operator in Java

5) Collection functions

(1) array: declares an array

(2) map: creates a map

(3) named_struct: declares the field names and values of a struct

(4) size: number of elements in a collection

(5) map_keys: returns the keys of a map


(7) array_contains: checks whether an array contains an element

(8) sort_array: sorts the elements of an array

6) Aggregate functions

(1) collect_list: collects values into a list; duplicates are kept

(2) collect_set: collects values into a set; duplicates are removed
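The difference between collect_list and collect_set can be mimicked in Python for a `GROUP BY key` query. This is only an analogy of the semantics, not Hive code; the helper name and sample rows are made up.

```python
from collections import defaultdict


def collect(rows):
    """Mimic SELECT key, collect_list(value), collect_set(value) ... GROUP BY key."""
    lists = defaultdict(list)
    for key, value in rows:
        lists[key].append(value)          # collect_list keeps duplicates
    # collect_set drops duplicates; Hive does not guarantee element order,
    # so plain Python sets are used here.
    sets = {key: set(values) for key, values in lists.items()}
    return dict(lists), sets


rows = [("u1", "apple"), ("u1", "apple"), ("u1", "pear"), ("u2", "fig")]
as_list, as_set = collect(rows)
print(as_list["u1"], as_set["u1"])
```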

1.6.6 Custom UDF and UDTF functions

1) Have you written custom UDFs or UDTFs in the project? What problems did they solve, and what were the steps?

(1) The logic in the current project is not particularly complex, so no custom UDF or UDTF is used.

(2) Custom UDF: extend G.. UDF and override the core method evaluate.

(3) Custom UDTF: extend GenericUDTF and override three methods: initialize (defines the names and types of the output columns), process (emits each result with forward(result)), and close.
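A Python analogy of the UDTF life cycle described in (3): `initialize` declares the output columns, `process` is called once per input row and may call `forward` any number of times, and `close` runs at the end. The class `SplitToRows` and its comma-splitting logic are hypothetical; a real UDTF is a Java class extending GenericUDTF.

```python
class SplitToRows:
    """Analogy of a UDTF; NOT Hive's Java GenericUDTF API."""

    def __init__(self, forward):
        self.forward = forward          # callback that receives one output row

    def initialize(self):
        return ["word"]                 # output column names

    def process(self, line, sep=","):
        for word in line.split(sep):
            if word:                    # skip empty pieces such as in "a,,b"
                self.forward((word,))

    def close(self):
        pass                            # nothing to clean up in this sketch


out = []
udtf = SplitToRows(out.append)
columns = udtf.initialize()
for row in ["a,b", "c"]:
    udtf.process(row)
udtf.close()
print(columns, out)
```

Feeding the rows "a,b" and "c" produces three output rows, which is the one-row-in, many-rows-out behavior that distinguishes a UDTF from a UDF.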

2) When do companies usually use UDFs/UDTFs?

(1) Inside a custom function you can print any step of the computation, which helps debugging.

(2) They are also needed when third-party jar packages must be used.

3) Parsing IP addresses and user agents (UA) with Hive custom functions in an advertising data warehouse

Parsing IP: call an external API, or use the offline IP2Region database

Parsing UA: regular expressions, or Hutool's UA parsing utility class

1.6.7 Window functions

Hand-written window queries usually come up in scenario questions: top N per group, rows to columns, and columns to rows.

By function, common window functions fall into the following categories: aggregate functions, cross-row value functions, and ranking functions.

1) Aggregate functions

max: maximum.

min: minimum.

sum: sum.

avg: average.

count: count.

2) Cross-row value functions


Note: lag and lead do not support custom window frames.


3) Ranking functions

Note: rank, dense_rank, and row_number do not support custom window frames.
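How rank, dense_rank, and row_number differ on ties, and how row_number yields top N per group, sketched in Python. The helpers only mimic `... over (partition by ... order by ... desc)` on in-memory lists; their names are illustrative.

```python
from collections import defaultdict


def rank_variants(values):
    """(value, row_number, rank, dense_rank) for values ordered descending."""
    result = []
    rank = dense_rank = 0
    previous = object()                    # sentinel that equals no value
    for row_number, value in enumerate(sorted(values, reverse=True), start=1):
        if value != previous:
            rank = row_number              # rank jumps past tied rows
            dense_rank += 1                # dense_rank never leaves gaps
            previous = value
        result.append((value, row_number, rank, dense_rank))
    return result


def top_n_per_group(rows, n):
    """Like row_number() over (partition by g order by v desc) <= n."""
    groups = defaultdict(list)
    for group, value in rows:
        groups[group].append(value)
    return {g: sorted(vs, reverse=True)[:n] for g, vs in groups.items()}


print(rank_variants([90, 95, 95, 80]))
```

For scores 95, 95, 90, 80, row_number yields 1, 2, 3, 4, rank yields 1, 1, 3, 4, and dense_rank yields 1, 1, 2, 3.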

1.6.8 Hive optimization

Group Aggregation

By default, a grouped aggregation query is executed as one MapReduce job. The map side reads the data and partitions it by the grouping fields; the shuffle sends the data to the reduce side, where each group's final aggregation is computed.

Optimizing grouped aggregation is mainly about reducing the shuffle volume, specifically through map-side aggregation: the map side maintains a hash table and uses it to perform partial aggregation, then sends the partial results, partitioned by the grouping fields, to the reduce side, which completes the final aggregation.
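A Python sketch of map-side aggregation for `count(*) ... group by key`, including the sampling check described in the parameters below. `check_rows` and `min_reduction` play the roles of the detection row count and the ratio threshold; the small values are only for the demo, and the simplified logic is an assumption, not Hive's source.

```python
from collections import Counter


def map_task(rows, check_rows=4, min_reduction=0.5):
    """Records one map task shuffles for SELECT key, count(*) ... GROUP BY key.

    The first check_rows keys are aggregated in a hash table. If
    distinct keys / rows in that sample is not below min_reduction, hashing
    is judged unprofitable and every row is emitted unaggregated.
    """
    sample = rows[:check_rows]
    table = Counter(sample)
    if sample and len(table) / len(sample) >= min_reduction:
        return [(key, 1) for key in rows]     # give up on map-side aggregation
    table.update(rows[check_rows:])           # keep aggregating the remaining rows
    return list(table.items())                # partial (key, count) pairs


def reduce_side(shuffled):
    """Final aggregation on the reduce side."""
    totals = Counter()
    for key, count in shuffled:
        totals[key] += count
    return dict(totals)


map1 = ["a"] * 5 + ["b"]           # repetitive keys
map2 = ["w", "x", "y", "z", "a"]   # mostly distinct keys
shuffled = map_task(map1) + map_task(map2)
print(len(shuffled), reduce_side(shuffled))
```

Here 11 input rows become 7 shuffled records: the first map task collapses its repetitive keys into 2 partial counts, while the second task's keys are mostly distinct, so it skips hashing.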

The relevant parameters are as follows:

--Enable map-side aggregation, default is true


--Checks whether the source table is suitable for map-side aggregation. Map-side aggregation is first applied to a sample of rows; if the ratio of the row count after aggregation to the row count before is below this value, the table is considered suitable for map-side aggregation. Otherwise it is considered unsuitable, and map-side aggregation is skipped for the remaining rows.


--Number of rows used to check whether the source table is suitable for map-side aggregation.

set hive.groupby.mapaggr.checkinterval=100000;

--Maximum fraction of the map task's heap that the map-side aggregation hash table may occupy; if this is exceeded, the hash table is flushed.

set;

Map Join

By default, the most stable join algorithm in Hive is Common Join, which completes a join with one MapReduce job. The map side reads the tables needed for the join and partitions the rows by the join key; the shuffle sends them to the reduce side, where rows with the same key complete the join.

The most common way to optimize a join is Map Join, which completes a join with two map-only jobs. The first job reads the small table, builds a hash table from it, and uploads it to the Hadoop distributed cache (essentially HDFS). The second job reads the small table from the distributed cache into the memory of each map task and then scans the large table, so the join is completed on the map side.

Note: because Map Join must cache the whole small table, it only suits joins between a large table and a small table.
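The map-join idea reduced to a Python sketch: build a hash table from the small table once, then stream the large table through it, so no shuffle or reduce phase is needed. Column 0 is assumed to be the join key; `map_join` and the sample tables are hypothetical.

```python
def map_join(big_rows, small_rows):
    """Inner join on column 0 without a shuffle (broadcast hash join)."""
    # Job 1: turn the small table into an in-memory hash table.
    table = {}
    for row in small_rows:
        table.setdefault(row[0], []).append(row)
    # Job 2: scan the big table and probe the hash table row by row.
    joined = []
    for row in big_rows:
        for match in table.get(row[0], []):
            joined.append(row + match[1:])   # drop the duplicate key column
    return joined


orders = [(1, "o1"), (2, "o2"), (1, "o3"), (3, "o4")]   # big table: (user_id, order_id)
users = [(1, "alice"), (2, "bob")]                       # small table: (user_id, name)
print(map_join(orders, users))
```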

The relevant parameters are as follows:

--Enable automatic conversion to Map Join


--Enable unconditional conversion to Map Join


--Small-table threshold for unconditional Map Join conversion; default 10 MB, recommended to be 1/3 to 1/2 of the map task's total memory

set;

SMB Map Join

As mentioned above, Map Join only suits joins between a large table and a small table. To speed up joins between two large tables, use Sort Merge Bucket (SMB) Map Join.

SMB Map Join has the following requirements:

(1) All tables in the join are bucketed tables, and the bucketing column is the join key.

(2) The bucket count of one table is a multiple of the other's.

(3) Within each bucket, the data is sorted by the join key.

The core idea of SMB Join: as long as the first two requirements hold, the buckets of the two tables correspond to each other in a well-defined way, so the join can be performed bucket by bucket.

If the third requirement also holds, i.e. the data taking part in the join is sorted, one of the join algorithms commonly used in databases, Sort Merge Join, can be applied. Merge Join works as follows:
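A Python sketch of merge join on two inputs already sorted by key, plus the bucket correspondence that requirements (1) and (2) provide. It assumes rows are `(key, value)` pairs and that a row's bucket is `hash(key) % bucket_count`; both helper names are illustrative.

```python
def matching_buckets(bucket, n_this, n_other):
    """Buckets of the other table that can hold the same keys as `bucket`.

    Assumes bucket = hash(key) % bucket_count and that one bucket count is
    a multiple of the other (requirement 2).
    """
    if n_other >= n_this:
        return [b for b in range(n_other) if b % n_this == bucket]
    return [bucket % n_other]


def merge_join(left, right):
    """Inner join of two lists of (key, value) pairs, each sorted by key."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        key = left[i][0]
        if key < right[j][0]:
            i += 1
        elif key > right[j][0]:
            j += 1
        else:
            # Find the run of equal keys on the right side ...
            j_end = j
            while j_end < len(right) and right[j_end][0] == key:
                j_end += 1
            # ... and pair it with every left row that has the same key.
            while i < len(left) and left[i][0] == key:
                out.extend((key, left[i][1], r[1]) for r in right[j:j_end])
                i += 1
            j = j_end
    return out


print(matching_buckets(1, 2, 4))   # bucket 1 of a 2-bucket table
print(merge_join([(1, "a"), (2, "b"), (4, "d")], [(2, "x"), (3, "z"), (4, "w")]))
```

In SMB Map Join each pair of corresponding buckets is joined this way, so only two cursors and no hash table are needed.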

Once all three requirements are met, SMB Map Join can be used.

Because SMB Map Join neither builds a hash table nor caches a small table, its memory requirements are very low, and it suits joins between two large tables.

Reduce Parallelism

Reduce-side parallelism, i.e. the number of reducers, can be specified by the user or estimated by Hive from the input file size of the MR job.

The parameters that control reduce-side parallelism are:

--Reduce-side parallelism; the default -1 means the user has not specified it

set mapreduce.job.reduces;

--Maximum reduce parallelism

set hive.exec.reducers.max;

--Amount of data processed by a single reduce task, used to estimate reduce parallelism

set hive.exec.reducers.bytes.per.reducer;

Reduce-side parallelism is determined as follows:

If mapreduce.job.reduces is set to a non-negative integer, reduce parallelism is that value. Otherwise, Hive estimates it as follows:

Let the total input file size of the job be totalInputBytes



Then the parallelism of the Reduce side is:
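A Python sketch of this estimate. It assumes the estimate is `min(ceil(totalInputBytes / hive.exec.reducers.bytes.per.reducer), hive.exec.reducers.max)` with a floor of 1; `estimate_reducers` is a hypothetical helper and the input sizes below are only examples.

```python
import math


def estimate_reducers(total_input_bytes, bytes_per_reducer, max_reducers,
                      job_reduces=-1):
    """Explicit mapreduce.job.reduces wins; otherwise estimate from input size."""
    if job_reduces >= 0:                 # user-specified, non-negative value
        return job_reduces
    estimate = math.ceil(total_input_bytes / bytes_per_reducer)
    return max(1, min(estimate, max_reducers))


print(estimate_reducers(1_000_000_000, 256_000_000, 1009))   # ~1 GB of input
```

Because only the total input size is used, a job whose later stages shrink or grow the data a lot can end up with too many or too few reducers, which is why manual tuning is sometimes needed.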


As described above, when Hive estimates reduce parallelism by itself, it bases the estimate on the input file size of the entire MR job. The estimate can therefore be inaccurate in some cases, and the user then needs to specify reduce parallelism according to the actual situation.

Note that with the Tez or Spark engine, Hive can estimate reduce parallelism from computation statistics, and the estimate is relatively accurate.

Small File Merge

If Hive's reduce parallelism is set or estimated poorly, the job may produce a large number of small output files. A small-file merge task solves this: based on the average size of the files the job outputs, Hive decides whether the merge condition is met and, if so, starts an additional task to merge them.
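A Python sketch of the merge decision: a merge task runs only when the average output file is below the trigger threshold, and files are then grouped toward the target merged size. The defaults mirror the values in the settings that follow; the greedy grouping is a simplification, and `plan_merge` is a hypothetical name.

```python
def plan_merge(file_sizes, avg_threshold=16_000_000, target_size=256_000_000):
    """Return None if no merge is triggered, else groups of files to merge.

    Merge only when the average output file is smaller than avg_threshold;
    files are then packed greedily into groups of about target_size bytes.
    """
    if not file_sizes or sum(file_sizes) / len(file_sizes) >= avg_threshold:
        return None
    groups, current, current_size = [], [], 0
    for size in file_sizes:
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


print(plan_merge([100_000_000] * 3))                      # large files: no merge
print([len(g) for g in plan_merge([1_000_000] * 600)])    # 600 files of 1 MB
```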

The relevant parameters are:

--Merge small files output by map-only jobs

set hive.merge.mapfiles=true;

--Merge small files output by map-reduce jobs

set hive.merge.mapredfiles=true;

--Target size of the merged files

set hive.merge.size.per.task=256000000;

--Threshold value for triggering small file merge task. If the average file size output by a calculation task is lower than this value, merge will be triggered.

set hive.merge.smallfiles.avgsize=16000000;

Predicate Pushdown

Predicate pushdown means moving filter operations as early as possible to reduce the amount of data in later steps. With predicate pushdown enabled, Hive automatically moves filters as early as possible without any change to the SQL statement.
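A toy Python illustration of why pushing a filter below a join is safe and cheaper: both plans return the same rows, but with the filter pushed down fewer rows enter the join. The tables, predicate, and function names are made up for the example.

```python
ORDERS = [(1, "2022-01-01"), (2, "2022-01-02"), (1, "2022-01-02"), (3, "2022-01-01")]
USERS = [(1, "alice"), (2, "bob"), (3, "carol")]


def on_day(order):
    return order[1] == "2022-01-02"


def plan_without_ppd(orders, users, predicate):
    """Join first, filter afterwards: every order row enters the join."""
    joined = [(o, u) for o in orders for u in users if o[0] == u[0]]
    return [(o, u) for o, u in joined if predicate(o)], len(orders)


def plan_with_ppd(orders, users, predicate):
    """Filter pushed below the join: only qualifying order rows enter it."""
    kept = [o for o in orders if predicate(o)]
    return [(o, u) for o in kept for u in users if o[0] == u[0]], len(kept)


print(plan_without_ppd(ORDERS, USERS, on_day)[1], plan_with_ppd(ORDERS, USERS, on_day)[1])
```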

The relevant parameters are:

--Whether to enable predicate pushdown

set hive.optimize.ppd = true;

Parallel Execution

Hive converts a SQL statement into one or more stages, each corresponding to an MR job. By default, Hive executes only one stage at a time. A statement may contain several stages that do not all depend on one another, so some of them could run in parallel; parallel execution here means running such stages concurrently. The relevant parameters are:
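A Python sketch of which stages could run together: stages are grouped into waves whose dependencies have all finished, with at most `max_parallel` stages per wave as a stand-in for hive.exec.parallel.thread.number. The stage names and the wave-based scheduling are simplifications, not Hive's actual scheduler.

```python
def stage_waves(deps, max_parallel=8):
    """Group stages into waves of stages that could run at the same time.

    deps maps each stage to the stages it depends on. A stage is ready once
    all of its dependencies have finished; at most max_parallel run per wave.
    """
    done, waves = set(), []
    pending = dict(deps)
    while pending:
        ready = sorted(s for s, d in pending.items() if set(d) <= done)
        if not ready:
            raise ValueError("cyclic stage dependencies")
        wave = ready[:max_parallel]
        waves.append(wave)
        done.update(wave)
        for stage in wave:
            del pending[stage]
    return waves


# Hypothetical plan: two independent subqueries, a stage combining them, a final stage
PLAN = {"Stage-1": [], "Stage-2": [], "Stage-3": ["Stage-1", "Stage-2"],
        "Stage-0": ["Stage-3"]}
print(stage_waves(PLAN))
print(stage_waves(PLAN, max_parallel=1))
```

With `max_parallel=1`, which mimics running one stage at a time, the same plan needs four rounds instead of three.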

--Enable parallel execution optimization, default is off

set hive.exec.parallel=true;  

--Maximum parallelism allowed for the same SQL statement; default 8

set hive.exec.parallel.thread.number=8;

CBO Optimization

CBO stands for Cost-Based Optimizer, i.e. optimization based on computation cost.

In Hive, the cost model considers the number of rows, CPU, local IO, HDFS IO, network IO, and so on. Hive computes the cost of the different execution plans for the same SQL statement and picks the cheapest. Under the MR engine, CBO is currently used mainly for join optimization, such as the join order of multi-table joins.
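A toy Python version of cost-based join ordering: enumerate the join orders, estimate each one's cost, and keep the cheapest. The cost model (sum of intermediate row counts from table sizes and join selectivities) and all numbers are assumptions for illustration; Hive's real optimizer is built on Apache Calcite and uses far richer statistics.

```python
from itertools import permutations


def plan_cost(order, rows, selectivity):
    """Toy cost: total rows produced by all intermediate joins (left-deep plan)."""
    current = rows[order[0]]
    joined = [order[0]]
    cost = 0.0
    for table in order[1:]:
        # Most selective predicate linking the new table to those already joined;
        # 1.0 means no predicate, i.e. a cross product.
        sel = min(selectivity.get(frozenset((t, table)), 1.0) for t in joined)
        current = current * rows[table] * sel
        cost += current
        joined.append(table)
    return cost


def cheapest_order(rows, selectivity):
    return min(permutations(rows), key=lambda o: plan_cost(o, rows, selectivity))


ROWS = {"orders": 1_000_000, "users": 10_000, "regions": 10}
SELECTIVITY = {frozenset(("orders", "users")): 1e-4,
               frozenset(("users", "regions")): 0.1}
print(cheapest_order(ROWS, SELECTIVITY))
```

Joining the two small tables first keeps the intermediate result small, so the cheapest order puts the large orders table last.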

The relevant parameters are:

--Whether to enable CBO optimization

set hive.cbo.enable=true;

Columnar Storage

Use ORC columnar storage to speed up queries.

id name age

1 zs 18

2 lishi 19

Row storage: 1 zs 18 2 lishi 19

Column storage: 1 2 zs lishi 18 19

select name from user

With column storage, this query only needs to read the name column (zs lishi) instead of every full row.

Compression

Compression reduces disk IO. Since Hive's underlying engine defaults to MR, Snappy compression can be applied to the map output.

Map (Snappy) → Reduce

Partitions and Buckets

(1) Create partitioned tables to avoid full table scans later.

(2) Create bucketed tables to sample unknown, complex data in advance.

Engine Replacement


MR engine: chains multiple jobs and is disk-based, spilling to disk in many places. It is slow, but it will reliably produce a result. It is generally used for weekly, monthly, and yearly metrics.

Spark engine: it also writes to disk during shuffles, but not every operator needs a shuffle; in particular, multi-operator pipelines do not write intermediate results to disk (execution is modeled as a DAG, a directed acyclic graph). It balances reliability and efficiency and is generally used for daily metrics.

2) Advantages of the Tez engine

(1) Describing tasks as a DAG removes unnecessary intermediate nodes from MR, reducing disk IO and network IO.

(2) Cluster resources are used better, e.g. container reuse, and the parallelism of initial tasks is computed from cluster resources.

(3) The parallelism of downstream tasks can be adjusted dynamically at runtime based on the actual data volume.

Optimizing a Join of Dozens of Tables

(1) Reduce the number of joined tables: if the business allows, preprocess and merge some tables to cut down the number of joins.

(2) Use Map Join: load small tables into memory, avoiding the reduce phase and improving performance. Automatic Map Join is enabled by setting the automatic conversion parameter to true.

(3) Use Bucketed Map Join: enable it by setting hive.optimize.bucketmapjoin to true.

(4) Use Sort Merge Join: sorting is completed in the map phase, which reduces computation in the reduce phase. Enable it by setting the corresponding parameter to true.

(5) Control the number of reduce tasks through hive.exec.reducers.bytes.per.reducer and mapreduce.job.reduces.

(6) Filter unneeded data: filter out data you do not need before the join to improve performance.

(7) Choose the right join order: putting small tables first reduces the size of intermediate results and improves performance.

(8) Use partitioning: only partitions that match the query conditions need to be read, reducing the amount of data and computation.

(9) Use compression: By compressing data, you can reduce disk and network IO and improve performance. Pay attention to choosing the appropriate compression format and compression level.

(10) Adjust Hive configuration parameters: According to the hardware resources and actual needs of the cluster, reasonably adjust Hive configuration parameters, such as memory, CPU, IO, etc., to improve performance.

1.6.9 Solving data skew in Hive

Data skew usually means that the data involved in a computation is unevenly distributed: one or a few keys carry far more data than the others, so in the shuffle stage a large amount of data with the same key goes to the same reducer. That reducer then takes far longer than the others and becomes the bottleneck of the whole job. The following are examples of data skew in a production environment:

In Hive, data skew usually occurs in grouped aggregations and joins. The optimization ideas for both scenarios are described below.

1) Data skew caused by grouped aggregation

As mentioned earlier, grouped aggregation in Hive is done by one MapReduce job. The map side reads the data and partitions it by the grouping fields; the shuffle sends it to the reduce side, where each group's final aggregation is computed. If the values of the group-by fields are unevenly distributed, many rows with the same key may go to the same reducer, causing data skew.

Data skew caused by grouped aggregation can be addressed as follows:

(1) Check whether the skewed value is null

If the skewed value is null, consider whether the final result needs these rows. If not, filtering out the nulls early solves the problem. If they must be kept, consider the approaches below.


With map-side aggregation enabled, data is first partially aggregated on the map side. Even if the raw data is skewed, after this preliminary aggregation the data sent to the reducers is no longer skewed. In the best case, map-side aggregation can hide the data skew problem completely.

The relevant parameters are as follows:



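--Enable map-side aggregation

set hive.map.aggr=true;

--Number of rows used to check whether the source data is suitable for map-side aggregation

set hive.groupby.mapaggr.checkinterval=100000;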



(3) Skew-GroupBy

Skew-GroupBy is Hive's built-in solution for data skew caused by group aggregation. It starts two MR jobs. The first job partitions the data by a random number and sends it to Reduce for partial aggregation. The second job partitions by the group-by fields and completes the final aggregation.

The relevant parameters are as follows:

--Enable group aggregation data skew optimization

set hive.groupby.skewindata=true;
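To make the two-stage idea concrete, here is a small Python simulation that stands in for the two MR jobs with in-memory dicts. It is only a sketch of the principle, not Hive's implementation; the function name, the reducer count and the fixed random seed are assumptions for illustration.

```python
import random
from collections import defaultdict


def two_stage_sum(rows, num_reducers, seed=0):
    """Simulate Skew-GroupBy for SUM(value) GROUP BY key."""
    rng = random.Random(seed)

    # Job 1: partition by a random number, so even a hot key is spread over
    # all reducers; each reducer pre-aggregates its own share per key.
    partials = [defaultdict(int) for _ in range(num_reducers)]
    stage1_load = [0] * num_reducers
    for key, value in rows:
        r = rng.randrange(num_reducers)
        partials[r][key] += value
        stage1_load[r] += 1

    # Job 2: partition the partial results by the real group key and finish
    # the aggregation; each key now arrives at most once per stage-1 reducer.
    final = defaultdict(int)
    stage2_rows = defaultdict(int)
    for partial in partials:
        for key, value in partial.items():
            final[key] += value
            stage2_rows[key] += 1
    return dict(final), stage1_load, dict(stage2_rows)


# 9,000 rows share the key "hot"; hash partitioning alone would send them all
# to a single reducer.
rows = [("hot", 1)] * 9000 + [("cold", 1)] * 100
totals, stage1_load, stage2_rows = two_stage_sum(rows, num_reducers=4)
print(totals["hot"], totals["cold"])  # 9000 100
print(stage2_rows["hot"])             # at most 4 partial rows for "hot"
```

The price of this approach is a second job, which is why Hive only uses it when hive.groupby.skewindata is switched on.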

2) Data skew caused by Join

If the Join uses the Common Join algorithm, the computation is done by a single MapReduce job. The Map side reads the data of the tables being joined and partitions it by the join key. The Shuffle sends it to the Reduce side, where rows with the same key complete the final Join.

If the values of the associated fields are unevenly distributed, it may cause a large number of the same keys to enter the same Reduce, resulting in data skew problems.

The data skew problem caused by Join can be solved as follows:

(1) Map Join

With the Map Join algorithm, the Join is completed entirely on the Map side. There is no Shuffle and no Reduce stage, so there is naturally no data skew on the Reduce side. This approach suits cases where data skew occurs when a large table is joined with a small table.

The relevant parameters are as follows:

--Enable automatic conversion of Common Join to Map Join

set hive.auto.convert.join=true;
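The Map Join idea can be sketched in Python as a broadcast hash join: the small table is loaded into a hash table and the large table is streamed against it, so no shuffle is needed. The table contents and field names below are hypothetical.

```python
def map_join(big_rows, small_rows, key):
    """Inner join done entirely on the 'map side'."""
    # Build phase: load the small table into an in-memory hash table
    # (in Hive the small table is made available to every map task).
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: stream the big table and look up each key. There is no
    # shuffle and no reduce stage, so a hot key cannot overload one reducer.
    joined = []
    for row in big_rows:
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})
    return joined


orders = [  # hypothetical large fact table
    {"order_id": 1, "province_id": 10},
    {"order_id": 2, "province_id": 10},
    {"order_id": 3, "province_id": 99},  # no matching province: dropped
]
provinces = [{"province_id": 10, "province_name": "Beijing"}]  # small table
result = map_join(orders, provinces, "province_id")
print([r["order_id"] for r in result])  # [1, 2]
```

The approach only works while the small table fits in memory, which is why it targets the large-table-joins-small-table case.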




(2) Skew Join

If both tables in the Join are large, Map Join can hardly cope. In that case, consider Skew Join: it starts a separate Map Join task for each skewed large key, while the remaining keys go through a normal Common Join. The schematic diagram is as follows:

The relevant parameters are as follows:

--Enable skew join optimization

set hive.optimize.skewjoin=true;

--Threshold for triggering skew join: if the number of rows for a key exceeds this value, skew join is triggered

set hive.skewjoin.key=100000;
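The routing decision behind hive.skewjoin.key can be illustrated with a short Python sketch: keys whose row count exceeds the threshold go to a separate Map Join path, and the rest stay on the Common Join path. Hive itself detects skewed keys at run time inside the join and hands them to a follow-up job; this sketch only shows the split, and its names are hypothetical.

```python
from collections import Counter


def split_skewed_keys(rows, key_of, threshold):
    """Return (skewed keys, rows for the map-join path, rows for the common-join path)."""
    counts = Counter(key_of(r) for r in rows)
    # A key counts as skewed when its row count exceeds the threshold,
    # which is the meaning of hive.skewjoin.key.
    skewed = {k for k, c in counts.items() if c > threshold}
    map_join_rows = [r for r in rows if key_of(r) in skewed]
    common_join_rows = [r for r in rows if key_of(r) not in skewed]
    return skewed, map_join_rows, common_join_rows


rows = [(1001, "a")] * 5 + [(1002, "b"), (1003, "c")]
skewed, mj, cj = split_skewed_keys(rows, key_of=lambda r: r[0], threshold=3)
print(skewed, len(mj), len(cj))  # {1001} 5 2
```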

(3) Adjust the SQL statement

If both tables in the Join are large and one of them has skewed data, you can also rewrite the SQL statement as follows.

Suppose the original SQL statement is as follows, where A and B are both large tables and one of them has skewed data.

hive (default)>
select
    *
from A
join B
on A.id = B.id;

The process of joining is as follows:

In the figure, 1001 is a skewed large key, and all of its rows are sent to the same Reduce task for processing.

The SQL statement execution plan after adjustment is shown in the following figure:

Adjust SQL statements as follows:

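hive (default)>
select
    *
from(
    select --scatter: append a random suffix 0 or 1 to each id of A
        concat(id,'_',cast(rand()*2 as int)) id,
        value --other columns of A (placeholder)
    from A
)ta
join(
    select --expansion: copy B once for each possible suffix, 0 and 1
        concat(id,'_',0) id,
        value --other columns of B (placeholder)
    from B
    union all
    select
        concat(id,'_',1) id,
        value
    from B
)tb
on ta.id=tb.id;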
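A quick Python check of why the rewrite is safe: scattering A with a random suffix and expanding B once per possible suffix gives exactly the same join result as the original query, while the rows of the hot key 1001 are split across two join keys. The sample rows are hypothetical; n=2 corresponds to cast(rand()*2 as int), which only produces 0 or 1.

```python
import random


def plain_join(a_rows, b_rows):
    """Reference result: inner join on id."""
    return sorted((ai, av, bv) for ai, av in a_rows for bi, bv in b_rows if ai == bi)


def salted_join(a_rows, b_rows, n=2, seed=0):
    rng = random.Random(seed)
    # Scatter A: id -> "id_s" with a random suffix s in 0..n-1.
    a_salted = [(f"{ai}_{rng.randrange(n)}", ai, av) for ai, av in a_rows]
    # Expand B: one copy of every row for each suffix 0..n-1 (the union all).
    b_index = {}
    for bi, bv in b_rows:
        for s in range(n):
            b_index.setdefault(f"{bi}_{s}", []).append(bv)
    joined = [(ai, av, bv) for salted, ai, av in a_salted for bv in b_index.get(salted, [])]
    hot_keys = sorted({salted for salted, ai, _ in a_salted if ai == 1001})
    return sorted(joined), hot_keys


a_rows = [(1001, f"a{i}") for i in range(10)] + [(1002, "x")]
b_rows = [(1001, "b1"), (1002, "b2"), (1003, "b3")]
result, hot_keys = salted_join(a_rows, b_rows)
print(result == plain_join(a_rows, b_rows))  # True
print(hot_keys)  # the salted join keys that 1001 was spread over
```

If the suffixes used to expand B do not cover every value the scatter step can produce, matching rows are silently lost, so the two ranges must be kept identical.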



1.6.10 How do you handle field delimiters that appear inside Hive data?

Hive's default field delimiter is the ASCII control character \001 (^A); use fields terminated by '\001' when creating a table. Note: if \t or \001 is used as the delimiter, the data sent by front-end tracking events and the JavaEE backend must never contain that delimiter, which should be enforced through coding standards.

If the transmitted data does contain the delimiter, it must be escaped or replaced (ETL) at the upstream layer, usually while synchronizing with Sqoop or DataX.

id name age

1 zs 18

2 li<delimiter>si 19
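The effect of a delimiter inside a value, and the replacement fix, can be shown in a few lines of Python. The tab delimiter and the choice of a space as the replacement are assumptions; in practice this cleanup would run in the upstream ETL step.

```python
FIELD_DELIMITER = "\t"  # assumed table delimiter; Hive's default would be "\001"


def to_hive_line(fields, replacement=" "):
    """Join fields into one text line after replacing any delimiter inside a value."""
    return FIELD_DELIMITER.join(str(f).replace(FIELD_DELIMITER, replacement) for f in fields)


row = ["2", "li\tsi", "19"]  # the name value contains the delimiter
naive_line = FIELD_DELIMITER.join(row)
print(len(naive_line.split(FIELD_DELIMITER)))         # 4 columns: 2, li, si, 19
print(len(to_hive_line(row).split(FIELD_DELIMITER)))  # 3 columns, as intended
```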

1.6.11 MySQL Metadata Backup

Metadata backup is critical: if the metadata is damaged, the whole cluster may stop working. Back it up every day after midnight and keep at least two copies on other servers.

(1) MySQL backup script (back up the metadata regularly, once a day)






#!/bin/bash

# Database connection settings (example values; adjust to your environment)
MYSQL_HOST=hadoop102
MYSQL_USER=root
MYSQL_PASSWORD=your_password

# Backup directory, must be created in advance
BACKUP_DIR=/root/mysql-backup

# Number of daily backups to keep; beyond this, the oldest backup is deleted
FILE_ROLL_COUNT=7

# Back up the MySQL databases
[ -d "${BACKUP_DIR}" ] || exit 1

mysqldump \
--all-databases \
--opt \
--single-transaction \
--source-data=2 \
--default-character-set=utf8 \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}" | gzip > "${BACKUP_DIR}/$(date +%F).gz"

# Delete the oldest backup once the number of backups exceeds FILE_ROLL_COUNT
if [ "$(ls "${BACKUP_DIR}" | wc -l)" -gt "${FILE_ROLL_COUNT}" ]
then
    ls "${BACKUP_DIR}" | sort | sed -n 1p | xargs -I {} rm -rf "${BACKUP_DIR}"/{}
fi

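(2) MySQL data recovery script

#!/bin/bash

# Database connection settings (example values; adjust to your environment)
MYSQL_HOST=hadoop102
MYSQL_USER=root
MYSQL_PASSWORD=your_password

# Backup directory (must match the backup script)
BACKUP_DIR=/root/mysql-backup

# Date to restore (YYYY-MM-DD), passed as the first argument; if omitted, the latest backup is restored
RESTORE_DATE=$1

[ "${RESTORE_DATE}" ] && BACKUP_FILE="${RESTORE_DATE}.gz" || BACKUP_FILE="$(ls "${BACKUP_DIR}" | sort -r | sed -n 1p)"

gunzip "${BACKUP_DIR}/${BACKUP_FILE}" --stdout | mysql \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}"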

1.6.12 How do you create a two-level partitioned table?

create table dept_partition2(
    deptno int,    -- department number
    dname  string  -- department name
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';

1.6.13 Difference between union and union all

(1) union deduplicates the combined result set.

(2) union all does not deduplicate the result set.
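The difference can be mimicked with Python lists of row tuples. SQL guarantees no row order for either operator; the order-preserving deduplication here is only a convenience of the sketch.

```python
def union_all(a, b):
    """UNION ALL: concatenate both result sets and keep every duplicate."""
    return a + b


def union(a, b):
    """UNION: concatenate, then drop duplicate rows (including duplicates within one side)."""
    return list(dict.fromkeys(a + b))


a = [(1, "x"), (2, "y"), (2, "y")]
b = [(2, "y"), (3, "z")]
print(union_all(a, b))  # [(1, 'x'), (2, 'y'), (2, 'y'), (2, 'y'), (3, 'z')]
print(union(a, b))      # [(1, 'x'), (2, 'y'), (3, 'z')]
```

Because union has to deduplicate, it costs extra work; union all is the cheaper choice when duplicates are impossible or acceptable.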

1.7 Datax

1.7.1 Differences between DataX and Sqoop

1) Both DataX and Sqoop are mainly used for batch data synchronization in offline systems.

2) DataX and Sqoop differ as follows:

(1) DataX runs as a single process with multiple threads; Sqoop runs as a MapReduce job with 4 map tasks by default.

(2) For large data volumes, Sqoop's distributed synchronization is preferred; for small data volumes, DataX is preferred because it runs entirely in memory. If DataX must handle a large volume, several DataX instances can be run, each responsible for part of the data (divided manually).

(3) Sqoop was built for Hadoop and integrates well with Hadoop components; DataX uses a plug-in architecture and supports more sources and sinks.

(4) Sqoop is no longer officially upgraded or maintained; DataX is still maintained by Alibaba.

(5) DataX provides richer run logs and statistics; Sqoop's are harder to collect because it runs on Yarn.

1.7.2 Speed control

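1) The key tuning parameters are as follows:

job.setting.speed.channel: total number of concurrent channels

job.setting.speed.record: global record speed limit

job.setting.speed.byte: global byte speed limit

core.transport.channel.speed.record: record speed limit of a single channel, default 10000 (10,000 records/s)

core.transport.channel.speed.byte: byte speed limit of a single channel, default 1024*1024 (1 MB/s)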

2) Priority:

(1) Global byte limit / single-channel byte limit

(2) Global record limit / single-channel record limit

If both are set, the smaller of the two results is used as the channel count.

(3) If neither limit is set, the total channel count setting takes effect.

3) Project configuration

Setting only the total channel count to 5 is usually enough to saturate the network card bandwidth.
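The priority rules above can be written as a small Python function. It is a sketch that assumes integer division and at least one channel per limit; the argument names are hypothetical, and the defaults follow the single-channel values listed above.

```python
def effective_channels(channel=None, global_byte=None, global_record=None,
                       channel_byte=1024 * 1024, channel_record=10000):
    """Estimate how many channels DataX runs, following the priority rules."""
    candidates = []
    # (1) Byte limit: global byte speed / single-channel byte speed.
    if global_byte is not None:
        candidates.append(max(1, global_byte // channel_byte))
    # (2) Record limit: global record speed / single-channel record speed.
    if global_record is not None:
        candidates.append(max(1, global_record // channel_record))
    # If both limits are set, the smaller result wins.
    if candidates:
        return min(candidates)
    # (3) Neither limit set: the configured total channel count applies.
    if channel is not None:
        return channel
    raise ValueError("set a channel count or a speed limit")


print(effective_channels(channel=5))                                          # 5
print(effective_channels(global_byte=10 * 1024 * 1024))                       # 10
print(effective_channels(global_byte=10 * 1024 * 1024, global_record=30000))  # 3
print(effective_channels(channel=5, global_byte=2 * 1024 * 1024))             # 2
```

The last call shows why a speed limit silently overrides the channel setting: once any limit is configured, the channel count is derived from it.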

1.7.3 Memory tuning

It is recommended to set the memory to 4 GB or 8 GB; adjust it to the actual situation.

There are two ways to adjust the JVM -Xms and -Xmx parameters: edit the datax.py script directly, or pass the parameters at startup, as follows:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

1.7.4 Handling null values

1) MySQL (null) => Hive (\N)

By default, DataX HDFS Writer stores null as an empty string, whereas Hive reads \N as null, so the Hive table definition has to account for this. There are two solutions:

(1) Modify the source code of DataX HDFS Writer and add logic for a custom null-value storage format. Please refer to

(2) When creating the Hive table, specify that null values are stored as an empty string (''), for example:

DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
    `id`         STRING COMMENT 'ID',
    `name`       STRING COMMENT 'Province name',
    `region_id`  STRING COMMENT 'Region ID',
    `area_code`  STRING COMMENT 'Area code',
    `iso_code`   STRING COMMENT 'Old ISO-3166-2 code, for visualization',
    `iso_3166_2` STRING COMMENT 'New ISO-3166-2 code, for visualization'
) COMMENT 'Province table'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/base_province/';

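2) Hive (\N) => MySQL (null)

In the DataX HDFS Reader configuration, set nullFormat to \\N so that \N values in the Hive files are read as null:

"reader": {
    "name": "hdfsreader",
    "parameter": {
        "defaultFS": "hdfs://hadoop102:8020",
        "path": "/base_province",
        "column": [
            "*"
        ],
        "fileType": "text",
        "compress": "gzip",
        "encoding": "UTF-8",
        "nullFormat": "\\N",
        "fieldDelimiter": "\t"
    }
}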

1.7.5 Configuration file generation script

(1) Every table needs its own configuration file. With thousands of tables, writing them by hand is impractical, so a script generates them.

(2) Script usage:

python -d database -t table
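A minimal Python sketch of such a generator is shown below. It only builds the JSON for one mysqlreader to hdfswriter job. The host names, credentials, HDFS path layout, and the idea of passing the column list in are all assumptions for illustration; a real script would read the column list from MySQL's information_schema.

```python
import json


def build_job(database, table, columns,
              mysql_host="hadoop102", hdfs="hdfs://hadoop102:8020",
              username="root", password="your_password"):
    """Build a DataX job dict for one table.

    columns: list of (column_name, hive_type) pairs, e.g. [("id", "bigint")].
    """
    return {
        "job": {
            "setting": {"speed": {"channel": 1}},
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": username,
                        "password": password,
                        "column": [name for name, _ in columns],
                        "connection": [{
                            "table": [table],
                            "jdbcUrl": [f"jdbc:mysql://{mysql_host}:3306/{database}"],
                        }],
                    },
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": hdfs,
                        "fileType": "text",
                        "path": f"/origin_data/{database}/{table}",  # hypothetical layout
                        "fileName": table,
                        "column": [{"name": n, "type": t} for n, t in columns],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip",
                    },
                },
            }],
        }
    }


job = build_job("gmall", "base_province", [("id", "bigint"), ("name", "string")])
config_text = json.dumps(job, indent=2)
# A real script would loop over all tables and write each config_text to a
# file such as gmall.base_province.json.
```

Generating every file from one template keeps reader and writer column lists in sync, which is the main source of errors when thousands of configs are written by hand.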

1.7.6 How much data does DataX import per day?

1) Full-synchronization tables:

Activity table, discount rule table, coupon table, SKU platform attribute table, SKU sales attribute table

SPU table (100,000 to 200,000 rows), SKU table (100,000 to 200,000 rows), brand table, level-1, level-2 and level-3 product category tables

Province table, region table

Code dictionary table

All of the above add up to about 300,000 rows, roughly 300 MB.

Shopping cart table (daily increment of 200,000 rows, 1 million rows in total => about 1 GB)

Therefore, DataX synchronizes about 1 to 2 GB of data per day.

Note: in finance and insurance companies (e.g., Ping An, China Minsheng Bank), only the business data volume is somewhat larger.

2) Incremental synchronization tables:

Shopping cart table (200,000), order table (100,000), order detail table (150,000), order status table, payment table (90,000), return table (1,000), refund table (1,000)

Order detail coupon association table, coupon collection table

Product review table, favorites table

User table, order detail activity association table

Incremental data: 1 to 2 GB per day

1.7.7 How does DataX implement incremental synchronization?

Fetch the data added or changed today by filtering in SQL: keep rows created today or modified today.
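As a sketch, the filter can be expressed as a condition for the mysqlreader `where` parameter, together with an equivalent Python check on sample rows. The column names create_time and operate_time and the date format are assumptions about the business tables.

```python
def incremental_where(do_date):
    """WHERE condition keeping rows created or modified on do_date (YYYY-MM-DD)."""
    return (f"date_format(create_time, '%Y-%m-%d') = '{do_date}' "
            f"or date_format(operate_time, '%Y-%m-%d') = '{do_date}'")


def changed_on(rows, do_date):
    """Same rule applied to rows held in memory; operate_time may be None."""
    return [r for r in rows
            if r["create_time"][:10] == do_date
            or (r["operate_time"] or "")[:10] == do_date]


rows = [
    {"id": 1, "create_time": "2022-02-09 10:00:00", "operate_time": None},
    {"id": 2, "create_time": "2022-02-10 08:30:00", "operate_time": None},
    {"id": 3, "create_time": "2022-02-01 09:00:00", "operate_time": "2022-02-10 12:00:00"},
]
print([r["id"] for r in changed_on(rows, "2022-02-10")])  # [2, 3]
print(incremental_where("2022-02-10"))
```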

1.8 Maxwell

1.8.1 Comparison of Maxwell, Canal and FlinkCDC

1) FlinkCDC, Maxwell and Canal are mainly used for real-time data synchronization in real-time systems.

Comparison (FlinkCDC | Maxwell | Canal):

Relationship between an SQL statement and output records: one record per affected row | one record per affected row | only one combined record (may need to be exploded later)

Data initialization (full data sync): yes, supports multiple databases and tables at once | yes, single table | no

Resume from breakpoint: yes, stored in the checkpoint (CK) | yes, stored in MySQL | yes, stored locally

1.8.2 Advantages of Maxwell

Supports resuming from breakpoints.

Supports full initial synchronization.

Automatically sends data to the corresponding Kafka topic based on the database and table names.

1.8.3 Maxwell's underlying principle

MySQL master-slave replication: Maxwell poses as a MySQL replica and reads the binlog.

1.8.4 How fast is full synchronization?

Full synchronization is slow; Sqoop or DataX is recommended for it.

1.8.5 Maxwell data duplication

When synchronizing historical data, bootstrap scans all the data.

Meanwhile, Maxwell also listens for binlog changes.

For example, if a new row is inserted into the database while bootstrap is synchronizing the historical data, the row is picked up both by the bootstrap scan and by the Maxwell process monitoring the binlog, so it appears twice.

1.9 DolphinScheduler

Version 1.3.9 supports email and WeCom (Enterprise WeChat) alerts.

Version 2.0.3 provides more complete alert information and easier configuration.

Version 3.0.0 and later support data quality monitoring.

1.9.1 How many metrics does the cluster compute per day?

More than 100 metrics run every day, and about 200 during promotional events.

1.9.2 What if a task fails?

(1) Notifications are sent by email, DingTalk and integrated automatic phone calls whether the task succeeds or fails.

(2) The main approach is to read the logs and fix the problem.

(3) Alerting website: Ruixiangyun.

(4) Double 11 and 618 promotions require 24-hour on-call duty.

1.9.3 What if DolphinScheduler (DS) itself goes down?

Check the log for the cause of the error: restart directly; if resources are insufficient, add resources and then restart.

1.10 Spark Core & SQL

1.10.1 Spark running modes

(1) Local: runs on a single machine; used for testing.

(2) Standalone: Spark's own scheduling system, used when cluster performance requirements are very high. Rarely used in China.

(3) Yarn: uses Hadoop's resource scheduler. Widely used in China.

Yarn-client mode: the Driver runs on the client (not in the AM).

Yarn-cluster mode: the Driver runs inside the ApplicationMaster (AM).

(4) Mesos: rarely used in China.

(5) K8S: the trend, but currently immature and requires a lot of configuration.

1.10.2 Common Spark port numbers

(1) 4040: web UI port of a running application such as spark-shell

(2) 7077: internal communication port, analogous to Hadoop's 8020/9000

(3) 8080: port for viewing job execution, analogous to Hadoop's 8088

(4) 18080: history server, analogous to Hadoop's 19888

Note: since Spark only handles computation and does not store data, it has no counterpart to Hadoop's 9870/50070 storage port.

1.10.3 Five properties of an RDD

1.10.4 Where does RDD resilience come from?

It is mainly reflected in storage, computation, task and data-location resilience, as follows:

(1) Automatic switching between memory and disk

(2) Efficient fault tolerance based on lineage

(3) A failed Task is retried a certain number of times

(4) A failed Stage is automatically retried a certain number of times, and only the failed partitions are recomputed

(5) Checkpoint and persist (checkpointing and persistence): every RDD operation produces a new RDD, so if the lineage chain is long and recomputation is expensive, checkpoint saves the data to disk; persist keeps data in memory or on disk for reuse

(6) Data scheduling resilience: DAG and Task scheduling are independent of resource management

(7) Highly elastic repartitioning of data

1.10.5 Spark transformation operators (8)
























1.10.6 Spark action operators (5)








1.10.7 Difference between map and mapPartitions

(1) map: processes one record at a time

(2) mapPartitions: processes one partition at a time

1.10.8 Difference between repartition and coalesce

1) Relationship:

Both change the number of partitions of an RDD; repartition simply calls coalesce(numPartitions, shuffle = true) under the hood.

2) Difference:

repartition always triggers a Shuffle; coalesce decides whether to Shuffle based on the shuffle parameter passed in.

In general, use repartition to increase the number of RDD partitions and coalesce to reduce it.

1.10.9 Difference between reduceByKey and groupByKey

reduceByKey: pre-aggregates (combines) values on the map side before the Shuffle.

groupByKey: no pre-aggregation.

As long as the business logic is not affected, prefer reduceByKey.
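Why pre-aggregation matters can be shown with a Python simulation of a word-count style sum over two map-side partitions: both approaches give the same totals, but the reduceByKey style ships far fewer records through the shuffle. This models the idea only, not Spark's internals; the partition contents are made up.

```python
from collections import defaultdict


def group_by_key_sum(partitions):
    """groupByKey then sum: every (key, value) pair crosses the shuffle."""
    shuffled = [kv for partition in partitions for kv in partition]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def reduce_by_key_sum(partitions):
    """reduceByKey: combine per partition first, then shuffle the partial sums."""
    shuffled = []
    for partition in partitions:
        local = defaultdict(int)  # map-side pre-aggregation
        for key, value in partition:
            local[key] += value
        shuffled.extend(local.items())
    totals = defaultdict(int)
    for key, value in shuffled:   # reduce side
        totals[key] += value
    return dict(totals), len(shuffled)


partitions = [[("a", 1)] * 4 + [("b", 1)], [("a", 1)] * 3]
print(group_by_key_sum(partitions))   # ({'a': 7, 'b': 1}, 8)
print(reduce_by_key_sum(partitions))  # ({'a': 7, 'b': 1}, 3)
```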

1.10.10 Lineage in Spark

Dependencies are either wide or narrow; a dependency that involves a Shuffle is a wide dependency.

1.10.11 How Spark jobs are divided

(1) Application: initializing a SparkContext creates an Application;

(2) Job: each Action operator creates a Job;

(3) Stage: the number of Stages equals the number of wide dependencies plus 1;

(4) Task: within a Stage, the number of partitions of the last RDD is the number of Tasks.

1.10.12 Conversion and differences among RDD, DataFrame and DataSet in SparkSQL

Difference between DataFrame and DataSet: a DataFrame's rows have the generic type Row (DataFrame = DataSet[Row]), while a DataSet is strongly typed.

Difference between RDD and DataFrame/DataSet: an RDD carries no field or table (schema) information.

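1.10.13 Difference between Hive on Spark and Spark on Hive

Comparison (syntax | execution engine | ecosystem):

Hive on Spark: HQL | RDD | more complete

Spark on Hive (Spark SQL): Spark SQL | DataFrame/DataSet | lacking (permission management, metadata management)

Metadata for Spark on Hive: with the built-in Hive it is stored in Derby; with an external Hive it is stored in MySQL.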

1.10.14 Spark core source code (key point)

1) Submission process (key point)

2) Shuffle process (key point)

SortShuffle: reduces the number of small files.

Intermediate data should be written to local disk.

Number of files generated = number of Tasks * 2 (one data file and one index file per Task)

Before spilling to disk, data is sorted by key, and the sorted data is written to disk files in batches of 10,000 records by default. Writing goes through an in-memory buffer: each time the buffer spills, a disk file is produced, so one Task creates multiple temporary files. Finally, each Task merges all of its temporary files: the merge reads them all and writes them into one final file at once.

(4) Bypass shuffle: reduces small files and skips sorting, so it is more efficient. Use it when sorting is not required.

1.10.15 Spark unified memory model

1) The heap memory structure under unified memory management is shown in the following figure

2) The dynamic occupancy mechanism of unified memory management is shown in the following figure

1.10.16 Why is Spark faster than MR?

1) Memory vs. disk

(1) In the Map phase, MR frequently spills intermediate results to disk, and in the Reduce phase it pulls the data back from disk. This frequent disk I/O is time-consuming.

(2) Spark does not need to write intermediate results to disk. Thanks to the RDD abstraction, each RDD partition can keep its own intermediate results. The advantage is even more pronounced for iterative computations.

2) Spark's DAG-based task division reduces unnecessary Shuffle

(1) In MR, the result of each job is written to disk, and subsequent jobs that depend on it read the data back from disk before computing.

(2) In Spark, a job's result can be kept in memory for subsequent jobs. Combined with Spark's caching mechanism, this greatly reduces unnecessary Shuffle.

3) Resource granularity: process vs. thread

Starting and scheduling a process generally costs more than a thread.

(1) MR tasks run in the Yarn cluster as processes: N MapTasks require N processes.

(2) Spark tasks run as threads inside executor processes: N tasks require N threads.

1.10.17 Difference between Spark Shuffle and Hadoop Shuffle

(1) Hadoop can start ReduceTasks before all MapTasks have finished; Spark must wait until all parent stages have completed before it fetches data.

(2) Hadoop's Shuffle always sorts, so both Map output and Reduce output are ordered within each partition; Spark does not require this.

1.10.18 Spark job submission parameters (key point)

1) Several important parameters when submitting a job

executor-cores: number of cores per executor; default 1, officially recommended 2 to 5; we use 4

num-executors: number of executors to launch; default 2

executor-memory: executor memory size; default 1G

driver-cores: number of cores used by the driver; default 1

driver-memory: driver memory size; default 1G

2) An example submission command

spark-submit \
--master yarn \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName \
--name "Spark Job Name" \
XXXX.jar \
InputPath \
OutputPath

1.10.19 Are Spark jobs submitted through a JavaEE interface or a script?

Through shell scripts. DolphinScheduler can also submit Spark tasks through its web page.

1.10.20 List the Spark operators that trigger a Shuffle and briefly describe their functions.




1.10.21 How can you reduce the number of database connections when Spark works with a database?

Use foreachPartition instead of foreach, and obtain the database connection inside foreachPartition, so that one connection is created per partition instead of one per record.
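A Python sketch that counts connections for both patterns. FakeConnection is a hypothetical stand-in for a real database connection; in PySpark, write_partition is the kind of function you would pass to rdd.foreachPartition.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection; counts how many are opened."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.written = []

    def insert(self, row):
        self.written.append(row)

    def close(self):
        pass


def write_with_foreach(partitions):
    for partition in partitions:
        for row in partition:
            conn = FakeConnection()  # a new connection for every record
            conn.insert(row)
            conn.close()


def write_partition(rows):
    conn = FakeConnection()          # one connection for the whole partition
    for row in rows:
        conn.insert(row)
    conn.close()


partitions = [list(range(100)), list(range(50))]

FakeConnection.opened = 0
write_with_foreach(partitions)
print(FakeConnection.opened)  # 150

FakeConnection.opened = 0
for partition in partitions:  # what foreachPartition does, once per partition
    write_partition(partition)
print(FakeConnection.opened)  # 2
```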

1.10.22 Spark data skew

See the Hive on Spark data skew section for details.

1.10.23 New features in Spark 3.0

Dynamic optimization: adaptive query execution (AQE) and dynamic partition pruning

Dynamically sets the number of reduce tasks based on runtime statistics

Dynamically chooses the appropriate join strategy (hash join, broadcast join or sort-merge join) based on table size

1.12 Flink

1.12.1 What are the components of Flink's architecture?

A Flink program involves three roles: TaskManager, JobManager and Client.

The JobManager is the master of the cluster. It receives Flink jobs, coordinates checkpoints, handles failover recovery and manages the TaskManagers. It contains the Dispatcher, ResourceManager and JobMaster.

TaskManagers are the worker nodes that perform the computation; each one manages the memory, disk, network and other resources on its node. Slots isolate memory but not CPU. Subtasks of different operators in the same slot sharing group can share a slot.

The Client submits the Flink job to the JobManager.

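1.12.2 Differences between Flink and Spark Streaming

Comparison (Flink | Spark Streaming):

Computation model: stream processing | micro-batch

Time semantics: three (event time, ingestion time, processing time) | none, processing time only

Windows: many types, flexible | few types, inflexible (window length must be an integer multiple of the batch interval)

Streaming SQL: supported | not supported

Flink's checkpoints use asynchronous barrier snapshots.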

1.12.3 Flink job submission process and core concepts

2) Operator chain

Flink chains operators automatically; chaining requires a one-to-one connection and the same parallelism.

Calling disableOperatorChaining() in code disables operator chaining.

3) Graph generation and transmission

StreamGraph: generated on the Client; the original DAG.

JobGraph: generated on the Client and submitted to the JobManager; applies operator chain optimization.

ExecutionGraph: generated on the JobManager; refines the graph by parallelism.

Physical graph: deployed to the TaskManagers; the physical execution graph.

4) Task and Subtask

Subtask: a parallel instance of an operator.

Task: a Subtask that is running is called a Task.

5) Relationship between parallelism and Slot

A Slot is a static concept: the concurrent execution capacity of a TaskManager.

Parallelism is a dynamic concept: the concurrency a program actually uses at runtime.

An appropriate parallelism improves computing efficiency; too high or too low is inappropriate.

6) Slot sharing groups: know how to give an operator its own slot

The default sharing group is "default"; tasks in the same sharing group can share slots.

Set the sharing group with slotSharingGroup().

1.12.4 Flink deployment modes and their differences

1) Local: Flink jobs run in a single JVM process; suitable for testing.

2) Standalone: Flink jobs run on a dedicated Flink cluster, independent of other resource managers (Yarn or Kubernetes).

3) Yarn:

Per-job: exclusive resources; code is parsed on the Client.

Application: exclusive resources; code is parsed in the JobMaster.

Session: shared resources; one cluster runs multiple jobs.

4) K8s: supports cloud native; the future trend.

5) Mesos: used abroad; for reference only.

1.12.5 How is Flink task parallelism prioritized, and how are resources usually configured?

There are several ways to set parallelism, with this priority: operator > global env > submit command line > configuration file.

1) Set parallelism according to the task:

(1) Regular tasks: Source, Transform and Sink operators use the same parallelism as the number of Kafka partitions.

(2) Compute-heavy tasks: Source and Sink operators match the number of Kafka partitions; the Transform operator can be set to a power of 2, such as 64, 128…

2) Resource settings: rule of thumb 1 CU = 1 CPU + 4 GB memory

Slots per TaskManager: 1 slot per TaskManager (exclusive resources) or N slots per TaskManager (saves resources and reduces network transfer)

TaskManager memory: 4 to 8 GB

TaskManager CPU: by default Flink assigns one CPU per slot

JobManager CPU: default 1

3) Checking whether resources are sufficient:

After setting the resources, run a stress test to find the per-parallelism throughput at which backpressure starts to appear.

For example, if each parallel instance starts to show backpressure at 5,000 records/s and we set a parallelism of 3, the program can process at most 15,000 records/s.

1.12.6 Flink's three time semantics

Event Time: the time at which the event was created, carried by the data itself.

Ingestion Time: the time at which the data enters Flink.

Processing Time: the local system time of the machine running each time-based operator, so it is machine-dependent. In Flink versions before 1.12 the default time attribute was Processing Time; since 1.12 the default is Event Time.

1.12.7 Understanding Watermark

The Watermark is the core mechanism that ensures correct results in Flink stream processing. It usually works together with windows to process out-of-order data correctly.

A watermark is a marker inserted into the data stream; it can be thought of as a special record.

The main content of a watermark is a timestamp indicating the progress of event time.

Watermarks are generated from the timestamps of the data.

Watermark timestamps must increase monotonically so that the task's event-time clock only moves forward.

Watermarks can be delayed by a tolerance so that out-of-order data is processed correctly.

A watermark Watermark(t) indicates that event time in the current stream has reached timestamp t: all data before t has arrived, and no data with timestamp t' ≤ t will appear in the stream afterward.

1.12.8 How watermarks are generated and propagated with multiple parallel instances

1) Classification:

Punctuated: the watermark is updated once per record.

Periodic: the watermark is updated at a fixed interval.

The official API is periodic by default with a 200 ms interval, because punctuated generation puts pressure on the system.

2) Generation principle:

Watermark = current maximum event time - out-of-orderness - 1 ms

3) Propagation:

A watermark is a special timestamped record inserted into the stream at the position specified in the code.

One to many: broadcast.

Many to one: take the minimum.

Many to many: break it down; it is simply a combination of the two cases above.
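The generation formula and the propagation rules can be sketched in Python. The class follows the bounded-out-of-orderness formula above but returns None before any event arrives (a simplification; Flink starts from a very small initial value), and the task names are hypothetical.

```python
class BoundedOutOfOrdernessGenerator:
    """Periodic watermark: max event time seen - out_of_orderness - 1 ms."""

    def __init__(self, out_of_orderness_ms):
        self.out_of_orderness_ms = out_of_orderness_ms
        self.max_ts = None

    def on_event(self, ts_ms):
        # Out-of-order records never move the maximum backwards,
        # so the emitted watermark stays monotonic.
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)

    def current_watermark(self):
        # Flink calls the equivalent method periodically (200 ms by default).
        if self.max_ts is None:
            return None
        return self.max_ts - self.out_of_orderness_ms - 1


def merge_inputs(latest_by_channel):
    """Many to one: a task's watermark is the minimum over its input channels."""
    return min(latest_by_channel.values())


def many_to_many(upstream, downstream_tasks):
    """Each upstream task broadcasts its watermark; each downstream task takes the minimum."""
    inboxes = {d: {} for d in downstream_tasks}
    for up, wm in upstream.items():
        for d in downstream_tasks:  # one to many: broadcast
            inboxes[d][up] = wm
    return {d: merge_inputs(inbox) for d, inbox in inboxes.items()}


gen = BoundedOutOfOrdernessGenerator(out_of_orderness_ms=3000)
for ts in (5000, 9000, 7000):  # 7000 arrives out of order
    gen.on_event(ts)
print(gen.current_watermark())  # 5999
print(many_to_many({"src-0": 5999, "src-1": 3000}, ["map-0", "map-1"]))
# {'map-0': 3000, 'map-1': 3000}
```

Taking the minimum is what makes a single slow or idle upstream instance hold back the event-time clock of every downstream task.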

1.12.9 How does Flink handle out-of-order and late data?

In Apache Flink, lateness and out-of-orderness are two concepts related to event time. They matter in stream processing, especially when the data does not arrive in event-time order.

(1) Lateness: lateness affects windows; after a window has been computed, it can still receive late data.

Lateness is the delay with which an event reaches the stream processing system, i.e. the difference between when the event is actually received and its event time. Events may arrive late because of network delays, system failures and so on. To handle them, Flink lets a window keep accepting late data after it has fired: when an allowed lateness is set, Flink keeps the window's state for that extra period, and each late record arriving within it triggers the window again.

The allowed lateness is set as follows:

When defining a window, use the `allowedLateness` method. For example, set the allowed lateness to 10 minutes:


DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(Time.minutes(10))
    .<window function>;


(2) Out-of-orderness

Out-of-orderness affects which data is admitted by delaying the watermark; it describes how disordered the data is.

Out-of-orderness means that events arrive in the stream out of event-time order. This can happen because of network latency or the nature of the data source.

Flink handles out-of-order events with watermarks.

A watermark indicates the progress of event time, telling the system which event time has currently been reached.

When the watermark reaches a value t, the system assumes that all events with timestamps up to t have arrived.

To handle out-of-order events, you can give the watermark a fixed delay.

The out-of-orderness is set as follows:

When defining the data source, use the `assignTimestampsAndWatermarks` method to set the watermark strategy. For example, set a watermark delay of 5 seconds:


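DataStream<T> input = env.addSource(<source>);

input
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner(<timestamp assigner>))
    .<other operations>;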


1.12.10 说说Flink中的窗口(分类、生命周期、触发、划分)
1.1210 Talk about windows in Flink (classification, lifecycle, trigger, partition)

1) Window classification:

Keyed Window和Non-keyed Window

Based on time: scrolling, swiping, conversation.

Based on quantity: scrolling, sliding.

2) The four important components of the Window:

assigner: How to assign elements to windows.

function: The calculation defined for the window. In fact, it is a calculation function that completes the calculation of the contents of the window.

trigger: Under what conditions triggers the calculation of the window.

You can use custom triggers to solve the event time, no data arrives, the window does not trigger the calculation problem, you can also use persistent triggers to achieve a window multiple trigger output results, see the connection in detail



evector: Defines removing data from a window.

3) Window assignment, for example for an event-time tumbling window:

start = the element's event time rounded down to an integer multiple of the window size.

end = start + size

For example, with a 10 s tumbling window, an element with event time 857 s belongs to [850 s, 860 s).

4) Window creation: a window is created lazily when the first element belonging to it arrives (the tumbling-window assigner returns it as a single-element collection).

5) Window destruction: when time progresses to >= the window's maximum timestamp + the allowed lateness.

(Flink only guarantees removal for time-based windows; other window types, such as global windows, are never removed.)

6) Why windows are left-closed and right-open: the maximum timestamp that belongs to a window is end - 1 ms.

7) When a window fires: for an event-time window, when watermark >= end - 1 ms.
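The assignment and firing rules in items 3), 5) and 7) can be checked with a few lines of plain Java. This is a sketch that assumes millisecond timestamps and non-negative event times; `TumblingWindowModel` is a hypothetical name, and the start formula is the same rounding-down rule as Flink's `TimeWindow.getWindowStartWithOffset` for that case.

```java
/** Plain-Java model of event-time tumbling windows (not Flink's API). */
class TumblingWindowModel {
    /** Round the timestamp down to a multiple of the window size (offset shifts the grid). */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Largest timestamp inside [start, start + size), i.e. end - 1 ms. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** An event-time window fires once the watermark reaches end - 1 ms. */
    static boolean shouldFire(long watermark, long start, long size) {
        return watermark >= maxTimestamp(start, size);
    }

    /** Window state is cleared once the watermark reaches maxTimestamp + allowed lateness. */
    static boolean shouldCleanUp(long watermark, long start, long size, long allowedLateness) {
        return watermark >= maxTimestamp(start, size) + allowedLateness;
    }

    public static void main(String[] args) {
        long start = windowStart(857_000L, 0L, 10_000L);
        System.out.println("[" + start + ", " + (start + 10_000L) + ")"); // [850000, 860000)
    }
}
```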

1.12.11 How does Flink's keyBy partition data? What is the difference between partitioning and grouping?

Grouping and partitioning mean different things in Flink:

Partitioning: dividing a data stream into subsets that are processed by different parallel task instances.

The partition a record goes to is derived from its key: Flink takes the key's hashCode(), applies murmurHash to it, takes the result modulo the maximum parallelism to obtain a key group, and then maps that key group to a parallel subtask according to the operator's parallelism.

Grouping: collecting elements with the same key together for subsequent operations (such as aggregation or window computation). Elements with the same key go into the same group.

Note: elements with the same key always go to the same group and the same partition, but one partition can contain elements from several groups.
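A plain-Java sketch of the partitioning computation follows. It is modeled on Flink's `KeyGroupRangeAssignment` (key group = murmurHash(key.hashCode()) % maxParallelism, subtask = keyGroup * parallelism / maxParallelism); the mixing steps follow `MathUtils.murmurHash` as we understand it, so exact values should be checked against the Flink source. `KeyByModel` and the max parallelism of 128 are assumptions for the example.

```java
/** Plain-Java model of how keyBy picks a subtask (modeled on Flink, not Flink's API). */
class KeyByModel {
    /** MurmurHash3-style mixing of an int, made non-negative at the end. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final avalanche ("bit mix")
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Step 1: hashCode -> murmurHash -> key group in [0, maxParallelism). */
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Step 2: key group -> parallel subtask index in [0, parallelism). */
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user_1", "user_2", "user_1"}) {
            System.out.println(key + " -> subtask " + subtaskIndex(key, 128, 4));
        }
    }
}
```

Because the subtask depends only on the key, records with the same key always reach the same subtask, while one subtask receives many different keys (groups).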

1.12.12 How is Flink's interval join implemented? What happens to records that fail to join?

Under the hood it is keyBy + connect, with the following processing logic:

(1) Check whether the record is late (its timestamp is below the current watermark); if it is, drop it and return immediately.

(2) Each stream keeps a Map state (key: timestamp, value: list of records with that timestamp).

(3) When a record arrives on either stream, iterate over the other stream's Map state and pass every record that falls within the time bounds to the join method.

(4) A timer removes entries from the Map (remove, not clear) once they fall outside the valid time range.

Interval join does not emit records that fail to join. If you need those records, implement the join with the coGroup operator, or use LEFT JOIN or RIGHT JOIN in Flink SQL.
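The four steps above can be modeled for a single key in plain Java. This is a simplified sketch, not Flink's `IntervalJoinOperator`: `IntervalJoinModel` is hypothetical, the buffers are `TreeMap`s standing in for the Map state, and the cleanup timers are replaced by an explicit `advanceWatermark` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Single-key model: a left record at t joins right records with time in [t + lower, t + upper]. */
class IntervalJoinModel {
    private final long lower;
    private final long upper;
    private long watermark = Long.MIN_VALUE;
    // per-stream buffers: timestamp -> records with that timestamp
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    IntervalJoinModel(long lower, long upper) {
        if (lower > upper) throw new IllegalArgumentException("lower bound must be <= upper bound");
        this.lower = lower;
        this.upper = upper;
    }

    void processLeft(long ts, String value) {
        if (ts < watermark) return; // (1) late record: drop it
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value); // (2) buffer it
        // (3) probe the other side: matching right timestamps lie in [ts + lower, ts + upper]
        for (List<String> rights : rightBuffer.subMap(ts + lower, true, ts + upper, true).values()) {
            for (String right : rights) output.add(value + "," + right);
        }
    }

    void processRight(long ts, String value) {
        if (ts < watermark) return;
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // matching left timestamps lie in [ts - upper, ts - lower]
        for (List<String> lefts : leftBuffer.subMap(ts - upper, true, ts - lower, true).values()) {
            for (String left : lefts) output.add(left + "," + value);
        }
    }

    /** (4) Stand-in for the cleanup timers: remove only entries that can no longer match. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (watermark == Long.MIN_VALUE) return; // nothing can expire yet
        // a left entry at t only matches right records up to t + upper, which are now all late
        leftBuffer.headMap(watermark - upper, false).clear();
        // a right entry at t only matches left records up to t - lower
        rightBuffer.headMap(watermark + lower, false).clear();
    }
}
```

Records that never find a partner, such as `R20` in the test, are simply never emitted, which is why an outer join needs coGroup or Flink SQL.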

1.12.13 Flink state programming and the state mechanism

(1) Operator state: scoped to an operator; each parallel instance of the operator maintains its own state.

(2) Keyed state: one state is maintained per key (group).

(3) State backend: it decides two things, where local state is kept and where checkpoints are stored.

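Before Flink 1.13:

MemoryStateBackend: local state in TaskManager memory; checkpoints in JobManager memory

FsStateBackend: local state in TaskManager memory; checkpoints on HDFS

RocksDBStateBackend: local state in RocksDB; checkpoints on HDFS

Flink 1.13 and later:

HashMapStateBackend: local state in TaskManager memory

EmbeddedRocksDBStateBackend: local state in RocksDB

Checkpoint storage is specified separately by a parameter.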

(4) ListState vs. UnionListState in operator state

When an operator's parallelism changes (on restore):

ListState: the per-instance lists are merged into one large list, which is then split evenly (round-robin) across the new parallel subtasks.

UnionListState: the lists are merged into one large list, and every parallel subtask receives the whole list.
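The difference can be illustrated with a small redistribution model. `OperatorStateRescaleModel` is hypothetical, and the exact way Flink splits a ListState (for example, contiguous ranges rather than strict round-robin) may differ; what the sketch shows is that each ListState item goes to exactly one new subtask, while every subtask receives the full UnionListState.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of operator-state redistribution when parallelism changes. */
class OperatorStateRescaleModel {
    /** ListState style: merge all lists, then deal items out so each item lands on exactly one subtask. */
    static List<List<String>> redistributeList(List<List<String>> oldStates, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) result.add(new ArrayList<>());
        int next = 0;
        for (List<String> state : oldStates) {
            for (String item : state) {
                result.get(next).add(item);
                next = (next + 1) % newParallelism; // round-robin
            }
        }
        return result;
    }

    /** UnionListState style: merge all lists and give every subtask the complete merged list. */
    static List<List<String>> redistributeUnion(List<List<String>> oldStates, int newParallelism) {
        List<String> merged = new ArrayList<>();
        for (List<String> state : oldStates) merged.addAll(state);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) result.add(new ArrayList<>(merged));
        return result;
    }
}
```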

1.12.14 How does Flink achieve end-to-end consistency?

1.12.15 How distributed asynchronous snapshots work

Barriers are injected into the parallel data streams at the sources. The point where the barrier for snapshot n is injected (call it Sn) is the position in the source up to which the snapshot covers the data.

For example, in Kafka this position is the offset of the last record in the partition. Sn is reported to the checkpoint coordinator (Flink's JobManager).

The barriers then flow downstream. When an intermediate operator has received the barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its output streams.

Once a sink operator (the end of the streaming DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator.

After all sinks have acknowledged the snapshot, it is considered complete. From then on, the job never again asks the source for records before Sn, because those records (and the records derived from them) have passed through the entire data flow topology, i.e. they have been processed.
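The completion rule can be sketched as a tiny coordinator that waits for an acknowledgement from every sink. `CheckpointCoordinatorModel` is a hypothetical plain-Java class, not Flink's `CheckpointCoordinator`; it only tracks pending acknowledgements and the source position Sn recorded for each snapshot.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of a coordinator that completes snapshot n once every sink has acknowledged it. */
class CheckpointCoordinatorModel {
    private final Set<String> sinks;
    private final Map<Long, Set<String>> pendingAcks = new HashMap<>();
    private final Map<Long, Long> sourcePositions = new HashMap<>(); // checkpoint id -> Sn
    private long latestCompleted = -1;

    CheckpointCoordinatorModel(Set<String> sinks) {
        this.sinks = sinks;
    }

    /** The source injects barrier n at position Sn and reports Sn. */
    void triggerCheckpoint(long id, long sourcePosition) {
        pendingAcks.put(id, new HashSet<>(sinks));
        sourcePositions.put(id, sourcePosition);
    }

    /** A sink acknowledges once it has received barrier n on all of its inputs. */
    void acknowledge(long id, String sink) {
        Set<String> waiting = pendingAcks.get(id);
        if (waiting == null) return;
        waiting.remove(sink);
        if (waiting.isEmpty()) { // every sink has confirmed: snapshot n is complete
            pendingAcks.remove(id);
            latestCompleted = Math.max(latestCompleted, id);
        }
    }

    long latestCompletedCheckpoint() {
        return latestCompleted;
    }

    /** On recovery the source rewinds to Sn of the latest completed snapshot (null if none). */
    Long restorePosition() {
        return latestCompleted < 0 ? null : sourcePositions.get(latestCompleted);
    }
}
```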

1.12.16 How are checkpoint parameters set?

(1) Interval: a trade-off between overhead and latency; typical jobs use minutes (1 to 5 min), latency-sensitive jobs use seconds.

(2) Restart strategy (failover):

Fixed-delay restart: a maximum number of attempts and a fixed delay between attempts.

Failure-rate restart: a maximum number of failures within a measurement interval, plus a delay between attempts; the job fails once the rate is exceeded.

No restart: the job fails immediately; generally used during development and testing.

Fallback restart: use the cluster-level default; when checkpointing is enabled and no strategy is configured, this is a fixed-delay restart.
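To make the failure-rate strategy concrete, here is a simplified sliding-window model of the restart decision. `FailureRateRestartModel` is hypothetical and does not reproduce Flink's exact bookkeeping; the delay between attempts is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of a failure-rate restart decision (not Flink's implementation). */
class FailureRateRestartModel {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateRestartModel(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and returns true if the job may still be restarted. */
    boolean onFailure(long nowMs) {
        failureTimes.addLast(nowMs);
        // forget failures that have slid out of the measurement interval
        while (!failureTimes.isEmpty() && failureTimes.peekFirst() <= nowMs - intervalMs) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() <= maxFailuresPerInterval;
    }
}
```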

1.12.17 Aligned vs. unaligned barriers

Exactly-once semantics: barriers are aligned by default; unaligned checkpoints can be enabled.

Aligned barriers: when the barrier arrives on one input first, the operator waits for the barriers from the other inputs; records that arrive in the meantime on inputs that already delivered the barrier are buffered until all barriers are aligned and the snapshot is taken.

Unaligned barriers: the first barrier to arrive triggers the snapshot immediately; the snapshot contains the operator state plus the in-flight buffered data.

At-least-once semantics: the operator still waits for the barriers from all inputs before taking the snapshot, but it does not block data.

Records that arrive after the first barrier are processed and sent downstream without waiting, so after a recovery they may be computed twice.
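Aligned barriers for exactly-once processing can be modeled as an operator that holds back records from inputs that have already delivered the barrier. `BarrierAlignmentModel` is a hypothetical sketch with one checkpoint in flight; the "SNAPSHOT" entry marks where the state snapshot is taken and the barrier is forwarded.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of exactly-once barrier alignment in a multi-input operator. */
class BarrierAlignmentModel {
    private final int numChannels;
    private final Set<Integer> barrierReceived = new HashSet<>();
    private final List<String> blocked = new ArrayList<>();
    final List<String> output = new ArrayList<>(); // processed records, in order

    BarrierAlignmentModel(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (barrierReceived.contains(channel)) {
            blocked.add(record);  // this channel is past the barrier: hold the record back
        } else {
            output.add(record);   // pre-barrier data is processed normally
        }
    }

    void onBarrier(int channel) {
        barrierReceived.add(channel);
        if (barrierReceived.size() == numChannels) {
            output.add("SNAPSHOT"); // all barriers aligned: snapshot state, forward the barrier
            output.addAll(blocked); // then release the buffered post-barrier records
            blocked.clear();
            barrierReceived.clear();
        }
    }
}
```

Record `a2` in the test is processed only after the snapshot, so it is not part of that checkpoint; without alignment it would have been processed earlier and could be counted twice after a recovery.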

1.12.18 Flink memory model (key topic)

1.12.19 Common dimension-table join approaches in Flink

(1) Preloading: query the dimension table in open() and keep it in memory, refreshing it with a periodic query.

(2) Hot storage: keep the dimension data in an external system such as Redis or HBase and look it up per record.

(3) Broadcast the dimension table.

(4) Lookup join: external storage, accessed through a connector and used from SQL.
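For the hot-storage approach, lookups against Redis or HBase are usually fronted by a local cache. The sketch below is a hypothetical LRU cache built on `LinkedHashMap` in access order; `loader` stands in for the external lookup and is an assumption, not a real client API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical LRU cache in front of a dimension-table lookup. */
class CachedDimLookup<K, V> {
    private final Function<K, V> loader; // stand-in for the Redis/HBase query
    private final Map<K, V> cache;
    int externalCalls = 0;

    CachedDimLookup(int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true makes iteration order least-recently-used first
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict the least recently used entry
            }
        };
    }

    V get(K key) {
        V value = cache.get(key);
        if (value == null) { // cache miss: go to external storage
            value = loader.apply(key);
            externalCalls++;
            if (value != null) cache.put(key, value);
        }
        return value;
    }
}
```

A single cache instance per parallel subtask is enough here because each Flink subtask processes records on one thread; the class itself is not thread-safe.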

1.12.20 Understanding context objects in Flink

Context objects give access to information about job execution and data processing. They help developers control and understand job behavior, and make runtime information available during execution for custom logic and optimizations.

RuntimeContext: every parallel task instance has an associated RuntimeContext, which provides information such as the task name, subtask index, and parallelism. It can also be used to access runtime configuration and state and to perform stateful operations.

FunctionContext: passed to the open() method of Table API / SQL user-defined functions (such as ScalarFunction or TableFunction); it gives access to information such as job parameters and the metric group. In DataStream functions such as KeyedProcessFunction, the Context parameter of processElement() plays this role instead, exposing the current record's timestamp, the timer service, and side outputs.

Checkpoint contexts: for example, the FunctionSnapshotContext passed to CheckpointedFunction.snapshotState() gives access to checkpoint information such as the checkpoint ID.

1.12.21 Flink network tuning: buffer debloating

Before this mechanism, the only way to configure the amount of buffered data was to set the number and size of network buffers. Because every deployment is different, a perfect set of values is hard to find. The buffer debloating mechanism introduced in Flink 1.14 addresses this by automatically adjusting the amount of buffered data to a reasonable value.

1.12.22 Table locking in Flink CDC

(1) Flink CDC 1.x locks the table while synchronizing historical data.

Lock-free reading can be enabled with a parameter, but then only at-least-once is guaranteed.

(2) Flink CDC 2.x implements a lock-free algorithm, so the table is not locked while historical data is synchronized.

In 2.x, the full (snapshot) phase can run with multiple parallel subtasks, while the incremental phase runs with a single subtask.

1.13 HBase

1.13.1 HBase storage structure

Architecture roles:

1) Master

The implementation class is HMaster, which monitors all RegionServer instances in the cluster. Its main responsibilities are:

(1) Manage the metadata table hbase:meta, and receive and execute user commands to create, modify, and delete tables.

(2) Monitor whether regions need load balancing, failover, or splitting.

These tasks are handled by several background threads:

(1) LoadBalancer

Periodically checks whether regions are evenly distributed across RegionServers. The period is controlled by the hbase.balancer.period parameter and defaults to 5 minutes.

(2) CatalogJanitor (metadata janitor)

Periodically checks and cleans up the data in hbase:meta. The contents of the meta table are described in the advanced section.

(3) MasterProcWAL (Master procedure write-ahead log)

Records the procedures the master needs to execute in a write-ahead log (WAL), so that if the active master goes down, the backup master can read the log and take over.

2) RegionServer

The implementation class is HRegionServer. Its main responsibilities are:

(1) Handle operations on data cells, such as writes (put) and reads (get).

(2) Carry out region splits and merges: the master monitors and decides, and the RegionServer executes them.

3) ZooKeeper

HBase uses ZooKeeper for master high availability, to record RegionServer deployment information, and to store the location of the meta table.

For reads and writes, HBase clients go to ZooKeeper directly (to find the meta table). HBase 2.3 introduced the Master Registry mode, in which clients contact the Master directly instead; this increases the load on the Master and reduces the load on ZooKeeper.

4) HDFS

HDFS provides the underlying storage for HBase data and gives HBase highly fault-tolerant storage.

1.13.2 HBase write path

Write path:

The steps follow the same order as the client API calls. First, a heavyweight Connection to HBase is created.

(1) Look up the meta table information in the local cache (empty when the client first starts);

(2) Send a request to ZK for the location of the meta table;

(3) ZK returns the location of the meta table;

(4) Send a request to the RegionServer hosting the meta table to read the meta table information;

(5) Receive the meta table information and cache it locally;

(6) Send the write request to the RegionServer hosting the target region;

(7) Write the WAL first, then the MemStore, and return a success response to the client.
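Step (7) can be sketched as a toy region that appends to a write-ahead log before updating a sorted MemStore. `RegionWriteModel` is hypothetical and keeps everything in memory; it illustrates why the WAL comes first (it is what replay uses after a crash), not HBase's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a region's write path: WAL first, then the sorted MemStore (not HBase's API). */
class RegionWriteModel {
    final List<Map.Entry<String, String>> wal = new ArrayList<>(); // stand-in for the WAL on HDFS
    final TreeMap<String, String> memStore = new TreeMap<>();      // in memory, sorted by rowkey

    boolean put(String rowKey, String value) {
        wal.add(Map.entry(rowKey, value)); // 1. append to the write-ahead log (durable)
        memStore.put(rowKey, value);       // 2. then update the in-memory store
        return true;                       // 3. report success to the client
    }

    /** After a crash, replaying the WAL rebuilds MemStore contents that were not yet flushed. */
    static TreeMap<String, String> replay(List<Map.Entry<String, String>> wal) {
        TreeMap<String, String> rebuilt = new TreeMap<>();
        for (Map.Entry<String, String> entry : wal) rebuilt.put(entry.getKey(), entry.getValue());
        return rebuilt;
    }
}
```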

1.13.3 HBase read path

Steps (1) to (5) are the same as in the write path: create a connection, then locate and cache the meta table.

(1) Look up the meta table information in the local cache (empty when the client first starts);

(2) Send a request to ZK for the location of the meta table;

(3) ZK returns the location of the meta table;

(4) Send a request to the RegionServer hosting the meta table to read the meta table information;

(5) Receive the meta table information and cache it locally;

(6) Build scanners over both the MemStore and the StoreFiles:

MemStore: read normally;

StoreFile: use the index to determine which files need to be read, then consult the BlockCache to determine what must still be read from those files;

(7) Merge the data read from all locations and return the latest version to the user; if the latest version is a delete marker, nothing is returned.
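Step (7) amounts to picking the newest version across all sources and hiding it if it is a delete marker. `ReadMergeModel` is a hypothetical, simplified model of that merge for a single cell with whole-cell delete markers.

```java
import java.util.List;
import java.util.Optional;

/** Toy model of the final merge step of an HBase read (not HBase's API). */
class ReadMergeModel {
    /** One version of a cell; a delete marker has isDelete = true. */
    static final class Cell {
        final long timestamp;
        final String value;
        final boolean isDelete;

        Cell(long timestamp, String value, boolean isDelete) {
            this.timestamp = timestamp;
            this.value = value;
            this.isDelete = isDelete;
        }
    }

    /** Scans every source (MemStore, StoreFiles, ...) and returns the newest visible value, if any. */
    static Optional<String> latestVisible(List<List<Cell>> sources) {
        Cell newest = null;
        for (List<Cell> source : sources) {
            for (Cell cell : source) {
                boolean newer = newest == null || cell.timestamp > newest.timestamp;
                // at equal timestamps a delete marker wins over a put
                boolean tieDelete = newest != null && cell.timestamp == newest.timestamp && cell.isDelete;
                if (newer || tieDelete) newest = cell;
            }
        }
        if (newest == null || newest.isDelete) return Optional.empty();
        return Optional.of(newest.value);
    }
}
```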

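1.13.4 HBase compaction

There are two kinds of compaction: minor compaction and major compaction. A minor compaction merges a few adjacent small StoreFiles into a larger one without removing deleted or expired data; a major compaction merges all StoreFiles of a Store into one and removes deleted and expired data.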

1.13.5 RowKey design principles

(1) Rowkey length principle

(2) Rowkey hashing principle

(3) Rowkey uniqueness principle

1.13.6 How to design a RowKey

1) Scenario:

A large amount of user information is stored in HBase.

2) Hotspot problem:

Because user IDs are sequential, after a batch import all user records are likely to end up in the same region. If user information is accessed frequently, the node hosting that region is likely to become a hotspot.

3) Goal: design the rowkey so that user data is spread across multiple regions.

4) Steps:

(1) Pre-split the table

with the command:

create 'GMALL:DIM_USER_INFO','INFO',SPLITS=>['20','40','60','80']

This splits the user information table (GMALL:DIM_USER_INFO) into 5 regions: [00-20), [20-40), [40-60), [60-80), [80-99]

(2) Reverse the ID when writing

Left-pad the user ID with zeros to 10 digits (sized for the maximum number of users), then reverse it.

For example, user ID 1457 becomes 7541000000 after reversal; its first two digits place it in region [60-80).

User ID 1459 becomes 9541000000 after reversal; its first two digits place it in region [80-99].

Because the reversed keys of consecutive user IDs no longer share a common start, they land in different regions.

The final result can be observed in the Web UI, which shows the user data scattered across multiple regions.

Note: when querying, apply the same reversal to the user ID before looking it up.
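The padding, reversal, and region lookup above can be reproduced in a few lines. `ReversedRowKeyModel` is hypothetical; the region map mirrors the `SPLITS=>['20','40','60','80']` command, and a row belongs to the region with the greatest start key that does not exceed the rowkey.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch of the reversed-rowkey scheme plus region lookup by start key. */
class ReversedRowKeyModel {
    /** Left-pad to 10 digits, then reverse: 1457 -> "0000001457" -> "7541000000". */
    static String rowKey(long userId) {
        String padded = String.format("%010d", userId);
        return new StringBuilder(padded).reverse().toString();
    }

    // region start keys from SPLITS=>['20','40','60','80']; "" is the first region's start key
    private static final TreeMap<String, String> REGIONS = new TreeMap<>(Map.of(
            "", "[00-20)", "20", "[20-40)", "40", "[40-60)", "60", "[60-80)", "80", "[80-99]"));

    /** A row belongs to the region with the greatest start key <= rowkey (lexicographic order). */
    static String regionFor(String rowKey) {
        return REGIONS.floorEntry(rowKey).getValue();
    }
}
```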

1.13.7 How HBase secondary indexes work

1) Principle

Coprocessors: after data is written to the original table, a coprocessor writes an index entry to the index table.

2) Types and usage

(1) Global index: for read-heavy, write-light workloads

A separate table is created to store the index. It is smaller than the original table, so reads are fast. However, each write updates two tables in different regions, which requires multiple connections.

(2) Local index: for write-heavy, read-light workloads

The index data is stored together with the original data in the same region, so the combined data is larger than the original table and reads are relatively slow; but because both are in one region, the row and its index entry are written over the same connection.
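A global index can be pictured as a second table kept in sync by a write hook. `GlobalIndexModel` is a hypothetical in-memory sketch, not HBase's coprocessor API: in a real cluster the two tables live in different regions, which is why global-index writes need extra connections.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of a global secondary index maintained after each write (coprocessor-style). */
class GlobalIndexModel {
    final Map<String, String> dataTable = new TreeMap<>();       // rowkey -> indexed column value
    final Map<String, Set<String>> indexTable = new TreeMap<>(); // column value -> rowkeys

    void put(String rowKey, String value) {
        String old = dataTable.put(rowKey, value);
        // what a post-write hook would do: fix up the index table
        if (old != null) {
            Set<String> keys = indexTable.get(old);
            keys.remove(rowKey);
            if (keys.isEmpty()) indexTable.remove(old);
        }
        indexTable.computeIfAbsent(value, v -> new TreeSet<>()).add(rowKey);
    }

    /** Query by column value through the index instead of scanning the data table. */
    Set<String> findByValue(String value) {
        return indexTable.getOrDefault(value, Set.of());
    }
}
```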

1.14 ClickHouse

1.14.1 Advantages of ClickHouse

Fast: it provides a rich set of table engines, each optimized as far as possible.

Why is it fast?

(1) Vectorized execution

(2) Columnar storage

(3) It uses the memory and CPU of the local node as much as possible and does not depend on other components such as Hadoop.

(4) It supports SQL.

(5) It supports user-defined functions.

(6) It provides a rich set of table engines, each optimized.

1.14.2 ClickHouse table engines

(3) MergeTree family: ReplacingMergeTree, SummingMergeTree

(4) Integration engines: map external systems, such as MySQL

1.14.3 How do you guarantee consistency when Flink writes to ClickHouse?

ClickHouse has no transactions, so Flink writes to it are at-least-once.

ClickHouse's ReplacingMergeTree engine removes duplicates by the sorting key (ORDER BY, which also serves as the primary key) during background merges, so it only guarantees eventual consistency. Adding the FINAL keyword to a query makes the query results consistent.
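The effect of at-least-once retries followed by ReplacingMergeTree deduplication can be sketched as follows. `ReplacingMergeTreeModel` is hypothetical; it models what a merge or a FINAL query does when a version column is present (keep the highest version per key, and among equal versions the last inserted row).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of ReplacingMergeTree: duplicates coexist until a merge (or FINAL) keeps one row per key. */
class ReplacingMergeTreeModel {
    static final class Row {
        final String key;
        final long version;
        final String value;

        Row(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    /** Keep the highest-version row for each sorting key; ties go to the later-inserted row. */
    static List<Row> deduplicate(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            Row current = latest.get(row.key);
            if (current == null || row.version >= current.version) latest.put(row.key, row);
        }
        return new ArrayList<>(latest.values());
    }
}
```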

1.14.4 How much data does ClickHouse store? How many tables?

More than 10 wide tables, about 10 GB per day on average, retained for one year.

Disk required: 10 GB × 365 days × 2 replicas / 0.7 (keeping disks at most 70% full) ≈ 10,429 GB ≈ 10.4 TB, so plan for about 11 TB.

1.14.5 Does ClickHouse use local tables or distributed tables?

1) We write to local tables, with 2 replicas.

2) Problems with writing through distributed tables:

Suppose a cluster with 2 shards, and data is inserted through a ClickHouse distributed table on node 1.

(1) Resource consumption: data destined for shard 2 is first written to a temporary directory on node 1 and then forwarded, causing write amplification that consumes a lot of CPU and disk on node 1.

(2) Accuracy and consistency: while data is being forwarded to shard 2, a failure on node 1 or node 2 causes data problems (if node 1 goes down, the data is lost; if node 2 goes down or its table is deleted, node 1 retries indefinitely, occupying resources).

(3) Too many parts: if each of N nodes receives one INSERT per second and forwards it to the other N-1 nodes, N×(N-1) part directories are created per second. The more shards a cluster has, the more small files the forwarding produces (writing to local tables keeps them more concentrated), until the number of MergeTree parts becomes so large that it drags down the whole file system.

1.14.6 ClickHouse materialized views

A materialized view persists query results: it records both the query statement and the corresponding results.

Advantage: queries are fast. Once the view's rules are written, querying it is much faster than querying the raw data, because it has fewer rows and everything is precomputed.

Disadvantages: it is essentially a streaming-data technique that accumulates incrementally, so it is poorly suited to analyses over historical data such as deduplication, and its use is limited in some other scenarios. In addition, if many materialized views are attached to one table, every write to that table consumes substantial machine resources, such as network bandwidth and storage.

1.14.7 ClickHouse optimization

1) Memory optimization

max_memory_usage: memory limit for a single query; on a server with 128 GB of memory, set it to 100 GB.

max_bytes_before_external_group_by: set it to half of that, 50 GB; beyond this, GROUP BY spills to disk.

2) Concurrency

max_concurrent_queries: raise it from the default of 100 concurrent queries to 300.

3) Storage

SSDs are faster than HDDs.

4) Materialized views

5) Batch inserts, to avoid the "too many parts" error caused by writing too frequently.
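Client-side batching can be as simple as flushing when either a row count or a time limit is reached. `InsertBatcher` and its flush target are hypothetical (the target would issue one INSERT per batch); time is passed in explicitly so the behavior is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of client-side batching before inserting into ClickHouse. */
class InsertBatcher<T> {
    private final int maxRows;
    private final long maxDelayMs;
    private final Consumer<List<T>> flushTarget; // hypothetical: one INSERT per batch
    private final List<T> buffer = new ArrayList<>();
    private long firstRowTimeMs = -1;

    InsertBatcher(int maxRows, long maxDelayMs, Consumer<List<T>> flushTarget) {
        this.maxRows = maxRows;
        this.maxDelayMs = maxDelayMs;
        this.flushTarget = flushTarget;
    }

    void add(T row, long nowMs) {
        if (buffer.isEmpty()) firstRowTimeMs = nowMs;
        buffer.add(row);
        // flush when the batch is full or the oldest buffered row has waited long enough
        if (buffer.size() >= maxRows || nowMs - firstRowTimeMs >= maxDelayMs) flush();
    }

    void flush() {
        if (buffer.isEmpty()) return;
        flushTarget.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

In this sketch the time limit is only checked when a row arrives; a real sink would also need a timer to flush an idle buffer.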

1.14.8 ClickHouse's new feature: Projection

A projection is a set of columns that can be stored in a different sort order from the original table and can serve queries with aggregate functions. A ClickHouse projection can be seen as a smarter materialized view, with the following characteristics:

1) Stored with the original table

Whereas an ordinary materialized view is a separate table, a projection's materialized data is stored in the original table's partition directories. Both plain projections over detail data and pre-aggregated projections are supported.

2) Transparent to use, matched automatically

Multiple projections can be created on one MergeTree table. When a SELECT statement runs, ClickHouse automatically picks the best projection for the query's scope to accelerate it; if no projection matches, the base table is queried directly.

3) Same source as the data, shares its lifecycle

Because the projection data is stored in the original table's partitions, it is updated and merged together with the data, so the two never become inconsistent.

1.14.9 ClickHouse indexes and underlying storage

1) Index

(1) Primary index: a sparse index (primary key index) with a granularity of 8192 rows

(2) Secondary indexes: data-skipping indexes such as minmax, set, and bloom_filter

2) Underlying storage

ClickHouse's default data directory is /var/lib/clickhouse/data. Each database gets a subfolder in this directory, and each table directory is organized as follows:

- partition directories: generated from the partition plus LSM-style merging
- detached: where table partitions are stored after being unloaded with a DETACH statement
- format_version.txt: a plain-text file that records the storage format

Partition directory name = PartitionID_MinBlockNumber_MaxBlockNumber_Level. Block numbers start at 1 and increase; a newly created part has the same minimum and maximum block number, and when parts are merged the name takes the merged block-number range. Each merge also increases the level by 1.
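The naming and merge rule can be sketched as below. `PartNameModel` is a hypothetical helper; the merge rule (smallest minimum block, largest maximum block, level + 1) follows the description above, taking the larger of the two levels as the base.

```java
import java.util.Arrays;

/** Sketch of part directory names: PartitionID_MinBlock_MaxBlock_Level. */
class PartNameModel {
    static final class Part {
        final String partitionId;
        final long minBlock;
        final long maxBlock;
        final int level;

        Part(String partitionId, long minBlock, long maxBlock, int level) {
            this.partitionId = partitionId;
            this.minBlock = minBlock;
            this.maxBlock = maxBlock;
            this.level = level;
        }

        static Part parse(String name) {
            String[] f = name.split("_");
            int n = f.length;
            // the last three fields are numbers; everything before them is the partition ID
            String partitionId = String.join("_", Arrays.copyOfRange(f, 0, n - 3));
            return new Part(partitionId, Long.parseLong(f[n - 3]), Long.parseLong(f[n - 2]),
                    Integer.parseInt(f[n - 1]));
        }

        @Override
        public String toString() {
            return partitionId + "_" + minBlock + "_" + maxBlock + "_" + level;
        }
    }

    /** Merging two parts of one partition: min of mins, max of maxes, level + 1. */
    static Part merge(Part a, Part b) {
        if (!a.partitionId.equals(b.partitionId)) throw new IllegalArgumentException("different partitions");
        return new Part(a.partitionId, Math.min(a.minBlock, b.minBlock),
                Math.max(a.maxBlock, b.maxBlock), Math.max(a.level, b.level) + 1);
    }
}
```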

1.15 Doris

1.15.1 The three data models in Doris and how they compare

- Aggregate: divides columns into key and value columns; value columns of rows with the same key are aggregated

- Unique: rows have a primary (unique) key; a newly loaded row replaces the existing row with the same key

- Duplicate: stores detail data as-is, without aggregation or deduplication
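How the three models treat rows that share a key can be sketched in plain Java. `DorisModelsSketch` is hypothetical and assumes a single SUM value column for the Aggregate model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how Doris's three data models treat rows with the same key. */
class DorisModelsSketch {
    static final class Row {
        final String key;
        final long value;

        Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Aggregate model with a SUM value column: values of rows with the same key are summed. */
    static Map<String, Long> aggregateSum(List<Row> rows) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (Row r : rows) out.merge(r.key, r.value, Long::sum);
        return out;
    }

    /** Unique model: the last loaded row for a key replaces earlier ones. */
    static Map<String, Long> unique(List<Row> rows) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (Row r : rows) out.put(r.key, r.value);
        return out;
    }

    /** Duplicate model: every row is kept as detail data. */
    static List<Row> duplicate(List<Row> rows) {
        return new ArrayList<>(rows);
    }
}
```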

1.15.2 Doris partitions and buckets: what they are and how to choose the columns

Doris supports two levels of data division. The first level is the partition, which supports Range and List partitioning. The second level is the bucket (tablet), which supports only Hash partitioning. You can also use a single level only; in that case only bucketing is used.

1.15.3 How many nodes are used in production, and which of FE and BE consumes more CPU and memory?

FE and BE are deployed independently, starting with 5 machines; BE consumes more.

1.15.4 What problems have you run into when using Doris?

1) Large data volume with insufficient resources: add machines and increase concurrency.

2) Table locking in Doris

1.15.5 Have you used Doris cross-database queries or joins with MySQL?


1.15.6 Difference between rollup and materialized views in Doris

A rollup can be seen as a transitional version of the materialized view; Doris materialized views now cover the rollup functionality.

1.15.7 Doris prefix indexes

Doris does not support creating indexes on arbitrary columns. Instead it provides prefix indexes: built on top of the sort order, a prefix index lets Doris quickly look up data given the leading (prefix) columns.

For example, the first 36 bytes of a row are used as that row's prefix index. When a VARCHAR column is reached, the prefix index is truncated there.

Prefix index for an example table structure: user_id (8 bytes) + age (4 bytes) + message (20-byte prefix)

1.16 Visual reporting tools

Open source: ECharts (Baidu), Kibana, Superset (general-purpose features)

Commercial: Tableau (powerful), QuickBI (Alibaba Cloud, real-time oriented), DataV (Alibaba Cloud, real-time oriented), Sugar (Baidu, real-time)

1.17 JavaSE

1.17.1 Concurrent programming

1) What multithreading is, and its advantages

Multithreading means a program contains multiple execution flows: it can run several threads at the same time, each performing a different task.

Advantage: better CPU utilization. When one thread has to wait, the CPU can run other threads instead of sitting idle, which greatly improves the program's efficiency.

2) Three common ways to create threads in Java

(1) Extend the Thread class and override run()

(2) Implement the Runnable interface and implement run()

(3) Use a thread pool
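A minimal example of the three approaches, each incrementing a shared counter. `ThreadCreationDemo` is a hypothetical class; `Executors.newFixedThreadPool` is used only for brevity (see 1.17.2 on why production code should build a ThreadPoolExecutor directly).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Runs one task with each of the three approaches and returns how many ran. */
class ThreadCreationDemo {
    static final AtomicInteger counter = new AtomicInteger();

    // (1) extend Thread and override run()
    static class CountingThread extends Thread {
        @Override
        public void run() {
            counter.incrementAndGet();
        }
    }

    static int runAll() {
        counter.set(0);
        try {
            Thread t1 = new CountingThread();
            // (2) implement Runnable (here as a lambda) and pass it to a Thread
            Thread t2 = new Thread(() -> counter.incrementAndGet());
            t1.start();
            t2.start();
            t1.join();
            t2.join();

            // (3) submit the task to a thread pool
            ExecutorService pool = Executors.newFixedThreadPool(2);
            try {
                Future<?> done = pool.submit(() -> counter.incrementAndGet());
                done.get(); // wait until the pooled task has finished
            } finally {
                pool.shutdown();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return counter.get();
    }
}
```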

1.17.2 How to create a thread pool

The Executors class provides factory methods for creating thread pools; the pools it returns implement the ExecutorService interface.

Although these factory methods are convenient, they have drawbacks. The Alibaba Java Development Manual forbids creating thread pools this way and requires using ThreadPoolExecutor directly, which makes the pool's running rules explicit and avoids the risk of resource exhaustion (for example, newFixedThreadPool uses an unbounded task queue and newCachedThreadPool allows an unbounded number of threads). The factory methods are mentioned here for reference only.

1.17.3 ThreadPoolExecutor constructor parameters

(1) corePoolSize: the number of core threads kept in the pool

(2) maximumPoolSize: the maximum number of threads in the pool

(3) keepAliveTime: when there are more than corePoolSize threads, an idle thread is reclaimed once it has been idle longer than keepAliveTime

(4) unit: the time unit of keepAliveTime

(5) workQueue: the queue that holds tasks waiting to run
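The full ThreadPoolExecutor constructor also takes a ThreadFactory and a RejectedExecutionHandler (applied when both the pool and the queue are full). Below is a sketch of a pool built the way the manual recommends, with a bounded queue; `BoundedPoolFactory`, the sizes, and the CallerRunsPolicy choice are example assumptions, not recommendations from the original text.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolFactory {
    /** Builds a pool with every constructor parameter spelled out. */
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                60L,                                        // keepAliveTime
                TimeUnit.SECONDS,                           // unit of keepAliveTime
                new ArrayBlockingQueue<>(100),              // workQueue: bounded, cannot grow forever
                Executors.defaultThreadFactory(),           // threadFactory (6th parameter)
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler when pool and queue are full (7th)
    }
}
```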

1.17.4 Thread life cycle

New, runnable (running), blocked, waiting, terminated (dead). In Java these correspond to the Thread.State values NEW, RUNNABLE, BLOCKED, WAITING / TIMED_WAITING, and TERMINATED.

1.17.5 Difference between notify and notifyAll

notify wakes up one thread waiting on the object's monitor (which one is not specified).

notifyAll wakes up all threads waiting on the object's monitor.

1.17.6 Collections

1) Difference between List and Set

List (e.g. ArrayList, backed by an array): ordered, allows duplicates, and indexed; lookup by index is fast, while inserting and deleting in the middle is slow.

Set (e.g. HashSet, backed by a HashMap): unordered and without duplicates; it has no index-based access, but contains, insertion, and deletion are fast.

2) Difference between ArrayList and LinkedList

ArrayList is based on a dynamic array, so queries (random access) are fast.

LinkedList is based on a doubly linked list, so insertion and deletion are faster, but it does not support efficient random access.

Neither is thread-safe.

1.17.7 Thread-safe Map implementations

Hashtable, Collections.synchronizedMap(...), ConcurrentHashMap, and ConcurrentSkipListMap.

1.17.8 Difference between StringBuffer and StringBuilder

Most StringBuffer methods are synchronized, so it is thread-safe but less efficient.

StringBuilder is not thread-safe, but more efficient.

1.17.9 Difference between HashMap and Hashtable

HashMap is not thread-safe but efficient; Hashtable is thread-safe but less efficient. HashMap also allows a null key and null values, while Hashtable allows neither.

1.17.10 How HashMap works internally

1) How HashMap is implemented

A HashMap is essentially an array of buckets combined with linked lists (and, since JDK 1.8, red-black trees); it is based on hashing.

(1) put: the key's hashCode is used to compute the index (bucket) of the entry in the array.

(2) If the bucket already holds entries (a hash collision): if an entry with an equal key exists, its value is overwritten; otherwise the new entry is added to the bucket's linked list.

(3) get: compute the index from the hash, then compare keys within the bucket to find the matching entry and return its value.

2) Differences between HashMap in JDK 1.7 and JDK 1.8

JDK 1.7: array + linked list

JDK 1.8: array + linked list + red-black tree (a bucket's list is converted to a red-black tree when it grows beyond 8 entries and the table has at least 64 buckets)

3) The put process in detail

4) HashMap resizing

When the number of key-value pairs exceeds the threshold (capacity × load factor), or when the table is first initialized, resize() is called.

Each resize doubles the capacity.
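The index and resize arithmetic can be sketched as follows. `HashMapMath` is a hypothetical helper that mirrors what JDK 1.8's HashMap does internally (hash spreading, power-of-two masking, a 0.75 default load factor, doubling on resize); it does not call into HashMap itself.

```java
/** Sketch of the index and resize arithmetic used by JDK 1.8's HashMap. */
class HashMapMath {
    /** Spread the high 16 bits into the low bits so small tables still see them. */
    static int spread(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

    /** Capacity is a power of two, so (n - 1) & hash equals Math.floorMod(hash, n). */
    static int bucketIndex(Object key, int capacity) {
        return (capacity - 1) & spread(key);
    }

    /** A resize is triggered once size exceeds capacity * loadFactor. */
    static int threshold(int capacity, float loadFactor) {
        return (int) (capacity * loadFactor);
    }

    /** Every resize doubles the table. */
    static int nextCapacity(int capacity) {
        return capacity << 1;
    }
}
```

With the default capacity of 16 and load factor 0.75, the threshold is 12, so inserting the 13th entry grows the table to 32 buckets.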

1.17.11 Design patterns used in projects

(1) Singleton: ensures a class has only one instance; used for the thread pool in the real-time project.

(2) Template method: an abstract class defines the skeleton (template) of an algorithm and subclasses fill in the steps; used for the evaluators in the evaluation platform and for joining dimension tables in the DWS layer of the real-time data warehouse.
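Both patterns can be sketched briefly. `PoolHolder` uses the initialization-on-demand holder idiom for a lazily created singleton pool, and `DimJoinTemplate` is a hypothetical template for joining a record with a dimension row; the pool sizes and method names are assumptions, not the project's code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Singleton via the holder idiom: lazy and thread-safe without explicit locking. */
final class PoolHolder {
    private PoolHolder() {}

    private static final class Holder {
        // initialized the first time Holder is accessed; the JVM guarantees this happens once
        static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
                4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
    }

    static ThreadPoolExecutor getPool() {
        return Holder.POOL;
    }
}

/** Template method: the abstract class fixes the steps, subclasses supply them. */
abstract class DimJoinTemplate<T> {
    // the template itself is final, so the order of steps cannot be changed
    final T join(T record) {
        String key = getKey(record);
        String dim = lookupDim(key);
        return merge(record, dim);
    }

    protected abstract String getKey(T record);

    protected abstract String lookupDim(String key); // e.g. query HBase or Redis (hypothetical)

    protected abstract T merge(T record, String dim);
}
```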

1.18 MySQL

1.18.1 SQL execution order

FROM, WHERE, GROUP BY, HAVING, SELECT, ORDER BY, LIMIT


1.18.2 Difference between TRUNCATE, DROP, and DELETE

TRUNCATE empties all data in the table; DROP deletes the table itself; DELETE deletes a specific subset of rows.

1.18.3 Differences between MyISAM and InnoDB

Locking: MyISAM uses table locks, so even an operation on a single record locks the whole table; it is not suited to highly concurrent workloads. InnoDB uses row locks, locking only the affected row without affecting other rows; it is suited to highly concurrent workloads.

Caching: MyISAM caches only indexes, not the actual data. InnoDB caches both indexes and the actual data, so it needs more memory, and memory size has a decisive impact on its performance.

1.18.4 Four kinds of MySQL indexes

1) Primary key index

A primary key index is unique; usually the table's ID is made the primary key. A table can have only one primary key index, which is what distinguishes it from a unique index.

2) Clustered index

The leaf nodes of a clustered index contain the primary key value, the transaction ID, the rollback pointer used for MVCC, and all remaining columns.

3) Secondary indexes (non-clustered indexes)

Secondary indexes, also called non-clustered indexes, store primary key values in their leaf nodes instead of row pointers. The advantage is that secondary indexes need less maintenance when rows move or data pages split.

4) Composite (multi-column) index

Two or more columns combined into a single index. When using it, remember the leftmost-prefix matching rule.
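The leftmost-prefix rule for equality conditions can be sketched as counting index columns from the left until one is missing. `LeftmostPrefix` is hypothetical and ignores range conditions, which in MySQL stop the index from being used for the columns after the range column.

```java
import java.util.List;
import java.util.Set;

/** Toy check of the leftmost-prefix rule for a composite index, equality conditions only. */
class LeftmostPrefix {
    /** Returns how many leading index columns an equality-only WHERE clause can use. */
    static int usableColumns(List<String> indexColumns, Set<String> equalityColumns) {
        int used = 0;
        for (String column : indexColumns) {
            if (!equalityColumns.contains(column)) break; // a gap ends the usable prefix
            used++;
        }
        return used;
    }
}
```

For an index on (a, b, c), `WHERE b = ? AND c = ?` can use none of it, `WHERE a = ? AND c = ?` only column a, and the order of conditions in the WHERE clause does not matter.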

1.18.5 MySQL transactions

(1) The basic properties of transactions (ACID)

(2) Concurrency problems between transactions

Dirty read: transaction A reads data updated by transaction B, and then B rolls back, so the data A read is dirty.

Non-repeatable read: transaction A reads the same data several times; in between, transaction B updates and commits that data, so A's reads return inconsistent results.

Phantom read: administrator A changes every student's grade in the database from a numeric score to a letter grade (A to E), but at the same moment administrator B inserts a record with a numeric score. When A finishes, A finds one record that was not changed, as if hallucinating; this is called a phantom read.

Summary: non-repeatable reads and phantom reads are easy to confuse. Non-repeatable reads concern modifications, while phantom reads concern inserts or deletes. Preventing non-repeatable reads only requires locking the rows that match the condition; preventing phantom reads requires locking the table.

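1.18.6 MySQL transaction isolation levels

Isolation level | Dirty read | Non-repeatable read | Phantom read
Read uncommitted | possible | possible | possible
Read committed | prevented | possible | possible
Repeatable read (InnoDB default) | prevented | prevented | possible (largely prevented in InnoDB by next-key locks)
Serializable | prevented | prevented | prevented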

1.18.7 MyISAM与InnoDB对比

(1) The data file of InnoDB is an index file itself, while the index file of MyISAM is separated from the data file:

①InnoDB的表在磁盘上存储在以下文件中: .ibd(表结构、索引和数据都存在一起,MySQL5.7表结构放在.frm中)
(1) InnoDB tables are stored on disk in the following files: .ibd (the table structure, index, and data all exist together, and the MySQL 5.7 table structure is placed in .frm)

②MyISAM的表在磁盘上存储在以下文件中: *.sdi(描述表结构,MySQL5.7是.frm)、*.MYD(数据),*.MYI(索引)
MyISAM tables are stored on disk in the following files: *.sdi (describing table structure, MySQL 5.7 is.frm), *.MYD (data), *.MYI (index)

(2) In InnoDB the primary key index is a clustered index whose leaf nodes store the complete data records; other indexes are non-clustered indexes whose leaf nodes store the primary key value of the corresponding record.

(3) InnoDB requires every table to have a primary key (MyISAM does not). If none is specified explicitly, MySQL picks a non-null column that uniquely identifies each record as the primary key. If no such column exists, MySQL generates a hidden column to serve as the primary key of the InnoDB table.

(4) In MyISAM, both primary key and non-primary key indexes are non-clustered, and their leaf nodes store the address of the data.

(5) Looking up a row through an index (going back to the table) is very fast in MyISAM, because the index gives an address offset that is used to read the data file directly. InnoDB must first get the primary key from the index and then search the clustered index for the record; this is not slow, but it is still slower than accessing the data directly by address.

1.18.8 B Tree vs. B+ Tree

1) Differences between B+ trees and B trees

(1) In a B+ tree, every key stored in a non-leaf node also appears in its child node, where it is the maximum (or minimum) of all keys in that child node.

(2) Non-leaf nodes in the B+ tree are only used for indexing, and data records are not saved. Information related to records is placed in leaf nodes. In B-trees, non-leaf nodes hold both indexes and data records.

(3) All keys of a B+ tree appear in the leaf nodes, and the leaf nodes are linked into an ordered list in ascending key order.

2) Why B+ trees need fewer I/O operations

In practice a page can hold a large number of records (the default page size is 16KB). Assume the key and pointer of each directory entry are negligible (or take about 10 bytes in total), and each user record takes 1KB:

If the B+ tree has only one layer, that is, only one node for storing user records, it can store up to 16 records.

If the B+ tree has two layers, it can store up to 1600 × 16 = 25,600 records (a 16KB directory page holds roughly 1,600 entries of 10 bytes).

If the B+ tree has three layers, it can store up to 1600 × 1600 × 16 = 40,960,000 records.

So storing tens of millions of records requires only three layers.

Non-leaf nodes of a B+ tree store only directory records, not user records. Compared with a B tree, each node can therefore hold more entries, the tree is shorter, and fewer I/O operations are needed.
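The capacity arithmetic above can be reproduced with a few lines of Python. This is a sketch under the section's own assumptions (16KB pages, about 10 bytes per directory entry, 1KB per user record); real InnoDB pages also carry headers and free space, so actual capacities are lower. `max_records` and the constant names are hypothetical.

```python
# Rough B+ tree capacity under the assumptions in the text:
# 16KB pages, ~10-byte directory entries (key + pointer), 1KB user records.
PAGE_BYTES = 16 * 1024
ENTRY_BYTES = 10     # assumed size of one directory entry in a non-leaf page
RECORD_BYTES = 1024  # assumed size of one user record in a leaf page

DIRECTORY_FANOUT = PAGE_BYTES // ENTRY_BYTES    # 1638; the text rounds to ~1600
RECORDS_PER_LEAF = PAGE_BYTES // RECORD_BYTES   # 16


def max_records(height: int, fanout: int = 1600, leaf_records: int = 16) -> int:
    """Upper bound on user records in a B+ tree of the given height.

    Non-leaf levels hold only directory entries (fanout children each);
    only the leaf level holds user records.
    """
    if height < 1:
        raise ValueError("height must be at least 1")
    return fanout ** (height - 1) * leaf_records


if __name__ == "__main__":
    for h in (1, 2, 3):
        print(f"{h} layer(s): up to {max_records(h):,} records")
```

With the rounded fanout of 1600, heights 1, 2, and 3 give 16, 25,600, and 40,960,000 records; using the unrounded 1638 gives slightly larger bounds.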

1.19 Redis

1.19.1 Redis Cache Penetration, Cache Avalanche, and Cache Breakdown

(1) Cache penetration means querying data that definitely does not exist. On a cache miss the application queries the database; since the data cannot be found there, nothing is written to the cache, so every query for the nonexistent data goes to the database.


Solutions:

① Cache empty objects and give them a short expiration time, no more than 5 minutes.

② Use a Bloom filter: hash all data that can possibly exist into a sufficiently large bitmap, so that queries for data that definitely does not exist are intercepted by the bitmap, relieving query pressure on the underlying storage system.

(2) Cache avalanche: if many cache entries expire within the same period, a large number of requests miss the cache and all of these queries fall on the database.

Solution: avoid concentrating expiration times at the same moment; spread them out.

(3) Cache breakdown: a very hot key carries heavy concurrent traffic. The moment this key expires, the concurrent requests break through the cache and hit the database directly, like punching a hole in a barrier.

Solution: You can set the key to never expire.
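A minimal Python sketch of the penetration and avalanche countermeasures above. Plain dicts stand in for Redis and MySQL; `BloomFilter`, `lookup`, `ttl_with_jitter`, the 1-hour base TTL, and the 10-minute jitter are hypothetical choices, not part of the original project. In a real deployment the jittered value would be passed as the expiration of the Redis write (for example the `ex` argument of redis-py's `set`).

```python
import hashlib
import random
import time
from typing import Dict, Optional, Tuple


class BloomFilter:
    """m-bit bitmap with k hash positions per key.

    "Not present" answers are always correct; "present" may be a false positive.
    """

    def __init__(self, m_bits: int = 1 << 20, k: int = 5):
        self.m = m_bits
        self.k = k
        self.bits = bytearray((m_bits + 7) // 8)

    def _positions(self, key: str):
        # Double hashing: derive k bit positions from one SHA-256 digest.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: str) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


MISSING = object()  # marker cached for keys the database does not have


def ttl_with_jitter(base: int, jitter: int, rng: random.Random) -> int:
    """Base TTL plus a random offset, so keys loaded together do not expire together."""
    return base + rng.randint(0, jitter)


def lookup(key: str, cache: Dict[str, Tuple[object, float]], bloom: BloomFilter,
           db: Dict[str, str], rng: random.Random, null_ttl: int = 300) -> Optional[str]:
    """Cache-aside read guarded against penetration (Bloom filter + null caching)
    and avalanche (jittered TTLs)."""
    if not bloom.might_contain(key):
        return None                              # definitely absent: skip cache and DB
    now = time.time()
    hit = cache.get(key)
    if hit is not None and hit[1] > now:
        return None if hit[0] is MISSING else hit[0]
    value = db.get(key)
    if value is None:
        cache[key] = (MISSING, now + null_ttl)   # short-lived empty entry (<= 5 minutes)
        return None
    cache[key] = (value, now + ttl_with_jitter(3600, 600, rng))
    return value
```

The Bloom filter stops keys that can never exist before they reach either store, while the cached `MISSING` marker covers keys that passed the filter (false positives, or deleted rows) but are not in the database.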

1.19.2 Redis Sentinel Mode

(1) Sentinel mode is the automatic version of promoting a slave to master in master-slave replication. If the master goes down, the sentinels choose one of the slaves as the new master and make it the master of the other slaves; if the original master starts again, it becomes a slave.

(2) Sentinel mode is a special mode. Redis provides the sentinel command; a sentinel is an independent process that runs on its own. It monitors multiple running Redis instances by sending commands and waiting for the Redis servers to respond.

(3) When a sentinel detects that the master is down, it automatically promotes a slave to master and then notifies the other servers through publish/subscribe, so that they modify their configuration and switch to the new master.

(4) A single sentinel process monitoring the Redis servers may itself fail, so multiple sentinels are used. The sentinels also monitor one another, which forms the multi-sentinel mode.

1.19.3 Redis Data Types

String: a string

List: a collection that allows duplicates

Set: a collection that does not allow duplicates

Hash: similar to Map<String,String>

Zset (sorted set): a set in which each member has a score

1.19.4 How Hot Data Is Imported into Redis

A simple way to implement cache expiration, based on the idea of LRU (Least Recently Used eviction):

Each time a cached key is hit, add a certain TTL (expiration time) to it, set according to the situation, for example 10 minutes.

After a while, hot data accumulates a large TTL and does not expire, while cold data simply reaches its original TTL and expires.
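A small in-process Python sketch of the "add TTL on every hit" idea, with a pluggable clock so the behavior can be tested. The class name and parameters are hypothetical. In Redis itself, EXPIRE sets the remaining lifetime rather than adding to it, so accumulating TTL there would mean reading the current TTL first and calling EXPIRE with the sum.

```python
import time
from typing import Callable, Dict, Hashable, Optional, Tuple


class TtlExtendingCache:
    """Every hit adds extend_by seconds to a key's expiry, so hot keys keep living
    while cold keys run out their initial TTL and expire."""

    def __init__(self, initial_ttl: float, extend_by: float,
                 clock: Callable[[], float] = time.monotonic):
        self.initial_ttl = initial_ttl
        self.extend_by = extend_by
        self.clock = clock
        self._data: Dict[Hashable, Tuple[object, float]] = {}  # key -> (value, expires_at)

    def put(self, key: Hashable, value: object) -> None:
        self._data[key] = (value, self.clock() + self.initial_ttl)

    def get(self, key: Hashable) -> Optional[object]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at <= self.clock():
            del self._data[key]          # lazily drop expired keys
            return None
        self._data[key] = (value, expires_at + self.extend_by)  # reward the hit
        return value
```

With a 10-minute initial TTL and a 10-minute extension, a key read every few minutes keeps pushing its expiry forward, while a key written once and never read disappears after 10 minutes.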

1.19.5 Redis Persistence Modes: RDB and AOF

Redis enables RDB persistence by default: if a specified number of write operations occur within a specified time interval, the data in memory is written to disk.

RDB persistence is suitable for large-scale data recovery, but its data consistency and integrity are weaker.

AOF persistence must be enabled manually. By default, the write-operation log is appended to the AOF file once per second.

AOF provides better data integrity than RDB, but because it records more content, data recovery is slower.

To deal with large AOF files, Redis provides a rewrite mechanism that slims them down.

If you only plan to use Redis as a cache, you can turn persistence off.

If you intend to use Redis persistence, enable both RDB and AOF. RDB is better suited to data backup and serves as a fallback: if AOF has a problem, RDB is still there.

1.19.6 Redis Stores Key-Value Data, So Why Is There a Hash Type?

A Redis hash is a collection of field-value pairs, that is, a mapping table from String fields to values. Since Redis itself is a key-value database, a Hash adds another key-value layer inside the original value. This makes the Redis hash type especially suitable for storing objects with several attributes, such as records from relational tables.

1.20 JVM

Follow the Atguigu Education official account and reply "java" for the JVM material.

Chapter 2: Offline Data Warehouse Project

2.1 Improve Your Self-Confidence


2.2 Why Do This Project?

As the company grows, the boss needs to understand its operations in detail, for example daily active users, new users, retention, and conversion rate. So the company decided to recruit big data engineers for this project, whose purpose is to provide data support for the boss's decisions.

2.3 Data Warehouse Concepts

What are the input data sources and output systems of a data warehouse?

(1) Input systems: user behavior data produced by front-end event tracking, business data produced by the JavaEE backend, and, at some companies, crawler data.

(2) Output systems: reporting system, user profiling system, recommendation system.

2.4 Project Architecture

2.5 Framework Version Selection

1) Apache: harder to operate and maintain, and you must check compatibility between components yourself. (Generally used by large companies with strong technical teams and dedicated O&M staff.)

2) CDH 6.3.2: the most widely used distribution in China. CDH and HDP have since been merged into CDP 7.0, which costs $10,000 per node per year. (Not recommended.)

3) HDP: open source and allows secondary development, but it is not as stable as CDH and is rarely used in China.

4) Cloud service selection


(2) Tencent Cloud EMR, Stream Computing Oceanus, Data Development Governance Platform WeData

(3) Huawei Cloud EMR

(4) Amazon Cloud EMR

Other vendors: Starlink International, Kingdee... Sensors Data, DtDream

Release dates of major versions of each Apache framework component (table not recoverable; surviving fragments: 1.2.0 (oldest), 0.13.0 (oldest), (1.3, 1.4, 1.5, 1.6), (2.1, 2.2, 2.3, 2.4))
*Note: The highlighted versions are commonly used in the company's actual production.

2.6 Server Selection

Is the server a physical machine or a cloud host?

1) Machine cost considerations:

(1) Physical machine: 128GB memory, 20-core physical CPU with 40 threads, 40T HDD and 80T SSD; an HP-brand unit is quoted at about 40,000 yuan. A physical machine typically lasts about 5 years.

(2) Cloud host: taking Alibaba Cloud as an example, a similar configuration costs about 50,000 yuan per year. Other options include HUAWEI CLOUD, Tencent Cloud, and e Cloud.

2) O&M cost considerations:

(1) Physical machine: requires professional O&M staff (10,000 yuan × 13 months per year), electricity (commercial rates), air conditioning, and a site.

(2) Cloud host: Alibaba Cloud takes care of much of the O&M work, so O&M is relatively easy.

3) Choice by company type

(1) Wealthy financial companies choose cloud products (Shanghai).

(2) Small and medium-sized companies raising funds or preparing an IPO choose cloud products, and buy physical machines after financing.

(3) Companies with a long-term plan and sufficient funds choose physical machines.

2.7 Cluster Size

1) Hard disk considerations

2) CPU considerations

20-core physical CPU with 40 threads × 8 machines = 320 threads (metrics: 100-200)

3) Memory considerations

Memory: 128GB × 8 = 1024GB (about 800GB for computing tasks; the rest is needed by the other installed frameworks)

128MB of data => 512MB of memory

100GB of data => 800GB of memory

4) Reference cases

Size the cluster according to the data volume (at a company where you have worked for three years, the server cluster is usually 5-20 machines).

(1) Refer to the officially recommended Tencent Cloud EMR deployment:

Master node: the management node that keeps cluster scheduling running normally; mainly deploys processes such as NameNode, ResourceManager, and HMaster. 1 in non-HA mode, 2 in HA mode.

Core node: a computing and storage node. All HDFS data is stored on core nodes, so to keep the data safe, core nodes cannot be scaled in after they have been scaled out. Mainly deploys processes such as DataNode, NodeManager, and RegionServer. ≥2 in non-HA mode, ≥3 in HA mode.

Common node: provides data sharing, synchronization, and high-availability fault tolerance for the Master nodes of an HA cluster; mainly deploys distributed coordination components such as ZooKeeper and JournalNode. 0 in non-HA mode, ≥3 in HA mode.

(2) Place components that transfer data to each other close together (e.g., Kafka and ClickHouse).

(3) Put clients on one or two servers where possible, to make external access easier.

(4) Put components that depend on each other on the same server (e.g., DS worker together with Hive/Spark); ClickHouse must be deployed separately.



















































2.8 Staffing Reference

2.8.1 Overall Structure

Big Data Development Engineer => Big Data Team Leader => Project Manager => Department Manager => Technical Director (CTO)

=> Senior Architect => Principal Architect

2.8.2 Your Rank and Promotion Rules

Small companies: ranks are divided into junior, intermediate, and senior. There are no fixed promotion rules; promotion depends on company performance and job vacancies.

Big companies have clear ranks:

2.8.3 Staffing Reference

Small companies (1-3 people): 1 team leader; the other members have no clear division of labor and may also handle JavaEE and front-end work.

Small and medium-sized companies (about 3-6 people): 1 team leader, about 2 offline and about 1 real-time engineer (offline usually outnumbers real-time); the team leader also covers JavaEE and front end.

Medium-sized companies (about 5-10 people): 1 team leader, about 3-5 offline (offline processing, data warehouse), about 2 real-time; the team leader is a strong technical expert and also covers JavaEE and front end.

Medium and large companies (about 10-20 people): 1 team leader, 5-10 offline (offline processing, data warehouse), about 5 real-time, about 1 JavaEE (responsible for interfacing with the JavaEE business), and 1 front end (a dedicated front-end person may or may not exist). (In well-developed medium and large companies, the big data department may already be split into several big data groups responsible for different businesses.)

The above is only a reference configuration, since companies differ greatly; for example, ofo's big data department had only about 5 people. Choose a reasonable range for the size of company you describe. Think this staffing through before the interview, and answer with certainty.

Our own company: big data team leader: 1; offline: 3-4; real-time: 1-3.

How many iOS developers? How many Android? How many front-end? How many JavaEE? How many testers?

Mobile (iOS, Android): 1-2 people; front end: 3 people; JavaEE: usually 1-1.5 times the size of the big data team; testing: some companies have about 1 tester, some have none. 1 product manager, 1-2 product assistants, 1-3 operations staff.

Company size (by headcount):

0-50: small; 50-500: medium; 500-1000: large; more than 1000: major, industry-leading companies.

2.9 Building a Project from 0 to 1: What Do You Need to Do?

1) Questions to ask the project manager

(1) Data volume (incremental, full): 100GB

(2) Budget: 500,000

(3) How long data is retained: 1 year

(4) Cloud host or physical machine: cloud host

(5) Daily active users: 1 million

(6) Data sources: user behavior data (files), business data (MySQL)

(7) Project cycle: 1-3 months

(8) Team size: 3-5 people

(9) Initial metrics: 1-10

(10) Future plans: whether to do both offline and real-time

2) Project cycle (2 months)

(1) Data survey (2 weeks) + cluster setup

(2) Define data domains (2 days)

(3) Build the business matrix (3 days)

(4) Modeling, bottom-up (2 weeks)

(5) Indicator system construction, top-down (2 weeks)

(6) Bug fixing (1 week)

2.10 Data Warehouse Modeling Preparation

1) Significance of data warehouse modeling

Think of data as books in a library: we want them organized on shelves.

Reduce repeated computation.

Quickly find the data you need.

2) ER model

If the interviewer asks about the three normal forms, they are probably a Java programmer. Do not go deep with them on advanced MySQL, Redis, multithreading, the JVM, SSM, or other frameworks.

Steer the topic to big data technology: Spark, Flink, processing massive data, dimensional modeling.

3) Dimensional modeling

Star schema: a single level of dimension tables around the fact table, which reduces joins (frequent joins are unsuitable in big data scenarios).

Snowflake schema: multiple levels of dimension tables around the fact table.

Constellation schema: multiple fact tables.

4) Fact table

(1) How do you know whether a table is a fact table?

It has additive measures: counts, item quantities, amounts, number of times.

(2) Synchronization strategy

Large data volume => usually incremental; special case: add-to-cart (periodic snapshot fact table)

(3) Classification

① Transaction fact table

Look for atomic operations, e.g., placing an order, adding to cart, paying.

a) Select the business process

b) Declare the granularity

c) Determine the dimensions

d) Determine the facts


Shortcomings of transaction fact tables:

Continuous metrics have no obvious atomic operation, e.g., inventory (periodic snapshot fact table).

Metrics that associate multiple fact tables, e.g., the average time from adding to cart to payment (accumulating snapshot fact table).

② Periodic snapshot fact table

③ Accumulating snapshot fact table

5) Dimension table

(1) How do you determine if a table is a dimension table?

No measures, only descriptive information, e.g., height, weight, age, gender.

(2) Synchronization strategy

Small data volume => usually full; special case: the user table

(3) Dimension integration to reduce joins

① Product table, product category table, SPU, level-1, level-2, and level-3 categories => product dimension table

② Province table, region table => region dimension table

③ Activity information table, activity rule table => activity dimension table

(4) Zipper table

Build a zipper table for the user table.

Scenario: slowly changing dimensions

6) What are modeling tools?


2.11 Data Warehouse Modeling

1) Data survey

(1) First ask the Java developers for the tables, ideally with field descriptions or documentation for the tables and fields (the project manager helps coordinate) => quickly get familiar with the business behind the tables, sort out the business lines, and identify fact tables and dimension tables.

(2) Talk with business staff => verify whether your guesses are correct

(3) Talk to the product manager

Requirements are expressed as derived indicators.

Derived indicator = atomic indicator (business process + measure + aggregation logic) + statistical period + statistical granularity + business qualifier

The business processes in the requirements must correspond to actual backend business processes.

2) Clarify the data domain

(1) User domain: login, registration

(2) Traffic domain: app start, page view, action, error, exposure

(3) Trade domain: add to cart, order, payment, logistics, order cancellation, payment cancellation

(4) Tool domain: claim coupon, use coupon when ordering, use coupon when paying

(5) Interaction domain: likes, comments, favorites

3) Build the business matrix

Dimensions: user, product, activity, time, region, coupon

(1) User domain:

Login, registration

(2) Traffic domain: √

App start, page view, action, error, exposure

(3) Trade domain:

Add to cart, order, payment, logistics, order cancellation, payment cancellation

(4) Tool domain:

Claim coupon, use coupon when ordering, use coupon when paying

(5) Interaction domain:

Likes, comments, favorites

4) Modeling, bottom-up


(1) ODS layer

① Keep the original data unchanged, as a backup

② Use compression (gzip) to reduce disk space

③ Create partitioned tables to avoid full table scans later

(2) DWD layer: fact tables

① Transaction fact table

Atomic operations

a) Select the business process

Select the business processes of interest, i.e., those required by the metrics the product manager proposes.

b) Declare the granularity

Granularity is what one row represents: a single order, a week's orders, or a month's orders.

If a row represents a month's orders, you cannot count individual orders, so keep the finest granularity.

In other words, do not aggregate at this layer.

c) Determine the dimensions

Determine the dimensions of interest, i.e., those required by the metrics the product manager proposes.

For example: user, product, activity, time, region, coupon

d) Determine the facts

Determine the fact table's measures: additive values such as counts, item quantities, number of times, and amounts.

Shortcomings of transaction fact tables:

Continuous metrics have no obvious atomic operation, e.g., inventory (periodic snapshot fact table).

Metrics that associate multiple fact tables, e.g., the average time from adding to cart to payment (accumulating snapshot fact table).

② Periodic snapshot fact table

a) Select the business process

b) Declare the granularity => 1 day

c) Determine the dimensions

d) Determine the facts

③ Accumulating snapshot fact table

a) Select the business process

b) Declare the granularity

c) Determine the dimensions

d) Determine the facts: determine the measures from multiple fact tables

(3) DIM layer: dimension tables

① Dimension integration to reduce joins

a) Product table, product category table, SPU, level-1, level-2, and level-3 categories => product dimension table

b) Province table, region table => region dimension table

c) Activity information table, activity rule table => activity dimension table

② Zipper table

Build a zipper table for the user table.

Scenario: slowly changing dimensions.
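The zipper-table idea for a slowly changing user dimension can be sketched in Python: each version of a user row carries a start date and an end date, and a daily merge closes the old version and opens a new one. In the project this would be written in HQL; the field names, the `level` attribute, and the 9999-12-31 "still valid" convention below are assumptions for illustration.

```python
from dataclasses import dataclass, replace
from datetime import date, timedelta
from typing import Dict, List

OPEN_END = date(9999, 12, 31)  # assumed marker for "currently valid"


@dataclass(frozen=True)
class UserVersion:
    user_id: int
    level: str          # hypothetical slowly changing attribute
    start_date: date
    end_date: date


def update_zipper(table: List[UserVersion], changes: Dict[int, str],
                  do_date: date) -> List[UserVersion]:
    """Merge one day's new or changed users into the zipper table.

    changes maps user_id -> new attribute value observed on do_date.
    """
    merged = []
    for row in table:
        if row.end_date == OPEN_END and row.user_id in changes:
            # Close the old version on the day before the change.
            merged.append(replace(row, end_date=do_date - timedelta(days=1)))
        else:
            merged.append(row)
    for user_id, level in changes.items():
        merged.append(UserVersion(user_id, level, do_date, OPEN_END))
    return merged


def as_of(table: List[UserVersion], day: date) -> Dict[int, str]:
    """Reconstruct the dimension as it was on a given day."""
    return {r.user_id: r.level for r in table if r.start_date <= day <= r.end_date}
```

Storing only changed versions keeps the table far smaller than a full daily snapshot of every user, while `as_of` can still rebuild the user dimension for any past date.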

5) Indicator system construction, top-down

(1) ADS layer

Requirements: daily active users, new users, retention, conversion rate, GMV

(2) DWS layer: aggregation layer

Requirements are expressed as derived indicators.

Derived indicator = atomic indicator (business process + measure + aggregation logic) + statistical period + statistical granularity + business qualifier

For example: the total transaction amount of each mobile phone brand in each province, per day

Total transaction amount (order + amount + sum) + per day + province + mobile phone brand

Find the common parts (business process + statistical period + statistical granularity) and build wide tables on them.
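To make the derived-indicator formula concrete, here is the province/brand example as a Python sketch over in-memory rows. The row layout, the `category == "phone"` filter used to express "mobile phone", and the function name are assumptions; in the warehouse this would be a GROUP BY over a DWS wide table.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical DWD order-detail rows: (date, province, category, brand, amount)
OrderRow = Tuple[str, str, str, str, float]


def daily_phone_brand_gmv(rows: Iterable[OrderRow]) -> Dict[Tuple[str, str, str], float]:
    """Total order amount per day, per province, per mobile phone brand.

    atomic indicator: business process = order, measure = amount, aggregation = sum
    statistical period: day
    remaining parts: province, mobile phone brand (category == "phone")
    """
    totals: Dict[Tuple[str, str, str], float] = defaultdict(float)
    for day, province, category, brand, amount in rows:
        if category != "phone":
            continue
        totals[(day, province, brand)] += amount
    return dict(totals)
```

Other indicators that share the same business process (order), period (day), and granularity (province) could be computed from the same pre-aggregated rows, which is the motivation for building the wide table once.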

2.12 What Each Data Warehouse Layer Does

1) What does the ODS layer do?

(1) Keep the data as-is, without any changes

(2) Compress with gzip: 100GB of data compresses to about 30GB

(3) Create partitioned tables

2) What do the DIM and DWD layers do?

Describe the modeling operations covered above as usual.

(1) Data cleaning tools

HQL, MR, SparkSQL, Kettle, Python (this project cleans data with SQL)

(2) Cleaning rules

Amounts must be numeric ([0-9]); mobile phone numbers, ID card numbers, and URLs must match their patterns

Parse the data; core fields must not be empty; delete outdated data; filter out duplicates

JSON => many fields => check them one by one => extract values and match them against the rules

(3) What proportion of data is reasonable to clean out?

As a reference, about 1 record in 10,000.

(4) Desensitization

Desensitize sensitive data such as mobile phone numbers and ID card numbers.

① Masking: 135****0013, commonly used by internet companies

② Hashing with MD5: used when the data is still needed for statistical analysis but must stay secure

e.g., Meituan, Didi: md5(12334354809) => a unique value

③ Access control: used when the data must be usable as-is, e.g., military, banking, government

(5) Compress with Snappy

(6) Use ORC columnar storage
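A Python sketch of the cleaning and desensitization rules above, applied to JSON log lines. The project itself cleans with SQL; the regular expressions, the core-field list, and the function names here are simplified assumptions. Unsalted MD5 of a phone number can be recovered by brute-force enumeration because the space of 11-digit numbers is small, so a secret salt (the optional `salt` argument) is worth considering.

```python
import hashlib
import json
import re
from typing import Dict, List, Optional

# Simplified validation patterns (assumptions, not the project's actual rules).
RULES = {
    "amount": re.compile(r"^[0-9]+(\.[0-9]{1,2})?$"),   # digits, optional 2 decimals
    "phone": re.compile(r"^1[3-9][0-9]{9}$"),           # 11-digit mainland mobile number
    "id_card": re.compile(r"^[0-9]{17}[0-9Xx]$"),        # 18-digit ID, checksum not verified
    "url": re.compile(r"^https?://\S+$"),
}
CORE_FIELDS = ("user_id", "order_id", "amount")  # hypothetical core fields


def clean_line(line: str) -> Optional[Dict]:
    """Parse one JSON log line; return None if any rule is violated."""
    try:
        rec = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(rec, dict):
        return None
    if any(rec.get(f) in (None, "") for f in CORE_FIELDS):
        return None                          # core fields must not be empty
    for field, pattern in RULES.items():
        value = rec.get(field)
        if value is not None and not pattern.match(str(value)):
            return None
    return rec


def mask_phone(phone: str) -> str:
    """Keep the first 3 and last 4 digits: 13512340013 -> 135****0013."""
    return phone[:3] + "****" + phone[-4:]


def md5_phone(phone: str, salt: str = "") -> str:
    """Same input -> same digest, so it can still be counted and joined."""
    return hashlib.md5((salt + phone).encode("utf-8")).hexdigest()


def clean_and_desensitize(lines: List[str]) -> List[Dict]:
    seen = set()
    out = []
    for line in lines:
        rec = clean_line(line)
        if rec is None or rec["order_id"] in seen:  # drop invalid and duplicate rows
            continue
        seen.add(rec["order_id"])
        phone = rec.pop("phone", None)              # never keep the raw number
        if phone is not None:
            phone = str(phone)
            rec["phone_masked"] = mask_phone(phone)
            rec["phone_md5"] = md5_phone(phone)
        out.append(rec)
    return out
```

The raw phone number is removed and replaced by both forms: the masked value for display and the digest for counting and joining.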

3) What does the DWS layer do?

Repeat what was covered under indicator system construction.

4) What does the ADS layer do?

Be able to name at least 30 metrics within a minute.

Daily, monthly, and weekly active users; retention and retention rate; new users (daily, weekly, yearly); conversion rate; churn; returning users; users who logged in on 3 consecutive days within 7 days (and likewise for likes, favorites, reviews, purchases, add-to-cart, orders, activities); users who logged in for 3 consecutive weeks (months); GMV; repurchase rate and repurchase rate ranking; likes, comments, favorites; number of users claiming coupons and using coupons; silent users; whether a product is worth buying; number of refunds and refund rate; top-N popular products

What product managers care about most: retention, conversion, GMV, repurchase

2.13 Data Volume

Data volumes below are before compression.


1) ODS layer:

(1) User behavior data (100GB => 100 million records; 1GB => 1 million records)

Exposure (60GB, i.e., 60 million records), page (20GB), action (10GB), error + app start (10GB)

(2) Business data (1-2GB => 1-2 million records)

Login (200,000), registration (100-1,000);

Add to cart (200,000 daily increment, 1 million in total), orders (100,000), payments (90,000), logistics (90,000), order cancellations (500), refunds (500);

Coupon claims (50,000), orders with coupons (40,000), payments with coupons (30,000);

Likes (1,000), comments (1,000), favorites (1,000);

Users (1 million daily active, 1,000 new, 10 million in total), product SPUs (10,000-20,000), product SKUs (100,000-200,000), activities (1,000), time (negligible), region (negligible)

2) DWD + DIM layers:

Almost the same as the ODS layer.

3) DWS layer:

After light aggregation, 20GB-50GB.

4) ADS layer:

Between 10MB and 50MB, negligible.

2.14 What Problems Did You Encounter in the Project? (*****)

1) Flume zero-point drift

2) Flume crashes and tuning

3) DataX null values and tuning

4) HDFS small-file handling

5) Kafka crashes

6) Kafka data loss

7) Kafka data duplication

8) Kafka message backlog

9) Kafka out-of-order data

10) Kafka ordering

11) Kafka optimization (increasing throughput)

12) How does Kafka achieve efficient reads and writes under the hood?

13) Size limit of a single Kafka log record

14) Hive optimization (Hive on Spark)

15) How Hive handles data skew

19) Writing difficult metrics (active on 3 consecutive days within 7 days, 1/7/30-day metrics, path analysis, user retention rate, repurchase rate per brand over the last 7/30 days, subsidy rate of coupons issued in the last 30 days, concurrent online users)

20) What do you do if a DS task fails?

21) DS failure alerts
2.15 Offline: Business Questions

2.15.1 SKU and SPU

SKU: a specific sellable item, e.g., a silver, 128GB, China Unicom iPhone X.

SPU: the iPhone X product as a whole.

tm_id: brand ID, e.g., Apple, which includes iPhone, headphones, Mac, etc.

2.15.2 What Is the Difference Between the Order Table and the Order Detail Table?

The order status in the order table changes over time; the order detail table has no order status, so it does not change.

The order table records user_id, order ID, order number, total order amount, payment method, order status (order_status), and so on.

The order detail table records user_id, product sku_id, and specific product information (product name sku_name, price order_price, quantity sku_num).

2.15.3 Roll-Up and Drill-Down

Roll-up: aggregating data upward along the hierarchy of a dimension.

Drill-down: the reverse of roll-up; moving down the hierarchy of a dimension to see more detailed data.

For example, in the classic data cube model:

The dimensions include product, year, and region, and the measure is sales. Dimensions can be more fine-grained: time can consist of year, quarter, month, and day, and region of country, province, city, district, and county.

Drill-down means observing data from coarse to fine granularity. For example, when analyzing product sales, you can move along the time dimension from year to month to day.

That is, add the month level of the time dimension.

Roll-up is the reverse: it removes some dimension levels, observes data from fine to coarse granularity, and aggregates data upward.
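Roll-up and drill-down along the time dimension can be illustrated with a Python sketch that aggregates the same daily facts at year, month, or day level. The row layout and names are hypothetical; in SQL the same effect comes from changing the GROUP BY columns.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical daily sales facts: (product, ISO date "YYYY-MM-DD", region, amount)
Sale = Tuple[str, str, str, float]

# Length of the ISO date prefix that identifies each level of the time hierarchy.
TIME_LEVELS = {"year": 4, "month": 7, "day": 10}


def sales_by_time(sales: Iterable[Sale], level: str) -> Dict[Tuple[str, str], float]:
    """Sum sales per (product, time bucket) at the requested level.

    Moving from "year" toward "day" is drill-down; moving back is roll-up.
    """
    cut = TIME_LEVELS[level]
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for product, day, _region, amount in sales:
        totals[(product, day[:cut])] += amount
    return dict(totals)
```

Because the facts are stored at day level, every coarser level can be derived from them, but not the other way around; this is why the DWD layer keeps the finest granularity.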

2.15.4 ToB and ToC

ToB (to Business): the users are enterprises.

ToC (to Consumer): the target users are individuals.

2.15.5 Retention, Conversion, GMV, Repurchase, and Activity Metrics

1) Active users

DAU: 1 million; MAU: 2-3 times DAU, about 3 million

How many registered users in total? Between 10 and 30 million.

Channel sources: app, WeChat official account, Douyin, Baidu, 36Kr, Toutiao, offline promotion

2) GMV

GMV: 100,000 orders per day (50-100 yuan each) => 5-10 million yuan

10%-20% => 1-2 million yuan (to cover personnel such as programmers, HR, administration, and finance, plus rent and electricity)

3) Repurchase rate

Daily consumables (toilet paper, masks, toothpaste): 10%-20%

Computers, monitors, watches: 1%

4) Conversion rate

Product details => add to cart => order => payment

Step conversion: details to cart 1%-5%; cart to order 50%-60%; order to payment 80%-95%

5) Retention rate

1, 2, 3, ... 60 days; also by week and by month

During promotions: 10%-20%
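The step conversion rates in 4) can be computed from per-stage user counts; a Python sketch follows. The stage names are hypothetical, and the sample counts in the usage note are made up to fall inside the quoted ranges, not real data.

```python
from typing import List, Sequence, Tuple

STAGES = ("product_detail", "add_to_cart", "order", "payment")


def step_conversion(users_per_stage: Sequence[int]) -> List[Tuple[str, str, float]]:
    """Share of users reaching each stage who also reach the next one."""
    if len(users_per_stage) != len(STAGES):
        raise ValueError("expected one user count per stage")
    rates = []
    for i in range(1, len(STAGES)):
        prev = users_per_stage[i - 1]
        rate = users_per_stage[i] / prev if prev else 0.0
        rates.append((STAGES[i - 1], STAGES[i], rate))
    return rates


def overall_conversion(users_per_stage: Sequence[int]) -> float:
    """Users who paid divided by users who viewed product details."""
    first, last = users_per_stage[0], users_per_stage[-1]
    return last / first if first else 0.0
```

For example, 1,000,000 detail viewers, 30,000 add-to-cart users, 16,500 ordering users, and 14,850 paying users give step rates of 3%, 55%, and 90%, and an overall conversion of 1.485%.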

2.15.6 How Much Does Data Volume Increase During a Promotion? How Do You Handle It?

Daily active users increase by about 50% and GMV by about 20% (e.g., Valentine's Day, toilet-paper promotions).

Reserve cluster resources. For 11.11 and 6.18 the data volume is very large, so add servers dynamically in advance.

How many machines to add: 3-4

2.15.7 Which Product Sells Well?

Face masks and toilet paper, about 5,000 sold per day. Adapt this to your own business (download the relevant app to check).

2.15.8 数据仓库每天跑多少张表,大概什么时候运行,运行多久
2.15.8 How many tables does the data warehouse run per day, when and how long does it run?

Usually one database is built per project, and the number of tables is the number of original source tables plus the statistical result tables (usually 70-100 tables).

User behavior: 5 tables; business data: 33 tables => ODS 34 => DWD 32 => DWS 22 wide tables => ADS 15, for a total of 103 tables.

DataX: starts at 00:10 and runs for about 10-20 minutes (the first run is a full load).

User behavior data: processing starts at 0:30 every day, scheduled by DolphinScheduler (ds), and all metrics finish within 5-6 hours.

All offline reports are completed within 8 hours.

Real-time processing latency is kept within 5 minutes (minutes to seconds).

A real-time recommendation system requires second-level response.

2.15.9 Which table has the largest data volume?

1) User behavior data

Exposure (60 GB, or about 60 million records), page (20 GB)

2) Business data (1-2 GB => 1-2 million records)

Logins (200,000), registrations (100-1,000);

Add-to-cart (200,000), orders (100,000)

Users (1 million active users, 1,000 new users, 10 million total users)

SKU (100,000 - 200,000)

2.15.10 Which table is the most time-consuming, and how was it optimized?

The most time-consuming jobs are usually those affected by data skew.

1) Group By

(1) Scenario: computing the transaction amount for each province

The fastest and slowest tasks differed in duration by a factor of 20.

Observed on YARN:

Total runtime: 4-5 hours.

Diagnosis: data skew occurred, so stop the job and fix it.

(2) Solution:

① Enable map-side pre-aggregation (set hive.map.aggr=true)

Result after the fix:

The job finished in 30-50 minutes.
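The effect of map-side pre-aggregation can be illustrated with a toy simulation: each mapper sums its own rows per province before the shuffle, so the reducer for a hot province receives one partial sum per mapper instead of every row. This is a plain-Python model of the idea with made-up data, not Hive's implementation.

```python
from collections import Counter, defaultdict

def shuffle_without_combiner(mapper_inputs):
    """Every (province, amount) row is sent to the reducer for its province."""
    per_reducer = defaultdict(list)
    for rows in mapper_inputs:
        for province, amount in rows:
            per_reducer[province].append(amount)
    return per_reducer

def shuffle_with_combiner(mapper_inputs):
    """Each mapper pre-aggregates locally; one partial sum per key leaves it."""
    per_reducer = defaultdict(list)
    for rows in mapper_inputs:
        partial = Counter()
        for province, amount in rows:
            partial[province] += amount
        for province, subtotal in partial.items():
            per_reducer[province].append(subtotal)
    return per_reducer

# Hypothetical input: 4 mappers, province "GD" is heavily skewed
mappers = [[("GD", 10)] * 1000 + [("XZ", 5)] for _ in range(4)]
plain = shuffle_without_combiner(mappers)
combined = shuffle_with_combiner(mappers)
print(len(plain["GD"]), len(combined["GD"]))    # 4000 4
print(sum(plain["GD"]) == sum(combined["GD"]))  # True
```

The hot reducer's input shrinks from 4,000 records to 4 while the totals stay identical, which is why this works only for aggregations that can be computed in parts (sum, count, max).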


2) Join

Scenario: joining a fact table with dimension tables => MapJoin

(1) Small table joined with a large table

Solution: MapJoin (the small table is loaded into memory on every map task, so the large table does not need to be shuffled)
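A MapJoin is essentially a broadcast hash join. The sketch below models it in plain Python: the small dimension table becomes an in-memory dict and the fact rows are streamed past it. Table and column names are hypothetical.

```python
def map_join(fact_rows, dim_rows, key):
    """Broadcast hash join: build a dict from the small side, stream the big side."""
    lookup = {row[key]: row for row in dim_rows}  # small table held in memory
    joined = []
    for fact in fact_rows:
        dim = lookup.get(fact[key])
        if dim is not None:  # inner join: drop facts with no matching dimension
            joined.append({**fact, **dim})
    return joined

provinces = [{"province_id": 1, "province_name": "Beijing"},
             {"province_id": 2, "province_name": "Shanghai"}]
orders = [{"order_id": 101, "province_id": 1, "amount": 99.0},
          {"order_id": 102, "province_id": 2, "amount": 50.0},
          {"order_id": 103, "province_id": 9, "amount": 10.0}]
for row in map_join(orders, provinces, "province_id"):
    print(row["order_id"], row["province_name"])
```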

(2) Large table joined with a large table

Where it appeared in the project: computing the average time from add-to-cart to payment.

Runtime on YARN: 4-5 hours.

② SMB join (sort-merge-bucket join): requires both tables to be bucketed and sorted on the join key.

③ Salt the left table with random values and expand (replicate) the right table accordingly.

④ Avoid large-table-to-large-table joins through modeling, e.g. with an accumulating snapshot fact table.
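Technique ③ can be sketched in plain Python: each left row gets a random salt, each right row is replicated once per salt value, and the join runs on (key, salt). The data and names are hypothetical, and `buckets` stands in for the expansion factor you would choose in SQL.

```python
import random
from collections import Counter, defaultdict

def salted_join(left_rows, right_rows, key, buckets, seed=0):
    """Join a skewed left table to a right table by salting the join key.

    Left rows get a random salt in [0, buckets). Right rows are replicated once
    per salt, so every (key, salt) on the left still finds all its matches,
    while a hot key's rows are spread over `buckets` join partitions.
    """
    rng = random.Random(seed)
    expanded_right = defaultdict(list)
    for row in right_rows:
        for salt in range(buckets):                      # expand the right table
            expanded_right[(row[key], salt)].append(row)

    partition_sizes = Counter()
    joined = []
    for row in left_rows:
        salted_key = (row[key], rng.randrange(buckets))  # randomize the left table
        partition_sizes[salted_key] += 1
        for match in expanded_right.get(salted_key, []):
            joined.append({**row, **match})
    return joined, partition_sizes

# Hypothetical data: user "u1" is a hot key on the left side
carts = [{"user_id": "u1", "sku": i} for i in range(1000)] + \
        [{"user_id": "u2", "sku": i} for i in range(10)]
payments = [{"user_id": "u1", "paid": True}, {"user_id": "u2", "paid": True}]
joined, sizes = salted_join(carts, payments, "user_id", buckets=4)
print(len(joined))  # 1010, the same row count as a plain join
print(sorted(n for (k, _), n in sizes.items() if k == "u1"))  # hot key split 4 ways
```

The cost is that the right table grows by a factor of `buckets`, so the factor is kept small.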

2.15.11 What is the peak concurrency, and when does it occur?

Peak hours are 7-12 p.m. Kafka peaks at about 20 MB/s (20,000 records/s), with 10,000-20,000 concurrent users.

2.15.12 The most difficult metrics you have analyzed

Path analysis

User retention rate

Repurchase rate of each brand over the last 7/30 days

Users who logged in on 3 consecutive days within 7 days

Number of concurrent online users per minute

Add others from your own experience.
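For "logged in on 3 consecutive days within 7 days", a common trick is to subtract each login date's rank from the date: consecutive dates then collapse to the same anchor value. In Hive SQL this is usually written with date_sub and row_number(); below is a plain-Python sketch of the same idea with hypothetical data.

```python
from datetime import date, timedelta
from itertools import groupby

def has_streak(login_dates, window_end, window_days=7, streak=3):
    """True if the user logged in on `streak` consecutive days in the window."""
    window_start = window_end - timedelta(days=window_days - 1)
    days = sorted({d for d in login_dates if window_start <= d <= window_end})
    # Consecutive dates minus their rank map to the same anchor date,
    # like date_sub(dt, row_number()) in SQL.
    anchors = [d - timedelta(days=i) for i, d in enumerate(days)]
    return any(len(list(run)) >= streak for _, run in groupby(anchors))

logins = {
    "alice": [date(2022, 6, 1), date(2022, 6, 2), date(2022, 6, 3)],
    "bob":   [date(2022, 6, 1), date(2022, 6, 3), date(2022, 6, 5)],
}
end = date(2022, 6, 7)
print({user: has_streak(ds, end) for user, ds in logins.items()})
# {'alice': True, 'bob': False}
```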

2.15.13 Which file storage format is used in the warehouse?

Common formats are TextFile, ORC and Parquet. Most companies use ORC or Parquet: both are columnar with high compression ratios, so compared with TextFile they query faster and use less disk space.

2.15.14 How often is data deleted from the warehouse?

(1) Some companies never delete data.

(2) Others "delete" data once every one or two years: expired data is first compressed and downloaded to a separately mounted disk, and then deleted from the cluster. Very few companies delete data without backing it up.

2.15.15 When a table in the MySQL business database changes, what must change in the warehouse table?

Alter the table structure and add the new columns at the end.

2.15.16 How do you tune performance when joining more than 50 tables?

See the Hive multi-table join optimization methods in Chapter 1 for details.

2.15.17 How do you roll back a change in a zipper table?

A zipper table records the change history of a dimension table. When a dimension attribute changes, a new record is inserted and the validity period of the original record is closed. Rolling back means reverting a change that has already taken effect to its previous state. The approach is:

(1) Locate the record to roll back, e.g. the user's latest update.

(2) Find the previous version: the record with the same primary key and the preceding validity period (the previous version of that key, not simply the previous row in the table).

(3) Update the validity periods: invalidate the current record by updating its validity dates, and reset the end date of the previous record to the maximum value (e.g. 9999-12-31).
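The three steps can be sketched on an in-memory list of rows. This sketch assumes a hypothetical schema (user_id, phone, start_date, end_date) and uses 9999-12-31 as the open end date. To keep it short, it removes the current version instead of only marking it invalid.

```python
from dataclasses import dataclass, replace

OPEN_END = "9999-12-31"  # assumed end date marking the currently valid version

@dataclass(frozen=True)
class ZipRow:
    user_id: int
    phone: str
    start_date: str  # ISO-8601 date strings compare correctly as text
    end_date: str

def roll_back_latest(rows, user_id):
    """Undo the most recent change for one key of a zipper table.

    (1) locate the current (open) version, (2) find the latest version that
    started before it, (3) drop the current version and reopen the previous one.
    """
    versions = [r for r in rows if r.user_id == user_id]
    current = next((r for r in versions if r.end_date == OPEN_END), None)
    if current is None:
        return list(rows)  # nothing to roll back
    earlier = [r for r in versions if r.start_date < current.start_date]
    if not earlier:
        return list(rows)  # no previous version exists
    previous = max(earlier, key=lambda r: r.start_date)
    result = []
    for r in rows:
        if r == current:
            continue  # invalidate the current version
        result.append(replace(r, end_date=OPEN_END) if r == previous else r)
    return result

table = [
    ZipRow(1, "138-0000", "2022-01-01", "2022-03-04"),
    ZipRow(1, "139-1111", "2022-03-05", OPEN_END),
    ZipRow(2, "137-2222", "2022-02-01", OPEN_END),
]
for row in roll_back_latest(table, user_id=1):
    print(row)
```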

2.15.18 How do you backfill data in the offline warehouse?

Backfilling: reprocessing data for a past period to fix data problems.

Use the scheduler's backfill feature, e.g. the complement (backfill) function of DolphinScheduler.

2.15.19 After ADS is computed, how do you verify that a metric is correct?

(1) Sample validation: take sample records from the results and compare them with the business department's actual data.

(2) Logic verification: check that the metric's SQL is correct.

(3) Cross-metric checks: compare related metrics and check that their relationship matches expectations. For example, if one metric is the cumulative value of another, the two must agree.

(4) Historical comparison: compare the results with past data and observe the trend.

(5) Outlier detection: check the results for outliers.

(6) Cross-department comparison: compare the results with data from other departments or teams for further verification.
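Checks (3) to (5) are easy to automate. The sketch below assumes hypothetical metric values and an arbitrary 50% change threshold. It verifies that a running total matches the daily values, and it flags metrics that deviate sharply from their trailing average.

```python
def check_cumulative(daily, cumulative):
    """(3) A running total must equal the sum of the daily values so far."""
    running, mismatches = 0, []
    for day, value in daily.items():
        running += value
        if cumulative.get(day) != running:
            mismatches.append(day)
    return mismatches

def flag_outliers(history, today, max_change=0.5):
    """(4)/(5) Flag metrics whose value deviates from the trailing average
    by more than `max_change` (a relative fraction)."""
    flagged = {}
    for metric, value in today.items():
        past = history.get(metric)
        if not past:
            continue
        baseline = sum(past) / len(past)
        if baseline and abs(value - baseline) / baseline > max_change:
            flagged[metric] = (baseline, value)
    return flagged

daily_orders = {"06-01": 100_000, "06-02": 98_000, "06-03": 103_000}
cumulative_orders = {"06-01": 100_000, "06-02": 198_000, "06-03": 300_000}
print(check_cumulative(daily_orders, cumulative_orders))  # ['06-03']

history = {"dau": [1_000_000] * 7, "gmv": [6.0e6, 5.5e6, 6.2e6]}
print(flag_outliers(history, {"dau": 1_050_000, "gmv": 1.2e6}))
```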

2.15.20 How do you fix an incorrectly computed ADS metric?

(1) Scope the error: determine the time range, metrics and dimensions affected, to narrow the investigation.

(2) Check the processing logic: starting from the ODS layer, find the cleaning, transformation or aggregation steps that may cause the error, and confirm the logic is correct.

(3) Review data quality: ensure data integrity, consistency, accuracy and timeliness at each layer.

(4) Recompute the metric after fixing the data quality problems and processing logic.

2.15.21 How do you develop a new metric requested by the product team?

(1) Clarify requirements: communicate with product managers and business departments.

(2) Analyze data sources: check whether existing sources can support the new metric. If they cannot, introduce new data sources or extend existing ones.

(3) Data model design: design the model for the new metric.

(4) Processing flow design: use SQL to extract, clean, transform and load the data to produce the metric.

2.15.22 A new metric cannot be implemented with the existing model. What do you do?

Introducing new data sources or extending existing ones

2.15.23 Which departments do you communicate with, and about what?

Operations: new metrics, data reports, outliers

Back-end development: data sources and storage, data formats, business logic

Front-end development: data visualization requirements, user experience

BI Department: Reporting and Analysis Requirements

2.15.24 Who provides your requirements and metrics?

The operations department, the BI department (reports), and product managers.

2.15.25 Once jobs are running, what share of cluster resources do they use?

Warehouse scripts run serially: ods => dim => dwd => dws => ads

So at any moment the cluster only uses the resources needed by the Spark jobs generated from the current layer's scripts.

CPU to memory ratio: 1:4

Offline: 512 MB of memory per 128 MB of data

Real-time: parallelism matches the number of Kafka partitions; CPU to slot ratio: 1:3

20 MB/s -> 3 partitions -> 3 slots -> at a CPU to slot ratio of 1:3, 1 core -> at a CPU to memory ratio of 1:4, 4 GB -> one TaskManager (TM) with 4 GB

JobManager: 2 GB memory, 1 CPU

On average, one Flink job uses 6 GB of memory and 2 cores.
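The sizing chain above can be written out as arithmetic. This sketch assumes the ratios stated in the text: parallelism equals the partition count, CPU to slot is 1:3, CPU to memory is 1 core : 4 GB, and the JobManager uses 2 GB and 1 core. The function name is hypothetical.

```python
import math

def flink_job_resources(partitions, slots_per_core=3, gb_per_core=4,
                        jm_memory_gb=2, jm_cores=1):
    """Estimate one Flink job's resources from its Kafka partition count."""
    slots = partitions                            # parallelism = partition count
    tm_cores = math.ceil(slots / slots_per_core)  # CPU : slot = 1 : 3
    tm_memory_gb = tm_cores * gb_per_core         # CPU : memory = 1 : 4
    return {
        "slots": slots,
        "tm_cores": tm_cores,
        "tm_memory_gb": tm_memory_gb,
        "total_cores": tm_cores + jm_cores,
        "total_memory_gb": tm_memory_gb + jm_memory_gb,
    }

print(flink_job_resources(partitions=3))
# {'slots': 3, 'tm_cores': 1, 'tm_memory_gb': 4, 'total_cores': 2, 'total_memory_gb': 6}
```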

2.15.26 Business scenario: data with a long time span, e.g. a loan that is borrowed, used for a year and then repaid. How is the data model updated, and how is such data handled?

Use an accumulating snapshot fact table.

2.15.27 Besides group by and join, what other scenarios cause data skew?

(1) Data filtering: if filter conditions leave some Map tasks with much more data than others, data skew can occur.

(2) Snappy compression: Snappy-compressed files are not splittable, so when source files vary in size, the Map stage is skewed.

(3) Window functions: over(partition by ...) on an unevenly distributed key causes data skew.

2.15.28 Is your company a self-operated e-commerce business or a platform? Do you have your own products?

Our company is self-operated and has its own products. We have our own product dimension table, as well as business data and log data for the whole process from procurement and listing to shipping and receipt of goods.

Download the relevant app and adapt your answer to your own situation.

2.15.29 Order status in the ODS fact table changes. How do you track these changes?

A change in order status changes the status code, and each status code corresponds to a different business process, so the changes can be extracted into separate transaction fact tables. Whichever stage a refund occurs at, successful payment, shipping and receipt are each recorded in their corresponding transaction fact tables.

2.15.30 Which fact tables did you build for the user domain? What are the core fields and metrics of the login fact table? Which tables link the user domain with the transaction domain?

In the offline warehouse, we create several fact tables to store different types of user data.

1) Login Fact Table

Core fields may include:


- Login date (login_date)

- Login time (login_time)

- Device type (device_type)



- App version (app_version)

Core metrics may include:

– Daily Active Users (DAU)

– Monthly Active Users (MAU)

– Number of active users by device type

– Average number of logins per day

2) Transaction Fact Table

Core fields may include:



– Transaction date (transaction_date)

– Transaction time (transaction_time)

– Transaction type (transaction_type)

– Payment method (payment_method)

Core metrics may include:

– Total transaction volume

– Average transaction amount

– Transaction volume by transaction type

– Transaction volume by payment method

To connect user logins with transactions, we also built the following dimension tables:

(1) User dimension table: user ID, user name, email address, registration date and other user information.

(2) Session dimension table: session ID, login time, logout time, number of pages visited and other session information.

(3) Date dimension table: date ID, year, month, day, week and other date information.

(4) Device dimension table: device type, brand, operating system and other device information.

Joining the fact tables with these dimension tables makes it easy to query and analyze the relationship between user logins and transaction behavior.

2.15.31 How much of the day's order data has not completed its lifecycle?

In the offline warehouse, orders that have not closed the loop by the end of the day usually include the following (based on 100,000 orders per day):

Unpaid orders: orders created that day for which the customer has not completed payment.

Orders awaiting shipment: orders paid that day that have not yet been shipped.

Pending returns: return requests initiated by customers that day for which the merchant has not completed the return process.


2.15.32 Do you apply ETL to dimension data? Anything besides desensitizing user information?

Besides desensitizing user information, we parse the IP and user agent (UA) fields in the tracking data. We also deduplicate records, handle null and abnormal values, and merge some dimension data.
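Here is a small sketch of the desensitization and cleaning steps. The field names, the 11-digit phone format and the "keep first 3 and last 4 digits" masking rule are illustrative assumptions, not the project's actual rules.

```python
import re

PHONE = re.compile(r"^(\d{3})\d{4}(\d{4})$")  # assumed 11-digit mobile format

def mask_phone(phone):
    """Keep the first 3 and last 4 digits; other formats are returned unchanged."""
    if phone is None:
        return None
    return PHONE.sub(r"\1****\2", phone)

def clean_users(records):
    """Drop rows without an id, deduplicate by id (last wins), mask phones."""
    latest = {}
    for rec in records:
        if rec.get("id") is None:
            continue               # abnormal row: no primary key
        latest[rec["id"]] = rec    # a later record overwrites an earlier one
    return [{**rec, "phone": mask_phone(rec.get("phone"))}
            for rec in latest.values()]

rows = [{"id": 1, "phone": "13812345678"},
        {"id": 1, "phone": "13912345678"},
        {"id": None, "phone": "13700000000"},
        {"id": 2, "phone": None}]
print(clean_users(rows))
# [{'id': 1, 'phone': '139****5678'}, {'id': 2, 'phone': None}]
```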

2.15.33 How do you encrypt data, and how do you use encrypted data? (I mentioned MD5, and the interviewer asked how to recover MD5.)

In practice, various algorithms can be used to encrypt data and keep it secure.

(1) Symmetric encryption uses the same key for encryption and decryption. Common algorithms are AES, DES and 3DES. The key must be kept secure, otherwise the encrypted data may be compromised.

(2) Asymmetric encryption uses a key pair: the public key encrypts data and the private key decrypts it. Common algorithms are RSA and ECC.

(3) Hashing maps input data to a fixed-length output. Common hash algorithms are MD5, SHA-1 and SHA-2. The original data cannot be recovered from a hash; integrity can only be verified by computing the hash again and comparing.

To use encrypted data, decrypt it with the corresponding algorithm and key. If a hash algorithm was used, the hash value is used directly in place of the original field.

MD5 is an irreversible hash algorithm, so it cannot recover the original data. It is normally used to verify data integrity and consistency rather than to encrypt data. If the data must be encrypted and later recovered, use symmetric or asymmetric encryption.
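Python's standard `hashlib` and `hmac` modules illustrate the point. The same input always produces the same MD5 digest, so hashed IDs can still be joined and counted. But no function turns a digest back into its input: the only way to "recover" a value is to guess candidates and compare digests. A keyed hash (HMAC) with a secret key blocks that kind of guessing. The key below is a placeholder.

```python
import hashlib
import hmac

def md5_hex(value):
    return hashlib.md5(value.encode("utf-8")).hexdigest()

def guess_from_candidates(digest, candidates):
    """'Recover' an MD5 input only by brute force over known candidates."""
    for candidate in candidates:
        if md5_hex(candidate) == digest:
            return candidate
    return None

# Low-entropy values such as small numeric IDs are easy to guess.
leaked = md5_hex("485")
print(guess_from_candidates(leaked, (str(i) for i in range(1000))))  # 485

# A keyed hash cannot be precomputed without the secret key (placeholder here).
SECRET_KEY = b"replace-with-a-managed-secret"
print(hmac.new(SECRET_KEY, b"485", hashlib.sha256).hexdigest())
```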

2.15.34 Real Project Workflow

1) Before the warehouse is built:

(1) Requirements analysis: clarify requirements with the product team and define the warehouse's functions.

(2) Data source analysis: analyze whether data sources are sufficient and whether new data sources need to be introduced.

(3) Data model design

(4) Technical selection

(5) Data processing flow design: Hive SQL

2) After building the warehouse:

(1) Development and maintenance

(2) Data monitoring and alerting

(3) Data quality management

(4) Performance optimization

(5) Development of new requirements

(6) Reporting and visualization

(7) Documentation and knowledge sharing

2.15.35 How do you unify metric definitions? (If the offline definition changes, how does the real-time side pick up the new definition?)

Metric definitions can be kept consistent between offline and real-time computation in the following ways:

(1) Define unified metrics: first define clear, unambiguous metrics and make sure all team members understand and follow them, including the calculation method, data sources and time frame.

(2) Version control and documentation: manage metric calculation code with a version control tool such as Git, and make sure every change is reviewed and tested. Also document each metric's calculation method in detail so team members can understand and maintain it.

(3) Regular checks: regularly compare offline and real-time results to confirm the definitions are consistent. Automated test cases can compare offline and real-time results across scenarios.

(4) Monitoring and alerting: set up monitoring and alerts so that inconsistencies between offline and real-time results are found and fixed promptly.

2.15.36 How do you manage table lifecycles?

Table lifecycle management is an important part of data warehouse and database management, covering table creation, use, optimization and deletion. The lifecycle has the following stages:

1) Table design

2) Data import and update

3) Data storage and backup

4) Data security and rights management

5) Performance optimization and monitoring

6) Data archiving and deletion

2.15.37 With many upstream pipelines and deep layering, and given the processing pipelines and table lineage, how do you handle fluctuations in downstream data?

Keep backups of intermediate results, similar to the idea behind Spark's cache.

2.15.38 Which database is better for querying many rows at once from a billion records?

HBase, Elasticsearch

2.16 Event Tracking

1) Choosing a tracking solution

Free: as in the course demo, front-end developers implement the tracking themselves.

Paid: Sensors Data, Baidu Tongji, Umeng.

2) There are two main ways to design tracking:

(1) By page: one table for each page.

(2) By user behavior: page, event (action), exposure, startup and error data. This is the approach used in our project.

3) Tracking Log Format

To reduce network transmission, fields shared by all records are placed in a single common section:

{
  "common": {                      -- common information
    "ar": "230000",                -- area code
    "ba": "iPhone",                -- phone brand
    "ch": "Appstore",              -- channel
    "md": "iPhone 8",              -- phone model
    "mid": "YXfhjAYH6As2z9Iq",     -- device ID
    "os": "iOS 13.2.9",            -- operating system
    "uid": "485",                  -- member ID
    "vc": "v2.1.134"               -- app version
  },
  "actions": [                     -- actions (events)
    {
      "action_id": "favor_add",    -- action ID
      "item": "3",                 -- target ID
      "item_type": "sku_id",       -- target type
      "ts": 1585744376605          -- action timestamp
    }
  ],
  "displays": [                    -- exposures
    {
      "displayType": "query",      -- exposure type
      "item": "3",                 -- exposed object ID
      "item_type": "sku_id",       -- exposed object type
      "order": 1                   -- display order
    },
    {
      "displayType": "promotion",
      "item": "6",
      "item_type": "sku_id",
      "order": 2
    },
    {
      "displayType": "promotion",
      "item": "9",
      "item_type": "sku_id",
      "order": 3
    }
  ],
  "page": {                        -- page information
    "during_time": 7648,           -- duration in milliseconds
    "item": "3",                   -- target ID
    "item_type": "sku_id",         -- target type
    "last_page_id": "login",       -- previous page ID
    "page_id": "good_detail",      -- page ID
    "sourceType": "promotion"      -- source type
  },
  "err": {                         -- error
    "error_code": "1234",          -- error code
    "msg": "***********"           -- error message
  },
  "ts": 1585744374423              -- page entry timestamp
}

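Splitting one such log into per-type records, with the common fields copied into each record, can be sketched as follows. The sample is a trimmed, comment-free JSON version of the log format above. The output record shapes are assumptions for illustration only.

```python
import json

RAW = """{
  "common": {"mid": "YXfhjAYH6As2z9Iq", "uid": "485", "ch": "Appstore"},
  "actions": [{"action_id": "favor_add", "item": "3", "item_type": "sku_id",
               "ts": 1585744376605}],
  "displays": [{"displayType": "query", "item": "3", "item_type": "sku_id", "order": 1},
               {"displayType": "promotion", "item": "6", "item_type": "sku_id", "order": 2}],
  "page": {"page_id": "good_detail", "last_page_id": "login", "during_time": 7648},
  "ts": 1585744374423
}"""

def split_log(line):
    """Flatten one log into (type, record) pairs, copying `common` into each."""
    log = json.loads(line)
    common, page, ts = log.get("common", {}), log.get("page"), log.get("ts")
    out = []
    if page:
        out.append(("page", {**common, **page, "ts": ts}))
    for action in log.get("actions", []):   # actions keep their own ts
        out.append(("action", {**common, **(page or {}), **action}))
    for display in log.get("displays", []):
        out.append(("display", {**common, **(page or {}), **display, "ts": ts}))
    if "err" in log:
        out.append(("error", {**common, **log["err"], "ts": ts}))
    return out

for kind, record in split_log(RAW):
    print(kind, record.get("page_id"), record.get("item"))
```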

Chapter 3: Real-time Data Warehouse Project

3.1 Why did you build this project?

As the company's business grows, product requirements and internal decision-making increasingly depend on real-time data. The traditional offline warehouse's T+1 mode can no longer meet these needs, so real-time data warehouse capabilities are required.

3.2 Project Architecture

3.3 Framework Version Selection

Consistent with the offline project.

3.4 Server Selection

Consistent with the offline project.

3.5 Cluster Size

1) Production cluster and Flink cluster size (10 servers as an example)

Flink is deployed on all worker nodes as a client for submitting jobs.

For example, about 20 jobs need about 10 servers.

ClickHouse is deployed separately on servers with 128 GB of memory and 64 cores.

3.6 Project Modeling

1) Data research

(1) First ask the Java developers for the table list, ideally with descriptions of each table and field (the project manager can help coordinate) => quickly become familiar with the business behind the tables, map out the business lines, and identify the fact and dimension tables.

(2) Talk to business staff => verify that your guesses are correct.

(3) Talk to the product manager.

Requirements: derived metrics and compound metrics

Derived metric = atomic metric (business process + measure + aggregation logic) + statistical period + statistical granularity + business qualifier

The business processes in the requirements must map to actual back-end business processes.

2) Define the data domains

(1) User domain: login, registration

(2) Traffic domain: startup, page, action, error, exposure

(3) Transaction domain: add to cart, order, payment, logistics

(4) Tool domain: claim coupons, order with coupons, pay with coupons

(5) Interaction domain: likes, comments, favorites

3) Build the bus matrix

Dimensions: user, product, activity, time, region, coupon

(1) User domain:

Login, registration

(2) Traffic domain: √

Startup, page, action, error, exposure

(3) Transaction domain:

Add to cart, order, payment, logistics

(4) Tool domain:

Claim coupons, order with coupons, pay with coupons

(5) Interaction domain:

Likes, comments, favorites

4) Modeling: bottom-up

(1) ODS layer

① Store in Kafka: topic_log and topic_db, keeping the raw data as-is without processing

(2) DWD layer: fact tables

① Transaction fact tables

Atomic steps:

a) Select the business process

Select the business processes of interest, i.e. those needed by the metrics the product manager proposed.

b) Declare the grain

Grain: what one row represents. It can be one order, one week's orders, or one month's orders.

If a row represents a month's orders, individual orders cannot be counted, so keep the finest grain.

In other words, do not aggregate the data yourself at this step.

c) Identify the dimensions

Identify the dimensions of interest, i.e. those needed by the metrics the product manager proposed.

For example: users, products, activities, time, region, coupons

d) Identify the facts

Determine the measures of the fact table: additive values such as counts, quantities, occurrences and amounts.

e) Dimension degeneration

Degenerate fields from dictionary tables into the detail tables via a lookup join.

(3) DIM layer: dimension tables

① Dimension data is stored in HBase, without dimension integration.

5) Metric system construction: top-down

(1) ADS layer

Requirements: daily active users, new users, retention, conversion rate, GMV

(2) DWS layer: aggregation layer

Requirements: derived metrics and compound metrics

Derived metric = atomic metric (business process + measure + aggregation logic) + statistical period + statistical granularity + business qualifier

For example: the daily total transaction amount by province and mobile phone brand

Total transaction amount (order + amount + sum) + daily + province + phone brand

Find the common parts (business process + statistical period + statistical granularity) and build wide tables on them.
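The decomposition maps directly onto a filtered group-by. This is a minimal sketch with hypothetical order rows and an illustrative function name. The atomic metric is sum(amount) over the order process, `dt` is the statistical period, province and brand are the granularity, and `qualifier` is the business qualifier.

```python
from collections import defaultdict

def derived_metric(rows, measure, period_key, granularity, qualifier=lambda r: True):
    """Derived metric = atomic metric (process + measure + sum)
    + statistical period + statistical granularity + business qualifier."""
    totals = defaultdict(float)
    for row in rows:
        if qualifier(row):
            key = (row[period_key],) + tuple(row[g] for g in granularity)
            totals[key] += row[measure]
    return dict(totals)

orders = [  # hypothetical rows from the order fact table
    {"dt": "2022-06-18", "province": "Guangdong", "brand": "Apple", "amount": 5999.0},
    {"dt": "2022-06-18", "province": "Guangdong", "brand": "Apple", "amount": 6999.0},
    {"dt": "2022-06-18", "province": "Beijing", "brand": "Xiaomi", "amount": 1999.0},
]
print(derived_metric(orders, "amount", "dt", ["province", "brand"]))
```

Metrics that share the same business process, period and granularity differ only in measure and qualifier, which is why they can be served from one DWS wide table.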

3.7 Data Volume

3.7.1 Data Volume by Layer

1) ODS layer

(1) User behavior data (100 GB => 100 million records; 1 GB => 1 million records)

Exposure (60 GB, or 60 million records), page (20 GB), action (10 GB), error + startup (10 GB)

(2) Business data (1-2 GB => 1-2 million records)

Logins (200,000), registrations (100-1,000);

Add-to-cart (200,000 daily increment, 1 million in full), orders (100,000), payments (90,000), logistics (90,000), order cancellations (500), refunds (500);

Coupon claims (50,000), orders with coupons (40,000), payments with coupons (30,000);

Likes (1,000), comments (1,000), favorites (1,000);

Users (1 million active users, 1,000 new users, 10 million total users), product SPUs (10,000-20,000), product SKUs (100,000-200,000), activities (1,000), time (negligible), region (negligible)

2) DWD layer + DIM layer

Roughly the same as the ODS layer;

3) DWS layer

After light aggregation, 20-50 GB.

4) ADS layer

10-50 MB, negligible.

3.7.2 Data Volume Stored in Real-time Components

1) Kafka

Stores ODS and DWD layer data.

ODS and DWD data are about the same size, about 200 GB per day.

With two replicas kept for three days, Kafka stores about 400 GB of data.

2) HBase

Stores DIM layer data, consistent with the offline warehouse.

3) ClickHouse

Stores DWS layer data, about 20-30 GB per day.

DWS data is kept for one year with three replicas in ClickHouse.

In total, about 15-20 TB.

3.7.3 Real-time Peak QPS

Peak QPS: 20,000 records/s (about 20 MB/s)

3.8 What problems did you encounter in the project, and how did you solve them?

3.8.1 Choosing the business data collection framework

See Chapter 1 for a comparison of Flink CDC, Maxwell and Canal.

3.8.2 Where is state programming used in the project, how is state stored, and how are large-state problems solved?

1) DIM dynamic splitting uses broadcast state; the new/returning visitor flag repair uses keyed state.

Use the HashMap state backend when state is small and RocksDB when state is large.

2) Optimizations for large state

(2) Enable incremental checkpoints and local recovery, and configure multiple local directories.

(3) Set the RocksDB predefined options to the disk + memory profile (SPINNING_DISK_OPTIMIZED_HIGH_MEM), which automatically tunes the write buffer, block cache, etc.

3.8.3 Where did you encounter backpressure, what harm did it cause, and how did you locate and solve it? (key point)

1) Causes of backpressure in the project

Traffic peaks: no action needed

Frequent GC: e.g. code that creates large numbers of temporary objects

Large state: new/returning visitor repair

Accessing external databases: reading dimension data from HBase or writing data to ClickHouse

Data skew: uneven data volume across groups after keyBy

2) Harm caused by backpressure

Checkpoints time out and fail, causing the job to fail

Increased memory pressure leads to OOM, which crashes the job

Reduced timeliness

3) Locating backpressure

(1) Using the Web UI

Locate the node causing the backpressure. When troubleshooting, first disable operator chaining so that the specific operator can be identified.

Flink's UI now shows how busy and how backpressured each task is through colors and numeric values.

Backpressure is high upstream of the bottleneck: going downstream, the first operator whose backpressure shows OK is the bottleneck.

(2) Using metrics

Backpressure can be analyzed with the buffer.inPoolUsage and buffer.outPoolUsage metrics.

These metrics show the state of data transmission between tasks.
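The Web UI rule of thumb can be expressed over a linear operator chain. The bottleneck is the first operator after a run of backpressured operators that is not itself backpressured. The operator names and ratios below are made up, and the 10% threshold is an assumption matching the level Flink's UI labels as OK. This is a toy model, not a Flink API.

```python
def find_bottleneck(chain, threshold=0.10):
    """Return the first operator after a backpressured run that is not
    itself backpressured.

    `chain` lists (operator, backpressured_ratio) from source to sink.
    """
    seen_backpressure = False
    for name, ratio in chain:
        if ratio > threshold:
            seen_backpressure = True
        elif seen_backpressure:
            return name  # its upstream is blocked waiting on it
    return None

job = [("Source: Kafka", 0.95), ("flatMap: parse", 0.90),
       ("keyBy/process: visitor-repair", 0.02), ("Sink: ClickHouse", 0.0)]
print(find_bottleneck(job))  # keyBy/process: visitor-repair
```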

4) Handling backpressure

Backpressure may be temporary, caused by a data backlog after a load spike, a checkpoint or a job restart. Temporary backpressure can be ignored.

(1) Check whether the data is skewed

(2) Use a flame graph: look for functions at the top that occupy the greatest width; any "plateau" indicates a function that may have performance problems.

(3) Analyze GC logs and adjust the code

(4) Unreasonable resource allocation: adjust resources

5) Interacting with external systems:

Writing to MySQL or ClickHouse: buffer records and write in batches

Reading from HBase: asynchronous I/O plus a cache-aside (side) cache

3.8.4 How do you solve data skew? (key point)

1) Symptoms of data skew:

Among the subtasks of the same task, some receive significantly more data than others. The Flink Web UI shows exactly how much data each subtask has processed, so you can tell whether a Flink job is skewed. Data skew usually causes backpressure as well.

2) Solutions

(1) Skew at the source

For example, when consuming from Kafka, data is unevenly distributed across the topic's partitions.

After reading, call a repartitioning operator: rescale, rebalance, shuffle, etc.

(2) Skew in single-table group aggregation (pure streaming)

DataStream API: use flatMap to buffer a batch of records and pre-aggregate them

(3) Skew in single-table grouped window aggregation

Stage 1: append a random prefix or suffix to the key, then keyBy, window and aggregate.

Note: after aggregation the result is no longer a WindowedStream, so carry the window end (windowEnd) as a window marker and include it in the stage-2 grouping; otherwise results from different windows would be aggregated together.

Stage 2: keyBy the original key plus windowEnd, then aggregate.

Since our project uses ClickHouse, we can also write the stage-1 (scattered) aggregates directly to ClickHouse and perform the stage-2 aggregation when querying ClickHouse.
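The two-stage scheme can be modeled outside Flink to show why windowEnd must be carried into stage 2. This plain-Python sketch assumes 10-second tumbling windows and counts events per key. Stage 1 groups by (key + random suffix, windowEnd). Stage 2 strips the suffix and groups by (key, windowEnd).

```python
import random
from collections import Counter

WINDOW_MS = 10_000  # assumed tumbling window size

def window_end(ts):
    return ts - ts % WINDOW_MS + WINDOW_MS

def two_stage_count(events, salts=4, seed=0):
    """events: iterable of (key, ts). Returns {(key, window_end): count}."""
    rng = random.Random(seed)
    # Stage 1: a random suffix spreads a hot key over `salts` groups;
    # window_end is kept so stage 2 does not mix different windows.
    stage1 = Counter()
    for key, ts in events:
        stage1[(f"{key}_{rng.randrange(salts)}", window_end(ts))] += 1
    # Stage 2: strip the suffix and re-aggregate by (original key, window_end).
    stage2 = Counter()
    for (salted_key, w_end), partial in stage1.items():
        original_key = salted_key.rsplit("_", 1)[0]
        stage2[(original_key, w_end)] += partial
    return dict(stage2)

events = [("home", 1_000)] * 300 + [("cart", 2_000)] * 5 + [("home", 12_000)] * 7
print(two_stage_count(events))
```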

3.8.5 How do you ensure data consistency?

Upstream (source): Kafka retains data so that it can be re-read from a saved offset; Kafka supports this by default.

Flink: set checkpointing to EXACTLY_ONCE mode.

Downstream (sink): write to Kafka with transactions; write to ClickHouse idempotently and query it with FINAL.

3.8.6 How do you optimize slow Flink SQL?

(1) Set the idle state retention time (state TTL, e.g. table.exec.state.ttl)

(4) Enable split distinct aggregation (table.optimizer.distinct-agg.split.enabled)

(5) For multi-dimensional DISTINCT, use the FILTER clause (rather than CASE WHEN)

3.8.7 Kafka partitions are added dynamically, Flink does not detect the new partitions, and data is lost

Enable Kafka partition discovery in the Flink Kafka source. For KafkaSource, set the partition.discovery.interval.ms property to a positive interval.

3.8.9 A Kafka partition has no data, so the downstream watermark cannot advance and windows cannot close and compute

When assigning watermarks, set an idle timeout (WatermarkStrategy.withIdleness) so that idle partitions do not hold the watermark back.

3.8.10 Data hotspots caused by poor HBase rowkey design

See the HBase rowkey design principles for details.

3.8.11 Data inconsistency between Redis and HBase

There are two orders in which Redis and the database can be updated:

(1) Delete from Redis first, then update the database

Under concurrency this can make the data inconsistent.

In the diagram above, Thread-1 performs an update and Thread-2 performs a query. The CPU runs them in this order:

- Thread-1 deletes the cache entry successfully.
- Thread-2 then gets the CPU, finds no data in the cache, queries the database, and writes the value into the cache. Because Thread-1 has not yet updated the database, the cached value is the old value.
- Finally, Thread-1 updates the database successfully.

The database now holds the new value while the cache still holds the old one, so the two are inconsistent.

There are two ways to solve this:

① Locking, so that threads run in order. If the service is deployed on multiple machines, this becomes a distributed lock, or a distributed queue that applies operations to the database or Redis in order. The side effect is that database operations that used to run concurrently become serial. Locking or queuing reduces system performance, so this approach is usually not practical.

② Delayed double deletion: delete the cache first, then update the database, then wait for a period of time and delete the cache again.
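Below is a minimal sketch of delayed double deletion. It assumes in-memory maps stand in for Redis and the database; DelayedDoubleDelete and its methods are hypothetical names. The delay passed to update must be longer than a concurrent read-and-backfill takes. Choosing that value is an assumption you have to tune for your own system.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedDoubleDelete {
    final Map<String, String> cache = new ConcurrentHashMap<>();    // stand-in for Redis
    final Map<String, String> database = new ConcurrentHashMap<>(); // stand-in for the database
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delayed-cache-delete");
                t.setDaemon(true); // do not keep the JVM alive just for pending deletes
                return t;
            });

    // Read path: return the cached value, or load it from the database and back-fill the cache.
    String read(String key) {
        return cache.computeIfAbsent(key, database::get);
    }

    // Write path: delete cache -> update database -> delete cache again after a delay.
    // The second delete removes any stale value a concurrent reader wrote back in between.
    ScheduledFuture<?> update(String key, String value, long delayMillis) {
        cache.remove(key);
        database.put(key, value);
        return scheduler.schedule(() -> { cache.remove(key); }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        DelayedDoubleDelete store = new DelayedDoubleDelete();
        store.database.put("sku_1", "old");
        ScheduledFuture<?> secondDelete = store.update("sku_1", "new", 100);
        store.cache.put("sku_1", "old"); // simulate a slow reader back-filling the stale value
        secondDelete.get();              // wait until the delayed delete has run
        System.out.println(store.read("sku_1")); // reloads the new value from the database
    }
}
```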

(2) Update the database first, then delete from Redis

If the database update succeeds but deleting from Redis fails, Redis keeps the old value. The failed cache deletion leaves the cache and the database inconsistent.

In both cases we want the two operations to either both succeed or both fail, ideally as one atomic operation. One succeeding while the other fails is exactly what creates inconsistent data.

3.8.12 How to handle records that fail to match in a dual-stream join

(1) Use an interval join and adjust its lower and upper time bounds. Late data may still fail to match.

(2) Use a left join, which emits retractions when the matching record arrives later.

(3) Join the two streams yourself with coGroup or connect.

3.9 Production experience

3.9.1 Which mode is used to submit Flink jobs, and why?

The project submits jobs in per-job mode. Each job then has isolated resources and isolated failures, and can be tuned independently.

3.9.2 Flink job submission parameters: how many resources do the JobManager and TaskManager get?

JobManager: 1 GB of memory and 1 CPU core by default

TaskManager: jobs with a large data volume, such as the splitting job for topic_log, can be given 8 GB

Jobs with less data, such as the splitting job for topic_db, can be given 4 GB

Real-time: parallelism equals the number of Kafka partitions, with a CPU-to-slot ratio of 1:3

20 MB/s -> 3 partitions -> CPU:slot ratio 1:3 -> 3 slots -> 1 core -> CPU:memory ratio 1:4 -> 1 TaskManager -> 4 GB for the TaskManager

JobManager: 2 GB of memory, 1 CPU

On average, a Flink job uses 6 GB of memory and 2 cores.
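Here is the same sizing chain written out as arithmetic. It uses only the ratios and numbers given above: 3 partitions, CPU:slot = 1:3, CPU:memory = 1:4, and a JobManager with 2 GB and 1 core.

```latex
\begin{aligned}
\text{slots} &= \text{Kafka partitions} = 3 \\
\text{TaskManager cores} &= \text{slots} / 3 = 1 \\
\text{TaskManager memory} &= 1\ \text{core} \times 4\ \text{GB} = 4\ \text{GB} \\
\text{job memory} &= 4\ \text{GB (TM)} + 2\ \text{GB (JM)} = 6\ \text{GB} \\
\text{job cores} &= 1\ \text{(TM)} + 1\ \text{(JM)} = 2
\end{aligned}
```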

3.9.3 How to set Flink job parallelism

Set the global parallelism equal to the number of Kafka partitions (5). Set the parallelism separately for compute-heavy operators after keyBy.

3.9.4 How are the checkpoint parameters of Flink jobs set in the project?

Checkpoint interval: how often the job triggers a checkpoint. Adjust it to the job's state size and recovery needs. 3 to 5 minutes is generally recommended; intervals of seconds can be used when timeliness requirements are high.

Checkpoint timeout: limits how long a checkpoint may run. A checkpoint that takes longer is discarded. A timeout within 10 minutes is recommended.

Minimum pause between checkpoints: set it at the minute level to keep checkpoints from running too frequently.

Checkpoint storage: usually HDFS.

3.9.5 How to handle late data

(1) Set the out-of-orderness bound of the watermark

(2) Set the allowed lateness of the window

(3) Use a side output

In production, the side-output stream is processed separately by Flink and written to ClickHouse. The results are combined again when queried through the interface.
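The three mechanisms can be simulated in plain Java to show how they interact. The sketch makes these assumptions:

- Timestamps are non-negative milliseconds and windows are tumbling.
- The watermark is the largest timestamp seen minus the out-of-orderness bound minus 1, which is the rule Flink's bounded-out-of-orderness strategy uses. Here it is updated after every event, a simplification of Flink's periodic emission.
- A window accepts records until the watermark passes its end plus the allowed lateness; anything later goes to a side-output list.

LateDataSimulation is a hypothetical name. In Flink the corresponding calls are WatermarkStrategy.forBoundedOutOfOrderness, allowedLateness, and sideOutputLateData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LateDataSimulation {
    final long windowSize;      // tumbling window length (ms)
    final long outOfOrderness;  // (1) watermark out-of-orderness bound (ms)
    final long allowedLateness; // (2) extra time a window keeps accepting updates (ms)
    final Map<Long, Long> windowCounts = new TreeMap<>(); // windowStart -> record count
    final List<Long> sideOutput = new ArrayList<>();      // (3) records too late for any window
    private long maxTimestamp = Long.MIN_VALUE;

    LateDataSimulation(long windowSize, long outOfOrderness, long allowedLateness) {
        this.windowSize = windowSize;
        this.outOfOrderness = outOfOrderness;
        this.allowedLateness = allowedLateness;
    }

    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - outOfOrderness - 1;
    }

    void onEvent(long timestamp) {
        long windowStart = timestamp - (timestamp % windowSize); // assumes timestamp >= 0
        long windowMaxTimestamp = windowStart + windowSize - 1;
        if (windowMaxTimestamp + allowedLateness <= currentWatermark()) {
            sideOutput.add(timestamp);                      // window state already cleaned up
        } else {
            windowCounts.merge(windowStart, 1L, Long::sum); // on time, or late within allowed lateness
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    public static void main(String[] args) {
        LateDataSimulation sim = new LateDataSimulation(10, 2, 3);
        for (long ts : new long[] {1, 13, 5, 16, 7}) {
            sim.onEvent(ts);
        }
        System.out.println(sim.windowCounts + " late=" + sim.sideOutput); // prints {0=2, 10=2} late=[7]
    }
}
```

In the example, event 5 arrives when the watermark is 10. Its window [0, 10) is still accepted until watermark 12, so the record is counted. Event 7 arrives at watermark 13 and goes to the side output.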

3.9.6 How much latency does the real-time data warehouse have?

Backpressure, state size, insufficient resources, machine performance, and checkpoint duration all affect latency.

Usually the largest factor is the window size, typically 5 s.

If two-phase-commit writes to Kafka are enabled and downstream consumers read with read_committed, the checkpoint interval must be added to the latency.

3.9.7 How long was the project developed, and how long has it been maintained?

Development took half a year, and it has been maintained for more than half a year.

3.9.8 How to handle the cache cold-start problem

At initial startup Redis has no cached data, so a large number of read requests go straight to HBase. This is similar to a cache avalanche.

Use offline statistics to find hot dimension data, such as users who made purchases in the last three days and active SKUs, and insert it into Redis manually.

3.9.9 How to handle the cold-start problem of dynamic splitting (main-stream data arrives first; how to avoid losing it)

Preload the configuration into a HashMap in the open() method. Then data is not lost when the configuration arrives later than the data.

3.9.10 How to deploy upgraded or modified code

Stop the job with a savepoint, then restart it from that savepoint.

Sometimes the code changes are too large to restore from the savepoint, or the historical results are not needed and the job should run from scratch. In that case, submit the job directly without restoring from a savepoint.

3.9.11 If five checkpoints have been taken, how do you restore the failed Flink job to the state saved by the third checkpoint?

In Flink, enable externalized (retained) checkpoints so that checkpoint data is kept after the job stops. To recover from a specific checkpoint (for example, the third one), specify it manually when resubmitting the job by pointing to its chk-xx directory.

3.9.12 Flink must track a group of people travelling from Beijing to Shanghai. It records each person's departure and arrival times and shows in real time how long each person took. How would you design it?

Key the stream by person (keyBy) and save each person's departure time in keyed state. When the arrival event comes, subtract the departure time from the arrival time.
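Below is a sketch of that design. A HashMap stands in for Flink keyed ValueState; in a job, this logic would sit in a KeyedProcessFunction after keyBy(personId). The event type strings DEPART/ARRIVE and the class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

class TravelTimeTracker {
    // Stand-in for keyed ValueState<Long>: departure time per person.
    private final Map<String, Long> departureState = new HashMap<>();

    // Returns the trip duration (ms) when an arrival closes a trip, otherwise empty.
    OptionalLong onEvent(String personId, String eventType, long timestampMillis) {
        if ("DEPART".equals(eventType)) {
            departureState.put(personId, timestampMillis);
        } else if ("ARRIVE".equals(eventType)) {
            Long departedAt = departureState.remove(personId); // clear state once the trip is complete
            if (departedAt != null) {
                return OptionalLong.of(timestampMillis - departedAt);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        TravelTimeTracker tracker = new TravelTimeTracker();
        tracker.onEvent("alice", "DEPART", 1_000L);
        System.out.println(tracker.onEvent("alice", "ARRIVE", 16_201_000L).getAsLong()); // prints 16200000
    }
}
```

Removing the departure time on arrival keeps state from growing without bound. An arrival with no recorded departure simply produces no result.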

3.9.13 How are data quality and timeliness controlled inside Flink?

Internal data quality: internal consistency through checkpoints

Data timeliness: answer together with 3.9.5

3.9.14 How to troubleshoot real-time job problems (latency)

When a real-time job falls behind, investigate the following:

(1) Monitoring metrics: check whether there is backpressure.

(2) Logs: check the job's runtime logs to locate potential problems and exceptions, such as network fluctuations, hardware failures, or misconfiguration.

(3) External events: if the delay follows a large number of external events, consider other factors such as external system failures or network fluctuations. Another common cause is several frameworks deployed together and competing for resources.

3.9.15 Query concurrency for dimension data

Before optimization, dimension queries reached thousands of QPS. After adding the Redis cache, they dropped to dozens.

3.9.16 Did you build Prometheus + Grafana yourselves? Which metrics are monitored?

We built it ourselves to monitor Flink jobs and cluster metrics.

1) TaskManager metrics: information about the TaskManager, such as CPU usage, memory usage, and network I/O.

2) Task metrics: information about tasks, such as task latency, the number of lost records, and input/output rates.

3) Checkpoint metrics: information about checkpoints, such as checkpoint duration, the number of successful/failed checkpoints, and checkpoint size.

4) Operator metrics: information about Flink operators, such as the number of input/output records, processing time, and cache size.

3.9.17 How to change Flink parameters without stopping the job

Only the dynamic splitting configuration can be changed this way. We have not done it for anything else.

3.9.18 An HBase table contains data from January to March that is no longer needed and must be deleted completely. How do you do it?

To completely delete this data from the HBase table, perform the following steps:

(1) Disable the table

(2) Create a new table

(3) Copy the data to be kept from the old table to the new table

(4) Delete the old table

(5) Rename the new table

Back up your data before performing these steps to prevent accidental loss. If the old table holds a very large amount of data, copying it to the new table can take a long time.

3.9.19 Data skew in a Flink job appears only occasionally: suddenly during the day or at night, then not again for months, and it cannot be reproduced. How do you handle it?

Flink's backpressure mechanism lets the job absorb short-term skew on its own, so we do not handle this kind of occasional skew.

3.9.20 After dimension data changes, how do you ensure that newly joined dimension data is correct?

(1) We use low-latency incremental updates. These still have some delay and cannot guarantee fully correct data.

(2) If correct results are required, the only option is to read MySQL directly. You must then consider query concurrency and MySQL machine performance.

3.10 Real-time: business

3.10.1 Data collection to the ODS layer

1) Why collect a separate copy of the behavior data from front-end tracking?

Kafka retains data for 3 days and there is enough disk space: originally 1 TB, now 2 TB, no pressure.

2) Why choose Kafka?

It supports real-time writes and real-time reads.

=> A message queue is suitable; other databases cannot handle this load.

3) Why Maxwell? How is consistency ensured when synchronizing historical data?

FlinkCDC was released in July 2020.

Maxwell supports synchronizing historical data

Maxwell supports resuming from a breakpoint (position metadata is stored)

Its data format is lighter

At-least-once delivery guarantees no data loss.

4) How long does Kafka retain data? What if older data is needed?

Same as the offline project: 3 days

Our project does not need it. If needed, read it from the database or Hive. ClickHouse also holds historical wide-table data.

3.10.2 ODS layer

1) Stores the raw data

2 topics: tracking behavior data ods_base_log and business data ods_base_db

2) Ordering of business data:

Configure Maxwell to use the table as the producer partition key (producer_partition_by=table).

3.10.3 DWD+DIM

1) Storage: why are dimension tables stored in HBase?

Fact tables are stored in Kafka and dimension tables in HBase

Join approach based on dimension tables loaded from hot storage:

Long-term considerations

Suitable for real-time reads and writes

2) Splitting the tracking behavior data

(1) Fix the new/returning visitor flag (optional): previously the front end tried to distinguish new and returning visitors, which was not accurate enough.

(2) Splitting: side outputs

Split into 3 topics: start, page, and display (exposure)

(3) User bounce and unique visitor statistics

3) Business data processing

(1) Dynamic splitting: FlinkSQL reads the topic_base_db data, filters out the data of each table, and writes it back to Kafka.

(2) Order preprocessing table design: dual-stream join with left join

(3) Dimension degeneration of the dictionary table

4) Writing dimension data to HBase

(1) To avoid restarting jobs when dimension tables change, a configuration table is kept in MySQL for dynamic configuration.

Dynamic implementation: through broadcast state

=> Read a configuration table => maintain this configuration table

Columns: source (source table), sink (target table), operation type, fields, primary key, extension

=> Capture configuration table changes in real time => a CDC tool

=> FlinkCDC

=> Synchronize this configuration table with the SQL API

=> The SQL data format is more convenient

(2) How to write to HBase: through Phoenix

No dimension degeneration is applied here

The dimension tables are small and change infrequently

(3) How is the HBase rowkey designed? Are there data hotspots?

Largest dimension table: the user dimension table

=> For example, with millions of daily active users and 20 million registered users at about 1 KB each: 20 million × 1 KB ≈ 20 GB

Use Phoenix salted tables to avoid data hotspots
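Here is the idea behind a salted rowkey, sketched in Java. A one-byte prefix, derived from a hash of the key modulo the bucket count, spreads sequential keys across regions. This only illustrates the technique. Phoenix computes its salt byte with its own hash function when a table is created with SALT_BUCKETS, so the prefix values here will not match Phoenix's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SaltedRowKey {
    // Salt byte = hash(rowKey) mod buckets, so adjacent keys land in different regions.
    // Assumes buckets <= 128 so the byte value stays non-negative.
    static byte saltByte(byte[] rowKey, int buckets) {
        return (byte) Math.floorMod(Arrays.hashCode(rowKey), buckets);
    }

    // Prepend the salt byte to the original key bytes.
    static byte[] salt(String rowKey, int buckets) {
        byte[] key = rowKey.getBytes(StandardCharsets.UTF_8);
        byte[] salted = new byte[key.length + 1];
        salted[0] = saltByte(key, buckets);
        System.arraycopy(key, 0, salted, 1, key.length);
        return salted;
    }

    public static void main(String[] args) {
        for (String userId : new String[] {"user_000001", "user_000002", "user_000003"}) {
            System.out.println(userId + " -> bucket " + salt(userId, 8)[0]);
        }
    }
}
```

The trade-off is that a range scan over the original keys must now be issued once per bucket. Point lookups stay cheap because the salt is recomputed from the key itself.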

3.10.4 DWS layer

1) Why ClickHouse

(1) Suited to large wide tables, large data volumes, and aggregate statistical analysis => fast

(2) Wide tables no longer need joins, which suits ClickHouse very well

2) Joining dimension data

(1) Dimension join options: preloading, querying an external database, dual-stream join, and lookup join

(2) In the project, dimension data is read from HBase

(3) Optimization 1: asynchronous I/O

An asynchronous query hands the dimension-table query off to a separate thread pool. Processing is then not blocked by any single query, and a single parallel instance can send several requests in a row, improving concurrency.

This targets operations involving network I/O in particular, reducing the cost of waiting for responses.

Flink introduced Async I/O in version 1.2. In asynchronous mode, a single parallel instance can send several requests in a row and process whichever response returns first. There is no blocking wait between consecutive requests, which greatly improves stream-processing efficiency.

Async I/O is a highly requested feature contributed to the community by Alibaba. It solves the problem of network latency becoming the system bottleneck when interacting with external systems.
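Below is a plain-Java sketch of the same idea using CompletableFuture. All lookups are submitted to a separate thread pool at once, and each result is handled as soon as its query completes. The in-memory dimension map, the 50 ms simulated latency, and the class name are assumptions. In Flink itself this would be a RichAsyncFunction wired in with AsyncDataStream.unorderedWait.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncDimLookup {
    // Hypothetical dimension data standing in for an HBase table.
    static final Map<String, String> DIM_TABLE = Map.of("u1", "Beijing", "u2", "Shanghai", "u3", "Shenzhen");

    // Simulated blocking lookup with 50 ms of network latency.
    static String slowLookup(String userId) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return DIM_TABLE.getOrDefault(userId, "unknown");
    }

    // Fire every lookup at once on a separate pool; each result is recorded when its query
    // completes, in completion order rather than input order (compare AsyncDataStream.unorderedWait).
    static Map<String, String> enrich(List<String> userIds, ExecutorService pool) {
        Map<String, String> results = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = userIds.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> slowLookup(id), pool)
                        .thenAccept(city -> results.put(id, city)))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(pending).join(); // wait for all in-flight lookups
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // The three 50 ms lookups overlap instead of running one after another.
            System.out.println(enrich(List.of("u1", "u2", "u3"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```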

(4) Optimization 2: cache-aside (bypass cache)

Cache-aside is a very common pattern in which the cache is populated on demand. As shown in the figure, every request checks the cache first. On a hit, the data is returned directly. On a miss, the database is queried and the result is cached for subsequent requests.

(5) How to ensure cache consistency

Option 1: when an update for a dimension table arrives (the operation type is update):

When updating HBase, delete the corresponding cached data in Redis

Set a 24-hour expiration time in Redis

Option 2: dual write
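Below is a minimal cache-aside sketch that also applies option 1 above: invalidate on update, plus a 24-hour TTL. A ConcurrentHashMap stands in for Redis and a lookup function stands in for the HBase query. The class name, the miss counter, and the injectable clock are hypothetical; they exist only to make the behavior observable.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

class CacheAsideDimCache {
    private record Entry(String value, long expiresAtMillis) {}

    private final Map<String, Entry> cache = new ConcurrentHashMap<>(); // stand-in for Redis
    private final Function<String, String> dbLookup;                    // stand-in for an HBase query
    private final long ttlMillis;
    private final LongSupplier clock;                                  // injectable so expiry can be tested
    int dbReads = 0;                                                   // cache misses, for illustration only

    CacheAsideDimCache(Function<String, String> dbLookup, Duration ttl, LongSupplier clock) {
        this.dbLookup = dbLookup;
        this.ttlMillis = ttl.toMillis();
        this.clock = clock;
    }

    // Read: try the cache first; on a miss or an expired entry, query the database and cache the result.
    String get(String key) {
        long now = clock.getAsLong();
        Entry cached = cache.get(key);
        if (cached != null && cached.expiresAtMillis() > now) {
            return cached.value();
        }
        dbReads++;
        String value = dbLookup.apply(key);
        if (value != null) {
            cache.put(key, new Entry(value, now + ttlMillis));
        }
        return value;
    }

    // Option 1: when an update for this key is written to HBase, drop the cached copy.
    void invalidate(String key) {
        cache.remove(key);
    }

    public static void main(String[] args) {
        Map<String, String> hbase = new ConcurrentHashMap<>(Map.of("sku_1", "phone"));
        CacheAsideDimCache dimCache =
                new CacheAsideDimCache(hbase::get, Duration.ofHours(24), System::currentTimeMillis);
        dimCache.get("sku_1");        // miss: loads from the database
        dimCache.get("sku_1");        // hit
        hbase.put("sku_1", "tablet"); // dimension update arrives
        dimCache.invalidate("sku_1");
        System.out.println(dimCache.get("sku_1") + ", db reads = " + dimCache.dbReads); // prints tablet, db reads = 2
    }
}
```

The TTL acts as a safety net: if an invalidation is ever missed, a stale entry still ages out within 24 hours.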

3) Light aggregation

(1) The DWS layer has to serve many real-time queries; if it held fully detailed data, the query pressure would be very high. Aggregating real-time data by subject makes it easier to manage and reduces the number of dimension lookups.

(2) Use a small window: a 5 s tumbling window

(3) This also reduces the write pressure on ClickHouse and shortens subsequent aggregation

(4) How many tables? Table names and fields

Visitors, products, regions, keywords

3.10.5 ADS layer

1) Implementation

Provide a data API that queries ClickHouse for the large-screen visualization service.

2) How to ensure consistency in ClickHouse?

ReplacingMergeTree only guarantees eventual consistency, so deduplication logic is added to the SQL at query time.

3) How Flink jobs are monitored

Both Flink and ClickHouse are monitored with Prometheus + Grafana.

Chapter 4: The Data Evaluation Platform Project

4.1 Project Background

4.1.1 Why do data governance?

As big data technology spreads, more and more enterprises are building data warehouses. Enterprises often have many complex data sources, metric definitions that change over time, inconsistent development standards, and staff turnover. As a result, their data warehouses run into the following problems:

- Inaccurate or even wrong calculations, leading to unreliable decisions.
- Data from different departments that cannot be effectively integrated, creating data silos.
- A lack of data protection, increasing data risk.
- Inconsistent standards that make data harder to understand in transmission, storage, and computation, and lower its utilization.
- Data that struggles to create value, degrading the user experience.

Against this background, many companies need data governance.

4.1.2 Data governance concepts

Data governance is a systematic approach to improving data quality, consistency, security, and integrity in an enterprise.

It involves designing strategies, processes, technologies, and tools.

Enterprises generally rely on a data middle platform for data governance. This is a "one-stop" data processing and governance platform that typically includes data access and integration, cleansing and transformation, storage and management, quality management, metadata and lineage management, security, visualization, and other functions.

4.1.3 What does the data governance evaluation platform do?

The data evaluation platform is a lightweight web platform that scores every table in the data warehouse on standardization, storage, computation, security, and quality. Like a PC health-check utility, it scans the warehouse every day to find tables that do not meet the standards so they can be fixed, improving the quality of the data warehouse.

4.1.4 Evaluation metrics

Standardization: whether the table name follows the naming convention, whether there are table and field comments, and whether an owner is assigned

Storage: whether a lifecycle is specified, similar tables, and empty tables

Computation: computation with no output, tables with no access, computation errors, and simple processing

Security: security level and directory/file access permissions

Quality: metric computation time outside its normal fluctuation range, and metric results outside their normal fluctuation range

4.2 Technical architecture

4.3 What functions does the project implement?

4.3.1 Loading and processing metadata, and page interfaces for table data

(1) Use the Hive client object's getTable method to obtain the table object, extract its metadata, and write it to MySQL.

(2) Create an HDFS client with the FileSystem class and read the metadata of the corresponding path.

(3) Implement interfaces for manually supplementing some table information on the web page, and persist it to MySQL.

(4) Interfaces for the table query list, single-table information, and auxiliary information

Basic create, read, update, and delete (CRUD) operations

4.3.2 Data governance evaluation pipeline (core)

MySQL: metadata table, weight table, metric type table -> governance evaluation details

(1) Generate the basic code (beans, controller layer, service layer, data access layer) from the database tables with the MyBatis-Plus code generator.

(2) Get the list of all tables to be evaluated (List<TableMetaInfo>)

(3) Get the list of all metrics to be evaluated (List<GovernanceMetric>)

(4) Iterate over the two lists in a nested loop. For each metric, get its evaluator, pass it the parameters it needs, and let it score. This produces the scoring result of each metric for each table (List<GovernanceAssessDetail>).

(5) Save evaluation results to MySQL
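The nested loop in steps (2) to (5) could look roughly like this. Only the names TableMetaInfo, GovernanceMetric, and GovernanceAssessDetail come from the text. Their fields, the Assessor interface, and the lookup of evaluators by metric code are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down beans; the project's real classes have more fields.
record TableMetaInfo(String tableName, String tableComment) {}
record GovernanceMetric(String metricCode, String metricName) {}
record GovernanceAssessDetail(String tableName, String metricCode, int score) {}

// Hypothetical evaluator contract: scores one table against one metric.
interface Assessor {
    int assess(TableMetaInfo table, GovernanceMetric metric);
}

class GovernanceAssessRunner {
    static List<GovernanceAssessDetail> assessAll(List<TableMetaInfo> tables,
                                                  List<GovernanceMetric> metrics,
                                                  Map<String, Assessor> assessorsByCode) {
        List<GovernanceAssessDetail> details = new ArrayList<>();
        for (TableMetaInfo table : tables) {          // outer loop: every table to evaluate
            for (GovernanceMetric metric : metrics) { // inner loop: every metric
                Assessor assessor = assessorsByCode.get(metric.metricCode());
                if (assessor == null) {
                    continue;                         // no evaluator registered for this metric
                }
                details.add(new GovernanceAssessDetail(
                        table.tableName(), metric.metricCode(), assessor.assess(table, metric)));
            }
        }
        return details; // step (5): batch-insert these rows into MySQL
    }

    public static void main(String[] args) {
        Assessor hasComment = (table, metric) -> table.tableComment().isBlank() ? 0 : 10;
        List<GovernanceAssessDetail> details = assessAll(
                List.of(new TableMetaInfo("dwd_order_detail", "order detail"), new TableMetaInfo("tmp_x", "")),
                List.of(new GovernanceMetric("TABLE_COMMENT", "has table comment")),
                Map.of("TABLE_COMMENT", hasComment));
        details.forEach(System.out::println);
    }
}
```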

4.3.3 Aggregating the governance evaluation results

Scores are mainly computed with SQL GROUP BY.

When aggregating, consider the governance type of each metric. Each governance type has its own weight, and a score is multiplied by that weight to get the score for the governance type.

(1) Calculate the evaluation score of each table

(2) Calculate the evaluation score of each technical owner

(3) Calculate the global evaluation score

(4) Chain the evaluation tasks together and schedule them as one unit.

(5) Schedule the computation with Spring Task

Advantage: simple and easy to use.

Disadvantage: no distributed scheduling.

Add the @EnableScheduling annotation to the Spring Boot application class

Annotate the method with @Scheduled and its parameters (for example, a cron expression)

4.3.4 Data interfaces for visualizing governance evaluation

Interfaces implemented: the number of problems per governance type, the ranking of owners by group, and manually triggering an evaluation

4.4 Problems and optimizations in the project

4.4.1 Calculating the data size and last modification/access time of an HDFS path

Implemented recursively
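Below is a sketch of the recursion. It runs against the local file system, using java.nio.file as a stand-in for HDFS; this substitution is an assumption made so the example is self-contained. With the Hadoop client the structure would be the same, using FileSystem.listStatus and the length and modification time on each FileStatus.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class DirStats {
    long totalBytes = 0;
    long lastModifiedMillis = 0;

    // Entry point: walk the directory tree and return the accumulated statistics.
    static DirStats of(Path dir) throws IOException {
        DirStats stats = new DirStats();
        accumulate(dir, stats);
        return stats;
    }

    // Depth-first recursion: directories recurse into their children, files add their size
    // and update the newest modification time seen so far.
    private static void accumulate(Path path, DirStats stats) throws IOException {
        if (Files.isDirectory(path)) {
            try (Stream<Path> children = Files.list(path)) {
                for (Path child : (Iterable<Path>) children::iterator) {
                    accumulate(child, stats);
                }
            }
        } else {
            stats.totalBytes += Files.size(path);
            stats.lastModifiedMillis = Math.max(stats.lastModifiedMillis,
                    Files.getLastModifiedTime(path).toMillis());
        }
    }

    public static void main(String[] args) throws IOException {
        DirStats s = of(Path.of(args.length > 0 ? args[0] : "."));
        System.out.println("bytes=" + s.totalBytes + ", lastModified=" + s.lastModifiedMillis);
    }
}
```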

4.4.2 What role do the evaluators play?

Template method pattern: an abstract class defines a template method that fixes how the steps are executed. Subclasses override methods as needed, but the calls always follow the flow defined in the abstract class.

Characteristics: the abstract parent class controls the calling flow, and each subclass implements the core function.

Benefits: it follows the open/closed principle, that is, closed for modification and open for extension. Responsibilities are clearly separated, and the code is easy to maintain.
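Below is a minimal illustration of the template method pattern applied to evaluators. The class names and the clamping and error policy are hypothetical. The point is that the final method in the parent owns the flow, and each subclass only overrides checkProblem.

```java
abstract class AbstractAssessor {
    // Template method: final, so every evaluator runs the same flow.
    public final int assess(String tableName, String tableComment) {
        try {
            int raw = checkProblem(tableName, tableComment); // step owned by the subclass
            return Math.max(0, Math.min(10, raw));           // shared post-processing: clamp to 0..10
        } catch (RuntimeException e) {
            return 0;                                        // shared error policy: a failed check scores 0
        }
    }

    // Hook that each concrete evaluator overrides.
    protected abstract int checkProblem(String tableName, String tableComment);
}

class TableCommentAssessor extends AbstractAssessor {
    @Override
    protected int checkProblem(String tableName, String tableComment) {
        return (tableComment == null || tableComment.isBlank()) ? 0 : 10;
    }
}

class TableNamePrefixAssessor extends AbstractAssessor {
    @Override
    protected int checkProblem(String tableName, String tableComment) {
        return tableName.startsWith("tmp_") ? 0 : 10; // hypothetical rule: temporary tables score 0
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        System.out.println(new TableCommentAssessor().assess("dwd_order", ""));   // prints 0
        System.out.println(new TableNamePrefixAssessor().assess("dwd_order", "")); // prints 10
        System.out.println(new TableNamePrefixAssessor().assess(null, ""));        // NPE handled by the parent: prints 0
    }
}
```

Adding a new metric means adding one subclass. The parent's flow, error handling, and score range stay untouched, which is the open/closed benefit described above.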

4.4.3 Implementation ideas for the more difficult evaluators

Table name compliance: check the table name with a regular expression

Whether the lifecycle is reasonable:

Similar tables:

Table output data volume monitoring:

Directory/file access permissions exceed the recommended values:

Simple processing: parse the SQL into a syntax tree and define node processors

1) Check whether there is complex processing logic, which tables are queried, and which fields are filtered
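For the table-name check, here is a regular-expression sketch. The naming rule itself is an assumption: a layer prefix such as ods or dwd, followed by lowercase words joined by single underscores. Replace it with your warehouse's actual convention.

```java
import java.util.regex.Pattern;

class TableNameRule {
    // Assumed convention: layer prefix, then lowercase words/digits joined by single underscores,
    // e.g. dwd_trade_order_detail_inc. Replace with your own naming standard.
    private static final Pattern NAME_PATTERN =
            Pattern.compile("^(ods|dim|dwd|dws|ads)_[a-z0-9]+(_[a-z0-9]+)*$");

    static boolean isCompliant(String tableName) {
        return tableName != null && NAME_PATTERN.matcher(tableName).matches();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"dwd_trade_order_detail_inc", "tmp_order", "DWD_Order", "dws__uv"}) {
            System.out.println(name + " -> " + isCompliant(name));
        }
    }
}
```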

4.4.4 Using multithreading to speed up evaluation

Use a thread pool + CompletableFuture to compute the evaluation metrics asynchronously and improve efficiency.
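Below is a sketch of the thread pool + CompletableFuture approach. One task per table is submitted to a fixed pool, and the results are collected in input order once all have finished. The scorer function, pool size, and class name are hypothetical; in the project, each task would run the evaluators for one table.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToIntFunction;

class ParallelAssessment {
    // Submits one scoring task per table to a fixed pool and waits for all of them.
    static List<Integer> scoreAll(List<String> tables, ToIntFunction<String> scorer, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Integer>> futures = tables.stream()
                    .map(table -> CompletableFuture.supplyAsync(() -> scorer.applyAsInt(table), pool))
                    .toList();
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // wait for every task
            return futures.stream().map(CompletableFuture::join).toList();            // results in input order
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in scorer: name length. A real scorer would run the evaluators for the table.
        System.out.println(scoreAll(List.of("dwd_order", "dim_user", "ads_uv"), String::length, 2)); // prints [9, 8, 6]
    }
}
```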

4.4.5 Which metrics have you implemented?

Simple metrics:

- whether there is a business owner
- whether there is a table comment
- whether any field comments are missing
- whether the table is empty
- whether a security level is set
- whether the table has gone unaccessed for a long time
- whether it has produced no output for a long time

DolphinScheduler (DS) metrics: task failures of the day, timeliness monitoring of table output, simple processing, SELECT * in SQL, and data skew checks

Chapter 4: The User Profile Project

4.1 What does the user profile system mainly do?

1) Tagging user information

2) Applying tag data (user segmentation, insight analysis)

3) How tags are modeled and which tags exist

Based on user requirements, work with the product manager to plan four-level tags. The first two levels are categories, the third level is the tag, and the fourth level is the tag value.

4.2 Overall project architecture

4.3 The scheduling process of tag computation

4.4 The overall tag processing flow

Four tasks:

(1) Write SQL according to the business logic of each tag to produce a single-tag table.

(2) Merge the single-tag tables into a tag wide table.

(3) Export the tag wide table to a tag wide table in ClickHouse.

(4) Convert the tag table in ClickHouse into a Bitmap table.

All four tasks are implemented as Spark programs and scheduled through the profile platform. Adding a tag in the future only requires filling in the tag definition, SQL, and related parameters on the platform.

4.5 What functions does your profile platform have?

(1) Tag definition

(2) Tag task configuration

(3) Task scheduling

(4) Task monitoring

(5) User segment creation and maintenance

(6) Audience insight

4.6 Have you done web application development? What functions did you implement?

(1) Profile platform: user segmentation

(2) Profile platform: other functions (optional)

(3) Real-time data warehouse: data interfaces

4.7 Upstream and downstream of the profile platform

(1) Upstream: the data warehouse system

(2) Downstream: results are written to Redis and accessed by the advertising and operations systems.

4.8 Bitmap principles and why bitmaps improve performance

A bitmap is a binary set that marks whether a value exists with a 0 or 1.

To intersect two sets, there is no need to traverse both sets and compare elements; a bitwise AND over the bits is enough. Both the reduced number of comparisons (from O(N^2) to O(N)) and the cheaper comparison method (bitwise operations) bring a large performance gain.

Business scenario: store each tag's set of user IDs in a bitmap. Segmenting users by the intersection of several tags (for example, female + born in the 1990s) then only requires intersecting the bitmaps of those tags.
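The intersection can be shown with java.util.BitSet, where bit i is set when user i has the tag. This is a sketch of the principle only. ClickHouse stores bitmaps in its own Roaring-based format and exposes the operation as bitmapAnd.

```java
import java.util.BitSet;

class TagBitmap {
    // One bit per user ID: bit i is set when user i carries the tag.
    static BitSet of(int... userIds) {
        BitSet bits = new BitSet();
        for (int id : userIds) {
            bits.set(id);
        }
        return bits;
    }

    // Intersection of two tag groups = bitwise AND of their bitmaps.
    static BitSet intersect(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone(); // keep the inputs unchanged
        result.and(b);
        return result;
    }

    public static void main(String[] args) {
        BitSet female = of(1, 3, 5, 8);
        BitSet bornIn90s = of(3, 4, 8, 9);
        System.out.println(intersect(female, bornIn90s)); // prints {3, 8}
    }
}
```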

Chapter 5: Data Lake Project

5.1 Data lakes vs. data warehouses

A data lake is a large repository that stores a wide variety of an enterprise's raw data, which can be accessed, processed, analyzed, and transferred.

Hudi, Iceberg, Delta Lake, Paimon

5.2 Why did you do this project? What pain points did it solve?

(1) Pain points of offline data warehouses

Timeliness: T+1 mode, poor timeliness

Data can only be updated by overwriting, which consumes resources

(2) Pain points of real-time data warehouses

Data consistency: troublesome to maintain

Historical data correction: there is no persisted detail data, so jobs must be rerun, which is cumbersome

(3) The development direction of traditional data warehouses

Unified stream and batch: one architecture and one codebase that can run as batch or stream

=> Saves resources and manpower

(4) Advantages of the Hudi data lake

Reduces offline latency to minutes (5 to 10 minutes)

Supports incremental processing

Supports UPSERT for data updates

As big data technology develops, the company is no longer satisfied with a standalone data lake or data warehouse architecture. It wants to combine the two: build on the low-cost storage of the data lake while inheriting the data processing and management capabilities of the data warehouse.

5.3 Project architecture

5.4 Business

The business is the same as in the real-time data warehouse project.

5.5 Optimizations and how problems were solved

1) How resumable data collection is handled

FlinkCDC has a full (snapshot) phase and a binlog phase, both built on Flink state. Synchronization progress is stored in state, so after a failure the job resumes from state the next time.

2) Data skew when writing Hudi tables

In FlinkCDC's full phase, tables are read one after another. If several sinks are connected downstream, only one sink is writing data at a time.

Use the multi-table mixed reading mode to solve this.

Large state: regular join + no TTL

RocksDB + incremental checkpoints

Hudi optimizations

(1) MOR tables with offline compaction (decoupled from the write job)

(2) Parallelism and memory settings

Compaction and write parallelism => 4

Memory: compaction => 1 GB

(3) Large state: RocksDB + incremental checkpoints

Hudi phase II plan

(1) Solve the large-state problem => stop using multi-stream joins and use a partial column update scheme instead

=> In Hudi <= 0.12 this is not provided officially, so a custom payload class is required (as large companies do)

=> Since 0.13.0, Hudi officially includes a payload class for partial column updates

(2) Replace the DWS layer with an OLAP engine => ClickHouse stores detail data

We want DWS to hold detail data and support flexible self-service analysis => in the future this could even replace the separate real-time project

Chapter 6 Testing & Go-live Process

6.1 Testing

6.1.1 How many test servers does the company have?

There are generally three test servers.

6.1.2 Test server configuration

Well-funded companies use the same configuration as production.

In a typical company, the test environment has half the configuration of production.

6.1.3 Where does the test data come from?

Some is generated by our own Java programs (more flexible), and some is taken from the production environment (more realistic).

6.1.4 How to ensure the SQL you write is correct (important)

First compute the results directly against the MySQL business database, then compare them with the results computed in the ADS layer.

Construct specific test data and test against it.

Take a sample of production data whose contents you know, and check that the computed results match your expectations.

Compare the results of offline and real-time analysis (e.g., 10,000 daily active users offline vs. 10,100 in real time); when they differ, prefer the offline result.

Heterogeneous algorithms (compute the same metric in two independent ways and compare the results)

Real-time data quality monitoring (scripts, scheduler, visualization, failure alerts)
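The result comparisons above (business database vs. ADS layer, offline vs. real-time) boil down to checking a relative deviation against a tolerance. A minimal sketch follows; the tolerance value is an assumption each team would pick for itself, and the class name is hypothetical.

```java
// Hypothetical helper for comparing two independently computed results.
final class ResultCheck {

    // Relative deviation of `actual` from `expected`, in percent.
    static double deviationPercent(long expected, long actual) {
        if (expected == 0) {
            return actual == 0 ? 0.0 : Double.POSITIVE_INFINITY;
        }
        return Math.abs(actual - expected) * 100.0 / expected;
    }

    // True when the two results agree within the given tolerance (percent).
    static boolean consistent(long expected, long actual, double tolerancePercent) {
        return deviationPercent(expected, actual) <= tolerancePercent;
    }
}
```

For the example in the text, `deviationPercent(10_000, 10_100)` is 1.0, so the two pipelines agree within a 2% tolerance but not within 0.5%.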

6.1.5 How do you go live after testing?

Large companies: to go live, package the scripts and submit them to Git, then send an email cc'ing your manager, the director, and operations. The operations team is responsible for deploying to production.

Small companies: talk to the project manager, who does a technical review; once it passes, you can go live. Stay risk-aware.

In practice, "going live" means writing the scripts and scheduling the jobs in DolphinScheduler.

6.1.6 Understanding A/B testing

1) What is A/B testing?

A/B testing is essentially an experiment: two or more versions of a variable are shown to users at random, and statistical analysis is used to determine which version performs better for a given conversion goal.

2) Why do A/B testing?

Example: ByteDance has a mid-length video product called Xigua Video ("watermelon video"), originally named Toutiao Video ("headline video"). To improve the product's brand recognition, the team wanted a better name. After internal research and brainstorming, four candidate names were collected: Xigua Video, Qimiao Video ("wonderful video"), Kuaizi Video ("chopsticks video"), and Yangguang Video ("sunshine video"). The team then ran A/B experiments on five app names in total.

The only thing changed in this experiment was the product's name and logo in the app store; the goal was to find which name best improved the click-through rate of the Toutiao Video app in the app store. In the end, Xigua Video and Qimiao Video had the top two click-through rates, but the difference between them was not significant. Taking other factors into account, such as fit with the target users' tastes, the team officially renamed Toutiao Video to Xigua Video.
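"Not significant" refers to a statistical test. One common choice for comparing two click-through rates (not necessarily the one ByteDance used) is the two-proportion z-test with a pooled proportion. A minimal sketch, assuming a two-sided test at the 5% level (critical value 1.96); the class name is hypothetical.

```java
// Two-proportion z-test sketch for comparing click-through rates of A and B.
final class TwoProportionZTest {

    // z statistic for H0: pA == pB, using the pooled click-through rate.
    static double zScore(long clicksA, long viewsA, long clicksB, long viewsB) {
        double pA = (double) clicksA / viewsA;
        double pB = (double) clicksB / viewsB;
        double pooled = (double) (clicksA + clicksB) / (viewsA + viewsB);
        double se = Math.sqrt(pooled * (1 - pooled) * (1.0 / viewsA + 1.0 / viewsB));
        return (pB - pA) / se;
    }

    // Two-sided test at the 5% level: |z| > 1.96 counts as significant.
    static boolean significantAt5Percent(long clicksA, long viewsA, long clicksB, long viewsB) {
        return Math.abs(zScore(clicksA, viewsA, clicksB, viewsB)) > 1.96;
    }
}
```

With 1,000 views per group, 100 vs. 150 clicks gives z of about 3.4 (significant), while 100 vs. 105 clicks gives z of about 0.37 (not significant).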

As this case shows, A/B testing can help a business make final decisions. Based on this example, we can define A/B testing as follows: at the same point in time, scientifically sample the target audience and test in groups to evaluate the effect.

For example, suppose we have 1 million users and want to run an A/B test:

First, select the target audience, such as users in first-tier cities. An A/B test is not run on all users, so scientific sampling is needed to select a small share of traffic for the experiment.

After sampling, split the sample into groups: for example, group A keeps the status quo and group B changes one factor.

Run the groups at the same time; you can then observe how user behavior changes when the variable changes.

Finally, evaluate the results using the metrics tied to the experiment's goal, such as click-through rate.
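The sampling and grouping steps are often implemented with deterministic hash bucketing, so the same user always lands in the same group for a given experiment. The sketch below is one such approach, not a description of any particular company's system: it assumes CRC32 as the hash, 100 buckets, and an even A/B split of the sampled traffic; all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Deterministic A/B assignment sketch: hash(experiment, user) -> bucket -> group.
final class AbBucketing {

    // Maps a user to a bucket in [0, 100). Salting with the experiment id
    // keeps assignments independent across experiments.
    static int bucket(String userId, String experimentId) {
        CRC32 crc = new CRC32();
        crc.update((experimentId + ":" + userId).getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % 100);
    }

    // trafficPercent of users are sampled into the experiment and split evenly
    // into A (status quo) and B (one factor changed); the rest are "OUT".
    static String assign(String userId, String experimentId, int trafficPercent) {
        int b = bucket(userId, experimentId);
        if (b >= trafficPercent) {
            return "OUT";
        }
        return (b % 2 == 0) ? "A" : "B";
    }
}
```

Hashing instead of calling a random generator per request means no assignment table has to be stored, and a returning user always sees the same version.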

There are 3 main reasons to do A/B testing:

(1) Risk control: small-traffic experiments avoid the losses that launching a poorly performing change directly to everyone would cause. In addition, decisions made during iterative experimentation are scientifically grounded, which helps avoid systematic bias.

(2) Causal inference: We believe that the optimizations and changes in A/B experiments can ultimately affect online data and user behavior. Under this premise, A/B testing is the best tool for causal inference.

(3) Compounding effect: A/B testing can be run continuously. Even if a single experiment has only a small effect, the effects compound and produce large changes and returns over the long run.

3) Case: which new homepage UI is more popular?

The overall style of the Toutiao UI had long been criticized and was holding back growth among young and female users. The team therefore designed A/B experiments aiming to switch to a UI with better user reviews while keeping negative effects within an acceptable range. Using the control-variable method, several A/B experiments were run on the following variables:

Header color, saturation, font size, font weight, vertical spacing, horizontal spacing, bottom tab icons.

This was combined with user research (which showed that younger and female users preferred the new UI).

Overall, the best-performing UI version, shown in the figure below, was rolled out to all users.

After the new UI launched, the negative effect on stay duration narrowed significantly, from -0.38% to -0.24%; time spent on image-and-text content rose significantly, by +1.66%; search penetration rose significantly, by +1.47%; and high-frequency users (71% of users) have gradually adapted to the new UI.

6.2 Actual project workflow

Below is the overall development process, using the active-user requirement as an example.

The product manager gathers requirements, which come from customer feedback, the boss's input, and similar sources.

Step 1: Pin down the business definition of the metric

Led by the product manager, talk with the operations lead who proposed the metric. First, clarify how the metric is defined; for example, an active user is a user who launched the app. Is it counted by device ID or by user ID?

The product manager writes the requirements document and draws the prototype; requirements should not be handed over verbally.

Step 2: Requirements Review

The product manager leads the prototype design. For the active-user topic, the final display is the trend in the number of active users over the last n days, as shown in the figure below. Big data, back-end, and front-end engineers all take part, walking through the value of the feature and the detailed user flow so that everyone shares the same understanding.


Interface: data format, field types, owner.

Step 3: Big Data Development

Big data engineers sync data into the ODS layer with tools such as Flume, DataX, and Maxwell, compute it layer by layer into the DWD and DWS layers with SQL, and finally produce data in the ADS layer that applications can serve directly.

Step 4: Backend Development

Back-end engineers provide the business data interfaces that big data engineers consume.

They are also responsible for reading the ADS-layer analysis results and the MySQL data.

Step 5: Front End Development

Front-end engineers are responsible for front-end event tracking (buried points).

They also visualize the analyzed result data.

Step 6: Joint debugging

Big data, front-end, and back-end engineers all take part. The big data engineers run the computation on historical data and are responsible for verifying data accuracy; the front-end and back-end engineers fix bugs related to user operations, making sure there are no basic problems before completing the self-test.

Step 7: Testing

Test engineers test the whole big data system, using methods such as boundary-value analysis and equivalence partitioning.

Bugs are tracked in ZenTao: the tester records the problem found in version 1.0 (the input, the actual result, and how it differs from expectations) -> the developer explains it; if it is a bug, it is fixed in the next version, 1.1 -> the tester retests; if 1.1 passes -> the test manager closes the bug.

1 week of development = 2 weeks of testing

Step 8: Go online

Operations engineers work with the back-end engineers to deploy the latest version to the servers. After that, the product manager works with the owner of the metric to track its accuracy over the long term; important metrics must be verified internally every cycle to ensure data accuracy.

6.3 What is the project's current version number? How often is it upgraded?

We use agile development (small requirement => code => test => small requirement => code => test...), also known as iterating in small steps.

We iterate about once a month. Every month brings holidays and promotions (New Year's Day, Spring Festival, Valentine's Day, Women's Day on March 8, Dragon Boat Festival, the 618 shopping festival, National Day, Mid-Autumn Festival, Double 11 (11.11), Children's Day (6.1), Labor Day (5.1), birthdays, weekends), as well as new products and new regions.

Optimization requirements come from the product side or from us, and we then estimate the time. Every week we hold a meeting to plan the week and wrap up the previous one. A requirement (covering daily, weekly, monthly, quarterly, or annual figures) gets one week and must be finished by Wednesday; Thursday and Friday are for helping coworkers with code and learning extra skills for the job.


5: the major version number; changes only with a major upgrade

1: usually changes when core modules change

2: changes with regular releases

6.4 How long does it take to implement a requirement?

(1) For a new hire, the first requirement takes about 7 days. Once familiar with the business, it averages about one requirement per day.

(2) Factors that affect the time: familiarity with the business, meetings to discuss requirements, applying for table permissions, testing, etc., plus new-employee training (company rules and regulations, coding standards).

6.5 What do you do each day during project development?

(1) New requirements (campaigns, optimizations, new products, new markets): 60%

(2) Fault analysis: when any step of the warehouse has a problem, investigate it, e.g., a sudden drop or rise in daily or monthly active users: 20%

(3) Research on new technologies (e.g., lakehouse, data lakes, Doris, real-time data quality monitoring): 10%

(4) Other ad-hoc tasks: 10%

(5) A typical day: morning meeting -> 10:00 exercise break -> discuss what to eat for lunch -> 12:00 go out for lunch until 1:00 -> nap until 2:00 -> 3:00 tea break with fruit -> discuss what to eat for dinner -> order the overtime meal -> meeting -> 6:00 pm dinner -> 7:00 work until 10:00 -> 11:00

Chapter 7: Data Governance

7.1 Metadata management

Among open-source frameworks, Atlas is widely used for metadata management; some companies build their own systems instead.

1) How metadata management works under the hood

Parse HQL such as the following to obtain the direct dependency between the source table and the target table:

insert into table ads_user

select id, name from dws_user

Dependencies can be tracked at both the table level and the field level, and stored in a graph database such as Neo4j.
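To make the parsing step concrete, here is a toy table-level lineage extractor for simple INSERT ... SELECT statements like the one above. It is a regex sketch for illustration only: production tools typically rely on the SQL engine's own parser or execution hooks, and this version ignores CTEs, subqueries in FROM, and field-level lineage. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy table-level lineage: target table of INSERT, source tables of FROM/JOIN.
final class SimpleLineage {
    private static final Pattern TARGET =
        Pattern.compile("\\binsert\\s+(?:into|overwrite)\\s+table\\s+([\\w.]+)");
    private static final Pattern SOURCE =
        Pattern.compile("\\b(?:from|join)\\s+([\\w.]+)");

    // Returns the target table, or null if the statement is not INSERT ... TABLE.
    static String target(String hql) {
        Matcher m = TARGET.matcher(hql.toLowerCase(Locale.ROOT));
        return m.find() ? m.group(1) : null;
    }

    // Returns the distinct source tables in order of first appearance.
    static List<String> sources(String hql) {
        List<String> result = new ArrayList<>();
        Matcher m = SOURCE.matcher(hql.toLowerCase(Locale.ROOT));
        while (m.find()) {
            if (!result.contains(m.group(1))) {
                result.add(m.group(1));
            }
        }
        return result;
    }
}
```

Each (source, target) pair becomes an edge in the lineage graph; for the statement above, that is dws_user -> ads_user.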

2) Usage: when a job fails, evaluate its scope of impact (which downstream tables are affected). This is mainly useful for companies with many tables.

Atlas version issues:



Framework versions:

Apache: 0.8.4, 2.0, 2.1

CDH: 2.0

3) Shangda's self-developed metadata management

7.2 Data quality monitoring

7.2.1 Monitoring principles

1) Single-table data volume monitoring

The number of records in a table stays within a known range, or does not fluctuate beyond a set threshold.

SQL result: var row_count = select count(*) from table where <time and other filter conditions>

Alarm trigger: if the data volume is not within [lower limit, upper limit], trigger an alarm.

Week-over-week change: if ((this week's volume - last week's volume) / last week's volume * 100) is not within [lower percentage, upper percentage], trigger an alarm.

Day-over-day change: if ((today's volume - yesterday's volume) / yesterday's volume * 100) is not within [lower percentage, upper percentage], trigger an alarm.

Alarm trigger conditions must be configured; without a threshold, nothing can be monitored.
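The three volume checks above (absolute range, week-over-week, day-over-day) can be sketched as plain functions over the counts returned by the SQL. This is a minimal illustration with hypothetical names; treating a zero baseline as an alarm is an assumption of the sketch, chosen so that a previously empty partition gets human attention instead of a division by zero.

```java
// Volume-monitoring sketch: absolute range and period-over-period change checks.
final class VolumeMonitor {

    // Absolute check: alarm when the value is outside [lower, upper].
    static boolean outOfRange(double value, double lower, double upper) {
        return value < lower || value > upper;
    }

    // Percentage change from `previous` to `current`: (current - previous) / previous * 100.
    static double changePercent(long current, long previous) {
        return (current - previous) * 100.0 / previous;
    }

    // Week-over-week or day-over-day alarm, depending on which counts are passed in.
    static boolean changeAlarm(long current, long previous, double lowerPct, double upperPct) {
        if (previous == 0) {
            return true; // assumption: no baseline is always worth an alarm
        }
        return outOfRange(changePercent(current, previous), lowerPct, upperPct);
    }
}
```

For example, yesterday 1,000 rows and today 1,200 rows is a +20% change, which triggers an alarm for the range [-10%, 10%].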

Metrics to monitor this way include daily/weekly/monthly active users, retention (day, week, month), conversion rate (day, week, month), and GMV (day, week, month).

Repurchase rate (day, week, month) 30%

2) Single-table null value detection

The number of records with an empty field stays within a range, or their share of the total stays within a threshold.

Target field: select the field to monitor; at least one field must be selected.

SQL result: var null_count = select count(*) from table where <target field> is null

Single check: if the null count is not within [lower limit, upper limit], trigger an alarm.
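An in-memory equivalent of the null check, useful for unit-testing the rule logic before it runs as SQL. Rows are modeled as maps from column name to value, which is an assumption of this sketch; a missing key is treated the same as an explicit null.

```java
import java.util.List;
import java.util.Map;

// Null-detection sketch over rows modeled as column -> value maps.
final class NullCheck {

    // Equivalent of: select count(*) from t where <field> is null
    static long nullCount(List<Map<String, Object>> rows, String field) {
        long n = 0;
        for (Map<String, Object> row : rows) {
            if (row.get(field) == null) {
                n++;
            }
        }
        return n;
    }

    // Null share as a percentage of all rows, for the ratio-based threshold.
    static double nullPercent(List<Map<String, Object>> rows, String field) {
        return rows.isEmpty() ? 0.0 : nullCount(rows, field) * 100.0 / rows.size();
    }
}
```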

3) Single-table duplicate value detection

One or more fields have no duplicate records.

Target field: step 1, count all rows: select count(*) from table;

Step 2, count the distinct values: select count(*) from (select field from table group by field) t;

Subtract the step 2 value from the step 1 value and check whether the difference is within the lower and upper thresholds.

Single check: if the duplicate count is not within [lower limit, upper limit], trigger an alarm.
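The two-step duplicate check (total rows minus distinct keys) can be sketched in Java with a hash set. The method is generic over the key type, so a composite key can be passed as, for example, a concatenated string; the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Duplicate-detection sketch: count(*) - count of distinct keys.
final class DuplicateCheck {

    // Number of rows that repeat a key already seen.
    static <K> long duplicateRows(List<K> keys) {
        Set<K> distinct = new HashSet<>(keys);
        return keys.size() - distinct.size();
    }

    // Alarm when the duplicate count is outside [lower, upper].
    static boolean alarm(long duplicates, long lower, long upper) {
        return duplicates < lower || duplicates > upper;
    }
}
```

For the keys a, b, a, c, a there are 5 rows and 3 distinct values, so 2 duplicate rows.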

4) Single-table range detection

Whether one or more fields satisfy certain rules.

Target field: select the field to monitor. Multiple selections are supported.

Detection rule: enter the condition that the target fields must satisfy, where $1 is the first target field, $2 the second, and so on. The "detection rule" in the figure above renders to "delivery_fee = delivery_fee_base+delivery_fee_extra".

The threshold configuration is the same as for null detection.
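A sketch of evaluating the rendered rule "delivery_fee = delivery_fee_base + delivery_fee_extra" row by row. The `Order` class is hypothetical, and storing fees as whole cents (long) is an assumption made so that the equality comparison is exact; comparing floating-point amounts for equality would need a tolerance.

```java
import java.util.List;

// Rule-check sketch for delivery_fee = delivery_fee_base + delivery_fee_extra.
final class RuleCheck {

    // Hypothetical order row; fees are in cents so equality is exact.
    static final class Order {
        final long deliveryFee;
        final long deliveryFeeBase;
        final long deliveryFeeExtra;

        Order(long deliveryFee, long deliveryFeeBase, long deliveryFeeExtra) {
            this.deliveryFee = deliveryFee;
            this.deliveryFeeBase = deliveryFeeBase;
            this.deliveryFeeExtra = deliveryFeeExtra;
        }
    }

    // Equivalent of: select count(*) from t where not (<rule>)
    static long violations(List<Order> orders) {
        long bad = 0;
        for (Order o : orders) {
            if (o.deliveryFee != o.deliveryFeeBase + o.deliveryFeeExtra) {
                bad++;
            }
        }
        return bad;
    }
}
```

The violation count is then compared against the configured [lower limit, upper limit] exactly as in null detection.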

5) Cross-table data volume comparison

Mainly for synchronization jobs: check whether the row counts of the two tables are consistent.

SQL result: count(this table) - count(related table)

The threshold configuration is the same as for null detection.

7.2.2 Data quality implementation

7.2.3 How exactly do you implement data quality monitoring? Explain in detail.

To implement data quality checks, first clarify the dimensions of data quality, such as accuracy, completeness, uniqueness, timeliness, and consistency.

1) Identify the data sources

Identify the data sources that need quality checks, such as database tables, files, or APIs.

2) Define quality rules

Define specific rules for each data quality dimension. For example:

Accuracy: Check whether the data fits the expected range or distribution.

Completeness: check data for missing or null values.

Uniqueness: Check data for duplicates.

Timeliness: Check if the data is updated within the expected time frame.

Consistency: Check whether data conforms to predefined formats or standards.

3) Implement the checks

Write functions that check data quality in SQL or a general-purpose programming language. These may include:

- Data Import: Import data from a data source.

- Data cleaning: Pre-processing of data, such as removing spaces, converting data types, etc.

- Apply quality rules: implement corresponding detection functions according to defined quality rules. For example, check for missing values, duplicates, or data ranges.

- Output reports: generate data quality reports, e.g., summarize the check results in tables or charts.

4) Automation and monitoring

Integrate the data quality checks into data pipelines or ETL processes so they run automatically. In addition, set up monitoring and alerting to notify the relevant people when data quality problems are detected.
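Steps 2) to 4) can be tied together by a small rule runner: each configured rule supplies a metric (in practice, the result of its SQL) and an accepted range, and the runner returns a report line for every failed rule, which the alerting step would then send out. This is a minimal sketch with hypothetical names; it omits scheduling, persistence, and the actual notification channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Rule-runner sketch: evaluate configured quality rules, collect failures.
final class QualityRunner {

    // One configured check: a metric supplier plus its accepted range.
    static final class Rule {
        final String name;
        final Supplier<Double> metric;
        final double lower;
        final double upper;

        Rule(String name, Supplier<Double> metric, double lower, double upper) {
            this.name = name;
            this.metric = metric;
            this.lower = lower;
            this.upper = upper;
        }
    }

    // Runs every rule and returns one report line per failed rule;
    // an empty list means no alarm needs to be sent.
    static List<String> run(List<Rule> rules) {
        List<String> failures = new ArrayList<>();
        for (Rule r : rules) {
            double value = r.metric.get();
            if (value < r.lower || value > r.upper) {
                failures.add(r.name + "=" + value + " outside [" + r.lower + ", " + r.upper + "]");
            }
        }
        return failures;
    }
}
```

In a real deployment the suppliers would run the SQL from 7.2.1 against the warehouse, and a non-empty result would be routed to the alerting channel.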

7.3 Permission management (Ranger)

7.4 User authentication (Kerberos)

7.5 Data governance

Quantitative model of asset health.

Based on the key factors of data asset health management, define quantitative scoring rules. The data asset health score is calculated from the completeness of basic data information, the health of data storage and data computation, the soundness of data quality monitoring rules, and so on.

1) Basic logic of asset health

(1) Basic principles for setting health scores:

Health scores use a 100-point scale: 100 is the highest, 0 the lowest;

The table is the finest granularity for health; each table has its own health score;

Health scores for individuals, business units, teams, first-level departments, and the group are weighted averages of the health scores of the tables they own;

Table weight = cube root of (table size in bytes + 1); an empty table has weight 1;

(2) Data table asset health score:

Table asset health score = specification compliance score * 10% + storage score * 30% + computation score * 30% + data quality score * 15% + data security score * 15%
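The weighting rules above can be written out directly. A minimal sketch with hypothetical names: the per-table score uses the 10/30/30/15/15 weights, and a team or department score is the average of its tables' scores weighted by the cube root of (bytes + 1).

```java
// Health-score sketch following the weighting rules stated above.
final class HealthScore {

    // Table weight = cube root of (bytes + 1); an empty table gets weight 1.
    static double weight(long bytes) {
        return Math.cbrt(bytes + 1.0);
    }

    // Table score = spec * 10% + storage * 30% + compute * 30% + quality * 15% + security * 15%.
    static double tableScore(double spec, double storage, double compute,
                             double quality, double security) {
        return spec * 0.10 + storage * 0.30 + compute * 0.30 + quality * 0.15 + security * 0.15;
    }

    // Score of an owner (person, team, department): weighted average over its tables.
    static double weightedAverage(double[] scores, long[] bytes) {
        double sum = 0;
        double totalWeight = 0;
        for (int i = 0; i < scores.length; i++) {
            double w = weight(bytes[i]);
            sum += scores[i] * w;
            totalWeight += w;
        }
        return totalWeight == 0 ? 0 : sum / totalWeight;
    }
}
```

The cube root damps the influence of size: a 999-byte table weighs 10 and an empty table weighs 1, so an empty table scoring 100 and a 999-byte table scoring 50 average to 600 / 11, about 54.5.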

2) Data asset feature list:

Asset health types, features, and feature score computation logic:

Specification

Specification health score = 100 * sum(feature scores) / count(features)

Technical owner


Business owner


Has partition information


Belongs to a department


Table naming compliance


Warehouse layering compliance


The table has comments


Field has comment information

Number of fields with comments / total number of fields

Storage

Storage health score = 100 * completion degree

Lifecycle rationality

Permanently retained tables: non-reproducible source tables, whitelisted tables, cold-backup tables, and non-partitioned tables accessed in the last 93 days count as 100% complete;

Unmanaged tables: partitioned tables with no lifecycle configured count as 0% complete;

Unaccessed tables: created more than 93 days ago but not accessed in the last 93 days count as 0% complete;

New tables: created less than 93 days ago, with not enough access data accumulated and no reasonable lifecycle configured, default to 80% complete;

Ordinary tables: for all other tables, the system uses the ratio of the recommended retention days to the current lifecycle as the completion degree;

Computation

Computation health score = 100 * sum(feature scores) / count(features)

HDFS path deleted

Deleted: 0; otherwise 1;

Empty output

No data output for 15 consecutive days: 0; otherwise 1;

Output table not read

Output table not read in the last 30 days: 0; otherwise 1;

Run errors

Task errors in the last 3 days: 0; otherwise 1;

Duplicate/similar tables

Similarity of 50% or more with another table: 0; otherwise 1;

Unreasonable owner

The owner of the corresponding scheduling node has left the company, or is still employed but differs from the table's owner: 0; otherwise 1;

Simple processing

The production SQL merely selects fields, with no join or group by, and its where clause filters only on partition fields: 0; otherwise 1;

Brute-force scanning

The queried partitions span more than 90 days and their total storage exceeds 100 GB: 0; otherwise 1;

Mismatched types on the two sides of a join

For example:

select ... from table1 t1 join table2 t2 on t1.a_bigint=t2.a_string;

Here both sides of the on condition are converted to double, which is unreasonable and a serious pitfall: the join can behave differently from what the user expects.
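Why the double conversion is dangerous: a double has a 53-bit mantissa, so distinct BIGINT values above 2^53 can map to the same double and "match" in the join. The Java sketch below mimics that comparison; it illustrates the numeric effect only, not how any particular SQL engine implements its type coercion.

```java
// Illustration: comparing a BIGINT key and a STRING key after both become DOUBLE.
final class DoubleJoinPitfall {

    // Mimics the implicit conversion described above.
    static boolean equalAsDouble(long bigintKey, String stringKey) {
        return (double) bigintKey == Double.parseDouble(stringKey);
    }

    // The type-safe alternative: compare as exact 64-bit integers.
    static boolean equalAsLong(long bigintKey, String stringKey) {
        return bigintKey == Long.parseLong(stringKey);
    }
}
```

`equalAsDouble(9007199254740993L, "9007199254740992")` returns true even though the keys differ by one, while `equalAsLong` correctly returns false. Casting both sides explicitly to the same exact type avoids the problem.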

Data skew

The long-tail instance takes more than 20% longer than the average: 0; otherwise 1;

Column pruning needed

Check the select statement and how its output is used downstream to see whether every selected column is actually used. Usage rate = columns used / total selected columns; below 50%, column pruning is needed: 0; otherwise 1;
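Two of the computation features above, plus the 100 * sum / count aggregation, sketched as code. The thresholds (20% over the average, 50% column usage) come from the text; interpreting "long tail" as the slowest instance, and the class and method names, are assumptions of this sketch.

```java
// Computation-feature sketch: data skew, column pruning, and the health score.
final class ComputeFeatures {

    // Data skew: 0 if the slowest instance takes more than 20% longer than the average.
    static int skewScore(long[] instanceMillis) {
        if (instanceMillis.length == 0) {
            return 1;
        }
        long max = 0;
        long sum = 0;
        for (long t : instanceMillis) {
            max = Math.max(max, t);
            sum += t;
        }
        double avg = (double) sum / instanceMillis.length;
        return max > avg * 1.2 ? 0 : 1;
    }

    // Column pruning: 0 if fewer than 50% of the selected columns are used downstream.
    static int pruningScore(int usedColumns, int selectedColumns) {
        return (double) usedColumns / selectedColumns < 0.5 ? 0 : 1;
    }

    // Computation health score = 100 * sum(feature scores) / count(features).
    static double healthScore(int... featureScores) {
        int sum = 0;
        for (int s : featureScores) {
            sum += s;
        }
        return 100.0 * sum / featureScores.length;
    }
}
```

For instance times of 100, 100, 100, and 200 ms the average is 125 ms and the slowest is 200 ms (60% over), so the skew feature scores 0.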

Quality

Quality health score = 100 * sum(feature scores) / count(features)

Table Output Timing Monitoring

An output-time alert is defined in QDC, or the table already belongs to a production baseline: 0/1;

Table content monitoring

Table-level rules are configured: 0/1;

Field Content Monitoring

Field-level rules are configured: 0/1;

Table Output SLA

X-point timeliness SLA measurement function:

realTime: actual data output time

expectTime: the expected data output time

n: data output cycle (for jobs scheduled multiple times)

Table Content SLA

1 - (triggered monitoring rules / total monitoring rules)

Field Content SLA

1 - (triggered monitoring rules / total monitoring rules)

Security

Security health score = 100 * sum(feature scores) / count(features)

Data classification

The owning "asset catalog" is explicitly set: 0/1;

Asset grading

An asset level is specified: 0/1;

Field-level security level

Some fields have security levels set: 0/1;

Chapter 8: Middle Platform

8.1 What is a middle platform?

1) What is the front office?

First, the "front office" and the "front end" are not the same thing. The front office includes all the interfaces that interact directly with users, such as web pages and mobile apps, as well as the server-side business logic that responds to user requests in real time, such as product queries and order systems.

2) What is the back office?

The back office is not user-facing; it is the configuration and management system used by operations staff, such as product management, logistics management, and settlement management. It provides simple configuration for the front office.

3) Why build a middle platform?

Pain point of traditional projects: reinventing the wheel.

8.2 Middle platforms at different companies


2) Alibaba proposed the strategy of "big middle platform, small front office"

3) Huawei proposed the strategy of "a big platform providing fire support for elite frontline teams"

8.3 How middle platforms are divided

1) Business middle platform & technology middle platform

Figure: business middle platform; Figure: technology middle platform

2) Data middle platform & algorithm middle platform

Figure: data middle platform; Figure: algorithm middle platform

8.4 When to use a middle platform

1) From 0 to 1: no need to build a middle platform.

For a startup going from 0 to 1, the primary goal is survival: build products as quickly as possible and prove their market value.

At this stage, letting the project grow freely is the best choice. If you rush to build a middle platform first, the company may starve before the platform is finished.

2) From 1 to N: a good time to build a middle platform.

Once the company has reached a certain scale and its products are recognized by the market, its primary goal is no longer survival but growing well.

At this point, while project complexity is not yet very high, consider sinking the common parts of each project into a middle platform, which makes it easier to try new projects and iterate on old ones.

3) From N to N+1: building a middle platform is imperative.

Once the company is large, with many products, services, and departments, structural adjustments become painful.

However, short-term pain is better than long-term pain. For the long-term health of the project, adjust the architecture and move to a platform model as early as possible, so that it does not become harder and harder to maintain.

8.5 Pain points of middle platforms

Pull one hair and the whole body moves: even small changes to the middle platform require rigorous testing, so release cycles are longer.

Large companies usually have a company-wide middle platform as well as department-level middle platforms to keep things efficient.

Chapter 9: Algorithm Problems (LeetCode)

9.1 Understanding time complexity and space complexity

In algorithm theory, an algorithm's performance is measured in two respects: time complexity and space complexity.

1) Time complexity

The time complexity of an algorithm refers to the amount of computational work required to execute the algorithm.

In general, the work an algorithm performs is a function f(n) of the problem size n, so its time complexity is written as T(n) = O(f(n)).

As the problem size n grows, the growth rate of the execution time matches the growth rate of f(n); this is called the asymptotic time complexity.
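A quick way to see asymptotic growth is to count basic operations instead of timing them. The sketch below (illustrative names) counts the inner-loop steps of a double loop, which grows like n(n-1)/2, i.e. O(n^2), and the number of halvings of n, which grows like log2(n), i.e. O(log n).

```java
// Operation-counting sketch: quadratic vs. logarithmic growth.
final class OperationCount {

    // Inner-loop iterations of a double loop over all pairs: n(n-1)/2 = O(n^2).
    static long quadratic(int n) {
        long steps = 0;
        for (int i = 0; i < n; i++) {
            for (int j = i + 1; j < n; j++) {
                steps++;
            }
        }
        return steps;
    }

    // How many times n can be halved before reaching 1: about log2(n) = O(log n).
    static int logarithmic(int n) {
        int steps = 0;
        while (n > 1) {
            n /= 2;
            steps++;
        }
        return steps;
    }
}
```

Doubling n from 10 to 20 takes the quadratic count from 45 to 190 (roughly 4x), while doubling n only adds one step to the logarithmic count.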

2) Space complexity

The space complexity of an algorithm is the memory it consumes. For recursive algorithms, the space used by the call stack must also be counted.

It is computed and expressed in the same way as time complexity, usually in asymptotic terms. Space complexity analysis is generally much simpler than time complexity analysis.

Therefore, our analysis of program complexity generally focuses on time complexity.

9.2 Common approaches to solving algorithm problems

1) Brute force

Not recommended.

2) Dynamic programming

Dynamic programming (DP) is a branch of operations research concerned with optimizing multi-stage decision processes.

DP divides the original problem into multiple "stages" and makes a "decision" at each stage in turn to obtain the current local solution; each decision depends on the current "state" and immediately causes a state transition.

In this way, the decision sequence is produced "dynamically" as the state changes; this multi-stage, optimizing approach to problem solving is called dynamic programming (DP).
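To map the stage/state/decision vocabulary onto code, here is the classic coin-change problem (a standard textbook example, not one from the original text): find the fewest coins that sum to an amount. The state is dp[a], the fewest coins for amount a; the stages are the amounts in increasing order; the decision is which coin to use last. Coin values are assumed to be positive.

```java
import java.util.Arrays;

// DP example: fewest coins summing to `amount`, or -1 if impossible.
final class CoinChange {

    static int minCoins(int[] coins, int amount) {
        // State: dp[a] = fewest coins summing to a; MAX_VALUE marks "unreachable".
        int[] dp = new int[amount + 1];
        Arrays.fill(dp, Integer.MAX_VALUE);
        dp[0] = 0;
        // Stages: amounts 1..amount in increasing order.
        for (int a = 1; a <= amount; a++) {
            // Decision: which coin is used last; each option reuses an earlier state.
            for (int c : coins) {
                if (c <= a && dp[a - c] != Integer.MAX_VALUE) {
                    dp[a] = Math.min(dp[a], dp[a - c] + 1);
                }
            }
        }
        return dp[amount] == Integer.MAX_VALUE ? -1 : dp[amount];
    }
}
```

With coins {1, 2, 5}, amount 11 needs 3 coins (5 + 5 + 1); with coins {2}, amount 3 is impossible and the method returns -1. The table runs in O(amount * coins) time.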

3) Branch and bound

For complex optimization problems, we often need to traverse the tree of the solution space. The most intuitive strategy is to search every branch of the current node in turn and continue until the whole problem is solved. To speed up the search, we can add constraints that compute a priority value (a bound) for each branch, search the most promising branches first, and prune branches whose bound cannot beat the best solution found so far, which finds the optimal solution faster. This strategy is called the branch-and-bound method.

Branch and bound usually searches the solution-space tree breadth-first (BFS) or in least-cost (maximum-benefit) first order.
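A maximum-benefit-first example: 0/1 knapsack solved with branch and bound. Each tree node decides whether to take one item; its bound is an optimistic estimate obtained by greedily filling the remaining capacity and allowing a fraction of the next item. Nodes are expanded from a priority queue ordered by bound, and any node whose bound cannot beat the best value found so far is pruned. This is a sketch with illustrative names, assuming positive integer weights.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;

// Best-first branch and bound for the 0/1 knapsack problem.
final class KnapsackBranchAndBound {

    // A node of the solution-space tree: items [0, level) are already decided.
    private static final class Node {
        final int level;
        final int value;
        final int weight;
        final double bound;

        Node(int level, int value, int weight, double bound) {
            this.level = level;
            this.value = value;
            this.weight = weight;
            this.bound = bound;
        }
    }

    // Optimistic bound: greedily add remaining items by value density,
    // taking a fraction of the first item that no longer fits.
    private static double bound(int level, int value, int weight, int[][] items, int cap) {
        double b = value;
        int w = weight;
        for (int i = level; i < items.length; i++) {
            if (w + items[i][0] <= cap) {
                w += items[i][0];
                b += items[i][1];
            } else {
                b += (double) (cap - w) * items[i][1] / items[i][0];
                break;
            }
        }
        return b;
    }

    static int maxValue(int[] weights, int[] values, int cap) {
        int n = weights.length;
        int[][] items = new int[n][];
        for (int i = 0; i < n; i++) {
            items[i] = new int[] {weights[i], values[i]};
        }
        // Sorting by value/weight descending makes the greedy bound a valid upper bound.
        Arrays.sort(items, (x, y) -> Double.compare((double) y[1] / y[0], (double) x[1] / x[0]));

        // Max-heap on the bound: always expand the most promising node first.
        PriorityQueue<Node> queue =
            new PriorityQueue<>(Comparator.comparingDouble((Node nd) -> nd.bound).reversed());
        queue.add(new Node(0, 0, 0, bound(0, 0, 0, items, cap)));
        int best = 0;
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            // Prune leaves and subtrees that cannot beat the best value so far.
            if (node.level == n || node.bound <= best) {
                continue;
            }
            int[] item = items[node.level];
            // Branch 1: take the item, if it fits.
            int w = node.weight + item[0];
            int v = node.value + item[1];
            if (w <= cap) {
                best = Math.max(best, v);
                queue.add(new Node(node.level + 1, v, w, bound(node.level + 1, v, w, items, cap)));
            }
            // Branch 2: skip the item.
            queue.add(new Node(node.level + 1, node.value, node.weight,
                bound(node.level + 1, node.value, node.weight, items, cap)));
        }
        return best;
    }
}
```

For weights {10, 20, 30}, values {60, 100, 120}, and capacity 50, the method returns 220 (the second and third items).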

9.3 Basic algorithms

9.3.1 Bubble Sort

Bubble sort is a simple sort algorithm.

The basic idea is to scan the sequence repeatedly, comparing adjacent elements two at a time and swapping them if they are out of order. At the end of each pass, the largest (or smallest) remaining value has moved to the end of the sequence. The algorithm gets its name because elements gradually "bubble" toward their final positions through these swaps.

The time complexity of bubble sort is O(n^2).

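public void bubbleSort(int[] nums) {
    int n = nums.length;
    // After pass i, the largest i + 1 elements are in their final positions at the end.
    for (int i = 0; i < n - 1; i++) {
        for (int j = 0; j < n - i - 1; j++) {
            // Swap adjacent elements that are out of order.
            if (nums[j + 1] < nums[j]) {
                swap(nums, j, j + 1);
            }
        }
    }
}

// Helper: exchange nums[i] and nums[j].
private static void swap(int[] nums, int i, int j) {
    int tmp = nums[i];
    nums[i] = nums[j];
    nums[j] = tmp;
}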
9.3.2 Quick Sort

The basic idea of quicksort: one partitioning pass splits the records into two independent parts such that every key in one part is no larger than every key in the other; the two parts are then sorted separately, which leaves the whole sequence ordered.

Quicksort applies divide and conquer (partitioning) and is usually implemented recursively.

The average time complexity of quicksort is O(n log n) (O(n^2) in the worst case), and it is widely used in many frameworks and data structure designs.

public void qSort(int[] nums, int start, int end) {
    if (start >= end) return;
    int mid = partition(nums, start, end);
    qSort(nums, start, mid - 1);
    qSort(nums, mid + 1, end);
}

// Partition nums[start..end] around a pivot: elements left of the returned index
// are <= pivot, and elements right of it are >= pivot.
private static int partition(int[] nums, int start, int end) {
    // Use the first element of the current range as the pivot.
    int pivot = nums[start];
    int left = start;
    int right = end;
    while (left < right) {
        // Scan from the right for an element smaller than the pivot and move it into the left hole.
        while (left < right && nums[right] >= pivot)
            right--;
        nums[left] = nums[right];
        // Scan from the left for an element larger than the pivot and move it into the right hole.
        while (left < right && nums[left] <= pivot)
            left++;
        nums[right] = nums[left];
    }
    // Place the pivot in its final position.
    nums[left] = pivot;
    return left;
}
9.3.3 Merge Sort

Merge sort is an efficient sorting algorithm built on the merge operation, and a classic application of divide and conquer.

It merges already-ordered subsequences into a fully ordered sequence: first make each subsequence ordered, then merge the ordered subsequences. Merging two ordered lists into one is called a 2-way merge.

The time complexity of merge sort is O(n log n), at the cost of O(n) extra memory.

public void mergeSort(int[] nums, int start, int end) {
    if (start >= end) return;
    // Written this way to avoid int overflow of (start + end).
    int mid = start + (end - start) / 2;
    mergeSort(nums, start, mid);
    mergeSort(nums, mid + 1, end);
    merge(nums, start, mid, mid + 1, end);
}

// Merge the sorted ranges nums[lstart..lend] and nums[rstart..rend] (rstart == lend + 1).
private static void merge(int[] nums, int lstart, int lend, int rstart, int rend) {
    int[] result = new int[rend - lstart + 1];
    int left = lstart;
    int right = rstart;
    int i = 0;
    // Take the smaller head element each time; "<=" keeps the sort stable.
    while (left <= lend && right <= rend) {
        if (nums[left] <= nums[right])
            result[i++] = nums[left++];
        else
            result[i++] = nums[right++];
    }
    // Copy whatever remains in either half.
    while (left <= lend)
        result[i++] = nums[left++];
    while (right <= rend)
        result[i++] = nums[right++];
    System.arraycopy(result, 0, nums, lstart, result.length);
}
9.3.4 遍历二叉树
9.3.4 Traversing Binary Trees

Problem: Give the preorder, in-order, postorder, and level-order traversals of a binary tree.

In-order traversal (left, root, right): starting from the root, keep descending into the left subtree until reaching the leftmost node i. Node i has no left child, but it is not necessarily a leaf. Visit i, then traverse i's right subtree in order, then return to i's parent, visit it, and traverse the parent's right subtree the same way. As the recursion unwinds and frames pop off the call stack, the traversal completes.

Preorder traversal: root, left, right.

Postorder traversal: left, right, root.

Level-order traversal: visit all nodes layer by layer, from top to bottom and, within each layer, from left to right.
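The four traversals above can be sketched in Java as follows. The `TreeNode` class and the method names are illustrative assumptions. The three depth-first orders are recursive, so the call stack does the unwinding described for in-order traversal. Level order uses a FIFO queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TreeNode {
    int val;
    TreeNode left, right;
    TreeNode(int val) { this.val = val; }
    TreeNode(int val, TreeNode left, TreeNode right) {
        this.val = val;
        this.left = left;
        this.right = right;
    }
}

class Traversals {
    // Preorder: root, left, right
    static void preorder(TreeNode node, List<Integer> out) {
        if (node == null) return;
        out.add(node.val);
        preorder(node.left, out);
        preorder(node.right, out);
    }

    // In-order: left, root, right
    static void inorder(TreeNode node, List<Integer> out) {
        if (node == null) return;
        inorder(node.left, out);
        out.add(node.val);
        inorder(node.right, out);
    }

    // Postorder: left, right, root
    static void postorder(TreeNode node, List<Integer> out) {
        if (node == null) return;
        postorder(node.left, out);
        postorder(node.right, out);
        out.add(node.val);
    }

    // Level order: breadth-first with a FIFO queue, top to bottom, left to right
    static List<Integer> levelOrder(TreeNode root) {
        List<Integer> out = new ArrayList<>();
        if (root == null) return out;
        Deque<TreeNode> queue = new ArrayDeque<>();
        queue.offer(root);
        while (!queue.isEmpty()) {
            TreeNode node = queue.poll();
            out.add(node.val);
            if (node.left != null) queue.offer(node.left);
            if (node.right != null) queue.offer(node.right);
        }
        return out;
    }
}
```

Consider a tree whose root is 1, with children 2 and 3, where 2 has children 4 and 5. It yields preorder 1 2 4 5 3, in-order 4 2 5 1 3, postorder 4 5 2 3 1, and level order 1 2 3 4 5.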

9.3.5 二分查找
9.3.5 Binary Search

Problem: Given a sorted (ascending) array numbers of n integers and a target value target, write a function that searches numbers for target. It returns the index of target if target exists, or -1 otherwise.

二分查找也称折半查找(Binary Search),它是一种效率较高的查找方法,前提是数据结构必须先排好序,可以在对数时间复杂度内完成查找。
Binary search, also called half-interval search, is an efficient search method. Its precondition is that the data must already be sorted; the search then completes in logarithmic time.

二分查找事实上采用的就是一种分治策略,它充分利用了元素间的次序关系,可在最坏的情况下用O(log n)完成搜索任务。
Binary search is in fact a divide-and-conquer strategy. It exploits the ordering between elements and completes the search in O(log n) time even in the worst case.


public class BinarySearch {

    /**
     * @param a 要查找的有序int数组
     * @param a the sorted (ascending) int array to search
     * @param key 要查找的数值元素
     * @param key the value to search for
     * @return 返回找到的元素下标;如果没有找到,返回-1
     * @return the index of the element if found; otherwise -1
     */
    public static int binarySearch(int[] a, int key) {
        int low = 0;
        int high = a.length - 1;
        // Empty array, or key outside [a[low], a[high]]: it cannot be present
        if (a.length == 0 || key < a[low] || key > a[high])
            return -1;
        while (low <= high) {
            int mid = low + (high - low) / 2; // avoids int overflow of (low + high)
            if (a[mid] < key)
                low = mid + 1;
            else if (a[mid] > key)
                high = mid - 1;
            else
                return mid;
        }
        return -1;
    }
}

9.4 小青蛙跳台阶
9.4 Frog Jumping Up Stairs

A frog can jump up either 1 step or 2 steps at a time. In how many distinct ways can it climb a staircase of n steps?
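The frog's last jump is either 1 step (from step n-1) or 2 steps (from step n-2). The count therefore follows a Fibonacci-style recurrence, f(n) = f(n-1) + f(n-2), with f(1) = 1 and f(2) = 2. Below is a minimal iterative sketch; the class and method names are illustrative.

```java
class FrogJump {
    // Number of distinct ways to climb n steps taking 1 or 2 steps per jump.
    // ways(n) = ways(n - 1) + ways(n - 2), with ways(1) = 1 and ways(2) = 2.
    static long ways(int n) {
        if (n < 1) throw new IllegalArgumentException("n must be >= 1");
        if (n <= 2) return n;
        long prev = 1; // ways(i - 2)
        long curr = 2; // ways(i - 1)
        for (int i = 3; i <= n; i++) {
            long next = prev + curr;
            prev = curr;
            curr = next;
        }
        return curr;
    }
}
```

For example, ways(3) is 3 (1+1+1, 1+2, 2+1) and ways(10) is 89. The loop uses O(1) memory instead of a full DP table. A `long` overflows for n above 91.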

9.5 最长回文子串
9.5 Longest Palindromic Substring

Given a string s, find the longest palindrome substring in s.


输入:s = “babad”
Input: s = "babad"

Output: "bab"

Explanation: "aba" is also a valid answer.
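One common approach, not specified in the original, is expand-around-center. Every palindrome is centered either on one character (odd length) or between two characters (even length). For each center, grow outward while the characters match. This takes O(n^2) time and O(1) extra space. The class and method names below are illustrative.

```java
class LongestPalindrome {
    static String longestPalindrome(String s) {
        if (s == null || s.isEmpty()) return "";
        int start = 0, end = 0; // inclusive bounds of the best palindrome so far
        for (int i = 0; i < s.length(); i++) {
            int odd = expand(s, i, i);      // centered on s[i]
            int even = expand(s, i, i + 1); // centered between s[i] and s[i + 1]
            int len = Math.max(odd, even);
            if (len > end - start + 1) {
                start = i - (len - 1) / 2;
                end = i + len / 2;
            }
        }
        return s.substring(start, end + 1);
    }

    // Grow outward while both ends match; return the palindrome's length
    private static int expand(String s, int left, int right) {
        while (left >= 0 && right < s.length() && s.charAt(left) == s.charAt(right)) {
            left--;
            right++;
        }
        return right - left - 1;
    }
}
```

For "babad" this scan finds "bab" first and keeps it, because "aba" is not strictly longer.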

9.6 数字字符转化成IP
9.6 Converting a Digit String into IP Addresses

Problem: Given a string that contains only digits, convert it into IP address form and return every possible valid IP address.

Example:

The given string is "25525511135"

返回["255.255.11.135", "255.255.111.35"](顺序没有关系)
Return ["255.255.11.135", "255.255.111.35"] (order doesn't matter)
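A minimal backtracking sketch, assuming the input contains only digits. Each of the 4 segments takes 1 to 3 digits, must not exceed 255, and must not have a leading zero (unless it is exactly "0"). The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreIp {
    static List<String> restoreIpAddresses(String s) {
        List<String> result = new ArrayList<>();
        backtrack(s, 0, new ArrayList<>(), result);
        return result;
    }

    private static void backtrack(String s, int pos, List<String> parts, List<String> result) {
        if (parts.size() == 4) {
            // Valid only if all digits have been used
            if (pos == s.length()) result.add(String.join(".", parts));
            return;
        }
        // Prune: the remaining digits must fit into the remaining segments (1 to 3 digits each)
        int remainingSegments = 4 - parts.size();
        int remainingDigits = s.length() - pos;
        if (remainingDigits < remainingSegments || remainingDigits > 3 * remainingSegments) return;

        for (int len = 1; len <= 3 && pos + len <= s.length(); len++) {
            String seg = s.substring(pos, pos + len);
            if (seg.length() > 1 && seg.charAt(0) == '0') break; // leading zero: longer segments fail too
            if (Integer.parseInt(seg) > 255) break;               // longer segments would be larger still
            parts.add(seg);
            backtrack(s, pos + len, parts, result);
            parts.remove(parts.size() - 1); // undo the choice
        }
    }
}
```

The length check prunes branches early. For example, after choosing "2" as the first segment, 10 digits remain for 3 segments, more than the maximum of 9.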

9.7 最大公约数
9.7 Greatest Common Divisor
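A standard way to compute the greatest common divisor is the Euclidean algorithm. It relies on gcd(a, b) = gcd(b, a mod b) and gcd(a, 0) = a. This minimal sketch assumes non-negative inputs, and the names are illustrative.

```java
class Gcd {
    // Euclidean algorithm: replace (a, b) with (b, a % b) until b is 0
    static int gcd(int a, int b) {
        while (b != 0) {
            int r = a % b;
            a = b;
            b = r;
        }
        return a;
    }
}
```

For example, gcd(12, 18) is 6. The remainder shrinks quickly, so the loop runs O(log min(a, b)) times.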

9.8 链表反转
9.8 Reversing a Linked List
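A common iterative approach to reversing a singly linked list walks the list once and flips each `next` pointer. It takes O(n) time and O(1) extra space. The `ListNode` class and method name are illustrative assumptions.

```java
class ListNode {
    int val;
    ListNode next;
    ListNode(int val) { this.val = val; }
}

class ReverseList {
    static ListNode reverse(ListNode head) {
        ListNode prev = null;
        ListNode curr = head;
        while (curr != null) {
            ListNode next = curr.next; // remember the rest of the list
            curr.next = prev;          // point the current node backwards
            prev = curr;
            curr = next;
        }
        return prev; // the old tail is the new head
    }
}
```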

9.9 数组寻找峰值
9.9 Finding a Peak Element in an Array
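The problem statement is not given here. This sketch assumes the common definition: a peak is strictly greater than its neighbours, adjacent elements are distinct, and positions outside the array count as negative infinity, so a peak always exists. Under those assumptions, binary search finds some peak in O(log n). The names are illustrative.

```java
class PeakFinder {
    // Returns the index of some peak, or -1 for an empty array
    static int findPeak(int[] nums) {
        if (nums.length == 0) return -1;
        int lo = 0, hi = nums.length - 1;
        while (lo < hi) {
            int mid = lo + (hi - lo) / 2;
            if (nums[mid] < nums[mid + 1]) {
                lo = mid + 1; // rising slope: a peak lies to the right of mid
            } else {
                hi = mid;     // falling slope: mid or something to its left is a peak
            }
        }
        return lo;
    }
}
```

With several peaks, any one may be returned. For {1, 2, 1, 3, 5, 6, 4}, this version returns index 5.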

10章 场景题
Chapter 10: Scenario Questions

10.1 手写Flink的UV
10.1 Hand-Writing UV (Unique Visitor) Counting in Flink

10.2 Flink的分组TopN
10.2 Grouped TopN in Flink

10.3 Spark的分组TopN
10.3 Grouped TopN in Spark

Method 1:

(1) Aggregate data by key (groupByKey)

(2) Use mapValues to convert each key's values into a collection and sort it with Scala's sortBy or sortWith. If a single key has too much data, this can cause an OOM.

Method 2:

(1) Collect all the distinct keys

(2) Iterate over the keys, taking out the data for one key at a time and sorting it with Spark's sorting operator

Method 3:

(1) Use a custom partitioner that partitions by key, so that different keys go to different partitions

(2) Sort the data within each partition using Spark's sorting operator
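The per-key TopN logic itself can be written in plain Java without Spark; this is not Spark API, and the class and method names are hypothetical. Method 1 sorts every value of a key at once. This variant instead keeps a min-heap of at most N values per key, which caps memory per key. In Spark the same idea can be expressed with an aggregation operator such as `aggregateByKey`, so that only bounded heaps go through the shuffle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class GroupTopN {
    // Returns, for every key, its n largest values in descending order.
    // Each key keeps a min-heap of at most n values, so memory per key is O(n).
    static Map<String, List<Integer>> topN(List<Map.Entry<String, Integer>> records, int n) {
        Map<String, PriorityQueue<Integer>> heaps = new HashMap<>();
        for (Map.Entry<String, Integer> rec : records) {
            PriorityQueue<Integer> heap = heaps.computeIfAbsent(rec.getKey(), k -> new PriorityQueue<>());
            heap.offer(rec.getValue());
            if (heap.size() > n) heap.poll(); // evict the smallest, keeping the n largest
        }
        Map<String, List<Integer>> result = new HashMap<>();
        for (Map.Entry<String, PriorityQueue<Integer>> e : heaps.entrySet()) {
            List<Integer> values = new ArrayList<>(e.getValue());
            values.sort(Collections.reverseOrder()); // largest first
            result.put(e.getKey(), values);
        }
        return result;
    }
}
```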

10.4 如何快速从40亿条数据中快速判断,数据123是否存在
10.4 How do you quickly determine whether the value 123 exists among 4 billion records?
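The section gives no answer. One common approach applies if the records are non-negative integers that fit in 32 bits, which is an assumption. Use a bitmap with one bit per possible value: 2^32 bits is 512 MiB and covers every such value, and each lookup is a single bit test. If the values are not bounded integers, a Bloom filter uses less memory at the cost of a small false-positive rate. The `Bitmap` class below is a hypothetical sketch.

```java
class Bitmap {
    private final long[] words; // 64 values per long
    private final long capacity;

    // Tracks values in [0, capacity); capacity = 1L << 32 needs 2^26 longs (512 MiB)
    Bitmap(long capacity) {
        this.capacity = capacity;
        this.words = new long[(int) ((capacity + 63) >>> 6)];
    }

    void add(long value) {
        check(value);
        words[(int) (value >>> 6)] |= 1L << (value & 63);
    }

    boolean contains(long value) {
        check(value);
        return (words[(int) (value >>> 6)] & (1L << (value & 63))) != 0;
    }

    private void check(long value) {
        if (value < 0 || value >= capacity) {
            throw new IllegalArgumentException("value out of range: " + value);
        }
    }
}
```

Load all 4 billion values with add() once. After that, contains(123) answers in constant time.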

10.5 给你100G数据,1G内存,如何排序?
10.5 Given 100 GB of data and 1 GB of memory, how do you sort it?
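The usual answer is an external merge sort, not stated in the original. First, cut the data into runs that fit in memory, sort each run, and write it to disk. With 100 GB and 1 GB of memory that is at least 100 runs, more if part of the memory is reserved for I/O buffers. Then k-way merge the sorted runs with a min-heap that holds one head element per run. The sketch below keeps the runs in memory only to stay self-contained; a real implementation would write each run to a temporary file and stream it back. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class ExternalSortSketch {
    static int[] sort(int[] data, int memoryLimit) {
        if (memoryLimit <= 0) throw new IllegalArgumentException("memoryLimit must be positive");

        // Phase 1: split into runs of at most memoryLimit elements and sort each run
        List<int[]> runs = new ArrayList<>();
        for (int from = 0; from < data.length; from += memoryLimit) {
            int[] run = Arrays.copyOfRange(data, from, Math.min(from + memoryLimit, data.length));
            Arrays.sort(run);
            runs.add(run);
        }

        // Phase 2: k-way merge; heap entries are {value, runIndex, positionInRun}
        PriorityQueue<int[]> heap = new PriorityQueue<>((x, y) -> Integer.compare(x[0], y[0]));
        for (int r = 0; r < runs.size(); r++) {
            heap.offer(new int[] {runs.get(r)[0], r, 0});
        }
        int[] out = new int[data.length];
        int i = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out[i++] = top[0];
            int[] run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.length) heap.offer(new int[] {run[next], top[1], next});
        }
        return out;
    }
}
```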

10.6 公平调度器容器集中在同一个服务器上?
10.6 Fair Scheduler containers concentrated on the same server?

10.7 匹马赛跑,1个赛道,每次5匹进行比赛,无法对每次比赛计时,但知道每次比赛结果的先后顺序,最少赛多少次可以找出前三名?
10.7 Horses race on a single track, 5 at a time. The races cannot be timed, but the finishing order of each race is known. What is the minimum number of races needed to find the top three?

10.8 给定一个点、一条线、一个三角形、一个有向无环图,请用java面向对象的思想进行建模
10.8 Given a point, a line, a triangle, and a directed acyclic graph, model them using object-oriented design in Java.
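The sketch below shows one possible model; it is not a reference answer, and all class and method names are illustrative. It uses composition: a line is built from two points, and a triangle from three points, exposing its sides as lines. The DAG is generic over its vertex type, stores adjacency sets, rejects any edge that would create a cycle, and can produce a topological order using Kahn's algorithm.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// An immutable point in the plane
class Point {
    final double x, y;
    Point(double x, double y) { this.x = x; this.y = y; }
    double distanceTo(Point o) { return Math.hypot(x - o.x, y - o.y); }
}

// A line modeled as the segment between two points
class Line {
    final Point start, end;
    Line(Point start, Point end) { this.start = start; this.end = end; }
    double length() { return start.distanceTo(end); }
}

// A triangle composed of three points; its sides are Line objects
class Triangle {
    final Point a, b, c;
    Triangle(Point a, Point b, Point c) { this.a = a; this.b = b; this.c = c; }
    List<Line> sides() { return Arrays.asList(new Line(a, b), new Line(b, c), new Line(c, a)); }
    double perimeter() {
        double p = 0;
        for (Line side : sides()) p += side.length();
        return p;
    }
    // Half the absolute cross product of two edge vectors
    double area() {
        return Math.abs((b.x - a.x) * (c.y - a.y) - (c.x - a.x) * (b.y - a.y)) / 2.0;
    }
}

// A directed acyclic graph stored as adjacency sets
class Dag<T> {
    private final Map<T, Set<T>> edges = new LinkedHashMap<>();

    void addVertex(T v) { edges.putIfAbsent(v, new LinkedHashSet<>()); }

    // Adds from -> to, rejecting it if it would create a cycle
    void addEdge(T from, T to) {
        addVertex(from);
        addVertex(to);
        if (from.equals(to) || reachable(to, from)) {
            throw new IllegalArgumentException(from + " -> " + to + " would create a cycle");
        }
        edges.get(from).add(to);
    }

    // Iterative depth-first search: is target reachable from start?
    private boolean reachable(T start, T target) {
        Deque<T> stack = new ArrayDeque<>();
        Set<T> seen = new HashSet<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            T v = stack.pop();
            if (v.equals(target)) return true;
            if (seen.add(v)) {
                for (T w : edges.get(v)) stack.push(w);
            }
        }
        return false;
    }

    // Kahn's algorithm: repeatedly output a vertex with no remaining incoming edges
    List<T> topologicalOrder() {
        Map<T, Integer> inDegree = new HashMap<>();
        for (T v : edges.keySet()) inDegree.put(v, 0);
        for (Set<T> targets : edges.values()) {
            for (T w : targets) inDegree.merge(w, 1, Integer::sum);
        }
        Deque<T> ready = new ArrayDeque<>();
        for (T v : edges.keySet()) {
            if (inDegree.get(v) == 0) ready.add(v);
        }
        List<T> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            T v = ready.poll();
            order.add(v);
            for (T w : edges.get(v)) {
                if (inDegree.merge(w, -1, Integer::sum) == 0) ready.add(w);
            }
        }
        return order;
    }
}
```

The acyclic invariant lives inside `Dag.addEdge`, so callers cannot build an invalid graph. That encapsulation is the main object-oriented point of the design.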

10.9 现场出了一道sql题,让说出sql的优化,优化后效率提升了多少
10.9 An SQL question given on the spot: explain how to optimize the SQL and how much the optimization improved efficiency.

select 2d from t_order where 2d in (SELECT 2d from t_order_f)

对于这条 SQL 语句,可以使用内连接(INNER JOIN)来代替子查询(IN)。这通常可以提高查询性能,因为内连接在大多数数据库系统中的性能优化更为成熟。以下是优化后的 SQL 语句:
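For this SQL statement, you can replace the IN subquery with an INNER JOIN. This can improve performance on engines whose optimizers handle joins better than IN subqueries; older MySQL versions, for example, often executed such subqueries poorly. The rewrite is only equivalent when 2d is unique in t_order_f. Otherwise the join returns duplicate rows, and SELECT DISTINCT or an EXISTS subquery is needed. Here is the rewritten SQL statement: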

SELECT t1.2d
FROM t_order t1
INNER JOIN t_order_f t2 ON t1.2d = t2.2d

How can you judge how much the optimization improved efficiency?

查看执行时间:执行优化前后的 SQL 语句,比较它们的执行时间。执行时间的减少表示性能得到了提升。
View execution time: execute SQL statements before and after optimization and compare their execution time. A reduction in execution time indicates improved performance.

Shangda's self-developed practice-question website:

HQL practice module: keep practicing until your score exceeds 1000 points.

12章 面试说明
Chapter 12: Interview Guidance

12.1 面试过程最关键的是什么?
12.1 What is the most important part of the interview process?

(1) Be natural and relaxed

(2) Highlight your strengths and steer clear of your weaknesses

12.2 面试时该怎么说?
12.2 What should I say during the interview?

1) Speak clearly

(1) Think logically and express yourself fluently

(2) Organize answers into points: first, second, third

2) Get the content right

(1) Do not speak ill of your former employer or of yourself

(2) Talk about what you are good at

(3) From the interviewer's point of view, content they have already heard confirms what they know, and content they have not heard is something new for them to learn.

12.3 面试技巧
12.3 Interview Techniques

12.3.1 六个常见问题
12.3.1 Six Frequently Asked Questions

1) What are your strengths?

Confidently describe your strengths in every area

2) What are your weaknesses?

Don't talk about your real problems; instead, present a "weakness" that actually highlights a strength.

3) What is your reason for leaving?

Don't speak ill of your former employer, even if you were treated badly

Give a reason that is reasonable and legitimate

Give only one reason

4) What are your salary expectations?

Don't discuss salary until the end

Give a range, not a single number

Your bottom line is no lower than your current salary

Rather than naming a specific number, aim for the middle of the range, or about 20% above your current salary

5) Do you have any other questions?

This question reveals your vision and professional level.

The interviewer is not looking for a particular answer; what matters is how your questions compare with those of other candidates

Suggested questions:

What problems does the company expect me to solve within 3 to 6 months of joining?

What is the future strategic plan for the company (or for the department)?

Given what you know about me now, how long do you think it will take me to fit in?

6) How soon can you start?

In about a week; if the company needs it, I can start earlier.

12.3.2 两个注意事项
12.3.2 Two Things to Watch

1) Professional language

2) Professional image

12.3.3 自我介绍
12.3.3 Self-Introduction

1) Basic personal information

2) Work experience

Time, company name, position, main job content, job performance, reason for resignation.