Work Queues:竞争消费模式(competing consumers pattern)

简单模式,也可以用于多个client(subsriber)去获取消息,和简单模式对比:
- 我们希望数据能够真正被client处理完,client处理好后应发送Basic.ACK消息给server,如果client因为某个原因未能处理好,当channel关闭,connection关闭或中断的时候,这个消息应该能够分配给其他的client进行处理。
- 我们希望一次只获取一个消息(或者若干),而不是将队列中所有消息都取走,这样才能更好地进行负载均衡。
- 我们希望在server重启后,未处理的消息仍然在队列中。
- 我们还将学习添加AMQP的header。
发布的代码,忽略了channel和connection的关闭。可参见 [1]
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("191.8.1.107"); factory.setUsername("test"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //【3】参数2:durable true if we are declaring a durable queue (the queue will survive a server restart),这里满足需求:我们希望在server重启后,未处理的消息仍然在队列中。需要注意的是,如果已经声明的durable(true或者false),是不允许更改的,否则会报异常,也就是server中的queue一旦创建,这个属性是不能变更的。 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //【4】参数3:添加Content-Type为text/plain作为AMQP的头,演示了如何设置AMQP的header。 //【3】MessageProperties.PERSISTENT_TEXT_PLAIN表明消息是以文本的方式存放在磁盘来进行持续化。需要注意的是RabbitMQ的消息持续化并不很能确保,因此收到消息到写磁盘,有一定的时间差,如果这时候意外停掉,没有办法。如果需要很稳健地确保,需要某种方式向发布者返回确认,publish batches of messages and wait for outstanding confirms。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
接收的代码,忽略了channel和connection的关闭。我们通过sleep一段时间,来模拟处理消息所耗时间。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("191.8.1.107"); factory.setUsername("test"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //【2】maximum number of messages that the server will deliver, 0 if unlimited,即每次只从server中获取一个消息。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); /* 小例子中,下面三个log的显示 * 11:11:33.903 [pool-2-thread-6] [INFO ] Envelope(deliveryTag=6, redeliver=false, exchange=, routingKey=task_queue-1) * 11:11:33.903 [pool-2-thread-6] [INFO ] #contentHeader<basic>(content-type=text/plain, content-encoding=null, headers=null, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) * 11:11:33.903 [pool-2-thread-6] [INFO ] properties - ContentType : text/plain */ log.info("Envelope : {}",envelope); log.info("properties : {}",properties); log.info("properties - ContentType : {}",properties.getContentType()); try{ sleep(message.length()); //模拟异步处理消耗一定的时间 }finally{ log.info(" [x] Done"); //【1】向Server回复了一个Basic.ACK的消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //【1】参数2为false:告知server,client处理完后,回复一个ACK channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

我们可以在管理网页上,或者通过命令行来查看没有收到ack的消息
rabbitmqctl list_queues name messages_ready messages_unacknowledged
RabbitMQ server会以轮询的机制,将消息均匀地发送到各个client,即便某个client的处理能力强,很快回复了Basic.ACK,仍如还是按round-robin的方式,依次发送,因为server并不是等待Basic.ACK后,才发生,而是收到message后,就根据round-robin的轮询方式发送,在收到Basic.ACK后,将消息从队列中删除。如果我们很明确知道某个client的处理能力很强,可以利用 channel.basicQos(2); ,每次分发两个消息给它,相当于weight=2。
Publish/Subscribe: Sending messages to many consumers at once
事实上,producer并没有将消息写到队列,而是发送到Exchange,由exchange决定将消息放入某个队列,某些队列,亦或者丢弃这个消息。exchange有几种模式: direct, topic, headers 和 fanout 。publish/subcribe模式使用的是fanout,即广播到绑定在该exchange的所有队列。
下面是发送和接收方的代码,同样的省略了channel和connection的关闭。也可参见 [2]
发送方:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("191.8.1.107"); factory.setUsername("test"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建一个类型为FANOUT的exchange,名字为EXCHANGE_NAME //在前面两个例子中,exchange的名字设置为"",即使用缺省的exchange,也称为nameless exchange,将消息路由到和routing_key名字相同的队列中 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //发布到特定的exchange上,在fanout中,将忽略routint_key(队列名字),我们填入""即可。 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
接收方:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("191.8.1.107"); factory.setUsername("test"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); /* 临时队列。本例将演示临时队列的生产(一个新的空队列,没有旧的消息),由server来给出随机的名字,并在client断开连接时,自动删除队列。不带参数的channel.queueDeclare()将满足这个需求。*/ String queueName = channel.queueDeclare().getQueue(); //名字例子:amq.gen-JzTY20BRgKO-HjmUJj0wLg /* Binging:即告知exchange将消息发往某个queue。在fanout的方式下,exchange会将消息发往所有绑定的队列。*/ channel.queueBind(queueName, EXCHANGE_NAME, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info(tag + " [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);
Routing:Receiving messages selectively

在发布/订阅模式上在进一步,队列只接受某发布的某类型消息。上图以log exchange为例,不同的队列接收不同的log级别。这就是路由模式。具体可参考 [3] 。
我们将重点学习 exchange模式为direct时,队列不仅与exchange关联,还与具体的routing_key绑定。
我们通过代码进一步学习,同样的忽略channel和connection的关闭,另外,也忽略了创建connection和创建channel的代码,这些代码和之前的无异。
private String[] testRoutingKey = {"error","info","error","warning","info"}; private void routingSend() throws Exception{ ... 创建connection和channel ... channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for(int i = 0; i <5 ; i ++){ Thread.sleep(2_000L); String message = "message-" + i + "-" + testRoutingKey[i]; log.info(" [x] Sent '" + message + "'"); channel.basicPublish(EXCHANGE_NAME, testRoutingKey[i], null, message.getBytes("UTF-8")); } } private void routingRecv(String...routingKeys ) throws Exception{ ... 创建connection和channel ... //exchange类型为direct,即会根据routing_key来判断送往哪个队列。之前学习的发布订阅模式采用的fanout是广播类型的,只要和该exchange绑定的队列,都会送往,因此会忽略routing_key。 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); for(String routingKey : routingKeys){ //同时绑定了exchange和routing_key,exchange会将该routing_key的消息送往该队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "', {}", properties); } }; channel.basicConsume(queueName, true, consumer); }
Topics: Receiving messages based on a pattern (topics)
topic模式和direct模式类似,但其提供了对routing_key的模糊匹配。routing_key的格式为A.B.C.D的格式,每个词中间用.分隔,模糊匹配支持*和#
- *表示一个词
- #表示零或者多个词
下面的代码和direct模式很是相似,同样的我们省略了channel和connection的关闭
发送的代码:
...略:创建connection和channel ... channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic"); channel.basicPublish(TOPIC_EXCHANGE_NAME, "quick.orange.fox", null, message1.getBytes("UTF-8")); //将被发送到Q1 channel.basicPublish(TOPIC_EXCHANGE_NAME, "lazy.pink.rabbit", null, message2.getBytes("UTF-8")); //将被发送到Q2
接收的代码,以Q2为例:
...略:创建connection和channel ... channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, TOPIC_EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, TOPIC_EXCHANGE_NAME, "lazy.#"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info(tag + " [x] Received '" + envelope.getRoutingKey() + "':'" + message + "', {}", properties); } }; channel.basicConsume(queueName, true, consumer);
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。