博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Heritrix 3.1.0 源码解析(三十七)
阅读量:5927 次
发布时间:2019-06-19

本文共 11821 字,大约阅读时间需要 39 分钟。

今天有兴趣重新看了一下heritrix3.1.0系统里面的线程池源码,heritrix系统没有采用java的java.util.concurrent包里面的并发框架,而是采用了线程组ThreadGroup类来实现线程池的(线程组类似于树结构,一个线程组包含多个子线程组或多个子线程,数据结构类似于composite模式,不过枝节点与叶子节点没有实现类似composite模式的共同接口)

关键类是org.archive.crawler.framework包里面的ToePool类与ToeThread类,前者继承自ThreadGroup类,后者继承自Thread类

ToeThread显然是工作线程,用于执行采集任务,构造函数初始化成员变量CrawlController controller,用于获取Frontier对象及相关处理器链

private CrawlController controller;
private String coreName;
private CrawlURI currentCuri;

/**
 * Create a ToeThread
 *
 * The thread is created inside the given pool's thread group, takes its
 * CrawlController from the pool, and allocates its own Recorder whose
 * backing file name is derived from the serial number.
 *
 * @param g ToeThreadGroup
 * @param sn serial number
 */
public ToeThread(ToePool g, int sn) {
    // TODO: add crawl name?
    super(g,"ToeThread #" + sn);
    coreName="ToeThread #" + sn + ": ";
    controller = g.getController();
    serialNumber = sn;
    setPriority(DEFAULT_PRIORITY);
    int outBufferSize = controller.getRecorderOutBufferBytes();
    int inBufferSize = controller.getRecorderInBufferBytes();
    httpRecorder = new Recorder(controller.getScratchDir().getFile(),
        "tt" + sn + "http", outBufferSize, inBufferSize);
    lastFinishTime = System.currentTimeMillis();
}

/**
 * Work loop of the toe thread.
 *
 * Repeatedly takes the next CrawlURI from the controller's frontier, runs
 * it through the fetch chain and then the disposition chain, reports it to
 * the frontier as finished, and leaves the loop once shouldRetire is set.
 * After the loop ends (normally or by exception) the recorder is closed and
 * the references to it and to the controller are cleared.
 *
 * @see java.lang.Thread#run()
 */
public void run() {
    String name = controller.getMetadata().getJobName();
    logger.fine(getName()+" started for order '"+name+"'");
    Recorder.setHttpRecorder(httpRecorder);

    try {
        while ( true ) {
            ArchiveUtils.continueCheck();

            setStep(Step.ABOUT_TO_GET_URI, null);
            CrawlURI curi = controller.getFrontier().next();

            // Publish the URI as this thread's current work item and stamp
            // it with this thread's serial number and recorder.
            synchronized(this) {
                ArchiveUtils.continueCheck();
                setCurrentCuri(curi);
                currentCuri.setThreadNumber(this.serialNumber);
                lastStartTime = System.currentTimeMillis();
                currentCuri.setRecorder(httpRecorder);
            }

            try {
                KeyedProperties.loadOverridesFrom(curi);

                controller.getFetchChain().process(curi,this);

                controller.getFrontier().beginDisposition(curi);

                controller.getDispositionChain().process(curi,this);

            } catch (RuntimeExceptionWrapper e) {
                // Workaround to get cause from BDB
                // NOTE(review): this passes e.getCause() back into initCause()
                // right after observing it to be null -- confirm the intended
                // source of the cause.
                if(e.getCause() == null) {
                    e.initCause(e.getCause());
                }
                recoverableProblem(e);
            } catch (AssertionError ae) {
                // This risks leaving crawl in fatally inconsistent state,
                // but is often reasonable for per-Processor assertion problems
                recoverableProblem(ae);
            } catch (RuntimeException e) {
                recoverableProblem(e);
            } catch (InterruptedException e) {
                // With a URI in hand the interrupt is treated as a recoverable
                // problem for that URI; otherwise it propagates to the outer
                // handler and ends the thread.
                if(currentCuri!=null) {
                    recoverableProblem(e);
                    Thread.interrupted(); // clear interrupt status
                } else {
                    throw e;
                }
            } catch (StackOverflowError err) {
                recoverableProblem(err);
            } catch (Error err) {
                // OutOfMemory and any others
                seriousError(err);
            } finally {
                httpRecorder.endReplays();
                KeyedProperties.clearOverridesFrom(curi);
            }

            setStep(Step.ABOUT_TO_RETURN_URI, null);
            ArchiveUtils.continueCheck();
            // Hand the URI back to the frontier and clear the current item.
            synchronized(this) {
                controller.getFrontier().finished(currentCuri);
                controller.getFrontier().endDisposition();
                setCurrentCuri(null);
            }
            curi = null;

            setStep(Step.FINISHING_PROCESS, null);
            lastFinishTime = System.currentTimeMillis();
            if(shouldRetire) {
                break; // from while(true)
            }
        }
    } catch (InterruptedException e) {
        if(currentCuri!=null){
            logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
        }
        // thread interrupted, ok to end
        logger.log(Level.FINE,this.getName()+ " ended with Interruption");
    } catch (Exception e) {
        // everything else (InterruptedException is handled by the clause above)
        logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
    } catch (OutOfMemoryError err) {
        seriousError(err);
    } finally {
        controller.getFrontier().endDisposition();
    }
    setCurrentCuri(null);
    // Do cleanup so that objects can be GC.
    this.httpRecorder.closeRecorders();
    this.httpRecorder = null;
    logger.fine(getName()+" finished for order '"+name+"'");
    setStep(Step.FINISHED, null);
    controller = null;
}

