写点什么

Kafka+PostgreSql,构建一个总线服务

作者:为自己带盐
  • 2024-09-12
    河北
  • 本文字数:2881 字

    阅读完需:约 9 分钟

Kafka+PostgreSql,构建一个总线服务

背景

之前开发的系统,用到了 RabbitMQ 和 SQL Server 作为总线服务的传输层和存储层,最近一直在看 Kafka 和 PostgreSql 相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。

环境安装

安装 Kafka

官方文档:https://kafka.apache.org/documentation/#uses,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。


我这里是在本地 WSL 2 环境下进行的安装,安装过程就参考官方文档的推荐流程即可****

下载安装包

注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。



wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
复制代码

安装

tar -xzf kafka_2.13-3.8.0.tgzcd kafka_2.13-3.8.0
复制代码


这里安装完成后的路径是这样子的



重点关注的就是 bin,config 和 logs 这 3 个目录。

启动服务

官方提供了 2 中启动策略,一个是 KRaft,一个是 Zookeeper,我这里用的 zookeeper


先启动 zookeeper 服务


bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码


在启动 kafka 服务


bin/kafka-server-start.sh config/server.properties
复制代码


后面的 zookeeper.properties 和 server.properties 是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid 等等,长这样👇



启动后控制台的输出是这样



这样,一个 kafka 的服务节点就启动了。


对了,kafka 是依赖 java 环境的,安装之前本地要安装 jdk,我这里使用的是 openjdk,也是 ok 的。

*端口转发(仅 WSL2 环境)

在 WSL2 环境下,需要配置下端口转发,不然宿主机连接不到 broker,


netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79
复制代码


后面那个 ip 地址就写宿主机给 WSL 环境下发的地址



此外,宿主机和 wsl 环境都放开 9092(或者你设置的)端口

链接测试

这里有很多客户端的 ui 工具或者插件可以连接 Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令



本地开发的话,我这里用的 vs code 的 tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可



至此,一个本地的 Kafka 节点就基本配置完成了

安装 PostgreSql

这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种 GPT


也可以用 docker,快速部署一个节点做本地的测试。


docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
复制代码

开发测试

新建项目

这里因为我是用的 IDE 做开发,所以直接创建个 web 项目就好,也可以用命令行来创建。


总之创建完成后,我的项目长这样


安装依赖

我这里是用的是 dotnet.cap 这个系列组件,然后为了测试方便,数据库的 orm 适用的是 dapper,主要是图快,大家实际项目中可以用习惯的 orm 就好。


这里我的项目文件长这样


<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <TargetFramework>net8.0</TargetFramework> <Nullable>enable</Nullable> <ImplicitUsings>enable</ImplicitUsings> </PropertyGroup> <ItemGroup> <PackageReference Include="Dapper" Version="2.1.35" /> <PackageReference Include="DotNetCore.CAP" Version="8.2.0" /> <PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /> <PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /> <PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /> </ItemGroup>
</Project>
复制代码

注入服务

这里主要注入 pg 和 Kafka


builder.Services.AddCap(x =>{    x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");
x.UseKafka("localhost:9092");
x.UseDashboard();});
复制代码

测试的业务代码

在常规的 controller 中注入服务


public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe{    /*    业务代码    */}//上面这是最新的写法,以前那种构造函数的写法也是ok的public class Values2Controller : Controller{    private ICapPublisher _capPublisher;    public Values2Controller(ICapPublisher capPublisher)    {        _capPublisher = capPublisher;    }}
复制代码


写一个生产者接口


public async Task<IActionResult> Producer(){    Console.WriteLine("生产者发布消息: " + DateTime.Now);    await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);
return Ok();}
复制代码


再写一个延时发送消息的生产者接口


public async Task<IActionResult> ProducerDelay(){    Console.WriteLine("生产者发布延时消息: " + DateTime.Now);    await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);
return Ok();}
复制代码


创建消费者


[CapSubscribe("sample.kafka.postgrsql")]public void Test2(DateTime value){    Console.WriteLine("订阅到消息: " + value);}
复制代码


我们访问下接口看下控制台的打印效果



可以看到,订阅到的时间和生产者发送的实际是一致的。


再试下延时发送



我们延时了 10 秒发布,这里生产者执行生产消息后,过了 10 秒,被消费者订阅到。

再看下 PostgreSQL 里保存的消息记录

这是生产记录


这是消费记录


注意,在 CAP 的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如 7 天,一个月,到期后,这些持久化的数据就会自动清除掉。


CAP 的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了 CAP 还有 MediatR,MassTransit 这类组件,也可以轻松实现消息总线的机制。


好了,到此我们的测试就结束了,从安装 Kafka,到创建这个新项目并跑通这个测试服务,也就 2 个小时,所以,这个迁移成本应该还是非常高效的。

小总结

实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是 RabbitMQ 和 SQL Server,RabbitMQ 还好,SQL Server 在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而 PostgreSql 是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。

至于 Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的 RocketMQ,也是基于 Kafka 的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自 Kafka。而且就生态环境而言,无论国内还是国外,Kafka 都是遥遥领先,对 dotnet 框架的支持,Kafka 也远比 RocketMQ 更好(RocketMQ 更多的还是用在 java 环境里),所以我们再选型的时候,优先考虑的还是 Kafka。


更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个 AI 问一下。


好了,就这些吧。


发布于: 2024-09-12阅读数: 10
用户头像

学着写代码 2019-04-11 加入

是一枚,热爱技术,天赋不高,又有点轴,的猿。。

评论

发布
暂无评论
Kafka+PostgreSql,构建一个总线服务_postgresql_为自己带盐_InfoQ写作社区