Overview
Running Lucille in production requires careful attention to:
Monitoring and alerting - Track metrics and detect issues proactively
Performance tuning - Optimize throughput and resource utilization
Reliability - Ensure fault tolerance and data durability
Security - Protect credentials and secure inter-component communication
Operational excellence - Automate deployments and maintain observability
This guide assumes you’re running Lucille in distributed mode with Kafka. Local mode is not recommended for production workloads.
Monitoring and Metrics
Built-in Metrics
Lucille uses Dropwizard Metrics to track performance across all components. Configure metrics logging in your config file:
runner {
metricsLoggingLevel: "INFO" # Log metrics to stdout
connectorTimeout: 86400000 # 24 hours for long-running connectors
}
log {
seconds: 30 # Log metrics every 30 seconds
}
Key Metrics to Monitor
Worker Metrics
Document Processing Rate:
WORKER: 27017 docs processed.
One minute rate: 1787.10 docs/sec.
Mean pipeline latency: 10.63 ms/doc.
Key Indicators:
docs processed - Total documents processed since startup
One minute rate - Current throughput (docs/sec)
Mean pipeline latency - Average time per document through all stages
Alert Thresholds:
⚠️ Throughput drops below 50% of baseline
⚠️ Mean latency increases by 2x
🚨 Throughput = 0 for >5 minutes (worker stalled)
Indexer Metrics
Indexing Rate:
INDEXER: 17016 docs indexed.
One minute rate: 455.07 docs/sec.
Mean backend latency: 6.90 ms/doc.
Key Indicators:
docs indexed - Total documents written to destination
One minute rate - Current indexing throughput
Mean backend latency - Average time per batch request to search backend
Alert Thresholds:
⚠️ Backend latency exceeds 100ms/doc (backend saturation)
⚠️ Indexing rate < processing rate for >10 minutes (backlog growing)
🚨 Indexer stopped (no output for >5 minutes)
Publisher Metrics
Connector Output:
PUBLISHER: 37029 docs published.
One minute rate: 3225.69 docs/sec.
Mean connector latency: 0.00 ms/doc.
Waiting on 21014 docs.
Key Indicators:
docs published - Total documents emitted by connector
Waiting on X docs - Documents published but not yet confirmed as indexed or failed
Mean connector latency - Time to read and publish each document
Alert Thresholds:
⚠️ Waiting on count grows unbounded (downstream bottleneck)
🚨 Waiting on count stuck for >1 hour (deadlock or failure)
Stage-Level Metrics
Per-Stage Performance:
STAGE: Stage test_summary metrics.
Docs processed: 200000.
Mean latency: 0.3532 ms/doc.
Children: 0.
Errors: 0.
Key Indicators:
Mean latency - Time spent in this specific stage
Children - Child documents created by this stage
Errors - Document failures in this stage
Alert Thresholds:
⚠️ Stage latency >90th percentile (hotspot detected)
🚨 Errors >1% of documents (data quality issue)
Exporting Metrics
Prometheus Integration
Expose Lucille metrics to Prometheus:
PrometheusExporter.java
import com.codahale.metrics.SharedMetricRegistries;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.HTTPServer;
import com.kmwllc.lucille.util.LogUtils;

public class PrometheusExporter {
  public static void main(String[] args) throws Exception {
    // Export Lucille metrics to Prometheus
    DropwizardExports prometheusExporter = new DropwizardExports(
        SharedMetricRegistries.getOrCreate(LogUtils.METRICS_REG));
    prometheusExporter.register();

    // Start HTTP server on port 9091
    HTTPServer server = new HTTPServer(9091);
    System.out.println("Prometheus metrics exposed on :9091/metrics");
  }
}
Grafana Dashboards
Key Panels:
# Document processing throughput
rate(lucille_worker_docs_processed_total[5m])
# Pipeline latency (p95)
histogram_quantile(0.95, rate(lucille_worker_pipeline_latency_bucket[5m]))
# Indexing throughput
rate(lucille_indexer_docs_indexed_total[5m])
# Error rate
rate(lucille_stage_errors_total[5m])
Log Aggregation
Centralize logs from all components:
filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/lucille/worker-*.log
    fields:
      component: worker
      pipeline: processing_pipeline
  - type: log
    enabled: true
    paths:
      - /var/log/lucille/indexer-*.log
    fields:
      component: indexer
      pipeline: processing_pipeline

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "lucille-logs-%{+yyyy.MM.dd}"
Kafka Monitoring
Monitor Kafka health and consumer lag:
# Consumer lag by topic
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group lucille_workers
GROUP TOPIC PARTITION LAG
lucille_workers processing_pipeline_source 0 2500 ⚠️
lucille_workers processing_pipeline_source 1 1800
lucille_workers processing_pipeline_source 2 3200 ⚠️
Alert Thresholds:
⚠️ Lag >10,000 messages per partition
🚨 Lag growing for >30 minutes (workers can’t keep up)
🚨 Consumer group has no active members (all workers down)
Worker Optimization
Thread Pool Sizing
Rule of thumb: numThreads = (CPU cores * 2) for I/O-bound pipelines.
worker {
numThreads: 8 # Tune based on CPU and pipeline characteristics
}
Test different configurations:
Baseline Test
Run with default settings and record throughput.
CPU-bound Test
If pipeline is CPU-heavy (complex transformations), set numThreads = CPU cores.
I/O-bound Test
If pipeline waits on external services (API calls, database lookups), set numThreads = CPU cores * 3.
Monitor and Adjust
Watch CPU utilization and heap memory. Reduce threads if memory pressure is high.
JVM Heap Sizing
Calculate heap requirements:
Heap = (numThreads × avgDocSize × queueDepth) + overheadBuffer
Example:
8 worker threads
1MB average document size
100 documents in-flight per thread
20% overhead buffer
Heap = (8 × 1MB × 100) × 1.2 = 960MB ≈ 1GB minimum
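The arithmetic above can be sketched as a small helper (the class and method names here are illustrative, not part of Lucille):

```java
// Illustrative helper for the heap rule of thumb above; not part of Lucille.
public class HeapEstimate {

  // heap = (threads x avg doc size x in-flight docs per thread) + overhead buffer
  static long estimateHeapMb(int numThreads, long avgDocSizeMb, int queueDepth, double overhead) {
    return Math.round(numThreads * avgDocSizeMb * queueDepth * (1 + overhead));
  }

  public static void main(String[] args) {
    // 8 threads, 1MB docs, 100 in-flight per thread, 20% overhead
    System.out.println(estimateHeapMb(8, 1, 100, 0.2) + " MB minimum"); // 960 MB minimum
  }
}
```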
Configuration:
# Heap at 4x the calculated minimum for safety; G1 GC for large heaps
java -Xmx4g -Xms4g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -Dconfig.file=application.conf \
  -cp 'lucille.jar:lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline
Set -Xms equal to -Xmx to prevent heap resizing overhead during runtime.
Kafka Consumer Tuning
kafka {
# Fetch more records per poll
maxPollRecords: 500
# Increase timeout for slow pipelines
maxPollIntervalSecs: 1200 # 20 minutes
# Larger fetch buffer for throughput
fetchMinBytes: 1048576 # 1MB
maxPartitionFetchBytes: 10485760 # 10MB
}
Tuning Guidelines:
maxPollRecords × processing time < maxPollIntervalSecs
Lower maxPollRecords if workers timeout
Higher fetchMinBytes for network efficiency
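The first guideline can be checked directly: a poll's worth of records must finish processing before maxPollIntervalSecs elapses, or the broker evicts the consumer. A sketch (names are illustrative, not part of Lucille):

```java
// Illustrative sanity check of the poll-budget guideline above; not part of Lucille.
public class PollBudget {

  // All records fetched in one poll must be processed within maxPollIntervalSecs,
  // otherwise Kafka considers the consumer dead and triggers a rebalance.
  static boolean pollFitsInterval(int maxPollRecords, double secsPerDoc, int maxPollIntervalSecs) {
    return maxPollRecords * secsPerDoc < maxPollIntervalSecs;
  }

  public static void main(String[] args) {
    System.out.println(pollFitsInterval(500, 2.0, 1200)); // 1000s of work in a 1200s budget: true
    System.out.println(pollFitsInterval(500, 3.0, 1200)); // 1500s of work: false, lower maxPollRecords
  }
}
```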
Indexer Optimization
Batch Size Tuning
Balance latency vs throughput:
indexer {
batchSize: 1000 # Documents per batch
batchTimeout: 5000 # Milliseconds to wait for full batch
}
Tuning Strategy:

High Throughput
indexer {
  batchSize: 2000
  batchTimeout: 10000 # Wait longer for full batches
}
Use when: Indexing millions of documents, latency is not critical.

Low Latency
indexer {
  batchSize: 250
  batchTimeout: 1000 # Send smaller batches quickly
}
Use when: Real-time indexing, low end-to-end latency required.

Balanced
indexer {
  batchSize: 500
  batchTimeout: 5000
}
Use when: General-purpose production workloads.
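The size-or-timeout flush rule that batchSize and batchTimeout describe can be sketched as follows (an illustration of the behavior, not the actual Indexer implementation):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch of size-or-timeout batching; not the actual Indexer code.
public class BatchBuffer {
  private final int batchSize;
  private final long batchTimeoutMs;
  private final List<String> buffer = new ArrayList<>();
  private long firstAddMs = -1;

  BatchBuffer(int batchSize, long batchTimeoutMs) {
    this.batchSize = batchSize;
    this.batchTimeoutMs = batchTimeoutMs;
  }

  // Returns the flushed batch, or an empty list if the batch is still filling.
  List<String> add(String doc, long nowMs) {
    if (buffer.isEmpty()) {
      firstAddMs = nowMs; // timeout clock starts with the first buffered doc
    }
    buffer.add(doc);
    if (buffer.size() >= batchSize || nowMs - firstAddMs >= batchTimeoutMs) {
      List<String> flushed = new ArrayList<>(buffer);
      buffer.clear();
      firstAddMs = -1;
      return flushed;
    }
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    BatchBuffer b = new BatchBuffer(3, 5000);
    System.out.println(b.add("doc1", 0).size());    // 0: still filling
    System.out.println(b.add("doc2", 100).size());  // 0: still filling
    System.out.println(b.add("doc3", 200).size());  // 3: size limit hit
    System.out.println(b.add("doc4", 300).size());  // 0: new batch starts
    System.out.println(b.add("doc5", 6000).size()); // 2: timeout exceeded since doc4
  }
}
```

A larger batchSize amortizes per-request overhead at the backend; a shorter batchTimeout bounds how long a document can sit buffered, which is what drives end-to-end latency.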
Backend-Specific Tuning
Solr:
solr {
url: ["http://solr-1:8983/solr", "http://solr-2:8983/solr"]
defaultCollection: "documents"
commitWithin: 60000 # Soft commit within 60 seconds
# Connection pooling
maxConnectionsPerHost: 100
maxConnections: 500
}
OpenSearch/Elasticsearch:
opensearch {
url: "https://opensearch-cluster:9200"
index: "documents"
# Bulk indexing optimization
refreshPolicy: "false" # Don't refresh after each batch
# Connection pooling
maxConnections: 50
connectionTimeout: 30000
}
Pipeline Optimization
Identify Bottlenecks
Analyze stage-level metrics:
Stage extract_text metrics. Docs: 200000. Latency: 15.20 ms/doc. ⚠️ Slow!
Stage validate_fields metrics. Docs: 200000. Latency: 0.05 ms/doc.
Stage enrich_metadata metrics. Docs: 200000. Latency: 8.30 ms/doc.
Optimization strategies:
Cache external lookups - Use in-memory caches for repeated API calls
Parallelize stages - Split heavy stages into multiple lighter stages
Reduce I/O - Batch external service calls
Optimize regex - Precompile patterns, use simpler expressions
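The first strategy can be as simple as memoizing the remote call. A minimal sketch (the enrichment lookup here is hypothetical, not a Lucille API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative in-memory cache around a (hypothetical) enrichment lookup.
public class LookupCache {
  private final Map<String, String> cache = new ConcurrentHashMap<>();
  private final Function<String, String> remoteLookup;
  final AtomicInteger remoteCalls = new AtomicInteger();

  LookupCache(Function<String, String> remoteLookup) {
    this.remoteLookup = remoteLookup;
  }

  String enrich(String key) {
    // computeIfAbsent only invokes the remote lookup on a cache miss
    return cache.computeIfAbsent(key, k -> {
      remoteCalls.incrementAndGet();
      return remoteLookup.apply(k);
    });
  }

  public static void main(String[] args) {
    LookupCache cache = new LookupCache(k -> "metadata-for-" + k);
    cache.enrich("author:smith");
    cache.enrich("author:smith"); // served from memory
    cache.enrich("author:jones");
    System.out.println(cache.remoteCalls.get() + " remote calls for 3 enrichments"); // 2 remote calls
  }
}
```

In production, prefer a bounded cache (maximum size and/or TTL eviction) so the cache itself does not become a source of heap pressure.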
Stage-Specific Tuning
Example: Text extraction stage
pipelines: [
{
name: "processing_pipeline"
stages: [
{
class: "com.kmwllc.lucille.stage.TextExtractor"
# Limit file size for extraction
maxFileSize: 10485760 # 10MB
# Timeout for slow extractions
timeout: 30000 # 30 seconds
# Skip binary formats
skipMimeTypes: ["application/octet-stream"]
}
]
}
]
Kafka Tuning
Topic Partitioning
Partition count = (numWorkers + numIndexers) × 2
Example: (5 workers + 2 indexers) × 2 = 14 partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic processing_pipeline_source \
--partitions 14 \
--replication-factor 3
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic processing_pipeline_destination \
--partitions 14 \
--replication-factor 3
More partitions enable higher parallelism but increase Kafka broker overhead. Don’t exceed 100 partitions per topic without testing.
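The sizing rule behind the worked example above (5 workers + 2 indexers → 14 partitions) is trivial to express; a sketch with illustrative names:

```java
// Illustrative calculation of the partition-count rule of thumb above.
public class PartitionCount {
  static int partitions(int numWorkers, int numIndexers) {
    // doubled so consumers keep headroom for parallelism and scaling
    return (numWorkers + numIndexers) * 2;
  }

  public static void main(String[] args) {
    System.out.println(partitions(5, 2)); // 14
  }
}
```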
Retention and Cleanup
kafka {
# Topic retention settings (applied via Kafka topic configs)
retentionMs: 86400000 # 24 hours
retentionBytes: 10737418240 # 10GB per partition
# Cleanup policy
cleanupPolicy: "delete" # vs "compact"
}
Manually configure topic retention:
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name processing_pipeline_source \
  --alter --add-config retention.ms=86400000
Reliability and Fault Tolerance
Kafka Durability
Production Kafka configuration:
kafka {
# Producer durability
acks: "all" # Wait for all replicas
retries: 10 # Retry failed sends
maxInFlightRequestsPerConnection: 1 # Ordering guarantee
# Consumer reliability
enableAutoCommit: false # Manual offset commits
isolationLevel: "read_committed"
}
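For reference, the producer-side durability settings above correspond to standard Kafka client property keys. A sketch building them as raw properties (the key strings are standard kafka-clients names; how Lucille's HOCON keys map onto them is assumed here):

```java
import java.util.Properties;

// The durability settings above expressed as raw Kafka producer properties.
// Key names are standard kafka-clients property names.
public class DurableProducerProps {
  static Properties durableProducerProps(String bootstrapServers) {
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", bootstrapServers);
    p.setProperty("acks", "all");                                // wait for all in-sync replicas
    p.setProperty("retries", "10");                              // retry transient send failures
    p.setProperty("max.in.flight.requests.per.connection", "1"); // preserve ordering across retries
    return p;
  }

  public static void main(String[] args) {
    Properties p = durableProducerProps("kafka-1:9093,kafka-2:9093");
    System.out.println(p.getProperty("acks")); // all
  }
}
```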
Topic replication:
# Replication factor of at least 3; require 2+ in-sync replicas for writes
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic processing_pipeline_source \
  --partitions 10 \
  --replication-factor 3 \
  --config min.insync.replicas=2
Retry Mechanisms
Document-Level Retries
Configure ZooKeeper-based retry tracking:
worker {
maxRetries: 3 # Retry failed documents up to 3 times
}
zookeeper {
connectString: "zk-1:2181,zk-2:2181,zk-3:2181"
sessionTimeout: 60000
connectionTimeout: 30000
}
How it works:
Document fails in pipeline
Worker sends document to failure topic
ZooKeeper tracks retry count per document ID
Worker reprocesses from failure topic
After maxRetries, document sent to dead letter queue (DLQ)
// Worker.java:96-112
if (trackRetries && counter.add(doc)) {
  try {
    log.info("Retry count exceeded for document " + doc.getId() +
        "; Sending to failure topic");
    messenger.sendFailed(doc);
  } catch (Exception e) {
    log.error("Failed to send doc to failure topic: " + doc.getId(), e);
  }
  try {
    messenger.sendEvent(doc, "SENT_TO_DLQ", Event.Type.FAIL);
  } catch (Exception e) {
    log.error("Failed to send completion event for: " + doc.getId(), e);
  }
}
Health Checks
Kubernetes liveness and readiness probes:
Worker Health Check
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lucille-worker
spec:
  template:
    spec:
      containers:
        - name: worker
          image: lucille-worker:latest
          livenessProbe:
            exec:
              command:
                - /bin/sh
                - -c
                - "ps aux | grep 'com.kmwllc.lucille.core.Worker' | grep -v grep"
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
Security
Kafka Security
Enable SSL/TLS for encryption:
kafka {
bootstrapServers: "kafka-1:9093,kafka-2:9093"
security.protocol: "SSL"
ssl.truststore.location: "/etc/kafka/ssl/truststore.jks"
ssl.truststore.password: "${TRUSTSTORE_PASSWORD}"
ssl.keystore.location: "/etc/kafka/ssl/keystore.jks"
ssl.keystore.password: "${KEYSTORE_PASSWORD}"
ssl.key.password: "${KEY_PASSWORD}"
}
Enable SASL for authentication:
kafka {
bootstrapServers: "kafka-1:9093,kafka-2:9093"
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"lucille\" password=\"${KAFKA_PASSWORD}\";"
}
Secrets Management
Never hardcode credentials in config files.
Environment Variables
opensearch {
url: "https://opensearch-cluster:9200"
username: "${OPENSEARCH_USERNAME}"
password: "${OPENSEARCH_PASSWORD}"
}
kafka {
bootstrapServers: "${KAFKA_BOOTSTRAP_SERVERS}"
sasl.jaas.config: "${KAFKA_SASL_CONFIG}"
}
Troubleshooting
Common Issues
OutOfMemoryError in Workers
Symptoms:
Workers crash with java.lang.OutOfMemoryError: Java heap space
GC logs show frequent full GCs
Causes:
Heap too small for document size and thread count
Memory leak in custom stage
Too many worker threads
Solutions:
# 1. Increase heap size
java -Xmx8g -Xms8g \
-XX:+HeapDumpOnOutOfMemoryError \
-XX:HeapDumpPath=/var/log/lucille/heap-dump.hprof \
...
# 2. Analyze heap dump
jhat /var/log/lucille/heap-dump.hprof
# Or use Eclipse Memory Analyzer (MAT)
# 3. Reduce worker threads
worker {
numThreads: 4 # Reduce from 8
}
Kafka Consumer Rebalancing Loop
Symptoms:
Workers repeatedly log “Rebalancing consumer group”
No documents processed
High consumer lag
Causes:
Processing time exceeds maxPollIntervalSecs
Network issues between workers and Kafka
Solutions:
kafka {
# Increase timeout for slow pipelines
maxPollIntervalSecs: 1800 # 30 minutes
# Reduce batch size so each poll completes faster
maxPollRecords: 100
# Increase session timeout
sessionTimeoutMs: 60000
}
Indexer Falling Behind
Symptoms:
Destination topic lag growing
Indexer throughput < worker throughput
Causes:
Backend is saturated (slow writes)
Network latency to backend
Batch size too small
Solutions:
# 1. Increase batch size
indexer {
batchSize: 2000
batchTimeout: 10000
}
# 2. Tune backend settings (Solr example)
solr {
commitWithin: 300000 # 5 minutes (less frequent commits)
maxConnectionsPerHost: 200
}
# 3. Add more indexer instances
java -Dconfig.file=application.conf \
-cp 'lucille.jar:lib/*' \
com.kmwllc.lucille.core.Indexer processing_pipeline
High Error Rate in Pipeline
Symptoms:
Stage reports high error count: Errors: 5000
Many documents in dead letter queue
Causes:
Data quality issues (malformed documents)
Stage configuration errors
External service failures (API timeouts)
Debugging:
# 1. Tail worker logs for errors
tail -f /var/log/lucille/worker.log | grep ERROR
# 2. Query failed documents from DLQ
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic processing_pipeline_failed \
--from-beginning | head -10
# 3. Check stage-specific logs
grep "Stage validate_fields" /var/log/lucille/worker.log | grep ERROR
Solutions:
# Add validation and error handling
pipelines: [
{
name: "processing_pipeline"
stages: [
{
class: "com.kmwllc.lucille.stage.ValidateFields"
required: ["id", "title"]
dropIfInvalid: true # Drop instead of fail
},
{
class: "com.kmwllc.lucille.stage.TryCatch"
tryStages: [
{ class: "com.kmwllc.lucille.stage.EnrichFromAPI" }
]
catchStages: [
{ class: "com.kmwllc.lucille.stage.SetField"
field: "enrichment_failed"
value: "true"
}
]
}
]
}
]
Enable Debug Logging
Log4j2 configuration:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{ISO8601} %-5level [%t] %c{1} - %msg%n"/>
    </Console>
    <File name="File" fileName="/var/log/lucille/worker.log">
      <PatternLayout pattern="%d{ISO8601} %-5level [%t] %c - %msg%n"/>
    </File>
  </Appenders>
  <Loggers>
    <Logger name="com.kmwllc.lucille" level="DEBUG"/> <!-- Debug Lucille -->
    <Logger name="org.apache.kafka" level="INFO"/> <!-- Quiet Kafka -->
    <Root level="INFO">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="File"/>
    </Root>
  </Loggers>
</Configuration>
Capture Thread Dumps
# Find Worker PID
ps aux | grep com.kmwllc.lucille.core.Worker
# Capture thread dump
jstack <PID> > worker-thread-dump.txt
# Analyze for deadlocks or stuck threads
grep -A 20 "BLOCKED" worker-thread-dump.txt
Profile with JFR
# Start Worker with JFR
java -XX:StartFlightRecording=duration=60s,filename=/tmp/worker-profile.jfr \
-Dconfig.file=application.conf \
-cp 'lucille.jar:lib/*' \
com.kmwllc.lucille.core.Worker processing_pipeline
# Analyze with JMC (Java Mission Control)
jmc /tmp/worker-profile.jfr
Next Steps
Configuration Reference Detailed reference for all configuration options
API Reference Explore connectors, stages, and indexers