望星辰大海 阅读(14) 评论(0)

简言

  1. 首先我们得先弄清楚websocket和tcpSocket的区别。websocket也是基于tcp的应用层协议,只是在传统的socket上进行了封装。
  2. 然后我们要知道netty的handle支持动态增删。

综上所述,相信脑海里已经有对应的方案了。那就是动态设置编解码处理器。

 

关键代码如下:

        // 1.socket方式服务
            // 设置N秒没有读到数据,则触发一个READER_IDLE事件。
            pipeline.addLast(new IdleStateHandler(readerIdleTime,0,0, TimeUnit.SECONDS));
            pipeline.addLast("active",new ChannelActiveHandle());
            pipeline.addLast("socketChoose",new SocketChooseHandle());

            //tcpsocket编码解码handle,如果是websocket链接,会将其删除
            pipeline.addLast("lengthEncode",new LengthFieldPrepender(4, false));
            pipeline.addLast("lengthDecoder",new LengthFieldBasedFrameDecoder(2000, 0, 4,0, 4));
            pipeline.addLast(bytebufToByteHandle);

            //因为接收类型的泛型不对,所以在websocket握手的时候不会进入该handle
            //此handle为最后的socket消息分解,web和tcp通用
            pipeline.addLast("byteToBuf",byteToByteBufHandle);
            pipeline.addLast("protocolResolve",protocolResolveHandle);
/**
 * 协议初始化解码器.
 *
 * 用来判定实际使用什么协议.</b>
 *
 */
public class SocketChooseHandle extends ByteToMessageDecoder {
    /** 默认暗号长度为23 */
    private static final int MAX_LENGTH = 23;
    /** WebSocket握手的协议前缀 */
    private static final String WEBSOCKET_PREFIX = "GET /";


    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        String protocol = getBufStart(in);
        if (protocol.startsWith(WEBSOCKET_PREFIX)) {
            SpringApplicationContextHolder.getSpringBeanForClass(PipelineAdd.class).websocketAdd(ctx);

            ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class);
            ctx.pipeline().remove(LengthFieldPrepender.class);
            ctx.pipeline().remove(BytebufToByteHandle.class);
        }

        in.resetReaderIndex();
        ctx.pipeline().remove(this.getClass());

    }





    private String getBufStart(ByteBuf in){
        int length = in.readableBytes();
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH;
        }

        // 标记读位置
        in.markReaderIndex();
        byte[] content = new byte[length];
        in.readBytes(content);
        return new String(content);
    }
}
  public  void websocketAdd(ChannelHandlerContext ctx){

        // HttpServerCodec:将请求和应答消息解码为HTTP消息
        ctx.pipeline().addBefore("byteToBuf","http-codec",new HttpServerCodec());

        // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
        ctx.pipeline().addBefore("byteToBuf","aggregator",new HttpObjectAggregator(65535));

        // ChunkedWriteHandler:向客户端发送HTML5文件
        ctx.pipeline().addBefore("byteToBuf","http-chunked",new ChunkedWriteHandler());

        ctx.pipeline().addBefore("byteToBuf","WebSocketAggregator",new WebSocketFrameAggregator(65535));

        // 在管道中添加我们自己的接收数据实现方法
        ctx.pipeline().addBefore("byteToBuf","ws-handShake",wsHandShakeServerHandle);

        // 后续直接走消息处理
        ctx.pipeline().addBefore("byteToBuf","wsPack",wsPacketHandle);

        // 编码。将通用byteBuf编码成binaryWebSocketFrame.通过前面的编码器
        ctx.pipeline().addBefore("byteToBuf","bufToFrame",bytebufToBinaryFrameHandle);


    }

 

说明:

首先我们先添加好SocketChooseHandle(),这是我们的handle判断处理器。如果判断协议是以GET /开头的话,那么必定是websocket的连接握手。

而又因为socket连接是不进SocketChooseHandle的破方法的,导致我们必须在初始化的时候就把socket的处理写在后面。

继续说websocket的处理。

当我们检测到时websocket连接的时候,我们会移除掉socket的编解码处理器,然后再移除自己。(下次进来就直接处理websocketframe了,所以不需要再次进行这个判断处理器)

然后websocket顺利的进入动态添加的编码器,进行websocket的握手handshake。然后进行下一轮通信。

 

反之,如果是tcpsocket连接,会直接走tcpSocketHandle处理。这里我们用的LengthFieldBasedFrameDecoder长度占包粘包处理器。

protocolResolveHandle是我们的业务处理器handle。

 

 

详细代码请参考nafos-network: https://gitee.com/huangxinyu/BC-NETTYGO