Chains in Legion
Chains are sequential workflows that connect blocks and agents to accomplish complex tasks. Learn how to create and use chains effectively in your Legion applications.
What are Chains?
Chains in Legion are powerful abstractions that allow you to:
- Connect multiple blocks in a sequential workflow
- Process data through a series of transformations
- Combine the outputs of multiple blocks
- Handle errors and retries gracefully
- Monitor the execution of complex workflows
Creating Basic Chains
Here's how to create a simple chain that processes text through multiple blocks:
```python
from typing import List

from pydantic import BaseModel, Field

from legion.blocks import block
from legion.groups.decorators import chain

# Define input/output schemas
class TextInput(BaseModel):
    text: str = Field(description="Input text to process")

class TextAnalysisOutput(BaseModel):
    sentiment: str = Field(description="Detected sentiment")
    keywords: List[str] = Field(description="Extracted keywords")
    summary: str = Field(description="Text summary")

# Define blocks for the chain
@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "sentiment"]
)
def analyze_sentiment(input_data: TextInput) -> BaseModel:
    """Analyze the sentiment of input text."""
    # Implementation details...
    class SentimentOutput(BaseModel):
        sentiment: str = Field(description="Detected sentiment")

    return SentimentOutput(sentiment="positive")

@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "extraction"]
)
def extract_keywords(input_data: TextInput) -> BaseModel:
    """Extract keywords from input text."""
    # Implementation details...
    class KeywordsOutput(BaseModel):
        keywords: List[str] = Field(description="Extracted keywords")

    return KeywordsOutput(keywords=["example", "keywords"])

@block(
    input_schema=TextInput,
    output_schema=BaseModel,
    tags=["text", "summarization"]
)
def summarize_text(input_data: TextInput) -> BaseModel:
    """Summarize the input text."""
    # Implementation details...
    class SummaryOutput(BaseModel):
        summary: str = Field(description="Text summary")

    return SummaryOutput(summary="Example summary")

# Create a chain that combines the blocks
@chain
class TextAnalysisChain:
    """A chain that analyzes text by determining sentiment,
    extracting keywords, and generating a summary."""

    # Define the blocks to be executed in sequence
    members = [
        analyze_sentiment,
        extract_keywords,
        summarize_text
    ]

    # Define how to combine the outputs from each block
    def combine_outputs(self, outputs):
        return TextAnalysisOutput(
            sentiment=outputs[0].sentiment,
            keywords=outputs[1].keywords,
            summary=outputs[2].summary
        )

# Use the chain
analysis_chain = TextAnalysisChain()
input_data = TextInput(text="Example text for analysis")
result = await analysis_chain.aprocess(input_data)

print(f"Sentiment: {result.sentiment}")
print(f"Keywords: {', '.join(result.keywords)}")
print(f"Summary: {result.summary}")
```
Chain Features
Sequential Processing
Chains execute blocks in a defined sequence, passing data from one block to the next. This allows for complex data transformations through multiple processing steps.
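One way to picture this hand-off is a loop that feeds each step's output into the next step's input. The sketch below is plain Python, independent of Legion's actual API; `run_sequence` and the three step functions are illustrative names, not framework members:

```python
import asyncio

async def run_sequence(steps, data):
    """Feed each step's output into the next, like a chain's members list."""
    for step in steps:
        data = await step(data)
    return data

# Three toy steps: each transforms the previous step's output
async def strip_text(s):
    return s.strip()

async def lower_text(s):
    return s.lower()

async def count_words(s):
    return len(s.split())

result = asyncio.run(run_sequence([strip_text, lower_text, count_words],
                                  "  Hello Chain World  "))
print(result)  # 3
```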
Output Combination
The `combine_outputs` method allows you to merge the outputs from all blocks into a single, coherent result structure.
Error Handling
Chains provide built-in error handling mechanisms, allowing you to gracefully handle failures in any block and implement retry logic.
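One common retry pattern can be sketched in plain Python, independent of Legion's built-in hooks. The `aprocess_with_retry` helper and `FlakyChain` stand-in below are illustrative assumptions, not part of Legion's API; they only assume a chain exposes an awaitable `aprocess` method, as shown in the examples above:

```python
import asyncio

async def aprocess_with_retry(chain, input_data, max_attempts=3, base_delay=0.1):
    """Retry a chain's aprocess call with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await chain.aprocess(input_data)
        except Exception:
            if attempt == max_attempts:
                raise  # out of retries: surface the original error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))

# Demo with a stand-in chain that fails twice, then succeeds
class FlakyChain:
    def __init__(self):
        self.calls = 0

    async def aprocess(self, input_data):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("transient failure")
        return f"processed: {input_data}"

result = asyncio.run(aprocess_with_retry(FlakyChain(), "hello"))
print(result)  # processed: hello
```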
Monitoring
Track the execution of your chains with built-in monitoring capabilities, including execution time, success rates, and detailed logs.
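If you want a quick picture of what such monitoring captures, a thin timing-and-logging wrapper around a chain call looks roughly like the sketch below. This is plain Python, not Legion's monitoring API; `monitored_aprocess` and `EchoChain` are hypothetical names used only for illustration:

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("chain.monitor")

async def monitored_aprocess(chain, input_data):
    """Wrap a chain call with execution timing and success/failure logging."""
    start = time.perf_counter()
    try:
        result = await chain.aprocess(input_data)
        logger.info("chain succeeded in %.3fs", time.perf_counter() - start)
        return result
    except Exception:
        logger.exception("chain failed after %.3fs", time.perf_counter() - start)
        raise

# Stand-in chain used to demonstrate the wrapper
class EchoChain:
    async def aprocess(self, input_data):
        return input_data.upper()

result = asyncio.run(monitored_aprocess(EchoChain(), "hello"))
print(result)  # HELLO
```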
Advanced Chain Patterns
Conditional Chains
You can create chains that conditionally execute different blocks based on the input or intermediate results:
```python
@chain
class ConditionalProcessingChain:
    """A chain that processes data differently based on its type."""

    async def aprocess(self, input_data):
        # Determine which processing path to take
        if input_data.data_type == "text":
            # Process text data
            result = await self.process_text(input_data.content)
        elif input_data.data_type == "image":
            # Process image data
            result = await self.process_image(input_data.content)
        else:
            # Default processing
            result = await self.process_default(input_data.content)
        return result

    async def process_text(self, text):
        # Text-specific processing
        pass

    async def process_image(self, image):
        # Image-specific processing
        pass

    async def process_default(self, content):
        # Default processing
        pass
```
Parallel Processing Chains
For improved performance, you can execute blocks in parallel when they don't depend on each other's outputs:
```python
import asyncio

from legion.groups.decorators import chain

@chain
class ParallelProcessingChain:
    """A chain that processes data through multiple blocks in parallel."""

    async def aprocess(self, input_data):
        # Execute blocks in parallel
        results = await asyncio.gather(
            self.analyze_sentiment(input_data),
            self.extract_keywords(input_data),
            self.summarize_text(input_data)
        )

        # Combine results
        return {
            "sentiment": results[0],
            "keywords": results[1],
            "summary": results[2]
        }

    async def analyze_sentiment(self, text):
        # Sentiment analysis implementation
        pass

    async def extract_keywords(self, text):
        # Keyword extraction implementation
        pass

    async def summarize_text(self, text):
        # Text summarization implementation
        pass
```
Nested Chains
You can compose complex workflows by nesting chains within other chains:
```python
@chain
class DataPreprocessingChain:
    """A chain that preprocesses data before analysis."""

    members = [
        clean_data,
        normalize_data,
        validate_data
    ]

@chain
class DataAnalysisChain:
    """A chain that analyzes preprocessed data."""

    members = [
        analyze_patterns,
        generate_statistics,
        create_visualizations
    ]

@chain
class CompleteDataPipeline:
    """A complete data processing pipeline that combines
    preprocessing and analysis."""

    def __init__(self):
        self.preprocessing_chain = DataPreprocessingChain()
        self.analysis_chain = DataAnalysisChain()

    async def aprocess(self, input_data):
        # First, preprocess the data
        preprocessed_data = await self.preprocessing_chain.aprocess(input_data)

        # Then, analyze the preprocessed data
        analysis_results = await self.analysis_chain.aprocess(preprocessed_data)

        # Return the final results
        return analysis_results
```
Best Practices
- Single Responsibility: Design each block in your chain to perform a single, well-defined task. This improves reusability and makes your chains easier to maintain.
- Error Handling: Implement proper error handling in your chains to gracefully recover from failures. Consider using try-except blocks and implementing retry logic for transient errors.
- Input Validation: Validate inputs at each stage of your chain to catch errors early. Use Pydantic models to enforce schema validation.
- Monitoring: Add logging and monitoring to track the performance and execution of your chains. This helps identify bottlenecks and troubleshoot issues.
- Testing: Write unit tests for individual blocks and integration tests for entire chains. This ensures your chains work correctly and helps prevent regressions.
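For the testing point above, individual blocks can be exercised in isolation with ordinary unit tests before being composed into a chain. The sketch below uses simplified stand-ins (a plain dataclass and a toy `extract_keywords` coroutine, not Legion's actual decorators) to show the shape such a test might take:

```python
import asyncio
from dataclasses import dataclass

# Simplified stand-in for the input schema used in the earlier example
@dataclass
class TextInput:
    text: str

async def extract_keywords(input_data: TextInput):
    """Toy keyword extraction: keep words longer than four characters."""
    return [w for w in input_data.text.split() if len(w) > 4]

def test_extract_keywords():
    # Exercise the block in isolation, without running the whole chain
    result = asyncio.run(
        extract_keywords(TextInput(text="chains connect simple blocks"))
    )
    assert result == ["chains", "connect", "simple", "blocks"]

test_extract_keywords()
print("block unit test passed")
```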