`

netty心跳检查之UDP篇

阅读更多
  部分UDP通信场景中,需要客户端定期发送心跳信息,以获取终端的状态,并获取终端IP,以便服务器主动发送控制命令。如移动通信,内网穿越等。
  使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。

  学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
  程序逻辑比较简单,不多解释,请看注释。

一、辅助类
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.util.AttributeKey;

/**
 * 记录一些常量。真正的应用要从配置文件中读取。
 * 
 * <br>
 * <br>时间:2019年9月14日 下午11:41:26,作者:wallimn
 */
public class Config {
	private Config(){};
	
	public static final AttributeKey<InetSocketAddress> SERVER_ADDR_ATTR=AttributeKey.newInstance("SERVER_ADDR_ATTR");
	
	//原来打算将客户端的ID记录在Channel的属性中,后来发现对于UDP不适用。
	//public static final AttributeKey<String> CLIENT_ID=AttributeKey.newInstance("CLIENT_ID");
	public static final int IDLE_TIME=5;//允许的发呆时间
	
	public static final int SERVER_PORT=8585;
	public static final String SERVER_IP="localhost";
	
	public static final long CLIENT_VALID_THRESHOLD=5000;//客户端地址有效的时间阀值。单位为毫秒。
}


package com.wallimn.iteye.netty.heart;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * 用来模拟持有数据
 * 
 * <br>
 * <br>时间:2019年9月14日 下午7:58:40,作者:wallimn
 */
public class DataHolder {
	
	private DataHolder(){}
	
	/**
	 * 记录客户端的消息
	 */
	public static ConcurrentLinkedQueue<ClientMessage> clientMessageQueue = new ConcurrentLinkedQueue<ClientMessage>();
	
	/**
	 * 记录由心跳获取的客户端地址,用于服务器主动给客户端发消息
	 */
	public static ConcurrentMap<String, ClientInformation> clientInformationMap = new ConcurrentHashMap<String, ClientInformation>();
}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.Date;

import lombok.Data;

/**
 * 客户端信息
 * 
 * <br>
 * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn
 */

@Data
public class ClientInformation {
	/**
	 * 客户端唯一标识
	 */
	private String id;
	/**
	 * 收到时间
	 */
	private Date recordTime;
	/**
	 * 客户端地址,
	 */
	private InetSocketAddress address;
}


package com.wallimn.iteye.netty.heart;

import lombok.Data;

/**
 * 客户端发来的消息
 * 
 * <br>
 * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn
 */

@Data
public class ClientMessage {
	/**
	 * 消息
	 */
	private String message;
	
	/**
	 * 客户端信息
	 */
	private ClientInformation client;
}


二、客户端代码
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.UUID;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

/**
 * 客户端处理器(handler),并无太多逻辑。仅发了一条访问时间的信息(time)、读取服务器信息并显示。
 * 
 * <br>
 * <br>时间:2019年9月14日 下午9:09:47,作者:wallimn
 */
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		String msg = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println(msg);
		
		//如果收到exit信息,关闭channel
		if("exit".equals(msg)){
			ctx.close();
		}
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		InetSocketAddress addr = ctx.channel().attr(Config.SERVER_ADDR_ATTR).get();
		String clientId;
		String message;
		
		//发送1条心跳
		clientId = UUID.randomUUID().toString().toUpperCase().replace("-", "");
//		message = clientId+";"+"heart";//发送的信息
//		ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr));
//		System.out.println("发送一条心跳");//不用专门发心跳信息,任何发到服务器的信息都可以用于服务器更新心跳记录


		message = clientId+";"+"time";//发送的信息
		ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr));
		System.out.println("发送对时信息");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		
		ctx.close();
	}


}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

/**
 * 客户端程序
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ClientApp
 * 
 * <br>
 * <br>时间:2019年9月14日 上午8:58:13,作者:wallimn
 */
public class ClientApp {
	

	public static void main(String[] args) {
		int port = Config.SERVER_PORT;
		new ClientApp().run(port);
	}

	
	public void run(int port){
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
			.channel(NioDatagramChannel.class)
			//.option(ChannelOption.SO_BROADCAST, true)
			.handler(new ClientHandler());
		InetSocketAddress addr = new InetSocketAddress(Config.SERVER_IP,port);
		b.attr(Config.SERVER_ADDR_ATTR, addr);
		
		try {
			Channel ch = b.bind(0).sync().channel();//使用一个随机端口
			
			//最长运行30秒
			if(!ch.closeFuture().await(30000)){
				System.out.println("操作超时");
			}
			
			System.out.println("退出");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		finally{
			group.shutdownGracefully();
		}
	}
}


三、服务器端代码
package com.wallimn.iteye.netty.heart;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
 * 服务器处理器(handler)
 * 
 * <br>
 * <br>时间:2019年9月15日 上午8:37:15,作者:wallimn
 */
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		System.out.println("读取信息,channelShortId="+ctx.channel().id().asShortText());
		String msg = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println("host: " + packet.sender().getHostString());
		System.out.println("port: " + packet.sender().getPort());
		System.out.println("content: " + msg);
		String[] fields = msg.split(";");
		if (fields.length != 2) {
			return;
		}
		ClientInformation client = new ClientInformation();
		client.setId(fields[0]);
		client.setRecordTime(new Date());
		client.setAddress(packet.sender());
		ClientMessage message = new ClientMessage();
		message.setClient(client);
		message.setMessage(fields[1]);
		
		System.out.println("加入待处理数据队列");
		
		//标注客户端的ID

		// 不对消息进行处理,只是加入队列,由其他线程进行处理
		if(!"heart".equals(message.getMessage())){//如果不是心跳消息
			DataHolder.clientMessageQueue.add(message);		
		}
		
		//不管什么消息,更新客户端的信息
		DataHolder.clientInformationMap.put(client.getId(), client);
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		super.exceptionCaught(ctx, cause);
	}
	
//这个方法对于UDP没有什么意义
//	@Override
//	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//		if (evt instanceof IdleStateEvent) {
//			IdleStateEvent e = (IdleStateEvent) evt;
//			switch (e.state()) {
//			case READER_IDLE:
//				System.out.println("READER_IDLE");
//				break;
//			case WRITER_IDLE:
//				System.out.println("WRITER_IDLE");
//				break;
//			case ALL_IDLE:
//				System.out.println("ALL_IDLE");
//				break;
//			default:
//				break;
//			}
//		}
//	}

}


package com.wallimn.iteye.netty.heart;

import java.util.Date;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

/**
 * 服务器应用
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ServerApp
 * <br>
 * <br>
 * 时间:2019年9月14日 上午9:47:29,作者:wallimn
 */
public class ServerApp {


	//public static ConcurrentMap<String, ClientMessage> clientMessageMap = new ConcurrentHashMap<String, ClientMessage>();

	// 内部放置多个Task,
	public static HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 1, // tick一下的时间
			TimeUnit.SECONDS, 3);// 放置Timer的数量

	public static Channel channel = null;

	public static final long RESPONSE_TIMEER_DELAY = 1L;
	public static final long CHECK_TIMEER_DELAY = 5L;
	public static final long HELLO_TIMEER_DELAY = 2L;

	/**
	 * 处理客户端发来的消息
	 */
	public static TimerTask responseTasker = new TimerTask() {
		public void run(Timeout timeout) throws Exception {
			timer.newTimeout(this, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS);
			if (channel == null){
				System.out.println("channel is null");
				return;
			}
			if (channel.isActive() == false) {
				System.out.println("channel is inactive");
				timeout.cancel();
				return;
			}
			//System.out.println("responseTasker run, size is "+DataHolder.clientMessageQueue.size());
			
			ClientMessage message;
			for (Iterator<ClientMessage> iterator=DataHolder.clientMessageQueue.iterator();iterator.hasNext();) {
				message = iterator.next();
				System.out.println(message.getMessage());
				if("time".equals(message.getMessage())){
					channel.writeAndFlush(
							new DatagramPacket(Unpooled.copiedBuffer("18:18", CharsetUtil.UTF_8), message.getClient().getAddress()));
				}
				else{
					;
				}
				//处理完后清除
				iterator.remove();
			}

		}
	};

	/**
	 * 用于检查客户端是否有效
	 */
	public static TimerTask checkTasker = new TimerTask() {
		public void run(Timeout timeout) throws Exception {
			timer.newTimeout(this, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
			ClientInformation client = null;
			long now = new Date().getTime();
			for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) {
				client = entry.getValue();
				if (now - client.getRecordTime().getTime() > Config.CLIENT_VALID_THRESHOLD) {

					System.out.println("client kick : " + client.getId());
					DataHolder.clientInformationMap.remove(entry.getKey());
				}
			}

		}

	};
	
	/**
	 * 用于模拟主动向客户端发送消息
	 */
	public static TimerTask helloTimer = new TimerTask(){

		public void run(Timeout timeout) throws Exception {
			if (channel == null){
				System.out.println("channel is null");
				return;
			}
			if (channel.isActive() == false) {
				System.out.println("channel is inactive");
				timeout.cancel();
				return;
			}
			timer.newTimeout(this, HELLO_TIMEER_DELAY, TimeUnit.SECONDS);
			ClientInformation client = null;
			for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) {
				client = entry.getValue();
				System.out.println("helloTimer run. send to "+client.getId());
				
				channel.writeAndFlush(
						new DatagramPacket(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8), client.getAddress()));
			}
		}
		
	};
	
	
	public static void main(String[] args) throws Exception {
		int port = Config.SERVER_PORT;
		timer.newTimeout(responseTasker, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(checkTasker, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(helloTimer, HELLO_TIMEER_DELAY, TimeUnit.SECONDS);
		new ServerApp().run(port);

	}

	public void run(int port) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();

		b.group(group).channel(NioDatagramChannel.class)
				// .option(ChannelOption.SO_BROADCAST, true)
				.handler(new ChannelInitializer<Channel>(){

					@Override
					protected void initChannel(Channel ch) throws Exception {
						//ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
						ch.pipeline().addLast(new ServerHandler());
					}
					
				});

		ChannelFuture future = b.bind(port).sync();
		channel = future.channel();
		System.out.println("服务器准备就绪,channelShortId="+channel.id().asShortText());
		channel.closeFuture().await();

		group.shutdownGracefully();
	}
}



2
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics