Chains in Legion

Chains are sequential workflows that connect blocks and agents to accomplish complex tasks. Learn how to create and use chains effectively in your Legion applications.

What are Chains?

Chains in Legion are powerful abstractions that allow you to:

  • Connect multiple blocks in a sequential workflow
  • Process data through a series of transformations
  • Combine the outputs of multiple blocks
  • Handle errors gracefully and retry failed steps
  • Monitor the execution of complex workflows

Creating Basic Chains

Here's how to create a simple chain that processes text through multiple blocks:

from typing import List

from pydantic import BaseModel, Field

from legion.blocks import block
from legion.groups.decorators import chain


# Define input/output schemas
class TextInput(BaseModel):
    text: str = Field(description="Input text to process")


class TextAnalysisOutput(BaseModel):
    sentiment: str = Field(description="Detected sentiment")
    keywords: List[str] = Field(description="Extracted keywords")
    summary: str = Field(description="Text summary")


# Define blocks for the chain
@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "sentiment"]
)
def analyze_sentiment(input_data: TextInput) -> BaseModel:
    """Analyze the sentiment of input text."""
    # Implementation details...
    class SentimentOutput(BaseModel):
        sentiment: str = Field(description="Detected sentiment")
    return SentimentOutput(sentiment="positive")


@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "extraction"]
)
def extract_keywords(input_data: TextInput) -> BaseModel:
    """Extract keywords from input text."""
    # Implementation details...
    class KeywordsOutput(BaseModel):
        keywords: List[str] = Field(description="Extracted keywords")
    return KeywordsOutput(keywords=["example", "keywords"])


@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "summarization"]
)
def summarize_text(input_data: TextInput) -> BaseModel:
    """Summarize the input text."""
    # Implementation details...
    class SummaryOutput(BaseModel):
        summary: str = Field(description="Text summary")
    return SummaryOutput(summary="Example summary")


# Create a chain that combines the blocks
@chain
class TextAnalysisChain:
    """A chain that analyzes text by determining sentiment, extracting keywords, and generating a summary."""

    # Define the blocks to be executed in sequence
    members = [
        analyze_sentiment,
        extract_keywords,
        summarize_text
    ]

    # Define how to combine the outputs from each block
    def combine_outputs(self, outputs):
        return TextAnalysisOutput(
            sentiment=outputs[0].sentiment,
            keywords=outputs[1].keywords,
            summary=outputs[2].summary
        )


# Use the chain (aprocess is a coroutine, so await it inside an async function)
async def main():
    analysis_chain = TextAnalysisChain()
    input_data = TextInput(text="Example text for analysis")
    result = await analysis_chain.aprocess(input_data)

    print(f"Sentiment: {result.sentiment}")
    print(f"Keywords: {', '.join(result.keywords)}")
    print(f"Summary: {result.summary}")

Chain Features

Sequential Processing

Chains execute blocks in a defined sequence, passing data from one block to the next. This allows for complex data transformations through multiple processing steps.
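Stripped of Legion specifics, the sequential pattern itself is easy to see. The sketch below uses plain asyncio with a hypothetical `run_sequence` helper to illustrate the idea of feeding each step's output into the next; it is not Legion's actual implementation.

```python
import asyncio


async def run_sequence(steps, data):
    """Run each async step in order, feeding each output into the next step."""
    for step in steps:
        data = await step(data)
    return data


# Three toy "blocks" that transform a string step by step
async def strip_text(text):
    return text.strip()


async def lowercase(text):
    return text.lower()


async def word_count(text):
    return len(text.split())


result = asyncio.run(
    run_sequence([strip_text, lowercase, word_count], "  Hello Chain World  ")
)
print(result)  # 3
```

Each step only needs to agree on the shape of the data it receives and returns; in Legion that contract is expressed with the Pydantic input and output schemas shown above.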

Output Combination

The combine_outputs method allows you to merge the outputs from all blocks into a single, coherent result structure.

Error Handling

Chains provide built-in error handling mechanisms, allowing you to gracefully handle failures in any block and implement retry logic.
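The shape of such retry logic can be sketched in plain asyncio. The `with_retries` helper below is hypothetical, not part of Legion's API; it just shows the fixed-backoff pattern you might wrap around a failing block.

```python
import asyncio


async def with_retries(step, data, max_attempts=3, delay=0.01):
    """Retry an async step on failure with a simple fixed backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await step(data)
        except Exception:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            await asyncio.sleep(delay)


# A flaky step that fails twice before succeeding
calls = {"count": 0}


async def flaky_step(data):
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return data.upper()


result = asyncio.run(with_retries(flaky_step, "ok"))
print(result)  # OK
```

For transient failures (network timeouts, rate limits), an exponential backoff with jitter is usually preferable to the fixed delay shown here.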

Monitoring

Track the execution of your chains with built-in monitoring capabilities, including execution time, success rates, and detailed logs.
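A minimal version of per-step monitoring can be built with the standard logging and time modules. The `timed_step` wrapper below is an illustrative sketch, not Legion's built-in monitoring API.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("chain.monitor")


async def timed_step(name, step, data):
    """Run a step, logging its duration and whether it succeeded."""
    start = time.perf_counter()
    try:
        result = await step(data)
        logger.info("%s succeeded in %.4fs", name, time.perf_counter() - start)
        return result
    except Exception:
        logger.exception("%s failed after %.4fs", name, time.perf_counter() - start)
        raise


async def double(x):
    return x * 2


result = asyncio.run(timed_step("double", double, 21))
```

In production you would typically also record success counts and latencies in a metrics system rather than only in logs.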

Advanced Chain Patterns

Conditional Chains

You can create chains that conditionally execute different blocks based on the input or intermediate results:

@chain
class ConditionalProcessingChain:
    """A chain that processes data differently based on its type."""

    async def aprocess(self, input_data):
        # Determine which processing path to take
        if input_data.data_type == "text":
            # Process text data
            result = await self.process_text(input_data.content)
        elif input_data.data_type == "image":
            # Process image data
            result = await self.process_image(input_data.content)
        else:
            # Default processing
            result = await self.process_default(input_data.content)
        return result

    async def process_text(self, text):
        # Text-specific processing
        pass

    async def process_image(self, image):
        # Image-specific processing
        pass

    async def process_default(self, content):
        # Default processing
        pass

Parallel Processing Chains

For improved performance, you can execute blocks in parallel when they don't depend on each other's outputs:

import asyncio

from legion.groups.decorators import chain


@chain
class ParallelProcessingChain:
    """A chain that processes data through multiple blocks in parallel."""

    async def aprocess(self, input_data):
        # Execute blocks in parallel
        results = await asyncio.gather(
            self.analyze_sentiment(input_data),
            self.extract_keywords(input_data),
            self.summarize_text(input_data)
        )
        # Combine results
        return {
            "sentiment": results[0],
            "keywords": results[1],
            "summary": results[2]
        }

    async def analyze_sentiment(self, text):
        # Sentiment analysis implementation
        pass

    async def extract_keywords(self, text):
        # Keyword extraction implementation
        pass

    async def summarize_text(self, text):
        # Text summarization implementation
        pass

Nested Chains

You can compose complex workflows by nesting chains within other chains:

@chain
class DataPreprocessingChain:
    """A chain that preprocesses data before analysis."""

    members = [
        clean_data,
        normalize_data,
        validate_data
    ]


@chain
class DataAnalysisChain:
    """A chain that analyzes preprocessed data."""

    members = [
        analyze_patterns,
        generate_statistics,
        create_visualizations
    ]


@chain
class CompleteDataPipeline:
    """A complete data processing pipeline that combines preprocessing and analysis."""

    def __init__(self):
        self.preprocessing_chain = DataPreprocessingChain()
        self.analysis_chain = DataAnalysisChain()

    async def aprocess(self, input_data):
        # First, preprocess the data
        preprocessed_data = await self.preprocessing_chain.aprocess(input_data)
        # Then, analyze the preprocessed data
        analysis_results = await self.analysis_chain.aprocess(preprocessed_data)
        # Return the final results
        return analysis_results

Best Practices

  • Single Responsibility: Design each block in your chain to perform a single, well-defined task. This improves reusability and makes your chains easier to maintain.
  • Error Handling: Implement proper error handling in your chains to gracefully recover from failures. Consider using try-except blocks and implementing retry logic for transient errors.
  • Input Validation: Validate inputs at each stage of your chain to catch errors early. Use Pydantic models to enforce schema validation.
  • Monitoring: Add logging and monitoring to track the performance and execution of your chains. This helps identify bottlenecks and troubleshoot issues.
  • Testing: Write unit tests for individual blocks and integration tests for entire chains. This ensures your chains work correctly and helps prevent regressions.

Related Resources

Blocks

Learn about the building blocks that make up chains.

Blocks Documentation →

Teams

Discover how to combine chains with agents in collaborative teams.

Teams Documentation →

Examples

See practical examples of chains in action.

Examples →