Pipelines define sequences of processing stages that transform documents before indexing. Each pipeline applies a list of stages to every incoming document.

Pipeline Structure

Pipelines are defined as a list in the configuration file:
pipelines: [
  {
    name: "pipeline1"
    stages: [
      # Stage configurations...
    ]
  },
  {
    name: "pipeline2"
    stages: [
      # Stage configurations...
    ]
  }
]

Pipeline Parameters

name (string, required)
Name to assign to this pipeline. Referenced by connectors via their pipeline property.

stages (list<object>, required)
List of stage configurations to apply to each document, in sequence.

Stage Configuration

Each stage in a pipeline requires at minimum a class property:
stages: [
  {
    name: "copyFields"
    class: "com.kmwllc.lucille.stage.CopyFields"
    source: ["input1", "input2"]
    dest: ["output1", "output2"]
    updateMode: "overwrite"
  }
]
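
Only class is strictly required; a minimal stage can omit everything else. A sketch using the Timestamp stage and dest field that appear later on this page:

stages: [
  {
    class: "com.kmwllc.lucille.stage.Timestamp"
    dest: "indexed_at"
  }
]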

Common Stage Parameters

class (string, required)
Fully qualified class name of the stage implementation.

name (string)
Optional name for the stage. If omitted, the class name is used. Stage names must be unique within a pipeline.

conditions (list<object>)
List of conditions that determine whether the stage executes for a given document.

conditionPolicy (string, default: "all")
How to combine multiple conditions:
  • all - All conditions must match (AND)
  • any - At least one condition must match (OR)
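
For example, conditionPolicy: "any" runs a stage when either condition matches. A sketch; the field names and values are illustrative:

{
  class: "com.kmwllc.lucille.stage.CopyFields"
  source: ["body"]
  dest: ["body_backup"]
  conditionPolicy: "any"  # run if EITHER condition matches
  conditions: [
    {fields: ["status"], values: ["active"]},
    {fields: ["priority"], values: ["high"]}
  ]
}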

Conditional Execution

Stages can execute conditionally based on document field values:

Condition Structure

conditions: [
  {
    fields: ["status", "type"]
    values: ["active", "published"]
    operator: "must"
  }
]
conditions[].fields (list<string>, required)
Document fields to check for the condition.

conditions[].values (list<mixed>)
Values to look for in the specified fields. If omitted, only field existence is checked. Supports strings, numbers, booleans, and null.

conditions[].valuesPath (string)
Path to a file containing values (one per line). Alternative to inline values; cannot be used together with values.

conditions[].operator (string, default: "must")
Condition operator:
  • must - At least one field must contain at least one value (OR within fields)
  • must_not - No fields should contain any of the values
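
As a sketch, valuesPath can stand in for an inline values list (the file path below is hypothetical):

conditions: [
  {
    fields: ["category"]
    valuesPath: "/config/allowed-categories.txt"  # one value per line
    operator: "must"
  }
]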

Condition Examples

Execute only if document has specific fields:
{
  class: "com.kmwllc.lucille.stage.CopyFields"
  conditions: [
    {
      fields: ["author", "title"]
      operator: "must"
    }
  ]
}
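
Skip a stage when a field contains certain values (the status values here are illustrative):
{
  class: "com.kmwllc.lucille.stage.DetectLanguage"
  source: "text"
  dest: "language"
  conditions: [
    {
      fields: ["status"]
      values: ["draft", "archived"]
      operator: "must_not"
    }
  ]
}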

Stage Ordering

Stages execute in the order they appear in the pipeline configuration. Order matters:
stages: [
  # 1. First, copy raw content to processed field
  {
    class: "com.kmwllc.lucille.stage.CopyFields"
    source: ["content"]
    dest: ["content_processed"]
  },
  
  # 2. Then remove HTML from the processed field
  {
    class: "com.kmwllc.lucille.stage.ApplyJSoup"
    source: "content_processed"
  },
  
  # 3. Finally, detect language on cleaned content
  {
    class: "com.kmwllc.lucille.stage.DetectLanguage"
    source: "content_processed"
    dest: "language"
  }
]

Common Pipeline Patterns

pipelines: [
  {
    name: "text-pipeline"
    stages: [
      # Parse file content
      {
        class: "com.kmwllc.lucille.stage.FetchFileContent"
      },
      # Remove HTML tags
      {
        class: "com.kmwllc.lucille.stage.ApplyJSoup"
        source: "file_content"
        dest: "text"
      },
      # Normalize whitespace
      {
        class: "com.kmwllc.lucille.stage.TrimWhitespace"
        fields: ["text"]
      },
      # Detect language
      {
        class: "com.kmwllc.lucille.stage.DetectLanguage"
        source: "text"
        dest: "language"
      },
      # Create timestamp
      {
        class: "com.kmwllc.lucille.stage.Timestamp"
        dest: "indexed_at"
      }
    ]
  }
]

Multiple Connectors, One Pipeline

Multiple connectors can feed into the same pipeline:
connectors: [
  {
    name: "files"
    class: "com.kmwllc.lucille.connector.FileConnector"
    pipeline: "shared-pipeline"  # Same pipeline
    paths: ["/data/files"]
  },
  {
    name: "database"
    class: "com.kmwllc.lucille.connector.DatabaseConnector"
    pipeline: "shared-pipeline"  # Same pipeline
    query: "SELECT * FROM documents"
  }
]

pipelines: [
  {
    name: "shared-pipeline"
    stages: [
      # Stages that work for both sources...
    ]
  }
]
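
If the sources need slightly different handling, conditional stages let a shared pipeline branch per source. A sketch, assuming each connector populates a distinguishing field such as file_path:

pipelines: [
  {
    name: "shared-pipeline"
    stages: [
      # Runs only for documents that have a file_path field,
      # i.e. those produced by the file connector
      {
        class: "com.kmwllc.lucille.stage.FetchFileContent"
        conditions: [
          {fields: ["file_path"], operator: "must"}
        ]
      }
    ]
  }
]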

Pipeline Validation

Lucille validates pipeline configurations at startup. Common validation errors:
{
  class: "com.kmwllc.lucille.stage.InvalidStage"  # Class doesn't exist
}
Error: Class not found or not a valid Stage implementation
{
  class: "com.kmwllc.lucille.stage.CopyFields"
  # Missing required 'source' and 'dest' parameters
}
Error: Required configuration parameter missing
conditions: [
  {
    fields: ["status"]
    operator: "should"  # Invalid - must be 'must' or 'must_not'
  }
]
Error: Unsupported operator value
stages: [
  {name: "copy", class: "com.kmwllc.lucille.stage.CopyFields"},
  {name: "copy", class: "com.kmwllc.lucille.stage.CopyFields"}  # Duplicate
]
Error: Two stages cannot have the same name

Worker Configuration

Control pipeline execution with worker settings:
worker {
  # Number of worker threads per pipeline in local mode
  threads: 2
  
  # Pipeline to execute (for distributed mode)
  pipeline: "pipeline_name"
  
  # Exit if worker hasn't polled in this time
  exitOnTimeout: true
  maxProcessingSecs: 600  # 10 minutes
  
  # Maximum document retry attempts
  maxRetries: 2
  
  # Enable heartbeat logging
  enableHeartbeat: true
}
worker.threads (number, default: 2)
Number of worker threads to start for each pipeline in local mode.

worker.pipeline (string)
Name of the pipeline to execute. Only needed when the Worker runs as a separate process in distributed mode.

worker.exitOnTimeout (boolean, default: false)
Exit if the worker hasn't processed a message within maxProcessingSecs.

worker.maxProcessingSecs (number, default: 600)
Maximum time to spend processing a message before assuming a problem.

worker.maxRetries (number)
Maximum retry attempts across all workers for any document. Requires ZooKeeper.

worker.enableHeartbeat (boolean, default: false)
Generate heartbeat.log for liveness checks. Frequency is controlled by log.seconds.

Alternative Pipeline Syntax

You can declare pipelines incrementally using HOCON merging:
# Declare initial pipeline
pipelines: [{name: "pipeline1", stages: [{class: "com.kmwllc.lucille.stage.CopyFields"}]}]

# Add another pipeline by extending the list
pipelines: ${pipelines} [{name: "pipeline2", stages: [{class: "com.kmwllc.lucille.stage.DeleteFields"}]}]
This is useful for organizing pipelines across multiple configuration files.
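
A sketch of that pattern across files, using HOCON's include directive (the filenames are hypothetical):

# base.conf
pipelines: [{name: "pipeline1", stages: [{class: "com.kmwllc.lucille.stage.CopyFields"}]}]

# main.conf
include "base.conf"
pipelines: ${pipelines} [{name: "pipeline2", stages: [{class: "com.kmwllc.lucille.stage.DeleteFields"}]}]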

Next Steps

Browse Stages

Explore all available processing stages

Configure Indexers

Set up search engine destinations