这是用户在 2024-4-10 17:27 为 https://kafka.apache.org/quickstart#quickstart_createtopic 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?


第一步:获取 KAFKA


下载最新的 Kafka 版本并解压它:

$ tar -xzf kafka_2.13-3.7.0.tgz
$ cd kafka_2.13-3.7.0


第二步:启动 KAFKA 环境


注意:您的本地环境必须安装了 Java 8 或以上版本。


Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动。要开始使用任一配置,请按照下面的其中一个部分操作,但不要两个都操作。


使用 ZooKeeper 的 Kafka


按顺序运行以下命令以正确顺序启动所有服务:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties


打开另一个终端会话并运行:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties


一旦所有服务成功启动,你将拥有一个基本的 Kafka 环境,它正在运行并准备使用。


使用 KRaft 的 Kafka


Kafka 可以使用 KRaft 模式通过本地脚本和下载的文件或 docker 镜像运行。按照下面的其中一个部分操作来启动 kafka 服务器,但不要两个都操作。


使用下载的文件


生成一个集群 UUID

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"


格式化日志目录

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties


启动 Kafka 服务器

$ bin/kafka-server-start.sh config/kraft/server.properties

使用 Docker 镜像


获取 Docker 镜像

$ docker pull apache/kafka:3.7.0


启动 Kafka Docker 容器

$ docker run -p 9092:9092 apache/kafka:3.7.0


一旦 Kafka 服务器成功启动,你就会拥有一个基本的 Kafka 环境,可以开始使用了。


第三步:创建一个主题来存储你的事件


Kafka 是一个分布式事件流平台,它允许你在多台机器上读取、写入、存储和处理事件(在文档中也称为记录或消息)。


事件示例包括支付交易、移动电话的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量数据,等等。这些事件被组织并存储在主题中。非常简单地说,一个主题类似于文件系统中的一个文件夹,而事件就是该文件夹中的文件。


所以,在你写入第一个事件之前,你必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092


Kafka 的所有命令行工具都有额外的选项:运行 kafka-topics.sh 命令而不带任何参数来显示使用信息。例如,它还可以向你展示新主题的分区数量等细节:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0


第四步:向主题中写入一些事件


Kafka 客户端通过网络与 Kafka 代理通信来写入(或读取)事件。一旦接收到,代理将以持久和容错的方式存储事件,你需要多久就存储多久——甚至是永远。


运行控制台生产者客户端,向你的主题中写入一些事件。默认情况下,你输入的每一行都会导致一个单独的事件被写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event


你可以随时使用 Ctrl-C 停止生产者客户端。


第五步:读取事件


打开另一个终端会话,并运行控制台消费者客户端来读取你刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event


你可以随时使用 Ctrl-C 停止消费者客户端。


随意尝试:例如,切换回你的生产者终端(上一步)来写入更多事件,并观察这些事件如何立即出现在你的消费者终端中。


因为事件在 Kafka 中被持久存储,所以你可以想读多少次就读多少次,想有多少消费者读就有多少消费者读。你可以通过再打开一个终端会话并重新运行之前的命令来轻松验证这一点。


第 6 步:使用 Kafka Connect 将你的数据作为事件流导入/导出


你可能在现有系统中拥有大量数据,比如关系型数据库或传统的消息系统,同时有许多应用程序已经使用这些系统。Kafka Connect 允许你持续地从外部系统中导入数据到 Kafka,反之亦然。它是一个可扩展的工具,运行连接器,这些连接器实现了与外部系统交互的自定义逻辑。因此,将现有系统与 Kafka 集成非常容易。为了使这个过程更加简单,已经有数百个这样的连接器可以直接使用。


在这个快速入门中,我们将看到如何使用简单的连接器在 Kafka Connect 中运行,这些连接器可以将数据从文件导入到 Kafka 主题,也可以将数据从 Kafka 主题导出到文件。


