在Socket通信中为了保证Server和Client连接的有效,一般会使用心跳来检测Server和Client通信是否畅通.

Server

  1. 心跳处理handler
package com.github.chamberlaincell.toy.monarch.transport.common;

import com.github.chamberlaincell.toy.monarch.common.domain.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Server-side heartbeat handler. One instance is created per client channel,
 * so the timeout counter below is per-connection state.
 *
 * <p>Responsibilities:
 * <ul>
 *   <li>reply to a client {@code PING} with {@code Message.PONG_MSG};</li>
 *   <li>pass every non-heartbeat message on to the next inbound handler;</li>
 *   <li>count consecutive {@link IdleStateEvent}s and close the channel once
 *       more than {@link #MAX_TIMEOUT_COUNT} have occurred without any read.</li>
 * </ul>
 *
 * @author chamberlaincell cyhyx521@gmail.com
 * @date 2020/10/3 17:50
 */
@Slf4j
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Number of consecutive idle events tolerated before the channel is closed.
     */
    private static final int MAX_TIMEOUT_COUNT = 5;

    /**
     * Consecutive idle events seen since the last inbound message.
     */
    private final AtomicInteger timeoutCount = new AtomicInteger(0);

    /**
     * Handles one decoded message: any inbound traffic resets the timeout
     * counter; a PING is answered with a PONG, everything else is forwarded.
     *
     * @param ctx channel context
     * @param msg decoded inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // Any message from the client proves the connection is alive.
        timeoutCount.set(0);
        if (msg.getType() == Message.PING) {
            log.info("receive client heartbeat");
            ctx.writeAndFlush(Message.PONG_MSG);
        } else {
            // SimpleChannelInboundHandler does not propagate messages by itself;
            // without this call business messages would never reach the handlers
            // registered after this one.
            // NOTE(review): SimpleChannelInboundHandler releases the message after
            // channelRead0 returns. This assumes Message is a plain object; if it is
            // ReferenceCounted it must be retained before being forwarded.
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Treats an {@link IdleStateEvent} as a missed heartbeat; all other user
     * events are delegated to the superclass.
     *
     * @param ctx channel context
     * @param evt user event fired through the pipeline
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // Nothing was received from the client within the idle window.
            handlerHeartbeatTimeout(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Records one heartbeat timeout and closes the channel once the count
     * exceeds {@link #MAX_TIMEOUT_COUNT}.
     *
     * @param ctx ctx
     */
    private void handlerHeartbeatTimeout(ChannelHandlerContext ctx) {
        // Read the counter once so the value tested is the value logged.
        int count = timeoutCount.incrementAndGet();
        if (count > MAX_TIMEOUT_COUNT) {
            ctx.close();
            log.warn("monarch timeout count more than {},close!", MAX_TIMEOUT_COUNT);
        } else {
            log.warn("monarch timeout, count: {}", count);
        }
    }
}

  2. 注册Server Handler,配置IdleStateHandler

服务端需要添加IdleStateHandler,并配置读超时时间:如果超过该时间仍没有读到客户端的任何消息,就会触发IdleStateEvent事件,交给上面的HeartbeatServerHandler处理.

	//多余的代码省略,具体代码可以查看底部的github地址
        // Server-side channel pipeline; handlers run in the order they are added.
        socketChannel.pipeline()
                // reader-idle detection only (writer/all idle are disabled with 0): fires an
                // IdleStateEvent when nothing has been read from the client for
                // HEART_BEAT_TIME_OUT * HEART_BEAT_TIME_OUT_MAX_TIME seconds
                .addLast("IdleStateHandler", new IdleStateHandler(MonarchConstant.HEART_BEAT_TIME_OUT * MonarchConstant.HEART_BEAT_TIME_OUT_MAX_TIME, 0, 0, TimeUnit.SECONDS))
                // byte to message
                .addLast("MonarchDecoder", new MonarchDecoder(globalConfig.getSerializeConfig()))
                // message to byte
                .addLast("MonarchEncoder", new MonarchEncoder(globalConfig.getSerializeConfig()))
                // heartbeat: a new handler instance per channel, placed after the decoder
                // so that it receives decoded Message objects and the idle events
                .addLast("HeartbeatServerHandler", new HeartbeatServerHandler())
                // business handler, last in the inbound chain
                .addLast("MonarchServerHandler", new MonarchServerHandler(this));

Client

  1. Client Handler
package com.github.chamberlaincell.toy.monarch.transport.common;

import com.github.chamberlaincell.toy.monarch.common.domain.Message;
import com.github.chamberlaincell.toy.monarch.transport.monarch.constant.MonarchConstant;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client-side heartbeat handler. One instance is attached to each channel the
 * client opens, so the timeout counter below is per-connection state.
 *
 * <p>Responsibilities:
 * <ul>
 *   <li>on an {@link IdleStateEvent}, send {@code Message.PING_MSG} to the server;</li>
 *   <li>reset the timeout counter whenever anything is received from the server;</li>
 *   <li>pass every non-heartbeat message on to the next inbound handler;</li>
 *   <li>close the channel once more than
 *       {@code MonarchConstant.HEART_BEAT_TIME_OUT_MAX_TIME} consecutive idle
 *       events have occurred without any read.</li>
 * </ul>
 *
 * @author chamberlaincell cyhyx521@gmail.com
 * @date 2020/10/3 17:50
 */
@Slf4j
public class HeartbeatClientHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Consecutive idle events seen since the last inbound message.
     */
    private final AtomicInteger timeoutCount = new AtomicInteger(0);

    /**
     * Handles one decoded message: any inbound traffic resets the timeout
     * counter; a PONG is only logged, everything else is forwarded.
     *
     * @param ctx channel context
     * @param msg decoded inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // Any message from the server proves the connection is alive.
        timeoutCount.set(0);
        if (msg.getType() == Message.PONG) {
            log.info("receive server heartbeat");
        } else {
            // SimpleChannelInboundHandler does not propagate messages by itself;
            // without this call server responses would never reach the handlers
            // registered after this one.
            // NOTE(review): SimpleChannelInboundHandler releases the message after
            // channelRead0 returns. This assumes Message is a plain object; if it is
            // ReferenceCounted it must be retained before being forwarded.
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Treats an {@link IdleStateEvent} as the signal to send a heartbeat; all
     * other user events are delegated to the superclass.
     *
     * @param ctx channel context
     * @param evt user event fired through the pipeline
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // The client has not sent anything to the server within the idle window.
            handlerHeartbeatTimeout(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Records one idle timeout. While the count is within the limit a PING is
     * sent; once it exceeds the limit the channel is closed.
     *
     * @param ctx ctx
     */
    private void handlerHeartbeatTimeout(ChannelHandlerContext ctx) {
        // Read the counter once so the value tested is the value logged.
        int count = timeoutCount.incrementAndGet();
        if (count > MonarchConstant.HEART_BEAT_TIME_OUT_MAX_TIME) {
            // TODO: the connection is considered broken at this point; attempt to reconnect
            ctx.close();
            log.warn("monarch timeout count more than {},close!", MonarchConstant.HEART_BEAT_TIME_OUT_MAX_TIME);
        } else {
            // Nothing was sent within the idle window: proactively ping the server.
            ctx.writeAndFlush(Message.PING_MSG);
            log.warn("monarch timeout, count: {}", count);
        }
    }
}

  2. 注册Client Handler,配置IdleStateHandler

客户端和服务端的配置基本相同,不同的是客户端需要配置IdleStateHandler的写超时:一旦在超时时间内没有向服务端发送任何消息,客户端就主动向服务端发送一次心跳(PING),说明自己还活着.

        // Client-side channel pipeline; handlers run in the order they are added.
        socketChannel.pipeline()
                // writer-idle detection only (reader/all idle are disabled with 0): fires an
                // IdleStateEvent when nothing has been written for HEART_BEAT_TIME_OUT
                // seconds (this three-argument constructor takes its values in seconds)
                .addLast("IdleStateHandler", new IdleStateHandler(0, MonarchConstant.HEART_BEAT_TIME_OUT, 0))
                // byte to message
                .addLast("MonarchDecoder", new MonarchDecoder(globalConfig.getSerializeConfig()))
                // message to byte
                .addLast("MonarchEncoder", new MonarchEncoder(globalConfig.getSerializeConfig()))
                // heartbeat: sends PING on idle events and tracks missing replies
                .addLast("HeartbeatClientHandler", new HeartbeatClientHandler())
                // business handler, last in the inbound chain
                .addLast("MonarchClientHandler", new MonarchClientHandler(this));

代码地址

https://github.com/chamberlaincell/toy-monarch/tree/master/src/main/java/com/github/chamberlaincell/toy/monarch/transport/monarch

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

人生中没有四季 唯有那寒冬的荒野