使用T-IO写的一个简易的聊天室Demo

介绍

依据官网上的样例稍加改造,改成了一个简约的聊天室Demo,如官网上所说,这个Demo实际上都可以放生产上,基本的通信已经解决了。只需要根据自己的业务适配一下即可。话不多说,直接整流程。

依赖

目前只用到了核心,引入即可

<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-core</artifactId>
    <version>3.8.2.v20220628-RELEASE</version>
</dependency>

一些常量

public class ChatConst {
    /**
     * 绑定的群组标识
     */
    public static final String groupNo = "2333";
    /**
     * 服务的端口
     */
    public static final int port = 8011;

    /**
     * 默认为本机的IP
     */
    public static final String ip = "127.0.0.1";
}

数据包载体,可以理解为Vo

public class ChatPack extends Packet {
    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }

    // 消息头的长度,在读取写入时我们用了Int类型
    // buffer.putInt(),以及buffer.getInt()
    // 所以 这里为4个字节长度。
    // 注意这里的长度,并不是指数字的长度,而是字节长度!请务必理解,
    // 本人在这里迷惑了挺长时间。
    // 此处代码也不建议修改,修改的话需要动编解码中的代码
    public static final int HEAD_LENGTH = 4;
    private byte[] body;

    public static ChatPack getPack(String msg){
        ChatPack pack = new ChatPack();
        pack.setBody(msg.getBytes(StandardCharsets.UTF_8));
        return pack;
    }
    public String toStr(){
        return new String(this.getBody(),StandardCharsets.UTF_8);
    }

}

编码解码

/**
 * 客户端服务端编码解码通用,一般无需修改
 */
public class DecodeOrEncode {


    /**
     * 解码
     * @param buffer buffer
     * @param readableLength 实际数据长度
     */
    public static Packet decode(ByteBuffer buffer,int readableLength, ChannelContext channelContext) throws TioDecodeException {
        //提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据
        //收到的数据组不了业务包,则返回null以告诉框架数据不够
        if (readableLength < ChatPack.HEAD_LENGTH) {
            return null;
        }
        //读取消息体的长度
        int bodyLength = buffer.getInt();
        //数据不正确,则抛出AioDecodeException异常
        if (bodyLength < 0) {
            throw new TioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
        }
        //计算本次需要的数据长度
        int neededLength = ChatPack.HEAD_LENGTH + bodyLength;
        //收到的数据是否足够组包
        int isDataEnough = readableLength - neededLength;
        // 不够消息体长度(剩下的buffer组不了消息体)
        if (isDataEnough < 0) {
            return null;
        } else //组包成功
        {
            ChatPack chatPack = new ChatPack();
            if (bodyLength > 0) {
                byte[] dst = new byte[bodyLength];
                buffer.get(dst);
                chatPack.setBody(dst);
            }
            return chatPack;
        }
    }

    /**
     * 编码
     * @param packet 数据包
     * @param tioConfig 抽象
     * @return buffer
     */
    public static ByteBuffer encode(Packet packet, TioConfig tioConfig) {

        ChatPack chatPack = (ChatPack) packet;
        byte[] body = chatPack.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }
        //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
        int allLen = ChatPack.HEAD_LENGTH + bodyLen;
        //创建一个新的bytebuffer
        ByteBuffer buffer = ByteBuffer.allocate(allLen);
        //设置字节序
        buffer.order(tioConfig.getByteOrder());
        //写入消息头----消息头的内容就是消息体的长度
        buffer.putInt(bodyLen);
        //写入消息体
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }
}

服务端

服务端包括三个类,一个启动类,一个Handler处理类,一个监听类

监听类
ChatServerListener.java

