java基础-线程池

分类: Java

1.线程池是什么?

线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。
如果每个请求都创建一个线程去处理,那么服务器的资源很快就会被耗尽,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

2.线程池种类

1.单线程的线程池
/**

  • 这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,
  • 那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
    */
    public static void singleThreadPool() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
    pool.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
    });
    }
    }

2.固定大小的线程池
/**

  • 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,
  • 如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
    */
    public static void fixedThreadPool() {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 10; i++) {
    pool.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\tfixedThreadPool开始发车啦....");
    });
    }
    }
    3.可缓存的线程池
    /**
  • 如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添
  • 加新线程来处理任务.
    */
    public static void cachedThreadPool() {
    ExecutorService pool = Executors.newCachedThreadPool();
    for (int i = 0; i < 100; i++) {
    pool.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\tcachedThreadPool开始发车啦....");
    });
    }
    }

4.定时以及周期性的线程池
/**

  • 此线程池支持定时以及周期性执行任务的需求。
    /
    public static void scheduledThreadPool() {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
    //延迟10秒执行任务
    /
    for (int i = 0; i < 10; i++) {
    pool.schedule(() -> {
    System.out.println(Thread.currentThread().getName() + "\tscheduledThreadPool开始发车啦....");
    }, 10, TimeUnit.SECONDS);
    }*/
    //每秒执行一次
    pool.scheduleAtFixedRate(() -> {
    System.out.println(Thread.currentThread().getName() + "\tscheduledThreadPool开始发车啦....");
    }, 1, 1, TimeUnit.SECONDS);
    }
    5.newWorkStealingPool
    /**
  • newWorkStealingPool是jdk1.8才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,
  • 底层用的ForkJoinPool来实现的。ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,
  • 把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,
  • 再将这些执行结果合并起来即可。
    */
    public static void workStealingPool() {
    ExecutorService pool=Executors.newWorkStealingPool();
    for (int i = 0; i < 100; i++) {
    pool.execute(() -> {
    System.out.println(Thread.currentThread().getName() + "\tcachedThreadPool开始发车啦....");
    });
    }
    }

3.线程池的拒绝策略

当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务.

RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

1.AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
    }
}

2.CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
                r.run();
        }
    }
}

3.DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

4.DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

4.execute和submit的区别

execute适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了。

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
        });
    }
}

submit方法适用于需要关注返回值的场景,submit方法的定义如下

public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  ...
}

其子类AbstractExecutorService实现了submit方法,可以看到无论参数是Callable还是Runnable,最终都会被封装成RunnableFuture,然后再调用execute执行。

submit(Callable task)

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Future<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello";
            }
        });
        String result = future.get();
        System.out.println(result);
    }
}

submit(Runnable task, T result)

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Data data = new Data();
        Future<Data> future = pool.submit(new MyRunnable(data), data);
        String result = future.get().getName();
        System.out.println(result);
    }
}
class Data {
    String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
class MyRunnable implements Runnable {
    private Data data;
    public MyRunnable(Data data) {
        this.data = data;
    }
    @Override
    public void run() {
        data.setName("yinjihuan");
    }
}

Future submit(Runnable task)
直接submit一个Runnable是拿不到返回值的,返回值就是null.

5.五种线程池的使用场景

newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。

newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。

newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。

newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。

newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。

6.线程池的关闭

关闭线程池可以调用shutdownNow和shutdown两个方法来实现
shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表。

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            System.err.println(i);
            pool.execute(() -> {
                try {
                    Thread.sleep(30000);
                    System.out.println("--");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        List<Runnable> runs = pool.shutdownNow();
    }
}

shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。
还有一些业务场景下需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,isShutdown只是返回你是否调用过shutdown的结果。

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            System.err.println(i);
            pool.execute(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println("--");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        pool.shutdown();
        while(true){  
            if(pool.isTerminated()){  
                System.out.println("所有的子线程都结束了!");  
                break;  
            }  
            Thread.sleep(1000);    
        }  
    }
}

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注