SpringBoot单机和分布式(RocketMQ解决)WebSocket聊天室

阅读: 评论:0

SpringBoot单机和分布式(RocketMQ解决)WebSocket聊天室

SpringBoot单机和分布式(RocketMQ解决)WebSocket聊天室

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

SpringBoot单机和分布式WebSocket聊天室

  • github下载地址
  • 前言
    • WebSocke
  • 源码解读
    • 实现原理
    • 单机
    • POM依赖
    • 创建configure
    • 编写Websocket Server
    • 单机如何测试
    • 分布式的问题
  • 需要改进的地方


注意,两个用户在同一个房间才可以互相聊天

在线体验地址: 47.103.194.1:8081/

github下载地址


前言

HTTP 协议有一个缺陷:通信只能由客户端发起。 HTTP 协议做不到服务器主动向客户端推送信息。

这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。我们只能使用"轮询":每隔一段时候,就发出一个询问,了解服务器有没有新的信息。
轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此,出现了 WebSocket。

WebSocke

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
在WebSocket中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

所以可通过Websocket实现网络在线聊天室的功能

源码解读

实现原理

当用户登录后,向房间1发送消息,服务器收到消息后,找到所有在房间1的用户,并且向这些用户转发这条消息。即可实现网络聊天室的功能。

单机

首先创建一个SpringBoot项目

POM依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--webSocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>ketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>

创建configure

package com.mabo.websocket;import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Component
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

编写Websocket Server

package com.mabo.websocket;import com.alibaba.fastjson.JSONObject;
import kMQ.producer.WebsocketProducer;
slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
SimpleDateFormat;
import java.util.Date;
import urrent.CopyOnWriteArraySet;
//测试网站 /
// wss:////websocket/2/2//ws://127.0.0.1:8080/websocket/房间号/用户id
@Slf4j
@ServerEndpoint(value = "/websocket/{chatroom}/{userId}")
@Component
public class WebSocketServer {private static SimpleDateFormat sdf=new SimpleDateFormat("MM月dd日 HH:mm:ss");private  static  WebsocketProducer websocketProducer;@Autowiredpublic void setWebsocketProducer(WebsocketProducer websocketProducer) {WebSocketServer.websocketProducer = websocketProducer;}//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。public static int onlineCount = 0;//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();//与某个客户端的连接会话,需要通过它来给客户端发送数据public Session session;//接收参数中的用户IDpublic String userId;//接收用户中的平台类型public String chatroom;/*** 连接建立成功调用的方法* 接收url中的参数*/@OnOpenpublic void onOpen(Session session,@PathParam("chatroom") String chatroom, @PathParam("userId") String userId) throws IOException {log.info("有新连接加入!  userId==== " + userId + "  chatroom==== " + chatroom);this.session = session;this.userId = userId;this.chatroom = chatroom;log.info("用户名  userId==== " + userId + "  chatroom==== " + chatroom+ "  session==== " + Id());webSocketSet.add(this);     //加入set中addOnlineCount();           //在线数加1try {sendMessage("连接成功");} catch (IOException e) {("websocket IO异常");}}/*** 连接关闭调用的方法* 如果服务端主动关闭当前连接,客户端感知不到*需要调用http请求通知客户端已经下线*/@OnClosepublic void onClose(Session session, @PathParam("chatroom") String chatroom, @PathParam("userId") String userId) throws IOException {boolean close=false;WebSocketServer closeUser=null;for (WebSocketServer item : webSocketSet) {try {if (item.userId.equals(userId)) {close=true;closeUser=item;break;}} catch (Exception e) {e.printStackTrace();}}if (close){
//            sendMessage(session,userId+"用户离线");ve(closeUser);  //从set中删除subOnlineCount();           //在线数减1}}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) {for (WebSocketServer item : webSocketSet) {try {if (item.session.equals(session)) {log.info( "用户 "+item.userId+" 向房间 "+item.chatroom+" 发送消息: "+message);JSONObject jsonObject = new JSONObject();jsonObject.put("sender",item.userId);jsonObject.put("msg",message);jsonObject.put("date",sdf.format(new Date()));WebSocketServer.sendChatroom(item.chatroom,jsonObject);//单机方式websocketProducer.sendMsg(item.chatroom,item.userId,message);//分布式部署}} catch (Exception e) {e.printStackTrace();}}}/*** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {("发生错误" + error);error.printStackTrace();}public void sendMessage(String message) throws IOException {BasicRemote().sendText(message);}public void sendMessage(Session session,String message) throws IOException {BasicRemote().sendText(message);}/*** 私发** @param message* @throws IOException*/public static void sendInfo(Long userId, String message) throws IOException {for (WebSocketServer item : webSocketSet) {try {if (item.userId.equals(userId)) {item.sendMessage(message);}} catch (IOException e) {break;}}}/*** 发送到聊天室*/public static void sendChatroom(String chatroom, JSONObject json) throws IOException {for (WebSocketServer item : webSocketSet) {try {if (item.chatroom.equals(chatroom)) {item.JSONString());}} catch (IOException e) {continue;}}}/*** 群发自定义消息*/public static void sendInfos(String message) throws IOException {log.info(message);for (WebSocketServer item : webSocketSet) {try {item.sendMessage(message);} catch (IOException e) {continue;}}}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {lineCount++;log.info("有新连接加入!当前在线人数为" + getOnlineCount() );}public static synchronized void subOnlineCount() {lineCount--;log.info("有一连接关闭!当前在线人数为" + getOnlineCount());}
}

单机如何测试

到这里就可以启动websocket服务器进行测试了,
但是需要客户端进行测试
下载git前端文件

直接打开即可进行测试

分布式的问题

Websocket识别用户并且发送消息时根据用户的session来进行发送的,其他jvm中的websocket时无法获取的,所以需要依赖中间件来解决这个问题

分布式下的websocket消息无法依靠websocket实现消息发送,
该demo使用RocketMQ的广播消息模式,对所有服务器发送消息,如果当前服务器连接了该用户则该服务器对用户发送消息,通过这种方式可以实现分布式部署情况下实现网络聊天室

消费者代码

package kMQ.listener;import com.alibaba.fastjson.JSONObject;
import com.mabo.websocket.WebSocketServer;
slf4j.Slf4j;
import ketmq.spring.annotation.MessageModel;
import ketmq.spring.annotation.RocketMQMessageListener;
import RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.io.IOException;
SimpleDateFormat;
import java.util.Date;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "websocket", topic = "websocket",messageModel = MessageModel.BROADCASTING)
//MessageModel 设置为广播模式BROADCASTING
public class WebsocketConsumer implements RocketMQListener<String> {private static SimpleDateFormat sdf=new SimpleDateFormat("MM月dd日 HH:mm:ss");@Overridepublic void onMessage(String s) {JSONObject parse = (JSONObject) JSONObject.parse(s);//消息类型      1发送消息,2关闭客户端String type = String("type");//data   包括//userID  roomIdlog.info("接收到消息,开始消费..message:" + s);if (type.equals("1")){JSONObject jsonObject = new JSONObject();jsonObject.put("sender",String("userId"));jsonObject.put("msg",String("msg"));jsonObject.put("date",sdf.format(new Date()));log.info( "用户 "&#String("userId")+" 向房间 "&#String("classRoom")+" 发送消息: "&#String("msg"));try {WebSocketServer.String("classRoom"),jsonObject);} catch (IOException e) {e.printStackTrace();}}}
}

生产者代码

package kMQ.producer;import com.alibaba.fastjson.JSONObject;
import RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class WebsocketProducer {private static final Logger log = Logger(WebsocketProducer.class);@AutowiredRocketMQTemplate rocketMQTemplate;public void sendMsg(String classRoom,String userId,String msg) {JSONObject jsonObject = new JSONObject();jsonObject.put("type",1);jsonObject.put("classRoom",classRoom);jsonObject.put("userId",userId);jsonObject.put("msg",msg);vertAndSend("websocket", JSONString());log.info("send message success"+jsonObject);}public void closeUser(String classRoom,String userId,String msg) {JSONObject jsonObject = new JSONObject();jsonObject.put("type",2);jsonObject.put("classRoom",classRoom);jsonObject.put("userId",userId);jsonObject.put("msg",msg);vertAndSend("websocket", JSONString());log.info("send message success"+jsonObject);}
}

配置文件

server:port: 8080
rocketmq:nameServer: 127.0.0.1:9876producer:group: maboGrouptopicName: websocket

需要改进的地方

单机下所有的用户信息都是存储在static修饰的静态变量中,每一次消息发送都需要所有服务器通过该变量轮询服务器中是否存在用户,造成了效率低下。

可以采用Redis缓存,用户的登录了哪个服务器存储到缓存中,每次发送消息只需要发送给缓存中的服务器(或者添加服务器标记),可以提高消息发送的效率。

或者将遍历的客户WebSocketServer 存储到hashmap中,根据用户id进行查询,也可以提高用户查找效率

本文发布于:2024-02-01 09:57:31,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170675265135829.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23