前言
在如今的互联网时代,消息队列中间件已然成为了分布式系统中重要的组件,它可以解决应用耦合,异步消息,流量削峰等应用场景。借助消息队列,我们可以实现架构的高可用、高性能、可伸缩等特性,是大型分布式系统架构中不可或缺的中间件。
目前比较流行的消息队列中间件主要有:RabbitMQ, NATS, Kafka, ZeroMQ, Amazon SQS, ServiceStack, Apache Pulsar, RocketMQ, ActiveMQ, IBM MQ等等。
本文主要为大家分享的是在.NET 5应用程序中使用消息中间件RabbitMQ的示例教程。
准备工作
在开始本文实战之前,请准备以下环境:
- 消息中间件:RabbitMQ
- 开发工具:Visual Studio 2019或VS Code或Rider
笔者使用的开发工具是Rider 2021.1.2。
准备解决方案和项目
创建项目
打开Rider,创建一个名为RabbitDemo的解决方案,再依次创建三个基于.NET 5的项目,分别为:RabbitDemo.Shared
, RabbitDemo.Send
以及RabbitDemo.Receive
。
- RabbitDemo.Shared 项目主要用于存放共用的RabbitMQ的连接相关的类;
- RabbitDemo.Send 项目主要用于模拟生产者(发布者);
- RabbitDemo.Receive 项目主要用于模拟消费者(订阅者)
安装依赖包
首先,在以上创建的三个项目中分别使用包管理工具或者命令行工具安装RabbitMQ.Client
依赖包,如下:
编写RabbitDemo.Shared项目
RabbitDemo.Shared项目主要用于存放共用的RabbitMQ的连接相关的类。这里我们创建一个RabbitChannel
类,然后在其中添加创建一些连接RabbitMQ相关的方法,包括初始化RabbitMQ的连接,关闭RabbitMQ连接等,代码如下:
编写消息生产者
在项目RabbitDemo.Send中,引用项目RabbitDemo.Shared
,然后创建一个名为Send.cs
类,并在其中编写生产者的代码,如下:
以上示例模拟的是一个生产者一次生产了5条消息,并将消息存储到了RabbitMQ的消息队列中。
修改此项目中的Program.cs
代码如下:
编写消息消费者
在项目RabbitDemo.Receive中,引用项目RabbitDemo.Shared
,然后创建一个名为Receive.cs
的类,并在其中编写消费者的代码,如下:
修改此项目中的Program.cs
代码如下:
运行
分别生成和运行生产者和消费者项目,运行效果如下:
从上图可以看出,整个演示过程,RabbitMQ的消息消息是非常即时的,消费者几乎可以实时地消费生产者生产的消息。
需要确认的消息队列
在上面的生产者/消费者示例中,消息一经消费者消费,RabbitMQ会立即将消息从队列中移除。但在某些场景中,我们需要消费者确认消息被正确消费后再将其从队列中移除,RabbitMQ提供了消费确认的功能,下面我们来使用示例演示。
首先在RabbitDemo.Send中创建名为Worker.cs
的类,并编写如下代码:
此示例模拟生和了5条消息,并将消息存放到了RabbitMQ的task_queue
队列中,其中我们还通过QueueDeclare()
方法的arguments
参数设置了队列的优先级,也通过basicProperties
参数添加了自定义的消息头(Header)参数order-no
。
接着,在项目RabbitDemo.Receive项目中创建一个名为Task.cs
的类,并编写如下的消费者代码:
在这段消费者代码中,我们主要关注的是如何向RabbitMQ确认消息的正确消费,与上面的Receive.cs
消费者相比,此示例中设置了
这里的设置表示:一个消费者最多只能一次拉取1条消息
和
这条语句表示消息已确认被正常消费。
修改生产者的Program.cs
:
修改消费的Program.cs
:
效果如下:
版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。
发表评论
登录用户才能发表评论, 请 登 录 或者 注册