
Haystack Agents

Master Haystack for building production-ready RAG agents and NLP pipelines

Creating Custom Components

Haystack's extensibility lets you create custom components that integrate seamlessly into pipelines. Add your own business logic, external APIs, or specialized processing.

Component Structure

Basic Custom Component Template
from haystack import component
from haystack.dataclasses import Document
from typing import List

@component
class CustomDocumentFilter:
    """
    Filters documents based on custom business logic.
    """
    
    def __init__(self, min_length: int = 100):
        self.min_length = min_length
    
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> dict:
        """
        Filter documents by length and custom rules.
        
        :param documents: List of documents to filter
        :return: Dictionary with filtered documents
        """
        filtered = [
            doc for doc in documents
            # content may be None (e.g. non-text documents), so guard before len()
            if doc.content is not None and len(doc.content) >= self.min_length
        ]
        
        return {"documents": filtered}

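# Use in a pipeline (assumes `pipeline` is an existing haystack Pipeline that
# already contains components named "retriever" and "ranker")
pipeline.add_component("filter", CustomDocumentFilter(min_length=200))
pipeline.connect("retriever", "filter")
pipeline.connect("filter", "ranker")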

Key Elements

  • @component decorator required
  • @component.output_types declares outputs
  • run() method processes data
  • Return a dict whose keys match the declared output types

Best Practices

  • Type hints for all parameters
  • Clear docstrings
  • Handle edge cases gracefully
  • Unit test in isolation
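
One way to make "handle edge cases gracefully" and "unit test in isolation" concrete is to keep the core logic in a plain function that a component's run() method can call. The sketch below is only an illustration under that assumption: `filter_texts_by_length` is a hypothetical helper, not part of Haystack, and it works on raw strings so it can be tested without building a pipeline.

```python
from typing import List, Optional


def filter_texts_by_length(texts: List[Optional[str]], min_length: int = 100) -> List[str]:
    """Keep texts whose stripped length is at least min_length.

    Hypothetical helper: None entries are skipped instead of raising,
    surrounding whitespace does not count toward the length, and a
    negative threshold is rejected early.
    """
    if min_length < 0:
        raise ValueError("min_length must be non-negative")
    return [
        text for text in texts
        if text is not None and len(text.strip()) >= min_length
    ]


# Edge cases: empty input, None content, whitespace-only content
print(filter_texts_by_length([], min_length=5))
print(filter_texts_by_length([None, "   ", "long enough"], min_length=5))
```

Inside a component, run() could pass each document's content through a helper like this, which keeps the Haystack-specific wrapper thin and the logic easy to test.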

Real-World Custom Components

🌐 API Integration Component

Fetch real-time data from external APIs during pipeline execution.

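import requests
from haystack import component

@component
class WeatherAPIComponent:
    @component.output_types(weather_data=dict)
    def run(self, location: str) -> dict:
        # Placeholder URL; substitute your weather provider's real endpoint
        response = requests.get(f"https://api.weather.com/{location}", timeout=10)
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return {"weather_data": response.json()}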

🔐 Content Moderator

Filter sensitive or inappropriate content before LLM generation.

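# Reuses the imports from the basic template (component, Document, List)
@component
class ContentModerator:
    def contains_pii(self, doc: Document) -> bool:
        # Placeholder: supply your own PII detection logic here
        raise NotImplementedError

    @component.output_types(safe_documents=List[Document])
    def run(self, documents: List[Document]) -> dict:
        safe = [doc for doc in documents if not self.contains_pii(doc)]
        return {"safe_documents": safe}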
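
The moderator relies on a `contains_pii` check that is not shown. As a rough sketch of what such a check might look like, the function below flags text containing an email address or a US-style phone number. The two regular expressions and the function name's standalone form are assumptions made for illustration; real PII detection usually needs far broader coverage or a dedicated service.

```python
import re

# Illustrative patterns only (assumption): emails and US-style phone numbers
EMAIL_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
US_PHONE_PATTERN = re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b")


def contains_pii(text: str) -> bool:
    """Return True if the text matches any of the illustrative PII patterns."""
    if not text:
        return False
    return bool(EMAIL_PATTERN.search(text) or US_PHONE_PATTERN.search(text))


print(contains_pii("Contact jane@example.com for details"))
print(contains_pii("No personal data here"))
```

Within the component, the method could delegate to this function with `doc.content or ""` so that documents without text content are treated as safe rather than raising.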

📊 Analytics Logger

Track usage metrics and document retrieval patterns for optimization.

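import logging

logger = logging.getLogger(__name__)

@component
class AnalyticsLogger:
    def log_metrics(self, query: str, num_documents: int) -> None:
        # Minimal stand-in: forward metrics to your analytics backend here
        logger.info("query=%r retrieved=%d", query, num_documents)

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document], query: str) -> dict:
        self.log_metrics(query, len(documents))
        return {"documents": documents}  # pass documents through unchanged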
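
To look at retrieval patterns rather than individual log lines, the logged values need to be aggregated somewhere. The following is a minimal in-memory sketch of that idea; `RetrievalMetrics` and its method names are hypothetical, and a production setup would more likely send these numbers to a metrics or observability system.

```python
from collections import Counter
from typing import Dict


class RetrievalMetrics:
    """Hypothetical in-memory tracker for query counts and retrieval sizes."""

    def __init__(self) -> None:
        self.query_counts: Counter = Counter()
        self.total_documents = 0
        self.total_queries = 0

    def record(self, query: str, num_documents: int) -> None:
        # Normalize so "Haystack" and " haystack " count as the same query
        self.query_counts[query.strip().lower()] += 1
        self.total_documents += num_documents
        self.total_queries += 1

    def summary(self) -> Dict[str, object]:
        # Avoid dividing by zero before any query has been recorded
        average = self.total_documents / self.total_queries if self.total_queries else 0.0
        return {
            "total_queries": self.total_queries,
            "average_documents": average,
            "top_queries": self.query_counts.most_common(3),
        }


metrics = RetrievalMetrics()
metrics.record("What is Haystack?", 5)
metrics.record("what is haystack? ", 3)
metrics.record("custom components", 0)
print(metrics.summary())
```

A query that repeatedly retrieves zero documents, like the last one recorded here, is the kind of pattern that can point to gaps in the indexed content.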

Deployment & Production

REST API Deployment
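# Haystack 2.x core does not ship a built-in REST server. deepset's separate
# Hayhooks project can serve pipelines; this sketch wraps one with FastAPI instead.
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from haystack import Pipeline

# Define your pipeline
pipeline = Pipeline()
# ... add components ...

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/query")
def query(request: QueryRequest):
    # "retriever" is a placeholder; key the inputs by your own component names
    result = pipeline.run({"retriever": {"query": request.query}})
    return result  # convert Document objects to dicts if your outputs contain them

if __name__ == "__main__":
    # Serve as REST API
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Access at: http://localhost:8000/query
# POST request with: {"query": "your question"}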
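
For reference, a client call against the `/query` endpoint described above might be prepared as follows. This is a sketch using only the standard library; `build_query_request` is a hypothetical helper, and the response shape depends entirely on what your pipeline returns, so the send step is left commented out.

```python
import json
import urllib.request


def build_query_request(question: str, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for the /query endpoint."""
    payload = json.dumps({"query": question}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request("your question")
print(request.full_url, request.get_method())

# To actually send it once the server is running:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(json.loads(response.read()))
```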

🐳 Docker Deployment

FROM python:3.10
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY pipeline.py .
CMD ["python", "pipeline.py"]

☁️ Cloud Deployment

Deploy to AWS Lambda, Azure Functions, or Google Cloud Run with containerization.

Testing & Debugging

Unit Testing Components
import pytest
from haystack import Pipeline
from haystack.dataclasses import Document

# Hypothetical module path; import from wherever you defined the component
from my_components import CustomDocumentFilter

def test_custom_filter():
    # Create test component
    filter_component = CustomDocumentFilter(min_length=50)

    # Create test documents: one below and one above the 50-character threshold
    docs = [
        Document(content="Short"),
        Document(content="This is a longer document that meets the minimum length requirement")
    ]

    # Run component
    result = filter_component.run(documents=docs)

    # Assert behavior
    assert len(result["documents"]) == 1
    assert len(result["documents"][0].content) >= 50

# Test pipeline execution
def test_pipeline_integration():
    pipeline = Pipeline()
    pipeline.add_component("filter", CustomDocumentFilter(min_length=50))
    test_docs = [Document(content="Short"), Document(content="x" * 60)]
    # Inputs are keyed by component name, and so are the outputs
    result = pipeline.run({"filter": {"documents": test_docs}})
    assert len(result["filter"]["documents"]) == 1

🎯 Custom Component Guidelines

  • Single responsibility: Each component should do one thing well
  • Type safety: Use type hints and declare output types explicitly
  • Error handling: Gracefully handle edge cases and invalid inputs
  • Documentation: Clear docstrings explaining inputs, outputs, and behavior
  • Testing: Unit test components in isolation before pipeline integration