SpringBoot WebSocket STOMP 广播配置

1. 前言

WebSocket是一种在单个TCP连接上进行全双工通信的协议,常用于实时通信的场景。在没有使用高层级线路协议的情况下,直接使用WebSocket是很难实现发布订阅的功能。而STOMP是在WebSocket之上提供了一个基于帧的线路格式层,STOMP客户端可以同时作为生产者和消费者两种模式。为发布订阅的功能提供了基础。

2. STOMP协议

3. SpringBoot WebSocket集成

SpringBoot集成WebSocket非常方便,只需要简单的三个步骤:导包、配置、提供接口

3.1 导入websocket包

compile('org.springframework.boot:spring-boot-starter-websocket')

3.2 配置WebSocket

第一步:创建WebSocketConfig类,通过@EnableWebSocketMessageBroker 启用代理支持的消息传递。

第二步:重写registerStompEndpoints和configureMessageBroker方法。

第三步:注册对外可访问的stomp端点、访问方式和连接跨域设置。

第四步:配置消息代理。可设置广播模式和点对点通讯。也可以添加订阅通道的前缀。

package com.itdragon.server.config
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer @Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer { override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 设置订阅Broker名称,/topic为广播模式
config.enableSimpleBroker("/topic")
// 设置应用程序全局目标前缀
config.setApplicationDestinationPrefixes("/itdragon")
} override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 允许使用socketJs方式访问,访问端点为socket,并允许跨域
registry.addEndpoint("/socket").setAllowedOrigins("*").withSockJS()
} }

注意:

若使用了setApplicationDestinationPrefixes方法,则作用主要体现在@SubscribeMapping和@MessageMapping上。如控制层配置@MessageMapping("/sendToServer"),则客户端发送的地址是 /itdragon/sendToServer

3.3 对外暴露接口

第一步:创建WebSocket的控制层类,并注入用于发送消息的SimpMessagingTemplate。

第二步:配置通过@MessageMapping注解修饰的方法来接收客户端SEND的操作。

第三步:配置通过@SubscribeMapping注解修饰的方法来接收客户端SUBSCRIBE的操作。

第四步:配置通过@SendTo注解的方法来直接将消息推送的指定地址上。

package com.itdragon.server.api.rest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.SendTo
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.messaging.simp.annotation.SubscribeMapping
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestMapping
import java.time.Instant @Controller
class WebSocketController { @Autowired
lateinit var simpMessagingTemplate: SimpMessagingTemplate /**
* 订阅广播,服务器主动推给连接的客户端
* 通过Http请求的方式触发订阅操作
*/
@RequestMapping("/subscribeTopic")
fun subscribeTopicByHttp() {
while (true) {
// 可以灵活设置成通道地址,实现发布订阅的功能
val channel = "/topic/subscribeTopic"
simpMessagingTemplate.convertAndSend(channel, Instant.now())
Thread.sleep(10*1000)
}
} /**
* 订阅广播,服务器主动推给连接的客户端
* 通过Websocket的subscribe操作触发订阅操作
*/
@SubscribeMapping("/subscribeTopic")
fun subscribeTopicByWebSocket(): Long {
return Instant.now().toEpochMilli()
} /**
* 服务端接收客户端发送的消息,类似OnMessage方法
*/
@MessageMapping("/sendToServer")
fun handleMessage(message: String) {
println("message:{$message}")
} /**
* 将客户端发送的消息广播出去
*/
@MessageMapping("/sendToTopic")
@SendTo("/topic/subscribeTopic")
fun sendToTopic(message: String): String {
return message
} }

WebSocket的订阅功能,可以用@SubscribeMapping注解,也可以用HTTP的方式触发。ITDragon龙 比较倾向HTTP的方式,因为在实现身份验证的功能上会比较方便。在客户端发送订阅操作之前,先发送HTTP请求做身份验证,验证成功后再返回指定的订阅通道地址。

4. 前端对接测试

在做消息通道对接的测试中,最常见的对话就是:连上了吗?没连上;收到了吗?没收到;收到了吗?收到了,后端报错....... 作为技术人员,我们有必要对各个领域的知识都有一定的了解。只有清楚明白了前端和移动端的开发思维,我们才能提供更合适的接口。

