STOMP¶
STOMP是面向流文本的消息传输协议Streaming Text Oriented Messaging Protocol,是 WebSocket 通信标准的一部分,属于业务层的控制协议Wire protocol,协议规范。在通常的发布订阅语义之上,它通过 begin、 commit、rollback 事物序列以及ack确认机制来提供消息可靠的投递。由于协议简单且易于实现,几乎所有的编程语言都有 STOMP 的客户端实现。
可以通过STOMP协议访问WeEvent的发布订阅相关功能。
协议说明¶
- 支持STOMP协议的
1.1、1.2版本。暂时不支持消息确认ACK和事务Transaction语义。 - 传输协议方面,同时支持
STOMP Over WebSocket和STOMP Over SockJS。
Java语言¶
Spring Boot 环境¶
Spring对STOMP的支持需要依赖,以
gradle为例:implementation("org.springframework.boot:spring-boot-starter-websocket")
代码样例¶
第一步:创建链接
// standard web socket transport
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
// MappingJackson2MessageConverter
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
ListenableFuture<StompSession> f = stompClient.connect("ws://localhost:8080/weevent/stomp", getWebsocketSessionHandlerAdapter());
StompSession stompSession = f.get();
心跳说明
WeEvent使用单向心跳机制,客户端发送心跳,服务端不发心跳。默认时间间隔为30s。修改心跳方案。
配置心跳时间间隔:修改配置文件
./broker/conf/weevent.properties,stomp.heartbeats=30。
第二步:发布事件
StompSession.Receiptable receiptable = stompSession.send("com.weevent.test", "hello world, from web socket");
log.info("send result, receipt id: {}", receiptable.getReceiptId());
Topic 为com.weevent.test。用户可以获取到Receiptable,并且通过receiptable.getReceiptId(),可以获取相应的回执。
第三步:订阅事件
StompSession.Subscription subscription = stompSession.subscribe(topic, new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
logger.info("subscribe handleFrame, header: {} payload: {}", headers, payload);
}
});
说明
topic订阅的主题StompFrameHandler,对StompFrame和StompHeaders进行处理的方法。
订阅事件扩展
- 通过修改配置,进行
header扩展。 - 配置
eventId,提高订阅的效率。如果不设置,则默认为取最新内容。
StompHeaders header = new StompHeaders();
header.setLogin("root");
header.setPasscode("123456");
header.set("eventId","2cf24dba-59-1124");
header.setDestination(topic);
StompSession.Subscription subscription = stompSession.subscribe(header, new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
logger.info("subscribe handleFrame, header: {} payload: {}", headers, payload);
}
});
上述样例完整的代码,请参见STOMP代码样例 。
Spring环境¶
- 依赖说明
- 代码实现和上面
spring boot一样
其他语言的适配¶
各种语言的开源STOMP客户端,参见https://stomp.github.io/implementations.html。