网站建设1993seo,钦州浦北网站建设,小程序制作开发加盟,动画设计素材背景介绍
Kafka是一个分布式流处理平台#xff0c;可以处理大规模数据流并支持实时数据流的处理。
本文介绍了如何在WSL下使用Docker搭建Kafka容器#xff0c;并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时#xff0c;还将该服务整合到一个整体的d…背景介绍
Kafka是一个分布式流处理平台可以处理大规模数据流并支持实时数据流的处理。
本文介绍了如何在WSL下使用Docker搭建Kafka容器并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时还将该服务整合到一个整体的docker-compose中。文章详细介绍了Docker网络、Kafka环境变量配置、Python连接Kafka的方法以及API的开发。
实验环境
WSL2 Ubuntu18.04 | Docker
⚙️容器配置与搭建
镜像选择
bitnami/kafka镜像 Bitnami是一个提供开发、部署和管理应用程序的软件公司。Bitnami提供了Kafka的Docker镜像并有非常详细的文档。我们将使用这个镜像来搭建Kafka容器。
由于Kafka需要Zookeeper支持我们可以通过docker-compose来快速组合多个容器。
按照官方的文档我们可以通过如下的配置快速搭建一个KafkaZookeeper的中间件
# docker-compose.ymlversion: 3networks:app-tier:driver: bridgeservices:zookeeper:restart: alwaysimage: bitnami/zookeeper:latestnetworks:- app-tierports:- 2181:2181environment:- ALLOW_ANONYMOUS_LOGINyeskafka:restart: alwaysimage: bitnami/kafka:latestnetworks:- app-tierports:- 9092:9092- 9093:9093environment:- KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper:2181- ALLOW_PLAINTEXT_LISTENERyes- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPCLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_LISTENERSCLIENT://:9092,EXTERNAL://:9093- KAFKA_CFG_ADVERTISED_LISTENERSCLIENT://kafka:9092,EXTERNAL://localhost:9093- KAFKA_CFG_INTER_BROKER_LISTENER_NAMECLIENTdepends_on:- zookeeper在network部分我们一个定义了一个 Docker 网络名称为 “app-tier”驱动程序为 “bridge”。 在 Docker 中网络是一种虚拟网络使容器之间可以进行通信。 “bridge” 驱动程序是 Docker 网络的默认驱动程序它在单个 Docker 主机内创建一个内部网络允许容器使用其 IP 地址相互通信。通过定义名称为 “app-tier”驱动程序为 “bridge” 的网络连接到该网络的任何容器都将能够使用其在网络内的 IP 地址相互通信。这可以用于创建微服务架构或其他分布式系统其中多个容器需要彼此通信。 在Kafka的部分我们设置了 Kafka 的环境变量包括 Zookeeper 地址、允许明文监听器、监听器安全协议映射、监听器和广告监听器等。 注意当前的配置将允许明文监听Kafka这在实际生产环境中是不被允许的我们在此为了便于开发环境的测试允许直接监听。 剩余的环境配置可以在官网详细查询 容器使用
使用如下命令启动容器
docker-compose up使用如下命令进入容器
sudo docker ps
sudo docker exec -it xx bash注其中xx是ps命令得到的Kafka容器的id前两位
进入容器后我们可以通过如下命令开启生产者和消费者
// 确保你在/opt/bitnami/kafka目录下// 创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test// 启动生产者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning// 启动消费者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test你可以打开两个终端分别开启一个生产者和消费者。如果环境运行正常就可以在消费者的端口同步查看到生产的输入。
基于此我们快速地搭建了一个Kafka的开发环境并且成功地启动了生产者和消费者。在实际的开发中我们可以使用这个环境来进行Kafka相关的开发和测试工作。同时在生产环境中我们需要根据实际情况来进行更加严格和安全的配置以确保Kafka的安全和可靠性。 API构建与测试
除了命令行我们还可以使用第三方框架实现对Kafka的连接和操作。下面用Python连接kafka并搭建一个简单的API供调试。
安装Python依赖
kafka-python
fastapi
uvicorn
pydantic使用懒汉式的单例模式构建一个用于连接Kafka的上下文对象
# kafkaContext.pyimport random
from kafka import KafkaProducer, KafkaClient
import timedef _get_kafka_producer_connection(host, port) - KafkaProducer:while True:try:producer KafkaProducer(bootstrap_serversf{host}:{port})breakexcept Exception as e:print(producer failed to connect, retrying, e)time.sleep(5)print(producer connected, producer)return producerdef _get_kafka_client_connection(host, port) - KafkaClient:while True:try:client KafkaClient(bootstrap_serversf{host}:{port})breakexcept Exception as e:print(client failed to connect, retrying, e)time.sleep(5)print(client connected, client)return clientclass KafkaContext:# 构建一个单例模式的producerdef __init__(self, hostkafka, port9092):self.__host hostself.__port portself.__client Noneself.__producer Nonedef __connect_client(self):self.__client _get_kafka_client_connection(self.__host, self.__port)def __connect_producer(self):self.__producer _get_kafka_producer_connection(self.__host, self.__port)def is_client_connected(self) - bool:if self.__client is None:return Falsereturn self.__client.bootstrap_connected()def is_producer_connected(self) - bool:return self.__producer is not Nonedef add_topic(self, topic: str):if self.__client is None:self.__connect_client()self.__client.add_topic(topic)def send_msg(self, topic: str, msg: str) - str:if self.__producer is None:self.__connect_producer()# convert msg to bytesmsg_bytes bytes(msg, encodingutf-8)self.__producer.send(topic, msg_bytes)return msgkafkaContext KafkaContext()注意到我们使用了kafka 的解析去连接Kafka是因为我们后续要将该服务放在一个Docker容器内并且接入到和上文提到的Kafka模块的网络中
使用FastAPI构建一个API提供基本的状态检测、增加topic和生产消息的接口
# api.pyfrom datetime import datetime
from pydantic import BaseModelfrom KafkaContext import kafkaContext
from fastapi import FastAPIapp FastAPI(titlekafka-server, descriptionkafka-server, version0.1.0)class Message(BaseModel): # 继承了BaseModel定义了People的数据格式topic: strmsg: strapp.get(/)
def read_root():return {time: datetime.now(), status: ok}app.get(/health/client)
def health_client():return {data: kafkaContext.is_client_connected()}app.get(/health/producer)
def health_producer():return {data: kafkaContext.is_producer_connected()}app.get(/producer/add_topic/{topic})
def add_topic(topic: str):kafkaContext.add_topic(topic)return {status: ok,data: topic}app.post(/producer/send_msg)
def send_msg(message: Message):return {status: ok,data: kafkaContext.send_msg(message.topic, message.msg)}如果希望开发更多的接口可以阅读官方的文档 kafka-python · PyPI
在命令行输入命令启动服务
uvicorn api:app --host 0.0.0.0 --port 8000 --reload打开浏览器访问http://your_wsl_ip:8000/docs即可看到接口文档如果Kafka的消费者还在运行则可以尝试接口是否运行正常。 至此我们已经实现了用Docker搭建一个Kafka模块并使用FastAPI结合python-kafka实现了接口的开发。 容器整合
最后我们将该服务整合到一个整体的docker-compose中
首先我们先构建后端API的镜像该镜像使用了Python的环境。
FROM python
LABEL authorchene2000
ENV PYTHONIOENCODINGutf-8RUN mkdir -p /app
WORKDIR /app
COPY requirements.txt /app
RUN pip3 install -r requirements.txt -i https://pypi.doubanio.com/simple --trusted-host pypi.doubanio.comCOPY . /appCMD bash start-server.sh在docker-compose.yml中追加一个server的服务
server:restart: always# image: kafka-servercontainer_name: kafka-serverbuild:dockerfile: Dockerfilecontext: ./server/networks:- app-tierports:- 8002:8000depends_on:- zookeeper- kafka注意我们采用了如下的项目结构
├── docker-compose.yml
└── server├── api.py├── Dockerfile├── KafkaContext.py├── requirements.txt└── start-server.sh在docker-compose.yml的build处我们配置了Docker容器构建时的目录位置和构建所用的Dockerfile配置。
回到根目录运行如下命令
# 构建容器
sudo docker compose build# 启动容器
sudo docker compose up
# 启动容器后台运行
sudo docker compose up -d构建完毕 ️ 小结
在实际生产环境中应注意Kafka的安全性和可靠性。在使用第三方框架连接Kafka时需要使用Kafka的解析进行连接。最后将Kafka模块整合到docker-compose中方便进行部署和使用。
项目代码 https://github.com/ChenE2000/thesis-kafka-server