Netty基础—7.Netty实现消息推送服务
- 作者: 五速梦信息网
- 时间: 2026年03月21日 04:36
{
...
/**
 * Handles the HTTP request that is expected to open a WebSocket connection.
 *
 * <p>A request that failed to decode, or that does not carry an {@code Upgrade: websocket}
 * header, is answered through {@code sendHttpResponse} with 400 Bad Request. Otherwise a
 * handshaker is created for the request; if the factory returns none, the "unsupported
 * version" response is sent, else the handshake is performed on the channel.
 *
 * @param ctx     context of the channel the request arrived on
 * @param request the HTTP request to inspect
 */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
    // RFC 6455 defines the Upgrade token as ASCII case-insensitive, so "WebSocket" must be
    // accepted as well as "websocket". equalsIgnoreCase(null) is false, so a missing header
    // is still rejected.
    if (!request.decoderResult().isSuccess()
            || !"websocket".equalsIgnoreCase(request.headers().get("Upgrade"))) {
        DefaultFullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
        sendHttpResponse(ctx, request, response);
        return;
    }
    logger.info("Receive handshake request from client: " + ctx.channel());

    // Establish the handshake.
    WebSocketServerHandshakerFactory factory =
            new WebSocketServerHandshakerFactory("ws://localhost:8998/push", null, false);
    webSocketServerHandshaker = factory.newHandshaker(request);
    if (webSocketServerHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    } else {
        webSocketServerHandshaker.handshake(ctx.channel(), request);
        logger.info("Netty push server handshake with client: " + ctx.channel());
    }
}
/**
 * Writes an HTTP response to the channel.
 *
 * <p>When the status code differs from {@code RESPONSE_CODE_OK}, the textual status is
 * appended to the response content and logged, and the channel is closed once the write
 * has completed.
 *
 * @param ctx      context of the channel to write to
 * @param request  the request being answered (not read by this method)
 * @param response the response to send
 */
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
    boolean notOk = response.status().code() != RESPONSE_CODE_OK;
    if (notOk) {
        String statusText = response.status().toString();
        ByteBuf byteBuf = Unpooled.copiedBuffer(statusText, CharsetUtil.UTF_8);
        try {
            response.content().writeBytes(byteBuf);
        } finally {
            byteBuf.release();
        }
        // Log the string, not the buffer: writeBytes(ByteBuf) advances the source buffer's
        // readerIndex, so decoding byteBuf afterwards would yield an empty string.
        logger.info("Http Response is not ok: " + statusText);
    }
    ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
    if (notOk) {
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }
}
...
}
**(7)消息推送系统的运营客户端**
```java
/**
 * Operator-side client of the push system (first step of the walkthrough).
 *
 * <p>{@link #main(String[])} parses the push server URI taken from the {@code url} system
 * property (default {@code ws://127.0.0.1:8998/push}), derives scheme, host and port from it,
 * rejects any scheme other than {@code ws}/{@code wss}, and prepares an {@code SslContext}
 * when {@code wss} is used.
 */
public class OperationNettyClient {
    private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);

    private static final String WEB_SOCKET_SCHEME = "ws";
    private static final String WSS_SCHEME = "wss";
    private static final String LOCAL_HOST = "127.0.0.1";
    private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");

    private static URI uri;
    private static String scheme;
    private static String host;
    private static int port;
    private static SslContext sslContext;

    private EventLoopGroup eventLoopGroup;

    /** Starts the client; the body is elided in this listing. */
    public void start() throws Exception {
        //...
    }

    /** Resolves the connection settings from {@code PUSH_SERVER_URI} into the static fields. */
    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
    }

    /** Returns the URI's scheme, falling back to {@code ws} when the URI has none. */
    private static String getScheme(URI pushServerUri) {
        String declared = pushServerUri.getScheme();
        if (declared != null) {
            return declared;
        }
        return WEB_SOCKET_SCHEME;
    }

    /** Returns the URI's host, falling back to {@code 127.0.0.1} when the URI has none. */
    private static String getHost(URI pushServerUri) {
        String declared = pushServerUri.getHost();
        if (declared != null) {
            return declared;
        }
        return LOCAL_HOST;
    }

    /**
     * Returns the URI's explicit port, or the scheme default (80 for {@code ws}, 443 for
     * {@code wss}) when the URI carries none; -1 for any other scheme without a port.
     */
    private static int getPort(URI pushServerUri, String scheme) {
        int explicitPort = pushServerUri.getPort();
        if (explicitPort != -1) {
            return explicitPort;
        }
        if (WEB_SOCKET_SCHEME.equals(scheme)) {
            return 80;
        }
        if (WSS_SCHEME.equals(scheme)) {
            return 443;
        }
        return -1;
    }

    /**
     * Verifies that the scheme is {@code ws} or {@code wss}.
     *
     * @throws RuntimeException for any other scheme
     */
    private static void checkScheme(String scheme) {
        boolean supported = WEB_SOCKET_SCHEME.equals(scheme) || WSS_SCHEME.equals(scheme);
        if (supported) {
            return;
        }
        logger.error("Only Support ws or wss scheme.");
        throw new RuntimeException("Only Support ws or wss scheme.");
    }

    /**
     * Builds the client {@code SslContext} when the WebSocket runs over SSL ({@code wss});
     * otherwise leaves {@code sslContext} as {@code null}.
     */
    private static void initSslContext(String scheme) throws Exception {
        if (!WSS_SCHEME.equals(scheme)) {
            sslContext = null;
            return;
        }
        // NOTE(review): InsecureTrustManagerFactory presumably skips certificate
        // verification; acceptable for a demo only. Confirm before any production use.
        sslContext = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
    }
}
(8)运营客户端连接PushServer
/**
 * Operator-side client of the push system (second step of the walkthrough): connects to the
 * push server and completes the WebSocket handshake.
 *
 * <p>The helpers called from {@link #main(String[])} ({@code getScheme}, {@code getHost},
 * {@code getPort}, {@code checkScheme}, {@code initSslContext}) are elided from this listing.
 */
public class OperationNettyClient {
    private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);
    private static final String WEB_SOCKET_SCHEME = "ws";
    private static final String WSS_SCHEME = "wss";
    private static final String LOCAL_HOST = "127.0.0.1";
    private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");
    private static final String INPUT_MESSAGE_QUIT = "quit";
    private static final String INPUT_MESSAGE_CLOSE = "close";
    private static final String INPUT_MESSAGE_PING = "ping";
    private static URI uri;
    private static String scheme;
    private static String host;
    private static int port;
    private static SslContext sslContext;
    private EventLoopGroup eventLoopGroup;

    /**
     * Connects to the push server and blocks until the WebSocket handshake has finished.
     *
     * @return the connected channel
     * @throws Exception if the connection attempt or the handshake fails
     */
    public Channel start() throws Exception {
        logger.info("Operation Netty Client is connecting.");
        eventLoopGroup = new NioEventLoopGroup();
        WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        final OperationNettyClientHandler operationNettyClientHandler =
                new OperationNettyClientHandler(webSocketClientHandshaker);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                // The type argument is required: with a raw ChannelInitializer the method
                // below would not override initChannel and the class would not compile.
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline channelPipeline = ch.pipeline();
                        if (sslContext != null) {
                            channelPipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));
                        }
                        channelPipeline.addLast(new HttpClientCodec())
                                .addLast(new HttpObjectAggregator(65536))
                                .addLast(WebSocketClientCompressionHandler.INSTANCE)
                                .addLast(operationNettyClientHandler);
                    }
                });
        // Connect to the resolved host (which falls back to LOCAL_HOST) rather than
        // uri.getHost(), which may be null; this also matches the host given to the SSL handler.
        Channel channel = bootstrap.connect(host, port).sync().channel();
        logger.info("Operation Netty Client connected to push server.");
        // Wait for the handler to report the outcome of the WebSocket handshake.
        operationNettyClientHandler.channelFuture().sync();
        return channel;
    }

    /** Shuts down the event loop group, if {@link #start()} created one. */
    public void shutdownGracefully() {
        // Guard against a start() that never got as far as creating the group, so that a
        // NullPointerException here cannot mask the original failure in a finally block.
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /** Resolves the connection settings, connects, and always releases the event loop group. */
    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
        OperationNettyClient operationNettyClient = new OperationNettyClient();
        try {
            operationNettyClient.start();
        } finally {
            operationNettyClient.shutdownGracefully();
        }
    }
    ...
}
(9)运营客户端的Handler处理器
/**
 * Channel handler of the operator client: drives the WebSocket handshake and logs the
 * frames received afterwards.
 *
 * <p>The outcome of the handshake is published through {@link #channelFuture()}, which is
 * created when the handler is added to a pipeline.
 */
public class OperationNettyClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = LogManager.getLogger(OperationNettyClientHandler.class);

    private final WebSocketClientHandshaker webSocketClientHandshaker;
    // Completed with success or failure once the handshake outcome is known.
    private ChannelPromise channelFuture;

    /**
     * @param webSocketClientHandshaker handshaker used to start and finish the handshake
     */
    public OperationNettyClientHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
        this.webSocketClientHandshaker = webSocketClientHandshaker;
    }

    /**
     * @return the future tracking the handshake; {@code null} until the handler has been
     *         added to a pipeline
     */
    public ChannelFuture channelFuture() {
        return channelFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelFuture = ctx.newPromise();
    }

    /** Sends the handshake request as soon as the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        webSocketClientHandshaker.handshake(ctx.channel());
        logger.info("Operation Netty Client send WebSocket handshake request.");
    }

    /**
     * Logs the disconnect and fails the handshake future if it is still pending, so that a
     * caller waiting on {@link #channelFuture()} is not blocked forever when the connection
     * drops before the handshake finishes.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("netty client disconnected.");
        if (!channelFuture.isDone()) {
            channelFuture.setFailure(
                    new IllegalStateException("Connection closed before WebSocket handshake completed."));
        }
    }

    /**
     * Finishes the handshake with the first message received; after that, logs text and pong
     * frames and closes the channel when a close frame arrives.
     *
     * @throws IllegalStateException if an HTTP response arrives after the handshake completed
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        if (!webSocketClientHandshaker.isHandshakeComplete()) {
            try {
                webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);
                logger.info("Netty Client connected.");
                channelFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                logger.error("WebSocket handshake failed.", e);
                channelFuture.setFailure(e);
            }
            return;
        }
        if (msg instanceof FullHttpResponse) {
            throw new IllegalStateException("Not Supported HTTP Response.");
        }
        WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
        if (webSocketFrame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            logger.info("Receives text frame: " + textWebSocketFrame.text());
        } else if (webSocketFrame instanceof PongWebSocketFrame) {
            logger.info("Receives pong frame: " + webSocketFrame);
        } else if (webSocketFrame instanceof CloseWebSocketFrame) {
            logger.info("Receives close WebSocket frame, Netty Client is closing.");
            channel.close();
        }
    }

    /** Logs the error, fails the handshake future if it is still pending, and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Operation Netty client handler exception caught.", cause);
        if (!channelFuture.isDone()) {
            channelFuture.setFailure(cause);
        }
        ctx.close();
    }
}
(10)运营客户端发送推送消息
/**
 * Operator-side client of the push system (third step of the walkthrough): reads commands
 * from standard input and sends them to the push server. Members shown in earlier listings
 * are elided here.
 */
public class OperationNettyClient {
    ...
    /**
     * Reads lines from standard input and sends them over the channel until told to stop.
     *
     * <ul>
     *   <li>{@code quit}, or end of input: stop reading.</li>
     *   <li>{@code close}: send a close frame, wait for the channel to close, then stop.</li>
     *   <li>{@code ping}: send a ping frame.</li>
     *   <li>anything else: send the line as a text frame.</li>
     * </ul>
     *
     * @param channel the channel to write frames to
     * @throws Exception if reading input fails or waiting for the channel to close is interrupted
     */
    public void waitInputMessage(Channel channel) throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            logger.info("Wait for input message.");
            String message = bufferedReader.readLine();
            // readLine() returns null once the input is exhausted. Without this check the
            // null would fall through to the text-frame branch on every iteration.
            if (message == null || INPUT_MESSAGE_QUIT.equals(message)) {
                break;
            }
            if (INPUT_MESSAGE_CLOSE.equals(message)) {
                channel.writeAndFlush(new CloseWebSocketFrame());
                channel.closeFuture().sync();
                break;
            }
            if (INPUT_MESSAGE_PING.equals(message)) {
                WebSocketFrame webSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
                channel.writeAndFlush(webSocketFrame);
            } else {
                WebSocketFrame webSocketFrame = new TextWebSocketFrame(message);
                channel.writeAndFlush(webSocketFrame);
            }
        }
    }

    /**
     * Resolves the connection settings, connects, runs the input loop, and always releases
     * the event loop group.
     */
    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
        OperationNettyClient operationNettyClient = new OperationNettyClient();
        try {
            Channel channel = operationNettyClient.start();
            // Entry point for the operator client to send messages.
            operationNettyClient.waitInputMessage(channel);
        } finally {
            operationNettyClient.shutdownGracefully();
        }
    }
    ...
}
(11)浏览器客户端接收推送消息
<title>websocket网页</title>
本文来自投稿,不代表本站立场,如若转载,请注明出处:http://www.knowhub.vip/share/2/863
<div>
<div>
<div>
</div>
</div>
<p><img src="https://www.knowhub.vip/Detail/Visits/863" alt=""/></p>
<div>
<li>热门的技术博文分享</li>
<li>相关联分享</li>
</div>
</div>
相关文章
-
NETCore.Encrypt:最全的加解密开源库
NETCore.Encrypt:最全的加解密开源库
- 互联网
- 2026年03月21日
-
Nacos使用全解析:从注册中心到配置中心
Nacos使用全解析:从注册中心到配置中心
- 互联网
- 2026年03月21日
-
N+1查询:数据库性能的隐形杀手与终极拯救指南
N+1查询:数据库性能的隐形杀手与终极拯救指南
- 互联网
- 2026年03月21日
-
NFT艺术品是什么(nft艺术品最新信息)
NFT艺术品是什么(nft艺术品最新信息)
- 互联网
- 2026年03月21日
-
Nginx日志分析工具GoAccess,有没有好用的Nginx日志分析工具?
Nginx日志分析工具GoAccess,有没有好用的Nginx日志分析工具?
- 互联网
- 2026年03月21日
-
Nginx使用经验总结,好记性不比烂笔头(键盘)
Nginx使用经验总结,好记性不比烂笔头(键盘)
- 互联网
- 2026年03月21日








