Pipelines define sequences of processing stages that transform documents before indexing. Each pipeline applies a list of stages to every incoming document.
## Pipeline Structure
Pipelines are defined as a list in the configuration file:
```
pipelines: [
  {
    name: "pipeline1"
    stages: [
      # Stage configurations...
    ]
  },
  {
    name: "pipeline2"
    stages: [
      # Stage configurations...
    ]
  }
]
```
## Pipeline Parameters

- `name` (required): Name to assign to this pipeline. Referenced by connectors via their `pipeline` property.
- `stages` (required): List of stage configurations to apply to each document, in sequence.
## Stage Configuration
Each stage in a pipeline requires, at minimum, a `class` property:
```
stages: [
  {
    name: "copyFields"
    class: "com.kmwllc.lucille.stage.CopyFields"
    source: ["input1", "input2"]
    dest: ["output1", "output2"]
    updateMode: "overwrite"
  }
]
```
## Common Stage Parameters

- `class` (required): Fully qualified class name of the stage implementation.
- `name` (optional): Name for the stage. If omitted, the class name is used. Stage names must be unique within a pipeline.
- `conditions` (optional): List of conditions that determine whether the stage executes for a given document.
- `conditionPolicy` (optional): How to combine multiple conditions:
  - `all` - All conditions must match (AND)
  - `any` - At least one condition must match (OR)
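For example, with `conditionPolicy: "any"` a stage runs when at least one of its conditions matches; a minimal sketch with illustrative field names:

```
{
  class: "com.kmwllc.lucille.stage.DeleteFields"
  fields: ["draft_notes"]
  conditionPolicy: "any"
  conditions: [
    # Runs if the document is a draft OR still awaiting review
    {fields: ["status"], values: ["draft"], operator: "must"},
    {fields: ["review_pending"], operator: "must"}
  ]
}
```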
## Conditional Execution
Stages can execute conditionally based on document field values:
### Condition Structure
```
conditions: [
  {
    fields: ["status", "type"]
    values: ["active", "published"]
    operator: "must"
  }
]
```
- `fields`: Document fields to check for the condition.
- `values`: Values to look for in the specified fields. If omitted, only field existence is checked. Supports strings, numbers, booleans, and null.
- `valuesPath`: Path to a file containing values (one per line). An alternative to inline values; cannot be used together with `values`.
- `operator`: Condition operator:
  - `must` - At least one field must contain at least one value (OR within fields)
  - `must_not` - No fields should contain any of the values
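Because `values` is not limited to strings, conditions can match typed field values directly; a sketch with hypothetical field names:

```
conditions: [
  {
    fields: ["is_published"]
    values: [true] # boolean match
    operator: "must"
  },
  {
    fields: ["priority"]
    values: [1, 2, 3] # numeric match
    operator: "must"
  }
]
```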
## Condition Examples
### Field Existence

Execute only if the document has specific fields:

```
{
  class: "com.kmwllc.lucille.stage.CopyFields"
  conditions: [
    {
      fields: ["author", "title"]
      operator: "must"
    }
  ]
}
```
### Value Matching

Execute only if fields contain specific values:

```
{
  class: "com.kmwllc.lucille.stage.DetectLanguage"
  conditions: [
    {
      fields: ["content_type"]
      values: ["text/plain", "text/html"]
      operator: "must"
    }
  ]
}
```
### Exclusion

Skip execution if fields contain certain values:

```
{
  class: "com.kmwllc.lucille.stage.ParseDate"
  conditions: [
    {
      fields: ["status"]
      values: ["deleted", "archived"]
      operator: "must_not"
    }
  ]
}
```
### Multiple Conditions

Combine multiple conditions with a policy:

```
{
  class: "com.kmwllc.lucille.stage.OpenAIEmbed"
  conditionPolicy: "all"
  conditions: [
    {
      fields: ["content"]
      operator: "must" # Must have content
    },
    {
      fields: ["embedding"]
      operator: "must_not" # Must not already have embedding
    },
    {
      fields: ["language"]
      values: ["en", "es"]
      operator: "must" # Must be English or Spanish
    }
  ]
}
```
### Values from File

Load condition values from an external file:

```
{
  class: "com.kmwllc.lucille.stage.DeleteFields"
  conditions: [
    {
      fields: ["user_id"]
      valuesPath: "/path/to/blocked-users.txt"
      operator: "must_not"
    }
  ]
}
```
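The file referenced by `valuesPath` is expected to contain one value per line; a blocklist file like the one above might look like this (contents hypothetical):

```
# /path/to/blocked-users.txt -- one user_id per line
user1001
user1002
user1003
```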
## Stage Ordering
Stages execute in the order they appear in the pipeline configuration. Order matters:
```
stages: [
  # 1. First, copy raw content to processed field
  {
    class: "com.kmwllc.lucille.stage.CopyFields"
    source: ["content"]
    dest: ["content_processed"]
  },
  # 2. Then remove HTML from the processed field
  {
    class: "com.kmwllc.lucille.stage.ApplyJSoup"
    source: "content_processed"
  },
  # 3. Finally, detect language on cleaned content
  {
    class: "com.kmwllc.lucille.stage.DetectLanguage"
    source: "content_processed"
    dest: "language"
  }
]
```
## Common Pipeline Patterns
### Text Processing

```
pipelines: [
  {
    name: "text-pipeline"
    stages: [
      # Parse file content
      {
        class: "com.kmwllc.lucille.stage.FetchFileContent"
      },
      # Remove HTML tags
      {
        class: "com.kmwllc.lucille.stage.ApplyJSoup"
        source: "file_content"
        dest: "text"
      },
      # Normalize whitespace
      {
        class: "com.kmwllc.lucille.stage.TrimWhitespace"
        fields: ["text"]
      },
      # Detect language
      {
        class: "com.kmwllc.lucille.stage.DetectLanguage"
        source: "text"
        dest: "language"
      },
      # Create timestamp
      {
        class: "com.kmwllc.lucille.stage.Timestamp"
        dest: "indexed_at"
      }
    ]
  }
]
```
### Field Transformation

```
pipelines: [
  {
    name: "transform-pipeline"
    stages: [
      # Rename fields
      {
        class: "com.kmwllc.lucille.stage.RenameFields"
        fieldMapping: {
          old_name: "new_name"
          legacy_id: "id"
        }
      },
      # Set static values
      {
        class: "com.kmwllc.lucille.stage.SetStaticValues"
        fieldMapping: {
          source: "migration"
          version: "2.0"
        }
      },
      # Parse dates
      {
        class: "com.kmwllc.lucille.stage.ParseDate"
        source: "created_at"
        dest: "created_date"
        format: "yyyy-MM-dd HH:mm:ss"
      },
      # Remove empty fields
      {
        class: "com.kmwllc.lucille.stage.RemoveEmptyFields"
      }
    ]
  }
]
```
### Enrichment

```
pipelines: [
  {
    name: "enrichment-pipeline"
    stages: [
      # Look up additional data from database
      {
        class: "com.kmwllc.lucille.stage.QueryDatabase"
        connectionString: ${DB_URL}
        query: "SELECT * FROM metadata WHERE id = ?"
        queryParams: ["doc_id"]
      },
      # Generate embeddings for semantic search
      {
        class: "com.kmwllc.lucille.stage.OpenAIEmbed"
        apiKey: ${OPENAI_API_KEY}
        source: "content"
        dest: "embedding"
      },
      # Extract entities
      {
        class: "com.kmwllc.lucille.stage.ExtractEntities"
        source: "content"
        destPrefix: "entity_"
      }
    ]
  }
]
```
### Filtering

```
pipelines: [
  {
    name: "filter-pipeline"
    stages: [
      # Drop documents that match criteria
      {
        class: "com.kmwllc.lucille.stage.DropDocument"
        conditions: [
          {
            fields: ["status"]
            values: ["deleted", "draft"]
            operator: "must"
          }
        ]
      },
      # Remove sensitive fields
      {
        class: "com.kmwllc.lucille.stage.DeleteFields"
        fields: ["ssn", "credit_card", "password"]
      },
      # Drop specific values from fields
      {
        class: "com.kmwllc.lucille.stage.DropValues"
        fieldMapping: {
          tags: ["internal", "private"]
        }
      }
    ]
  }
]
```
## Multiple Connectors, One Pipeline
Multiple connectors can feed into the same pipeline:
```
connectors: [
  {
    name: "files"
    class: "com.kmwllc.lucille.connector.FileConnector"
    pipeline: "shared-pipeline" # Same pipeline
    paths: ["/data/files"]
  },
  {
    name: "database"
    class: "com.kmwllc.lucille.connector.DatabaseConnector"
    pipeline: "shared-pipeline" # Same pipeline
    query: "SELECT * FROM documents"
  }
]
```
```
pipelines: [
  {
    name: "shared-pipeline"
    stages: [
      # Stages that work for both sources...
    ]
  }
]
```
## Pipeline Validation
Lucille validates pipeline configurations at startup. Common validation errors:
### Invalid stage class

```
{
  class: "com.kmwllc.lucille.stage.InvalidStage" # Class doesn't exist
}
```

Error: Class not found or not a valid Stage implementation.
### Missing required parameters

```
{
  class: "com.kmwllc.lucille.stage.CopyFields"
  # Missing required 'source' and 'dest' parameters
}
```

Error: Required configuration parameter missing.
### Invalid condition operator

```
conditions: [
  {
    fields: ["status"]
    operator: "should" # Invalid - must be 'must' or 'must_not'
  }
]
```

Error: Unsupported operator value.
### Duplicate stage names

```
stages: [
  {name: "copy", class: "com.kmwllc.lucille.stage.CopyFields"},
  {name: "copy", class: "com.kmwllc.lucille.stage.CopyFields"} # Duplicate
]
```

Error: Two stages cannot have the same name.
## Worker Configuration
Control pipeline execution with worker settings:
```
worker {
  # Number of worker threads per pipeline in local mode
  threads: 2

  # Pipeline to execute (for distributed mode)
  pipeline: "pipeline_name"

  # Exit if the worker hasn't processed a message in this time
  exitOnTimeout: true
  maxProcessingSecs: 600 # 10 minutes

  # Maximum document retry attempts
  maxRetries: 2

  # Enable heartbeat logging
  enableHeartbeat: true
}
```
- `threads`: Number of worker threads to start for each pipeline in local mode.
- `pipeline`: Name of the pipeline to execute. Only needed when the Worker runs as a separate process in distributed mode.
- `exitOnTimeout`: Exit if the worker hasn't processed a message within `maxProcessingSecs`.
- `maxProcessingSecs`: Maximum time to spend processing a message before assuming a problem.
- `maxRetries`: Maximum retry attempts across all workers for any document. Requires ZooKeeper.
- `enableHeartbeat`: Generate `heartbeat.log` for liveness checks. Frequency is controlled by `log.seconds`.
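If heartbeat logging is enabled, its frequency comes from `log.seconds`; a minimal sketch, assuming a top-level `log` block alongside `worker`:

```
worker {
  enableHeartbeat: true
}

log {
  seconds: 30 # assumed: write a heartbeat entry every 30 seconds
}
```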
## Alternative Pipeline Syntax
You can declare pipelines incrementally using HOCON merging:
```
# Declare initial pipeline
pipelines: [{name: "pipeline1", stages: [{class: "com.kmwllc.lucille.stage.CopyFields"}]}]

# Add another pipeline by extending the list
pipelines: ${pipelines} [{name: "pipeline2", stages: [{class: "com.kmwllc.lucille.stage.DeleteFields"}]}]
```
This is useful for organizing pipelines across multiple configuration files.
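HOCON's standard `include` directive pairs naturally with this pattern; a sketch with hypothetical file names:

```
# main.conf
include "text-pipeline.conf"   # defines or extends the pipelines list
include "filter-pipeline.conf" # appends its own pipeline the same way

# Inside each included file, append to whatever is already defined.
# The optional substitution ${?pipelines} resolves to an empty prefix
# if no pipelines have been declared yet:
pipelines: ${?pipelines} [{name: "filter-pipeline", stages: []}]
```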
## Next Steps

- **Browse Stages**: Explore all available processing stages.
- **Configure Indexers**: Set up search engine destinations.