Skip to content

[Dev]Identify _wait_for_stable_network as Performance Bottleneck #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 186 additions & 139 deletions browser_use/agent/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
)
from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync
from browser_use.wap.exact_replay import run_exact_replay
import time
from contextlib import contextmanager

@contextmanager
def timer(label: str):
start = time.perf_counter()
yield
elapsed = time.perf_counter() - start
print(f"[AGENT TIMER] {label:<30} {elapsed:.3f}s")

load_dotenv()
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -664,66 +673,89 @@ async def wap_smart_replay_step(self, step_info: Optional[AgentStepInfo] = None)

@time_execution_async('--step (agent)')
async def wap_exact_replay_step(self, step_info: Optional[AgentStepInfo] = None) -> None:
"""Execute one step of the task"""
"""Execute one step of the task, with timers around each sub-step."""
logger.info(f'📍 Exact Replay Step {self.exact_replay_list_index + 1}')
state = None
model_output = None
result: list[ActionResult] = []
step_start_time = time.time()
tokens = 0

try:
state = await self.browser_context.get_state()
await self._raise_if_stopped_or_paused()
self._message_manager.add_state_message(state, self.state.last_result, step_info, "", self.settings.use_vision)
# 1) Get current browser state
with timer("get_state"):
state = await self.browser_context.get_state()

# 2) Check for pauses or stops
with timer("raise_if_stopped_or_paused"):
await self._raise_if_stopped_or_paused()

# 3) Add state message
with timer("add_state_message"):
self._message_manager.add_state_message(
state,
self.state.last_result,
step_info,
"",
self.settings.use_vision
)

# 4) Prepare the action
cur_action = self.exact_replay_list[self.exact_replay_list_index]
result: list[ActionResult] = await run_exact_replay([cur_action],
self.controller,
self.browser_context,
self.ActionModel,
self.settings.page_extraction_llm,
self.sensitive_data,
self.settings.available_file_paths,
self.context)
self.exact_replay_list_index += 1
self.state.last_result = result

if len(result) > 0 and result[-1].is_done:
logger.info(f'📄 Result: {result[-1].extracted_content}')
# 5) Run the exact replay itself
with timer("run_exact_replay"):
result = await run_exact_replay(
[cur_action],
self.controller,
self.browser_context,
self.ActionModel,
self.settings.page_extraction_llm,
self.sensitive_data,
self.settings.available_file_paths,
self.context
)

self.state.consecutive_failures = 0
# 6) Advance internal index and record result
with timer("post_replay_index"):
self.exact_replay_list_index += 1
self.state.last_result = result
if result and result[-1].is_done:
logger.info(f'📄 Result: {result[-1].extracted_content}')
self.state.consecutive_failures = 0

except InterruptedError:
# logger.debug('Agent paused')
self.state.last_result = [
ActionResult(
error='The agent was paused mid-step - the last action might need to be repeated', include_in_memory=False
error='The agent was paused mid-step – retry needed',
include_in_memory=False
)
]
return

except asyncio.CancelledError:
# Directly handle the case where the step is cancelled at a higher level
# logger.debug('Task cancelled - agent was paused with Ctrl+C')
self.state.last_result = [ActionResult(error='The agent was paused with Ctrl+C', include_in_memory=False)]
self.state.last_result = [
ActionResult(
error='The agent was paused with Ctrl+C',
include_in_memory=False
)
]
raise InterruptedError('Step cancelled by user')

except Exception as e:
traceback.print_exc()
result = await self._handle_step_error(e)
self.state.last_result = result

finally:
# 7) Build history item (if any result)
step_end_time = time.time()
if not result:
return

if state:
metadata = StepMetadata(
step_number=self.state.n_steps,
step_start_time=step_start_time,
step_end_time=step_end_time,
input_tokens=tokens,
)
self._make_history_item(model_output, state, result, metadata)
if result:
with timer("make_history_item"):
metadata = StepMetadata(
step_number=self.state.n_steps,
step_start_time=step_start_time,
step_end_time=step_end_time,
input_tokens=0 # or track tokens if needed
)
self._make_history_item(model_output=None, state=state, result=result, metadata=metadata)


@time_execution_async('--handle_step_error (agent)')
Expand Down Expand Up @@ -937,119 +969,134 @@ async def take_step(self) -> tuple[bool, bool]:
return True, True

return False, False

# @observe(name='agent.run', ignore_output=True)
@time_execution_async('--run (agent)')
async def run(
self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, on_step_end: AgentHookFunc | None = None
self,
max_steps: int = 100,
on_step_start: AgentHookFunc | None = None,
on_step_end: AgentHookFunc | None = None
) -> AgentHistoryList:
"""Execute the task with maximum number of steps"""

loop = asyncio.get_event_loop()

# Set up the Ctrl+C signal handler with callbacks specific to this agent
from browser_use.utils import SignalHandler

signal_handler = SignalHandler(
loop=loop,
pause_callback=self.pause,
resume_callback=self.resume,
custom_exit_callback=None, # No special cleanup needed on forced exit
exit_on_second_int=True,
)
signal_handler.register()

# Start non-blocking LLM connection verification
assert self.llm._verified_api_keys, 'Failed to verify LLM API keys'

try:
self._log_agent_run()

# Execute initial actions if provided
if self.initial_actions:
result = await self.multi_act(self.initial_actions, check_for_new_elements=False)
self.state.last_result = result

