进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。

线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务。

1.线程的基本使用

1.创建线程

创建进程方式1

1
2
3
4
5
6
7
8
Thread t = new Thread(){
@Override
public void run() {
log.info("test");
}
};
t.start();
log.info("main");

创建进程方式2

1
2
3
4
5
6
7
8
9
Runnable runnable = new Runnable() {
@Override
public void run() {
log.info("test1");
}
};
Thread t = new Thread(runnable,"test1");
t.start();
log.info("main1");

创建进程方式3 lamda简化s

1
2
3
4
new Thread(()->{
log.info("test2");
},"test2").start();
log.info("main2");

2.sleep&yield

1.调用sleep会让当前线程从Running进入ThreadWaitting状态

2.其他线程可以使用interrupt方法打断正在睡眠的进程,这是sleep方法会抛出异常

3.睡眠结束的线程未必会直接运行

4.建议用TimeUnit的sleep代替Thread的sleep方法

3.同步应用

join() 等待线程结束

join(long n) 等待n毫秒,如果没有执行完成,将不再等待

4.interrupt

1.打断sleep wait join的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread t1 = new Thread(){
@Override
public void run() {
log.info("sleep");
try {
Thread.sleep(500);
}catch (InterruptedException e){
e.printStackTrace();
}
}
};
t1.setName("t1");
t1.start();
Thread.sleep(1000);

log.info("interrupter");
t1.interrupt();
log.info("打断标记{}",t1.isInterrupted());

[t1] INFO richuff.top.MyThread - sleep
[main] INFO richuff.top.MyThread - interrupter
java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at richuff.top.MyThread$1.run(MyThread.java:13)
[main] INFO richuff.top.MyThread - 打断标记false

2.打断正常线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Thread t1 = new Thread(){
@Override
public void run() {
while (true){

}
}
}; t1.setName("t1");
t1.start();
Thread.sleep(1000);

log.info("interrupter");
t1.interrupt();
log.info("打断标记{}",t1.isInterrupted());

打断标记的用途

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread t1 = new Thread(){
@Override
public void run() {
log.info("sleep");
while (true){
boolean interrupted = Thread.currentThread().isInterrupted();
if (interrupted){
log.info("quit");
break;
}
}
}
};
t1.setName("t1");
t1.start();
Thread.sleep(1000);

log.info("interrupter");
t1.interrupt();
log.info("打断标记{}",t1.isInterrupted());

[t1] INFO richuff.top.MyThread - sleep
[main] INFO richuff.top.MyThread - interrupter
[t1] INFO richuff.top.MyThread - quit
[main] INFO richuff.top.MyThread - 打断标记true

5.两阶段终止模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class WatchThread {
public static void main(String[] args) throws Exception{
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
Thread.sleep(6000);
twoPhaseTermination.stop();
}
}

@Slf4j(topic = "TwoPhaseTermination")
class TwoPhaseTermination{
private Thread minitor;

public void start(){
minitor = new Thread(()->{
while (true){
Thread current = Thread.currentThread();
if (current.isInterrupted()){
log.info("处理后续事务");
break;
}
try{
Thread.sleep(1000); //在睡眠时被打断
log.info("处理逻辑"); //在处理逻辑时被打断
}catch (InterruptedException e){
e.printStackTrace();
current.interrupt();
}
}
},"minitor");
minitor.start();
}

public void stop(){
minitor.interrupt();
}
}

6.进程的五种状态

  • 初始状态:仅在语言层面创建了对象,还未与操作系统相关联
  • 可运行状态 可以由CPU进行调度
  • 运行状态 指获取了CPU时间片中的状态
  • 阻塞状态
  • 终止状态 线程执行完毕,生命周期已经结束

2.synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (sync.class) {
count++;
}
}
}, "t1");

