
Overview

The KafkaConnector reads messages from a Kafka topic and publishes each message as a Lucille Document. It supports configurable offset management, custom deserializers, message limits, and timeout handling.

Class: com.kmwllc.lucille.connector.KafkaConnector
Extends: AbstractConnector

Key Features

  • Read messages from Kafka topics
  • Custom starting offsets per partition
  • Configurable message limits
  • Custom document deserializers
  • Timeout and polling configuration
  • Automatic offset commit
  • Graceful shutdown
  • Extensible for custom message handling

Class Signature

package com.kmwllc.lucille.connector;

public class KafkaConnector extends AbstractConnector {
  public KafkaConnector(Config config);
  
  @Override
  public void execute(Publisher publisher) throws ConnectorException;
  
  @Override
  public void close() throws ConnectorException;
  
  public void enhanceConsumerProperties(Properties props, Config config);
  public void handleMessage(ConsumerRecord<String, Document> record, Publisher publisher) throws ConnectorException;
}

Configuration Parameters

Required Parameters

kafka.bootstrapServers
String
required
Comma-separated list of Kafka broker addresses. Example: "localhost:9092" or "broker1:9092,broker2:9092,broker3:9092"
kafka.topic
String
required
Kafka topic to read messages from. Example: "documents-topic"
kafka.consumerGroupId
String
required
Kafka consumer group ID. Multiple consumers with the same group ID share partition processing. Example: "lucille-consumer-group"
kafka.clientId
String
required
Unique identifier for this Kafka consumer client. Used for logging and metrics. Example: "lucille-connector-1"
kafka.maxPollIntervalSecs
Integer
required
Maximum time in seconds between poll() calls before the consumer is considered dead. Example: 300 (5 minutes)
If document processing takes longer than this value, increase it to prevent rebalancing.

Optional Parameters

idField
String
Field name in the Kafka message to use as the Lucille Document ID. If not specified, the deserializer determines the ID (typically from a message field or generated). Example: "documentId", "messageKey"
offsets
Map<Integer, Long>
Map of partition numbers to initial offsets. Use this to start reading from specific positions. Example:
offsets: {
  0: 1000
  1: 2500
  2: 3000
}
This starts reading:
  • Partition 0 at offset 1000
  • Partition 1 at offset 2500
  • Partition 2 at offset 3000
Offsets override consumer group committed offsets. Use with caution.
maxMessages
Long
Maximum number of messages to process before stopping the connector. Useful for testing or processing finite batches. Example: 1000
messageTimeout
Long
Default: 100
Timeout in milliseconds to wait when polling Kafka for new messages. Example: 1000 (1 second)
continueOnTimeout
Boolean
Default: true
Whether to continue polling after a timeout with no messages.
  • true: Keep polling indefinitely until maxMessages is reached or connector is closed
  • false: Stop after first timeout with no messages
Use cases:
  • true: Continuous streaming ingestion
  • false: Process available messages then stop
kafka.documentDeserializer
String
Fully qualified class name of a custom deserializer for Kafka message values. Must implement Kafka’s Deserializer<Document> interface. Default: com.kmwllc.lucille.connector.KafkaConnectorDefaultDeserializer. Example: "com.example.CustomDocumentDeserializer"

Document Deserialization

The connector uses a deserializer to convert Kafka message values to Lucille Documents.

Default Deserializer

The default KafkaConnectorDefaultDeserializer expects messages in a specific format that can be deserialized to Documents. The exact format depends on how Documents are serialized when producing to Kafka.

Custom Deserializer

To use a custom deserializer:
  1. Implement org.apache.kafka.common.serialization.Deserializer<Document>
  2. Configure with kafka.documentDeserializer
  3. Access configuration properties in configure():
    • idField: If configured, use this field as the Document ID
    • docIdPrefix: Prefix to add to Document IDs
Example custom deserializer:
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.serialization.Deserializer;
import org.json.JSONObject;

import com.kmwllc.lucille.core.Document;

public class CustomDocumentDeserializer implements Deserializer<Document> {
  private String idField;
  private String docIdPrefix;
  
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.idField = (String) configs.get("idField");
    // Default to an empty prefix so a missing docIdPrefix doesn't become the literal "null"
    Object prefix = configs.get("docIdPrefix");
    this.docIdPrefix = prefix != null ? (String) prefix : "";
  }
  
  @Override
  public Document deserialize(String topic, byte[] data) {
    // Parse the message payload as JSON
    JSONObject json = new JSONObject(new String(data, StandardCharsets.UTF_8));
    
    // Use the configured idField if present; otherwise generate a random ID
    String id = idField != null ? json.getString(idField) : UUID.randomUUID().toString();
    Document doc = Document.create(docIdPrefix + id);
    
    // Copy all top-level JSON fields onto the document
    for (String key : json.keySet()) {
      doc.setField(key, json.get(key));
    }
    
    return doc;
  }
}

Configuration Examples

Basic Kafka Consumer

connector: {
  name: "kafka-ingest"
  class: "com.kmwllc.lucille.connector.KafkaConnector"
  pipeline: "content-pipeline"
  
  kafka: {
    bootstrapServers: "localhost:9092"
    topic: "documents"
    consumerGroupId: "lucille-group"
    clientId: "lucille-client-1"
    maxPollIntervalSecs: 300
  }
  
  idField: "doc_id"
}

Process Finite Batch

connector: {
  name: "kafka-batch"
  class: "com.kmwllc.lucille.connector.KafkaConnector"
  pipeline: "batch-pipeline"
  
  kafka: {
    bootstrapServers: "kafka1:9092,kafka2:9092"
    topic: "batch-topic"
    consumerGroupId: "batch-consumer"
    clientId: "batch-client"
    maxPollIntervalSecs: 600
  }
  
  maxMessages: 10000
  continueOnTimeout: false
  messageTimeout: 5000
}

Start from Specific Offsets

connector: {
  name: "kafka-replay"
  class: "com.kmwllc.lucille.connector.KafkaConnector"
  pipeline: "replay-pipeline"
  
  kafka: {
    bootstrapServers: "kafka.example.com:9092"
    topic: "events"
    consumerGroupId: "replay-group"
    clientId: "replay-client"
    maxPollIntervalSecs: 300
  }
  
  # Start from specific offsets
  offsets: {
    0: 5000
    1: 5000
    2: 5000
  }
}

Custom Deserializer

connector: {
  name: "kafka-custom"
  class: "com.kmwllc.lucille.connector.KafkaConnector"
  pipeline: "custom-pipeline"
  
  kafka: {
    bootstrapServers: "localhost:9092"
    topic: "json-messages"
    consumerGroupId: "custom-group"
    clientId: "custom-client"
    maxPollIntervalSecs: 300
    documentDeserializer: "com.example.JSONDocumentDeserializer"
  }
  
  idField: "message_id"
  docIdPrefix: "kafka-"
}

Message Handling

The handleMessage() method processes each Kafka consumer record. By default, it publishes the deserialized Document:
public void handleMessage(ConsumerRecord<String, Document> record, Publisher publisher) throws ConnectorException {
  Document doc = record.value();
  publisher.publish(doc);
}

Custom Message Handling

Extend KafkaConnector and override handleMessage() for custom processing:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomKafkaConnector extends KafkaConnector {
  private static final Logger log = LoggerFactory.getLogger(CustomKafkaConnector.class);

  public CustomKafkaConnector(Config config) {
    super(config);
  }
  
  @Override
  public void handleMessage(ConsumerRecord<String, Document> record, Publisher publisher) throws ConnectorException {
    Document doc = record.value();
    
    // Add Kafka metadata
    doc.setField("kafka_partition", record.partition());
    doc.setField("kafka_offset", record.offset());
    doc.setField("kafka_timestamp", record.timestamp());
    
    // Conditional publishing
    if (doc.hasField("valid")) {
      publisher.publish(doc);
    } else {
      // Log and skip the invalid message
      log.warn("Skipping invalid message at offset {}", record.offset());
    }
  }
}

Consumer Properties

The enhanceConsumerProperties() method configures the Kafka consumer:
public void enhanceConsumerProperties(Properties props, Config config) {
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  
  String deserializerClass = config.hasPath("kafka.documentDeserializer")
      ? config.getString("kafka.documentDeserializer")
      : KafkaConnectorDefaultDeserializer.class.getName();
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerClass);
  
  if (idField != null) {
    props.put("idField", idField);
  }
  props.put("docIdPrefix", getDocIdPrefix());
}

Override for Custom Configuration

Extend and override to add custom consumer properties:
@Override
public void enhanceConsumerProperties(Properties props, Config config) {
  super.enhanceConsumerProperties(props, config);
  
  // Add custom properties
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
  props.put("custom.deserializer.property", "value");
}

Polling Loop

The connector’s polling loop:
  1. Poll Kafka with messageTimeout
  2. If no messages:
    • If continueOnTimeout is true, continue polling
    • If continueOnTimeout is false, stop
  3. For each message, call handleMessage()
  4. If maxMessages is reached, stop
  5. Repeat until stopped or interrupted
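The stop conditions above can be sketched in plain Java, with an in-memory queue standing in for the Kafka consumer (PollLoopSketch and drain are illustrative names, not part of the connector; only the continueOnTimeout=false path is shown, since a local demo has no producer to keep waiting for):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the polling loop's stop conditions. A Queue stands in for
// the Kafka consumer; poll() returning null plays the role of a poll that
// timed out with no messages.
public class PollLoopSketch {

  public static int drain(Queue<String> source, long maxMessages) {
    int handled = 0;
    while (handled < maxMessages) {      // stop condition: maxMessages reached
      String msg = source.poll();        // stands in for consumer.poll(messageTimeout)
      if (msg == null) {
        break;                           // timeout with no messages, continueOnTimeout=false: stop
      }
      handled++;                         // the real loop calls handleMessage() here
    }
    return handled;
  }

  public static void main(String[] args) {
    Queue<String> q = new ArrayDeque<>(java.util.List.of("a", "b", "c", "d", "e"));
    System.out.println(drain(q, 3));  // 3: stopped by maxMessages
    System.out.println(drain(q, 10)); // 2: remaining messages exhausted first
  }
}
```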

Graceful Shutdown

The connector supports graceful shutdown:
  • Call close() to stop the connector
  • The polling loop exits on the next iteration
  • The consumer is woken up with wakeup()
  • The consumer is closed, committing offsets

Offset Management

The connector uses Kafka’s automatic offset commit:
  • ENABLE_AUTO_COMMIT_CONFIG is set to true
  • Offsets are committed periodically by the Kafka consumer
  • On shutdown, offsets are committed when the consumer closes

Manual Offset Seeking

When offsets is configured:
  1. Subscribe to the topic
  2. Perform an initial poll with zero timeout to assign partitions
  3. For each partition in the assignment:
    • If an offset is configured, seek to that offset
  4. Begin normal polling
Manual offset seeking overrides the consumer group’s committed offsets. Use this carefully to avoid duplicate or missed messages.
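The seek decision in step 3 can be sketched as a pure function over partition numbers (seeksToPerform is an illustrative name, not part of the connector API; the real connector would call consumer.seek(new TopicPartition(topic, partition), offset) for each entry returned):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the seek logic: given the configured offsets map and the
// partitions actually assigned after the initial poll, compute which
// partitions to seek and to what offset. Partitions without a configured
// offset keep their committed position.
public class OffsetSeekSketch {

  public static Map<Integer, Long> seeksToPerform(Map<Integer, Long> configured,
                                                  List<Integer> assignedPartitions) {
    Map<Integer, Long> seeks = new LinkedHashMap<>();
    for (Integer partition : assignedPartitions) {
      Long offset = configured.get(partition);
      if (offset != null) {   // only seek partitions with a configured offset
        seeks.put(partition, offset);
      }
    }
    return seeks;
  }
}
```

For example, with offsets configured for partitions 0 and 2 but only partitions 0 and 1 assigned, a seek is performed only on partition 0.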

Performance Considerations

Poll Timeout

The messageTimeout affects responsiveness vs. efficiency:
  • Short timeout (e.g., 100ms): More responsive to shutdown, more CPU overhead
  • Long timeout (e.g., 5000ms): Less CPU overhead, slower shutdown

Max Poll Interval

kafka.maxPollIntervalSecs must exceed the time to process a batch of messages:
maxPollIntervalSecs > (processing time per message, in seconds) × (max.poll.records)
If processing is too slow, the consumer is removed from the group and partitions are rebalanced.
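As a worked example of the bound above (the 200 ms per-message figure is hypothetical, and MaxPollMath is just a sizing helper, not part of the connector):

```java
// Rough sizing helper for kafka.maxPollIntervalSecs: seconds needed to
// process one full poll batch, given per-message time and max.poll.records.
public class MaxPollMath {

  public static long batchSecs(long perMessageMillis, int maxPollRecords) {
    return (perMessageMillis * maxPollRecords) / 1000;
  }

  public static void main(String[] args) {
    // e.g. 200 ms per document with max.poll.records = 500
    System.out.println(batchSecs(200, 500)); // 100 -> maxPollIntervalSecs of 300 leaves headroom
  }
}
```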

Message Batching

The Kafka consumer fetches messages in batches. Control batch size with additional consumer properties:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");

Error Handling

Deserialization Errors

If the deserializer throws an exception, the message is skipped and the consumer continues. Configure error handling in the deserializer.
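One way to implement a skip-on-error policy inside the deserializer, sketched with plain integers in place of Lucille Documents (SafeDeserializeSketch, parseOrNull, and the payload format are hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Sketch of catch-and-skip error handling in a deserializer: catch the parse
// failure and return null instead of throwing, so the record is treated as
// having no value and the poll loop keeps running. parseOrNull stands in for
// a Deserializer<Document>.deserialize() body.
public class SafeDeserializeSketch {

  public static Integer parseOrNull(byte[] data) {
    try {
      return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
    } catch (NumberFormatException e) {
      // In a real deserializer you would log the bad payload here before skipping
      return null;
    }
  }
}
```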

Publishing Errors

handleMessage() throws ConnectorException on publishing errors, stopping the connector.

Consumer Errors

Kafka consumer exceptions (e.g., network errors, rebalancing) are thrown as ConnectorException.

Lifecycle Methods

execute(Publisher publisher)

  1. Create Kafka consumer with enhanced properties
  2. Subscribe to the topic
  3. If offsets is configured, seek to specified offsets
  4. Start polling loop:
    • Poll for messages
    • Handle each message
    • Check stop conditions
  5. Exit loop on completion, interruption, or max messages

close()

  1. Set running flag to false
  2. Wake up the consumer from blocking poll
  3. Close the consumer (commits offsets)
  4. Set consumer reference to null

Limitations

  • Only subscribes to a single topic
  • Only supports auto-commit offset management
  • Consumer group rebalancing may cause duplicate processing
  • No built-in support for exactly-once semantics
  • Message ordering is only guaranteed within a partition

Next Steps

SolrConnector

Query and index Solr collections

DatabaseConnector

Query relational databases via JDBC