JAVA并发编程

反射和代理

反射和动态代理使用场景非常广。

动态代理:动态代理是Java的一种机制,可以在运行时创建一个代理对象,而不需要在编译时就确定代理类。

反射是指在运行时检查类的信息(如方法、属性、构造函数等)并操作它们的能力。

场景场景有:

  • AOP切面
  • 懒加载
  • 权限控制
  • 日志记录
  • 远程代理
  • 序列化和反序列化
  • 配置文件解析
  • 动态调用等

具体就是,在运行时创建代理对象,以实现方法调用的拦截和增强功能。在运行时检查和操作类的属性和方法,实现对对象的动态访问。

接下来一步一步看看

定义接口

public interface IUserApi {
    String queryUserInfo();
}

实现接口

public class UserApi implements IUserApi {
    @Override
    public String queryUserInfo() {
        return "这个一个用户信息";
    }
}

简单的反射

public class Test_reflect {
    public static void main(String[] args) throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
        // 获取类
        Class<UserApi> userApiClass = UserApi.class;
        // 获取类的方法
        Method queryUserInfo = userApiClass.getMethod("queryUserInfo");
        // 调用方法创建实例
        Object invoke = queryUserInfo.invoke(userApiClass.newInstance());
        // 输出值 Object会类型转为String
        System.out.println(invoke);
    }
}

image-20250108111927102

JDK代理方式

public class JDKProxy {
    // <T> 表示一个泛型类型参数 表示该方法是一个泛型方法
    public static <T> T getProxy(Class clazz) throws Exception {
        // 获得类加载器
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return (T) Proxy.newProxyInstance(classLoader, new Class[]{clazz}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                System.out.println(method.getName() + " 你被代理了,By JDKProxy!");
                return "现在信息已经被更改为: 这个是JDK代理信息";
            }
        });
    }
}

测试

 IUsetApi userApi = JDKProxy.getProxy(IUsetApi.class);
 String invoke = userApi.queryUserInfo();
 System.out.println(invoke);

image-20250108113310496

能够发现,原来的queryUserInfo函数内部已经被修改。注意是只是在当前代理中被修改。

这里需要详细解释一下为什么传入一个接口,能够创建出具体的实例。因为普通接口本身不能直接实例化。

实际上就是 调用 Proxy.newProxyInstance 方法来创建一个代理实例时,JDK 会根据提供的接口类型动态生成一个新的类,这个类会实现你指定的接口。

  • 提供接口: 在创建代理实例时,你必须提供接口类型。
  • 动态生成代理类 : JDK 在运行时根据提供的接口,生成一个新的类.。这个代理类将在執行中产生,并且是在内存中动态生成的,并不需要在编译时显式定义。
  • 实现InvocationHandler: 这个方法在代理对象调用的接口方法时被调用。
  • 调用方法:当你调用代理对象的方法(例如 userApi.queryUserInfo())时,实际上调用的是被动态生成的代理类中的实现,这个实现会转发调用到你在 InvocationHandler 中定义的 invoke 方法

经常的场景为: 中间件开发、设计模式中代理模式和装饰器模式应用。

CGLIB代理方式

public class CglibProxy implements MethodInterceptor {
    public Object newInstall(Object object) {
        return Enhancer.create(object.getClass(), this);
    }
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
        System.out.println("我被CglibProxy代理了");
        return methodProxy.invokeSuper(o, objects);
    }
}

测试

CglibProxy cglibProxy = new CglibProxy();
UserApi userApi = (UserApi) cglibProxy.newInstall(new UserApi());
String invoke = userApi.queryUserInfo();

采用ASM字节码框架在类中修改指令码实现代理,不需要传入接口,比JDKPRoxy快1.5-2倍。

经常的场景为: Spring 、AOP切面、鉴权服务、中间件开发、RPC框架等

三种字节码操作框架

ASMProxy 、ByteBuddyProxy 、JavassistProxy ,等后续有空再做进一步学习。

什么是线程和进程?

线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈

进程和线程的生命周期和状态

进程:

  • 创建
  • 就绪
    • 就绪挂起:移除主存到辅助存储
  • 运行
  • 阻塞
    • 阻塞挂起:移除主存到辅助存储
  • 终止

线程:

  • 初始
  • 运行
    • 运行中
    • 就绪
  • 等待
    • Waiting : 进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。
    • Timed_Wating : 当超时时间结束后,线程将会返回到 RUNNABLE 状态。
  • 阻塞
  • 终止

