I am trying to create a hierarchical LLM agent workflow using LangGraph. The workflow is intended to be set up so that the research_team conducts the research and the writing_team writes the report. The entire workflow should be executed for each sub-section of the main report request. Below is the image of the setup:
The Research Team and the Writing Team follow the same structure: each has a Team Supervisor node and Worker nodes. I want the whole graph to execute so that both teams communicate on the same channels (states) while researching and writing the sub-sections of the report. This requires the Team Supervisor LLM to produce structured output that feeds the worker prompts, and the workers to update the channel states after completing execution. All the resources out there use MessagesState or some kind of Pydantic or TypedDict approach, but I can't find a good way to understand how the whole graph communicates on the same channels while working on a particular sub-section. I'd much appreciate your help in figuring this out. Below is the code representing what I'm trying to do:
Supervisor Node Function:
    from typing import Annotated, Callable, List, Literal, Optional
    from typing_extensions import TypedDict

    from langchain_core.language_models.chat_models import BaseChatModel
    from langgraph.graph import END, MessagesState
    from langgraph.types import Command


    class SupervisorInput(MessagesState):
        """User request."""
        main_topic: Annotated[str, ..., "The main topic of the request"]
        section_topic: Annotated[Optional[str], "Sub-section topic of the main topic"]
        section_content: Annotated[Optional[str], "Sub-section topic content"]


    def make_supervisor_node(
        llm: BaseChatModel, system_prompt: str | None, members: List[str]
    ) -> Callable[[SupervisorInput], Command]:
        options = ["FINISH"] + members
        if system_prompt is None:
            system_prompt = (
                "You are a supervisor tasked with managing a conversation between the"
                f" following teams: {members}. Given the user request,"
                " respond with the team to act next. Each team will perform a"
                " task and respond with their results and status. You should verify"
                " the task performed by the teams to ensure it satisfies user request."
                " When finished, respond with FINISH."
            )

        class SupervisorAction(TypedDict):
            """Supervisor action."""
            # main_topic: SupervisorInput
            section_topic: Annotated[str, "Sub-section topic of the main topic"]
            section_search_query: Annotated[Optional[str], "Search query for the sub-section topic"]
            next: Literal[*options]  # star-unpacking in a subscript needs Python 3.11+

        def supervisor_node(state: SupervisorInput) -> Command[Literal[*members, "__end__"]]:
            """An LLM-based decision maker."""
            # print(f"Supervisor Node State: {state}")
            messages = [
                {"role": "system", "content": system_prompt},
            ] + state["messages"]
            response = llm.with_structured_output(SupervisorAction).invoke(messages)
            print(f"Supervisor response: {response}")
            goto = response["next"]
            print(f"Going to node: {goto}")
            if goto == "FINISH":
                goto = END
            # Only the routing decision is used here; section_topic and
            # section_search_query from the response are not written to the state.
            return Command(goto=goto)

        return supervisor_node
Research Team Graph:
    from langchain_community.tools.pubmed.tool import PubmedQueryRun
    from langchain_community.tools.semanticscholar.tool import SemanticScholarQueryRun
    from langchain_community.tools.tavily_search import TavilySearchResults
    from langchain_core.messages import HumanMessage
    from langchain_openai import ChatOpenAI
    from langgraph.graph import START, StateGraph
    from langgraph.prebuilt import create_react_agent

    ## Define tools
    research_tools = [TavilySearchResults(max_results=5), PubmedQueryRun(), SemanticScholarQueryRun()]

    ## Define LLM
    research_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    tavily_agent = create_react_agent(research_llm, tools=research_tools)


    def tavily_node(state: SupervisorInput) -> Command[Literal["supervisor"]]:
        result = tavily_agent.invoke(state)
        return Command(
            update={
                "messages": [
                    HumanMessage(content=result["messages"][-1].content, name="tavily")
                ]
            },
            # We want our workers to ALWAYS "report back" to the supervisor when done
            goto="supervisor",
        )


    # Read the supervisor prompt and close the file afterwards
    with open("./prompts/research_supervisor_prompt.txt", "r") as f:
        research_supervisor_prompt = f.read()
    # print(research_supervisor_prompt)
    research_supervisor_node = make_supervisor_node(
        research_llm, research_supervisor_prompt, ["tavily"]
    )

    ## Define Research Team
    research_team = StateGraph(SupervisorInput)
    research_team.add_node("supervisor", research_supervisor_node)
    research_team.add_node("tavily", tavily_node)
    research_team.add_edge(START, "supervisor")
    research_graph = research_team.compile()
The above code works, but the LLM output is disjointed: it hits __end__ without completing the research for all the sub-sections. Also, this code just keeps appending to the list of messages, which doesn't seem to work effectively. Hence I want separate channels (states) for updating section_topic and section_content, which the research team and the writing team can collaborate on while researching and writing. So the question is probably: how do I subclass MessagesState to have separate channels for communication, and keep updating them while working on a task?
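To make the behaviour I'm after concrete, here is a library-free sketch in plain Python. It is not LangGraph's API: ReportState, apply_update, run, the stub nodes, and the pending_sections and finished_sections channels are all hypothetical names I made up for illustration, and the supervisor is a deterministic stand-in for the LLM. It only shows the idea of one shared state where some channels are appended to through a reducer and others (section_topic, section_content) are overwritten for each sub-section. My understanding is that in LangGraph the Annotated reducer on a state key and Command(update=...) play these roles.

```python
import operator
from typing import Annotated, Optional, TypedDict, get_type_hints


class ReportState(TypedDict):
    """Shared channels. pending_sections / finished_sections are hypothetical."""
    messages: Annotated[list, operator.add]           # reducer: append
    main_topic: str                                   # no reducer: overwrite
    pending_sections: list                            # sub-sections still to do
    section_topic: Optional[str]                      # sub-section in progress
    section_content: Optional[str]                    # research notes for it
    finished_sections: Annotated[list, operator.add]  # reducer: append


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, channel by channel."""
    hints = get_type_hints(ReportState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and callable(metadata[-1]):
            merged[key] = metadata[-1](merged[key], value)  # reducer channel
        else:
            merged[key] = value                             # overwrite channel
    return merged


def supervisor(state: dict):
    """Stand-in for the LLM supervisor: take the next pending sub-section."""
    pending = state["pending_sections"]
    if not pending:
        return {"section_topic": None}, "FINISH"
    update = {
        "section_topic": pending[0],
        "pending_sections": pending[1:],
        "section_content": None,  # reset the per-section channel
    }
    return update, "research"


def research(state: dict):
    """Stand-in research worker: reads section_topic, writes section_content."""
    topic = state["section_topic"]
    return {
        "section_content": f"notes on {topic}",
        "messages": [f"research: {topic}"],
    }, "writer"


def writer(state: dict):
    """Stand-in writing worker: turns the current notes into a finished section."""
    topic, content = state["section_topic"], state["section_content"]
    return {
        "finished_sections": [f"{topic}: {content}"],
        "messages": [f"writer: {topic}"],
    }, "supervisor"


def run(main_topic: str, sections: list) -> dict:
    """Drive the nodes until the supervisor says FINISH."""
    state = {
        "messages": [],
        "main_topic": main_topic,
        "pending_sections": list(sections),
        "section_topic": None,
        "section_content": None,
        "finished_sections": [],
    }
    nodes = {"supervisor": supervisor, "research": research, "writer": writer}
    current = "supervisor"
    while current != "FINISH":
        update, current = nodes[current](state)
        state = apply_update(state, update)
    return state


final = run("Diabetes", ["Causes", "Treatment"])
print(final["finished_sections"])
```

In this sketch the loop cannot end before every sub-section is done, because FINISH is only reachable once pending_sections is empty, and the workers exchange data through section_topic and section_content instead of by parsing the growing messages list. What I can't work out is how to get the same effect with the supervisor's structured output and Command updates in the real graph.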