当前位置: 首页 > news >正文

网站建设罗贤伟远离有害不良网站应该怎么做

网站建设罗贤伟,远离有害不良网站应该怎么做,营销网站大全,网站建设设计师的工作内容下载 Apache Kafka 演示window 安装 编写启动脚本,脚本的路径根据自己实际的来 启动说明 先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper 巧记#xff1a; 铲屎官#xff08;zookeeper#xff09;总是第一个到#xff0c;最后一个走 启动zookeeper call bi… 下载 Apache Kafka 演示window 安装 编写启动脚本,脚本的路径根据自己实际的来 启动说明 先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper 巧记 铲屎官zookeeper总是第一个到最后一个走 启动zookeeper call bin/windows/zookeeper-server-start.bat config/zookeeper.properties 启动kafka   call bin/windows/kafka-server-start.bat config/server.properties 测试脚本主要用于创建主题 ‘test-topic’ # 创建主题窗口1 bin/window kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --create# 查看主题 bin/window kafka-topics.bat --bootstrap-server localhost:9092 --list bin/window kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe# 修改某主题的分区 bin/window kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2# 生产消息窗口2向test-topic主题发送消息 bin/window kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic hello kafka# 消费消息窗口3消费test-topic主题的消息 bin/window kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic package com.ldj.kafka.admin;import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic;import java.util.*;/*** User: ldj* Date: 2024/6/13* Time: 0:00* Description: 创建主题*/ public class AdminTopic {public static void main(String[] args) {MapString, Object adminConfigMap new HashMap();adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(adminConfigMap);/*** 使用kafka默认的分区算法创建分区*/NewTopic topic1 new NewTopic(topic-01, 1, (short) 1);NewTopic topic2 new NewTopic(topic-02, 2, (short) 2);CreateTopicsResult addResult1 adminClient.createTopics(Arrays.asList(topic1, topic2));/*** 手动为主题topic-03分配分区* topic-03主题下的0号分区有2个副本它们中的一个在节点id1中一个在节点id2中* list里第一个副本就是leader主写后面都是follower(主备份)* 例如0分区nodeId1的节点里的副本是主写、2分区nodeId3的节点里的副本是主写*/MapInteger, ListInteger partition new HashMap();partition.put(0, Arrays.asList(1, 2));partition.put(1, Arrays.asList(2, 3));partition.put(2, Arrays.asList(3, 1));NewTopic topic3 new NewTopic(topic-03, partition);CreateTopicsResult addResult2 adminClient.createTopics(Collections.singletonList(topic3));//DeleteTopicsResult delResult adminClient.deleteTopics(Arrays.asList(topic-02));adminClient.close();}}package com.ldj.kafka.producer;import com.alibaba.fastjson.JSON; import com.ldj.kafka.model.UserEntity; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Future;/*** User: ldj* Date: 2024/6/12* Time: 21:08* Description: 生产者*/ public class KfkProducer {public static void main(String[] args) throws Exception {//生产者配置MapString, Object producerConfigMap new HashMap();producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);//消息传输应答安全级别 0-消息到达broker(效率高但不安全) 1-消息在leader副本持久化折中方案 -1/all -消息在leader和flower副本都持久化安全但效率低producerConfigMap.put(ProducerConfig.ACKS_CONFIG, all);//ProducerState 缓存5条数据重试数据会与5条数据做比较结论只能保证一个分区的数据幂等性跨会话幂等性需要通过事务操作解决重启后全局消息id的随机id会发生改变//消息发送失败重试次数重试会导致消息重复!!考虑幂等性消息乱序判断偏移量是否连续错乱消息回到在缓冲区重新排序!!producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3);//kafka有消息幂等性处理全局唯一消息id/随机id-分区-偏移量,默认false-不开启producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//解决跨会话幂等性还需结合事务操作,忽略//producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tx_id);//创建生产者KafkaProducerString, String producer new KafkaProducer(producerConfigMap);//TODO 事务初始化方法//producer.initTransactions();//构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, IterableHeader headers)try {//TODO 开启事务//producer.beginTransaction();for (int i 0; i 10; i) {UserEntity userEntity new UserEntity().setUserId(2436687942335620L i).setUsername(lisi).setGender(1).setAge(18);ProducerRecordString, String record new ProducerRecord(test-topic,userEntity.getUserId().toString(),JSON.toJSONString(userEntity));//发送数据到BrokerFutureRecordMetadata future producer.send(record, (RecordMetadata var1, Exception var2) - {if (Objects.isNull(var2)) {System.out.printf([%s]消息发送成功, userEntity.getUserId());} else {System.out.printf([%s]消息发送失败err:%s, userEntity.getUserId(), var2.getCause());}});//TODO 提交事务//producer.commitTransaction();//注意没有下面这行代码是异步线程从缓冲区读取数据异步发送消息反之是同步发送必须等待回调消息返回才会往下执行System.out.printf(发送消息[%s]----, userEntity.getUserId());RecordMetadata recordMetadata future.get();System.out.println(recordMetadata.offset());}} finally {//TODO 终止事务//producer.abortTransaction();//关闭通道producer.close();}}}package com.ldj.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map;/*** User: ldj* Date: 2024/6/12* Time: 21:10* Description: 消费者*/ public class KfkConsumer {public static void main(String[] args) {//消费者配置MapString, Object consumerConfigMap new HashMap();consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//所属消费组consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, test123456);//创建消费者KafkaConsumerString, String consumer new KafkaConsumer(consumerConfigMap);//消费主题的消息 ConsumerRebalanceListenerconsumer.subscribe(Collections.singletonList(test-topic));try {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(5));//数据存储结构MapTopicPartition, ListConsumerRecordK, V records;for (ConsumerRecordString, String record : records) {System.out.println(record.value());}}} finally {//关闭消费者consumer.close();}}}
http://www.dnsts.com.cn/news/217052.html

