写点什么

图思维胜过链式思维:JGraphlet 构建任务流水线的八大核心原则

作者:qife122
  • 2025-09-25
    福建
  • 本文字数:3167 字

    阅读完需:约 10 分钟

图思维胜过链式思维:JGraphlet 构建任务流水线 🚀

JGraphlet 是一个轻量级、零依赖的 Java 库,用于构建任务流水线。它的强大之处不在于冗长的功能列表,而在于一小套协同工作的核心设计原则。


JGraphlet 的核心是简洁性,基于图模型构建。向流水线添加任务并连接它们以创建图。每个任务都有输入和输出,TaskPipeline 构建并执行流水线,同时管理每个任务的 I/O。例如,使用 Map 进行扇入操作,使用 Record 定义自定义数据模型等。TaskPipeline 还包含 PipelineContext 在任务间共享数据,此外任务还可以被缓存,避免重复计算。


您可以自定义任务流水线的流程,并选择使用 SyncTask 或 AsyncTask。默认情况下所有任务都是异步的。

1. 图优先执行模型

JGraphlet 将工作流视为有向无环图。您将任务定义为节点,并显式绘制它们之间的连接。这使得扇出和扇入等复杂模式变得自然。


import dev.shaaf.jgraphlet.*;import java.util.Map;import java.util.concurrent.CompletableFuture;
try (TaskPipeline pipeline = new TaskPipeline()) { Task<String, String> fetchInfo = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Info for " + id); Task<String, String> fetchFeed = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Feed for " + id); Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() -> inputs.get("infoNode") + " | " + inputs.get("feedNode") );
pipeline.addTask("infoNode", fetchInfo) .addTask("feedNode", fetchFeed) .addTask("summaryNode", combine);
pipeline.connect("infoNode", "summaryNode") .connect("feedNode", "summaryNode");
String result = (String) pipeline.run("user123").join(); System.out.println(result); // "Info for user123 | Feed for user123"}
复制代码

2. 两种任务风格:Task<I,O>和 SyncTask<I,O>

JGraphlet 提供两种可混合使用的任务类型:


  • Task<I, O>(异步):返回 CompletableFuture<O>,适合 I/O 操作或繁重计算

  • SyncTask<I, O>(同步):直接返回 O,适合快速的 CPU 密集型操作


try (TaskPipeline pipeline = new TaskPipeline()) {    Task<String, String> fetchName = (userId, ctx) ->        CompletableFuture.supplyAsync(() -> "John Doe");
SyncTask<String, String> toUpper = (name, ctx) -> name.toUpperCase();
pipeline.add("fetch", fetchName) .then("transform", toUpper);
String result = (String) pipeline.run("user-42").join(); System.out.println(result); // "JOHN DOE"}
复制代码

3. 简单显式的 API

JGraphlet 避免复杂的构建器或魔法配置,API 简洁明了:


  • 创建流水线:new TaskPipeline()

  • 注册节点:addTask("uniqueId", task)

  • 连接节点:connect("fromId", "toId")


try (TaskPipeline pipeline = new TaskPipeline()) {    SyncTask<String, Integer> lengthTask = (s, c) -> s.length();    SyncTask<Integer, String> formatTask = (i, c) -> "Length is " + i;
pipeline.addTask("calculateLength", lengthTask); pipeline.addTask("formatOutput", formatTask);
pipeline.connect("calculateLength", "formatOutput");
String result = (String) pipeline.run("Hello").join(); System.out.println(result); // "Length is 5"}
复制代码

4. 清晰的扇入输入形状

扇入任务接收 Map<String, Object>,其中键是父任务 ID,值是它们的结果。


try (TaskPipeline pipeline = new TaskPipeline()) {    SyncTask<String, String> fetchUser = (id, ctx) -> "User: " + id;    SyncTask<String, String> fetchPerms = (id, ctx) -> "Role: admin";
Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() -> { String userData = (String) inputs.get("userNode"); String permsData = (String) inputs.get("permsNode"); return userData + " (" + permsData + ")"; });
pipeline.addTask("userNode", fetchUser) .addTask("permsNode", fetchPerms) .addTask("combiner", combine);
pipeline.connect("userNode", "combiner").connect("permsNode", "combiner");
String result = (String) pipeline.run("user-1").join(); System.out.println(result); // "User: user-1 (Role: admin)"}
复制代码

5. 清晰的运行契约

执行流水线很简单:pipeline.run(input)返回最终结果的 CompletableFuture。您可以使用.join()阻塞或使用异步链式调用。


String input = "my-data";
// 阻塞方式try { String result = (String) pipeline.run(input).join(); System.out.println("Result (blocking): " + result);} catch (Exception e) { System.err.println("Pipeline failed: " + e.getMessage());}
// 非阻塞方式pipeline.run(input) .thenAccept(result -> System.out.println("Result (non-blocking): " + result)) .exceptionally(ex -> { System.err.println("Async pipeline failed: " + ex.getMessage()); return null; });
复制代码

6. 内置资源生命周期

JGraphlet 实现 AutoCloseable。使用 try-with-resources 保证内部资源的安全关闭。


try (TaskPipeline pipeline = new TaskPipeline()) {    pipeline.add("taskA", new SyncTask<String, String>() {        @Override        public String executeSync(String input, PipelineContext context) {            if (input == null) {                throw new IllegalArgumentException("Input cannot be null");            }            return "Processed: " + input;        }    });
pipeline.run("data").join();
} // pipeline.shutdown()自动调用System.out.println("Pipeline resources have been released.");
复制代码

7. 上下文

PipelineContext 是线程安全的、每次运行的工作空间,用于存储元数据。


SyncTask<String, String> taskA = (input, ctx) -> {    ctx.put("requestID", "xyz-123");    return input;};SyncTask<String, String> taskB = (input, ctx) -> {    String reqId = ctx.get("requestID", String.class).orElse("unknown");    return "Processed input " + input + " for request: " + reqId;};
复制代码

8. 可选缓存

任务可以选择加入缓存以避免重复计算。


Task<String, String> expensiveApiCall = new Task<>() {    @Override    public CompletableFuture<String> execute(String input, PipelineContext context) {        System.out.println("Performing expensive call for: " + input);        return CompletableFuture.completedFuture("Data for " + input);    }    @Override    public boolean isCacheable() { return true; }};
try (TaskPipeline pipeline = new TaskPipeline()) { pipeline.add("expensive", expensiveApiCall);
System.out.println("First call..."); pipeline.run("same-key").join();
System.out.println("Second call..."); pipeline.run("same-key").join(); // 结果来自缓存}
复制代码


最终结果是提供了一种清晰、可测试的方式来编排同步或异步任务,用于组合复杂流程,如并行检索、合并、判断和防护栏,而无需引入重量级的工作流引擎。


了解更多或尝试使用:


  • Maven 中央仓库

  • Github 仓库更多精彩内容 请关注我的个人公众号 公众号(办公 AI 智能小助手)公众号二维码

  • 办公AI智能小助手
用户头像

qife122

关注

还未添加个人签名 2021-05-19 加入

还未添加个人简介

评论

发布
暂无评论
图思维胜过链式思维:JGraphlet构建任务流水线的八大核心原则_Java_qife122_InfoQ写作社区