1、线程和进程
进程:程序的集合。一个进程往往可以包含多个线程,至少包含一个
java默认有两个线程:一个main线程,一个gc线程
线程:线程是cpu调度和执行的单位,对于java而言:Thread、Runnable、Callable
并发编程
并发编程:并发和并行
并发(多线程操作同一个资源)
并行(多个人一起行走)
并发编程的本质:充分利用cpu的资源
wait和sleep的区别
- 来自不同的了类 wait–>Object ,sleep–>Thread
- wait会释放锁,sleep不会释放锁
- 使用的范围不同,wait必须在同步代码块,sleep不需要
- 是否需要捕获异常 wait不需要捕获异常 sleep必须要捕获异常
2、Lock(锁)
synchronized
ReentrantLock(可重入锁):可以设置公平锁,默认非公平锁
公平锁:十分公平,先来后到
非公平锁:十分不公平,可以插队
Lock和synchronized的区别
- synchronized 内置的关键字,Lock是一个java类
- synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁
- synchronized 会自动释放锁,Lock必须要手动释放锁,如果不释放锁,产生死锁
- synchronized 如果两个线程,第一个获得锁,第二个线程会等待,如果线程一阻塞了,线程二会一直等待,Lock锁不会
- synchronized 可重入锁,不可以中断的,非公平;Lock可重入锁,可以判断锁,非公平(可以自己设置)
- synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码
3、 生产者消费者问题synchronized版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.bestrookie;
public class A { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } class Data{ private int num = 0; public synchronized void increment() throws InterruptedException { if (num != 0){ this.wait(); } num++; System.out.println(Thread.currentThread().getName()+": 我加完了"+num); this.notifyAll(); } public synchronized void decrement() throws InterruptedException { if (num ==0){ this.wait(); } num--; System.out.println(Thread.currentThread().getName()+": 我减完了"+num); this.notifyAll(); } }
|
问题存在:两个线程没有问题,出现多个线程,多条通信时出现问题,出现虚假唤醒问题(使用if等待唤醒时)。改为while 问题解决
4、JUC版的生产者消费者问题
lock取代synchronized,新建一个对象Condition condition = lock.newCondition(),**condition.await()**取代wait(),**condition.signalAll()**取代notifyAll();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| package com.bestrookie;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class B { public static void main(String[] args) { Data2 data2 = new Data2(); new Thread(()->{ for (int i = 0; i < 20; i++) { try { data2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 20; i++) { try { data2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(()->{ for (int i = 0; i < 20; i++) { try { data2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for (int i = 0; i < 20; i++) { try { data2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } class Data2{ private int num = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void increment() throws InterruptedException { lock.lock(); try { while (num != 0){ condition.await(); } num++; System.out.println(Thread.currentThread().getName()+": 我加完了"+num); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try{ while (num ==0){ condition.await(); } num--; System.out.println(Thread.currentThread().getName()+": 我减完了"+num); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
|
Condition的优势:精准的通知和唤醒线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| package com.bestrookie.pc;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class C { public static void main(String[] args) { Data3 data3 = new Data3(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printA(); } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printB(); } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printC(); } },"C").start(); } } class Data3{ private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2= lock.newCondition(); private Condition condition3 = lock.newCondition(); private int num = 1; public void printA(){ lock.lock(); try { while (num !=1){ condition1.await(); } System.out.println(Thread.currentThread().getName()+"---->AAAA"); num = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); }
} public void printB(){ lock.lock(); try { while (num!=2){ condition2.await(); } System.out.println(Thread.currentThread().getName()+"--->BBBBB"); num = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void printC(){ lock.lock(); try { while (num != 3){ condition3.await(); } System.out.println(Thread.currentThread().getName()+"--->CCCC"); num = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
|
5、8锁现象
锁会锁住:对象、class
问题一:
两个线程先打印发短信还是打电话?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class dome01 { public static void main(String[] args) { Phone phone = new Phone();
new Thread(() -> { phone.sendMs(); }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.call(); }).start(); } }
class Phone { public synchronized void sendMs() { System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } }
|
输出结果为
发短信
打电话
为什么? 如果你认为是顺序在前? 这个答案是错误的!
问题2:
我们再来看:我们让发短信 延迟4s
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone();
new Thread(() -> { try { phone.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.call(); }).start(); } }
class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } }
|
现在结果是什么呢?
结果:还是先发短信,然后再打电话!
why?
原因:并不是顺序执行,而是synchronized 锁住的对象是方法的调用!对于两个方法用的是同一个锁,谁先拿到谁先执行,另外一个等待
问题三
加一个普通方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone();
new Thread(() -> { try { phone.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.hello(); }).start(); } }
class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
|
输出结果为
hello
发短信
原因:hello是一个普通方法,不受synchronized锁的影响,不用等待锁的释放
问题四
如果我们使用的是两个对象,一个调用发短信,一个调用打电话,那么整个顺序是怎么样的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); Phone phone2 = new Phone();
new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone2.call(); }).start(); } }
class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
|
输出结果
打电话
发短信
原因:两个对象两把锁,不会出现等待的情况,发短信睡了4s,所以先执行打电话
问题五、六
如果我们把synchronized的方法加上static变成静态方法!那么顺序又是怎么样的呢?
(1)我们先来使用一个对象调用两个方法!
答案是:先发短信,后打电话
(2)如果我们使用两个对象调用两个方法!
答案是:还是先发短信,后打电话
原因是什么呢? 为什么加了static就始终前面一个对象先执行呢!为什么后面会等待呢?
原因是:对于static静态方法来说,对于整个类Class来说只有一份,对于不同的对象使用的是同一份方法,相当于这个方法是属于这个类的,如果静态static方法使用synchronized锁定,那么这个synchronized锁会锁住整个对象!不管多少个对象,对于静态的锁都只有一把锁,谁先拿到这个锁就先执行,其他的进程都需要等待!
问题七
如果我们使用一个静态同步方法、一个同步方法、一个对象调用顺序是什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone();
new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone1.call(); }).start(); } }
class Phone { public static synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
|
输出结果
打电话
发短信
原因:因为一个锁的是Class类的模板,一个锁的是对象的调用者。所以不存在等待,直接运行。
问题八
如果我们使用一个静态同步方法、一个同步方法、两个对象调用顺序是什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); Phone phone2 = new Phone();
new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone2.call(); }).start(); } }
class Phone { public static synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
|
输出结果
打电话
发短信
原因:两把锁锁的不是同一个东西
小解
new 出来的 this 是具体的一个对象
static Class 是唯一的一个模板
6. 集合类不安全
List类不安全
我们在一般使用集合类的时候,往往在单线程的时候使用,所以一般不会出现什么安全问题,但是在并发的情况下,使用ArryList是不安全的,会触发java.util.ConcurrentModificationException 并发修改异常!
解决方案:
1
| List<String> list = new Vector<>();
|
1
| List<String> list = Collections.synchronizedList(new ArrayList<>());
|
1
| List<String> list = new CopyOnWriteArrayList<>();
|
推荐使用方案3:CopyOnWriteArrayList:写入时复制! COW 计算机程序设计领域的一种优化策略
核心思想:如果有多个调用者同时要求相同的资源(Callers)(如内存或者磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源内容是,系统才会真正复制一份专用的副本(private copy)给调用者,而其他调用者所见到的最初资源仍然保持不变。这过程对其他的调用者都是透明的(transportly)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。
CopyOnWriteArrayList比Vector好在哪里:Vector底层使用的synchronized关键字来实现,效率低下,而CopyOnWriteArraryList使用的Lock,效率会更加高效。
Set类不安全
同理和List类一样
解决方案:
1
| Set<String> set = Collections.synchronizedSet(new HashSet<>());
|
1
| Set<String> set = CopyOnWriteArrySet<>();
|
hashSet的底层:就是hashMap
1 2 3 4 5 6 7 8 9 10 11
| public HashSet() { map = new HashMap<>(); }
public boolean add(E e) { return map.put(e, PRESENT)==null; }
private static final Object PRESENT = new Object();
|
Map不安全
解决方案:
1
| Map<String,String> map = new Collections.synchronizedMap<String,String>();
|
1
| Map<String,String>map = new ConcurrentHashMap<>();
|
7、Callable
- 可以有返回值
- 可以抛出异常
- 方法不同,用的时call()而不是run()
callable的创建和启动:注意Thread不能直接运行Callable,Thread只能运行Runnable,所以想要运行Callable,必须让Callbale与Runnable有关系,所以需要FutureTask做一个适配类,然后将FutureTask丢到Thread里面运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.bestrookie.pc;
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;
public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { MyThread myThread = new MyThread();; FutureTask futureTask = new FutureTask(myThread); new Thread(futureTask).start(); Integer result = (Integer)futureTask.get(); System.out.println(result); } } class MyThread implements Callable<Integer>{
@Override public Integer call() throws Exception { System.out.println("callable()"); return 123; } }
|
注意:callable存在缓存,如果结果耗时比较长,可能会堵塞
8、常用的辅助类
1. CountDownLatch
减法计数器
原理:
1
| countDownLatch.countDown();
|
用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.bestrookie.add;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" go out"); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println("close door"); } }
|
2. CyclicBarrier
加法计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.bestrookie.add;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo { public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("神龙出世"); }); for (int i = 1; i < 8; i++) { final int temp = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"收集了第"+temp+"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
|
3. Semaphore(信号量)
抢车位案例:这个工具类一般用于限流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.bestrookie.add;
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i < 7; i++) { new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到了车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }).start(); } } }
|
原理:
semaphore.acquire():获得,假设已经满了,等待,等待被释放为止
semaphore.release():释放,会将当前的信号量释放+1,然后唤醒等待的线程
作用:多个共享资源互斥的使用,并发限流,控制最大的线程数。
9、读写锁
读-读:可以共存
写-写:不能共存
读-写:不能共存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.bestrookie.add;
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo { public static void main(String[] args) throws InterruptedException { MyCacheLock myCache = new MyCacheLock(); for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(()->{ myCache.put(temp+"",temp+""); },String.valueOf(i)).start(); } for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(()->{ myCache.get(temp+""); },String.valueOf(i)).start(); } } } class MyCache{ private volatile Map<String, Object> map = new HashMap<>(); public void put(String key,Object value){ System.out.println(Thread.currentThread().getName()+"正在写入"+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入完成"); } public void get(String key){ System.out.println(Thread.currentThread().getName()+"正在读取"); Object o = map.get(key); System.out.println(Thread.currentThread().getName()+"读取完成"); } } class MyCacheLock{ private volatile Map<String, Object> map = new HashMap<>(); private ReadWriteLock lock = new ReentrantReadWriteLock(); public void put(String key,Object value){ lock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"正在写入"+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入完成"); } catch (Exception e) { e.printStackTrace(); }finally { lock.writeLock().unlock(); } } public void get(String key){ lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+"正在读取"); Object o = map.get(key); System.out.println(Thread.currentThread().getName()+"读取完成"); } catch (Exception e) { e.printStackTrace(); }finally { lock.readLock().unlock(); } } }
|
10、阻塞队列
什么情况下使用阻塞队列:多线程并发处理,线程池
学会使用队列
添加、删除
四组API
方式 |
抛出异常 |
不抛出异常,有返回值 |
阻塞等待 |
超时等待 |
添加 |
add() |
offer() |
put() |
offer(, , ) |
移除 |
remove() |
poll() |
take() |
poll(,) |
检测队首元素 |
element() |
peek() |
|
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| package com.bestrookie.bq;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;
public class Test { public static void main(String[] args) throws InterruptedException { test4(); }
public static void test1(){ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.element()); System.out.println("====================================="); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove());
}
public static void test2(){ ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); System.out.println(arrayBlockingQueue.offer("a")); System.out.println(arrayBlockingQueue.offer("b")); System.out.println(arrayBlockingQueue.offer("c")); System.out.println("====================================="); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); }
public static void test3() throws InterruptedException { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); arrayBlockingQueue.put("a"); arrayBlockingQueue.put("b"); arrayBlockingQueue.put("c"); System.out.println("====================================="); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); }
public static void test4() throws InterruptedException { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3); System.out.println(arrayBlockingQueue.offer("a")); System.out.println(arrayBlockingQueue.offer("b")); System.out.println(arrayBlockingQueue.offer("c")); System.out.println(arrayBlockingQueue.offer("c",2,TimeUnit.SECONDS)); System.out.println("====================================="); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS)); } }
|
SynchronousQueue同步队列
同步队列和其他的BlockQueue不一样,SynchronousQueue不存储元素
put了一个元素,必须从里面先take取出来,否则不能在put进去值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.bestrookie.bq;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit;
public class Test2 { public static void main(String[] args) { SynchronousQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+"put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+"put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start();
} }
|
11、线程池
池化技术:事先准备好一些资源,有人要用就来池子里拿,用完之后还给池子。
线程池的好处:
- 降低资源的消耗
- 提高响应的速度
- 方便管理
线程复用、可以控制最大并发数、管理线程
线程池三大方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.bestrookie.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Demo01 { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool();
try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" OK"); }); } } finally { threadPool.shutdown(); } } }
|
线程池的七大参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
|
第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。
第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。
第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。
第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。
第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。
第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程
第 7 个参数:RejectedExecutionHandler 表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。
手写线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.bestrookie.pool;
import java.util.concurrent.*;
public class Demo01 { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" OK"); }); } } finally { threadPool.shutdown(); } } }
|
四种拒绝策略
AbortPolicy,终止策略,线程池会抛出异常并终止执行,它是默认的拒绝策略;
CallerRunsPolicy,把任务交给当前线程来执行;
DiscardPolicy,忽略此任务(最新的任务);
DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)。
最大线程应该怎么定义
- CPU密集型:几核的就是几,可以保持CUP的最大效率
- IO密集型:判断你的程序中十分耗IO的线程
1
| Runtime.getRuntime().availableProcessors();获取cpu核数
|
12、四大函数式接口
函数式接口:只有一个方法的接口
Function函数式接口
Function函数式接口,有一个输入参数,一个输出
只要是函数式接口都可用lambda表达式简化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.bestrookie.function;
import java.util.function.Function;
public class Demo01 { public static void main(String[] args) {
Function<String, String> function = (str)->{return str;}; System.out.println(function.apply("aaa")); } }
|
Predicate(断定型接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.bestrookie.function;
import java.util.function.Predicate;
public class Demo02 { public static void main(String[] args) {
Predicate<String> predicate = (str)->{return str.isEmpty();}; System.out.println(predicate.test("")); } }
|
Cunsumer(消费型接口)
只有输入,没有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.bestrookie.function;
import java.util.function.Consumer;
public class Demo03 { public static void main(String[] args) {
Consumer<String> consumer = (s)->{ System.out.println(s); }; consumer.accept("a"); } }
|
Supplier(供给型接口)
没有参数只有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.bestrookie.function;
import java.util.function.Supplier;
public class Demo04 { public static void main(String[] args) {
Supplier supplier = ()->{return 1024;}; System.out.println(supplier.get()); } }
|
13、Stream流式计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.bestrookie.stream;
import java.util.Arrays; import java.util.List; import java.util.Locale;
public class Test { public static void main(String[] args) { User u1 = new User(1,"a",21); User u2 = new User(2,"b",22); User u3 = new User(3,"c",23); User u4 = new User(4,"d",24); User u5 = new User(6,"e",25); List<User> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream() .filter((u)->{return u.getId() %2 ==0;}) .filter((u)->{return u.getAge() > 23;}) .map((u)->{return u.getName().toUpperCase(Locale.ROOT);}) .sorted((uu1,uu2)->{return uu2.compareTo(uu1);}) .limit(1) .forEach(System.out::println);
} }
|
14、ForkJoin
什么事ForkJoin在JDK1.7里面,并行执行任务,提高效率
ForkJoin特点:工作窃取
这个里面维护的双端队列
大数据量计算例子(三种方式)
forkjoin创建方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.bestrookie.forkjoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end; private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { if (end - start < temp){ Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; }else { long mid = (start + end) / 2; ForkJoinDemo task1 = new ForkJoinDemo(start,mid); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo(mid+1,end); task2.fork(); return task1.join()+task2.join(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package com.bestrookie.forkjoin;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream;
public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { test1(); test2(); test3(); }
public static void test1(){ Long sum =0L; long startTime = System.currentTimeMillis(); for (Long i = 1L; i <=10_0000_0000 ; i++) { sum += i; } long endTime = System.currentTimeMillis(); System.out.println("sum="+sum+" 时间:"+(endTime-startTime)); }
public static void test2() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long endTime = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(endTime-startTime)); }
public static void test3(){ long startTime = System.currentTimeMillis(); long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long endTime = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(endTime-startTime)); } }
|
15、异步回调
Futuer设计的初衷:对将来的某个事件进行建模。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.bestrookie.future;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync"); return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t=>" + t); System.out.println("t=>" + u); }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233; }).get()); } }
|
16、JMM
请你谈谈对Volatile的理解
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM
JMM:java内存模型,不存在的东西,是一个概念
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存
- 线程加锁前,必须读取主存中最新的值到工作内存中
- 加锁和解锁必须是同一把锁
线程:工作内存、主内存
8种操作
- Read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
- load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中;
- Use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令;
- assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中;
- store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用;
- write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中;
- lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态;
- unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
JMM对这8种操作给了相应的规定:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
问题:程序不知道主内存的值已经被修改过了
17、Volatile
- 保证可见性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.bestrookie.tvolatile;
import java.util.concurrent.TimeUnit;
public class Demo { private volatile static int num = 0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (num==0){
} }).start(); TimeUnit.SECONDS.sleep(2); num++; System.out.println(num); } }
|
不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰,也不能被分割,要么同时成功,要么同时失败。
如果不加lock或者synchronized怎样保证原子性。
使用原子类解决原子性问题
- 禁止指令重排
指令重排:你写的程序,计算机并不是按照你写的顺序去执行。
怎样避免呢:
内存屏障。cpu指令,作用:
- 保证特定的操作的执行顺序
- 可以保证某些变量的内存可见性
18、单例模式
饿汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.bestrookie.single;
public class Hungry { private Hungry(){
} private final static Hungry HUNGRY = new Hungry(); public static Hungry getInstance(){ return HUNGRY; } }
|
DCL懒汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.bestrookie.single;
public class LazyMan { private LazyMan(){
} private volatile static LazyMan lazyMan; public static LazyMan getInstance(){ if (lazyMan ==null){ synchronized (LazyMan.class){ if (lazyMan == null){ lazyMan = new LazyMan(); } } } return lazyMan; } }
|
静态内部类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.bestrookie.single;
public class Holder { private Holder(){
} public static Holder getInstance(){ return InnerClass.HOLDER; } public static class InnerClass{ private static final Holder HOLDER = new Holder(); } }
|
单例不安全:反射可以破坏
可以使用枚举
19、深入理解CAS
什么是CAS呢?Compare-and-Swap,即比较并替换,也有叫做Compare-and-Set的,比较并设置。
1、比较:读取到了一个值A,在将其更新为B之前,检查原值是否仍为A(未被其他线程改动)。
2、设置:如果是,将A更新为B,结束。如果不是,则什么都不做。
缺点:
- 底层是自旋锁,循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA问题
20、原子引用
解决ABA问题,对应的思想:就是使用了乐观锁~
带版本号的 原子操作!
Integer 使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.marchsoft.lockdemo;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemo {
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
public static void main(String[] args) { new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println("a1=>" + stamp); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1); System.out.println("a2=>" + atomicStampedReference.getStamp()); System.out.println(atomicStampedReference.compareAndSet(2, 1, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a3=>" + atomicStampedReference.getStamp()); }, "a").start(); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println("b1=>" + stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(1, 3, stamp, stamp + 1)); System.out.println("b2=>" + atomicStampedReference.getStamp()); }, "b").start(); } }
|