做淘宝客网站用什么系统,本人急招一名临时工,赣州市建设局建管科网站,大岭山东莞网站建设之前开发的系统#xff0c;用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层#xff0c;最近一直在看Kafka和PostgreSql相关的知识#xff0c;想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试#xff0c;过程还是比较顺利的#xff0c;后续…之前开发的系统用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层最近一直在看Kafka和PostgreSql相关的知识想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试过程还是比较顺利的后续就是搭建基础服务的事情了。这里简单分享一下。
环境安装
安装Kafka
官方文档Apache Kafka可以直接参考我这里简单介绍下我在本地搭建开发环境的过程还是遇到了一个小坑。
我这里是在本地WSL 2环境下进行的安装安装过程就参考官方文档的推荐流程即可
下载安装包
注意这里要下载编译后的包不嫌麻烦的话可以下载源代码编译后再使用。 wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
安装
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
这里安装完成后的路径是这样子的 重点关注的就是binconfig和logs这3个目录。
启动服务
官方提供了2中启动策略一个是KRaft一个是Zookeeper我这里用的zookeeper
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
在启动kafka服务
bin/kafka-server-start.sh config/server.properties
后面的zookeeper.properties和server.properties是配置文件后续有配置需求的时候可以修改比如监听地址brokerid等等长这样 启动后控制台的输出是这样 这样一个kafka的服务节点就启动了。
对了kafka是依赖java环境的安装之前本地要安装jdk我这里使用的是openjdk也是ok的。
*端口转发仅WSL2环境
在WSL2环境下需要配置下端口转发不然宿主机连接不到broker
netsh interface portproxy add v4tov4 listenport9092 listenaddress0.0.0.0 connectport9092 connectaddress172.28.240.79
后面那个ip地址就写宿主机给WSL环境下发的地址 此外宿主机和wsl环境都放开9092或者你设置的端口
链接测试
这里有很多客户端的ui工具或者插件可以连接Kafka官方本身也提供了测试命令比如官方文档里给的测试案例就是用这几个命令 本地开发的话我这里用的vs code的tools for apache kafka 这个插件在插件市场用关键字搜索完成安装即可 至此一个本地的Kafka节点就基本配置完成了
安装PostgreSql
这个我老早就装好了一些安装过程没有截图就忽略吧大家有需求的可以问一下各种GPT
也可以用docker快速部署一个节点做本地的测试。
docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORDmysecretpassword -d postgres
开发测试
新建项目
这里因为我是用的IDE做开发所以直接创建个web项目就好也可以用命令行来创建。
总之创建完成后我的项目长这样 安装依赖
我这里是用的是dotnet.cap这个系列组件然后为了测试方便数据库的orm适用的是dapper主要是图快大家实际项目中可以用习惯的orm就好。
这里我的项目文件长这样
Project SdkMicrosoft.NET.Sdk.WebPropertyGroupTargetFrameworknet8.0/TargetFrameworkNullableenable/NullableImplicitUsingsenable/ImplicitUsings/PropertyGroupItemGroupPackageReference IncludeDapper Version2.1.35 /PackageReference IncludeDotNetCore.CAP Version8.2.0 /PackageReference IncludeDotNetCore.CAP.Dashboard Version8.2.0 /PackageReference IncludeDotNetCore.CAP.Kafka Version8.2.0 /PackageReference IncludeDotNetCore.CAP.PostgreSql Version8.2.0 //ItemGroup/Project
注入服务
这里主要注入pg和Kafka
builder.Services.AddCap(x
{x.UsePostgreSql(User ID{pg用户名};Password{pg密码};Host{pg地址};Port5432;DatabasemaigcTestDb;);x.UseKafka(localhost:9092);x.UseDashboard();
});
测试的业务代码
在常规的controller中注入服务
public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher capPublisher;}
}
写一个生产者接口
public async TaskIActionResult Producer()
{Console.WriteLine(生产者发布消息: DateTime.Now);await producer.PublishAsync(sample.kafka.postgrsql, DateTime.Now);return Ok();
}
再写一个延时发送消息的生产者接口
public async TaskIActionResult ProducerDelay()
{Console.WriteLine(生产者发布延时消息: DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), sample.kafka.postgrsql, DateTime.Now);return Ok();
}
创建消费者
[CapSubscribe(sample.kafka.postgrsql)]
public void Test2(DateTime value)
{Console.WriteLine(订阅到消息: value);
}
我们访问下接口看下控制台的打印效果 可以看到订阅到的时间和生产者发送的实际是一致的。 再试下延时发送 我们延时了10秒发布这里生产者执行生产消息后过了10秒被消费者订阅到。
我们延时了10秒发布这里生产者执行生产消息后过了10秒被消费者订阅到。
再看下PostG里保存的消息记录
这是生产记录 这是消费记录 注意在CAP的机制里这些持久化的消息记录是可以设置过期时间的也就是如果我们每天的并发量很高产生的消息非常多可以设置一个过期时间比如7天一个月到期后这些持久化的数据就会自动清除掉。
CAP的官方文档里还有更多案例大家感兴趣也可以去试试当然除了CAP还有MediatRMassTransit这类组件也可以轻松实现消息总线的机制。
好了到此我们的测试就结束了从安装Kafka到创建这个新项目并跑通这个测试服务也就2个小时所以这个迁移成本应该还是非常高效的。
小总结
实际上我们的生产环境中正正常运行的一套总线服务依赖的是RabbitMQ和SQL ServerRabbitMQ还好SQL Server在以后应该不会是做项目的首选数据库了尤其是做一些高并发的项目不是说它性能不够而是成本太高社区版的限制有太多还是要早做规划提前准备更加适合未来发展的方案而PostgreSql是目前最受全球开发者欢迎的关系数据库社区活跃度非常高开源协议对企业也十分友好即便是面对国内高标准的信创要求也完全没问题是绝佳的首选。
至于Kafka这是目前世界上最为流行的消息队列性能可用性可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整根骨还是来自Kafka。而且就生态环境而言无论国内还是国外Kafka都是遥遥领先对dotnet框架的支持Kafka也远比RocketMQ更好RocketMQ更多的还是用在java环境里所以我们再选型的时候优先考虑的还是Kafka。
更多关于这些内容的知识大家感兴趣可以去搜一下或者找个AI问一下。
好了就这些吧。