參考资料:  阿里巴巴开源项目 CobarClient  源代码实现。

分享作者:闫建忠

分享时间:2014年5月7日

---------------------------------------------------------------------------------------

并行调度封装类设计: BXexample.java

package org.hdht.business.ordermanager.quartzjob;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.dao.ConcurrencyFailureException; public class BXexample { private static ExecutorService createCustomExecutorService(int poolSize, final String method) {
int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量
if (poolSize < coreSize) {
coreSize = poolSize;
}
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");
t.setDaemon(true);
return t;
}
};
BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>();
final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60,
TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy()); return executor;
} public static <T> List<T> getSubListPage(List<T> list, int skip,int pageSize) {
if (list == null || list.isEmpty()) {
return null;
}
int startIndex = skip;
int endIndex = skip + pageSize;
if (startIndex > endIndex || startIndex > list.size()) {
return null;
}
if (endIndex > list.size()) {
endIndex = list.size();
}
return list.subList(startIndex, endIndex);
} public static void BXfunction(Collection<?> paramCollection,final ExectueCallBack ecb){
//构建运行器
ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection");
try {
//监视器
final CountDownLatch latch = new CountDownLatch(paramCollection.size());
final StringBuffer exceptionStaktrace = new StringBuffer();
Iterator<?> iter = paramCollection.iterator();
while (iter.hasNext()) {
final Object entity = iter.next();
Runnable task = new Runnable() {
public void run() {
try {
ecb.doExectue(entity);
} catch (Throwable t) {
exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t));
} finally {
latch.countDown();
}
}
};
executor.execute(task);//并行调度
} try {
latch.await();//监视器等待全部线程运行完成
} catch (InterruptedException e) {
//调度异常
throw new ConcurrencyFailureException(
"unexpected interruption when re-arranging parameter collection into sub-collections ",e);
}
if (exceptionStaktrace.length() > 0) {
//业务异常
throw new ConcurrencyFailureException(
"unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace);
} } finally {
executor.shutdown();//运行器关闭
}
} }

回调接口类设计:ExectueCallBack.java

package org.hdht.business.ordermanager.quartzjob;

public interface ExectueCallBack {
void doExectue(Object executor) throws Exception;
}

演示样例(hello 演示样例)

	public static void main(String[] args) {

		List<String> paramCollection  = new ArrayList<String>();
paramCollection.add("9");
paramCollection.add("2");
paramCollection.add("18");
paramCollection.add("7");
paramCollection.add("6");
paramCollection.add("1");
paramCollection.add("3");
paramCollection.add("4");
paramCollection.add("14");
paramCollection.add("13"); int freesize = 3;//当前处理能力 for(int i=0;i<paramCollection.size();i=i+freesize){ List<String> tl = BXexample.getSubListPage(paramCollection, i, freesize); BXexample.BXfunction(tl,new ExectueCallBack() {
public void doExectue(Object executor) throws Exception {
int k = Integer.parseInt((String)executor); for(int i=0;i<k*10000000;i++){
//运行循环
}
System.out.println(k+":hello world");
}
}); }
}

演示样例(实际业务应用演示样例)

/**
* 并行调度相关处理
*
* 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 加入到paramMapList列表中
*/
List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>();
for (Iterator<OrderParamSatellite> iterator = paramSatellites.iterator(); iterator.hasNext();) {
OrderParamSatellite paramSatellite = iterator.next(); paramMapList.addAll(this.getParamMapList(paramSatellite));
} //依据集群最大处理能力,分页处理任务列表,作为list截取的步长 int fsize = HostServerQueue.getInstance().freeSize();
for(int i=0;i<paramMapList.size();i=i+fsize){
List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i, fsize);
//并行调度
BXexample.BXfunction(tl,new ExectueCallBack(){
public void doExectue(Object executor) throws Exception {
ExecuteOrderBTask((Map<String, Object>)executor);
}
}); //动态查找空暇节点数量,即集群最大处理能力
fsize = HostServerQueue.getInstance().freeSize();
}

最新文章

  1. ThinkPHP 表单提交操作成功后执行JS操作如何刷新父页面或关闭当前页等操作
  2. cout中的执行顺序_a++和++a
  3. 在线c++编译器(gcc)
  4. sikuli实战记录
  5. Android手机浏览器访问本地网络相关问题
  6. MVP 实例
  7. Tomcat中负载的Session解决办法
  8. Kindle 实用技巧
  9. ios7自带的晃动效果
  10. 点击推送消息跳转处理(iOS)
  11. 在windows server里,对于同一个账号,禁止或允许多个用户使用该账户,同时登录
  12. 如何将dtb反编译成dts
  13. 【转载】chown和chmod使用
  14. Orleans学习总结(四)--集群配置篇
  15. API返回错误信息的最佳实践
  16. 折叠菜单slideUp
  17. Html 列表实现展开和收起
  18. Spring MVC 框架结构介绍(二)
  19. php发邮件:swiftmailer, php邮件库——swiftmailer
  20. mysql + php 中文乱码 全是? 解决方法

热门文章

  1. [原]Unity3D深入浅出 - GUI控件
  2. Robotium 系列(2) - 简单介绍Monkey和MonkeyRunner
  3. 命令mv
  4. 【转载】gcc和g++的区别
  5. MySQL在线备份与恢复工具 --&gt; Xtrabackup
  6. hadoop中MapReduce中压缩的使用及4种压缩格式的特征的比较
  7. Code Understanding Step by Step - We Need a Task
  8. POJ3016-K-Monotonic(左偏树+DP)
  9. MySQL Workbench 导出数据库脚本(图文)
  10. 问题-关于 in []使用过程中报错&quot; Constant expression violates subrange bounds&quot;