死锁

多线程请求共享进程中的堆和方法区资源,会存在死锁,即多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。

经典一个面试题: 手写一个死锁代码

public class DeadLock {
    private static Object resource1 = new Object();
    private static Object resource2 = new Object();


    public static void main(String[] args) {

        new Thread(
                () -> {
                    synchronized (resource1){
                        System.out.println(Thread.currentThread() + "取得资源1");
                        try {
                            Thread.sleep(1000);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread() + "等待资源2");
                        synchronized (resource2){
                            System.out.println(Thread.currentThread() + "取得资源2");
                        }
                    }

                } , "线程1"
        ).start();

        new Thread(
                () -> {
                    synchronized (resource2){
                        System.out.println(Thread.currentThread() + "取得资源2");
                        try {
                            Thread.sleep(1000);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread() + "等待资源1");
                        synchronized (resource1){
                            System.out.println(Thread.currentThread() + "取得资源1");
                        }
                    }
                }, "线程2"
        ).start();;
    }
}

image-20250107150249600

一直死锁。产生死锁的四个必要条件

  • 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
  • 互斥条件:该资源任意一个时刻只由一个线程占用。
  • 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
  • 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。

多线程

几种多线程实现方式

  1. 继承 Thread: 通过继承 java.lang.Thread 类并重写其 run() 方法来定义线程的执行逻辑
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running");
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread thread1 = new MyThread();
        thread1.start();
        
        MyThread thread2 = new MyThread();
        thread2.start();
    }
}
  1. 实现 Runnable 接口: 实现 Runnable 接口,并将其实现类作为 Thread 对象的参数
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running");
    }
}

public class Main {
    public static void main(String[] args) {
        Thread thread1 = new Thread(new MyRunnable());
        thread1.start();
        
        Thread thread2 = new Thread(new MyRunnable());
        thread2.start();
    }
}
  1. Callable 接口与 Runnable 的功能类似,但它可以有返回值,并能够抛出异常。通常与 FutureFutureTask 搭配使用。
public class CallableFutureTaskExample {
    public static void main(String[] args) {

        // 定义一个 Callable 任务 (任务需要返回一个值)
        Callable<Integer> task = () -> {
            System.out.println(Thread.currentThread().getName() + ": is executing...");
            int sum = 0;
            for (int i = 1; i <= 10; i++) {
                sum += i;
                Thread.sleep(100); // 模拟耗时计算
            }
            return sum; // 返回计算结果
        };

        // 使用 FutureTask 包装 Callable
        FutureTask<Integer> futureTask = new FutureTask<>(task);

        // 使用 Thread 启动线程并执行任务
        Thread thread = new Thread(futureTask);
        thread.start();

        // 主线程可以继续做其他事情
        System.out.println("Main thread is doing something else...");

        // 获取 Callable 的返回结果 (会阻塞直到任务执行完成)
        try {
            int result = futureTask.get(); // 阻塞,直到任务执行完成并返回结果
            System.out.println("Result from Callable: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("Main thread finished!");
    }
}
  1. Executor : 一种线程池管理方案,用于管理和复用线程
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running");
    }
}

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new MyRunnable());
        executorService.submit(new MyRunnable());
        
        // 关闭线程池
        executorService.shutdown();
    }
}
  1. Lambda 表达式
public class Main {
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> System.out.println(Thread.currentThread().getName() + " is running"));
        thread1.start();
        
        Thread thread2 = new Thread(() -> System.out.println(Thread.currentThread().getName() + " is running"));
        thread2.start();
    }
}

经典多线程算法题:多线程实现1百万个随机数据数组求和

public class FastAdd {

    // 数据数组长度
    private static final int ARRAY_SIZE = 1_000_000;

    // 线程数
    private static final int NUM_THREADS = 4;

    // 用于保存数组数据
    private static int[] data = new int[ARRAY_SIZE];

    public static void main(String[] args) {
        // 初始化随机数据
        Random random = new Random();
        for (int i = 0; i < ARRAY_SIZE; i++) {
            data[i] = random.nextInt(100) + 1; // 生成 1-100 的随机整数
        }

        // 划分数据分片
        int chunkSize = ARRAY_SIZE / NUM_THREADS;

        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);

        // 原子操作
        AtomicInteger totalSum = new AtomicInteger(0);

