This guide teaches you how to write custom stages to implement your own document processing logic in Lucille.
Overview
Custom stages allow you to:
Implement domain-specific transformations
Integrate with external APIs and services
Add custom validation and enrichment logic
Extend Lucille with proprietary algorithms
Stage Basics
All stages in Lucille extend the Stage abstract class and implement the processDocument method.
Minimal Stage Example
package com.example.lucille.stage;
import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Stage;
import com.kmwllc.lucille.core.StageException;
import com.kmwllc.lucille.core.spec.Spec;
import com.kmwllc.lucille.core.spec.SpecBuilder;
import com.typesafe.config.Config;
import java.util.Iterator;
public class UpperCaseStage extends Stage {
  // Every stage MUST declare a public static SPEC
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("sourceField", "destField")
      .build();
  private final String sourceField;
  private final String destField;
  public UpperCaseStage(Config config) {
    super(config); // Always call super(config)
    // Read configuration parameters
    this.sourceField = config.getString("sourceField");
    this.destField = config.getString("destField");
  }
  @Override
  public Iterator<Document> processDocument(Document doc) throws StageException {
    // Check if source field exists
    if (!doc.has(sourceField)) {
      return null; // Return null if no processing needed
    }
    // Get value, transform, and set
    String value = doc.getString(sourceField);
    doc.setField(destField, value.toUpperCase());
    return null; // Return null for no child documents
  }
}
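One caveat about the example: `toUpperCase()` with no argument uses the JVM's default locale. Under a Turkish locale, for example, "title" becomes "TİTLE". Below is a minimal sketch of a locale-independent transform, assuming you want identical output on every machine. The `UpperCaseTransform` class is hypothetical and not part of Lucille.

```java
import java.util.Locale;

// Hypothetical helper: the UpperCaseStage transform without locale surprises.
final class UpperCaseTransform {
  private UpperCaseTransform() {}

  // Locale.ROOT applies language-neutral case rules, so the result does not
  // depend on the default locale of the JVM running the pipeline.
  static String toUpper(String value) {
    return value.toUpperCase(Locale.ROOT);
  }
}
```

Inside `processDocument` you would call `UpperCaseTransform.toUpper(value)` in place of `value.toUpperCase()`.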
Configuration via SPEC
The SPEC defines valid configuration parameters and types:
public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("sourceField", "destField") // Required params
    .optionalString("format")                   // Optional params
    .optionalNumber("maxLength")                // Numbers
    .optionalBoolean("caseSensitive")           // Booleans
    .build();
All stages automatically accept these properties, so they don't need to be declared in SPEC: name, class, conditions, and conditionPolicy.
Configuration Types
String Parameters
public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("field")  // Single required string
    .optionalString("format") // Single optional string
    .build();
// Usage
String field = config.getString("field");
String format = config.hasPath("format")
    ? config.getString("format")
    : "default";
Stage Lifecycle Methods
start()
Called once when the stage is initialized, before processing any documents:
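import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
// Assumes fields: String inputFile (optional config value) and List<String> data
@Override
public void start() throws StageException {
  // Initialize connections
  // Load models or resources
  // Validate configuration
  if (inputFile != null) {
    try {
      data = Files.readAllLines(Path.of(inputFile));
    } catch (IOException e) {
      throw new StageException("Failed to load file: " + inputFile, e);
    }
  }
}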
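Below is a sketch of the kind of one-time load you might do in `start()`. It assumes a plain-text file with one term per line, where blank lines and `#` comments are ignored. The `TermLoader` helper and that file format are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: a one-time resource load of the kind start() performs.
final class TermLoader {
  private TermLoader() {}

  // Reads one term per line (UTF-8), trimming whitespace and skipping blank
  // lines and lines that start with '#'. Duplicates are collapsed; the
  // returned set is read-only and keeps first-seen order.
  static Set<String> loadTerms(Path file) throws IOException {
    Set<String> terms = new LinkedHashSet<>();
    for (String line : Files.readAllLines(file)) {
      String trimmed = line.strip();
      if (trimmed.isEmpty() || trimmed.startsWith("#")) {
        continue;
      }
      terms.add(trimmed);
    }
    return Collections.unmodifiableSet(terms);
  }
}
```

A stage would wrap the `IOException` in a `StageException`, as the `start()` example does, and `processDocument` could then check membership in the set without touching the file again.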
stop()
Called once when the stage is shut down:
// Assumes a `connection` field that start() opened
@Override
public void stop() throws StageException {
  // Close connections
  // Free resources
  // Clean up
  if (connection != null) {
    connection.close();
  }
}
Working with Documents
Reading Fields
// Check if field exists
if (doc.has("fieldName")) {
  // Get single value (the first one, if the field is multivalued)
  String value = doc.getString("fieldName");
  // Get all values (fields can be multivalued)
  List<String> values = doc.getStringList("fieldName");
  // Check for non-null value
  if (doc.hasNonNull("fieldName")) {
    // Process...
  }
}
// Get typed values
Integer num = doc.getInt("count");
Boolean flag = doc.getBoolean("active");
Writing Fields
import com.kmwllc.lucille.core.UpdateMode;
// Set field (replaces existing values)
doc.setField("newField", "value");
// Add value (appends to existing)
doc.addToField("multiField", "value1");
doc.addToField("multiField", "value2");
// Set or add (convenience method: sets the field if absent, otherwise appends)
doc.setOrAdd("field", "value");
// Remove field
doc.removeField("obsoleteField");
// Update with mode
doc.update("field", UpdateMode.OVERWRITE, "value"); // replace existing values
doc.update("field", UpdateMode.APPEND, "value");    // add to existing values
doc.update("field", UpdateMode.SKIP, "value");      // leave an existing field unchanged
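The three update modes are easiest to compare on a toy model. The sketch below is NOT Lucille's `Document` or `UpdateMode`. It is a simplified stand-in that assumes SKIP means "leave the field alone if it already exists, otherwise set it." The class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a document's fields; NOT Lucille's Document class.
final class UpdateModeModel {
  enum Mode { OVERWRITE, APPEND, SKIP }

  private final Map<String, List<String>> fields = new HashMap<>();

  void update(String name, Mode mode, String... values) {
    if (mode == Mode.SKIP && fields.containsKey(name)) {
      return; // SKIP (assumed): an existing field is left untouched
    }
    if (mode == Mode.APPEND) {
      // APPEND: add after any existing values, creating the field if needed
      fields.computeIfAbsent(name, k -> new ArrayList<>()).addAll(List.of(values));
    } else {
      // OVERWRITE, or SKIP on a field that does not exist yet
      fields.put(name, new ArrayList<>(List.of(values)));
    }
  }

  List<String> get(String name) {
    return fields.getOrDefault(name, List.of());
  }
}
```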
Nested JSON Fields
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
// Read nested JSON
List<Document.Segment> path = Document.Segment.parse("user.profile.name");
JsonNode value = doc.getNestedJson(path);
// Write nested JSON
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.createObjectNode()
    .put("name", "John")
    .put("age", 30);
List<Document.Segment> destPath = Document.Segment.parse("user.info");
doc.setNestedJson(destPath, node);
Creating Child Documents
Stages can generate child documents:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  List<Document> children = new ArrayList<>();
  // Create child documents
  for (int i = 0; i < 5; i++) {
    String childId = doc.getId() + "-child-" + i;
    Document child = Document.create(childId);
    child.setField("parent_id", doc.getId());
    child.setField("index", i);
    child.setField("data", "Child " + i);
    children.add(child);
  }
  // Return iterator over children
  return children.iterator();
}
Child documents are processed by subsequent stages in the pipeline. Return null if no children are generated.
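Because `processDocument` returns an `Iterator`, children do not have to be built up front. A stage can emit many children, for example one per row of a large file. In that case a lazy iterator holds only the current child in memory, assuming the pipeline consumes the iterator incrementally. The JDK-only sketch below generates the child IDs used above on demand. `LazyChildIds` is hypothetical; in a real stage, `next()` would build and return a `Document`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: produce child IDs on demand instead of filling a List first.
// In a real stage, next() would create and return a Document for the ID.
final class LazyChildIds implements Iterator<String> {
  private final String parentId;
  private final int count;
  private int nextIndex = 0;

  LazyChildIds(String parentId, int count) {
    this.parentId = parentId;
    this.count = count;
  }

  @Override
  public boolean hasNext() {
    return nextIndex < count;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return parentId + "-child-" + nextIndex++;
  }
}
```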
Example: CopyFields Stage
Based on the CopyFields stage in Lucille core:
package com.kmwllc.lucille.stage;
import com.fasterxml.jackson.core.type.TypeReference;
import com.kmwllc.lucille.core.ConfigUtils;
import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Stage;
import com.kmwllc.lucille.core.StageException;
import com.kmwllc.lucille.core.UpdateMode;
import com.kmwllc.lucille.core.spec.Spec;
import com.kmwllc.lucille.core.spec.SpecBuilder;
import com.typesafe.config.Config;
import java.util.*;
import java.util.Map.Entry;
import org.apache.commons.lang3.tuple.Pair;
public class CopyFields extends Stage {
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredParent("fieldMapping", new TypeReference<Map<String, Object>>() {})
      .optionalString("updateMode")
      .build();
  private final Map<String, Object> fieldMapping;
  private final UpdateMode updateMode;
  private final List<Pair<String, String>> fieldPairs = new ArrayList<>();
  public CopyFields(Config config) {
    super(config);
    this.fieldMapping = config.getConfig("fieldMapping").root().unwrapped();
    this.updateMode = UpdateMode.fromConfig(config);
  }
  @Override
  public void start() throws StageException {
    if (fieldMapping.isEmpty()) {
      throw new StageException("fieldMapping must have at least one pair");
    }
    // Build (source, destination) pairs once, at start time.
    // A value is either one destination field or a list of them.
    for (Entry<String, Object> entry : fieldMapping.entrySet()) {
      if (entry.getValue() instanceof String) {
        fieldPairs.add(Pair.of(entry.getKey(), (String) entry.getValue()));
      } else if (entry.getValue() instanceof List) {
        @SuppressWarnings("unchecked")
        List<String> destinations = (List<String>) entry.getValue();
        for (String val : destinations) {
          fieldPairs.add(Pair.of(entry.getKey(), val));
        }
      }
    }
  }
  @Override
  public Iterator<Document> processDocument(Document doc) {
    for (Pair<String, String> fieldPair : fieldPairs) {
      if (!doc.has(fieldPair.getKey())) {
        continue;
      }
      // Copy every value of the source field into the destination field
      doc.update(fieldPair.getValue(), updateMode,
          doc.getStringList(fieldPair.getKey()).toArray(new String[0]));
    }
    return null;
  }
}
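You can exercise the pair-building loop in `start()` without Lucille. Below is a JDK-only sketch that uses `Map.Entry` in place of commons-lang3's `Pair`. The listing silently ignores mapping values that are neither a string nor a list. This hypothetical `FieldMappingPairs` helper rejects them instead, which surfaces a config mistake at startup.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// JDK-only version of the loop in CopyFields.start(). Each mapping value is
// either one destination field name or a list of destination names.
final class FieldMappingPairs {
  private FieldMappingPairs() {}

  static List<Map.Entry<String, String>> toPairs(Map<String, Object> mapping) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    for (Map.Entry<String, Object> entry : mapping.entrySet()) {
      String source = entry.getKey();
      Object value = entry.getValue();
      if (value instanceof String) {
        pairs.add(new SimpleImmutableEntry<>(source, (String) value));
      } else if (value instanceof List) {
        for (Object dest : (List<?>) value) {
          pairs.add(new SimpleImmutableEntry<>(source, String.valueOf(dest)));
        }
      } else {
        // Fail fast on a mapping the stage cannot interpret
        throw new IllegalArgumentException("Unsupported mapping for field " + source + ": " + value);
      }
    }
    return pairs;
  }
}
```

In the stage itself, `start()` would catch the `IllegalArgumentException` and rethrow it as a `StageException`.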
Example: API Integration Stage
Call external APIs to enrich documents:
package com.example.lucille.stage;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kmwllc.lucille.core.*;
import com.kmwllc.lucille.core.spec.*;
import com.typesafe.config.Config;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
public class GeocodingStage extends Stage {
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("addressField")
      .optionalString("latField", "lonField", "apiKey")
      .build();
  private final String addressField;
  private final String latField;
  private final String lonField;
  private final String apiKey;
  // One ObjectMapper, reused for every document
  private final ObjectMapper mapper = new ObjectMapper();
  private HttpClient httpClient;
  public GeocodingStage(Config config) {
    super(config);
    this.addressField = config.getString("addressField");
    this.latField = ConfigUtils.getOrDefault(config, "latField", "latitude");
    this.lonField = ConfigUtils.getOrDefault(config, "lonField", "longitude");
    this.apiKey = ConfigUtils.getOrDefault(config, "apiKey", null);
  }
  @Override
  public void start() throws StageException {
    this.httpClient = HttpClient.newHttpClient();
  }
  @Override
  public Iterator<Document> processDocument(Document doc) throws StageException {
    if (!doc.has(addressField)) {
      return null;
    }
    String address = doc.getString(addressField);
    try {
      // Build API request
      String url = String.format(
          "https://api.geocoding.com/geocode?address=%s&key=%s",
          URLEncoder.encode(address, StandardCharsets.UTF_8),
          apiKey);
      HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create(url))
          .GET()
          .build();
      // Make request
      HttpResponse<String> response = httpClient.send(
          request, HttpResponse.BodyHandlers.ofString());
      // Parse response (simplified: assumes top-level "lat" and "lon" fields)
      if (response.statusCode() == 200) {
        JsonNode json = mapper.readTree(response.body());
        double lat = json.get("lat").asDouble();
        double lon = json.get("lon").asDouble();
        doc.setField(latField, lat);
        doc.setField(lonField, lon);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new StageException("Geocoding interrupted for doc " + doc.getId(), e);
    } catch (Exception e) {
      throw new StageException("Geocoding failed for doc " + doc.getId(), e);
    }
    return null;
  }
  @Override
  public void stop() {
    // Cleanup if needed (HttpClient has no close() before Java 21)
  }
}
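As written, the request always includes `key=%s`. When `apiKey` is not configured, the literal string `null` is sent as the key. Below is a sketch of a hypothetical helper that URL-encodes both parameters and omits `key` when it is absent. The base URL in the usage is the example's placeholder.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: build the geocoding request URI, adding the key
// parameter only when an apiKey is actually configured.
final class GeocodeUrls {
  private GeocodeUrls() {}

  static URI build(String baseUrl, String address, String apiKey) {
    StringBuilder url = new StringBuilder(baseUrl)
        .append("?address=")
        .append(URLEncoder.encode(address, StandardCharsets.UTF_8));
    if (apiKey != null && !apiKey.isBlank()) {
      url.append("&key=").append(URLEncoder.encode(apiKey, StandardCharsets.UTF_8));
    }
    return URI.create(url.toString());
  }
}
```

In `processDocument`, `HttpRequest.newBuilder().uri(GeocodeUrls.build(...))` would replace the `String.format` call. Note that `URLEncoder` produces form encoding, so spaces become `+`.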
Conditional Execution
Stages automatically support conditions:
{
name: "geocode"
class: "com.example.lucille.stage.GeocodingStage"
addressField: "address"
# Only process documents matching conditions
conditions: [
{
fields: ["country"]
values: ["USA", "Canada"]
operator: "must"
}
]
conditionPolicy: "all" # or "any"
}
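You don't call shouldProcess(Document doc) yourself. Lucille evaluates the stage's conditions before processDocument runs. Documents that don't match skip this stage and continue down the pipeline.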
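The simplified model below helps you reason about which documents reach a conditional stage. It is not Lucille's implementation and rests on these assumptions:

- "must" passes when any listed field holds any listed value.
- "must_not" is the negation of "must".
- conditionPolicy "all" requires every condition to pass; "any" requires at least one.
- A stage with no conditions processes every document.

All class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Simplified model of stage conditions; NOT Lucille's implementation.
// A "document" here is just a map from field name to its values.
final class ConditionModel {
  static final class Condition {
    final List<String> fields;
    final List<String> values;
    final String operator; // "must" or "must_not" (assumed)

    Condition(List<String> fields, List<String> values, String operator) {
      this.fields = fields;
      this.values = values;
      this.operator = operator;
    }

    // Assumed semantics: "must" passes if any listed field holds any listed value.
    boolean matches(Map<String, List<String>> doc) {
      boolean hit = fields.stream()
          .map(field -> doc.getOrDefault(field, List.of()))
          .anyMatch(fieldValues -> fieldValues.stream().anyMatch(values::contains));
      return "must_not".equals(operator) ? !hit : hit;
    }
  }

  private ConditionModel() {}

  // conditionPolicy "any" needs one passing condition; anything else is treated as "all".
  static boolean shouldProcess(List<Condition> conditions, String policy,
      Map<String, List<String>> doc) {
    if (conditions.isEmpty()) {
      return true; // no conditions: every document is processed
    }
    return "any".equals(policy)
        ? conditions.stream().anyMatch(c -> c.matches(doc))
        : conditions.stream().allMatch(c -> c.matches(doc));
  }
}
```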
Error Handling
// Assumes a `log` field (e.g. an SLF4J Logger) and a helper
// processField(Document) that declares `throws IOException`
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  try {
    // Processing logic
    processField(doc);
  } catch (IllegalArgumentException e) {
    // Bad data: log it and stop processing here; the document itself
    // continues down the pipeline unchanged by this stage
    log.warn("Invalid data in doc {}: {}", doc.getId(), e.getMessage());
    return null;
  } catch (IOException e) {
    // Throw StageException for the pipeline to handle
    throw new StageException("Failed to process document " + doc.getId(), e);
  }
  return null;
}
StageException causes the document to be retried or dropped based on your retry configuration.
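The best practices below mention retry strategies for transient failures. One option inside a stage is a small local retry with exponential backoff around the external call. The retry rethrows the last failure so the caller can wrap it in a `StageException`. Below is a sketch under those assumptions; the `RetryWithBackoff` helper is hypothetical and not part of Lucille.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retry a call that may fail transiently (for example an
// HTTP 503), waiting initialDelayMs, then 2x, 4x, ... between attempts.
final class RetryWithBackoff {
  private RetryWithBackoff() {}

  static <T> T call(Callable<T> action, int maxAttempts, long initialDelayMs)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long delayMs = initialDelayMs;
    for (int attempt = 1; ; attempt++) {
      try {
        return action.call();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // never retry after an interrupt
        throw e;
      } catch (Exception e) {
        if (attempt >= maxAttempts) {
          throw e; // out of attempts: surface the last failure
        }
      }
      Thread.sleep(delayMs); // an interrupt here propagates to the caller
      delayMs *= 2;
    }
  }
}
```

Keep `maxAttempts` and the delays small: every retry holds up the document (and the worker thread) that is being processed.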
Testing Custom Stages
package com.example.lucille.stage;
import static org.junit.Assert.*;
import com.kmwllc.lucille.core.Document;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
public class UpperCaseStageTest {
  @Test
  public void testUpperCase() throws Exception {
    // Create config (text blocks require Java 15+)
    String configStr = """
        sourceField: "input"
        destField: "output"
        """;
    Config config = ConfigFactory.parseString(configStr);
    // Create stage
    UpperCaseStage stage = new UpperCaseStage(config);
    stage.start();
    // Create test document
    Document doc = Document.create("test-1");
    doc.setField("input", "hello world");
    // Process
    stage.processDocument(doc);
    // Assert
    assertEquals("HELLO WORLD", doc.getString("output"));
    stage.stop();
  }
}
Using Custom Stages
After implementing your stage, use it in configuration:
pipelines: [
{
name: "pipeline1",
stages: [
{
name: "upperCase"
class: "com.example.lucille.stage.UpperCaseStage"
sourceField: "title"
destField: "title_upper"
},
{
name: "geocode"
class: "com.example.lucille.stage.GeocodingStage"
addressField: "address"
apiKey: ${?GEOCODING_API_KEY}
}
]
}
]
Make sure the JAR containing your custom stages is on Lucille's classpath. One way is to declare it as a Maven dependency (the coordinates below are placeholders):
<dependency>
  <groupId>com.example</groupId>
  <artifactId>custom-lucille-stages</artifactId>
  <version>1.0.0</version>
</dependency>
Best Practices
Always define a complete SPEC
Use ConfigUtils.getOrDefault() for optional params
Validate configuration in start()
Support environment variable overrides with ${?VAR}
Log warnings for skipped documents
Throw StageException for fatal errors
Include document ID in error messages
Consider retry strategies for transient failures
Write unit tests for each stage
Test with null/missing fields
Test conditional execution
Use mock objects for external dependencies
Common Patterns
Multivalued Field Processing
// transform() is a placeholder for your own per-value logic
List<String> values = doc.getStringList(sourceField);
for (String value : values) {
  String transformed = transform(value);
  doc.addToField(destField, transformed);
}
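Because `addToField` appends, running this loop on a document whose `destField` already has values adds to them. Remove the field first if you want a clean result. Below is a JDK-only sketch of one possible transform. It assumes whitespace normalization is the goal and that blank results should be dropped rather than added. Both choices, and the `MultiValueTransform` class, are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transform(): trim and collapse internal whitespace.
// transformAll() drops values that end up empty.
final class MultiValueTransform {
  private MultiValueTransform() {}

  static String transform(String value) {
    return value.strip().replaceAll("\\s+", " ");
  }

  static List<String> transformAll(List<String> values) {
    List<String> out = new ArrayList<>();
    for (String value : values) {
      String transformed = transform(value);
      if (!transformed.isEmpty()) {
        out.add(transformed);
      }
    }
    return out;
  }
}
```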
Field Validation
// isValidEmail() is a placeholder for your own validation logic
if (doc.has("email")) {
  String email = doc.getString("email");
  if (!isValidEmail(email)) {
    doc.setField("validation_error", "Invalid email format");
  }
}
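Below is a minimal sketch of a deliberately loose `isValidEmail()`. It only catches obviously malformed values, such as a missing `@`, embedded spaces, or no dot in the domain. Real address syntax (RFC 5322) is far more involved, so treat this hypothetical helper as a sanity filter, not proof that an address exists.

```java
import java.util.regex.Pattern;

// Hypothetical isValidEmail(): something@domain.tld with no spaces and
// exactly one '@'. Intentionally loose; not RFC 5322 validation.
final class EmailCheck {
  // Compiled once and reused for every document
  private static final Pattern SIMPLE_EMAIL =
      Pattern.compile("^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$");

  private EmailCheck() {}

  static boolean isValidEmail(String email) {
    return email != null && SIMPLE_EMAIL.matcher(email).matches();
  }
}
```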
Field Extraction
// extractFileName(), extractExtension(), extractDirectory() are placeholders
if (doc.has("file_path")) {
  String path = doc.getString("file_path");
  doc.setField("file_name", extractFileName(path));
  doc.setField("file_ext", extractExtension(path));
  doc.setField("file_dir", extractDirectory(path));
}
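The three `extract*` helpers are placeholders. Below is a JDK-only sketch with two assumptions: paths may use either `/` or `\` as a separator, and a name starting with a dot (such as `.bashrc`) has no extension. Splitting the string directly, rather than using `java.nio.file.Path`, gives the same results on every operating system. The `PathParts` class is hypothetical.

```java
// Hypothetical helpers for the file_path example. They split on the last
// '/' or '\' so results do not depend on the JVM's file system.
final class PathParts {
  private PathParts() {}

  private static int lastSeparator(String path) {
    return Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
  }

  // Text after the last separator (the whole string if there is none)
  static String extractFileName(String path) {
    return path.substring(lastSeparator(path) + 1);
  }

  // Text after the last '.' of the file name; "" if there is no dot or the
  // name starts with its only dot (e.g. ".bashrc")
  static String extractExtension(String path) {
    String name = extractFileName(path);
    int dot = name.lastIndexOf('.');
    return dot > 0 ? name.substring(dot + 1) : "";
  }

  // Everything before the last separator; "" for a bare file name
  static String extractDirectory(String path) {
    int sep = lastSeparator(path);
    return sep >= 0 ? path.substring(0, sep) : "";
  }
}
```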