Overview
A Stage is an operation that can be performed on a Document. Stages are the building blocks of Pipelines, with each Stage performing a specific transformation, validation, or enrichment operation.
public abstract class Stage {
public abstract Iterator<Document> processDocument(Document doc)
    throws StageException;
}
Stages operate on documents in place: they modify the document directly and can optionally return any child documents they generate.
Stage Lifecycle
Every Stage follows a well-defined lifecycle managed by the Pipeline:
// 1. Construction and validation
Stage stage = Stage.fromConfig(config);
// 2. Initialization (sets name, registers metrics)
stage.initialize(position, metricsPrefix);
// 3. Start (optional setup)
stage.start();
// 4. Processing (called for each document)
Iterator<Document> children = stage.processDocument(doc);
// 5. Stop (optional cleanup)
stage.stop();
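To make the ordering concrete, here is a minimal, self-contained sketch of a driver that walks one stage through steps 2 to 5. It does not use Lucille. `Doc`, `LifecycleStage`, `TitleLowercaser`, and `runLifecycle` are hypothetical stand-ins. Construction from config (step 1) and metrics registration are left out, and `initialize` takes only a position. Running `stop()` in a `finally` block is a design choice of the sketch, not a documented guarantee of the Pipeline.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a document: an id and a mutable title field.
class Doc {
  final String id;
  String title;

  Doc(String id, String title) {
    this.id = id;
    this.title = title;
  }
}

// Hypothetical stage contract mirroring lifecycle steps 2-5 (not Lucille's Stage class).
abstract class LifecycleStage {
  private String name;
  final List<String> events = new ArrayList<>();

  // Step 2: fall back to a position-based name when none was configured.
  void initialize(int position) {
    if (name == null) {
      name = "stage_" + position;
    }
    events.add("initialize:" + name);
  }

  // Step 3: optional setup, e.g. opening connections.
  void start() {
    events.add("start");
  }

  // Step 4: called once per document; returns children or null.
  abstract Iterator<Doc> processDocument(Doc doc);

  // Step 5: optional cleanup, e.g. closing connections.
  void stop() {
    events.add("stop");
  }

  String getName() {
    return name;
  }
}

class TitleLowercaser extends LifecycleStage {
  @Override
  Iterator<Doc> processDocument(Doc doc) {
    if (doc.title != null) {
      doc.title = doc.title.toLowerCase(); // modify the document in place
    }
    events.add("process:" + doc.id);
    return null; // no child documents
  }
}

class LifecycleDemo {
  // Drives one stage through initialize, start, processing, and stop.
  // stop() runs in a finally block so cleanup happens even if processing throws.
  static void runLifecycle(LifecycleStage stage, int position, List<Doc> docs) {
    stage.initialize(position);
    stage.start();
    try {
      for (Doc doc : docs) {
        stage.processDocument(doc);
      }
    } finally {
      stage.stop();
    }
  }

  public static void main(String[] args) {
    TitleLowercaser stage = new TitleLowercaser();
    List<Doc> docs = List.of(new Doc("d1", "Hello World"), new Doc("d2", null));
    runLifecycle(stage, 0, docs);
    System.out.println(stage.events); // [initialize:stage_0, start, process:d1, process:d2, stop]
    System.out.println(docs.get(0).title); // hello world
  }
}
```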
Construction and Validation
Stages are constructed from configuration and must declare a Spec:
public class CopyFields extends Stage {
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("source", "dest")
      .optionalBoolean("overwrite")
      .build();

  private final String source;
  private final String dest;
  private final boolean overwrite;

  public CopyFields(Config config) {
    super(config); // Validates against SPEC
    this.source = config.getString("source");
    this.dest = config.getString("dest");
    this.overwrite = ConfigUtils.getOrDefault(config, "overwrite", false);
  }

  // processDocument(...) omitted for brevity
}
The Spec automatically validates:
Required properties are present
Property types are correct
Standard stage properties (name, class, conditions, conditionPolicy)
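The checks listed above can be modeled with a small validator. This is a hypothetical sketch, not Lucille's SpecBuilder. `MiniSpec` and `validate` are illustrative names, and `requiredString`/`optionalBoolean` only mirror the SpecBuilder calls shown earlier; they are reimplemented here from scratch. Configs are plain maps rather than Config objects. Rejecting properties that the Spec does not declare is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a stage Spec.
class MiniSpec {
  // Properties every stage accepts, per the list above.
  static final Set<String> STANDARD = Set.of("name", "class", "conditions", "conditionPolicy");

  private final Map<String, Class<?>> required = new LinkedHashMap<>();
  private final Map<String, Class<?>> optional = new LinkedHashMap<>();

  MiniSpec requiredString(String... names) {
    for (String n : names) {
      required.put(n, String.class);
    }
    return this;
  }

  MiniSpec optionalBoolean(String... names) {
    for (String n : names) {
      optional.put(n, Boolean.class);
    }
    return this;
  }

  // Returns the problems found; an empty list means the config is valid.
  List<String> validate(Map<String, Object> config) {
    List<String> errors = new ArrayList<>();
    // Required properties must be present and have the declared type.
    for (Map.Entry<String, Class<?>> e : required.entrySet()) {
      Object value = config.get(e.getKey());
      if (value == null) {
        errors.add("missing required property: " + e.getKey());
      } else if (!e.getValue().isInstance(value)) {
        errors.add("wrong type for " + e.getKey());
      }
    }
    // Everything else must be a standard property or a correctly typed optional one.
    for (Map.Entry<String, Object> e : config.entrySet()) {
      String key = e.getKey();
      if (STANDARD.contains(key) || required.containsKey(key)) {
        continue;
      }
      Class<?> type = optional.get(key);
      if (type == null) {
        errors.add("unknown property: " + key); // assumption: undeclared properties are rejected
      } else if (!type.isInstance(e.getValue())) {
        errors.add("wrong type for " + key);
      }
    }
    return errors;
  }
}

class MiniSpecDemo {
  public static void main(String[] args) {
    MiniSpec spec = new MiniSpec().requiredString("source", "dest").optionalBoolean("overwrite");
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("class", "com.kmwllc.lucille.stage.CopyFields");
    config.put("source", "title");
    config.put("overwrite", "yes"); // wrong type: should be a boolean
    System.out.println(spec.validate(config));
    // [missing required property: dest, wrong type for overwrite]
  }
}
```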
Start and Stop
Stages can override start() and stop() to acquire and release resources:
@Override
public void start() throws StageException {
  // Load resources, initialize connections
  this.dictionary = loadDictionary(dictionaryPath);
  this.client = createHttpClient();
  log.info("Stage {} started", getName());
}

@Override
public void stop() throws StageException {
  // Release resources, close connections
  if (client != null) {
    client.close();
  }
  log.info("Stage {} stopped", getName());
}
Processing Documents
Most stages modify the document and return null:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  String title = doc.getString("title");
  if (title != null) {
    doc.setField("title_normalized", title.toLowerCase().trim());
  }
  return null; // No children generated
}
Generating Children
Some stages create child documents:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  String content = doc.getString("content");
  if (content == null) {
    return null; // Nothing to split
  }
  List<Document> chunks = new ArrayList<>();
  int chunkNum = 0;
  for (String chunk : splitIntoChunks(content, chunkSize)) {
    Document child = Document.create(
        doc.getId() + "_chunk_" + chunkNum,
        doc.getRunId()
    );
    child.setField("content", chunk);
    child.setField("parent_id", doc.getId());
    // Use the same number in the ID and in chunk_num, then advance the counter
    child.setField("chunk_num", chunkNum);
    chunks.add(child);
    chunkNum++;
  }
  return chunks.iterator();
}
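The chunking example calls `splitIntoChunks(content, chunkSize)`, which this page does not define. A minimal sketch, assuming fixed-size character windows, might look like this. The `Chunker` class and its behavior are hypothetical and are not necessarily how the built-in ChunkText stage splits text.

```java
import java.util.ArrayList;
import java.util.List;

class Chunker {
  // Hypothetical helper: splits text into consecutive windows of at most chunkSize characters.
  // Fixed-size windows can cut words in half; a production chunker might split on sentences.
  static List<String> splitIntoChunks(String text, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<String> chunks = new ArrayList<>();
    if (text == null) {
      return chunks; // nothing to split
    }
    int start = 0;
    while (start < text.length()) {
      // Computing end this way avoids int overflow for very large chunk sizes.
      int end = start + Math.min(chunkSize, text.length() - start);
      chunks.add(text.substring(start, end));
      start = end;
    }
    return chunks;
  }

  public static void main(String[] args) {
    System.out.println(splitIntoChunks("abcdefg", 3)); // [abc, def, g]
  }
}
```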
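Child documents are automatically passed only through the stages downstream of the stage that created them. They never revisit that stage or any stage before it. Each child carries its parent's run ID (passed to Document.create above) but has its own unique ID.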
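The downstream-only routing described above can be sketched as a recursive driver. Each child emitted by stage i starts at stage i + 1. `RoutedDoc`, `RoutedStage`, `DownstreamRouter`, and `RouterDemo` are hypothetical names, not Lucille's Pipeline API. The depth-first order in which this sketch emits documents is a property of the sketch only, not a claim about Lucille's ordering.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical document that records the names of the stages that processed it.
class RoutedDoc {
  final String id;
  final List<String> visitedBy = new ArrayList<>();

  RoutedDoc(String id) {
    this.id = id;
  }
}

// Hypothetical stage: returns child documents, or null when there are none.
interface RoutedStage {
  String name();

  Iterator<RoutedDoc> process(RoutedDoc doc);
}

class DownstreamRouter {
  // Runs doc through stages[from..end]. Children emitted by stage i start at stage i + 1,
  // so they never revisit the stage that created them or any stage upstream of it.
  static void run(List<RoutedStage> stages, int from, RoutedDoc doc, List<RoutedDoc> out) {
    for (int i = from; i < stages.size(); i++) {
      RoutedStage stage = stages.get(i);
      doc.visitedBy.add(stage.name());
      Iterator<RoutedDoc> children = stage.process(doc);
      if (children != null) {
        while (children.hasNext()) {
          run(stages, i + 1, children.next(), out);
        }
      }
    }
    out.add(doc); // doc has passed through every remaining stage
  }
}

class RouterDemo {
  // Builds a stage that either does nothing or splits each document into two children.
  static RoutedStage stage(String stageName, boolean split) {
    return new RoutedStage() {
      @Override
      public String name() {
        return stageName;
      }

      @Override
      public Iterator<RoutedDoc> process(RoutedDoc doc) {
        if (!split) {
          return null;
        }
        return List.of(new RoutedDoc(doc.id + "_chunk_0"), new RoutedDoc(doc.id + "_chunk_1"))
            .iterator();
      }
    };
  }

  public static void main(String[] args) {
    List<RoutedStage> stages = List.of(stage("A", false), stage("Split", true), stage("C", false));
    List<RoutedDoc> out = new ArrayList<>();
    DownstreamRouter.run(stages, 0, new RoutedDoc("doc1"), out);
    for (RoutedDoc d : out) {
      System.out.println(d.id + " " + d.visitedBy);
    }
    // doc1_chunk_0 [C]
    // doc1_chunk_1 [C]
    // doc1 [A, Split, C]
  }
}
```

The children only ever see stage C, while the parent sees all three stages.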
Dropping Documents
Mark documents for exclusion from indexing:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  String status = doc.getString("status");
  if ("deleted".equals(status) || "spam".equals(status)) {
    doc.setDropped(true);
  }
  return null;
}
Conditions
Stages can execute conditionally based on document content:
{
name: "normalize-english-content"
class: "com.kmwllc.lucille.stage.NormalizeText"
fields: ["content"]
# Only process documents with language=English
conditions: [
{
fields: ["language"]
values: ["English", "en"]
operator: "must"
}
]
}
Condition Properties
fields
List of field names to check. If any of these fields exist on the document, the condition is evaluated using its operator and values.
fields: ["title", "content"] # Check if title OR content exists
values
List of values to search for in the specified fields. If omitted, only field existence is checked.
fields: ["status"]
values: ["published", "active"] # Field must contain one of these
operator
Either "must" or "must_not". Defaults to "must".
must: Condition passes if the criteria are met
must_not: Condition passes if the criteria are NOT met
fields: ["status"]
values: ["deleted"]
operator: "must_not" # Skip deleted documents
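One reading of the three properties above, written as a self-contained sketch. `FieldCondition` is hypothetical, and documents are modeled as plain maps from field name to values. Treating an absent field as "criteria not met" (so `must_not` passes) is an interpretation of the descriptions above, not confirmed Lucille behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one entry in "conditions"; a document is modeled as field -> values.
final class FieldCondition {
  private final List<String> fields;
  private final Set<String> values; // null or empty: only check that a field exists
  private final String operator;    // "must" (default) or "must_not"

  FieldCondition(List<String> fields, Set<String> values, String operator) {
    this.fields = fields;
    this.values = values;
    this.operator = operator;
  }

  boolean test(Map<String, List<String>> doc) {
    boolean criteriaMet = false;
    for (String field : fields) {
      List<String> fieldValues = doc.get(field);
      if (fieldValues == null) {
        continue; // field absent on this document
      }
      if (values == null || values.isEmpty()) {
        criteriaMet = true; // existence is enough when no values are given
      } else if (fieldValues.stream().anyMatch(values::contains)) {
        criteriaMet = true; // at least one field value is in the list
      }
    }
    // "must" passes when the criteria are met; "must_not" passes when they are not.
    return "must_not".equals(operator) ? !criteriaMet : criteriaMet;
  }

  public static void main(String[] args) {
    FieldCondition notDeleted = new FieldCondition(List.of("status"), Set.of("deleted"), "must_not");
    System.out.println(notDeleted.test(Map.of("status", List.of("published")))); // true
    System.out.println(notDeleted.test(Map.of("status", List.of("deleted")))); // false
    System.out.println(notDeleted.test(Map.of())); // true: status is absent
  }
}
```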
Multiple Conditions
Combine conditions with conditionPolicy:
{
class: "com.kmwllc.lucille.stage.OpenAIEmbed"
# Both conditions must be true
conditionPolicy: "all" # or "any" (default)
conditions: [
{
fields: ["content"]
operator: "must"
},
{
fields: ["status"]
values: ["draft"]
operator: "must_not"
}
]
}
Policies:
all : All conditions must pass (AND logic)
any : At least one condition must pass (OR logic)
Condition Evaluation
public boolean shouldProcess(Document doc) {
  if (doc.isDropped()) {
    return false; // Never process dropped documents
  }
  return condition.test(doc);
}
The evaluation logic:
Dropped documents are never processed
If no conditions specified, always process
Otherwise, evaluate conditions based on policy
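The three evaluation rules and the two policies can be combined in a few lines with `java.util.function.Predicate`. This is a sketch with hypothetical names (`PolicyDoc`, `ConditionPolicy`, `combine`), not Lucille's implementation. For brevity it treats any policy value other than "all" as "any", the default.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in document with just a status field and a dropped flag.
final class PolicyDoc {
  final String status;
  final boolean dropped;

  PolicyDoc(String status, boolean dropped) {
    this.status = status;
    this.dropped = dropped;
  }
}

final class ConditionPolicy {
  // Combines individual conditions according to conditionPolicy.
  static Predicate<PolicyDoc> combine(List<Predicate<PolicyDoc>> conditions, String policy) {
    if (conditions.isEmpty()) {
      return doc -> true; // no conditions: always process
    }
    if ("all".equals(policy)) {
      return doc -> conditions.stream().allMatch(c -> c.test(doc)); // AND
    }
    return doc -> conditions.stream().anyMatch(c -> c.test(doc)); // OR ("any", the default)
  }

  // Dropped documents are never processed, whatever the conditions say.
  static boolean shouldProcess(PolicyDoc doc, Predicate<PolicyDoc> condition) {
    return !doc.dropped && condition.test(doc);
  }

  public static void main(String[] args) {
    Predicate<PolicyDoc> hasStatus = d -> d.status != null;
    Predicate<PolicyDoc> notDraft = d -> !"draft".equals(d.status);
    PolicyDoc draft = new PolicyDoc("draft", false);
    System.out.println(shouldProcess(draft, combine(List.of(hasStatus, notDraft), "all"))); // false
    System.out.println(shouldProcess(draft, combine(List.of(hasStatus, notDraft), "any"))); // true
  }
}
```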
Configuration
Stages are configured within pipeline definitions:
pipelines: [
{
name: "main-pipeline"
stages: [
{
name: "copy-title"
class: "com.kmwllc.lucille.stage.CopyFields"
source: "title"
dest: "title_copy"
},
{
class: "com.kmwllc.lucille.stage.NormalizeText"
fields: ["title", "content"]
lowercase: true
}
]
}
]
Standard Properties
All stages support these properties:
class (required): Fully-qualified Java class name
name (optional): Unique identifier (auto-generated if omitted)
conditions (optional): Array of condition objects
conditionPolicy (optional): "all" or "any" (default: "any")
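A sketch of how the defaults in this list could be resolved. `StandardProps` and `resolve` are hypothetical, and configs are flat string maps. `conditions` is omitted, and name uniqueness is not checked. The "stage_" + position fallback matches the initialize() code shown under Metrics. Rejecting policy values other than "all" or "any" is an assumption.

```java
import java.util.Map;

// Hypothetical helper for the standard properties; not Lucille's config handling.
final class StandardProps {
  final String className;
  final String name;
  final String conditionPolicy;

  private StandardProps(String className, String name, String conditionPolicy) {
    this.className = className;
    this.name = name;
    this.conditionPolicy = conditionPolicy;
  }

  static StandardProps resolve(Map<String, String> config, int position) {
    String className = config.get("class");
    if (className == null) {
      throw new IllegalArgumentException("class is required");
    }
    // An omitted name falls back to a position-based default.
    String name = config.getOrDefault("name", "stage_" + position);
    String policy = config.getOrDefault("conditionPolicy", "any");
    if (!policy.equals("all") && !policy.equals("any")) {
      throw new IllegalArgumentException("conditionPolicy must be \"all\" or \"any\"");
    }
    return new StandardProps(className, name, policy);
  }

  public static void main(String[] args) {
    StandardProps p = resolve(Map.of("class", "com.kmwllc.lucille.stage.NormalizeText"), 1);
    System.out.println(p.name + " " + p.conditionPolicy); // stage_1 any
  }
}
```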
Stage-Specific Properties
Each stage defines its own required and optional properties via its Spec:
public static final Spec SPEC = SpecBuilder.stage()
    .requiredString("sourceField", "destField")
    .optionalInt("maxLength")
    .optionalBoolean("preserveOriginal")
    .optionalList("patterns", new TypeReference<List<String>>(){})
    .build();
Field Manipulation
Copy fields:
doc.setField("title_copy", doc.getString("title"));
Rename fields:
String value = doc.getString("old_name");
if (value != null) {
  doc.removeField("old_name");
  doc.setField("new_name", value);
}
Concatenate fields:
String firstName = doc.getString("first_name");
String lastName = doc.getString("last_name");
if (firstName != null && lastName != null) {
  doc.setField("full_name", firstName + " " + lastName);
}
Multi-Valued Fields
Add values:
doc.addToField("tags", "important");
doc.addToField("tags", "reviewed");
// Result: tags = ["important", "reviewed"]
Process all values:
List<String> tags = doc.getStringList("tags");
if (tags != null) {
  for (String tag : tags) {
    doc.addToField("tags_normalized", tag.toLowerCase());
  }
}
Text Processing
Normalize text:
String text = doc.getString("content");
if (text != null) {
  text = text.toLowerCase()
      .replaceAll("\\s+", " ")
      .trim();
  doc.setField("content_normalized", text);
}
Extract patterns:
private static final Pattern EMAIL_PATTERN =
    Pattern.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}");
// ...inside processDocument():
String content = doc.getString("content");
if (content != null) {
  Matcher matcher = EMAIL_PATTERN.matcher(content);
  while (matcher.find()) {
    doc.addToField("emails", matcher.group());
  }
}
External Enrichment
Database lookup:
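String userId = doc.getString("user_id");
// Bind userId as a parameter: concatenating it into the SQL string allows SQL injection.
// connection is assumed to be a java.sql.Connection opened in start().
try (PreparedStatement ps = connection.prepareStatement(
    "SELECT name, email FROM users WHERE id = ?")) {
  ps.setString(1, userId);
  try (ResultSet rs = ps.executeQuery()) {
    if (rs.next()) {
      doc.setField("user_name", rs.getString("name"));
      doc.setField("user_email", rs.getString("email"));
    }
  }
}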
HTTP API call:
String productId = doc.getString("product_id");
HttpResponse<String> response = httpClient.send(
    HttpRequest.newBuilder()
        .uri(URI.create(apiUrl + "/products/" + productId))
        .build(),
    HttpResponse.BodyHandlers.ofString()
);
// Only read the body of a successful response, and tolerate a missing "name"
if (response.statusCode() == 200) {
  JsonNode name = objectMapper.readTree(response.body()).get("name");
  if (name != null) {
    doc.setField("product_name", name.asText());
  }
}
Error Handling
Throwing Exceptions
When a stage throws StageException, the document is marked as failed:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  String required = doc.getString("required_field");
  if (required == null) {
    throw new StageException("Missing required_field");
  }
  try {
    performTransformation(doc);
  } catch (IOException e) {
    throw new StageException("Transformation failed: " + e.getMessage(), e);
  }
  return null;
}
Graceful Degradation
It is often better to handle recoverable errors without failing the document:
@Override
public Iterator<Document> processDocument(Document doc) throws StageException {
  try {
    String result = callExternalApi(doc.getString("id"));
    doc.setField("enriched_data", result);
  } catch (Exception e) {
    log.warn("Failed to enrich doc {}: {}", doc.getId(), e.getMessage());
    doc.setField("enrichment_error", e.getMessage());
    // Continue processing - don't throw
  }
  return null;
}
Using Conditions
Prevent errors by checking preconditions:
{
class: "com.kmwllc.lucille.stage.ParseDate"
sourceField: "date_string"
destField: "date_parsed"
# Only process if source field exists
conditions: [
{ fields: ["date_string"], operator: "must" }
]
}
Metrics
Stages automatically collect performance metrics. initialize() registers a timer and two counters for each stage:
private Timer timer;
private Counter errorCounter;
private Counter childCounter;

public void initialize(int position, String metricsPrefix) {
  // Stages without a configured name get a position-based default
  if (name == null) {
    this.name = "stage_" + position;
  }
  MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
  String prefix = metricsPrefix + ".stage." + name;
  this.timer = metrics.timer(prefix + ".processDocumentTime");
  this.errorCounter = metrics.counter(prefix + ".errors");
  this.childCounter = metrics.counter(prefix + ".children");
}
Logged metrics:
Stage normalize-text metrics.
Docs processed: 10000.
Mean latency: 2.34 ms/doc.
Children: 0.
Errors: 12.
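To show how the logged numbers relate to per-document measurements, here is a dependency-free sketch. `StageMetricsSketch` is hypothetical and stands in for the Dropwizard Timer and Counters registered in initialize(). A real Timer also records rates and a latency distribution, while this sketch keeps only a running total, so it can report only a mean. In a driver, `elapsedNanos` would come from `System.nanoTime()` readings taken before and after `processDocument()`.

```java
import java.util.Locale;

// Hypothetical, dependency-free stand-in for the timer and counters registered in initialize().
class StageMetricsSketch {
  private final String stageName;
  private long docs;
  private long totalNanos;
  private long children;
  private long errors;

  StageMetricsSketch(String stageName) {
    this.stageName = stageName;
  }

  // Records one processDocument() call: its duration, the number of children it returned,
  // and whether it threw.
  void record(long elapsedNanos, int childCount, boolean failed) {
    docs++;
    totalNanos += elapsedNanos;
    children += childCount;
    if (failed) {
      errors++;
    }
  }

  double meanLatencyMs() {
    return docs == 0 ? 0.0 : totalNanos / (double) docs / 1_000_000.0;
  }

  // Formats the counts in the same shape as the example log output above.
  String summary() {
    return String.format(Locale.ROOT,
        "Stage %s metrics. Docs processed: %d. Mean latency: %.2f ms/doc. Children: %d. Errors: %d.",
        stageName, docs, meanLatencyMs(), children, errors);
  }

  public static void main(String[] args) {
    StageMetricsSketch metrics = new StageMetricsSketch("normalize-text");
    metrics.record(2_000_000L, 0, false); // a 2 ms document
    metrics.record(4_000_000L, 0, true);  // a 4 ms document that failed
    System.out.println(metrics.summary());
    // Stage normalize-text metrics. Docs processed: 2. Mean latency: 3.00 ms/doc. Children: 0. Errors: 1.
  }
}
```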
Built-in Stages
Lucille provides 80+ built-in stages. Common categories:
Field Operations
CopyFields: Copy field values
RenameFields: Rename fields
DeleteFields: Remove fields
SetStaticValues: Set constant values
Concatenate: Combine multiple fields
Text Processing
NormalizeText: Lowercase, trim, remove whitespace
TrimWhitespace: Remove leading/trailing whitespace
ApplyRegex: Apply regex transformations
ReplacePatterns: Find and replace patterns
RemoveDiacritics: Remove accents and diacritical marks
Data Validation
ValidateFields: Check required fields exist
DropDocument: Mark documents for dropping
Contains: Check if fields contain values
Parsing
ParseDate: Parse date strings
ParseJson: Parse JSON strings
ParseFloats: Parse numeric values
XPathExtractor: Extract from XML/HTML
Enrichment
QueryDatabase: Lookup data from SQL database
ElasticsearchLookup: Lookup from Elasticsearch
DictionaryLookup: Lookup from in-memory dictionary
Content Processing
FetchFileContent: Fetch file contents
ApplyFileHandlers: Extract text from files
ChunkText: Split text into chunks
DetectLanguage: Detect text language
AI/ML
OpenAIEmbed: Generate embeddings via OpenAI
PromptOllama: Generate text via Ollama
EmbeddedPython: Execute Python code
Creating Custom Stages
Extend the Stage class to create custom transformations:
public class CustomTransform extends Stage {
  public static final Spec SPEC = SpecBuilder.stage()
      .requiredString("inputField", "outputField")
      .optionalInt("threshold")
      .build();

  private final String inputField;
  private final String outputField;
  private final int threshold;

  public CustomTransform(Config config) {
    super(config);
    this.inputField = config.getString("inputField");
    this.outputField = config.getString("outputField");
    this.threshold = ConfigUtils.getOrDefault(config, "threshold", 100);
  }

  @Override
  public Iterator<Document> processDocument(Document doc) throws StageException {
    String input = doc.getString(inputField);
    if (input != null) {
      String output = performTransformation(input);
      doc.setField(outputField, output);
    }
    return null;
  }

  private String performTransformation(String input) {
    // Your transformation logic here
    return input.toUpperCase();
  }
}
Best Practices
Single Responsibility: Each stage should do one thing well
Null Safety: Always check for null before accessing field values
Meaningful Names: Use descriptive stage names for debugging
Condition Usage: Prefer conditions over if-statements inside processDocument() when possible
Error Handling: Decide between failing fast and graceful degradation
Resource Management: Close connections and release resources in stop()
Immutable State: Avoid modifying state shared between documents
Complete Spec: Declare all configuration properties in the Spec
Test Thoroughly: Test with various document states and edge cases
Log Appropriately: Use the appropriate level (debug, info, warn, error)
Next Steps
Built-in Stages: Explore all available stages
Documents: Understand document structure
Pipelines: Learn pipeline orchestration
Custom Stages: Build your own stage