由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

JAVA-线程池与JUC

JAVA 西门飞冰 855℃
[隐藏]

1.线程池

并发是伴随着多核处理器的诞生而产生的,为了充分 利用硬件资源,诞生了多线程技术。但是多线程又存在资源竞争的问题,引发了同步和互斥的问题,JDK 1.5推出的java.util.concurrent(并发工具包)来解决 这些问题。

1.1.new Thread的弊端

  • new Thread()新建对象,性能差
  • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,严重时会占用过多系统资源导致死机或OOM

1.2.ThreadPool — 线程池

  • 重用存在的线程,减少新建对象、消亡的开销
  • 线程总数可控,提高资源的利用率
  • 避免过多资源竞争,避免阻塞
  • 提供额外功能,定时执行、定期执行、监控等

1.3.线程池的种类

在java.util.concurrent中,提供了工具类Executors(调度器)对象来创建线程池,可创建的线程池有四种:

  1. CachedThreadPool – 可缓存线程池
  2. FixedThreadPool – 定长线程池
  3. SingleThreadExecutor – 单线程池
  4. ScheduledThreadPool – 调度线程池

可缓存线程池示例:

public class ThreadPoolSample1 {
    public static void main(String[] args) {
        //调度器对象
        //ExecutorService用于管理线程池
        ExecutorService threadPool = Executors.newCachedThreadPool();//创建一个可缓存线程池
        //可缓存线程池的特点是,无限大,如果线程池中没有可用的线程则创建,有空闲线程则利用起来
        for(int i = 1 ; i <= 1000 ; i++) {
            final  int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + index);
                }
            });
        }
        try {
            Thread.sleep(1000); //跟线程足够的运行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //shutdown() 代表关闭线程池(等待所有线程完成)
        //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用
        threadPool.shutdown();
    }
}

定长线程池示例:

public class ThreadPoolSample2 {
    public static void main(String[] args) {
        //调度器对象
        //ExecutorService用于管理线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);//创建一个可创建一个定长线程池
        //定长线程池的特点是固定线程总数,空间线程用于执行任务,如果线程都在使用后续任务则处于等待状态,在线程池中的线程
        //如果任务处于等待的状态,备选的等待算法默认为FIFO(先进先出) LIFO(后进先出)
        //执行任务后再执行后续的任务。
        for(int i = 1 ; i <= 1000 ; i++) {
            final  int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + index);
                }
            });
        }
        try {
            Thread.sleep(1000); //跟线程足够的运行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //shutdown() 代表关闭线程池(等待所有线程完成)
        //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用
        threadPool.shutdown();
    }
}

单线程池示例:通常用来当守护线程,企业中一切线程往线程池上靠,是最简单无脑的选择

public class ThreadPoolSample3 {
    public static void main(String[] args) {
        //调度器对象
        //ExecutorService用于管理线程池
        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单线程线程池
        for(int i = 1 ; i <= 1000 ; i++) {
            final  int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + index);
                }
            });
        }
        try {
            Thread.sleep(1000); //跟线程足够的运行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //shutdown() 代表关闭线程池(等待所有线程完成)
        //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用
        threadPool.shutdown();
    }
}

调度线程池示例:

public class ThreadPoolSample4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool =  Executors.newScheduledThreadPool(5);//可调度线程池
        /*//延迟三秒执行一次Run方法
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("延迟3秒执行");
            }
        } , 3 , TimeUnit.SECONDS);*/
        //Timer , 项目实际开发中scheduledThreadPool与Timer都不会用到,因为有成熟的调度框架Quartz,或者Spring自带调度,
        //程序的调度框架支持一种表达式叫做Cron表达式,有兴趣的童鞋可以了解一下。
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date() + "延迟1秒执行,每三秒执行一次");
            }
        }, 1, 3, TimeUnit.SECONDS);
    }
}

