写点什么

Graph 流式迭代过程

作者:Disaster
  • 2025-09-23
    上海
  • 本文字数:5681 字

    阅读完需:约 19 分钟

Graph 流式迭代过程

1. 背景

随着 spring-ai-alibaba-graph 模块的广泛应用,社区中出现了许多关于其流式处理实现机制的疑问和使用文档需求。其中最突出的问题是:graph 模块的流式实现如何与当前主流的响应式流(Reactive Streams)框架进行集成。


当前 graph 模块采用传统的迭代器模式实现流式输出,这种实现方式与主流的响应式编程范式存在较大差异。在与 Project Reactor、RxJava 等响应式流框架集成时,开发者需要进行大量额外的适配工作,增加了技术复杂性和维护成本。


为解决这一问题并提升框架的现代化程度,团队对 graph 内核中的流式输出机制进行了重构,采用基于 Project Reactor 的响应式流实现,以更好地与现代响应式生态系统集成,降低开发者的使用门槛并提高整体性能表现。

2. 迭代器实现流式输出分析

2.1 设计理念

2.1.1 传统迭代器模式在流式输出场景下的设计思路

传统迭代器模式在 Spring AI Alibaba 项目的 old_version 包中通过 AsyncGenerator<E> 接口实现流式输出。其核心设计思路是将异步数据流抽象为一个可迭代的对象,消费者通过调用 next() 方法逐个获取数据元素。


该模式遵循以下原则:


  • 拉取模式(Pull-based):消费者主动从生成器中拉取数据,而非被动接收推送

  • 状态封装:通过 Data<E> 类封装异步操作的各种状态(正常数据、完成状态、错误状态)

  • 装饰器扩展:通过装饰器模式为基本生成器添加额外功能(如结果值获取、嵌套生成器支持)

2.1.2 与响应式流模式的架构哲学差异


传统迭代器模式更符合命令式编程思维,而响应式流模式则体现了声明式编程的理念。

2.1.3 Java 并发模型角度的线程模型和资源管理策略

传统迭代器模式主要依赖以下 Java 并发机制:


  1. CompletableFuture 链式调用:通过 thenComposethenApply 等方法构建异步操作链

  2. 线程池管理:依赖底层线程池执行异步任务

  3. 手动资源管理:需要显式管理异步任务的生命周期,防止资源泄漏


// AsyncGenerator接口中的toCompletableFuture方法体现了链式调用思想default CompletableFuture<Object> toCompletableFuture() {    final Data<E> next = next();    if (next.isDone()) {        return completedFuture(next.resultValue);    }    return next.data.thenCompose(v -> toCompletableFuture());}
复制代码

2.2 核心源码深度解析

2.2.1 AsyncGenerator<E> 接口及其实现类的职责划分

AsyncGenerator<E> 接口定义了异步生成器的核心契约:


public interface AsyncGenerator<E> extends Iterable<E> {    Data<E> next(); // 获取下一个异步数据元素    default CompletableFuture<Object> toCompletableFuture() { ... } // 转换为CompletableFuture    default Stream<E> stream() { ... } // 转换为Stream    default Iterator<E> iterator() { ... } // 获取迭代器}
复制代码


主要实现类包括:


  1. 基本实现:通过 lambda 表达式直接实现接口

  2. WithResult 装饰器:添加结果值获取功能

