RabbitMQ 基础教程
RabbitMQ 是比较常用的消息队列中间件,在分布式系统的开发中经常会使用到,使用RabbitMQ的好处主要是
- 异步处理
- 日志处理
- 应用解耦
- 流量削峰
Rabbit的安装配置
emmm....这个可以自己百度或者查看官方文档,我是使用的docker容器 {% post_link docker-安装rabbitMQ %}
RabbitMQ 简单示例
{% asset_img simple.png %}
p : 生产者
中间为消息队列
c : 消费者
生产者
/////////////////////创建连接和channel/////////////////////
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
///////////////////////////////////////////////////////
///////////////////所有的操作都是基于channel//////////////
//声明消息队列 QUEUE_NAME为消息队列名
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
消费者
/////////////////////创建连接和channel/////////////////////
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
///////////////////////////////////////////////////////
//声明消息队列 QUEUE_NAME为消息队列名
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用DefaultConsumer 定义消息消费方式
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//设置channel消费的消息队列,打开自动应答模式,消费处理方式
channel.basicConsume(QUEUE_NAME, true, consumer);
RabbitMQ 工作队列
{% asset_img java-two.png %}
- 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将获取任务并最终执行作业。当您运行许多工作程序时,它们之间将共享任务。
轮询模式
-消费者代码
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
//执行相关业务逻辑,这里会耗费一定的时间
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // 应答模式
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
我们多运行几个消费者,每个消费者耗费不同时间,但是每个消费者受到的消息数量没有太多差别,应该RabbitMQ采用轮询模式,挨个发送,并不会管你时间耗费的多少。
消息应答
RabbitMQ默认消费者接受到消息后删除消息,如何消息消费失败,那么这个消息就丢失了,比如消费者受到消息后突然挂掉。消息应答可以防止这种情况。
- 消费者代码
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
//执行相关业务逻辑,这里会耗费一定的时间
doWork(message);
} finally {
System.out.println(" [x] Done");
//任务完成后告诉RabbitMQ
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false; // 自动应答模式关闭
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
公平分发
- 生产者
设置一次只对一个消费者发送一个消息,只有消息被消费者消费后,才会给他分发下一条消息,结合上面的消息应答使用
//设置消费者接受消息数量
int prefetchCount = 1;
channel.basicQos(prefetchCount);
消息持久化
RabbitMQ消息保存在内存,一旦重启消息会丢失。
- 消息队列声明时设置
//设置是否持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
RabbitMQ 发布/订阅模式
在上面的模式中,一条消息只会被一个消费者消费,如果我们需要多个消费者消费同一条消息,就需要发布/订阅模式,所有订阅了的消费者都会收到相同的消息。比如APP推送消息的时候,很多用户都是收到了一样的消息。
模式图:
{% asset_img exchanges-example.png %}
{% asset_img exchanges.png %}
在这种模式下,消息提供者会将消息发送到
exchanges(交换机)
,再由交换机根据指定的策略来进行转发,有下面几种策略fanout,direct,topic,header
,而订阅模式就是fanout
。
- 提供者
//我们声明一个交换机,使用fanout策略,命名为logs。
channel.exchangeDeclare("logs", "fanout");
//第一个参数为交换机
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机 发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//获取一个队列
String queueName = channel.queueDeclare().getQueue();
//绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
RabbitMQ 路由模式
相比订阅模式,路由模式会从绑定的消息队列中选择拥有对应路由键的消息队列转发.
direct
就是路由模式。
模式
{% asset_img route-example.png %}
{% asset_img route.png %}
- 提供者改变
//声明路由模式的交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//severity 就是路由键,只有设置了对应的路由键的绑定队列才可以收到
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
- 消费者改变
//声明路由模式的交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
//绑定队列到交换机,并且设置路由键
channel.queueBind(queueName, EXCHANGE_NAME, severity);
///////////////////多个路由键直接重复设置即可//////////////////////
//channel.queueBind(queueName, EXCHANGE_NAME, severity1);
//channel.queueBind(queueName, EXCHANGE_NAME, severity2);
//channel.queueBind(queueName, EXCHANGE_NAME, severity3);
/////////////////////////////////////////////////////////////////
官方代码
- 提供者
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
//..
}
- 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
RabbitMQ 主题模式
发送到主题交换机的消息不能具有任意
routing_key
- 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密钥中可以包含任意数量的单词,最多可达255个字节。
- 主题模式和路由模式没有太多区别,主要就是加入了通配符的功能,
#
匹配多个,*
匹配一个。
模式
{% asset_img topic-example.png %}
{% asset_img topic.png %}
官方代码
- 提供者
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
//routingkey可以设置
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
connection.close();
}
//...
}
- 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
//设置队列具体监听的主题bindingKey 比如logs.info 或者logs.#等
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
Q.E.D.
Comments | 0 条评论