Java 异步网络编程示例

随着对编写非阻塞代码的需求不断增长,我们需要异步执行代码的方法。本节将介绍几种使用 Java 实现异步编程的方法。另外,还将探索一些提供即用型解决方案的 Java 库。

异步编程

Java 中的异步编程

异步编程的代码中,Java 计算阶乘的代码:

    public static long factorial(int number) {
        long result = 1;
        for(int i=number;i>0;i--) {
            result *= i;
        }
        return result;
    }

Thread

我们可以创建一个新线程来异步执行任何操作。随着 Java 8 中 lambda 表达式的发布,它变得更干净,更易读。

让我们创建一个新的线程来计算和打印数字的阶乘:

    public static Thread byThread(int number) {
        Thread newThread = new Thread(() -> {
            System.out.println("Factorial of " + number + " is: " + factorial(number));
        });

        return newThread;
    }

FutureTask

从 Java 5 开始,Future 接口提供了一种使用 FutureTask 执行异步操作的方法。
我们可以使用提交的方法 ExecutorService 的异步执行任务并返回的实例 FutureTask。

因此,可以用下面的方式计算数字的阶乘:

    public static Future<Long> byFutureTask(int number) {
        Future<Long> futureTask = threadpool.submit(() -> factorial(number));

        while (!futureTask.isDone()) {
            System.out.println("FutureTask is not finished yet...");
        }
        return futureTask;
    }

示例代码中,使用了 Future 接口提供的 isDone 方法来检查任务是否完成。完成后,可以使用 get 方法检索结果。

CompletableFuture

Java 8 结合了 Future 和 CompletionStage 引入了 CompletableFuture。它为异步编程提供了诸如 supplyAsyncrunAsyncthenApplyAsync 之类的各种方法。

下面使用 CompletableFuture 代替 FutureTask 计算数字的阶乘:

    public static Long factorialUsingCompletableFuture(int number) {
        CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> factorial(number));
        while (!completableFuture.isDone()) {
            System.out.println("CompletableFuture is not finished yet...");
        }
        return completableFuture.get();
    }

我们不需要显式使用 ExecutorService ,该 CompletableFuture 内部使用 ForkJoinPool 异步处理任务。因此,它使的代码更加整洁。

Guava

Guava 提供了 ListenableFuture 类来执行异步操作:

将 Guava 的 Maven 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.2-jre</version>
</dependency>

使用 ListenableFuture 计算数字的阶乘:

ExecutorService threadpool = Executors.newCachedThreadPool();
ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
ListenableFuture<Long> guavaFuture = (ListenableFuture<Long>) service.submit(()-> factorial(number));
long result = guavaFuture.get();

在这里,MoreExecutors 类提供了 ListeningExecutorService 类的实例。然后,ListeningExecutorService.submit 方法异步执行任务,并返回 ListenableFuture 的实例。

Guava 还提供一个 Futures 类,该类提供诸如 commitAsyncscheduleAsynctransformAsync 之类的方法来链接类似于 CompletableFuture 的 ListenableFutures。

使用 Futures.submitAsync 代替 ListeningExecutorService.submit 方法:

ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
AsyncCallable<Long> asyncCallable = Callables.asAsyncCallable(new Callable<Long>() {
    public Long call() {
        return factorial(number);
    }
}, service);
ListenableFuture<Long> guavaFuture = Futures.submitAsync(asyncCallable, service);

此外,Futures 类提供了 addCallback 方法来注册成功和失败回调:

Futures.addCallback(
  factorialFuture,
  new FutureCallback<Long>() {
      public void onSuccess(Long factorial) {
          System.out.println(factorial);
      }
      public void onFailure(Throwable thrown) {
          thrown.getCause();
      }
  },
  service);

EA Async

EA 通过 ea-async 库将 .NET 的 async-await 功能引入了 Java 生态系统。

该库允许顺序编写异步(非阻塞)代码。因此,它使异步编程更加容易并且可以自然扩展:

将 ea-async 的 Maven 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>com.ea.async</groupId>
    <artifactId>ea-async</artifactId>
    <version>1.2.3</version>
</dependency>

使用 EA 的 Async 类提供的 await 方法来转换先前讨论的 CompletableFuture 代码:

static {
    Async.init();
}

public long factorialUsingEAAsync(int number) {
    CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> factorial(number));
    long result = Async.await(completableFuture);
}

在这里,我们在静态块中调用 Async.init 方法以初始化 Async 运行时工具。

异步检测会在运行时转换代码,并将对 await 方法的调用重写为与使用 CompletableFuture 链类似的行为。

因此,对 await 方法的调用类似于调用 Future.join。

我们可以将 –javaagent JVM 参数用于编译时检测。这是 Async.init 方法的替代方法:

java -javaagent:ea-async-1.2.3.jar -cp <claspath> <MainClass>

让我们来看一下顺序编写异步代码的另一个示例。

首先,使用链式调用的写法获取 completableFuture :

CompletableFuture<Void> completableFuture = hello()
  .thenComposeAsync(hello -> mergeWorld(hello))
  .thenAcceptAsync(helloWorld -> print(helloWorld))
  .exceptionally(throwable -> {
      System.out.println(throwable.getCause());
      return null;
  });
completableFuture.get();

然后,使用 EA 的 Async.await() 转换代码:

try {
    String hello = await(hello());
    String helloWorld = await(mergeWorld(hello));
    await(CompletableFuture.runAsync(() -> print(helloWorld)));
} catch (Exception e) {
    e.printStackTrace();

该实现类似于顺序阻塞代码。但是,await 方法不会阻止代码。

如前所述,所有对 await 方法的调用都将由 Async 工具重写,以与 Future.join 方法类似地工作。

因此,一旦 hello 方法的异步执行完成,Future 结果将传递到 mergeWorld 方法。然后,使用 CompletableFuture.runAsync 方法将结果传递到最后。

Cactoos

Cactoos 是一个基于面向对象原理的 Java 库。

它是 Google Guava 和 Apache Commons 的替代方案,提供了用于执行各种操作的通用对象。

将 Cactoos 的 Maven 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>org.cactoos</groupId>
    <artifactId>cactoos</artifactId>
    <version>0.43</version>
</dependency>

该库为异步操作提供了一个 Async 类。

因此,我们可以使用 Cactoos 的 Async 类的实例找到数字的阶乘:

Async<Integer, Long> asyncFunction = new Async<Integer, Long>(input -> factorial(input));
Future<Long> asyncFuture = asyncFunction.apply(number);
long result = asyncFuture.get();

在这里,该应用方法执行使用操作 ExecutorService.submit 方法并返回的实例未来接口。

同样,Async 类具有 exec 方法,该方法提供相同的功能而没有返回值。

Cactoos 库处于开发的初始阶段,可能尚不适合生产使用。

Jcabi-Aspects

Jcabi-Aspects 通过 AspectJ AOP Aspects 为异步编程提供 @Async 注解。

将 Cactoos 的 Maven 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>com.jcabi</groupId>
    <artifactId>jcabi-aspects</artifactId>
    <version>0.22.6</version>
</dependency>

该 jcabi-aspects 库需要 AspectJ 运行支持。因此,我们将添加 aspectjrt Maven 依赖项:

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.9.5</version>
</dependency>

接下来,添加 jcabi-maven-plugin 插件,该插件使用 AspectJ 编织二进制文件。该插件提供了 ajc goal 用于完成工作:

<plugin>
    <groupId>com.jcabi</groupId>
    <artifactId>jcabi-maven-plugin</artifactId>
    <version>0.14.1</version>
    <executions>
        <execution>
            <goals>
                <goal>ajc</goal>
            </goals>
        </execution>
    </executions>
    <dependencies>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjtools</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</plugin>

使用 AOP 切面进行异步编程:

@Async
@Loggable
public Future<Long> factorialUsingAspect(int number) {
    Future<Long> factorialFuture = CompletableFuture.completedFuture(factorial(number));
    return factorialFuture;
}

当我们编译代码时,该库将通过 AspectJ weaving 代替 @Async 批注注入 AOP 建议,以异步执行factorialUsingAspect 方法。

因此,让我们使用Maven命令编译该类:

mvn install

jcabi-maven-plugin 的输出可能类似于:

--- jcabi-maven-plugin:0.14.1:ajc (default) @ java-async ---
[INFO] jcabi-aspects 0.18/55a5c13 started new daemon thread jcabi-loggable for watching of @Loggable annotated methods
[INFO] Unwoven classes will be copied to /tutorials/java-async/target/unwoven
[INFO] jcabi-aspects 0.18/55a5c13 started new daemon thread jcabi-cacheable for automated cleaning of expired @Cacheable values
[INFO] ajc result: 10 file(s) processed, 0 pointcut(s) woven, 0 error(s), 0 warning(s)

通过检查由 Maven 插件生成的 jcabi-ajc.log 文件中的日志,来验证我们的类是否正确编织:

Join point 'method-execution(java.util.concurrent.Future 
com.baeldung.async.JavaAsync.factorialUsingJcabiAspect(int))' 
in Type 'com.baeldung.async.JavaAsync' (JavaAsync.java:158) 
advised by around advice from 'com.jcabi.aspects.aj.MethodAsyncRunner' 
(jcabi-aspects-0.22.6.jar!MethodAsyncRunner.class(from MethodAsyncRunner.java))

作为一个简单的 Java 应用程序运行,其输出将类似于:

17:46:58.245 [main] INFO com.jcabi.aspects.aj.NamedThreads - 
jcabi-aspects 0.22.6/3f0a1f7 started new daemon thread jcabi-loggable for watching of @Loggable annotated methods
17:46:58.355 [main] INFO com.jcabi.aspects.aj.NamedThreads - 
jcabi-aspects 0.22.6/3f0a1f7 started new daemon thread jcabi-async for Asynchronous method execution
17:46:58.358 [jcabi-async] INFO com.baeldung.async.JavaAsync - 
#factorialUsingJcabiAspect(20): 'java.util.concurrent.CompletableFuture@14e2d7c1[Completed normally]' in 44.64µs

输出内容中,可以看到由异步执行任务的库,创建了一个新的守护进程线程 jcabi-async。

同样,通过库提供的 @Loggable 注解启用日志记录。

结语

本篇介绍了多种 Java 异步编程的方法,有 Java 原声支持的方式,也有通过引入其他库的扩展方法。

转载请注明出处:码谱记录 » Java 异步网络编程示例