        for(int i=0; i<NUM_THREADS; i++){
            final int start = i * chunkSize;
            final int end = (i + 1)  * chunkSize;
            // 线程池执行任务
            executorService.submit(
                    () -> {
                        int partialSum = 0;
                        for(int j = start; j < end; j++){
                            partialSum += data[j];
                        }
                        // 原子操作 , 线程安全
                        totalSum.addAndGet(partialSum);
                    }
            );
        }

        executorService.shutdown();

        System.out.println("Total Sum: " + totalSum);
    }
}

经典多线程算法题:两个线程奇偶输出

public class OddAdd {
    static final Object lock = new Object();
    static int count  = 1;
    static final  int total = 100;
    public static void main(String[] args) {
        new Thread(
                () -> {
                    synchronized (lock){
                        while (count < total){
                            if((count & 1) != 0){
                                System.out.println(Thread.currentThread().getName() + ":" + count);
                                count++;
                                lock.notify();
                            }else {
                                try {
                                    lock.wait();
                                }catch (InterruptedException e){
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                }, "线程1"
        ).start();
        new Thread(
                () -> {
                    synchronized (lock){
                        while (count < total){
                            if((count & 1) == 0){
                                System.out.println(Thread.currentThread().getName() + ":" + count);
                                count++;
                                lock.notify();
                            }else {
                                try {
                                    lock.wait();
                                }catch (InterruptedException e){
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                }, "线程2"
        ).start();
    }
}

为什么使用count本身作为锁对象的原因在于,锁只能锁对象,引用类型,并且如果为Integer,每次进行++会创建新得Integer对象,造成锁失效。

乐观锁和悲观锁

悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。

适合多写少读

// 方法1
public void performSynchronisedTask() {
    synchronized (this) {
        // 需要同步的操作
    }
}
// 方法2
private Lock lock = new ReentrantLock();
lock.lock();
try {
   // 需要同步的操作
} finally {
    lock.unlock();
}

乐观锁总是假设最好的情况,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了(具体方法可以使用版本号机制或 CAS 算法)。

适合多读少写

乐观锁版本号法

一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会加一。当线程 A 要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值为当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。

乐观锁CAS法

CAS 的全称是 Compare And Swap(比较与交换) ,用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。

存在ABA问题,可以通过版本号或者时间戳解决。

volatile

volatile保证可见性的原理

原理:当使用volatile修饰时,线程一一旦对自身拷贝的CPU内存中公共变量进行修改后,会直接在Main内存上进行修改,将其刷新,当线程2获取值的时候,会把自己CPU内存的变量进行过期,之后从主内存读取。涉及一点点JMM知识,补充一手。通过内存屏障(Memnory Barrier)来实现的。

image-20250108145309837

volatile 能够防止指令重排

原理是:JVM会对指令进行优化,加快并行速度,对指令重新编排。这个命令就是禁止。

事实上,不加volatile也有可见性, 由JVM来保证的,但不会时延低,没得强制刷新。

synchronized

主要解决的是多个线程之间访问资源的同步性,可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。

  1. 修饰实例方法: 给当前对象实例加锁,进入同步代码前要获得 当前对象实例的锁
  2. 修饰静态方法:给当前类加锁,会作用于类的所有对象实例 ,进入同步代码前要获得 当前 class 的锁。(非互斥)
  3. 修饰代码块: 对括号里指定的对象/类加锁:

四大特性

  • 原子性: synchronized 可以保证统一时间只有一个线程能拿到锁,进入到代码块执行

  • 可见性: 线程A修改值后,线程B使用此变量可以做出相应的反应,比如 while(!变量) 退出。

    原理:线程解锁前,会把共享变量的最新值刷新到主内存。线程加锁前,会清空工作区的共享变量,从主存中重新读取最新值。靠操作系统内核互斥锁实现。

    public static boolean sign = false;
    public static void main(String[] args) {
        Thread Thread01 = new Thread(() -> {
            int i = 0;
            while (!sign) {
                i++;
                add(i);
            }
        });
        Thread Thread02 = new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ignore) {
            }
            sign = true;
            logger.info("vt.sign = true  while (!sign)")
        });
        Thread01.start();
        Thread02.start();
    }
    
    public static synchronized  int add(int i) {
        return i + 1;
    }
    

    这里解释一下,刚开始线程1、2在启动之后会各自拷贝sign变量的副本,线程2在执行结束会将副本值进行刷新到主内存。而线程1在一直的循环执行过程中,因为synchronized的缘故会不断清空副本,然后从主内存中重新拉取值,所以当线程2执行结束,之后能够立即结束。

  • 有序性:保证不管编译器和处理器为了性能优化会如何进行指令重排序,都需要保证单线程下的运行结果的正确性。 synchronized不能保证指令重排, 一般要配和volatile关键字保持安全。若没有使用 volatile,可能会导致一个线程获得一个未完全初始化的实例,进而引发潜在的错误和程序不稳定性。

示例: 假设有两个线程同时执行 getInstance() 方法:

  1. 线程 A 发现 instance 为 null,于是进入同步块。
  2. 线程 A 创建了一个 Singleton 对象,但在构造器完成之前,线程 A 被挂起。
  3. 线程 B 进入 getInstance() 方法,发现 instance 现在非 null,于是返回 instance
  4. 线程 B 持有了一个尚未完成初始化的 Singleton 对象。
  • 可重入性:允许一个线程二次请求自己持有对象锁的临界资源,这种情况称为可重入锁。原理是,内部存在计数器,会记录获取锁的次数,执行完次数-1, 直到为0,释放锁。

    关于进一步的synchronized的锁升级过程,有空再进一步学习。

ReentrantLock

ReentrantLock 是一个可重入且独占式锁,具有与 synchronized 监视器(monitor enter、monitor exit)锁基本相同的行为和语意。但与 synchronized 相比,它更加灵活、强大、增加了轮询、超时、中断等高级功能以及可以创建公平和非公平锁。

支持超时等待可中断可实现公平锁

ThreadLocal

ThreadLocal是Java中用于解决线程安全问题的一种机制,它允许创建线程局部变量,即每个线程都有自己独立的变量副本,从而避免了线程间的资源共享和同步问题。

在线程内部,可以发现ThreadLocal内部是由ThreadLocalMap来实现的

public class Thread implements Runnable {
	/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
 }

对于ThreadLoad的set方法源码如下

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            //调用内部的方法进行set
            map.set(this, value);
        } else {
            createMap(t, value);
        }
    }
// ThreadLocalMap 是 ThreadLocal的一个静态内部类
ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

进一步看ThreadLocalMap的set方法,

static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
}
private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }
  • 可以发现底层是一个hashmap类型的key,value数组, 其index计算也类似
  • 每一个Entry节点由ThreadLocal 和 value组成

通过以上源码,我们便能发现ThreadLocal存在内存泄漏(即无法访问、也无法被回收)的原因,

每个线程Thread有自己的ThreadLocalMap的map,map是一个Entry数组,Entry是把ThreadLocal本身作为key,存储的值作为value。在一个ThreadLocal使用过程中,ThreadLocal不去使用后,因为ThreadLocalMap 中的 key 是 ThreadLocal的弱引用 WeakReference<ThreadLocal<?>>会被GC回收,value是强引用,就算key被回收,value任然存在。但是整个线程Thread可能因为线程池的原因一直没有关闭,所以导致存在value这个即无法访问、也无法被回收, 造成内存泄漏。

就算ThreadLocal不作为弱引用,那么内存泄漏将更多。

image-20250108172410114

实际上,在ThreadLocal内部,如果再次访问这个ThreadLocal就会进行回收, 但如果一直不访问,问题任然会发生。

private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

补充知识点

  • WeakReference意味着,如果 ThreadLocal实例不再被任何强引用指向,垃圾回收器会在下次 GC 时回收该实例,导致ThreadLocalMap中对应的 key 变为 null

  • 强引用: 在Java中,强引用是默认的引用类型。当一个对象被强引用时,垃圾回收器永远不会回收这个对象。即使系统内存不足,只有强引用存在,垃圾回收器也不会释放这个对象。

  • 弱引用是在 Java 中通过 java.lang.ref.WeakReference 类来实现的。与强引用不同,当对象只被弱引用引用时,垃圾回收器可以回收该对象。通常情况下,弱引用用于实现缓存、监听器、或者其他临时性的数据结构。

解决方案:

在使用完ThreadLocal之后,及时进行remove

线程池

线程池的核心目的就是资源的利用,避免重复创建线程带来的资源消耗。因此引入一个池化技术的思想,避免重复创建、销毁带来的性能开销。

手写一个简单的线程池

核心流程

  1. 创建线程池的大小--n个一直运行的线程
  2. 把线程提交给线程池运行
  3. 如果运行线程数量大于核心线程数,则把线程放入队列中
  4. 如果队列中容量已经满了,则判断当前运行的线程数是否小于设定的最大线程数。如果小于则创建线程, 大于则走拒绝策略。
  5. 当有空闲时,获取队列中等待的线程运行

image-20250108194752501

public class ThreadPoolTrader implements Executor {
	//用于记录线程池中线程数量
    private final AtomicInteger ctl = new AtomicInteger(0);
	// 线程池线程数
    private volatile int corePoolSize;
    // 线程池最大线程数
    private volatile int maximumPoolSize;
	//线程池队列
    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }
	
    
    // 用于提交线程
    @Override
    public void execute(Runnable command) {
        int c = ctl.get();
        if (c < corePoolSize) {
            if (!addWorker(command)) {
                reject();
            }
            return;
        }
        if (!workQueue.offer(command)) {
            if (!addWorker(command)) {
                reject();
            }
        }
    }
	
