当前位置: 首页 > news >正文

哪里可以找到做网站的中企动力主要做什么的

哪里可以找到做网站的,中企动力主要做什么的,怎样设置自己的网站,wordpress百度分享插件文章目录 实时流式计算Kafka StreamKafka Streams 的关键概念KStreamKafka Stream入门案例编写SpringBoot 集成 Kafka Stream 实时流式计算 一般流式计算会与批量计算相比较 流式计算就相当于上图的右侧扶梯#xff0c;是可以源源不断的产生数据#xff0c;源源不断的接收数… 文章目录 实时流式计算Kafka StreamKafka Streams 的关键概念KStreamKafka Stream入门案例编写SpringBoot 集成 Kafka Stream 实时流式计算 一般流式计算会与批量计算相比较 流式计算就相当于上图的右侧扶梯是可以源源不断的产生数据源源不断的接收数据没有边界。 一般流式计算会与批量计算相比较。在流式计算模型中输入是持续的可以认为在时间上是无界的也就意味着永远拿不到全量数据去做计算。同时计算结果是持续输出的也即计算结果在时间上也是无界的。 流式计算一般对实时性要求较高同时一般是先定义目标计算然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率往往尽可能采用增量计算代替全量计算。 应用场景 日志分析 网站的用户访问日志进行实时的分析计算访问量用户画像留存率等等实时的进行数据分析帮助企业进行决策大屏看板统计 可以实时的查看网站注册数量订单数量购买数量金额等。公交实时数据 可以随时更新公交车方位计算多久到达站牌等实时文章分值计算 头条类文章的分值计算通过用户的行为实时文章的分值分值越高就越被推荐。 技术方案选型 HadoopApche StormFlinkKafka Stream 可以轻松地将其嵌入任何Java应用程序中并与用户为其流应用程序所拥有的任何现有打包部署和操作工具集成。 Kafka Stream Kafka Stream提供了对存储于 Kafka内 的数据进行流式处理和分析的功能 Kafka Stream的特点如下 Kafka Stream提供了一个非常简单而轻量的Library它可以非常方便地嵌入任意Java应用中也可以任意方式打包和部署除了Kafka外无任何外部依赖充分利用Kafka分区机制实现水平扩展和顺序性保证通过可容错的state store实现高效的状态操作如windowed join和aggregation支持正好一次处理语义提供记录级的处理能力从而实现毫秒级的低延迟支持基于事件时间的窗口操作并且可处理晚到的数据late arrival of records同时提供底层的处理原语Processor类似于Storm的spout和bolt以及高层抽象的DSL类似于Spark的map/group/reduce Kafka Streams 的关键概念 源处理器Source Processor源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。 Sink处理器sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题 KStream 数据结构类似于map,如下图key-value 键值对 KStream数据流data stream即是一段顺序的可以无限长不断更新的数据集。 Kafka Stream入门案例编写 需求分析求单词个数word count 创建原生的 kafka staream 入门案例 导入依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdexclusionsexclusionartifactIdconnect-json/artifactIdgroupIdorg.apache.kafka/groupId/exclusionexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions /dependencypackage com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** 流式处理*/ public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,streams-quickstart);//stream 构建器StreamsBuilder streamsBuilder new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容hello kafka hello itcast* param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);/*** 处理消息的value*/stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//按照value进行聚合处理.groupBy((key,value)-value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)-{System.out.println(key:key,vlaue:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);} }测试准备 使用生产者在 topic 为itcast_topic_input中发送多条消息stream 接收 itcast_topic_input 的数据进行聚合操作后将处理结果发送到 itcast_topic_out使用消费者接收 topic 为itcast_topic_out 结果 通过流式计算会把生产者的多条消息汇总成一条发送到消费者中输出 SpringBoot 集成 Kafka Stream 配置 package com.heima.kafka.config;import lombok.Getter; import lombok.Setter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap; import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象设置自定配置参数*/Setter Getter Configuration EnableKafkaStreams ConfigurationProperties(prefixkafka) public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE 16* 1024 * 1024;private String hosts;private String group;Bean(name KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {MapString, Object props new HashMap();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()_stream_aid);props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()_stream_cid);props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);} }application.yml kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}在配置类中定义方法 可注入StreamsBuilder返回值必须是KStream且放入spring容器中 package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.time.Duration; import java.util.Arrays;Configuration Slf4j public class KafkaStreamHelloListener {Beanpublic KStreamString,String kStream(StreamsBuilder streamsBuilder){//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//根据value进行聚合分组.groupBy((key,value)-value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)-{System.out.println(key:key,value:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);return stream;} }测试 启动 springboot 项目即可自动监听
http://www.dnsts.com.cn/news/5397.html

相关文章:

  • 教育网站开发需求说明书咨询公司网站源码
  • 南通高端网站设计wordpress外贸企业模板
  • 如何查询网站备案进度网站设计主流尺寸
  • 网站可免费做河南网站seo地址
  • 网站只能在vps里打开湖南省郴州市桂阳县邮政编码
  • 做产品网站淘宝百度外发加工网app
  • 小企业如何优化网站建设企业品牌营销策划公司
  • 大专学网站开发邢台网站建设报价多少钱
  • 濮阳建站公司哪个好网站建设目标是
  • 情感视频素材网站石门县建设局网站
  • 做厂房的网站编程自学网
  • 河北建设厅网站技术电话网站建设设计指标
  • 苏州企业网站建设定制wordpress在线代码编辑
  • 网站推广公司排名网站建设+设计那种连接线厂家
  • 网站模板颜色河北邯郸旅游景点
  • 网站淘宝推广怎么做商业网页设计
  • 广州化妆品网站建设云闪付当前页面设计隐私
  • 开封网站推广郑州建筑公司排名
  • 织梦网站密码忘记网站的好处
  • 经典的高端网站建设公司着陆页设计清除网站黑链
  • 门户网站 建设 如何写手机上干点啥能挣零花钱
  • 怎样做好网站建设设计网站域名哪些后缀更好
  • 公司网站建设方案网页设计网站开发培训
  • APP网站怎么做桂林刚刚发生的事
  • 站长之家html模板wordpress对php版本
  • 推广做网站联系方式无锡市城乡建设局网站
  • 网站客户评价做租赁的行业网站
  • 网络营销咨询网站源码免费广告投放网站
  • 做网站设计的网页加速器免费版 安卓
  • 内网网站建设所需硬件设备白云怎样优化网站建设