netty服务器
Netty配置
管道配置
自定义handler
推送消息接口及实现类
测试
学过 Netty 的都知道,Netty 对 NIO 进行了很好的封装,简单的 API,庞大的开源社区。深受广大程序员喜爱。基于此本文分享一下基础的 netty 使用。实战制作一个 Netty + websocket 的消息推送小栗子。
netty服务器
@Component
publicclassNettyServer{
staticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);
/**
*端口号
*/
@Value("${webSocket.netty.port:8888}")
intport;
EventLoopGroupbossGroup;
EventLoopGroupworkGroup;
@Autowired
ProjectInitializernettyInitializer;
@PostConstruct
publicvoidstart()throwsInterruptedException{
newThread(()->{
bossGroup=newNioEventLoopGroup();
workGroup=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
//bossGroup辅助客户端的tcp连接请求,workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup,workGroup);
//设置NIO类型的channel
bootstrap.channel(NioServerSocketChannel.class);
//设置监听端口
bootstrap.localAddress(newInetSocketAddress(port));
//设置管道
bootstrap.childHandler(nettyInitializer);
//配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuturechannelFuture=null;
try{
channelFuture=bootstrap.bind().sync();
log.info("Serverstartedandlistenon:{}",channelFuture.channel().localAddress());
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}
/**
*释放资源
*/
@PreDestroy
publicvoiddestroy()throwsInterruptedException{
if(bossGroup!=null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup!=null){
workGroup.shutdownGracefully().sync();
}
}
}
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
视频教程:https://doc.iocoder.cn/video/
Netty配置
管理全局Channel以及用户对应的channel(推送消息)
publicclassNettyConfig{
/**
*定义全局单利channel组管理所有channel
*/
privatestaticvolatileChannelGroupchannelGroup=null;
/**
*存放请求ID与channel的对应关系
*/
privatestaticvolatileConcurrentHashMapchannelMap=null;
/**
*定义两把锁
*/
privatestaticfinalObjectlock1=newObject();
privatestaticfinalObjectlock2=newObject();
publicstaticChannelGroupgetChannelGroup(){
if(null==channelGroup){
synchronized(lock1){
if(null==channelGroup){
channelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
}
}
returnchannelGroup;
}
publicstaticConcurrentHashMapgetChannelMap(){
if(null==channelMap){
synchronized(lock2){
if(null==channelMap){
channelMap=newConcurrentHashMap<>();
}
}
}
returnchannelMap;
}
publicstaticChannelgetChannel(StringuserId){
if(null==channelMap){
returngetChannelMap().get(userId);
}
returnchannelMap.get(userId);
}
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://gitee.com/zhijiantianya/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
管道配置
@Component publicclassProjectInitializerextendsChannelInitializer{ /** *webSocket协议名 */ staticfinalStringWEBSOCKET_PROTOCOL="WebSocket"; /** *webSocket路径 */ @Value("${webSocket.netty.path:/webSocket}") StringwebSocketPath; @Autowired WebSocketHandlerwebSocketHandler; @Override protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ //设置管道 ChannelPipelinepipeline=socketChannel.pipeline(); //流水线管理通道中的处理程序(Handler),用来处理业务 //webSocket协议本身是基于http协议的,所以这边也要使用http编解码器 pipeline.addLast(newHttpServerCodec()); pipeline.addLast(newObjectEncoder()); //以块的方式来写的处理器 pipeline.addLast(newChunkedWriteHandler()); pipeline.addLast(newHttpObjectAggregator(8192)); pipeline.addLast(newWebSocketServerProtocolHandler(webSocketPath,WEBSOCKET_PROTOCOL,true,65536*10)); //自定义的handler,处理业务逻辑 pipeline.addLast(webSocketHandler); } }
自定义handler
@Component @ChannelHandler.Sharable publicclassWebSocketHandlerextendsSimpleChannelInboundHandler{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class); /** *一旦连接,第一个被执行 */ @Override publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{ log.info("有新的客户端链接:[{}]",ctx.channel().id().asLongText()); //添加到channelGroup通道组 NettyConfig.getChannelGroup().add(ctx.channel()); } /** *读取数据 */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{ log.info("服务器收到消息:{}",msg.text()); //获取用户ID,关联channel JSONObjectjsonObject=JSONUtil.parseObj(msg.text()); Stringuid=jsonObject.getStr("uid"); NettyConfig.getChannelMap().put(uid,ctx.channel()); //将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID AttributeKey key=AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(uid); //回复消息 ctx.channel().writeAndFlush(newTextWebSocketFrame("服务器收到消息啦")); } @Override publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{ log.info("用户下线了:{}",ctx.channel().id().asLongText()); //删除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.info("异常:{}",cause.getMessage()); //删除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); ctx.close(); } /** *删除用户与channel的对应关系 */ privatevoidremoveUserId(ChannelHandlerContextctx){ AttributeKey key=AttributeKey.valueOf("userId"); StringuserId=ctx.channel().attr(key).get(); NettyConfig.getChannelMap().remove(userId); } }
推送消息接口及实现类
publicinterfacePushMsgService{
/**
*推送给指定用户
*/
voidpushMsgToOne(StringuserId,Stringmsg);
/**
*推送给所有用户
*/
voidpushMsgToAll(Stringmsg);
}
@Service
publicclassPushMsgServiceImplimplementsPushMsgService{
@Override
publicvoidpushMsgToOne(StringuserId,Stringmsg){
Channelchannel=NettyConfig.getChannel(userId);
if(Objects.isNull(channel)){
thrownewRuntimeException("未连接socket服务器");
}
channel.writeAndFlush(newTextWebSocketFrame(msg));
}
@Override
publicvoidpushMsgToAll(Stringmsg){
NettyConfig.getChannelGroup().writeAndFlush(newTextWebSocketFrame(msg));
}
}
测试

链接服务器


发送消息


调用接口,往前端推送消息!


OK!
一个简单的 netty 小栗子就完成了。
-
接口
+关注
关注
33文章
9443浏览量
156115 -
封装
+关注
关注
128文章
9139浏览量
147872 -
服务器
+关注
关注
13文章
10093浏览量
90863
原文标题:Spring Boot+Netty+Websocket实现后台向前端推送信息
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
基于多路复用模型的Netty框架
基于阿里云移动推送的移动应用推送模式最佳实践
如何实现服务器自动推送消息?
怎么去理解netty
怎样使用springboot整合netty来开发一套高性能的通信系统呢
Springboot整合netty框架实现终端、通讯板子(单片机)TCP/UDP通信案例
Netty如何实现消息推送
Netty如何做到单机百万并发?
jdk17下netty导致堆内存疯涨原因排查

netty推送消息接口及实现
评论