for step in range(max_steps):
# Check if waiting for user input after Ctrl+C
if self.state.paused:
signal_handler.wait_for_resume()
signal_handler.reset()

# Check if we should stop due to too many failures
if self.state.consecutive_failures >= self.settings.max_failures:
logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures')
break

# Check control flags before each step
if self.state.stopped:
logger.info('Agent stopped')
break

while self.state.paused:
await asyncio.sleep(0.2) # Small delay to prevent CPU spinning
if self.state.stopped: # Allow stopping while paused
break
"""Execute the task with maximum number of steps, now with timers."""
with timer("agent.run total"):
loop = asyncio.get_event_loop()

if on_step_start is not None:
await on_step_start(self)
# Set up Ctrl+C handler
from browser_use.utils import SignalHandler
signal_handler = SignalHandler(
loop=loop,
pause_callback=self.pause,
resume_callback=self.resume,
custom_exit_callback=None,
exit_on_second_int=True,
)
signal_handler.register()

step_info = AgentStepInfo(step_number=step, max_steps=max_steps)
# Verify LLM connection
assert self.llm._verified_api_keys, 'Failed to verify LLM API keys'

if self.replay_mode == "exact_replay":
if len(self.exact_replay_list) <= self.exact_replay_list_index:
break
await self.wap_exact_replay_step(step_info)
elif self.replay_mode == "smart_replay":
await self.wap_smart_replay_step(step_info)
try:
self._log_agent_run()

# Initial actions
if self.initial_actions:
with timer(" multi_act initial_actions"):
result = await self.multi_act(
self.initial_actions, check_for_new_elements=False
)
self.state.last_result = result

# Main loop
for step in range(max_steps):
with timer(f" step {step:>3} total"):
# Pause/resume checks
if self.state.paused:
signal_handler.wait_for_resume()
signal_handler.reset()
if self.state.consecutive_failures >= self.settings.max_failures:
logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures')
break
if self.state.stopped:
logger.info('Agent stopped')
break
while self.state.paused:
await asyncio.sleep(0.2)
if self.state.stopped:
break

if on_step_start is not None:
await on_step_start(self)

step_info = AgentStepInfo(step_number=step, max_steps=max_steps)

# Replay or normal step timing
if self.replay_mode == "exact_replay":
with timer(" exact_replay_step"):
await self.wap_exact_replay_step(step_info)
elif self.replay_mode == "smart_replay":
with timer(" smart_replay_step"):
await self.wap_smart_replay_step(step_info)
else:
with timer(" base_step"):
await self.step(step_info)

if on_step_end is not None:
await on_step_end(self)

if self.state.history.is_done():
if (
self.settings.validate_output
and step < max_steps - 1
and self.replay_mode not in ("exact_replay", "smart_replay")
):
with timer(" validate_output"):
if not await self._validate_output():
continue
with timer(" log_completion"):
await self.log_completion()
break
else:
await self.step(step_info)

if on_step_end is not None:
await on_step_end(self)

if self.state.history.is_done():
if self.settings.validate_output and step < max_steps - 1 and self.replay_mode != "exact_replay" and self.replay_mode != "smart_replay":
if not await self._validate_output():
continue

await self.log_completion()
break
else:
logger.info('❌ Failed to complete task in maximum steps')

return self.state.history

except KeyboardInterrupt:
# Already handled by our signal handler, but catch any direct KeyboardInterrupt as well
logger.info('Got KeyboardInterrupt during execution, returning current history')
return self.state.history

finally:
# Unregister signal handlers before cleanup
signal_handler.unregister()

self.telemetry.capture(
AgentEndTelemetryEvent(
agent_id=self.state.agent_id,
is_done=self.state.history.is_done(),
success=self.state.history.is_successful(),
steps=self.state.n_steps,
max_steps_reached=self.state.n_steps >= max_steps,
errors=self.state.history.errors(),
total_input_tokens=self.state.history.total_input_tokens(),
total_duration_seconds=self.state.history.total_duration_seconds(),
logger.info('❌ Failed to complete task in maximum steps')

return self.state.history

except KeyboardInterrupt:
logger.info('Got KeyboardInterrupt during execution, returning current history')
return self.state.history

finally:
signal_handler.unregister()

# Telemetry capture (unchanged)
self.telemetry.capture(
AgentEndTelemetryEvent(
agent_id=self.state.agent_id,
is_done=self.state.history.is_done(),
success=self.state.history.is_successful(),
steps=self.state.n_steps,
max_steps_reached=self.state.n_steps >= max_steps,
errors=self.state.history.errors(),
total_input_tokens=self.state.history.total_input_tokens(),
total_duration_seconds=self.state.history.total_duration_seconds(),
)
)
)

await self.close()

if self.settings.generate_gif:
output_path: str = 'agent_history.gif'
if isinstance(self.settings.generate_gif, str):
output_path = self.settings.generate_gif

create_history_gif(task=self.task, history=self.state.history, output_path=output_path)
# Close the agent
with timer(" close"):
await self.close()

# Optional GIF
if self.settings.generate_gif:
output_path: str = 'agent_history.gif'
if isinstance(self.settings.generate_gif, str):
output_path = self.settings.generate_gif
with timer(" create_history_gif"):
create_history_gif(
task=self.task,
history=self.state.history,
output_path=output_path
)

# @observe(name='controller.multi_act')
@time_execution_async('--multi-act (agent)')
Expand Down
Loading