Java UDP Server的轻量级实现
实现方法
接收线程:只处理收包,收完后之后放入工作线程
发送线程:负责发送udp包到其它的server
工作线程:解析包体,实现业务逻辑
工作线程消息处理:在工作线程中解析出协议包体后,根据messageId实现消息处理
主要的java类
ServerManager.java
SenderThread.java
ReceiverThread.java
WorkThread.java
ServerManager.java 相关代码
public static void
startListener()
throws UDPException{
|
SenderThread.java 相关代码:
逻辑:读取队列中的消息。并实现发送
@Override public void run() { log.info("UDP Sender thread started"); while(ServerManager.tdUdpServerIsStart) try { UDPMessage reqBody = senderQueue.take();//队列中没有时会一直等待 byte[] bodyBit = reqBody.getMsgBits(); if(reqBody.getHeader() == null){ log.warn("Header is null"); continue; } if(reqBody.getHeader().getMessageId() == null){ log.warn("MessageId is null!"); continue; } // log.debug("Send head:"+reqBody.getHeader().toString()); // log.info(String.format("Preprocess messageId:%s,len:%s,sessionId:%s to %s", reqBody.getHeader().getMessageId(),bodyBit.length, reqBody.getHeader().getSessionId(), reqBody.getHeader().getSocketAddress())); // byte[] body = ProtMsgData.makeMsgStream(reqBody.getSessionId(), reqBody.getMessageId().getMessageId(), bodyBit); byte[] body = reqBody.getMsgBits(); DatagramPacket dataPacket = new DatagramPacket(body, body.length, reqBody.getHeader().getSocketAddress()); //测试CODE // DatagramPacket dataPacket = new DatagramPacket(body, body.length, // InetAddress.getByName("10.10.10.100"), 6060); // DatagramSocket dataSocket = new DatagramSocket(reqBody.getHeader().getInetSocketAddress().getPort()); // DatagramSocket dataSocket = new DatagramSocket(45677); DatagramSocket dataSocket = getSocket(reqBody.getHeader().getInetSocketAddress().getPort()); if(dataSocket == null){ log.warn(String.format("Send error!Not found socket by port:%s",reqBody.getHeader().getInetSocketAddress().getPort())); continue; } log.info(String.format("Send message to %s,id:%s,len:%s,data:%s",reqBody.getHeader().getSocketAddress().toString(),reqBody.getHeader().getMessageId(),body.length,ProtUtils.getHexByString(body))); dataSocket.send(dataPacket); //如果需要接收消息,先将记录保存起来 // if (reqBody.isNeedReceive()) { // UDPSessionManager.add(reqBody); // } } catch (Exception e) { log.error("Send message error", e); } }
|
ReceiverThread.java 相关代码
封装数据,给工作线程使用。也可以在这里解析协议的头,方便后续使用
@Override public void run() { Init(); } public void Init() { while(UCSIServerManager.tdUdpServerIsStart){ try { byte[] receiveBody = new byte[BUFFER_SIZE]; DatagramPacket dataPacketBody = new DatagramPacket(receiveBody, receiveBody.length); if(dataPacketBody == null) continue; ServerManager.receive(dataPacketBody); if(dataPacketBody == null) continue; //添加到工作线程队列 ProtRespInfo respInfo = new ProtRespInfo(null,dataPacketBody); UDPWorkThread.push(respInfo); } catch (Exception e) { } } }
|
WorkThread.java 相关代码
@Override public void run() { while(UCSIServerManager.tdUdpServerIsStart){ try { ProtRespInfo packet = workQueue.take(); UDPReceiverHandler handler = new UDPReceiverHandler(packet); handler.receive(); } catch (InterruptedException e) { e.printStackTrace(); continue; } } } public static void push(ProtRespInfo respInfo){ try { workQueue.put(respInfo); } catch (Exception e) { e.printStackTrace(); } }
|
UDPReceiverHandler.java
实现在解析协议头和协议体。并根据messageId来调用对应的消息实现,我这里使用的注解的方式,程序启时扫描相关注解,如果新加协议只需要实现对应的接口,并加上注解。
public void receive(){ log.debug("UDPReceiver receive..."); UDPHeader header = getUDPHeader(respInfo.getDeviceType()); if(header == null){ log.warn("Not found head by port:"+respInfo.getDataPacket().getPort()); return; } header.setSocketAddress(respInfo.getDataPacket().getSocketAddress()); header.data = respInfo.getDataPacket().getData(); header.len = header.getHeaderSize(); header.decode(); header.setDeviceType(respInfo.getDeviceType()); respInfo.setHeader(header); if(respInfo.getHeader() == null || respInfo.getHeader().getMessageId() == null){ log.warn("消息头错误,来自IP:"+respInfo.getHeader().getRemoteAddr()+",端口:"+respInfo.getHeader().getRemotePort()); return; } try { IReceiverHandler handler = ReceiverHandlerManager.get(respInfo.getHeader().getMessageId().getMessageId()); if(handler == null){ log.error("没有找到对应的消息Handler,ID:"+respInfo.getHeader().getMessageId()); return; } handler.handler(header,respInfo); } catch (Exception e) { log.error("",e); } }
|
小结
丢包严重:可以尝试增加DatagramSocket.setReceiveBufferSize来增加缓冲区大小
数据解析:定义好协议后,将协议头和协议体抽象出来。我这里抽象了一个UDPCoder(一些数据的公用read方法和write方法),UDPHeader,UDPBody。
PS:有空之后一定把源码整理出来