<返回更多

使用 Netty 实现一个 IM 即时通讯系统

2022-09-23  今日头条  爱马士团团长
加入收藏

一、目录介绍

二、需求梳理

通过前面两章内容的学习,我们基本学会了如何使用.NETty 建立一个长连接,接下来我们就在这个基础上,实现一个单机版的 im 系统。

主要功能,我梳理了一下:

使用到的相关组件:

三、具体实现

本期的内容是基于,原理篇一的 dome 代码基础上进行的,没有看过的原理一的小伙伴,建议先回顾一下原理篇一。

原理篇一的代码结构:

实战篇一的代码结构:

 

代码的层级结构如上所示,接下来,我们将会一个个模块对逻辑进行讲解。

1、登录

1)实现逻辑

不管是长连接还是短连接,鉴权这个动作都是要有的,我相信这个功能模块,大家是很好理解的。我这里就不在过多的赘述了,具体实现步骤如下所示:

1、前后端建立 ws 连接

2、前端发送登录类型的报文,如下所示:

{
    "token": "2",
    "type": "10"
}

token:这里的 token,就是用户登录标识,大家可以根据自己所依赖的业务系统,进行修改。

type:这里表示消息报文的类型,本文所有类型定义如下所示:

示例代码如下图所示,WsMsgDispatcher.dispatch

 

消息类型

3、后端对 token 进行校验,校验成功就记录用户登录信息。

示例代码如下图所示,UserLoginProcessor.login

 

登录业务逻辑

2)具体效果

主要的业务代码我们已经讲解完毕了,接下来我们来看看效果:

 

用户登录

从上图,我们可以看到,我们登录的两个用户都成功了,并且返回了对应的用户信息。

2、维持连接、心跳检测

这个模块的功能,其实我们在原理篇二的时候已经讲过了具体的实现方案了,这里也不再过多的赘述了,我们直接来看具体实现方法吧。

1)维持连接

1、前端每10秒发送一次心跳消息,报文如下所示:

{
    "type": "40",
    "fromId": "2"
}

注:前端发送的每个消息,理论上都是需要带上用户表示的,后端都是需要进行鉴权操作的。我们这里为了方便讲解(偷懒,bushi)将这部分逻辑进行了简化,大家在具体实现的时候,记得一定要加上 鉴权逻辑

2、后端检测,用户是否还在线,如果在线,则刷新用户的最新在线时间,并回复 PONG 消息。

示例代码如下图所示,HeartBeatProcessor.process

 

维持连接1

 

维持连接2

2)心跳检测

这里主要是基于 IdleStateEvent 事件实现的。

TextWebSocketFrameHandler 继承 SimpleChannelInboundHandler 类,并实现 userEventTriggered 方法,具体代码如下所示:

 

心跳检测

这里详细说一下,三种事件的区别:

所以,我们这里检测 ALL_IDLE 事件即可。

3)具体效果

维持连接效果如下所示:

 

维持连接效果

心跳检测效果如下所示:

 

心跳超时效果1

 

心跳超时效果2

3、聊天消息

聊天消息模块主要分为两部分:

这边主要的难点在于,服务端将消息推送到指定的客户端,具体场景有2种情况:

本文由于是 单机版 的 im,所以只会有第一种情况发生,第二种情况就留给大家自由发挥了。

1)消息接收

具体步骤如下所示:

1、客户端发送类型为80的报文,如下所示:

{
    "type": "80",
    "fromId": "1",
    "toId": "2",
    "content": {
        "contentType": 1,
        "body": "测试消息"
    }
}

2、服务端(ChatProcessor)对消息进行处理,具体代码如下所示:

 

消息接收

2)消息推送

具体步骤如下所示:

1、获取消息接受者所连接的服务器 ip 地址 2、判断当前服务器 ip 地址是否和上面的 ip 地址相同,如果相同则推送消息,否则转发给目标服务器

具体代码如下所示:

 

消息推送

3)具体效果

1、我们先登录两个用户,分别是张三、李四,如下图所示:

 

聊天登录

2、张三发送消息给李四,如下图所示:

 

张三发送消息给李四

3、李四发送消息给张三,如下图所示:

 

李四发送消息给张三

4、消息 ack

因为网络环境异常或者其他异常状况的发送,可能会出现消息推送失败的情况,这时候就需要 消息 ack 机制和重试,来保证我们的消息可以推送成功。

1)消息 ack 机制

具体步骤如下:

1、客户端收到 80 类型的消息,解析并发送 ack 报文,如下所示:

{
    "type": "90",
    "msgId": "2bfea133-72a8-4315-82aa-80049fe4fb7b"
}

2、服务端收到 ack 消息,变更消息状态(AckProcessor),具体代码如下图所示:

 

消息ack

2)消息重试

这里因为是单机版 im,所以直接采用 SpringBoot-Job 实现,Job 代码如下所示:

 

消息重试

以上文章来源于JAVA知音 ,作者啊杰

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>