Content Table

Tio WebSocket 经验

使用 Tio 实现 WebSocket, 可以在 tio-websocket-showcase 的基础上进行修改 (据说能够达到企业级性能, 单机支持 30 万连接). 利用 Tio 提供的绑定功能实现消息群发和给指定的用户发送消息, 并且把用户对象直接存储到 ChannelContext 上, 还能省去自己管理用户的麻烦, Tio 也提供了心跳检测功能, IP 黑名单, 流量监控等, 我们只需要关注与业务层代码即可, 下面介绍一些相关的经验:

  • 应用中只有一个 GroupContext, 发送消息, 获取小组信息等

  • 一个连接对应一个 ChannelContext (ip:port), 可以使用 setAttribute() 存储业务数据, getAttribute() 获取数据

  • 绑定 (使用 Tio 进行绑定, 可参考让网络编程更轻松和有趣 t-io):

    • userid: 一个 userid 可以绑定多个 ChannelContext (实现同一个账号多个设备登录)

      1
      2
      bindUser(ChannelContext channelContext, String userid)
      SetWithLock<ChannelContext> getChannelContextsByUserid(GroupContext groupContext, String userid)
    • token: 一个 token 可以绑定多个 ChannelContext (实现同一个账号多个设备登录)

      1
      2
      bindToken(ChannelContext channelContext, String token)
      SetWithLock<ChannelContext> getChannelContextsByToken(GroupContext groupContext, String token)
    • bsId: 一个 bsId 只能绑定一个 ChannelContext (实现同一个账号只允许登录一个设备)

      1
      2
      bindBsId(ChannelContext channelContext, String bsId)
      ChannelContext getChannelContextByBsId(GroupContext groupContext, String bsId)

      下面介绍的绑定以 bsId 为例, 其他的方式参考实现即可

  • 与服务器建立连接

    • 连接的 URL

      传入参数 userId 和 username, 在握手的回调函数 IWsMsgHandler.handshake() 中绑定用户 user 和 ChannelContext:

      1
      2
      3
      // WebSocket 连接 URL
      ws://ip:port?userId={userId}&username={username}
      ws://ebagtest.edu-edu.com:3721/?userId=10001&username=小明
    • 踢掉已登录设备

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      // 如果 userId 已经绑定过其他的 channelContext (即在其他设备登录), 则踢掉前一个, 一个 userId 只允许登录一台设备
      ChannelContext previousChannelContext = Tio.getChannelContextByBsId(channelContext.groupContext, userId);
      if (!channelContext.equals(previousChannelContext) && previousChannelContext != null) {
      user = (User) previousChannelContext.getAttribute('user')

      previousChannelContext.setAttribute('kickOut', true); // 踢掉的标志
      Tio.unbindBsId(previousChannelContext);
      Tio.remove(previousChannelContext, "服务器断开客户端连接");
      logger.info("踢掉 {} 已经登录的连接 {}", userId, previousChannelContext.getClientNode());

      // 绑定前一个连接的小组
      user.getGroups().forEach(groupName -> {
      Tio.bindGroup(channelContext, groupName);
      });
      }

      标记连接是被踢掉的, 在 WsServerAioListener.onBeforeClose() 可以根据连接是否被踢掉的进行特殊处理

    • 绑定用户

      1
      2
      channelContext.setAttribute('user', user); // ChannelContext 中存储用户, 就不需要使用其他数据结构来保存了
      Tio.bindBsId(channelContext, userId); // BS ID 和 ChannelContext 是一对一的
    • 发送私聊消息

      绑定了 bsId 和 ChannelContext, Tio 可以使用字符串的 bsId 给此用户发送信息, 不需要找到对应的 ChannelContext

      1
      2
      WsResponse response = WsResponse.fromText(JSON.toJSONString(message), ServerConfig.CHARSET);
      Tio.sendToBsId(channelContext.groupContext, userId, response);
  • 与服务器断开连接

    • 解绑用户

      WsServerAioListener.onBeforeClose() 中解绑用户和 ChannelContext

      1
      Tio.unbindBsId(channelContext);
  • 加入群组

    • 绑定群组

      1
      2
      Tio.bindGroup(channelContext, groupName);
      channelContext.getGroups(); // 可以得到绑定的所有小组名字
    • 群发消息

      绑定了 groupName 和 ChannelContext, Tio 可以使用字符串的 groupName 给此群组用户发送消息, 不需要找到此群组所有用户的 ChannelContext 一个一个的发送消息

      1
      2
      WsResponse response = WsResponse.fromText(JSON.toJSONString(message), ServerConfig.CHARSET);
      Tio.sendToGroup(channelContext.groupContext, groupName, response);
    • 群组成员

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      public List<User> groupUsers(String groupName, ChannelContext channelContext) {
      // 1. 获取小组的所有 channelContext
      // 2. 得到每个 channelContext 的用户
      // 3. 返回小组的所有用户

      // 如果小组中没有成员, 返回 null
      SetWithLock<ChannelContext> temp = Tio.getChannelContextsByGroup(channelContext.groupContext, groupName);
      if (temp == null) {
      return Collections.emptyList();
      }

      Set<ChannelContext> channels = temp.getObj();
      return channels.stream().map(this::getUser).collect(Collectors.toList());
      }
  • 离开群组

    • 解绑群组

      1
      Tio.unbindGroup(groupName, channelContext);
  • 心跳检测

    心跳检测是指在设定时间内如果服务器没有收到客户端的任何消息, 则认为客户端已经不是活跃的了, 服务器端就会断开客户端的连接 (Tio.remove() or Tio.close()). 所以客户端为了告知服务器我还在, 只需要定时的给服务器发送一条消息即可 (Tio 框架中心跳消息的内容不限).

    启动心跳检测只需要在创建 WsServerStarter 时设置心跳的 timeout 大于 0 就可以了:

    1
    wsServerStarter.getServerGroupContext().setHeartbeatTimeout(30000); // 30s 检测一次心跳
  • 实现集群

    Tio 内置了集群功能 (使用 Redis), 请参考 t-io 集群解决方案以及源码解析 进行配置. 使用集群时需要注意一下几个方面:

    • 消息发送: 内置的集群实现了消息发送功能 (每个服务器只能给和自己直接连接的客户端发送消息)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      private static Boolean sendToBsId(GroupContext groupContext, String bsId, Packet packet, boolean isBlock) {
      ChannelContext channelContext = Tio.getChannelContextByBsId(groupContext, bsId);
      if (channelContext == null) {
      // 集群内广播的消息就不要再次广播了
      if (groupContext.isCluster() && !packet.isFromCluster()) {
      TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();

      if (tioClusterConfig.isCluster4bsId()) {
      notifyClusterForBsId(groupContext, bsId, packet);
      }
      }
      return false;
      }
      if (isBlock) {
      return bSend(channelContext, packet);
      } else {
      return send(channelContext, packet);
      }
      }

      GroupConext.setTioClusterConfig(), TioClusterMessageListener.onMessage(): 收到广播消息时, 会忽略掉自己广播的消息, 因为已经给直连的客户端发送过了, 其他服务器广播过来的消息会继续发送, 如果本服务器内此 bsId, group, token 等绑定的 ChannelContext 存在就给他们发送消息, 不存在就什么也不操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      private static Boolean sendToBsId(GroupContext groupContext, String bsId, Packet packet, boolean isBlock) {
      ChannelContext channelContext = Tio.getChannelContextByBsId(groupContext, bsId);
      if (channelContext == null) {
      if (groupContext.isCluster() && !packet.isFromCluster()) {
      TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();

      if (tioClusterConfig.isCluster4bsId()) {
      notifyClusterForBsId(groupContext, bsId, packet);
      }
      }
      return false;
      }
      if (isBlock) {
      return bSend(channelContext, packet);
      } else {
      return send(channelContext, packet);
      }
      }
    • 用户管理: 用 ChannelContext 的 attribute 来存储管理用户的方式只能管理直接和当前服务器连接的用户, 非集群环境中很方便. 由于集群环境中有多台服务器, 所有服务器的连接加在一起才是所有的用户连接, 不能使用这种方式管理用户. 既然内置的集群使用了 Redis, 为了不再引入第三方软件, 可以使用 Redis 来管理用户: 所有用户 (list), 小组用户 (hash)

    • 重复登录: Tio.getChannelContextByBsId() 只能查找当前 JVM 中的连接, 同一个用户不同次的登录可能被负载均衡分配到不同的服务器, 所以踢掉前一个登录连接需要集群内广播踢掉用户的消息. 可惜 TioClusterMessageListener 没有提供注入处理自定义消息的功能, 目前只能用其他方式进行广播了, 值得一提的是实现广播消息的时候可以参考使用 TioClusterVo.CLIENTID 来判断是否自己广播的消息.