Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

1、线程和进程

进程:程序的集合。一个进程往往可以包含多个线程,至少包含一个

java默认有两个线程:一个main线程,一个gc线程

线程:线程是cpu调度和执行的单位,对于java而言:Thread、Runnable、Callable

并发编程

并发编程:并发和并行

并发(多线程操作同一个资源)

  • cpu一核,模拟出来多条线程。

并行(多个人一起行走)

  • cpu多核,多个线程可以同时执行

并发编程的本质:充分利用cpu的资源

wait和sleep的区别

  1. 来自不同的了类 wait–>Object ,sleep–>Thread
  2. wait会释放锁,sleep不会释放锁
  3. 使用的范围不同,wait必须在同步代码块,sleep不需要
  4. 是否需要捕获异常 wait不需要捕获异常 sleep必须要捕获异常

2、Lock(锁)

synchronized

ReentrantLock(可重入锁):可以设置公平锁,默认非公平锁

公平锁:十分公平,先来后到

非公平锁:十分不公平,可以插队

Lock和synchronized的区别

  1. synchronized 内置的关键字,Lock是一个java类
  2. synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁
  3. synchronized 会自动释放锁,Lock必须要手动释放锁,如果不释放锁,产生死锁
  4. synchronized 如果两个线程,第一个获得锁,第二个线程会等待,如果线程一阻塞了,线程二会一直等待,Lock锁不会
  5. synchronized 可重入锁,不可以中断的,非公平;Lock可重入锁,可以判断锁,非公平(可以自己设置)
  6. synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码

3、 生产者消费者问题synchronized版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.bestrookie;

/**
* @author : bestrookie
* @date : 21:40 2021/1/26
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data{
private int num = 0;
//+1
public synchronized void increment() throws InterruptedException {
if (num != 0){
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+": 我加完了"+num);
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (num ==0){
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+": 我减完了"+num);
this.notifyAll();
}
}

问题存在:两个线程没有问题,出现多个线程,多条通信时出现问题,出现虚假唤醒问题(使用if等待唤醒时)。改为while 问题解决

4、JUC版的生产者消费者问题

image-20210127173149242

lock取代synchronized,新建一个对象Condition condition = lock.newCondition(),**condition.await()**取代wait(),**condition.signalAll()**取代notifyAll();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.bestrookie;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author : bestrookie
* @date : 16:40 2021/1/27
*/
public class B {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data2{
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
while (num != 0){
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName()+": 我加完了"+num);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try{
while (num ==0){
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+": 我减完了"+num);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

Condition的优势:精准的通知和唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.bestrookie.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author : bestrookie
* @date : 17:15 2021/1/27
*/
public class C {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printC();
}
},"C").start();
}
}
class Data3{
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2= lock.newCondition();
private Condition condition3 = lock.newCondition();
private int num = 1;
public void printA(){
lock.lock();
try {
while (num !=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"---->AAAA");
num = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}

}
public void printB(){
lock.lock();
try {
while (num!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"--->BBBBB");
num = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while (num != 3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"--->CCCC");
num = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

5、8锁现象

锁会锁住:对象、class

问题一:

两个线程先打印发短信还是打电话?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class dome01 {
public static void main(String[] args) {
Phone phone = new Phone();

new Thread(() -> { phone.sendMs(); }).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.call(); }).start();
}
}

class Phone {
public synchronized void sendMs() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}

输出结果为

发短信

打电话

为什么? 如果你认为是顺序在前? 这个答案是错误的!

问题2:

我们再来看:我们让发短信 延迟4s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class dome01 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();

new Thread(() -> {
try {
phone.sendMs();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.call(); }).start();
}
}

class Phone {
public synchronized void sendMs() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}

现在结果是什么呢?

结果:还是先发短信,然后再打电话!

why?

原因:并不是顺序执行,而是synchronized 锁住的对象是方法的调用!对于两个方法用的是同一个锁,谁先拿到谁先执行,另外一个等待

问题三

加一个普通方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class dome01 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();

new Thread(() -> {
try {
phone.sendMs();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone.hello(); }).start();
}
}

class Phone {
public synchronized void sendMs() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}

输出结果为

hello

发短信

原因:hello是一个普通方法,不受synchronized锁的影响,不用等待锁的释放

问题四

