Guide to LangChain Runnable Architecture
Understanding the foundations behind the LangChain framework provides universal knowledge that can be applied when building complex agent-based systems.
Join the DZone community and get the full member experience.
Join For FreeThe LangChain framework is an incredibly powerful tool that significantly accelerates the effective use of LLMs in projects and agent development. The framework provides high-level abstractions that allow developers to start working with models and integrate them into their products right away. However, understanding the core concepts of LangChain, such as the architecture of Runnable, is extremely beneficial for developers building LLM agents and chains, as it provides a structured approach and insight into utilizing the framework.
The Basis of LangChain Architecture
The Runnable architecture in LangChain is built on the principles of the Command Pattern, a behavioral design pattern that encapsulates requests as objects. This design facilitates parameterization, queuing, and dynamic execution of commands, making Runnables modular, composable, and manageable in various workflows.
Runnables are particularly well-suited for workflow management, sequential task execution, handling conditional logic, and interacting with external systems. They deliver flexibility, reusability, and modularity. You can dynamically chain tasks together to create complex behavioral scenarios while maintaining a clean and manageable code structure.
Most high-level objects in LangChain that perform specific tasks implement the Runnable class. Any objects you plan to include in a chain must also implement the Runnable class in some capacity. Interestingly, Runnable serves as an abstraction for a command, a concrete command, and simultaneously acts as both the invoker and receiver.
A notable example is a pipe method available in this class, which is specifically designed for creating chains. This method allows seamless composition of multiple Runnables, making it a cornerstone for structuring and executing workflows within LangChain.
In the diagram above, you can see how Runnable operates in conjunction with its various implementations, which we will examine in detail throughout this article.
Creating Runnables
Practically, there are two ways to create a runnable: through RunnableLambda or by extending the base Runnable class.
Using RunnableLambda for Simple Functions
The easiest way to create a Runnable is by using RunnableLambda. This class lets you wrap any function as a Runnable, allowing dynamic behavior without the need for custom classes.
import { RunnableLambda } from "@langchain/core/runnables";
// Define a simple function
const toUpperCase = (text: string): string => text.toUpperCase();
// Wrap the function as a Runnable
const upperCaseRunnable = RunnableLambda.from(toUpperCase);
// Invoke the Runnable
const result = await upperCaseRunnable.invoke("hello world");
// Output: "HELLO WORLD"
Extending the Runnable Base Class
For more advanced use cases, you can extend the Runnable base class. This approach provides full control over the execution lifecycle, including methods like invoke, batch, and stream.
import { Runnable } from "@langchain/core/runnables";
class GreetUserRunnable extends Runnable<string, string> {
lc_namespace = ["GreetUser"];
onStart(data: { input: string }) {
console.log(`Starting with input: ${data.input}`);
}
onEnd(data: { result: string }) {
console.log(`Finished with result: ${data.result}`);
}
onError(error: unknown) {
console.error(`Error occurred: ${(error as Error).message}`);
}
// Custom execution logic
async invoke(name: string): Promise<string> {
this.onStart({ input: name });
try {
const greeting = `Hello, ${name}!`;
this.onEnd({ result: greeting });
return greeting;
} catch (error) {
this.onError(error);
throw error;
}
}
}
Building Workflows With Runnables
The Runnable architecture in LangChain is extended with specialized Runnables grouped by functionality, making it versatile and suitable for a variety of applications.
Routing and Branching
Runnables that manage execution flow based on conditions or input:
RouterRunnable
Directs input to specific Runnables based on a key, similar to a switch-case statement. Useful for dynamic task execution based on runtime parameters.
import { RouterRunnable, RunnableLambda } from "@langchain/core/runnables";
const router = new RouterRunnable({
runnables: {
billing: RunnableLambda.from((query: string) => `Billing Department: ${query}`),
technical: RunnableLambda.from((query: string) => `Technical Support: ${query}`),
general: RunnableLambda.from((query: string) => `General Inquiry: ${query}`),
},
});
// Route a billing question
const result1 = await router.invoke({ key: "billing", input: "I have a question about my invoice." });
// Output: "Billing Department: I have a question about my invoice."
// Route a technical issue
const result2 = await router.invoke({ key: "technical", input: "My internet is not working." });
// Output: "Technical Support: My internet is not working."
RunnableBranch
Executes a specific Runnable from multiple options based on conditional checks, allowing the workflow to adapt to different input scenarios.
const branch = RunnableBranch.from([
[
(user: { age: number }) => user.age < 18,
RunnableLambda.from((user) => `Hey ${user.name}, check out our new teen collection!`),
],
[
(user: { age: number }) => user.age >= 18 && user.age < 30,
RunnableLambda.from((user) => `Hi ${user.name}, explore our trendy outfits for young adults!`),
],
RunnableLambda.from((user) => `Hello ${user.name}, discover our premium range!`),
]);
const result = await branch.invoke({ name: "Alice", age: 25 });
// Output: "Hi Alice, explore our trendy outfits for young adults!"
Data Manipulation and Assignment
Runnables that transform or prepare data for subsequent tasks:
RunnableAssign
Enhances or modifies the input data by adding new fields or updating existing ones, preparing it for subsequent processing steps.
import { RunnableAssign, RunnableLambda } from "@langchain/core/runnables";
const getGeolocation = RunnableLambda.from(async (x: { ip: string }) => {
// Simulate an API call to get geolocation
return { location: `Location for IP ${x.ip}` };
});
const runnableAssign = new RunnableAssign({ getGeolocation });
const res = await runnableAssign.invoke({ name: "John Doe", ip: "192.168.1.1" });
// Output: { name: "John Doe", ip: "192.168.1.1", getGeolocation: { location: "Location for IP 192.168.1.1" } }
RunnablePick
Selects and extracts specific fields from the input data, allowing focused processing of relevant information.
import { RunnablePick } from "@langchain/core/runnables";
const orderData = {
orderId: "12345",
customerEmail: "customer@example.com",
items: [{ productId: "A1", quantity: 2 }],
totalAmount: 99.99,
shippingAddress: "123 Main St",
};
const receiptInfoRunnable = new RunnablePick(["orderId", "customerEmail", "totalAmount"]);
const res = await receiptInfoRunnable.invoke(orderData);
// Output: { orderId: '12345', customerEmail: 'customer@example.com', totalAmount: 99.99 }
RunnablePassthrough
Passes the input data through without any changes, which is useful for maintaining data integrity within a workflow.
const chain = RunnableSequence.from([
{
question: new RunnablePassthrough(),
context: async () => loadContextFromStore(),
},
prompt,
llm,
outputParser,
]);
const response = await chain.invoke(
"I can pass a single string instead of an object since I'm using `RunnablePassthrough`."
);
RunnableMap
Applies transformations to each field in a map object, enabling individual processing of key-value pairs.
const sensorDataRunnable = RunnableMap.from({
temperature: RunnableLambda.from((data: { temp: number }) => `Temperature is ${data.temp}°C`),
humidity: RunnableLambda.from((data: { humidity: number }) => `Humidity is ${data.humidity}%`),
});
const result = await sensorDataRunnable.invoke({ temp: 22, humidity: 45 });
// Output: { temperature: 'Temperature is 22°C', humidity: 'Humidity is 45%' }
Sequence and Workflow Composition
Runnables that structure and execute tasks sequentially, enabling the creation of complex workflows:
RunnableSequence
Chains multiple Runnables in a linear fashion where the output of one becomes the input for the next, forming a step-by-step processing pipeline.
const imageProcessingChain = RunnableSequence.from([
readImageRunnable,
resizeImageRunnable,
applyFilterRunnable,
saveImageRunnable,
]);
const result = await imageProcessingChain.invoke('path/to/input/image.jpg');
RunnableEach
Applies a Runnable to each element in a collection, similar to a map function over an array, allowing batch processing.
import { RunnableEach, RunnableLambda } from "@langchain/core/runnables";
const personalizeEmail = RunnableLambda.from((name: string) => `Dear ${name}, we have an offer for you!`);
const sendEmail = emailSendingRunnable; // Assume this is defined elsewhere
const emailChain = new RunnableEach({
bound: personalizeEmail.pipe(sendEmail),
});
const result = await emailChain.invoke(["Alice", "Bob", "Carol"]);
// Emails are sent to Alice, Bob, and Carol.
RunnableParallel
Executes multiple Runnables simultaneously on the same input, enabling concurrent processing for efficiency.
import { RunnableLambda, RunnableParallel } from "@langchain/core/runnables";
const calculateMean = RunnableLambda.from((data: number[]) => {
return data.reduce((a, b) => a + b, 0) / data.length;
});
const calculateMedian = RunnableLambda.from((data: number[]) => {
const sorted = data.slice().sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
});
const calculateMode = RunnableLambda.from((data: number[]) => {
const frequency: { [key: number]: number } = {};
let maxFreq = 0;
let modes: number[] = [];
data.forEach((item) => {
frequency[item] = (frequency[item] || 0) + 1;
if (frequency[item] > maxFreq) {
maxFreq = frequency[item];
modes = [item];
} else if (frequency[item] === maxFreq) {
modes.push(item);
}
});
return modes;
});
const analysisChain = RunnableParallel.from({
mean: calculateMean,
median: calculateMedian,
mode: calculateMode,
});
const res = await analysisChain.invoke([1, 2, 2, 3, 4]);
// Output: { mean: 2.4, median: 2, mode: [2] }
Error Handling, Resilience, and Configuration
Runnables that enhance robustness with retry mechanisms and fallback options:
RunnableBinding
Creates a customized Runnable by pre-setting certain parameters or configurations, allowing for reusable components tailored to specific contexts.
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
const queryDatabase = (query: string, config?: RunnableConfig) => {
const dbConfig = config?.configurable?.dbConfig;
// Use dbConfig to establish a connection and execute the query
return `Executed query on ${dbConfig.host}: ${query}`;
};
const runnable = RunnableLambda.from(queryDatabase);
// Bind configuration for different environments
const prodRunnable = runnable.bind({ configurable: { dbConfig: { host: 'prod.db.example.com' } } });
const testRunnable = runnable.bind({ configurable: { dbConfig: { host: 'test.db.example.com' } } });
const result1 = await prodRunnable.invoke("SELECT * FROM users;");
// Output: "Executed query on prod.db.example.com: SELECT * FROM users;"
const result2 = await testRunnable.invoke("SELECT * FROM users;");
// Output: "Executed query on test.db.example.com: SELECT * FROM users;"
RunnableRetry
Automatically retries a Runnable upon failure according to specified retry policies, enhancing resilience against transient errors.
import { RunnableLambda } from "@langchain/core/runnables";
const fetchWeatherData = async (location: string): Promise<string> => {
// Simulate an API call that might fail
if (Math.random() < 0.7) {
throw new Error("Network error");
}
return `Weather data for ${location}`;
};
const fetchWeatherLambda = RunnableLambda.from(fetchWeatherData);
// Apply retry logic
const fetchWeatherWithRetry = fetchWeatherLambda.withRetry({ stopAfterAttempt: 5 });
try {
const res = await fetchWeatherWithRetry.invoke("New York");
console.log(res);
} catch (error) {
console.error("Failed to fetch weather data after retries:", error.message);
}
RunnableWithFallbacks
Provides alternative Runnables to execute if the primary one fails, ensuring the workflow can continue or degrade gracefully.
import { RunnableLambda, RunnableWithFallbacks } from "@langchain/core/runnables";
const primaryDataSource = async (id: string): Promise<string> => {
// Simulate failure
throw new Error("Primary data source is unavailable");
};
const secondaryDataSource = async (id: string): Promise<string> => {
return `Data for ${id} from secondary source`;
};
const primaryRunnable = RunnableLambda.from(primaryDataSource);
const fallbackRunnable = RunnableLambda.from(secondaryDataSource);
// Setup with fallback
const dataRunnable = primaryRunnable.withFallbacks([fallbackRunnable]);
const res = await dataRunnable.invoke("item123");
// Output: "Data for item123 from secondary source"
Putting It All Together
In the previous sections, we’ve explored individual Runnables and their roles in building modular workflows. Now, let’s see how we can combine these Runnables to create comprehensive, real-world applications. Below are three examples that demonstrate how to integrate multiple Runnables to solve complex problems.
Example 1: Intelligent Document Processing Pipeline
A company wants to automate the processing of incoming documents like invoices, receipts, and contracts. The goal is to classify the document type, extract relevant data, validate it, and store it in a database. The system should handle errors gracefully and retry operations if transient failures occur.
Runnables Used: RunnableSequence, RouterRunnable, RunnableParallel, RunnableRetry, RunnableWithFallbacks, RunnableAssign, RunnableLambda
import {
RunnableSequence,
RouterRunnable,
RunnableLambda,
} from "@langchain/core/runnables";
// Define a unified output type
type UnifiedOutput = {
type: string;
amount?: number;
dueDate?: string;
client?: string;
parties?: string[];
term?: string;
total?: number;
items?: string[];
};
// Step 1: OCR Processing (simulate with a function)
const ocrRunnable = RunnableLambda.from(async (imageBuffer: string) => {
// Simulate OCR processing
return "Extracted text: Invoice for Acme Corp";
});
// Step 2: Document Classification
const classifyDocument = RunnableLambda.from(async (text: string) => {
// Simulate document classification
if (text.includes("Invoice")) return "invoice";
if (text.includes("Contract")) return "contract";
return "receipt";
});
// Step 3: Data Extraction Runnables for each document type
const extractInvoiceData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
// Extract data specific to invoices
return {
type: "invoice",
amount: 1000,
dueDate: "2024-12-31",
client: "Acme Corp",
};
}
);
const extractContractData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
// Extract data specific to contracts
return {
type: "contract",
parties: ["Company A", "Company B"],
term: "2 years",
};
}
);
const extractReceiptData = RunnableLambda.from(
async (text: string): Promise<UnifiedOutput> => {
// Extract data specific to receipts
return { type: "receipt", total: 50, items: ["Item1", "Item2"] };
}
);
const dataExtractionRouter = new RouterRunnable({
runnables: {
invoice: extractInvoiceData,
contract: extractContractData,
receipt: extractReceiptData,
},
});
// Step 5: Data Validation
const validateData = RunnableLambda.from(async (data: UnifiedOutput) => {
// Perform validation logic
if (!data || !data.type)
throw new Error("Validation failed: Data is missing or invalid");
return { ...data, isValid: true };
});
// Step 6: Save to Database (simulate with a function)
const saveToDatabase = RunnableLambda.from(async (data: UnifiedOutput) => {
// Simulate saving to a database
return `Data saved: ${JSON.stringify(data)}`;
});
// Step 7: Build the workflow sequence
const documentProcessingWorkflow = RunnableSequence.from<string, any>([
ocrRunnable,
classifyDocument,
dataExtractionRouter,
validateData,
saveToDatabase.withRetry({ stopAfterAttempt: 3 }),
]);
// Step 8: Add error handling with fallbacks
const workflowWithFallback = documentProcessingWorkflow.withFallbacks({
fallbacks: [
RunnableLambda.from(async () => {
return "An error occurred. Please try again later.";
}),
],
});
// Execute the workflow
(async () => {
try {
const result = await workflowWithFallback.invoke("Document image bytes");
console.log(result);
// Expected Output: "Data saved: { type: 'invoice', amount: 1000, dueDate: '2024-12-31', client: 'Acme Corp', isValid: true }"
} catch (error: any) {
console.error("Failed to process document:", (error as Error).message);
}
})();
The workflow starts by converting the document image into text using ocrRunnable. The extracted text is classified into a document type (invoice, contract, or receipt). RouterRunnable directs the text to the appropriate data extraction Runnable based on the document type. The extracted data is validated and then saved to the database. The RunnableRetry ensures that saving is retried up to three times in case of transient failures. If any step fails, RunnableWithFallbacks provides a fallback message to handle errors gracefully.
Example 2: Personalized Recommendation Engine
An e-commerce platform wants to provide personalized product recommendations to users based on their browsing history and preferences.
Runnables Used: RunnableParallel, RunnableMap, RunnableBranch, RunnableWithFallbacks
import {
RunnableParallel,
RunnableMap,
RunnableBranch,
RunnableSequence,
RunnableLambda,
} from "@langchain/core/runnables";
// Step 1: Fetch user data from multiple sources in parallel
const fetchUserData = RunnableParallel.from({
browsingHistory: RunnableLambda.from(async (userId) => {
// Simulate fetching browsing history
return ["Item1", "Item2"];
}),
purchaseHistory: RunnableLambda.from(async (userId) => {
// Simulate fetching purchase history
return ["Item3"];
}),
});
// Step 2: Map over the fetched data to process it
const processUserData = RunnableMap.from({
browsingHistory: RunnableLambda.from((history: any[]) => {
// Process browsing history
return history.map((item) => `Processed ${item}`);
}),
purchaseHistory: RunnableLambda.from((history: any[]) => {
// Process purchase history
return history.map((item) => `Processed ${item}`);
}),
});
// Step 3: Define recommendation algorithms
const newUserRecommendations = RunnableLambda.from(async (user) => {
// Logic for new users
return ["Product A", "Product B", "Product C"];
});
const returningUserRecommendations = RunnableLambda.from(async (user) => {
// Logic for returning users based on history
return ["Product X", "Product Y", "Product Z"];
});
// Step 4: Branch based on user type
const recommendationBranch = RunnableBranch.from([
[(user: any) => user.isNew, newUserRecommendations],
returningUserRecommendations,
]);
// Step 5: Create a fallback recommendation system
const defaultRecommendations = RunnableLambda.from(async (user) => {
// Default recommendations
return ["Default Product 1", "Default Product 2"];
});
const recommendationWithFallback = recommendationBranch.withFallbacks([
defaultRecommendations,
]);
// Step 6: Sequence the entire workflow
const recommendationWorkflow = RunnableSequence.from([
fetchUserData,
processUserData,
(data) => ({ ...data, isNew: data.purchaseHistory.length === 0 }),
recommendationWithFallback,
]);
// Usage
const userId = "user123";
const recommendations = recommendationWorkflow.invoke(userId);
// Output: Personalized recommendations based on user data
The workflow begins by concurrently fetching the user’s browsing history, purchase history, and profile using RunnableParallel. Each piece of data is then processed individually using RunnableMap to prepare it for recommendation generation.
The RunnableBranch decides which recommendation algorithm to use based on the user’s profile:
- If the user is a premium member (
isPremiumMember
is true), it uses premiumUserRecommendations. - If the user has no purchase history (indicating a new user), it uses newUserRecommendations.
- Otherwise, it defaults to regularUserRecommendations.
If any step in the recommendation process fails, RunnableWithFallbacks ensures that the system provides a set of default recommendations, maintaining a good user experience.
Finally, RunnableSequence orchestrates the entire workflow, ensuring that each step happens in the correct order. The workflow is executed by invoking it with a userId
, and it outputs personalized recommendations based on the user’s data.
Example 3: Data Processing Pipeline for Analytics
A company needs to process large datasets to generate analytics reports involving data cleaning, transformation, analysis, and visualization.
Runnables Used: RunnableSequence, RunnableEach, RunnableRetry, RunnableBinding
import {
RunnableSequence,
RunnableEach,
RunnableLambda,
} from "@langchain/core/runnables";
// Step 1: Define data fetching with retry
const fetchData = RunnableLambda.from(async (source) => {
// Simulate data fetching, which may fail
if (Math.random() < 0.2) {
throw new Error("Data fetch error");
}
return `Data from ${source}`;
}).withRetry({ stopAfterAttempt: 3 });
// Step 2: Data cleaning
const cleanData = RunnableLambda.from((data) => {
// Perform data cleaning
return `Cleaned ${data}`;
});
// Step 3: Data transformation
const transformData = RunnableLambda.from((data) => {
// Transform data
return `Transformed ${data}`;
});
// Step 4: Data analysis
const analyzeData = RunnableLambda.from((data) => {
// Analyze data
return `Analysis results of ${data}`;
});
// Step 5: Data visualization
const visualizeData = RunnableLambda.from((analysis) => {
// Generate visualization
return `Visualization of ${analysis}`;
});
// Step 6: Sequence the steps
const dataProcessingSequence = RunnableSequence.from([
cleanData,
transformData,
analyzeData,
visualizeData,
]);
// Step 7: Process multiple data sources
const dataSources = ["Dataset A", "Dataset B", "Dataset C"];
const processAllData = new RunnableEach({
bound: fetchData.pipe(dataProcessingSequence),
});
// Usage
const reports = processAllData.invoke(dataSources);
// Output: Array of visualization results for each data source
This workflow handles data processing for multiple datasets from different sources. It begins by defining a fetchData runnable that is bound to specific data sources using RunnableBinding. Each data fetch operation is wrapped with RunnableRetry to handle transient failures by retrying up to three times.
The data fetched from each source undergoes a series of processing steps defined by RunnableSequence:
- Data Cleaning: Removes or corrects erroneous data.
- Data Transformation: Converts data into a suitable format for analysis.
- Data Analysis: Performs analytical computations.
- Data Visualization: Generates visual representations of the analysis.
RunnableEach is used to process multiple datasets in parallel. It applies the same processing sequence to each dataset.
Conclusion
The Runnable architecture in LangChain serves as a powerful foundation for building complex, modular workflows involving large language models (LLMs). Throughout this article, we’ve explored how Runnables can be created and combined to address various challenges:
- Routing and Branching: Utilizing RouterRunnable and RunnableBranch allows for dynamic execution paths based on runtime conditions.
- Data Manipulation and Assignment: Tools like RunnableAssign, RunnablePick, and RunnableMap offer flexible data transformation capabilities, preparing inputs for subsequent processing steps.
- Sequence and Workflow Composition: By chaining tasks using RunnableSequence, RunnableEach, and RunnableParallel, developers can orchestrate processes, whether they require sequential execution or parallel processing.
- Error Handling and Resilience: With RunnableRetry and RunnableWithFallbacks, workflows gracefully handle errors and provide fallback mechanisms.
Runnable promotes a structured approach to building LLM agents and chains. As you integrate LangChain into your projects, consider how Runnables can enhance your workflows, making them more flexible, resilient, and easier to maintain.
Opinions expressed by DZone contributors are their own.
Comments