本文档,适合于对多线程有一定基础的开发人员。对多线程的一些基础性的解读,请参考《java并发编程》的前5章。
对于源代码的解读,本人认为可读可不读。如果你想成为一位顶级的程序员,那就培养自己底层的逻辑能力,自己写算法,然后让别人学习你的源代码。研究源代码这件事,更多的是针对于初学者。贡献源码的人,也是程序员,只不过是级别不同,或者在理论上,更加高屋建瓴。在现实中,能够兼顾理论和编程的程序员不多,如果谁想成为一流程序员的话,建议从理论上入手,代码量不能代表全部。对于多线程,本人仍然认为,理论很重要。

多线程编程,在软件开发中占有十分重要的地位。本人对线程同步的本质的理解是:把对一个或者多个的共享状态的复合操作转变为原子性的操作,同时保证共享状态在内存中的可见性。抽象起来就是原子性和可见性。

1.多线程并发时,会存在竞态条件。常见的竞态条件包括先检查后执行机制的竞争和原子性操作竞争,比如同时对一个整数++操作,这个操作可以分割为三个步骤:读取、加法操作与写入(生效)。解决先检查后执行机制的竞态条件的有效手段是采用双检索。对方法加锁,会大大滴降低吞吐量和性能,因此,不建议直接对方法加锁,常见的做法是,对多个线程同时竞争的变量加锁,或者采用ReentrantLock底层的CAS算法(free-lock).如果想深入理解ReentrantLock的原理,请查看java.util.concurrent包下的源代码。
2.任务执行策略与中断策略和饱和策略:在多线程环境中,当定义好了公共资源类,与执行任务时(比如生产者与消费者任务),接下来就要考虑任务执行策略与中断策略和饱和策略,以提升系统的吞吐量和性能,同时在运行时,要考虑吞吐量与CPU占有率的折中。在多线程中,最重要的就是以上三种策略的定制。采用默认的,不一定能满足要求。线程池底层,调用的是ThreadPoolExecutor这个类,我们可以扩展他,实现自己的需求。在这里,先讲一下,默认的任务执行策略。(任务执行策略包括:是否为每一个任务开启一个线程,还是所有任务在一个线程中执行,任务执行的顺序,比如FIFO,还是按照优先级等等),所以, 这里涉及到两个比较重要的东西:一是数量问题,包括线程池的基本容量,最大容量以及BlockingQueue<Runnable> 是采用有界的还是无界的,二是BlockingQueue的数据结构,如果执行顺序是FIFO,就采用非优先级的Queue,如果是按优先级,那就使用PriorityLinkedQueue。下面,结合一下ThreadPoolExecutor源代码讲解一下:
在使用时,我们一般会这样:
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(Runnable);
先从execute方法开始,一层一层剖析:
ThreadPoolExecutor中的几个重要变量:
 
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
 
The workerCount is the number of workers that have been permitted to start and not permitted to stop. 
ctl是一个重要的变量,主要包装两个重要的概念:一是workerCount:effective number of threads,二是runState:  indicating whether running, shutting down etc   
英文解释:
The main pool control state, ctl, is an atomic integer packing
two conceptual fields
workerCount, indicating the effective number of threads
runState,    indicating whether running, shutting down etc

在以上状态变量中,RUNNING可以接受新的task,并且可以处理queue中的task,SHUTDOWN不可以接受新的task,但是可以处理queue中的task,其他的全都不可以。还是英文解释比较好,研究源代码,最好是看英文原版的,不要看汉语版的:

RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {---------------------①
            if (addWorker(command, true))//如果添加失败,返回false,可能是由于创建线程时遇到意外,比如terminated,重新调用ctl.get()计算wc
                return;
            c = ctl.get();
        }//如果当前执行的线程数量小于corePoolSize,但是添加任务时,遇到了意外,或者,当前执行的线程数量大于corePoolSize,这两种情况,都会进入②处代码
        if (isRunning(c) && workQueue.offer(command)) {------------------②//如果当前线程池中的线程正处于RUNNING状态,并且阻塞队列的容量没有达到上限,重新检查ctl.get()返回的状态
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);//如果此处状态不是RUNNING,也不是SHUTDOWN,那么,拒绝任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//由于任务放到了BlockingQueue中,此处,在Worker中,不添加task,而是运行任务时,从queue取出task
        }
        else if (!addWorker(command, false))-------------------------③//除了以上情况以外,比如BlockingQueue饱和了,线程池容量也饱和了,执行饱和策略,默认为AbortPolicy,拒绝任务
            reject(command);
    }
 
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
 
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
 
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))//此处判断非常重要
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);------------------------------①//把firstTask加到Worker中,并创建一个线程
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
 
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);------------------------------②//把worker加到Set<Worker>中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;------------------------------③//添加成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();------------------------------④执行任务
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);------------------------------⑤//添加失败,从Set<Worker>中移除Worker
        }
        return workerStarted;
    }
