南沙外贸网站建设,代理招生平台,微信商城首页,德州极速网站建设百家号提示#xff1a;文章写完后#xff0c;目录可以自动生成#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、安装java#xff08;Kafka必须安装java#xff0c;因为kafka依赖java核心#xff09;二、安装以及配置Kafka、zookeeper1.下载Kafka#xff08;无需下载… 提示文章写完后目录可以自动生成如何生成可参考右边的帮助文档 文章目录 前言一、安装javaKafka必须安装java因为kafka依赖java核心二、安装以及配置Kafka、zookeeper1.下载Kafka无需下载zookeeper使用kafka自带的即可2.配置topid3.安装PHP的rdkafka这个网上教程很多基本上都是正确的 前言
提示windows环境安装失败Linux环境安装成功以下并没有windows安装示例
一、安装javaKafka必须安装java因为kafka依赖java核心
下载地址链接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux 将文件放在Linux目录中后进行解压
假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20 ./jdk
3、vim /etc/profile JAVA_HOME/root/src/uap/web/third/jdkPATH/root/src/uap/web/third/jdk/bin:$PATHexport JAVA_HOME
4、source /ect/profile
5、java -version 出现下图极为成功二、安装以及配置Kafka、zookeeper
1.下载Kafka无需下载zookeeper使用kafka自带的即可
下载地址https://kafka.apache.org/downloads 提示不要下载带src的那个具体我也不知道因为我也是个小白
假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12 ./kafka
3、创建kafka日志文件mkdir -p ./kafka_data/log/kafkamkdir -p ./kafka_data/log/zookeepermkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.propertieslistenersPLAINTEXT://localhost:9092 (34行左右添加对应的host、port)broker.id0port9092host.name192.168.1.241log.dirs/root/src/uap/web/third/kafka_data/log/kafkazookeeper.connectlocalhost:2181
wd
vim zookeeper.propertiesdataDir/root/src/uap/web/third/kafka_data/zookeeperdataLogDir/root/src/uap/web/third/kafka_data/log/zookeeperclientPort2181maxClientCnxns100tickTimes2000initLimit10syncLimit5
wd
5、cd ../ 进入kafka目录下
#启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
//如果其中报错大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有
./bin/kafka-server-start.sh -daemon ./config/server.properties 2.配置topid
代码如下示例
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值Created topic myt. 创建成功/否则失败3.安装PHP的rdkafka这个网上教程很多基本上都是正确的
例如阿里云开发者社区php安装rdkafka教程 剩下逻辑就直接贴代码了
生产者
public function producer(){$conf new RdKafka\Conf();$conf-set(metadata.broker.list, localhost:9092);$producer new RdKafka\Producer($conf);$topic $producer-newTopic(mytest);//获取数据库数据存入kafka中$wanchk $this-db-query(SELECT * FROM hf_alarm_wanchk);foreach ($wanchk as $k $v){$topic-produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));$producer-poll(0);}$result $producer-flush(10000);if (RD_KAFKA_RESP_ERR_NO_ERROR ! $result) {throw new \RuntimeException(Was unable to flush, messages might be lost!);}$producer-purge(RD_KAFKA_PURGE_F_QUEUE);$producer-flush(10000);}
消费者:
//这个代码需要使用终端运行
// /bin/php -c /etc/php.ini -f /入口文件目录/index.php 类consumer 方法consumerpublic function consumer(){$conf new \RdKafka\Conf();$conf-set(group.id, mytest);$rk new \RdKafka\Consumer($conf);$rk-addBrokers(127.0.0.1);$topicConf new \RdKafka\TopicConf();$topicConf-set(auto.commit.interval.ms, 100);$topicConf-set(offset.store.method, broker);$topicConf-set(auto.offset.reset, smallest);$topic $rk-newTopic(mytest, $topicConf);$topic-consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {$message $topic-consume(0, 120 * 10000);switch ($message-err) {case RD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo No more messages; will wait for more\n;break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo Timed out\n;break;default:throw new \Exception($message-errstr(), $message-err);break;}}}