Thread t2 = new Thread(()->{
for (int i = 0;i<5000;i++){
synchronized (sync.class) {
count--;
}
}
},"t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

3.Monitor(锁)

每个Java对象都可以关联一个Monitor对象

1
2
3
4
5
6
7
8
9
10
11
12
13
|-------------------------------------------------------|--------------------|
| Mark Word (32 bits) | State |
|-------------------------------------------------------|--------------------|
| identity_hashcode:25 | age:4 | biased_lock:1 | lock:2 | Normal |
|-------------------------------------------------------|--------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | lock:2 | Biased |
|-------------------------------------------------------|--------------------|
| ptr_to_lock_record:30 | lock:2 | Lightweight Locked |
|-------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:30 | lock:2 | Heavyweight Locked |
|-------------------------------------------------------|--------------------|
| | lock:2 | Marked for GC |
|-------------------------------------------------------|--------------------|

4.设计模式 - 保护性暂停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package richuff.top.richu6;

public class GuardObject{
private Object response;

public void get(){
synchronized (this){
while(response == null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void set(Object response){
synchronized (this){
this.response = response;
this.notify();
}
}
}

运行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
GuardObject guard = new GuardObject();

new Thread(()->{
log.info("begin... get");
guard.get();
log.info("end... get");
},"t1").start();

new Thread(()->{
Object response = new Object();
log.info("begin...response");
guard.set(response);
log.info("end...response");
},"t2").start();
}

[t1] INFO richuff.top.richu6.A - begin… get
[t2] INFO richuff.top.richu6.A - begin…response
[t2] INFO richuff.top.richu6.A - begin
[t1] INFO richuff.top.richu6.A - end… get

优化改进,增加超时设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class GuardObject{
private Object response;

public void get(int sTime){
long begin = System.currentTimeMillis();
long end = 0;
synchronized (this){
while(response == null){
long hTIme = sTime - end;
if (hTIme <= 0){
break;
}
try {
this.wait(hTIme);
} catch (InterruptedException e) {
e.printStackTrace();
}
end = System.currentTimeMillis() - begin;
}
}
}
public void set(Object response){
synchronized (this){
this.response = response;
this.notify();
}
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
GuardObject guard = new GuardObject();

new Thread(()->{
log.info("begin... get");
guard.get(2000);
log.info("end... get");
},"t1").start();

new Thread(()->{
Object response = new Object();
log.info("begin...response");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
guard.set(response);
log.info("end...response");
},"t2").start();
}

结果

[t1] INFO richuff.top.richu6.A - begin… get
[t2] INFO richuff.top.richu6.A - begin…response
[t1] INFO richuff.top.richu6.A - end… get
[t2] INFO richuff.top.richu6.A -end…response

进一步优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class GuardObject{
private Object response;
private int id;
public GuardObject(int id){
this.id = id;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public Object get(int sTime){
long begin = System.currentTimeMillis();
long end = 0;
synchronized (this){
while(response == null){
long hTIme = sTime - end;
if (hTIme <= 0){
break;
}
try {
this.wait(hTIme);
} catch (InterruptedException e) {
e.printStackTrace();
}
end = System.currentTimeMillis() - begin;
}
return response;
}
}
public void set(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}

实现通过id进行接收的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Post extends Thread{
private static final Logger log = LoggerFactory.getLogger(Post.class);
private int id;
private String msg;

public Post(int id, String msg) {
this.id = id;
this.msg = msg;
}

@Override
public void run() {
GuardObject guardObject = Boxes.getGuardById(id);
log.info("发送: id{},内容{}",guardObject.getId(),msg);
guardObject.set(msg);
}
}

class Position extends Thread{
private static final Logger log = LoggerFactory.getLogger(Position.class);
@Override
public void run() {
GuardObject guardObject = Boxes.createGuardObject();
log.info("开始接收收: id{}",guardObject.getId());
Object o = guardObject.get(5000);
log.info("收到内容: {}",o.toString());
}
}

class Boxes{
private static Map<Integer,GuardObject> boxes = new HashMap<>();
private static int id = 1;

private static synchronized int generatedId(){
return id++;
}

public static GuardObject createGuardObject(){
GuardObject guardObject = new GuardObject(generatedId());
boxes.put(guardObject.getId(),guardObject);
return guardObject;
}

public static Set<Integer> getAll(){
return boxes.keySet();
}

public static GuardObject getGuardById(int id){
return boxes.remove(id);
}
}

Post发送消息,Position通过id接收消息,两张间使用Boxes通信

5.ReentrantLock

1.Lock

Lock.lock()打断无效果

Lock.lockInterruptibly();可打断锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static ReentrantLock Lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(()->{
try {
//如果没有竞争那么此方法会获取Lock对象锁
//如果有竞争就进入阻塞队列,可以被其他线程用interrupt打断
log.info("尝试获取锁");
Lock.lockInterruptibly();
}catch (Exception e){
e.printStackTrace();
log.info("没有获取到锁");
return;
}try {
log.info("获取到锁");
}finally {
Lock.unlock();
}
});
Lock.lock();
t1.start();
}

2.锁超时

tryLock尝试加锁,成功上锁返回true,未成功返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static ReentrantLock Lock = new ReentrantLock();

public static void main(String[] args) {
Thread t1 = new Thread(()->{
log.info("尝试获取锁");
if (! Lock.tryLock()){
log.info("获取不到锁");
return;
}
try {
log.info("获取到锁");
}catch (Exception e){
e.printStackTrace();
}finally {
Lock.unlock();
}
});
Lock.lock();
t1.start();
}

加参数的tryLock() 等待指定时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static ReentrantLock Lock = new ReentrantLock();

public static void main(String[] args) {
Thread t1 = new Thread(()->{
log.info("尝试获取锁");
try {
if (! Lock.tryLock(1, TimeUnit.SECONDS)){
log.info("获取不到锁");
return;
}
}catch (Exception e){
e.printStackTrace();
}
try {
log.info("获取到锁");
}catch (Exception e){
e.printStackTrace();
}finally {
Lock.unlock();
}
});
Lock.lock();
t1.start();
}

3.公平锁

ReentrantLock默认是不公平的锁

1
private static ReentrantLock Lock = new ReentrantLock(true);

4.条件变量

lock.newCondition();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) throws Exception{
//创建条件变量
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

lock.lock();
//进入等待
condition1.wait();

condition1.signal();
condition1.signalAll();
}

5.固定运行顺序

t2 比 t1先执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static Logger logger = LoggerFactory.getLogger(Thread.class);
public static boolean t2haverun = false;
public static final Object lock = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(()->{
synchronized (lock){
while(!t2haverun){
try {
lock.wait();
}catch (Exception e){
e.printStackTrace();
}
}
logger.info("1");
}
},"t1");
Thread t2 = new Thread(()->{
synchronized (lock){
logger.info("2");
t2haverun = true;
lock.notify();
}
},"t2");
t1.start();
t2.start();
}

使用park和unpark

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
Thread t1 = new Thread(()->{
LockSupport.park();
logger.info("1");
},"t1");
Thread t2 = new Thread(()->{
logger.info("2");
LockSupport.unpark(t1);
},"t2");
t1.start();
t2.start();
}

6.循环顺序打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Test {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1, 5);
new Thread(() -> {
wn.Print("a", 1, 2);
}).start();
new Thread(() -> {
wn.Print("b", 2, 3);
}).start();
new Thread(() -> {
wn.Print("c", 3, 1);
}).start();
}
}

