Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions crewai_tools/tools/agent_scheduler_tool/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# agent-scheduler tool
## Description
This tool uses a Reinforcement Learning (RL) environment to help optimize how agents collaborate in CrewAI. It uses a custom RL environment built using OpenAI Gym where we simulate the agents working together. The goal is to improve task completion time and efficiency by adjusting how agents communicate and collaborate, based on the task at hand.

## Installation
Install the crewai_tools package
```shell
pip install 'crewai[tools]'
```

## Example
For example, when agents are given a task, they dynamically adjust how they interact based on previous performance, like avoiding overlapping efforts or optimizing resource allocation.

```python
from crewai_tools import AgentSchedulerTool
from crewai import LLM

Agent(
...
tools=[AgentSchedulerTool()],
)
tool = AgentSchedulerTool(agent_ids=["agent_alpha", "agent_beta", "agent_gamma"])
llm = LLM(model="azure/gpt-4o", api_version="2023-05-15")

agent = Agent(
name="Scheduler Agent",
role="Agent Performance Monitor",
goal="Optimize agent retraining schedules based on recent outcomes",
backstory="This agent reviews logs and adjusts how frequently agents should be retrained.",
tools=[tool],
llm=llm
)

task = Task(
description="Use the agent_scheduler tool to analyze agent_alpha performance with 'True,False,True,True,False,False,True' and suggest a retraining interval.",
expected_output="Suggest how often agent_alpha should be retrained",
agent=agent
)

crew = Crew(agents=[agent], tasks=[task], verbose=False)
```
Empty file.
54 changes: 54 additions & 0 deletions crewai_tools/tools/agent_scheduler_tool/agent_scheduler_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import random
from typing import Dict, List
from crewai.tools import BaseTool
from pydantic import Field


class AgentScheduler:
"""
Tracks agent performance and suggests dynamic retraining intervals.
"""

def __init__(self, agent_ids: List[str]):
self.performance_log: Dict[str, List[float]] = {
agent_id: [] for agent_id in agent_ids
}

def track_performance(self, agent_id: str, success: bool):
self.performance_log[agent_id].append(1.0 if success else 0.0)

def adjust_training_schedule(self, agent_id: str) -> int:
log = self.performance_log.get(agent_id, [])
if not log:
return 3 # Default if no data

avg_score = sum(log[-10:]) / min(len(log), 10)
if avg_score < 0.5:
return 1 # Frequent retraining
elif avg_score > 0.8:
return 5 # Rare retraining
return 3 # Moderate


class AgentSchedulerTool(BaseTool):
name: str = "agent_scheduler"
description: str = (
"Tracks agent performance and suggests dynamic retraining intervals. "
"Takes agent_id (e.g., 'agent_alpha') and performance (comma-separated values like 'True,False,True')"
)
agent_ids: List[str]
scheduler: AgentScheduler = Field(default=None)

def __init__(self, agent_ids: List[str]):
super().__init__(agent_ids=agent_ids)
object.__setattr__(self, 'scheduler', AgentScheduler(agent_ids))

def _run(self, agent_id: str, performance: str) -> str:
try:
performance_list = [x.strip() == "True" for x in performance.split(",")]
for result in performance_list:
self.scheduler.track_performance(agent_id, result)
interval = self.scheduler.adjust_training_schedule(agent_id)
return f"Recommended retraining interval for {agent_id}: {interval} days"
except Exception as e:
return f"Error processing input: {e}"
12 changes: 12 additions & 0 deletions crewai_tools/tools/collaboration_optimizer_tool/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Collaboration Optimizer
## Description
This tool uses a Reinforcement Learning (RL) environment to help optimize how agents collaborate in CrewAI. It uses a custom RL environment built using OpenAI Gym where we simulate the agents working together. The goal is to improve task completion time and efficiency by adjusting how agents communicate and collaborate, based on the task at hand.

## Installation
Install the crewai_tools package
```shell
pip install 'crewai[tools]'
```

## Example
For example, when agents are given a task, they dynamically adjust how they interact based on previous performance, like avoiding overlapping efforts or optimizing resource allocation.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from crewai.tools import BaseTool
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Discrete, Box

class AgentCollaborationEnv(gym.
Env):
def __init__(self, num_agents: int = 3):
super(AgentCollaborationEnv, self).__init__()
self.num_agents = num_agents
self.observation_space = Box(low=0, high=1, shape=(self.num_agents,), dtype=np.float32)
self.action_space = Discrete(self.num_agents * 2)
self.state = np.zeros(self.num_agents, dtype=np.float32)

def reset(self, seed=None, options=None):
self.state = np.random.rand(self.num_agents).astype(np.float32)
return self.state, {}

def step(self, action):
self.state = np.random.rand(self.num_agents).astype(np.float32)
reward = float(np.mean(self.state))
terminated = np.random.rand() > 0.95
truncated = False
return self.state, reward, terminated, truncated, {}


class CollaborationOptimizerTool(BaseTool):
name: str = "collaboration_optimizer"
description: str = "Optimizes collaboration strategies among agents using reinforcement learning."

def _run(self, num_agents: int = 3, timesteps: int = 2000):
env = AgentCollaborationEnv(num_agents)
check_env(env, warn=True)