2.CountDownLatch倒计时锁

  • CountDownLatch倒计时锁特别适合”总-分任务”, 例如多线程计算后的数据汇总
  • CountDownLatch类位于java.util.concurrent (J.U.C)包下,利用它可以实现类似计数器的功能。 比如有一个任务A,它要等待其他3个任务执行完毕之 后才能执行,此时就可以利用CountDownLatch来实 现这种功能了。

执行原理:

 

image-20220704093202043

代码示例:

public class CountDownSample {
    private static int count = 0;   //初始化累加的总数
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(100);     //定长线程池启动100个线程
        CountDownLatch cdl = new CountDownLatch(10000); //CDL总数和操作数保持一致
        for(int i = 1 ; i <= 10000 ; i++) {     //执行一万次累加操作
            final int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    synchronized (CountDownSample.class) {
                        try {
                            count = count + index;
                            //计数器减一
                        }catch(Exception e){
                            e.printStackTrace();
                        }finally {
                            cdl.countDown();
                        }
                    }
                }
            });
        }
/*        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        try {
            cdl.await(); //堵塞当前线程,直到cdl=0的时候再继续往下走
            //为了避免程序一直挂起,我们可以设置一个timeout时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
        threadPool.shutdown();
    }
}

3.Semephore信号量

Semaphore信号量经常用于限制获取某种资源的线程数量。下面举个例子, 比如说操场上有5个跑道,一个跑道一次只能有一个学生在上面跑步,一旦 所有跑道在使用,那么后面的学生就需要等待,直到有一个学生不跑了

image-20220704095907036

代码示例:

public class SemaphoreSample1 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();   //定义一个可缓存线程池
        Semaphore semaphore = new Semaphore(5);//定义5个信号量,也就是说服务器只允许5个人在里面玩
        for(int i = 1 ; i <= 20 ; i++) {
            final int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();//获取一个信号量,“占用一个跑到”
                        play();
                        semaphore.release();//执行完成后释放这个信号量,“从跑道出去”
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            });
        }
        threadPool.shutdown();
    }

    public static void play(){

        try {
            System.out.println(new Date() + " " + Thread.currentThread().getName() + ":获得紫禁之巅服务器进入资格");
            Thread.sleep(2000);
            System.out.println(new Date() + " " + Thread.currentThread().getName() + ":退出服务器");
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.CyclicBarrier循环屏障

CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同 的是该barrier在释放等待线程后可以重用,所以称它为循 环(Cyclic)的屏障(Barrier)

image-20220704101613766

CyclicBarrier用于让线程必须运行:

image-20220704101023014

CyclicBarrier的应用场景:CyclicBarrier适用于多线程必须同时开始的场景,比如跑分和秒杀以及抢票

代码示例:

public class CyclicBarrierSample {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i = 1 ; i<=20 ; i++) {
            final int index = i;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    go();
                }
            });

        }
        executorService.shutdown();
    }

    private static void go(){
        System.out.println(Thread.currentThread().getName() + ":准备就绪" );
        try {
            cyclicBarrier.await();//设置屏障点,当累计5个线程都准备好后,才运行后面的代码
            System.out.println(Thread.currentThread().getName() + ":开始运行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

5.ReentrantLock重入锁(不推荐使用)

  • 重入锁是指任意线程在获取到锁之后,再次获取该锁而不会被该锁 所阻塞
  • ReentrantLock设计的目标是用来替代synchronized关键字

ReentrantLock与synchronized的区别:

image-20220704104123336

6.Condition线程等待与唤醒

Condition条件唤醒:

  • 我们在并行程序中,避免不了某些线程要预先规定好的顺序执行, 例如:先新增再修改,先买后卖,先进后出……,对于这类场景,使 用JUC的Condition对象再合适不过了。
  • JUC中提供了Condition对象,用于让指定线程等待与唤醒,按预期 顺序执行。它必须和ReentrantLock重入锁配合使用。
  • Condition用于替代wait()/notify()方法

​– notify只能随机唤醒等待的线程,而Condition可以唤醒指定的线程,这有利于更好 的控制并发程序。

Condition核心方法:

  • await() – 阻塞当前线程,直到singal唤醒
  • signal() – 唤醒被await的线程,从中断处继续执行
  • signalAll() – 唤醒所有被await()阻塞的线程

执行过程:

image-20220704110754749

public class ConditionSample {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(); //Condition对象必须配合Lock一起使用
        Condition c1 = lock.newCondition(); //创建Condition
        Condition c2 = lock.newCondition();
        Condition c3 = lock.newCondition();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();    //加锁
                try {
                    c1.await();    //阻塞当前线程,直到有人调用c1.singal的时候线程激活继续执行
                    Thread.sleep(1000);
                    System.out.println("粒粒皆辛苦");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();  //解锁
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock(); //加锁
                try {
                    c2.await();
                    Thread.sleep(1000);
                    System.out.println("谁知盘中餐");
                    c1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    c3.await();
                    Thread.sleep(1000);
                    System.out.println("汗滴禾下土");
                    c2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    Thread.sleep(1000);
                    System.out.println("锄禾日当午");
                    c3.signal();    //T3线程继续执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }).start();
    }
}

7.Callable_Future

  • Callable和Runnable一样代表着任务,区别在于 Callable有返回值并且可以抛出异常。
  • Future 是一个接口。它用于表示异步计算的结果。提 供了检查计算是否完成的方法,以等待计算的完成, 并获取计算的结果。

代码示例:

public class FutureSample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for(int i = 2 ; i <= 10000 ; i++){
            Computor c = new Computor();
            c.setNum(i);
            //Future是对用于计算的线程进行监听,因为计算是在其他线程中执行的,所以这个返回结果的过程是异步的
            Future<Boolean> result = executorService.submit(c);//将c对象提交给线程池,如有空闲线程立即执行里面的call方法
            try {
                Boolean r = result.get(); //用于获取返回值,如果线程内部的call没有执行完成,则进入等待状态,直到计算完成
                if(r == true){
                    System.out.println(c.getNum());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }
}
class Computor implements Callable<Boolean>{
    private Integer num;

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    @Override
    public Boolean call() throws Exception {
        boolean isprime = true;
        for(int i = 2 ; i < num ; i++) {
            if (num % i == 0) {
                isprime = false;
                break;
            }
        }

        return isprime;
    }
}

8.同步容器

  • ArrayList -> CopyOnWriteArrayList – 写复制列表
  • HashSet -> CopyOnWriteArraySet – 写复制集合
  • HashMap -> ConcurrentHashMap – 分段锁映射

8.1.CopyOnWriteArrayList并发原理

CopyOnWriteArrayList通过“副本”解决并发问题

image-20220704150008972

代码示例:

public class CopyOnWriteArrayListSample {
    public static void main(String[] args) {
        //写复制列表
        List<Integer> list = new CopyOnWriteArrayList<>();
        for(int i = 0 ; i < 1000 ; i++){
            list.add(i);
        }
        Iterator<Integer> itr = list.iterator();
        while (itr.hasNext()) {
            Integer i = itr.next();
            list.remove(i);
        }
        System.out.println(list);
    }
}

8.2.ConcurrentHashMap

ConcurrentHashMap 采用”分段锁“的方式

image-20220704150053131

代码示例:

public class ConcurrentHashMapSample {
    public static int users = 100;  //同时模拟的并发访问用户数量
    public static int downTotal = 50000; //用户下载的真实总数
    public static ConcurrentHashMap count = new ConcurrentHashMap() ;//计数器

    public static void main(String[] args) {
        ExecutorService executorService  = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(users);
        for(int i = 0 ; i < downTotal ; i++){ final Integer index = i; executorService.execute(()->{
                //通过多线程模拟N个用户并发访问并下载
                try {
                    semaphore.acquire();
                    count.put(index, index);
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();//关闭调度服务
        System.out.println("下载总数:" + count.size());
    }
}

转载请注明:西门飞冰的博客 » JAVA-线程池与JUC

喜欢 (3)or分享 (0)