class WaitNotify {
public void Print(String str, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this){
while (falg != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
return; // 或者处理异常
}
}
System.out.println(str);
falg = nextFlag;
this.notifyAll();
}
}
}

private int falg;
private int loopNumber;

public WaitNotify(int falg, int loopNumber) {
this.falg = falg;
this.loopNumber = loopNumber;
}
}

6.JMM

JAVA内存模型

原子性 保证指令不受线程上下文切换的影响

可见性 保证指令不会受CPU缓存的影响

有序性 保证指令不会受CPU指令并行优化的影响

1.Unsafe对象的获取

通过反射获取

1
2
3
4
5
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);

System.out.println(unsafe);

2.不可变对象的设计

模拟连接池的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Pool{
private final int poolSize;

private Connection[] connPools;

private AtomicIntegerArray states;

public Pool(int poolSize) {
this.states = new AtomicIntegerArray(new int[poolSize]);
this.connPools = new Connection[poolSize];
this.poolSize = poolSize;
}

public Connection borrow() {
while (true){
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0){
if (states.compareAndSet(i,0,1)){
return connPools[i];
}
}
}
//无空闲连接
synchronized (this){
try{
wait();
}catch (Exception e){
e.printStackTrace();
}
}
}
}

public void free(Connection conn){
for (int i = 0; i < poolSize; i++) {
if (conn == connPools[i] && states.get(i) == 1){
states.set(i,0);
synchronized (this){
this.notifyAll();
}
break;
}
}

}
}

