0%

JUC并发编程

JUC并发编程

线程和进程

进程:一个程序 一个进程可以包含多个线程 至少包含一个

java默认是有两个线程的:一个是main线程 一个是GC线程(做垃圾回收)

线程:开了一个IDEA进程,写完代码自动保存(线程负责的)

对于java而言:Thread、Runable、Callable

java真的可以开启线程吗?

java是没有权限去开启一个线程的,只能通过本地方法去调用底层的C++方法,java无法直接操作硬件,因为它是运行在虚拟机上的

并发和并行

并发(多个线程操作同一个资源)

  • CPU一核,模拟出来多条线程、天下武功 唯快不破,快速交替

并行 (多个人一起行走)

  • CPU多核,多个线程可以同时执行;线程池
1
2
3
4
//获取CPU的核数
//CPU密集型 IO密集型
//并发编程的
System.out.println(Runtime.getRuntime().availableProcessors());

并发编程的本质:充分利用CPU的资源

线程有几个状态

1
2
3
4
5
6
7
8
public enum State {
NEW,//新生
RUNNABLE,//运行
BLOCKED,//阻塞
WAITING,//等待 死等
TIMED_WAITING,//超时等待 有期限
TERMINATED;//终止
}

wait和sleep的区别

  • 来自不同的类

    wait=>Object

    sleep=>Thread

  • 关于锁的释放

    wait会释放锁

    sleep不会释放锁

  • 使用的范围是不同的

    wait必须在同步代码块中睡

    sleep可以在任何地方睡

  • 是否需要捕获异常

    wait需要捕获异常(中断异常)

    sleep必须捕获异常

Lock锁(重点)

传统Synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.fanchen.demo1;

//基本的卖票例子

/**
* 真正的多线程开发,公司中的开发 降低耦合
* 线程就是一个单独的资源类,没有任何附属的操作
* 1.属性、方法
*/
public class SaleTicketDemo01 {
public static void main(String[] args) {
//多个线程操作同一个资源类 把资源类丢入线程
//@FunctionalInterface函数式接口,Lambda表达式 (参数)->{代码}
Ticket ticket = new Ticket();
new Thread(()->{
for (int i = 0; i < 60; i++) {
ticket.saleTicket();
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < 60; i++) {
ticket.saleTicket();
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 60; i++) {
ticket.saleTicket();
}
}, "C").start();
}
}

//资源类 OOP
class Ticket{
//属性、方法
private int number = 50;
//卖票
public synchronized void saleTicket(){
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余:" + number);
}
}
//锁主要锁 对象 还有 Class
}

Lock接口

1
2
3
4
public ReentrantLock(boolean fair) {
//FairSync公平锁 NonfairSync不公平锁
sync = fair ? new FairSync() : new NonfairSync();
}

公平锁:十分公平 先来后到

非公平锁:十分不公平 可以插队(默认是非公平锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.fanchen.demo1;

//基本的卖票例子

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 真正的多线程开发,公司中的开发 降低耦合
* 线程就是一个单独的资源类,没有任何附属的操作
* 1.属性、方法
*/
public class SaleTicketDemo02 {
public static void main(String[] args) {
Ticket02 ticket = new Ticket02();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.saleTicket(); }, "A").start();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.saleTicket(); }, "B").start();
new Thread(()->{ for (int i = 0; i < 40; i++) ticket.saleTicket(); }, "C").start();
}
}

//Lock锁
class Ticket02 {

private int number = 30;

Lock lock = new ReentrantLock();

public synchronized void saleTicket() {
lock.lock();//加锁
try {
//业务代码
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余:" + number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}

}
}

Synchronized和Lock锁区别

  • Synchronized 是内置的java关键字 而 Lock是一个java类
  • Synchronized 无法判断获取锁的状态,Lock锁是可以判断是否获取到锁
  • Synchronized 会自动释放锁,Lock锁必须要手动释放,如果不释放锁会造成死锁
  • Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去
  • Synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)
  • Synchronized 适合锁少量的代码同步问题;Lock锁适合大量的同步代码

锁是什么,如何判断锁的是谁

生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.fanchen.demo1.product;

/**
* 线程之间的通信问题:生产者消费者问题 通知唤醒 等待唤醒
* 线程交替执行A B操作同一个变量
*/
public class A {

public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}

}

