Connectors read data from various sources and emit Documents to be processed by pipelines. Each connector is executed in sequence as part of a Lucille run.
Connector Structure
Connectors are defined as a list in the configuration file:
connectors: [
{
name: "connector1"
class: "com.kmwllc.lucille.connector.FileConnector"
pipeline: "pipeline1"
# connector-specific settings...
},
{
name: "connector2"
class: "com.kmwllc.lucille.connector.CSVConnector"
pipeline: "pipeline2"
# connector-specific settings...
}
]
Common Parameters
All connectors support these base parameters:
name
Name to assign to this connector for logging and console output
class
Fully qualified class name of the connector implementation
pipeline
Name of the pipeline that will process this connector's output
FileConnector
Traverses local and cloud storage (S3, GCP, Azure) and publishes a Document for each file encountered.
Basic Configuration
connectors: [
{
name: "file-ingest"
class: "com.kmwllc.lucille.connector.FileConnector"
pipeline: "file-processing"
paths: ["/path/to/files", "s3://my-bucket/prefix"]
}
]
Parameters
paths
Paths or URIs to traverse. Supports local paths and cloud storage URIs. Note: S3 URIs must be percent-encoded; use s3://test/folder%20with%20spaces for paths with special characters.
Filter Options
Control which files are processed:
filterOptions.includes
Regex patterns to include files:
filterOptions: {
includes: [".*\\.pdf$", ".*\\.docx$"]
}
filterOptions.excludes
Regex patterns to exclude files:
filterOptions: {
excludes: [".*\\.DS_Store$", ".*\\.tmp$"]
}
filterOptions.lastModifiedCutoff
Include only files modified within this period:
filterOptions: {
lastModifiedCutoff: "3d" # Last 3 days
}
filterOptions.lastPublishedCutoff
Include only files not published within this period. Requires state configuration.
filterOptions: {
lastPublishedCutoff: "6h" # Not published in last 6 hours
}
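The filter options above can be combined in a single filterOptions block; a minimal sketch, assuming the options compose as shown (patterns and cutoff are illustrative):

```
filterOptions: {
  includes: [".*\\.pdf$"]
  excludes: [".*\\.tmp$"]
  lastModifiedCutoff: "7d"
}
```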
File Options
fileOptions.getFileContent
Fetch file content during traversal
fileOptions.handleArchivedFiles
Process files inside archive containers (zip, tar, etc.)
fileOptions.handleCompressedFiles
Process compressed files (gzip, etc.)
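The three boolean options above can appear together; a minimal sketch (the values shown are illustrative, not defaults):

```
fileOptions: {
  getFileContent: true
  handleArchivedFiles: true
  handleCompressedFiles: true
}
```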
fileOptions.moveToAfterProcessing
URI to move files to after successful processing. Only works with a single input path.
fileOptions: {
moveToAfterProcessing: "/path/to/processed/"
}
fileOptions.moveToErrorFolder
URI to move files to if processing fails. Only works with a single input path.
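A sketch mirroring the moveToAfterProcessing example above; the path is illustrative:

```
fileOptions: {
  moveToErrorFolder: "/path/to/errors/"
}
```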
State Management
Track file publish times to avoid reprocessing:
Embedded Database
Derby Database
External Database
state {
# Uses H2 embedded database by default
driver: "org.h2.Driver"
connectionString: "jdbc:h2:./state/my-connector"
jdbcUser: ""
jdbcPassword: ""
tableName: "file_state"
performDeletions: true
pathLength: 200
}
state {
driver: "org.apache.derby.iapi.jdbc.AutoloadedDriver"
connectionString: "jdbc:derby:lucille_state;"
jdbcUser: ""
jdbcPassword: ""
# Disable automatic deletion tracking
performDeletions: false
}
state {
driver: "org.postgresql.Driver"
connectionString: ${DATABASE_URL}
jdbcUser: ${DB_USER}
jdbcPassword: ${DB_PASSWORD}
tableName: "lucille_file_state"
}
state.driver
JDBC driver class. Defaults to "org.h2.Driver".
state.connectionString
JDBC connection string. Defaults to jdbc:h2:./state/{CONNECTOR_NAME} if omitted.
state.tableName
Table name for state tracking. Defaults to the connector name.
state.performDeletions
Delete rows for files removed from storage.
state.pathLength
Maximum length for stored file paths when Lucille creates the table.
Cloud Storage Configuration
AWS S3
Azure Blob
Google Cloud
s3 {
# Use default credentials if not specified
accessKeyId: ${?AWS_ACCESS_KEY_ID}
secretAccessKey: ${?AWS_SECRET_ACCESS_KEY}
region: ${?AWS_DEFAULT_REGION}
# Max file references in memory
maxNumOfPages: 100
}
s3.accessKeyId
AWS access key ID. Omit to use default credentials.
s3.secretAccessKey
AWS secret access key. Omit to use default credentials.
s3.maxNumOfPages
Maximum number of file references to hold in memory
# Option 1: Connection String
azure {
connectionString: ${?AZURE_CONNECTION_STRING}
maxNumOfPages: 100
}
# Option 2: Account Name and Key
azure {
accountName: ${?AZURE_ACCOUNT_NAME}
accountKey: ${?AZURE_ACCOUNT_KEY}
maxNumOfPages: 100
}
azure.connectionString
Azure connection string (alternative to accountName/accountKey)
azure.maxNumOfPages
Maximum number of file references to hold in memory
gcp {
pathToServiceKey: ${?GCP_SERVICE_KEY_PATH}
maxNumOfPages: 100
}
gcp.pathToServiceKey
Path to the Google Cloud service key JSON file
gcp.maxNumOfPages
Maximum number of file references to hold in memory
File Handlers
Configure custom handling for specific file types:
fileOptions: {
csv {
docIdPrefix: "csvHandled-"
filenameField: "file_name"
}
}
Map of file type to handler configuration. Supports csv, json, and xml. Supply a class property to override the default handler.
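As a sketch, overriding the default JSON handler might look like the following, where com.example.CustomJsonFileHandler is a hypothetical class name:

```
fileOptions: {
  json {
    class: "com.example.CustomJsonFileHandler"
  }
}
```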
Complete Example
Basic File Connector
S3 with State Tracking
CSV File Handler
connectors: [
{
name: "local-files"
class: "com.kmwllc.lucille.connector.FileConnector"
pipeline: "file-pipeline"
paths: ["/data/documents"]
filterOptions: {
excludes: [".*\\.DS_Store$"]
lastModifiedCutoff: "7d"
}
}
]
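The remaining tabs can be sketched as follows; bucket names, paths, cutoffs, and pipeline names are illustrative:

```
# S3 with State Tracking
connectors: [
  {
    name: "s3-files"
    class: "com.kmwllc.lucille.connector.FileConnector"
    pipeline: "file-pipeline"
    paths: ["s3://my-bucket/data/"]
    s3 {
      accessKeyId: ${?AWS_ACCESS_KEY_ID}
      secretAccessKey: ${?AWS_SECRET_ACCESS_KEY}
      region: ${?AWS_DEFAULT_REGION}
    }
    filterOptions: {
      lastPublishedCutoff: "6h"
    }
    state {
      connectionString: "jdbc:h2:./state/s3-files"
    }
  }
]
```

```
# CSV File Handler
connectors: [
  {
    name: "csv-files"
    class: "com.kmwllc.lucille.connector.FileConnector"
    pipeline: "csv-pipeline"
    paths: ["/data/csv"]
    filterOptions: {
      includes: [".*\\.csv$"]
    }
    fileOptions: {
      csv {
        docIdPrefix: "csv-"
        filenameField: "file_name"
      }
    }
  }
]
```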
CSVConnector (Deprecated)
CSVConnector is deprecated. Use FileConnector with CSVFileHandler instead.
Produces documents from a CSV file.
connectors: [
{
name: "csv-legacy"
class: "com.kmwllc.lucille.connector.CSVConnector"
pipeline: "csv-pipeline"
path: "/path/to/file.csv"
# Optional parameters
lineNumberField: "row_number"
filenameField: "filename"
filePathField: "filepath"
idField: "doc_id"
separatorChar: ","
useTabs: false
interpretQuotes: true
lowercaseFields: true
# Move after processing
moveToAfterProcessing: "/processed/"
moveToErrorFolder: "/errors/"
}
]
SequenceConnector
Generates a sequence of numbered documents for testing:
connectors: [
{
name: "test-sequence"
class: "com.kmwllc.lucille.connector.SequenceConnector"
pipeline: "test-pipeline"
numDocs: 1000
}
]
JSONConnector
Reads documents from JSON files:
connectors: [
{
name: "json-ingest"
class: "com.kmwllc.lucille.connector.JSONConnector"
pipeline: "json-pipeline"
path: "/path/to/data.json"
}
]
SolrConnector
Queries Solr and creates documents from results:
connectors: [
{
name: "solr-source"
class: "com.kmwllc.lucille.connector.SolrConnector"
pipeline: "migration-pipeline"
solrUrl: "http://localhost:8983/solr/source-collection"
query: "*:*"
rows: 100
}
]
KafkaConnector
Consumes documents from a Kafka topic:
connectors: [
{
name: "kafka-source"
class: "com.kmwllc.lucille.connector.KafkaConnector"
pipeline: "streaming-pipeline"
topic: "input-topic"
}
]
Next Steps
Build Pipelines Create processing stages for documents
File Handlers Learn about specialized file processing