不朽
不朽
发布于 2022-03-17 / 19 阅读
0
0

juc并发常用类

juc并发常用类

1.Callable类-有返回值的线程

package JUC;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**

* 线程创建4种方式:

* 继承Thread、实现Runnable、实现Callable(带返回值)、线程池

* @author JHS
  */
  public class CallableDemo{
   public static void main(String[] args) throws ExecutionException, InterruptedException {
       //FutureTask实现了Runnable
       FutureTask<String> futureTask = new FutureTask<>(new MyCallable());

        new Thread(futureTask).start();
        //接受返回值
       System.out.println(futureTask.get());

   }
}

class MyCallable implements Callable<String> {
   @Override
   public String call() throws Exception {
       return "Callable";
   }
}

2.生产者消费者-可重入锁

package JUC;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**

* 生成者与消费者
  *
* @author JHS
  */
  public class ConsumerAndProvider {
   public static void main(String[] args) throws InterruptedException {
       Resource2 resource2 = new Resource2();
       new Thread(() -> {
           for (int i = 0; i < 6; i++) {
               try {
                   resource2.show();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
    
       }).start();

       new Thread(() -> {
           for (int i = 0; i < 6; i++) {
               try {
                   resource2.show2();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
    
       }).start();
   }
}

/**
* == 原始资源类 ==
  */
  class Resource {
   private int num = 0;

   public synchronized void show() throws InterruptedException {
       while (num != 0) {
           this.wait();
       }
       num++;
       System.out.println(Thread.currentThread().getName() + "--" + num);
       this.notifyAll();
   }

   public synchronized void show2() throws InterruptedException {
       while (num == 0) {
           this.wait();
       }
       num--;
       System.out.println(Thread.currentThread().getName() + "--" + num);
       this.notifyAll();
   }

}

/**
* == 改写后资源类 ==
  */
  class Resource2 {
   private int num = 0;
   //可重入锁
   private Lock lock = new ReentrantLock();

   private Condition condition = lock.newCondition();

   //替换Object中的wait()与notify、notifyAll
   public void show() throws InterruptedException {
       //上锁
       lock.lock();
       try {
           while (num != 0) {
               //等待
               condition.await();
           }
           num++;
           TimeUnit.SECONDS.sleep(2);
           System.out.println(Thread.currentThread().getName() + "--" + "\t 生产产品" + num + "个");
           condition.signalAll();
       } finally {
           lock.unlock();
       }

   }

   public synchronized void show2() throws InterruptedException {

       //上锁
       lock.lock();
       try {
           while (num == 0) {
               //等待
               condition.await();
           }
           num--;
           TimeUnit.SECONDS.sleep(2);
           System.out.println(Thread.currentThread().getName() + "--" + "\t 剩余产品" + num + "个");
           condition.signalAll();
       } finally {
           lock.unlock();
       }

   }

}


3.CountDownLatch类-栅栏线程

package JUC;

import java.util.concurrent.CountDownLatch;

/**

 * 需求:使用多个线程收集树叶,最后收集完毕

 * CountDownLatch:累减直到到达指定的值

 * @author JHS
   */
   public class CountDownLatchDemo {
   public static void main(String[] args) throws InterruptedException {
       //计数减
       CountDownLatch count = new CountDownLatch(6);
       for (int i = 1; i <= 6; i++) {
       new Thread(()->{
               System.out.println(Thread.currentThread().getName()+"\t 正收集树叶");
               //减
               count.countDown();
       }).start();
   }
       //等待,直到减到0才唤醒
       count.await();
       System.out.println(Thread.currentThread().getName()+"树叶收集完毕");

   }

}


4.CyclicBarrier类-循环栅栏

package JUC;


import java.util.concurrent.CyclicBarrier;

/**

 * CyclicBarrier 循环栅栏

 * 需求:集齐七颗龙珠召唤神龙

 * @author JHS
   */
   public class CyclicBarrierDemo {
   public static void main(String[] args) {
       CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("召唤神龙!"); });

       for (int i = 1; i <=7 ; i++) {
           int finalI = i;
           new Thread(()->{
               try {
               System.out.println(Thread.currentThread().getName()+"收集"+ finalI +"颗龙珠");


                    cyclicBarrier.await();
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }).start();
        }
    
    }

}


5.Semaphore类-信号量

package JUC;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**

 * juc- Semaphore: 信号量

 * 在 semaphore.acquire() 和 semaphore.release()之间的代码,同一时刻只允许制定个数(默认为一)的线程进入

 * @author JHS
   */
   public class SemaphoreDemo {
   public static void main(String[] args) {
       //模拟资源类,有3个空闲车位
       Semaphore semaphore = new Semaphore(3);

       //模拟7辆车抢占3个车位
       for (int i = 1; i <7 ; i++) {
           new Thread(()->{
               try {
                   semaphore.acquire();
                   System.out.println(Thread.currentThread().getName()+"\t 抢占到了车位");
                   TimeUnit.SECONDS.sleep(3);
                   System.out.println(Thread.currentThread().getName()+"\t 离开了车位");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }finally {
                   semaphore.release();
               }
           }).start();
       }

   }
   }

6.ReadWriteLock类-读写锁

package JUC;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**

 * ReadWriteLock: 读写锁

 * 读-读: 多线程一起读

 * 读-写: 同步、

 * 写-写: 同步
   *

 * @author JHS
   */
   public class ReadWriteLockDemo {
   public static void main(String[] args) {
       Map<Integer, String> map = new HashMap<>();
       ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
       for (int i = 1; i < 4; i++) {
           int v = i;
           new Thread(() -> {
               //加锁
               readWriteLock.writeLock().lock();
               try {
                   //写入数据
                   System.out.println("写入数据");
                   TimeUnit.SECONDS.sleep(2);
                   map.put(v, Thread.currentThread().getName());
                   System.out.println("写入完成");

               } catch (InterruptedException e) {
                   e.printStackTrace();
               } finally {
                   readWriteLock.writeLock().unlock();
               }
           }).start();
       }
       for (int j = 1; j <4 ; j++) {
           int v=j;
           new Thread(()->{
               //加锁
               readWriteLock.readLock().lock();
               try {
                   //写入数据
                   System.out.println("读取数据");
                   TimeUnit.SECONDS.sleep(2);
                   map.get(v);
                   System.out.println("读取完成");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }finally {
                   readWriteLock.readLock().unlock();
               }
           }).start();
       }

   }
   }

7.ForkJoinPool类-拆分任务同时执行,合并结果

package JUC;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**

 * 需求:计算1到100的数

 * forkjoin: 分而治之

 * 它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果(类似于大数据的计算

 * ForkJoinPool: 线程池,主要是将大任务划分小任务

 * ForkJoinTask就是ForkJoinPool里面的每一个任务

 * ForkJoinTask有两个主要的子类

 * (1)RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)

 * (2)RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

 * RecursiveAction和RecursiveTask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果

 * @author JHS
   */
   public class ForkJoinDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException {

       Mange myTack = new Mange(0, 100);
       ForkJoinPool pool = new ForkJoinPool();
       ForkJoinTask<Integer> task = pool.submit(myTack);
       //获取结果
       System.out.println(task.get());
       //关闭资源
       pool.shutdown();

   }
   }

class Mange extends RecursiveTask<Integer> {
    private static final Integer PART_VALUE=10;//将任务拆分为多个任务,每10
    //从哪里开始
    private int begin;
    //结束数字
    private int end;
    //结果
    private int result;
     public Mange(int begin,int end){
        this.begin=begin;
        this.end=end;
        this.result=0;
    }

    @Override
    protected Integer compute() {
         //小任务直接计算
      if (end-begin<=PART_VALUE){
          for(int i=begin;i<=end;i++){
              result=result+i;
          }
      }else {
            int Middle=(begin+end)/2;
            //100分成一半分别计算
          Mange task1 = new Mange(begin, Middle);
          Mange task2 = new Mange(Middle+1, end);
          //分开
            task1.fork();
            task2.fork();
            //合并结果
            result=task1.join()+task2.join();
      }
        return result;
    }

}


8.ThreadPoolExecutor-线程池

package JUC;

import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;


/**

 * ThreadPoolExecutor线程池
 * @author JHS
   */


public class MutilsThread {

    private static final Logger logger=Logger.getLogger("log");
    /**
     * 线程池七大参数
    * 1.corePoolSize: 常驻核心线程数(银行平时所开窗口)
    * 2.maximumPoolSize: 所能容纳最大线程数(银行最大可开窗口)
    * 3.keepAliveTime: 空闲线程存活时间 (当高峰过去,空闲窗口存活时间)
    * 4.TimeUnit: 时间单位
    * 5.BlockingQueue: 阻塞队列 (银行等候室)
         * 常用阻塞队列
         * ArrayBlockingQueue、LinkedBlockingQueue
    * 6.ThreadFactory: 池中创建线程的线程工程,保持默认就好 (银行商标)
    * 7.RejectedExecutionHandler: 拒绝策略,当最大线程数与阻塞队列满后所采用的拒绝策略
         * 拒绝策略:
         * 1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
         * 2. DiscardPolicy:丢弃任务,但是不抛出异常
         * 3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
         * 4. CallerRunsPolicy:由调用线程(提交任务的线程(一般主线程))处理该任务
    * @param args
    * @return void
    */
    public static void main(String[] args) throws InterruptedException {

//        TimeUnit.SECONDS.sleep(20); //休眠20秒
        //实际请求数量=最大容量+阻塞队列容量(10)
      ThreadPoolExecutor pool1=new ThreadPoolExecutor(3,
              6, 60L, TimeUnit.SECONDS,
              new ArrayBlockingQueue<>(4),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
        logger.setLevel(Level.INFO);
        logger.info("启动");
        for (int i = 0; i <6 ; i++) {
            TimeUnit.SECONDS.sleep(2);
            Runnable runnable1=()-> System.out.println("窗口--"+Thread.currentThread().getName()+"号 \t 正在处理业务");
            pool1.execute(runnable1);
        }
        //关闭资源
        pool1.shutdown();


    }

}

9.CompletableFuture-异步回调

package JUC;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**

 * CompletableFuture: 异步回调

 * @author JHS
   */
   public class CompletableFutureDemo {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
       //方式一
       CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
           System.out.println(Thread.currentThread().getName() + "第一种---没有返回值");
       });
       System.out.println("==============================");
       System.out.println("==============================");
       //方式二有返回值

       CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
           System.out.println(Thread.currentThread().getName() + "第二种有返回----supplyAsync");
           int i=4/0;
           return 200;
       });
       
       //传入一个Bicosumer,两个参数的消费型函数式接口
       Integer result = integerCompletableFuture.whenComplete((t, u) -> {
           //正常运行的参数
           System.out.println("t" + "--->" + t);
           //发生异常参数
           System.out.println("u" + "--->" + u);
       }).exceptionally(f -> {
           System.out.println("发生异常" + f.getMessage());
           return 500;
       }).get();//get为获取返回值
       System.out.println(result);


    }

}

评论