
Overview

An Indexer is a thread that retrieves processed Documents from the end of a Pipeline and sends them in batches to a destination search engine. Indexers handle batching, routing, error handling, and completion tracking to ensure reliable delivery of documents.
public abstract class Indexer implements Runnable {
    public abstract boolean validateConnection();
    protected abstract Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception;
    public abstract void closeConnection();
}
Only one Indexer can be defined in a Lucille run. All pipelines feed to the same Indexer instance.

Indexer Architecture

Core Responsibilities

  1. Poll for Documents: Continuously retrieve processed documents from message queues
  2. Batch Documents: Group documents together for efficient bulk operations
  3. Send to Destination: Use search engine APIs to index documents
  4. Track Completion: Send success/failure events back to the Publisher
  5. Handle Errors: Retry failed documents or report failures

Document Flow

Pipeline → IndexerMessenger → Batch → Search Engine
              ↓                           ↓
          EventMessenger ← Success/Failure

Batching Strategy

Indexers use batching to optimize network usage and search engine performance:
private final Batch batch;

while (running) {
    Document doc = messenger.pollDocToIndex();
    
    if (doc == null) {
        // Timeout - flush if batch is old enough
        sendToIndex(batch.flushIfExpired());
    } else {
        // Add to batch and flush if full
        sendToIndex(batch.add(doc));
    }
}
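The loop above relies on a contract: Batch.add returns the flushed documents when adding fills the batch (and an empty list otherwise), while flushIfExpired returns documents only once the timeout has elapsed. A minimal sketch of that contract, using String in place of Document (this is an illustration, not the actual Lucille Batch class):

```java
import java.util.*;

// Minimal batch honoring the add/flushIfExpired contract used by the indexer loop.
class SimpleBatch {
    private final int capacity;
    private final long timeoutMs;
    private long lastFlush = System.currentTimeMillis();
    private final List<String> docs = new ArrayList<>();

    SimpleBatch(int capacity, long timeoutMs) {
        this.capacity = capacity;
        this.timeoutMs = timeoutMs;
    }

    // Add a doc; if the batch is now full, return its contents and reset.
    List<String> add(String doc) {
        docs.add(doc);
        return (docs.size() >= capacity) ? flush() : Collections.emptyList();
    }

    // Return the batch contents only if the timeout has passed since the last flush.
    List<String> flushIfExpired() {
        return (System.currentTimeMillis() - lastFlush >= timeoutMs)
            ? flush() : Collections.emptyList();
    }

    // Unconditionally drain the batch and reset the timeout clock.
    List<String> flush() {
        List<String> out = new ArrayList<>(docs);
        docs.clear();
        lastFlush = System.currentTimeMillis();
        return out;
    }
}
```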

Batch Configuration

indexer {
    batchSize: 100      # Number of documents per batch
    batchTimeout: 100   # Milliseconds before flushing partial batch
}
Batch size considerations:
  • Larger batches: Better throughput, higher latency
  • Smaller batches: Lower latency, more network overhead
  • Typical values: 50-500 documents
Batch timeout considerations:
  • Shorter timeout: Lower latency for final documents
  • Longer timeout: Better batch fill rates
  • Typical values: 100-1000 milliseconds
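Putting the two sets of considerations together, the throughput-oriented and latency-oriented profiles might look like this (values are illustrative starting points, not recommendations):

```hocon
# Throughput-oriented: larger, longer-lived batches
indexer {
    batchSize: 500
    batchTimeout: 1000
}

# Latency-oriented: smaller batches flushed quickly
indexer {
    batchSize: 50
    batchTimeout: 100
}
```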

Batching Patterns

SingleBatch is the default strategy: all documents go to the same destination:
Batch batch = new SingleBatch(batchSize, batchTimeout);
batch.add(doc1);
batch.add(doc2);
List<Document> ready = batch.flush();
// Send all documents to default index/collection
MultiBatch routes documents to different indices based on a field value:
Batch batch = new MultiBatch(batchSize, batchTimeout, "index_field");
batch.add(doc1);  // index_field = "products"
batch.add(doc2);  // index_field = "users"
// Maintains separate batches per index
Configure with:
indexer {
    indexOverrideField: "target_index"
}

Configuration

Indexers have two configuration sections:

Generic Indexer Configuration

Common to all indexer types:
indexer {
    type: "solr"  # or "opensearch", "elasticsearch", "csv"
    
    # Batching
    batchSize: 100
    batchTimeout: 100
    
    # Field handling
    ignoreFields: ["temp_field", "internal_data"]
    idOverrideField: "custom_id"
    indexOverrideField: "target_index"
    
    # Deletion support
    deletionMarkerField: "is_deleted"
    deletionMarkerFieldValue: "true"
    deleteByFieldField: "delete_query"
    deleteByFieldValue: "query_string"
}

Implementation-Specific Configuration

Each indexer type has its own configuration block:
# Solr configuration
solr {
    useCloudClient: true
    url: "localhost:9983"
    defaultCollection: "main_index"
    commitWithin: 10000
}

# OpenSearch configuration
opensearch {
    url: "https://localhost:9200"
    index: "documents"
    username: "admin"
    password: "${OPENSEARCH_PASSWORD}"
}

# Elasticsearch configuration
elastic {
    url: "http://localhost:9200"
    index: "documents"
    username: "elastic"
    password: "${ELASTIC_PASSWORD}"
}

# CSV configuration
csv {
    filePath: "/output/documents.csv"
    append: false
}

Generic Indexer Properties

Field Handling

List of field names to exclude from indexing:
indexer {
    ignoreFields: ["temp_data", "processing_metadata", "internal_id"]
}
Useful for:
  • Removing temporary fields added during pipeline processing
  • Excluding sensitive information
  • Reducing index size
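Conceptually, field filtering amounts to dropping the listed keys from each document before it is serialized for the destination. A minimal sketch using a plain map (not the actual Lucille Document API):

```java
import java.util.*;

public class IgnoreFieldsSketch {

    // Return a copy of the document's field map with the ignored fields removed.
    static Map<String, Object> applyIgnoreFields(Map<String, Object> fields, Set<String> ignore) {
        Map<String, Object> out = new LinkedHashMap<>(fields);
        out.keySet().removeAll(ignore);
        return out;
    }
}
```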
Field containing the ID to use instead of the document’s id field:
indexer {
    idOverrideField: "custom_doc_id"
}
The document:
{
    "id": "doc1",
    "custom_doc_id": "PROD-12345",
    "title": "Product Title"
}
Will be indexed with ID PROD-12345 instead of doc1.
Field specifying which index/collection to send the document to:
indexer {
    indexOverrideField: "target_index"
}
Enables dynamic routing:
doc.setField("target_index", "products");  // → products index
doc.setField("target_index", "users");     // → users index
Not supported by OpenSearch and Elasticsearch indexers.

Deletion Support

Mark documents for deletion based on a field value:
indexer {
    deletionMarkerField: "is_deleted"
    deletionMarkerFieldValue: "true"
}
Documents with is_deleted = "true" will be deleted from the index instead of updated. Both properties must be set together.
Execute delete-by-query operations:
indexer {
    deleteByFieldField: "delete_query"
    deleteByFieldValue: "query_string"
}
Documents with a delete_query field will trigger deletion of all documents matching that query. Both properties must be set together.
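Under that configuration, one plausible reading is that delete_query names the field to match and query_string carries the value to match; a document carrying both might look like this (illustrative only; exact semantics depend on the target search engine and indexer implementation):

```json
{
    "id": "delete-request-1",
    "delete_query": "status",
    "query_string": "archived"
}
```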

Run Loop

The indexer runs continuously, polling for documents:
@Override
public void run() {
    MDC.put(RUNID_FIELD, localRunId != null ? localRunId : "UNKNOWN");
    
    try {
        while (running) {
            checkForDoc();
        }
        sendToIndexWithAccounting(batch.flush()); // Final batch
    } finally {
        close();
    }
}

private void checkForDoc() {
    Document doc = messenger.pollDocToIndex();
    
    if (doc == null) {
        // Timeout - check if batch expired
        sendToIndexWithAccounting(batch.flushIfExpired());
    } else {
        // Add to batch, send if full
        sendToIndexWithAccounting(batch.add(doc));
    }
}

Accounting and Events

After sending each batch, the indexer:
  1. Records metrics (timing, document count)
  2. Identifies failed documents
  3. Sends completion events for each document
private void sendToIndexWithAccounting(List<Document> batchedDocs) {
    if (batchedDocs.isEmpty()) return;
    
    stopWatch.start();
    Set<Pair<Document, String>> failedDocs = sendToIndex(batchedDocs);
    stopWatch.stop();
    
    histogram.update(stopWatch.getNanoTime() / batchedDocs.size());
    meter.mark(batchedDocs.size());
    
    // Send FAIL events for failed documents
    for (Pair<Document, String> pair : failedDocs) {
        messenger.sendEvent(pair.getLeft(), "FAILED: " + pair.getRight(), Event.Type.FAIL);
    }
    
    // Send FINISH events for successful documents. failedDocs holds pairs,
    // so collect the failed Documents first before testing membership.
    Set<Document> failed = new HashSet<>();
    failedDocs.forEach(pair -> failed.add(pair.getLeft()));
    for (Document doc : batchedDocs) {
        if (!failed.contains(doc)) {
            messenger.sendEvent(doc, "SUCCEEDED", Event.Type.FINISH);
        }
    }
    
    messenger.batchComplete(batchedDocs);
}

Built-in Indexers

Core Indexers

SolrIndexer

Index to Apache Solr. Supports both standalone and SolrCloud deployments.

OpenSearchIndexer

Index to OpenSearch. Supports bulk operations and routing.

ElasticsearchIndexer

Index to Elasticsearch. Compatible with ES 7.x and 8.x.

CSVIndexer

Write documents to CSV files. Useful for exports and debugging.

Plugin Indexers

PineconeIndexer

Index vector embeddings to Pinecone. Available in lucille-pinecone plugin.

WeaviateIndexer

Index to Weaviate vector database. Available in lucille-weaviate plugin.

Connection Validation

Indexers validate connectivity before starting:
Indexer indexer = IndexerFactory.fromConfig(config, messenger, bypass, metricsPrefix);

if (!indexer.validateConnection()) {
    log.error("Indexer could not connect");
    System.exit(1);
}

Thread indexerThread = new Thread(indexer);
indexerThread.start();
Validation typically:
  • Checks network connectivity
  • Verifies authentication credentials
  • Confirms index/collection exists
  • Tests basic write permissions

Error Handling

Document-Level Failures

Some documents may fail while others succeed:
@Override
protected Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception {
    BulkResponse response = client.bulk(buildBulkRequest(documents));
    
    Set<Pair<Document, String>> failed = new HashSet<>();
    for (int i = 0; i < response.getItems().length; i++) {
        if (response.getItems()[i].isFailed()) {
            String reason = response.getItems()[i].getFailureMessage();
            failed.add(Pair.of(documents.get(i), reason));
        }
    }
    
    return failed;
}
Failed documents:
  • Get logged with failure reason
  • Receive FAIL events
  • Are not retried automatically
  • Don’t cause the run to fail

Batch-Level Failures

If the entire batch fails (network error, auth failure, etc.):
try {
    Set<Pair<Document, String>> failedDocs = sendToIndex(batchedDocs);
    // Handle individual failures...
} catch (Exception e) {
    // Entire batch failed
    log.error("Error sending documents to index: " + e.getMessage(), e);
    
    for (Document doc : batchedDocs) {
        messenger.sendEvent(doc, "FAILED: " + e.getMessage(), Event.Type.FAIL);
    }
}
Batch failures:
  • Log the exception
  • Mark all documents as failed
  • Send FAIL events for all documents
  • Don’t terminate the indexer (continues with next batch)
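Because failed documents are not retried automatically, callers that need retries can wrap the send in their own loop. A minimal sketch with linear backoff (the names and policy here are illustrative, not part of Lucille):

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    // Retry a failing call up to maxAttempts times, sleeping longer after each failure.
    static <T> T withRetries(Callable<T> send, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs * attempt); // linear backoff between attempts
                }
            }
        }
        throw last; // all attempts exhausted
    }
}
```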

Metrics

Indexers collect detailed performance metrics:
MetricRegistry metrics = SharedMetricRegistries.getOrCreate(METRICS_REG);
this.meter = metrics.meter(metricsPrefix + ".indexer.docsIndexed");
this.histogram = metrics.histogram(metricsPrefix + ".indexer.batchTimeOverSize");
Metrics collected:
  • docsIndexed: Total documents successfully indexed
  • batchTimeOverSize: Average time per document (batch time / batch size)
  • docsIndexed rate: Documents per second (1-minute, 5-minute, 15-minute rates)
Example output:
10000 docs indexed. 
One minute rate: 234.56 docs/sec. 
Mean backend latency: 8.32 ms/doc.
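Since the histogram stores nanoseconds per document (the batch's wall time divided by its size), the "Mean backend latency" figure is a straightforward unit conversion:

```java
public class LatencySketch {

    // Derive ms/doc from a batch's wall time in nanos and its size,
    // mirroring how the batchTimeOverSize histogram is updated.
    static double meanLatencyMsPerDoc(long batchNanos, int batchSize) {
        return (batchNanos / (double) batchSize) / 1_000_000.0;
    }
}
```

For example, a 100-document batch that took 832 ms end to end yields the 8.32 ms/doc shown above.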

Bypass Mode

In test mode, the indexer can bypass the actual search engine:
Indexer indexer = IndexerFactory.fromConfig(
    config, 
    messenger, 
    true,  // bypass: skip actual indexing
    metricsPrefix
);
Bypass mode:
  • Skips sendToIndex() call
  • Still sends success events
  • Useful for testing pipeline logic
  • Validates configuration without search engine

Creating Custom Indexers

Extend Indexer to create custom destinations:
public class CustomIndexer extends Indexer {
    
    public static final Spec SPEC = SpecBuilder.indexer()
        .requiredString("apiUrl", "apiKey")
        .optionalInt("timeout")
        .build();
    
    private final String apiUrl;
    private final HttpClient client;
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    public CustomIndexer(Config config, IndexerMessenger messenger, 
                        boolean bypass, String metricsPrefix, String localRunId) {
        super(config, messenger, bypass, metricsPrefix, localRunId);
        this.apiUrl = config.getString("custom.apiUrl");
        this.client = HttpClient.newHttpClient();
    }
    
    @Override
    protected String getIndexerConfigKey() {
        return "custom";
    }
    
    @Override
    public boolean validateConnection() {
        try {
            HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(URI.create(apiUrl + "/health")).build(),
                HttpResponse.BodyHandlers.ofString()
            );
            return response.statusCode() == 200;
        } catch (Exception e) {
            return false;
        }
    }
    
    @Override
    protected Set<Pair<Document, String>> sendToIndex(List<Document> documents) throws Exception {
        Set<Pair<Document, String>> failed = new HashSet<>();
        
        for (Document doc : documents) {
            try {
                String json = objectMapper.writeValueAsString(getIndexerDoc(doc));
                HttpResponse<String> response = client.send(
                    HttpRequest.newBuilder()
                        .uri(URI.create(apiUrl + "/documents"))
                        .POST(HttpRequest.BodyPublishers.ofString(json))
                        .build(),
                    HttpResponse.BodyHandlers.ofString()
                );
                
                if (response.statusCode() != 200) {
                    failed.add(Pair.of(doc, "HTTP " + response.statusCode()));
                }
            } catch (Exception e) {
                failed.add(Pair.of(doc, e.getMessage()));
            }
        }
        
        return failed;
    }
    
    @Override
    public void closeConnection() {
        // Cleanup
    }
}
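Given the SPEC and getIndexerConfigKey() above, the implementation-specific block for this indexer would be named custom. The configuration might look like the following (the indexer.type value is illustrative; how a custom class is wired up depends on IndexerFactory):

```hocon
indexer {
    type: "custom"      # illustrative; actual custom-class wiring depends on IndexerFactory
    batchSize: 100
}

custom {
    apiUrl: "https://api.example.com"
    apiKey: ${CUSTOM_API_KEY}
    timeout: 5000       # optional, per the Spec
}
```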

Best Practices

  1. Optimize Batch Size: Test different sizes to find optimal throughput
  2. Handle Timeouts: Set appropriate batch timeout to avoid hanging
  3. Validate Early: Check connection before processing documents
  4. Log Failures: Provide detailed error messages for debugging
  5. Monitor Metrics: Track indexing rate and latency
  6. Clean Up Resources: Always close connections in closeConnection()
  7. Test Bypass Mode: Verify configuration without search engine
  8. Document Config: Use Spec to validate all properties

Next Steps

Built-in Indexers

Explore available indexer implementations

Deployment

Deploy indexers in distributed mode

Documents

Understand document structure

Architecture

Review system architecture