首先,确保在 Connect 工作器的配置中添加 connect-file-3.7.0.jarplugin.path 属性。为了这个快速入门,我们将使用相对路径,并将连接器包视为一个超级 jar 包,这在从安装目录运行快速入门命令时是有效的。然而,值得注意的是,对于生产部署来说,使用绝对路径总是更好的。有关如何设置此配置的详细说明,请参见 plugin.path。


编辑 config/connect-standalone.properties 文件,添加或更改 plugin.path 配置属性以匹配以下内容,并保存文件:

> echo "plugin.path=libs/connect-file-3.7.0.jar"


然后,开始创建一些种子数据进行测试:

> echo -e "foo\nbar" > test.txt

或者在 Windows 上:
> echo foo> test.txt
> echo bar>> test.txt


接下来,我们将启动两个在独立模式下运行的连接器,这意味着它们在一个单独的、本地的、专用的进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含诸如连接到的 Kafka 代理和数据序列化格式等公共配置。其余的配置文件各自指定一个要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类,以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties


这些随 Kafka 一起提供的示例配置文件,使用你之前启动的默认本地集群配置,并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每一行生产到一个 Kafka 主题;第二个是接收连接器,它从一个 Kafka 主题读取消息并将每条消息作为一行写入输出文件。


在启动过程中,你会看到许多日志消息,包括一些表明连接器正在被实例化的消息。一旦 Kafka Connect 进程启动,源连接器应该开始从 test.txt 读取行并将它们生产到主题 connect-test ,接收连接器应该开始从主题 connect-test 读取消息并将它们写入文件 test.sink.txt 。我们可以通过检查输出文件的内容来验证数据是否已经通过整个管道传输:

> more test.sink.txt
foo
bar


请注意,数据被存储在 Kafka 主题 connect-test 中,所以我们也可以运行一个控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...


连接器继续处理数据,所以我们可以向文件中添加数据,并看到它通过管道移动:

> echo Another line>> test.txt


你应该会在控制台消费者输出和接收文件中看到这行文字。


第七步:用 Kafka Streams 处理你的事件


一旦你的数据以事件的形式存储在 Kafka 中,你就可以使用 Kafka Streams 客户端库(适用于 Java/Scala)来处理这些数据。它允许你实现关键的实时应用程序和微服务,这些应用程序和微服务的输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 结合了在客户端编写和部署标准 Java 和 Scala 应用程序的简便性,以及 Kafka 服务器端集群技术的优势,使这些应用程序具有高可扩展性、弹性、容错性和分布式特点。该库支持精确一次处理、有状态操作和聚合、窗口、连接、基于事件时间的处理等等。


为了让你初尝滋味,这里是实现流行的 WordCount 算法的方法:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));


Kafka Streams 演示和应用程序开发教程展示了如何从头到尾编码和运行这样的流应用程序。


第八步:终止 Kafka 环境


现在你已经完成了快速入门的最后一步,随意拆除 Kafka 环境或继续玩耍。


  1. 如果你还没有这么做,用 Ctrl-C 停止生产者和消费者客户端。

  2. Ctrl-C 停止 Kafka 代理。

  3. 最后,如果你遵循了 Kafka 与 ZooKeeper 部分,用 Ctrl-C 停止 ZooKeeper 服务器。


如果你还想删除本地 Kafka 环境的任何数据,包括你一路上创建的任何事件,请运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs


恭喜!


你已经成功完成了 Apache Kafka 快速入门。


要了解更多信息,我们建议你采取以下下一步:


  • 阅读简介以了解 Kafka 的高层次工作原理、主要概念以及它与其他技术的比较。要更详细地了解 Kafka,请前往文档部分。

  • 浏览使用案例,学习我们全球社区中的其他用户如何从 Kafka 中获得价值。

  • 加入当地的 Kafka 聚会小组,并观看 Kafka 峰会的演讲,这是 Kafka 社区的主要会议。