(二)RabbitMQ消息模型——1基本消息模型

阅读: 评论:0

(二)RabbitMQ消息模型——1基本消息模型

(二)RabbitMQ消息模型——1基本消息模型

目录

1. 模型简介

 2. 示例流程

2.1 生产者发送消息

2.2 消费者获取消息(自动ACK)

2.3 消息确认机制(ACK)

2.3.1 消费者获取消息(手动ACK)

2.3.2 自动ACK存在的问题

2.3.3  手动ACK


1. 模型简介

 

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

  1. P(producer/ publisher):生产者,一个发送消息的用户应用程序。
  2. C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  3. 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

 

 2. 示例流程

我们将用Java编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将详细介绍Java API中的一些细节,这是一个消息传递的“Hello!! World!!!”。

我们将调用我们的消息发布者(发送者)Send和我们的消息消费者(接收者)Recv。发布者将连接到RabbitMQ,发送一条消息,然后退出。

2.1 生产者发送消息

 

 

 

 

ample.demorabbitmq.simple;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 生产者*/
public class Send {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = ateChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, Bytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}

 

2.2 消费者获取消息

采用自动ACK应答机制

ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}

我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.

 

2.3 消息确认机制(ACK)

 

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢?

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

这需要看消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

2.3.1 消费者获取消息(手动ACK)

ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者,手动进行ACK*/
public class Recv2 {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道final Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.DeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACK  // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。则需要手动ACK。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

2.3.2 自动ACK存在的问题

运行消费者时,如果消费者程序抛出异常,但是消息依然被消费。

 

  • 修改消费者Recv的代码:
ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);int i = 100/0; //todo 异常抛出点System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}

 

  • send生产者发消息

 

 

  • 运行消费者Recv 程序抛出异常

  • 查看rabbitmq界面,生产者发送的消息已经被消费掉

 

 

2.3.3  手动ACK

修改消费者Recv2,把自动改成手动

ample.demorabbitmq.simple;import java.io.IOException;ample.demorabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** 消费者,手动进行ACK*/
public class Recv2 {private final static String QUEUE_NAME = "simple_queue_demo";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = Connection();// 创建通道final Channel channel = ateChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);int i = 100/0;System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.DeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}

 

 

运行生产者send代码:

 

运行Recv2代码:

 

管理界面:

 

没有被消费掉。

本文发布于:2024-02-04 17:16:18,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170712536157712.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

下一篇:RabbitMQ
标签:模型   消息   RabbitMQ
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23