4.1 前端代码

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<title>WebSocket 发布订阅</title>
<link rel="stylesheet" type="text/css" href="https://cdn.staticfile.org/antd/3.23.6/antd.min.css">
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
</head>
<body>
<h2 id="connStatus"></h2>
<div style="padding-left:20px;">
<p>/topic/subscribeTopic 订阅广播通道;/sendToServer 向服务端推送消息;/sendToTopic 将消息广播出去</p>
<div>
<label>订阅通道: </label> <input type="text" id="subscribeChannel" value=""/><br>
<label>推送通道: </label> <input type="text" id="sendChannel" value=""/><br>
<button id="subscribe" onclick="subscribe()">订阅</button>
<button id="connect" onclick="connect()">连接</button>
<button id="disconnect" onclick="disconnect();">断开连接</button>
</div>
<br>
<div>
<label>用户名称: </label> <input type="text" id="name"/><br>
<label>输入消息: </label> <input type="text" id="message"/><br>
<button id="send" onclick="sendMsg();">发送</button>
<p id="response"></p>
</div>
</div> <script type="text/javascript">
var stompClient = null;
var host="http://localhost:8809";
function subscribe() {
$.get(host+'/subscribeTopic');
} function connect() {
var socket = new SockJS(host+'/socket');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
$('#connStatus').html('Connected:' + frame)
stompClient.subscribe($('#subscribeChannel').val(), function(response) {
$('#response').html(response.body);
});
},function(err){
console.log(err)
});
} function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
$('#connStatus').html('Disconnected')
} function sendMsg() {
var message = $('#message').val();
stompClient.send($('#sendChannel').val(), {}, message);
}
</script>
</body>
</html>

4.2 测试效果

简单测试了发布和订阅功能

5. 原生WebSocket配置

有的特殊场景需要检测WebSocket的生命周期,还是会用到原生的WebSocket配置,这里记录一下对应的坑。

5.1 配置类注册Bean

在任意一个配置类中添加ServerEndpointExporter的Bean配置

@Bean
fun serverEndpointExporter(): ServerEndpointExporter {
return ServerEndpointExporter()
}

问题:

  • 1)添加后单元测试启动失败,服务可以正常启动。网上说可以移除代码,由SpringBoot管理。可是移除后websocket链接会出现问题。解决方法目前未找到。

5.2 创建WebSocketServer

第一步:通过@ServerEndpoint注解修饰类,表示该类是WebSocket的Server,并对外暴露连接地址。

第二步:通过@OnOpen、@OnClose、@OnMessage、@OnError注解修饰方法,监控WebSocket的生命周期。

第三步:通过静态、私有、ConcurrentHashMap 修饰的变量管理客户端。

第四步:为程序其他类提供发送消息的方法。

package com.itdragon.server.config
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import javax.websocket.*
import javax.websocket.server.PathParam
import javax.websocket.server.ServerEndpoint @ServerEndpoint("/nativeSocket/{clientKey}")
@Service
class WebSocketServer { private var logger = LoggerFactory.getLogger(WebSocketServer::class.java)
private var session: Session? = null
private var clientKey = "" @OnOpen
fun onOpen(session: Session, @PathParam("clientKey") clientKey: String) {
this.session = session
this.clientKey = clientKey
if (webSocketMap.containsKey(clientKey)) {
webSocketMap.remove(clientKey)
webSocketMap[clientKey] = this
} else {
webSocketMap[clientKey] = this
}
logger.info("客户端:$clientKey 连接成功") } @OnClose
fun onClose() {
if (webSocketMap.containsKey(clientKey)) {
webSocketMap.remove(clientKey)
}
logger.warn("客户端:$clientKey 连接关闭") } @OnMessage
fun onMessage(message: String, session: Session) {
logger.info("客户端:$clientKey 收到消息:$message")
} @OnError
fun onError(session: Session, error: Throwable) {
logger.error("WebSocket客户端(${this.clientKey})错误: ${error.message}")
} @Throws(IOException::class)
fun sendMessage(message: String) {
this.session!!.basicRemote.sendText(message)
} companion object {
private val webSocketMap = ConcurrentHashMap<String, WebSocketServer>() @Throws(IOException::class)
fun sendMessage(clientKey: String, message: String) {
webSocketMap[clientKey]?.sendMessage(message)
} fun getStatus(clientKey: String): Boolean? {
return webSocketMap[clientKey]?.session?.isOpen
} }
}

问题:

5.3 前端测试

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<title>WebSocket 简单通讯</title>
<link rel="stylesheet" type="text/css" href="https://cdn.staticfile.org/antd/3.23.6/antd.min.css">
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
</head>
<body>
<div>
<label>订阅通道: </label> <input type="text" id="clientId" value=""/><br>
<label>推送消息: </label> <input type="text" id="message" value=""/><br>
<button id="subscribe" onclick="openSocket()">开启socket</button>
<button id="connect" onclick="sendMessage()">发送消息</button>
<button id="disconnect" onclick="closeSocket();">关闭socket</button>
</div>
<script>
var socket;
function openSocket() {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else{
console.log("您的浏览器支持WebSocket");
var socketUrl="ws://localhost:8809/nativeSocket/"+$("#clientId").val();
if(socket!=null){
socket.close();
socket=null;
}
socket = new WebSocket(socketUrl);
socket.onopen = function() {
console.log("websocket已打开");
};
socket.onmessage = function(msg) {
console.log(msg.data);
};
socket.onclose = function() {
console.log("websocket已关闭");
};
socket.onerror = function() {
console.log("websocket发生了错误");
}
}
}
function sendMessage() {
if(socket!=null){
socket.send($("#message").val());
}
}
function closeSocket() {
socket.close();
}
</script>
</body>
</html>

通过stomp客户端发起的认证操作可以看一下这篇文章:https://www.cnblogs.com/jmcui/p/8999998.html