RabbitMQ——实战篇1(原生API)

阅读: 评论:0

RabbitMQ——实战篇1(原生API)

RabbitMQ——实战篇1(原生API)

实战篇mu目录

  • 其他文章地址
  • RabbitMQ实战
    • 1、简单模式 HelloWorld
      • 1.1、生成者代码
      • 1.2、消费者代码
    • 2、工作队列模式 Work Queue
      • 2.1、生产者
      • 2.2、消费者
    • 3、发布订阅模式 Publish/subscribe
      • 3.1、生产者
      • 3.2、消费者
    • 4、路由模式 Routing
      • 4.1、生产者
      • 4.2、消费者
    • 5、通配符模式 Topic
      • 5.1、生产者
      • 5.2、消费者
    • 6、confirm机制
      • 6.1、生产者
      • 6.2、消费者

其他文章地址

1、RabbitMQ——单机版安装(3.6.5)
2、RabbitMQ——入门篇
3、RabbitMQ——实战篇1(原生API)
4、RabbitMQ——实战篇2(Spring集成)
5、RabbitMQ——实战篇3(Spring集成高级特性:死信队列,消息丢失,延迟队列)
6、RabbitMQ——实战篇4(SpringBoot集成)

RabbitMQ实战

项目地址:
注意:只要是exchange都需要自己在rabbitmq服务端配置,也就是界面上
依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId&le.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><optional>true</optional></dependency></dependencies>

公共代码

package com.zhz.utils;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 公共静态变量**/
public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
package com.zhz.utils;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description:公共类**/
public class RabbitUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();static {connectionFactory.setHost("192.168.0.66");connectionFactory.setPort(5672);connectionFactory.setUsername("zhzmq");connectionFactory.setPassword("zhzmq");connectionFactory.setVirtualHost("/zhztest");}public static Connection getConnection(){Connection conn = null;try {conn = wConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}}
}

1、简单模式 HelloWorld

1.1、生成者代码

package com.zhz.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;
import urrent.TimeoutException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 生产者**/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//获取TCP长连接Connection connection = Connection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = ateChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);String message="zhz6dsad66";//四个参数//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到//队列名称//额外的设置属性//最后一个参数是要传递的消息字节数组channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,Bytes());channel.close();connection.close();System.out.println("======数据发送成功===");}
}

1.2、消费者代码

package com.zhz.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class Consumer {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = Connection();//创建通信"通道",相当于TCP中的虚拟连接Channel channel = ateChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//从MQ服务器中获取数据//创建一个消息消费者//第一个参数:队列名//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法//第三个参数要传入DefaultConsumer的实现类channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Receiver(channel));}
}
package com.zhz.helloworld;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: **/
public class Receiver extends DefaultConsumer {private Channel channel;/***   重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到*/public Receiver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body);System.out.println("消费者接收到的消息:"+message);System.out.println("消息的TagId:"&#DeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.DeliveryTag(),false);}
}

2、工作队列模式 Work Queue

2.1、生产者

package com.zhz.workqueue;le.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;
import urrent.TimeoutException;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 发送方(服务端/生产者)**/
public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = Connection();Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);for (int i = 1; i <= 100; i++) {Sms sms=new Sms("乘客"+i,"10086"+i,"您的车票已经预定");String json = new Gson().toJson(sms);channel.basicPublish("",RabbitConstant.QUEUE_SMS,Bytes());}System.out.println("发送数据成功");channel.close();connection.close();}
}

2.2、消费者

package com.zhz.workqueue;import lombok.*;
perimental.Accessors;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 短信实体类**/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Sms {private String name;private String mobile;private String content;
}
package com.zhz.workqueue;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class SmsSend1 {public static void main(String[] args) throws Exception {Connection connection = Connection();final Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS1-发送短信成功" + json);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.DeliveryTag(),false);}});}
}
package com.zhz.workqueue;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class SmsSend2 {public static void main(String[] args) throws Exception {Connection connection = Connection();final Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS2-发送短信成功" + json);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}channel.DeliveryTag(),false);}});}
}
package com.zhz.workqueue;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/31* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class SmsSend3 {public static void main(String[] args) throws Exception {Connection connection = Connection();final Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);System.out.println("SMS3-发送短信成功" + json);try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.DeliveryTag(),false);}});}
}

