前言
- 在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类
- 用于定义类似于线程的自定义子系统,包括线程池,异步IO和轻量级任务框架;还提供了用于多线程上下文中的 Collection实现等。
Java 内存模型中的可见性、原子性和有序性。
可见性:
- 可见性,是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的。
原子性:
- 原子是世界上的最小单位,具有不可分割性。比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以不是一个原子操作。
有序性:
- Java 语言提供了
volatile
和synchronized
两个关键字来保证线程之间操作的有序性,volatile 是因为其本身包含“禁止指令重排序”的语义,synchronized 是由“一个变量在同一个时刻只允许一条线程对其进行 lock 操作”
这条规则获得的,此规则决定了持有同一个对象锁的两个同步块只能串行执行。
volatile关键字
- 用法:
- 当多个线程操作
共享数据
时,可以保证内存中的数据可见。用这个关键字修饰共享数据,就会及时的把线程缓存中的数据刷新到主存中去,也可以理解为,就是直接操作主存中的数据。所以在不使用锁
的情况下,可以使用volatile。- 相较于 synchronized 是一种较为轻量级的同步策略;
//标记 public volatile boolean flag=false;
- volatile 不具备”互斥性”(当一个线程持有锁时,其他线程进不来,这就是互斥性)。
- volatile 不能保证变量的”原子性”;
- synchronized和volatile的区别: (1)synchronized可以实现互斥性和内存可见性,不能禁止指令重排序。 (2)volatile可以实现内存可见性,禁止指令重排序,不能保证原子性(互斥性)。
- volatile 性能:
- volatile 的读性能消耗与普通变量几乎相同,但是写操作稍慢,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行。
i++的原子性问题
- (1) i++的操作实际上分为三个步骤: “读-改-写”;
- i++可拆分为:
- int temp1=i;
- int temp2=temp+1;
- i=temp2;
- 使用
javap -c Demo.class
可查看字节码- (2) 原子性: 就是”i++”的”读-改-写”是不可分割的三个步骤;
- (3) 原子变量: JDK1.5 以后, java.util.concurrent.atomic包下,提供了
常用的原子变量
;
- 原子变量中的值,使用volatile 修饰,保证了内存可见性;
CAS(Compare-And-Swap) 算法
保证数据的原子性;
- 两种解决非原子性方案:
- 使用synchronized加上锁,但是效率比较低
- 使用原子变量(例如:AtomicInteger类)
代码案例:(使用原子变量)
public class AtomicThread implements Runnable{ //private int num=0; private AtomicInteger atomicinteger=new AtomicInteger(0); @Override public void run() { System.out.println(atomicinteger.getAndIncrement()); //i++ } }
CAS算法
- CAS(Compare-And-Swap) 算法是硬件对于并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问;
- CAS 是一种
无锁的非阻塞算法
(属于乐观锁)的实现;- CAS 包含了三个操作数:
- 进行比较的旧预估值: A
- 需要读写的内存值: V
- 将写入的更新值: B
- 当且仅当 A == V 时, V = B, 否则,将不做任何操作,并且这个比较交换过程属于原子操作;
- 模拟CAS算法
public class CompareAndSwapDemo { public static void main(String[] args) { CompareAndSwap compareAndSwap = new CompareAndSwap(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { int expect = compareAndSwap.get(); boolean b = compareAndSwap.compareAndSwap(expect, new Random().nextInt(101)); System.out.println(b); } }).start(); } } } class CompareAndSwap { private int value; /** * 获取值 * * @return */ public synchronized int get() { return value; } public synchronized boolean compareAndSwap(int expect, int newValue) { if (this.value == expect) { this.value = newValue; return true; } return false; } }
ABA问题:
- 在CAS算法中,需要取出内存中某时刻的数据(由用户完成),在下一时刻比较并替换(由CPU完成,该操作是原子的)。这个时间差中,会导致数据的变化。
- 假设如下事件序列:
- 线程 1 从内存位置V中取出A。
- 线程 2 从位置V中取出A。
- 线程 2 进行了一些操作,将B写入位置V。
- 线程 2 将A再次写入位置V。
- 线程 1 进行CAS操作,发现位置V中仍然是A,操作成功。
- 尽管线程 1 的CAS操作成功,但不代表这个过程没有问题——对于线程 1 ,线程 2 的修改已经丢失。
public class AbaDemo { private static AtomicStampedReference<Integer> integer=new AtomicStampedReference<Integer>(0, 0); public static void main(String[] args) throws Exception{ for(int i=0;i<100;i++) { //Thread.sleep(10); new Thread(new Runnable() { @Override public void run() { while(true) { int stamp = integer.getStamp(); Integer reference = integer.getReference(); if(integer.compareAndSet(reference, reference+1, stamp, stamp+1)) { System.out.println(reference+1); break; } } } }).start(); } } }
Lock接口
- synchronized的缺陷:
- (1)获取锁的线程如果由于某种原因,不能及时释放锁(除非发生异常),其他线程只能等待
- (2)使用同一个锁会进入同一个等待队列,所以需要唤醒所有线程
- (3)无法实现读写锁操作
- 使用lock接口可以解决该问题,下面案例演示lock锁
- 案例:使用Lock实现三个线程交替输出20遍A、B、C
/** * 多线程实现交替输出 A B C ,连续输出20遍 * */ public class Alternative { private Lock lock=new ReentrantLock(); //三个对象 Condition conditionA=lock.newCondition(); Condition conditionB=lock.newCondition(); Condition conditionC=lock.newCondition(); private int num=1;// 1 a 2 b 3 c private int count=1; public void outputA() { lock.lock();//上锁 try { if(num!=1) { try { conditionA.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("A"); num=2; conditionB.signal(); } finally { lock.unlock(); } } public void outputB() { lock.lock();//上锁 try { if(num!=2) { try { conditionB.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("B"); num=3; conditionC.signal(); } finally { lock.unlock(); } } public void outputC() { lock.lock();//上锁 try { if(num!=3) { try { conditionC.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("C"); System.out.println("------"+count+"------"); count++; num=1; conditionA.signal(); } finally { lock.unlock(); } } }
并发集合
- 之前学习了List(ArrayList、LinkedList)、Set(HashSet、TreeSet)、Map(HashMap、TreeMap)集合,这些集合只适合在
单线程情况
下使用。在Collecionts工具类
中有synchronized开头方法可以把单线程集合转成支持并发的集合
,但是效率不高,很少使用。
- 问题演示:
public class Demo3 { public static void main(String[] args) { List<String> all = new ArrayList<String>() ; for (int x = 0; x < 20; x++) { int temp = x; new Thread(() -> { for (int y = 0; y < 30; y++) { all.add(Thread.currentThread().getName() + " - " + temp + " - " + y); System.out.println(all); } }).start(); } } }
- 原因是当你保存的容量个数和你的实际操作数可能不匹配的时候就会出现此异常。
- 为了更好的实现集合的高并发访问处理,创建了一组新的集合工具类。
- List和Set集合:
- CopyOnWriteArrayList相当于线程安全的ArrayList,实现了List接口。
- CopyOnWriteArrayList是支持高并发的;
- CopyOnWriteArraySet相当于线程安全的HashSet,它继承了AbstractSet类,
- CopyOnWriteArraySet内部包含一个CopyOnWriteArrayList对象,
- 它是通过CopyOnWriteArrayList实现的。
- Map集合:
- ConcurrentHashMap是线程安全的哈希表(相当于线程安全的HashMap);
- 它继承于AbstractMap类,并且实现ConcurrentMap接口。
- ConcurrentHashMap是通过“锁分段”来实现的,它支持并发;
- ConcurrentSkipListMap是线程安全的有序的哈希表(相当于线程安全的TreeMap);
- 它继承于AbstactMap类,并且实现ConcurrentNavigableMap接口。
- ConcurrentSkipListMap是通过“跳表”来实现的,它支持并发;
- ConcurrentSkipListSet是线程安全的有序的集合(相当于线程安全的TreeSet);
- 它继承于AbstractSet,并实现了NavigableSet接口。
- ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,它也支持并发;
- Queue队列:
- ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列;
- LinkedBlockingQueue是单向链表实现的(指定大小)阻塞队列,该队列按FIFO(先进先出)排序元素;
- LinkedBlockingDeque是双向链表实现的(指定大小)双向并发阻塞队列,
- 该阻塞队列同时支持FIFO和FILO两种操作方式;
- ConcurrentLinkedQueue是单向链表实现的无界队列,该队列按FIFO(先进先出)排序元素。
- ConcurrentLinkedDeque是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。
使用CopyOnWriteArrayList实现多线程异步访问
List<String> all = new CopyOnWriteArrayList<String>() ; for (int x = 0; x < 20; x++) { int temp = x ; new Thread(()->{ for (int y = 0; y < 30; y++) { all.add(Thread.currentThread().getName() + " - " + temp + " - " + y) ; System.out.println(all); } }).start(); }
使用CopyOnWriteArraySet实现多线程异步访问
Set<String> all = new CopyOnWriteArraySet<String>() ; for (int x = 0; x < 20; x++) { int temp = x ; new Thread(()->{ for (int y = 0; y < 30; y++) { all.add(Thread.currentThread().getName() + " - " + temp + " - " + y) ; System.out.println(all); } }).start(); }
ConcurrentHashMap的使用
- ConcurrentHashMap是HashMap的多线程版本,在并发情况下使用。
基本使用
Map<String, String> all = new ConcurrentHashMap<String,String>() ; for (int x = 0; x < 20; x++) { int temp = x ; new Thread(()->{ for (int y = 0; y < 10; y++) { all.put(Thread.currentThread().getName(),"x = " + temp + "、y = " + y); System.out.println(all); } }).start(); ; }
- Map集合的主要特征是做数据的查询处理操作,所以在ConcurrentHashMap设计的时候考虑到了数据更新的安全性与数据查询的并发性。
- JDK1.7之前
- ConcurrentHashMap采用
锁分段机制
,默认并发级别为16。- 特点是写的时候同步写入,使用独占锁,读的时候为了保证性能使用了共享锁。
- JDK1.8以后
- ConcurrentHashMap写的时候采用
CAS无锁算法
进一步提高写入效率。
ArrayBlockingQueue
- ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列,可以作为线程通信同步工具类使用。
- 使用ArrayBlockingQueue实现生产者消费者
- Producer.java
public class Producer extends Thread { private ArrayBlockingQueue<String> bq; public Producer(ArrayBlockingQueue<String> bq){ this.bq=bq; } @Override public void run() { for(int i=0;i<30;i++){ try { bq.put(Thread.currentThread().getName()+"生产"+(i+1)+"号产品"); System.out.println(Thread.currentThread().getName()+"生产"+(i+1)+"号产品"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- Consumer.java
public class Consumer extends Thread { private ArrayBlockingQueue<String> bq; public Consumer(ArrayBlockingQueue<String> bq){ this.bq=bq; } @Override public void run() { for(int i=0;i<30;i++){ try { String take = bq.take(); System.out.println(Thread.currentThread().getName()+"消费了"+take); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public static void main(String[] args) { ArrayBlockingQueue<String> bq=new ArrayBlockingQueue<>(1); new Producer(bq).start(); new Consumer(bq).start(); }
同步工具类 CountDownLatch、CyclicBarrier、Semaphore
CountDownLatch类
- CountDownLatch(闭锁)是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
- 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才能继续执行: (1)确保某个计算在其需要的所有资源都被初始化后才能继续执行。 (2)确保某个服务在其依赖的所有其他服务都已经启动之后才启动。 (3)等待直到某个操作所有参与者都执行完毕其他线程才能继续执行。
- 通俗理解:等待每个线程结束时,才会执行其他程序
- 代码演示:
//计算线程执行的时间 public class Demo1 { public static void main(String[] args) { CountDownLatch countDownLatch=new CountDownLatch(5); long start =System.currentTimeMillis(); for(int i=0;i<5;i++){ new MyThread(countDownLatch).start(); } try { System.out.println("等待执行结束"); countDownLatch.await(); //System.out.println("执行结束"); } catch (InterruptedException e) { e.printStackTrace(); } long end=System.currentTimeMillis(); System.out.println("最终用时:"+(end-start)); } public static class MyThread extends Thread{ private CountDownLatch countDownLatch; public MyThread(CountDownLatch countDownLatch){ this.countDownLatch=countDownLatch; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"开始执行....."); int sum=0; for(int i=0;i<99999;i++){ sum+=i; for(int j=99999;j>0;j--){ sum-=j; } } System.out.println(Thread.currentThread().getName()+"执行完毕"+sum); countDownLatch.countDown(); } } }
CyclicBarrier类
- CyclicBarrier和CountDownLatch类似(屏障)表面意思理解为可循环使用的屏障,作用是让一组线程在到达一个屏障时被阻塞,等到最后一个线程到达屏障点,才会运行被拦截的线程继续运行。 (1)构造函数 CyclicBarrier(int parties) 屏障拦截的线程数量 (2)await() 调用该方法时表示线程已经到达屏障,随即阻塞
- 通俗理解:就是等待每个线程启动后,才可让线程运行
- 代码演示:
//5个线程同时执行 public class Demo3 { public static void main(String[] args) { CyclicBarrier cyclicBarrier=new CyclicBarrier(5); for(int i=0;i<5;i++){ new MyThread(cyclicBarrier).start(); } } static class MyThread extends Thread{ private CyclicBarrier cyclicBarrier; public MyThread(CyclicBarrier cyclicBarrier){ this.cyclicBarrier=cyclicBarrier; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"准备好...."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"要执行了...."); } } }
- CountDownLatch和CyclicBarrier的区别
- CountDownLatch只能用一次,CyclicBarrier可以reset(),且适合处理更复杂的业务
- CyclicBarrier还有getNumberWaiting 获取当前阻塞的线程数量,isBroken()判断阻塞线程是否被中断。
Semaphore信号量
- Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
- 用来控制同时访问特定资源的线程数量,通过协调保证合理的使用公共资源。
- 比作控制车流的红绿灯,如马路要控制流量,只限制100辆车通行,其他必须在路口处等待,不能行驶在马路上,当其中有5辆离开马路,那么允许后面5辆进入马路。
- 通俗理解:就是规定一次可以通过2个线程,那么一次就只能通过2个线程)
- 代码演示:
public class Demo5 { public static void main(String[] args) { Semaphore semaphore=new Semaphore(2);//同步关键类,构造方法传入的数字是多少,则同一个时刻,只允许多少个线程同时运行指定代码 for(int i=0;i<10;i++){ new MyThread(semaphore).start(); } } static class MyThread extends Thread{ private SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); public Semaphore semaphore; public MyThread(Semaphore semaphore){ this.semaphore=semaphore; } @Override public void run() { try { semaphore.acquire(); //获取一个锁 System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getTime()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getTime()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public String getTime(){ return sf.format(new Date()); } } }