近来在oschina上看到一个很火的java 即时通讯项目talent-aio,恰巧想了解一下这方面的东西,就阅读了一下项目的源码,这里对自己阅读源码后的一些心得体会做一下备忘,也希望能够对其他项目中需要用到即时通讯功能的人有所帮助。

1 talent-aio是什么


talent-aio是基于java aio(JSR 203 )实现的即时通讯框架。对比与NIO,JSR 203 习惯上称为 NIO.2,主要包括新的异步io机制。在talent-aio中,server与client的实现主要使用了AsynchronousSocketChannel以及AsynchronousSocketChannel。作为一个简单的热身,我们先来一个小例子说明如何使用aio搭建一个简单的server。

server端

首先,创建一个channel group,之后server端accept等操作的回调就会在这个Channel group所拥有的线程池中执行。

final AsynchronousChannelGroup group
= AsynchronousChannelGroup.withFixedThreadPool(5, Executors.defaultThreadFactory());

然后将其与一个AsynchronousSocketChannel与上面的AsynchronousChannelGroup关联起来:

final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);

将这个listener bind到指定端口上:

InetSocketAddress hostAddress = new InetSocketAddress("localhost",  8888);
listener.bind(hostAddress);

之后就可以通过回调accept客户端发来的连接,而后读取客户端发来的数据了,记得在调用处理方法后后继续调用listen的accept方法来接收新的客户端请求。

final String att1 = "First connection";
listener.accept(att1, new CompletionHandler() {
@Override
public void completed(AsynchronousSocketChannel ch, Object att) {
System.out.println("Completed: " + att);
String msg = handleConnection(ch);
att = "next completed";
listener.accept(att, this);
}
@Override
public void failed(Throwable e, Object att) {
System.out.println(att + " - handler failed");
e.printStackTrace();
currentThread.interrupt();
}
});

其中handleConnection的代码为,在其中调用read,在read回调中打印出客户端发来的数据

  private String handleConnection(AsynchronousSocketChannel ch) {
ByteBuffer buffer = ByteBuffer.allocate(32);
ch.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result > 0) {
attachment.flip();
String msg = new String(attachment.array()).trim();
System.out.println("Message from client: " + msg);
attachment.clear();
if (msg.equals("close")) {
if (!group.isTerminated()) {
System.out.println("Terminating the group...");
try {
group.shutdownNow();
group.awaitTermination(10, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
System.out.println("Exception during group termination");
e.printStackTrace();
}
}
}
}
} @Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println(" - handler failed");
exc.printStackTrace();
currentThread.interupt();
}
});
}

客户端

客户端使用不带AsynchronousChannelGroup参数的open,则使用系统默认的AsynchronousChannelGroup。客户端没有使用回调的方式,而是使用future,限制通过get阻塞到连接建立完成,而后向服务器发送close,并轮询write返回的future查看数据是否发送完毕,最后关闭连接。

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
Future future = client.connect(hostAddress);
future.get(); // returns null System.out.println("Client is started");
System.out.println("Sending message to server: "); byte [] bytes = new String("close").getBytes();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Future result = client.write(buffer); while (! result.isDone()) {
System.out.println("... ");
} System.out.println(new String(buffer.array()).trim());
buffer.clear();
client.close();

可以看出,在不使用Future的情况下,aio的主要处理逻辑在acceptreadwriteclose等回调函数中。

2 talent-aio server端的处理流程:

通过上面的例子,相信大家已经对aio有了一个大概的了解,下面我们就来看看talent-aio server端是怎么处理客户端连接的。对于客户端,read、write、close等的处理方式与server端相似。

2.1 server处理流程

首先,accept客户端连接:

接受连接

然后,读取并处理客户端数据:

数据处理

2.2 处理流程相关的重要的接口与类说明

了解了大概的流程后,就需要深入细节来了解talent-aio的工作方式了。talent-aio的数据解码、包处理、数据发送、连接关闭分别在DecodeRunnableHandlerRunnableSendRunnable以及CloseRunnable几类Task中完成。这些Task都继承了抽象类AbstractQueueRunnable,该抽象类继承了AbstractSynRunnable,而AbstractSynRunnable实现了接口SynRunnableIntf

