目录
官网
& | | | | |
1 Hello World!
RabbitMQ所需依赖
org.slf4j slf4j-simple 1.7.25 test org.slf4j slf4j-api 1.7.25 com.rabbitmq amqp-client 5.6.0
1.1 生产者demo producer
package com.smallShen.distributed.rmq.cpt01HelloWorld;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class Send { private static final String QUEUE_NAME = "HelloWorld RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 1000; i++) { String msg = "Hello World, your RabbitMQ! " + i; // String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .deliveryMode(2)// 2持久化 .build(); channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes()); System.out.println(String.format("Producer Sent msg[%s] at %s", msg, new Date())); } } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}
1.2 消费者demo consumer
package com.smallShen.distributed.rmq.cpt01HelloWorld;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class Receive { private static final String QUEUE_NAME = "HelloWorld RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("Waiting for msg, To exit press Crtl+C"); DeliverCallback deliverCallback = new DeliverCallback() { public void handle(String consumerTag, Delivery delivery) throws IOException { String msg = new String(delivery.getBody(), "UTF-8"); System.out.println(msg); System.out.println(String.format("Receive msg[%s] at %s", msg, new Date())); } }; // String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback() { public void handle(String consumerTag) throws IOException { System.out.println("Receive cancel!"); } }); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}
1.3 查看queue队列中的信息
页面查看,可看到有4条消息
, 用户名密码默认为guest
;打开页面客户端
命令查看
Linux版本命令:sudo rabbitmqctl list_queues
rabbitmqctl.bat list_queues
,如下可看到queue name为hello
的message有4条; C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmqctl.bat list_queuesTimeout: 60.0 seconds ...Listing queues for vhost / ...name messageshello 4C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>
2 Work queues
2.1 消费者设置消息回执
autoAck=false
,执行任务需要花费一些时间,如果消费者客户端在处理一个需要较长时间完成的消息时,中途程序中断了。当前默认配置下,RabbitMQ在将消息分发给一个消费者后,会直接将消息删除。在这种情况下,不仅仅是当前还没处理完的这条消息,已经分发给当前消费者的其余消息也会随之丢失。但我们并不想要丢失消息,为了避免这种情况,我们可以设置手工回执。当将autoAck
设置为false
后,RabbitMQ服务端在将消息分发给消费者后,会将消息标记为Unacked
状态,这在UI页面也可以看得到。如果没有收到回执,且消费者连接断开没有心跳的情况下,RabbitMQ会将消息回收重新发送给新的消费者。
Linux : sudo rabbitmqctl list_queues name messages_ready messages_unacknowledgedWindow : rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgede.gC:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgedTimeout: 60.0 seconds ...Listing queues for vhost / ...name messages_ready messages_unacknowledgedHelloWorld RMQ 0 662C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>
code:
DeliverCallback deliverCallback = new DeliverCallback() { public void handle(String consumerTag, Delivery delivery) throws IOException { String msg = new String(delivery.getBody(), "UTF-8"); System.out.println(String.format("Receive msg[%s] at %s", msg, new Date())); try { doWorker(msg); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(String.format("Receive msg[%s] and finish at %s", msg, new Date())); // 当关闭了自动回执,必须添加手动回执 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); } } }; // String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback // 关闭自动回执,启用手工回执 Boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, new CancelCallback() { public void handle(String consumerTag) throws IOException { System.out.println("Receive cancel!"); } });
2.2 队列持久化及消息持久化
durable = true
,当RabbitMQ服务器宕机,如果没有设置队列及消息的持久化,重启之后消息就会丢失;
// 设置队列持久化,RabbitMQ宕机或重启队列不会丢失 boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);//消息持久化 import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // or // String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body AMQP.BasicProperties props = new AMQP.BasicProperties().builder() // 2 消息持久化,RabbitMQ服务宕机或者重庆,消息不会删除 // 可满足一般情况,但并不能保证100%数据不丢失,因为假如生产者将消息推送至队列Queue,放入RabbitMQ内存中,还没有进行磁盘保存时服务宕机,内存中的这部分数据就丢失了 .deliveryMode(2) .build(); channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes());
2.3 多消费者同时消费,公平分发资源合理利用
当多消费者客户端消费同一队列中的消息时,默认会按消息数量均衡分配,比如队列中1000条消息,有A、B、C、D4个消费者,RabbitMQ会为每个消费者分配250条消息;
但是当1000条消息中有些消息的处理逻辑比较复杂,耗时较大,就可能造成收到该类型消息的消费者A处理250条消息需要花费很长时间,而此时B、C、D消费者已经早早处理完他们自己的消息,闲置了N久; 针对这种情况可以在消费者客户端中使用basicQos
设置prefetchCount = 1
,告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息; code // 告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息; // 如果不配置此参数,队列中的消息会均衡的分配给所有消息者,不论消费者的处理速度快慢,每个消费者接收到的消息数量是大致一致的 // 比如 100条消息, 消费者A,B,C每个接收33-34条,但A处理消息效率很慢,B,C很快,就会有B,C很快就将消息处理完毕开始闲置,而A可能还需要很久才能处理完毕; // 此时配置 prefetchCount 来提高利用率 int prefetchCount = 1; channel.basicQos(prefetchCount);
2.4 完整的生产者消费者代码
生产者:
package com.smallShen.distributed.rmq.cpt02WorkQueue;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class NewTask { private static final String QUEUE_NAME = "Work Queue RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments // 队列持久化,RabbitMQ服务宕机或者重庆,队列不会删除 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); for (int i = 0; i < 100; i++) { String msg = args != null && args.length > 0 ? String.join(" ", args) + i : "Hello World, your RabbitMQ! " + i; // String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body AMQP.BasicProperties props = new AMQP.BasicProperties().builder() // 2 消息持久化,RabbitMQ服务宕机或者重庆,消息不会删除 // 可满足一般情况,但并不能保证100%数据不丢失,因为假如生产者将消息推送至队列Queue,放入RabbitMQ内存中,还没有进行磁盘保存时服务宕机,内存中的这部分数据就丢失了 .deliveryMode(2) .build(); channel.basicPublish("", QUEUE_NAME, false, false, props, msg.getBytes()); //channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); System.out.println(String.format("Producer send msg[%s] at %s", msg, new Date())); } } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}
消费者
package com.smallShen.distributed.rmq.cpt02WorkQueue;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class Receive { private static final String QUEUE_NAME = "Work Queue RMQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); System.out.println("Waiting for msg, To exit press Crtl+C"); // 告诉RabbitMQ每次只给消费者1条消息进行处理,换句话说,如果前一条消息的回执ack没有收到,不再给这个消费者推送消息; // 如果不配置此参数,队列中的消息会均衡的分配给所有消息者,不论消费者的处理速度快慢,每个消费者接收到的消息数量是大致一致的 // 比如 100条消息, 消费者A,B,C每个接收33-34条,但A处理消息效率很慢,B,C很快,就会有B,C很快就将消息处理完毕开始闲置,而A可能还需要很久才能处理完毕; // 此时配置 prefetchCount 来提高利用率 int prefetchCount = 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = new DeliverCallback() { public void handle(String consumerTag, Delivery delivery) throws IOException { String msg = new String(delivery.getBody(), "UTF-8"); System.out.println(String.format("Receive msg[%s] at %s", msg, new Date())); try { doWorker(msg); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(String.format("Receive msg[%s] and finish at %s", msg, new Date())); // 当关闭了自动回执,必须添加手动回执 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); } } }; // String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback // 关闭自动回执,启用手工回执 Boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, new CancelCallback() { public void handle(String consumerTag) throws IOException { System.out.println("Receive cancel!"); } }); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void doWorker(String msg) throws InterruptedException { for (char c : msg.toCharArray()) { if (c == '.') { TimeUnit.SECONDS.sleep(1); } } }}
3 Publish/Subscribe
3.1交换机
在RabbitMQ中,生产者从来不会直接将消息发送至队列Queue中,实际上,生产者连消息该发给那个队列都不知道,生产者只是将消息发送给交换机EXCHANGE。
Exchange交换机有两个作用,一是接收生产者发送的消息,二是将消息放入队列Queue中。交换机必须清楚接收到的消息该如何处理,是放到指定的队列 或者放到多个队列 或是 直接丢弃。 交换机有如下四种类型:public enum BuiltinExchangeType { DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); ... direct : 直连类型的交换机,完全匹配队列名称,然后将消息放入该队列 fanout : 广播类型的交换机,发布/订阅,1条消息同时发布给绑定到该交换机的多个队列 topic : 正则模式匹配的交换机,类似正则的规则模糊匹配队列,然后将消息放入这些匹配到的队列 headers : 很少使用
可以通过UI页面或者sudo rabbitmqctl list_exchanges
命令来查看有哪些交换机
3.2 临时队列
在创建队列的时候,我们可以指定队列名称,也可以通过String queueName = channel.queueDeclare().getQueue();
方式来创建,这样创建的队列由RabbitMQ随机指定名称,且是一种 非持久化,排他,自动删除的 临时队列。
String queueName = channel.queueDeclare().getQueue(); 临时队列随机名称类似: amq.gen-JzTY20BRgKO-HjmUJj0wLg
3.3广播Fanont类型交换机使用
发布Pub端
package com.smallShen.distributed.rmq.cpt03PubSub;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class ProducerPub { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); for (int i = 0; i < 10; i++) { String msg = "Producer publish msg " + i; // fanout类型的交换机,不需要指定路由关键字 routingKey channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println(String.format("Producer send msg[%s] success at %s", msg, new Date())); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
订阅Sub端
package com.smallShen.distributed.rmq.cpt03PubSub;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class ConsumerSub { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明队列时候不适用参数,表示RabbitMQ创建一个默认 非持久化, 排他, 自动删除的队列 String queueName = channel.queueDeclare().getQueue(); // 广播fanout类型的交换机,不需要指定路由关键字routingKey channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer accept msg[%s] at %s", new String(delivery.getBody()), new Date())); } }; //(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
4 Routing
之前一章使用的是fanout类型的交换机,生产者Producer将消息推送至交换机Exchange,这种模式不需要指定路由关键字Routing Key
,消息将推送给所有和该交换机绑定的队列Queue
中去,被消息者消费;
4.1 路由关键字 Routing, 绑定关键字 Binding
生产者发布消息,将消息推送至交换机,需要指定路由关键字routingKey
// 生产者 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8"));
消费者在接收消息时,是从队列中读取,队列中的消息是由交换机分发过来的,队列从交换机接收哪些类型的消息 也需要通过指明关键字routingKey来绑定
// 消费者 channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey);
当使用direct
直连类型的交换机时,消息发送的路由关键字必须和消息接收的绑定关键字一致。
4.2 直连类型交换机demo
生产者demo
package com.smallShen.distributed.rmq.cpt04Routing;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String routingKey = "log"; // "error", "warn", "info" for (int i = 0; i < 3; i++) { String msg = String.format("This is a msg, type[%s], send to exchange[%s]", routingKey, EXCHANGE_NAME) + " " + i; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8")); System.out.println(String.format("Producer send msg[%s] success", msg)); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
消费者demo
第一个消费者,绑定info | warn | log
三种关键字,三种类型的消息都能接收到
package com.smallShen.distributed.rmq.cpt04Routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class ConsumerCommon { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { String[] routingKeys = new String[3]; routingKeys[0] = "log"; routingKeys[1] = "info"; routingKeys[2] = "warn"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String defaultQueueName = channel.queueDeclare().getQueue(); for (String routingKey : routingKeys) { // 一个队列绑定多个路由关键字 System.out.println(String.format("Queue[%s] is binging exchange[%s] with key[%s]", defaultQueueName, EXCHANGE_NAME, routingKey)); channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer bind key[%s] accept msg[%s] at %s", String.join("|", routingKeys), new String(delivery.getBody(), "UTF-8"), new Date())); } }; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
第二个消费者,绑定了error
类型的关键字,只要发送消息时指定路由关键字是error
的才能接收到
package com.smallShen.distributed.rmq.cpt04Routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;public class ConsumerError { private static final String EXCHANGE_NAME = "MY_DIRECT_EXCHANGE"; public static void main(String[] args) { String[] params = new String[1]; params[0] = "error"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String defaultQueueName = channel.queueDeclare().getQueue(); for (String routingKey : params) { // 一个队列绑定多个路由关键字 System.out.println(String.format("Queue[%s] is binging exchange[%s] with key[%s]", defaultQueueName, EXCHANGE_NAME, routingKey)); channel.queueBind(defaultQueueName, EXCHANGE_NAME, routingKey); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer bind key[%s] accept msg[%s] at %s", String.join("|", params), new String(delivery.getBody(), "UTF-8"), new Date())); } }; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
5 Topics
5.1 Topic Exchange
第4章提到了Direct Exchange
直连类型的交换机,必须生产者routingKey
和消费者bindingKey
完全匹配才能互相通信,发送消息;Topic Exchange
正则类型的交换机可以提供更多的选择性,使用类似正则的方式匹配路由关键字routingKey
和绑定关键字bindingKey
,正则模糊匹配到后即可通信;
* 匹配一个单词 # 匹配0到多个单词 . 单词之间使用.分割如上图, 队列Q1绑定
*.orange.**
,队列Q2绑定*.*.rabbit
和lazy.#
: 当生产者发送消息指定路由关键字是quick.orange.rabbit
时,Q1,Q2都可匹配; 当生产者发送消息指定路由关键字是lazy.orange.elephant
时,Q1,Q2都可匹配; 当生产者发送消息指定路由关键字是quick.orange.fox
时,Q1可匹配; 当生产者发送消息指定路由关键字是lazy.brown.fox
时,Q2可匹配; 当生产者发送消息指定路由关键字是lazy.brown.fox
时,Q2可匹配; 当生产者发送消息指定路由关键字是lazy.pink.rabbit
时,Q2都可匹配,且只匹配1次; 当是quick.orange.male.rabbit
时,没有匹配队列,消息丢失。 5.2 Topic Exchange Demo
生产者 demo
package com.smallShen.distributed.rmq.cpt05Topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;/** * 发送消息 : * 速度 + 颜色 + 动物种类 * 格式 : ". . " * 队列1 接收 : * 红色的 (*.red.* ) * 队列2 接收 : * 兔子类型的消息 (*.*.rabbit) * 速度比较慢的消息 (lazy.#) */public class ProducerSend { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setUsername("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 路由关键字// String routingKey = "lazy.red.rabbit"; // 两个队列都能收到// String routingKey = "lazy.RED.rabbit"; // 只有队列2收到// String routingKey = "lazy.yellow.tiger.run"; // 队列2收到 String routingKey = "fast.yellow.tiger.run"; // 都收不到,丢弃 String msg = String.format("This is a %s, la la la", routingKey); channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println(String.format("Producer send msg[%s] to exchange[%s] with routingKey[%s] at %s", msg, EXCHANGE_NAME, routingKey, new Date())); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
消费者 demo
队列1
package com.smallShen.distributed.rmq.cpt05Topic;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;/** * 发送消息 : * 速度 + 颜色 + 动物种类 * 格式 : ". . " * 队列1 接收 : * 红色的 (*.red.* ) * 队列2 接收 : * 兔子类型的消息 (*.*.rabbit) * 速度比较慢的消息 (lazy.#) */public class ConsumerAccept01 { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { String[] bindingKeys = new String[1]; bindingKeys[0] = "*.red.*"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setUsername("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 使用默认的队列 String defaultQueueName = channel.queueDeclare().getQueue(); for (String bindingKey : bindingKeys) { channel.queueBind(defaultQueueName, EXCHANGE_NAME, bindingKey); System.out.println(String.format("Queue[%s] bind exchange[%s] with bindingKey[%s]", defaultQueueName, EXCHANGE_NAME, bindingKey)); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer accept msg[%s] from queue[%s] at %s", new String(delivery.getBody(),"UTF-8"), defaultQueueName, new Date())); } }; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}
队列2
package com.smallShen.distributed.rmq.cpt05Topic;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Date;import java.util.concurrent.TimeoutException;/** * 发送消息 : * 速度 + 颜色 + 动物种类 * 格式 : ". . " * 队列1 接收 : * 红色的 (*.red.* ) * 队列2 接收 : * 兔子类型的消息 (*.*.rabbit) * 速度比较慢的消息 (lazy.#) */public class ConsumerAccept02 { private static final String EXCHANGE_NAME = "TOPIC_EXCHANGE"; public static void main(String[] args) { String[] bindingKeys = new String[2]; bindingKeys[0] = "*.*.rabbit"; bindingKeys[1] = "lazy.#"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setUsername("guest"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 使用默认的队列 String defaultQueueName = channel.queueDeclare().getQueue(); for (String bindingKey : bindingKeys) { channel.queueBind(defaultQueueName, EXCHANGE_NAME, bindingKey); System.out.println(String.format("Queue[%s] bind exchange[%s] with bindingKey[%s]", defaultQueueName, EXCHANGE_NAME, bindingKey)); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(String.format("Consumer accept msg[%s] from queue[%s] at %s", new String(delivery.getBody(),"UTF-8"), defaultQueueName, new Date())); } }; channel.basicConsume(defaultQueueName, true, deliverCallback, consumerTag -> {}); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}
6 RPC
RabbitMQ实现RPC思路:建立两个队列Queue,通过第一个队列RPC客户端将消息发送至RPC服务器端,服务器端执行逻辑完毕将结果返回至第二个队列,供RPC客户端消费;其中RPC客户端每次发送请求,需要把消息内容,返回队列及信息唯一标识ID一起发送出去,RPC服务端处理完毕也根据这些发送来的额外信息将消息分发至指定队列;
6.1 RPC Client
package com.smallShen.distributed.rmq.cpt06RPC;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable { private Connection connection; private Channel channel; private String requestQueueName = "rpc_sample_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueueresponse = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); }}
6.2 RPC Server
package com.smallShen.distributed.rmq.cpt06RPC;import com.rabbitmq.client.*;public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_sample_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { System.out.println(delivery.getProperties().getCorrelationId()); System.out.println(delivery.getProperties().getReplyTo()); AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }}