Apache Kafka 是一个开源的分布式事件流平台,被众多公司用于高性能数据管道、流分析、数据集成等。本章将讨论 kafka 的自动化部署。
什么是事件流: “事件流” 是按时间排序的一系列业务事件。事件流处理平台实时从事件源 (比如数据库、移动设备、传感器、应用程序等) 捕获数据,持久化存储这些事件流以供检索、处理和响应事件流,并根据需要将事件流路由到不同的目标技术。因此,事件流可确保数据的连续性、正确解释事件,从而使正确的信息在正确的时间出现在正确的位置。
本章包含以下主题:
Kafka 是一个分布式系统,由服务端和客户端组成,通过 TCP 网络协议通信。它可以部署在物理机、虚拟机或者容器上。
Servers:Kafka 以一个或多个 Servers 集群的形式运行,可以跨多个数据中心或云区域。提供存储层的 Servers 被成为 Brokers。其他 Servers 运行 Kafka Connect,以事件流的形式持续导入和导出数据,将 Kafka 与现有的系统 (如关系型数据库) 以及其他 Kafka 集群集成。Kafka 集群具有高度的可扩展性和容错性,如果任何一个 Server 故障,其他 Server 将接管工作,以确保连续运行,没有任何数据损失。
Clients:通过 Client 可以编写分布式应用和微服务,以并行、大规模的方式读取、写入和处理事件流,即使在网络问题或机器故障的情况下也能容错。
36.1.1 主要概念和术语
Event (事件) 记录了真实 “发生的事情”,在文档中也被称为记录或消息。当向 Kafka 读写数据时,是以 event 的形式进行的。从概念上讲,Event 包含键、值、时间戳和可选的元数据头。如示例 36.1。
示例 36.1,事件示例:
Event key: "Official accounts" Event value: "sretech" Event timestamp: "Dec. 24, 2022 at 2:06 p.m."
Producers (生产者) 是向 Kafka 发布 (写) events 的客户端应用程序。Consumers 是订阅 (读和处理) events 的应用程序。在 Kafka 中,Producers 和 consumers 完全解耦,互不相干,这是实现 Kafka 高度可扩展性的一个关键设计因素。例如,Producers 从来不需要等待 consumers。
Topics (主题) 用于分类存储 events。可以将 topics 想象成文件夹,而 events 就是文件夹中的文件。Kafka 中的 topics 可以多 producers 和多 consumers:一个 topic 可以有零个、一个或多个 producers 向其写入事件,也可以有零个、一个或多个 consumers 订阅这些事件。一个主题中的事件可以根据需要反复读取——与传统的消息传递系统不同,事件在消费后不会被删除。可以通过每个主题的配置设置来定义 Kafka 应该保留事件多长时间,之后旧的事件会被丢弃。Kafka 的性能相对于数据大小来说是有效恒定的,所以存储数据很长时间是完全可以的。
主题是分区 (partitioned) 的,一个主题被分散到位于不同 Kafka brokers 的若干 “桶” 中。这种分布式的数据放置对可扩展性非常重要,因为它允许客户端应用程序同时从/向多个 brokers 读取和写入数据。当一个新的事件被发布到一个主题时,它实际上是被附加到主题的一个分区中。具有相同 key 的事件(例如,customer 或 vehicle ID)被写入相同的分区,Kafka 保证一个给定的主题分区的任何消费者将始终以完全相同的顺序读取该分区的事件。
图 36.1 图片来自 Kafka 官网
这个例子的主题有四个分区 P1-P4。两个不同的生产者客户端通过网络向主题的分区写入事件,向主题发布新的事件,彼此独立。具有相同密钥的事件(在图中用颜色表示)被写到同一个分区中。注意,如果合适的话,两个生产者都可以写到同一个分区。
为了使数据具有容错性和高可用性,每个主题都可以被复制,甚至跨地理区域或数据中心,这样总有多个 Broker 拥有数据的副本,以防出错。一个常见的生产设置是复制系数为3,也就是说,数据总是有三个副本。这种复制是在主题分区的层面上进行的。
36.2 部署环境介绍
完成本教程至少需要四个节点:一个节点作为 Ansible 控制节点,三个节点部署 zookeeper 集群及 Kafka 集群。
Kafka 可以基于 Zookeeper 部署,也可以基于 KRaft 协议部署。KRaft 的方式部署起来相对简单,因此本教程仅演示基于 Zookeeper 的部署方式。36.2.1 节点信息
四个节点分别为 2 核 CPU、2 GB 内存的虚拟机。
节点信息:
# 系统版本:Rocky linux release 9.1
ansible Inventory hosts:
[zookeeper] zk1.server.aiops.red zk2.server.aiops.red zk3.server.aiops.red [kafka] kafka1.server.aiops.red kafka2.server.aiops.red kafka3.server.aiops.red
36.2.2 节点要求
为使安装过程顺利进行,节点应满足以下要求。
36.2.2.1 时钟同步
Zookeeper、Kafka 集群节点时钟须保持同步。
要自动化实现时钟同步,可以参考 “Linux 9 自动化部署 NTP 服务”。
36.2.2.2 主机名解析
Ansible 控制节点能够解析 Zookeeper、Kafka 节点的主机名,通过主机名访问节点。
要实现主机名称解析,可以在 Ansible 控制节点的 /etc/hosts 文件中指定节点的 IP、主机名条目,或者参考 “Linux 9 自动化部署 DNS 服务” 一文配置 DNS 服务。
36.2.2.3 主机名解析
Ansible 控制节点可以免密登录各节点,并能够免密执行sudo。可以参考 “Linux 9 自动化部署 NTP 服务” 中的 “部署环境要求” 一节实现。
36.2.2.4 依赖的服务及组件
Zookeeper、Kafka 集群的安装依赖 JDK。JDK 的自动化安装可以参考 ”Linux 9 自动化部署 JDK“。
Zookeeper 集群的自动化安装请参考 “Linux 9 自动化部署 Zookeeper 集群”。
36.3 自动化部署 Kafka
自动化部署 Kafka 集群通过两个 Ansible Role 实现:一个用于下载、分发 Kafka 安装包,一个用于部署 Kafka 集群。
使用独立的用户运行应用程序,能够起到一定程度的隔离性与安全性。因此,首先创建运行 Kafka 服务的用户。
36.3.1 创建用户
创建用户是一个常见的操作,因此为创建用户编写一个独立的 Ansible Role。
创建 create_user Role:
ansible-galaxy role init --init-path ~/roles create_user cd ~/roles/create_user/
图 36.2 提供创建用户的角色
编辑 tasks/main.yml 文件,内容如下:
--- # tasks file for create_user - name: create user task ansible.builtin.user: name: "{{ item.0 }}" home: "{{ item.1 }}" shell: "{{ item.2 }}" with_together: - "{{ user_name }}" - "{{ user_dir }}" - "{{ user_shell }}"
在ansible.builtin.user任务中包含了三个列表类型的变量,user_name 定义创建的用户名称,user_dir 定义创建的用户家目录,user_shell 定义创建的用户 Shell。之所以使用列表类型的变量,是为了满足一次性创建多个用户的需求。可以为变量指定一个或多个值,对应创建一个或多个用户。在指定多个值时,要注意三个变量的顺序要一致。这三个变量,将在 Playbook 中提供。
36.3.2 下载并分发 Kafka 软件包
Kafka 下载地址:https://kafka.apache.org/downloads,当前稳定版是 3.3.1。
创建下载 Kafka 软件包的 Ansible Role。可以把该角色抽象为下载所有二进制 tarball,而不是只针对 Kafka:
ansible-galaxy role init --init-path ~/roles download_binary_tarball cd ~/roles/download_binary_tarball/
图 36.3 创建下载二进制包的角色
编辑 download_binary_tarball 角色的 tasks/main.yml 文件,添加下载软件包的任务,内容如下:
--- # tasks file for download_binary_tarball - name: create download and unarchive directory task ansible.builtin.file: path: "{{ item }}" state: directory with_items: "{{ unarchive_path }}" - name: download and unarchive tarball task ansible.builtin.unarchive: src: "{{ item.0 }}" dest: "{{ item.1 }}" remote_src: true extra_opts: - --strip-components=1 with_together: - "{{ package_url }}" - "{{ unarchive_path }}"
在任务的主配置文件中添加了两个任务:ansible.builtin.file任务创建存储下载软件包的目录;ansible.builtin.unarchive任务将软件包解压到ansible.builtin.file创建的对应目录中。
任务使用了两个列表类型的变量:unarchive_path 和 package_url,这样既可以解压单个软件包,也可以同时解压多个软件包。下载、解压多个软件包时,为这两个变量定义多个值,但它们之前的顺序要对应。也就是说,package_url 第一个值指定的软件包将被解压到 unarchive_path 第一个值指定的目录中。
这两个变量在具体的 Playbook 中定义,这样在部署任何二进制 tarball 时,都可以使用 download_binary_tarball 角色下载软件包。
创建 ~/playbooks/deploy_kafka/ 目录,在该目录下创建 download_kafka.yaml Playbook 文件:
mkdir ~/playbooks/deploy_kafka cd ~/playbooks/deploy_kafka vim download_kafka.yaml
图 36.4 创建下载 Kafka 的 Playbook
download_kafka.yaml 内容如下:
--- - name: download kafka binary tarball play hosts: localhost become: false gather_facts: false vars_files: - vars.yaml roles: - role: download_binary_tarball tags: download_binary_tarball - name: create user play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml roles: - role: create_user tags: create_user - name: distribute binary packages play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml tasks: - name: copy the installation file task ansible.posix.synchronize: src: "{{ unarchive_path.0 }}" dest: "{{ user_dir.0 }}" - name: set the file owner task ansible.builtin.shell: "chown {{ user_name.0 }}.{{ user_name.0 }} {{ user_dir.0 }} -R" ...
download_kafka.yaml Playbook 文件中包含了三个 Play。第一个 Play 使用 download_binary_tarball Role,将 Kafka 二进制文件下载到 localhost 主机。第二个 Play 使用 create_user Role 在托管节点上创建运行 Kafka 服务的用户。第三个 Play 中包含两个任务,作用是将 Kafka 文件分发到 Kafka 托管节点,并设置文件属主及数组为运行 Kafka 的用户。
创建 vars.yaml 变量文件,在文件中定义 unarchive_path 和 package_url 变量:
mkdir vars vim vars/vars.yaml
图 36.5 创建变量文件
vars.yaml 文件内容如下:
package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin
执行 download_kafka.yaml Playbook,下载 Kafka 软件包:
ansible-playbook download_kafka.yaml
图 36.6 下载并分发 Kafka 安装文件
查看 Kafka 安装文件:
ssh kafka1.server.aiops.red sudo tree -L 1 /opt/sretech/kafka
图 36.7 查看 Kafka 安装文件
36.3.3 部署 Kafka 集群
创建部署 Kafka 集群的 Ansible 角色:
ansible-galaxy role init --init-path ~/roles deploy_kafka cd ~/roles/deploy_kafka/
图 36.8 创建部署 Kafka 的角色
36.3.3.1 创建日志目录
为 Kafka 服务提供日志目录。编辑 tasks/main.yml 文件,添加以下任务:
- name: create logs directory task ansible.builtin.file: path: "{{ logs_dir }}" state: directory owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}"
36.3.3.2 设置 Local Facts
在 Kafka 集群配置中,需要设置 broker.id,用以区分 broker。Kafka 在启动时会在 zookeeper 中 /brokers/ids 路径下创建一个以当前 broker.id 为名称的虚节点,Kafka 的健康状态检查就依赖于此节点。当 broker 下线时,该虚节点会自动删除,其他 broker 或者客户端通过判断 /brokers/ids 路径下是否有此 broker 的 id 来确定该 broker 的健康状态。
创建三个 Facts 文件:
for bid in $(seq 0 2); do echo -e "[kafka]nbroker_id=${bid}" > broker${bid}.fact; done
图 36.9 创建 Local Facts 变量
在 Kafka 节点上创建 /etc/ansible/facts.d/ 目录:
ansible kafka -m file -a "path=/etc/ansible/facts.d state=directory" -b
将三个文件拷贝到对应节点的 /etc/ansible/facts.d/ 目录:
ansible kafka1.server.aiops.red -m copy -a "src=broker0.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka2.server.aiops.red -m copy -a "src=broker1.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka3.server.aiops.red -m copy -a "src=broker2.fact dest=/etc/ansible/facts.d/kf.fact" -b
36.3.3.3 配置文件模板
为 Kafka 服务提供配置文件模板,在 templates/ 目录下创建 server.properties.j2 文件,内容如下:
broker.id={{ ansible_local.kf.kafka.broker_id }} log.dirs={{ logs_dir }} zookeeper.connect={% for host in groups['zookeeper'] %}{{ host }}:{{ zookeeper_port }}{% if not loop.last %},{% endif %}{% endfor %}
在 tasks/main.yml 文件中新增任务,将 server.properties.j2 模板文件拷贝到托管节点:
- name: generate configuration files task ansible.builtin.template: src: server.properties.j2 dest: "{{ user_dir.0 }}/kafka/config/server.properties" owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}" mode: 0644 notify: Restart kafka service handler
ansible.builtin.template任务用于生成 Kafka 配置文件,当文件有变化时,通过notify触发 Handlers。
36.3.3.4 单元文件模板
在 templates/ 目录下创建systemd单元模板文件 kafka.service.j2,内容如下:
[Unit] Description=Kafka Wants.NETwork.target After=network.target [Service] Type=simple User={{ user_name.0 }} Group= {{ user_name.0 }} ExecStart=/bin/sh -c '{{ user_dir.0 }}/kafka/bin/kafka-server-start.sh {{ user_dir.0 }}/kafka/config/server.properties > {{ logs_dir }}/start-kafka.log 2>&1' ExecStop={{ user_dir.0 }}/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
在 tasks/main.yml 文件中新增任务,将 kafka.service.j2 模板文件拷贝到托管节点:
- name: generate systemd unit files task ansible.builtin.template: src: kafka.service.j2 dest: /usr/lib/systemd/system/kafka.service mode: 0644
36.3.3.5 开启防火墙
Kafka 启动后会监听 TCP 9092 端口。在 tasks/main.yml 文件中新增任务,为该端口开启防火墙:
- name: turn on kafka ports in the firewalld task ansible.builtin.firewalld: port: "{{ kafka_port }}/tcp" permanent: true immediate: true state: enabled
36.3.3.6 设置 JAVA_HOME
在 Kafka 的 kafka-run-class.sh 文件中添加 JAVA_HOME 变量。编辑 tasks/main.yml 文件中新增lineinfile任务:
- name: set JAVA_HOME in kafka-run-class.sh file task ansible.builtin.lineinfile: path: "{{ user_dir }}/kafka/bin/kafka-run-class.sh" line: "export JAVA_HOME=/opt/jdk19" insertafter: "/bin/bash"
36.3.3.7 启动 Kafka 服务
在 tasks/main.yml 文件中新增任务,启动 Kafka 服务,并将其设置为开机启动:
- name: started kafka service task ansible.builtin.systemd: name: kafka.service state: started enabled: true daemon_reload: true
36.3.3.8 部署 Kafka 集群
在 ~/playbooks/deploy_kafka/ 目录中创建 deploy_kafka.yaml Playbook 文件,内容如下:
--- - name: deploy kafka cluster play hosts: kafka become: true gather_facts: true vars_files: - vars.yaml roles: - role: deploy_kafka tags: deploy_kafka ...
在 vars/vars.yaml 文件中添加所需变量,最终的文件内容如下:
package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin logs_dir: /var/log/kafka kafka_port: 9092 zookeeper_port: 2181
执行 deploy_kafka.yaml Playbook 文件,自动化完成 Kafka 集群的部署:
ansible-playbook deploy_kafka.yaml
图 36.10 自动化部署 Kafka 集群
36.4 验证
本小节对 Kafka 服务进行一些操作,以验证集群是否可用。
36.4.1 创建 Topic
在 kafka1.server.aiops.red 上创建副本为 3,分区为 1,名称为 aiops 的 topic:
~/software/kafka_tools/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka1.server.aiops.red:9092 --replication-factor 3 --partitions 1 --topic aiops
图 36.11 在 kafka1.server.aiops.red 上创建名为 aiops 的 Topic
创建完成后,Topic 会同步到集群的另外两个 broker kafka2.server.aiops.red 和 kafka3.server.aiops.red 上。
36.4.2 查询 Topic
通过--list命令查看 Broker 上的可用 Topic。查询 kafka1|2|3.server.aiops.red 上的可用 Topic:
~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka2.server.aiops.red:9092 --list ~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka3.server.aiops.red:9092 --list
图 36.12 查询 Broker 上的可用 Topic
从输出看出,集群中的三个 Broker 都包含名为 aiops 的 Topic。
36.4.3 生产消息
向 kafka1.server.aiops.red 的 aiops Topic 写入消息:
~/software/kafka_tools/kafka/bin/kafka-console-producer.sh --broker-list kafka1.server.aiops.red:9092 --topic aiops
图 36.13 向 Topic 写入数据
36.4.4 消费消息
从 kafka2.server.aiops.red 的 aiops Topic 读取消息:
~/software/kafka_tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka2.server.aiops.red:9092 --topic aiops --from-beginning
图 36.14 从 Topic 读取消息
36.5 总结
Kafka 是常用的消息队列系统,本教程演示了 Kafka 集群的自动化部署及简单操作。教程同样适用于其他基于 RPM 的 Linux 发行版。
来源:魏文第