进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务。
1.线程的基本使用 1.创建线程
创建进程方式1
1 2 3 4 5 6 7 8 Thread t = new Thread (){ @Override public void run () { log.info("test" ); } }; t.start(); log.info("main" );
创建进程方式2
1 2 3 4 5 6 7 8 9 Runnable runnable = new Runnable () { @Override public void run () { log.info("test1" ); } }; Thread t = new Thread (runnable,"test1" );t.start(); log.info("main1" );
创建进程方式3 lamda简化s
1 2 3 4 new Thread (()->{ log.info("test2" ); },"test2" ).start(); log.info("main2" );
2.sleep&yield
1.调用sleep会让当前线程从Running
进入ThreadWaitting
状态
2.其他线程可以使用interrupt方法打断正在睡眠的进程,这是sleep方法会抛出异常
3.睡眠结束的线程未必会直接运行
4.建议用TimeUnit的sleep代替Thread的sleep方法
3.同步应用
join() 等待线程结束
join(long n) 等待n毫秒,如果没有执行完成,将不再等待
4.interrupt 1.打断sleep wait join的线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Thread t1 = new Thread (){ @Override public void run () { log.info("sleep" ); try { Thread.sleep(500 ); }catch (InterruptedException e){ e.printStackTrace(); } } }; t1.setName("t1" ); t1.start(); Thread.sleep(1000 ); log.info("interrupter" ); t1.interrupt(); log.info("打断标记{}" ,t1.isInterrupted());
[t1] INFO richuff.top.MyThread - sleep [main] INFO richuff.top.MyThread - interrupter java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at richuff.top.MyThread$1.run(MyThread.java:13) [main] INFO richuff.top.MyThread - 打断标记false
2.打断正常线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread t1 = new Thread (){ @Override public void run () { while (true ){ } } }; t1.setName("t1" ); t1.start(); Thread.sleep(1000 ); log.info("interrupter" ); t1.interrupt(); log.info("打断标记{}" ,t1.isInterrupted());
打断标记的用途
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Thread t1 = new Thread (){ @Override public void run () { log.info("sleep" ); while (true ){ boolean interrupted = Thread.currentThread().isInterrupted(); if (interrupted){ log.info("quit" ); break ; } } } }; t1.setName("t1" ); t1.start(); Thread.sleep(1000 ); log.info("interrupter" ); t1.interrupt(); log.info("打断标记{}" ,t1.isInterrupted());
[t1] INFO richuff.top.MyThread - sleep [main] INFO richuff.top.MyThread - interrupter [t1] INFO richuff.top.MyThread - quit [main] INFO richuff.top.MyThread - 打断标记true
5.两阶段终止模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class WatchThread { public static void main (String[] args) throws Exception{ TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination (); twoPhaseTermination.start(); Thread.sleep(6000 ); twoPhaseTermination.stop(); } } @Slf4j(topic = "TwoPhaseTermination") class TwoPhaseTermination { private Thread minitor; public void start () { minitor = new Thread (()->{ while (true ){ Thread current = Thread.currentThread(); if (current.isInterrupted()){ log.info("处理后续事务" ); break ; } try { Thread.sleep(1000 ); log.info("处理逻辑" ); }catch (InterruptedException e){ e.printStackTrace(); current.interrupt(); } } },"minitor" ); minitor.start(); } public void stop () { minitor.interrupt(); } }
6.进程的五种状态
初始状态:仅在语言层面创建了对象,还未与操作系统相关联
可运行状态 可以由CPU进行调度
运行状态 指获取了CPU时间片中的状态
阻塞状态
终止状态 线程执行完毕,生命周期已经结束
2.synchronized 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static int count = 0 ;public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { for (int i = 0 ; i < 5000 ; i++) { synchronized (sync.class) { count++; } } }, "t1" ); Thread t2 = new Thread (()->{ for (int i = 0 ;i<5000 ;i++){ synchronized (sync.class) { count--; } } },"t2" ); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); }
3.Monitor(锁)
每个Java对象都可以关联一个Monitor对象
1 2 3 4 5 6 7 8 9 10 11 12 13 |-------------------------------------------------------|--------------------| | Mark Word (32 bits) | State | |-------------------------------------------------------|--------------------| | identity_hashcode:25 | age:4 | biased_lock:1 | lock:2 | Normal | |-------------------------------------------------------|--------------------| | thread:23 | epoch:2 | age:4 | biased_lock:1 | lock:2 | Biased | |-------------------------------------------------------|--------------------| | ptr_to_lock_record:30 | lock:2 | Lightweight Locked | |-------------------------------------------------------|--------------------| | ptr_to_heavyweight_monitor:30 | lock:2 | Heavyweight Locked | |-------------------------------------------------------|--------------------| | | lock:2 | Marked for GC | |-------------------------------------------------------|--------------------|
4.设计模式 - 保护性暂停 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package richuff.top.richu6;public class GuardObject { private Object response; public void get () { synchronized (this ){ while (response == null ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void set (Object response) { synchronized (this ){ this .response = response; this .notify(); } } }
运行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void main (String[] args) { GuardObject guard = new GuardObject (); new Thread (()->{ log.info("begin... get" ); guard.get(); log.info("end... get" ); },"t1" ).start(); new Thread (()->{ Object response = new Object (); log.info("begin...response" ); guard.set(response); log.info("end...response" ); },"t2" ).start(); }
[t1] INFO richuff.top.richu6.A - begin… get [t2] INFO richuff.top.richu6.A - begin…response [t2] INFO richuff.top.richu6.A - begin [t1] INFO richuff.top.richu6.A - end… get
优化改进,增加超时设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class GuardObject { private Object response; public void get (int sTime) { long begin = System.currentTimeMillis(); long end = 0 ; synchronized (this ){ while (response == null ){ long hTIme = sTime - end; if (hTIme <= 0 ){ break ; } try { this .wait(hTIme); } catch (InterruptedException e) { e.printStackTrace(); } end = System.currentTimeMillis() - begin; } } } public void set (Object response) { synchronized (this ){ this .response = response; this .notify(); } } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) { GuardObject guard = new GuardObject (); new Thread (()->{ log.info("begin... get" ); guard.get(2000 ); log.info("end... get" ); },"t1" ).start(); new Thread (()->{ Object response = new Object (); log.info("begin...response" ); try { Thread.sleep(3000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } guard.set(response); log.info("end...response" ); },"t2" ).start(); }
结果
[t1] INFO richuff.top.richu6.A - begin… get [t2] INFO richuff.top.richu6.A - begin…response [t1] INFO richuff.top.richu6.A - end… get [t2] INFO richuff.top.richu6.A -end…response
进一步优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class GuardObject { private Object response; private int id; public GuardObject (int id) { this .id = id; } public int getId () { return id; } public void setId (int id) { this .id = id; } public Object get (int sTime) { long begin = System.currentTimeMillis(); long end = 0 ; synchronized (this ){ while (response == null ){ long hTIme = sTime - end; if (hTIme <= 0 ){ break ; } try { this .wait(hTIme); } catch (InterruptedException e) { e.printStackTrace(); } end = System.currentTimeMillis() - begin; } return response; } } public void set (Object response) { synchronized (this ){ this .response = response; this .notifyAll(); } } }
实现通过id进行接收的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class Post extends Thread { private static final Logger log = LoggerFactory.getLogger(Post.class); private int id; private String msg; public Post (int id, String msg) { this .id = id; this .msg = msg; } @Override public void run () { GuardObject guardObject = Boxes.getGuardById(id); log.info("发送: id{},内容{}" ,guardObject.getId(),msg); guardObject.set(msg); } } class Position extends Thread { private static final Logger log = LoggerFactory.getLogger(Position.class); @Override public void run () { GuardObject guardObject = Boxes.createGuardObject(); log.info("开始接收收: id{}" ,guardObject.getId()); Object o = guardObject.get(5000 ); log.info("收到内容: {}" ,o.toString()); } } class Boxes { private static Map<Integer,GuardObject> boxes = new HashMap <>(); private static int id = 1 ; private static synchronized int generatedId () { return id++; } public static GuardObject createGuardObject () { GuardObject guardObject = new GuardObject (generatedId()); boxes.put(guardObject.getId(),guardObject); return guardObject; } public static Set<Integer> getAll () { return boxes.keySet(); } public static GuardObject getGuardById (int id) { return boxes.remove(id); } }
Post发送消息,Position通过id接收消息,两张间使用Boxes通信
5.ReentrantLock 1.Lock
Lock.lock()打断无效果
Lock.lockInterruptibly();可打断锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static ReentrantLock Lock = new ReentrantLock ();public static void main (String[] args) { Thread t1 = new Thread (()->{ try { log.info("尝试获取锁" ); Lock.lockInterruptibly(); }catch (Exception e){ e.printStackTrace(); log.info("没有获取到锁" ); return ; }try { log.info("获取到锁" ); }finally { Lock.unlock(); } }); Lock.lock(); t1.start(); }
2.锁超时
tryLock尝试加锁,成功上锁返回true,未成功返回false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static ReentrantLock Lock = new ReentrantLock ();public static void main (String[] args) { Thread t1 = new Thread (()->{ log.info("尝试获取锁" ); if (! Lock.tryLock()){ log.info("获取不到锁" ); return ; } try { log.info("获取到锁" ); }catch (Exception e){ e.printStackTrace(); }finally { Lock.unlock(); } }); Lock.lock(); t1.start(); }
加参数的tryLock() 等待指定时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static ReentrantLock Lock = new ReentrantLock ();public static void main (String[] args) { Thread t1 = new Thread (()->{ log.info("尝试获取锁" ); try { if (! Lock.tryLock(1 , TimeUnit.SECONDS)){ log.info("获取不到锁" ); return ; } }catch (Exception e){ e.printStackTrace(); } try { log.info("获取到锁" ); }catch (Exception e){ e.printStackTrace(); }finally { Lock.unlock(); } }); Lock.lock(); t1.start(); }
3.公平锁
ReentrantLock默认是不公平的锁
1 private static ReentrantLock Lock = new ReentrantLock (true );
4.条件变量
lock.newCondition();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static ReentrantLock lock = new ReentrantLock ();public static void main (String[] args) throws Exception{ Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); lock.lock(); condition1.wait(); condition1.signal(); condition1.signalAll(); }
5.固定运行顺序
t2 比 t1先执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static Logger logger = LoggerFactory.getLogger(Thread.class);public static boolean t2haverun = false ;public static final Object lock = new Object ();public static void main (String[] args) { Thread t1 = new Thread (()->{ synchronized (lock){ while (!t2haverun){ try { lock.wait(); }catch (Exception e){ e.printStackTrace(); } } logger.info("1" ); } },"t1" ); Thread t2 = new Thread (()->{ synchronized (lock){ logger.info("2" ); t2haverun = true ; lock.notify(); } },"t2" ); t1.start(); t2.start(); }
使用park和unpark
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) { Thread t1 = new Thread (()->{ LockSupport.park(); logger.info("1" ); },"t1" ); Thread t2 = new Thread (()->{ logger.info("2" ); LockSupport.unpark(t1); },"t2" ); t1.start(); t2.start(); }
6.循环顺序打印 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class Test { public static void main (String[] args) { WaitNotify wn = new WaitNotify (1 , 5 ); new Thread (() -> { wn.Print("a" , 1 , 2 ); }).start(); new Thread (() -> { wn.Print("b" , 2 , 3 ); }).start(); new Thread (() -> { wn.Print("c" , 3 , 1 ); }).start(); } } class WaitNotify { public void Print (String str, int waitFlag, int nextFlag) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ){ while (falg != waitFlag) { try { this .wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return ; } } System.out.println(str); falg = nextFlag; this .notifyAll(); } } } private int falg; private int loopNumber; public WaitNotify (int falg, int loopNumber) { this .falg = falg; this .loopNumber = loopNumber; } }
6.JMM
JAVA内存模型
原子性 保证指令不受线程上下文切换的影响
可见性 保证指令不会受CPU缓存的影响
有序性 保证指令不会受CPU指令并行优化的影响
1.Unsafe对象的获取
通过反射获取
1 2 3 4 5 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" );theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe) theUnsafe.get(null );System.out.println(unsafe);
2.不可变对象的设计
模拟连接池的连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class Pool { private final int poolSize; private Connection[] connPools; private AtomicIntegerArray states; public Pool (int poolSize) { this .states = new AtomicIntegerArray (new int [poolSize]); this .connPools = new Connection [poolSize]; this .poolSize = poolSize; } public Connection borrow () { while (true ){ for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ){ if (states.compareAndSet(i,0 ,1 )){ return connPools[i]; } } } synchronized (this ){ try { wait(); }catch (Exception e){ e.printStackTrace(); } } } } public void free (Connection conn) { for (int i = 0 ; i < poolSize; i++) { if (conn == connPools[i] && states.get(i) == 1 ){ states.set(i,0 ); synchronized (this ){ this .notifyAll(); } break ; } } } }
3.线程队列的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 public class BlockingQueue <T>{ private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class); private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capcity; public BlockingQueue (int capcity) { this .capcity = capcity; } public T poll (long time, TimeUnit unit) { lock.lock(); try { long nanoTime = unit.toNanos(time); while (queue.isEmpty()){ if (nanoTime <= 0 ){ return null ; } nanoTime = emptyWaitSet.awaitNanos(nanoTime); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } fullWaitSet.signalAll(); return queue.removeFirst(); } public T take () { lock.lock(); try { while (queue.isEmpty()){ emptyWaitSet.await(); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } fullWaitSet.signalAll(); return queue.removeFirst(); } public void put (T t) { lock.lock(); while (queue.size() == capcity){ try { fullWaitSet.await(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } queue.addLast(t); emptyWaitSet.signalAll(); } public int size () { lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } }
4.线程池的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 @Slf4j class TestPool { public static Logger log = LoggerFactory.getLogger(TestPool.class); public static void main (String[] args) { log.info("begin" ); ThreadPool threadPool = new ThreadPool (1000 , 2 , TimeUnit.MILLISECONDS, 10 ); for (int i = 0 ; i < 15 ; i++) { threadPool.excute(()->{ try { Thread.sleep(1000000L ); }catch (Exception e){ e.printStackTrace(); } log.info("test" ); }); } } } public class BlockingQueue <T>{ private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class); private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capcity; public BlockingQueue (int capcity) { this .capcity = capcity; } public T poll (long time, TimeUnit unit) { lock.lock(); try { long nanoTime = unit.toNanos(time); while (queue.isEmpty()){ if (nanoTime <= 0 ){ return null ; } nanoTime = emptyWaitSet.awaitNanos(nanoTime); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } log.info("唤醒线程" ); fullWaitSet.signalAll(); return queue.removeFirst(); } public T take () { lock.lock(); try { while (queue.isEmpty()){ emptyWaitSet.await(); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } log.info("唤醒线程" ); fullWaitSet.signalAll(); return queue.removeFirst(); } public void put (T t) { lock.lock(); try { while (queue.size() == capcity) { try { fullWaitSet.await(); } catch (Exception e) { e.printStackTrace(); } } log.info("加入任务队列" ); queue.addLast(t); emptyWaitSet.signalAll(); } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } } class ThreadPool { private static final Logger log = LoggerFactory.getLogger(ThreadPool.class); private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet <>(); private int currentSize; private long timeout; private TimeUnit unit; public void excute (Runnable task) { synchronized (workers){ if (workers.size() < currentSize){ Worker worker = new Worker (task); log.info("加入工作队列" ); workers.add(worker); worker.start(); }else { taskQueue.put(task); } } } public ThreadPool (long timeout, int currentSize, TimeUnit unit,int capacity) { this .timeout = timeout; this .currentSize = currentSize; this .taskQueue = new BlockingQueue <>(capacity); this .unit = unit; } class Worker extends Thread { private Runnable task; Worker(Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout,TimeUnit.MILLISECONDS)) != null ) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { log.info("移除worker{}" , this ); workers.remove(this ); } } @Override public String toString () { return super .toString(); } } }
5.带拒绝策略的线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 class TestPool { public static Logger log = LoggerFactory.getLogger(TestPool.class); public static void main (String[] args) { log.info("begin" ); ThreadPool threadPool = new ThreadPool (1000 , 2 , TimeUnit.MILLISECONDS, 10 ,(queue,task)->{ queue.offer(task,1000 ,TimeUnit.MILLISECONDS); }); for (int i = 0 ; i < 15 ; i++) { threadPool.excute(()->{ try { Thread.sleep(20000L ); }catch (Exception e){ e.printStackTrace(); } log.info("test" ); }); } } } @FunctionalInterface interface RejectPolicy <T>{ void reject (BlockingQueue<T> queue,T task) ; } public class BlockingQueue <T>{ private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class); private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capcity; public BlockingQueue (int capcity) { this .capcity = capcity; } public T poll (long time, TimeUnit unit) { lock.lock(); try { long nanoTime = unit.toNanos(time); while (queue.isEmpty()){ if (nanoTime <= 0 ){ return null ; } nanoTime = emptyWaitSet.awaitNanos(nanoTime); } log.info("唤醒线程" ); fullWaitSet.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return queue.removeFirst(); } public T take () { lock.lock(); try { while (queue.isEmpty()){ emptyWaitSet.await(); } log.info("唤醒线程" ); fullWaitSet.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return queue.removeFirst(); } public void put (T t) { lock.lock(); try { while (queue.size() == capcity) { try { fullWaitSet.await(); } catch (Exception e) { e.printStackTrace(); } } log.info("加入任务队列" ); queue.addLast(t); emptyWaitSet.signalAll(); } finally { lock.unlock(); } } public boolean offer (T t,long time,TimeUnit unit) { lock.lock(); try { long nanoTime = unit.toNanos(time); while (queue.size() == capcity) { try { if (nanoTime <= 0 ){ log.info("任务超时" ); return false ; } nanoTime = fullWaitSet.awaitNanos(nanoTime); } catch (Exception e) { e.printStackTrace(); } } log.info("加入任务队列" ); queue.addLast(t); emptyWaitSet.signalAll(); } finally { lock.unlock(); } return true ; } public int size () { lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capcity){ rejectPolicy.reject(this ,task); }else { log.info("加入任务队列" ); queue.addLast(task); emptyWaitSet.signalAll(); } }finally { lock.unlock(); } } } class ThreadPool { private static final Logger log = LoggerFactory.getLogger(ThreadPool.class); private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet <>(); private int currentSize; private long timeout; private TimeUnit unit; private RejectPolicy<Runnable> rejectPolicy; public void excute (Runnable task) { synchronized (workers){ if (workers.size() < currentSize){ Worker worker = new Worker (task); log.info("加入工作队列" ); workers.add(worker); worker.start(); }else { taskQueue.tryPut(rejectPolicy,task); } } } public ThreadPool (long timeout, int currentSize, TimeUnit unit,int capacity,RejectPolicy<Runnable> rejectPolicy) { this .timeout = timeout; this .currentSize = currentSize; this .taskQueue = new BlockingQueue <>(capacity); this .unit = unit; this .rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; Worker(Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout,TimeUnit.MILLISECONDS)) != null ) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { log.info("移除worker{}" , this ); workers.remove(this ); } } @Override public String toString () { return super .toString(); } } }
6.ThreadPoolExcutor 1.newFixedThreadPool
线程池的线程数是固定的,即使任务数量增加,线程池中的线程数也不会超过指定的数量。线程池中的线程是复用的,当某个线程完成了任务,它可以继续执行下一个任务,而不需要重新创建线程 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ExecutorService pool = Executors.newFixedThreadPool(2 , new ThreadFactory () { private AtomicInteger t = new AtomicInteger (1 ); @Override public Thread newThread (Runnable r) { return new Thread ( r,"my_thread" +t.getAndIncrement() ); } }); pool.execute(()->{ log.info("1" ); }); pool.execute(()->{ log.info("2" ); }); pool.execute(()->{ log.info("3" ); });
2.CachedThreadPool 1 ExecutorService service = Executors.newCachedThreadPool();
3.提交任务
excute() 提交Runable
submit() 提交Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService pool = Executors.newFixedThreadPool(2 );Future<String> future = pool.submit(()->{ Thread.sleep(1000 ); return "hhh" ; }); log.info(future.get());
invokeAll() 执行其中所有的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ExecutorService pool = Executors.newFixedThreadPool(2 );List<Future<String>> futures = pool.invokeAll(Arrays.asList( () -> { Thread.sleep(500 ); return "1" ; }, () -> { Thread.sleep(1000 ); return "2" ; }, () -> { Thread.sleep(2000 ); return "3" ; } )); for (Future<String> future:futures){ log.info(future.get()); }
invokeAny() 哪个任务先完成,就直接返回任务结果,其他的任务终止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ExecutorService pool = Executors.newFixedThreadPool(2 );String res = pool.invokeAny(Arrays.asList( () -> { Thread.sleep(500 ); return "1" ; }, () -> { Thread.sleep(1000 ); return "2" ; }, () -> { Thread.sleep(2000 ); return "3" ; } )); log.info("{}" ,res);
7.newScheduledThreadPool 1.scheduleAtFixedRate 1 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
command : 一个实现了 Runnable
接口的无参数函数,它将在指定的时间间隔内重复执行。
initialDelay : 首次执行前的初始延迟时间。如果设置为0,则任务将立即执行。
period : 连续执行之间的时间间隔。在这个例子中,任务将每隔给定的时间间隔执行一次。
unit : period
参数的时间单位,TimeUnit
是一个枚举,定义了时间的单位(如天、小时、分钟、秒、毫秒等)。
2.schedule 1 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
command : 一个实现了 Runnable
接口的无参数函数,它将在指定的延迟后执行一次。
delay : 首次执行的延迟时间。这个时间是从现在开始计算的。
unit : delay
参数的时间单位,TimeUnit
是一个枚举,定义了时间的单位(如天、小时、分钟、秒、毫秒等)。
1 2 3 4 pool.schedule(()->{ log.info("test" ); },1 ,TimeUnit.SECONDS);
3.scheduleWithFixedDelay
command
:这是一个Runnable
任务,它将在线程池中的某个线程上执行。
initialDelay
:这是在首次执行任务之前的初始延迟时间。在这个例子中,initialDelay
的值是1秒,表示任务将在启动线程池后的1秒开始执行。
delay
:这是在每次任务执行完毕后,到下一次任务开始执行之前的固定延迟时间。在这个例子中,delay
的值也是1秒,意味着每次任务执行完成后,都会等待1秒再开始下一次执行。
unit
:这是initialDelay
和delay
参数的时间单位。在这个例子中,TimeUnit.SECONDS
表示时间单位是秒。
1 2 3 4 pool.scheduleWithFixedDelay(()->{ log.info("fixed" ); },1 ,1 ,TimeUnit.SECONDS);
8.每周执行一次任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 LocalDateTime localDateTime = LocalDateTime.now();LocalDateTime dateTime = localDateTime.withHour(16 ).withMinute(0 ).withSecond(0 ).with(DayOfWeek.MONDAY);if (localDateTime.isAfter(dateTime)){ dateTime.plusWeeks(1 ); } long initialDelay = Duration.between(localDateTime,dateTime).toMillis();long period = 1000 * 60 * 60 * 24 * 7 ;ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 );pool.scheduleAtFixedRate(()->{ log.info("test" ); },initialDelay,period, TimeUnit.MILLISECONDS);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class MRennterLock { public static Logger log = LoggerFactory.getLogger(MRennterLock.class); public static RFLock lock = new RFLock (); public static void main (String[] args) { new Thread (()->{ log.info("t1 begin.." ); lock.lock(); try { Thread.sleep(3000 ); }catch (InterruptedException e){ e.printStackTrace(); }finally { lock.unlock(); log.info("t1 end..." ); } },"t1" ).start(); new Thread (()->{ log.info("t2 begin.." ); lock.lock(); try { Thread.sleep(1000 ); }catch (InterruptedException e){ e.printStackTrace(); }finally { lock.unlock(); log.info("t2 end..." ); } },"t2" ).start(); } } class RFLock implements Lock { static class ReRFLock extends AbstractQueuedSynchronizer { @Override protected boolean tryRelease (int arg) { if (compareAndSetState(0 ,1 )){ setExclusiveOwnerThread(Thread.currentThread()); return true ; }else { return false ; } } @Override protected boolean tryAcquire (int arg) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public Condition newCondition () { return new ConditionObject (); } @Override public String toString () { return super .toString(); } } private final ReRFLock reRFLock = new ReRFLock (); @Override public void lock () { reRFLock.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { reRFLock.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return reRFLock.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return reRFLock.tryAcquireSharedNanos(1 , unit.toNanos(time)); } @Override public void unlock () { reRFLock.release(1 ); } @Override public Condition newCondition () { return reRFLock.newCondition(); } }
7.动态代理 1.介绍 在 Java 动态代理机制中 InvocationHandler
接口和 Proxy
类是核心。**
Proxy
类中使用频率最高的方法是:newProxyInstance()
,这个方法主要用来生成一个代理对象。
1 2 3 4 5 6 7 public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException { ...... }
这个方法一共有 3 个参数:
loader :类加载器,用于加载代理对象。
interfaces : 被代理类实现的一些接口;
h : 实现了 InvocationHandler
接口的对象;
要实现动态代理的话,还必须需要实现InvocationHandler
来自定义处理逻辑。 当我们的动态代理对象调用一个方法时,这个方法的调用就会被转发到实现InvocationHandler
接口类的 invoke
方法来调用。
1 2 3 4 5 6 7 8 public interface InvocationHandler { /** * 当你使用代理对象调用方法的时候实际会调用到这个方法 */ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable; }
invoke()
方法有下面三个参数:
proxy :动态生成的代理类
method : 与代理类对象调用的方法相对应
args : 当前 method 方法的参数
也就是说:你通过Proxy
类的 newProxyInstance()
创建的代理对象在调用方法的时候,实际会调用到实现InvocationHandler
接口的类的 invoke()
方法。 你可以在 invoke()
方法中自定义处理逻辑,比如在方法执行前后做什么事情。
2. JDK 动态代理类使用步骤
定义一个接口及其实现类;
自定义 InvocationHandler
并重写invoke
方法,在 invoke
方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑;
通过 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h)
方法创建代理对象;
3. 代码示例 这样说可能会有点空洞和难以理解,我上个例子,大家感受一下吧!
1.定义发送短信的接口
1 2 3 public interface SmsService { String send(String message); }
2.实现发送短信的接口
1 2 3 4 5 6 public class SmsServiceImpl implements SmsService { public String send(String message) { System.out.println("send message:" + message); return message; } }
3.定义一个 JDK 动态代理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; /** * @author shuang.kou * @createTime 2020年05月11日 11:23:00 */ public class DebugInvocationHandler implements InvocationHandler { /** * 代理类中的真实对象 */ private final Object target; public DebugInvocationHandler(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws InvocationTargetException, IllegalAccessException { //调用方法之前,我们可以添加自己的操作 System.out.println("before method " + method.getName()); Object result = method.invoke(target, args); //调用方法之后,我们同样可以添加自己的操作 System.out.println("after method " + method.getName()); return result; } }
invoke()
方法: 当我们的动态代理对象调用原生方法的时候,最终实际上调用到的是 invoke()
方法,然后 invoke()
方法代替我们去调用了被代理对象的原生方法。
4.获取代理对象的工厂类 1 2 3 4 5 6 7 8 9 public class JdkProxyFactory { public static Object getProxy(Object target) { return Proxy.newProxyInstance( target.getClass().getClassLoader(), // 目标类的类加载器 target.getClass().getInterfaces(), // 代理需要实现的接口,可指定多个 new DebugInvocationHandler(target) // 代理对象对应的自定义 InvocationHandler ); } }
getProxy()
:主要通过Proxy.newProxyInstance()
方法获取某个类的代理对象
5.实际使用 1 2 SmsService smsService = (SmsService) JdkProxyFactory.getProxy(new SmsServiceImpl()); smsService.send("java");
运行上述代码之后,控制台打印出:
1 2 3 before method send send message:java after method send