alexpdh's blog

Spring Boot 集成 socket.io 后端实现消息实时通信

socketio.jpg

概述

基于 socket.io 来说,采用 node 实现更加合适,本文使用两个后端的开源框架实现,服务端使用 netty-socketio ,客户端使用 socket.io-client。使用 Scheduledexecutorservice 实现消息可配置任务调度。


socket.io

socket.io:是一个面向实时 web 应用的 JavaScript 库。它使得服务器和客户端之间实时双向的通信成为可能。他有两个部分:在浏览器中运行的客户端库,和一个面向Node.js的服务端库。两者有着几乎一样的API。像Node.js一样,它也是事件驱动的.

Socket.IO 主要使用WebSocket协议。但是如果需要的话,Socket.io可以回退到几种其它方法,例如Adobe Flash Sockets,JSONP拉取,或是传统的AJAX拉取,[2]并且在同时提供完全相同的接口。尽管它可以被用作WebSocket的包装库,它还是提供了许多其它功能,比如广播至多个套接字,存储与不同客户有关的数据,和异步IO操作。


项目搭建

引入依赖包

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.12</version>
</dependency>

创建服务端类 SocketServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.socket.socketio;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.socket.domain.TimeMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* socketio 服务端
*netty-socketio 实现的 socke.io 服务端
*
* @author pengdh
* @date: 2017-09-03 1:13
*/
@Component
public class SocketServer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static SocketIOServer server = initServer();
/**
* 初始化服务端
* @return
*/
private static SocketIOServer initServer() {
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(9090);
server = new SocketIOServer(config);
return server;
}
/**
* 启动服务端
*/
public void startServer() {
// 添加连接监听
server.addConnectListener(new ConnectListener() {
@Override
public void onConnect(SocketIOClient socketIOClient) {
logger.info("server 服务端启动成功");
}
});
// 添加断开连接监听
server.addDisconnectListener(new DisconnectListener() {
@Override
public void onDisconnect(SocketIOClient socketIOClient) {
logger.info("server 服务端断开连接");
}
});
// 添加事件监听
server.addEventListener("join", String.class, new DataListener<String>() {
@Override
public void onData(SocketIOClient socketIOClient, String str,
AckRequest ackRequest)
throws Exception {
logger.info("收到客户端加入消息:" + str);
server.getBroadcastOperations().sendEvent("joinSuccess", "join success");
}
});
// 添加事件监听
server.addEventListener("chatMessage", TimeMessage.class, new DataListener<TimeMessage>() {
@Override
public void onData(SocketIOClient socketIOClient, TimeMessage message,
AckRequest ackRequest)
throws Exception {
logger.info("收到客户端消息:" + message.toString());
server.getBroadcastOperations().sendEvent("return message", message.toString());
}
});
// 启动服务端
server.start();
}
/**
* 停止服务端
*/
public void stopServer() {
server.stop();
}
}
  1. initServer 方法 初始化服务端本地 9090 端口;
  2. startServer 方法中,添加一系列事件监听器,并根据事件作出响应;
  3. 通过 server.start() 启动服务端,server.stop() 停止服务端;

