1 同步与异步

同步和异步的概念与多线程编程有关,但并不仅限于多线程。它们是一种处理任务执行顺序和并发性的编程模型。

1.1 同步(Synchronous)

同步是指按照顺序执行任务,一个任务完成后才能开始下一个任务。在多线程环境中,同步通常涉及到线程之间的协调和同步执行,以确保共享资源的安全访问。

在同步编程中,任务之间存在依赖关系,某个任务的完成可能会导致另一个任务的开始。在单线程情况下,同步一般是默认的执行模式。在多线程环境中,同步更加重要,因为多个线程同时访问共享资源可能导致竞态条件和数据不一致性问题。

1.2 异步(Asynchronous)

异步是指任务可以独立于主程序执行,并且不必等待前一个任务的完成。在异步编程中,任务的完成顺序和开始顺序不一定一致,允许程序在等待某些任务完成的同时执行其他任务。

异步编程通常涉及到回调函数、事件驱动编程、Future/Promise等机制。在多线程环境中,异步通常涉及到多个线程执行不同的任务,但并不要求这些线程严格按照某个顺序执行。

1.3 多线程与同步/异步的关系

在多线程编程中,同步和异步的概念更为突出,因为多线程涉及到共享资源的访问、线程之间的协调等问题。同步用于避免多个线程同时访问共享资源而导致的数据不一致性,而异步则用于提高程序的并发性和响应性。

总的来说,同步和异步是一种任务执行的模型,多线程环境中它们更加显著,但并不局限于多线程。在单线程和多线程环境中,都可以使用同步和异步编程模型。

2. 同步

2.1 synchronized关键字

synchronized 是 Java 中用于实现同步的关键字,它提供了一种简单而有效的机制,用于控制对共享资源的访问,以确保多线程环境下的数据一致性。

public class SynchronizedExample {
    private int counter = 0;
​
    // 同步方法
    public synchronized void increment() {
        counter++;
    }
}

2.2 Lock锁

Lock 接口是 Java 并发包中提供的一种用于实现锁的机制。相较于传统的 synchronized 关键字,Lock 提供了更灵活、更可扩展的锁实现。最常见的实现类是 ReentrantLock(可重入锁)

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
public class LockExample {
    private int counter = 0;
    private Lock lock = new ReentrantLock();
​
    public void increment() {
        lock.lock(); // 获取锁
        try {
            counter++;
        } finally {
            lock.unlock(); // 释放锁,确保在发生异常时也能释放锁
        }
    }
}

3 异步

3.1 回调函数

通过回调函数(Callback)实现异步操作的一种方式。在异步操作完成时,系统会调用预定义的回调函数。

public interface Callback {
    void onComplete(String result);
}
​
public class AsyncOperation {
    public static void performAsyncOperation(Callback callback) {
        // 异步操作
        ......
        // 完成后调用回调函数
        callback.onComplete("Operation completed");
    }
}
​
// 使用示例
AsyncOperation.performAsyncOperation(result -> {
    System.out.println(result);
});

3.2 通过多线程

3.2.1 无返回值(Runable)

Runnable 是一个接口,用于表示可以通过线程执行的任务。Runnable 接口中定义了一个方法 run(),该方法包含需要执行的任务代码。通过实现 Runnable 接口,可以创建可以在线程中运行的类。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
​
public class ThreadPoolRunnableExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池,包含两个线程
        ExecutorService executor = Executors.newFixedThreadPool(2);
​
        // 提交两个 Runnable 任务给线程池执行
        executor.submit(new MyRunnable("Task 1"));
        executor.submit(new MyRunnable("Task 2"));
​
        // 关闭线程池
        executor.shutdown();
    }
}
​
class MyRunnable implements Runnable {
    private String taskName;
​
    public MyRunnable(String taskName) {
        this.taskName = taskName;
    }
​
    @Override
    public void run() {
        // 执行任务的代码
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + taskName + " - " + i);
        }
    }
}

3.2.2 有返回值

3.2.2.1 Future

Future 是一个接口,位于 java.util.concurrent 包中,用于表示异步计算的结果。Future 提供了一种机制,允许在未来的某个时刻获取操作的实际结果。

  • get() 方法: 该方法用于获取异步操作的结果。如果异步操作尚未完成,get() 方法将阻塞,直到操作完成为止。

  • get(long timeout, TimeUnit unit) 方法: 类似于 get() 方法,但允许设置最大等待时间,如果在指定时间内未完成,将抛出 TimeoutException

  • isDone() 方法: 该方法返回一个布尔值,指示异步操作是否已经完成。

  • cancel(boolean mayInterruptIfRunning) 方法: 用于取消异步操作。如果操作尚未开始,它将被取消;如果操作已经开始,取决于 mayInterruptIfRunning 参数,如果为 true,则尝试中断执行任务的线程。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
