今天有兴趣重新看了一下heritrix3.1.0系统里面的线程池源码,heritrix系统没有采用Java的java.util.concurrent包里面的并发框架,而是采用了线程组ThreadGroup类来实现线程池的(线程组类似于树结构,一个线程组包含多个子线程组或多个子线程,数据结构类似于composite模式,不过枝节点与叶子节点没有实现类似composite模式的共同接口)

关键类是org.archive.crawler.framework包里面的ToePool类与ToeThread类,前者继承自ThreadGroup类,后者继承自Thread类

ToeThread显然是工作线程,用于执行采集任务,构造函数初始化成员变量CrawlController controller,用于获取Frontier对象及相关处理器链

    // Crawl controller taken from the owning ToePool in the constructor; run() uses it to reach the frontier and the processor chains.
    private CrawlController controller; 
// Name prefix of the form "ToeThread #<sn>: ", assigned in the constructor.
private String coreName;
// CrawlURI this thread is currently working on; run() sets it after frontier.next() and clears it when the URI is finished.
private CrawlURI currentCuri; /**
* Create a ToeThread inside the given pool.
*
* <p>Takes the CrawlController from the pool, sets the thread priority to
* DEFAULT_PRIORITY, and builds this thread's Recorder from the controller's
* scratch directory and recorder buffer sizes.
*
* @param g ToePool (thread group) the new thread belongs to
* @param sn serial number; used in the thread name and in the name passed
*           to the Recorder
*/
public ToeThread(ToePool g, int sn) {
// TODO: add crawl name?
super(g,"ToeThread #" + sn);
coreName="ToeThread #" + sn + ": ";
controller = g.getController();
serialNumber = sn;
setPriority(DEFAULT_PRIORITY);
// Recorder buffer sizes come from the controller's configuration.
int outBufferSize = controller.getRecorderOutBufferBytes();
int inBufferSize = controller.getRecorderInBufferBytes();
httpRecorder = new Recorder(controller.getScratchDir().getFile(),
"tt" + sn + "http", outBufferSize, inBufferSize);
lastFinishTime = System.currentTimeMillis();
} /**
* Main work loop of the thread.
*
* <p>Repeatedly takes the next CrawlURI from the frontier, runs it through
* the fetch chain and the disposition chain, and reports it finished to the
* frontier. The loop ends when shouldRetire is set or when an exception
* escapes the per-URI handling. On exit the recorder is closed and the
* references to it and to the controller are dropped.
*
* @see java.lang.Thread#run()
*/
public void run() {
String name = controller.getMetadata().getJobName();
logger.fine(getName()+" started for order '"+name+"'");
// Register this thread's recorder with Recorder before entering the loop.
Recorder.setHttpRecorder(httpRecorder); try {
while ( true ) {
// Get the next URI from the frontier, then record it as the current URI while holding this thread's monitor.
// NOTE(review): ArchiveUtils.continueCheck() is not shown here; presumably it throws InterruptedException when the thread should stop — confirm.
ArchiveUtils.continueCheck(); setStep(Step.ABOUT_TO_GET_URI, null); CrawlURI curi = controller.getFrontier().next(); synchronized(this) {
ArchiveUtils.continueCheck();
setCurrentCuri(curi);
currentCuri.setThreadNumber(this.serialNumber);
lastStartTime = System.currentTimeMillis();
currentCuri.setRecorder(httpRecorder);
} try {
// Load the per-URI overrides, run the fetch chain, tell the frontier disposition is beginning, then run the disposition chain.
KeyedProperties.loadOverridesFrom(curi); controller.getFetchChain().process(curi,this); controller.getFrontier().beginDisposition(curi); controller.getDispositionChain().process(curi,this); } catch (RuntimeExceptionWrapper e) {
// Workaround to get cause from BDB
// NOTE(review): the argument e.getCause() was just tested to be null, so this passes null to initCause — confirm intent.
if(e.getCause() == null) {
e.initCause(e.getCause());
}
recoverableProblem(e);
} catch (AssertionError ae) {
// This risks leaving crawl in fatally inconsistent state,
// but is often reasonable for per-Processor assertion problems
recoverableProblem(ae);
} catch (RuntimeException e) {
recoverableProblem(e);
} catch (InterruptedException e) {
// With a URI in hand the interrupt is reported as a recoverable problem and the loop continues;
// without one it is rethrown to the outer handler, which ends the thread.
if(currentCuri!=null) {
recoverableProblem(e);
Thread.interrupted(); // clear interrupt status
} else {
throw e;
}
} catch (StackOverflowError err) {
recoverableProblem(err);
} catch (Error err) {
// OutOfMemory and any others
seriousError(err);
} finally {
// Runs whether or not processing failed: end recorder replays and clear the overrides loaded for this URI.
httpRecorder.endReplays();
KeyedProperties.clearOverridesFrom(curi);
} setStep(Step.ABOUT_TO_RETURN_URI, null);
// Hand the URI back to the frontier and clear the current URI while holding this thread's monitor.
ArchiveUtils.continueCheck(); synchronized(this) {
controller.getFrontier().finished(currentCuri);
controller.getFrontier().endDisposition();
setCurrentCuri(null);
}
curi = null; setStep(Step.FINISHING_PROCESS, null);
lastFinishTime = System.currentTimeMillis();
// Retirement is only checked between URIs, so the URI in progress is always completed first.
if(shouldRetire) {
break; // from while(true)
}
}
} catch (InterruptedException e) {
if(currentCuri!=null){
logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
}
// thread interrupted, ok to end
logger.log(Level.FINE,this.getName()+ " ended with Interruption");
} catch (Exception e) {
// everything else (InterruptedException is already handled by the catch above)
logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
} catch (OutOfMemoryError err) {
seriousError(err);
} finally {
// Executed on every exit path from the loop, in addition to the call made after each finished URI.
controller.getFrontier().endDisposition(); } setCurrentCuri(null);
// Do cleanup so that objects can be GC.
this.httpRecorder.closeRecorders();
this.httpRecorder = null; logger.fine(getName()+" finished for order '"+name+"'");
setStep(Step.FINISHED, null);
controller = null;
}