ToePool是线程组,用于管理上面的工作线程,包括初始化、查看活动线程、中断或终止工作线程等

protected CrawlController controller;    protected int nextSerialNumber = 1;    protected int targetSize = 0;     /**     * Constructor. Creates a pool of ToeThreads.      *     * @param c A reference to the CrawlController for the current crawl.     */    public ToePool(AlertThreadGroup atg, CrawlController c) {        //传入父线程组        super(atg, "ToeThreads");                this.controller = c;        setDaemon(true);    }        public void cleanup() {        // force all Toes waiting on queues, etc to proceed        Thread[] toes = getToes();        for(Thread toe : toes) {            if(toe!=null) {                toe.interrupt();            }        }//        this.controller = null;    }    /**     * @return The number of ToeThreads that are not available (Approximation).     */    public int getActiveToeCount() {        Thread[] toes = getToes();        int count = 0;        for (int i = 0; i < toes.length; i++) {            if((toes[i] instanceof ToeThread) &&                    ((ToeThread)toes[i]).isActive()) {                count++;            }        }        return count;     }    /**     * @return The number of ToeThreads. This may include killed ToeThreads     *         that were not replaced.     */    public int getToeCount() {        Thread[] toes = getToes();        int count = 0;        for (int i = 0; i
0) { // must create threads for(int i = 1; i <= difference; i++) { //启动线程 startNewThread(); } } else { //退出多余线程 // must retire extra threads int retainedToes = targetSize; Thread[] toes = this.getToes(); for (int i = 0; i < toes.length ; i++) { if(!(toes[i] instanceof ToeThread)) { continue; } retainedToes--; if (retainedToes>=0) { continue; // this toe is spared } // otherwise: ToeThread tt = (ToeThread)toes[i]; tt.retire(); } } } /** * Kills specified thread. Killed thread can be optionally replaced with a * new thread. * *

WARNING: This operation should be used with great care. It may
 * destabilize the crawler.
 *
 * Every ToeThread in this group whose serial number equals
 * {@code threadNumber} is killed. When {@code replace} is true a new
 * thread is started whether or not a matching thread was found.
 *
 * @param threadNumber Thread to kill
 * @param replace If true then a new thread will be created to take the
 *            killed threads place. Otherwise the total number of threads
 *            will decrease by one.
 */
public void killThread(int threadNumber, boolean replace) {
    Thread[] toes = getToes();
    for (int i = 0; i < toes.length; i++) {
        // getToes() may return entries that are not ToeThreads; skip them.
        if (!(toes[i] instanceof ToeThread)) {
            continue;
        }
        ToeThread toe = (ToeThread) toes[i];
        if (toe.getSerialNumber() == threadNumber) {
            toe.kill();
        }
    }
    if (replace) {
        // Create a new toe thread to take its place. Replace toe
        startNewThread();
    }
}

/**
 * Creates and starts one ToeThread in this group, giving it the next
 * serial number. Synchronized so that concurrent callers cannot read and
 * increment {@code nextSerialNumber} at the same time.
 */
private synchronized void startNewThread() {
    ToeThread newThread = new ToeThread(this, nextSerialNumber++);
    newThread.setPriority(DEFAULT_TOE_PRIORITY);
    newThread.start();
}

/**
 * Blocks until every non-null thread returned by {@code getToes()} reports
 * {@link Thread#isAlive()}, re-checking once per second.
 *
 * @throws IllegalStateException if the calling thread is interrupted while
 *             waiting; the thread's interrupt status is restored before
 *             the exception is thrown so callers further up can still
 *             observe the interruption
 */
public void waitForAll() {
    while (true) {
        try {
            if (isAllAlive(getToes())) {
                return;
            }
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // sleep() cleared the flag when it threw; put it back so the
            // interruption is not lost behind the unchecked exception.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

/**
 * Reports whether every non-null entry of the array is alive.
 *
 * @param threads threads to inspect; null entries are ignored
 * @return false as soon as one non-null thread is found not alive,
 *         true otherwise (including for an empty array)
 */
private static boolean isAllAlive(Thread[] threads) {
    for (Thread t : threads) {
        if (t != null && !t.isAlive()) {
            return false;
        }
    }
    return true;
}

最后,线程组的初始化及工作线程的相关管理在CrawlController对象的相关方法中执行

/**     * Maximum number of threads processing URIs at the same time.     */    int maxToeThreads;     public int getMaxToeThreads() {        return maxToeThreads;    }    @Value("25")    public void setMaxToeThreads(int maxToeThreads) {        this.maxToeThreads = maxToeThreads;        if(toePool!=null) {            toePool.setSize(this.maxToeThreads);        }    }private transient ToePool toePool;/**     * Called when the last toethread exits.     */    protected void completeStop() {        LOGGER.fine("Entered complete stop.");        statisticsTracker.getSnapshot(); // ???                this.reserveMemory = null;        if (this.toePool != null) {            this.toePool.cleanup();        }        this.toePool = null;        LOGGER.fine("Finished crawl.");        try {            appCtx.stop();         } catch (RuntimeException re) {            LOGGER.log(Level.SEVERE,re.getMessage(),re);        }                sendCrawlStateChangeEvent(State.FINISHED, this.sExit);        // CrawlJob needs to be sure all beans have received FINISHED signal before teardown        this.isStopComplete = true;        appCtx.publishEvent(new StopCompleteEvent(this));     }/**     * Operator requested for crawl to stop.     */    public synchronized void requestCrawlStop() {        if(state == State.STOPPING) {            // second stop request; nudge the threads with interrupts            getToePool().cleanup();        }        requestCrawlStop(CrawlStatus.ABORTED);    }/**     * @return Active toe thread count.     
*/    public int getActiveToeCount() {        if (toePool == null) {            return 0;        }        return toePool.getActiveToeCount();    }    protected void setupToePool() {        toePool = new ToePool(alertThreadGroup,this);        // TODO: make # of toes self-optimizing        toePool.setSize(getMaxToeThreads());        toePool.waitForAll();    }    /**     * @return The number of ToeThreads     *     * @see ToePool#getToeCount()     */    public int getToeCount() {        return this.toePool == null? 0: this.toePool.getToeCount();    }    /**     * @return The ToePool     */    public ToePool getToePool() {        return toePool;    }    /**     * Kills a thread. For details see     * {
@link org.archive.crawler.framework.ToePool#killThread(int, boolean) * ToePool.killThread(int, boolean)}. * @param threadNumber Thread to kill. * @param replace Should thread be replaced. * @see org.archive.crawler.framework.ToePool#killThread(int, boolean) */ public void killThread(int threadNumber, boolean replace){ toePool.killThread(threadNumber, replace); }

 说得够清楚吧

--------------------------------------------------------------------------- 

本系列Heritrix 3.1.0 源码解析系本人原创 

本人邮箱:chenying998179@163#com (#改为.)

转载请注明出处 博客园 刺猬的温驯 

本文链接

你可能感兴趣的文章
eclipse工程 'cocostudio/CocoStudio.h' file not found
查看>>
Google Chrome 总提示flash插件过期,用命令行模式解决
查看>>
REST
查看>>
js 深复制一个对象
查看>>
java例程练习(一维数组)
查看>>
我看的书籍
查看>>
pb 动态改变DW的WHERE子句
查看>>
ios 贝塞尔动画
查看>>
Mac 修改用户名
查看>>
Primes on Interval
查看>>
springmvc国际化
查看>>
利用bootstrap插件设置时间
查看>>
zend guard6的使用
查看>>
pta7-7旅游规划(dijkstra算法)
查看>>
pta l2-6(树的遍历)
查看>>
练习题|网络编程-socket开发
查看>>
python中的logger模块详细讲解
查看>>
经典面试题:用户反映你开发的网站访问很慢可能会是什么原因
查看>>
上周面试回来后写的Java面试总结,想进BAT必看
查看>>
Git 远程分支的查看及相关问题
查看>>