Overview
Distributed mode separates Lucille components into independent processes that communicate via Kafka. This architecture enables:
Horizontal scaling - Add more Workers or Indexers to increase throughput
Fault tolerance - Kafka provides durable message storage and replay
Independent scaling - Scale each component based on its bottleneck
High throughput - Process millions of documents efficiently
Production reliability - Suitable for 24/7 production workloads
Distributed mode requires a running Kafka cluster. Workers and Indexers must be started before the Runner.
Architecture
Components run in separate JVMs and communicate through Kafka topics:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Runner │ │ Worker │ │ Indexer │
│ (Process) │ │ (Process) │ │ (Process) │
│ │ │ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │Connector│ │ │ │Pipeline │ │ │ │ Batching│ │
│ └────┬────┘ │ │ └────┬────┘ │ │ └────┬────┘ │
└──────┼──────┘ └─────┼──────┘ └─────┼──────┘
│ │ │
├─────────────────────┼─────────────────────┤
│ │ │
┌───▼─────────────────────▼─────────────────────▼───┐
│ Kafka Message Broker │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Source │ │ Dest. │ │ Events │ │
│ │ Topic │ │ Topic │ │ Topic │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────────────────────────────────┘
▲
│
Multiple instances can scale horizontally
Kafka Topics
Distributed mode uses three Kafka topics per pipeline:
Topic                    Producer            Consumer            Purpose
{pipeline}_source        Runner (Connector)  Worker(s)           Unprocessed documents
{pipeline}_destination   Worker(s)           Indexer             Processed documents
{pipeline}_events        Worker, Indexer     Runner (Publisher)  Lifecycle events
Kafka topics must be created before running Lucille, or Kafka must be configured with auto.create.topics.enable=true.
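For example, the three topics for a pipeline named processing_pipeline could be pre-created with the standard Kafka CLI. The partition and replication-factor values below are illustrative; tune them for your cluster:

```shell
# Pre-create the three per-pipeline topics (values illustrative)
for topic in processing_pipeline_source processing_pipeline_destination processing_pipeline_events; do
  kafka-topics.sh --bootstrap-server kafka:9092 --create \
    --topic "$topic" --partitions 4 --replication-factor 3
done
```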
Configuration
Step 1: Create the Configuration
Add Kafka configuration to your config file:
kafka {
bootstrapServers: "kafka-broker1:9092,kafka-broker2:9092"
# Consumer settings
consumerGroupId: "lucille_workers"
maxPollIntervalSecs: 600
# Producer settings
maxRequestSize: 250000000 # 250MB for large documents
# Serialization
documentDeserializer: "com.kmwllc.lucille.message.KafkaDocumentDeserializer"
documentSerializer: "com.kmwllc.lucille.message.KafkaDocumentSerializer"
# Event tracking
events: true
}
connectors: [
{
class: "com.kmwllc.lucille.connector.FileConnector"
name: "file_connector"
pipeline: "processing_pipeline"
paths: ["s3://bucket/data/*.csv"]
fileHandlers: { csv: {} }
}
]
pipelines: [
{
name: "processing_pipeline"
stages: [
{
class: "com.kmwllc.lucille.stage.ValidateFields"
required: ["id", "title"]
}
]
}
]
indexer {
type: "OpenSearch"
batchSize: 500
batchTimeout: 5000
}
opensearch {
url: "https://opensearch-cluster:9200"
index: "documents"
username: "admin"
password: "${OPENSEARCH_PASSWORD}"
}
Step 2: Start Worker Processes
Workers consume from the source topic, process documents, and produce to the destination topic:
java \
-Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Worker \
processing_pipeline
Start multiple workers for parallel processing:
# Terminal 1
java -Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Worker processing_pipeline
# Terminal 2
java -Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Worker processing_pipeline
# Terminal 3...
Each Worker process can run multiple worker threads based on worker.numThreads configuration.
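The thread count is set in the worker block of the config file; the value below is illustrative:

```hocon
worker {
  numThreads: 4  # worker threads per JVM; tune to available CPU
}
```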
Step 3: Start Indexer Process
The Indexer consumes from the destination topic and writes to the search backend:
java \
-Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Indexer \
processing_pipeline
Multiple Indexers for higher write throughput:
# Terminal 1
java -Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Indexer processing_pipeline
# Terminal 2
java -Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Indexer processing_pipeline
Step 4: Start the Runner
The Runner executes connectors and monitors completion via events:
java \
-Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Runner \
-useKafka
The -useKafka flag is critical: it tells the Runner to use Kafka messaging instead of in-memory queues.
Scaling Strategies
Scale Workers (Pipeline Processing)
Symptom: Destination topic lag is low, but source topic lag is growing.
Solution: Add more Worker processes.
# Check Kafka consumer lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group lucille_workers
GROUP TOPIC LAG
lucille_workers processing_pipeline_source 15000 ⚠️ High lag
Add Workers:
Start additional Worker JVMs on the same machine
Deploy Workers on additional machines
Increase worker.numThreads in existing Workers
Scale Indexers (Backend Writes)
Symptom: Destination topic lag is growing.
Solution: Add more Indexer processes or tune batch settings.
# Check destination topic lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group lucille_indexers
GROUP TOPIC LAG
lucille_indexers processing_pipeline_destination 8000 ⚠️ High lag
Optimize Indexing:
Increase the batch size:
indexer {
  batchSize: 1000      # Larger batches = fewer requests
  batchTimeout: 10000  # Wait longer to fill batches
}
Add Indexer instances: start additional Indexer processes as in Step 3.
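Why larger batches help: the number of bulk requests sent to the backend is roughly the document count divided by the batch size. An illustrative calculation (not Lucille code):

```java
public class BatchMath {
    // Bulk requests needed to index docCount documents at a given batch size.
    static long bulkRequests(long docCount, int batchSize) {
        return (docCount + batchSize - 1) / batchSize; // ceiling division
    }

    public static void main(String[] args) {
        // Doubling the batch size halves the number of backend round trips.
        System.out.println(bulkRequests(1_000_000, 500));  // 2000
        System.out.println(bulkRequests(1_000_000, 1000)); // 1000
    }
}
```

Larger batches reduce per-request overhead on the backend, at the cost of more memory held per Indexer and a longer wait before a partial batch flushes.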
Scale Connector (Source Reads)
Symptom: Runner finishes quickly, but overall throughput is low.
Solution: Increase connector parallelism or tune source connector settings.
connectors: [
{
class: "com.kmwllc.lucille.connector.DatabaseConnector"
name: "db_connector"
pipeline: "processing_pipeline"
# Increase fetch size for bulk reads
fetchSize: 10000
# Use multiple threads for partitioned reads
numThreads: 4
}
]
Kafka Configuration Deep Dive
Consumer Settings
kafka {
# Maximum time between polls before consumer is considered dead
maxPollIntervalSecs: 600
# Number of records returned in a single poll
maxPollRecords: 500
# Consumer group ID for Workers
consumerGroupId: "lucille_workers"
# Offset commit strategy
enableAutoCommit: false # Manual commits after processing
}
maxPollIntervalSecs must be greater than the longest expected document processing time. If a Worker takes longer than this to process a batch, it will be kicked from the consumer group.
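The constraint is simple arithmetic: a Worker must call poll() again before maxPollIntervalSecs elapses, so a full poll batch (maxPollRecords documents) must finish within that window. A sketch of the check (illustrative, not part of Lucille):

```java
public class PollIntervalCheck {
    // True if a worst-case poll batch can be processed within the poll interval.
    static boolean pollIntervalIsSafe(int maxPollRecords,
                                      double worstCaseSecsPerDoc,
                                      int maxPollIntervalSecs) {
        return maxPollRecords * worstCaseSecsPerDoc < maxPollIntervalSecs;
    }

    public static void main(String[] args) {
        // 500 records at 2s each = 1000s, which exceeds a 600s interval:
        System.out.println(pollIntervalIsSafe(500, 2.0, 600));
        // Lowering maxPollRecords to 100 brings the batch under the limit:
        System.out.println(pollIntervalIsSafe(100, 2.0, 600));
    }
}
```

If the check fails, either raise maxPollIntervalSecs or lower maxPollRecords.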
Producer Settings
kafka {
# Maximum request size (for large documents)
maxRequestSize: 250000000
# Compression for network efficiency
compressionType: "gzip"
# Delivery semantics
acks: "all" # Wait for all replicas
# Batching for throughput
batchSize: 16384
lingerMs: 10
}
Event Topic Configuration
kafka {
# Enable event tracking
events: true
# Event topic settings
eventTopicPartitions: 1 # Single partition for ordering
eventTopicReplicas: 3 # High durability
eventTopicRetentionMs: 86400000 # 24 hours
}
Hybrid Mode: Kafka Local
For testing distributed behavior without deploying multiple processes:
java \
-Dconfig.file=application.conf \
-cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
com.kmwllc.lucille.core.Runner \
-useKafka -local
This mode:
Launches Worker and Indexer as threads (like local mode)
Uses Kafka for messaging (like distributed mode)
Useful for integration testing Kafka configurations
Monitoring and Observability
Log Aggregation
Centralize logs from all processes:
# Example: Filebeat + Elasticsearch + Kibana
filebeat.inputs:
- type: log
paths:
    - /var/log/lucille/*.log
fields:
component: worker
pipeline: processing_pipeline
Metrics Collection
Lucille emits metrics via Dropwizard Metrics. Integrate with monitoring systems:
runner {
metricsLoggingLevel: "INFO" # Log metrics to stdout
}
log {
seconds: 30 # Log metrics every 30 seconds
}
Example Prometheus integration:
import com.codahale.metrics.SharedMetricRegistries;
import io.prometheus.client.dropwizard.DropwizardExports;
DropwizardExports prometheusExporter =
    new DropwizardExports(SharedMetricRegistries.getOrCreate("lucille"));
prometheusExporter.register();
Kafka Monitoring
Monitor topic lag and throughput:
# Consumer lag by group
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group lucille_workers
# Topic throughput
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka:9092 \
--topic processing_pipeline_source
Fault Tolerance and Recovery
Kafka Offset Management
Workers commit offsets only after successful processing:
// Worker.java:186-194
private void commitOffsetsAndRemoveCounter(Document doc) {
  try {
    messenger.commitPendingDocOffsets(); // Commit after processing
    if (trackRetries && doc != null) {
      counter.remove(doc);
    }
  } catch (Exception commitException) {
    log.error("Error committing updated offsets", commitException);
  }
}
If a Worker crashes, uncommitted offsets allow another Worker to reprocess those documents from Kafka.
Indexer Failure Handling
The Indexer sends failure events for documents that cannot be indexed:
// Indexer.java:296-303
for (Pair<Document, String> pair : failedDocPairs) {
  try {
    messenger.sendEvent(pair.getLeft(), "FAILED: " + pair.getRight(), Event.Type.FAIL);
    docLogger.error("Sent failure message for doc {}. Reason: {}",
        pair.getLeft().getId(), pair.getRight());
  } catch (Exception e) {
    log.error("Couldn't send failure event for doc {}", pair.getLeft().getId(), e);
  }
}
Graceful Shutdown
All components handle SIGINT gracefully:
Worker Shutdown:
Signal.handle(new Signal("INT"), signal -> {
  workerPool.stop(); // Finish in-flight documents
  log.info("Workers shutting down");
  System.exit(0);
});
Indexer Shutdown:
Signal.handle(new Signal("INT"), signal -> {
  indexer.terminate(); // Flush final batch
  log.info("Indexer shutting down");
  System.exit(0);
});
Deployment Patterns
Containerized Deployment (Docker/Kubernetes)
Worker Dockerfile:
FROM openjdk:17-slim
COPY lucille-core/target/lucille.jar /app/lucille.jar
COPY lucille-core/target/lib /app/lib
COPY application.conf /app/application.conf
ENTRYPOINT ["java", \
  "-Dconfig.file=/app/application.conf", \
  "-cp", "/app/lucille.jar:/app/lib/*", \
  "com.kmwllc.lucille.core.Worker"]
CMD ["processing_pipeline"]
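On Kubernetes, Workers built from this image can be scaled with a standard Deployment. The manifest below is a minimal sketch; the image name, replica count, and resource values are illustrative:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lucille-worker
spec:
  replicas: 3                # scale Workers by adjusting replicas
  selector:
    matchLabels:
      app: lucille-worker
  template:
    metadata:
      labels:
        app: lucille-worker
    spec:
      containers:
        - name: worker
          image: my-registry/lucille-worker:latest  # built from the Dockerfile above
          args: ["processing_pipeline"]
          resources:
            requests:
              memory: "2Gi"
```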
Systemd Deployment
Create a template unit at /etc/systemd/system/lucille-worker@.service:
[Unit]
Description=Lucille Worker %i
After=network.target kafka.service

[Service]
Type=simple
User=lucille
WorkingDirectory=/opt/lucille
ExecStart=/usr/bin/java \
  -Dconfig.file=/etc/lucille/application.conf \
  -cp '/opt/lucille/lucille.jar:/opt/lucille/lib/*' \
  com.kmwllc.lucille.core.Worker \
  processing_pipeline
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
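With a template unit like this, multiple Worker instances can be started with standard systemctl commands (the instance names after @ are arbitrary):

```shell
sudo systemctl daemon-reload
sudo systemctl enable --now lucille-worker@1 lucille-worker@2 lucille-worker@3
```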
Troubleshooting
Workers Not Consuming Messages
Symptom: Source topic has messages, but workers are idle.
Causes:
Consumer group rebalancing
Kafka connectivity issues
Incorrect pipeline name
Debug:
# Check consumer group membership
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group lucille_workers
# Verify topic exists
kafka-topics.sh --bootstrap-server kafka:9092 --list
High Memory Usage
Symptom: Workers or Indexers run out of heap memory.
Causes:
Large documents in memory
Too many worker threads
Indexer batches too large
Solutions:
# Increase heap size
java -Xmx8g -Xms4g ...
# Reduce batch size
indexer {
batchSize: 250 # Smaller batches
}
worker {
numThreads: 2 # Fewer threads per process
}
Next Steps
Production Best Practices Learn monitoring, performance tuning, and troubleshooting for production
Kafka Configuration Deep dive into Kafka configuration options