model = PPO("MlpPolicy", env, verbose=0)
model.learn(total_timesteps=timesteps)

# Evaluation phase (returns average reward over 5 steps)
obs, _ = env.reset()
total_reward = 0
for _ in range(5):
action, _ = model.predict(obs)
obs, reward, terminated, truncated, _ = env.step(action)
total_reward += reward
if terminated or truncated:
break
avg_reward = total_reward / 5.0

return f"Average collaboration reward for {num_agents} agents: {avg_reward:.4f}"
56 changes: 56 additions & 0 deletions tests/tools/agent_scheduler_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os
import unittest
from crewai import Agent, Task, Crew, LLM
from crewAI.src.crewai.tools.agent_scheduler import AgentSchedulerTool
from langchain_openai import AzureChatOpenAI


class TestAgentSchedulerTool(unittest.TestCase):
def setup(self):
os.environ["AXZURE_API_TYPE"] = "azure"
os.environ["AZURE_API_KEY"] = os.getenv("AZURE_OPENAI_API_KEY")
os.environ["AZURE_API_BASE"] = os.getenv("AZURE_OPENAI_ENDPOINT")
os.environ["AZURE_API_VERSION"] = "2025-01-01-preview"
os.environ["AZURE_DEPLOYMENT_NAME"] = "gpt-4o"

self.tool = AgentSchedulerTool(agent_ids=["agent_alpha", "agent_beta", "agent_gamma"])
self.llm = LLM(model="azure/gpt-4o", api_version="2023-05-15")

self.agent = Agent(
name="Scheduler Agent",
role="Agent Performance Monitor",
goal="Optimize agent retraining schedules based on recent outcomes",
backstory="This agent reviews logs and adjusts how frequently agents should be retrained.",
tools=[self.tool],
llm=self.llm
)

self.task = Task(
description="Use the agent_scheduler tool to analyze agent_alpha performance with 'True,False,True,True,False,False,True' and suggest a retraining interval.",
expected_output="Suggest how often agent_alpha should be retrained",
agent=self.agent
)

def test_tool_schema_structure(self):
schema = self.tool.args_schema.schema()
self.assertIn("agent_id", schema["properties"])
self.assertIn("performance", schema["properties"])

def test_agent_and_task_integration(self):
self.assertEqual(self.agent.name, "Scheduler Agent")
self.assertEqual(self.task.agent.name, "Scheduler Agent")
self.assertTrue(any(isinstance(t, AgentSchedulerTool) for t in self.agent.tools))

def test_crew_execution(self):
crew = Crew(agents=[self.agent], tasks=[self.task], verbose=False)
# This line will actually trigger execution. Comment if avoiding LLM calls.
# print(agent.tools[0].args_schema.schema_json(indent=2))
crew.kickoff()
# self.assertIn("retrain", result.lower())
# Instead, print schema for debug
print(self.agent.tools[0].args_schema.schema_json(indent=2))


# ag = TestAgentSchedulerTool()
# ag.setup()
# ag.test_crew_execution()
50 changes: 50 additions & 0 deletions tests/tools/collaboration_optimizer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import unittest
from crewai import Agent, Task, Crew, LLM
from crewAI.src.crewai.tools.collaboration_optimizer import CollaborationOptimizerTool

# Set Azure OpenAI credentials
os.environ["AZURE_API_TYPE"] = "azure"
os.environ["AZURE_API_KEY"] = os.getenv("AZURE_OPENAI_API_KEY")
os.environ["AZURE_API_BASE"] = os.getenv("AZURE_OPENAI_ENDPOINT")
os.environ["AZURE_API_VERSION"] = "2025-01-01-preview"
os.environ["AZURE_DEPLOYMENT_NAME"] = "gpt-4o"


class TestCollaborationOptimizerTool(unittest.TestCase):
def setup(self):
self.llm = LLM(model="azure/gpt-4o", api_version="2023-05-15")
self.agent = Agent(
name="Optimizer Agent",
role="Collaboration Strategist",
backstory="An AI agent specialized in optimizing teamwork among multiple agents through reinforcement learning strategies.",
goal="Maximize team collaboration efficiency",
tools=[CollaborationOptimizerTool()],
llm=self.llm,
verbose=True
)

self.task = Task(
description="Run a simulation to optimize collaboration among 4 agents.",
expected_output="Optimal reward score and strategy feedback",
agent=self.agent
)

self.crew = Crew(agents=[self.agent], tasks=[self.task], verbose=True)

def test_collaboration_optimizer_tool_attached_to_agent(self):
# Ensure the tool is properly attached
tool_names = [tool.name for tool in self.agent.tools]
self.assertIn("Collaboration Optimizer", tool_names)

def test_crew_kickoff_returns_result(self):
# Run the crew and assert the result format
result = self.crew.kickoff()
self.assertIsInstance(result, str) # Or dict, depending on what the tool returns
self.assertIn("Optimal", result) # You can refine this based on expected output

def test_tool_description_contains_expected_keywords(self):
tool = self.agent.tools[0]
self.assertIn("optimiz", tool.description.lower()) # fuzzy match for "optimize", etc.
self.assertTrue(tool.description) # Ensure description is not empty