ToePool是线程组,用于管理上面的工作线程,初始化、查看活动线程、中断或终止工作线程等

protected CrawlController controller;
protected int nextSerialNumber = 1;
protected int targetSize = 0;

/**
 * Constructor. Creates the (initially empty) group of ToeThreads as a
 * daemon thread group named "ToeThreads".
 *
 * @param atg parent thread group
 * @param c A reference to the CrawlController for the current crawl.
 */
public ToePool(AlertThreadGroup atg, CrawlController c) {
    // hand the parent thread group to ThreadGroup
    super(atg, "ToeThreads");
    controller = c;
    setDaemon(true);
}

/**
 * Interrupts every thread currently enumerated in this group, forcing
 * toes that are waiting on queues, etc. to proceed.
 */
public void cleanup() {
    Thread[] snapshot = getToes();
    for (int idx = 0; idx < snapshot.length; idx++) {
        Thread current = snapshot[idx];
        if (current == null) {
            // unused slot of the enumeration array
            continue;
        }
        current.interrupt();
    }
    // this.controller = null;
}

/**
 * @return The number of ToeThreads that are not available (Approximation).
 */
public int getActiveToeCount() {
    int active = 0;
    for (Thread candidate : getToes()) {
        if (!(candidate instanceof ToeThread)) {
            continue;
        }
        if (((ToeThread) candidate).isActive()) {
            active++;
        }
    }
    return active;
}

/**
 * @return The number of ToeThreads. This may include killed ToeThreads
 * that were not replaced.
 */
public int getToeCount() {
    int total = 0;
    for (Thread candidate : getToes()) {
        if (candidate instanceof ToeThread) {
            total++;
        }
    }
    return total;
}
/**
 * Takes a snapshot of the threads currently in this group.
 *
 * <p>The array is allocated with activeCount()+10 slots and filled by
 * enumerate(); slots that enumerate() does not fill stay null, so callers
 * must tolerate null entries.
 *
 * @return array of threads, possibly with trailing null slots
 */
private Thread[] getToes() {
    Thread[] toes = new Thread[activeCount() + 10];
    this.enumerate(toes);
    return toes;
}

/**
 * Change the number of ToeThreads.
 *
 * <p>Starts new threads when the pool is smaller than requested; otherwise
 * asks the surplus threads to retire.
 *
 * @param newsize The new number of ToeThreads.
 */