public class ChatServerListener implements TioServerListener {
    /**
     * 服务器检查到心跳超时时,会调用这个函数(一般场景,该方法只需要直接返回false即可)
     *
     * @param interval              已经多久没有收发消息了,单位:毫秒
     * @param heartbeatTimeoutCount 心跳超时次数,第一次超时此值是1,以此类推。此值被保存在:channelContext.stat.heartbeatTimeoutCount
     * @return 返回true,那么服务器则不关闭此连接;返回false,服务器将按心跳超时关闭该连接
     */
    @Override
    public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
        if (heartbeatTimeoutCount > 10){
            return false;
        }
        return true;
    }

    /**
     * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected
     *
     * @param channelContext 包罗万象
     * @param isConnected    是否连接成功,true:表示连接成功,false:表示连接失败
     * @param isReconnect    是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接
     */
    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) {
        if (isConnected){
            System.out.println(channelContext.getId()+"--进入房间...");
            Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,ChatPack.getPack(channelContext.getId()+"--进入房间..."));
            // 绑定群组
            Tio.bindGroup(channelContext, ChatConst.groupNo);
        }
    }

    /**
     * 原方法名:onAfterDecoded
     * 解码成功后触发本方法
     *
     * @param channelContext 包罗万象
     * @param packet 数据包
     * @param packetSize 数据包大小
     */
    @Override
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) {

    }

    /**
     * 接收到TCP层传过来的数据后
     *
     * @param receivedBytes  本次接收了多少字节
     */
    @Override
    public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) {

    }

    /**
     * 消息包发送之后触发本方法
     *
     * @param isSentSuccess  true:发送成功,false:发送失败
     */
    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) {

    }

    /**
     * 处理一个消息包后
     * @param cost           本次处理消息耗时,单位:毫秒
     */
    @Override
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) {

    }

    /**
     * 连接关闭前触发本方法
     *
     * @param channelContext the channelcontext
     * @param throwable      the throwable 有可能为空
     * @param remark         the remark 有可能为空
     * @author tanyaowu
     */
    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
        System.out.println(channelContext+"--离开房间...");
        Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,
                ChatPack.getPack(channelContext.getId()+"--离开房间..."));
    }
}

处理类
ChatServerHandler.java

public class ChatServerHandler implements TioServerHandler {
    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
        // 这里可以做一些额外操作
        return DecodeOrEncode.decode(buffer, limit, channelContext);
    }

    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        // 这里可以做一些额外操作
        return DecodeOrEncode.encode(packet, tioConfig);
    }

    @Override
    public void handler(Packet packet, ChannelContext channelContext)  {
        ChatPack pack = (ChatPack) packet;
        String msg = pack.toStr();
        System.out.println(channelContext + "-发送消息=" + msg);
        // 单体不需要了
        //Tio.send(channelContext, ChatPack.getPack("服务端收到了你的消息-" + msg));
        Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,ChatPack.getPack(channelContext.getId()+"--发送消息:"+msg));
    }
}

启动类
ChatServer.java

public class ChatServer {
    public static void main(String[] args) throws Exception {
        new TioServer(
                new TioServerConfig("chatServer",new ChatServerHandler(),new ChatServerListener())
        ).start(null,ChatConst.port);
    }
}

客户端代码类似

客户端代码没有编写监听类,其实和服务端差不多。

处理类
ChatClientHandler.java

public class ChatClientHandler implements TioClientHandler {

    @Override
    public Packet heartbeatPacket(ChannelContext channelContext) {
        return ChatPack.getPack("ping");
    }

    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
        return DecodeOrEncode.decode(buffer, limit, channelContext);

    }

    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        return DecodeOrEncode.encode(packet, tioConfig);
    }

    @Override
    public void handler(Packet packet, ChannelContext channelContext) {
        ChatPack pack = (ChatPack) packet;
        System.out.println("收到消息:"+pack.toStr());
    }
}

启动类
ChatClient.java

public class ChatClient {
    public static void main(String[] args) throws Exception {
        ReconnConf reconnConf = new ReconnConf(5000);
        // 失败十次就不再重连
        reconnConf.setRetryCount(10);
        Node node = new Node(ChatConst.ip, ChatConst.port);
        TioClient client = new TioClient(
                new TioClientConfig(new ChatClientHandler(),
                        null,
                        reconnConf));

        ClientChannelContext connect = client.connect(node);

        Tio.send(connect,ChatPack.getPack("你好世界"));

        Scanner scanner = new Scanner(System.in);
        while (true) {
            String str = scanner.nextLine();
            if ("exit".equals(str)) {
                Tio.close(connect,"关闭连接");
                break;
            } else {
                Tio.send(connect,ChatPack.getPack(str));
            }
        }

    }
}

代码量算有一点小多的,但是这个代码确实可以直接放生产上,用来做一些报文的传输之类的,应该挺稳的。主要多多学习这个多客户端之间的通信操作吧,用这个Tio可以省很多时间。

封面

集原美 湖光中的睡美人


使用T-IO写的一个简易的聊天室Demo
https://wangijun.com/2022/08/06/net-03/
作者
无良芳
发布于
2022年8月6日
许可协议