以技术洞察驱动商业进化

Prosperlytics Lab · 实验记录 004

Kafka PRO & CUNSUMER 信息互传与PYTHON交互 Date Posted…

Kafka PRO & CUNSUMER 信息互传与PYTHON交互

Date Posted:2026-01-03
Creator:Rex(Prosperlytics)

实验记录:Kafka消费端和生产端消息互传,使用python在vscode中扮演生产者角色

Kafka 在本地分布式实验室中的部署踩坑总结

本文记录我在 Prosperlytics Distributed Lab 中,基于 Ubuntu Server + Docker 部署 Kafka 实现信息互传的的全过程,以及过程中遇到的真实问题与解决思路。


一、实验背景(Background)

在完成以下基础设施之后:

  • ✅ 多台 mini PC(5-node)
  • ✅ Ubuntu Server 安装
  • ✅ hostname / hosts / SSH key 登录
  • ✅ Docker & containerd 正常运行
  • ✅ Java 17(OpenJDK)统一版本

我开始进入 streaming layer 的搭建阶段,也就是:

  • Kafka:事件流(event stream)中枢
  • Flink:实时计算 / 状态处理引擎

目标是后续构建一个 quant + risk streaming system,而不是单机脚本。


二、Kafka 安装阶段的主要坑

❌ 坑 1:python 做为生产者发送消息给搭建好的topic,Kafka出现报错!

rexwang@m1Air prosperlytics % /Users/rexwang/git/prosperlytics/.venvProsperlytics/bin/python /Users/rexwang/git/prosperlytics/kafka_test.py Traceback (most recent call last): File "/Users/rexwang/git/prosperlytics/kafka_test.py", line 3, in <module> producer = KafkaProducer(bootstrap_servers='localhost:9092') File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/producer/kafka.py", line 485, in __init__ client = self.config['kafka_client']( metrics=self._metrics, metric_group_prefix='producer', wakeup_timeout_ms=self.config['max_block_ms'], **self.config) File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/client_async.py", line 262, in __init__ self.config['api_version'] = self.check_version() ~~~~~~~~~~~~~~~~~~^^ File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/client_async.py", line 1076, in check_version raise Errors.NoBrokersAvailable() kafka.errors.NoBrokersAvailable: NoBrokersAvailable rexwang@m1Air prosperlytics % source /Users/rexwang/git/prosperlytics/.venvProsperlytics/bin/activate

主要原因为,KAFKA的配置问题
下面的这个部分呢来自于你在代替Kafka定义身份的文件里,这个文件其实帮助我们定义kafka的一些自我介绍,比如说对外地址,其实node1:9092是错的,是因为我的node1的主机的名字其实是note1,我自己呢给每个节点做网络配置的时候又写了node1,这导致kafka对外网络其实是错的,导致的报错!
后来讲这个部分的代码改去直接的IP地址就好了:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://1xx.xxx.xxx.xx:9092

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://node1:9092

四.docker-compose.yml 文件是你分布式风控系统的骨架,这个文件有什么?

1.Zookeeper 是 Kafka 的“管家”,负责管理集群元数据和选举。

  • image: confluentinc/cp-zookeeper:7.6.1: 使用 Confluent 官方维护的镜像。Confluent 是 Kafka 创始人开的公司,版本 7.6.1 非常稳定。
  • container_name: zk: 给容器取个短名字。在 Docker 网络里,Kafka 找 Zookeeper 只需要访问 zk:2181
  • ZOOKEEPER_CLIENT_PORT: 2181: Zookeeper 对外服务的标准端口。
  • ZOOKEEPER_TICK_TIME: 2000: Zookeeper 的基本时间单位(2秒)。心跳检测和会话超时都以此为基准。

2. Services: Kafka (The Backbone)

这是风控系统最核心的数据总线。

  • depends_on: - zookeeper: 启动顺序策略。告诉 Docker:“先启动管家,管家活了再启动 Kafka”。
  • KAFKA_BROKER_ID: 1: 节点 ID。在分布式集群中,每个节点必须有唯一的 ID。
  • KAFKA_ZOOKEEPER_CONNECT: zk:2181: 注册地址。Kafka 启动后会去 zk 容器的 2181 端口“报到”。

3. Networking: 核心难点 (Listeners)

这几行是你最容易踩坑、也是最核心的配置,它们决定了数据能不能流转起来。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT: 定义安全协议映射。因为你是内网测试,所以直接用 PLAINTEXT(明文传输),不加密。
  • KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092: 监听地址。告诉 Kafka:“在容器内部,请监听所有网卡上的 9092 端口”。
  • KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.100.38:9092: 广播地址(非常关键!)。这是 Kafka 告诉外界(比如你的 Flink 节点):“如果你想找我,请访问这个 IP”。Strategic Insight: 因为你的 Flink 可能跑在另一台 Ubuntu 机器上,如果这里写 localhost,Flink 就会在它自己家里找 Kafka,导致连接失败。写 192.168.100.38 确保了跨机器的通信。

4. Stability: 容错配置 (Replication)

因为你目前是单机版 Kafka(只有一个 Broker),所以这些必须设置为 1

  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1: 偏移量主题的副本数。
  • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1: 事务日志副本数。
  • KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1: 最小同步副本数。Note: 默认通常是 3。如果你不改成 1,Kafka 在单机运行时会因为“副本数不足”而报错,导致无法处理数据。

💡 你需要注意的潜在风险 (Architectural Review)

  1. 没有 Persistence (持久化): 你目前的代码里没有 volumes。这意味着你一旦 docker-compose down,所有的风控数据、Topic 都会被抹除。 建议: 如果想让数据留下来,需要加上类似这样的配置:
  2. YAMLvolumes: - /your/path/data:/var/lib/kafka/data
  3. Flink 连通性: 你在 Flink 程序里配置 Kafka Source 时,记得服务器地址填 1xx.xxx.xxx.xx:9092
+

留下评论