class Data{//资源类
private int number = 0;

public synchronized void increment() throws InterruptedException {
if (number!=0){
//等待
this.wait();
}
number++;
//通知其他线程我加完了
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "=>" + number);
}

public synchronized void decrement() throws InterruptedException {
if (number == 0){
//等待
this.wait();
}
number--;
//通知其他线程我减完了
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "=>" + number);
}
}

问题存在,ABCD四个线程安全吗

多个线程用while可以避免虚假唤醒

JUC版本的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.fanchen.demo1.product;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 线程之间的通信问题:生产者消费者问题 通知唤醒 等待唤醒
* 线程交替执行A B操作同一个变量
*/
public class B {

public static void main(String[] args) {
Data02 data = new Data02();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}

}

class Data02{//资源类
private int number = 0;

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void increment() throws InterruptedException {
lock.lock();
try {
while (number!=0){
//等待
condition.await();
}
number++;
//通知其他线程我加完了
condition.signalAll();
System.out.println(Thread.currentThread().getName() + "=>" + number);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public void decrement() throws InterruptedException {
lock.lock();
try{
while (number == 0){
//等待
condition.await();
}
number--;
//通知其他线程我减完了
condition.signalAll();
System.out.println(Thread.currentThread().getName() + "=>" + number);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

任何一个新的技术肯定不是旧的代替,而是有新的技术和优势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.fanchen.demo1.product;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 线程之间的通信问题:生产者消费者问题 通知唤醒 等待唤醒
* 线程交替执行A B操作同一个变量
*/
public class C {
public static void main(String[] args) {
Data03 data = new Data03();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printC();
}
}, "C").start();
}
}

class Data03 {
private final Lock lock = new ReentrantLock();
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
private final Condition condition3 = lock.newCondition();
private int flag = 1;

public void printA() {
lock.lock();
try {
while (flag != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "=>A");
flag = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printB() {
lock.lock();
try {
while (flag != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "=>B");
flag = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printC() {
lock.lock();
try {
while (flag != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "=>C");
flag = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

如何判断锁的是谁(8锁现象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.fanchen.demo1.lock8;

import java.util.concurrent.TimeUnit;

/**
* 关于锁的八个问题
* 1.标准情况 1发短信 2打电话
* 2.延迟四秒 1发短信 2打电话 抱着锁睡的
*/
public class Test08 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(phone::sendMessage, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(phone::call, "B").start();
}
}

class Phone{
//synchronized锁的对象是方法的调用者
//两个方法用的同一个锁 谁先拿到 谁先执行
//一次只能有一个线程访问该类方法
public synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}

public synchronized void call(){
System.out.println("打电话");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.fanchen.demo1.lock8;

import java.util.concurrent.TimeUnit;

/**
* 3.增加普通方法的话 先执行hello 然后才是发短信
* 4.两个对象 两个同步方法 先打电话 因为不是同一个锁
*/
public class Test082 {
public static void main(String[] args) {
//两个对象 两个调用者 两把锁
Phone02 phone1 = new Phone02();
Phone02 phone2 = new Phone02();
new Thread(phone1::sendMessage, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(phone2::call, "B").start();
}
}

class Phone02{
//synchronized锁的对象是方法的调用者
//两个方法用的同一个锁 谁先拿到 谁先执行
//一次只能有一个线程访问该类方法
public synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}

public synchronized void call(){
System.out.println("打电话");
}

//没有锁 不是同步方法 不受锁的影响
public void hello(){
System.out.println("hello");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.fanchen.demo1.lock8;

import java.util.concurrent.TimeUnit;

/**
* 5.增加两个静态的同步方法 只有一个对象 先发短信
* 6.两个对象!增加两个静态同步方法,先发短信
*/
public class Test083 {
public static void main(String[] args) {
new Thread(Phone03::sendMessage, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(Phone03::call, "B").start();
}
}

//锁的是Phone03唯一的一个Class对象
//Class<Phone03> aClass = Phone03.class; 唯一模板 static锁的是Class
class Phone03{
//synchronized锁的对象是方法的调用者
//static类一加载就有了 Class 模板
public static synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}

public static synchronized void call(){
System.out.println("打电话");
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.fanchen.demo1.lock8;

import java.util.concurrent.TimeUnit;

/**
* 先看看是不是一个锁 再看谁先拿到锁
* 7.一个静态的同步方法 一个普通同步方法 一个对象 先打电话 两个锁
* 8.两个对象 一个静态的同步方法 一个普通的同步方法 先打电话
*/
public class Test084 {
public static void main(String[] args) {
Phone04 phone04 = new Phone04();
new Thread(Phone04::sendMessage, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(phone04::call, "B").start();
}
}


class Phone04{
//synchronized锁的对象是方法的调用者
//static类一加载就有了 Class 模板
public static synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}

public synchronized void call(){
System.out.println("打电话");
}

}

小结

new this 具体的一个手机

static Class 唯一的一个模板

集合类不安全

List不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.fanchen.demo1.unsafe;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListTest {

public static void main(String[] args) {
//并发下list是不安全的
/**
* 解决方法:
* 1.List<String> list = new Vector<>();
* 2.List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3.List<String> list = new CopyOnWriteArrayList<>();
*/
//CopyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略
//多个线程调用的时候,list,读取的时候,固定的,写入的
//在写入的时候避免覆盖造成数据问题
//比Vector厉害在哪里 只要有synchronized关键字写入效率会较低 CopyOnWriteArrayList用的是Lock锁
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, String.valueOf(i)).start();
}
}

}

Set不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.fanchen.demo1.unsafe;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* hashset底层就是hashmap
* 又出现了并发修改异常 java.util.ConcurrentModificationException
* 解决方案:
* 1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2.Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString());
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}

hashset底层是什么?

1
2
3
4
5
6
7
8
9
//不变的值
private static final Object PRESENT = new Object();
public HashSet() {
map = new HashMap<>();
}
//add set 本质就是map key是无法重复的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

Map不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.fanchen.demo1.unsafe;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class MapTest {
//java.util.ConcurrentModificationException 异常
public static void main(String[] args) {
//map是这样用的吗?不是 默认等价于什么?Map<String, Object> map = new HashMap<>(16, 0.75); 0.75 16
//加载因子 初始化容量
/**
* 解决方案:
* 1.Map<String, Object> map = Collections.synchronizedMap(new HashMap<>());
* 2.Map<String, Object> map = new ConcurrentHashMap<>();
*/
Map<String, Object> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}

Callable

Callable接口类似于Runnable,因为他们都是为了其实例可能由另一个线程执行的类设计的,然而Runnable不返回结果也不抛出被检测的异常。

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同 run() call()
  4. 细节:有缓存;返回值可能会阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.fanchen.demo1.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
public static void main(String[] args) {
//Future接口可以获取值 FutureTask是Future的实现类 FutureTask实现了Runnable接口 所以使用new FutureTask<>(new MyThread())
FutureTask<String> task = new FutureTask<>(new MyThread());
new Thread(task,"A").start();
new Thread(task,"B").start();//结果会被缓存,提高效率
try {
String result = task.get();//get方法可能会产生阻塞!把它放到最后 或者 使用异步通信来处理
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

//泛型的类型和方法的返回值类型一致
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "范晨是真的帅";
}
}

常用的辅助类(必会)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.fanchen.demo1.add;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

//CountDownLatch指定线程执行完毕,再执行操作
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(10);
for (int i = 1; i <= 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + " GO out");
count.countDown();
}, String.valueOf(i)).start();
}
count.await();
System.out.println("Close Door");
}

}

count.countDown(); 数量减一

count.await(); 等待计数器归零

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.fanchen.demo1.add;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* 集齐7龙珠
* CyclicBarrier : 指定个数线程执行完毕再执行操作
*/
public class CyclicBarrierDemo {

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "收集了" + temp + "个龙珠");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.fanchen.demo1.add;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量 Semaphore: 同一时间只能有指定数量个得到线程 限流
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();//得到
System.out.println(Thread.currentThread().getName() + "获取到了");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "释放了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//释放
}
}).start();
}
}
}