3.线程队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public class BlockingQueue<T>{
private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class);
private Deque<T> queue = new ArrayDeque<>();

private ReentrantLock lock = new ReentrantLock();

//生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

// 带超时的阻塞获取

public T poll(long time, TimeUnit unit){
//将时间转为统一的单位
lock.lock();
try {
long nanoTime = unit.toNanos(time);
while (queue.isEmpty()){
if (nanoTime <= 0){
return null;
}
nanoTime = emptyWaitSet.awaitNanos(nanoTime);
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
fullWaitSet.signalAll();
return queue.removeFirst();
}

//阻塞获取
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
emptyWaitSet.await();
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
fullWaitSet.signalAll();
return queue.removeFirst();
}

public void put(T t){
lock.lock();
while(queue.size() == capcity){
try{
fullWaitSet.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
queue.addLast(t);
emptyWaitSet.signalAll();
}

public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}

4.线程池的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@Slf4j
class TestPool{
public static Logger log = LoggerFactory.getLogger(TestPool.class);
public static void main(String[] args) {
log.info("begin");
ThreadPool threadPool = new ThreadPool(1000, 2, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 15; i++) {
threadPool.excute(()->{
try {
Thread.sleep(1000000L);
}catch (Exception e){
e.printStackTrace();
}
log.info("test");
});
}
}
}

public class BlockingQueue<T>{
private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class);
private Deque<T> queue = new ArrayDeque<>();

private ReentrantLock lock = new ReentrantLock();

//生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

// 带超时的阻塞获取
public T poll(long time, TimeUnit unit){
//将时间转为统一的单位
lock.lock();
try {
long nanoTime = unit.toNanos(time);
while (queue.isEmpty()){
if (nanoTime <= 0){
return null;
}
nanoTime = emptyWaitSet.awaitNanos(nanoTime);
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
log.info("唤醒线程");
fullWaitSet.signalAll();
return queue.removeFirst();
}

//阻塞获取
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
emptyWaitSet.await();
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
log.info("唤醒线程");
fullWaitSet.signalAll();
return queue.removeFirst();
}

public void put(T t){
lock.lock();
try {
while (queue.size() == capcity) {
try {
fullWaitSet.await();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("加入任务队列");
queue.addLast(t);
emptyWaitSet.signalAll();
}
finally {
lock.unlock();
}
}

public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}

class ThreadPool{
private static final Logger log = LoggerFactory.getLogger(ThreadPool.class);
private BlockingQueue<Runnable> taskQueue;

private HashSet<Worker> workers = new HashSet<>();

private int currentSize;

//设置超时时间
private long timeout;

private TimeUnit unit;

public void excute(Runnable task){
synchronized (workers){
if (workers.size() < currentSize){
Worker worker = new Worker(task);
log.info("加入工作队列");
workers.add(worker);
worker.start();
}else{
taskQueue.put(task);
}
}
}

public ThreadPool(long timeout, int currentSize, TimeUnit unit,int capacity) {
this.timeout = timeout;
this.currentSize = currentSize;
this.taskQueue = new BlockingQueue<>(capacity);
this.unit = unit;
}

class Worker extends Thread {
private Runnable task;

Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
//执行任务 如果当前任务对象不为空,并且从任务队列也不为空
while (task != null || (task = taskQueue.poll(timeout,TimeUnit.MILLISECONDS)) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.info("移除worker{}", this);
workers.remove(this);
}
}

@Override
public String toString() {
return super.toString();
}
}
}

5.带拒绝策略的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class TestPool{
public static Logger log = LoggerFactory.getLogger(TestPool.class);
public static void main(String[] args) {
log.info("begin");
ThreadPool threadPool = new ThreadPool(1000, 2, TimeUnit.MILLISECONDS, 10,(queue,task)->{
//queue.put(task);
queue.offer(task,1000,TimeUnit.MILLISECONDS);
});
for (int i = 0; i < 15; i++) {
threadPool.excute(()->{
try {
Thread.sleep(20000L);
}catch (Exception e){
e.printStackTrace();
}
log.info("test");
});
}
}
}

@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}

public class BlockingQueue<T>{
private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class);
private Deque<T> queue = new ArrayDeque<>();

private ReentrantLock lock = new ReentrantLock();

//生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//容量
private int capcity;


public BlockingQueue(int capcity) {
this.capcity = capcity;
}

// 带超时的阻塞获取
public T poll(long time, TimeUnit unit){
//将时间转为统一的单位
lock.lock();
try {
long nanoTime = unit.toNanos(time);
while (queue.isEmpty()){
if (nanoTime <= 0){
return null;
}
nanoTime = emptyWaitSet.awaitNanos(nanoTime);
}
log.info("唤醒线程");
fullWaitSet.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return queue.removeFirst();
}

//阻塞获取
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
emptyWaitSet.await();
}
log.info("唤醒线程");
fullWaitSet.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}

return queue.removeFirst();
}