如果我们使用的是两个对象,一个调用发短信,一个调用打电话,那么整个顺序是怎么样的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class dome01 {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();

new Thread(() -> {
try {
phone1.sendMs();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone2.call(); }).start();
}
}

class Phone {
public synchronized void sendMs() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}

输出结果

打电话

发短信

原因:两个对象两把锁,不会出现等待的情况,发短信睡了4s,所以先执行打电话

问题五、六

如果我们把synchronized的方法加上static变成静态方法!那么顺序又是怎么样的呢?

(1)我们先来使用一个对象调用两个方法!

答案是:先发短信,后打电话

(2)如果我们使用两个对象调用两个方法!

答案是:还是先发短信,后打电话

原因是什么呢? 为什么加了static就始终前面一个对象先执行呢!为什么后面会等待呢?

原因是:对于static静态方法来说,对于整个类Class来说只有一份,对于不同的对象使用的是同一份方法,相当于这个方法是属于这个类的,如果静态static方法使用synchronized锁定,那么这个synchronized锁会锁住整个对象!不管多少个对象,对于静态的锁都只有一把锁,谁先拿到这个锁就先执行,其他的进程都需要等待!

问题七

如果我们使用一个静态同步方法、一个同步方法、一个对象调用顺序是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class dome01 {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
// Phone phone2 = new Phone();

new Thread(() -> {
try {
phone1.sendMs();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone1.call(); }).start();
}
}

class Phone {
public static synchronized void sendMs() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}

输出结果

打电话

发短信

原因:因为一个锁的是Class类的模板,一个锁的是对象的调用者。所以不存在等待,直接运行。

问题八

如果我们使用一个静态同步方法、一个同步方法、两个对象调用顺序是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class dome01 {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();

new Thread(() -> {
try {
phone1.sendMs();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> { phone2.call(); }).start();
}
}

class Phone {
public static synchronized void sendMs() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}

输出结果

打电话

发短信

原因:两把锁锁的不是同一个东西

小解

new 出来的 this 是具体的一个对象

static Class 是唯一的一个模板

6. 集合类不安全

List类不安全

我们在一般使用集合类的时候,往往在单线程的时候使用,所以一般不会出现什么安全问题,但是在并发的情况下,使用ArryList是不安全的,会触发java.util.ConcurrentModificationException 并发修改异常!

解决方案:

1
List<String> list = new Vector<>();
1
List<String> list = Collections.synchronizedList(new ArrayList<>());
1
List<String> list = new CopyOnWriteArrayList<>();

推荐使用方案3:CopyOnWriteArrayList:写入时复制! COW 计算机程序设计领域的一种优化策略

核心思想:如果有多个调用者同时要求相同的资源(Callers)(如内存或者磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源内容是,系统才会真正复制一份专用的副本(private copy)给调用者,而其他调用者所见到的最初资源仍然保持不变。这过程对其他的调用者都是透明的(transportly)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。

CopyOnWriteArrayListVector好在哪里:Vector底层使用的synchronized关键字来实现,效率低下,而CopyOnWriteArraryList使用的Lock,效率会更加高效。

Set类不安全

同理和List类一样

解决方案:

1
Set<String> set = Collections.synchronizedSet(new HashSet<>());
1
Set<String> set = CopyOnWriteArrySet<>();

hashSet的底层:就是hashMap

1
2
3
4
5
6
7
8
9
10
11
public HashSet() {
map = new HashMap<>();
}

//add 本质其实就是一个map的key,map的key是无法重复的,所以使用的就是map存储
//hashSet就是使用了hashmap key不能重复的原理
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//PRESENT是什么? 是一个常量 不会改变的常量 无用的占位
private static final Object PRESENT = new Object();

Map不安全

解决方案:

1
Map<String,String> map = new Collections.synchronizedMap<String,String>();
1
Map<String,String>map = new ConcurrentHashMap<>();

7、Callable

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同,用的时call()而不是run()

callable的创建和启动:注意Thread不能直接运行Callable,Thread只能运行Runnable,所以想要运行Callable,必须让Callbale与Runnable有关系,所以需要FutureTask做一个适配类,然后将FutureTask丢到Thread里面运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.bestrookie.pc;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
* @author : bestrookie
* @date : 21:40 2021/1/27
*/
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();;
FutureTask futureTask = new FutureTask(myThread);
new Thread(futureTask).start();
Integer result = (Integer)futureTask.get();//get方法可能产生堵塞
System.out.println(result);
}
}
class MyThread implements Callable<Integer>{

@Override
public Integer call() throws Exception {
System.out.println("callable()");
return 123;
}
}

