Netty+WebSocket 获取火币交易所数据项目
- 2019 年 10 月 3 日
- 筆記
Netty+WebSocket 获取火币交易所数据项目
???????GitHub?? spring-boot-netty-websocket-huobi
????
????? SpringBoot+Netty???WebSocket???,??????Websocket????,???????????????????????
??????????????????????????????????????
?????????????????????????????,?????????????????????????????????????????????
????
SpringBoot2.1.5 +Netty4.1.25 + Maven3.5.4 + lombok(??)
????
????Springboot???Application.java
,??????????????????????
??
??????
1???????
???????????????????Websocket?????
/**
 * Starts the Huobi main service once this bean has been constructed.
 *
 * <p>Any exception raised by {@code huobiProMainService.start()} is logged
 * and not propagated, so a start-up failure does not escape this method.
 */
@PostConstruct
public void firstSub() {
    try {
        huobiProMainService.start();
    } catch (Exception startFailure) {
        log.error("huobi ????????", startFailure);
    }
}
2????????
????????????????????
???????????????????????
??????????????????????
@Override public synchronized List<String> getChannelCache() { // ??????????????? List<String> list = Lists.newArrayList("btcusdt"); return list; }
3????????Websocket,??????????
?????WebSocket????,????????????????????????????????????????????????????????????
/** * ????????? * * @param channelList ????? * @param topicFormat ????????? */ private void firstSub(List<String> channelList, String topicFormat) { //??huoBiProWebSocketService?? klineClient = new HuoBiProWebSocketClient(huoBiProWebSocketService); //???????websocket klineClient.start(); for (String channel : channelList) { //??????? klineClient.addSub(formatChannel(topicFormat, channel)); } }
???????websocket????
????????????????????????????Bootstrap
????????????????ServerBootstrap
?
?????????????????????????????????????handler???Handler
,????????SimpleChannelInboundHandler
?
/** * ??WebSocket? * * @param uri url???URI * @param handler ???? */ protected void connectWebSocket(final URI uri, SimpleChannelInboundHandler handler) { try { String scheme = uri.getScheme() == null ? "http" : uri.getScheme(); final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); final int port; if (uri.getPort() == -1) { if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) { port = 80; } else if ("wss".equalsIgnoreCase(scheme)) { port = 443; } else { port = -1; } } else { port = uri.getPort(); } if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { System.out.println("Only WS(S) is supported"); throw new UnsupportedAddressTypeException(); } final boolean ssl = "wss".equalsIgnoreCase(scheme); final SslContext sslCtx; if (ssl) { sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } group = new NioEventLoopGroup(2); //?????Bootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } //pipeline????????handler,????????hanler pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler); } }); channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { log.error(" webSocketClient start error.", e); if (group != null) { group.shutdownGracefully(); } } }
4????handler
???Handler??????????????????SimpleChannelInboundHandler???????????????channelRead0
???????msg??????????????????????
/** * @Description: ???WebSocket ????? * ??????handler ??????? */ @Slf4j public class HuoBiProWebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private WebSocketClientHandshaker handshaker; private HuoBiProWebSocketClient client; /** * ?handel??????? */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof BinaryWebSocketFrame) { //?????????????????????? BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; //????????? client.onReceive(decodeByteBuf(binaryFrame.content())); } else if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame; client.onReceive(textWebSocketFrame.text()); } } }
???????
1??????
?????????????????????????Websocket???????????
???????????????????????????????????????
????????????????????????????????
2???????????????
???????Websocket???????????????????????????????????????????????????????
????????????????????????????????????????????
?????????????btcusdt
,??????????????????
3???????
?????????????????????????????
??????????????????????????????????????????????????????????????
?????????????????
????????????????????????????????????????????????
?????????btcusdt
???,????????k?
??,??????????????????????,
?????????????????????????????????????????????????????????
??
????????? Netty ? Websocket ??????????????
????????????????????????????????????????????????????????????(1?