当前位置: 首页 > news >正文

坂田网站建设公司logo设计公司深圳

坂田网站建设公司,logo设计公司深圳,潜江做网站的公司有哪些,做网站用到什么开发语言一、引言 在大数据和消息队列领域#xff0c;Apache Kafka 无疑是一颗璀璨的明星。作为一个分布式流处理平台#xff0c;Kafka 以其高吞吐量、低延迟、可扩展性和容错性等优势#xff0c;被广泛应用于日志收集、实时数据处理、流计算、数据集成等众多场景。无论是大型互联网…一、引言 在大数据和消息队列领域Apache Kafka 无疑是一颗璀璨的明星。作为一个分布式流处理平台Kafka 以其高吞吐量、低延迟、可扩展性和容错性等优势被广泛应用于日志收集、实时数据处理、流计算、数据集成等众多场景。无论是大型互联网公司还是新兴的创业企业Kafka 都在其技术栈中扮演着至关重要的角色助力企业实现高效的数据处理和业务发展。​ 对于开发者而言深入了解 Kafka 的最佳方式之一就是搭建本地环境进行学习和实践。通过搭建本地环境我们可以在自己的电脑上模拟 Kafka 集群熟悉 Kafka 的各种操作和功能掌握如何使用 Kafka 客户端进行消息的发送和消费为在实际项目中应用 Kafka 奠定坚实的基础。 二、Docker 部署 Kafka ​ 2.1 单节点部署​ 使用 Docker 部署 Kafka 单节点非常简单以下是具体步骤​ 安装 Docker如果你的系统尚未安装 Docker请根据你使用的操作系统如 Linux、Windows、macOS按照Docker 官方文档的指引进行安装。Docker 是一个开源的应用容器引擎它允许我们将应用及其依赖打包到一个可移植的容器中然后发布到任何支持 Docker 的环境中运行这极大地简化了 Kafka 的部署过程。​创建 Docker 容器安装完成后在终端或命令提示符中执行以下命令创建一个名为kafka的 Docker 容器 docker run -d --name kafka \ -p 9092:9092 \ -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1 \ confluentinc/cp-kafka:latest 上述命令中​ -d表示以后台守护进程的方式运行容器​--name kafka为容器命名为kafka方便后续管理和操作​-p 9092:9092将容器内部的 9092 端口映射到主机的 9092 端口这样我们就可以通过主机的 9092 端口访问 Kafka 服务​-e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092设置 Kafka 对外发布的监听地址这里设置为PLAINTEXT://localhost:9092表示 Kafka 将以明文协议在本地的 9092 端口监听客户端连接​-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1设置__consumer_offsets主题的副本因子为 1__consumer_offsets主题用于存储消费者的偏移量信息​confluentinc/cp-kafka:latest指定使用 Confluent 提供的官方 Kafka 镜像并且是最新版本该镜像已经预先配置好了 Kafka 的环境使用起来非常方便。 测试 Kafka容器创建成功后我们可以使用 Kafka 自带的命令行工具来测试 Kafka 是否正常工作。例如创建一个名为test的主题 docker exec -it kafka kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test 这里使用docker exec命令进入正在运行的kafka容器并执行kafka-topics.sh脚本创建主题。--bootstrap-server localhost:9092指定 Kafka 服务器的地址和端口--replication-factor 1表示副本因子为 1--partitions 1表示分区数为 1--topic test指定要创建的主题名为test。 2.2 多节点部署​ 部署 Kafka 多节点集群可以提高系统的可靠性和性能以下是使用 Docker Compose 部署一个简单的三节点 Kafka 集群的步骤​ 前期准备确保你已经安装了 Docker 和 Docker Compose。Docker Compose 是一个用于定义和运行多容器 Docker 应用程序的工具它使用 YAML 文件来配置应用程序的服务、网络和卷等通过一条命令就可以启动或停止整个应用程序。​创建网络为了让 Kafka 容器之间能够相互通信我们先创建一个 Docker 网络 docker network create kafka-net 这个命令创建了一个名为kafka-net的 Docker 网络后续我们的 Kafka 容器都将加入到这个网络中使得它们可以通过容器名相互访问。 编写 docker-compose.yml 文件在一个空目录下创建一个名为docker-compose.yml的文件并编辑内容如下 version: 3 services:zookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperports:- 2181:2181environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000networks:- kafka-netkafka1:image: confluentinc/cp-kafka:latestcontainer_name: kafka1ports:- 9092:9092environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_LOG_DIRS: /var/lib/kafka/datadepends_on:- zookeepernetworks:- kafka-netkafka2:image: confluentinc/cp-kafka:latestcontainer_name: kafka2ports:- 9093:9093environment:KAFKA_BROKER_ID: 2KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093KAFKA_LOG_DIRS: /var/lib/kafka/datadepends_on:- zookeepernetworks:- kafka-netkafka3:image: confluentinc/cp-kafka:latestcontainer_name: kafka3ports:- 9094:9094environment:KAFKA_BROKER_ID: 3KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094KAFKA_LOG_DIRS: /var/lib/kafka/datadepends_on:- zookeepernetworks:- kafka-netnetworks:kafka-net:external: true 在这个docker-compose.yml文件中​ 首先定义了一个zookeeper服务使用confluentinc/cp-zookeeper:latest镜像将容器的 2181 端口映射到主机的 2181 端口并设置了 Zookeeper 的客户端端口和心跳时间同时将其加入到kafka-net网络中。Kafka 依赖 Zookeeper 来管理集群的元数据、控制器选举等重要功能所以 Zookeeper 是 Kafka 集群不可或缺的一部分。​接着定义了三个kafka服务分别命名为kafka1、kafka2和kafka3它们都使用confluentinc/cp-kafka:latest镜像并将各自的端口映射到主机的不同端口9092、9093、9094。每个 Kafka 服务通过KAFKA_BROKER_ID来指定唯一的 ID通过KAFKA_ZOOKEEPER_CONNECT连接到 Zookeeper 服务通过KAFKA_ADVERTISED_LISTENERS指定对外发布的监听地址使用容器名方便容器间通信通过KAFKA_LISTENERS指定容器内部监听的地址通过KAFKA_LOG_DIRS指定日志存储目录。并且它们都依赖于zookeeper服务确保 Zookeeper 先启动然后 Kafka 服务再启动。最后将这三个 Kafka 服务也加入到kafka-net网络中。 启动集群在包含docker-compose.yml文件的目录下执行以下命令启动 Kafka 集群 docker-compose up -d -d参数表示以后台守护进程的方式启动所有服务。执行该命令后Docker Compose 会根据docker-compose.yml文件的配置依次启动 Zookeeper 和三个 Kafka 节点稍等片刻一个三节点的 Kafka 集群就部署完成了。你可以使用docker-compose ps命令查看所有服务的运行状态确保所有容器都处于运行状态。 三、Java 环境配置Kafka 客户端依赖​ 在 Java 项目中使用 Kafka首先需要引入 Kafka 客户端依赖。org.apache.kafka:kafka-clients是 Kafka 官方提供的 Java 客户端库它提供了一系列用于与 Kafka 集群进行交互的 API包括生产者Producer、消费者Consumer和管理客户端AdminClient等。通过引入这个依赖我们可以在 Java 代码中方便地实现消息的发送、接收以及对 Kafka 集群的管理操作如创建主题、删除主题、查询集群元数据等。​ 如果你使用的是 Maven 项目在pom.xml文件中添加如下依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.4.0/version !-- 可根据实际情况修改版本号 -- /dependency 如果你使用的是 Gradle 项目在build.gradle文件中添加如下依赖 implementation org.apache.kafka:kafka-clients:3.4.0 // 可根据实际情况修改版本号 添加依赖后Maven 或 Gradle 会自动下载kafka-clients及其相关的依赖包到本地仓库并将其添加到项目的类路径中以便我们在代码中使用。 四、基础命令实战​ 在掌握了 Kafka 的本地环境搭建和 Java 环境配置后接下来我们通过实战来深入了解 Kafka 的基本操作。我们将分别从 Kafka CLI 和 Java 代码两个版本来演示如何创建 Topic、发送消息和消费消息。​ 4.1 Kafka CLI 版本​ Kafka 提供了一系列命令行工具方便我们对 Kafka 集群进行操作和管理。这些工具位于 Kafka 安装目录的bin目录下通过命令行执行这些工具并传入相应的参数就可以完成各种操作如创建、删除和管理主题发送和接收消息管理消费者组等。​ 创建 Topic kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic --create表示创建主题的操作指令​--bootstrap-server localhost:9092指定 Kafka 服务器的地址和端口这里假设 Kafka 服务器运行在本地端口为 9092​--replication-factor 1指定副本因子为 1即每个分区有 1 个副本副本因子决定了消息在 Kafka 集群中的冗余程度提高副本因子可以增强数据的可靠性但也会占用更多的磁盘空间和网络带宽​--partitions 1指定分区数为 1分区是 Kafka 中数据存储和并行处理的基本单位增加分区数可以提高消息处理的吞吐量但也会增加管理和维护的复杂度​--topic my-topic指定要创建的主题名为my-topic主题是 Kafka 中消息的逻辑分类生产者将消息发送到指定的主题消费者从感兴趣的主题中拉取消息。 发送消息 kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic 执行上述命令后会进入命令行输入模式此时你可以逐行输入要发送的消息。例如输入Hello, Kafka!然后按回车键这条消息就会被发送到my-topic主题中。输入完成后按Ctrl C组合键可以退出消息发送模式。这里--broker-list localhost:9092指定了 Kafka 代理服务器的地址和端口--topic my-topic指定了要发送消息的主题。​ 消费消息 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --bootstrap-server localhost:9092指定 Kafka 服务器的地址和端口​--topic my-topic指定要消费消息的主题​--from-beginning表示从主题的起始位置开始消费消息如果不指定该参数默认从最新的消息开始消费。执行该命令后会实时输出my-topic主题中的消息内容包括消息的偏移量、键、值等信息。 4.2 Java 代码版本​ 在 Java 代码中我们使用org.apache.kafka:kafka-clients库提供的 API 来实现 Kafka 的操作。以 Maven 项目为例在完成前文提到的依赖引入后我们可以编写如下代码来创建生产者和消费者。​ 创建生产者 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置生产者属性Properties props new Properties();// 指定Kafka broker地址可以是多个以逗号分隔props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 指定key的序列化器将对象转换为字节数组以便在网络中传输props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 指定value的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建Kafka生产者实例KafkaProducerString, String producer new KafkaProducer(props);// 要发送的主题String topic my-topic;// 要发送的消息内容这里构造了一条简单的消息包含一个键和一个值ProducerRecordString, String record new ProducerRecord(topic, key1, Hello, Kafka from Java!);try {// 发送消息这是一个异步操作producer会将消息发送到Kafka集群并立即返回不会阻塞当前线程producer.send(record);System.out.println(Message sent successfully);} catch (Exception e) {// 如果发送过程中出现异常捕获并打印异常信息e.printStackTrace();} finally {// 关闭生产者释放资源确保资源的正确关闭是良好的编程习惯避免资源泄漏producer.close();}} } 创建消费者 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Collections; import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 配置消费者属性Properties props new Properties();// 指定Kafka broker地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 指定消费者组ID同一消费者组内的消费者共同消费主题中的消息通过组ID来标识属于同一个消费组props.put(ConsumerConfig.GROUP_ID_CONFIG, my-group);// 指定key的反序列化器将接收到的字节数组转换为对象props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定value的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建Kafka消费者实例KafkaConsumerString, String consumer new KafkaConsumer(props);// 订阅主题可以订阅多个主题这里使用Collections.singletonList方法将单个主题名封装成列表consumer.subscribe(Collections.singletonList(my-topic));try {while (true) {// 拉取消息设置超时时间为100毫秒consumer.poll方法会从Kafka集群中拉取消息并返回一个包含消息的ConsumerRecords对象ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));// 遍历接收到的消息records.forEach(record - {// 打印消息的相关信息包括主题、分区、偏移量、键和值System.out.printf(Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n,record.topic(), record.partition(), record.offset(), record.key(), record.value());});}} catch (Exception e) {// 如果消费过程中出现异常捕获并打印异常信息e.printStackTrace();} finally {// 关闭消费者释放资源consumer.close();}} } 在上述 Java 代码中生产者通过KafkaProducer类的send方法发送消息消费者通过KafkaConsumer类的poll方法拉取消息。同时我们设置了一些关键的配置参数如bootstrap.servers指定 Kafka 服务器地址key.serializer和value.serializer用于生产者的序列化key.deserializer和value.deserializer用于消费者的反序列化group.id用于标识消费者组 。通过这些配置和代码我们可以在 Java 程序中实现与 Kafka 集群的交互进行消息的发送和消费。 五、总结 通过本文我们成功地完成了 Kafka 本地环境的搭建包括使用 Docker 部署单节点和多节点的 Kafka 集群配置 Java 项目的 Kafka 客户端依赖以及通过 Kafka CLI 和 Java 代码进行创建 Topic、发送 / 消费消息的实战操作。这些基础知识和实践经验是我们深入学习 Kafka 的基石为后续探索 Kafka 的高级特性如分区策略、消息压缩、事务处理、监控与调优等打下了坚实的基础。​
http://www.dnsts.com.cn/news/227636.html

