This guide teaches you how to write custom stages to implement your own document processing logic in Lucille.

Overview

Custom stages allow you to:
  • Implement domain-specific transformations
  • Integrate with external APIs and services
  • Add custom validation and enrichment logic
  • Extend Lucille with proprietary algorithms

Stage Basics

All stages in Lucille extend the Stage abstract class and implement the processDocument method.

Minimal Stage Example

package com.example.lucille.stage;

import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Stage;
import com.kmwllc.lucille.core.StageException;
import com.kmwllc.lucille.core.spec.Spec;
import com.kmwllc.lucille.core.spec.SpecBuilder;
import com.typesafe.config.Config;
import java.util.Iterator;

public class UpperCaseStage extends Stage {
  
  // Every stage MUST declare a public static SPEC
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("sourceField", "destField")
      .build();
  
  private final String sourceField;
  private final String destField;
  
  public UpperCaseStage(Config config) {
    super(config);  // Always call super(config)
    
    // Read configuration parameters
    this.sourceField = config.getString("sourceField");
    this.destField = config.getString("destField");
  }
  
  @Override
  public Iterator<Document> processDocument(Document doc) 
      throws StageException {
    
    // Skip documents where the source field is missing or null
    if (!doc.hasNonNull(sourceField)) {
      return null;  // Nothing to transform
    }
    
    // Get value, transform, and set
    String value = doc.getString(sourceField);
    doc.setField(destField, value.toUpperCase());
    
    return null;  // Return null for no child documents
  }
}

Configuration via SPEC

The SPEC defines valid configuration parameters and types:
public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("sourceField", "destField")  // Required params
    .optionalString("format")                     // Optional params
    .optionalNumber("maxLength")                  // Numbers
    .optionalBoolean("caseSensitive")             // Booleans
    .build();
All stages automatically have these properties: name, class, conditions, and conditionPolicy.

Configuration Types

public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("field")      // Single required string
    .optionalString("format")     // Single optional string
    .build();

// Usage
String field = config.getString("field");
String format = config.hasPath("format") 
    ? config.getString("format") 
    : "default";

Stage Lifecycle Methods

start()

Called once when the stage is initialized, before processing any documents:
@Override
public void start() throws StageException {
  // Initialize connections
  // Load models or resources
  // Validate configuration
  
  if (inputFile != null) {
    try {
      data = Files.readAllLines(Path.of(inputFile));
    } catch (IOException e) {
      throw new StageException("Failed to load file", e);
    }
  }
}

stop()

Called once when the stage is shut down:
@Override
public void stop() throws StageException {
  // Close connections
  // Free resources
  // Clean up
  
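  if (connection != null) {
    try {
      connection.close();
    } catch (Exception e) {
      // close() usually throws a checked exception; surface it as a StageException
      throw new StageException("Failed to close connection", e);
    }
  }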
}
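
When a stage holds more than one resource, a failure while closing the first one should not prevent the others from being closed. The sketch below is plain Java with no Lucille dependency; ResourceCloser and closeAll are hypothetical names introduced for illustration. A real stop() would presumably wrap the returned exception in a StageException.

```java
import java.util.List;

// Hypothetical helper, not part of Lucille: closes every resource even if some fail.
final class ResourceCloser {

  private ResourceCloser() {}

  // Returns null if all resources closed cleanly; otherwise the first failure,
  // with any later failures attached as suppressed exceptions.
  static Exception closeAll(List<? extends AutoCloseable> resources) {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      if (resource == null) {
        continue;  // Resource was never opened, e.g. start() failed early
      }
      try {
        resource.close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    return first;
  }
}
```

Returning the exception instead of throwing it keeps the helper independent of StageException, so the stage decides how to report the failure.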

Working with Documents

Reading Fields

// Check if field exists
if (doc.has("fieldName")) {
  // Get single value
  String value = doc.getString("fieldName");
  
  // Get all values (fields can be multivalued)
  List<String> values = doc.getStringList("fieldName");
  
  // Check for non-null value
  if (doc.hasNonNull("fieldName")) {
    // Process...
  }
}

// Get typed values
Integer num = doc.getInt("count");
Boolean flag = doc.getBoolean("active");

Writing Fields

// Set field (replaces existing values)
doc.setField("newField", "value");

// Add value (appends to existing)
doc.addToField("multiField", "value1");
doc.addToField("multiField", "value2");

// Set or add (convenience method)
doc.setOrAdd("field", "value");

// Remove field
doc.removeField("obsoleteField");

// Update with an explicit mode
// (requires: import com.kmwllc.lucille.core.UpdateMode;)

doc.update("field", UpdateMode.OVERWRITE, "value");
doc.update("field", UpdateMode.APPEND, "value");
doc.update("field", UpdateMode.SKIP, "value");

Nested JSON Fields

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Read nested JSON
List<Document.Segment> path = Document.Segment.parse("user.profile.name");
JsonNode value = doc.getNestedJson(path);

// Write nested JSON
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.createObjectNode()
    .put("name", "John")
    .put("age", 30);
    
List<Document.Segment> destPath = Document.Segment.parse("user.info");
doc.setNestedJson(destPath, node);

Creating Child Documents

Stages can generate child documents:
@Override
public Iterator<Document> processDocument(Document doc) 
    throws StageException {
  
  List<Document> children = new ArrayList<>();
  
  // Create child documents
  for (int i = 0; i < 5; i++) {
    String childId = doc.getId() + "-child-" + i;
    Document child = Document.create(childId);
    
    child.setField("parent_id", doc.getId());
    child.setField("index", i);
    child.setField("data", "Child " + i);
    
    children.add(child);
  }
  
  // Return iterator over children
  return children.iterator();
}
Child documents are processed by subsequent stages in the pipeline. Return null if no children are generated.
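
A common reason to emit children is splitting a long text field into chunks. The sketch below shows only the chunking and ID logic in plain Java, reusing the parent-id + "-child-" + index scheme from the example above. ChildPlanner and ChildSpec are hypothetical stand-ins, not Lucille classes; in a real stage each ChildSpec would be turned into a Document with Document.create and setField.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Lucille: plans child documents for fixed-size text chunks.
final class ChildPlanner {

  // Plain-Java stand-in for the fields a child Document would carry.
  static final class ChildSpec {
    final String id;
    final String parentId;
    final int index;
    final String text;

    ChildSpec(String id, String parentId, int index, String text) {
      this.id = id;
      this.parentId = parentId;
      this.index = index;
      this.text = text;
    }
  }

  private ChildPlanner() {}

  // Splits text into chunks of at most chunkSize characters, one ChildSpec per chunk.
  static List<ChildSpec> chunk(String parentId, String text, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<ChildSpec> children = new ArrayList<>();
    for (int start = 0, i = 0; start < text.length(); start += chunkSize, i++) {
      int end = Math.min(start + chunkSize, text.length());
      // Same ID scheme as the example above: <parentId>-child-<index>
      children.add(new ChildSpec(parentId + "-child-" + i, parentId, i, text.substring(start, end)));
    }
    return children;
  }
}
```

An empty input yields an empty list, which corresponds to the case where processDocument returns null.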

Example: CopyFields Stage

Real implementation from Lucille core:
package com.kmwllc.lucille.stage;

import com.fasterxml.jackson.core.type.TypeReference;
import com.kmwllc.lucille.core.ConfigUtils;
import com.kmwllc.lucille.core.spec.Spec;
import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Stage;
import com.kmwllc.lucille.core.StageException;
import com.kmwllc.lucille.core.UpdateMode;
import com.kmwllc.lucille.core.spec.SpecBuilder;
import com.typesafe.config.Config;
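import java.util.*;
import java.util.Map.Entry;
// Pair is assumed to be the Apache Commons Lang 3 class
import org.apache.commons.lang3.tuple.Pair;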

public class CopyFields extends Stage {
  
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredParent("fieldMapping", new TypeReference<Map<String, Object>>() {})
      .optionalString("updateMode")
      .build();
  
  private final Map<String, Object> fieldMapping;
  private final UpdateMode updateMode;
  private List<Pair<String,String>> fieldPairs = new ArrayList<>();
  
  public CopyFields(Config config) {
    super(config);
    this.fieldMapping = config.getConfig("fieldMapping").root().unwrapped();
    this.updateMode = UpdateMode.fromConfig(config);
  }
  
  @Override
  public void start() throws StageException {
    if (fieldMapping.size() == 0) {
      throw new StageException("fieldMapping must have at least one pair");
    }
    
    // Build field pairs at start time
    for (Entry<String, Object> entry : fieldMapping.entrySet()) {
      if (entry.getValue() instanceof String) {
        fieldPairs.add(Pair.of(entry.getKey(), (String) entry.getValue()));
      } else if (entry.getValue() instanceof List) {
        for (String val : (List<String>) entry.getValue()) {
          fieldPairs.add(Pair.of(entry.getKey(), val));
        }
      }
    }
  }
  
  @Override
  public Iterator<Document> processDocument(Document doc) {
    for (Pair<String, String> fieldPair : fieldPairs) {
      if (!doc.has(fieldPair.getKey())) {
        continue;
      }
      doc.update(fieldPair.getValue(), updateMode, 
          doc.getStringList(fieldPair.getKey()).toArray(new String[0]));
    }
    return null;
  }
}

Example: API Integration Stage

Call external APIs to enrich documents:
package com.example.lucille.stage;

import com.kmwllc.lucille.core.*;
import com.kmwllc.lucille.core.spec.*;
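import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;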

public class GeocodingStage extends Stage {
  
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("addressField")
      .optionalString("latField", "lonField", "apiKey")
      .build();
  
  private final String addressField;
  private final String latField;
  private final String lonField;
  private final String apiKey;
  private HttpClient httpClient;
  
  public GeocodingStage(Config config) {
    super(config);
    this.addressField = config.getString("addressField");
    this.latField = ConfigUtils.getOrDefault(config, "latField", "latitude");
    this.lonField = ConfigUtils.getOrDefault(config, "lonField", "longitude");
    this.apiKey = ConfigUtils.getOrDefault(config, "apiKey", null);
  }
  
  @Override
  public void start() throws StageException {
    this.httpClient = HttpClient.newHttpClient();
  }
  
  @Override
  public Iterator<Document> processDocument(Document doc) 
      throws StageException {
    
    if (!doc.has(addressField)) {
      return null;
    }
    
    String address = doc.getString(addressField);
    
    try {
      // Build API request
      String url = String.format(
          "https://api.geocoding.com/geocode?address=%s&key=%s",
          URLEncoder.encode(address, StandardCharsets.UTF_8),
          apiKey
      );
      
      HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create(url))
          .GET()
          .build();
      
      // Make request
      HttpResponse<String> response = httpClient.send(
          request,
          HttpResponse.BodyHandlers.ofString()
      );
      
      // Parse response (simplified)
      if (response.statusCode() == 200) {
        JsonNode json = new ObjectMapper().readTree(response.body());
        double lat = json.get("lat").asDouble();
        double lon = json.get("lon").asDouble();
        
        doc.setField(latField, lat);
        doc.setField(lonField, lon);
      }
      
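    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();  // Restore the interrupt flag
      }
      throw new StageException("Geocoding failed for doc " + doc.getId(), e);
    }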
    
    return null;
  }
  
  @Override
  public void stop() {
    // Cleanup if needed
  }
}
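
In the example above, apiKey defaults to null when it is not configured, and String.format would then write the literal text "null" into the query string. One way to avoid that is to build the URL in a small helper that appends the key only when it is present. This is a sketch in plain Java; GeocodeUrl and build are hypothetical names, and the base URL is the placeholder endpoint from the example.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds the request URI for the geocoding example.
final class GeocodeUrl {

  private GeocodeUrl() {}

  static URI build(String baseUrl, String address, String apiKey) {
    StringBuilder url = new StringBuilder(baseUrl)
        .append("?address=")
        .append(URLEncoder.encode(address, StandardCharsets.UTF_8));
    // apiKey is optional in the SPEC, so append it only when it is configured
    if (apiKey != null && !apiKey.isBlank()) {
      url.append("&key=").append(URLEncoder.encode(apiKey, StandardCharsets.UTF_8));
    }
    return URI.create(url.toString());
  }
}
```

Keeping URL construction in a pure function also makes it testable without any network access.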

Conditional Execution

Stages automatically support conditions:
{
  name: "geocode"
  class: "com.example.lucille.stage.GeocodingStage"
  addressField: "address"
  
  # Only process documents matching conditions
  conditions: [
    {
      fields: ["country"]
      values: ["USA", "Canada"]
      operator: "must"
    }
  ]
  conditionPolicy: "all"  # or "any"
}
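You do not need to check conditions in your own code: the framework calls shouldProcess(Document doc) automatically and only passes matching documents to processDocument.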

Error Handling

@Override
public Iterator<Document> processDocument(Document doc) 
    throws StageException {
  
  try {
    // Processing logic
    processField(doc);
  } catch (IllegalArgumentException e) {
    // Log a warning and leave this document unmodified
    log.warn("Invalid data in doc {}: {}", doc.getId(), e.getMessage());
    return null;
  } catch (IOException e) {
    // Throw StageException for pipeline to handle
    throw new StageException("Failed to process document", e);
  }
  
  return null;
}
StageException causes the document to be retried or dropped based on your retry configuration.
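
For transient failures such as a network timeout, a stage may prefer to retry the call a few times itself before throwing StageException. The following is a minimal sketch of retry with exponential backoff in plain Java. Retry and withBackoff are hypothetical names, and treating only IOException as transient is an assumption made for this example.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Lucille: retries a call that may fail transiently.
final class Retry {

  private Retry() {}

  // Runs the call up to maxAttempts times, doubling the delay after each IOException.
  // Any other exception is treated as non-transient and propagates immediately.
  static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMs)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long delay = initialDelayMs;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      } catch (IOException e) {
        if (attempt == maxAttempts) {
          throw e;  // Out of attempts: the caller can wrap this in a StageException
        }
        Thread.sleep(delay);
        delay *= 2;
      }
    }
  }
}
```

Sleeping inside processDocument blocks the worker thread, so small attempt counts and short delays are the safer choice.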

Testing Custom Stages

import org.junit.Test;
import static org.junit.Assert.*;
import com.kmwllc.lucille.core.Document;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class UpperCaseStageTest {
  
  @Test
  public void testUpperCase() throws Exception {
    // Create config
    String configStr = """
      sourceField: "input"
      destField: "output"
    """;
    Config config = ConfigFactory.parseString(configStr);
    
    // Create stage
    UpperCaseStage stage = new UpperCaseStage(config);
    stage.start();
    
    // Create test document
    Document doc = Document.create("test-1");
    doc.setField("input", "hello world");
    
    // Process
    stage.processDocument(doc);
    
    // Assert
    assertEquals("HELLO WORLD", doc.getString("output"));
    
    stage.stop();
  }
}
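
Stages that call external services, such as the GeocodingStage above, are easier to test when the service sits behind a small interface that a test can replace. The sketch below shows a hand-written fake in plain Java. Geocoder and FakeGeocoder are hypothetical names; the GeocodingStage shown earlier calls HttpClient directly and would need to be refactored to accept such an interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical abstraction over the external service a stage depends on.
interface Geocoder {
  // Returns {latitude, longitude}, or empty if the address is unknown.
  Optional<double[]> lookup(String address);
}

// In-memory fake for unit tests: returns canned coordinates and counts calls.
final class FakeGeocoder implements Geocoder {
  private final Map<String, double[]> known = new HashMap<>();
  private int calls = 0;

  // Registers a canned answer; returns this so registrations can be chained.
  FakeGeocoder with(String address, double lat, double lon) {
    known.put(address, new double[] {lat, lon});
    return this;
  }

  @Override
  public Optional<double[]> lookup(String address) {
    calls++;
    return Optional.ofNullable(known.get(address));
  }

  int calls() {
    return calls;
  }
}
```

A test can then assert both on the fields written to the document and on how many lookups were made, with no network involved.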

Using Custom Stages

After implementing your stage, use it in configuration:
pipelines: [
  {
    name: "pipeline1",
    stages: [
      {
        name: "upperCase"
        class: "com.example.lucille.stage.UpperCaseStage"
        sourceField: "title"
        destField: "title_upper"
      },
      {
        name: "geocode"
        class: "com.example.lucille.stage.GeocodingStage"
        addressField: "address"
        apiKey: ${?GEOCODING_API_KEY}
      }
    ]
  }
]
Ensure your custom stages are on the classpath:
<dependency>
  <groupId>com.example</groupId>
  <artifactId>custom-lucille-stages</artifactId>
  <version>1.0.0</version>
</dependency>

Best Practices

  • Initialize expensive resources in start(), not in the constructor
  • Reuse HTTP clients, database connections, etc.
  • Avoid blocking operations when possible
  • Use batch APIs for external calls
  • Always define a complete SPEC
  • Use ConfigUtils.getOrDefault() for optional params
  • Validate configuration in start()
  • Support environment variable overrides with ${?VAR}
  • Log warnings for skipped documents
  • Throw StageException for fatal errors
  • Include document ID in error messages
  • Consider retry strategies for transient failures
  • Write unit tests for each stage
  • Test with null/missing fields
  • Test conditional execution
  • Use mock objects for external dependencies

Common Patterns

Multivalued Field Processing

List<String> values = doc.getStringList(sourceField);
for (String value : values) {
  String transformed = transform(value);
  doc.addToField(destField, transformed);
}
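
The transform helper above is left undefined. As one possible shape, the sketch below trims and lower-cases each value and skips null or blank entries so they never reach the destination field. ValueNormalizer and transformAll are hypothetical names, and the normalization rule is only an example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for the multivalued pattern above.
final class ValueNormalizer {

  private ValueNormalizer() {}

  // One possible transform(): trim whitespace and lower-case the value.
  static String transform(String value) {
    return value.trim().toLowerCase(Locale.ROOT);
  }

  // Applies transform() to every non-null, non-blank value, preserving order.
  static List<String> transformAll(List<String> values) {
    List<String> result = new ArrayList<>();
    for (String value : values) {
      if (value == null || value.isBlank()) {
        continue;
      }
      result.add(transform(value));
    }
    return result;
  }
}
```

Note that the loop above appends with addToField, so when sourceField and destField are the same field the original values are kept alongside the transformed ones.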

Field Validation

if (doc.has("email")) {
  String email = doc.getString("email");
  if (!isValidEmail(email)) {
    doc.setField("validation_error", "Invalid email format");
  }
}
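
The isValidEmail helper is not defined in this guide. A minimal sketch might look like the following; EmailValidator is a hypothetical name, and the pattern is deliberately loose (it is not a full RFC 5322 check).

```java
import java.util.regex.Pattern;

// Hypothetical helper for the validation pattern above.
final class EmailValidator {

  // Loose check: exactly one "@", no whitespace, and a dot inside the domain part.
  private static final Pattern EMAIL =
      Pattern.compile("^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$");

  private EmailValidator() {}

  static boolean isValidEmail(String email) {
    return email != null && EMAIL.matcher(email).matches();
  }
}
```

Compiling the pattern once as a static constant avoids recompiling it for every document.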

Metadata Extraction

if (doc.has("file_path")) {
  String path = doc.getString("file_path");
  doc.setField("file_name", extractFileName(path));
  doc.setField("file_ext", extractExtension(path));
  doc.setField("file_dir", extractDirectory(path));
}
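
The three extract helpers are likewise left undefined. One way to sketch them uses java.nio.file, assuming file_path holds a local filesystem path (not a URI) and that paths use the separator of the platform the pipeline runs on. PathParts is a hypothetical class name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helpers for the metadata pattern above.
final class PathParts {

  private PathParts() {}

  // Last path element, or "" for a root path such as "/".
  static String extractFileName(String path) {
    Path name = Paths.get(path).getFileName();
    return name == null ? "" : name.toString();
  }

  // Text after the last dot in the file name, without the dot.
  static String extractExtension(String path) {
    String name = extractFileName(path);
    int dot = name.lastIndexOf('.');
    // No dot, or only a leading dot as in ".gitignore", means no extension
    return dot <= 0 ? "" : name.substring(dot + 1);
  }

  // Parent directory, or "" when the path has no parent.
  static String extractDirectory(String path) {
    Path parent = Paths.get(path).getParent();
    return parent == null ? "" : parent.toString();
  }
}
```

Returning an empty string for missing parts is a design choice for this sketch; a stage could instead skip setting the field.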