What are Stages?

Stages are the core processing units in Lucille’s ETL pipeline. Each stage performs a specific operation on documents as they flow through the pipeline. Stages can transform data, enrich content, extract information, apply ML models, and much more.
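Documents flow through the stages in the order they are listed in the pipeline configuration. A minimal sketch of a two-stage pipeline (DetectLanguage and its parameters are taken from the example later on this page; the first stage's class and parameters are hypothetical):

```yaml
stages:
  - class: com.kmwllc.lucille.stage.NormalizeText   # hypothetical stage
    source: ["content"]
    dest: ["content"]
  - class: com.kmwllc.lucille.stage.DetectLanguage
    source: ["content"]
    languageField: "language"
```

Each document emitted by a connector passes through NormalizeText first, then DetectLanguage, before reaching the indexer.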

Stage Architecture

All stages extend the base Stage class and implement the processDocument() method:
public abstract Iterator<Document> processDocument(Document doc) throws StageException;

Key Concepts

Each stage processes a Document object in place and can optionally return child documents. The stage can:
  • Modify existing fields
  • Add new fields
  • Remove fields
  • Generate child documents
  • Drop documents from the pipeline
Stages have a defined lifecycle:
  1. Construction - Stage is instantiated with a Config object
  2. Initialization - initialize() sets up metrics and naming
  3. Start - start() performs any required setup (loading resources, establishing connections)
  4. Processing - processDocument() is called for each document
  5. Stop - stop() releases resources and closes connections
Every stage must declare a public static Spec SPEC that defines its configuration parameters. Common configuration patterns:
  • source / dest - Field mapping
  • updateMode - How to handle existing field values (overwrite, append, skip)
  • conditions - Conditional execution based on document state
  • Stage-specific parameters
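These common parameters sit alongside the stage-specific ones in a stage's configuration block. A hedged sketch (the class name and parameters here are hypothetical, following the source/dest/updateMode pattern above):

```yaml
  - class: com.kmwllc.lucille.stage.ExtractZipCodes   # hypothetical stage
    name: extract_zip_codes
    source: ["address"]      # field(s) to read from
    dest: ["zip"]            # field(s) to write to
    updateMode: overwrite    # replace any existing values in dest
```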

Conditional Execution

Stages can be configured to execute conditionally based on document field values.
conditions (List<Config>)
List of conditions that determine whether the stage should process a document. Each condition can specify:
  • fields - Fields to check for existence or values
  • values - Specific values to match against
  • operator - Either must or must_not

conditionPolicy (String, default: "all")
How to combine multiple conditions:
  • all - All conditions must be satisfied (AND logic)
  • any - At least one condition must be satisfied (OR logic)

Example: Conditional Stage Execution

stages:
  - class: com.kmwllc.lucille.stage.DetectLanguage
    name: detect_english_language
    source: ["content"]
    languageField: "language"
    conditions:
      - fields: ["content"]
        operator: must
    conditionPolicy: all

Update Modes

Many stages support an updateMode parameter that controls how destination fields are updated:
  • overwrite - Replace any existing values in the destination field with new values. This is the default for most stages.
  • append - Add the new values after any existing values in the destination field.
  • skip - Keep the existing values and ignore the new ones.
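In configuration, updateMode is set per stage. A sketch (the stage class and field names here are illustrative, not a specific documented Lucille stage):

```yaml
  - class: com.kmwllc.lucille.stage.CopyFields   # illustrative
    source: ["tags_incoming"]
    dest: ["tags"]
    updateMode: append   # keep existing values in "tags" and add the new ones
```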

Common Configuration Parameters

name (String)
Unique identifier for the stage instance. Used in logging and metrics. If not specified, defaults to stage_N, where N is the stage's position in the pipeline.

class (String, required)
Fully qualified class name of the stage implementation. Example: com.kmwllc.lucille.stage.ApplyRegex

Stage Categories

Lucille includes 60+ built-in stages organized into functional categories:

Text Processing

Stages for manipulating and transforming text content:
  • Regular expression matching and extraction
  • Text normalization and case conversion
  • String concatenation and formatting
  • Whitespace trimming
  • Pattern replacement

Data Transformation

Stages for field operations and data manipulation:
  • Copying and renaming fields
  • Field deletion and value filtering
  • Type conversion and parsing
  • Static value assignment
  • Field flattening and restructuring

Enrichment

Stages for enhancing documents with additional information:
  • Language detection
  • Dictionary and entity lookup
  • Database queries
  • External API calls
  • Geolocation enrichment

AI & Machine Learning

Stages leveraging AI/ML models:
  • OpenAI embeddings
  • LLM prompting (Ollama)
  • Text chunking for RAG
  • Vector generation
  • Entity extraction

Creating Custom Stages

To create a custom stage:
  1. Extend the Stage class
public class MyCustomStage extends Stage {
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("myParam")
      .optionalBoolean("myFlag")
      .build();

  public MyCustomStage(Config config) {
    super(config);
    // Initialize stage-specific fields
  }

  @Override
  public void start() throws StageException {
    // Perform setup (load resources, etc.)
  }

  @Override
  public Iterator<Document> processDocument(Document doc) throws StageException {
    // Process the document
    return null; // or return iterator of child documents
  }

  @Override
  public void stop() throws StageException {
    // Clean up resources
  }
}
  2. Define the Spec
Use SpecBuilder to declare required and optional parameters.
  3. Implement processDocument()
This is where your stage’s logic lives. Return null if no child documents are generated.
  4. Handle Resources
Use start() and stop() for resource management to ensure proper cleanup.

Best Practices

Validate Configuration

Use the Spec to validate all configuration at startup, not during document processing.

Handle Missing Fields

Always check if fields exist before accessing them using doc.has(fieldName).

Use UpdateMode

Support updateMode parameter to give users control over field updates.

Log Appropriately

Use appropriate log levels and include document IDs in error messages.

Performance Considerations

  • Minimize I/O - Perform expensive operations (file access, network calls) in start() when possible
  • Batch Operations - Process multiple fields or values together to reduce overhead
  • Lazy Loading - Only load resources when first needed
  • Thread Safety - Stages must be thread-safe as multiple workers may use the same instance
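The lazy-loading and thread-safety points above can be sketched in plain Java, independent of any Lucille types: an expensive resource is created once, on first use, even when many worker threads ask for it concurrently (double-checked locking on a volatile field):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Thread-safe lazy initialization: the resource is built at most once,
// on first access, no matter how many threads call getResource().
public class LazyResourceHolder {
  private final AtomicInteger loads = new AtomicInteger(); // counts actual loads
  private volatile String resource; // stand-in for an expensive resource

  public String getResource() {
    String r = resource;
    if (r == null) {              // first check, no lock
      synchronized (this) {
        r = resource;
        if (r == null) {          // second check, under the lock
          loads.incrementAndGet();
          r = "expensive-resource"; // stand-in for loading a file or model
          resource = r;
        }
      }
    }
    return r;
  }

  public int loadCount() {
    return loads.get();
  }
}
```

The same pattern applies inside a stage when a resource is too expensive to build eagerly in start() but must still be shared safely across workers.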

Error Handling

Stages should throw StageException for processing errors:
if (!doc.has(sourceField)) {
  throw new StageException("Required field '" + sourceField + "' not found in document " + doc.getId());
}
The pipeline will track error counts in metrics and can be configured to:
  • Skip the document and continue
  • Halt processing
  • Route to an error handler

Metrics

Each stage automatically tracks:
  • processDocumentTime - Time spent processing documents
  • errors - Count of errors encountered
  • children - Count of child documents generated
Metrics are accessible via the metrics registry and can be exported to monitoring systems.