RabbitMQ基本使用,java下载百度文库

阅读: 评论:0

RabbitMQ基本使用,java下载百度文库

RabbitMQ基本使用,java下载百度文库

  • AMQP( Advanced Message Queuing Protocol),是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。Eang中的实现有 Rabbits等

  • JMS (Java Messageservice),是由Sun公司早期提出的消息标准,旨在为Java应用提供统一的消息操作,包括 create、send、 receive 等。JMS已经成为 java Enterprise Edition的一部分。从使用角度看,JMS和JDBC担任差不多的角色,用户都是根据相应的接口可以和实现了MS的服务进行通信,进行相关的操作。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

RabbitMQ


RabbitMQ是基于AMQP协议,erlang语言开发的一款消息管理系统。其稳定性很好,支持主流的操作系统, Linux、 Windows、 Madox等。 同时支持多种开发语言:Java、 Python、 Ruby、NET、PHP、C/C++、

官网: /

官方教程: .html

搭建工程


目录结构:

新建一个Meven工程,添加l依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns=“.0.0”

xmlns:xsi=“”

xsi:schemaLocation=“.0.0 .0.0.xsd”>

4.0.0

rabbitMQ

1.0-SNAPSHOT

org.springframework.boot

spring-boot-starter-parent

2.0.6.RELEASE

<java.version>1.8</java.version>

org.apachemons

commons-lang3

3.3.2

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-test

在util包中,建立一个RabbitMQ连接的工具类,方便其他程序获取连接:

public class ConnectionUtil {

/**

  • 建立与RabbitMQ的连接

  • @return

  • @throws Exception

*/

public static Connection getConnection() throws Exception {

//定义连接工厂

ConnectionFactory factory = new ConnectionFactory();

//设置服务地址

factory.setHost(“192.168.42.136”);

//端口

factory.setPort(5672);

//设置账号信息,用户名、密码、vhost

factory.setVirtualHost("/ly");

factory.setUsername(“ly”);

factory.setPassword(“123456”);

// 通过工程获取连接

Connection connection = wConnection();

return connection;

}

}

六种消息模型


RabbitMQ提供了6种消息模型:

| 简单模型:生产者->队列->消费者 | |

| — | — |

| 工作模型:一个队列对应多个消费者,一个消息只能被消费一次 | |

| 订阅模型(fanout):生产者->交换机,交换机对应多个队列多个消费者,一个消息可以被多个消费者消费 | |

| 订阅模式(direct):添加了路由信息routingKey,消息会发送给符合routingKey的队列 | |

| 订阅模式(topic):通配符,#:匹配一个或多个词,*:匹配一个词 | |

| RPC远程过程调用,不同服务器上通过网络来表达调用的语义和传达调用的数据,并不是MQ | |

基本消息模型


RabbitMQ是一个消息代理: 它接受和转发消息(二进制数据块)。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区:

案例:调用消息发布者(发送者)Send,连接到RabbitMQ,发送一条消息,消息消费者(接收者)Recv,接收消息。

生产者发送消息:

public class Send {

private final static String QUEUE_NAME = “simple_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = Connection();

// 从连接中创建通道,这是完成大部分API的地方。

Channel channel = ateChannel();

// 声明(创建)队列,必须声明队列才能够发送消息,我们可以把消息发送到队列中。

// 声明一个队列是幂等的 - 只有当它不存在时才会被创建

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 消息内容

String message = “Hello World!”;

channel.basicPublish("", QUEUE_NAME, null, Bytes());

System.out.println(" [x] Sent ‘" + message + "’");

//关闭通道和连接

channel.close();

connection.close();

}

}

控制台:

管理工具中查看消息:

进入队列页面,可以看到新建了一个队列:simple_queue

点击队列名称,进入详情页,可以查看消息:

在控制台查看消息并不会将消息消费,所以消息还在。

消费者获取消息:

public class Recv {

private final static String QUEUE_NAME = “simple_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 创建通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [x] received : " + msg + “!”);

}

};

// 监听队列,第二个参数:是否自动进行消息确认。

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

控制台:

这个时候,队列中的消息就没了:

我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.

消息确认机制(ACK):

一个问题: 消息一旦被消费者接收,队列中的消息就会被删除,如果消费者领取消息后,还没执行操作就挂掉了,使消息消费失败,但是RabbitMQ无从得知,依然会对消息进行消费,这样消息就丢失了!

RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK,(适合不太重要的消息)

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用(适合很重要,不容丢失的消息)

修改ACK机制为手动ACK:

public class Recv2 {

private final static String QUEUE_NAME = “simple_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 创建通道

final Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [x] received : " + msg + “!”);

// 手动进行ACK

channel.DeliveryTag(), false);

}

};

// 监听队列,第二个参数false,手动进行ACK

channel.basicConsume(QUEUE_NAME, false, consumer);

}

}

注意到最后一行代码:

channel.basicConsume(QUEUE_NAME, false, consumer);

如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:

work消息模型


工作队列或者竞争消费者模式

工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取

