参考:https://www.cnblogs.com/mc-74120/p/13622008.html

pom文件

<!-- Netty aggregate artifact (netty-all), used by the server and client code below.
     No <version> element is declared here; presumably the version is managed by a
     parent POM / BOM dependencyManagement section - TODO confirm. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

启动类

/**
 * Spring Boot entry point of the application.
 *
 * <p>Besides bootstrapping the Spring context, this class is a {@link CommandLineRunner}:
 * its {@link #run(String...)} callback hands control to the injected
 * {@link NettyServerListener} so that the Netty server is started together with the
 * application.
 */
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
@EnableScheduling
@EnableAsync
public class ChimetaCoreApplication implements CommandLineRunner {

    /** Component that owns and starts the Netty server. */
    @Autowired
    private NettyServerListener nettyServerListener;

    /**
     * JVM entry point; launches the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(ChimetaCoreApplication.class, args);
    }

    /**
     * {@link CommandLineRunner} callback: starts the Netty server.
     *
     * @param args command-line arguments (not used here)
     * @throws Exception declared by the {@link CommandLineRunner} contract
     */
    @Override
    public void run(String... args) throws Exception {
        nettyServerListener.start();
    }
}

服务端代码的listener

package com.chimeta.netty;

import javax.annotation.PreDestroy; import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;

import com.chimeta.netty.protobuf.ImProto;

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j;

/**

  • 服务启动监听器

  • @author mwan / @Component @Slf4j public class NettyServerListener { /*

    • 创建bootstrap / ServerBootstrap serverBootstrap = new ServerBootstrap(); /*
    • BOSS / EventLoopGroup boss = new NioEventLoopGroup(); /*
    • Worker / EventLoopGroup work = new NioEventLoopGroup(); /*
    • 通道适配器 / @Resource private ServerChannelHandlerAdapter channelHandlerAdapter; /*
    • 从配置中心获取NETTY服务器配置 */ @Value("${server.netty.port:10001}") private int NETTY_PORT;

    @Value("${server.netty.maxthreads:5000}") private int MAX_THREADS;

    /**

    • 关闭服务器方法 */ @PreDestroy public void close() { log.info("关闭服务器...."); //优雅退出 boss.shutdownGracefully(); work.shutdownGracefully(); }

    /**

    • 开启及服务线程 */ public void start() { serverBootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, MAX_THREADS) //最大客户端连接数为1024
      .handler(new LoggingHandler(LogLevel.INFO)).childOption(ChannelOption.SO_KEEPALIVE, true); ; try { //设置事件处理 serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 下面的每一个addLast都有自己的含义,需要每个都过一下 ch.pipeline().addLast(new IdleStateHandler(18,0,0)); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); //ch.pipeline().addLast(new CustomProtobufInt32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(ImProto.ImMsg.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); //ch.pipeline().addLast(new CustomProtobufInt32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); // 业务处理 ch.pipeline().addLast(channelHandlerAdapter); } }); log.info("netty服务器在[{}]端口启动监听", NETTY_PORT); ChannelFuture f = serverBootstrap.bind(NETTY_PORT).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("[出现异常] 释放资源", e); boss.shutdownGracefully(); work.shutdownGracefully(); log.info("服务已关闭!"); } } }

 ServerChannelHandlerAdapter处理类

package com.chimeta.netty;

import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

import com.chimeta.netty.model.SessionCloseReason; import com.chimeta.netty.protobuf.ImProto.ImMsg; import com.chimeta.netty.util.ChannelUtils; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j;

/**

  • 通信服务处理器 / @Component @Sharable @Slf4j public class ServerChannelHandlerAdapter extends ChannelInboundHandlerAdapter { /*

    • 注入请求分排器 */ @Autowired private MessageDispatcher messageDispatcher;

    @Autowired private DeviceSessionManager sessionManager;

    /** 用来记录当前在线连接数。应该把它设计成线程安全的。 */ //private AtomicInteger sessionCount = new AtomicInteger(0);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);

     if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {
       ctx.channel().close();
       log.error("Duplicate session,IP=[{}]",ChannelUtils.getRemoteIp(ctx.channel()));
    }     
    
     //String server_ip = NetworkUtils.getRealIp();//获得本机IP
     // 缓存计数器加1

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);

     // 缓存计数器减1
     //String server_ip = NetworkUtils.getRealIp();//获得本机IP
     log.info(ctx.channel().id()+"离开了");  

    }

    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

     ImMsg gameMessage = (ImMsg)msg;
     final Channel channel = ctx.channel();
    IoSession session = ChannelUtils.getSessionBy(channel);
    if(session.isHeartbeated()) {
       session.setHeartbeated(false);
    }
    
    String deviceCode="";
    if(session.getDevice() != null &amp;&amp; StringUtils.isNotBlank(session.getDevice().getDeviceCode())) {
       deviceCode = session.getDevice().getDeviceCode();
    }