​
public class AsynchronousExample {
    public static void main(String[] args) throws Exception {
        // 创建一个线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 提交任务
        Future<String> future = executor.submit(() -> {
            // 异步执行的任务
            ......
            // 返回结果
            return "Task completed";
        });
​
        // 在任务完成前可以执行其他操作
        ......
​
        String result = future.get(); // 获取异步结果,阻塞直到任务完成,获取结果
        System.out.println(result);
        
        //关闭线程池
        executor.shutdown();
    }
}

3.2.2.2 CompletableFuture

CompletableFuturejava.util.concurrent 包下引入的一种异步编程的工具,它提供了一套强大的 API,使得异步编程更为灵活和便利。CompletableFuture 可以看作是对 Future 的扩展,同时结合了 Promise 的思想,使得异步任务的处理更加流畅。

以下是一些 CompletableFuture 的基本使用示例:

基本示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
​
public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个异步任务,返回一个 CompletableFuture 对象
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 异步任务
            return "Operation completed";
        });
​
        // 注册回调函数,在异步任务完成时执行
        future.thenAccept(result -> System.out.println("Result: " + result));
​
        // 阻塞等待异步任务完成
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述例子中,CompletableFuture.supplyAsync 方法用于执行一个异步任务,返回一个 CompletableFuture 对象。然后,通过 thenAccept 方法注册回调函数,在异步任务完成时执行。

组合多个 CompletableFuture:
import java.util.concurrent.CompletableFuture;
​
public class CompletableFutureComposeExample {
    public static void main(String[] args) {
        // 第一个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result from Future 1");
​
        // 在第一个任务完成后执行第二个异步任务
        CompletableFuture<String> future2 = future1.thenCompose(result1 ->
                CompletableFuture.supplyAsync(() -> result1 + ", Result from Future 2"));
​
        // 注册回调函数,在所有任务完成时执行
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
        allOfFuture.thenRun(() -> {
            try {
                // 获取第一个任务的结果
                String result1 = future1.get();
                System.out.println(result1);
​
                // 获取第二个任务的结果
                String result2 = future2.get();
                System.out.println(result2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
​
        // 注册回调函数,在任意一个任务完成时执行
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
        anyOfFuture.thenAccept(result -> System.out.println("Any of the tasks completed: " + result));
    }
}

在上述例子中,使用 thenCompose 方法组合了两个 CompletableFuture,在第一个任务完成后执行第二个任务。同时,使用 allOfanyOf 方法分别注册回调函数,以在所有任务完成和任意一个任务完成时执行相应的操作。

CompletableFuture 还提供了丰富的方法用于处理异常、组合多个任务的结果、以及其他更高级的异步编程需求。这使得在 Java 中进行异步编程变得更加方便和灵活。

3.3 Spring的@Async异步

我们在Spring中可以使用异步将一些无返回值任务分发给其他线程处理,以减少任务调用时间。

自定义线程池:

/**
 * 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
 **/
@EnableAsync
@Configuration
public class TaskPoolConfig {
 
    /**
     * 自定义线程池
     *
     * @author: jacklin
     * @since: 2021/11/16 17:41
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

使用@Async注解完成对方法的异步执行:

public interface AsyncService {
 
    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);
 
    MessageResult sendEmail(String email, String subject, String content);
}
 
@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {
 
    @Autowired
    private IMessageHandler mesageHandler;
 
    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {
 
            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);
 
        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
 
 
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {
 
            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);
 
        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}

在使用@Async时推荐使用自定义线程池,不建议使用SpringBoot默认线程池。

3.4 消息队列

在微服务项目中,我们也可以采用消息队列将任务交给其他组件处理。

3.5 RxJava

似乎与Android开发有关,我还不会......

4 关于异步

异步编程很难,但却是最近十年所有编程语言在发力的方向。

在面向 CPU 计算的场景下,多线程基本都能吃满 CPU 资源。但是在 I/O 场景下,多线程并不能解决问题,大部分时间线程都在等待 IO 调用的返回。

在I/O密集任务中,我们可以采用Reactor模型,Reactor 模型在处理多个 I/O 任务时,可以提升单个线程对多个 I/O 任务的处理速度,主要原因在于其采用了异步、非阻塞的编程模型。

在传统的同步阻塞模型中,每个 I/O 操作都会阻塞当前线程,导致线程无法处理其他任务。这种情况下,为了同时处理多个 I/O 操作,通常需要创建多个线程,每个线程负责处理一个 I/O 操作。然而,多线程会带来线程切换的开销、上下文切换的开销,以及线程安全等问题。

相比之下,Reactor 模型采用了异步、非阻塞的方式。在 Reactor 中,单个线程可以处理多个 I/O 操作,而无需阻塞等待每个操作的完成。它利用事件驱动的机制,将 I/O 操作注册为事件,并在事件完成时触发相应的回调。这样,单个线程可以在等待某个 I/O 操作的同时,继续执行其他任务。

使用 Reactor,单个线程可以高效地管理多个并发的 I/O 任务,减少了线程切换和上下文切换的开销,提高了系统的吞吐量。这对于高并发、大量 I/O 操作的场景,例如网络通信、数据库访问等,能够带来明显的性能提升。

5 阻塞/非阻塞

同步不一定阻塞,异步也不一定非阻塞。