注意:callable存在缓存,如果结果耗时比较长,可能会堵塞

8、常用的辅助类

1. CountDownLatch

减法计数器

原理:

1
countDownLatch.countDown();//数量减一
1
countDownLatch.await();//等待计数器归零,然后向下执行

用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.bestrookie.add;

import java.util.concurrent.CountDownLatch;

/**
* @author : bestrookie
* @date : 15:46 2021/1/28
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);//设置总数
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" go out");
countDownLatch.countDown();//数量减一
},String.valueOf(i)).start();
}
countDownLatch.await();//减到零停止等待
System.out.println("close door");
}
}

2. CyclicBarrier

加法计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.bestrookie.add;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author : bestrookie
* @date : 16:25 2021/1/28
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐七颗龙珠召唤神龙
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("神龙出世");
});
for (int i = 1; i < 8; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集了第"+temp+"颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

3. Semaphore(信号量)

抢车位案例:这个工具类一般用于限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.bestrookie.add;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* @author : bestrookie
* @date : 17:27 2021/1/28
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量:停车位,限流的时候会用
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i < 7; i++) {
new Thread(()->{
try {
semaphore.acquire();//得到
System.out.println(Thread.currentThread().getName()+"抢到了车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//释放
}
}).start();
}
}
}

原理:

semaphore.acquire():获得,假设已经满了,等待,等待被释放为止

semaphore.release():释放,会将当前的信号量释放+1,然后唤醒等待的线程

作用:多个共享资源互斥的使用,并发限流,控制最大的线程数。

9、读写锁

读-读:可以共存

写-写:不能共存

读-写:不能共存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.bestrookie.add;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* @author : bestrookie
* @date : 17:52 2021/1/28
*/
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCacheLock myCache = new MyCacheLock();
//写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCache{
private volatile Map<String, Object> map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"正在写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完成");
}
public void get(String key){
System.out.println(Thread.currentThread().getName()+"正在读取");
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完成");
}
}
class MyCacheLock{
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key,Object value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.writeLock().unlock();
}
}
public void get(String key){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在读取");
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
}
}

10、阻塞队列

什么情况下使用阻塞队列:多线程并发处理,线程池

学会使用队列

添加、删除

四组API

方式 抛出异常 不抛出异常,有返回值 阻塞等待 超时等待
添加 add() offer() put() offer(, , )
移除 remove() poll() take() poll(,)
检测队首元素 element() peek()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.bestrookie.bq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : bestrookie
* @date : 19:29 2021/1/28
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
test4();
}

/**
* 抛出异常
*/
public static void test1(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
System.out.println("=====================================");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());

}

/**
* 不抛出异常
*/
public static void test2(){
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println("=====================================");
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
}

/**
* 阻塞等待
* @throws InterruptedException
*/
public static void test3() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
System.out.println("=====================================");
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
}
/**
* 超时等待
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("c",2,TimeUnit.SECONDS));
System.out.println("=====================================");
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));
}
}

SynchronousQueue同步队列

同步队列和其他的BlockQueue不一样,SynchronousQueue不存储元素

put了一个元素,必须从里面先take取出来,否则不能在put进去值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.bestrookie.bq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : bestrookie
* @date : 20:02 2021/1/28
*/
public class Test2 {
public static void main(String[] args) {
SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"---->"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();

}
}

11、线程池

池化技术:事先准备好一些资源,有人要用就来池子里拿,用完之后还给池子。

线程池的好处:

  1. 降低资源的消耗
  2. 提高响应的速度
  3. 方便管理

线程复用、可以控制最大并发数、管理线程

线程池三大方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.bestrookie.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author : bestrookie
* @date : 20:28 2021/1/28
*/
public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池大小
ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的

try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" OK");
});
}
} finally {
threadPool.shutdown();
}
}
}

线程池的七大参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必须大于 0,且必须大于 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程

第 7 个参数:RejectedExecutionHandler 表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。

手写线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.bestrookie.pool;


import java.util.concurrent.*;

/**自定义线程池
* @author : bestrookie
* @date : 20:28 2021/1/28
*/
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" OK");
});
}
} finally {
threadPool.shutdown();
}
}
}