// if(!MessagingConst.TYPE_UPOS_REQUEST.equals(gameMessage.getMsg().getTypeUrl())) { try { log.info("Inbound message is :" + JsonFormat.printer().usingTypeRegistry(DeviceSessionManager.typeRegistry).print(gameMessage.toBuilder()) + ", from device " + deviceCode); } catch (InvalidProtocolBufferException e) { log.info("Inbound message is :" + gameMessage.toString()); } // }

   messageDispatcher.dispatch(gameMessage, session);
}
 
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
    ctx.flush();  
} 

@Override  
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
        throws Exception {  
    
    log.error("通信发生异常:", cause);
    ctx.close();   
} 

/**
 * 一段时间未进行读写操作 回调
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    /*心跳处理*/
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state() == IdleState.READER_IDLE) {
            /*读超时*/
            log.info("READER_IDLE read overtime,close session");
            final Channel channel = ctx.channel();
           IoSession session = ChannelUtils.getSessionBy(channel);
            
         /*
          * if(messageDispatcher.sendHeartbeat(session) == false) { //如果心跳检测失败,则连接异常,主动断开
          * session.setSessionCloseReason(SessionCloseReason.OVER_TIME); ctx.close(); };
          */
           
           session.setSessionCloseReason(SessionCloseReason.OVER_TIME);
          ctx.close();
            
        } else if (event.state() == IdleState.WRITER_IDLE) {
            /*写超时*/   
            log.info("WRITER_IDLE 写超时");
        } else if (event.state() == IdleState.ALL_IDLE) {
            /*总超时*/
            log.info("ALL_IDLE 总超时");
        }
    }
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    sessionManager.unregisterUserContext(ctx.channel());
    log.info(ctx.channel().id() + "已掉线!");
    // 这里加入玩家的掉线处理
    ctx.close();

}

}

MessageDispatcher分派各个处理器

package com.chimeta.netty;

import com.chimeta.netty.service.TerminalService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;

import com.chimeta.netty.constant.MessagingConst; import com.chimeta.netty.model.SessionCloseReason; import com.chimeta.netty.protobuf.ImProto.ImMsg; import com.chimeta.netty.service.LoginService; import com.chimeta.netty.util.MessageBuilder; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;

/**

  • 请求分排器 */ @Component @Slf4j public class MessageDispatcher{

    @Autowired private LoginService loginService;

    @Resource private TerminalService terminalService;

    /**

    • 消息分发处理

    • @param gameMsg

    • @throws InvalidProtocolBufferException */ @Async public void dispatch(ImMsg imMsg, IoSession currSession) throws InvalidProtocolBufferException {

      if(imMsg.getId() < 0) { currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "Invalid message!")); return; } //log.info("接收到的消息TypeUrl是: "+imMsg.getMsg().getTypeUrl()); switch(imMsg.getMsg().getTypeUrl()) {

       case MessagingConst.TYPE_ONLINE_REQUEST:
          // 处理设备上线请求
          loginService.doLogin(imMsg, currSession);
          break;
       case MessagingConst.TYPE_USER_LOGON_REQUEST:
          // 处理请求
          loginService.doUserLogon(imMsg, currSession);
          break;
       case MessagingConst.TYPE_USER_LOGOFF_REQUEST:
          // 处理请求
          loginService.doUserLogoff(imMsg, currSession);
          break;

      case MessagingConst.TYPE_TERMINAL_STATE_REQUEST: // 我写的 terminalService.multiInsert(imMsg, currSession); break; default: if(currSession != null) { // 返回客户端发来的心跳消息 responseHeartbeat(imMsg, currSession); } break; } }

    /**

    • 发送心跳包消息

    • @param gameMsg

    • @param currSession

    • @return */ public boolean sendHeartbeat(IoSession currSession) {

      try { if(currSession.isHeartbeated()) { return false; } ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();

      currSession.sendMessage(imMsgBuilder.build());

      currSession.setHeartbeated(true);

      return true; }catch(Exception e) { log.error("主动发送心跳包时发生异常:", e); currSession.close(SessionCloseReason.EXCEPTION); return false; }

    } /**

    • 返回客户端发来的心跳包消息

    • @param imMsg

    • @param currSession */ private void responseHeartbeat(ImMsg imMsg,IoSession currSession) { ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();

      currSession.sendMessage(imMsgBuilder.build()); }

}

