Overview
An Indexer is a thread that retrieves processed Documents from the end of a Pipeline and sends them in batches to a destination search engine. Indexers handle batching, routing, error handling, and completion tracking to ensure reliable delivery of documents.
public abstract class Indexer implements Runnable {
  public abstract boolean validateConnection();
  protected abstract Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception;
  public abstract void closeConnection();
}
Only one Indexer can be defined in a Lucille run. All pipelines feed to the same Indexer instance.
Indexer Architecture
Core Responsibilities
Poll for Documents: Continuously retrieve processed documents from message queues
Batch Documents: Group documents together for efficient bulk operations
Send to Destination: Use search engine APIs to index documents
Track Completion: Send success/failure events back to the Publisher
Handle Errors: Retry failed documents or report failures
Document Flow
Pipeline → IndexerMessenger → Batch → Search Engine
                ↓                         ↓
          EventMessenger  ←  Success/Failure
Batching Strategy
Indexers use batching to optimize network usage and search engine performance:
private final Batch batch;

while (running) {
  Document doc = messenger.pollDocToIndex();
  if (doc == null) {
    // Timeout - flush if batch is old enough
    sendToIndex(batch.flushIfExpired());
  } else {
    // Add to batch and flush if full
    sendToIndex(batch.add(doc));
  }
}
Batch Configuration
indexer {
  batchSize: 100      # Number of documents per batch
  batchTimeout: 100   # Milliseconds before flushing partial batch
}
Batch size considerations:
Larger batches: Better throughput, higher latency
Smaller batches: Lower latency, more network overhead
Typical values: 50-500 documents
Batch timeout considerations:
Shorter timeout: Lower latency for final documents
Longer timeout: Better batch fill rates
Typical values: 100-1000 milliseconds
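The size-or-timeout flush policy described above can be sketched as a small standalone class. This is a simplified illustration using String stand-ins for documents; the class and method names mirror the doc's description but are not Lucille's actual Batch implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified sketch of the size-or-timeout batching policy: flush when the
// batch reaches capacity, or when the oldest buffered document has waited
// longer than the timeout. Illustrative only, not Lucille's Batch class.
public class TimedBatch {
  private final int capacity;
  private final long timeoutMs;
  private final List<String> docs = new ArrayList<>();
  private long firstAddTime = 0;

  public TimedBatch(int capacity, long timeoutMs) {
    this.capacity = capacity;
    this.timeoutMs = timeoutMs;
  }

  // Add a document; returns the flushed batch when capacity is reached,
  // otherwise an empty list.
  public List<String> add(String doc) {
    if (docs.isEmpty()) {
      firstAddTime = System.currentTimeMillis();
    }
    docs.add(doc);
    return docs.size() >= capacity ? flush() : Collections.emptyList();
  }

  // Flush only if the oldest buffered document has exceeded the timeout.
  public List<String> flushIfExpired() {
    boolean expired = !docs.isEmpty()
        && System.currentTimeMillis() - firstAddTime >= timeoutMs;
    return expired ? flush() : Collections.emptyList();
  }

  // Drain and return whatever is buffered.
  public List<String> flush() {
    List<String> out = new ArrayList<>(docs);
    docs.clear();
    return out;
  }
}
```

Returning an empty list from add() when the batch is not yet full matches the polling loop shown earlier, where sendToIndex() is called on every result and simply does nothing for an empty batch.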
Batching Patterns
SingleBatch
Default batching strategy where all documents go to the same destination:
Batch batch = new SingleBatch(batchSize, batchTimeout);
batch.add(doc1);
batch.add(doc2);
List<Document> ready = batch.flush();
// Send all documents to default index/collection
MultiBatch
Routes documents to different indices based on a field value:
Batch batch = new MultiBatch(batchSize, batchTimeout, "index_field");
batch.add(doc1); // index_field = "products"
batch.add(doc2); // index_field = "users"
// Maintains separate batches per index
Configure with:
indexer {
  indexOverrideField: "target_index"
}
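The per-index routing idea can be illustrated with a minimal standalone sketch: one buffer per distinct value of the routing field. The Map-of-lists structure and class name are our assumptions for illustration, not Lucille's MultiBatch implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of routing documents into separate per-index buffers, as a
// MultiBatch-style strategy maintains one batch per target index.
// Illustrative only; document IDs stand in for real documents.
public class IndexRouter {
  private final Map<String, List<String>> batches = new HashMap<>();

  // Buffer a document under the index named by its routing field value.
  public void add(String targetIndex, String docId) {
    batches.computeIfAbsent(targetIndex, k -> new ArrayList<>()).add(docId);
  }

  // Drain and return the buffer for one target index.
  public List<String> flush(String targetIndex) {
    List<String> out = batches.remove(targetIndex);
    return out == null ? new ArrayList<>() : out;
  }
}
```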
Configuration
Indexers have two configuration sections:
Generic Indexer Configuration
Common to all indexer types:
indexer {
  type: "solr"   # or "opensearch", "elasticsearch", "csv"

  # Batching
  batchSize: 100
  batchTimeout: 100

  # Field handling
  ignoreFields: ["temp_field", "internal_data"]
  idOverrideField: "custom_id"
  indexOverrideField: "target_index"

  # Deletion support
  deletionMarkerField: "is_deleted"
  deletionMarkerFieldValue: "true"
  deleteByFieldField: "delete_query"
  deleteByFieldValue: "query_string"
}
Implementation-Specific Configuration
Each indexer type has its own configuration block:
# Solr configuration
solr {
  useCloudClient: true
  url: "localhost:9983"
  defaultCollection: "main_index"
  commitWithin: 10000
}

# OpenSearch configuration
opensearch {
  url: "https://localhost:9200"
  index: "documents"
  username: "admin"
  password: "${OPENSEARCH_PASSWORD}"
}

# Elasticsearch configuration
elastic {
  url: "http://localhost:9200"
  index: "documents"
  username: "elastic"
  password: "${ELASTIC_PASSWORD}"
}

# CSV configuration
csv {
  filePath: "/output/documents.csv"
  append: false
}
Generic Indexer Properties
Field Handling
ignoreFields
List of field names to exclude from indexing:
indexer {
  ignoreFields: ["temp_data", "processing_metadata", "internal_id"]
}
Useful for:
Removing temporary fields added during pipeline processing
Excluding sensitive information
Reducing index size
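Applying ignoreFields amounts to dropping the listed fields from the outgoing copy of each document. The sketch below uses a plain Map as a stand-in for Lucille's Document class; the class and method names are ours.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of applying ignoreFields before a document is sent: each listed
// field is removed from the outgoing copy, leaving the original intact.
// The Map-based document model is illustrative only.
public class FieldFilter {

  // Return a copy of the document with the ignored fields removed.
  public static <V> Map<String, V> withoutIgnored(Map<String, V> doc, List<String> ignoreFields) {
    Map<String, V> out = new HashMap<>(doc);
    for (String field : ignoreFields) {
      out.remove(field);
    }
    return out;
  }
}
```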
idOverrideField
Field containing the ID to use instead of the document's id field:
indexer {
  idOverrideField: "custom_doc_id"
}
The document:
{
  "id": "doc1",
  "custom_doc_id": "PROD-12345",
  "title": "Product Title"
}
will be indexed with ID PROD-12345 instead of doc1.
indexOverrideField
Field specifying which index/collection to send the document to:
indexer {
  indexOverrideField: "target_index"
}
Enables dynamic routing:
doc.setField("target_index", "products"); // → products index
doc.setField("target_index", "users");    // → users index
Not supported by OpenSearch and Elasticsearch indexers.
Deletion Support
deletionMarkerField
Mark documents for deletion based on a field value:
indexer {
  deletionMarkerField: "is_deleted"
  deletionMarkerFieldValue: "true"
}
Documents with is_deleted = "true" will be deleted from the index instead of updated. Both properties must be set together.
deleteByFieldField
Execute delete-by-query operations:
indexer {
  deleteByFieldField: "delete_query"
  deleteByFieldValue: "query_string"
}
Documents with a delete_query field will trigger deletion of all documents matching that query. Both properties must be set together.
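The deletion-marker behavior can be sketched as a split of each batch into documents to index and documents to delete. This is a simplified illustration with Map-based documents and made-up class names, not Lucille's actual deletion handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the deletion-marker check: documents whose marker field equals
// the configured marker value are routed to a delete operation instead of
// an update. Names and the Map document model are illustrative.
public class DeletionSplitter {
  private final String markerField;
  private final String markerValue;

  public DeletionSplitter(String markerField, String markerValue) {
    this.markerField = markerField;
    this.markerValue = markerValue;
  }

  // Return the subset of documents that should be deleted, not indexed.
  public <V> List<Map<String, V>> deletions(List<Map<String, V>> batch) {
    List<Map<String, V>> out = new ArrayList<>();
    for (Map<String, V> doc : batch) {
      if (markerValue.equals(doc.get(markerField))) {
        out.add(doc);
      }
    }
    return out;
  }
}
```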
Run Loop
The indexer runs continuously, polling for documents:
@Override
public void run() {
  MDC.put(RUNID_FIELD, localRunId != null ? localRunId : "UNKNOWN");
  try {
    while (running) {
      checkForDoc();
    }
    sendToIndexWithAccounting(batch.flush()); // Final batch
  } finally {
    close();
  }
}

private void checkForDoc() {
  Document doc = messenger.pollDocToIndex();
  if (doc == null) {
    // Timeout - check if batch expired
    sendToIndexWithAccounting(batch.flushIfExpired());
  } else {
    // Add to batch, send if full
    sendToIndexWithAccounting(batch.add(doc));
  }
}
Accounting and Events
After sending each batch, the indexer:
Records metrics (timing, document count)
Identifies failed documents
Sends completion events for each document
private void sendToIndexWithAccounting(List<Document> batchedDocs) {
  if (batchedDocs.isEmpty()) {
    return;
  }
  stopWatch.start();
  Set<Pair<Document, String>> failedDocs = sendToIndex(batchedDocs);
  stopWatch.stop();
  histogram.update(stopWatch.getNanoTime() / batchedDocs.size());
  meter.mark(batchedDocs.size());

  // Send FAIL events for failed documents, collecting them for the check below
  Set<Document> failed = new HashSet<>();
  for (Pair<Document, String> pair : failedDocs) {
    failed.add(pair.getLeft());
    messenger.sendEvent(pair.getLeft(), "FAILED: " + pair.getRight(), Event.Type.FAIL);
  }

  // Send FINISH events for successful documents
  for (Document doc : batchedDocs) {
    if (!failed.contains(doc)) {
      messenger.sendEvent(doc, "SUCCEEDED", Event.Type.FINISH);
    }
  }
  messenger.batchComplete(batchedDocs);
}
Built-in Indexers
Core Indexers
SolrIndexer: Index to Apache Solr. Supports both standalone and SolrCloud deployments.
OpenSearchIndexer: Index to OpenSearch. Supports bulk operations and routing.
ElasticsearchIndexer: Index to Elasticsearch. Compatible with ES 7.x and 8.x.
CSVIndexer: Write documents to CSV files. Useful for exports and debugging.
Plugin Indexers
PineconeIndexer: Index vector embeddings to Pinecone. Available in the lucille-pinecone plugin.
WeaviateIndexer: Index to the Weaviate vector database. Available in the lucille-weaviate plugin.
Connection Validation
Indexers validate connectivity before starting:
Indexer indexer = IndexerFactory.fromConfig(config, messenger, bypass, metricsPrefix);
if (!indexer.validateConnection()) {
  log.error("Indexer could not connect");
  System.exit(1);
}
Thread indexerThread = new Thread(indexer);
indexerThread.start();
Validation typically:
Checks network connectivity
Verifies authentication credentials
Confirms index/collection exists
Tests basic write permissions
Error Handling
Document-Level Failures
Some documents may fail while others succeed:
@Override
protected Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception {
  BulkResponse response = client.bulk(buildBulkRequest(documents));
  Set<Pair<Document, String>> failed = new HashSet<>();
  for (int i = 0; i < response.getItems().length; i++) {
    if (response.getItems()[i].isFailed()) {
      String reason = response.getItems()[i].getFailureMessage();
      failed.add(Pair.of(documents.get(i), reason));
    }
  }
  return failed;
}
Failed documents:
Get logged with failure reason
Receive FAIL events
Are not retried automatically
Don’t cause the run to fail
Batch-Level Failures
If the entire batch fails (network error, auth failure, etc.):
try {
  Set<Pair<Document, String>> failedDocs = sendToIndex(batchedDocs);
  // Handle individual failures...
} catch (Exception e) {
  // Entire batch failed
  log.error("Error sending documents to index: " + e.getMessage(), e);
  for (Document doc : batchedDocs) {
    messenger.sendEvent(doc, "FAILED: " + e.getMessage(), Event.Type.FAIL);
  }
}
Batch failures:
Log the exception
Mark all documents as failed
Send FAIL events for all documents
Don’t terminate the indexer (continues with next batch)
Metrics
Indexers collect detailed performance metrics:
MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
this.meter = metrics.meter(metricsPrefix + ".indexer.docsIndexed");
this.histogram = metrics.histogram(metricsPrefix + ".indexer.batchTimeOverSize");
Metrics collected:
docsIndexed: Total documents successfully indexed
batchTimeOverSize: Average time per document (batch time / batch size)
docsIndexed rate: Documents per second (1-minute, 5-minute, 15-minute rates)
Example output:
10000 docs indexed.
One minute rate: 234.56 docs/sec.
Mean backend latency: 8.32 ms/doc.
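The arithmetic behind the example output is straightforward: per-document latency is batch send time divided by batch size (the batchTimeOverSize histogram), and throughput is documents indexed over elapsed time. The class and method names below are ours, for illustration only.

```java
// Illustrative arithmetic for the reported metrics; not Lucille code.
public class IndexerStats {

  // Mean backend latency per document, in ms: batch send time / batch size.
  public static double meanLatencyMs(long batchMillis, int batchSize) {
    return (double) batchMillis / batchSize;
  }

  // Throughput over a measurement window, in docs/sec.
  public static double docsPerSec(long docsIndexed, double elapsedSeconds) {
    return docsIndexed / elapsedSeconds;
  }
}
```

For example, a batch of 100 documents that took 832 ms to send contributes a mean backend latency of 8.32 ms/doc.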
Bypass Mode
In test mode, the indexer can bypass the actual search engine:
Indexer indexer = IndexerFactory.fromConfig(
    config,
    messenger,
    true,          // bypass: skip actual indexing
    metricsPrefix);
Bypass mode:
Skips sendToIndex() call
Still sends success events
Useful for testing pipeline logic
Validates configuration without search engine
Creating Custom Indexers
Extend Indexer to create custom destinations:
public class CustomIndexer extends Indexer {

  public static final Spec SPEC = SpecBuilder.indexer()
      .requiredString("apiUrl", "apiKey")
      .optionalInt("timeout")
      .build();

  private final String apiUrl;
  private final HttpClient client;
  private final ObjectMapper objectMapper = new ObjectMapper();

  public CustomIndexer(Config config, IndexerMessenger messenger,
      boolean bypass, String metricsPrefix, String localRunId) {
    super(config, messenger, bypass, metricsPrefix, localRunId);
    this.apiUrl = config.getString("custom.apiUrl");
    this.client = HttpClient.newHttpClient();
  }

  @Override
  protected String getIndexerConfigKey() {
    return "custom";
  }

  @Override
  public boolean validateConnection() {
    try {
      HttpResponse<String> response = client.send(
          HttpRequest.newBuilder(URI.create(apiUrl + "/health")).build(),
          HttpResponse.BodyHandlers.ofString());
      return response.statusCode() == 200;
    } catch (Exception e) {
      return false;
    }
  }

  @Override
  protected Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception {
    Set<Pair<Document, String>> failed = new HashSet<>();
    for (Document doc : documents) {
      try {
        String json = objectMapper.writeValueAsString(getIndexerDoc(doc));
        HttpResponse<String> response = client.send(
            HttpRequest.newBuilder()
                .uri(URI.create(apiUrl + "/documents"))
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build(),
            HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
          failed.add(Pair.of(doc, "HTTP " + response.statusCode()));
        }
      } catch (Exception e) {
        failed.add(Pair.of(doc, e.getMessage()));
      }
    }
    return failed;
  }

  @Override
  public void closeConnection() {
    // Release any resources held by the destination client
  }
}
Best Practices
Optimize Batch Size: Test different sizes to find optimal throughput
Handle Timeouts: Set appropriate batch timeout to avoid hanging
Validate Early: Check connection before processing documents
Log Failures: Provide detailed error messages for debugging
Monitor Metrics: Track indexing rate and latency
Clean Up Resources: Always close connections in closeConnection()
Test Bypass Mode: Verify configuration without search engine
Document Config: Use Spec to validate all properties
Next Steps
Built-in Indexers: Explore available indexer implementations
Deployment: Deploy indexers in distributed mode
Documents: Understand document structure
Architecture: Review system architecture