查询公司的网站,二建报名时间2023年报名时间,企业邮箱申请域名,软件公司网站“这是一篇理论文章#xff0c;给大家讲一讲kafka” 简介
在大数据领域开发者常常会听到MQ这个术语#xff0c;该术语便是消息队列的意思#xff0c; Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布#xff0c;使用Scala语言编写#xff0c;与2010年…“这是一篇理论文章给大家讲一讲kafka” 简介
在大数据领域开发者常常会听到MQ这个术语该术语便是消息队列的意思 Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布使用Scala语言编写与2010年12月份开源成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
1.消息 Message
网络中的两台计算机或者两个通讯设备之间传递的数据。例如说文本、音乐、视频等内容。
2.队列 Queue(栈的特点FILO 队列FIFO)
一种特殊的线性表数据元素首尾相接特殊之处在于只允许在首部删除元素和在尾部追加元素。入队、出队
3.消息队列 MQ
消息队列保存消息的队列。消息的传输过程中的容器主要提供生产、消费接口供外部调用做数据的存储和获取。
消息队列分类
MQ主要分为两类点对点(p2p)、发布订阅(Pub/Sub)
1.共同点
消息生产者生产消息发送到queue中然后消息消费者从queue中读取并且消费消息。
2.不同点
p2p模型包括消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费消息就不在消息队列中)。比如说打电话。
Pub/Sub包含消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)每个消息可以有多个消费者彼此互不影响。比如我发布一个微博关注我的人都能够看到。
Kafka的特点
Kafka如此受欢迎而且有越来越多的系统支持与Kafka的集成主要由于Kafka具有如下特性。 ● 高吞吐量、低延迟Kafka每秒可以处理几十万条消息它的延迟最低只有几毫秒。 ● 可扩展性Kafka集群同Hadoop集群一样支持横向扩展。 ● 持久性、可靠性Kafka消息可以被持久化到本地磁盘并且支持Partition数据备份防止数据丢失。 ● 容错性允许Kafka集群中的节点失败如果Partition分区副本数量为n则最多允许n-1个节点失败。 ● 高并发单节点支持上千个客户端同时读写每秒钟有上百MB的吞吐量基本上达到了网卡的极限
Kafka组成
Topic主题Kafka处理的消息的不同分类。Broker消息代理Kafka集群中的一个kafka服务节点称为一个broker主要存储消息数据。存在硬盘中每个topic都是有分区的。PartitionTopic物理上的分组一个topic在broker中被分为1个或者多个partition分区在创建topic的时候指定。Replica数据副本可以为保存在Kafka中的数据指定副本数以提高数据冗余性防止数据丢失Message消息是通信的基本单位每个消息都属于一个partition
Kafka服务相关
Producer消息和数据的生产者向Kafka的一个topic发布消息。Consumer消息和数据的消费者定于topic并处理其发布的消息。Zookeeper协调kafka的正常运行。KRaftKafka的KRaft模式在2.8.0版本中被引入。从2.8.0版本开始Kafka提供了对KRaft的支持其中最大的变化之一就是不再依赖外部的ZooKeeper来管理Kafka的元数据。因此如果你使用2.8.0版本或更高版本的Kafka你将能够使用KRaft模式无需安装和配置ZooKeeper。 Kafka架构设计
一个典型的Kafka集群包含若干个生产者Producer、若干Kafka集群节点Broker、若干消费者Consumer以及一个Zookeeper集群或者KRaft模式。Kafka通过Zookeeper管理集群配置选举Leader以及在消费者发生变化时进行负载均衡。生产者使用推Push模式将消息发布到集群节点而消费者使用拉Pull模式从集群节点中订阅并消费消息。
主题和分区的具体定义如下。
● 主题是生产者发布到Kafka集群的每条信息所属的类别即Kafka是面向主题的一个主题可以分布在多个节点上。 ● 分区是Kafka集群横向扩展和一切并行化的基础每个Topic可以被切分为一个或多个分区。一个分区只对应一个集群节点每个分区内部的消息是强有序的。 ● Offset即偏移量是消息在分区中的编号每个分区中的编号是独立的。
Kafka分布式集群的构建
在kafka2.0版本以前是依赖于zookeeper集群中安装 ·|| Kafka使用Zookeeper作为其分布式协调框架能很好地将消息生产、消息存储、消息消费的过程结合在一起。同时借助Zookeeper,Kafka能够将生产者、消费者和集群节点在内的所有组件在无状态的情况下建立起生产者和消费者的订阅关系并实现生产者与消费者的负载均衡。 可以看出Kafka集群依赖于Zookeeper所以在安装Kafka之前需要提前安装Zookeeper。Zookeeper集群在前面Hadoop集群的构建过程中已经在使用Kafka可以共用之前安装的Zookeeper集群接下来只需要安装Kafka集群即可。
·|| 较新版本的 Apache Kafka从2.8.0版本开始引入了KRaft这是一个内置的分布式存储 系统用于管理Kafka的元数据信息不再需要依赖外部的 ZooKeeper。因此你在使用较新版本的Kafka时不再需要单独安装和配置 ZooKeeper。 在KRaft模式下Kafka内部有自己的元数据存储这消除了对外部 ZooKeeper 的依赖。这样做的目的是简化 Kafka 集群的维护和部署以及提高可用性。 在基于zookeeper和kraft两种集群管理机制下200万分区数据量下的耗时比较。基于自带的KRaft性能表现会更优。
基于KRaft下的kafka安装
解压压缩包
tar -zxvf kafka_2.12-3.6.0.tgz -C kafka编辑环境变量
export KAFKA_HOME/home/hadoop/kafka/kafka_2.12-3.6.0
export PATH$KAFKA_HOME/bin:PATH
编辑配置文件server.properties 文件所在路径${KAFKA_HOME}/config/kraft/server.properties 该文件中几个重点参数
process.rolesbroker,controller ##broker相当于从节点controller相当于主节点
node.id2 ##节点ID 每个节点必须唯一
controller.quorum.voters1vm02:9093,2vm03:9093,3vm04:9093
##参与主节点选举格式(node.id)(hostname):(port)
advertised.listenersPLAINTEXT://hostname:9092 ##对外服务地址消费者、生产者对该节点的访问生成集群ID
kafka-storage.sh random-uuid
6foHn9NLQpiMAirIK7EG4A
##生成6foHn9NLQpiMAirIK7EG4A 的uuid所有节点执行kafka初始化
kafka-storage.sh format -t 6foHn9NLQpiMAirIK7EG4A -c ./$KAFKA_HOME/config/kraft/server.properties
所有节点执行启动kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/kraft/server.properties
查看kafka进程
jps 使用示例
创建topic
kafka-topics.sh --create --topic your_topic --bootstrap-server vm02:9092,vm03:9092,vm04:9092 --partitions 3 --replication-factor 2注 --bootstrap-server vm02:9092,vm03:9092,vm04:9092 此处参数可以指定集群所有节点也可以指定localhost:9092,创建的主题并不意味着后期的消费者和生产者只能指定在 localhost 节点上。这里的 --bootstrap-server 参数在创建主题时主要是为了指定初始的 Kafka 节点它告诉 Kafka 工具在哪里查找集群的元数据。
后期的消费者和生产者在连接到 Kafka 集群时会从指定的初始节点获取集群的元数据然后与整个集群建立连接。一旦获取了元数据消费者和生产者就可以与整个 Kafka 集群进行通信而不仅仅限制在初始指定的节点上。因此使用 --bootstrap-server localhost:9092 创建的主题对于后期的消费者和生产者仍然可以在整个 Kafka 集群的任何节点上进行使用只要它们能够连接到集群并获取到正确的元数据信息。
查看已创建的topic的详细信息
kafka-topics.sh --describe --bootstrap-server vm02:9092,vm03:9092,vm04:9092 --topic your_topic修改已创建topic
在官方对于alter参数的解释中
--alter Alter the number of partitions and replica assignment. Update the configuration of an existing topic via --alter is no longer supported here (the kafka-configs CLI supports altering topic configs with a -- bootstrap-server option).
kafka-topics.sh --alter选项在最新版本中已不再支持更新现有主题的配置这意味着一旦主题被创建就不能使用–alter选项来更改其分区数和副本分配。可以通过使用kafka-configs.sh
修改主题的配置参数。
kafka-configs.sh --bootstrap-server vm02:9092,vm03:9092,vm04:9092 \
--entity-type topics --entity-name your_topic \
--alter --add-config retention.ms86400000
查看topic 定义相关参数信息。
kafka-configs.sh --bootstrap-server \
vm02:9092,vm03:9092,vm04:9092 \
--entity-type topics --entity-name your_topic --describe 删除已创建topic
kafka-topics.sh --delete --topic your_topic --bootstrap-server vm02:9092,vm03:9092,vm04:9092 创建生产者producer
kafka-console-producer.sh --broker-list 192.168.56.101:9092 --topic mrt
场景应用示例
以postgresql数据库中的public.conn_fdw表作为生产者producer身份把数据推向kafka然后在使用kafka把数据推推向消费者数据库Oracle
在postgresql数据库中创建测试数据表
CREATE TABLE public.conn_fdw (id int4 NULL,name varchar(50) NULL,age int4 NULL,city varchar(50) NULL,salary int4 NULL
);
在Oracle中创建同样的表结构
create table SYSTEM.CONN_FDW
(id NUMBER,name VARCHAR2(50),age NUMBER,city VARCHAR2(50),salary NUMBER,load_time timestamp default current_timestamp
);
创建主题conn_fdw
kafka-topics.sh --create --topic conn_fdw \
--bootstrap-server vm02:9092,vm03:9092,vm04:9092 \
--partitions 3 \
--replication-factor 2
查看已经创建的主题conn_fdw
kafka-topics.sh --describe \
--bootstrap-server vm02:9092,vm03:9092,vm04:9092 \
--topic conn_fdw 在此图中有
添加maven依赖
添加相应的依赖包以作为java代码class的支持 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.12/artifactIdversion2.3.0/version/dependencydependencygroupIdorg.postgresql/groupIdartifactIdpostgresql/artifactIdversion42.2.23/version !-- 使用你的 PostgreSQL 版本 --/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-classic/artifactIdversion1.2.3/version !-- 请使用最新版本 --/dependencydependencygroupIdcom.oracle.database.jdbc/groupIdartifactIdojdbc10/artifactId !-- 使用你的 Oracle JDBC 版本 --version19.8.0.0/version/dependency
Kafka生产者代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;public class PgKafkaProducer {public static void main(String[] args) {// Kafka 配置Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.0.0.102:9092,10.0.0.103:9092,10.0.0.104:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// PostgreSQL 连接配置String jdbcUrl jdbc:postgresql://10.0.0.108:5432/postgres;String username postgres;String password postgres;try (Connection connection DriverManager.getConnection(jdbcUrl, username, password);Statement statement connection.createStatement()) {// 查询 PostgreSQL 数据String query SELECT id,name,age,city,salary FROM public.conn_fdw;ResultSet resultSet statement.executeQuery(query);// Kafka 生产者try (ProducerString, String producer new KafkaProducer(props)) {while (resultSet.next()) {// 将每一行数据作为消息发送到 Kafka 主题String key String.valueOf(resultSet.getInt(id));String value resultSet.getString(name) , resultSet.getInt(age) , resultSet.getString(city) , resultSet.getInt(salary);ProducerRecordString, String record new ProducerRecord(conn_fdw, key, value);producer.send(record);}}} catch (Exception e) {e.printStackTrace();}}
}
消费者代码
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.Properties;public class KafkaToOracleConsumer {public static void main(String[] args) {// Kafka 配置Properties kafkaProps new Properties();kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.0.0.102:9092,10.0.0.103:9092,10.0.0.104:9092);kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, conn_fdw_groupid);kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// Oracle 连接配置String jdbcUrl jdbc:oracle:thin:192.168.48.1:1521:orcl;String username system;String password system;try (Connection connection DriverManager.getConnection(jdbcUrl, username, password);PreparedStatement preparedStatement connection.prepareStatement(INSERT INTO SYSTEM.CONN_FDW (id, name, age, city, salary) VALUES (?, ?, ?, ?, ?))) {// Kafka 消费者try (ConsumerString, String consumer new KafkaConsumer(kafkaProps)) {consumer.subscribe(Collections.singletonList(conn_fdw));while (true) {ConsumerRecordsString, String records consumer.poll(100);records.forEach(record - {// 解析 Kafka 消息String[] values record.value().split(,);int id Integer.parseInt(values[0]);String name values[1];int age Integer.parseInt(values[2]);String city values[3];int salary Integer.parseInt(values[4]);// 插入到 Oracle 数据库try {preparedStatement.setInt(1, id);preparedStatement.setString(2, name);preparedStatement.setInt(3, age);preparedStatement.setString(4, city);preparedStatement.setInt(5, salary);preparedStatement.executeUpdate();} catch (Exception e) {e.printStackTrace();}});}}} catch (Exception e) {e.printStackTrace();}}
}
此时可以通过同时执行两段代码在跑起来的过程中向生产者PG数据库插入以下数据库然后到Oracle 数据库中观察数据流的流入情况。
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(1, John, 30, New York, 50000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(2, Alice, 25, Los Angeles, 60000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(3, Bob, 35, Chicago, 70000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(4, Eva, 28, San Francisco, 55000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(5, Mike, 32, Seattle, 65000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(6, Sophia, 29, Boston, 75000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(7, David, 27, Denver, 52000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(8, Emily, 31, Austin, 68000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(9, Daniel, 26, Phoenix, 58000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(10, Olivia, 33, Houston, 72000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(11, Liam, 24, Portland, 49000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(12, Ava, 34, Atlanta, 71000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(13, Logan, 30, Miami, 62000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(14, Mia, 28, Dallas, 54000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(15, Jackson, 29, Minneapolis, 67000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(16, Sophie, 31, Detroit, 59000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(17, William, 27, Philadelphia, 70000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(18, Emma, 32, San Diego, 66000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(19, James, 26, Raleigh, 63000);
INSERT INTO public.conn_fdw
(id, name, age, city, salary)
VALUES(20, Avery, 35, Tampa, 71000);
此时可以通过以下语句查看推送到conn_fdw主题的数据。
kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092,10.0.0.102:9092,10.0.0.102:9092 --topic conn_fdw --from-beginning······希望文章能帮助到给位读者对相关知识点如果有疑问欢迎私信进行技术交流。如果文章对你有帮助希望你能点赞关注