最后到service业务处理TerminalService

package com.chimeta.netty.service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.chimeta.common.entity.terminal.TerminalStateMonitorDO;
import com.chimeta.netty.IoSession;
import com.chimeta.netty.constant.MessagingConst;
import com.chimeta.netty.model.DeviceInfo;
import com.chimeta.netty.protobuf.ImProto;
import com.chimeta.netty.util.MessageBuilder;
import com.chimeta.terminal.mapper.TerminalStateMonitorMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import org.springframework.util.CollectionUtils;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

/**

  • 盒子设备相关的实现类 */ @Service @Slf4j public class TerminalService extends ServiceImpl<TerminalStateMonitorMapper, TerminalStateMonitorDO> {

    @Transactional(rollbackFor = Exception.class) public void multiInsert(ImProto.ImMsg imMsg, IoSession currSession){ DeviceInfo deviceInfo = currSession.getDevice(); if(deviceInfo == null) { currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "device not online!")); return; } try { ImProto.TerminalStateList terminalStateList = imMsg.getMsg().unpack(ImProto.TerminalStateList.class); log.info("TerminalService multiInsert TerminalStateList:{}", terminalStateList); List<ImProto.TerminalState> requestTerminalStateList = terminalStateList.getTerminalStateList(); if (!CollectionUtils.isEmpty(requestTerminalStateList)){ List<TerminalStateMonitorDO> tmplist = new ArrayList<>(); for (ImProto.TerminalState requestTerminalState : requestTerminalStateList){ TerminalStateMonitorDO terminalStateMonitorDO = new TerminalStateMonitorDO(); terminalStateMonitorDO.setBatteryLevel(requestTerminalState.getBatteryLevel()); terminalStateMonitorDO.setChargingState(requestTerminalState.getChargingState()); terminalStateMonitorDO.setTemperature(BigDecimal.valueOf(requestTerminalState.getTemperature())); terminalStateMonitorDO.setUniqueCode(deviceInfo.getDeviceCode()); terminalStateMonitorDO.setStateTime(requestTerminalState.getStateTime()); tmplist.add(terminalStateMonitorDO); } this.saveBatch(tmplist); } } catch (Exception e) { log.error("TerminalService multiInsert error:{}", e); } }

}

至此,服务端的处理逻辑就写完了。比较费时间的是自己编写client端的请求,花了两三天才总结出可用的写法,最后写了一个test类,如下:

package com.chimeta.core;

import com.chimeta.netty.protobuf.ImProto; import com.google.protobuf.Any; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner;

@Slf4j @RunWith(MockitoJUnitRunner.class) class NettyTerminalTest {

@Test
public void tryTest()  throws InterruptedException {

    ImProto.TerminalStateList terminalstateList = ImProto.TerminalStateList.newBuilder().build();
    for (int i = 0; i &lt; 3; i++) {
        ImProto.TerminalState build = ImProto.TerminalState.newBuilder()
                .setBatteryLevel(i)
                .setChargingState(i * 11)
                .setTemperature(i * 11.1)
                .setStateTime(i * 111)
                .build();
        terminalstateList = terminalstateList.toBuilder().addTerminalState(build).build();
    }

    ImProto.ImMsg imMsg = ImProto.ImMsg.newBuilder().setId(66).setMsg(Any.pack(terminalstateList)).build();

    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer&lt;NioSocketChannel&gt;() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("初始化连接...");
                    ch.pipeline().addLast("encode", new ProtobufEncoder())
                            .addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufVarint32LengthFieldPrepender());
                }
            })
            .channel(NioSocketChannel.class).connect("192.168.123.123", 10001)
            .sync()
            .channel();

// channel.pipeline().addLast(new StringEncoder()).writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(imMsg.toByteArray())); channel.pipeline().writeAndFlush(Unpooled.copiedBuffer(imMsg.toByteArray())); System.out.println("over!"); }

}

好了,记录一下,以后就不会忘记了。