使用T-IO写的一个简易的聊天室Demo
本文最后更新于:3 天前
介绍
依据官网上的样例稍加改造,改成了一个简约的聊天室Demo,如官网上所说,这个Demo实际上都可以放生产上,基本的通信已经解决了。只需要根据自己的业务适配一下即可。话不多说,直接整流程。
依赖
目前只用到了核心,引入即可
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
<version>3.8.2.v20220628-RELEASE</version>
</dependency>
一些常量
public class ChatConst {
/**
* 绑定的群组标识
*/
public static final String groupNo = "2333";
/**
* 服务的端口
*/
public static final int port = 8011;
/**
* 默认为本机的IP
*/
public static final String ip = "127.0.0.1";
}
数据包载体,可以理解为Vo
public class ChatPack extends Packet {
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
// 消息头的长度,在读取写入时我们用了Int类型
// buffer.putInt(),以及buffer.getInt()
// 所以 这里为4个字节长度。
// 注意这里的长度,并不是指数字的长度,而是字节长度!请务必理解,
// 本人在这里迷惑了挺长时间。
// 此处代码也不建议修改,修改的话需要动编解码中的代码
public static final int HEAD_LENGTH = 4;
private byte[] body;
public static ChatPack getPack(String msg){
ChatPack pack = new ChatPack();
pack.setBody(msg.getBytes(StandardCharsets.UTF_8));
return pack;
}
public String toStr(){
return new String(this.getBody(),StandardCharsets.UTF_8);
}
}
编码解码
/**
* 客户端服务端编码解码通用,一般无需修改
*/
public class DecodeOrEncode {
/**
* 解码
* @param buffer buffer
* @param readableLength 实际数据长度
*/
public static Packet decode(ByteBuffer buffer,int readableLength, ChannelContext channelContext) throws TioDecodeException {
//提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据
//收到的数据组不了业务包,则返回null以告诉框架数据不够
if (readableLength < ChatPack.HEAD_LENGTH) {
return null;
}
//读取消息体的长度
int bodyLength = buffer.getInt();
//数据不正确,则抛出AioDecodeException异常
if (bodyLength < 0) {
throw new TioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
}
//计算本次需要的数据长度
int neededLength = ChatPack.HEAD_LENGTH + bodyLength;
//收到的数据是否足够组包
int isDataEnough = readableLength - neededLength;
// 不够消息体长度(剩下的buffer组不了消息体)
if (isDataEnough < 0) {
return null;
} else //组包成功
{
ChatPack chatPack = new ChatPack();
if (bodyLength > 0) {
byte[] dst = new byte[bodyLength];
buffer.get(dst);
chatPack.setBody(dst);
}
return chatPack;
}
}
/**
* 编码
* @param packet 数据包
* @param tioConfig 抽象
* @return buffer
*/
public static ByteBuffer encode(Packet packet, TioConfig tioConfig) {
ChatPack chatPack = (ChatPack) packet;
byte[] body = chatPack.getBody();
int bodyLen = 0;
if (body != null) {
bodyLen = body.length;
}
//bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
int allLen = ChatPack.HEAD_LENGTH + bodyLen;
//创建一个新的bytebuffer
ByteBuffer buffer = ByteBuffer.allocate(allLen);
//设置字节序
buffer.order(tioConfig.getByteOrder());
//写入消息头----消息头的内容就是消息体的长度
buffer.putInt(bodyLen);
//写入消息体
if (body != null) {
buffer.put(body);
}
return buffer;
}
}
服务端
服务端包括三个类,一个启动类,一个Handler处理类,一个监听类
监听类
ChatServerListener.java
public class ChatServerListener implements TioServerListener {
/**
* 服务器检查到心跳超时时,会调用这个函数(一般场景,该方法只需要直接返回false即可)
*
* @param interval 已经多久没有收发消息了,单位:毫秒
* @param heartbeatTimeoutCount 心跳超时次数,第一次超时此值是1,以此类推。此值被保存在:channelContext.stat.heartbeatTimeoutCount
* @return 返回true,那么服务器则不关闭此连接;返回false,服务器将按心跳超时关闭该连接
*/
@Override
public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
if (heartbeatTimeoutCount > 10){
return false;
}
return true;
}
/**
* 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected
*
* @param channelContext 包罗万象
* @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败
* @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接
*/
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) {
if (isConnected){
System.out.println(channelContext.getId()+"--进入房间...");
Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,ChatPack.getPack(channelContext.getId()+"--进入房间..."));
// 绑定群组
Tio.bindGroup(channelContext, ChatConst.groupNo);
}
}
/**
* 原方法名:onAfterDecoded
* 解码成功后触发本方法
*
* @param channelContext 包罗万象
* @param packet 数据包
* @param packetSize 数据包大小
*/
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) {
}
/**
* 接收到TCP层传过来的数据后
*
* @param receivedBytes 本次接收了多少字节
*/
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) {
}
/**
* 消息包发送之后触发本方法
*
* @param isSentSuccess true:发送成功,false:发送失败
*/
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) {
}
/**
* 处理一个消息包后
* @param cost 本次处理消息耗时,单位:毫秒
*/
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) {
}
/**
* 连接关闭前触发本方法
*
* @param channelContext the channelcontext
* @param throwable the throwable 有可能为空
* @param remark the remark 有可能为空
* @author tanyaowu
*/
@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
System.out.println(channelContext+"--离开房间...");
Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,
ChatPack.getPack(channelContext.getId()+"--离开房间..."));
}
}
处理类
ChatServerHandler.java
public class ChatServerHandler implements TioServerHandler {
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
// 这里可以做一些额外操作
return DecodeOrEncode.decode(buffer, limit, channelContext);
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
// 这里可以做一些额外操作
return DecodeOrEncode.encode(packet, tioConfig);
}
@Override
public void handler(Packet packet, ChannelContext channelContext) {
ChatPack pack = (ChatPack) packet;
String msg = pack.toStr();
System.out.println(channelContext + "-发送消息=" + msg);
// 单体不需要了
//Tio.send(channelContext, ChatPack.getPack("服务端收到了你的消息-" + msg));
Tio.sendToGroup(channelContext.tioConfig, ChatConst.groupNo,ChatPack.getPack(channelContext.getId()+"--发送消息:"+msg));
}
}
启动类
ChatServer.java
public class ChatServer {
public static void main(String[] args) throws Exception {
new TioServer(
new TioServerConfig("chatServer",new ChatServerHandler(),new ChatServerListener())
).start(null,ChatConst.port);
}
}
客户端代码类似
客户端代码没有编写监听类,其实和服务端差不多。
处理类
ChatClientHandler.java
public class ChatClientHandler implements TioClientHandler {
@Override
public Packet heartbeatPacket(ChannelContext channelContext) {
return ChatPack.getPack("ping");
}
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
return DecodeOrEncode.decode(buffer, limit, channelContext);
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
return DecodeOrEncode.encode(packet, tioConfig);
}
@Override
public void handler(Packet packet, ChannelContext channelContext) {
ChatPack pack = (ChatPack) packet;
System.out.println("收到消息:"+pack.toStr());
}
}
启动类
ChatClient.java
public class ChatClient {
public static void main(String[] args) throws Exception {
ReconnConf reconnConf = new ReconnConf(5000);
// 失败十次就不再重连
reconnConf.setRetryCount(10);
Node node = new Node(ChatConst.ip, ChatConst.port);
TioClient client = new TioClient(
new TioClientConfig(new ChatClientHandler(),
null,
reconnConf));
ClientChannelContext connect = client.connect(node);
Tio.send(connect,ChatPack.getPack("你好世界"));
Scanner scanner = new Scanner(System.in);
while (true) {
String str = scanner.nextLine();
if ("exit".equals(str)) {
Tio.close(connect,"关闭连接");
break;
} else {
Tio.send(connect,ChatPack.getPack(str));
}
}
}
}
代码量算有一点小多的,但是这个代码确实可以直接放生产上,用来做一些报文的传输之类的,应该挺稳的。主要多多学习这个多客户端之间的通信操作吧,用这个Tio可以省很多时间。
封面
使用T-IO写的一个简易的聊天室Demo
https://wangijun.com/2022/08/06/net-03/