How to sub-class LangGraph's MessagesState or use Pydantic for channel separation


I am trying to build a hierarchical LLM agent workflow using LangGraph. The workflow should be set up so that the research_team conducts the research and the writing_team writes the report, and the entire workflow is executed for each sub-section of the main report request. Below is an image of the setup:

[Image: the hierarchical setup, with the research team and the writing team each containing a Team Supervisor node and Worker nodes]

Each team, the research team and the writing team, follows the same structure: a Team Supervisor node plus Worker nodes. I want the whole graph to execute so that both teams communicate over the same channels (state keys) while researching and writing the sub-sections of the report. That requires the Team Supervisor LLM to produce structured output to feed into the worker prompts, and the workers to update the channel state after they finish executing. All the resources out there use MessagesState or some Pydantic or TypedDict approach, but I can't find a good explanation of how the whole graph communicates over the same channels while working on a particular sub-section. I'd much appreciate your help in figuring this out. Below is the code representing what I'm trying to do:

Supervisor Node Function:

from typing import Annotated, List, Literal, Optional, TypedDict
from langchain_core.language_models import BaseChatModel
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.types import Command

class SupervisorInput(MessagesState):
    """User request."""
    main_topic: Annotated[str, ..., "The main topic of the request"]
    section_topic: Annotated[Optional[str], "Sub-section topic of the main topic"]
    section_content: Annotated[Optional[str], "Sub-section topic content"]

def make_supervisor_node(llm: BaseChatModel, system_prompt: str | None, members: List[str]):
    options = ["FINISH"] + members
    
    if system_prompt is None:
        system_prompt = (
            "You are a supervisor tasked with managing a conversation between the"
            f" following teams: {members}. Given the user request,"
            " respond with the team to act next. Each team will perform a"
            " task and respond with their results and status. You should verify"
            " the task performed by the teams to ensure it statisfies user request."
            " When finished, respond with FINISH."
        )
    
    class SupervisorAction(TypedDict):
        """Supervisor action."""
        # main_topic: SupervisorInput
        section_topic: Annotated[str, "Sub-section topic of the main topic"]
        section_search_query: Annotated[Optional[str], "Search query for the sub-section topic"]
        next: Literal[*options]

    def supervisor_node(state: SupervisorInput) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based decision maker."""
        
        # print(f"Supervisor Node State: {state}")
        
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        
        response = llm.with_structured_output(SupervisorAction).invoke(messages)
        print(f"Supervisor reponse: {response}")
        
        goto = response["next"]
        
        print(f"Going to node: {goto}")
        
        if goto == "FINISH":
            goto = END

        return Command(goto=goto)

    return supervisor_node
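
What I think I actually want is for the supervisor to publish its structured output on the shared channels rather than only routing. This sketch (untested) is what I would swap in for supervisor_node inside make_supervisor_node above:

    def supervisor_node(state: SupervisorInput) -> Command[Literal[*members, "__end__"]]:
        """Sketch: route to the next team AND publish the structured output."""
        messages = [{"role": "system", "content": system_prompt}] + state["messages"]
        response = llm.with_structured_output(SupervisorAction).invoke(messages)
        goto = END if response["next"] == "FINISH" else response["next"]
        return Command(
            # write the decision into the shared channels so workers can build
            # their prompts from state instead of parsing the message history
            update={"section_topic": response.get("section_topic")},
            goto=goto,
        )

Is Command(update=...) the right mechanism for that, or am I misunderstanding how channels get updated?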

Research Team Graph:

## Define tools
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools.pubmed.tool import PubmedQueryRun
from langchain_community.tools.semanticscholar.tool import SemanticScholarQueryRun

research_tools = [TavilySearchResults(max_results=5), PubmedQueryRun(), SemanticScholarQueryRun()]

## Define LLM and ReAct agent
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

research_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tavily_agent = create_react_agent(research_llm, tools=research_tools)

def tavily_node(state: SupervisorInput) -> Command[Literal["supervisor"]]:
    
    result = tavily_agent.invoke(state)
    
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="tavily")
            ]
        },
        # We want our workers to ALWAYS "report back" to the supervisor when done
        goto="supervisor",
    )
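
On the worker side, I similarly imagine the node building its prompt from the shared channel and writing its findings back, along these lines (untested sketch; tavily_research_node is just an illustrative variant of the node above):

def tavily_research_node(state: SupervisorInput) -> Command[Literal["supervisor"]]:
    # build the worker prompt from the shared channel instead of the raw history
    task = f"Research this sub-section: {state.get('section_topic')}"
    result = tavily_agent.invoke({"messages": [("user", task)]})
    findings = result["messages"][-1].content
    return Command(
        update={
            "messages": [HumanMessage(content=findings, name="tavily")],
            # publish the findings on a dedicated channel for the writing team
            "section_content": findings,
        },
        goto="supervisor",
    )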

with open("./prompts/research_supervisor_prompt.txt") as f:
    research_supervisor_prompt = f.read()
# print(research_supervisor_prompt)

research_supervisor_node = make_supervisor_node(research_llm, research_supervisor_prompt,
                                                ["tavily"])
## Define Research Team
research_team = StateGraph(SupervisorInput)
research_team.add_node("supervisor", research_supervisor_node)
research_team.add_node("tavily", tavily_node)
research_team.add_edge(START, "supervisor")
research_graph = research_team.compile()
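
For completeness, this is how I picture the top level: both teams compiled as sub-graphs over the same state schema, so that the channels are shared between them (a sketch; writing_graph would be built analogously to research_graph, which I haven't shown here):

## Top-level sketch: both sub-graphs use SupervisorInput, so section_topic and
## section_content should flow between the teams
top_builder = StateGraph(SupervisorInput)
top_builder.add_node("research_team", research_graph)
top_builder.add_node("writing_team", writing_graph)
top_builder.add_edge(START, "research_team")
top_builder.add_edge("research_team", "writing_team")
top_graph = top_builder.compile()

Is sharing one schema across sub-graphs like this the supported way to keep the teams on the same channels?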

The above code runs, but the LLM output is disjointed: the graph hits __end__ without completing the research for all the sub-sections. Also, the code just keeps appending to the list of messages, which doesn't seem to work effectively. Hence I want separate channels (state keys) for section_topic and section_content that the research team and the writing team can collaborate over while researching and writing. So the question is: how do I sub-class MessagesState to get separate channels for communication, and keep updating them while working on a task?
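
Concretely, is something like the following the idiomatic way to declare those channels, where plain fields are last-write-wins and an Annotated reducer accumulates? (This is a sketch of my assumption; completed_sections is a name I made up.)

import operator
from typing import Annotated, Optional
from langgraph.graph import MessagesState

class ReportState(MessagesState):
    """Shared channels for both teams, on top of the message history."""
    main_topic: str                       # set once from the user request
    section_topic: Optional[str]          # overwritten for each sub-section
    section_content: Optional[str]        # overwritten by the research workers
    completed_sections: Annotated[list, operator.add]  # appended to, never overwritten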