3、发布订阅模式 Publish/subscribe

3.1、生产者

package com.zhz.pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.util.Scanner;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 生产者**/
public class WeatherBureau {public static void main(String[] args) throws Exception {Connection connection = Connection();String input = new Scanner(System.in).next();Channel channel = ateChannel();//第一个参数交换机名字   其他参数和之前的一样channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , Bytes());channel.close();connection.close();}
}

3.2、消费者

package com.zhz.pubsub;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class BiaDu {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = Connection();//获取虚拟连接final Channel channel = ateChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}
package com.zhz.pubsub;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = Connection();//获取虚拟连接final Channel channel = ateChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}

4、路由模式 Routing

4.1、生产者

package uting;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Scanner;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 生产者**/
public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = Connection();Channel channel = ateChannel();Iterator<Map.Entry<String, String>> itr = Set().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = ();//第一个参数交换机名字   第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_Key() , null , me.getValue().getBytes());}channel.close();connection.close();}
}

4.2、消费者

package uting;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = Connection();//获取虚拟连接final Channel channel = ateChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}
package uting;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = Connection();final Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}

5、通配符模式 Topic

5.1、生产者

package pic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 生产者**/
public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = Connection();Channel channel = ateChannel();Iterator<Map.Entry<String, String>> itr = Set().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = ();//第一个参数交换机名字   第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_Key() , null , me.getValue().getBytes());}channel.close();connection.close();}
}

5.2、消费者

package pic;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = Connection();final Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");// channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}
package pic;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/01/30* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = Connection();//获取虚拟连接final Channel channel = ateChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.DeliveryTag() , false);}});}}

6、confirm机制

6.1、生产者

package firm;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;/*** @author :zhz* @date :Created in 2021/02/03* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 生产者=>不需要关闭,因为要监听**/
public class WeatherBureau {public static void main(String[] args) throws IOException {Map<String, String> area = new LinkedHashMap<>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = Connection();Channel channel = ateChannel();//开启confirm监听模式firmSelect();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {//第二个参数代表接收的数据是否为批量接收,一般不用System.out.println("消息已被Broker接收,Tag="+l);}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker拒收,Tag:" + l);}});channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return aReturn) {println("===========================");println("Return编码:" + ReplyCode() + "-Return描述:" + ReplyText());println("交换机:" + Exchange() + "-路由key:" + RoutingKey() );println("Return主题:" + new Body()));println("===========================");}});Iterator<Map.Entry<String, String>> iterator = Set().iterator();while (iterator.hasNext()){Map.Entry<String, String> map = ();channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_Key(),true,Value().getBytes());}}
}

6.2、消费者

package firm;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/02/04* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description: 消费者**/
public class Sina {public static void main(String[] args) throws IOException {Connection connection = Connection();Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到的天气预报:"+new String(body));channel.DeliveryTag(),false);}});}
}
package firm;import com.rabbitmq.client.*;
import com.zhz.utils.RabbitConstant;
import com.zhz.utils.RabbitUtils;import java.io.IOException;/*** @author :zhz* @date :Created in 2021/02/05* @version: V1.0* @slogan: 天下风云出我辈,一入代码岁月催* @description:**/
public class Baidu {public static void main(String[] args) throws IOException {Connection connection = Connection();Channel channel = ateChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20201127");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到的天气预报:"+new String(body));channel.DeliveryTag(),false);}});}
}

我是小白弟弟,一个在互联网行业的小白,立志成为一名架构师
=1

本文发布于:2024-01-31 12:30:27,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170667542728526.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:实战篇   RabbitMQ   API
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23