写点什么

AI 智能体 - 并行模式

作者:Hernon AI
  • 2025-11-11
    浙江
  • 本文字数:10947 字

    阅读完需:约 36 分钟

AI智能体 - 并行模式

在我们的 AI 智能体设计系列中,我们已经探讨了两种基础模式:


  1. 提示链模式(Prompt Chaining):如同精密的流水线,通过严格的顺序执行来保证复杂任务的可靠性。

  2. 路由模式(Routing Pattern):如同智能的“交通枢纽”,通过条件逻辑让智能体学会了决策和“分诊”。


这些模式构建了一个强大、有逻辑的智能体。但它们共同面对一个无法回避的敌人:时间


在现实世界中,顺序执行和复杂的决策都可能导致一个致命问题:延迟。当用户等待你的智能体逐一调用 API、逐一搜索来源时,他们的耐心正在被消耗。这时并行模式(Parallelization Pattern) 就变得至关重要。它不是对前两种模式的替代,而是赋予它们“翅膀”的关键技术,让你的智能体学会“分身术”,同时执行多个子任务。

一、并行模式

许多复杂的智能体任务,其本质并非是严格线性的。它们往往可以分解为多个互不依赖的子任务。


想象一下,你是一位主厨,需要准备一场宴会。


  • 顺序执行(提示链): 你先花 30 分钟洗菜,然后花 60 分钟切菜,再花 90 分钟烹饪。总耗时:3 个半小时。

  • 并行执行(并行模式): 你(协调器)指挥三个助手(子智能体):助手 A 负责洗菜,助手 B 负责切菜,助手 C 负责准备汤料。他们同时开始工作。最终,你只需要等待耗时最长的那个任务(比如烹饪)完成,总时间可能缩短到 2 小时。


并行模式涉及同时执行多个组件,例如并行的大语言模型(LLM)调用、并行的工具使用,甚至是并行的子智能体。

为什么并行如此重要?

在智能体工作流中,最大的时间杀手通常不是计算本身,而是 I/O 等待(Input/Output Wait)


当你调用一个外部 API(如谷歌搜索、天气查询、股票数据)时,你的智能体大部分时间都在“等待”网络另一端的服务器响应。在顺序执行中,这种等待是累加的,灾难性的。


一个更具体的例子:智能体研究课题“AIGC 的未来趋势”


传统的顺序(串行)工作流:


  1. [开始]

  2. 调用 Google Search("AIGC最新行业报告") (等待 3 秒)

  3. 获取报告,调用 llm.summarize(报告) (等待 5 秒)

  4. 调用 Google Search("AIGC顶级公司动态") (等待 3 秒)

  5. 获取动态,调用 llm.summarize(动态) (等待 5 秒)

  6. 调用 llm.synthesize(总结A, 总结B) (等待 4 秒)

  7. [结束] 总耗时:3 + 5 + 3 + 5 + 4 = 20 秒


用户在这 20 秒内只能盯着一个旋转的加载图标。


采用并行模式的工作流:


  1. [开始]

  2. (并行分支 A)

  3. 调用 Google Search("AIGC最新行业报告") (等待 3 秒)

  4. 获取报告,调用 llm.summarize(报告) (等待 5 秒)

  5. (分支 A 结束,耗时 8 秒)

  6. (并行分支 B)

  7. 调用 Google Search("AIGC顶级公司动态") (等待 3 秒)

  8. 获取动态,调用 llm.summarize(动态) (等待 5 秒)

  9. (分支 B 结束,耗时 8 秒)

  10. [屏障:等待 A 和 B 全部完成] (总耗时 = Max(8, 8) = 8 秒)

  11. 调用 llm.synthesize(总结A, 总结B) (等待 4 秒)

  12. [结束] 总耗时:8 + 4 = 12 秒


通过并行化,我们将总执行时间缩短了 40%。这就是并行模式的核心价值:找出工作流中互不依赖的环节,并将它们并行执行,从而大幅减少总体等待时间。


实现并行化通常需要支持异步执行(Asynchronous Execution)、多线程或多进程的框架。幸运的是,现代智能体框架(如 LangChain、LangGraph、Google ADK)都提供了原生的异步支持和并行机制,让我们可以方便地定义和运行这些并发步骤。



