目录
1. 模型简介
2. 示例流程
2.1 生产者发送消息
2.2 消费者获取消息(自动ACK)
2.3 消息确认机制(ACK)
2.3.1 消费者获取消息(手动ACK)
2.3.2 自动ACK存在的问题
2.3.3 手动ACK
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
- P(producer/ publisher):生产者,一个发送消息的用户应用程序。
- C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
- 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
我们将用Java编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将详细介绍Java API中的一些细节,这是一个消息传递的“Hello!! World!!!”。
我们将调用我们的消息发布者(发送者)Send和我们的消息消费者(接收者)Recv。发布者将连接到RabbitMQ,发送一条消息,然后退出。
ample.demorabbitmq.simple;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 生产者*/
public class Send {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = ateChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, Bytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
采用自动ACK应答机制
ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
这需要看消息的重要性:
ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者,手动进行ACK*/
public class Recv2 {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道final Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.DeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACK // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。则需要手动ACK。channel.basicConsume(QUEUE_NAME, false, consumer);}
}
运行消费者时,如果消费者程序抛出异常,但是消息依然被消费。
ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);int i = 100/0; //todo 异常抛出点System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
修改消费者Recv2,把自动改成手动
ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者,手动进行ACK*/
public class Recv2 {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道final Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);int i = 100/0;System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.DeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}
运行生产者send代码:
运行Recv2代码:
管理界面:
没有被消费掉。
本文发布于:2024-02-04 17:16:18,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170712536157712.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |