Springboot + WebSocket 实现

在Springboot和vue中,使用WebSocket 实现聊天功能

前端socket.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import store from "./store";
import { showToast } from "vant";

var websock = null;
let rec; //断线重连后,延迟5秒重新创建WebSocket连接 rec用来存储延迟请求的代码
let isConnect = false; //连接标识 避免重复连接
//心跳发送/返回的信息 服务器和客户端收到的信息内容如果如下 就识别为心跳信息 不要做业务处理
let checkMsg = "heartbeat";

var globalCallback = new Map();

let createWebSocket = () => {
if (websock == null || websock.readyState != websock.OPEN) {
try {
initWebSocket(); //初始化websocket连接
} catch (e) {
console.log("尝试创建连接失败");
reConnect(); //如果无法连接上webSocket 那么重新连接!
//可能会因为服务器重新部署,或者短暂断网等导致无法创建连接
}
}
};

//定义重连函数
let reConnect = () => {
console.log("尝试重新连接");
if (isConnect) return; //如果已经连上就不在重连了
rec && clearTimeout(rec);
rec = setTimeout(function() { // 延迟5秒重连 避免过多次过频繁请求重连
createWebSocket();
}, 5000);
};

//设置关闭连接
let closeWebSocket = () => {
try {
websock.close();
} catch (e) {
console.log("不存在websocket连接");
}

};

//心跳设置
var heartCheck = {
timeout: 20000, //每段时间发送一次心跳包 这里设置为20s
timeoutObj: null, //延时发送消息对象(启动心跳新建这个对象,收到消息后重置对象)

start: function() {
this.timeoutObj = setInterval(function() {
//发送心跳包
//TODO 发送请求检测是否有未接收的消息
// console.log("hearting ....")
// if (isConnect) websock.send(checkMsg);
}, this.timeout);
},

reset: function() {
clearInterval(this.timeoutObj)
this.start();
},

stop: function() {
clearInterval(this.timeoutObj)
}
};

// 初始化websocket
function initWebSocket() {
//用户id
const userId = store.state.id;
//建立websocket客户端
const url = 'ws://localhost:8082/ws/' + userId;

// ws地址
websock = new WebSocket(url)
websock.onmessage = function(e) {
websocketonmessage(e)
}
websock.onclose = function(e) {
websocketclose(e)
}
websock.onopen = function() {
websocketOpen()
heartCheck.start();
}

// 连接发生错误的回调方法
websock.onerror = function() {
console.log('WebSocket连接发生错误')
isConnect = false; //连接断开修改标识
reConnect(); //连接错误 需要重连
}
}

// 实际调用的方法
function sendSock(agentData) {
if (!websock) {
initWebSocket()
}
if (websock.readyState === websock.OPEN) {
// 若是ws开启状态
websocketsend(agentData)
} else if (websock.readyState === websock.CONNECTING) {
// 若是 正在开启状态,则等待1s后重新调用
setTimeout(function() {
sendSock(agentData)
}, 2000)
} else {
// 若未开启 ,则等待1s后重新调用
setTimeout(function() {
sendSock(agentData)
}, 2000)
}
}

function getSock(key, callback) {
globalCallback.set(key, callback)
}


// 数据接收
function websocketonmessage(e) {
let ret = JSON.parse(decodeUnicode(e.data))

if (!ret) {
heartCheck.reset()
} else {
if (ret.error != null) {
showToast(ret.error);
}
console.log(ret);
}

// 字符转义
function decodeUnicode(str) {
str = str.replace(/\\/g, "%");
//转换中文
str = unescape(str);
//将其他受影响的转换回原来
str = str.replace(/%/g, "\\");
//对网址的链接进行处理
str = str.replace(/\\/g, "");
return str;
}
}

// 数据发送
function websocketsend(agentData) {
console.log("数据发送", JSON.stringify(agentData))
websock.send(JSON.stringify(agentData))
}

// 关闭
function websocketclose(e) {
console.log(e)
isConnect = false; //断开后修改标识
heartCheck.stop()
console.log('connection closed (' + e.code + ')')
}

// 创建 websocket 连接
function websocketOpen(e) {
isConnect = true
console.log('连接成功')
}

// 将方法暴露出去
export {
websock,
websocketonmessage,
sendSock,
getSock,
createWebSocket,
closeWebSocket,
initWebSocket
}

在页面中,如果需要实时监听是否收到消息可以用以下方法

1
2
3
4
5
6
7
8
9
10
11
import { websock,websocketonmessage } from '../socket.js'

//为了监听消息后刷新消息列表
//需要覆盖socket.js内的onmessage,保持原有的操作并添加在此页面需要执行的操作
if(loginState.value){
websock.onmessage = (e)=>{
websocketonmessage(e);
//以下是覆盖后新增的内容--刷新消息对象列表
getChatObjectList()
}
}

后端

1.配置类

1
2
3
4
5
6
7
8
9
10
11
12
/**
* WebSocket配置类,用于注册WebSocket的Bean
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

2.逻辑代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package com.me.yiqi.websocket;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.me.yiqi.domain.entity.ChatObject;
import com.me.yiqi.domain.entity.Message;
import com.me.yiqi.domain.entity.TeamUserCorr;
import com.me.yiqi.mapper.ChatObjectMapper;
import com.me.yiqi.mapper.MessageMapper;
import com.me.yiqi.mapper.TeamUserCorrMapper;
import com.me.yiqi.utils.MyBeanUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.support.SessionFlashMapManager;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;

/**
* WebSocket服务
*/
@Component
@Configurable
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer{
//存放会话对象
private static Map<String, Session> sessionMap = new HashMap();

//由于Autowire注入为空
//所以通过自定义的工具类手动获取MessageMapper
private MessageMapper messageMapper
= MyBeanUtil.getBean(MessageMapper.class);
//同上
private ChatObjectMapper chatObjectMapper
= MyBeanUtil.getBean(ChatObjectMapper.class);
private TeamUserCorrMapper teamUserCorrMapper
= MyBeanUtil.getBean(TeamUserCorrMapper.class);



/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
System.out.println("客户端:" + userId + "建立连接");
sessionMap.put(userId, session);
}

/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message,
@PathParam("userId") String userId,
Session session) throws IOException {
System.out.println("收到来自客户端:" + userId + "的信息:" + message);


//解析json为Message对象
HashMap map = JSON.parseObject(message,HashMap.class);
ObjectMapper objectMapper = new ObjectMapper();
Message receivedMsg = objectMapper.convertValue(map, Message.class);

//校验消息
Map<String,Object> error = new HashMap();
//判定用户id是否为空
if(receivedMsg.getFromUserId()==null || userId==null){
error.put("error","用户未登录");
session.getBasicRemote().sendText(JSON.toJSONString(error));
return;
}
//type为私聊时toUserId不能为null.
//type为群聊时toTeamId不能为null.且用户需要在群内
if(receivedMsg.getType()==Message.MESSAGE_TYPE_PRIVATE){
if(receivedMsg.getToUserId()==null || receivedMsg.getToUserId()<=0){
error.put("error","参数toUserId不符合要求");
session.getBasicRemote().sendText(JSON.toJSONString(error));
return;
}
}else if(receivedMsg.getType()==Message.MESSAGE_TYPE_GROUP){
if(receivedMsg.getToTeamId()==null || receivedMsg.getToTeamId()<=0){
error.put("error","参数toTeamId不符合要求");
session.getBasicRemote().sendText(JSON.toJSONString(error));
return;
}else{
//判断用户是否在群内
if(teamUserCorrMapper
.selectByTeamIdAndUserId(receivedMsg.getToTeamId(),
Long.valueOf(userId))
==null
){
error.put("error","您已不在群内");
session.getBasicRemote().sendText(JSON.toJSONString(error));
return;
}
}
}

//判断消息类型
if(receivedMsg.getType()==Message.MESSAGE_TYPE_PRIVATE){
//1.私聊
//存入数据库
messageMapper.insert(receivedMsg);
//通知接收人
sendMessageToUser(userId);
}else if(receivedMsg.getType()==Message.MESSAGE_TYPE_GROUP){
//2.群聊
//获取群内所有成员的id
List<Long> userIds = teamUserCorrMapper
.selectUserIdsByTeamId(receivedMsg.getToTeamId());
//遍历群成员,为每个成员存入这一条消息
for (Long toUserId : userIds) {
Message teamMsg = new Message();
BeanUtils.copyProperties(receivedMsg,teamMsg);
teamMsg.setToUserId(toUserId);
messageMapper.insert(teamMsg);
}
//通知接收人
sendMessageToTeam(userId);
}

}

/**
* 连接关闭调用的方法
* @param userId
*/
@OnClose
public void onClose(@PathParam("userId") String userId) {
System.out.println("连接断开:" + userId);
sessionMap.remove(userId);
}

/**
* 群发
* 群发的消息会对应多个用户存储多条同样的信息
* 便于区分已读未读,以及统计未读数量
*/
public void sendMessageToTeam(String fromUserId) {
//1.查询属于该用户发的消息
LambdaQueryWrapper<Message> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Message::getFromUserId,Long.valueOf(fromUserId));
//属于群聊
wrapper.eq(Message::getType, Message.MESSAGE_TYPE_GROUP);
//且未被接收的消息 需要发送给用户
wrapper.eq(Message::getStatus, Message.MESSAGE_STATUS_UNRECEIVED);
//2.获取消息
List<Message> messages = messageMapper.selectList(wrapper);
//3.遍历发送消息
for (Message message : messages) {
//获取接收方的userId
Long toUserId = message.getToUserId();
//获取接收方登录的session
Session session = sessionMap.get(toUserId.toString());
if(session!=null){
try {
//发送消息并更新状态
session.getBasicRemote().sendText(JSON.toJSONString(message));
message.setStatus(Message.MESSAGE_STATUS_UNREAD);
messageMapper.updateById(message);
} catch (IOException e) {
e.printStackTrace();
}
}
//更新消息发送方和接收方两方的聊天对象列表
updateChatObjectList(message);
}

}

/**
* sendMessage
* 发送私聊消息
* 该方法会查询该用户所有未被接收的消息并发送给对方
* 如果对方成功接收则将消息状态设为未读
*/
public void sendMessageToUser(String fromUserId){
LambdaQueryWrapper<Message> wrapper = new LambdaQueryWrapper<>();
//1.查询属于该用户发的消息
wrapper.eq(Message::getFromUserId,Long.valueOf(fromUserId));
//属于私聊
wrapper.eq(Message::getType, Message.MESSAGE_TYPE_PRIVATE);
//且未被接收的消息 需要发送给用户
wrapper.eq(Message::getStatus, Message.MESSAGE_STATUS_UNRECEIVED);
//2.查询消息
List<Message> messages = messageMapper.selectList(wrapper);
//3.遍历发送消息
for (Message message : messages) {
//获取接收方的userId
Long toUserId = message.getToUserId();
//获取接收方登录的session
Session session = sessionMap.get(toUserId.toString());
if(session!=null){
try {
//发送消息并更新状态
session.getBasicRemote().sendText(JSON.toJSONString(message));
message.setStatus(Message.MESSAGE_STATUS_UNREAD);
messageMapper.updateById(message);
} catch (IOException e) {
e.printStackTrace();
}
}else{
System.out.println("存在未接收的消息");
}
//更新消息发送方和接收方两方的聊天对象列表
updateChatObjectList(message);
}
}

/**
* 更新聊天对象列表
*/
public void updateChatObjectList(Message message){
Long fromUserId = message.getFromUserId();
Long toUserId = message.getToUserId();
Long toTeamId = message.getToTeamId();
boolean isChatWithTeam =
message.getType()==Message.MESSAGE_TYPE_GROUP?true:false;
//更新发送方的聊天对象列表
LambdaQueryWrapper<ChatObject> sender = new LambdaQueryWrapper<>();
sender.eq(ChatObject::getUserId,fromUserId)
.eq(ChatObject::getChatWithId,isChatWithTeam?toTeamId:toUserId)
.eq(ChatObject::getIsChatWithTeam,isChatWithTeam);
ChatObject senderChatObject = chatObjectMapper.selectOne(sender);
if(senderChatObject==null){
//第一次进行聊天则创建新的对象
senderChatObject = ChatObject.builder()
.userId(fromUserId)
.chatWithId(isChatWithTeam?toTeamId:toUserId)
.isChatWithTeam(isChatWithTeam)
.build();
chatObjectMapper.insert(senderChatObject);
}else{
//更新聊天对象列表
senderChatObject.setLastTime(new Date());
senderChatObject.setLastMessage(message.getContent());
chatObjectMapper.updateById(senderChatObject);
}
//更新接收方的聊天对象列表
//1.私聊则正常建立两个相对的聊天对象
if(!isChatWithTeam){
LambdaQueryWrapper<ChatObject> receiver = new LambdaQueryWrapper<>();
receiver.eq(ChatObject::getUserId,toUserId)
.eq(ChatObject::getChatWithId,fromUserId)
.eq(ChatObject::getIsChatWithTeam,isChatWithTeam);
ChatObject receiverChatObject = chatObjectMapper.selectOne(receiver);
if(receiverChatObject==null){
//第一次进行聊天则创建新的对象
receiverChatObject = ChatObject.builder()
.userId(toUserId)
.chatWithId(fromUserId)
.isChatWithTeam(isChatWithTeam)
.build();
chatObjectMapper.insert(receiverChatObject);
}else{
//更新聊天对象列表
receiverChatObject.setLastTime(new Date());
receiverChatObject.setLastMessage(message.getContent());
chatObjectMapper.updateById(receiverChatObject);
}
}else{
//2.如果是群聊,则需要为每个群成员建立一个该群的聊天对象
// 群员作为发送方,群作为接收方
//获取群成员id
List<Long> userIds = teamUserCorrMapper.selectUserIdsByTeamId(toTeamId);
for (Long userId : userIds) {
LambdaQueryWrapper<ChatObject> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(ChatObject::getUserId,userId)
.eq(ChatObject::getChatWithId,toTeamId)
.eq(ChatObject::getIsChatWithTeam,true);
ChatObject chatObject = chatObjectMapper.selectOne(wrapper);
if(chatObject==null){
//第一次进行聊天则创建新的对象
chatObject = ChatObject.builder()
.userId(userId)
.chatWithId(toTeamId)
.isChatWithTeam(isChatWithTeam)
.build();
chatObjectMapper.insert(chatObject);
}else {
//更新聊天对象列表
chatObject.setLastTime(new Date());
chatObject.setLastMessage(message.getContent());
chatObjectMapper.updateById(chatObject);
}
}

}


}

}

3.MyBeanUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class MyBeanUtil implements ApplicationContextAware {

protected static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext app) throws BeansException {
if (applicationContext == null) {
applicationContext = app;
}
}

/**
* 通过类的class从容器中手动获取对象
*/
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}