创建客户端类 SocketClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package com.socket.socketio;
import com.socket.common.date.DateUtils;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter.Listener;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* socketio 客户端实现
*
* @author pengdh
* @date: 2017-09-03 1:06
*/
@Component
public class SocketClient {
// 初始化连接
private static Socket socket = initSocket();
// 初始化连接池
private static ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(
10);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
// 连接标识
private boolean isConnected;
private String onMessageContent = null;
/**
* 用于存放每个进来任务的 future ,key:为任务id,value:future,
* 目的是为了可以通过条件控制任务,通过接口调用可以 cancel 对应的 future
*/
private Map<String, Future> futureMap = new HashMap<String, Future>();
/**
* 连接监听事件
* 实现消息回调接口
*/
private Listener onConnect = new Listener() {
@Override
public void call(Object... objects) {
logger.info("client 连接服务端成功:");
if (!isConnected) {
socket.emit("connect message", "hello");
}
isConnected = true;
}
};
/**
* 断开连接端口监听
*/
private Listener onDisconnect = new Listener() {
@Override
public void call(Object... objects) {
logger.info("client 断开服务端连接:" + objects[0]);
isConnected = false;
}
};
/**
* 连接错误监听
*/
private Listener onConnetError = new Listener() {
@Override
public void call(Object... objects) {
logger.info("client 连接服务端错误:" + objects[0]);
}
};
/**
* 连接超时监听
*/
private Listener onConnetTimeout = new Listener() {
@Override
public void call(Object... objects) {
logger.info("client 连接服务端超时:" + objects[0]);
}
};
/**
* 消息监听事件
*/
private Listener onMessage = new Listener() {
@Override
public void call(Object... objects) {
logger.info("收到返回监听事件:" + objects[0]);
onMessageContent = (String) objects[0];
}
};
/**
* 初始化 socket 连接
*/
public static Socket initSocket() {
try {
socket = IO.socket("http://localhost:9090");
} catch (URISyntaxException e) {
e.printStackTrace();
}
return socket;
}
/**
* 连接处理
*/
public void connectSocket() {
// 连接
socket.connect();
// 添加监听事件
addConnectListenerEvent();
}
/**
* 断开连接
*/
public void disConnectSocket() {
// 断开连接
socket.disconnect();
// 添加监听事件
addDisConnectListenerEvent();
}
/**
* 添加连接监听事件
*/
private void addConnectListenerEvent() {
socket.on(Socket.EVENT_CONNECT, onConnect); // 连接成功
socket.on(Socket.EVENT_DISCONNECT, onDisconnect); // 断开连接
socket.on(Socket.EVENT_CONNECT_ERROR, onConnetError);// 连接错误
socket.on(Socket.EVENT_CONNECT_TIMEOUT, onConnetTimeout); // 连接超时
}
/**
* 添加断开连接监听事件
*/
public void addDisConnectListenerEvent() {
socket.off(Socket.EVENT_CONNECT, onConnect); // 连接成功
socket.off(Socket.EVENT_DISCONNECT, onDisconnect); // 断开连接
socket.off(Socket.EVENT_CONNECT_ERROR, onConnetError);// 连接错误
socket.off(Socket.EVENT_CONNECT_TIMEOUT, onConnetTimeout); // 连接超时
}
/**
* 启动客户端并指定发送消息任务开始时间
* @param id 指定的任务id
* @param startTime 任务开始时间
*/
public void startClient(String id, String startTime) {
try {
this.judgeConnection();
this.judgeHandleExistFuture(id);
this.emitJoined(id);
// 指定从当前时间延迟多久后开始执行定时的任务,时间单位可以在调用方法时指定
long delay = 0;
// 指定每次执行任务的时间间隔
long period = 10000;
Date date = DateUtils.parseShortDateTime(startTime);
// 计算任务开始时间到当前时间的毫秒差
long targetTimeMillis = date.getTime();
long initDelay = targetTimeMillis - System.currentTimeMillis();
delay = initDelay > 0 ? initDelay : delay;
logger.info("启动定时任务:delay=" + delay + " period=" + period);
// 因为每天会定时结束所有服务,当再次有任务进来时要先初始化线程池
if (scheduledExecutorService.isShutdown()) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(10);
}
Future future = scheduledExecutorService
.scheduleAtFixedRate(new HandleMessageRunnable(id), delay, period,
TimeUnit.MILLISECONDS);
// 将 future 放入map
futureMap.put(id, future);
logger.info("已有客户端:" + futureMap.keySet());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 加入事件监听
* @param id
*/
private void emitJoined(String id) {
socket.emit("join", id).on("joinSuccess", onMessage);
}
/**
* 判断连接
*/
private void judgeConnection() {
if (!isConnected) {
this.initSocket();
this.connectSocket();
}
}
/**
* cancel 任务
* @param id
*/
public void judgeHandleExistFuture (String id) {
if (futureMap.containsKey(id)) {
Future future = futureMap.get(id);
future.cancel(true);
futureMap.remove(id);
logger.info(id + " task cancelled!");
logger.info("剩余客户端:" + futureMap.keySet());
}
}
/**
* cancel 掉 id 对应的任务
*/
public void endClient(String id) {
try {
this.judgeHandleExistFuture(id);
} catch (Throwable e) {
logger.error(e.getMessage());
}
}
/**
* 结束所有任务
*/
public void shutdown() {
scheduledExecutorService.shutdown();
futureMap.clear();
logger.info("task have shutdown!");
}
/**
* 处理任务
* 通过固定频率发送消息到服务端
*/
private class HandleMessageRunnable implements Runnable {
private String id;
public HandleMessageRunnable(String id) {
this.id = id;
}
@Override
public void run() {
JSONObject jsonObject = new JSONObject();
try {
// 先发送 joined 事件,当收到回执后再发送 chatMessage 消息
if (null != onMessageContent && onMessageContent.equals("join success")) {
String currentTime = DateUtils.formatStandardDateTime(new Date());
jsonObject.put("id", id);
jsonObject.put("currentTime", currentTime);
logger.info("客户端发送消息:" + jsonObject.toString());
socket.emit("chatMessage", jsonObject);
} else {
logger.info("未收到加入返回事件");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
  1. 通过 initSocket 初始化客户端,初始化地址 http://localhost:9090;
  2. 通过 new ScheduledThreadPoolExecutor(10),初始化任务调度线程池;
  3. 使用 Map 用于存放每个进来任务的 future ,key:为任务id,value:future,目的是为了可以通过条件控制任务,通过接口调用可以 cancel 对应的 future;
  4. 通过 connectSocket 连接服务端,并调用 addConnectListenerEvent 添加一些列监听事件;
  5. 通过 startClient 启动任务。
  6. 通过 HandleMessageRunnable 类实现具体消息的处理。

创建定时任务调度控制器 ScheduledController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.socket.web;
import com.socket.socketio.SocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 定时任务调度控制器
*
* @author pengdh
* @date: 2017-09-03 1:53
*/
@RestController
public class ScheduledController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private SocketClient client;
/**
* 启动客户端任务
* @param id 任务id
* @param startTime 任务开始时间
* @return
*/
@RequestMapping("/start/{id}")
public ResponseEntity<String> startClient(@PathVariable String id, @RequestParam String startTime) {
client.startClient(id, startTime);
return new ResponseEntity<String>(HttpStatus.NO_CONTENT);
}
/**
* 取消消息任务
* @param id
* @return
*/
@RequestMapping("/end/{id}")
public ResponseEntity<String> endClient(@PathVariable String id) {
client.endClient(id);
return new ResponseEntity<String>(HttpStatus.NO_CONTENT);
}
/**
* 通过接口调用手动结束所有任务
*
* @return
*/
@RequestMapping("/end")
public ResponseEntity<String> shutdown() {
logger.info("手动结束所有任务");
client.shutdown();
return new ResponseEntity<String>(HttpStatus.NO_CONTENT);
}
/**
* 定时结束所有任务
*/
@Scheduled(cron = "0 0 2 * * ?")
public void stopClient() {
logger.info("定时结束所有任务");
client.shutdown();
}
}
  1. 调用 http://localhost:8080/socket-demo/start/{id}?startTime=yyyyMMddHHmmss 接口可以控制指定任务在指定时间开始执行;
  2. 调用 http://localhost:8080/socket-demo/end/{id} 取消指定任务;
  3. 调用 http://localhost:8080/socket-demo/end 可以手动接收当前在执行的所有任务,scheduledExecutorService.shutdown() 不会强制所有任务立即停止,而是会在正在执行的本次任务执行完后才 shutdown;
  4. 这里使用到了 @Scheduled ,需要到 Application 类中的加上 @EnableScheduling 注解以开启定时任务;
  5. stopClient 方法可以定时结束所有任务,以节省资源,这个根据业务自身决定是否需要。

修改 Appingcation 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.socket;
import com.socket.socketio.SocketServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@ComponentScan
public class SpringbootSocketApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootSocketApplication.class, args);
SocketServer server = new SocketServer();
server.startServer();
}
}

运行效果

  1. 启动项目
  2. 调用 http://localhost:8080/socket-demo/start/{id}?startTime=yyyyMMddHHmmss 接口启动任务。
  3. 查看控制台信息,注意 2 的时间和现在的时间,未到时间: 到时间:;
  4. 再次调用 http://localhost:8080/socket-demo/start/{id}?startTime=yyyyMMddHHmmss 加入新任务:
  5. http://localhost:8080/socket-demo/end/{id},可以取消任务。
  6. http://localhost:8080/socket-demo/end 结束所有任务。

源码地址:

alexpdh wechat
欢迎扫一扫关注 程序猿pdh 公众号!