asp网站自动识别手机,找外包做网站不给代码,郑州贸网站建设公司,浏阳做网站公司1.什么是Spring Cloud Stream#xff1f;
Spring Cloud Stream的核心是Stream#xff0c;准确来讲Spring Cloud Stream提供了一整套数据流走向#xff08;流向#xff09;的API#xff0c; 它的最终目的是使我们不关心数据的流入和写出#xff0c;而只关心对数据的业务处…1.什么是Spring Cloud Stream
Spring Cloud Stream的核心是Stream准确来讲Spring Cloud Stream提供了一整套数据流走向流向的API 它的最终目的是使我们不关心数据的流入和写出而只关心对数据的业务处理 我们举一个例子你们公司有一套系统这套系统由多个模块组成你负责其中一个模块。数据会从第一个模块流入处理完后再交给下一个模块。对于你负责的这个模块来说它的功能就是接收上一个模块处理完成的数据自己再加工加工扔给下一个模块。 我们很容易总结出每个模块的流程
1、从上一个模块拉取数据 2、处理数据 3、将处理完成的数据发给下一个模块
其中流程1和3代表两个模块间的数据交互这种数据交互往往会采用一些中间件middleware。比如模块1和模块2间数据可能使用的是kafka模块1向kafka中push数据模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显它们的功能都是一样的提供数据的流向让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同需要使用不同的API。 为了消除这种数据流入输入和数据流出输出实现上的差异性因此便出现了Spring Cloud Stream。
2.环境准备
采用docker-compose搭建kafaka环境
version: 3networks:kafka:ipam:driver: defaultconfig:- subnet: 172.22.6.0/24services:zookepper:image: /zookeeper:latestcontainer_name: zookeeper-serverrestart: unless-stoppedvolumes:- /etc/localtime:/etc/localtimeenvironment:ALLOW_ANONYMOUS_LOGIN: yesports:- 2181:2181networks:kafka:ipv4_address: 172.22.6.11kafka:image: /kafka:3.4.1container_name: kafkarestart: unless-stoppedvolumes:- /etc/localtime:/etc/localtimeenvironment:ALLOW_PLAINTEXT_LISTENER: yesKAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092ports:- 9092:9092depends_on:- zookeppernetworks:kafka:ipv4_address: 172.22.6.12kafka-map:image: /kafka-mapcontainer_name: kafka-maprestart: unless-stoppedvolumes:- ./kafka/kafka-map/data:/usr/local/kafka-map/dataenvironment:DEFAULT_USERNAME: adminDEFAULT_PASSWORD: 123456ports:- 9080:8080depends_on: - kafkanetworks:kafka:ipv4_address: 172.22.6.13run
docker-compose -f docker-compose-kafka.yml -p kafka up -d3.代码工程 实验目标
1、生成UUID并将其发送到Kafka主题batch-in。 2、从batch-in主题接收UUID的批量消息移除其中的数字并将结果发送到batch-out主题。 3、监听batch-out主题并打印接收到的消息。
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdspringcloud-demo/artifactIdgroupIdcom.et/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdspring-cloud-stream-kafaka/artifactIdpropertiesmaven.compiler.source17/maven.compiler.sourcemaven.compiler.target17/maven.compiler.target/propertiesdependencies!-- Spring Boot Starter Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Boot Starter Test --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-kafka/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependencies/project处理流
package com.et;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;SpringBootApplication
public class CloudStreamsFunctionBatch {public static void main(String[] args) {SpringApplication.run(CloudStreamsFunctionBatch.class, args);}Beanpublic SupplierUUID stringSupplier() {return () - {var uuid UUID.randomUUID();System.out.println(uuid - batch-in);return uuid;};}Beanpublic FunctionListUUID, ListMessageString digitRemovingConsumer() {return idBatch - {System.out.println(Removed digits from batch of idBatch.size());return idBatch.stream().map(UUID::toString)// Remove all digits from the UUID.map(uuid - uuid.replaceAll(\\d,)).map(noDigitString - MessageBuilder.withPayload(noDigitString).build()).toList();};}KafkaListener(id batch-out, topics batch-out)public void listen(String in) {System.out.println(batch-out - in);}}定义一个名为stringSupplier的Bean它实现了Supplier接口。这个方法生成一个随机的UUID并打印到控制台表示这个UUID将被发送到batch-in主题。定义一个名为digitRemovingConsumer的Bean它实现了FunctionList, ListMessage接口。这个方法接受一个UUID的列表打印出处理的UUID数量然后将每个UUID转换为字符串移除其中的所有数字最后将结果封装为消息并返回。使用KafkaListener注解定义一个Kafka监听器监听batch-out主题。当接收到消息时调用listen方法并打印接收到的消息内容。
配置文件
spring:cloud:function:definition: stringSupplier;digitRemovingConsumerstream:bindings:stringSupplier-out-0:destination: batch-indigitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: truedigitRemovingConsumer-out-0:destination: batch-outkafka:binder:brokers: localhost:9092bindings:digitRemovingConsumer-in-0:consumer:configuration:# Forces consumer to wait 5 seconds before polling for messagesfetch.max.wait.ms: 5000fetch.min.bytes: 1000000000max.poll.records: 10000000参数解释 1、spring.cloud.function.definition定义了两个函数stringSupplier和digitRemovingConsumer。这两个函数将在应用程序中被使用。
spring:cloud:function:definition: stringSupplier;digitRemovingCon2、stream.bindings.stringSupplier-out-0.destination将stringSupplier函数的输出绑定到Kafka主题batch-in。
stream:bindings:stringSupplier-out-0:destination: batch-in3、stream.bindings.digitRemovingConsumer-in-0.destination将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in。
digitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: true4、group: batch-in指定消费者组为batch-in这意味着多个实例可以共享这个组来处理消息。
5、consumer.batch-mode: true启用批处理模式允许消费者一次处理多条消息。
6、stream.bindings.digitRemovingConsumer-out-0.destination将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out。
digitRemovingConsumer-out-0:destination: batch-out4.测试
启动弄Spring Boot应用可以看到控制台输出日志如下
291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 - batch-in c746ba4e-835e-4f66-91c5-7a5cf8b01068 - batch-in a661145b-2dd9-4927-8806-919ad258ade5 - batch-in db150918-0f0b-49f6-b7bb-77b0f580de4c - batch-in b0d4917b-6777-4d96-a6d0-bb96715b5b20 - batch-in Removed digits from batch of 5 batch-out - eacc-ee-dfb-b-dead batch-out - cbae-e-f-c-acfb batch-out - ab-dd—adade batch-out - db-fb-f-bbb-bfdec batch-out - bdb–d-ad-bbbb