利用 Playwright MCP 与 LLM 构建复杂的工作流与 AI 智能体
- 2025-10-15 北京
本文字数:8166 字
阅读完需:约 27 分钟
在当今快速发展的 AI 领域,将大型语言模型(LLM)与实际应用场景相结合已成为提升生产力的关键。然而,LLM 本身存在局限性——它们无法直接与现实世界交互、操作应用程序或执行复杂的工作流。这就是为什么我们需要像 Playwright MCP 这样的工具来弥合这一差距。
本文将深入探讨如何利用 Playwright MCP 与 LLM 协同工作,构建能够处理复杂任务的工作流和 AI 智能体。
什么是 Playwright MCP?
Playwright MCP 是一个基于 Model Context Protocol(MCP)的桥接工具,它将强大的浏览器自动化框架 Playwright 与 LLM 连接起来。MCP 协议允许 LLM 访问外部工具和资源,而 Playwright 则提供了跨浏览器的自动化能力。
核心组件
Playwright:Microsoft 开发的跨浏览器自动化工具,支持 Chromium、Firefox 和 WebKit
MCP Server:处理 LLM 与 Playwright 之间的通信
LLM 接口:提供自然语言理解和任务规划能力
环境设置与安装
前置条件
Node.js 16+
Python 3.8+
可用的 LLM API 访问权限(如 OpenAI GPT、Claude 等)
安装步骤
# Install Playwright and download its browser binaries
npm install playwright
npx playwright install

# Install the MCP-related Python dependencies
# NOTE(review): the Python snippets in this article import the `mcp` package;
# confirm that these distribution names are the ones that provide it.
pip install mcp-client playwright-async

# Clone the Playwright MCP repository
git clone https://github.com/your-repo/playwright-mcp.git
cd playwright-mcp

# Article section heading: 基础配置
# config.py
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class PlaywrightMCPConfig:
    """Settings used to launch and talk to the Playwright MCP server.

    The LLM API key is read from the ``LLM_API_KEY`` environment variable and
    is ``None`` when that variable is not set.
    """

    def __init__(self):
        self.browser_type = "chromium"  # one of: chromium, firefox, webkit
        self.headless = False
        # NOTE(review): presumably milliseconds, as in Playwright's own
        # timeout options — confirm against the server that consumes it.
        self.timeout = 30000
        self.llm_api_key = os.getenv("LLM_API_KEY")

    def get_server_params(self):
        """Return the stdio parameters used to spawn the MCP server process."""
        return StdioServerParameters(
            command="node",
            args=["path/to/playwright-mcp-server.js"],
        )


# Article section heading: 构建基础工作流
1. 初始化连接
import asyncio

from mcp import ClientSession
from mcp.client.stdio import stdio_client