读写锁

读的时候可以被多个线程读,写的时候只能有一个线程去写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.fanchen.demo1.rw;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock cache = new MyCacheLock();
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
cache.put(String.valueOf(temp), temp);
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
cache.get(String.valueOf(temp));
}, String.valueOf(i)).start();
}
}
}

class MyCache{
private volatile Map<String, Object> map = new HashMap<>();
//存
public void put(String key, Object value){
System.out.println(Thread.currentThread().getName() + " 写入 " + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入 OK " + key);
}
//取
public void get(String key){
System.out.println(Thread.currentThread().getName() + " 读取 " + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取 OK " + o);
}
}

class MyCacheLock{
private volatile Map<String, Object> map = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//存
public void put(String key, Object value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入 " + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入 OK " + key);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.writeLock().unlock();
}
}
//取
public void get(String key){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取 " + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取 OK " + o);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
}
}

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.fanchen.demo1.bq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueenTest {
public static void main(String[] args) throws InterruptedException {
test4();
}

public static void test1(){
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.add("a"));
System.out.println(queue.add("b"));
System.out.println(queue.add("c"));
System.out.println("队首" + queue.element());
// System.out.println(queue.add("d")); java.lang.IllegalStateException: Queue full
System.out.println("===================");
System.out.println(queue.remove());
System.out.println("队首" + queue.element());
System.out.println(queue.remove());
System.out.println(queue.remove());
// System.out.println(queue.remove());java.util.NoSuchElementException
}

