介紹
由于前段時間我實現了一個庫【Spring Cloud】一個配置注解實現 WebSocket 集群方案
以至于我對WebSocket的各種集成方式做了一些研究
目前我所了解到的就是下面這些了(就一個破ws都有這么多花里胡哨的集成方式了?)
Javax
WebMVC
WebFlux
Java-WebSocket
SocketIO
Netty
今天主要介紹一下前3種方式,畢竟現在的主流框架還是Spring Boot
而后3種其實和Spring Boot并不強行綁定,基于Java就可以支持,不過我也會對后3種做個簡單的介紹,大家先混個眼熟就行了
那么接下來我們就來講講前3種方式(Javax,WebMVC,WebFlux)在Spring Boot中的服務端和客戶端配置(客戶端配置也超重要的有木有,平時用不到,用到了卻基本找不到文檔,這也太絕望了)
Javax
在java的擴展包javax.websocket中就定義了一套WebSocket的接口規范
服務端
一般使用注解的方式來進行配置
第一步
@Component @ServerEndpoint("/websocket/{type}") publicclassJavaxWebSocketServerEndpoint{ @OnOpen publicvoidonOpen(Sessionsession,EndpointConfigconfig, @PathParam(value="type")Stringtype){ //連接建立 } @OnClose publicvoidonClose(Sessionsession,CloseReasonreason){ //連接關閉 } @OnMessage publicvoidonMessage(Sessionsession,Stringmessage){ //接收文本信息 } @OnMessage publicvoidonMessage(Sessionsession,PongMessagemessage){ //接收pong信息 } @OnMessage publicvoidonMessage(Sessionsession,ByteBuffermessage){ //接收二進制信息,也可以用byte[]接收 } @OnError publicvoidonError(Sessionsession,Throwablee){ //異常處理 } }
我們在類上添加@ServerEndpoint注解來表示這是一個服務端點,同時可以在注解中配置路徑,這個路徑可以配置成動態的,使用{}包起來就可以了
@OnOpen用來標記對應的方法作為客戶端連接上來之后的回調,Session就相當于和客戶端的連接啦,我們可以把它緩存起來用于發送消息;通過@PathParam注解就可以獲得動態路徑中對應值了
@OnClose用來標記對應的方法作為客戶端斷開連接之后的回調,我們可以在這個方法中移除對應Session的緩存,同時可以接受一個CloseReason的參數用于獲取關閉原因
@OnMessage用來標記對應的方法作為接收到消息之后的回調,我們可以接受文本消息,二進制消息和pong消息
@OnError用來標記對應的方法作為拋出異常之后的回調,可以獲得對應的Session和異常對象
第二步
implementation'org.springframework.boot:spring-boot-starter-websocket' @Configuration(proxyBeanMethods=false) publicclassJavaxWebSocketConfiguration{ @Bean publicServerEndpointExporterserverEndpointExporter(){ returnnewServerEndpointExporter(); } }
依賴Spring的WebSocket模塊,手動注入ServerEndpointExporter就可以了
需要注意ServerEndpointExporter是Spring中的類,算是Spring為了支持javax.websocket的原生用法所提供的支持類
冷知識
javax.websocket庫中定義了PongMessage而沒有PingMessage
通過我的測試發現基本上所有的WebSocket包括前端js自帶的,都實現了自動回復;也就是說當接收到一個ping消息之后,是會自動回應一個pong消息,所以沒有必要再自己接受ping消息來處理了,即我們不會接受到ping消息
當然我上面講的ping和pong都是需要使用框架提供的api,如果是我們自己通過Message來自定義心跳數據的話是沒有任何的處理的,下面是對應的api
//發送ping session.getAsyncRemote().sendPing(ByteBufferbuffer); //發送pong session.getAsyncRemote().sendPong(ByteBufferbuffer);
然后我又發現js自帶的WebSocket是沒有發送ping的api的,所以是不是可以猜想當初就是約定服務端發送ping,客戶端回復pong
客戶端
客戶端也是使用注解配置
第一步
@ClientEndpoint publicclassJavaxWebSocketClientEndpoint{ @OnOpen publicvoidonOpen(Sessionsession){ //連接建立 } @OnClose publicvoidonClose(Sessionsession,CloseReasonreason){ //連接關閉 } @OnMessage publicvoidonMessage(Sessionsession,Stringmessage){ //接收文本消息 } @OnMessage publicvoidonMessage(Sessionsession,PongMessagemessage){ //接收pong消息 } @OnMessage publicvoidonMessage(Sessionsession,ByteBuffermessage){ //接收二進制消息 } @OnError publicvoidonError(Sessionsession,Throwablee){ //異常處理 } }
客戶端使用@ClientEndpoint來標記,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服務端一模一樣
第二步
WebSocketContainercontainer=ContainerProvider.getWebSocketContainer(); Sessionsession=container.connectToServer(JavaxWebSocketClientEndpoint.class,uri);
我們可以通過ContainerProvider來獲得一個WebSocketContainer,然后調用connectToServer方法將我們的客戶端類和連接的uri傳入就行了
冷知識
通過ContainerProvider#getWebSocketContainer獲得WebSocketContainer其實是基于SPI實現的
在Spring的環境中我更推薦大家使用ServletContextAware來獲得,代碼如下
@Component publicclassJavaxWebSocketContainerimplementsServletContextAware{ privatevolatileWebSocketContainercontainer; publicWebSocketContainergetContainer(){ if(container==null){ synchronized(this){ if(container==null){ container=ContainerProvider.getWebSocketContainer(); } } } returncontainer; } @Override publicvoidsetServletContext(@NonNullServletContextservletContext){ if(container==null){ container=(WebSocketContainer)servletContext .getAttribute("javax.websocket.server.ServerContainer"); } } }
發消息
Sessionsession=... //發送文本消息 session.getAsyncRemote().sendText(Stringmessage); //發送二進制消息 session.getAsyncRemote().sendBinary(ByteBuffermessage); //發送對象消息,會嘗試使用Encoder編碼 session.getAsyncRemote().sendObject(Objectmessage); //發送ping session.getAsyncRemote().sendPing(ByteBufferbuffer); //發送pong session.getAsyncRemote().sendPong(ByteBufferbuffer);
WebMVC
依賴肯定是必不可少的
implementation'org.springframework.boot:spring-boot-starter-websocket'
服務端
第一步
importorg.springframework.web.socket.WebSocketHandler; importorg.springframework.web.socket.WebSocketMessage; importorg.springframework.web.socket.WebSocketSession; publicclassServletWebSocketServerHandlerimplementsWebSocketHandler{ @Override publicvoidafterConnectionEstablished(@NonNullWebSocketSessionsession)throwsException{ //連接建立 } @Override publicvoidhandleMessage(@NonNullWebSocketSessionsession,@NonNullWebSocketMessage>message)throwsException{ //接收消息 } @Override publicvoidhandleTransportError(@NonNullWebSocketSessionsession,@NonNullThrowableexception)throwsException{ //異常處理 } @Override publicvoidafterConnectionClosed(@NonNullWebSocketSessionsession,@NonNullCloseStatuscloseStatus)throwsException{ //連接關閉 } @Override publicbooleansupportsPartialMessages(){ //是否支持接收不完整的消息 returnfalse; } }
我們實現一個WebSocketHandler來處理WebSocket的連接,關閉,消息和異常
第二步
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ registry //添加處理器到對應的路徑 .addHandler(newServletWebSocketServerHandler(),"/websocket") .setAllowedOrigins("*"); } }
首先需要添加@EnableWebSocket來啟用WebSocket
然后實現WebSocketConfigurer來注冊WebSocket路徑以及對應的WebSocketHandler
握手攔截
提供了HandshakeInterceptor來攔截握手
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ registry //添加處理器到對應的路徑 .addHandler(newServletWebSocketServerHandler(),"/websocket") //添加握手攔截器 .addInterceptors(newServletWebSocketHandshakeInterceptor()) .setAllowedOrigins("*"); } publicstaticclassServletWebSocketHandshakeInterceptorimplementsHandshakeInterceptor{ @Override publicbooleanbeforeHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Mapattributes)throwsException{ //握手之前 //繼續握手返回true,中斷握手返回false returnfalse; } @Override publicvoidafterHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Exceptionexception){ //握手之后 } } }
冷知識
我在集成的時候發現這種方式沒辦法動態匹配路徑,它的路徑就是固定的,沒辦法使用如/websocket/**這樣的通配符
我在研究了一下之后發現可以在UrlPathHelper上做點文章
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ if(registryinstanceofServletWebSocketHandlerRegistry){ //替換UrlPathHelper ((ServletWebSocketHandlerRegistry)registry) .setUrlPathHelper(newPrefixUrlPathHelper("/websocket")); } registry //添加處理器到對應的路徑 .addHandler(newServletWebSocketServerHandler(),"/websocket/**") .setAllowedOrigins("*"); } publicclassPrefixUrlPathHelperextendsUrlPathHelper{ privateStringprefix; @Override public@NonNullStringresolveAndCacheLookupPath(@NonNullHttpServletRequestrequest){ //獲得原本的Path Stringpath=super.resolveAndCacheLookupPath(request); //如果是指定前綴就返回對應的通配路徑 if(path.startsWith(prefix)){ returnprefix+"/**"; } returnpath; } } }
因為它內部實際上就是用一個Map
主要是有現成的AntPathMatcher實現通配應該不麻煩才對啊
客戶端
第一步
publicclassServletWebSocketClientHandlerimplementsWebSocketHandler{ @Override publicvoidafterConnectionEstablished(@NonNullWebSocketSessionsession)throwsException{ //連接建立 } @Override publicvoidhandleMessage(@NonNullWebSocketSessionsession,@NonNullWebSocketMessage>message)throwsException{ //接收消息 } @Override publicvoidhandleTransportError(@NonNullWebSocketSessionsession,@NonNullThrowableexception)throwsException{ //異常處理 } @Override publicvoidafterConnectionClosed(@NonNullWebSocketSessionsession,@NonNullCloseStatuscloseStatus)throwsException{ //連接關閉 } @Override publicbooleansupportsPartialMessages(){ //是否支持接收不完整的消息 returnfalse; } }
和服務端一樣我們需要先實現一個WebSocketHandler來處理WebSocket的連接,關閉,消息和異常
第二步
WebSocketClientclient=newStandardWebSocketClient(); WebSocketHandlerhandler=newServletWebSocketClientHandler(); WebSocketConnectionManagermanager=newWebSocketConnectionManager(client,handler,uri); manager.start();
首先我們需要先new一個StandardWebSocketClient,可以傳入一個WebSocketContainer參數,獲得該對象的方式我之前已經介紹過了,這邊就先略過
然后new一個WebSocketConnectionManager傳入WebSocketClient,WebSocketHandler還有路徑uri
最后調用一下WebSocketConnectionManager的start方法就可以啦
冷知識
這里如果大家去看WebSocketClient的實現類就會發現有StandardWebSocketClient還有JettyWebSocketClient等等,所以大家可以根據自身項目所使用的容器來選擇不同的WebSocketClient實現類
這里給大家貼一小段Spring適配不同容器WebSocket的代碼
publicabstractclassAbstractHandshakeHandlerimplementsHandshakeHandler,Lifecycle{ privatestaticfinalbooleantomcatWsPresent; privatestaticfinalbooleanjettyWsPresent; privatestaticfinalbooleanjetty10WsPresent; privatestaticfinalbooleanundertowWsPresent; privatestaticfinalbooleanglassfishWsPresent; privatestaticfinalbooleanweblogicWsPresent; privatestaticfinalbooleanwebsphereWsPresent; static{ ClassLoaderclassLoader=AbstractHandshakeHandler.class.getClassLoader(); tomcatWsPresent=ClassUtils.isPresent( "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler",classLoader); jetty10WsPresent=ClassUtils.isPresent( "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer",classLoader); jettyWsPresent=ClassUtils.isPresent( "org.eclipse.jetty.websocket.server.WebSocketServerFactory",classLoader); undertowWsPresent=ClassUtils.isPresent( "io.undertow.websockets.jsr.ServerWebSocketContainer",classLoader); glassfishWsPresent=ClassUtils.isPresent( "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler",classLoader); weblogicWsPresent=ClassUtils.isPresent( "weblogic.websocket.tyrus.TyrusServletWriter",classLoader); websphereWsPresent=ClassUtils.isPresent( "com.ibm.websphere.wsoc.WsWsocServerContainer",classLoader); } }
發消息
importorg.springframework.web.socket.*; WebSocketSessionsession=... //發送文本消息 session.sendMessage(newTextMessage(CharSequencemessage); //發送二進制消息 session.sendMessage(newBinaryMessage(ByteBuffermessage)); //發送ping session.sendMessage(newPingMessage(ByteBuffermessage)); //發送pong session.sendMessage(newPongMessage(ByteBuffermessage));
WebFlux
WebFlux的WebSocket不需要額外的依賴包
服務端
第一步
importorg.springframework.web.reactive.socket.WebSocketHandler; importorg.springframework.web.reactive.socket.WebSocketSession; publicclassReactiveWebSocketServerHandlerimplementsWebSocketHandler{ @NonNull @Override publicMonohandle(WebSocketSessionsession){ Mono send=session.send(Flux.create(sink->{ //可以持有sink對象在任意時候調用next發送消息 sink.next(WebSocketMessagemessage); })).doOnError(it->{ //異常處理 }); Mono receive=session.receive() .doOnNext(it->{ //接收消息 }) .doOnError(it->{ //異常處理 }) .then(); @SuppressWarnings("all") Disposabledisposable=session.closeStatus() .doOnError(it->{ //異常處理 }) .subscribe(it->{ //連接關閉 }); returnMono.zip(send,receive).then(); } }
首先需要注意這里的WebSocketHandler和WebSocketSession是reactive包下的
通過WebSocketSession#send方法來持有一個FluxSink
通過WebSocketSession#receive來訂閱消息
通過WebSocketSession#closeStatus來訂閱連接關閉事件
第二步
@Component publicclassReactiveWebSocketServerHandlerMappingextendsSimpleUrlHandlerMapping{ publicReactiveWebSocketServerHandlerMapping(){ Mapmap=newHashMap<>(); map.put("/websocket/**",newReactiveWebSocketServerHandler()); setUrlMap(map); setOrder(100); } }
注冊一個HandlerMapping同時配置路徑和對應的WebSocketHandler
第三步
@Configuration(proxyBeanMethods=false) publicclassReactiveWebSocketConfiguration{ @Bean publicWebSocketHandlerAdapterwebSocketHandlerAdapter(){ returnnewWebSocketHandlerAdapter(); } }
注入WebSocketHandlerAdapter
冷知識
我們自定義的HandlerMapping需要設置order,如果不設置,默認為Ordered.LOWEST_PRECEDENCE,會導致這個HandlerMapping被放在最后,當有客戶端連接上來時會被其他的HandlerMapping優先匹配上而連接失敗
客戶端
第一步
publicclassReactiveWebSocketClientHandlerimplementsWebSocketHandler{ @NonNull @Override publicMonohandle(WebSocketSessionsession){ Mono send=session.send(Flux.create(sink->{ //可以持有sink對象在任意時候調用next發送消息 sink.next(WebSocketMessagemessage); })).doOnError(it->{ //處理異常 }); Mono receive=session.receive() .doOnNext(it->{ //接收消息 }) .doOnError(it->{ //異常處理 }) .then(); @SuppressWarnings("all") Disposabledisposable=session.closeStatus() .doOnError(it->{ //異常處理 }) .subscribe(it->{ //連接關閉 }); returnMono.zip(send,receive).then(); } }
客戶端WebSocketHandler的寫法和服務端的一樣
第二步
importorg.springframework.web.reactive.socket.client.WebSocketClient; WebSocketClientclient=ReactorNettyWebSocketClient(); WebSocketHandlerhandler=newReactiveWebSocketClientHandler(); client.execute(uri,handler).subscribe();
首先我們需要先new一個ReactorNettyWebSocketClient
然后調用一下WebSocketClient的execute方法傳入路徑uri和WebSocketHandler并繼續調用subscribe方法就行啦
冷知識
和WebMVC中的WebSocketClient一樣,Reactive包中的WebSocketClient也有很多實現類,比如ReactorNettyWebSocketClient,JettyWebSocketClient,UndertowWebSocketClient,TomcatWebSocketClient等等,也是需要大家基于自身項目的容器使用不同的實現類
這里也給大家貼一小段Reactive適配不同容器WebSocket的代碼
publicclassHandshakeWebSocketServiceimplementsWebSocketService,Lifecycle{ privatestaticfinalbooleantomcatPresent; privatestaticfinalbooleanjettyPresent; privatestaticfinalbooleanjetty10Present; privatestaticfinalbooleanundertowPresent; privatestaticfinalbooleanreactorNettyPresent; static{ ClassLoaderloader=HandshakeWebSocketService.class.getClassLoader(); tomcatPresent=ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler",loader); jettyPresent=ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory",loader); jetty10Present=ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer",loader); undertowPresent=ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler",loader); reactorNettyPresent=ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse",loader); } }
發消息
我們需要使用在WebSocketHandler中獲得的FluxSink
importorg.springframework.web.reactive.socket.CloseStatus; importorg.springframework.web.reactive.socket.WebSocketMessage; importorg.springframework.web.reactive.socket.WebSocketSession; publicclassReactiveWebSocket{ privatefinalWebSocketSessionsession; privatefinalFluxSinksender; publicReactiveWebSocket(WebSocketSessionsession,FluxSink sender){ this.session=session; this.sender=sender; } publicStringgetId(){ returnsession.getId(); } publicURIgetUri(){ returnsession.getHandshakeInfo().getUri(); } publicvoidsend(Objectmessage){ if(messageinstanceofWebSocketMessage){ sender.next((WebSocketMessage)message); }elseif(messageinstanceofString){ //發送文本消息 sender.next(session.textMessage((String)message)); }elseif(messageinstanceofDataBuffer){ //發送二進制消息 sender.next(session.binaryMessage(factory->(DataBuffer)message)); }elseif(messageinstanceofByteBuffer){ 發送二進制消息 sender.next(session.binaryMessage(factory->factory.wrap((ByteBuffer)message))); }elseif(messageinstanceofbyte[]){ 發送二進制消息 sender.next(session.binaryMessage(factory->factory.wrap((byte[])message))); }else{ thrownewIllegalArgumentException("Messagetypenotmatch"); } } publicvoidping(){ //發送ping sender.next(session.pingMessage(factory->factory.wrap(ByteBuffer.allocate(0)))); } publicvoidpong(){ //發送pong sender.next(session.pongMessage(factory->factory.wrap(ByteBuffer.allocate(0)))); } publicvoidclose(CloseStatusreason){ sender.complete(); session.close(reason).subscribe(); } }
Java-WebSocket
這是一個純java的第三方庫,專門用于實現WebSocket
SocketIO
該庫使用的協議是經過自己封裝的,支持很多的語言,提供了統一的接口,所以需要使用它提供的Server和Client來連接,如socket.io-server-java和socket.io-client-java
這個庫我了解下來主要用于實時聊天等場景,所以如果只是普通的WebSocket功能就有點大材小用了
Netty
這個大家應該都比較熟悉了,就算沒用過肯定也聽過
網上的文檔和示例也非常多,我這里就不介紹有的沒的了,Github傳送門。
審核編輯:劉清
-
二進制
+關注
關注
2文章
803瀏覽量
42012 -
JAVA語言
+關注
關注
0文章
138瀏覽量
20436 -
緩存器
+關注
關注
0文章
63瀏覽量
11790 -
WebSocket
+關注
關注
0文章
30瀏覽量
3951
原文標題:WebSocket 的 6 種集成方式
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
淺談集成FPGA的兩種方式:eFPGA(SoC)& cFPGA(SiP)

求一種基于FPGA的A型數字式超聲系統的構成方式
SPWM信號主要有3種生成方式
基于TCP的一種新的網絡協議WebSocket
一種基于凸殼算法的SVM集成方法
智能建筑系統集成方式的研究及應用
一種BACnet網絡與Internet的集成方案
WebSocket有什么優點
鴻蒙上WebSocket的使用方法
websocket協議的原理

萬界星空科技MES數據的集成方式

AWTK-WEB 快速入門(6) - JS WebSocket 應用程序

評論