public void put(T t){
lock.lock();
try {
while (queue.size() == capcity) {
try {
fullWaitSet.await();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("加入任务队列");
queue.addLast(t);
emptyWaitSet.signalAll();
}
finally {
lock.unlock();
}
}

public boolean offer(T t,long time,TimeUnit unit){
lock.lock();
try {
long nanoTime = unit.toNanos(time);
while (queue.size() == capcity) {
try {
if (nanoTime <= 0){
log.info("任务超时");
return false;
}
nanoTime = fullWaitSet.awaitNanos(nanoTime);
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("加入任务队列");
queue.addLast(t);
emptyWaitSet.signalAll();
}
finally {
lock.unlock();
}
return true;
}

public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capcity){
rejectPolicy.reject(this,task);
}else{
log.info("加入任务队列");
queue.addLast(task);
emptyWaitSet.signalAll();
}
}finally {
lock.unlock();
}
}
}

class ThreadPool{
private static final Logger log = LoggerFactory.getLogger(ThreadPool.class);
private BlockingQueue<Runnable> taskQueue;

private HashSet<Worker> workers = new HashSet<>();

private int currentSize;

//设置超时时间
private long timeout;

private TimeUnit unit;

private RejectPolicy<Runnable> rejectPolicy;

public void excute(Runnable task){
synchronized (workers){
if (workers.size() < currentSize){
Worker worker = new Worker(task);
log.info("加入工作队列");
workers.add(worker);
worker.start();
}else{
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy,task);
}
}
}

public ThreadPool(long timeout, int currentSize, TimeUnit unit,int capacity,RejectPolicy<Runnable> rejectPolicy) {
this.timeout = timeout;
this.currentSize = currentSize;
this.taskQueue = new BlockingQueue<>(capacity);
this.unit = unit;
this.rejectPolicy = rejectPolicy;
}

class Worker extends Thread {
private Runnable task;

Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
//执行任务 如果当前任务对象不为空,并且从任务队列也不为空
while (task != null || (task = taskQueue.poll(timeout,TimeUnit.MILLISECONDS)) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.info("移除worker{}", this);
workers.remove(this);
}
}

@Override
public String toString() {
return super.toString();
}
}
}

6.ThreadPoolExcutor

1.newFixedThreadPool

线程池的线程数是固定的,即使任务数量增加,线程池中的线程数也不会超过指定的数量。线程池中的线程是复用的,当某个线程完成了任务,它可以继续执行下一个任务,而不需要重新创建线程 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
return new Thread(
r,"my_thread"+t.getAndIncrement()
);
}
});
pool.execute(()->{
log.info("1");
});
pool.execute(()->{
log.info("2");
});
pool.execute(()->{
log.info("3");
});

2.CachedThreadPool