相关文章:

  • 北京网站设计的公司价格慈溪做网站什么价
  • 汕头人事考试网seo快速优化文章排名
  • wordpress中菜单免费网络推广及优化
  • 网站营销推广有哪些淘宝网站是语言用什么做的
  • 成都网站优化排名桂林漓江在哪个位置
  • 怎样申请网站域名和空间做交互的设计网站
  • 如何做旅游攻略网站专业做数据的网站有哪些方面
  • 网站的构建是怎样的一级做ae视频片段怎么做
  • 宁波做网站哪家好市场监督管理局是工商局吗
  • 全球网站流量排名100建设网站大概要花多少钱
  • 百度云域名怎么做网站如何查询一个网站的icp
  • 白酒 网站模板免费做logo设计的网站
  • 网站建设电子书做网站要营业执照吗
  • 浪漫网站建设网站建设特效代码
  • 深圳建站推广公司沈阳网站建设模块
  • 免费源码资源源码站怎么样黑进网站后台
  • 织梦网站搬家湖南铁军工程建设有限公司网站
  • 学校网站开发文档百度保障中心人工电话
  • 网站建设招标采购需求山西网站建设服务公司
  • 服务器建设一个自己的网站wordpress+浮框
  • 东明住房和城乡建设局网站郑州营销网站托管
  • 福田做商城网站建设哪家服务周到有什么做动图比较方便的网站
  • 福建建设工程交易中心网站京东网上商城购买
  • 美容院网站模板网站推广营销服务
  • 常用wap网站开发工具 手机网站制国外低代码平台
  • 织梦网站制作教程购物商城网站功能设计
  • 关于制作网站收费标准第三方物流网站建设
  • 公司的网站建设要记到什么科目服务器网站搭建教程
  • 成都 专业 网站建设黄骅信誉楼罗茂莲事件
  • 网站可以个人备案吗站长素材网