Overview

Running Lucille in production requires careful attention to:
  • Monitoring and alerting - Track metrics and detect issues proactively
  • Performance tuning - Optimize throughput and resource utilization
  • Reliability - Ensure fault tolerance and data durability
  • Security - Protect credentials and secure inter-component communication
  • Operational excellence - Automate deployments and maintain observability
This guide assumes you’re running Lucille in distributed mode with Kafka. Local mode is not recommended for production workloads.

Monitoring and Metrics

Built-in Metrics

Lucille uses Dropwizard Metrics to track performance across all components. Configure metrics logging and the reporting interval:
application.conf
runner {
  metricsLoggingLevel: "INFO"  # Log metrics to stdout
  connectorTimeout: 86400000   # 24 hours for long-running connectors
}

log {
  seconds: 30  # Log metrics every 30 seconds
}

Key Metrics to Monitor

Worker Metrics

Document Processing Rate:
WORKER: 27017 docs processed. 
  One minute rate: 1787.10 docs/sec. 
  Mean pipeline latency: 10.63 ms/doc.
Key Indicators:
  • docs processed - Total documents processed since startup
  • One minute rate - Current throughput (docs/sec)
  • Mean pipeline latency - Average time per document through all stages
Alert Thresholds:
  • ⚠️ Throughput drops below 50% of baseline
  • ⚠️ Mean latency increases by 2x
  • 🚨 Throughput = 0 for >5 minutes (worker stalled)

Indexer Metrics

Indexing Rate:
INDEXER: 17016 docs indexed. 
  One minute rate: 455.07 docs/sec. 
  Mean backend latency: 6.90 ms/doc.
Key Indicators:
  • docs indexed - Total documents written to destination
  • One minute rate - Current indexing throughput
  • Mean backend latency - Average time per batch request to search backend
Alert Thresholds:
  • ⚠️ Backend latency exceeds 100ms/doc (backend saturation)
  • ⚠️ Indexing rate < processing rate for >10 minutes (backlog growing)
  • 🚨 Indexer stopped (no output for >5 minutes)

Publisher Metrics

Connector Output:
PUBLISHER: 37029 docs published. 
  One minute rate: 3225.69 docs/sec. 
  Mean connector latency: 0.00 ms/doc. 
  Waiting on 21014 docs.
Key Indicators:
  • docs published - Total documents emitted by connector
  • Waiting on X docs - Documents published but not yet confirmed as indexed or failed
  • Mean connector latency - Time to read and publish each document
Alert Thresholds:
  • ⚠️ Waiting on count grows unbounded (downstream bottleneck)
  • 🚨 Waiting on count stuck for >1 hour (deadlock or failure)

Stage-Level Metrics

Per-Stage Performance:
STAGE: Stage test_summary metrics. 
  Docs processed: 200000. 
  Mean latency: 0.3532 ms/doc. 
  Children: 0. 
  Errors: 0.
Key Indicators:
  • Mean latency - Time spent in this specific stage
  • Children - Child documents created by this stage
  • Errors - Document failures in this stage
Alert Thresholds:
  • ⚠️ Stage latency above the 90th percentile across all stages (hotspot detected)
  • 🚨 Errors >1% of documents (data quality issue)

Exporting Metrics

Prometheus Integration

Expose Lucille metrics to Prometheus:
import com.codahale.metrics.SharedMetricRegistries;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.HTTPServer;
import com.kmwllc.lucille.util.LogUtils;

public class PrometheusExporter {
  public static void main(String[] args) throws Exception {
    // Export Lucille metrics to Prometheus
    DropwizardExports prometheusExporter = new DropwizardExports(
      SharedMetricRegistries.getOrCreate(LogUtils.METRICS_REG)
    );
    prometheusExporter.register();
    
    // Start HTTP server on port 9091
    HTTPServer server = new HTTPServer(9091);
    System.out.println("Prometheus metrics exposed on :9091/metrics");
  }
}

Grafana Dashboards

Key Panels:
# Document processing throughput
rate(lucille_worker_docs_processed_total[5m])

# Pipeline latency (p95)
histogram_quantile(0.95, rate(lucille_worker_pipeline_latency_bucket[5m]))

# Indexing throughput
rate(lucille_indexer_docs_indexed_total[5m])

# Error rate
rate(lucille_stage_errors_total[5m])
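The alert thresholds listed earlier can be expressed as PromQL conditions for Prometheus alerting rules. The metric names follow the panel queries above and are assumptions; vector matching may need `on()`/`ignoring()` clauses depending on your label sets:

```
# Worker stalled (page): no documents processed for 5 minutes
rate(lucille_worker_docs_processed_total[5m]) == 0

# Backlog growing (warn): indexing rate below processing rate
rate(lucille_indexer_docs_indexed_total[10m])
  < rate(lucille_worker_docs_processed_total[10m])
```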

Log Aggregation

Centralize logs from all components:
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/lucille/worker-*.log
    fields:
      component: worker
      pipeline: processing_pipeline
      
  - type: log
    enabled: true
    paths:
      - /var/log/lucille/indexer-*.log
    fields:
      component: indexer
      pipeline: processing_pipeline

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "lucille-logs-%{+yyyy.MM.dd}"

Kafka Monitoring

Monitor Kafka health and consumer lag:
# Consumer lag by topic
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group lucille_workers

GROUP           TOPIC                    PARTITION  LAG    
lucille_workers processing_pipeline_source  0          2500   ⚠️
lucille_workers processing_pipeline_source  1          1800
lucille_workers processing_pipeline_source  2          3200   ⚠️
Alert Thresholds:
  • ⚠️ Lag >10,000 messages per partition
  • 🚨 Lag growing for >30 minutes (workers can’t keep up)
  • 🚨 Consumer group has no active members (all workers down)

Performance Tuning

Worker Optimization

Thread Pool Sizing

Rule of thumb: numThreads = CPU cores for CPU-bound pipelines, and CPU cores * 2-3 for I/O-bound pipelines.
worker {
  numThreads: 8  # Tune based on CPU and pipeline characteristics
}
Test different configurations:
  1. Baseline Test - Run with default settings and record throughput.
  2. CPU-bound Test - If the pipeline is CPU-heavy (complex transformations), set numThreads = CPU cores.
  3. I/O-bound Test - If the pipeline waits on external services (API calls, database lookups), set numThreads = CPU cores * 3.
  4. Monitor and Adjust - Watch CPU utilization and heap memory. Reduce threads if memory pressure is high.

JVM Heap Sizing

Calculate heap requirements:
Heap = (numThreads × avgDocSize × queueDepth) + overheadBuffer
Example:
  • 8 worker threads
  • 1MB average document size
  • 100 documents in-flight per thread
  • 20% overhead buffer
Heap = (8 × 1MB × 100) × 1.2 = 960MB ≈ 1GB minimum
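The arithmetic above can be sketched as a quick sizing helper (a standalone illustration using the example numbers; not a Lucille API):

```java
// Illustrative heap estimate using the example values above.
public class HeapEstimate {
  public static void main(String[] args) {
    int numThreads = 8;                   // worker.numThreads
    long avgDocBytes = 1L * 1024 * 1024;  // 1MB average document
    int queueDepth = 100;                 // in-flight docs per thread
    double overhead = 1.2;                // 20% buffer

    long heapBytes = (long) (numThreads * avgDocBytes * queueDepth * overhead);
    System.out.printf("Minimum heap: %d MB%n", heapBytes / (1024 * 1024));
    // Minimum heap: 960 MB
  }
}
```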
Configuration:
# 4x the calculated minimum for safety; G1 handles large heaps well
java -Xmx4g -Xms4g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -Dconfig.file=application.conf \
  -cp 'lucille.jar:lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline
Set -Xms equal to -Xmx to prevent heap resizing overhead during runtime.

Kafka Consumer Tuning

kafka {
  # Fetch more records per poll
  maxPollRecords: 500
  
  # Increase timeout for slow pipelines
  maxPollIntervalSecs: 1200  # 20 minutes
  
  # Larger fetch buffer for throughput
  fetchMinBytes: 1048576     # 1MB
  maxPartitionFetchBytes: 10485760  # 10MB
}
Tuning Guidelines:
  • maxPollRecords × processing time < maxPollIntervalSecs
  • Lower maxPollRecords if workers timeout
  • Higher fetchMinBytes for network efficiency
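The first guideline can be sanity-checked with the example figures from this guide (the latency value is the sample worker metric shown earlier, not a live reading):

```java
// Back-of-the-envelope check that a full poll batch fits the poll interval.
public class PollBudget {
  public static void main(String[] args) {
    int maxPollRecords = 500;
    double meanPipelineLatencyMs = 10.63;  // from the WORKER metrics log line
    int maxPollIntervalSecs = 1200;

    double worstCaseSecs = maxPollRecords * meanPipelineLatencyMs / 1000.0;
    System.out.printf("Worst-case poll batch: %.1f s (budget: %d s)%n",
        worstCaseSecs, maxPollIntervalSecs);
    // ~5.3 s, comfortably inside the 1200 s budget
  }
}
```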

Indexer Optimization

Batch Size Tuning

Balance latency vs throughput:
indexer {
  batchSize: 1000      # Documents per batch
  batchTimeout: 5000   # Milliseconds to wait for full batch
}
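A quick way to reason about the tradeoff: a batch fills in roughly batchSize / throughput seconds; if that exceeds batchTimeout, batches ship partially full and per-batch overhead rises. A sketch using the sample indexer rate from the metrics section above:

```java
// If batchSize / throughput exceeds batchTimeout, batches ship partially full.
public class BatchFillCheck {
  public static void main(String[] args) {
    int batchSize = 1000;
    int batchTimeoutMs = 5000;
    double docsPerSec = 455.07;  // one-minute rate from the INDEXER metrics

    double fillTimeMs = batchSize / docsPerSec * 1000.0;
    boolean shipsFull = fillTimeMs <= batchTimeoutMs;
    System.out.printf("Batch fills in %.0f ms (timeout %d ms): %s%n",
        fillTimeMs, batchTimeoutMs, shipsFull ? "ships full" : "ships partial");
  }
}
```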
Tuning Strategy:
indexer {
  batchSize: 2000
  batchTimeout: 10000  # Wait longer for full batches
}
Use when: Indexing millions of documents, latency is not critical.

Backend-Specific Tuning

Solr:
solr {
  url: ["http://solr-1:8983/solr", "http://solr-2:8983/solr"]
  defaultCollection: "documents"
  commitWithin: 60000  # Soft commit within 60 seconds
  
  # Connection pooling
  maxConnectionsPerHost: 100
  maxConnections: 500
}
OpenSearch/Elasticsearch:
opensearch {
  url: "https://opensearch-cluster:9200"
  index: "documents"
  
  # Bulk indexing optimization
  refreshPolicy: "false"  # Don't refresh after each batch
  
  # Connection pooling
  maxConnections: 50
  connectionTimeout: 30000
}

Pipeline Optimization

Identify Bottlenecks

Analyze stage-level metrics:
Stage extract_text metrics. Docs: 200000. Latency: 15.20 ms/doc.  ⚠️ Slow!
Stage validate_fields metrics. Docs: 200000. Latency: 0.05 ms/doc.
Stage enrich_metadata metrics. Docs: 200000. Latency: 8.30 ms/doc.
Optimization strategies:
  1. Cache external lookups - Use in-memory caches for repeated API calls
  2. Parallelize stages - Split heavy stages into multiple lighter stages
  3. Reduce I/O - Batch external service calls
  4. Optimize regex - Precompile patterns, use simpler expressions
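As a sketch of the first strategy, a custom stage could memoize external lookups behind a small LRU cache. LookupCache and the loader function are hypothetical illustrations, not Lucille classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not a Lucille API: memoize an external lookup behind a
// bounded LRU cache so repeated keys skip the API call or database round trip.
public class LookupCache<K, V> {
  private final Function<K, V> loader;
  private final Map<K, V> cache;

  public LookupCache(int maxEntries, Function<K, V> loader) {
    this.loader = loader;
    // access-ordered LinkedHashMap evicts the least-recently-used entry
    this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
      }
    };
  }

  public synchronized V get(K key) {
    return cache.computeIfAbsent(key, loader);
  }
}
```

A stage would wrap its client call, e.g. `new LookupCache<>(10_000, id -> client.fetch(id))` (the client is hypothetical). For production use, a caching library such as Caffeine adds TTLs and better concurrency.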

Stage-Specific Tuning

Example: Text extraction stage
pipelines: [
  {
    name: "processing_pipeline"
    stages: [
      {
        class: "com.kmwllc.lucille.stage.TextExtractor"
        
        # Limit file size for extraction
        maxFileSize: 10485760  # 10MB
        
        # Timeout for slow extractions
        timeout: 30000  # 30 seconds
        
        # Skip binary formats
        skipMimeTypes: ["application/octet-stream"]
      }
    ]
  }
]

Kafka Tuning

Topic Partitioning

Partition count = (numWorkers + numIndexers) × 2. Example: (5 workers + 2 indexers) × 2 = 14 partitions.
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic processing_pipeline_source \
  --partitions 14 \
  --replication-factor 3

kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic processing_pipeline_destination \
  --partitions 14 \
  --replication-factor 3
More partitions enable higher parallelism but increase Kafka broker overhead. Don’t exceed 100 partitions per topic without testing.

Retention and Cleanup

kafka {
  # Topic retention settings (applied via Kafka topic configs)
  retentionMs: 86400000       # 24 hours
  retentionBytes: 10737418240 # 10GB per partition
  
  # Cleanup policy
  cleanupPolicy: "delete"  # vs "compact"
}
Manually configure topic retention:
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name processing_pipeline_source \
  --alter --add-config retention.ms=86400000

Reliability and Fault Tolerance

Kafka Durability

Production Kafka configuration:
kafka {
  # Producer durability
  acks: "all"              # Wait for all replicas
  retries: 10              # Retry failed sends
  maxInFlightRequestsPerConnection: 1  # Ordering guarantee
  
  # Consumer reliability
  enableAutoCommit: false  # Manual offset commits
  isolationLevel: "read_committed"
}
Topic replication:
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic processing_pipeline_source \
  --partitions 10 \
  --replication-factor 3 \
  --config min.insync.replicas=2  # 3 replicas; at least 2 must acknowledge each write

Retry Mechanisms

Document-Level Retries

Configure ZooKeeper-based retry tracking:
worker {
  maxRetries: 3  # Retry failed documents up to 3 times
}

zookeeper {
  connectString: "zk-1:2181,zk-2:2181,zk-3:2181"
  sessionTimeout: 60000
  connectionTimeout: 30000
}
How it works:
  1. Document fails in pipeline
  2. Worker sends document to failure topic
  3. ZooKeeper tracks retry count per document ID
  4. Worker reprocesses from failure topic
  5. After maxRetries, document sent to dead letter queue (DLQ)
// Worker.java:96-112
if (trackRetries && counter.add(doc)) {
  try {
    log.info("Retry count exceeded for document " + doc.getId() + 
             "; Sending to failure topic");
    messenger.sendFailed(doc);
  } catch (Exception e) {
    log.error("Failed to send doc to failure topic: " + doc.getId(), e);
  }
  
  try {
    messenger.sendEvent(doc, "SENT_TO_DLQ", Event.Type.FAIL);
  } catch (Exception e) {
    log.error("Failed to send completion event for: " + doc.getId(), e);
  }
}

Health Checks

Kubernetes liveness and readiness probes:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lucille-worker
spec:
  template:
    spec:
      containers:
      - name: worker
        image: lucille-worker:latest
        livenessProbe:
          exec:
            command:
            - /bin/sh
            - -c
            - "ps aux | grep 'com.kmwllc.lucille.core.Worker' | grep -v grep"
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 5
          failureThreshold: 3
        
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10

Security

Kafka Security

Enable SSL/TLS for encryption:
kafka {
  bootstrapServers: "kafka-1:9093,kafka-2:9093"
  
  security.protocol: "SSL"
  ssl.truststore.location: "/etc/kafka/ssl/truststore.jks"
  ssl.truststore.password: "${TRUSTSTORE_PASSWORD}"
  ssl.keystore.location: "/etc/kafka/ssl/keystore.jks"
  ssl.keystore.password: "${KEYSTORE_PASSWORD}"
  ssl.key.password: "${KEY_PASSWORD}"
}
Enable SASL for authentication:
kafka {
  bootstrapServers: "kafka-1:9093,kafka-2:9093"
  
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"lucille\" password=\"${KAFKA_PASSWORD}\";"
}

Secrets Management

Never hardcode credentials in config files.
opensearch {
  url: "https://opensearch-cluster:9200"
  # HOCON only resolves ${...} substitutions when they are unquoted;
  # ${?VAR} falls back to the environment variable of the same name
  username: ${?OPENSEARCH_USERNAME}
  password: ${?OPENSEARCH_PASSWORD}
}

kafka {
  bootstrapServers: ${?KAFKA_BOOTSTRAP_SERVERS}
  sasl.jaas.config: ${?KAFKA_SASL_CONFIG}
}

Troubleshooting

Common Issues

OutOfMemoryError in Workers

Symptoms:
  • Workers crash with java.lang.OutOfMemoryError: Java heap space
  • GC logs show frequent full GCs
Causes:
  1. Heap too small for document size and thread count
  2. Memory leak in custom stage
  3. Too many worker threads
Solutions:
# 1. Increase heap size
java -Xmx8g -Xms8g \
  -XX:+HeapDumpOnOutOfMemoryError \
  -XX:HeapDumpPath=/var/log/lucille/heap-dump.hprof \
  ...

# 2. Analyze heap dump with Eclipse Memory Analyzer (MAT) or VisualVM
#    (jhat was removed from the JDK in Java 9)
# 3. Reduce worker threads
worker {
  numThreads: 4  # Reduce from 8
}

Kafka Consumer Rebalancing Loop

Symptoms:
  • Workers repeatedly log “Rebalancing consumer group”
  • No documents processed
  • High consumer lag
Causes:
  • Processing time exceeds maxPollIntervalSecs
  • Network issues between workers and Kafka
Solutions:
kafka {
  # Increase timeout for slow pipelines
  maxPollIntervalSecs: 1800  # 30 minutes
  
  # Reduce batch size so each poll completes faster
  maxPollRecords: 100
  
  # Increase session timeout
  sessionTimeoutMs: 60000
}

Indexer Falling Behind

Symptoms:
  • Destination topic lag growing
  • Indexer throughput < worker throughput
Causes:
  1. Backend is saturated (slow writes)
  2. Network latency to backend
  3. Batch size too small
Solutions:
# 1. Increase batch size
indexer {
  batchSize: 2000
  batchTimeout: 10000
}

# 2. Tune backend settings (Solr example)
solr {
  commitWithin: 300000  # 5 minutes (less frequent commits)
  maxConnectionsPerHost: 200
}
# 3. Add more indexer instances
java -Dconfig.file=application.conf \
  -cp 'lucille.jar:lib/*' \
  com.kmwllc.lucille.core.Indexer processing_pipeline

High Error Rate in Pipeline

Symptoms:
  • Stage reports high error count: Errors: 5000
  • Many documents in dead letter queue
Causes:
  1. Data quality issues (malformed documents)
  2. Stage configuration errors
  3. External service failures (API timeouts)
Debugging:
# 1. Tail worker logs for errors
tail -f /var/log/lucille/worker.log | grep ERROR

# 2. Query failed documents from DLQ
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic processing_pipeline_failed \
  --from-beginning | head -10

# 3. Check stage-specific logs
grep "Stage validate_fields" /var/log/lucille/worker.log | grep ERROR
Solutions:
# Add validation and error handling
pipelines: [
  {
    name: "processing_pipeline"
    stages: [
      {
        class: "com.kmwllc.lucille.stage.ValidateFields"
        required: ["id", "title"]
        dropIfInvalid: true  # Drop instead of fail
      },
      {
        class: "com.kmwllc.lucille.stage.TryCatch"
        tryStages: [
          { class: "com.kmwllc.lucille.stage.EnrichFromAPI" }
        ]
        catchStages: [
          { class: "com.kmwllc.lucille.stage.SetField"
            field: "enrichment_failed"
            value: "true"
          }
        ]
      }
    ]
  }
]

Debugging Tools

Enable Debug Logging

Log4j2 configuration:
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{ISO8601} %-5level [%t] %c{1} - %msg%n"/>
    </Console>
    <File name="File" fileName="/var/log/lucille/worker.log">
      <PatternLayout pattern="%d{ISO8601} %-5level [%t] %c - %msg%n"/>
    </File>
  </Appenders>
  <Loggers>
    <Logger name="com.kmwllc.lucille" level="DEBUG"/>  <!-- Debug Lucille -->
    <Logger name="org.apache.kafka" level="INFO"/>     <!-- Quiet Kafka -->
    <Root level="INFO">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="File"/>
    </Root>
  </Loggers>
</Configuration>

Capture Thread Dumps

# Find Worker PID
ps aux | grep com.kmwllc.lucille.core.Worker

# Capture thread dump
jstack <PID> > worker-thread-dump.txt

# Analyze for deadlocks or stuck threads
grep -A 20 "BLOCKED" worker-thread-dump.txt

Profile with JFR

# Start Worker with JFR
java -XX:StartFlightRecording=duration=60s,filename=/tmp/worker-profile.jfr \
  -Dconfig.file=application.conf \
  -cp 'lucille.jar:lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline

# Analyze with JMC (Java Mission Control)
jmc /tmp/worker-profile.jfr

Checklist for Production

1. Infrastructure

  • Kafka cluster with ≥3 brokers and replication factor 3
  • ZooKeeper ensemble for retry tracking (if using retries)
  • Search backend (Solr/OpenSearch/Elasticsearch) scaled appropriately
  • Monitoring stack (Prometheus + Grafana or ELK)
  • Log aggregation (Filebeat/Fluentd → Elasticsearch → Kibana)
2. Configuration

  • Kafka acks=all and min.insync.replicas=2
  • Topic partitions ≥ max(workers, indexers)
  • Secrets managed via environment variables or secret stores
  • JVM heap sized appropriately for workload
  • Worker threads tuned based on CPU and pipeline characteristics
  • Indexer batch size tuned for throughput vs latency
3. Observability

  • Metrics exported to Prometheus
  • Grafana dashboards for throughput, latency, errors
  • Alerts configured for critical thresholds
  • Log queries for common error scenarios
  • Kafka consumer lag monitoring
4. Reliability

  • Document retry mechanism with DLQ
  • Health checks for container orchestration
  • Graceful shutdown handlers registered
  • Backup and disaster recovery plan
  • Runbook for common operational tasks
5. Testing

  • Load test with production-scale data
  • Chaos testing (kill workers/indexers, network partitions)
  • Validation tests in CI/CD
  • Performance regression tests

Next Steps

  • Configuration Reference - Detailed reference for all configuration options
  • API Reference - Explore connectors, stages, and indexers