四种拒绝策略

AbortPolicy,终止策略,线程池会抛出异常并终止执行,它是默认的拒绝策略;
CallerRunsPolicy,把任务交给当前线程来执行;
DiscardPolicy,忽略此任务(最新的任务);
DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)。

最大线程应该怎么定义

  • CPU密集型:几核的就是几,可以保持CUP的最大效率
  • IO密集型:判断你的程序中十分耗IO的线程
1
Runtime.getRuntime().availableProcessors();获取cpu核数

12、四大函数式接口

函数式接口:只有一个方法的接口

Function函数式接口

Function函数式接口,有一个输入参数,一个输出

只要是函数式接口都可用lambda表达式简化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.bestrookie.function;

import java.util.function.Function;

/**
* @author : bestrookie
* @date : 21:30 2021/1/28
*/
public class Demo01 {
public static void main(String[] args) {
// Function<String, String> function = new Function<String, String>() {
// @Override
// public String apply(String s) {
// return s;
// }
// };
Function<String, String> function = (str)->{return str;};
System.out.println(function.apply("aaa"));
}
}

Predicate(断定型接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.bestrookie.function;

import java.util.function.Predicate;

/**
* @author : bestrookie
* @date : 21:37 2021/1/28
*/
public class Demo02 {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<String>() {
// @Override
// public boolean test(String s) {
// return s.isEmpty();
// }
// };
Predicate<String> predicate = (str)->{return str.isEmpty();};
System.out.println(predicate.test(""));
}
}

Cunsumer(消费型接口)

只有输入,没有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.bestrookie.function;

import java.util.function.Consumer;

/**
* @author : bestrookie
* @date : 21:42 2021/1/28
*/
public class Demo03 {
public static void main(String[] args) {
// Consumer<String> consumer =new Consumer<String>() {
// @Override
// public void accept(String s) {
// System.out.println(s);
// }
// };
Consumer<String> consumer = (s)->{
System.out.println(s);
};
consumer.accept("a");
}
}

Supplier(供给型接口)

没有参数只有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.bestrookie.function;

import java.util.function.Supplier;

/**
* @author : bestrookie
* @date : 21:46 2021/1/28
*/
public class Demo04 {
public static void main(String[] args) {
// Supplier supplier = new Supplier() {
// @Override
// public Object get() {
// return 1024;
// }
// };
Supplier supplier = ()->{return 1024;};
System.out.println(supplier.get());
}
}

13、Stream流式计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.bestrookie.stream;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
* @author : bestrookie
* @date : 21:57 2021/1/28
* 题目要求:一分钟完成此题,只能用一行代码
* 1、ID必须必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字符
* 4、用户名字字母倒叙
* 5、只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
list.stream()
.filter((u)->{return u.getId() %2 ==0;})
.filter((u)->{return u.getAge() > 23;})
.map((u)->{return u.getName().toUpperCase(Locale.ROOT);})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);

}
}

14、ForkJoin

什么事ForkJoin在JDK1.7里面,并行执行任务,提高效率

image-20210129111817425

ForkJoin特点:工作窃取

这个里面维护的双端队列

image-20210129112011865

大数据量计算例子(三种方式)

forkjoin创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.bestrookie.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
* @author : bestrookie
* @date : 11:23 2021/1/29
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;

public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start < temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else {
long mid = (start + end) / 2;
ForkJoinDemo task1 = new ForkJoinDemo(start,mid);
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(mid+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.bestrookie.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
* @author : bestrookie
* @date : 11:38 2021/1/29
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}

/**
* 暴力循环计算
*/
public static void test1(){
Long sum =0L;
long startTime = System.currentTimeMillis();
for (Long i = 1L; i <=10_0000_0000 ; i++) {
sum += i;
}
long endTime = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(endTime-startTime));
}

/**
* ForkJoin计算
* @throws ExecutionException
* @throws InterruptedException
*/
public static void test2() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long endTime = System.currentTimeMillis();
System.out.println("sum="+sum+"时间:"+(endTime-startTime));
}

/**
*并行流计算
*/
public static void test3(){
long startTime = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long endTime = System.currentTimeMillis();
System.out.println("sum="+sum+"时间:"+(endTime-startTime));
}
}

15、异步回调

Futuer设计的初衷:对将来的某个事件进行建模。

