Overview
A Pipeline is a sequence of processing Stages that documents flow through for transformation and enrichment. Pipelines orchestrate the application of Stages to incoming documents, handling the flow of both parent documents and any children they generate.
public class Pipeline {
  private final ArrayList<Stage> stages = new ArrayList<>();

  public Iterator<Document> processDocument(Document document) throws StageException {
    Iterator<Document> result = document.iterator();
    for (Stage stage : stages) {
      result = stage.apply(result);
    }
    return result;
  }
}
Pipeline Architecture
Sequential Stage Processing
Documents flow through stages in the order they are defined. Each stage receives the output from the previous stage:
Document → Stage 1 → Stage 2 → Stage 3 → ... → Stage N → Indexer
              ↓          ↓          ↓                ↓
           children   children   children        children
Child documents generated by a stage are passed through downstream stages only, not through earlier stages. This ensures children inherit transformations that occur after their creation.
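The downstream-only routing falls out of the iterator chaining in processDocument: each stage's output (parent plus any children) becomes the next stage's input, and earlier stages are never revisited. A minimal, self-contained sketch of that chaining, using simplified stand-ins rather than the real Lucille Stage and Document types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class DownstreamFlowDemo {
  // Simplified stand-in: a "stage" maps a batch of docs to a batch of docs
  static List<String> run(List<UnaryOperator<List<String>>> stages, List<String> docs) {
    List<String> result = docs;
    for (UnaryOperator<List<String>> stage : stages) {
      result = stage.apply(result); // output of one stage feeds the next
    }
    return result;
  }

  public static void main(String[] args) {
    UnaryOperator<List<String>> s1 = docs -> docs; // validates, passes through
    UnaryOperator<List<String>> s2 = docs -> {     // emits a child alongside the parent
      List<String> out = new ArrayList<>(docs);
      out.add("child");
      return out;
    };
    UnaryOperator<List<String>> s3 = docs -> docs.stream().map(d -> d + ":s3").toList();

    // The child created at s2 is transformed by s3 but never saw s1
    System.out.println(run(List.of(s1, s2, s3), List.of("parent")));
  }
}
```

Because the child only exists from s2's output onward, it picks up the ":s3" transformation but none of the earlier ones.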
Example Flow
Consider a pipeline with stages S1, S2, S3, S4:
Parent Doc (id=1)
→ S1 (validates)
→ S2 (generates child with id=2)
→ S3 (processes both)
→ S4 (processes both)
Result:
- Doc 1: processed by S1, S2, S3, S4
- Doc 2: processed by S3, S4 only
Here’s the implementation:
// Stage S2 generates a child
public Iterator<Document> processDocument(Document doc) {
  Document child = Document.create("2", doc.getRunId());
  child.setField("parent_id", doc.getId());
  return Collections.singletonList(child).iterator();
}
// Pipeline automatically ensures child flows through S3, S4
Configuration
Pipelines are configured in the pipelines array:
pipelines: [
{
name: "main-pipeline"
stages: [
{
name: "validate-required-fields"
class: "com.kmwllc.lucille.stage.ValidateFields"
requiredFields: ["id", "title", "content"]
},
{
name: "detect-language"
class: "com.kmwllc.lucille.stage.DetectLanguage"
sourceField: "content"
destField: "language"
},
{
name: "normalize-text"
class: "com.kmwllc.lucille.stage.NormalizeText"
fields: ["title", "content"]
lowercase: true
removeWhitespace: true
},
{
name: "generate-teaser"
class: "com.kmwllc.lucille.stage.CreateStaticTeaser"
sourceField: "content"
destField: "teaser"
maxLength: 200
}
]
},
{
name: "file-pipeline"
stages: [
{
class: "com.kmwllc.lucille.stage.FetchFileContent"
},
{
class: "com.kmwllc.lucille.stage.ApplyFileHandlers"
}
]
}
]
Pipeline Properties
name: Required. Unique identifier for this pipeline. Connectors reference pipelines by name.
stages: Required. Array of stage configurations defining the transformation sequence.
Document Flow Patterns
Linear Flow
The simplest pattern where each document flows through all stages:
Input: Doc A
S1: Transform A → A'
S2: Transform A' → A''
S3: Transform A'' → A'''
Output: A'''
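The linear flow above can be sketched as a chain of pure transforms applied in order; here a document is stood in for by a plain string (illustrative only, not the Lucille API):

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class LinearFlowDemo {
  // Apply every stage in order to a single document (here, just a string)
  static String run(List<UnaryOperator<String>> stages, String doc) {
    for (UnaryOperator<String> stage : stages) {
      doc = stage.apply(doc);
    }
    return doc;
  }

  public static void main(String[] args) {
    List<UnaryOperator<String>> stages = List.of(
        String::trim,               // S1: A -> A'
        String::toLowerCase,        // S2: A' -> A''
        s -> s.replace(' ', '_'));  // S3: A'' -> A'''
    System.out.println(run(stages, "  Hello World  ")); // hello_world
  }
}
```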
Child Generation Flow
Stages can generate child documents that flow through downstream stages:
Input: Doc A
S1: Transform A → A'
S2: Split A' → [A'', Child B, Child C]
S3: Transform all → [A''', B', C']
S4: Transform all → [A'''', B'', C'']
Output: A'''', B'', C''
Example: Chunking a large document
// Stage that splits document into chunks
public Iterator<Document> processDocument(Document doc) {
  String content = doc.getString("content");
  List<Document> chunks = new ArrayList<>();
  for (String chunk : splitIntoChunks(content)) {
    Document child = Document.create(
        doc.getId() + "_chunk_" + chunks.size(),
        doc.getRunId()
    );
    child.setField("content", chunk);
    child.setField("parent_id", doc.getId());
    chunks.add(child);
  }
  return chunks.iterator();
}
Document Dropping
Documents can be marked as dropped to prevent indexing:
// Stage that drops documents based on criteria
public Iterator<Document> processDocument(Document doc) {
  if ("deleted".equals(doc.getString("status"))) { // null-safe comparison
    doc.setDropped(true);
  }
  return null; // No children
}
Dropped documents still flow through remaining stages but are filtered out before indexing. They can be conditionally processed by later stages.
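The pre-index filter can be pictured as a simple predicate over the dropped flag; a self-contained sketch (Doc here is a stand-in record, not Lucille's Document class):

```java
import java.util.List;

public class DropFilterDemo {
  // Stand-in for a document carrying a dropped flag
  record Doc(String id, boolean dropped) {}

  // Mirrors the filter applied after the last stage: dropped docs never reach the indexer
  static List<String> idsToIndex(List<Doc> docs) {
    return docs.stream().filter(d -> !d.dropped()).map(Doc::id).toList();
  }

  public static void main(String[] args) {
    List<Doc> docs = List.of(new Doc("1", false), new Doc("2", true), new Doc("3", false));
    System.out.println(idsToIndex(docs)); // [1, 3]
  }
}
```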
Parallelism
Worker-Level Parallelism
Multiple workers can process different documents through the same pipeline simultaneously:
worker {
threads: 4 // Local mode
}
Distributed mode automatically enables parallelism through multiple Worker processes:
# Terminal 1
java -cp lucille.jar Worker pipeline1
# Terminal 2
java -cp lucille.jar Worker pipeline1
# Terminal 3
java -cp lucille.jar Worker pipeline1
Stage Execution
Within a single worker, stages execute sequentially for each document:
Worker 1: Doc A → S1 → S2 → S3 → S4
Worker 2: Doc B → S1 → S2 → S3 → S4
Worker 3: Doc C → S1 → S2 → S3 → S4
This ensures:
Predictable transformation order
Safe state management within stages
Simpler debugging and testing
If you need stage-level parallelism, split your pipeline into multiple pipelines with separate connectors and use the Multi-Pipeline pattern.
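The worker-level model sketched above (concurrent documents, sequential stages per document) can be illustrated with a thread pool; this is a simplified stand-in, not how Lucille's Worker is actually implemented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerParallelismDemo {
  // Each "worker" runs the full stage sequence, in order, over its own document
  static String runPipeline(String docId) {
    return docId + ":s1:s2:s3";
  }

  public static void main(String[] args) throws Exception {
    ExecutorService workers = Executors.newFixedThreadPool(4); // mirrors worker.threads: 4
    List<Future<String>> futures = new ArrayList<>();
    for (String id : List.of("A", "B", "C")) {
      futures.add(workers.submit(() -> runPipeline(id)));
    }
    for (Future<String> f : futures) {
      System.out.println(f.get()); // docs processed concurrently, each in stage order
    }
    workers.shutdown();
  }
}
```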
Stage Lifecycle
Stages have a lifecycle managed by the pipeline:
// Pipeline initialization
pipeline.addStage(stage, metricsPrefix);

// Called once when pipeline starts
stage.start();

// Called for each document
Iterator<Document> results = stage.apply(document);

// Called once when pipeline stops
stage.stop();
Initialization
public void addStage(Stage stage, String metricsPrefix)
    throws PipelineException, StageException {
  stage.initialize(stages.size() + 1, metricsPrefix);
  if (stages.stream().anyMatch(s -> stage.getName().equals(s.getName()))) {
    throw new PipelineException(
        "Two stages cannot have the same name: " + stage.getName()
    );
  }
  stages.add(stage);
}
Stage names must be unique within a pipeline to:
Enable clear metrics collection
Support targeted debugging
Avoid configuration ambiguity
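The uniqueness check can be isolated into a few lines; this sketch mimics the rejection that addStage performs, using plain strings in place of Stage objects:

```java
import java.util.ArrayList;
import java.util.List;

public class UniqueNameDemo {
  // Same check Pipeline.addStage performs before registering a stage
  static void addStage(List<String> names, String name) {
    if (names.contains(name)) {
      throw new IllegalStateException("Two stages cannot have the same name: " + name);
    }
    names.add(name);
  }

  public static void main(String[] args) {
    List<String> names = new ArrayList<>();
    addStage(names, "validate");
    try {
      addStage(names, "validate"); // duplicate name is rejected
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```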
Start and Stop
Stages can perform setup and cleanup:
@Override
public void start() throws StageException {
  // Load resources, open connections
  this.dictionary = loadDictionary(dictionaryPath);
  this.httpClient = HttpClient.newHttpClient();
}

@Override
public void stop() throws StageException {
  // Release resources, close connections
  if (httpClient != null) {
    httpClient.close();
  }
}
Pipeline Patterns
Validation Pipeline
Validate and filter documents early:
stages: [
{ class: "com.kmwllc.lucille.stage.ValidateFields"
requiredFields: ["id", "title"] }
{ class: "com.kmwllc.lucille.stage.DropDocument"
conditions: [{fields: ["status"], values: ["deleted"]}] }
# ... transformation stages
]
Enrichment Pipeline
Add information from external sources:
stages: [
{ class: "com.kmwllc.lucille.stage.QueryDatabase"
query: "SELECT * FROM users WHERE id = ?"
queryParam: "user_id" }
{ class: "com.kmwllc.lucille.stage.ElasticsearchLookup"
index: "products"
queryField: "product_id" }
# ... additional enrichment
]
Chunking Pipeline
Split large documents into smaller pieces:
stages: [
{ class: "com.kmwllc.lucille.stage.FetchFileContent" }
{ class: "com.kmwllc.lucille.stage.ChunkText"
sourceField: "content"
chunkSize: 512
overlap: 50 }
{ class: "com.kmwllc.lucille.stage.OpenAIEmbed"
sourceField: "content"
destField: "embedding" }
]
Multi-Pipeline Pattern
Use multiple pipelines for different document types:
connectors: [
{ name: "docs", pipeline: "document-pipeline" }
{ name: "images", pipeline: "image-pipeline" }
]
pipelines: [
{ name: "document-pipeline"
stages: [ /* text processing */ ] }
{ name: "image-pipeline"
stages: [ /* image processing */ ] }
]
Error Handling
Stage Exceptions
When a stage throws StageException, the document is marked as failed:
@Override
public Iterator<Document> processDocument(Document doc)
    throws StageException {
  try {
    performTransformation(doc);
    return null;
  } catch (IOException e) {
    throw new StageException("Transformation failed", e);
  }
}
The Worker catches the exception and:
Logs the error
Sends a FAIL event for the document
Continues processing other documents
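That catch-log-continue loop can be sketched in isolation; this is a simplified stand-in for the Worker's behavior, not its actual implementation:

```java
import java.util.List;

public class WorkerErrorDemo {
  static class StageException extends Exception {
    StageException(String msg) { super(msg); }
  }

  static void processDocument(String doc) throws StageException {
    if (doc.contains("bad")) {
      throw new StageException("Transformation failed: " + doc);
    }
  }

  // Mirrors the worker loop: record the failure, then keep going
  static int runAll(List<String> docs) {
    int failed = 0;
    for (String doc : docs) {
      try {
        processDocument(doc);
        System.out.println("INDEX " + doc);
      } catch (StageException e) {
        System.out.println("FAIL  " + doc); // a FAIL event would be sent here
        failed++;
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    System.out.println("failed=" + runAll(List.of("a", "bad-doc", "c")));
  }
}
```

A single failing document does not halt the run; the remaining documents are still processed and indexed.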
Conditional Execution
Use conditions to skip stages for certain documents:
{
class: "com.kmwllc.lucille.stage.OpenAIEmbed"
conditions: [
{ fields: ["content"], operator: "must" }
{ fields: ["status"], values: ["draft"], operator: "must_not" }
]
}
See Stages - Conditions for details.
Metrics Collection
Pipelines automatically collect metrics for each stage:
public void initialize(int position, String metricsPrefix)
    throws StageException {
  if (name == null) {
    this.name = "stage_" + position;
  }
  MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
  this.timer = metrics.timer(metricsPrefix + ".stage." + name + ".processDocumentTime");
  this.errorCounter = metrics.counter(metricsPrefix + ".stage." + name + ".errors");
  this.childCounter = metrics.counter(metricsPrefix + ".stage." + name + ".children");
}
Available metrics:
processDocumentTime: Time spent processing each document
errors: Number of documents that failed in this stage
children: Number of child documents generated
Viewing metrics:
pipeline.logMetrics();
// Logs: Stage myStage metrics. Docs processed: 1000.
//       Mean latency: 12.34 ms/doc. Children: 50. Errors: 2.
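The timer-and-counter pattern behind these numbers can be sketched with plain JDK atomics (the real implementation uses a Dropwizard Metrics registry, as shown above; this stand-in is only illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

public class StageMetricsDemo {
  final AtomicLong docsProcessed = new AtomicLong();
  final AtomicLong totalNanos = new AtomicLong();

  // Wraps a transformation, recording count and elapsed time per document
  void processDocument(Runnable transformation) {
    long start = System.nanoTime();
    transformation.run();
    totalNanos.addAndGet(System.nanoTime() - start); // feeds the per-stage timer
    docsProcessed.incrementAndGet();
  }

  double meanLatencyMs() {
    return totalNanos.get() / 1e6 / Math.max(1, docsProcessed.get());
  }

  public static void main(String[] args) {
    StageMetricsDemo metrics = new StageMetricsDemo();
    for (int i = 0; i < 1000; i++) {
      metrics.processDocument(() -> {});
    }
    System.out.printf("Docs processed: %d. Mean latency: %.4f ms/doc%n",
        metrics.docsProcessed.get(), metrics.meanLatencyMs());
  }
}
```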
Pipeline Validation
Validate your pipeline configuration before running:
Config config = ConfigFactory.load("myconfig.conf");
Map<String, List<Exception>> errors = Runner.runInValidationMode(config);
if (!errors.isEmpty()) {
  errors.forEach((name, exceptions) -> {
    System.out.println("Pipeline: " + name);
    exceptions.forEach(e -> System.out.println("  - " + e.getMessage()));
  });
}
Validation checks:
All stage classes exist and are loadable
Required stage configuration properties are present
Stage names are unique within each pipeline
Pipeline names are unique
Referenced resources (files, dictionaries) exist
Best Practices
Order Stages Carefully : Place validation and filtering stages early
Use Meaningful Names : Name stages to describe their purpose
Keep Stages Focused : Each stage should do one thing well
Handle Errors Gracefully : Use conditions and dropped flags appropriately
Monitor Metrics : Track stage performance to identify bottlenecks
Test Incrementally : Add and test stages one at a time
Document Dependencies : Note if stages depend on fields from earlier stages
Consider Parallelism : Use multiple workers for throughput
Next Steps
Stages Learn about individual stage transformations
Documents Understand document structure and fields
Built-in Stages Explore available stage implementations
Custom Stages Build your own stage