如何避免消息堆积:

1)采用workqueue,多个消费者监听同一队列。

2)接收到消息以后,而是通过线程池,异步消费。

案例:模拟一个生产者和多个消费者

生产者: 创建生产者,循环发送50条消息。

public class Send {

private final static String QUEUE_NAME = “test_work_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 循环发布任务

for (int i = 0; i < 30; i++) {

// 消息内容

String message = "task … " + i;

channel.b

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

asicPublish("", QUEUE_NAME, null, Bytes());

System.out.println(" [x] Sent ‘" + message + "’");

Thread.sleep(i * 2);

}

// 关闭通道和连接

channel.close();

connection.close();

}

}

创建一个工作队列,在多个工作者之间分配耗时任务,消费者2消费快,消费者1消费比较慢。

消费者1: 添加了模拟完成任务耗时:Thread.sleep(1000)

// 消费者1

public class Recv {

private final static String QUEUE_NAME = “test_work_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

final Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 设置每个消费者同时只能处理一条消息

channel.basicQos(1);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [消费者1] received : " + msg + “!”);

try {

// 模拟完成任务的耗时:1000ms

Thread.sleep(1000);

} catch (InterruptedException e) {

}

// 手动ACK

channel.DeliveryTag(), false);

}

};

// 监听队列。

channel.basicConsume(QUEUE_NAME, false, consumer);

}

}

消费者2:

//消费者2

public class Recv2 {

private final static String QUEUE_NAME = “test_work_queue”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

final Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 设置每个消费者同时只能处理一条消息

channel.basicQos(1);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [消费者2] received : " + msg + “!”);

// 手动ACK

channel.DeliveryTag(), false);

}

};

// 监听队列。

channel.basicConsume(QUEUE_NAME, false, consumer);

}

}

接下来,两个消费者一同启动,然后发送30条消息:可以发现,两个消费者各自消费了15条消息,而且各不相同,这就实现了任务的分发。

能者多劳:

消费者1比消费者2的效率要低,一次任务的耗时较长,然而两人最终消费的消息数量是一样的,这样消费者2大量时间处于空闲状态,而消费者1一直忙碌,这样并不合理,正确的做法应该是消费越快的人,消费的越多。

我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

再次测试:

订阅模型


传递一个信息给多个消费者。 这种模式被称为“发布/订阅”:

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

订阅模型-广播(Fanout)

流程图:

生产者:

public class Send {

//声明Exchange,不再声明Queue

private final static String EXCHANGE_NAME = “fanout_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明exchange,指定类型为fanout

// 消息内容

String message = “Hello everyone”;

// 发布消息到Exchange,不再发送到Queue

channel.basicPublish(EXCHANGE_NAME, “”, null, Bytes());

System.out.println(" [生产者] Sent ‘" + message + "’");

channel.close();

connection.close();

}

}

消费者1:

public class Recv {

private final static String QUEUE_NAME = “fanout_exchange_queue_1”;

private final static String EXCHANGE_NAME = “fanout_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//注意:队列需要和交换机绑定

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [消费者1] received : " + msg + “!”);

}

};

// 监听队列,自动返回完成

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

消费者2:

public class Recv2 {

private final static String QUEUE_NAME = “fanout_exchange_queue_2”;

private final static String EXCHANGE_NAME = “fanout_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [消费者2] received : " + msg + “!”);

}

};

// 监听队列,手动返回完成

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

测试:我们运行两个消费者,然后发送1条消息

订阅模型-定向(Direct)


有选择性的接收消息,在Direct模型下,队列与交换机不能任意绑定,消息的发送方在向Exchange发送消息时,我们需要指定一个RoutingKey(路由key),通过路由key,不同的队列可以消费不同的信息。

生产者:

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

public class Send {

private final static String EXCHANGE_NAME = “direct_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明exchange,指定类型为direct

// 消息内容

String message = “商品新增了, id = 1001”;

// 发送消息,并且指定routing key 为:insert ,代表新增商品

channel.basicPublish(EXCHANGE_NAME, “insert”, null, Bytes());

System.out.println(" [商品服务:] Sent ‘" + message + "’");

channel.close();

connection.close();

}

}

消费者1:

我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

public class Recv {

private final static String QUEUE_NAME = “direct_exchange_queue_1”;

private final static String EXCHANGE_NAME = “direct_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “update”);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

String msg = new String(body);

System.out.println(" [消费者1] received : " + msg + “!”);

}

};

// 监听队列,自动ACK

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

消费者2:

我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

public class Recv2 {

private final static String QUEUE_NAME = “direct_exchange_queue_2”;

private final static String EXCHANGE_NAME = “direct_exchange_test”;

public static void main(String[] argv) throws Exception {

// 获取到连接

Connection connection = Connection();

// 获取通道

Channel channel = ateChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “insert”);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “update”);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);

// 定义队列的消费者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

byte[] body) throws IOException {

// body 即消息体

本文发布于:2024-02-04 17:16:48,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170712545457716.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:文库   RabbitMQ   java
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23