image-20210129214403388

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.bestrookie.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @author : bestrookie
* @date : 18:21 2021/1/29
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//不带回调函数
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName()+"runAsync");
// });
// System.out.println("1111111");
// completableFuture.get();
//带回调函数的
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync");
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t);//正常的返回结果
System.out.println("t=>" + u);//错误的信息
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get());
}
}

16、JMM

请你谈谈对Volatile的理解

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

什么是JMM

JMM:java内存模型,不存在的东西,是一个概念

关于JMM的一些同步的约定:

  1. 线程解锁前,必须把共享变量立刻刷回主存
  2. 线程加锁前,必须读取主存中最新的值到工作内存中
  3. 加锁和解锁必须是同一把锁

线程:工作内存、主内存

8种操作

  • Read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
  • load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中;
  • Use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令;
  • assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中;
  • store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用;
  • write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中;
  • lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态;
  • unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;

JMM对这8种操作给了相应的规定

  • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  • 不允许一个线程将没有assign的数据从工作内存同步回主内存
  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
  • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

image-20210130102929744

image-20210130103205327

问题:程序不知道主内存的值已经被修改过了

17、Volatile

  1. 保证可见性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.bestrookie.tvolatile;

import java.util.concurrent.TimeUnit;

/**
* @author : bestrookie
* @date : 14:22 2021/1/30
*/
public class Demo {
private volatile static int num = 0;//如果不加vlolatile 程序会一直循环
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (num==0){

}
}).start();
TimeUnit.SECONDS.sleep(2);
num++;
System.out.println(num);
}
}
  1. 不保证原子性

    原子性:不可分割

    线程A在执行任务的时候,不能被打扰,也不能被分割,要么同时成功,要么同时失败。

如果不加lock或者synchronized怎样保证原子性。

使用原子类解决原子性问题

  1. 禁止指令重排

指令重排:你写的程序,计算机并不是按照你写的顺序去执行。

怎样避免呢:

内存屏障。cpu指令,作用:

  • 保证特定的操作的执行顺序
  • 可以保证某些变量的内存可见性

18、单例模式

饿汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.bestrookie.single;

/**饿汉式单例
* @author : bestrookie
* @date : 14:58 2021/1/30
*/
public class Hungry {
private Hungry(){

}
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}

DCL懒汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.bestrookie.single;

/**懒汉式单例
* @author : bestrookie
* @date : 16:28 2021/1/30
*/
public class LazyMan {
private LazyMan(){

}
private volatile static LazyMan lazyMan;
public static LazyMan getInstance(){
//双重检测锁模式的懒汉式单例 DCL懒汉式
if (lazyMan ==null){
synchronized (LazyMan.class){
if (lazyMan == null){
lazyMan = new LazyMan();
}
}
}
return lazyMan;
}
}

静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.bestrookie.single;

/**
* @author : bestrookie
* @date : 17:11 2021/1/30
*/
public class Holder {
private Holder(){

}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}

单例不安全:反射可以破坏

可以使用枚举

19、深入理解CAS

什么是CAS呢?Compare-and-Swap,即比较并替换,也有叫做Compare-and-Set的,比较并设置

1、比较:读取到了一个值A,在将其更新为B之前,检查原值是否仍为A(未被其他线程改动)。

2、设置:如果是,将A更新为B,结束。如果不是,则什么都不做。

缺点:

  • 底层是自旋锁,循环会耗时
  • 一次性只能保证一个共享变量的原子性
  • ABA问题

20、原子引用

解决ABA问题,对应的思想:就是使用了乐观锁~

带版本号的 原子操作!

Integer 使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.marchsoft.lockdemo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;


public class CASDemo {
/**AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
* 正常在业务操作,这里面比较的都是一个个对象
*/
static AtomicStampedReference<Integer> atomicStampedReference = new
AtomicStampedReference<>(1, 1);

// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>" + stamp);

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改操作时,版本号更新 + 1
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);

System.out.println("a2=>" + atomicStampedReference.getStamp());
// 重新把值改回去, 版本号更新 + 1
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("a3=>" + atomicStampedReference.getStamp());
}, "a").start();

// 乐观锁的原理相同!
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 3,
stamp, stamp + 1));
System.out.println("b2=>" + atomicStampedReference.getStamp());
}, "b").start();
}
}

评论

parallax