
Overview

A Connector is the entry point for data into Lucille. Connectors connect to source systems, read data, and generate Documents that will be processed through pipelines and eventually indexed into search engines.
Connectors are responsible for data extraction only. All data transformation happens in the Pipeline through Stages.

Connector Lifecycle

Every Connector follows a well-defined four-phase lifecycle that ensures reliable data extraction:
public interface Connector extends AutoCloseable {
    void preExecute(String runId) throws ConnectorException;
    void execute(Publisher publisher) throws ConnectorException;
    void postExecute(String runId) throws ConnectorException;
    void close() throws Exception;
}
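The framework drives these phases in order. As a toy sketch (a stand-in, not Lucille's actual runner), the following records which phases run when execute() throws, showing that postExecute() is skipped but close() is still called:

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a connector, used only to demonstrate lifecycle ordering;
// it records each phase as it runs and throws from execute().
public class LifecycleDemo {
    public static final List<String> PHASES = new ArrayList<>();

    static class FailingConnector implements AutoCloseable {
        void preExecute(String runId) { PHASES.add("preExecute"); }

        void execute() {
            PHASES.add("execute");
            throw new RuntimeException("source unavailable");
        }

        void postExecute(String runId) { PHASES.add("postExecute"); }

        @Override
        public void close() { PHASES.add("close"); }
    }

    public static void run() {
        // try-with-resources guarantees close() even though execute() throws
        try (FailingConnector connector = new FailingConnector()) {
            connector.preExecute("run-1");
            connector.execute();
            connector.postExecute("run-1"); // never reached
        } catch (RuntimeException e) {
            // the run fails, but close() has already been called
        }
    }
}
```

Running this leaves PHASES as preExecute, execute, close: the failed run never reaches postExecute(), yet cleanup still happens.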

Lifecycle Phases

preExecute(String runId)

Performs setup operations before document generation begins. Common uses:
  • Validate connection to source system
  • Initialize resources (database connections, file handles)
  • Record start time or checkpoint
  • Verify required permissions
Example:
@Override
public void preExecute(String runId) throws ConnectorException {
    try {
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        log.info("Connected to database for run: {}", runId);
    } catch (SQLException e) {
        throw new ConnectorException("Failed to connect", e);
    }
}
Important: If preExecute() throws an exception, the run fails immediately and execute() is never called.

execute(Publisher publisher)

The main execution phase, where documents are generated and published. Responsibilities:
  • Read data from the source system
  • Create Document objects with appropriate IDs and fields
  • Publish documents via the provided Publisher
  • Handle pagination, batching, or streaming as needed
Example:
@Override
public void execute(Publisher publisher) throws ConnectorException {
    try (ResultSet rs = statement.executeQuery(query)) {
        while (rs.next()) {
            Document doc = Document.create(rs.getString("id"), getRunId());
            doc.setField("title", rs.getString("title"));
            doc.setField("content", rs.getString("content"));
            publisher.publish(doc);
        }
    } catch (Exception e) {
        throw new ConnectorException("Error reading data", e);
    }
}
Important: Only called if preExecute() succeeds. Exceptions cause the run to fail.

postExecute(String runId)

Performs cleanup or finalization after all documents have been published. Common uses:
  • Update checkpoints or high-water marks
  • Record completion time or statistics
  • Trigger downstream processes
  • Move or archive processed files
Example:
@Override
public void postExecute(String runId) throws ConnectorException {
    try {
        updateCheckpoint(runId, Instant.now());
        log.info("Processed {} documents in run: {}", docCount, runId);
    } catch (Exception e) {
        throw new ConnectorException("Failed to update checkpoint", e);
    }
}
Important: Only called if both preExecute() and execute() succeed.

close()

Always called to release resources, even if earlier phases failed. Responsibilities:
  • Close database connections
  • Release file handles
  • Clean up temporary resources
  • Close network connections
Example:
@Override
public void close() throws Exception {
    if (connection != null) {
        connection.close();
    }
}
Important: Always called in a finally block, regardless of success or failure.

Configuration

Connectors are configured in the connectors array in your Lucille configuration:
connectors: [
    {
        name: "database-connector"
        class: "com.kmwllc.lucille.connector.jdbc.DatabaseConnector"
        pipeline: "main-pipeline"
        
        # Connector-specific configuration
        jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
        jdbcUser: "${DB_USER}"
        jdbcPassword: "${DB_PASSWORD}"
        query: "SELECT * FROM products WHERE updated_at > :lastRun"
        idField: "product_id"
    },
    {
        name: "file-connector"
        class: "com.kmwllc.lucille.connector.FileConnector"
        pipeline: "file-pipeline"
        path: "/data/documents"
        recursive: true
    }
]

Required Properties

class

Fully-qualified Java class name of the Connector implementation

name

Unique identifier for this connector (a name is generated automatically if omitted, but setting one explicitly makes logs easier to follow)

pipeline

Name of the pipeline to send documents to

Optional Properties

  • docIdPrefix: String prepended to all document IDs generated by this connector
  • collapse: Whether to use a collapsing publisher, which combines successively published documents that share an ID into one multi-valued document
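Applied to the file connector from the earlier example, the optional properties look like this (values are illustrative):

```hocon
connectors: [
    {
        name: "file-connector"
        class: "com.kmwllc.lucille.connector.FileConnector"
        pipeline: "file-pipeline"
        path: "/data/documents"

        # Optional: every document ID becomes e.g. "file-<id>"
        docIdPrefix: "file-"

        # Optional: combine successively published documents that share an ID
        collapse: true
    }
]
```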

Common Patterns

Publishing Documents

The Publisher interface provides the mechanism for sending documents into the pipeline:
public void publish(Document doc) throws PublisherException;
Example with error handling:
for (DataRecord record : records) {
    try {
        Document doc = createDocument(record);
        publisher.publish(doc);
        publishedCount++;
    } catch (PublisherException e) {
        log.error("Failed to publish doc: {}", record.getId(), e);
        failedCount++;
        // Continue processing or throw based on requirements
    }
}

Setting Run ID

Documents should include the run ID to enable tracking and correlation:
Document doc = Document.create(id, runId);
// or
Document doc = Document.create(id);
doc.initializeRunId(runId);

Collapsing Publisher

Some connectors publish multiple documents with the same ID (e.g., one per field value). A collapsing publisher combines these into a single multi-valued document:
@Override
public boolean requiresCollapsingPublisher() {
    return true;
}
Example:
// Without collapsing:
publisher.publish(doc(id="1", tags=["java"]));
publisher.publish(doc(id="1", tags=["lucille"]));
// Results in 2 documents

// With collapsing:
publisher.publish(doc(id="1", tags=["java"]));
publisher.publish(doc(id="1", tags=["lucille"]));
// Results in 1 document with tags=["java", "lucille"]

Pagination

For large datasets, implement pagination to avoid memory issues:
int offset = 0;
int pageSize = 1000;
boolean hasMore = true;

while (hasMore) {
    List<Record> page = fetchPage(offset, pageSize);
    
    for (Record record : page) {
        Document doc = createDocument(record);
        publisher.publish(doc);
    }
    
    offset += pageSize;
    hasMore = page.size() == pageSize;
}

Checkpointing

Track what has been processed to enable incremental updates:
@Override
public void preExecute(String runId) throws ConnectorException {
    lastCheckpoint = loadCheckpoint();
    currentRunId = runId;
}

@Override
public void execute(Publisher publisher) throws ConnectorException {
    // Query for records updated after last checkpoint
    String query = "SELECT * FROM data WHERE updated_at > ?";
    // Process and publish documents...
}

@Override
public void postExecute(String runId) throws ConnectorException {
    saveCheckpoint(runId, Instant.now());
}
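The loadCheckpoint() and saveCheckpoint() helpers above are left to the connector author; Lucille does not prescribe a storage mechanism. A minimal sketch, assuming a single-node setup where a local file is acceptable (CheckpointStore is a hypothetical class, not a Lucille API):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Instant;

// Hypothetical file-backed checkpoint store: one timestamp per connector,
// persisted as ISO-8601 text.
public class CheckpointStore {
    private final Path file;

    public CheckpointStore(Path file) {
        this.file = file;
    }

    // Last saved checkpoint, or Instant.EPOCH on the very first run.
    public Instant load() throws IOException {
        if (!Files.exists(file)) {
            return Instant.EPOCH;
        }
        return Instant.parse(Files.readString(file).trim());
    }

    // Write to a temp file in the same directory, then rename, so a crash
    // mid-write never leaves a truncated checkpoint behind.
    public void save(Instant checkpoint) throws IOException {
        Path tmp = Files.createTempFile(file.toAbsolutePath().getParent(), "checkpoint", ".tmp");
        Files.writeString(tmp, checkpoint.toString());
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

With a store like this, preExecute() would call load() to obtain the high-water mark for the query, and postExecute() would call save() so the next run picks up where this one finished.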

Built-in Connectors

Lucille provides several ready-to-use connectors:

Core Connectors

DatabaseConnector

Execute SQL queries and generate documents from result sets. Supports JDBC-compatible databases.

FileConnector

Read files from local or remote filesystems. Supports various file formats through FileHandlers.

SolrConnector

Query Solr and ingest existing documents. Useful for Solr-to-Solr migrations.

SequenceConnector

Generate a sequence of empty documents. Useful for testing and performance benchmarking.

RSSConnector

Fetch and parse RSS/Atom feeds into documents.

Plugin Connectors

ParquetConnector

Read Apache Parquet files and generate documents from rows. Available in lucille-parquet plugin.

Creating Custom Connectors

Extend AbstractConnector to create your own connector:
public class CustomConnector extends AbstractConnector {
    
    public static final Spec SPEC = SpecBuilder.connector()
        .requiredString("apiUrl", "apiKey")
        .optionalInt("pageSize")
        .build();
    
    private final String apiUrl;
    private final String apiKey;
    private final int pageSize;
    
    public CustomConnector(Config config) {
        super(config);
        this.apiUrl = config.getString("apiUrl");
        this.apiKey = config.getString("apiKey");
        this.pageSize = config.hasPath("pageSize") 
            ? config.getInt("pageSize") : 100;
    }
    
    @Override
    public void execute(Publisher publisher) throws ConnectorException {
        // Implement your logic here
        for (Record record : fetchFromApi()) {
            Document doc = Document.create(record.getId(), getRunId());
            // Populate document fields from record
            publisher.publish(doc);
        }
    }
}

Validation with Spec

All connectors must declare a public static final Spec SPEC that defines their configuration properties:
public static final Spec SPEC = SpecBuilder.connector()
    .requiredString("jdbcUrl", "query")
    .optionalString("jdbcUser", "jdbcPassword", "idField")
    .optionalInt("fetchSize")
    .optionalBoolean("autoCommit")
    .build();
The Spec is used to:
  • Validate configuration at startup
  • Generate helpful error messages for missing/invalid properties
  • Document available configuration options

Error Handling

Connector-Level Failures

Exceptions thrown from lifecycle methods cause the entire run to fail:
@Override
public void execute(Publisher publisher) throws ConnectorException {
    if (!isSourceAvailable()) {
        throw new ConnectorException("Source system unavailable");
    }
    // Continue execution...
}

Document-Level Failures

Failures publishing individual documents should typically be logged but not fail the entire connector:
for (Record record : records) {
    try {
        Document doc = createDocument(record);
        publisher.publish(doc);
    } catch (Exception e) {
        log.error("Failed to process record: {}", record.getId(), e);
        // Continue with next record
    }
}
A connector is not considered failed if individual documents encounter errors during pipeline processing or indexing. Only exceptions from connector lifecycle methods cause run failure.

Best Practices

  1. Fail Fast in preExecute: Validate connections and required resources early
  2. Use Checkpoints: Track progress to enable incremental processing
  3. Handle Large Datasets: Use pagination or streaming to avoid memory issues
  4. Set Meaningful IDs: Document IDs should be stable and meaningful
  5. Include Run ID: Always set the run ID on generated documents
  6. Log Progress: Log milestones to aid debugging and monitoring
  7. Clean Up Resources: Always close connections and file handles in close()
  8. Document Configuration: Use Spec to declare and validate all properties

Next Steps

Documents

Learn about Document structure and fields

Pipelines

Understand how documents flow through pipelines

Built-in Connectors

Explore available connector implementations

Custom Connectors

Build your own connector