接下来,看看Woker:
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
 
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
 
 
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
 
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
         public void run() {----------------①
            runWorker(this);----------------②
         }
 
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);//重新置为0
            return true;
        }
 
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
Worker的本质是Runnable,因此在addWorker()中的t.start()中,实际是调用worker的run()方法,看②处的runWorker()方法:
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {---------------------①
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);-------------------------②
                    Throwable thrown = null;
                    try {
                        task.run();-------------------------③
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);-------------------------④
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
最重要的地方,已经做了标识。对于①处,(task = getTask()) != null,这是在execute方法中,当workerCountof(recheck)== 0时,把task放到BlockingQueue中,所以用getTask()取出task。在execute之前和之后,可以做一些事情,自定义扩展,比如实现统计和计时功能。
以上为ThreadPoolExecutor源代码的关键地方的比较粗浅的解读,下面,来进入应用阶段:
Executors.newFixedThreadPool(x)中,默认的,BlockingQueue为无界的LinkedBlockingQueue,使用无界的queue,会因为queue的无限制扩展,而导致资源被耗尽,Executors.newCachedThreadPool()中,线程池的大小没有限制,队列采用的是SynchronousQueue,SynchronousQueue本质上并不是一个队列,而是基于线程间传递机制的一种运行策略。当向SynchronousQueue中添加task时,必须保证线程在等待接收task,可以与运行的线程直接交互。如果需要实现线程池的容量和queue的容量都有限制,并且需要自定义执行策略和饱和策略时,可以扩展ThreadPoolExecutor。ThreadPoolExecutor的构造器中结束如下参数:
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
其中有:colrePoolSize,线程池的基本大小, maximumPoolSize,线程池中能够同时运行的线程数量的上限,keepAliveTime,超过此时间,空闲线程将被回收,阻塞队列Blockin共Queue,还有RejectedExecutionHandler,任务拒绝处理类。
下面, 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略
package httpClient;
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
 
