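Example: implementing RPC
This is an example of doing RPC over AMQP. RPC is a synchronous process: the caller has to wait for the response. In a real application this calls for particular care, because the server may be slow or may be down. It is worth asking whether RPC is really needed, or whether an asynchronous approach could replace it. Here we only demonstrate how to build RPC on top of the message handling that RabbitMQ provides. The idea is as follows:
- The RPC server receives messages from a queue named rpc_queue, and the RPC client sends its request messages to rpc_queue.
- The Correlation-Id in the message properties carries a UUID for the request, which is used to match the response to it.
- The Reply-to in the message properties names the queue from which the client wants to receive the response.
- The RPC call is synchronous; the client blocks on a BlockingQueue until the response arrives.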
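Before bringing in a broker, the four points above can be sketched entirely in memory. This is only an illustration under stated assumptions: the class InMemoryRpcSketch, its Message type, and the serveOne and call methods are hypothetical names, and plain java.util.concurrent queues stand in for rpc_queue and the reply queue. Nothing here is RabbitMQ API.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the broker; none of these names are RabbitMQ API.
class InMemoryRpcSketch {

    // Hypothetical message: a body plus the two properties the pattern relies on.
    static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo; // null on replies
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Plays the role of rpc_queue.
    static final BlockingQueue<Message> RPC_QUEUE = new LinkedBlockingQueue<>();

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Server side: take one request, compute, and reply on the queue named
    // in replyTo, echoing the request's correlation id.
    static void serveOne() {
        try {
            Message request = RPC_QUEUE.take();
            String result = String.valueOf(fib(Integer.parseInt(request.body)));
            request.replyTo.put(new Message(request.correlationId, null, result));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Client side: publish the request, then block until a reply with the
    // matching correlation id arrives. Replies with other ids are ignored.
    static String call(String body) {
        String corrId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        try {
            RPC_QUEUE.put(new Message(corrId, replyQueue, body));
            while (true) {
                Message reply = replyQueue.take();
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        Thread server = new Thread(InMemoryRpcSketch::serveOne);
        server.setDaemon(true);
        server.start();
        System.out.println("fib(10) = " + call("10")); // prints "fib(10) = 55"
    }
}
```

The real client and server follow the same shape, with the broker delivering the messages between the two sides.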
The small example simply simulates a Fibonacci function.
RPC server code
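    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    // Logging: the LogManager.getLogger() call suggests Log4j 2 (an assumption).
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    public class FibonacciRPCServer {
        private static final Logger log = LogManager.getLogger();
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        private Connection connection = null;
        private Channel channel = null;

        public FibonacciRPCServer() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("191.8.1.107");
            factory.setUsername("test");
            factory.setPassword("123456");
            connection = factory.newConnection();
            channel = connection.createChannel();
        }

        public void open() throws IOException {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Hand this consumer at most one unacknowledged request at a time.
            channel.basicQos(1);
            log.info(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    try {
                        // Echo the request's correlation id so the client can match the reply.
                        BasicProperties replyProps = new BasicProperties.Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        log.info(" [.] fib(" + message + ")");
                        String response = String.valueOf(fib(n));
                        // Publish via the default exchange to the queue named in Reply-to.
                        channel.basicPublish("", properties.getReplyTo(), replyProps,
                                response.getBytes("UTF-8"));
                    } finally {
                        // Acknowledge even on failure so a bad request is not redelivered.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // autoAck = false: explicit ack required
        }

        public void close() {
            try {
                if (channel != null) channel.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // Naive recursive Fibonacci; enough for a demonstration.
        private static int fib(int n) {
            if (n == 0) return 0;
            if (n == 1) return 1;
            return fib(n - 1) + fib(n - 2);
        }
    }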
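The recursive fib in the server takes exponential time, and its int result overflows from n = 47 onward, which is exactly the kind of slow server the introduction warns about. As a hedged alternative (not part of the original example; the class name IterativeFibSketch is hypothetical), an iterative version returning long runs in linear time and stays exact up to n = 92:

```java
// Iterative Fibonacci sketch; exact for 0 <= n <= 92 when using long.
class IterativeFibSketch {

    static long fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        long a = 0; // fib(i)
        long b = 1; // fib(i + 1)
        for (int i = 0; i < n; i++) {
            long next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    public static void main(String[] args) {
        System.out.println(fib(10)); // prints 55
        System.out.println(fib(50)); // prints 12586269025
    }
}
```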
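RPC client code
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;

    // Logging: the LogManager.getLogger() call suggests Log4j 2 (an assumption).
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    public class FibonacciRPCClient {
        private static final Logger log = LogManager.getLogger();
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;

        public FibonacciRPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("191.8.1.107");
            factory.setUsername("test");
            factory.setPassword("123456");
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Server-named, exclusive, auto-delete queue used only for replies.
            replyQueueName = channel.queueDeclare().getQueue();
        }

        public String call(String message) throws IOException, InterruptedException {
            final String corrId = UUID.randomUUID().toString();
            BasicProperties props = new BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
            String consumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    // Accept only the reply that belongs to this request.
                    if (corrId.equals(properties.getCorrelationId())) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });

            String result = response.take(); // blocks until the reply arrives
            // Cancel the consumer; otherwise every call leaves one behind on the
            // reply queue, and a stale consumer can swallow a later reply.
            channel.basicCancel(consumerTag);
            return result;
        }

        public void close() {
            // ... omitted: close the channel and the connection ...
        }
    }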
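The client's call blocks on response.take(), so a slow or stopped server would leave it waiting forever. One possible mitigation, shown here as a minimal sketch with only the JDK (the class ReplyTimeoutSketch and the method awaitReply are hypothetical names, not part of the original client), is to wait with a bounded poll and treat an empty result as a timeout:

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Bounded wait on the reply queue instead of an unbounded take().
class ReplyTimeoutSketch {

    // Waits up to timeoutMillis for a reply; an empty Optional means
    // no reply arrived in time (or the waiting thread was interrupted).
    static Optional<String> awaitReply(BlockingQueue<String> response, long timeoutMillis) {
        try {
            return Optional.ofNullable(response.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // No reply has been offered yet, so this times out after 100 ms.
        System.out.println(awaitReply(response, 100).isPresent()); // prints false

        // Simulate the consumer callback delivering a reply.
        response.offer("55");
        System.out.println(awaitReply(response, 100).orElse("timeout")); // prints 55
    }
}
```

In the client, the caller would then decide what a timeout means: retry, report an error, or fall back to an asynchronous flow.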
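In the client code above, note how the request properties are built. They carry both the correlation id and the name of the reply queue:
    BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
Let's look at the packet capture: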