📖 目录
RabbitMQ 基础概念
什么是 RabbitMQ?
RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它用于在分布式系统中传递消息,支持多种消息传递模式。
核心特性
- 可靠性:支持消息持久化、确认机制
- 灵活的路由:支持多种 Exchange 类型
- 集群支持:支持高可用集群部署
- 管理界面:提供 Web 管理界面
- 多协议支持:支持 AMQP、MQTT、STOMP 等
应用场景
- 异步处理:解耦系统,异步处理任务
- 应用解耦:系统间通过消息通信
- 流量削峰:缓冲突发流量
- 日志收集:集中式日志处理
- 任务队列:分布式任务调度
安装和启动
bash
# Ubuntu/Debian
sudo apt update
sudo apt install rabbitmq-server
# CentOS/RHEL
sudo yum install rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
# 停止服务
sudo systemctl stop rabbitmq-server
# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
# 访问管理界面
# http://localhost:15672
# 默认用户名/密码: guest/guest核心概念
Producer(生产者)
发送消息的应用程序。
Consumer(消费者)
接收消息的应用程序。
Queue(队列)
存储消息的缓冲区,类似于邮箱。
Exchange(交换机)
接收生产者发送的消息,并根据路由规则将消息路由到队列。
Binding(绑定)
连接 Exchange 和 Queue 的规则。
Routing Key(路由键)
生产者发送消息时指定的键,用于路由消息。
Connection(连接)
应用程序与 RabbitMQ 服务器之间的 TCP 连接。
Channel(通道)
连接中的虚拟连接,用于执行操作。
Exchange 类型
1. Direct Exchange(直连交换机)
路由键完全匹配时,消息被路由到队列。
java
// 声明 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");
// 绑定队列
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");2. Fanout Exchange(扇出交换机)
将消息路由到所有绑定的队列,忽略路由键。
java
// 声明 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");
// 绑定队列(路由键被忽略)
channel.queueBind(queueName, "logs", "");3. Topic Exchange(主题交换机)
根据路由键模式匹配,将消息路由到队列。
java
// 声明 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");
// 绑定队列(支持通配符)
channel.queueBind(queueName, "topic_logs", "*.error");
channel.queueBind(queueName, "topic_logs", "system.*");路由键模式:
*:匹配一个单词#:匹配零个或多个单词
4. Headers Exchange(头交换机)
根据消息头属性路由,忽略路由键。
java
// 声明 Headers Exchange
channel.exchangeDeclare("headers_logs", "headers");
// 绑定队列(匹配头属性)
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all"); // all 或 any
headers.put("level", "error");
headers.put("source", "system");
channel.queueBind(queueName, "headers_logs", "", headers);连接和通道
创建连接
java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();创建通道
java
import com.rabbitmq.client.Channel;
Channel channel = connection.createChannel();关闭连接
java
channel.close();
connection.close();Producer(生产者)
基本发送消息
java
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");发送到 Exchange
java
// 声明 Exchange
channel.exchangeDeclare("logs", "fanout");
// 发送消息
String message = "Hello World!";
channel.basicPublish("logs", "", null, message.getBytes());消息持久化
java
// 声明持久化队列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
// 发送持久化消息
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());设置消息属性
java
import com.rabbitmq.client.AMQP.BasicProperties;
BasicProperties props = new BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2) // 持久化
.priority(5)
.messageId("msg-001")
.timestamp(new Date())
.build();
channel.basicPublish("", "queue", props, message.getBytes());Consumer(消费者)
基本消费消息
java
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 创建消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});手动确认
java
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 处理消息
doWork(message);
} finally {
// 手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume("task_queue", autoAck, deliverCallback, consumerTag -> {});拒绝消息
java
// 拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
// 拒绝消息并丢弃
channel.basicNack(deliveryTag, false, false);
// 拒绝单条消息
channel.basicReject(deliveryTag, true); // true=重新入队预取数量
java
// 设置预取数量(QoS)
int prefetchCount = 1;
channel.basicQos(prefetchCount);队列和绑定
声明队列
java
// 基本队列
channel.queueDeclare("hello", false, false, false, null);
// 持久化队列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
// 排他队列(仅当前连接可见)
boolean exclusive = true;
channel.queueDeclare("exclusive_queue", false, exclusive, false, null);
// 自动删除队列(无消费者时删除)
boolean autoDelete = true;
channel.queueDeclare("temp_queue", false, false, autoDelete, null);绑定队列到 Exchange
java
// 绑定到 Direct Exchange
channel.queueBind(queueName, "direct_logs", "error");
// 绑定到 Fanout Exchange
channel.queueBind(queueName, "logs", "");
// 绑定到 Topic Exchange
channel.queueBind(queueName, "topic_logs", "*.error");解绑队列
java
channel.queueUnbind(queueName, "logs", "");消息确认
发布确认(Publisher Confirms)
java
// 启用发布确认
channel.confirmSelect();
// 等待确认
channel.waitForConfirms();
// 异步确认
channel.addConfirmListener((sequenceNumber, multiple) -> {
// 消息已确认
}, (sequenceNumber, multiple) -> {
// 消息被拒绝
});消费者确认
java
// 自动确认(不推荐)
boolean autoAck = true;
channel.basicConsume("queue", autoAck, deliverCallback, cancelCallback);
// 手动确认
boolean autoAck = false;
channel.basicConsume("queue", autoAck, deliverCallback, cancelCallback);
// 在回调中确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);常用命令
管理命令
bash
# 启动服务
sudo systemctl start rabbitmq-server
# 停止服务
sudo systemctl stop rabbitmq-server
# 查看状态
sudo rabbitmqctl status
# 查看队列
sudo rabbitmqctl list_queues
# 查看 Exchange
sudo rabbitmqctl list_exchanges
# 查看绑定
sudo rabbitmqctl list_bindings
# 查看连接
sudo rabbitmqctl list_connections
# 查看通道
sudo rabbitmqctl list_channels
# 查看消费者
sudo rabbitmqctl list_consumers
# 清除队列
sudo rabbitmqctl purge_queue queue_name
# 删除队列
sudo rabbitmqctl delete_queue queue_name用户管理
bash
# 添加用户
sudo rabbitmqctl add_user username password
# 删除用户
sudo rabbitmqctl delete_user username
# 修改密码
sudo rabbitmqctl change_password username new_password
# 设置用户标签
sudo rabbitmqctl set_user_tags username administrator
# 设置权限
sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
# 查看用户
sudo rabbitmqctl list_users集群管理
bash
# 加入集群
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@node1
sudo rabbitmqctl start_app
# 查看集群状态
sudo rabbitmqctl cluster_status
# 离开集群
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl start_app最佳实践
连接管理
- 复用连接:一个应用程序使用一个连接
- 通道复用:每个线程使用独立的通道
- 及时关闭:使用完毕后关闭通道和连接
队列设计
- 持久化队列:重要消息使用持久化队列
- 合理命名:使用有意义的队列名称
- 设置 TTL:为消息设置过期时间
消息处理
- 幂等性:确保消息处理是幂等的
- 错误处理:实现重试和死信队列
- 批量处理:合理使用批量操作
性能优化
- 预取数量:合理设置 QoS
- 消息大小:避免发送过大的消息
- 连接池:使用连接池管理连接
示例:完整的生产者
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Producer {
private static final String TASK_QUEUE = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明持久化队列
channel.queueDeclare(TASK_QUEUE, true, false, false, null);
String message = String.join(" ", argv);
// 发送持久化消息
channel.basicPublish("", TASK_QUEUE,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}示例:完整的消费者
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
private static final String TASK_QUEUE = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(TASK_QUEUE, true, false, false, null);
// 设置预取数量
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
// 手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE, false, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) {
// 模拟工作
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}学习资源
💡 常用命令速查
bash
# 服务管理
sudo systemctl start rabbitmq-server
sudo systemctl stop rabbitmq-server
sudo rabbitmqctl status
# 队列管理
sudo rabbitmqctl list_queues
sudo rabbitmqctl purge_queue queue_name
sudo rabbitmqctl delete_queue queue_name
# Exchange 管理
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
# 用户管理
sudo rabbitmqctl add_user username password
sudo rabbitmqctl set_user_tags username administrator
sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
# 集群管理
sudo rabbitmqctl cluster_status