1
ExecutorService service = Executors.newCachedThreadPool();

3.提交任务

excute() 提交Runable

submit() 提交Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService pool = Executors.newFixedThreadPool(2);


/* Future<String> future = pool.submit(new Callable<String>() {

@Override
public String call() throws Exception {
Thread.sleep(1000);
return "hhh";
}
});*/
//lambda 简化
Future<String> future = pool.submit(()->{
Thread.sleep(1000);
return "hhh";
});
log.info(future.get());

invokeAll() 执行其中所有的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
() -> {
Thread.sleep(500);
return "1";
},
() -> {
Thread.sleep(1000);
return "2";
},
() -> {
Thread.sleep(2000);
return "3";
}
));
for (Future<String> future:futures){
log.info(future.get());
}

invokeAny() 哪个任务先完成,就直接返回任务结果,其他的任务终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService pool = Executors.newFixedThreadPool(2);
String res = pool.invokeAny(Arrays.asList(
() -> {
Thread.sleep(500);
return "1";
},
() -> {
Thread.sleep(1000);
return "2";
},
() -> {
Thread.sleep(2000);
return "3";
}
));
log.info("{}",res);

7.newScheduledThreadPool

1.scheduleAtFixedRate

1
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
  • command: 一个实现了 Runnable 接口的无参数函数,它将在指定的时间间隔内重复执行。
  • initialDelay: 首次执行前的初始延迟时间。如果设置为0,则任务将立即执行。
  • period: 连续执行之间的时间间隔。在这个例子中,任务将每隔给定的时间间隔执行一次。
  • unit: period 参数的时间单位,TimeUnit 是一个枚举,定义了时间的单位(如天、小时、分钟、秒、毫秒等)。

2.schedule

1
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
  • command: 一个实现了 Runnable 接口的无参数函数,它将在指定的延迟后执行一次。
  • delay: 首次执行的延迟时间。这个时间是从现在开始计算的。
  • unit: delay 参数的时间单位,TimeUnit 是一个枚举,定义了时间的单位(如天、小时、分钟、秒、毫秒等)。
1
2
3
4
//延迟执行
pool.schedule(()->{
log.info("test");
},1,TimeUnit.SECONDS);

3.scheduleWithFixedDelay

  • command:这是一个Runnable任务,它将在线程池中的某个线程上执行。

  • initialDelay:这是在首次执行任务之前的初始延迟时间。在这个例子中,initialDelay的值是1秒,表示任务将在启动线程池后的1秒开始执行。

  • delay:这是在每次任务执行完毕后,到下一次任务开始执行之前的固定延迟时间。在这个例子中,delay的值也是1秒,意味着每次任务执行完成后,都会等待1秒再开始下一次执行。

  • unit:这是initialDelaydelay参数的时间单位。在这个例子中,TimeUnit.SECONDS表示时间单位是秒。

1
2
3
4
//固定等待时长  不会等待前一个任务完成
pool.scheduleWithFixedDelay(()->{
log.info("fixed");
},1,1,TimeUnit.SECONDS);

8.每周执行一次任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//获取当前时间
LocalDateTime localDateTime = LocalDateTime.now();
//获取周四时间
LocalDateTime dateTime = localDateTime.withHour(16).withMinute(0).withSecond(0).with(DayOfWeek.MONDAY);

//如果当前时间大于周四
if (localDateTime.isAfter(dateTime)){
dateTime.plusWeeks(1);
}

long initialDelay = Duration.between(localDateTime,dateTime).toMillis();
//间隔一周
long period = 1000 * 60 * 60 * 24 * 7;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(()->{
log.info("test");
},initialDelay,period, TimeUnit.MILLISECONDS);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class MRennterLock {
public static Logger log = LoggerFactory.getLogger(MRennterLock.class);
public static RFLock lock = new RFLock();
public static void main(String[] args) {

new Thread(()->{
log.info("t1 begin..");
lock.lock();
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
log.info("t1 end...");
}
},"t1").start();

new Thread(()->{
log.info("t2 begin..");
lock.lock();
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
log.info("t2 end...");
}
},"t2").start();
}
}

