From 8a4d792f562b873c3bc7757faab0bf8610a66c27 Mon Sep 17 00:00:00 2001 From: Hongxiao Date: Thu, 19 Jun 2025 00:03:16 -0400 Subject: [PATCH] [Dev]identify `_wait_for_stable_network` as time performance bottleneck --- browser_use/agent/service.py | 325 +++++++++++++++++++-------------- browser_use/browser/context.py | 55 ++++-- run_replay.py | 140 +++++++------- 3 files changed, 303 insertions(+), 217 deletions(-) diff --git a/browser_use/agent/service.py b/browser_use/agent/service.py index 51f4eb9..a93db3b 100644 --- a/browser_use/agent/service.py +++ b/browser_use/agent/service.py @@ -59,6 +59,15 @@ ) from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync from browser_use.wap.exact_replay import run_exact_replay +import time +from contextlib import contextmanager + +@contextmanager +def timer(label: str): + start = time.perf_counter() + yield + elapsed = time.perf_counter() - start + print(f"[AGENT TIMER] {label:<30} {elapsed:.3f}s") load_dotenv() logger = logging.getLogger(__name__) @@ -664,66 +673,89 @@ async def wap_smart_replay_step(self, step_info: Optional[AgentStepInfo] = None) @time_execution_async('--step (agent)') async def wap_exact_replay_step(self, step_info: Optional[AgentStepInfo] = None) -> None: - """Execute one step of the task""" + """Execute one step of the task, with timers around each sub-step.""" logger.info(f'πŸ“ Exact Replay Step {self.exact_replay_list_index + 1}') - state = None - model_output = None result: list[ActionResult] = [] step_start_time = time.time() - tokens = 0 try: - state = await self.browser_context.get_state() - await self._raise_if_stopped_or_paused() - self._message_manager.add_state_message(state, self.state.last_result, step_info, "", self.settings.use_vision) + # 1) Get current browser state + with timer("get_state"): + state = await self.browser_context.get_state() + + # 2) Check for pauses or stops + with timer("raise_if_stopped_or_paused"): + await self._raise_if_stopped_or_paused() + + # 3) Add state message + with timer("add_state_message"): + self._message_manager.add_state_message( + state, + self.state.last_result, + step_info, + "", + self.settings.use_vision + ) + + # 4) Prepare the action cur_action = self.exact_replay_list[self.exact_replay_list_index] - result: list[ActionResult] = await run_exact_replay([cur_action], - self.controller, - self.browser_context, - self.ActionModel, - self.settings.page_extraction_llm, - self.sensitive_data, - self.settings.available_file_paths, - self.context) - self.exact_replay_list_index += 1 - self.state.last_result = result - if len(result) > 0 and result[-1].is_done: - logger.info(f'πŸ“„ Result: {result[-1].extracted_content}') + # 5) Run the exact replay itself + with timer("run_exact_replay"): + result = await run_exact_replay( + [cur_action], + self.controller, + self.browser_context, + self.ActionModel, + self.settings.page_extraction_llm, + self.sensitive_data, + self.settings.available_file_paths, + self.context + ) - self.state.consecutive_failures = 0 + # 6) Advance internal index and record result + with timer("post_replay_index"): + self.exact_replay_list_index += 1 + self.state.last_result = result + if result and result[-1].is_done: + logger.info(f'πŸ“„ Result: {result[-1].extracted_content}') + self.state.consecutive_failures = 0 except InterruptedError: - # logger.debug('Agent paused') self.state.last_result = [ ActionResult( - error='The agent was paused mid-step - the last action might need to be repeated', include_in_memory=False + error='The agent was paused mid-step – retry needed', + include_in_memory=False ) ] return + except asyncio.CancelledError: - # Directly handle the case where the step is cancelled at a higher level - # logger.debug('Task cancelled - agent was paused with Ctrl+C') - self.state.last_result = [ActionResult(error='The agent was paused with Ctrl+C', include_in_memory=False)] + self.state.last_result = [ + ActionResult( + error='The agent was paused with Ctrl+C', + include_in_memory=False + ) + ] raise InterruptedError('Step cancelled by user') + except Exception as e: traceback.print_exc() result = await self._handle_step_error(e) self.state.last_result = result finally: + # 7) Build history item (if any result) step_end_time = time.time() - if not result: - return - - if state: - metadata = StepMetadata( - step_number=self.state.n_steps, - step_start_time=step_start_time, - step_end_time=step_end_time, - input_tokens=tokens, - ) - self._make_history_item(model_output, state, result, metadata) + if result: + with timer("make_history_item"): + metadata = StepMetadata( + step_number=self.state.n_steps, + step_start_time=step_start_time, + step_end_time=step_end_time, + input_tokens=0 # or track tokens if needed + ) + self._make_history_item(model_output=None, state=state, result=result, metadata=metadata) @time_execution_async('--handle_step_error (agent)') @@ -937,119 +969,134 @@ async def take_step(self) -> tuple[bool, bool]: return True, True return False, False - + # @observe(name='agent.run', ignore_output=True) @time_execution_async('--run (agent)') async def run( - self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, on_step_end: AgentHookFunc | None = None + self, + max_steps: int = 100, + on_step_start: AgentHookFunc | None = None, + on_step_end: AgentHookFunc | None = None ) -> AgentHistoryList: - """Execute the task with maximum number of steps""" - - loop = asyncio.get_event_loop() - - # Set up the Ctrl+C signal handler with callbacks specific to this agent - from browser_use.utils import SignalHandler - - signal_handler = SignalHandler( - loop=loop, - pause_callback=self.pause, - resume_callback=self.resume, - custom_exit_callback=None, # No special cleanup needed on forced exit - exit_on_second_int=True, - ) - signal_handler.register() - - # Start non-blocking LLM connection verification - assert self.llm._verified_api_keys, 'Failed to verify LLM API keys' - - try: - self._log_agent_run() - - # Execute initial actions if provided - if self.initial_actions: - result = await self.multi_act(self.initial_actions, check_for_new_elements=False) - self.state.last_result = result - - for step in range(max_steps): - # Check if waiting for user input after Ctrl+C - if self.state.paused: - signal_handler.wait_for_resume() - signal_handler.reset() - - # Check if we should stop due to too many failures - if self.state.consecutive_failures >= self.settings.max_failures: - logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures') - break - - # Check control flags before each step - if self.state.stopped: - logger.info('Agent stopped') - break - - while self.state.paused: - await asyncio.sleep(0.2) # Small delay to prevent CPU spinning - if self.state.stopped: # Allow stopping while paused - break + """Execute the task with maximum number of steps, now with timers.""" + with timer("agent.run total"): + loop = asyncio.get_event_loop() - if on_step_start is not None: - await on_step_start(self) + # Set up Ctrl+C handler + from browser_use.utils import SignalHandler + signal_handler = SignalHandler( + loop=loop, + pause_callback=self.pause, + resume_callback=self.resume, + custom_exit_callback=None, + exit_on_second_int=True, + ) + signal_handler.register() - step_info = AgentStepInfo(step_number=step, max_steps=max_steps) + # Verify LLM connection + assert self.llm._verified_api_keys, 'Failed to verify LLM API keys' - if self.replay_mode == "exact_replay": - if len(self.exact_replay_list) <= self.exact_replay_list_index: - break - await self.wap_exact_replay_step(step_info) - elif self.replay_mode == "smart_replay": - await self.wap_smart_replay_step(step_info) + try: + self._log_agent_run() + + # Initial actions + if self.initial_actions: + with timer(" multi_act initial_actions"): + result = await self.multi_act( + self.initial_actions, check_for_new_elements=False + ) + self.state.last_result = result + + # Main loop + for step in range(max_steps): + with timer(f" step {step:>3} total"): + # Pause/resume checks + if self.state.paused: + signal_handler.wait_for_resume() + signal_handler.reset() + if self.state.consecutive_failures >= self.settings.max_failures: + logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures') + break + if self.state.stopped: + logger.info('Agent stopped') + break + while self.state.paused: + await asyncio.sleep(0.2) + if self.state.stopped: + break + + if on_step_start is not None: + await on_step_start(self) + + step_info = AgentStepInfo(step_number=step, max_steps=max_steps) + + # Replay or normal step timing + if self.replay_mode == "exact_replay": + with timer(" exact_replay_step"): + await self.wap_exact_replay_step(step_info) + elif self.replay_mode == "smart_replay": + with timer(" smart_replay_step"): + await self.wap_smart_replay_step(step_info) + else: + with timer(" base_step"): + await self.step(step_info) + + if on_step_end is not None: + await on_step_end(self) + + if self.state.history.is_done(): + if ( + self.settings.validate_output + and step < max_steps - 1 + and self.replay_mode not in ("exact_replay", "smart_replay") + ): + with timer(" validate_output"): + if not await self._validate_output(): + continue + with timer(" log_completion"): + await self.log_completion() + break else: - await self.step(step_info) - - if on_step_end is not None: - await on_step_end(self) - - if self.state.history.is_done(): - if self.settings.validate_output and step < max_steps - 1 and self.replay_mode != "exact_replay" and self.replay_mode != "smart_replay": - if not await self._validate_output(): - continue - - await self.log_completion() - break - else: - logger.info('❌ Failed to complete task in maximum steps') - - return self.state.history - - except KeyboardInterrupt: - # Already handled by our signal handler, but catch any direct KeyboardInterrupt as well - logger.info('Got KeyboardInterrupt during execution, returning current history') - return self.state.history - - finally: - # Unregister signal handlers before cleanup - signal_handler.unregister() - - self.telemetry.capture( - AgentEndTelemetryEvent( - agent_id=self.state.agent_id, - is_done=self.state.history.is_done(), - success=self.state.history.is_successful(), - steps=self.state.n_steps, - max_steps_reached=self.state.n_steps >= max_steps, - errors=self.state.history.errors(), - total_input_tokens=self.state.history.total_input_tokens(), - total_duration_seconds=self.state.history.total_duration_seconds(), + logger.info('❌ Failed to complete task in maximum steps') + + return self.state.history + + except KeyboardInterrupt: + logger.info('Got KeyboardInterrupt during execution, returning current history') + return self.state.history + + finally: + signal_handler.unregister() + + # Telemetry capture (unchanged) + self.telemetry.capture( + AgentEndTelemetryEvent( + agent_id=self.state.agent_id, + is_done=self.state.history.is_done(), + success=self.state.history.is_successful(), + steps=self.state.n_steps, + max_steps_reached=self.state.n_steps >= max_steps, + errors=self.state.history.errors(), + total_input_tokens=self.state.history.total_input_tokens(), + total_duration_seconds=self.state.history.total_duration_seconds(), + ) ) - ) - - await self.close() - - if self.settings.generate_gif: - output_path: str = 'agent_history.gif' - if isinstance(self.settings.generate_gif, str): - output_path = self.settings.generate_gif - create_history_gif(task=self.task, history=self.state.history, output_path=output_path) + # Close the agent + with timer(" close"): + await self.close() + + # Optional GIF + if self.settings.generate_gif: + output_path: str = 'agent_history.gif' + if isinstance(self.settings.generate_gif, str): + output_path = self.settings.generate_gif + with timer(" create_history_gif"): + create_history_gif( + task=self.task, + history=self.state.history, + output_path=output_path + ) # @observe(name='controller.multi_act') @time_execution_async('--multi-act (agent)') diff --git a/browser_use/browser/context.py b/browser_use/browser/context.py index 461124e..1c34aeb 100644 --- a/browser_use/browser/context.py +++ b/browser_use/browser/context.py @@ -684,31 +684,41 @@ async def _wait_for_page_and_frames_load(self, timeout_overwrite: float | None = Waits for either network to be idle or minimum WAIT_TIME, whichever is longer. Also checks if the loaded URL is allowed. """ - # Start timing - start_time = time.time() + # 1) Begin timing + t0 = time.perf_counter() - # Wait for page load + # 2) Wait for stable network + t1 = time.perf_counter() try: await self._wait_for_stable_network() + except Exception as e: + # still time it even if it errors + pass + t2 = time.perf_counter() + print(f"[LOAD] _wait_for_stable_network: {t2 - t1:.3f}s") - # Check if the loaded URL is allowed + # 3) Navigation check + t3 = time.perf_counter() + try: page = await self.get_current_page() await self._check_and_handle_navigation(page) - except URLNotAllowedError as e: - raise e except Exception: - logger.warning('⚠️ Page load failed, continuing...') + # swallow navigation errors pass + t4 = time.perf_counter() + print(f"[LOAD] _check_and_handle_navigation: {t4 - t3:.3f}s") - # Calculate remaining time to meet minimum WAIT_TIME - elapsed = time.time() - start_time - remaining = max((timeout_overwrite or self.config.minimum_wait_page_load_time) - elapsed, 0) - - logger.debug(f'--Page loaded in {elapsed:.2f} seconds, waiting for additional {remaining:.2f} seconds') + # 4) Enforce minimum wait time + elapsed = t4 - t0 + minimum = timeout_overwrite or self.config.minimum_wait_page_load_time + remaining = max(minimum - elapsed, 0) + print(f"[LOAD] elapsed before sleep: {elapsed:.3f}s, remaining sleep: {remaining:.3f}s") - # Sleep remaining time if needed if remaining > 0: + t5 = time.perf_counter() await asyncio.sleep(remaining) + t6 = time.perf_counter() + print(f"[LOAD] asyncio.sleep: {t6 - t5:.3f}s") def _is_url_allowed(self, url: str) -> bool: """Check if a URL is allowed based on the whitelist configuration.""" @@ -877,14 +887,27 @@ async def get_page_structure(self) -> str: structure = await page.evaluate(debug_script) return structure - @time_execution_sync('--get_state') # This decorator might need to be updated to handle async async def get_state(self) -> BrowserState: - """Get the current state of the browser""" + """Get the current state of the browser, with per-step timers.""" + # 1) Wait for page & frames to load + t0 = time.perf_counter() await self._wait_for_page_and_frames_load() + t1 = time.perf_counter() + print(f"[GET_STATE] _wait_for_page_and_frames_load: {t1 - t0:.3f}s") + + # 2) Retrieve or create session object + t0 = time.perf_counter() session = await self.get_session() + t1 = time.perf_counter() + print(f"[GET_STATE] get_session: {t1 - t0:.3f}s") + + # 3) Pull down the full state (DOM, screenshot, cookies, etc.) + t0 = time.perf_counter() session.cached_state = await self._update_state() + t1 = time.perf_counter() + print(f"[GET_STATE] _update_state: {t1 - t0:.3f}s") - # Save cookies if a file is specified + # 4) Fire-and-forget cookie save if self.config.cookies_file: asyncio.create_task(self.save_cookies()) diff --git a/run_replay.py b/run_replay.py index a766f22..bffab1f 100644 --- a/run_replay.py +++ b/run_replay.py @@ -18,6 +18,16 @@ from langchain_ollama import ChatOllama from pydantic import SecretStr +import time +from contextlib import contextmanager + +@contextmanager +def timer(label: str): + start = time.perf_counter() + yield + elapsed = time.perf_counter() - start + print(f"[TIMER] {label} took {elapsed:.3f}s") + load_dotenv() class TaskData(TypedDict): @@ -141,20 +151,23 @@ async def process_single_task( exact_replay_list = replay_list["action_list"] else: raise Exception("Error setting WAP replay mode, no mode type: ", replay_mode) - - agent = Agent( - task=task_str, - llm=client, - browser=browser, - validate_output=True, - generate_gif=False, - use_vision=False, - subgoal_list=subgoal_list, - exact_replay_list=exact_replay_list, - replay_mode=replay_mode - ) - history = await agent.run(max_steps=20) - history.save_to_file(task_dir / "history.json") + + with timer(" agent init"): + agent = Agent( + task=task_str, + llm=client, + browser=browser, + validate_output=True, + generate_gif=False, + use_vision=False, + subgoal_list=subgoal_list, + exact_replay_list=exact_replay_list, + replay_mode=replay_mode + ) + with timer(" agent.run"): + history = await agent.run(max_steps=20) + with timer(" history.save"): + history.save_to_file(task_dir / "history.json") except Exception as e: logging.error(f"Error processing task {replay_list['task_id']}: {str(e)}") @@ -164,75 +177,78 @@ async def process_single_task( await browser.close() -async def main(max_concurrent_tasks: int, - model_provider: str, - wap_replay_list_path: str = None) -> None: +async def main( + max_concurrent_tasks: int, + model_provider: str, + wap_replay_list_path: str = None +) -> None: try: - # Setup + # ─── Cleanup and concurrency control ────────────────────────────────── cleanup_webdriver_cache() semaphore = Semaphore(max_concurrent_tasks) - # Load tasks - replay_list = [] - with open(wap_replay_list_path, "r", encoding="utf-8") as f: - replay_list.append(json.load(f)) + # ─── Load the replay list ───────────────────────────────────────────── + with timer("WAP list load"): + replay_list = [] + with open(wap_replay_list_path, "r", encoding="utf-8") as f: + replay_list.append(json.load(f)) - # Initialize + # Prepare results directory results_dir = Path("results") results_dir.mkdir(parents=True, exist_ok=True) - # Process tasks concurrently with semaphore + # ─── Per‐task processor ─────────────────────────────────────────────── async def process_with_semaphore( - replay_list: Dict, + replay_item: dict, client: AzureChatOpenAI | ChatAnthropic | ChatOpenAI, ) -> None: async with semaphore: - print(f"\n=== Now at task {replay_list['task_id']} ===") + with timer(f"Task {replay_item['task_id']} total"): + print(f"\n=== Now at task {replay_item['task_id']} ===") - # Create browser instance inside the semaphore block - browser = Browser( - config=BrowserConfig( - # headless=True, - headless=False, - disable_security=True, - new_context_config=BrowserContextConfig( - disable_security=True, - wait_for_network_idle_page_load_time=5, - maximum_wait_page_load_time=20, - # no_viewport=True, - browser_window_size={ - "width": 1280, - "height": 1100, - }, - ), - ) - ) - await process_single_task( - replay_list, - client, - results_dir, - browser - ) - # Add this to ensure browser is always closed - try: - await browser.close() - except Exception as e: - logging.error(f"Error closing browser: {e}") + # 1) Browser startup + with timer(" browser startup"): + browser = Browser( + config=BrowserConfig( + headless=False, + disable_security=True, + new_context_config=BrowserContextConfig( + disable_security=True, + wait_for_network_idle_page_load_time=5, + maximum_wait_page_load_time=20, + browser_window_size={"width": 1280, "height": 1100}, + ), + ) + ) + + # 2) The actual work + with timer(" process_single_task"): + await process_single_task( + replay_item, + client, + results_dir, + browser + ) + + # 3) Browser teardown + with timer(" browser.close"): + try: + await browser.close() + except Exception as e: + logging.error(f"Error closing browser: {e}") - # Create and run all tasks + # ─── Spawn and run tasks ────────────────────────────────────────────── all_tasks = [] - for i, task in enumerate(replay_list): - model = next(get_llm_model_generator(model_provider)) - all_tasks.append(process_with_semaphore(task, model)) + for task in replay_list: + client = next(get_llm_model_generator(model_provider)) + all_tasks.append(process_with_semaphore(task, client)) - # Add timeout and better error handling await asyncio.gather(*all_tasks, return_exceptions=True) except Exception as e: traceback.print_exc() logging.error(f"Main loop error: {e}") finally: - # Cleanup code here - logging.info("Shutting down...") + logging.info("Shutting down…") if __name__ == "__main__":