
Overview

Distributed mode separates Lucille components into independent processes that communicate via Kafka. This architecture enables:
  • Horizontal scaling - Add more Workers or Indexers to increase throughput
  • Fault tolerance - Kafka provides durable message storage and replay
  • Independent scaling - Scale each component based on its bottleneck
  • High throughput - Process millions of documents efficiently
  • Production reliability - Suitable for 24/7 production workloads
Distributed mode requires a running Kafka cluster. Workers and Indexers must be started before the Runner.

Architecture

Components run in separate JVMs and communicate through Kafka topics:
┌─────────────┐        ┌─────────────┐        ┌─────────────┐
│   Runner    │        │   Worker    │        │   Indexer   │
│  (Process)  │        │  (Process)  │        │  (Process)  │
│             │        │             │        │             │
│ ┌─────────┐ │        │ ┌─────────┐ │        │ ┌─────────┐ │
│ │Connector│ │        │ │Pipeline │ │        │ │ Batching│ │
│ └────┬────┘ │        │ └────┬────┘ │        │ └────┬────┘ │
└──────┼──────┘        └──────┼──────┘        └──────┼──────┘
       │                      │                      │
       ├──────────────────────┼──────────────────────┤
       │                      │                      │
   ┌───▼──────────────────────▼──────────────────────▼───┐
   │                 Kafka Message Broker                 │
   │    ┌──────────┐     ┌──────────┐     ┌──────────┐   │
   │    │  Source  │     │  Dest.   │     │  Events  │   │
   │    │  Topic   │     │  Topic   │     │  Topic   │   │
   │    └──────────┘     └──────────┘     └──────────┘   │
   └──────────────────────────────────────────────────────┘


            Multiple instances can scale horizontally

Kafka Topics

Distributed mode uses three Kafka topics per pipeline:
Topic                   Producer             Consumer             Purpose
{pipeline}_source       Runner (Connector)   Worker(s)            Unprocessed documents
{pipeline}_destination  Worker(s)            Indexer              Processed documents
{pipeline}_events       Worker, Indexer      Runner (Publisher)   Lifecycle events
Kafka topics must be created before running Lucille, or Kafka must be configured with auto.create.topics.enable=true.
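If you pre-create topics rather than rely on auto-creation, the following sketch shows one way to script it for a single pipeline. Topic names follow the `{pipeline}_source`/`_destination`/`_events` convention from the table above; the partition and replication counts are example values, and the `echo` makes this a dry run (remove it to execute against a real cluster):

```shell
# Dry-run topic creation for one pipeline (remove echo to execute).
PIPELINE=processing_pipeline

for SUFFIX in source destination events; do
  echo kafka-topics.sh --bootstrap-server kafka:9092 \
    --create --topic "${PIPELINE}_${SUFFIX}" \
    --partitions 12 --replication-factor 3
done
```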

Configuration

Step 1: Configure Kafka Connection

Add Kafka configuration to your config file:
kafka {
  bootstrapServers: "kafka-broker1:9092,kafka-broker2:9092"
  
  # Consumer settings
  consumerGroupId: "lucille_workers"
  maxPollIntervalSecs: 600
  
  # Producer settings
  maxRequestSize: 250000000  # 250MB for large documents
  
  # Serialization
  documentDeserializer: "com.kmwllc.lucille.message.KafkaDocumentDeserializer"
  documentSerializer: "com.kmwllc.lucille.message.KafkaDocumentSerializer"
  
  # Event tracking
  events: true
}

connectors: [
  {
    class: "com.kmwllc.lucille.connector.FileConnector"
    name: "file_connector"
    pipeline: "processing_pipeline"
    paths: ["s3://bucket/data/*.csv"]
    fileHandlers: { csv: {} }
  }
]

pipelines: [
  {
    name: "processing_pipeline"
    stages: [
      {
        class: "com.kmwllc.lucille.stage.ValidateFields"
        required: ["id", "title"]
      }
    ]
  }
]

indexer {
  type: "OpenSearch"
  batchSize: 500
  batchTimeout: 5000
}

opensearch {
  url: "https://opensearch-cluster:9200"
  index: "documents"
  username: "admin"
  password: "${OPENSEARCH_PASSWORD}"
}

Step 2: Start Worker Processes

Workers consume from the source topic, process documents, and produce to the destination topic:
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker \
  processing_pipeline

Start multiple Workers for parallel processing:
# Terminal 1
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline

# Terminal 2
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline

# Terminal 3...

Each Worker process can run multiple worker threads based on worker.numThreads configuration.

Step 3: Start Indexer Process

The Indexer consumes from the destination topic and writes to the search backend:
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer \
  processing_pipeline

Start multiple Indexers for higher write throughput:
# Terminal 1
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer processing_pipeline

# Terminal 2
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer processing_pipeline

Step 4: Run the Runner

The Runner executes connectors and monitors completion via events:
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Runner \
  -useKafka

The -useKafka flag is required: it tells the Runner to use Kafka messaging instead of in-memory queues.
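Putting the four steps together, the start order matters: Workers and Indexers first, Runner last. A minimal sketch, dry run via `echo` (remove it to actually launch); the classpath and config paths mirror the commands above:

```shell
# Start order: workers, then indexers, then the runner (dry run).
CP='lucille-core/target/lucille.jar:lucille-core/target/lib/*'
start() { echo java -Dconfig.file=application.conf -cp "$CP" "$@"; }

start com.kmwllc.lucille.core.Worker processing_pipeline    # 1. workers first
start com.kmwllc.lucille.core.Indexer processing_pipeline   # 2. then indexers
start com.kmwllc.lucille.core.Runner -useKafka              # 3. runner last
```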

Scaling Strategies

Scale Workers (Pipeline Processing)

Symptom: Destination topic lag is low, but source topic lag is growing. Solution: Add more Worker processes.
# Check Kafka consumer lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group lucille_workers

GROUP           TOPIC                    LAG
lucille_workers processing_pipeline_source  15000  ⚠️ High lag
Add Workers:
  • Start additional Worker JVMs on the same machine
  • Deploy Workers on additional machines
  • Increase worker.numThreads in existing Workers
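When deciding whether to add Workers, the per-partition lag rows can be totaled into a single number to watch. A small sketch, assuming LAG is the 6th column of the standard `--describe` output (verify against your Kafka version):

```shell
# Total a group's lag across partitions from kafka-consumer-groups.sh output.
sum_lag() { awk 'NR > 1 { lag += $6 } END { print "total lag:", lag + 0 }'; }

# Pipe real describe output instead of this captured sample:
printf '%s\n' \
  'GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG' \
  'lucille_workers processing_pipeline_source 0 1000 9000 8000' \
  'lucille_workers processing_pipeline_source 1 1000 8000 7000' \
  | sum_lag
# prints: total lag: 15000
```

In practice: `kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group lucille_workers | sum_lag`.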

Scale Indexers (Backend Writes)

Symptom: Destination topic lag is growing. Solution: Add more Indexer processes or tune batch settings.
# Check destination topic lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group lucille_indexers

GROUP             TOPIC                         LAG
lucille_indexers  processing_pipeline_destination  8000  ⚠️ High lag
Optimize Indexing:
indexer {
  batchSize: 1000      # Larger batches = fewer requests
  batchTimeout: 10000  # Wait longer to fill batches
}
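A quick back-of-envelope check helps when choosing batchSize: at a target document rate, the batch size determines how many bulk requests per second the backend must absorb. The numbers below are hypothetical; substitute your own measurements:

```shell
# Bulk requests/sec the backend must absorb (hypothetical numbers).
DOCS_PER_SEC=5000
BATCH_SIZE=1000
REQUESTS_PER_SEC=$(( DOCS_PER_SEC / BATCH_SIZE ))
echo "bulk requests/sec: $REQUESTS_PER_SEC"
# prints: bulk requests/sec: 5
```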

Scale Connector (Source Reads)

Symptom: Source topic lag stays near zero and Workers sit idle; the connector cannot produce documents fast enough. Solution: Increase connector parallelism or tune source connector settings.
connectors: [
  {
    class: "com.kmwllc.lucille.connector.DatabaseConnector"
    name: "db_connector"
    pipeline: "processing_pipeline"
    
    # Increase fetch size for bulk reads
    fetchSize: 10000
    
    # Use multiple threads for partitioned reads
    numThreads: 4
  }
]

Kafka Configuration Deep Dive

Consumer Settings

kafka {
  # Maximum time between polls before consumer is considered dead
  maxPollIntervalSecs: 600
  
  # Number of records returned in a single poll
  maxPollRecords: 500
  
  # Consumer group ID for Workers
  consumerGroupId: "lucille_workers"
  
  # Offset commit strategy
  enableAutoCommit: false  # Manual commits after processing
}
maxPollIntervalSecs must be greater than the longest expected document processing time. If a Worker takes longer than this to process a batch, it will be kicked from the consumer group.
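A Worker must finish an entire poll batch (up to maxPollRecords documents) within the interval, so the floor can be estimated from the worst-case per-document time. The per-document time and safety factor below are assumptions you should measure for your own pipeline:

```shell
# Derive a lower bound for maxPollIntervalSecs (assumed inputs).
MAX_POLL_RECORDS=500
WORST_CASE_SECS_PER_DOC=1
SAFETY_FACTOR=2
MIN_INTERVAL=$(( MAX_POLL_RECORDS * WORST_CASE_SECS_PER_DOC * SAFETY_FACTOR ))
echo "maxPollIntervalSecs should be >= $MIN_INTERVAL"
# prints: maxPollIntervalSecs should be >= 1000
```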

Producer Settings

kafka {
  # Maximum request size (for large documents)
  maxRequestSize: 250000000
  
  # Compression for network efficiency
  compressionType: "gzip"
  
  # Delivery semantics
  acks: "all"  # Wait for all replicas
  
  # Batching for throughput
  batchSize: 16384
  lingerMs: 10
}

Event Topic Configuration

kafka {
  # Enable event tracking
  events: true
  
  # Event topic settings
  eventTopicPartitions: 1     # Single partition for ordering
  eventTopicReplicas: 3       # High durability
  eventTopicRetentionMs: 86400000  # 24 hours
}

Hybrid Mode: Kafka Local

For testing distributed behavior without deploying multiple processes:
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Runner \
  -useKafka -local
This mode:
  • Launches Worker and Indexer as threads (like local mode)
  • Uses Kafka for messaging (like distributed mode)
  • Useful for integration testing Kafka configurations

Monitoring and Observability

Log Aggregation

Centralize logs from all processes:
# Example: Filebeat + Elasticsearch + Kibana
filebeat.inputs:
  - type: log
    paths:
      - /var/log/lucille/*.log
    fields:
      component: worker
      pipeline: processing_pipeline

Metrics Collection

Lucille emits metrics via Dropwizard Metrics. Integrate with monitoring systems:
runner {
  metricsLoggingLevel: "INFO"  # Log metrics to stdout
}

log {
  seconds: 30  # Log metrics every 30 seconds
}
Example Prometheus integration:
import com.codahale.metrics.SharedMetricRegistries;
import io.prometheus.client.dropwizard.DropwizardExports;

DropwizardExports prometheusExporter = new DropwizardExports(
  SharedMetricRegistries.getOrCreate("lucille")
);
prometheusExporter.register();

Kafka Monitoring

Monitor topic lag and throughput:
# Consumer lag by group
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group lucille_workers

# Topic throughput
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list kafka:9092 \
  --topic processing_pipeline_source
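GetOffsetShell prints one `topic:partition:offset` line per partition; summing the third field gives the total number of messages written to the topic. A small sketch:

```shell
# Total messages written to a topic from GetOffsetShell's per-partition lines.
total_offsets() { awk -F: '{ total += $3 } END { print total + 0 }'; }

# Pipe real GetOffsetShell output instead of this captured sample:
printf '%s\n' \
  'processing_pipeline_source:0:120000' \
  'processing_pipeline_source:1:118000' \
  | total_offsets
# prints: 238000
```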

Fault Tolerance and Recovery

Kafka Offset Management

Workers commit offsets only after successful processing:
// Worker.java:186-194
private void commitOffsetsAndRemoveCounter(Document doc) {
  try {
    messenger.commitPendingDocOffsets();  // Commit after processing
    if (trackRetries && doc != null) {
      counter.remove(doc);
    }
  } catch (Exception commitException) {
    log.error("Error committing updated offsets", commitException);
  }
}
If a Worker crashes, uncommitted offsets allow another Worker to reprocess those documents from Kafka.

Indexer Failure Handling

The Indexer sends failure events for documents that cannot be indexed:
// Indexer.java:296-303
for (Pair<Document, String> pair : failedDocPairs) {
  try {
    messenger.sendEvent(pair.getLeft(), "FAILED: " + pair.getRight(), Event.Type.FAIL);
    docLogger.error("Sent failure message for doc {}. Reason: {}", 
      pair.getLeft().getId(), pair.getRight());
  } catch (Exception e) {
    log.error("Couldn't send failure event for doc {}", pair.getLeft().getId(), e);
  }
}

Graceful Shutdown

All components handle SIGINT gracefully.

Worker Shutdown:
Signal.handle(new Signal("INT"), signal -> {
  workerPool.stop();  // Finish in-flight documents
  log.info("Workers shutting down");
  System.exit(0);
});
Indexer Shutdown:
Signal.handle(new Signal("INT"), signal -> {
  indexer.terminate();  // Flush final batch
  log.info("Indexer shutting down");
  System.exit(0);
});
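The same pattern can be exercised from the shell. This is an illustrative demo of the SIGINT semantics, not Lucille code: the trap handler runs its cleanup (here, a stand-in for flushing the final batch), then exits with status 0, so nothing after the signal executes:

```shell
# Demo of trap-based graceful shutdown on SIGINT (illustrative only).
bash -c '
  trap "echo flushing final batch; exit 0" INT
  kill -INT $$   # simulate Ctrl-C
  sleep 1        # never reached: the trap exits first
'
# prints: flushing final batch
```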

Deployment Patterns

Containerized Deployment (Docker/Kubernetes)

FROM openjdk:17-slim

COPY lucille-core/target/lucille.jar /app/lucille.jar
COPY lucille-core/target/lib /app/lib
COPY application.conf /app/application.conf

ENTRYPOINT ["java", \
  "-Dconfig.file=/app/application.conf", \
  "-cp", "/app/lucille.jar:/app/lib/*", \
  "com.kmwllc.lucille.core.Worker"]

CMD ["processing_pipeline"]
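With the Dockerfile above, scaling Workers means running more containers from one image. A dry-run sketch (`echo` makes it inert; remove it to execute; image and container names are examples):

```shell
# Build once, then scale Workers by running more containers (dry run).
echo docker build -t lucille-worker .
for i in 1 2 3; do
  echo docker run -d --name "lucille-worker-$i" lucille-worker
done
```

Since the ENTRYPOINT pins the main class to Worker, running an Indexer from the same image would require overriding the entrypoint (e.g. `docker run --entrypoint java ... com.kmwllc.lucille.core.Indexer processing_pipeline`) or building a second image.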

Systemd Services (Bare Metal/VMs)

[Unit]
Description=Lucille Worker %i
After=network.target kafka.service

[Service]
Type=simple
User=lucille
WorkingDirectory=/opt/lucille
ExecStart=/usr/bin/java \
  -Dconfig.file=/etc/lucille/application.conf \
  -cp '/opt/lucille/lucille.jar:/opt/lucille/lib/*' \
  com.kmwllc.lucille.core.Worker \
  processing_pipeline

Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
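The %i specifier in the unit above makes it a systemd template; assuming it is installed as /etc/systemd/system/lucille-worker@.service, each instance number becomes an independently managed service. Dry run via `echo` (remove it to manage real units):

```shell
# Enable and start several Worker instances from one template unit (dry run).
for i in 1 2 3; do
  echo systemctl enable --now "lucille-worker@$i.service"
done
```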

Best Practices

  • Create topics with enough partitions for parallelism (e.g., 10-20), since partition count limits how many consumers can work concurrently
  • Tune batch sizes to balance latency vs throughput
  • Use compression on Kafka producers (gzip or snappy)
  • Monitor JVM heap - allocate enough memory for worker threads and batches
  • Benchmark pipelines before production to identify bottlenecks

Troubleshooting

Workers Not Consuming Messages

Symptom: Source topic has messages, but workers are idle. Causes:
  1. Consumer group rebalancing
  2. Kafka connectivity issues
  3. Incorrect pipeline name
Debug:
# Check consumer group membership
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group lucille_workers

# Verify topic exists
kafka-topics.sh --bootstrap-server kafka:9092 --list

High Memory Usage

Symptom: Workers or Indexers run out of heap memory. Causes:
  1. Large documents in memory
  2. Too many worker threads
  3. Indexer batches too large
Solutions:
# Increase heap size
java -Xmx8g -Xms4g ...

# Reduce batch size
indexer {
  batchSize: 250  # Smaller batches
}

worker {
  numThreads: 2  # Fewer threads per process
}

Next Steps

Production Best Practices

Learn monitoring, performance tuning, and troubleshooting for production

Kafka Configuration

Deep dive into Kafka configuration options