Story Graph Agent

Overview

The Story Graph Agent is the central orchestrator of CASYS RPG. Built on LangGraph, it manages the workflow and coordinates all other agents.

Core Architecture

graph TD
    subgraph Story Graph
        WF[Workflow Manager] --> SG[Story Graph]
        SG --> SM[State Manager]
    end

    subgraph Nodes
        N1[Start Node] --> N2[Rules Node]
        N1 --> N3[Narrator Node]
        N2 --> N4[Decision Node]
        N3 --> N4
        N4 --> N5[End Node]
    end

    SG --> |Manages| Nodes

Key Components

Workflow Management

  • Node System

    • Start/End nodes
    • Agent-specific nodes
    • Parallel processing
  • State Transitions

    • Section management
    • State validation
    • Error handling
  • Flow Control

    • Fan-out operations
    • Fan-in aggregation
    • Conditional branching
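
from langgraph.graph import END, START, StateGraph

class StoryGraph:
    async def _setup_workflow(self) -> None:
        # StateGraph is a builder; GameState is the shared state schema
        self._graph = StateGraph(GameState)

        # Add nodes
        self._graph.add_node("node_start", self.workflow_manager.start_workflow)
        self._graph.add_node("node_rules", self._process_rules)
        self._graph.add_node("node_narrator", self._process_narrative)
        self._graph.add_node("node_decision", self._process_decision)
        self._graph.add_node("node_end", self.workflow_manager.end_workflow)

        # Add edges
        self._graph.add_edge(START, "node_start")
        # Fan-out: add_edge takes a single target, so add one edge per branch
        self._graph.add_edge("node_start", "node_rules")
        self._graph.add_edge("node_start", "node_narrator")
        # Fan-in: a list of sources makes node_decision wait for both branches
        self._graph.add_edge(["node_rules", "node_narrator"], "node_decision")
        # Finish the flow as drawn in the architecture diagram
        self._graph.add_edge("node_decision", "node_end")
        self._graph.add_edge("node_end", END)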
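
The fan-out, fan-in, and conditional branching steps listed under Flow Control can also be illustrated without LangGraph. The following is a minimal sketch in plain asyncio, not the project's implementation: `TurnState`, `rules_node`, `narrator_node`, `decision_node`, `run_turn`, and the "last section is 3" rule are all hypothetical names and values chosen for illustration.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TurnState:
    """Hypothetical, simplified stand-in for GameState."""
    section: int
    rules: str = ""
    narrative: str = ""
    next_node: str = ""


async def rules_node(state: TurnState) -> dict:
    # Returns a partial update, the way a graph node would
    return {"rules": f"rules for section {state.section}"}


async def narrator_node(state: TurnState) -> dict:
    return {"narrative": f"text of section {state.section}"}


def decision_node(state: TurnState) -> TurnState:
    # Conditional branching: end at the (assumed) last section, else loop back
    target = "node_end" if state.section >= 3 else "node_start"
    return replace(state, next_node=target)


async def run_turn(state: TurnState) -> TurnState:
    # Fan-out: both nodes receive the same input state and run concurrently
    rules_update, narrative_update = await asyncio.gather(
        rules_node(state), narrator_node(state)
    )
    # Fan-in: merge the partial updates before the decision step
    merged = replace(state, **rules_update, **narrative_update)
    return decision_node(merged)


if __name__ == "__main__":
    print(asyncio.run(run_turn(TurnState(section=1))))
```

Each node returns only the fields it changed, so the merge step never has to reconcile two writers for the same field.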

State Management

The Story Graph manages game state through:

  1. State Validation

    • Input validation
    • State consistency checks
    • Error detection
  2. State Transitions

    • Section progression
    • State updates
    • History tracking
  3. Checkpoint System

    • State persistence
    • Recovery points
    • Rollback capability
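
To make validation, history tracking, and rollback concrete, here is a small in-memory sketch. It is an assumption-laden illustration, not the State Manager's actual API: `Snapshot`, `StateValidationError`, and `CheckpointStore` are hypothetical names, the two validation rules are invented, and persistence to disk or a database is left out.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical minimal game state; the real GameState has more fields."""
    section: int
    health: int


class StateValidationError(ValueError):
    """Raised when a proposed state fails validation."""


class CheckpointStore:
    def __init__(self, initial: Snapshot) -> None:
        # Every accepted state is kept, so the list doubles as the history
        self._history: List[Snapshot] = [initial]

    @property
    def current(self) -> Snapshot:
        return self._history[-1]

    @staticmethod
    def validate(state: Snapshot) -> None:
        # Illustrative consistency checks only
        if state.section < 1:
            raise StateValidationError("section must be >= 1")
        if state.health < 0:
            raise StateValidationError("health cannot be negative")

    def transition(self, **changes: int) -> Snapshot:
        # Build a new immutable state, validate it, then record it
        candidate = replace(self.current, **changes)
        self.validate(candidate)
        self._history.append(candidate)
        return candidate

    def rollback(self, steps: int = 1) -> Snapshot:
        # Drop the most recent states but never the initial snapshot
        keep = max(1, len(self._history) - steps)
        del self._history[keep:]
        return self.current
```

A rejected transition leaves the history untouched, so the current state is always one that passed validation.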

Agent Coordination

The Story Graph coordinates other agents through:

  • Parallel Processing

    • Rules and Narrator agents run in parallel
    • Results aggregated for Decision agent
    • Trace agent logs all operations
  • State Sharing

    • Immutable state objects
    • Validated transitions
    • Cached results
  • Error Handling

    • Agent-specific errors
    • Graceful degradation
    • Recovery strategies

class StoryGraph:
    async def process_event(self, event: GameEvent) -> GameState:
        # Initialize workflow if needed
        if not self._graph:
            await self._setup_workflow()

        try:
            # StateGraph is only a builder: compile() returns the runnable graph
            # (in practice, compile once and cache the result)
            workflow = self._graph.compile()

            # Process event through workflow
            result = await workflow.ainvoke({
                "event": event,
                "state": await self.state_manager.get_current_state()
            })

            return result

        except Exception as e:
            # Delegate recovery to the Workflow Manager
            return await self.workflow_manager.handle_error(e)
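
Graceful degradation for agent-specific errors could look roughly like the sketch below: agents run in parallel, and a failing agent is replaced by a fallback value instead of aborting the whole turn. The names `run_agents`, `rules_agent`, `narrator_agent`, and the fallback strings are hypothetical; the real agents and their return types are not shown in this page.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_agents(
    agents: Dict[str, Callable[[], Awaitable[str]]],
    fallbacks: Dict[str, str],
) -> Dict[str, str]:
    names = list(agents)
    # return_exceptions=True returns failures as values instead of raising
    # the first one, so every agent's outcome can be inspected
    outcomes = await asyncio.gather(
        *(agents[name]() for name in names), return_exceptions=True
    )
    results: Dict[str, str] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            # Degrade gracefully: substitute the agent's fallback value
            results[name] = fallbacks[name]
        else:
            results[name] = outcome
    return results


async def rules_agent() -> str:
    return "combat allowed"


async def narrator_agent() -> str:
    # Simulated agent-specific failure
    raise RuntimeError("narrator backend unavailable")


if __name__ == "__main__":
    print(asyncio.run(run_agents(
        {"rules": rules_agent, "narrator": narrator_agent},
        {"rules": "no rules", "narrator": "(narrative unavailable)"},
    )))
```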

Communication Flow

sequenceDiagram
    participant WF as Workflow Manager
    participant SG as Story Graph
    participant SM as State Manager
    participant AG as Agents

    WF->>SG: Start Workflow
    SG->>SM: Get Current State

    par Parallel Processing
        SG->>AG: Process Rules
        SG->>AG: Generate Narrative
    end

    AG-->>SG: Results
    SG->>AG: Make Decision
    AG-->>SG: Decision
    SG->>SM: Update State
    SG->>WF: Complete Workflow

Best Practices

  1. Graph Design

    • Keep nodes focused and simple
    • Use parallel processing when possible
    • Implement proper error boundaries
    • Monitor node performance
  2. State Management

    • Validate all state changes
    • Use immutable state objects
    • Implement checkpoints
    • Handle edge cases
  3. Agent Coordination

    • Define clear protocols
    • Use type hints
    • Handle timeouts
    • Log interactions
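
As one way to apply the "Handle timeouts", "Log interactions", and "Use type hints" practices together, the sketch below wraps an agent call with `asyncio.wait_for` and standard-library logging. The helper `call_agent`, the logger name, and the two demo agents are assumptions made for this example.

```python
import asyncio
import logging
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("story_graph.sketch")  # hypothetical logger name


async def call_agent(name: str, call: Awaitable[T], timeout: float) -> Optional[T]:
    """Await an agent call, returning None if it exceeds the timeout."""
    logger.info("calling %s (timeout=%.2fs)", name, timeout)
    try:
        result = await asyncio.wait_for(call, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising
        logger.warning("%s timed out", name)
        return None
    logger.info("%s returned", name)
    return result


async def slow_agent() -> str:
    await asyncio.sleep(1.0)
    return "late"


async def fast_agent() -> str:
    return "on time"


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(call_agent("fast", fast_agent(), timeout=0.5)))
    print(asyncio.run(call_agent("slow", slow_agent(), timeout=0.05)))
```

Returning `None` on timeout is only one possible protocol; raising a dedicated error and letting the workflow's error handling decide would work equally well.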

Error Handling

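The Story Graph handles game-specific errors separately from unexpected ones. Both are passed to the Workflow Manager; unexpected errors are also logged: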

try:
    # Process workflow; ainvoke is called on the compiled graph,
    # not on the StateGraph builder itself
    result = await self._graph.compile().ainvoke(input_data)
    return result
except GameError as e:
    # Handle game-specific errors
    return await self.workflow_manager.handle_error(e)
except Exception as e:
    # Handle unexpected errors
    logger.error("Unexpected error in workflow: {}", str(e))
    return await self.workflow_manager.handle_error(e)

Performance Considerations

  1. Parallelization

    • Parallel agent execution
    • Async/await patterns
    • Resource pooling
  2. Caching

    • State caching
    • Result memoization
    • Hot path optimization
  3. Memory Management

    • State cleanup
    • Resource release
    • Memory monitoring
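
Result memoization and state cleanup can be sketched with a small async cache keyed by an immutable (and therefore hashable) value. This is an illustration under assumptions, not the project's caching layer: `SectionKey`, `ResultCache`, and `analyse_rules` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict


@dataclass(frozen=True)
class SectionKey:
    """Hypothetical cache key; frozen dataclasses are hashable."""
    section: int


class ResultCache:
    def __init__(self, compute: Callable[[SectionKey], Awaitable[str]]) -> None:
        self._compute = compute
        self._results: Dict[SectionKey, str] = {}
        self.misses = 0  # counts how often the underlying call actually ran

    async def get(self, key: SectionKey) -> str:
        if key not in self._results:
            self.misses += 1
            self._results[key] = await self._compute(key)
        return self._results[key]

    def clear(self) -> None:
        # State cleanup: release cached results, e.g. when a game ends
        self._results.clear()


async def analyse_rules(key: SectionKey) -> str:
    await asyncio.sleep(0)  # stands in for an expensive agent call
    return f"rules for section {key.section}"


async def demo() -> int:
    cache = ResultCache(analyse_rules)
    await cache.get(SectionKey(1))
    await cache.get(SectionKey(1))  # served from the cache
    await cache.get(SectionKey(2))
    return cache.misses


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Two concurrent requests for the same uncached key would both run the computation; if that matters, the cache could store the in-flight task instead of the finished result.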