from config import PlaywrightMCPConfig
class PlaywrightMCPClient:
    """Owns the stdio connection and the MCP session to the Playwright server.

    Call :meth:`connect` once before using ``session`` and :meth:`close` when
    done so the server subprocess and its pipes are released.
    """

    def __init__(self, config: "PlaywrightMCPConfig"):
        self.config = config
        self.session = None
        # Keeps the transport and session context managers open between
        # connect() and close().
        self._exit_stack = None

    async def connect(self):
        """Start the server, open a session, initialize it and return ``self``.

        The context managers are entered through an ``AsyncExitStack`` rather
        than ``async with`` blocks: returning from inside ``async with`` exits
        the managers immediately, which would hand the caller a session whose
        transport is already closed.
        """
        from contextlib import AsyncExitStack

        if self.session is not None:
            # Already connected; opening a second transport would leak the first.
            return self

        server_params = self.config.get_server_params()
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await stack.enter_async_context(ClientSession(read, write))
            # Initialize the session before handing it out.
            await session.initialize()
        except BaseException:
            # Do not leave a half-open transport behind on failure.
            await stack.aclose()
            raise

        self._exit_stack = stack
        self.session = session
        return self

    async def close(self):
        """Close the session and transport; safe to call when not connected."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        if stack is not None:
            await stack.aclose()


# Article section heading: 2. 基本网页操作
class WebAutomationWorkflow:
    """Thin wrappers around the browser tools exposed through the MCP session."""

    def __init__(self, mcp_client):
        self.client = mcp_client

    async def _call(self, tool: str, arguments: dict):
        """Invoke one MCP tool and return whatever the session returns.

        Raises:
            RuntimeError: if the client has no open session yet.
        """
        session = self.client.session
        if session is None:
            raise RuntimeError("MCP client is not connected; call connect() first")
        return await session.call_tool(tool, arguments)

    async def navigate_to_page(self, url: str):
        """Navigate the browser to ``url``."""
        return await self._call("navigate", {"url": url})

    async def fill_form(self, selector: str, value: str):
        """Fill the form field matched by ``selector`` with ``value``."""
        return await self._call("fill", {"selector": selector, "value": value})

    async def click_element(self, selector: str):
        """Click the element matched by ``selector``."""
        return await self._call("click", {"selector": selector})

    async def extract_text(self, selector: str):
        """Return the text of the element matched by ``selector``."""
        return await self._call("get_text", {"selector": selector})


# Article section heading: 集成 LLM 创建智能工作流
1. LLM 任务规划器
from typing import Any, Dict, List

import openai
class LLMTaskPlanner:
    """Turns a natural-language request into a list of workflow steps via an LLM."""

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)

    def plan_workflow(self, user_request: str) -> List[Dict[str, Any]]:
        """Ask the LLM for a step list and return it parsed.

        Returns an empty list when the reply cannot be parsed into steps.
        """
        prompt = f"""
        根据以下用户请求,生成一个详细的Playwright自动化工作流。

        用户请求: {user_request}

        请以JSON格式返回步骤列表,每个步骤包含:
        - action: 操作类型 (navigate, click, fill, extract, wait, etc.)
        - parameters: 操作参数
        - description: 步骤描述

        只返回JSON格式的结果。
        """
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
        )
        return self._parse_response(response.choices[0].message.content)

    def _parse_response(self, response: str) -> List[Dict[str, Any]]:
        """Parse the raw LLM reply into a list of step dicts.

        Accepts bare JSON or JSON wrapped in a Markdown code fence (with or
        without a ``json`` tag). Always returns a list: an empty, malformed or
        non-array reply yields ``[]``, and array items that are not objects are
        dropped so callers can rely on every step being a dict.
        """
        import json

        if not response:
            print("解析LLM响应失败: 响应为空")
            return []

        payload = response.strip()
        if "```" in payload:
            # Keep only the text inside the first fenced block.
            payload = payload.split("```")[1]
            if payload[:4].lower() == "json":
                payload = payload[4:]

        try:
            steps = json.loads(payload)
        except (ValueError, TypeError) as e:
            print(f"解析LLM响应失败: {e}")
            return []

        if not isinstance(steps, list):
            print("解析LLM响应失败: 期望JSON数组")
            return []
        return [step for step in steps if isinstance(step, dict)]


# Article section heading: 2. 智能工作流执行器
class IntelligentWorkflowExecutor:
    """Plans a workflow with the LLM planner and runs it step by step."""

    def __init__(self, mcp_client, llm_planner):
        self.mcp_client = mcp_client
        self.planner = llm_planner
        self.automation = WebAutomationWorkflow(mcp_client)

    async def execute_user_request(self, user_request: str):
        """Run the full workflow for a natural-language request.

        Returns one record per attempted step with ``step``, ``description``,
        ``status`` and either ``result`` or ``error``. Execution stops at the
        first failing step.
        """
        print(f"处理用户请求: {user_request}")

        # 1. Plan the workflow with the LLM.
        workflow_steps = self.planner.plan_workflow(user_request)
        print(f"生成的工作流步骤: {len(workflow_steps)}步")

        # 2. Execute the steps in order.
        results = []
        for i, step in enumerate(workflow_steps, 1):
            # The LLM may omit the description; that must not abort the run.
            description = step.get("description", "")
            print(f"执行步骤 {i}: {description}")
            try:
                result = await self._execute_step(step)
            except Exception as e:
                results.append({
                    "step": i,
                    "description": description,
                    "error": str(e),
                    "status": "failed",
                })
                print(f"步骤 {i} 执行失败: {e}")
                break
            results.append({
                "step": i,
                "description": description,
                "result": result,
                "status": "success",
            })
        return results

    async def _execute_step(self, step: Dict[str, Any]):
        """Execute a single workflow step and return its result.

        Raises:
            ValueError: if the step's action is missing or not supported.
            KeyError: if a parameter required by the action is missing.
        """
        action = step.get("action")
        params = step.get("parameters") or {}

        if action == "navigate":
            return await self.automation.navigate_to_page(params["url"])
        if action == "click":
            return await self.automation.click_element(params["selector"])
        if action == "fill":
            return await self.automation.fill_form(
                params["selector"], params["value"]
            )
        if action == "extract":
            return await self.automation.extract_text(params["selector"])
        if action == "wait":
            await asyncio.sleep(params.get("seconds", 2))
            return "等待完成"
        raise ValueError(f"未知操作: {action}")


# Article section heading: 高级应用:构建 AI 智能体
1. 自适应智能体
class AdaptiveAIAgent:
    """Routes user input either to the automation executor or to a query handler."""

    def __init__(self, mcp_client, llm_planner, executor):
        self.mcp_client = mcp_client
        self.planner = llm_planner
        self.executor = executor
        self.conversation_history = []

    async def process_request(self, user_input: str, context: Dict = None):
        """Handle one user input and return a result dict.

        Returns ``{"type": "automation", "results", "summary"}`` or
        ``{"type": "query", "response"}``; returns ``None`` if the intent
        analysis yields any other type.
        """
        self.conversation_history.append({"role": "user", "content": user_input})

        intent = await self._analyze_intent(user_input, context)

        if intent["type"] == "automation":
            results = await self.executor.execute_user_request(user_input)
            summary = await self._generate_summary(user_input, results)
            self.conversation_history.append(
                {"role": "assistant", "content": summary}
            )
            return {"type": "automation", "results": results, "summary": summary}

        if intent["type"] == "query":
            response = await self._handle_query(user_input)
            # Record the reply so both branches keep the history complete.
            self.conversation_history.append(
                {"role": "assistant", "content": response}
            )
            return {"type": "query", "response": response}

        return None

    async def _analyze_intent(self, user_input: str, context: Dict) -> Dict:
        """Classify the input as ``automation`` or ``query``.

        Simplified keyword matching; ``context`` is currently not consulted.
        """
        automation_keywords = ["打开", "点击", "填写", "导航", "提取", "自动化"]
        if any(keyword in user_input for keyword in automation_keywords):
            return {"type": "automation", "confidence": 0.9}
        return {"type": "query", "confidence": 0.7}

    async def _handle_query(self, user_input: str) -> str:
        """Answer a non-automation request; extension point for subclasses.

        This method was called but never defined, so every query failed with
        an ``AttributeError``. It now fails with an explicit message instead.
        """
        raise NotImplementedError(
            "AdaptiveAIAgent._handle_query must be implemented to answer queries"
        )

    async def _generate_summary(self, request: str, results: List) -> str:
        """Build the human-readable summary of a workflow run."""
        total = len(results)
        succeeded = sum(1 for r in results if r["status"] == "success")
        if total == 0:
            # An empty plan must not be reported as "all steps succeeded".
            verdict = "未执行任何步骤,请检查工作流规划结果。"
        elif succeeded == total:
            verdict = "所有步骤均成功完成!"
        else:
            verdict = "部分步骤执行失败,请检查错误信息。"
        return f"""
        已完成您的要求: {request}

        执行统计:
        - 总步骤数: {total}
        - 成功步骤: {succeeded}
        - 失败步骤: {total - succeeded}

        {verdict}
        """


# Article section heading: 2. 复杂工作流示例:电商数据采集
class EcommerceDataAgent:
    """Collects product data from an e-commerce page through a base agent."""

    def __init__(self, base_agent):
        self.agent = base_agent

    async def collect_product_data(self, product_url: str, data_points: List[str]):
        """Run the collection workflow for ``product_url`` and structure the output.

        ``data_points`` is accepted for interface compatibility but is not
        used: the request always asks for the fixed set of fields below.
        """
        workflow_request = f"""
        请执行以下电商数据采集任务:
        1. 导航到产品页面: {product_url}
        2. 提取产品标题
        3. 提取产品价格
        4. 提取产品评分
        5. 提取产品描述
        6. 提取客户评论数量
        """
        results = await self.agent.process_request(workflow_request)
        return await self._structure_product_data(results)

    async def _structure_product_data(self, raw_results: Dict) -> Dict:
        """Map step results to ``title`` / ``price`` / ``rating`` keys.

        The field is chosen from keywords in each step's description.
        """
        structured = {}
        for result in (raw_results or {}).get("results", []):
            payload = result.get("result")
            if not payload:
                continue
            # NOTE(review): assumes the tool result is a dict with a "content"
            # key (or a plain string) — confirm against the MCP server output.
            if isinstance(payload, dict):
                text_content = payload.get("content", "")
            elif isinstance(payload, str):
                text_content = payload
            else:
                continue

            description = result.get("description", "")
            if "标题" in description:
                structured["title"] = self._clean_text(text_content)
            elif "价格" in description:
                structured["price"] = self._extract_price(text_content)
            elif "评分" in description:
                structured["rating"] = self._extract_rating(text_content)
        return structured

    def _clean_text(self, text: str) -> str:
        """Return ``text`` stripped of surrounding whitespace ('' if empty)."""
        return text.strip() if text else ""

    def _extract_price(self, text: str) -> float:
        """Return the first number in ``text`` as a float, or 0.0 if none.

        The pattern must start with a digit, so bare punctuation such as
        "..." or "," is never passed to ``float()``.
        """
        import re

        match = re.search(r"\d[\d,]*(?:\.\d+)?", text or "")
        return float(match.group().replace(",", "")) if match else 0.0

    def _extract_rating(self, text: str) -> float:
        """Return the first number in ``text`` as a float, or 0.0 if none.

        This helper was called by the structuring step but never defined.
        """
        import re

        match = re.search(r"\d+(?:\.\d+)?", text or "")
        return float(match.group()) if match else 0.0


# Article section heading: 错误处理与优化
1. 鲁棒性增强
class RobustWorkflowExecutor(IntelligentWorkflowExecutor):
    """Workflow executor with whole-workflow retries and per-step fallbacks."""

    async def execute_with_retry(
        self, user_request: str, max_retries: int = 3, retry_delay: float = 2
    ):
        """Run the workflow, retrying until every step succeeds.

        Args:
            user_request: natural-language request passed to the executor.
            max_retries: maximum number of attempts.
            retry_delay: seconds to wait between attempts.

        Returns:
            The step results of the first fully successful attempt, or ``[]``
            when ``max_retries`` is not positive.

        Raises:
            RuntimeError: if steps are still failing on the last attempt.
            Exception: whatever the last attempt itself raised.
        """
        for attempt in range(1, max_retries + 1):
            try:
                results = await self.execute_user_request(user_request)
            except Exception as e:
                print(f"第 {attempt} 次尝试失败: {e}")
                if attempt == max_retries:
                    raise
            else:
                # Kept outside the try so the final error below is not caught
                # and re-reported by the handler above.
                failed_steps = [r for r in results if r["status"] == "failed"]
                if not failed_steps:
                    return results
                print(f"第 {attempt} 次尝试,{len(failed_steps)} 个步骤失败")
                if attempt == max_retries:
                    raise RuntimeError(
                        f"工作流执行失败,{len(failed_steps)} 个步骤未完成"
                    )
            await asyncio.sleep(retry_delay)  # wait before the next attempt
        return []

    async def _execute_step_with_fallback(self, step: Dict):
        """Execute a step; on failure try the fallback for its action.

        Only ``click`` and ``extract`` have fallbacks; for any other action
        the original exception is re-raised.
        """
        try:
            return await self._execute_step(step)
        except Exception as e:
            print(f"步骤执行失败: {e},尝试备用方案")
            action = step.get("action")
            if action == "click":
                return await self._try_alternative_selectors(step)
            if action == "extract":
                return await self._try_alternative_extraction(step)
            raise

    async def _try_alternative_selectors(self, step: Dict):
        """Fallback for ``click`` steps; extension point for subclasses.

        This hook was called but never defined; it now fails explicitly.
        """
        raise NotImplementedError(
            "no alternative-selector strategy is implemented for click steps"
        )

    async def _try_alternative_extraction(self, step: Dict):
        """Fallback for ``extract`` steps; extension point for subclasses.

        This hook was called but never defined; it now fails explicitly.
        """
        raise NotImplementedError(
            "no alternative extraction strategy is implemented for extract steps"
        )


# Article section heading: 2. 性能监控
import time
from dataclasses import dataclass
from typing import List
@dataclass
class PerformanceMetrics:
    """Summary figures for one workflow execution."""

    total_steps: int  # number of steps in the workflow
    successful_steps: int  # steps that completed successfully
    failed_steps: int  # total_steps - successful_steps
    total_time: float  # elapsed seconds for the whole execution
    average_step_time: float  # mean of the recorded per-step times
class PerformanceMonitor:
    """Collects timing data per execution and keeps a history of the metrics."""

    def __init__(self):
        self.metrics_history: List[PerformanceMetrics] = []
        # Set up here so the other methods never hit a missing attribute.
        self.start_time = None
        self.step_times = []
        self.step_outcomes = []

    def start_execution(self):
        """Mark the start of an execution and reset the per-step records."""
        # perf_counter is monotonic, so the duration cannot go negative when
        # the wall clock is adjusted.
        self.start_time = time.perf_counter()
        self.step_times = []
        self.step_outcomes = []

    def record_step(self, success: bool, step_time: float):
        """Record the duration and outcome of one step."""
        self.step_times.append(step_time)
        self.step_outcomes.append(success)

    def end_execution(self, total_steps: int, successful_steps: int):
        """Finish the current execution and return its metrics.

        The metrics are also appended to ``metrics_history``.

        Raises:
            RuntimeError: if ``start_execution`` was not called first.
        """
        if self.start_time is None:
            raise RuntimeError("start_execution() must be called before end_execution()")

        total_time = time.perf_counter() - self.start_time
        if self.step_times:
            avg_time = sum(self.step_times) / len(self.step_times)
        else:
            avg_time = 0

        metrics = PerformanceMetrics(
            total_steps=total_steps,
            successful_steps=successful_steps,
            failed_steps=total_steps - successful_steps,
            total_time=total_time,
            average_step_time=avg_time,
        )
        self.metrics_history.append(metrics)
        return metrics


# Article section heading: 实际应用场景
1. 自动化测试智能体
class TestingAutomationAgent:
    """Runs end-to-end test scenarios through a base agent."""

    def __init__(self, base_agent):
        self.agent = base_agent

    async def run_e2e_test(self, test_scenario: str):
        """Send ``test_scenario`` to the base agent as an end-to-end test request.

        Returns whatever the base agent's ``process_request`` returns.
        """
        test_request = f"""
        执行以下端到端测试场景: {test_scenario}

        包括:
        1. 导航到测试页面
        2. 执行测试步骤
        3. 验证预期结果
        4. 生成测试报告
        """
        return await self.agent.process_request(test_request)


# Article section heading: 2. 数据监控智能体
class MonitoringAgent:
    """Periodically checks a website through a base agent and raises alerts."""

    def __init__(self, base_agent, alert_thresholds: Dict):
        self.agent = base_agent
        # Stored for subclasses; none of the checks below consult it.
        self.thresholds = alert_thresholds

    async def monitor_website(self, url: str, check_interval: int = 3600, max_checks=None):
        """Check ``url`` every ``check_interval`` seconds and alert on problems.

        Args:
            url: site to monitor.
            check_interval: seconds to wait between checks.
            max_checks: stop after this many checks; ``None`` (the default)
                keeps monitoring until the task is cancelled.
        """
        checks_done = 0
        while max_checks is None or checks_done < max_checks:
            try:
                status = await self._check_website_status(url)
                if not status["is_healthy"]:
                    await self._send_alert(f"网站异常: {status['issues']}")
            except Exception as e:
                # A failed check is itself worth an alert; keep monitoring.
                await self._send_alert(f"监控检查失败: {e}")

            checks_done += 1
            if max_checks is not None and checks_done >= max_checks:
                break
            await asyncio.sleep(check_interval)

    async def _check_website_status(self, url: str) -> Dict:
        """Run one health check and return ``{"is_healthy", "issues"}``."""
        # Step 1 says "导航到" rather than "访问": the keyword-based intent
        # analysis used by the agent in this article only routes requests
        # containing an automation keyword to the workflow executor.
        check_request = f"""
        检查网站健康状况:
        1. 导航到 {url}
        2. 检查页面加载时间
        3. 验证关键功能是否正常
        4. 检查错误信息
        """
        results = await self.agent.process_request(check_request)
        return self._analyze_health_status(results)

    def _analyze_health_status(self, results: Dict) -> Dict:
        """Derive a health verdict from the agent's step results.

        The site counts as healthy only when at least one step ran and every
        step reports ``status == "success"``. This method was called but never
        defined; it relies on the ``results`` / ``status`` / ``error`` keys
        produced by the workflow executor in this article.
        """
        steps = (results or {}).get("results", [])
        issues = [
            f"{step.get('description', '')}: {step.get('error', '')}"
            for step in steps
            if step.get("status") != "success"
        ]
        if not steps:
            issues.append("未执行任何检查步骤")
        return {"is_healthy": not issues, "issues": issues}

    async def _send_alert(self, message: str):
        """Deliver an alert; prints by default, override for a real channel.

        This method was called but never defined.
        """
        print(f"[ALERT] {message}")


# Article section heading: 结论
通过结合 Playwright MCP 和 LLM,我们能够构建强大的 AI 智能体和工作流系统,这些系统能够:
理解自然语言指令并转化为具体操作
自动化复杂业务流程,减少人工干预
自适应处理异常情况,提高系统鲁棒性
持续学习和优化执行策略
这种技术组合为自动化测试、数据采集、监控警报等场景提供了全新的解决方案。随着 AI 技术的不断发展,这种模式将在更多领域展现其价值,推动企业数字化转型和智能化升级。
Playwright MCP 与 LLM 的结合只是 AI 驱动自动化的开始,这个领域的发展潜力无限,值得我们持续关注和探索。
测试人
专注于软件测试开发 2022-08-29 加入
霍格沃兹测试开发学社,测试人社区:https://ceshiren.com/t/topic/22284







评论