
Overview

A Stage is an operation that can be performed on a Document. Stages are the building blocks of Pipelines, with each Stage performing a specific transformation, validation, or enrichment operation.
public abstract class Stage {
    public abstract Iterator<Document> processDocument(Document doc) 
        throws StageException;
}
Stages operate on documents in place: they modify the document directly and may optionally return an iterator of child documents generated during processing.

Stage Lifecycle

Every Stage follows a well-defined lifecycle managed by the Pipeline:
// 1. Construction and validation
Stage stage = Stage.fromConfig(config);

// 2. Initialization (sets name, registers metrics)
stage.initialize(position, metricsPrefix);

// 3. Start (optional setup)
stage.start();

// 4. Processing (called for each document)
Iterator<Document> children = stage.processDocument(doc);

// 5. Stop (optional cleanup)
stage.stop();

Construction and Validation

Stages are constructed from configuration and must declare a Spec:
public class CopyFields extends Stage {
    
    public static final Spec SPEC = SpecBuilder.stage()
        .requiredString("source", "dest")
        .optionalBoolean("overwrite")
        .build();
    
    private final String source;
    private final String dest;
    private final boolean overwrite;
    
    public CopyFields(Config config) {
        super(config);  // Validates against SPEC
        this.source = config.getString("source");
        this.dest = config.getString("dest");
        this.overwrite = ConfigUtils.getOrDefault(config, "overwrite", false);
    }
}
The Spec automatically validates:
  • Required properties are present
  • Property types are correct
  • Standard stage properties (name, class, conditions, conditionPolicy)
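For reference, a config entry built against this Spec might look like the following sketch (the overwrite value shown is illustrative):

```
{
    name: "copy-title"
    class: "com.kmwllc.lucille.stage.CopyFields"
    source: "title"
    dest: "title_copy"
    overwrite: false
}
```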

Start and Stop

Stages can perform resource management:
@Override
public void start() throws StageException {
    // Load resources, initialize connections
    this.dictionary = loadDictionary(dictionaryPath);
    this.client = createHttpClient();
    log.info("Stage {} started", getName());
}

@Override
public void stop() throws StageException {
    // Release resources, close connections
    if (client != null) {
        client.close();
    }
    log.info("Stage {} stopped", getName());
}

Processing Documents

In-Place Transformation

Most stages modify the document and return null:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
    String title = doc.getString("title");
    if (title != null) {
        doc.setField("title_normalized", title.toLowerCase().trim());
    }
    return null;  // No children generated
}

Generating Children

Some stages create child documents:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
    String content = doc.getString("content");
    List<Document> chunks = new ArrayList<>();
    
    int chunkNum = 0;
    for (String chunk : splitIntoChunks(content, chunkSize)) {
        Document child = Document.create(
            doc.getId() + "_chunk_" + chunkNum,
            doc.getRunId()
        );
        child.setField("content", chunk);
        child.setField("parent_id", doc.getId());
        child.setField("chunk_num", chunkNum);
        chunks.add(child);
        chunkNum++;
    }
    
    return chunks.iterator();
}
Child documents are automatically passed through the remaining downstream stages only; they do not revisit earlier stages in the pipeline. They inherit the run ID from the parent but get their own unique IDs.
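The splitIntoChunks helper above is left undefined; a minimal fixed-size splitter might look like the following sketch (an illustration, not Lucille's ChunkText implementation — the ChunkUtil class name is ours):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkUtil {

    // Split text into consecutive chunks of at most chunkSize characters.
    // Assumes chunkSize > 0; an empty string yields an empty list.
    public static List<String> splitIntoChunks(String text, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int start = 0; start < text.length(); start += chunkSize) {
            chunks.add(text.substring(start, Math.min(start + chunkSize, text.length())));
        }
        return chunks;
    }
}
```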

Dropping Documents

Mark documents for exclusion from indexing:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
    String status = doc.getString("status");
    if ("deleted".equals(status) || "spam".equals(status)) {
        doc.setDropped(true);
    }
    return null;
}

Conditions

Stages can execute conditionally based on document content:
{
    name: "normalize-english-content"
    class: "com.kmwllc.lucille.stage.NormalizeText"
    fields: ["content"]
    
    # Only process documents with language=English
    conditions: [
        {
            fields: ["language"]
            values: ["English", "en"]
            operator: "must"
        }
    ]
}

Condition Properties

fields

List of field names to check. If any of these fields exist on the document, the condition evaluates based on the operator and values.
fields: ["title", "content"]  # Check if title OR content exists

values

List of values to search for in the specified fields. If not provided, only field existence is checked.
fields: ["status"]
values: ["published", "active"]  # Field must contain one of these

operator

Either "must" or "must_not". Defaults to "must".
  • must: Condition passes if criteria are met
  • must_not: Condition passes if criteria are NOT met
fields: ["status"]
values: ["deleted"]
operator: "must_not"  # Skip deleted documents
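The semantics of a single condition can be modeled in plain Java. This is an illustrative sketch (the ConditionDemo class and its map-based document model are ours, not Lucille's Condition class): a condition matches if any listed field exists and, when values are given, contains one of them; "must_not" inverts the result.

```java
import java.util.List;
import java.util.Map;

public class ConditionDemo {

    // Document modeled as field name -> list of values.
    public static boolean passes(Map<String, List<String>> doc,
                                 List<String> fields, List<String> values,
                                 String operator) {
        boolean matched = fields.stream().anyMatch(f ->
            doc.containsKey(f) && (values.isEmpty()
                || doc.get(f).stream().anyMatch(values::contains)));
        // "must_not" passes when the criteria are NOT met
        return "must_not".equals(operator) ? !matched : matched;
    }
}
```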

Multiple Conditions

Combine conditions with conditionPolicy:
{
    class: "com.kmwllc.lucille.stage.OpenAIEmbed"
    
    # Both conditions must be true
    conditionPolicy: "all"  # or "any" (default)
    
    conditions: [
        {
            fields: ["content"]
            operator: "must"
        },
        {
            fields: ["status"]
            values: ["draft"]
            operator: "must_not"
        }
    ]
}
Policies:
  • all: All conditions must pass (AND logic)
  • any: At least one condition must pass (OR logic)

Condition Evaluation

public boolean shouldProcess(Document doc) {
    if (doc.isDropped()) {
        return false;  // Never process dropped documents
    }
    return condition.test(doc);
}
The evaluation logic:
  1. Dropped documents are never processed
  2. If no conditions specified, always process
  3. Otherwise, evaluate conditions based on policy
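The policy step can be sketched in plain Java as well. This models conditions as predicates over a map-based document (an illustrative sketch; PolicyDemo and the Predicate representation are ours): "all" requires every condition to pass, "any" requires at least one, and an empty condition list always processes.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PolicyDemo {

    // Evaluate conditions under an "all" (AND) or "any" (OR) policy.
    public static boolean shouldProcess(Map<String, String> doc,
                                        List<Predicate<Map<String, String>>> conditions,
                                        String policy) {
        if (conditions.isEmpty()) {
            return true;  // no conditions specified: always process
        }
        return "all".equals(policy)
            ? conditions.stream().allMatch(c -> c.test(doc))
            : conditions.stream().anyMatch(c -> c.test(doc));
    }
}
```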

Configuration

Stages are configured within pipeline definitions:
pipelines: [
    {
        name: "main-pipeline"
        stages: [
            {
                name: "copy-title"
                class: "com.kmwllc.lucille.stage.CopyFields"
                source: "title"
                dest: "title_copy"
            },
            {
                class: "com.kmwllc.lucille.stage.NormalizeText"
                fields: ["title", "content"]
                lowercase: true
            }
        ]
    }
]

Standard Properties

All stages support these properties:

class

Required. Fully-qualified Java class name

name

Optional. Unique identifier (auto-generated if omitted)

conditions

Optional. Array of condition objects

conditionPolicy

Optional. "all" or "any" (default: "any")

Stage-Specific Properties

Each stage defines its own required and optional properties via its Spec:
public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("sourceField", "destField")
    .optionalInt("maxLength")
    .optionalBoolean("preserveOriginal")
    .optionalList("patterns", new TypeReference<List<String>>(){})
    .build();
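A config entry satisfying this Spec might look like the following sketch (the class name and all values are hypothetical):

```
{
    class: "com.example.stage.MyCustomStage"
    sourceField: "body"
    destField: "body_out"
    maxLength: 500
    preserveOriginal: true
    patterns: ["foo.*", "bar.*"]
}
```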

Common Transformation Patterns

Field Manipulation

Copy fields:
doc.setField("title_copy", doc.getString("title"));
Rename fields:
String value = doc.getString("old_name");
doc.removeField("old_name");
doc.setField("new_name", value);
Concatenate fields:
String firstName = doc.getString("first_name");
String lastName = doc.getString("last_name");
if (firstName != null && lastName != null) {
    doc.setField("full_name", firstName + " " + lastName);
}

Multi-Valued Fields

Add values:
doc.addToField("tags", "important");
doc.addToField("tags", "reviewed");
// Result: tags = ["important", "reviewed"]
Process all values:
List<String> tags = doc.getStringList("tags");
if (tags != null) {
    for (String tag : tags) {
        doc.addToField("tags_normalized", tag.toLowerCase());
    }
}

Text Processing

Normalize text:
String text = doc.getString("content");
if (text != null) {
    text = text.toLowerCase()
                .replaceAll("\\s+", " ")
                .trim();
    doc.setField("content_normalized", text);
}
Extract patterns:
String content = doc.getString("content");
if (content != null) {
    Pattern emailPattern = Pattern.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}");
    Matcher matcher = emailPattern.matcher(content);
    while (matcher.find()) {
        doc.addToField("emails", matcher.group());
    }
}

External Enrichment

Database lookup:
String userId = doc.getString("user_id");
// Use a parameterized query to avoid SQL injection
PreparedStatement ps = connection.prepareStatement(
    "SELECT * FROM users WHERE id = ?"
);
ps.setString(1, userId);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
    doc.setField("user_name", rs.getString("name"));
    doc.setField("user_email", rs.getString("email"));
}
HTTP API call:
String productId = doc.getString("product_id");
HttpResponse<String> response = httpClient.send(
    HttpRequest.newBuilder()
        .uri(URI.create(apiUrl + "/products/" + productId))
        .build(),
    HttpResponse.BodyHandlers.ofString()
);
JsonNode data = objectMapper.readTree(response.body());
doc.setField("product_name", data.get("name").asText());

Error Handling

Throwing Exceptions

When a stage throws StageException, the document is marked as failed:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
    String required = doc.getString("required_field");
    if (required == null) {
        throw new StageException("Missing required_field");
    }
    
    try {
        performTransformation(doc);
    } catch (IOException e) {
        throw new StageException("Transformation failed: " + e.getMessage(), e);
    }
    
    return null;
}

Graceful Degradation

It is often better to handle errors without failing the document:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
    try {
        String result = callExternalApi(doc.getString("id"));
        doc.setField("enriched_data", result);
    } catch (Exception e) {
        log.warn("Failed to enrich doc {}: {}", doc.getId(), e.getMessage());
        doc.setField("enrichment_error", e.getMessage());
        // Continue processing - don't throw
    }
    return null;
}

Using Conditions

Prevent errors by checking preconditions:
{
    class: "com.kmwllc.lucille.stage.ParseDate"
    sourceField: "date_string"
    destField: "date_parsed"
    
    # Only process if source field exists
    conditions: [
        { fields: ["date_string"], operator: "must" }
    ]
}

Metrics

Stages automatically collect performance metrics:
private Timer timer;
private Counter errorCounter;
private Counter childCounter;

public void initialize(int position, String metricsPrefix) {
    if (name == null) {
        this.name = "stage_" + position;
    }
    
    MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
    String prefix = metricsPrefix + ".stage." + name;
    
    this.timer = metrics.timer(prefix + ".processDocumentTime");
    this.errorCounter = metrics.counter(prefix + ".errors");
    this.childCounter = metrics.counter(prefix + ".children");
}
Logged metrics:
Stage normalize-text metrics. 
  Docs processed: 10000. 
  Mean latency: 2.34 ms/doc. 
  Children: 0. 
  Errors: 12.

Built-in Stages

Lucille provides 80+ built-in stages. Common categories:

Field Operations

  • CopyFields: Copy field values
  • RenameFields: Rename fields
  • DeleteFields: Remove fields
  • SetStaticValues: Set constant values
  • Concatenate: Combine multiple fields

Text Processing

  • NormalizeText: Lowercase, trim, remove whitespace
  • TrimWhitespace: Remove leading/trailing whitespace
  • ApplyRegex: Apply regex transformations
  • ReplacePatterns: Find and replace patterns
  • RemoveDiacritics: Remove accents and diacritical marks

Data Validation

  • ValidateFields: Check required fields exist
  • DropDocument: Mark documents for dropping
  • Contains: Check if fields contain values

Parsing

  • ParseDate: Parse date strings
  • ParseJson: Parse JSON strings
  • ParseFloats: Parse numeric values
  • XPathExtractor: Extract from XML/HTML

Enrichment

  • QueryDatabase: Lookup data from SQL database
  • ElasticsearchLookup: Lookup from Elasticsearch
  • DictionaryLookup: Lookup from in-memory dictionary

Content Processing

  • FetchFileContent: Fetch file contents
  • ApplyFileHandlers: Extract text from files
  • ChunkText: Split text into chunks
  • DetectLanguage: Detect text language

AI/ML

  • OpenAIEmbed: Generate embeddings via OpenAI
  • PromptOllama: Generate text via Ollama
  • EmbeddedPython: Execute Python code

Creating Custom Stages

Extend the Stage class to create custom transformations:
public class CustomTransform extends Stage {
    
    public static final Spec SPEC = SpecBuilder.stage()
        .requiredString("inputField", "outputField")
        .optionalInt("threshold")
        .build();
    
    private final String inputField;
    private final String outputField;
    private final int threshold;
    
    public CustomTransform(Config config) {
        super(config);
        this.inputField = config.getString("inputField");
        this.outputField = config.getString("outputField");
        this.threshold = ConfigUtils.getOrDefault(config, "threshold", 100);
    }
    
    @Override
    public Iterator<Document> processDocument(Document doc) throws StageException {
        String input = doc.getString(inputField);
        if (input != null) {
            String output = performTransformation(input);
            doc.setField(outputField, output);
        }
        return null;
    }
    
    private String performTransformation(String input) {
        // Your transformation logic here
        return input.toUpperCase();
    }
}

Best Practices

  1. Single Responsibility: Each stage should do one thing well
  2. Null Safety: Always check for null before accessing field values
  3. Meaningful Names: Use descriptive stage names for debugging
  4. Condition Usage: Use conditions instead of if-statements when possible
  5. Error Handling: Decide between failing fast vs. graceful degradation
  6. Resource Management: Close connections and release resources in stop()
  7. Immutable State: Avoid modifying shared state between documents
  8. Document Spec: Declare all configuration properties in the Spec
  9. Test Thoroughly: Test with various document states and edge cases
  10. Log Appropriately: Log at appropriate levels (debug, info, warn, error)

Next Steps

Built-in Stages

Explore all available stages

Documents

Understand document structure

Pipelines

Learn pipeline orchestration

Custom Stages

Build your own stage