    // 创建并执行线程
    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;

        Worker worker = new Worker(firstTask);
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }
    //拒绝策略
    private void reject() {
        throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
    }

    private final class Worker implements Runnable {

        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                ctl.decrementAndGet();
            }
        }
		
        // 从队列中不断的获取未被执行的线程
        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue.size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }



    public static void main(String[] args) {
        ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 20; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务编号:" + finalI);
            });
        }
    }

}

四个拒绝的策略

  • AbortPolicy(抛异常方式拒绝):直接抛出一个任务被线程池拒绝的异常。
  • DiscardPolicy:不做任何处理,静默拒绝提交的任务
  • DiscardOldestPolicy: 丢弃存活时间最长的任务
  • CallerRunsPolicy: 提交的线程进行拒绝

线程异常后,是否销毁和复用

  • 使用execute提交任务进行销毁:异常会直接导致线程终止
  • 使用submit提交任务:异常会被Future封装

线程命名

  • 使用guava的ThreaFactoryBuilder
  • 实现ThreadFactory工厂进行,封装一个名字

如何根据任务优先级进行执行

  1. 通过PriorityBlockingQueue队列 来作为阻塞队列
  2. 提交任务实现Comparable接口,重写compareTo方法来比较
  3. 创建PriorityBlockingQueue的时候传入Comparator对象(推荐)

进一步存在的问题及其解决:

  1. OOM:重写PriorityBlockingQueue的offer方法设置阈值丢弃逻辑
  2. 饥饿问题(低优先级任务一直得不到解决):等待时间比较长的任务间隔一段时间提高优先级
  3. 使用ReentrantLock保持排序和线程安全带来的性能损失

CompletableFuture

CompletableFuture 还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

经典场景就是,一个任务依赖两个任务执行完执行,可以采用CompletableFuture.allOf()等待两个任务执行完毕

当一个任务失败的时候,解决方案有:

  • handle方法,返回新结果
  • whenComplete触发回调,并进行异常处理
  • exceptionally处理异常并抛出
  • CompletableFuture.allOf

为什么要使用自定义的线程池:

  • Completabel自己具有全局共享的线程池,所有任务都会默认执行该线程池,导致资源不够用。

AQS

AQS全称AbstractQueuedSynchronizer, 是一个抽象队列同步器,主要用来构建锁和同步器,常用的ReentrantLock等都是基于AQS实现。

image-20250109104813603

AQS的基本思想和上面的都差不多,就是如果空余就执行并占用资源,如果占用即进入队列等待被唤醒。AQS采用CLH(Craig,Landin,and Hagersten) 队列来实现这个机制。

CLH是一个双向链表组成的队列,每一个请求的线程会被封装成一个CLH的节点。

image-20250109110711306

而对于AQS整体来说,原理如下:

image-20250109110850524

private volatile int state;

AQS 使用 int 成员变量 state表示同步状态,通过内置的线程等待队列来完成获取资源线程的排队工作。

通过CAS原子操作能够安全的设置state

 protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

CountDownLatch的实现就是利用了AQS的这个state,每次执行完就state-1,直到state=0。

Semaphore的实现也是借用这个,主要是来控制访问特定资源的线程数。

参考

https://bugstack.cn/

https://javaguide.cn/

gpt