Overview

A Pipeline is a sequence of processing Stages that documents flow through for transformation and enrichment. Pipelines orchestrate the application of Stages to incoming documents, handling the flow of both parent documents and any children they generate.
public class Pipeline {
    private final ArrayList<Stage> stages = new ArrayList<>();
    
    public Iterator<Document> processDocument(Document document) throws StageException {
        Iterator<Document> result = document.iterator();
        for (Stage stage : stages) {
            result = stage.apply(result);
        }
        return result;
    }
}

Pipeline Architecture

Sequential Stage Processing

Documents flow through stages in the order they are defined. Each stage receives the output from the previous stage:
Document → Stage 1 → Stage 2 → Stage 3 → ... → Stage N → Indexer
             ↓         ↓         ↓                ↓
          children  children  children         children
Child documents generated by a stage are passed only through the downstream stages, never back through earlier ones. As a result, children receive the transformations that occur after their creation, without re-running stages that already processed the parent.

Example Flow

Consider a pipeline with stages S1, S2, S3, S4:
Parent Doc (id=1)
  → S1 (validates)
  → S2 (generates child with id=2)
  → S3 (processes both)
  → S4 (processes both)

Result: 
  - Doc 1: processed by S1, S2, S3, S4
  - Doc 2: processed by S3, S4 only
Here is how S2's processDocument might look:
// Stage S2 generates a child
public Iterator<Document> processDocument(Document doc) {
    Document child = Document.create("2", doc.getRunId());
    child.setField("parent_id", doc.getId());
    return Collections.singletonList(child).iterator();
}

// Pipeline automatically ensures child flows through S3, S4
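The routing above can be modeled in a few lines of plain Java. This is a simplified sketch, not the Lucille API: documents are bare maps, and each "stage" just appends its name to a `seen` field so the processing history is visible.

```java
import java.util.*;

public class ChildFlowSketch {
    // Simplified model: each stage tags every document it sees and may emit
    // children. A child joins the stream *after* the stage that created it,
    // so it only flows through downstream stages.
    static List<Map<String, String>> run() {
        List<Map<String, String>> docs = new ArrayList<>();
        docs.add(new HashMap<>(Map.of("id", "1", "seen", "")));
        String[] stages = {"S1", "S2", "S3", "S4"};
        for (String stage : stages) {
            List<Map<String, String>> next = new ArrayList<>();
            for (Map<String, String> doc : docs) {
                doc.merge("seen", stage + " ", String::concat);
                next.add(doc);
                if (stage.equals("S2") && doc.get("id").equals("1")) {
                    // S2 generates a child; it is added after S2's own
                    // processing, so S1 and S2 never touch it
                    next.add(new HashMap<>(Map.of("id", "2", "seen", "")));
                }
            }
            docs = next;
        }
        return docs;
    }

    public static void main(String[] args) {
        run().forEach(d -> System.out.println(d.get("id") + ": " + d.get("seen")));
    }
}
```

Running the sketch shows doc 1 tagged by S1 through S4 and doc 2 tagged only by S3 and S4, matching the flow above.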

Configuration

Pipelines are configured in the pipelines array:
pipelines: [
    {
        name: "main-pipeline"
        stages: [
            {
                name: "validate-required-fields"
                class: "com.kmwllc.lucille.stage.ValidateFields"
                requiredFields: ["id", "title", "content"]
            },
            {
                name: "detect-language"
                class: "com.kmwllc.lucille.stage.DetectLanguage"
                sourceField: "content"
                destField: "language"
            },
            {
                name: "normalize-text"
                class: "com.kmwllc.lucille.stage.NormalizeText"
                fields: ["title", "content"]
                lowercase: true
                removeWhitespace: true
            },
            {
                name: "generate-teaser"
                class: "com.kmwllc.lucille.stage.CreateStaticTeaser"
                sourceField: "content"
                destField: "teaser"
                maxLength: 200
            }
        ]
    },
    {
        name: "file-pipeline"
        stages: [
            {
                class: "com.kmwllc.lucille.stage.FetchFileContent"
            },
            {
                class: "com.kmwllc.lucille.stage.ApplyFileHandlers"
            }
        ]
    }
]

Pipeline Properties

name

Required. Unique identifier for this pipeline. Connectors reference pipelines by name.

stages

Required. Array of stage configurations defining the transformation sequence.

Document Flow Patterns

Linear Flow

The simplest pattern where each document flows through all stages:
Input: Doc A
S1: Transform A → A'
S2: Transform A' → A''
S3: Transform A'' → A'''
Output: A'''
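The linear pattern can be sketched with plain Java. This is a simplified model, not the Lucille API: documents are bare maps, and each stage is a `UnaryOperator` applied in order.

```java
import java.util.*;
import java.util.function.UnaryOperator;

public class LinearFlow {
    // Apply each stage to the document in sequence, feeding each stage
    // the previous stage's output.
    static Map<String, String> run(Map<String, String> doc,
                                   List<UnaryOperator<Map<String, String>>> stages) {
        for (UnaryOperator<Map<String, String>> stage : stages) {
            doc = stage.apply(doc);
        }
        return doc;
    }

    static String demo() {
        Map<String, String> doc = new HashMap<>(Map.of("content", "A"));
        // Each stage appends a prime: A -> A' -> A'' -> A'''
        UnaryOperator<Map<String, String>> addPrime =
            d -> { d.put("content", d.get("content") + "'"); return d; };
        return run(doc, List.of(addPrime, addPrime, addPrime)).get("content");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // A'''
    }
}
```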

Child Generation Flow

Stages can generate child documents that flow through downstream stages:
Input: Doc A
S1: Transform A → A'
S2: Split A' → [A'', Child B, Child C]
S3: Transform all → [A''', B', C']
S4: Transform all → [A'''', B'', C'']
Output: A'''', B'', C''
Example: Chunking a large document
// Stage that splits document into chunks
public Iterator<Document> processDocument(Document doc) {
    String content = doc.getString("content");
    List<Document> chunks = new ArrayList<>();
    
    for (String chunk : splitIntoChunks(content)) {
        Document child = Document.create(
            doc.getId() + "_chunk_" + chunks.size(),
            doc.getRunId()
        );
        child.setField("content", chunk);
        child.setField("parent_id", doc.getId());
        chunks.add(child);
    }
    
    return chunks.iterator();
}
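The `splitIntoChunks` helper above is left abstract. A naive fixed-size splitter might look like the following (illustrative only, and it takes an explicit size parameter; real chunkers typically respect sentence boundaries and support overlap, as the ChunkText stage configuration below does):

```java
import java.util.*;

public class ChunkHelper {
    // Split text into fixed-size pieces; the last chunk may be shorter.
    static List<String> splitIntoChunks(String content, int size) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < content.length(); i += size) {
            chunks.add(content.substring(i, Math.min(i + size, content.length())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(splitIntoChunks("abcdefg", 3)); // [abc, def, g]
    }
}
```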

Document Dropping

Documents can be marked as dropped to prevent indexing:
// Stage that drops documents based on criteria
public Iterator<Document> processDocument(Document doc) {
    // compare with the literal first so a missing field does not throw an NPE
    if ("deleted".equals(doc.getString("status"))) {
        doc.setDropped(true);
    }
    return null; // No children
}
Dropped documents still flow through the remaining stages but are filtered out before indexing, so later stages can still inspect them or act on them conditionally.
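For example, a later stage could branch on the dropped flag. This is a minimal sketch using a stand-in document class, not the Lucille API (the real Document tracks the flag via setDropped and a corresponding accessor):

```java
public class DroppedDocSketch {
    // Minimal stand-in for a document with a dropped flag (illustration only).
    static class Doc {
        boolean dropped;
        String log = "";
    }

    // A downstream stage still sees dropped documents and can, for instance,
    // record why they were filtered instead of transforming them.
    static Doc auditStage(Doc doc) {
        doc.log += doc.dropped ? "audited-dropped" : "transformed";
        return doc;
    }

    public static void main(String[] args) {
        Doc d = new Doc();
        d.dropped = true;
        System.out.println(auditStage(d).log); // audited-dropped
    }
}
```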

Parallelism

Worker-Level Parallelism

Multiple workers can process different documents through the same pipeline simultaneously:
worker {
    threads: 4  // Local mode
}
Distributed mode automatically enables parallelism through multiple Worker processes:
# Terminal 1
java -cp lucille.jar Worker pipeline1

# Terminal 2 
java -cp lucille.jar Worker pipeline1

# Terminal 3
java -cp lucille.jar Worker pipeline1

Stage Execution

Within a single worker, stages execute sequentially for each document:
Worker 1: Doc A → S1 → S2 → S3 → S4
Worker 2: Doc B → S1 → S2 → S3 → S4
Worker 3: Doc C → S1 → S2 → S3 → S4
This ensures:
  • Predictable transformation order
  • Safe state management within stages
  • Simpler debugging and testing
If you need stage-level parallelism, split your pipeline into multiple pipelines with separate connectors and use the Multi-Pipeline pattern.

Stage Lifecycle

Stages have a lifecycle managed by the pipeline:
// Pipeline initialization
pipeline.addStage(stage, metricsPrefix);

// Called once when pipeline starts
stage.start();

// Called for each document
Iterator<Document> results = stage.apply(document);

// Called once when pipeline stops
stage.stop();

Initialization

public void addStage(Stage stage, String metricsPrefix) 
    throws PipelineException, StageException {
    
    stage.initialize(stages.size() + 1, metricsPrefix);
    
    if (stages.stream().anyMatch(s -> stage.getName().equals(s.getName()))) {
        throw new PipelineException(
            "Two stages cannot have the same name: " + stage.getName()
        );
    }
    
    stages.add(stage);
}
Stage names must be unique within a pipeline to:
  • Enable clear metrics collection
  • Support targeted debugging
  • Avoid configuration ambiguity

Start and Stop

Stages can perform setup and cleanup:
@Override
public void start() throws StageException {
    // Load resources, open connections
    this.dictionary = loadDictionary(dictionaryPath);
    this.httpClient = HttpClient.newHttpClient();
}

@Override
public void stop() throws StageException {
    // Release resources, close connections
    if (httpClient != null) {
        httpClient.close(); // HttpClient implements AutoCloseable as of Java 21
    }
}

Pipeline Patterns

Validation Pipeline

Validate and filter documents early:
stages: [
    { class: "com.kmwllc.lucille.stage.ValidateFields"
      requiredFields: ["id", "title"] }
    { class: "com.kmwllc.lucille.stage.DropDocument"
      conditions: [{fields: ["status"], values: ["deleted"]}] }
    # ... transformation stages
]

Enrichment Pipeline

Add information from external sources:
stages: [
    { class: "com.kmwllc.lucille.stage.QueryDatabase"
      query: "SELECT * FROM users WHERE id = ?"
      queryParam: "user_id" }
    { class: "com.kmwllc.lucille.stage.ElasticsearchLookup"
      index: "products"
      queryField: "product_id" }
    # ... additional enrichment
]

Chunking Pipeline

Split large documents into smaller pieces:
stages: [
    { class: "com.kmwllc.lucille.stage.FetchFileContent" }
    { class: "com.kmwllc.lucille.stage.ChunkText"
      sourceField: "content"
      chunkSize: 512
      overlap: 50 }
    { class: "com.kmwllc.lucille.stage.OpenAIEmbed"
      sourceField: "content"
      destField: "embedding" }
]

Multi-Pipeline Pattern

Use multiple pipelines for different document types:
connectors: [
    { name: "docs", pipeline: "document-pipeline" }
    { name: "images", pipeline: "image-pipeline" }
]

pipelines: [
    { name: "document-pipeline"
      stages: [ /* text processing */ ] }
    { name: "image-pipeline" 
      stages: [ /* image processing */ ] }
]

Error Handling

Stage Exceptions

When a stage throws StageException, the document is marked as failed:
@Override
public Iterator<Document> processDocument(Document doc) 
    throws StageException {
    try {
        performTransformation(doc);
        return null;
    } catch (IOException e) {
        throw new StageException("Transformation failed", e);
    }
}
The Worker catches the exception and:
  1. Logs the error
  2. Sends a FAIL event for the document
  3. Continues processing other documents
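That loop can be sketched as follows. This is a simplified model with stand-in types; the real Worker also manages messaging, offsets, and metrics.

```java
import java.util.*;

public class WorkerErrorSketch {
    interface Stage { void process(String doc) throws Exception; }

    // Process each document in turn; a failure in one document is recorded
    // as a FAIL event and does not stop the remaining documents.
    static List<String> run(List<String> docs, Stage stage) {
        List<String> events = new ArrayList<>();
        for (String doc : docs) {
            try {
                stage.process(doc);
                events.add("FINISH:" + doc);
            } catch (Exception e) {
                events.add("FAIL:" + doc); // log the error, send FAIL, keep going
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> events = run(List.of("a", "b", "c"),
            doc -> { if (doc.equals("b")) throw new Exception("boom"); });
        System.out.println(events); // [FINISH:a, FAIL:b, FINISH:c]
    }
}
```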

Conditional Execution

Use conditions to skip stages for certain documents:
{
    class: "com.kmwllc.lucille.stage.OpenAIEmbed"
    conditions: [
        { fields: ["content"], operator: "must" }
        { fields: ["status"], values: ["draft"], operator: "must_not" }
    ]
}
See Stages - Conditions for details.

Metrics Collection

Pipelines automatically collect metrics for each stage:
public void initialize(int position, String metricsPrefix) 
    throws StageException {
    
    if (name == null) {
        this.name = "stage_" + position;
    }
    
    MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
    this.timer = metrics.timer(metricsPrefix + ".stage." + name + ".processDocumentTime");
    this.errorCounter = metrics.counter(metricsPrefix + ".stage." + name + ".errors");
    this.childCounter = metrics.counter(metricsPrefix + ".stage." + name + ".children");
}
Available metrics:
  • processDocumentTime: Time spent processing each document
  • errors: Number of documents that failed in this stage
  • children: Number of child documents generated
Viewing metrics:
pipeline.logMetrics();
// Logs: Stage myStage metrics. Docs processed: 1000. 
//       Mean latency: 12.34 ms/doc. Children: 50. Errors: 2.

Pipeline Validation

Validate your pipeline configuration before running:
Config config = ConfigFactory.load("myconfig.conf");
Map<String, List<Exception>> errors = 
    Runner.runInValidationMode(config);

if (!errors.isEmpty()) {
    errors.forEach((name, exceptions) -> {
        System.out.println("Pipeline: " + name);
        exceptions.forEach(e -> 
            System.out.println("  - " + e.getMessage()));
    });
}
Validation checks:
  • All stage classes exist and are loadable
  • Required stage configuration properties are present
  • Stage names are unique within each pipeline
  • Pipeline names are unique
  • Referenced resources (files, dictionaries) exist

Best Practices

  1. Order Stages Carefully: Place validation and filtering stages early
  2. Use Meaningful Names: Name stages to describe their purpose
  3. Keep Stages Focused: Each stage should do one thing well
  4. Handle Errors Gracefully: Use conditions and dropped flags appropriately
  5. Monitor Metrics: Track stage performance to identify bottlenecks
  6. Test Incrementally: Add and test stages one at a time
  7. Document Dependencies: Note if stages depend on fields from earlier stages
  8. Consider Parallelism: Use multiple workers for throughput

Next Steps

  • Stages: Learn about individual stage transformations
  • Documents: Understand document structure and fields
  • Built-in Stages: Explore available stage implementations
  • Custom Stages: Build your own stage