相关文章:

  • 广州网站设计公司兴田德润在哪儿网站站内结构优化
  • 域名连接到网站怎么做wordpress 添加js
  • 深圳外贸建站网络推广公司企业邮箱认证
  • 网页设计网站制作一般多少钱wordpress获取子菜单
  • 做个网站需要多少钱.化妆培训
  • 网站开发入门需要学什么推广是做什么工作的
  • 杨凌网站开发php网站开发套模板
  • 请人做网站需要多少钱建设网站用户名是什么原因
  • 做网站怎么保证商品是正品大气金融php网站源码
  • phython 做的网站部分网站为什么网页打不开的原因及解决方法
  • 优化网站制作工作简历模板电子版
  • 外贸访问国外网站园区网站建设方案
  • 学生做网站的工作室长沙网站优化掌营天下
  • 山东网站建设最便宜河源建设工程交易中心网站
  • 厦门市建设保障性住房局网站自己的网站怎么做网盘
  • 长春城投建设投资有限公司网站未做301重定向的网站
  • 荣成市建设局网站是什么传统文化网站设计
  • 网站底部美化代码中国和城乡建设部网站
  • 湖南医院响应式网站建设企业上海外贸学院
  • 江苏网站建设yijuce南阳市网站建设
  • 手机网站seo视频网站怎么做算法
  • 网站快速建设软件下载全屏wordpress
  • 有没有代做课程设计的网站长春seo按天计费
  • dede手机网站模版企业建立网站的好处
  • 为什么做的网站在谷歌浏览器打不开免费学习的网站平台
  • 做网站的照片要多大像素网站建设怎么申请域名
  • 网站客户流失dw网页制作教程视频简单第二期
  • 哪里网站备案快希音跨境电商官网入口
  • 定制东莞网站制作公司百度上怎么发布作品
  • 建工行业建设标准网站企业信息查询系统入口