public interface SynRunnableIntf extends Runnable
{
public ReadWriteLock runningLock(); public boolean isNeededExecute(); public boolean isCanceled(); public void setCanceled(boolean isCanceled); /**
* Run task.
*/
public void runTask();
}

AbstractSynRunnable在该接口基础上添加了方法:

    /**
* @return the executor
*/
public Executor getExecutor()
{
return executor;
} /**
* @param executor the executor to set
*/
public void setExecutor(Executor executor)
{
this.executor = executor;
}

并且主要实现了Runnablerun方法:

    @Override
public final void run()
{
if (isCanceled()) //任务已经被取消
{
return;
} ReadWriteLock runningLock = runningLock();
Lock writeLock = runningLock.writeLock();
boolean trylock = writeLock.tryLock();
if (!trylock)
{
return;
} try
{
runTask();
} catch (Exception e)
{
log.error(e.toString(), e);
} finally
{
writeLock.unlock();
if (isNeededExecute())
{
getExecutor().execute(this);
}
}
}

run中,会尝试获取runningLock,如果获取失败,说明该runneable已经在执行了,可以立即退出。否则就运行runTask,最终根据是否需要继续执行决定要不要再次将该runnable提交到执行线程池中(比如处理完一个packet,发现该连接还有待处理的packet则需要继续处理)。

AbstractQueueRunnable还实现了QueueRunnableIntf<T>

public interface QueueRunnableIntf<T>
{
/**
* 获取数据队列.
*
* @return 保存着要处理的数据的队列
*/
ConcurrentLinkedQueue<T> getMsgQueue();
}

并且实现了方法isNeededExecute,通过检查数据队列中是否还有待处理数据来判断是否要继续提交该runnable到executor:

public boolean isNeededExecute()
{
return getMsgQueue().size() > 0;
}

这样,只要向msgQueue中添加一个任务,就可以在runTask方法中获取该任务,并且进行相应的处理了。对于解码,发送,处理,关闭的详细分析请关注本系列的第二篇文章。

转:http://www.jianshu.com/p/522446599d39

最新文章

  1. 在Asp.Net中操作PDF – iTextSharp - 使用表格
  2. 前端学HTTP之网络基础
  3. HTML5笔记1——HTML5的发展史及标签的改变
  4. RHEL查看CPU等机器信息
  5. StackOverflow Update: 560M Pageviews A Month, 25 Servers, And It&#39;s All About Performance
  6. 我的ipad应用备份
  7. HTML里面Textarea换行总结
  8. Java多线程之非线程安全
  9. HDU-4862-Jump
  10. bitnami gitlab 安装
  11. Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现
  12. 第十二节:WebApi自动生成在线Api文档的两种方式
  13. matlab多个曲面如何画在一个坐标系中的疑问
  14. Linux 设置系统时间和时区2.Ubuntu
  15. 初学者--oracle安装完后出现的一些问题
  16. PXE自动装机
  17. CCF-权限查询-201612-3
  18. IIS导入.pfx证书时报错:&quot;A specified logon session does not exist. It may already have been terminated.&quot;
  19. AD域相关的属性和C#操作AD域
  20. JVM启动过程

热门文章

  1. HTTP杂记
  2. 使用uwsgi启动django项目
  3. Azure进阶攻略 | 下载还是在浏览器直接打开,MIME说了算!
  4. python-daemon
  5. check_mk 之 Configuration variables
  6. git 因线上分支名重复导致无法拉取代码
  7. 创建Gradle工程出现Could not install Gradle distribution from &#39;https://services.gradle.org/distributions/gradleXX&#39;.问题解决
  8. ubuntu linux断点续传下载工具 uGet 的安装
  9. Android(java)学习笔记70:TabActivity使用
  10. 关于VMware给系统分区扩容的一点经验