使用 Tio 实现 WebSocket, 可以在 tio-websocket-showcase 的基础上进行修改 (据说能够达到企业级性能, 单机支持 30 万连接). 利用 Tio 提供的绑定功能实现消息群发和给指定的用户发送消息, 并且把用户对象直接存储到 ChannelContext 上, 还能省去自己管理用户的麻烦, Tio 也提供了心跳检测功能, IP 黑名单, 流量监控等, 我们只需要关注与业务层代码即可, 下面介绍一些相关的经验:
应用中只有一个 GroupContext, 发送消息, 获取小组信息等
一个连接对应一个 ChannelContext (ip:port), 可以使用
setAttribute()
存储业务数据,getAttribute()
获取数据绑定 (使用 Tio 进行绑定, 可参考让网络编程更轻松和有趣 t-io):
userid: 一个 userid 可以绑定多个 ChannelContext (实现同一个账号多个设备登录)
1
2bindUser(ChannelContext channelContext, String userid)
SetWithLock<ChannelContext> getChannelContextsByUserid(GroupContext groupContext, String userid)token: 一个 token 可以绑定多个 ChannelContext (实现同一个账号多个设备登录)
1
2bindToken(ChannelContext channelContext, String token)
SetWithLock<ChannelContext> getChannelContextsByToken(GroupContext groupContext, String token)bsId: 一个 bsId 只能绑定一个 ChannelContext (实现同一个账号只允许登录一个设备)
1
2bindBsId(ChannelContext channelContext, String bsId)
ChannelContext getChannelContextByBsId(GroupContext groupContext, String bsId)下面介绍的绑定以 bsId 为例, 其他的方式参考实现即可
与服务器建立连接
连接的 URL
传入参数 userId 和 username, 在握手的回调函数
IWsMsgHandler.handshake()
中绑定用户 user 和 ChannelContext:1
2
3// WebSocket 连接 URL
ws://ip:port?userId={userId}&username={username}
ws://ebagtest.edu-edu.com:3721/?userId=10001&username=小明踢掉已登录设备
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 如果 userId 已经绑定过其他的 channelContext (即在其他设备登录), 则踢掉前一个, 一个 userId 只允许登录一台设备
ChannelContext previousChannelContext = Tio.getChannelContextByBsId(channelContext.groupContext, userId);
if (!channelContext.equals(previousChannelContext) && previousChannelContext != null) {
user = (User) previousChannelContext.getAttribute('user')
previousChannelContext.setAttribute('kickOut', true); // 踢掉的标志
Tio.unbindBsId(previousChannelContext);
Tio.remove(previousChannelContext, "服务器断开客户端连接");
logger.info("踢掉 {} 已经登录的连接 {}", userId, previousChannelContext.getClientNode());
// 绑定前一个连接的小组
user.getGroups().forEach(groupName -> {
Tio.bindGroup(channelContext, groupName);
});
}标记连接是被踢掉的, 在
WsServerAioListener.onBeforeClose()
可以根据连接是否被踢掉的进行特殊处理绑定用户
1
2channelContext.setAttribute('user', user); // ChannelContext 中存储用户, 就不需要使用其他数据结构来保存了
Tio.bindBsId(channelContext, userId); // BS ID 和 ChannelContext 是一对一的发送私聊消息
绑定了 bsId 和 ChannelContext, Tio 可以使用
字符串的 bsId
给此用户发送信息, 不需要找到对应的 ChannelContext1
2WsResponse response = WsResponse.fromText(JSON.toJSONString(message), ServerConfig.CHARSET);
Tio.sendToBsId(channelContext.groupContext, userId, response);
与服务器断开连接
解绑用户
在
WsServerAioListener.onBeforeClose()
中解绑用户和 ChannelContext1
Tio.unbindBsId(channelContext);
加入群组
绑定群组
1
2Tio.bindGroup(channelContext, groupName);
channelContext.getGroups(); // 可以得到绑定的所有小组名字群发消息
绑定了 groupName 和 ChannelContext, Tio 可以使用
字符串的 groupName
给此群组用户发送消息, 不需要找到此群组所有用户的 ChannelContext 一个一个的发送消息1
2WsResponse response = WsResponse.fromText(JSON.toJSONString(message), ServerConfig.CHARSET);
Tio.sendToGroup(channelContext.groupContext, groupName, response);群组成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14public List<User> groupUsers(String groupName, ChannelContext channelContext) {
// 1. 获取小组的所有 channelContext
// 2. 得到每个 channelContext 的用户
// 3. 返回小组的所有用户
// 如果小组中没有成员, 返回 null
SetWithLock<ChannelContext> temp = Tio.getChannelContextsByGroup(channelContext.groupContext, groupName);
if (temp == null) {
return Collections.emptyList();
}
Set<ChannelContext> channels = temp.getObj();
return channels.stream().map(this::getUser).collect(Collectors.toList());
}
离开群组
解绑群组
1
Tio.unbindGroup(groupName, channelContext);
心跳检测
心跳检测是指在设定时间内如果服务器没有收到客户端的
任何消息
, 则认为客户端已经不是活跃的了, 服务器端就会断开客户端的连接 (Tio.remove() or Tio.close()). 所以客户端为了告知服务器我还在, 只需要定时的给服务器发送一条消息即可 (Tio 框架中心跳消息的内容不限).启动心跳检测只需要在创建 WsServerStarter 时设置心跳的 timeout 大于 0 就可以了:
1
wsServerStarter.getServerGroupContext().setHeartbeatTimeout(30000); // 30s 检测一次心跳
实现集群
Tio 内置了集群功能 (使用 Redis), 请参考 t-io 集群解决方案以及源码解析 进行配置. 使用集群时需要注意一下几个方面:
消息发送: 内置的集群实现了消息发送功能 (每个服务器只能给和自己直接连接的客户端发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private static Boolean sendToBsId(GroupContext groupContext, String bsId, Packet packet, boolean isBlock) {
ChannelContext channelContext = Tio.getChannelContextByBsId(groupContext, bsId);
if (channelContext == null) {
// 集群内广播的消息就不要再次广播了
if (groupContext.isCluster() && !packet.isFromCluster()) {
TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
if (tioClusterConfig.isCluster4bsId()) {
notifyClusterForBsId(groupContext, bsId, packet);
}
}
return false;
}
if (isBlock) {
return bSend(channelContext, packet);
} else {
return send(channelContext, packet);
}
}GroupConext.setTioClusterConfig()
,TioClusterMessageListener.onMessage()
: 收到广播消息时, 会忽略掉自己广播的消息, 因为已经给直连的客户端发送过了, 其他服务器广播过来的消息会继续发送, 如果本服务器内此 bsId, group, token 等绑定的 ChannelContext 存在就给他们发送消息, 不存在就什么也不操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static Boolean sendToBsId(GroupContext groupContext, String bsId, Packet packet, boolean isBlock) {
ChannelContext channelContext = Tio.getChannelContextByBsId(groupContext, bsId);
if (channelContext == null) {
if (groupContext.isCluster() && !packet.isFromCluster()) {
TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
if (tioClusterConfig.isCluster4bsId()) {
notifyClusterForBsId(groupContext, bsId, packet);
}
}
return false;
}
if (isBlock) {
return bSend(channelContext, packet);
} else {
return send(channelContext, packet);
}
}用户管理: 用 ChannelContext 的 attribute 来存储管理用户的方式只能管理直接和当前服务器连接的用户, 非集群环境中很方便. 由于集群环境中有多台服务器, 所有服务器的连接加在一起才是所有的用户连接, 不能使用这种方式管理用户. 既然内置的集群使用了 Redis, 为了不再引入第三方软件, 可以使用 Redis 来管理用户: 所有用户 (list), 小组用户 (hash)
重复登录:
Tio.getChannelContextByBsId()
只能查找当前 JVM 中的连接, 同一个用户不同次的登录可能被负载均衡分配到不同的服务器, 所以踢掉前一个登录连接需要集群内广播踢掉用户的消息. 可惜 TioClusterMessageListener 没有提供注入处理自定义消息的功能, 目前只能用其他方式进行广播了, 值得一提的是实现广播消息的时候可以参考使用TioClusterVo.CLIENTID
来判断是否自己广播的消息.