学习目的
RabbitMQ是AMQP的一个实现。我们将跟随rabbitMQ官网上的例子,了解AMQP所支持的不同消息队列模式,以及代码如何实现。
安装RabbitMQ Server
Ubuntu自带的RabbitMQ Server,安装如下
sudo apt-get install rabbitmq-server sudo service rabbitmq-server start sudo rabbitmq-plugins enable rabbitmq_management 开启管理web sudo service rabbitmq-server restart sudo rabbitmqctl add_user test 123456 添加用户test,密码为123456 sudo rabbitmqctl set_user_tags teset administrator 将test为设置为administrator sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*" 设置权限
我们打开http://localhost:15672/就可以进行rabbitmq的web界面,可以用刚配置的账号进行登录,也可以用guest进行登录。缺省的guest也是一个administrator,我们可以用它在web登录,在web上创建用户。基于安全,我们可以将缺省的guest用户删除。
sudo rabbitmqctl delete_user guest
使用RabbitMQ的java client
在pom.xml中加入
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
不同的队列模式
极简模式,类似FIFO
网上资料众多,我们通过抓包来了解一下各个步骤。client和server之间有多种队列模式,我们先通过最简单的方式来看看client和server之间的交互,并对一些概念有初步的了解。具体请参阅 [1]
简单模式:发布者将消息放入指定的queue,接收者将消息从指定的queue中取走
publisher
/* 【1】和client和Server之间建立连接
* client -> server:Protocol-Header 9-1,告知client支持的min version =1, max version =9
* client <- server:Connection Start,给出server的一些情况,包括支持的验证的mechanism为AMQPLAIN和PLAIN
* client -> server:Connection Start-OK,根据相关的账号密码,以及mechanism(小例子为PLAIN),需要注意:不要使用guest/guest账号,会报告(Login was refused using authentication mechanism PLAIN)。我们无法使用guest/guest账号在页面上登录,故认为guest的密码并发是guest,我们可以重设这个密码,或者简单地采用其他用户账号登陆即可。
* client <- server:Connection Tune,给出tune信息,如channel-max(0),frame-max(131072),heartbeat(60)
* client -> server:Connection Tune-OK
* client -> server:Connection Open
* client <- server:Connection Open-OK */
//缺省的port为5671,virtualHost为/,我们也可以通过factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost")来一次性设置。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("191.8.1.107");
factory.setUsername("test");
factory.setPassword("123456");
Connection connection = factory.newConnection();
//【2】创建channel。一个连接中,可以多条channel。从编程的角度看,对每条channel我们可以设置其某些属性,这样就不需要为每次发送或者接收进行设置。
Channel channel = connection.createChannel();
//【3】声明队列:声明队列发布到那个queue,Rabbitmq Server中的队列是client创建的,如果不存在,将创建该队列。一个channel可以发布到若干给队列,根据队列的名字(routing_key)路由,这里private final static String QUEUE_NAME = "hello";这里建立一个queue,queue的名字为“hello”,在direct模式下,只有接收者route-id也为hello才会接收到。一旦接收,消息就从队列中删除。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//【4】发布消息 ,当basicPublish第一个参数exchange为"",简单模式不设置此参数,第二参数为routekey,表明将放在队列hello中。
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
... ...
channel.close();
connection.close();
deliver
接收方的代码小例子如下
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("191.8.1.107");
factory.setUsername("test");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//这里仍需要declare,如果recv的启动在send之前,而server上没有"hello"的队列,会报错
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
// handleDelivery是一个异步触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
log.info(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer); //Start a non-nolocal, non-exclusive consumer
...同样地,结束时应该关闭
channel.close();
connection.close();
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。