Kafka PRO & CUNSUMER 信息互传与PYTHON交互
Date Posted:2026-01-03
Creator:Rex(Prosperlytics)
实验记录:Kafka消费端和生产端消息互传,使用python在vscode中扮演生产者角色
Kafka 在本地分布式实验室中的部署踩坑总结
本文记录我在 Prosperlytics Distributed Lab 中,基于 Ubuntu Server + Docker 部署 Kafka 实现信息互传的的全过程,以及过程中遇到的真实问题与解决思路。
一、实验背景(Background)
在完成以下基础设施之后:
- ✅ 多台 mini PC(5-node)
- ✅ Ubuntu Server 安装
- ✅ hostname / hosts / SSH key 登录
- ✅ Docker & containerd 正常运行
- ✅ Java 17(OpenJDK)统一版本
我开始进入 streaming layer 的搭建阶段,也就是:
- Kafka:事件流(event stream)中枢
- Flink:实时计算 / 状态处理引擎
目标是后续构建一个 quant + risk streaming system,而不是单机脚本。
二、Kafka 安装阶段的主要坑
❌ 坑 1:python 做为生产者发送消息给搭建好的topic,Kafka出现报错!
rexwang@m1Air prosperlytics % /Users/rexwang/git/prosperlytics/.venvProsperlytics/bin/python /Users/rexwang/git/prosperlytics/kafka_test.py Traceback (most recent call last): File "/Users/rexwang/git/prosperlytics/kafka_test.py", line 3, in <module> producer = KafkaProducer(bootstrap_servers='localhost:9092') File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/producer/kafka.py", line 485, in __init__ client = self.config['kafka_client']( metrics=self._metrics, metric_group_prefix='producer', wakeup_timeout_ms=self.config['max_block_ms'], **self.config) File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/client_async.py", line 262, in __init__ self.config['api_version'] = self.check_version() ~~~~~~~~~~~~~~~~~~^^ File "/Users/rexwang/git/prosperlytics/.venvProsperlytics/lib/python3.13/site-packages/kafka/client_async.py", line 1076, in check_version raise Errors.NoBrokersAvailable() kafka.errors.NoBrokersAvailable: NoBrokersAvailable rexwang@m1Air prosperlytics % source /Users/rexwang/git/prosperlytics/.venvProsperlytics/bin/activate
主要原因为,KAFKA的配置问题
下面的这个部分呢来自于你在代替Kafka定义身份的文件里,这个文件其实帮助我们定义kafka的一些自我介绍,比如说对外地址,其实node1:9092是错的,是因为我的node1的主机的名字其实是note1,我自己呢给每个节点做网络配置的时候又写了node1,这导致kafka对外网络其实是错的,导致的报错!
后来讲这个部分的代码改去直接的IP地址就好了:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://1xx.xxx.xxx.xx:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://node1:9092
四.docker-compose.yml 文件是你分布式风控系统的骨架,这个文件有什么?
1.Zookeeper 是 Kafka 的“管家”,负责管理集群元数据和选举。
image: confluentinc/cp-zookeeper:7.6.1: 使用 Confluent 官方维护的镜像。Confluent 是 Kafka 创始人开的公司,版本7.6.1非常稳定。container_name: zk: 给容器取个短名字。在 Docker 网络里,Kafka 找 Zookeeper 只需要访问zk:2181。ZOOKEEPER_CLIENT_PORT: 2181: Zookeeper 对外服务的标准端口。ZOOKEEPER_TICK_TIME: 2000: Zookeeper 的基本时间单位(2秒)。心跳检测和会话超时都以此为基准。
2. Services: Kafka (The Backbone)
这是风控系统最核心的数据总线。
depends_on: - zookeeper: 启动顺序策略。告诉 Docker:“先启动管家,管家活了再启动 Kafka”。KAFKA_BROKER_ID: 1: 节点 ID。在分布式集群中,每个节点必须有唯一的 ID。KAFKA_ZOOKEEPER_CONNECT: zk:2181: 注册地址。Kafka 启动后会去zk容器的2181端口“报到”。
3. Networking: 核心难点 (Listeners)
这几行是你最容易踩坑、也是最核心的配置,它们决定了数据能不能流转起来。
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT: 定义安全协议映射。因为你是内网测试,所以直接用PLAINTEXT(明文传输),不加密。KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092: 监听地址。告诉 Kafka:“在容器内部,请监听所有网卡上的 9092 端口”。KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.100.38:9092: 广播地址(非常关键!)。这是 Kafka 告诉外界(比如你的 Flink 节点):“如果你想找我,请访问这个 IP”。Strategic Insight: 因为你的 Flink 可能跑在另一台 Ubuntu 机器上,如果这里写localhost,Flink 就会在它自己家里找 Kafka,导致连接失败。写192.168.100.38确保了跨机器的通信。
4. Stability: 容错配置 (Replication)
因为你目前是单机版 Kafka(只有一个 Broker),所以这些必须设置为 1。
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1: 偏移量主题的副本数。KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1: 事务日志副本数。KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1: 最小同步副本数。Note: 默认通常是 3。如果你不改成 1,Kafka 在单机运行时会因为“副本数不足”而报错,导致无法处理数据。
💡 你需要注意的潜在风险 (Architectural Review)
- 没有 Persistence (持久化): 你目前的代码里没有
volumes。这意味着你一旦docker-compose down,所有的风控数据、Topic 都会被抹除。 建议: 如果想让数据留下来,需要加上类似这样的配置: - YAML
volumes: - /your/path/data:/var/lib/kafka/data - Flink 连通性: 你在 Flink 程序里配置 Kafka Source 时,记得服务器地址填
1xx.xxx.xxx.xx:9092。
留下评论