二、实际应用场景:并行模式的 7 个“加速器”

并行模式是提升智能体性能的强大“加速器”。让我们通过更深入的顺序并行对比,来探索它的 7 个典型应用场景。

1. 信息收集和研究(多源情报)

  • 用例: 投资分析智能体,任务是“全面分析 X 公司”。

  • 顺序(缓慢且片面):

  • “正在查询 X 公司最新新闻...” (调用 News API, 等待 3 秒)

  • “正在拉取 X 公司实时股价...” (调用 Stock API, 等待 2 秒)

  • “正在监测 X 公司社交媒体情绪...” (调用 Social API, 等待 4 秒)

  • “正在查询 X 公司内部财报数据库...” (调用 DB, 等待 3 秒

  • 总耗时: 12 秒。用户体验极差,每一步操作都需要等待反馈。

  • 并行(快速且全面):

  • “正在分析 X 公司,请稍候...”

  • 同时发起四项任务:

  • task_news = async_call_news_api()

  • task_stock = async_call_stock_api()

  • task_social = async_call_social_api()

  • task_db = async_query_db()

  • [屏障] await asyncio.gather(task_news, task_stock, task_social, task_db)

  • 总耗时: Max(3, 2, 4, 3) = 4 秒。智能体在 4 秒后,就能拿到所有情报,然后进行下一步的综合分析。

2. 数据处理和分析(多维洞察)

  • 用例: “客户之声”(VoC)智能体,任务是“分析上周的 1000 条客户反馈”。

  • 顺序(漫长且昂贵):

  • for review in all_reviews: (循环 1000 次)

  • sentiment = llm.call(review, "情感分析") (等待 2 秒)

  • keywords = llm.call(review, "关键词提取") (等待 2 秒)

  • category = llm.call(review, "意图分类") (等待 2 秒

  • 总耗时: 1000 * (2 + 2 + 2) = 6000秒 (约 1.6 小时)。这在实际业务中是不可接受的。

  • 并行(高效且深入):

  • 启动三个“专家智能体”,它们同时处理同一批数据

  • 情感专家: results_sentiment = await parallel_process(all_reviews, sentiment_task)

  • 关键词专家: results_keywords = await parallel_process(all_reviews, keyword_task)

  • 分类专家: results_category = await parallel_process(all_reviews, category_task)

  • (这些parallel_process函数内部也会使用asyncio.gather

  • 总耗时: Max(情感处理总时间, 关键词处理总时间, 分类处理总时间)。通过批量并发,总时间可能缩短到几分钟。这实现了从“不可用”到“高效”的飞跃。

3. 多个 API 或工具交互(无缝体验)

  • 用例: 旅行规划智能体,任务是“帮我规划下周末去上海的行程”。

  • 顺序(笨拙且低效):

  • 智能体: “好的,我先为您查询航班。” (调用 Flight API, 等待 4 秒)

  • 智能体: “航班已找到。现在我为您查询酒店。” (调用 Hotel API, 等待 5 秒)

  • 智能体: “酒店已找到。现在我为您查询当地天气。” (调用 Weather API, 等待 2 秒)

  • 智能体: “天气已找到。现在我为您查询热门餐厅。” (调用 Restaurant API, 等待 3 秒

  • 总耗时: 14 秒。这种“挤牙膏”式的交互方式非常折磨人。

  • 并行(流畅且智能):

  • 智能体: “好的,正在为您规划上海周末行程...”

  • 同时发起四项任务:

  • task_flights = async_call_flight_api()

  • task_hotels = async_call_hotel_api()

  • task_weather = async_call_weather_api()

  • task_restaurants = async_call_restaurant_api()

  • [屏障] await asyncio.gather(...)

  • 总耗时: Max(4, 5, 2, 3) = 5 秒

  • 智能体: “行程已规划完毕:已为您筛选[航班信息],推荐入住[酒店信息],当地天气[天气情况],并找到三家高分餐厅[餐厅列表]。” 这种一次性交付完整答案的体验,才是用户想要的“智能”。

4. 多组件内容生成(协同创作)

  • 用例: 营销内容智能体,任务是“为我们的新产品写一篇推广博文”。

  • 顺序(思维受限):

  • title = llm.call("生成一个吸引人的标题")

  • body = llm.call("根据标题'{title}'生成正文")

  • image_prompt = llm.call("根据正文'{body}'生成图片提示词")

  • 问题: 这种方式看似有逻辑,但每一步都严重依赖上一步,导致思维固化。如果第一步的标题生成得不好,后面全盘皆输。

  • 并行(协同创作):

  • 核心思想: 好的标题、正文、图片都应源自同一个核心创意,而不是彼此的派生。

  • core_brief = "产品:AI降噪耳机;目标用户:通勤者;卖点:沉浸式体验"

  • 同时发起三项创作任务:

  • task_title = async_llm.call(core_brief, "生成5个爆款标题")

  • task_body = async_llm.call(core_brief, "撰写一篇500字博文正文")

  • task_image = async_llm.call(core_brief, "生成3个DALL-E提示词")

  • [屏障] await asyncio.gather(...)

  • 好处: 在几秒钟内,智能体就“分身”成了文案专家、内容专家和视觉专家,同时完成了工作。这不仅更快,而且产出的内容更具一致性,因为它们都忠于同一个core_brief

5. 验证和核实(即时反馈)

  • 用例: 用户注册智能体,任务是“验证新用户注册信息”。

  • 顺序(逐项检查):

  • “正在检查邮箱格式...” (本地检查, 0.1 秒)

  • “正在验证手机号真实性...” (调用 Twilio API, 等待 2 秒)

  • “正在核实地址有效性...” (调用 Address API, 等待 2.5 秒)

  • “正在对用户名进行内容安全审核...” (调用 Moderation LLM, 等待 1.5 秒

  • 总耗时: 0.1 + 2 + 2.5 + 1.5 = 6.1 秒。用户在注册页面干等 6 秒,流失率会大大增加。

  • 并行(即时通过):

  • “正在检查邮箱格式...” (0.1 秒)

  • 同时发起三项外部验证:

  • task_phone = async_call_twilio_api()

  • task_address = async_call_address_api()

  • task_username = async_call_moderation_llm()

  • [屏障] await asyncio.gather(...)

  • 总耗时: 0.1 + Max(2, 2.5, 1.5) = 2.6 秒。将注册验证时间缩短 60%以上,这是提升转化率的实战技术。

6. 多模态处理(全维理解)

  • 用例: 社交媒体监控智能体,任务是“分析新发布的包含视频和文字的帖子”。

  • 顺序(信息割裂):

  • “正在分析帖子文本...” (调用 Text NLP API, 等待 1 秒)

  • “正在分析帖子视频...” (调用 Video Vision API, 需转录和理解, 等待 10 秒

  • 总耗时: 11 秒。问题是,文本和视频是互补的,割裂分析会丢失关键信息(比如“看我视频里的操作”)。

  • 并行(融合理解):

  • 同时启动两个模态的分析器:

  • task_text = async_nlp_api.process(post.text)

  • task_video = async_vision_api.process(post.video)

  • [屏障] await asyncio.gather(...)

  • 总耗时: Max(1, 10) = 10 秒

  • 下一步(融合): 将两个分析结果text_analysisvideo_analysis同时喂给一个 LLM 进行多模态融合llm.call("综合分析文本'{...}'和视频内容'{...}',判断真实意图。")。这不仅更快,而且是实现真正多模态理解的必要前提。

7. A/B 测试或多种方案生成(优中选优)

  • 用例: 创意广告智能体,任务是“为同一产品生成三种不同风格的广告文案”。

  • 顺序(单一思维):

  • llm.call("写个文案") -> 得到 A

  • llm.call("换个风格再写一个") -> 得到 B

  • llm.call("再换个幽默的风格")

  • 问题: 耗时是 3 倍,且 LLM 的后续回答容易受到前序对话的“污染”。

  • 并行(思维发散):

  • 定义三个不同的“专家”提示(Persona):

  • prompt_bold = "你是一个大胆的营销者...写一个有冲击力的文案"

  • prompt_witty = "你是一个幽默的段子手...写一个风趣的文案"

  • prompt_formal = "你是一个专业的工程师...写一个严谨的文案"

  • 同时发起三个 LLM 调用:

  • task_a = async_llm.call(prompt_bold, product_info)

  • task_b = async_llm.call(prompt_witty, product_info)

  • task_c = async_llm.call(prompt_formal, product_info)

  • [屏障] await asyncio.gather(...)

  • 好处:1 倍的时间内,获得了三个风格迥异、互不干扰的高质量备选方案。这使得智能体能快速生成多个选项,供人类决策者挑选或进行下一步的 A/B 测试。



三、实战示例 (LangChain):构建“并行研究链”

在 LangChain 框架中,通过其强大的表达式语言(LCEL) 可以极其优雅地实现并行执行。核心组件是 RunnableParallel(在 LCEL 中,一个字典本身就常常被视为一个并行可运行对象)。


下面的示例将构建一个工作流:用户输入一个主题(如“太空探索史”),工作流将同时启动三个独立的操作(总结、提问、提取关键词),然后将它们各自的输出合并,生成一个最终的综合答案。

1. 代码实现 (Python)

import osimport asynciofrom typing import Optional
# 导入LangChain的核心组件from langchain_openai import ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParser# RunnableParallel 用于声明并行执行# RunnablePassthrough 用于传递原始输入from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
# --- 0. 准备环境 ---# 建议使用 .env 文件管理 API 密钥# from dotenv import load_dotenv# load_dotenv()# os.environ["OPENAI_API_KEY"] = "sk-..."
# --- 1. 初始化语言模型 ---try: # 我们使用一个支持高并发的经济型模型 llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7) if llm: print(f"语言模型初始化成功: {llm.model_name}")except Exception as e: print(f"初始化语言模型失败: {e}") llm = None
# --- 2. 定义独立的“专家”链(我们的并行工作单元) ---# 这三条链代表彼此独立、可同时执行的任务。# 它们都只依赖一个输入:{topic}
# 专家A:总结链summarize_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "请你将以下主题进行简洁的总结(100字以内):"), ("user", "{topic}") ]) | llm | StrOutputParser())
# 专家B:提问链questions_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "请你针对以下主题,生成三个有深度的、启发性的问题:"), ("user", "{topic}") ]) | llm | StrOutputParser())
# 专家C:术语链terms_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "请你从以下主题中,识别出 5 到 10 个核心关键术语,并用逗号分隔:"), ("user", "{topic}") ]) | llm | StrOutputParser())
# --- 3. 构建“并行+综合”链 ---
# 3.1. 定义并行任务块 (map_chain)# 我们使用一个字典(它本身就是 RunnableParallel)来定义并行结构。# LCEL执行器会“看到”这个字典,并“智能地”并行执行其中的所有项。map_chain = RunnableParallel( { # "summary" 键的值将由 summarize_chain 的输出填充 "summary": summarize_chain, # "questions" 键的值将由 questions_chain 的输出填充 "questions": questions_chain, # "key_terms" 键的值将由 terms_chain 的输出填充 "key_terms": terms_chain, # 关键一步:我们还需要原始的 topic 传给下一步 # RunnablePassthrough() 会原封不动地传递输入(即 topic 字符串) "topic": RunnablePassthrough(), })
# 3.2. 定义最终的“综合”提示# 这一步是 *串行* 的,它必须等待 map_chain 中的所有并行任务完成。synthesis_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个高级研究助理。请根据以下由并行任务生成的信息,为用户合成一份全面且结构化的答案: --- **核心摘要:** {summary} --- **启发性问题:** {questions} --- **关键术语:** {key_terms} --- 请将以上所有信息整合成一份流畅、连贯的报告。"""), ("user", "原始主题:{topic}")])
# 3.3. 构建完整的端到端链# 管道操作符 | 将 map_chain 的输出(一个字典)# 传递给 synthesis_prompt 作为输入,然后是 LLM 和输出解析器。full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()

# --- 4. 异步运行链 ---async def run_parallel_example(topic: str) -> None: """ 使用特定主题异步调用并行处理链,并打印综合结果。 """ if not llm: print("LLM 未初始化。无法运行示例。") return
print(f"\n--- 正在为主题运行并行 LangChain 示例:'{topic}' ---") try: # 异步调用:ainvoke # 输入是单个 'topic' 字符串。 # LCEL 框架会自动将这个 topic 分发给 map_chain 中的每一个需要它的链。 response = await full_parallel_chain.ainvoke(topic) print("\n--- [并行任务完成] 最终综合响应 ---") print(response) except Exception as e: print(f"\n在链执行期间发生错误: {e}")
if __name__ == "__main__": test_topic = "航天探索史 (The history of space exploration)" # 注意:在生产环境中,这应该在已有的异步事件循环中运行 # 这里我们使用 asyncio.run() 来启动这个异步函数 print("...开始异步执行...") asyncio.run(run_parallel_example(test_topic))
复制代码

2. 代码深度解析

这个示例精妙地展示了 LangChain 如何将复杂的异步逻辑抽象为声明式的管道:


  • 异步的本质 (Concurrency vs. Parallelism): 必须指出,在标准的 Python 环境中,由于全局解释器锁(GIL)的存在,这里的asyncio实现的是并发(Concurrency),而不是真正的并行(Parallelism)

  • 并发: 任务在单个线程中快速切换执行。当一个任务(如summarize_chain)发起网络请求(await llm.call(...))并“等待”时,事件循环会立即切换到另一个任务(如questions_chain)并开始执行,直到它也进入等待。这极大地利用了 I/O 等待时间,非常适合 LLM 和 API 调用。

  • 并行: 任务在多个 CPU 核心上同时运行。这适用于 CPU 密集型任务(如复杂的数学计算)。

  • 对于智能体来说,并发通常就是我们所追求的,因为它完美地解决了 I/O 等待的瓶颈。

  • RunnableParallel 的魔力: map_chain这个字典是 LCEL 的核心。当你ainvoke(异步调用)它时,LCEL 执行器会检查字典中的所有值(summarize_chain, questions_chain...),并自动将它们提交到asyncio事件循环中并发执行。

  • RunnablePassthrough 的作用: 这是一个精巧的工具。map_chain中的三个子链都需要{topic}作为输入,但最终的synthesis_prompt也需要{topic}RunnablePassthrough()确保了原始的topic字符串能够“毫发无伤”地穿过并行处理阶段,并与其他并行结果一起,以"topic": topic_string的形式出现在map_chain的输出字典中,供synthesis_prompt使用。

  • “并行-串行”混合模式: full_parallel_chain = map_chain | synthesis_prompt | ... 这个表达式清晰地定义了一个“并行-串行”工作流。map_chain是并行阶段,synthesis_prompt是串行阶段(它必须等待map_chain的所有结果)。这是优化智能体性能最常用和最有效的模式。



四、实战示例 (Google ADK):构建“并行研究团队”

现在,我们使用 Google Agent Development Kit (ADK) 来演示同样的概念。ADK 采用了一种略有不同的、基于“智能体”的抽象方法。我们不再是构建“链”,而是定义具有特定职责的智能体,并通过一个协调器智能体来管理它们。


此示例将构建一个研究团队:一个ParallelAgent(项目经理)同时派出三个LlmAgent(研究员)去研究不同主题,然后一个MergerAgent(首席分析师)将他们的报告汇总起来。

1. 代码实现 (Python - ADK 风格)

# --- 1. 定义研究员子智能体 (并行执行单元) ---# 这三个智能体是独立的,可以被同时调用
# 研究员 1:可再生能源researcher_agent_1 = LlmAgent( name="RenewableEnergyResearcher", model=GEMINI_MODEL, instruction="""你是一名AI研究助理,专注于能源领域。研究'可再生能源'的最新进展。使用你被提供的 'Google Search' 工具。简洁地(1-2句话)总结你的核心发现。*只*输出总结内容。""", description="研究可再生能源。", tools=[google_search], # 关键:将结果存储到会话状态中,供合并智能体使用 output_key="renewable_energy_result")
# 研究员 2:电动汽车researcher_agent_2 = LlmAgent( name="EVResearcher", model=GEMINI_MODEL, instruction="""你是一名AI研究助理,专注于交通领域。研究'电动汽车技术'的最新发展。使用你被提供的 'Google Search' 工具。简洁地(1-2句话)总结你的核心发现。*只*输出总结内容。""", description="研究电动汽车技术。", tools=[google_search], output_key="ev_technology_result")
# 研究员 3:碳捕获researcher_agent_3 = LlmAgent( name="CarbonCaptureResearcher", model=GEMINI_MODEL, instruction="""你是一名AI研究助理,专注于气候解决方案。研究'碳捕获方法'的现状。使用你被提供的 'Google Search' 工具。简洁地(1-2句话)总结你的核心发现。*只*输出总结内容。""", description="研究碳捕获方法。", tools=[google_search], output_key="carbon_capture_result")
# --- 2. 创建 ParallelAgent (并行协调器) ---# 这个智能体协调三个研究员的 *并发* 执行。# 当 *所有* 研究员都完成并将其结果存入状态后,此智能体才算完成。
parallel_research_agent = ParallelAgent( name="ParallelWebResearchAgent", sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3], description="并行运行多个研究智能体以收集信息。")
# --- 3. 定义 MergerAgent (串行合并器) ---# 这个智能体在 *所有* 并行智能体 *之后* 运行。# 它从会话状态中读取由并行智能体存入的结果,# 并将它们合成为一个单一的、结构化的响应。
merger_agent = LlmAgent( name="SynthesisAgent", model=GEMINI_MODEL, # 合并任务可能需要更强的模型 instruction="""你是一名AI首席分析师,负责将多个研究发现合并为一份结构化报告。
你的主要任务是综合以下研究总结,并清晰地标明每个发现的来源领域。使用标题来组织每个主题。
**至关重要:你的整个回答必须 *严格* 基于下面'输入总结'中提供的信息。禁止添加这些特定总结中未出现的任何外部知识、事实或细节。**
**输入总结:**
* **可再生能源:** {renewable_energy_result}
* **电动汽车:** {ev_technology_result}
* **碳捕获:** {carbon_capture_result}
**输出格式:**
## 可持续技术最新进展摘要
### 可再生能源发现(基于 RenewableEnergyResearcher 的发现)[仅基于上面提供的 *可再生能源* 总结进行综合阐述。]
### 电动汽车发现(基于 EVResearcher 的发现)[仅基于上面提供的 *电动汽车* 总结进行综合阐述。]
### 碳捕获发现(基于 CarbonCaptureResearcher 的发现)[仅基于上面提供的 *碳捕获* 总结进行综合阐述。]
### 总体结论[提供一个简短(1-2句)的结论性陈述,*仅* 连接上面展示的发现。]
*只*输出遵循此格式的结构化报告。不要包含此结构之外的引言或结语。""", description="将来自并行智能体的研究发现合并为一份严格接地的结构化报告。", # 合并智能体通常不需要工具,它只处理内存中的状态)
# --- 4. 创建 SequentialAgent (主流程协调器) ---# 这是将要被运行的主智能体(根智能体)。# 它首先执行 ParallelAgent 来填充状态(并行阶段),# 然后执行 MergerAgent 来产生最终输出(串行阶段)。
sequential_pipeline_agent = SequentialAgent( name="ResearchAndSynthesisPipeline", # 1. 先并行研究, 2. 再串行合并 sub_agents=[parallel_research_agent, merger_agent], description="协调并行研究并综合结果。")
# 将这个管道设置root_agent = sequential_pipeline_agent
复制代码

2. 代码深度解析

ADK 的示例展示了一种基于“智能体(Actor)”模型的并行化思想:


  • 状态管理 (output_key): 这是 ADK 实现数据在智能体之间传递的关键。researcher_agent_1 完成后,将其结果写入会话状态的 renewable_energy_result 键中。ParallelAgent 不关心这些结果是什么,它只关心它的所有子智能体是否都已完成。

  • ParallelAgent (协调器): parallel_research_agent 充当了一个“项目经理”。它在 ADK 的执行框架内同时“激活”了三个研究员智能体。ADK 的运行时(Runner)会(同样基于asyncio)并发地执行它们。

  • MergerAgent (合并器): merger_agent 的提示是这个设计的核心。注意它的 instruction 是如何通过 {...} 占位符直接从会话状态中读取数据的。这是一种**“接地”(Grounding)**的提示,它被严格限制只能使用上游智能体提供的上下文,这极大地减少了幻觉。

  • SequentialAgent (主管道): sequential_pipeline_agent 再次展示了“并行-串行”模式。它的sub_agents列表 [parallel_research_agent, merger_agent] 定义了一个清晰的两步流程:

  • 步骤一(并行): 运行 parallel_research_agent

  • 步骤二(串行): 等待步骤一完成后,运行 merger_agent


ADK 的方法更侧重于定义**“谁”LlmAgent)以及“他们的职责”instruction),而框架(ParallelAgent, SequentialAgent)则负责处理“如何”**执行(并发或顺序)。



五、要点与结论

要点

  • 问题所在:智能体工作流中的许多子任务(如 API 调用、数据库查询)涉及大量的 I/O 等待。在纯粹的顺序执行中,这些等待时间会累加起来,导致智能体响应极其缓慢,性能低下,用户体验差。

  • 解决之道:并行模式通过识别工作流中互不依赖的任务,并同时(并发) 执行它们,提供了一种标准化的解决方案。主流程可以启动多个并行的子任务,然后在一个“屏障”处等待它们全部完成,再进入下一个串行步骤。这能大幅缩短总执行时间。

  • 经验法则:当你的工作流中存在多个可以独立运行的任务时,就应该使用并行模式。

  • 何时使用: 同时从多个 API 拉取数据(如旅行规划);并行处理不同数据分片(如客户反馈分析);同时生成需要合并的多个内容部分(如营销邮件)。

  • 目标: 最小化 I/O 等待,缩短总体执行时间,提升系统响应速度。

结论

  • 核心价值: 并行模式的核心是缩短总耗时提高效率,它通过并发执行独立任务来实现这一目标。

  • 适用场景: 在任务需要等待外部资源(如 API 调用、数据库查询、LLM 响应)时,并行模式的效果最为显著。

  • 成本考量: 采用并发或并行架构会显著增加复杂性。它对代码设计、状态管理、错误处理和日志调试都提出了更高的要求。

  • 框架支持: 幸运的是,现代智能体框架(如 LangChain、Google ADK)都内置了强大的并行执行机制,帮助开发者管理这种复杂性。

  • LangChain (LCEL): 在 LCEL 中,RunnableParallel(或直接使用字典)是用于声明式定义并行执行的核心组件。

  • Google ADK: ADK 通过ParallelAgent和多智能体委派机制来实现并行化。协调器智能体(或 LLM)识别出独立任务,并将它们分派给可以并发执行的专用子智能体。

  • 终极模式: 最强大、最高效的智能体系统,几乎总是顺序(链式)、条件(路由)和并行三种模式的混合体

最后的思考:从“机器人”到“智能团队”

如果我们把简单的提示链看作一个按部就班的“工人”,把路由模式看作一个会做决策的“经理”,那么并行模式就是将这个单人作坊升级为了一个高效的“精英团队”


并行化不仅仅是一种技术优化,它是一种思维方式的转变。它迫使我们不再将任务视为单一的线性流程,而是将其解构为一张依赖关系图,从而找出可以“分身”处理的部分。


掌握了并行模式,我们的智能体才能真正摆脱“一问一答”的延迟束缚,才能在用户失去耐心之前,迅速地从多个维度收集信息、从多个角度思考问题、从多个来源生成内容。


链式(结构)路由(决策)和并行(效率) 这三者结合起来,我们才能最终构建出那些我们真正期待的——反应迅速、智能高效、能力强大的 AI 智能体。


参考资料

1.LangChain : https://python.langchain.com/docs/concepts/lcel/

2.Google ADK : https://google.github.io/adk-docs/agents/multi-agents/

3.Python asyncio : https://docs.python.org/3/library/asyncio.html

发布于: 1 小时前阅读数: 8
用户头像

Hernon AI

关注

创意心中美好世界 2020-08-19 加入

AI爱好者

评论

发布
暂无评论
AI智能体 - 并行模式_langchain_Hernon AI_InfoQ写作社区