  3. WithEmbed 装饰器:支持生成器嵌套组合

2.2.2 Data<E> 封装类的设计考量

Data<E> 类是异步数据元素的核心封装,设计上考虑了多种状态:


class Data<E> {    final CompletableFuture<E> data; // 异步数据    final Embed<E> embed; // 嵌入式生成器    final Object resultValue; // 结果值
public boolean isDone() { // 完成状态判断 return data == null && embed == null; }
public boolean isError() { // 错误状态判断 return data != null && data.isCompletedExceptionally(); }}
复制代码


设计考量包括:


  1. 状态分离:通过不同字段表示不同状态,避免状态混淆

  2. 类型安全:使用泛型确保类型安全

  3. 不可变性:字段均为 final,确保线程安全

2.2.3 WithEmbed 和 WithResult 装饰器模式的应用

WithResult 装饰器:
class WithResult<E> implements AsyncGenerator<E>, HasResultValue {    protected final AsyncGenerator<E> delegate;    private Object resultValue;
@Override public final Data<E> next() { final Data<E> result = delegate.next(); if (result.isDone()) { resultValue = result.resultValue; } return result; }}
复制代码
WithEmbed 装饰器:
class WithEmbed<E> implements AsyncGenerator<E>, HasResultValue {    protected final Deque<Embed<E>> generatorsStack = new ArrayDeque<>(2);    private final Deque<Data<E>> returnValueStack = new ArrayDeque<>(2);
@Override public Data<E> next() { // 处理嵌套生成器栈 // 实现生成器组合逻辑 }}
复制代码

2.2.4 背压处理机制的实现方式和局限性

传统迭代器模式的背压处理主要通过以下方式实现:


  1. 阻塞式背压:当消费者处理速度慢于生产者时,next() 方法调用会阻塞

  2. 缓冲区管理:通过手动管理 CompletableFuture 链实现简单的缓冲


局限性:


  1. 缺乏精细化控制:无法根据消费者处理能力动态调整生产速度

  2. 资源消耗:阻塞等待会消耗线程资源

  3. 扩展性差:难以实现复杂的背压策略

2.3 结构图解

2.3.1 组件交互图

2.3.2 典型流式调用时序图

2.3.3 设计模式应用标注

  1. 迭代器模式:AsyncGenerator 继承 Iterable 接口

  2. 装饰器模式:WithResultWithEmbed

  3. 工厂模式:静态工厂方法创建各种生成器实例

  4. 模板方法模式:next() 方法定义算法框架

3. 响应式流模式实现分析

3.1 设计理念

3.1.1 整体架构设计理念

Spring AI Alibaba Graph 模块的流式处理设计采用了响应式编程范式,基于 Project Reactor 框架实现。其核心理念是:


  1. 非阻塞异步处理:通过 Flux 和 Mono 实现非阻塞的数据流处理,提高系统吞吐量

  2. 背压处理:利用 Reactor 的背压机制,确保生产者和消费者之间的速率平衡

  3. 状态一致性:通过 OverAllState 管理流式处理过程中的状态变化

  4. 模块化设计:将流式处理逻辑封装在独立的组件中,便于维护和扩展

3.1.2 采用 Flux 作为核心组件的原因

Flux 作为流式处理的核心组件具有以下优势:


  1. 丰富的操作符:提供 map、filter、zip 等丰富的流操作符,便于处理复杂的数据流

  2. 背压支持:内置背压处理机制,确保系统稳定性

  3. 错误处理:完善的错误处理机制,支持异常传播和恢复

  4. 与 Spring 生态集成:与 Spring WebFlux 无缝集成,便于构建响应式应用

3.1.3 并行节点流合并策略

ParallelNode 中,流合并策略采用以下设计:


  1. 分离处理:将非 Flux 和 Flux 类型的输出分别处理

  2. 统一合并:使用 Flux.zip 操作符合并多个 Flux 流

  3. 状态管理:通过 OverAllState.updateState 方法维护状态一致性

3.1.4 AsyncGenerator 与 Reactor Flux 的集成

为保持向后兼容性,项目提供了 AsyncGenerator 与 Flux 的双向转换:


  1. AsyncGenerator.fromFlux:将 Flux 转换为 AsyncGenerator

  2. FlowGenerator.fromPublisher:将 Publisher 转换为 AsyncGenerator

3.2 核心源码解析

3.2.1 ParallelNode 中多个 Flux 流的合并实现

ParallelNode 中,多个 Flux 流的合并通过以下步骤实现:


// 检查是否有Flux类型的输出boolean hasFlux = results.stream()    .flatMap(map -> map.values().stream())    .anyMatch(value -> value instanceof Flux);
if (hasFlux) { // 收集所有Flux流 List<Flux<Object>> fluxList = new ArrayList<>(); // ... 处理非Flux输出 ... // 合并Flux流 if (!fluxList.isEmpty()) { Flux<Object> mergedFlux = Flux.zip(fluxList, new Function<Object[], Object>() { @Override public Object apply(Object[] objects) { return null; // 简化的合并逻辑 } }); mergedState.put("__merged_stream__", mergedFlux); }}
复制代码

3.2.2 StreamingOutput 和 StreamingChatGenerator 处理流式输出

StreamingOutput 类封装了流式输出的数据:


public class StreamingOutput extends NodeOutput {    private final String chunk;    private final ChatResponse chatResponse;        public StreamingOutput(ChatResponse chatResponse, String node, OverAllState state) {        super(node, state);        this.chatResponse = chatResponse;        this.chunk = null;    }        public StreamingOutput(String chunk, String node, OverAllState state) {        super(node, state);        this.chunk = chunk;        this.chatResponse = null;    }}
复制代码


StreamingChatGenerator 构建流式聊天生成器:


public AsyncGenerator<? extends NodeOutput> buildInternal(Flux<ChatResponse> flux,        Function<ChatResponse, StreamingOutput> outputMapper) {    var result = new AtomicReference<ChatResponse>(null);        Consumer<ChatResponse> mergeMessage = (response) -> {        result.updateAndGet(lastResponse -> {            // 合并消息逻辑            // ...        });    };        var processedFlux = flux        .filter(response -> response.getResult() != null && response.getResult().getOutput() != null)        .doOnNext(mergeMessage)        .map(next -> new StreamingOutput(next.getResult().getOutput().getText(), startingNode, startingState));        return FlowGenerator.fromPublisher(FlowAdapters.toFlowPublisher(processedFlux),            () -> mapResult.apply(result.get()));}
复制代码

3.2.3 OverAllState 在流式处理中的状态管理

OverAllState 通过以下方式管理流式处理状态:


public static Map<String, Object> updateState(Map<String, Object> state, Map<String, Object> partialState,        Map<String, KeyStrategy> keyStrategies) {    Objects.requireNonNull(state, "state cannot be null");    if (partialState == null || partialState.isEmpty()) {        return state;    }
Map<String, Object> updatedPartialState = updatePartialStateFromSchema(state, partialState, keyStrategies);
return Stream.concat(state.entrySet().stream(), updatedPartialState.entrySet().stream()) .collect(toMapRemovingNulls(Map.Entry::getKey, Map.Entry::getValue, (currentValue, newValue) -> newValue));}
复制代码

3.2.4 AsyncGeneratorUtils 中多生成器合并的实现原理

AsyncGeneratorUtils 提供多生成器合并功能:


public static <T> AsyncGenerator<T> createMergedGenerator(List<AsyncGenerator<T>> generators,        Map<String, KeyStrategy> keyStrategyMap) {    return new AsyncGenerator<>() {        private final StampedLock lock = new StampedLock();        private AtomicInteger pollCounter = new AtomicInteger(0);        private Map<String, Object> mergedResult = new HashMap<>();        private final List<AsyncGenerator<T>> activeGenerators = new CopyOnWriteArrayList<>(generators);
@Override public Data<T> next() { // 轮询各个生成器,合并结果 // ... } };}
复制代码

3.3 技术细节

3.3.1 流式数据在并行执行节点间的传递和聚合

在并行执行中,流式数据通过以下方式传递和聚合:


  1. 并行执行:使用 CompletableFuture.allOf 并行执行多个节点

  2. 结果收集:收集所有节点的执行结果

  3. 类型分离:将 Flux 和非 Flux 类型的结果分别处理

  4. 状态更新:通过 OverAllState.updateState 更新全局状态

3.3.2 merged_stream 键在流合并过程中的作用

__merged_stream__ 键用于标识合并后的 Flux 流,在后续处理中可以识别和处理合并的流数据。

3.3.3 不同数据类型在流合并时的处理策略

项目通过类型检查来处理不同数据类型:


  1. Flux 类型:通过 instanceof Flux 检查,收集到 fluxList 中进行合并

  2. 非 Flux 类型:直接通过 OverAllState.updateState 方法更新状态

3.3.4 背压处理和性能优化措施

  1. Reactor 背压机制:利用 Flux 内置的背压处理机制

  2. 缓冲策略:使用 onBackpressureBuffer() 处理背压

  3. 异步处理:通过 CompletableFuture 实现异步执行

  4. 资源管理:合理管理线程池和内存资源

3.4 架构图示

3.4.1 关键组件交互图

graph TD    A[StateGraph] --> B[ParallelNode]    B --> C[AsyncParallelNodeAction]    C --> D[Node Actions]    D --> E[Flux Streams]    E --> F[Flux Merge]    F --> G[__merged_stream__]    G --> H[OverAllState]
复制代码

3.4.2 并行节点流合并时序图

sequenceDiagram    participant Client    participant ParallelNode    participant NodeAction1    participant NodeAction2    participant FluxMerge        Client->>ParallelNode: Execute    ParallelNode->>NodeAction1: Execute Async    ParallelNode->>NodeAction2: Execute Async    NodeAction1-->>ParallelNode: Return Flux    NodeAction2-->>ParallelNode: Return Flux    ParallelNode->>FluxMerge: Merge Flux Streams    FluxMerge-->>ParallelNode: Merged Flux    ParallelNode->>Client: Return Result
复制代码

4. 两种实现对比

4.1 性能和可扩展性分析

4.1.1 高并发场景性能对比

传统迭代器模式:
  • 每次 next() 调用可能涉及线程阻塞

  • CompletableFuture 链式调用增加内存开销

  • 缺乏统一的资源管理导致潜在内存泄漏

响应式流模式:
  • 基于事件循环的非阻塞处理提高吞吐量

  • 内置背压机制防止系统过载

  • 统一的资源管理减少内存泄漏风险

4.1.2 资源利用和内存管理优势

响应式流模式在资源利用方面具有明显优势:


  1. 内存效率:通过背压机制控制内存使用

  2. 线程效率:事件循环模型减少线程切换开销

  3. 资源回收:自动化的订阅/取消机制确保资源及时回收

4.1.3 扩展性和维护性对比

4.2 适用场景分析

传统迭代器模式适用于:

  • 简单的顺序处理场景

  • 不需要高并发处理的场景

  • 团队对响应式编程不熟悉的项目

响应式流模式适用于:

  • 高并发、低延迟要求的场景

  • 需要处理大量流式数据的场景

  • 与 Spring 生态系统深度集成的项目

发布于: 刚刚阅读数: 3
用户头像

Disaster

关注

talk is cheap,show me the code 2021-12-29 加入

A coder who likes open source, has worked in the field of network security and Android, and is now constantly exploring ing in the field of java

评论

发布
暂无评论
Graph 流式迭代过程_Disaster_InfoQ写作社区