public void setSize(int newsize) {
    targetSize = newsize;
    int difference = newsize - getToeCount();
    if (difference > 0) {
        // must create threads
        for (int i = 1; i <= difference; i++) {
            startNewThread();
        }
    } else {
        // must retire extra threads: the first targetSize ToeThreads in
        // enumeration order are spared, every later one is retired
        int retainedToes = targetSize;
        Thread[] toes = this.getToes();
        for (int i = 0; i < toes.length; i++) {
            if (!(toes[i] instanceof ToeThread)) {
                continue;
            }
            retainedToes--;
            if (retainedToes >= 0) {
                continue; // this toe is spared
            }
            // otherwise:
            ToeThread tt = (ToeThread) toes[i];
            tt.retire();
        }
    }
}

/**
 * Kills specified thread. Killed thread can be optionally replaced with a
 * new thread.
 *
 * <p>A replacement is started only if a thread with the given serial
 * number was actually found and killed; asking to kill a thread that does
 * not exist therefore never grows the pool.
 *
 * <p><b>WARNING:</b> This operation should be used with great care. It may
 * destabilize the crawler.
 *
 * @param threadNumber Thread to kill
 * @param replace If true then a new thread will be created to take the
 *                killed threads place. Otherwise the total number of threads
 *                will decrease by one.
 */
public void killThread(int threadNumber, boolean replace) {
    boolean killed = false;
    for (Thread candidate : getToes()) {
        if (!(candidate instanceof ToeThread)) {
            continue;
        }
        ToeThread toe = (ToeThread) candidate;
        if (toe.getSerialNumber() == threadNumber) {
            toe.kill();
            killed = true;
        }
    }
    if (replace && killed) {
        // Create a new toe thread to take the killed thread's place
        startNewThread();
    }
}
/**
 * Creates and starts one ToeThread with the next serial number.
 *
 * <p>Synchronized so that concurrent callers cannot create threads (and
 * consume serial numbers) at the same time.
 */
private synchronized void startNewThread() {
    ToeThread newThread = new ToeThread(this, nextSerialNumber++);
    newThread.setPriority(DEFAULT_TOE_PRIORITY);
    newThread.start();
}

/**
 * Blocks until every thread currently enumerated in this group reports
 * itself alive, polling once per second.
 *
 * @throws IllegalStateException if the calling thread is interrupted while
 *         waiting; the interrupt status is restored before throwing so
 *         that callers further up can still observe the interruption
 */