public static void test2(){
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println("队首" + queue.peek());
System.out.println(queue.offer("d"));//false
System.out.println("=================");
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println("队首" + queue.peek());
System.out.println(queue.poll());//null
}

public static void test3() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.put("a");
queue.put("b");
queue.put("c");
// queue.put("d");一直阻塞
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// System.out.println(queue.take());一直阻塞
}

public static void test4() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println(queue.offer("d", 5, TimeUnit.SECONDS));//超时退出 直接返回false
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll(5, TimeUnit.SECONDS));//超时退出 直接返回null
}
}

同步队列SynchronizedQueue

没有容量 进去一个元素必须等取出来之后才能再往里面放一个元素put take

不存储元素,put了一个元素,必须先take取出来,否则不能存值

线程池

池化技术

程序的运行,本质:占用系统的资源,优化资源的使用=>池化技术

线程池 连接池 内存池 对象池 创建和销毁十分浪费资源

池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我

线程池有哪些好处

  1. 降低资源的消耗
  2. 提高响应的速度
  3. 方便管理
  4. 线程复用,可以控制最大并发数,管理线程

线程池 三大方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.fanchen.demo1;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DemoPool {
public static void main(String[] args) {
new ConcurrentHashMap<>();
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// ExecutorService threadPool = Executors.newCachedThreadPool();
try{
for (int i = 0; i < 100; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "=> OK");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}

七大方法

源码分析:本质=>ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//21亿 OOM
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}



public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//存活时间 超时没被调用就释放
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂 创建线程的 一般不动
RejectedExecutionHandler handler) {//拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

手动创建一个线程池

四种拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.fanchen.demo1;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());//拒绝策略
/**
* AbortPolicy() 满了 还有线程要进来 就不处理这个人的 并抛出异常
* CallerRunsPolicy() 哪来的去哪里 去来的线程(main线程)执行
* DiscardPolicy() 队列满了 不会抛出异常 丢掉任务
* DiscardOldestPolicy() 队列满了 把队列最靠前的任务丢弃,重新尝试执行当前任务
*/
try {
for (int i = 0; i < 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " => OK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

小结和拓展

最大线程到底该如何定义?

  1. CPU密集型 4条线程同时执行(几个核心就是几线程)

  2. IO密集型 判断你程序中十分耗费IO的线程

  3. //自定义线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2,
            Runtime.getRuntime().availableProcessors(),
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());//拒绝策略
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    ## 四大函数式接口(必需掌握)

    新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算

    > 函数式接口:只有一个方法的接口

    ```java
    @FunctionalInterface
    public interface Runnable {
    public abstract void run();
    }
    //超级多FunctionalInterface 简化编程模式 在新版本的框架底层大量应用 foreach(消费者类型的函数式接口)
    ![](https://pic.rmb.bdstatic.com/bjh/1aa6c1a73daceff0f1f4335761546f0a.png)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.fanchen.demo1.function;

import java.util.function.Function;

/**
* Function 函数型接口
*/
public class Demo01 {
public static void main(String[] args) {

// Function<String, String> function = new Function<String, String>() {
// @Override
// public String apply(String str) {
// return str;
// }
// };

Function<String, String> function = (str)->{
if (str.equals("123")){
return "您输入的是123";
}else {
return "您输入的不是123";
}
};

System.out.println(function.apply("321"));

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.fanchen.demo1.function;

import java.util.function.Predicate;

/**
* Predicate 断定型接口
*/
public class Demo02 {
public static void main(String[] args) {
Predicate<String> predicate = String::isEmpty;
System.out.println(predicate.test(""));
System.out.println(predicate.test("123"));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.fanchen.demo1.function;

import java.util.function.Consumer;

/**
* Consumer 消费型接口:只有输入,没有返回值
*/
public class Demo03 {
public static void main(String[] args) {
Consumer<String> consumer = System.out::println;
consumer.accept("我真帅");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.fanchen.demo1.function;

import java.util.function.Supplier;

/**
* Supplier 供给型接口
*/
public class Demo04 {
public static void main(String[] args) {
Supplier<String> supplier = ()-> "1024";
System.out.println(supplier.get());
}
}

Stream流式计算

什么是Stream流式计算

大数据:存储+计算

存储:集合、MySQL 本质就是存储东西得

计算都应该交给 流 来操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.fanchen.demo1.stream;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
* 题目要求:一分钟内完成此题,只能用一行代码实现!
* 现在有5个用户! 筛选:
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户!
*/
public class UserTest {
public static void main(String[] args) {
User a = new User(1, "a", 21);
User b = new User(2, "b", 22);
User c = new User(3, "c", 23);
User d = new User(4, "d", 24);
User e = new User(6, "e", 25);
//集合就是存储
List<User> users = Arrays.asList(a, b, c, d, e);
//计算交给Stream 链式编程
users.stream()
.filter(user -> user.getId() % 2 == 0)
.filter(user -> user.getAge() >= 23)
.map(user -> new User(user.getId(), user.getName().toUpperCase(Locale.ROOT), user.getAge()))
.limit(2)
.forEach(System.out::println);
}
}

ForkJoin

ForkJoin特点:工作窃取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.fanchen.demo1.forkjoin;

import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {
private long start;
private long end;
//临界值
private final long temp = 1_0000_0000L;

public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}

//计算方法
@Override
protected Long compute() {
if ((end - start) > temp) {
long middle = (end + start) / 2;
ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(start, middle);
forkJoinDemo1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle+1, end);
forkJoinDemo2.fork();//拆分任务,把任务压入线程队列
return forkJoinDemo1.join() + forkJoinDemo2.join();
} else {
long sum = 0;
for (long i = start; i < 10_0000_0000; i++) {
sum += i;
}
return sum;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.fanchen.demo1.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class ForkTest {

public static void test1(){
long start = System.currentTimeMillis();
long sum = 0;
for (long i = 1; i <= 10_0000_0000; i++) {
sum += i;
}

// long i = 10_0000_0000;
// long res = i / 2;
// long sum = res * 10_0000_0001;
long end = System.currentTimeMillis();
System.out.println("结果:" + sum + "消耗时间:" + (end-start));
}

public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo task = new ForkJoinDemo(0, 10_0000_0000);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();

long end = System.currentTimeMillis();
System.out.println("结果:" + sum + "消耗时间:" + (end-start));
}

public static void test3(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("结果:" + sum + "消耗时间:" + (end-start));
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
test3();
}
}

异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.fanchen.demo1.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class FutureDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调runAsync
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + " runAsync => void");
// });
// System.out.println("hello world");
// completableFuture.get();
//有返回值的异步回调supplyAsync
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " supplyAsync => Integer");
// int i = 1/0;
return 1024;
});
System.out.println("1111111111111111111111111111111111");
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t);//正常的返回结果
System.out.println("u=>" + u);//异常信息
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 2333;
}).get());
}
}

JMM(重点)

请你谈谈对volatile的理解

volatile是java虚拟机提供的轻量级的同步机制

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

什么是JMM

JMM:java内存模型,不存在的东西,是一个概念,约定!

关于JMM的一些同步的约定:

  1. 线程解锁前,必须把共享变量立刻刷新回主存
  2. 线程加锁前,必须读取主存中的最新值到工作内存中
  3. 加锁和解锁是同一把锁

线程 工作内存 主内存

8种操作:

Volatile

保证可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.fanchen.demo1.vol;

import java.util.concurrent.TimeUnit;

public class TestDemo {
private static volatile int number = 0;
public static void main(String[] args) throws InterruptedException {
//线程一 不加volatile会死循环 可以保证可见性
new Thread(()->{
while (number == 0){

}
}).start();
TimeUnit.SECONDS.sleep(1);
number = 1;
System.out.println(number);
}
}

不保证原子性

原子性:不可分割

线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。ACID原则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.fanchen.demo1.vol;

public class TestDemo02 {
//volatile不保证原子性
private volatile static int num = 0;

//synchronized
public static void add(){
num++;
}

public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " => " + num);
}
}

原子类为什么这么高级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.fanchen.demo1.vol;

import java.util.concurrent.atomic.AtomicInteger;

public class TestDemo02 {
//原子类
private static AtomicInteger num = new AtomicInteger();

//synchronized
public static void add(){
num.getAndIncrement();
}

public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " => " + num);
}
}

这些类的底层都直接和操作系统挂钩。Unsafe 特殊的存在

指令重排

什么是指令重排:你写的程序,计算机并不是按照你的那样去执行的

源代码 => 编译优化的重排 => 指令并行也可能会重排 => 内存系统也会重排 => 执行

处理器在金总指令重排的时候,考虑:数据之间的依赖性

1
2
3
4
5
6
7
int x = 1;//1
int y = 2;//2
x = x + 5;//3
y = x * x;//4

我们所期望的:1234 但是执行的时候可能编程 2134 1324
但是不可能是4123 数据之间的依赖性!

只要加了volatile就可以避免指令重排

内存屏障。CPU指令。作用:

  1. 保证特定的操作的执行顺序
  2. 可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)

volatile 保持可见性 不能保证原子性 由于内存屏障可以避免指令重排的现象产生

单例模式

饿汉式 DCL饿汉式

1
2
3
4
5
6
7
8
9
10
11
12
package com.fanchen.demo1.single;

public class Hungry {
private Hungry(){
}

private final static Hungry HUNGRY = new Hungry();

public static Hungry getInstance(){
return HUNGRY;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.fanchen.demo1.single;

public class Holder {
private Holder(){}

public static Holder getInstance(){
return InnerClass.HOLDER;
}

public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.fanchen.demo1.single;

public class LazyMan {
private LazyMan(){}
private volatile static LazyMan lazyMan;
public static LazyMan getInstance(){
if (lazyMan == null){
synchronized (LazyMan.class){
lazyMan = new LazyMan();
}
}
return lazyMan;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.fanchen.demo1.single;

public enum Single {
INSTANCE;
public Single getInstance(){
return INSTANCE;
}
public void whateverMethod() {
System.out.println("do somethings");
}

public static void main(String[] args) {
Single.INSTANCE.whateverMethod();
}
}

深入理解CAS

什么是CAS

CAS比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环。

缺点:

  1. 循环会耗时
  2. 一次性只能保证一个共享变量的原子性
  3. ABA问题

CAS:ABA问题(狸猫换太子)

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.fanchen.demo1.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {
//什么是cas compareAndSwapInt:比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// public final boolean compareAndSet(int expect 期望, int update 更新)
atomicInteger.compareAndSet(2020, 2021);
System.out.println(atomicInteger.get());
}
}

原子引用 解决ABA问题 对应的思想=>乐观锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.fanchen.demo1.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class CASDemo {
//如果泛型是包装类 注意对象的应用问题
public static void main(String[] args) throws InterruptedException {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//获得版本号 时间戳
System.out.println("A=>" + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 120, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1);
System.out.println("A=>" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(120, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1);
System.out.println("A=>" + atomicStampedReference.getStamp());
}, "A").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("B=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 66, stamp, stamp+1);
System.out.println("B=>" + atomicStampedReference.getStamp());
}, "B").start();
TimeUnit.SECONDS.sleep(5);
System.out.println(atomicStampedReference.getReference());
}
}

各种锁的理解

公平锁、非公平锁

公平锁:非常公平 不允许插队 必须先来后到

非公平锁:非常不公平 可以插队 默认都是非公平

1
2
3
4
5
6
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁

可重入锁(递归锁)

可重入锁也就是某个线程已经获得某个锁,可以再次获取锁而不会出现死锁

自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.fanchen.demo1.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* 自旋锁
*/
public class Demo02 {
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "=>MyLock");
while (!atomicReference.compareAndSet(null, thread)){

}
}

public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "=>MyUnLock");
atomicReference.compareAndSet(thread, null);
}

public static void main(String[] args) throws InterruptedException {
Demo02 lock = new Demo02();
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.myUnLock();
}
}).start();
TimeUnit.SECONDS.sleep(2);
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.myUnLock();
}
}).start();
}
}

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.fanchen.demo1.lock;

import java.util.concurrent.TimeUnit;

public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB)).start();
new Thread(new MyThread(lockB, lockA)).start();
}
}

class MyThread implements Runnable {
private String lockA;
private String lockB;

public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}

@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockA + "=>get " + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockB + "=>get " + lockA);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
E:\框架学习\spring-boot-study>jps -l
2404 sun.tools.jps.Jps
4996 org.jetbrains.idea.maven.server.RemoteMavenServer36
10248 org.jetbrains.kotlin.daemon.KotlinCompileDaemon
5500 com.fanchen.demo1.lock.DeadLockDemo
7868
8972 org.jetbrains.jps.cmdline.Launcher

E:\框架学习\spring-boot-study>jstack 5500
2021-10-29 12:10:08
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b15 mixed mode):

"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000002de2800 nid=0x14c8 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001da31000 nid=0x3610 waiting for monitor entry [0x000000001e6ff000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.fanchen.demo1.lock.MyThread.run(DeadLockDemo.java:33)
- waiting to lock <0x000000076b635da0> (a java.lang.String)
- locked <0x000000076b635dd8> (a java.lang.String)
at java.lang.Thread.run(Thread.java:745)

"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001da2c800 nid=0x346c waiting for monitor entry [0x000000001e5ff000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.fanchen.demo1.lock.MyThread.run(DeadLockDemo.java:33)
- waiting to lock <0x000000076b635dd8> (a java.lang.String)
- locked <0x000000076b635da0> (a java.lang.String)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x000000001d9cb800 nid=0x2618 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000001d94d000 nid=0x111c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x000000001d94b800 nid=0x21fc waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x000000001d943800 nid=0x1594 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000001d942000 nid=0x3248 runnable [0x000000001dffe000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x000000076b507b00> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x000000076b507b00> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:61)

"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x000000001c564800 nid=0x1fcc waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x000000001c520000 nid=0x1ecc runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x0000000002ed7800 nid=0x3390 in Object.wait() [0x000000001d87f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b388ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x000000076b388ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x0000000002ed2000 nid=0x32a0 in Object.wait() [0x000000001d77f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b386b50> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x000000076b386b50> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"VM Thread" os_prio=2 tid=0x000000001c4d7800 nid=0x2688 runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x0000000002df8000 nid=0xf78 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x0000000002df9800 nid=0x1cb8 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x0000000002dfb800 nid=0x1bdc runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x0000000002dfe000 nid=0x2a28 runnable

"VM Periodic Task Thread" os_prio=2 tid=0x000000001da2a000 nid=0x28f8 waiting on condition

JNI global references: 33


Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x000000001c4fdc48 (object 0x000000076b635da0, a java.lang.String),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x000000001c500638 (object 0x000000076b635dd8, a java.lang.String),
which is held by "Thread-1"

Java stack information for the threads listed above:
===================================================
"Thread-1":
at com.fanchen.demo1.lock.MyThread.run(DeadLockDemo.java:33)
- waiting to lock <0x000000076b635da0> (a java.lang.String)
- locked <0x000000076b635dd8> (a java.lang.String)
at java.lang.Thread.run(Thread.java:745)
"Thread-0":
at com.fanchen.demo1.lock.MyThread.run(DeadLockDemo.java:33)
- waiting to lock <0x000000076b635dd8> (a java.lang.String)
- locked <0x000000076b635da0> (a java.lang.String)
at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

springboot项目如何使用线程池提升业务处理速度

beanFactory中singletonObject下有一个applicationTaskExecutor

spring提供的一个对java本身的线程池封装后的一个类 启动后再IOC容器中已经有这个类了

使用异步回调的处理方式可以提升响应速度

如果applicationTaskExecutor不满足我们的需求,可以通过自定义线程池,然后通过@Bean注入到容器中,再进行后续对应的操作。

本文标题:JUC并发编程

文章作者:fanchen

发布时间:2021年10月21日 - 11:40:09

最后更新:2021年10月29日 - 12:23:46

原始链接:http://88fanchen.github.io/posts/eb9166f8/

许可协议:署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。