/**
 * 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略
 * @author TongXueQiang
 * @date 2016/05/19
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
    private final AtomicLong numTasks = new AtomicLong(1);
    private final AtomicLong totalTime = new AtomicLong();
 
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
 
    }
    /**
     * 任务执行前
     */
    protected void beforeExecute(Thread t,Runnable r){
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s",t,r));
        startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
    }
    /**
     * 任务执行后
     * @param r 任务
     * @param t 执行任务的线程
     */
    protected void afterExecutor(Runnable r,Throwable t){
        try {
            Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
            Long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
 
    protected void terminated () {
        try {
            log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }        
    }
}
 
//自定义简易爬虫
package httpClient;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 网页抓取
 * @author TongXueQiang
 * @date 2016/05/16
 */
public class UrlHanding {
    private final int THREADS = 10;
    private final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
    BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
    private final ExecutorService consumerExecutor = new MyThreadPoolExecutor(10, 10, 1000,TimeUnit.MILLISECONDS, q, new ThreadPoolExecutor.CallerRunsPolicy());//调用者执行的饱和策略
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch endLatch = new CountDownLatch(THREADS);
    private static UrlQueue queue;
 
    public void urlHanding(String[] seeds) throws InterruptedException {        
        queue = getUrlQueue();
        System.out.println("处理器数量:"+Runtime.getRuntime().availableProcessors());
        long start = (long) (System.nanoTime() / Math.pow(10, 9));
        producerExecutor.execute(new GetSeedUrlTask(queue,seeds,startLatch));        
        producerExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
        producerExecutor.shutdown();
        startLatch.await();
 
        UrlDataHandingTask []url_handings = new UrlDataHandingTask[THREADS];
        for (int i = 0;i < THREADS;i++) {
            url_handings[i] = new UrlDataHandingTask(startLatch,endLatch,queue);
            consumerExecutor.execute(url_handings[i]);            
        }
        consumerExecutor.shutdown();
        startLatch.countDown();
        doSomething();
        endLatch.await();
 
        long end = (long) (System.nanoTime() / Math.pow(10,9) - start);
        System.out.println("耗时: " + end + "秒");
    }
 
    private void doSomething() {
 
 
    }
 
    private UrlQueue getUrlQueue() {
        if (queue == null) {
            synchronized(UrlQueue.class){
                if (queue == null) {
                    queue = new UrlQueue();
                    return queue;
                }
            }
        }
        return queue;
    }
}
上面,是典型的生产者和消费者线程模式,把ArrayBlockingQueue当做公共资源,这里,要处理好消费者线程无限期阻塞的问题,通过在queue的最后加入“毒丸”对象,当每个线程从queue中取出的对象为“毒丸”对象时,停止迭代。
以下为消费者线程:
package httpClient;
 
import java.util.concurrent.CountDownLatch;
 
public class UrlDataHandingTask implements Runnable {
    private CountDownLatch startLatch;
    private CountDownLatch endLatch;
    private UrlQueue queue;
 
    public UrlDataHandingTask(CountDownLatch latch, CountDownLatch endLatch, UrlQueue queue) {
        this.startLatch = latch;
        this.endLatch = endLatch;
        this.queue = queue;        
    }
 
    /**
     * 下载对应的页面并抽取出链接,放入待处理队列中
     * 
     * @param url
     * @throws InterruptedException
     */
    public void dataHanding(String url) throws InterruptedException {
        getHrefOfContent(DownPage.getContentFromUrl(url));
        for (String url0 : VisitedUrlQueue.visitedUrlQueue) {
            System.out.println(url0);
        }
    }
 
    @Override
    public void run() {
        try {
            startLatch.await();
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
 
        while (!queue.isEmpty()) {
            try {
                String url = queue.outElem();
                if ("".equals(url.trim())) {//“毒丸”对象为空
                    queue.addElem(url);
                    break;
                }
                dataHanding(url);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        endLatch.countDown();
 
    }
 
    /**
     * 获取页面源代码中的超链接
     * 
     * @param content
     * @throws InterruptedException
     */
    public void getHrefOfContent(String content) throws InterruptedException {
        System.out.println("开始");
        String[] contents = content.split("<a href=\"");
        for (int i = 1; i < contents.length; i++) {
            int endHref = contents[i].indexOf("\"");
            String aHref = FunctionUtils.getHrefOfInOut(contents[i].substring(0, endHref));
            if (aHref != null) {
                String href = FunctionUtils.getHrefOfInOut(aHref);
                if (queue.isContains(href) && !VisitedUrlQueue.isContains(href)
                        && href.indexOf("/code/explore") != -1) {
                    // 放入待抓取队列中
                    queue.addElem(href);
                }
            }
        }
        System.out.println(queue.size() + "--抓取到的连接数");
        System.out.println(VisitedUrlQueue.size() + "--已处理的页面数");
    }
 
}
生产者线程:
package httpClient;
 
import java.util.concurrent.CountDownLatch;
 
public class GetSeedUrlTask implements Runnable {
    private UrlQueue queue;
    private String[] seeds;
    private CountDownLatch startLatch;
 
    public GetSeedUrlTask(UrlQueue queue, String[] seeds,CountDownLatch startLatch) {
        this.queue = queue;
        this.seeds = seeds;
        this.startLatch = startLatch;
    }
 
    public void addUrl() {
        try {
            for (String url : seeds) {
                queue.addElem(url);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
 
    @Override
    public void run() {
        addUrl();        
        try {
            queue.addElem("");//加入“毒丸”对象
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        startLatch.countDown();
    }
}
未完待续……
 
 

最新文章

  1. AC日记——I Hate It 洛谷 P1531
  2. 解决Tomcat7“At least one JAR was scanned for TLDs yet contained no TLDs”问题
  3. CSS3之让背景图片全部显示
  4. select框内容的编辑、修改、添加、删除操作
  5. hibernate 表配置文件如何设置表字段的默认值
  6. ASP.NET MVC学习之路由篇(2)
  7. Spring中的AOP应用
  8. poj 2349(最小生成树应用)
  9. postgresql大批量数据导入方法
  10. 转:传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确 .
  11. 更改css element.style
  12. RAM和DDR
  13. mongodb 更新数组出现can&#39;t append to array using string field name
  14. IOS 本地通知 UILocalNotification
  15. 每个Web开发人员应该知道的12个终端命令
  16. day8(字符串操作)
  17. unity零基础开始学习做游戏(三)鼠标输入,来个虚拟摇杆怎么样?
  18. 树莓派3B+上运行.Net Core项目
  19. [Swift]LeetCode238. 除自身以外数组的乘积 | Product of Array Except Self
  20. axios 中文文档(转载)

热门文章

  1. .net core web API使用Identity Server4 身份验证
  2. 关于 Nginx的相关学习
  3. docker-每天5分钟玩转Docker容器技术
  4. 心知天气数据API 产品的高并发实践
  5. 巧用CSS3之background渐变
  6. windows下git创建本地分支并建立对应远程分支
  7. [LeetCode] 62. 不同路径 ☆☆☆(动态规划)
  8. 无法将文件“E:\NetWorkPace\Permission\packages\EntityFramework.6.1.1\lib\net45\EntityFramework.xml”复制到“bin\EntityFramework.xml”。对路径“bin\EntityFramework.xml”的访问被拒绝。
  9. visual studio 应用场景
  10. GooglePlay测试支付遇到的问题