class RFLock implements Lock{

static class ReRFLock extends AbstractQueuedSynchronizer{

@Override
protected boolean tryRelease(int arg) {
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}else{
return false;
}
}

@Override
protected boolean tryAcquire(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public Condition newCondition(){
return new ConditionObject();
}

@Override
public String toString() {
return super.toString();
}
}

private final ReRFLock reRFLock = new ReRFLock();

@Override
public void lock() {
reRFLock.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
reRFLock.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return reRFLock.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return reRFLock.tryAcquireSharedNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
reRFLock.release(1);
}

@Override
public Condition newCondition() {
return reRFLock.newCondition();
}
}

7.动态代理

1.介绍

在 Java 动态代理机制中 InvocationHandler 接口和 Proxy 类是核心。**

Proxy 类中使用频率最高的方法是:newProxyInstance() ,这个方法主要用来生成一个代理对象。

1
2
3
4
5
6
7
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
{
......
}

这个方法一共有 3 个参数:

  1. loader :类加载器,用于加载代理对象。
  2. interfaces : 被代理类实现的一些接口;
  3. h : 实现了 InvocationHandler 接口的对象;

要实现动态代理的话,还必须需要实现InvocationHandler 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时,这个方法的调用就会被转发到实现InvocationHandler 接口类的 invoke 方法来调用。

1
2
3
4
5
6
7
8
public interface InvocationHandler {

/**
* 当你使用代理对象调用方法的时候实际会调用到这个方法
*/
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}

invoke() 方法有下面三个参数:

  1. proxy :动态生成的代理类
  2. method : 与代理类对象调用的方法相对应
  3. args : 当前 method 方法的参数

也就是说:你通过Proxy 类的 newProxyInstance() 创建的代理对象在调用方法的时候,实际会调用到实现InvocationHandler 接口的类的 invoke()方法。 你可以在 invoke() 方法中自定义处理逻辑,比如在方法执行前后做什么事情。

2. JDK 动态代理类使用步骤

  1. 定义一个接口及其实现类;
  2. 自定义 InvocationHandler 并重写invoke方法,在 invoke 方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑;
  3. 通过 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h) 方法创建代理对象;

3. 代码示例

这样说可能会有点空洞和难以理解,我上个例子,大家感受一下吧!

1.定义发送短信的接口

1
2
3
public interface SmsService {
String send(String message);
}

2.实现发送短信的接口

1
2
3
4
5
6
public class SmsServiceImpl implements SmsService {
public String send(String message) {
System.out.println("send message:" + message);
return message;
}
}

3.定义一个 JDK 动态代理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* @author shuang.kou
* @createTime 2020年05月11日 11:23:00
*/
public class DebugInvocationHandler implements InvocationHandler {
/**
* 代理类中的真实对象
*/
private final Object target;

public DebugInvocationHandler(Object target) {
this.target = target;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws InvocationTargetException, IllegalAccessException {
//调用方法之前,我们可以添加自己的操作
System.out.println("before method " + method.getName());
Object result = method.invoke(target, args);
//调用方法之后,我们同样可以添加自己的操作
System.out.println("after method " + method.getName());
return result;
}
}

invoke() 方法: 当我们的动态代理对象调用原生方法的时候,最终实际上调用到的是 invoke() 方法,然后 invoke() 方法代替我们去调用了被代理对象的原生方法。

4.获取代理对象的工厂类

1
2
3
4
5
6
7
8
9
public class JdkProxyFactory {
public static Object getProxy(Object target) {
return Proxy.newProxyInstance(
target.getClass().getClassLoader(), // 目标类的类加载器
target.getClass().getInterfaces(), // 代理需要实现的接口,可指定多个
new DebugInvocationHandler(target) // 代理对象对应的自定义 InvocationHandler
);
}
}

getProxy():主要通过Proxy.newProxyInstance()方法获取某个类的代理对象

5.实际使用

1
2
SmsService smsService = (SmsService) JdkProxyFactory.getProxy(new SmsServiceImpl());
smsService.send("java");

运行上述代码之后,控制台打印出:

1
2
3
before method send
send message:java
after method send