Source code for sherpa_ai.orchestrator

from typing import List, Optional

from langchain_openai import ChatOpenAI 
from pydantic import BaseModel 

from sherpa_ai.actions.planning import Plan
from sherpa_ai.agents import AgentPool, Critic, MLEngineer, Physicist, Planner
from sherpa_ai.agents.base import BaseAgent
from sherpa_ai.events import EventType
from sherpa_ai.memory import Belief, SharedMemory


class OrchestratorConfig(BaseModel):
    llm_name: str = "gpt-3.5-turbo"
    llm_temperature: float = 0.7
    critic_rounds: int = 3
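
# A minimal configuration sketch (illustrative, not part of the module):
# OrchestratorConfig is a plain pydantic model, so any field can be
# overridden at construction time. The model name below is only an
# example value.
#
#     config = OrchestratorConfig(
#         llm_name="gpt-4o-mini", llm_temperature=0.2, critic_rounds=2
#     )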


class Orchestrator:
    def __init__(
        self, config: OrchestratorConfig, agent_pool: AgentPool = AgentPool()
    ):
        self.config = config
        self.agent_types = [MLEngineer, Physicist]
        # chat model constructed from the config; passed to the planner in run()
        self.llm = ChatOpenAI(
            model_name=self.config.llm_name, temperature=self.config.llm_temperature
        )
        self.agent_pool = agent_pool
        self.shared_memory = SharedMemory(objective="", agent_pool=self.agent_pool)

    def plan(self, task: str, planner: Planner, critic: Critic) -> Plan:
        # planner-critic refinement loop: the planner proposes a plan and the
        # critic reviews it; empty feedback means the plan is accepted
        for _ in range(self.config.critic_rounds):
            plan = planner.plan(task)
            feedback = critic.get_feedback(task, str(plan))
            if feedback == "":
                break

        return plan

    def execute(self, plan: Plan, planner: Planner):
        agent_pool = self.agent_pool
        shared_memory = self.shared_memory

        for step in plan.steps:
            # log the task to the shared memory for the agent to execute
            agent = agent_pool.get_agent(step.agent_name)
            shared_memory.current_step = step
            shared_memory.add(EventType.task, planner.name, step.task)
            agent.run()

    def add_agent(self, agent: BaseAgent):
        self.agent_pool.add_agent(agent)

    def run(self, task):
        # initialize the shared memory with the task objective
        agent_pool = self.agent_pool
        shared_memory = self.shared_memory
        shared_memory.objective = task

        # initialize the planner and critic
        planner = Planner(
            name="planner",
            agent_pool=agent_pool,
            shared_memory=shared_memory,
            llm=self.llm,
        )
        critic = Critic(shared_memory=shared_memory)

        plan = self.plan(task, planner, critic)
        shared_memory.plan = plan

        for step in plan.steps:
            # log the task to the shared memory for the agent to execute
            agent = agent_pool.get_agent(step.agent_name)
            shared_memory.current_step = step
            shared_memory.add(EventType.task, planner.name, step.task)
            agent.run()

    def save(self, shared_memory: SharedMemory, agents: List[BaseAgent]):
        # save the shared memory and agents
        result = {}
        result["shared_memory"] = shared_memory.__dict__
        result["agent_belief"] = {
            agent.name: agent.belief.__dict__ for agent in agents
        }
        return result

    @classmethod
    def restore(cls, data: dict, agent_pool: AgentPool):
        # restore the shared memory and agents
        shared_memory = SharedMemory.from_dict(data["shared_memory"], agent_pool)

        agent_belief = data["agent_belief"]
        for name, agent in agent_pool.agents.items():
            agent.belief = Belief.from_dict(agent_belief[name])

        orchestrator = cls(OrchestratorConfig())
        orchestrator.shared_memory = shared_memory
        orchestrator.agent_pool = agent_pool

        return orchestrator

    def continue_with_user_feedback(self, user_feedback) -> Optional[str]:
        current_step = self.shared_memory.current_step
        current_step_id = self.shared_memory.events.index(current_step)

        agent_pool = self.agent_pool
        shared_memory = self.shared_memory

        self.shared_memory.add(
            EventType.result, current_step.agent_name, user_feedback
        )

        # continue with the next step
        for i in range(current_step_id + 1, len(self.shared_memory.plan.steps)):
            step = self.shared_memory.plan.steps[i]
            agent = agent_pool.get_agent(step.agent_name)
            shared_memory.current_step = step
            shared_memory.add(EventType.task, agent.name, step.task)
            agent.run()
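

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module source): wiring agents
# into the orchestrator, running a task, and checkpointing state. The agent
# constructor arguments are assumptions; consult sherpa_ai.agents for the
# real signatures.
#
#     config = OrchestratorConfig(llm_name="gpt-4o-mini", llm_temperature=0.2)
#     orchestrator = Orchestrator(config)
#     # orchestrator.add_agent(MLEngineer(...))  # hypothetical construction
#     # orchestrator.add_agent(Physicist(...))   # hypothetical construction
#     orchestrator.run("Outline an experiment to measure solar panel efficiency")
#
#     # Checkpoint and later restore the shared memory and agent beliefs:
#     snapshot = orchestrator.save(
#         orchestrator.shared_memory, list(orchestrator.agent_pool.agents.values())
#     )
#     restored = Orchestrator.restore(snapshot, orchestrator.agent_pool)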