public void waitForAll() {
    while (true) {
        try {
            if (isAllAlive(getToes())) {
                return;
            }
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // catching InterruptedException clears the flag; put it back
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

/**
 * Checks whether all non-null threads in the array are alive.
 *
 * @param threads threads to inspect; null entries are skipped
 * @return false as soon as a non-null thread that is not alive is found,
 *         true otherwise (including for an array with no threads)
 */
private static boolean isAllAlive(Thread[] threads) {
    for (Thread t : threads) {
        if (t != null && !t.isAlive()) {
            return false;
        }
    }
    return true;
}

最后,线程组的初始化及工作线程的相关管理在CrawlController对象的相关方法执行

/**
 * Upper bound on the number of threads that process URIs concurrently.
 */
int maxToeThreads;

/**
 * @return the configured maximum number of ToeThreads
 */
public int getMaxToeThreads() {
    return this.maxToeThreads;
}
/**
 * Sets the maximum number of ToeThreads and, if a pool already exists,
 * resizes it to the new value.
 *
 * @param maxToeThreads new maximum number of ToeThreads
 */
@Value("25")
public void setMaxToeThreads(int maxToeThreads) {
    this.maxToeThreads = maxToeThreads;
    if (toePool != null) {
        toePool.setSize(this.maxToeThreads);
    }
}

/** Pool of worker threads; null until setupToePool() runs and again after completeStop(). */
private transient ToePool toePool;

/**
 * Called when the last toethread exits.
 */
protected void completeStop() {
    LOGGER.fine("Entered complete stop.");

    statisticsTracker.getSnapshot(); // ???
    // NOTE(review): "this.reserveMemory = null;" appears inside the trailing
    // comment on the line above in the original text; it may have been a
    // statement before the lines were joined — confirm against upstream.

    // Read the field once so the null check and the call use the same pool.
    ToePool pool = this.toePool;
    if (pool != null) {
        pool.cleanup();
    }
    this.toePool = null;

    LOGGER.fine("Finished crawl.");

    try {
        appCtx.stop();
    } catch (RuntimeException re) {
        LOGGER.log(Level.SEVERE, re.getMessage(), re);
    }

    sendCrawlStateChangeEvent(State.FINISHED, this.sExit);
    // CrawlJob needs to be sure all beans have received FINISHED signal before teardown
    this.isStopComplete = true;
    appCtx.publishEvent(new StopCompleteEvent(this));
}

/**
 * Operator requested for crawl to stop.
 *
 * <p>A second request while already STOPPING nudges the worker threads
 * with interrupts, if the pool still exists.
 */
public synchronized void requestCrawlStop() {
    if (state == State.STOPPING) {
        // second stop request; nudge the threads with interrupts.
        // completeStop() sets the pool to null, so guard against that.
        ToePool pool = getToePool();
        if (pool != null) {
            pool.cleanup();
        }
    }
    requestCrawlStop(CrawlStatus.ABORTED);
}

/**
 * @return Active toe thread count.
 */
public int getActiveToeCount() {
    ToePool pool = toePool;
    if (pool == null) {
        return 0;
    }
    return pool.getActiveToeCount();
}

/**
 * Creates the ToePool, sizes it to getMaxToeThreads() and waits until all
 * of its threads are alive.
 */
protected void setupToePool() {
    toePool = new ToePool(alertThreadGroup, this);
    // TODO: make # of toes self-optimizing
    toePool.setSize(getMaxToeThreads());
    toePool.waitForAll();
}

/**
 * @return The number of ToeThreads
 *
 * @see ToePool#getToeCount()
 */
public int getToeCount() {
    ToePool pool = this.toePool;
    return pool == null ? 0 : pool.getToeCount();
}

/**
 * @return The ToePool, or null if none is currently set up
 */
public ToePool getToePool() {
    return toePool;
}

/**
 * Kills a thread. For details see
 * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
 * ToePool.killThread(int, boolean)}.
 *
 * <p>Does nothing when there is no pool, matching the other pool
 * accessors, which also tolerate a missing pool.
 *
 * @param threadNumber Thread to kill.
 * @param replace Should thread be replaced.
 * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
 */
public void killThread(int threadNumber, boolean replace) {
    ToePool pool = toePool;
    if (pool == null) {
        return;
    }
    pool.killThread(threadNumber, replace);
}

说得够清楚吧

---------------------------------------------------------------------------

本系列Heritrix 3.1.0 源码解析系本人原创

本人邮箱:chenying998179@163#com (#改为.)

转载请注明出处 博客园 刺猬的温驯

本文链接 http://www.cnblogs.com/chenying99/p/3213556.html

最新文章

  1. 8.7 jquery-dom manipulation
  2. Redis 分布式session
  3. spring mvc 通过配置xml访问控制器的三种方式
  4. Jade之属性
  5. C#高级编程(第8版)
  6. Scala中的Apply
  7. 【转】ARM Linux 3.x的设备树(Device Tree)
  8. 【LeetCode题意分析&解答】40. Combination Sum II
  9. 帝国cms修改[!--show.listpage--]分页页码所生成的html标签
  10. java实现定时任务
  11. python之@property
  12. java多线程快速入门(二十一)
  13. mongodb 备份
  14. 关于R与tableau的集成---- k均值聚类
  15. T-SQL触发器,限制一次只能删除一条数据
  16. Americans are usually tolerant (Listen speak of Unit 2)
  17. 基于obs+nginx-rtmp-module搭建自己直播的系统
  18. Spring Annotation是怎么工作的?
  19. Java内存分配及值、引用的传递
  20. 和大于S的最小子数组 · Minimum Size Subarray Sum

热门文章

  1. IE 火狐浏览器对时间格式的兼容性;使用原型对象的方式 prototype关键字;时间格式化
  2. (转载)UITableView的详细讲解
  3. Spring3.1中使用profile配置开发测试线上环境
  4. Hdu 1521 排列组合
  5. 操作系统——CPU、计算机的构成
  6. ProgressBar及其子类
  7. IPC_共享内存
  8. Ajax实现搜索栏中输入时的自动提示功能
  9. Golang几个常用记录日志对比
  10. 从高铁G18中高端如厕看12306的验证码