Overview
The KafkaConnector reads messages from a Kafka topic and publishes each message as a Lucille Document. It supports configurable offset management, custom deserializers, message limits, and timeout handling.
Class: com.kmwllc.lucille.connector.KafkaConnector
Extends: AbstractConnector
Key Features
- Read messages from Kafka topics
- Custom starting offsets per partition
- Configurable message limits
- Custom document deserializers
- Timeout and polling configuration
- Automatic offset commit
- Graceful shutdown
- Extensible for custom message handling
Class Signature
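A sketch of the declaration, based on the package and superclass named above; the constructor shape follows Lucille's usual pattern of accepting a Config, which is an assumption here:

```java
package com.kmwllc.lucille.connector;

import com.typesafe.config.Config;

// Sketch based on the package and superclass named on this page;
// the (Config) constructor is an assumption.
public class KafkaConnector extends AbstractConnector {

  public KafkaConnector(Config config) {
    super(config);
    // read kafka.* and connector settings from config
  }
}
```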
Configuration Parameters
Required Parameters
- Bootstrap servers: Comma-separated list of Kafka broker addresses. Example: "localhost:9092" or "broker1:9092,broker2:9092,broker3:9092"
- Topic: Kafka topic to read messages from. Example: "documents-topic"
- Consumer group ID: Kafka consumer group ID. Multiple consumers with the same group ID share partition processing. Example: "lucille-consumer-group"
- Client ID: Unique identifier for this Kafka consumer client. Used for logging and metrics. Example: "lucille-connector-1"
- kafka.maxPollIntervalSecs: Maximum time in seconds between poll() calls before the consumer is considered dead. Example: 300 (5 minutes). If document processing takes longer than this value, increase it to prevent rebalancing.
Optional Parameters
- idField: Field name in the Kafka message to use as the Lucille Document ID. If not specified, the deserializer determines the ID (typically from a message field or generated). Example: "documentId", "messageKey"
- offsets: Map of partition numbers to initial offsets. Use this to start reading from specific positions. Example: mapping partitions 0, 1, and 2 to offsets 1000, 2500, and 3000 starts reading:
  - Partition 0 at offset 1000
  - Partition 1 at offset 2500
  - Partition 2 at offset 3000
- maxMessages: Maximum number of messages to process before stopping the connector. Useful for testing or processing finite batches. Example: 1000
- messageTimeout: Timeout in milliseconds to wait when polling Kafka for new messages. Example: 1000 (1 second)
- continueOnTimeout: Whether to continue polling after a timeout with no messages.
  - true: Keep polling indefinitely until maxMessages is reached or the connector is closed; use for continuous streaming ingestion
  - false: Stop after the first timeout with no messages; use to process available messages and then stop
- kafka.documentDeserializer: Fully qualified class name of a custom deserializer for Kafka message values. Must implement Kafka's Deserializer<Document> interface. Default: com.kmwllc.lucille.connector.KafkaConnectorDefaultDeserializer. Example: "com.example.CustomDocumentDeserializer"
Document Deserialization
The connector uses a deserializer to convert Kafka message values to Lucille Documents.
Default Deserializer
The default KafkaConnectorDefaultDeserializer expects messages in a specific format that can be deserialized to Documents. The exact format depends on how Documents are serialized when producing to Kafka.
Custom Deserializer
To use a custom deserializer:
- Implement org.apache.kafka.common.serialization.Deserializer<Document>
- Configure it with kafka.documentDeserializer
- Access configuration properties in configure():
  - idField: If configured, use this field as the Document ID
  - docIdPrefix: Prefix to add to Document IDs
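A sketch of such a deserializer. Only the Deserializer<Document> interface and the idField/docIdPrefix properties come from this page; the payload handling and the Document.create/setField calls are assumptions about Lucille's Document API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
import com.kmwllc.lucille.core.Document;

// Sketch only: Document.create(...) and setField(...) are assumed
// Lucille APIs; the payload handling is illustrative.
public class CustomDocumentDeserializer implements Deserializer<Document> {

  private String docIdPrefix = "";

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Pick up the docIdPrefix property described above, if present
    Object prefix = configs.get("docIdPrefix");
    if (prefix != null) {
      docIdPrefix = prefix.toString();
    }
  }

  @Override
  public Document deserialize(String topic, byte[] data) {
    // Illustrative: generate an ID and store the raw payload.
    // A real implementation would parse the message format (e.g., JSON)
    // and honor idField as described above.
    Document doc = Document.create(docIdPrefix + UUID.randomUUID());
    doc.setField("payload", new String(data, StandardCharsets.UTF_8));
    return doc;
  }
}
```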
Configuration Examples
Basic Kafka Consumer
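A minimal sketch in Lucille's HOCON config style. Only kafka.maxPollIntervalSecs is named on this page; the other key names (bootstrapServers, topic, consumerGroupId, clientId) are assumptions, since the required parameters are described above without their keys:

```hocon
connectors: [
  {
    class: "com.kmwllc.lucille.connector.KafkaConnector"
    name: "kafka-connector-1"
    # Key names below (other than kafka.maxPollIntervalSecs) are assumptions
    kafka.bootstrapServers: "localhost:9092"
    kafka.topic: "documents-topic"
    kafka.consumerGroupId: "lucille-consumer-group"
    kafka.clientId: "lucille-connector-1"
    kafka.maxPollIntervalSecs: 300
  }
]
```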
Process Finite Batch
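A sketch combining the maxMessages and continueOnTimeout parameters described above with assumed keys for the required connection settings:

```hocon
connectors: [
  {
    class: "com.kmwllc.lucille.connector.KafkaConnector"
    name: "kafka-batch"
    # bootstrapServers/topic/consumerGroupId/clientId keys are assumptions
    kafka.bootstrapServers: "localhost:9092"
    kafka.topic: "documents-topic"
    kafka.consumerGroupId: "lucille-consumer-group"
    kafka.clientId: "lucille-batch-1"
    kafka.maxPollIntervalSecs: 300
    maxMessages: 1000        # stop after 1000 messages
    continueOnTimeout: false # stop on the first empty poll
  }
]
```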
Start from Specific Offsets
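A sketch of the offsets map described in the optional parameters, using the partition/offset values from that example (the exact map syntax is an assumption):

```hocon
connectors: [
  {
    class: "com.kmwllc.lucille.connector.KafkaConnector"
    name: "kafka-from-offsets"
    # Connection key names are assumptions; offsets is described above
    kafka.bootstrapServers: "localhost:9092"
    kafka.topic: "documents-topic"
    kafka.consumerGroupId: "lucille-consumer-group"
    kafka.clientId: "lucille-offsets-1"
    kafka.maxPollIntervalSecs: 300
    offsets: {
      "0": 1000  # partition 0 starts at offset 1000
      "1": 2500  # partition 1 starts at offset 2500
      "2": 3000  # partition 2 starts at offset 3000
    }
  }
]
```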
Custom Deserializer
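A sketch wiring in a custom deserializer via the kafka.documentDeserializer parameter described above (the class name is this page's example; connection key names are assumptions):

```hocon
connectors: [
  {
    class: "com.kmwllc.lucille.connector.KafkaConnector"
    name: "kafka-custom-deser"
    # Connection key names are assumptions
    kafka.bootstrapServers: "localhost:9092"
    kafka.topic: "documents-topic"
    kafka.consumerGroupId: "lucille-consumer-group"
    kafka.clientId: "lucille-custom-1"
    kafka.maxPollIntervalSecs: 300
    kafka.documentDeserializer: "com.example.CustomDocumentDeserializer"
  }
]
```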
Message Handling
The handleMessage() method processes each Kafka consumer record. By default, it publishes the deserialized Document:
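The default behavior can be pictured roughly as follows; the exact method signature and the Publisher API are assumptions based on this page's description:

```java
// Sketch of the default behavior described above; the signature
// and publisher.publish(...) are assumptions.
protected void handleMessage(Publisher publisher,
    ConsumerRecord<String, Document> record) throws Exception {
  // The record value is already a Document, produced by the
  // configured deserializer; publish it into the pipeline.
  publisher.publish(record.value());
}
```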
Custom Message Handling
Extend KafkaConnector and override handleMessage() for custom processing:
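A sketch of such a subclass, filtering and enriching before publishing. The handleMessage signature and the Document.setField calls are assumptions; the filter itself is hypothetical:

```java
// Sketch: subclass that skips null values and records provenance.
// The handleMessage signature is an assumption based on this page.
public class FilteringKafkaConnector extends KafkaConnector {

  public FilteringKafkaConnector(Config config) {
    super(config);
  }

  @Override
  protected void handleMessage(Publisher publisher,
      ConsumerRecord<String, Document> record) throws Exception {
    Document doc = record.value();
    // Hypothetical filter: skip tombstone / undeserializable messages
    if (doc == null) {
      return;
    }
    // Record provenance before publishing (setField is assumed Lucille API)
    doc.setField("kafka_partition", record.partition());
    doc.setField("kafka_offset", record.offset());
    publisher.publish(doc);
  }
}
```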
Consumer Properties
The enhanceConsumerProperties() method configures the Kafka consumer:
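A sketch of the kind of configuration this method applies. Only the auto-commit setting is confirmed by this page (under Offset Management); the other properties shown are assumptions about what a consumer setup like this typically sets:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch: only ENABLE_AUTO_COMMIT_CONFIG=true is confirmed by this
// page; the other entries illustrate typical consumer setup.
protected void enhanceConsumerProperties(Properties props) {
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  // Derived from kafka.maxPollIntervalSecs (seconds -> milliseconds)
  props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
      String.valueOf(maxPollIntervalSecs * 1000));
  // Batch-size tuning could also go here, e.g.:
  // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
}
```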
Override for Custom Configuration
Extend and override to add custom consumer properties.
Polling Loop
The connector’s polling loop:
- Poll Kafka with messageTimeout
- If no messages: continue polling if continueOnTimeout is true; stop if it is false
- For each message, call handleMessage()
- If maxMessages is reached, stop
- Repeat until stopped or interrupted
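The loop can be sketched with the standard KafkaConsumer API; the field names (running, consumer, publisher) and the handleMessage call are assumptions based on this page:

```java
// Sketch of the polling loop described above; field names are assumptions.
int processed = 0;
while (running) {
  ConsumerRecords<String, Document> records =
      consumer.poll(Duration.ofMillis(messageTimeout));
  if (records.isEmpty() && !continueOnTimeout) {
    break; // stop after the first empty poll when continueOnTimeout is false
  }
  for (ConsumerRecord<String, Document> record : records) {
    handleMessage(publisher, record);
    if (maxMessages > 0 && ++processed >= maxMessages) {
      running = false; // maxMessages reached
      break;
    }
  }
}
```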
Graceful Shutdown
The connector supports graceful shutdown:
- Call close() to stop the connector
- The polling loop exits on the next iteration
- The consumer is woken up with wakeup()
- The consumer is closed, committing offsets
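This follows the standard KafkaConsumer shutdown idiom: wakeup() is the only thread-safe consumer method, and it makes a blocking poll() throw WakeupException, which the polling thread treats as a normal exit signal. A generic sketch (not the connector's exact code):

```java
// Standard KafkaConsumer shutdown idiom: wakeup() aborts a blocking poll().
try {
  while (running) {
    ConsumerRecords<String, Document> records =
        consumer.poll(Duration.ofMillis(messageTimeout));
    // ... handle records ...
  }
} catch (WakeupException e) {
  // Expected during shutdown: close() called consumer.wakeup()
} finally {
  consumer.close(); // commits offsets when auto-commit is enabled
}
```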
Offset Management
The connector uses Kafka’s automatic offset commit:
- ENABLE_AUTO_COMMIT_CONFIG is set to true
- Offsets are committed periodically by the Kafka consumer
- On shutdown, offsets are committed when the consumer closes
Manual Offset Seeking
When offsets is configured:
- Subscribe to the topic
- Perform an initial poll with zero timeout to assign partitions
- For each partition in the assignment:
- If an offset is configured, seek to that offset
- Begin normal polling
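The steps above can be sketched with the KafkaConsumer seek API; the offsets map variable (partition number to starting offset) stands in for the connector's parsed configuration:

```java
// Sketch of the seek procedure described above; `offsets` is the
// configured partition -> starting-offset map.
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ZERO); // initial poll triggers partition assignment
for (TopicPartition partition : consumer.assignment()) {
  Long offset = offsets.get(partition.partition());
  if (offset != null) {
    consumer.seek(partition, offset); // next poll reads from this offset
  }
}
// normal polling begins here
```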
Performance Considerations
Poll Timeout
The messageTimeout affects responsiveness vs. efficiency:
- Short timeout (e.g., 100ms): More responsive to shutdown, more CPU overhead
- Long timeout (e.g., 5000ms): Less CPU overhead, slower shutdown
Max Poll Interval
kafka.maxPollIntervalSecs must exceed the time to process a batch of messages:
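For example (illustrative numbers): a poll returns up to 500 records by default (Kafka's max.poll.records), so if each document takes about one second to process, a batch can take roughly 500 seconds, and kafka.maxPollIntervalSecs must be comfortably above that:

```hocon
# Illustrative sizing: 500 records/poll x ~1s per document ≈ 500s per batch
kafka.maxPollIntervalSecs: 600
```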
Message Batching
The Kafka consumer fetches messages in batches. Control the batch size with additional consumer properties such as max.poll.records and fetch.min.bytes.
Error Handling
Deserialization Errors
If the deserializer throws an exception, the message is skipped and the consumer continues. Configure error handling in the deserializer.
Publishing Errors
handleMessage() throws ConnectorException on publishing errors, stopping the connector.
Consumer Errors
Kafka consumer exceptions (e.g., network errors, rebalancing) are thrown as ConnectorException.
Lifecycle Methods
execute(Publisher publisher)
- Create Kafka consumer with enhanced properties
- Subscribe to the topic
- If offsets is configured, seek to the specified offsets
- Start the polling loop:
- Poll for messages
- Handle each message
- Check stop conditions
- Exit loop on completion, interruption, or max messages
close()
- Set the running flag to false
- Wake up the consumer from a blocking poll
- Close the consumer (commits offsets)
- Set consumer reference to null
Limitations
- Only subscribes to a single topic
- Only supports auto-commit offset management
- Consumer group rebalancing may cause duplicate processing
- No built-in support for exactly-once semantics
- Message ordering is only guaranteed within a partition
Next Steps
SolrConnector
Query and index Solr collections
DatabaseConnector
Query relational databases via JDBC