Camel支持一种更复杂的异步的处理模型,异步处理器实现一个继承自Processor接口的AsyncProcessor接口,使用异步Processor的长处:

a.异步Processor不会因等待堵塞调用而耗尽线程。这样在处理相同工作量的情况下,通过降低线程的数量能够添加系统的伸缩性

b.使用异步Processor,能够将路由分阶段处理。不同的线程池处理其对应的路由阶段。这就意味着路由能够并行处理。

缺点:实现异步的Processor要比同步的Processor复杂得多。

异步Processor与同步Processor的差别:

a.必须提供一个AsyncCallback对象,该对象在exchange处理完毕后被通知

b.在异步Processor处理exchange的时候不能抛出不论什么异常,而应该将异常存储在exchange的Exception属性中

c.异步Processor必须知道它将以什么方式完毕处理,异步或同步,假设process方法返回true。则是同步完毕。假设process方法返回false,则是异步完毕。

d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法。sync參数必须与process方法的返回值一致。



对于一个路由来说,全然使用异步模式能够减少线程的使用量,这要求从Consumer開始就必须使用异步的处理API(即调用异步的

process方法),假设Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制堵塞。

有一点必须注意的是当你调用了异步的API。这并不意味着处理过程就是异步的,这不过为不捆绑在调用者线程提供了可能。

至于是否是进行异步处理依赖于Camel路由的配置.

以上是Camel官方对异步Processor的解释,以下是本人用于測试的一个样例:

public static void main(String[] args) throws Exception {
RouteBuilder builder = new RouteBuilder() { @Override
public void configure() throws Exception {
RouteDefinition definition1 = this.from("file:H:/temp/in");
RouteDefinition definition2 = definition1.process(new Processor() { @Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process1");
}
}).process(new AsyncProcessor() { @Override
public void process(Exchange exchange) throws Exception {
System.out.println("process");
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
System.out.println(Thread.currentThread().getName());
System.out.println("async process");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.done(false);
return false;
}
}).process(new Processor() { @Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process2");
}
});
definition2.to("file:H:/temp/out");
}
}; DefaultCamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(builder);
camelContext.start(); Object object = new Object();
synchronized (object) {
object.wait();
} }

当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的样例中,三个线程名称是一样的,

而且在堵塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中运行的,这也是堵塞10秒的原因。

我个人觉得是对Camel异步Processor的"异步"两字理解出现了偏差。这里的异步仅仅为processor的processor方法。提供一个

回调函数,而不是另启线程。

并且我们自己写Processor处理器对这个异步的使用也非常有限,由于我们写的处理器是被调用者。AsyncCallback是由上层提供的,我们仅仅是能调用其done方法通知上层本次处理完毕。而我们很多其它的需求应该是自己去注冊回调函数,并且我们可以控制这个回调函数的回调时机。而如今我们无法提供回调函数的注冊。那我们不禁要问,这个AsyncCallback对象那究竟是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的样例来说是在FileConsumer类中。例如以下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)

getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
if (log.isTraceEnabled()) {
log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
}
}
});

这时创建的AsyncCallback对象就是源始的回调对象。当然在路由运行的兴许过程中,该回调对象能够被包装。当中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个样例:

callback = new InternalCallback(states, exchange, callback);

这里我们不禁又会问。既然CamelInternalProcessor可以对源始AsyncCallback对象进行包装增加自己的回调逻辑,为什么我们自己不行呢。其原来还是我们写的Processor是被调用者。是被包装者,详细过程可參看Camel路由启动过程



假设非要加入自己的回调逻辑也不是不可能。就仅仅能自己写消费者。自己写消费者就能控制源AsyncCallback对象,其兴许仅仅是对

源AsyncCallback对象的一个包装的过程。仅仅要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,假设在第二个Processor中假设不运行callback.done(false);的话路由过程将永远不会结束,由于上层一直觉得下层处理还未结束。

当然假设我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,可是假设我们写了异步Processor就一定要调用callback.done方法。



所以这么一通下来。并没有感受到官方提及的不堵塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,并且这个回调功能还非常有限,当然这也有可能是自己什么地方理解错了,如若如此。尽请指正......

最新文章

  1. sqlite的简介
  2. Surface Normal Vector in OpenCascade
  3. OC中的面向对象语法3
  4. python获取知乎日报另存为txt文件
  5. 启动tomcat报错 Could not reserve enough space for object heap的解决办法
  6. java 错误集锦
  7. javascript的词法分析
  8. Android应用换肤总结
  9. logstash 通过type判断
  10. Redis 数据库的安装
  11. Sublime Text Packages Control 安装
  12. ArrayBlockingQueue和LinkedBlockingQueue的区别
  13. netsh winsock reset命令
  14. python数据包之利器scapy用法!
  15. centos7 mongodb安装
  16. linux(fedora) 第三课
  17. 七牛 OCR 接口调试 & 七牛鉴权 Token 应用
  18. Sign Up Account In CloudAMQP
  19. if语法
  20. 【转载】WCF 客户端识别认证之UserName认证

热门文章

  1. ROS知识(12)----cv_bridge依赖opencv版本的问题
  2. 1、安装Redis的PHP扩展
  3. 如何使用git工具向github提交代码
  4. PHP获取数组中奇偶数
  5. python语法32[装饰器decorator](转)
  6. C# 结构体定义 转换字节数组 z
  7. navicat for mysql中添加注释
  8. 使用命名参数处理 CallableStatement
  9. pom-4.0.0.xml中心仓库
  10. ubuntu 键盘布局修改