Building a State-Driven Workflow Engine for AI Applications
This article presents a Shell and Node pattern that decouples state, nodes, and routing into composable units, replacing the nested if/else chains that workflow code tends to accumulate.
When building AI-powered applications, we quickly encounter a challenge that traditional API architectures struggle to handle: AI workflows are inherently multi-step, branching, and asynchronous. A single user request might trigger intent analysis, prompt refinement, credit checking, task submission, and result delivery, each with different timing and failure modes.
This pattern emerged while building Banana AI, an AI-powered creative platform where user requests trigger complex workflows involving LLM calls, image generation, and video processing. The common approach of handling this with nested if/else chains in API routes works for simple cases but becomes unmaintainable as features grow.
This article presents a "Shell and Node" architecture pattern that decouples state management, node logic, and routing into composable units, enabling you to add new features without modifying the core engine.
The Shell and Node Pattern
The pattern consists of four core components that work together like an assembly line:
| Component | Responsibility | Analogy |
|---|---|---|
| State | Global context object flowing through the pipeline | Tray carrying work in progress |
| Node | Pure function that performs a single task | Worker at a station |
| Router | Pure function that decides the next node based on state | Dispatcher directing traffic |
| Engine | Loop that executes nodes until completion | Conveyor belt moving trays |
This separation provides several benefits. Each node is a pure function with a single responsibility, making it easy to test in isolation. The router logic is centralized in one place, giving you a complete view of all possible paths through the system. Adding a new feature requires only adding a new node and updating the router, with no changes to the engine itself.
This pattern particularly suits AI applications because AI workflows often involve multiple external service calls with different latency profiles. An image generation request might need intent analysis via an LLM (fast), followed by actual generation via an external API (slow), then storage upload and credit deduction. Each step has different failure modes and timing requirements.
State Bus Design
The state bus is the single source of truth for the entire workflow. All nodes read from and write to this shared context object. Here is a TypeScript interface that demonstrates the pattern:
import type { ModelMessage } from 'ai';
/**
* The state bus flows through the entire pipeline.
* Each node reads what it needs and writes its output.
*/
export interface AgentState {
// Immutable input, set by API route and never modified
input: {
messages: ModelMessage[];
userUuid: string;
sessionUuid: string;
selectedModel: string;
aspectRatio?: string;
};
// Phase specific sub states, each written by a specific node
evaluation?: {
intent: 'GENERATE_MEDIA' | 'GENERAL_CHAT' | 'ASK_FOR_INFO';
reasoning: string;
refinedPrompt?: string;
mediaPayload?: MediaPayload;
};
credit?: {
reservationId: string;
amount: number;
};
submit?: {
predictionId: string;
messageUuid: string;
};
upload?: {
uploadedMedia: UploadedMedia[];
};
// Error state, checked by router to short circuit to end
error?: { code: string; message: string };
// Control flow: which node should execute next
nextStep: NodeName;
}
The key insight is that each sub-object is written entirely by one node. The engine uses shallow merge when updating state: state = { ...state, ...updates }. This means nodes do not need to worry about partial merges or deep merging logic.
The optional sub-objects serve a dual purpose. They hold the data produced by each node, but they also act as completion flags. The router checks whether state.evaluation exists to know if the evaluation has run. This eliminates the need for separate status tracking fields.
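The two ideas above (shallow merge and sub-objects as completion flags) can be illustrated with a minimal sketch. `DemoState` and `applyUpdates` are hypothetical names introduced only for this example; they are a trimmed stand-in for `AgentState` and the engine's merge step, not the production code.

```typescript
// Hypothetical, trimmed-down state used only for this illustration.
interface DemoState {
  input: { userUuid: string };
  evaluation?: { intent: string; reasoning: string };
  credit?: { reservationId: string; amount: number };
}

// Shallow merge: each top-level key in `updates` replaces the old value wholesale.
function applyUpdates(state: DemoState, updates: Partial<DemoState>): DemoState {
  return { ...state, ...updates };
}

const initial: DemoState = { input: { userUuid: 'u_1' } };

// Before the evaluator has run, the sub-object is absent, so the "flag" is off.
const evaluatedBefore = initial.evaluation !== undefined;

const afterEval = applyUpdates(initial, {
  evaluation: { intent: 'GENERAL_CHAT', reasoning: 'small talk' },
});
const evaluatedAfter = afterEval.evaluation !== undefined;

// A second write to the same key replaces the whole sub-object (no deep merge).
const afterRewrite = applyUpdates(afterEval, {
  evaluation: { intent: 'ASK_FOR_INFO', reasoning: 'missing subject' },
});
```

Because the merge is shallow, a node that wants to change one field of a sub-object it does not own has to return the full sub-object, which is one reason to keep a single writer per key.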
For type safety with polymorphic payloads, we use discriminated unions:
// Discriminated Union ensures type safety
// The mediaType field is the discriminant
export type MediaPayload = ImagePayload | VideoPayload;
export interface ImagePayload {
mediaType: 'image'; // Discriminant
model: string;
prompt: string;
aspectRatio: string;
creditsCost: number;
}
export interface VideoPayload {
mediaType: 'video'; // Discriminant
model: string;
prompt: string;
duration: number;
creditsCost: number;
}
// TypeScript automatically narrows the type based on discriminant
function processPayload(payload: MediaPayload) {
if (payload.mediaType === 'image') {
// TypeScript knows payload.aspectRatio exists here
console.log(payload.aspectRatio);
} else {
// TypeScript knows payload.duration exists here
console.log(payload.duration);
}
}
This pattern prevents invalid states at compile time. A payload with mediaType: 'image' and duration: 30 would be a compile error, not a runtime bug.
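A related technique, not shown in the original code, is an exhaustiveness check: if a third media type is added to the union later, the compiler flags every `switch` that forgot to handle it. The sketch below uses a trimmed copy of the payload union, and `assertNever` and `describePayload` are hypothetical helper names.

```typescript
// Trimmed copy of the payload union, kept local so the example is self-contained.
type DemoPayload =
  | { mediaType: 'image'; prompt: string; aspectRatio: string }
  | { mediaType: 'video'; prompt: string; duration: number };

// Accepts only `never`, so calling it with an unhandled variant fails to compile.
function assertNever(value: never): never {
  throw new Error(`Unhandled payload variant: ${JSON.stringify(value)}`);
}

function describePayload(payload: DemoPayload): string {
  switch (payload.mediaType) {
    case 'image':
      return `image at ${payload.aspectRatio}`;
    case 'video':
      return `video for ${payload.duration}s`;
    default:
      // Reached only if a new variant is added without a matching case.
      return assertNever(payload);
  }
}

const imageLabel = describePayload({ mediaType: 'image', prompt: 'a cat', aspectRatio: '16:9' });
const videoLabel = describePayload({ mediaType: 'video', prompt: 'a cat', duration: 5 });
```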
Node Implementation
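Nodes take the current state and an optional stream writer, then return a partial state update. They are pure with respect to the state bus: a node should not mutate the state it receives, and any side effects (LLM calls, credit reservations) stay inside the node. The signature is consistent across all nodes:
import type { UIMessageStreamWriter } from 'ai';
type WorkflowNode = (
  state: AgentState,
  writer?: UIMessageStreamWriter
) => Promise<Partial<AgentState>>;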
Let us examine the evaluator node, which serves as the "brain" of the system:
import { generateObject } from 'ai';
import { createOpenRouter } from '@openrouter/ai-sdk-provider';
import { z } from 'zod';
const openrouter = createOpenRouter({ apiKey: process.env.OPENROUTER_API_KEY });
// Define strict schema for LLM output
const evaluationSchema = z.object({
intent: z.enum(['GENERATE_MEDIA', 'GENERAL_CHAT', 'ASK_FOR_INFO']),
reasoning: z.string(),
refinedPrompt: z.string().optional(),
detectedMediaType: z.enum(['image', 'video']).optional(),
});
export async function evaluatorNode(
state: AgentState,
writer?: UIMessageStreamWriter
): Promise<Partial<AgentState>> {
const { input } = state;
// Notify frontend that evaluation is starting
writer?.write({
type: 'data-thinking',
data: { stage: 'understanding', message: 'Analyzing your request...' },
transient: true,
});
// Call LLM for intent analysis and prompt refinement
const result = await generateObject({
model: openrouter('openai/gpt-4o-mini'),
messages: input.messages,
schema: evaluationSchema,
system: `You are an intent classifier for a creative AI application.
Default to GENERATE_MEDIA when the user wants to create something.
Use GENERAL_CHAT for casual conversation.
Use ASK_FOR_INFO only when critical information is missing.
Always refine the user's prompt into a detailed English description.`,
});
const { intent, reasoning, refinedPrompt, detectedMediaType } = result.object;
// Build media payload if generating content
let mediaPayload: MediaPayload | undefined;
if (intent === 'GENERATE_MEDIA' && refinedPrompt) {
const model = input.selectedModel;
const creditsCost = getModelCreditsCost(model);
if (detectedMediaType === 'video') {
mediaPayload = {
mediaType: 'video',
model,
prompt: refinedPrompt,
duration: 5,
creditsCost,
};
} else {
mediaPayload = {
mediaType: 'image',
model,
prompt: refinedPrompt,
aspectRatio: input.aspectRatio || '1:1',
creditsCost,
};
}
}
return {
evaluation: {
intent,
reasoning,
refinedPrompt,
mediaPayload,
},
};
}
The credit node demonstrates atomic operations with a reserve pattern:
export async function creditNode(
state: AgentState,
writer?: UIMessageStreamWriter
): Promise<Partial<AgentState>> {
const { input, evaluation } = state;
// Determine credit amount based on intent
const amount = evaluation?.intent === 'GENERATE_MEDIA'
? evaluation.mediaPayload!.creditsCost
: 1; // Chat costs 1 credit
// Reserve credits atomically using Durable Objects
// This prevents race conditions when users make concurrent requests
const reservation = await reserveUserCredits({
userUuid: input.userUuid,
amount,
taskType: evaluation?.intent === 'GENERATE_MEDIA'
? `chat_${evaluation.mediaPayload!.mediaType === 'video' ? 'text_to_video' : 'text_to_image'}`
: 'chat_text',
});
if (!reservation.success) {
return {
error: {
code: 'insufficient_credits',
message: `You need ${amount} credits. Current balance: ${reservation.balance}`,
},
};
}
return {
credit: {
reservationId: reservation.predictionId,
amount,
},
};
}
The reserve pattern is crucial for consistency. We reserve credits before starting expensive operations, then confirm after success or cancel on failure. This prevents the common bug where credits are deducted, but the operation fails.
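The reserve, confirm, and cancel lifecycle can be sketched with an in-memory ledger. This is an illustration under stated assumptions, not the platform's implementation: `CreditLedger` and its method names are hypothetical, and a real system would need the atomicity the article gets from Durable Objects.

```typescript
type ReserveResult =
  | { success: true; reservationId: string }
  | { success: false; balance: number };

// Hypothetical single-user ledger: reserved credits are held, not yet spent.
class CreditLedger {
  private balance: number;
  private held = new Map<string, number>();
  private nextId = 1;

  constructor(initialBalance: number) {
    this.balance = initialBalance;
  }

  // Credits that are neither spent nor held by an open reservation.
  available(): number {
    let heldTotal = 0;
    for (const amount of this.held.values()) heldTotal += amount;
    return this.balance - heldTotal;
  }

  reserve(amount: number): ReserveResult {
    if (amount > this.available()) {
      return { success: false, balance: this.available() };
    }
    const reservationId = `res_${this.nextId++}`;
    this.held.set(reservationId, amount);
    return { success: true, reservationId };
  }

  // Success path: the held amount becomes a real deduction.
  confirm(reservationId: string): void {
    const amount = this.held.get(reservationId);
    if (amount === undefined) return; // unknown or already settled
    this.held.delete(reservationId);
    this.balance -= amount;
  }

  // Failure path: the hold is released and the balance is untouched.
  cancel(reservationId: string): void {
    this.held.delete(reservationId);
  }
}
```

Holding the amount at reservation time is what stops two concurrent requests from both passing a balance check that only one of them can afford.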
Router Logic
The router is a pure function that examines the current state and returns the name of the next node to execute. It uses the presence of sub-objects as completion flags:
export function route(state: AgentState): NodeName {
// Error always short circuits to end
if (state.error) return 'end';
// No evaluation yet means we need to evaluate first
if (!state.evaluation) return 'evaluator_node';
// Route based on intent
switch (state.evaluation.intent) {
case 'GENERATE_MEDIA':
// Phase 1: credit check and task submission
if (!state.credit) return 'credit_node';
if (!state.submit) return 'submit_node';
// Phase 2: results handling (triggered by webhook)
if (!state.upload?.uploadedMedia?.length) return 'upload_node';
if (!state.upload.uploadedMedia.every(m => m.workUuid)) return 'confirm_node';
return 'end';
case 'GENERAL_CHAT':
if (!state.credit) return 'credit_node';
return 'chat_node';
case 'ASK_FOR_INFO':
return 'clarify_node';
default:
return 'end';
}
}
The router logic reads like a description of the workflow. For media generation, we first check credits, then submit the task, then wait for results (via webhook), then upload and confirm. Each condition checks whether the previous step has completed.
This centralized routing logic makes the system easy to understand and modify. To add a new step in the flow, you add a new condition and a new node. The router becomes a living document of all possible paths through the system.
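Because the router depends only on its input, its paths can be checked with a plain table of states and expected steps. The sketch below uses a reduced router covering only the chat and clarify branches; `MiniState`, `miniRoute`, and the `chat` completion flag are assumptions made for the example and do not appear in the code above.

```typescript
// Hypothetical, trimmed-down state: only the fields this router reads.
interface MiniState {
  evaluation?: { intent: 'GENERAL_CHAT' | 'ASK_FOR_INFO' };
  credit?: { reservationId: string };
  chat?: { reply: string }; // assumed completion flag for the chat step
  error?: { code: string; message: string };
}

type MiniStep = 'evaluator_node' | 'credit_node' | 'chat_node' | 'clarify_node' | 'end';

function miniRoute(state: MiniState): MiniStep {
  if (state.error) return 'end';
  if (!state.evaluation) return 'evaluator_node';
  if (state.evaluation.intent === 'ASK_FOR_INFO') return 'clarify_node';
  if (!state.credit) return 'credit_node';
  return state.chat ? 'end' : 'chat_node';
}

// Each row: label, input state, expected next step.
const routeCases: Array<[string, MiniState, MiniStep]> = [
  ['fresh request', {}, 'evaluator_node'],
  ['chat needs credits', { evaluation: { intent: 'GENERAL_CHAT' } }, 'credit_node'],
  [
    'chat ready',
    { evaluation: { intent: 'GENERAL_CHAT' }, credit: { reservationId: 'res_1' } },
    'chat_node',
  ],
  [
    'chat finished',
    {
      evaluation: { intent: 'GENERAL_CHAT' },
      credit: { reservationId: 'res_1' },
      chat: { reply: 'hello' },
    },
    'end',
  ],
  ['needs clarification', { evaluation: { intent: 'ASK_FOR_INFO' } }, 'clarify_node'],
  [
    'error short circuits',
    { evaluation: { intent: 'GENERAL_CHAT' }, error: { code: 'x', message: 'y' } },
    'end',
  ],
];

// Labels of any rows where the router disagreed with the table.
const routeFailures = routeCases
  .filter(([, state, expected]) => miniRoute(state) !== expected)
  .map(([label]) => label);
```

A table like this doubles as a check that every branch eventually reaches a terminal step once its completion flag is set.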
Engine Runtime
The engine is the conveyor belt that moves the state through nodes until completion. Its implementation is surprisingly simple:
export interface EngineResult {
state: AgentState;
outcome: 'completed' | 'suspended';
}
export async function runWorkflow(
initialState: AgentState,
writer?: UIMessageStreamWriter
): Promise<EngineResult> {
let state = { ...initialState };
// Node registry: add new nodes here
const nodes: Record<string, WorkflowNode> = {
evaluator_node: evaluatorNode,
credit_node: creditNode,
submit_node: submitNode,
upload_node: uploadNode,
confirm_node: confirmNode,
chat_node: chatNode,
clarify_node: clarifyNode,
};
// Main loop
while (state.nextStep !== 'end' && state.nextStep !== 'suspend') {
const nodeName = route(state);
// Handle terminal states
if (nodeName === 'end' || nodeName === 'suspend') {
state.nextStep = nodeName;
break;
}
const node = nodes[nodeName];
if (!node) {
state.error = {
code: 'unknown_node',
message: `Node "${nodeName}" not found in registry`,
};
break;
}
try {
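// Execute node and merge results
const updates = await node(state, writer);
state = { ...state, ...updates };
// Keep a terminal step requested by the node itself (submit_node returns
// 'suspend'); otherwise ask the router, which never returns 'suspend'
if (updates.nextStep !== 'end' && updates.nextStep !== 'suspend') {
state.nextStep = route(state);
}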
} catch (err) {
// Cancel any pending credit reservation on error
if (state.credit?.reservationId && !state.upload?.uploadedMedia?.length) {
await cancelCreditReservation({
userUuid: state.input.userUuid,
predictionId: state.credit.reservationId,
reason: err instanceof Error ? err.message : 'Unknown error',
});
}
state.error = {
code: 'node_error',
message: err instanceof Error ? err.message : 'Unknown error',
};
state.nextStep = 'end';
}
}
// Stream error to frontend if we have a writer
if (state.error && writer) {
writer.write({
type: 'data-error',
data: { code: state.error.code, message: state.error.message },
transient: true,
});
}
return {
state,
outcome: state.nextStep === 'suspend' ? 'suspended' : 'completed',
};
}
The engine handles several concerns that would otherwise be scattered across the codebase:
- Node registry: All nodes are registered in one place, making it easy to see what exists
- Error handling: Any unhandled node error triggers credit cancellation and terminates the workflow
- Suspension: For async tasks, the engine can pause execution and return control to the caller
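The loop's control flow can be exercised on its own with a toy state. The sketch below is synchronous for brevity (the real engine is async), and it adds a step budget that the engine above does not have: `maxSteps`, `runLoop`, and the step names are hypothetical, included to show how a router that never reaches a terminal step could be stopped.

```typescript
type StepName = 'increment' | 'wait_for_callback' | 'end' | 'suspend';

interface LoopState {
  count: number;
  nextStep: StepName;
  error?: { code: string; message: string };
}

type SyncNode = (state: LoopState) => Partial<LoopState>;

function runLoop(
  initial: LoopState,
  nodes: Partial<Record<StepName, SyncNode>>,
  route: (state: LoopState) => StepName,
  maxSteps = 25, // hypothetical budget, not part of the original engine
): LoopState {
  let state: LoopState = { ...initial };
  for (let step = 0; state.nextStep !== 'end' && state.nextStep !== 'suspend'; step++) {
    if (step >= maxSteps) {
      const error = { code: 'max_steps_exceeded', message: `Stopped after ${maxSteps} steps` };
      return { ...state, nextStep: 'end', error };
    }
    const name = route(state);
    if (name === 'end' || name === 'suspend') return { ...state, nextStep: name };
    const node = nodes[name];
    if (!node) {
      const error = { code: 'unknown_node', message: `Node "${name}" not found` };
      return { ...state, nextStep: 'end', error };
    }
    const updates = node(state);
    state = { ...state, ...updates };
    // A node may request a terminal step itself; otherwise the router decides.
    if (updates.nextStep !== 'end' && updates.nextStep !== 'suspend') {
      state.nextStep = route(state);
    }
  }
  return state;
}

const demoNodes: Partial<Record<StepName, SyncNode>> = {
  increment: (s) => ({ count: s.count + 1 }),
  wait_for_callback: () => ({ nextStep: 'suspend' }),
};

// Counts to three, then hands off to a node that suspends the workflow.
const suspended = runLoop(
  { count: 0, nextStep: 'increment' },
  demoNodes,
  (s) => (s.count < 3 ? 'increment' : 'wait_for_callback'),
);

// A router that never terminates is cut off by the step budget.
const stuck = runLoop({ count: 0, nextStep: 'increment' }, demoNodes, () => 'increment', 5);
```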
The suspend state is special. It indicates that the workflow is waiting for an external event (typically a webhook callback) before continuing. This brings us to the two-phase execution model.
Two-Phase Execution for Async Tasks
Serverless functions have execution time limits. Cloudflare Workers, for example, limits CPU time on the free tier. AI image and video generation can take 10 to 60 seconds, far exceeding these limits.
The solution is to split the workflow into two phases connected by a webhook:
flowchart TD
    subgraph Phase1["Phase 1: Stream (User Request)"]
        A[User Message] --> B[evaluator_node]
        B --> C[credit_node]
        C --> D[submit_node]
        D --> E[SUSPEND]
    end
    subgraph External["External Service"]
        E --> F[AI Model Processing<br/>10-60 seconds]
    end
    subgraph Phase2["Phase 2: Webhook Resume"]
        F --> G[Webhook Callback]
        G --> H[Load State from DO]
        H --> I[upload_node]
        I --> J[confirm_node]
        J --> K[END]
    end
The submit node initiates the external task without waiting for results:
import { nanoid } from 'nanoid';

export async function submitNode(
  state: AgentState,
  writer?: UIMessageStreamWriter
): Promise<Partial<AgentState>> {
  const payload = state.evaluation!.mediaPayload!;
  // Submit to Replicate. We await only the submission response; the
  // generation itself finishes later and is reported through the webhook
  const replicateResponse = await fetch('https://api.replicate.com/v1/predictions', {
    method: 'POST',
    headers: {
      'Authorization': `Token ${process.env.REPLICATE_API_TOKEN}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      version: getModelVersion(payload.model),
      input: {
        prompt: payload.prompt,
        aspect_ratio: payload.mediaType === 'image' ? payload.aspectRatio : undefined,
      },
      webhook: `${process.env.APP_URL}/api/webhook/replicate`,
      webhook_events_filter: ['completed'],
    }),
  });
  if (!replicateResponse.ok) {
    throw new Error(`Replicate submission failed: ${replicateResponse.statusText}`);
  }
  // Key everything on the prediction ID assigned by Replicate, because the
  // webhook handler looks up the suspended state by payload.id
  const { id: predictionId } = (await replicateResponse.json()) as { id: string };
  // Notify frontend
  writer?.write({
    type: 'data-status',
    data: { stage: 'generating', predictionId },
    transient: true,
  });
  // Build the submit sub-state once so the persisted copy and the returned
  // update carry the same messageUuid
  const submit = { predictionId, messageUuid: nanoid() };
  // Persist state to Durable Object for webhook resume
  await persistSuspendedState({
    predictionId,
    state: { ...state, submit },
  });
  return {
    submit,
    nextStep: 'suspend',
  };
}
The webhook handler reconstructs the state and resumes the engine:
// app/api/webhook/replicate/route.ts
export async function POST(req: Request) {
const payload = await req.json();
// Verify webhook signature
if (!verifyReplicateSignature(req, payload)) {
return new Response('Invalid signature', { status: 401 });
}
const predictionId = payload.id;
// Load suspended state from Durable Object
const suspendedState = await loadSuspendedState(predictionId);
if (!suspendedState) {
return new Response('State not found', { status: 404 });
}
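// Inject generation results. For this to type-check, AgentState needs an
// optional `generation` sub-object and NodeName must include 'router'
const state: AgentState = {
...suspendedState,
generation: {
outputs: extractOutputs(payload),
},
nextStep: 'router', // Any non-terminal value works: the engine consults the router first
};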
// Resume engine (no writer, results go to DO for polling)
const result = await runWorkflow(state);
return new Response('OK');
}
This pattern allows the workflow to span multiple function invocations while maintaining a single coherent state. The user gets immediate feedback during Phase 1 (streaming status updates), then polls for final results after the webhook completes Phase 2.
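The persistence helpers (`persistSuspendedState`, `loadSuspendedState`) are not shown in the article. As a rough sketch of the contract they need to satisfy, here is an in-memory stand-in; `SuspendedStateStore`, `take`, and the trimmed `SuspendedState` type are hypothetical, and removing the entry on read is an assumption made so that a redelivered webhook does not resume the same workflow twice.

```typescript
// Hypothetical, trimmed-down snapshot of what gets stored between phases.
interface SuspendedState {
  nextStep: string;
  submit?: { predictionId: string; messageUuid: string };
}

// In-memory stand-in for durable storage, keyed by prediction ID.
class SuspendedStateStore {
  private entries = new Map<string, string>();

  // Serialize on write so the stored copy cannot be mutated by the caller.
  persist(predictionId: string, state: SuspendedState): void {
    this.entries.set(predictionId, JSON.stringify(state));
  }

  // Return the state once and remove it; a second call for the same ID finds nothing.
  take(predictionId: string): SuspendedState | undefined {
    const raw = this.entries.get(predictionId);
    if (raw === undefined) return undefined;
    this.entries.delete(predictionId);
    return JSON.parse(raw) as SuspendedState;
  }
}

const store = new SuspendedStateStore();
const snapshot: SuspendedState = {
  nextStep: 'suspend',
  submit: { predictionId: 'p_123', messageUuid: 'm_1' },
};
store.persist('p_123', snapshot);

// Mutating the live object after persisting does not affect the stored copy.
snapshot.nextStep = 'end';

const resumed = store.take('p_123');
const replayed = store.take('p_123');
```

Whether the entry should be deleted on read or kept with a "resumed" marker depends on how retries and polling are handled; the sketch picks the simpler option.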
Extensibility Examples
The true test of an architecture is how easily it accommodates new requirements. Here are two examples of extending the system:
Adding Video Generation
// 1. Add node implementation
export async function videoGenNode(
state: AgentState,
writer?: UIMessageStreamWriter
): Promise<Partial<AgentState>> {
// Video specific logic here
}
// 2. Update router to include video path
export function route(state: AgentState): NodeName {
// ... existing code ...
switch (state.evaluation.intent) {
case 'GENERATE_MEDIA':
// Check if this is a video request
if (state.evaluation.mediaPayload?.mediaType === 'video') {
if (!state.credit) return 'credit_node';
if (!state.submit) return 'submit_node'; // Same submit node works!
// ... rest of video handling
}
// ... existing image handling
}
}
// 3. Register in engine
const nodes: Record<string, WorkflowNode> = {
// ... existing nodes
video_gen_node: videoGenNode,
};
// Done! The engine loop itself is unchanged; only the registry gains an entry.
Adding Style Presets as Middleware
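// Style preset node runs between evaluator and credit
export async function stylePresetNode(
  state: AgentState,
  writer?: UIMessageStreamWriter
): Promise<Partial<AgentState>> {
  const { evaluation } = state;
  const userStyle = await getUserStylePreset(state.input.userUuid);
  if (!userStyle || !evaluation?.mediaPayload) {
    // Nothing to apply, but still mark the step done so the router moves on
    return { styleApplied: true };
  }
  // Return a new evaluation object instead of mutating the incoming state
  return {
    evaluation: {
      ...evaluation,
      mediaPayload: {
        ...evaluation.mediaPayload,
        prompt: `${evaluation.mediaPayload.prompt}, ${userStyle.modifiers}`,
      },
    },
    // Completion flag checked by the router; requires adding
    // `styleApplied?: boolean` to AgentState
    styleApplied: true,
  };
}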
// Update router to include style step
export function route(state: AgentState): NodeName {
if (!state.evaluation) return 'evaluator_node';
// New style preset step for media generation
if (state.evaluation.intent === 'GENERATE_MEDIA' && !state.styleApplied) {
return 'style_preset_node';
}
// ... rest of routing
}
The node pattern makes these extensions straightforward. Each new feature is isolated in its own file with a clear contract. Testing is simple because each node is a pure function.
Lessons Learned
After implementing this pattern in a production AI application, several insights emerged:
When this pattern is overkill:
Simple CRUD operations do not benefit from this architecture. If your API route just validates input, writes to a database, and returns a response, the added abstraction is not worth it. The pattern shines when you have branching logic, multiple external service calls, or complex state transitions.
When this pattern shines:
Multi-step workflows with different latency profiles benefit most. Our chat-to-generate feature involves intent analysis (fast LLM call), optional media generation (slow external API), credit management, and storage operations. The node pattern keeps each concern isolated while the router provides a clear map of all possible paths.
Performance considerations:
The main loop has minimal overhead since it is just function calls and object spreading. The real performance considerations are in the individual nodes. Because nodes are pure functions, they are easy to optimize in isolation. You can add caching to the evaluator node, connection pooling to the database node, or batching to the upload node without affecting other parts of the system.
Testing benefits:
Each node can be unit tested in isolation with a mock state object. Integration tests can verify the router logic. End-to-end tests only need to verify the complete flow, not every permutation. This layered testing strategy catches bugs early while maintaining confidence in the overall system.
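As an illustration of testing a node in isolation, the sketch below injects the credit reservation call so a test can stub it. This is a hedged example, not the article's `creditNode`: `makeCreditNode`, `TestState`, and the `cost` field are hypothetical, and the factory style is one of several ways to make a node's dependencies replaceable.

```typescript
// Hypothetical, trimmed-down state for the test.
interface TestState {
  input: { userUuid: string };
  evaluation?: { intent: 'GENERAL_CHAT' | 'GENERATE_MEDIA'; cost: number };
  credit?: { reservationId: string; amount: number };
  error?: { code: string; message: string };
}

type Reserve = (
  userUuid: string,
  amount: number,
) => Promise<{ success: true; reservationId: string } | { success: false; balance: number }>;

// The reservation call is passed in, so tests never touch a real credit service.
function makeCreditNode(reserve: Reserve) {
  return async (state: TestState): Promise<Partial<TestState>> => {
    const amount = state.evaluation?.cost ?? 1;
    const result = await reserve(state.input.userUuid, amount);
    if (!result.success) {
      return {
        error: {
          code: 'insufficient_credits',
          message: `Need ${amount}, have ${result.balance}`,
        },
      };
    }
    return { credit: { reservationId: result.reservationId, amount } };
  };
}

// Runs both paths against a mock state and returns the names of any failed checks.
async function runNodeTests(): Promise<string[]> {
  const failures: string[] = [];
  const mockState: TestState = {
    input: { userUuid: 'u_1' },
    evaluation: { intent: 'GENERATE_MEDIA', cost: 4 },
  };

  const approve: Reserve = async () => ({ success: true, reservationId: 'res_1' });
  const ok = await makeCreditNode(approve)(mockState);
  if (ok.credit?.amount !== 4 || ok.error !== undefined) failures.push('success path');

  const deny: Reserve = async () => ({ success: false, balance: 2 });
  const denied = await makeCreditNode(deny)(mockState);
  if (denied.error?.code !== 'insufficient_credits' || denied.credit !== undefined) {
    failures.push('denied path');
  }

  // The node returns updates; it must not write to the state it was given.
  if (mockState.credit !== undefined || mockState.error !== undefined) {
    failures.push('input state was mutated');
  }
  return failures;
}
```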
Conclusion
The Shell and Node pattern provides a clean separation of concerns for complex AI workflows. By decoupling state management, node logic, and routing, you can add a feature by writing a node, registering it, and extending the router, without touching the engine loop or the existing nodes. The centralized router serves as documentation of all possible paths through the system. Nodes with a uniform contract are easy to test and reason about in isolation.
This architecture powers the chat-to-generate feature at our AI image generation platform, handling text chat, image generation, and video generation through the same unified workflow. The two-phase execution model enables async operations within serverless constraints while maintaining a single coherent state.
For applications with complex branching logic, multiple external service integrations, or workflows that span multiple function invocations, this pattern offers a maintainable alternative to nested conditionals and scattered state management.