AI 智能体 - 并行模式

在我们的 AI 智能体设计系列中,我们已经探讨了两种基础模式:
提示链模式(Prompt Chaining):如同精密的流水线,通过严格的顺序执行来保证复杂任务的可靠性。
路由模式(Routing Pattern):如同智能的“交通枢纽”,通过条件逻辑让智能体学会了决策和“分诊”。
这些模式构建了一个强大、有逻辑的智能体。但它们共同面对一个无法回避的敌人:时间。
在现实世界中,顺序执行和复杂的决策都可能导致一个致命问题:延迟。当用户等待你的智能体逐一调用 API、逐一搜索来源时,他们的耐心正在被消耗。这时并行模式(Parallelization Pattern) 就变得至关重要。它不是对前两种模式的替代,而是赋予它们“翅膀”的关键技术,让你的智能体学会“分身术”,同时执行多个子任务。
一、并行模式
许多复杂的智能体任务,其本质并非是严格线性的。它们往往可以分解为多个互不依赖的子任务。
想象一下,你是一位主厨,需要准备一场宴会。
顺序执行(提示链): 你先花 30 分钟洗菜,然后花 60 分钟切菜,再花 90 分钟烹饪。总耗时:3 个半小时。
并行执行(并行模式): 你(协调器)指挥三个助手(子智能体):助手 A 负责洗菜,助手 B 负责切菜,助手 C 负责准备汤料。他们同时开始工作。最终,你只需要等待耗时最长的那个任务(比如烹饪)完成,总时间可能缩短到 2 小时。
并行模式涉及同时执行多个组件,例如并行的大语言模型(LLM)调用、并行的工具使用,甚至是并行的子智能体。
为什么并行如此重要?
在智能体工作流中,最大的时间杀手通常不是计算本身,而是 I/O 等待(Input/Output Wait)。
当你调用一个外部 API(如谷歌搜索、天气查询、股票数据)时,你的智能体大部分时间都在“等待”网络另一端的服务器响应。在顺序执行中,这种等待是累加的,灾难性的。
一个更具体的例子:智能体研究课题“AIGC 的未来趋势”
传统的顺序(串行)工作流:
[开始]
调用
Google Search("AIGC最新行业报告")(等待 3 秒)获取报告,调用
llm.summarize(报告)(等待 5 秒)调用
Google Search("AIGC顶级公司动态")(等待 3 秒)获取动态,调用
llm.summarize(动态)(等待 5 秒)调用
llm.synthesize(总结A, 总结B)(等待 4 秒)[结束] 总耗时:3 + 5 + 3 + 5 + 4 = 20 秒
用户在这 20 秒内只能盯着一个旋转的加载图标。
采用并行模式的工作流:
[开始]
(并行分支 A)
调用
Google Search("AIGC最新行业报告")(等待 3 秒)获取报告,调用
llm.summarize(报告)(等待 5 秒)(分支 A 结束,耗时 8 秒)
(并行分支 B)
调用
Google Search("AIGC顶级公司动态")(等待 3 秒)获取动态,调用
llm.summarize(动态)(等待 5 秒)(分支 B 结束,耗时 8 秒)
[屏障:等待 A 和 B 全部完成] (总耗时 = Max(8, 8) = 8 秒)
调用
llm.synthesize(总结A, 总结B)(等待 4 秒)[结束] 总耗时:8 + 4 = 12 秒
通过并行化,我们将总执行时间缩短了 40%。这就是并行模式的核心价值:找出工作流中互不依赖的环节,并将它们并行执行,从而大幅减少总体等待时间。
实现并行化通常需要支持异步执行(Asynchronous Execution)、多线程或多进程的框架。幸运的是,现代智能体框架(如 LangChain、LangGraph、Google ADK)都提供了原生的异步支持和并行机制,让我们可以方便地定义和运行这些并发步骤。
二、实际应用场景:并行模式的 7 个“加速器”
并行模式是提升智能体性能的强大“加速器”。让我们通过更深入的顺序与并行对比,来探索它的 7 个典型应用场景。
1. 信息收集和研究(多源情报)
用例: 投资分析智能体,任务是“全面分析 X 公司”。
顺序(缓慢且片面):
“正在查询 X 公司最新新闻...” (调用 News API, 等待 3 秒)
“正在拉取 X 公司实时股价...” (调用 Stock API, 等待 2 秒)
“正在监测 X 公司社交媒体情绪...” (调用 Social API, 等待 4 秒)
“正在查询 X 公司内部财报数据库...” (调用 DB, 等待 3 秒
总耗时: 12 秒。用户体验极差,每一步操作都需要等待反馈。
并行(快速且全面):
“正在分析 X 公司,请稍候...”
同时发起四项任务:
task_news = async_call_news_api()task_stock = async_call_stock_api()task_social = async_call_social_api()task_db = async_query_db()[屏障]
await asyncio.gather(task_news, task_stock, task_social, task_db)总耗时:
Max(3, 2, 4, 3)= 4 秒。智能体在 4 秒后,就能拿到所有情报,然后进行下一步的综合分析。
2. 数据处理和分析(多维洞察)
用例: “客户之声”(VoC)智能体,任务是“分析上周的 1000 条客户反馈”。
顺序(漫长且昂贵):
for review in all_reviews:(循环 1000 次)sentiment = llm.call(review, "情感分析")(等待 2 秒)keywords = llm.call(review, "关键词提取")(等待 2 秒)category = llm.call(review, "意图分类")(等待 2 秒总耗时:
1000 * (2 + 2 + 2) = 6000秒(约 1.6 小时)。这在实际业务中是不可接受的。并行(高效且深入):
启动三个“专家智能体”,它们同时处理同一批数据:
情感专家:
results_sentiment = await parallel_process(all_reviews, sentiment_task)关键词专家:
results_keywords = await parallel_process(all_reviews, keyword_task)分类专家:
results_category = await parallel_process(all_reviews, category_task)(这些
parallel_process函数内部也会使用asyncio.gather总耗时:
Max(情感处理总时间, 关键词处理总时间, 分类处理总时间)。通过批量并发,总时间可能缩短到几分钟。这实现了从“不可用”到“高效”的飞跃。
3. 多个 API 或工具交互(无缝体验)
用例: 旅行规划智能体,任务是“帮我规划下周末去上海的行程”。
顺序(笨拙且低效):
智能体: “好的,我先为您查询航班。” (调用 Flight API, 等待 4 秒)
智能体: “航班已找到。现在我为您查询酒店。” (调用 Hotel API, 等待 5 秒)
智能体: “酒店已找到。现在我为您查询当地天气。” (调用 Weather API, 等待 2 秒)
智能体: “天气已找到。现在我为您查询热门餐厅。” (调用 Restaurant API, 等待 3 秒
总耗时: 14 秒。这种“挤牙膏”式的交互方式非常折磨人。
并行(流畅且智能):
智能体: “好的,正在为您规划上海周末行程...”
同时发起四项任务:
task_flights = async_call_flight_api()task_hotels = async_call_hotel_api()task_weather = async_call_weather_api()task_restaurants = async_call_restaurant_api()[屏障]
await asyncio.gather(...)总耗时:
Max(4, 5, 2, 3)= 5 秒智能体: “行程已规划完毕:已为您筛选[航班信息],推荐入住[酒店信息],当地天气[天气情况],并找到三家高分餐厅[餐厅列表]。” 这种一次性交付完整答案的体验,才是用户想要的“智能”。
4. 多组件内容生成(协同创作)
用例: 营销内容智能体,任务是“为我们的新产品写一篇推广博文”。
顺序(思维受限):
title = llm.call("生成一个吸引人的标题")body = llm.call("根据标题'{title}'生成正文")image_prompt = llm.call("根据正文'{body}'生成图片提示词")问题: 这种方式看似有逻辑,但每一步都严重依赖上一步,导致思维固化。如果第一步的标题生成得不好,后面全盘皆输。
并行(协同创作):
核心思想: 好的标题、正文、图片都应源自同一个核心创意,而不是彼此的派生。
core_brief = "产品:AI降噪耳机;目标用户:通勤者;卖点:沉浸式体验"同时发起三项创作任务:
task_title = async_llm.call(core_brief, "生成5个爆款标题")task_body = async_llm.call(core_brief, "撰写一篇500字博文正文")task_image = async_llm.call(core_brief, "生成3个DALL-E提示词")[屏障]
await asyncio.gather(...)好处: 在几秒钟内,智能体就“分身”成了文案专家、内容专家和视觉专家,同时完成了工作。这不仅更快,而且产出的内容更具一致性,因为它们都忠于同一个
core_brief。
5. 验证和核实(即时反馈)
用例: 用户注册智能体,任务是“验证新用户注册信息”。
顺序(逐项检查):
“正在检查邮箱格式...” (本地检查, 0.1 秒)
“正在验证手机号真实性...” (调用 Twilio API, 等待 2 秒)
“正在核实地址有效性...” (调用 Address API, 等待 2.5 秒)
“正在对用户名进行内容安全审核...” (调用 Moderation LLM, 等待 1.5 秒
总耗时:
0.1 + 2 + 2.5 + 1.5= 6.1 秒。用户在注册页面干等 6 秒,流失率会大大增加。并行(即时通过):
“正在检查邮箱格式...” (0.1 秒)
同时发起三项外部验证:
task_phone = async_call_twilio_api()task_address = async_call_address_api()task_username = async_call_moderation_llm()[屏障]
await asyncio.gather(...)总耗时:
0.1 + Max(2, 2.5, 1.5)= 2.6 秒。将注册验证时间缩短 60%以上,这是提升转化率的实战技术。
6. 多模态处理(全维理解)
用例: 社交媒体监控智能体,任务是“分析新发布的包含视频和文字的帖子”。
顺序(信息割裂):
“正在分析帖子文本...” (调用 Text NLP API, 等待 1 秒)
“正在分析帖子视频...” (调用 Video Vision API, 需转录和理解, 等待 10 秒
总耗时: 11 秒。问题是,文本和视频是互补的,割裂分析会丢失关键信息(比如“看我视频里的操作”)。
并行(融合理解):
同时启动两个模态的分析器:
task_text = async_nlp_api.process(post.text)task_video = async_vision_api.process(post.video)[屏障]
await asyncio.gather(...)总耗时:
Max(1, 10)= 10 秒下一步(融合): 将两个分析结果
text_analysis和video_analysis同时喂给一个 LLM 进行多模态融合:llm.call("综合分析文本'{...}'和视频内容'{...}',判断真实意图。")。这不仅更快,而且是实现真正多模态理解的必要前提。
7. A/B 测试或多种方案生成(优中选优)
用例: 创意广告智能体,任务是“为同一产品生成三种不同风格的广告文案”。
顺序(单一思维):
llm.call("写个文案")-> 得到 Allm.call("换个风格再写一个")-> 得到 Bllm.call("再换个幽默的风格")问题: 耗时是 3 倍,且 LLM 的后续回答容易受到前序对话的“污染”。
并行(思维发散):
定义三个不同的“专家”提示(Persona):
prompt_bold = "你是一个大胆的营销者...写一个有冲击力的文案"prompt_witty = "你是一个幽默的段子手...写一个风趣的文案"prompt_formal = "你是一个专业的工程师...写一个严谨的文案"同时发起三个 LLM 调用:
task_a = async_llm.call(prompt_bold, product_info)task_b = async_llm.call(prompt_witty, product_info)task_c = async_llm.call(prompt_formal, product_info)[屏障]
await asyncio.gather(...)好处: 在 1 倍的时间内,获得了三个风格迥异、互不干扰的高质量备选方案。这使得智能体能快速生成多个选项,供人类决策者挑选或进行下一步的 A/B 测试。
三、实战示例 (LangChain):构建“并行研究链”
在 LangChain 框架中,通过其强大的表达式语言(LCEL) 可以极其优雅地实现并行执行。核心组件是 RunnableParallel(在 LCEL 中,一个字典本身就常常被视为一个并行可运行对象)。
下面的示例将构建一个工作流:用户输入一个主题(如“太空探索史”),工作流将同时启动三个独立的操作(总结、提问、提取关键词),然后将它们各自的输出合并,生成一个最终的综合答案。
1. 代码实现 (Python)
2. 代码深度解析
这个示例精妙地展示了 LangChain 如何将复杂的异步逻辑抽象为声明式的管道:
异步的本质 (Concurrency vs. Parallelism): 必须指出,在标准的 Python 环境中,由于全局解释器锁(GIL)的存在,这里的
asyncio实现的是并发(Concurrency),而不是真正的并行(Parallelism)。并发: 任务在单个线程中快速切换执行。当一个任务(如
summarize_chain)发起网络请求(await llm.call(...))并“等待”时,事件循环会立即切换到另一个任务(如questions_chain)并开始执行,直到它也进入等待。这极大地利用了 I/O 等待时间,非常适合 LLM 和 API 调用。并行: 任务在多个 CPU 核心上同时运行。这适用于 CPU 密集型任务(如复杂的数学计算)。
对于智能体来说,并发通常就是我们所追求的,因为它完美地解决了 I/O 等待的瓶颈。
RunnableParallel的魔力:map_chain这个字典是 LCEL 的核心。当你ainvoke(异步调用)它时,LCEL 执行器会检查字典中的所有值(summarize_chain,questions_chain...),并自动将它们提交到asyncio事件循环中并发执行。RunnablePassthrough的作用: 这是一个精巧的工具。map_chain中的三个子链都需要{topic}作为输入,但最终的synthesis_prompt也需要{topic}。RunnablePassthrough()确保了原始的topic字符串能够“毫发无伤”地穿过并行处理阶段,并与其他并行结果一起,以"topic": topic_string的形式出现在map_chain的输出字典中,供synthesis_prompt使用。“并行-串行”混合模式:
full_parallel_chain = map_chain | synthesis_prompt | ...这个表达式清晰地定义了一个“并行-串行”工作流。map_chain是并行阶段,synthesis_prompt是串行阶段(它必须等待map_chain的所有结果)。这是优化智能体性能最常用和最有效的模式。
四、实战示例 (Google ADK):构建“并行研究团队”
现在,我们使用 Google Agent Development Kit (ADK) 来演示同样的概念。ADK 采用了一种略有不同的、基于“智能体”的抽象方法。我们不再是构建“链”,而是定义具有特定职责的智能体,并通过一个协调器智能体来管理它们。
此示例将构建一个研究团队:一个ParallelAgent(项目经理)同时派出三个LlmAgent(研究员)去研究不同主题,然后一个MergerAgent(首席分析师)将他们的报告汇总起来。
1. 代码实现 (Python - ADK 风格)
2. 代码深度解析
ADK 的示例展示了一种基于“智能体(Actor)”模型的并行化思想:
状态管理 (
output_key): 这是 ADK 实现数据在智能体之间传递的关键。researcher_agent_1完成后,将其结果写入会话状态的renewable_energy_result键中。ParallelAgent不关心这些结果是什么,它只关心它的所有子智能体是否都已完成。ParallelAgent(协调器):parallel_research_agent充当了一个“项目经理”。它在 ADK 的执行框架内同时“激活”了三个研究员智能体。ADK 的运行时(Runner)会(同样基于asyncio)并发地执行它们。MergerAgent(合并器):merger_agent的提示是这个设计的核心。注意它的instruction是如何通过{...}占位符直接从会话状态中读取数据的。这是一种**“接地”(Grounding)**的提示,它被严格限制只能使用上游智能体提供的上下文,这极大地减少了幻觉。SequentialAgent(主管道):sequential_pipeline_agent再次展示了“并行-串行”模式。它的sub_agents列表[parallel_research_agent, merger_agent]定义了一个清晰的两步流程:步骤一(并行): 运行
parallel_research_agent。步骤二(串行): 等待步骤一完成后,运行
merger_agent。
ADK 的方法更侧重于定义**“谁”(LlmAgent)以及“他们的职责”(instruction),而框架(ParallelAgent, SequentialAgent)则负责处理“如何”**执行(并发或顺序)。
五、要点与结论
要点
问题所在:智能体工作流中的许多子任务(如 API 调用、数据库查询)涉及大量的 I/O 等待。在纯粹的顺序执行中,这些等待时间会累加起来,导致智能体响应极其缓慢,性能低下,用户体验差。
解决之道:并行模式通过识别工作流中互不依赖的任务,并同时(并发) 执行它们,提供了一种标准化的解决方案。主流程可以启动多个并行的子任务,然后在一个“屏障”处等待它们全部完成,再进入下一个串行步骤。这能大幅缩短总执行时间。
经验法则:当你的工作流中存在多个可以独立运行的任务时,就应该使用并行模式。
何时使用: 同时从多个 API 拉取数据(如旅行规划);并行处理不同数据分片(如客户反馈分析);同时生成需要合并的多个内容部分(如营销邮件)。
目标: 最小化 I/O 等待,缩短总体执行时间,提升系统响应速度。
结论
核心价值: 并行模式的核心是缩短总耗时和提高效率,它通过并发执行独立任务来实现这一目标。
适用场景: 在任务需要等待外部资源(如 API 调用、数据库查询、LLM 响应)时,并行模式的效果最为显著。
成本考量: 采用并发或并行架构会显著增加复杂性。它对代码设计、状态管理、错误处理和日志调试都提出了更高的要求。
框架支持: 幸运的是,现代智能体框架(如 LangChain、Google ADK)都内置了强大的并行执行机制,帮助开发者管理这种复杂性。
LangChain (LCEL): 在 LCEL 中,
RunnableParallel(或直接使用字典)是用于声明式定义并行执行的核心组件。Google ADK: ADK 通过
ParallelAgent和多智能体委派机制来实现并行化。协调器智能体(或 LLM)识别出独立任务,并将它们分派给可以并发执行的专用子智能体。终极模式: 最强大、最高效的智能体系统,几乎总是顺序(链式)、条件(路由)和并行三种模式的混合体。
最后的思考:从“机器人”到“智能团队”
如果我们把简单的提示链看作一个按部就班的“工人”,把路由模式看作一个会做决策的“经理”,那么并行模式就是将这个单人作坊升级为了一个高效的“精英团队”。
并行化不仅仅是一种技术优化,它是一种思维方式的转变。它迫使我们不再将任务视为单一的线性流程,而是将其解构为一张依赖关系图,从而找出可以“分身”处理的部分。
掌握了并行模式,我们的智能体才能真正摆脱“一问一答”的延迟束缚,才能在用户失去耐心之前,迅速地从多个维度收集信息、从多个角度思考问题、从多个来源生成内容。
将链式(结构)、路由(决策)和并行(效率) 这三者结合起来,我们才能最终构建出那些我们真正期待的——反应迅速、智能高效、能力强大的 AI 智能体。
参考资料
1.LangChain : https://python.langchain.com/docs/concepts/lcel/
2.Google ADK : https://google.github.io/adk-docs/agents/multi-agents/
3.Python asyncio : https://docs.python.org/3/library/asyncio.html
版权声明: 本文为 InfoQ 作者【Hernon AI】的原创文章。
原文链接:【http://xie.infoq.cn/article/6d5c8a36d195be546634eeff9】。文章转载请联系作者。







评论