
Overview

The DatabaseConnector executes SQL SELECT statements against relational databases and publishes each result row as a Document. It supports result set joining, custom ID fields, column filtering, and pre/post SQL execution.
Class: com.kmwllc.lucille.connector.jdbc.DatabaseConnector
Extends: AbstractConnector

Key Features

  • Execute SQL queries against any JDBC-compatible database
  • Publish result rows as Lucille Documents
  • Join multiple result sets with ordered queries
  • Execute pre and post SQL statements (DDL, DML)
  • Control fetch size for memory efficiency
  • Ignore specific columns from indexing
  • Connection retry logic with configurable backoff
  • Support for child documents via SQL joins

Class Signature

package com.kmwllc.lucille.connector.jdbc;

public class DatabaseConnector extends AbstractConnector {
  public DatabaseConnector(Config config);
  
  @Override
  public void execute(Publisher publisher) throws ConnectorException;
  
  @Override
  public void close() throws ConnectorException;
  
  public boolean isClosed();
}

Configuration Parameters

Required Parameters

driver
String
required
JDBC driver class name. The driver JAR must be on the classpath.
Common values:
  • MySQL: com.mysql.cj.jdbc.Driver
  • PostgreSQL: org.postgresql.Driver
  • Oracle: oracle.jdbc.OracleDriver
  • SQL Server: com.microsoft.sqlserver.jdbc.SQLServerDriver
  • H2: org.h2.Driver
connectionString
String
required
JDBC connection URL for the database.
Examples:
  • MySQL: jdbc:mysql://localhost:3306/mydb
  • PostgreSQL: jdbc:postgresql://localhost:5432/mydb
  • Oracle: jdbc:oracle:thin:@localhost:1521:orcl
  • SQL Server: jdbc:sqlserver://localhost:1433;databaseName=mydb
  • H2: jdbc:h2:./data/testdb
jdbcUser
String
required
Database username for authentication.
jdbcPassword
String
required
Database password for authentication.
sql
String
required
Primary SQL SELECT statement to execute. This query retrieves the main result set.
Important: When using otherSQLs for joins, this query must be ordered by the join key.
Example:
SELECT id, title, author, publication_date FROM articles WHERE published = true ORDER BY id
idField
String
required
Column name to use as the Document ID. The value of this column becomes the Lucille document’s unique identifier.
Examples: "id", "article_id", "uuid"

Optional Parameters

fetchSize
Integer
Number of rows to fetch at a time from the database. Controls memory usage and network traffic.
  • MySQL: Set to Integer.MIN_VALUE to enable streaming (prevents buffering the entire result set in memory)
  • H2: Streaming not supported
Behavior varies by JDBC driver. In JDBC the fetch size is only a hint, and a value of 0 leaves the choice to the driver; some drivers (PostgreSQL, for example) then fetch all rows at once.
preSQL
String
SQL statement to execute before the main query. Should not return a result set.
Use cases: Setup, temporary tables, parameter setting
Examples:
CREATE TEMPORARY TABLE temp_ids AS SELECT id FROM articles WHERE status = 'pending'
UPDATE processing_status SET started_at = NOW() WHERE batch_id = 123
postSQL
String
SQL statement to execute after the main query completes. Should not return a result set.
Use cases: Cleanup, status updates, logging
Examples:
UPDATE processing_status SET completed_at = NOW() WHERE batch_id = 123
DROP TEMPORARY TABLE IF EXISTS temp_ids
otherSQLs
List<String>
List of additional SQL queries for joining related data. Each query creates child documents.
Requirements:
  • Must be used with otherJoinFields
  • All queries (main sql and otherSQLs) must be ordered by their join key
  • Only integer join keys are supported
Example:
otherSQLs: [
  "SELECT article_id, tag_name FROM article_tags ORDER BY article_id",
  "SELECT article_id, comment_text, author FROM comments ORDER BY article_id"
]
otherJoinFields
List<String>
Join field names for each query in otherSQLs. Required if otherSQLs is provided.
The order must match the otherSQLs list. Each field is the column name used to join with the primary result set.
Example:
otherJoinFields: ["article_id", "article_id"]
ignoreColumns
List<String>
List of column names to exclude from the published Documents.
Column names are matched case-insensitively.
Example: ["internal_id", "deleted_flag", "password_hash"]
connectionRetries
Integer
default: 1
Number of times to retry connecting to the database if the initial connection fails.
connectionRetryPause
Integer
default: 10000
Duration in milliseconds to wait between connection retry attempts.

Document Fields

For each row in the result set, the connector creates a Document with:
  • ID: Value from the idField column (with docIdPrefix applied)
  • Fields: All result columns (except ID field and ignored columns)
  • Field names: Lowercased column names
  • Child documents: Flattened rows from otherSQLs (if configured)
Field names that match Lucille’s reserved fields will cause an exception unless added to ignoreColumns.
Reserved fields: id, run_id, parent_id, children
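The mapping rules above can be illustrated with a small sketch. This is not the connector's actual code: the class RowMappingSketch and the method toFields are hypothetical names, and a row is modeled as a plain Map rather than a JDBC ResultSet. It only shows how lowercasing, ignoreColumns, NULL handling, and the reserved-field check might fit together.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration; names here are not part of Lucille.
class RowMappingSketch {
  // Reserved field names as listed in this section.
  static final Set<String> RESERVED = Set.of("id", "run_id", "parent_id", "children");

  /**
   * Maps one result row (column name -> value) to document fields.
   * The ID column, ignored columns, and NULL values are left out.
   */
  static Map<String, Object> toFields(Map<String, Object> row, String idField,
                                      List<String> ignoreColumns) {
    // Case-insensitive lookup for ignored column names.
    Set<String> ignored = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
    ignored.addAll(ignoreColumns);

    Map<String, Object> fields = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      String column = e.getKey();
      if (column.equalsIgnoreCase(idField) || ignored.contains(column)) {
        continue; // the ID becomes the document ID; ignored columns are dropped
      }
      String name = column.toLowerCase(Locale.ROOT);
      if (RESERVED.contains(name)) {
        throw new IllegalArgumentException("Reserved field name: " + name);
      }
      if (e.getValue() != null) {
        fields.put(name, e.getValue()); // NULL values are not added
      }
    }
    return fields;
  }
}
```

In this sketch a column named run_id raises an exception unless it appears in ignoreColumns, which mirrors the rule stated above.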

Configuration Examples

Basic MySQL Query

connector: {
  name: "mysql-articles"
  class: "com.kmwllc.lucille.connector.jdbc.DatabaseConnector"
  pipeline: "content-pipeline"
  
  driver: "com.mysql.cj.jdbc.Driver"
  connectionString: "jdbc:mysql://localhost:3306/content_db"
  jdbcUser: "lucille_user"
  jdbcPassword: "secret123"
  
  sql: "SELECT id, title, body, author, created_at FROM articles WHERE published = true"
  idField: "id"
  
  fetchSize: -2147483648  # Integer.MIN_VALUE for MySQL streaming
}

PostgreSQL with Pre/Post SQL

connector: {
  name: "postgres-products"
  class: "com.kmwllc.lucille.connector.jdbc.DatabaseConnector"
  pipeline: "product-pipeline"
  
  driver: "org.postgresql.Driver"
  connectionString: "jdbc:postgresql://db.example.com:5432/products"
  jdbcUser: "app_user"
  jdbcPassword: "secure_password"
  
  preSQL: "CREATE TEMP TABLE active_products AS SELECT product_id FROM products WHERE status = 'active'"
  sql: "SELECT p.product_id, p.name, p.description, p.price FROM products p JOIN active_products a ON p.product_id = a.product_id ORDER BY p.product_id"
  postSQL: "DROP TABLE IF EXISTS active_products"
  
  idField: "product_id"
  ignoreColumns: ["internal_cost", "supplier_id"]
}

SQL Server with Joins (Child Documents)

connector: {
  name: "sqlserver-orders"
  class: "com.kmwllc.lucille.connector.jdbc.DatabaseConnector"
  pipeline: "order-pipeline"
  
  driver: "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  connectionString: "jdbc:sqlserver://localhost:1433;databaseName=OrdersDB"
  jdbcUser: "sa"
  jdbcPassword: "Admin123"
  
  # Main query - must be ordered by join key
  sql: "SELECT order_id, customer_name, order_date, total_amount FROM orders WHERE order_date > '2024-01-01' ORDER BY order_id"
  idField: "order_id"
  
  # Join queries - must be ordered by the same key
  otherSQLs: [
    "SELECT order_id, product_name, quantity, price FROM order_items ORDER BY order_id",
    "SELECT order_id, status, timestamp, notes FROM order_status_history ORDER BY order_id"
  ]
  otherJoinFields: ["order_id", "order_id"]
  
  fetchSize: 1000
}

Connection Retry Configuration

connector: {
  name: "resilient-connector"
  class: "com.kmwllc.lucille.connector.jdbc.DatabaseConnector"
  pipeline: "data-pipeline"
  
  driver: "org.postgresql.Driver"
  connectionString: "jdbc:postgresql://db.example.com:5432/mydb"
  jdbcUser: "app"
  jdbcPassword: "pass"
  
  sql: "SELECT * FROM data"
  idField: "id"
  
  connectionRetries: 5
  connectionRetryPause: 5000  # 5 seconds between retries
}

SQL Joins and Child Documents

The connector supports joining multiple result sets to create Documents with child documents:

How It Works

  1. Execute the main sql query
  2. Execute all otherSQLs queries in parallel
  3. For each main result row:
    • Create a parent Document
    • Find matching rows in other result sets using join keys
    • Create child Documents for each matching row
    • Add children to parent using doc.addChild()
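
The matching step above resembles a merge join over sorted inputs. The following is a minimal sketch under that assumption, not the connector's implementation: MergeJoinSketch and join are hypothetical names, and rows are reduced to (integer key, value) pairs held in lists instead of JDBC result sets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a single-pass join over key-ordered rows.
class MergeJoinSketch {
  /**
   * Groups child values under their parent keys in one forward pass.
   * Both lists must already be sorted ascending by key.
   */
  static Map<Integer, List<String>> join(List<Map.Entry<Integer, String>> parents,
                                         List<Map.Entry<Integer, String>> children) {
    Map<Integer, List<String>> result = new LinkedHashMap<>();
    int c = 0; // cursor into the child rows; it only moves forward
    for (Map.Entry<Integer, String> parent : parents) {
      int parentKey = parent.getKey();
      List<String> matched = new ArrayList<>();
      // Skip child rows whose key is smaller than the current parent key.
      while (c < children.size() && children.get(c).getKey() < parentKey) {
        c++;
      }
      // Collect every child row that shares the parent key.
      while (c < children.size() && children.get(c).getKey() == parentKey) {
        matched.add(children.get(c).getValue());
        c++;
      }
      result.put(parentKey, matched);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, String>> orders =
        List.of(Map.entry(1, "Alice"), Map.entry(2, "Bob"));
    List<Map.Entry<Integer, String>> items =
        List.of(Map.entry(1, "Laptop"), Map.entry(1, "Mouse"), Map.entry(2, "Keyboard"));
    System.out.println(join(orders, items)); // prints {1=[Laptop, Mouse], 2=[Keyboard]}
  }
}
```

Because the child cursor never moves backward, an unordered query would cause matching rows to be missed, which is why ordering by the join key matters.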

Requirements

  • All queries (main and others) must be ordered by their join key
  • Join keys must be integers (or types comparable as integers)
  • The number of otherJoinFields must match the number of otherSQLs
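
These requirements are easy to check before a run. The sketch below is a hypothetical pre-flight helper, not something the connector provides: JoinRequirementsSketch, sizesMatch, and isOrdered are illustrative names, and the join keys are assumed to have been sampled into a list of integers.

```java
import java.util.List;

// Hypothetical pre-flight checks; not part of Lucille.
class JoinRequirementsSketch {
  /** True when both lists are absent or empty, or have the same number of entries. */
  static boolean sizesMatch(List<String> otherSQLs, List<String> otherJoinFields) {
    int sqls = otherSQLs == null ? 0 : otherSQLs.size();
    int fields = otherJoinFields == null ? 0 : otherJoinFields.size();
    return sqls == fields;
  }

  /** True when the integer join keys never decrease, i.e. rows are ordered by key. */
  static boolean isOrdered(List<Integer> keys) {
    for (int i = 1; i < keys.size(); i++) {
      if (keys.get(i) < keys.get(i - 1)) {
        return false;
      }
    }
    return true;
  }
}
```

Repeated keys are allowed by isOrdered, since several child rows can share one parent key.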

Example Document Structure

Given this configuration:
sql: "SELECT order_id, customer FROM orders ORDER BY order_id"
otherSQLs: ["SELECT order_id, product FROM items ORDER BY order_id"]
otherJoinFields: ["order_id"]
And these database rows:
orders table:
order_id | customer
1        | Alice
2        | Bob
items table:
order_id | product
1        | Laptop
1        | Mouse
2        | Keyboard
The connector produces Documents like:
{
  "id": "1",
  "customer": "Alice",
  "children": [
    {"id": "0", "product": "Laptop"},
    {"id": "1", "product": "Mouse"}
  ]
}
{
  "id": "2",
  "customer": "Bob",
  "children": [
    {"id": "0", "product": "Keyboard"}
  ]
}

Data Type Handling

The connector uses JDBCUtils.parseResultToDoc() to convert SQL types to Lucille field values:
  • NULL values: Not added to the Document
  • Strings: Added as String fields
  • Numbers: Added as appropriate numeric types (Integer, Long, Double, etc.)
  • Dates/Timestamps: Converted to Instant
  • Binary data: Added as byte arrays
  • Unsupported types: Skipped with a warning
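
The conversion rules above could look roughly like the following. This is a sketch only and does not reproduce JDBCUtils.parseResultToDoc(): TypeConversionSketch and convert are hypothetical names, and the input is assumed to be a value as returned by ResultSet.getObject().

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical illustration of per-value conversion; not Lucille's JDBCUtils.
class TypeConversionSketch {
  /**
   * Converts a JDBC value into a field value.
   * An empty Optional means "do not add this field".
   */
  static Optional<Object> convert(Object value) {
    if (value == null) {
      return Optional.empty(); // NULL: field is not added
    }
    if (value instanceof String || value instanceof Number) {
      return Optional.of(value); // kept as String, Integer, Long, Double, ...
    }
    if (value instanceof java.util.Date) {
      // Covers java.sql.Date, Time, and Timestamp. java.sql.Date.toInstant()
      // throws UnsupportedOperationException, so convert via epoch milliseconds
      // (this drops any sub-millisecond precision a Timestamp carries).
      return Optional.of(Instant.ofEpochMilli(((java.util.Date) value).getTime()));
    }
    if (value instanceof byte[]) {
      return Optional.of(value); // binary data stays a byte array
    }
    System.err.println("Skipping unsupported type: " + value.getClass().getName());
    return Optional.empty();
  }
}
```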

Performance Considerations

Fetch Size

The fetchSize parameter matters most for large result sets, and the right value depends on the driver.
MySQL: set fetchSize to Integer.MIN_VALUE (-2147483648) to enable streaming:
fetchSize: -2147483648
This prevents the driver from buffering the entire result set in memory.

Connection Management

  • Connections are created on-demand and closed in close()
  • Multiple connections are created when using otherSQLs (one per query)
  • Use connection pooling at the database level for better resource management

Query Optimization

  • Add appropriate indexes on columns used in WHERE clauses
  • Use ignoreColumns to reduce data transfer
  • Order by indexed columns when using joins
  • Consider partitioning large queries across multiple connector instances

Error Handling

Connection Failures

The connector retries connection failures based on connectionRetries and connectionRetryPause:
Attempt 1: Failed
Wait connectionRetryPause ms
Attempt 2: Failed
Wait connectionRetryPause ms
...
Attempt N: Success or throw exception
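
The retry sequence above can be sketched as a simple loop. This is an illustration, not the connector's code: RetrySketch and connectWithRetries are hypothetical names, and it assumes that connectionRetries counts the attempts made after the first failure, so the total number of attempts is connectionRetries + 1.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of retry-with-pause; not part of Lucille.
class RetrySketch {
  /**
   * Calls connect until it succeeds or the retries are used up.
   * Assumption: maxRetries counts attempts made after the first failure.
   */
  static <T> T connectWithRetries(Callable<T> connect, int maxRetries, long pauseMillis) {
    Exception last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return connect.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxRetries) {
          sleep(pauseMillis); // no pause after the final attempt
        }
      }
    }
    throw new IllegalStateException(
        "Could not connect after " + (maxRetries + 1) + " attempts", last);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new IllegalStateException("Interrupted while waiting to retry", ie);
    }
  }
}
```

A call such as connectWithRetries(() -> DriverManager.getConnection(url, user, password), 5, 5000) would correspond to connectionRetries: 5 and connectionRetryPause: 5000, with url, user, and password supplied by the caller.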

Query Failures

SQL exceptions during query execution immediately throw ConnectorException. No retry logic is applied to query execution.

Data Conversion Errors

If a column cannot be converted to a Lucille field value, a warning is logged and that field is skipped. The Document is still published.

Lifecycle Methods

execute(Publisher publisher)

  1. Create database connection with retries
  2. Execute preSQL if configured
  3. Execute main sql query
  4. Execute all otherSQLs queries
  5. Iterate through main result set:
    • Create Document from row
    • Match and attach child documents from other result sets
    • Publish Document
  6. Execute postSQL if configured
  7. Close result sets and statements (connections closed in close())

close()

Closes all database connections created during execution. Always called, even if exceptions occur.
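
The guarantee that close() runs even after a failure is typically achieved with try/finally. The sketch below shows that pattern in isolation; SketchConnector and run are hypothetical stand-ins and do not reflect how Lucille's runner actually invokes connectors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the execute/close ordering; not Lucille's runner.
class LifecycleSketch {
  /** Stand-in for a connector with the two lifecycle methods described above. */
  interface SketchConnector {
    void execute() throws Exception;
    void close() throws Exception;
  }

  /** Runs execute() and then close(), calling close() even when execute() throws. */
  static List<String> run(SketchConnector connector) {
    List<String> events = new ArrayList<>();
    try {
      connector.execute();
      events.add("execute ok");
    } catch (Exception e) {
      events.add("execute failed: " + e.getMessage());
    } finally {
      try {
        connector.close(); // always reached, so connections are released
        events.add("closed");
      } catch (Exception e) {
        events.add("close failed: " + e.getMessage());
      }
    }
    return events;
  }
}
```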

Limitations

  • Join keys must be integers (or comparable as integers)
  • Only supports forward-only, read-only result sets
  • No support for stored procedures with multiple result sets
  • No support for batch inserts back to database
  • Column names must not conflict with Lucille reserved fields
