
Overview

The FileConnector traverses local and cloud storage from one or more roots and publishes a Document for each file encountered. It supports include/exclude regex filters, recency cutoffs, optional content fetching, archive/compressed file handling, file moves after processing, and optional JDBC-backed state to avoid republishing recently handled files.

Class: com.kmwllc.lucille.connector.FileConnector
Extends: AbstractConnector

Key Features

  • Traverse local file systems, AWS S3, Google Cloud Storage, and Azure Blob Storage
  • Regex-based file filtering (include/exclude patterns)
  • Recency filters based on modification time and last published time
  • Archive file handling (ZIP, TAR, etc.)
  • Compressed file handling (GZIP, BZIP2, etc.)
  • Move files after processing or on error
  • JDBC-backed state management to prevent duplicate processing
  • Per-file-type handlers (CSV, JSON, XML)

Class Signature

package com.kmwllc.lucille.connector;

public class FileConnector extends AbstractConnector {
  public FileConnector(Config config) throws ConnectorException;
  
  @Override
  public void execute(Publisher publisher) throws ConnectorException;
}

Configuration Parameters

Required Parameters

paths
List<String>
required
Paths or URIs to traverse. Supports local paths and cloud storage URIs.
Cloud URI formats:
  • S3: s3://bucket-name/path/to/folder/
  • GCP: gs://bucket-name/path/to/folder/
  • Azure: https://account.blob.core.windows.net/container/path/
  • Local: /path/to/local/folder or file:///path/to/local/folder
S3 URIs must be percent-encoded for spaces and special characters: use s3://test/folder%20with%20spaces instead of s3://test/folder with spaces.
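One way to produce a correctly encoded URI is to let java.net.URI do the escaping. This standalone sketch (not part of Lucille; the helper name is illustrative) shows the idea:

```java
import java.net.URI;
import java.net.URISyntaxException;

public class S3UriEncoding {
  // Build a percent-encoded S3 URI from raw components, letting
  // java.net.URI's multi-argument constructor escape spaces and
  // other illegal characters in the path.
  static String encodeS3Uri(String bucket, String path) throws URISyntaxException {
    return new URI("s3", bucket, path, null).toASCIIString();
  }

  public static void main(String[] args) throws URISyntaxException {
    // prints s3://test/folder%20with%20spaces
    System.out.println(encodeS3Uri("test", "/folder with spaces"));
  }
}
```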

Filter Options

filterOptions.includes
List<String>
List of regex patterns for files to include. Only files matching at least one pattern are processed. Example: [".*\\.pdf$", ".*\\.docx$"] processes only PDF and DOCX files.
filterOptions.excludes
List<String>
List of regex patterns for files to exclude. Files matching any pattern are skipped. Example: [".*\\.tmp$", ".*/temp/.*"] skips temporary files and files in temp folders.
filterOptions.lastModifiedCutoff
String
Duration string to include only files modified within this period. Uses HOCON duration format. Examples: "1h", "2d", "30m", "7d". Only files modified within the specified duration before the current time are processed.
filterOptions.lastPublishedCutoff
String
Duration string to include only files not published within this period. Requires state configuration. Examples: "1h", "2d", "30m".
This setting has no effect unless state is configured. It prevents republishing files that were recently processed.
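Taken together, a filterOptions block that restricts a run to recently modified PDFs outside temp folders (cutoff values purely illustrative) could look like:

```
filterOptions: {
  includes: [".*\\.pdf$"]
  excludes: [".*/temp/.*"]
  lastModifiedCutoff: "7d"
  lastPublishedCutoff: "1d"
}
```

Note that lastPublishedCutoff is only honored when a state block is also configured.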

File Handling Options

fileOptions.getFileContent
Boolean
default:"true"
Whether to fetch file content during traversal. Set to false for metadata-only indexing.
fileOptions.handleArchivedFiles
Boolean
default:"false"
Whether to process archive files (ZIP, TAR, JAR, etc.) and extract their contents. When enabled, modification/publish cutoffs apply to both the container and its entries.
fileOptions.handleCompressedFiles
Boolean
default:"false"
Whether to process compressed files (GZIP, BZIP2, XZ, etc.) and decompress their contents.
fileOptions.moveToAfterProcessing
String
URI to move files to after successful processing. Only works with a single input path. Example: "s3://processed-bucket/completed/"
Cannot be used when multiple paths are specified.
fileOptions.moveToErrorFolder
String
URI to move files to if processing fails. Only works with a single input path. Example: "/path/to/error/folder/"
Cannot be used when multiple paths are specified.

State Management

State tracking allows the connector to remember which files have been published and when, preventing duplicate processing across runs.
state.driver
String
default:"org.h2.Driver"
JDBC driver class for state storage. Common values: org.h2.Driver, com.mysql.cj.jdbc.Driver, org.postgresql.Driver
state.connectionString
String
default:"jdbc:h2:./state/{CONNECTOR_NAME}"
JDBC connection string. If omitted, an embedded H2 database is created at ./state/{CONNECTOR_NAME}. Examples:
  • H2: jdbc:h2:./state/my-connector
  • MySQL: jdbc:mysql://localhost:3306/lucille
  • PostgreSQL: jdbc:postgresql://localhost:5432/lucille
state.jdbcUser
String
default:""
Database username for state storage.
state.jdbcPassword
String
default:""
Database password for state storage.
state.tableName
String
default:"{connector name}"
Table name for storing state. Defaults to the connector name.
state.performDeletions
Boolean
default:"true"
Whether to delete rows for files that have been removed from storage.
state.pathLength
Integer
default:"200"
Maximum length for stored file paths when Lucille creates the state table.
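Putting the state parameters above together, a block backed by an external PostgreSQL database (connection details, credentials, and table name all hypothetical) might look like:

```
state: {
  driver: "org.postgresql.Driver"
  connectionString: "jdbc:postgresql://localhost:5432/lucille"
  jdbcUser: "lucille"
  jdbcPassword: "secret"
  tableName: "file_connector_state"
  performDeletions: true
  pathLength: 512
}
```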

Cloud Storage Options

s3.accessKeyId
String
AWS access key ID. Omit to use default credential provider chain.
s3.secretAccessKey
String
AWS secret access key. Omit to use default credential provider chain.
Both accessKeyId and secretAccessKey must be specified together or omitted together.
s3.region
String
AWS region for S3. Example: "us-east-1"
s3.maxNumOfPages
Integer
default:"100"
Maximum number of file references to hold in memory at once.

File Handlers

fileHandlers
Map<String, Map<String, Object>>
Per-type FileHandler configuration for CSV, JSON, and XML files. Supply a class to override the default handler; otherwise, built-in handlers are used. Example:
{
  "csv": {
    "class": "com.custom.CSVHandler",
    "delimiter": ","
  },
  "json": {
    "docIdPrefix": "json-"
  }
}

Document Fields

Each published Document includes these fields:
file_path
String
Full path to the file (URI format).
file_modification_date
Instant
Last modification timestamp of the file.
file_creation_date
Instant
Creation timestamp of the file (may be null for some storage types).
file_size_bytes
Long
Size of the file in bytes.
file_content
byte[]
Raw file content (only if fileOptions.getFileContent is true).

Configuration Examples

Basic Local File Traversal

connector: {
  name: "local-files"
  class: "com.kmwllc.lucille.connector.FileConnector"
  pipeline: "main-pipeline"
  paths: ["/data/documents"]
  filterOptions: {
    includes: [".*\\.pdf$"]
  }
  fileOptions: {
    getFileContent: true
  }
}

S3 with State Management

connector: {
  name: "s3-documents"
  class: "com.kmwllc.lucille.connector.FileConnector"
  pipeline: "doc-pipeline"
  paths: ["s3://my-bucket/documents/"]
  filterOptions: {
    lastModifiedCutoff: "7d"
    lastPublishedCutoff: "1d"
  }
  state: {
    driver: "org.h2.Driver"
    connectionString: "jdbc:h2:./state/s3-docs"
  }
  s3: {
    region: "us-east-1"
  }
}

Multiple Storage Sources

connector: {
  name: "multi-source"
  class: "com.kmwllc.lucille.connector.FileConnector"
  pipeline: "ingest"
  paths: [
    "/local/path/docs",
    "s3://bucket1/folder/",
    "gs://bucket2/data/"
  ]
  filterOptions: {
    excludes: [".*\\.tmp$", ".*\\.bak$"]
  }
  s3: {
    region: "us-west-2"
  }
  gcp: {
    pathToServiceKey: "/keys/gcp-key.json"
  }
}

Archive Processing with File Moves

connector: {
  name: "archive-processor"
  class: "com.kmwllc.lucille.connector.FileConnector"
  pipeline: "archive-pipeline"
  paths: ["/incoming/archives"]
  fileOptions: {
    handleArchivedFiles: true
    handleCompressedFiles: true
    moveToAfterProcessing: "/processed/"
    moveToErrorFolder: "/errors/"
  }
}

Storage Client Architecture

The FileConnector uses pluggable StorageClient implementations:

StorageClient Interface

package com.kmwllc.lucille.connector.storageclient;

public interface StorageClient {
  void init() throws IOException;
  void shutdown() throws IOException;
  void traverse(Publisher publisher, TraversalParams params) throws Exception;
  void traverse(Publisher publisher, TraversalParams params, FileConnectorStateManager stateMgr) throws Exception;
  InputStream getFileContentStream(URI uri) throws IOException;
  void moveFile(URI filePath, URI folder) throws IOException;
}

Built-in Storage Clients

  • Local: traverses local file systems using Files.walkFileTree(). URI scheme: file (or no scheme). No configuration required.
  • S3: accesses AWS S3 using the AWS SDK v2. URI scheme: s3. Configuration: see S3 options above.
  • Google Cloud Storage: accesses GCS using the GCP client library. URI scheme: gs. Configuration: see GCP options above.
  • Azure: accesses Azure Blob Storage using the Azure SDK. URI scheme: https (with a blob.core.windows.net authority). Configuration: see Azure options above.
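The scheme-based routing described above can be sketched as a small, self-contained method. The backend labels and the dispatch helper below are illustrative only, not Lucille API:

```java
import java.net.URI;

public class SchemeDispatch {
  // Hypothetical sketch: pick a storage backend label from a URI's scheme,
  // mirroring how FileConnector routes each path to a StorageClient.
  static String backendFor(URI uri) {
    String scheme = uri.getScheme();
    if (scheme == null || scheme.equals("file")) {
      return "local"; // plain paths and file:// URIs go to the local client
    }
    switch (scheme) {
      case "s3": return "s3";
      case "gs": return "gcp";
      case "https":
        // Azure Blob Storage is identified by its authority, not its scheme.
        if (uri.getHost() != null && uri.getHost().endsWith("blob.core.windows.net")) {
          return "azure";
        }
        return "unsupported";
      default:
        return "unsupported";
    }
  }

  public static void main(String[] args) {
    System.out.println(backendFor(URI.create("s3://bucket/key")));                        // s3
    System.out.println(backendFor(URI.create("https://acct.blob.core.windows.net/c/x"))); // azure
  }
}
```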

Archive File Handling

When handleArchivedFiles is enabled, the connector:
  1. Detects archive files (ZIP, TAR, JAR, etc.)
  2. Extracts entries from the archive
  3. Publishes a separate Document for each entry
  4. Uses the separator ! in file paths: archive.zip!internal/file.txt
Modification and publish cutoffs apply to both the archive container and individual entries.
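The extraction flow can be illustrated with a self-contained sketch using java.util.zip. The entryIds helper is hypothetical (not part of Lucille), but it produces identifiers in the archive.zip!internal/file.txt style described above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ArchiveEntryPaths {
  // Identify each ZIP entry as "<archive>!<entry>", the separator
  // FileConnector uses for documents published from archive contents.
  static List<String> entryIds(String archiveName, byte[] zipBytes) throws IOException {
    List<String> ids = new ArrayList<>();
    try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
      ZipEntry entry;
      while ((entry = zis.getNextEntry()) != null) {
        ids.add(archiveName + "!" + entry.getName());
      }
    }
    return ids;
  }

  public static void main(String[] args) throws IOException {
    // Build a tiny in-memory ZIP with one entry for demonstration.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (ZipOutputStream zos = new ZipOutputStream(buf)) {
      zos.putNextEntry(new ZipEntry("internal/file.txt"));
      zos.write("hello".getBytes(StandardCharsets.UTF_8));
      zos.closeEntry();
    }
    // prints [archive.zip!internal/file.txt]
    System.out.println(entryIds("archive.zip", buf.toByteArray()));
  }
}
```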

State Management Details

When state is configured:
  • The connector creates a table (default name: connector name) with columns for file path and last published timestamp
  • Before publishing, it checks if the file was recently published (based on lastPublishedCutoff)
  • After publishing, it updates the timestamp for that file path
  • Files that are moved or renamed are always republished regardless of state
  • If performDeletions is true, rows for deleted files are removed from the state table
Traversal may be slower when state is enabled, especially for large file sets.

Performance Considerations

  • Use maxNumOfPages to control memory usage when traversing large cloud storage buckets
  • Set getFileContent: false if you only need file metadata
  • Use filterOptions.includes and filterOptions.excludes to reduce the number of files processed
  • Consider using lastModifiedCutoff for incremental ingestion
  • State management adds overhead but prevents duplicate processing
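The bullets above combine naturally in a metadata-only, incremental configuration (bucket, pipeline, and cutoff values purely illustrative):

```
connector: {
  name: "s3-metadata-scan"
  class: "com.kmwllc.lucille.connector.FileConnector"
  pipeline: "metadata-pipeline"
  paths: ["s3://large-bucket/data/"]
  filterOptions: {
    includes: [".*\\.json$"]
    lastModifiedCutoff: "1d"
  }
  fileOptions: {
    getFileContent: false
  }
  s3: {
    region: "us-east-1"
    maxNumOfPages: 50
  }
}
```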

Next Steps

DatabaseConnector

Query relational databases via JDBC

Connectors Overview

Learn about the Connector lifecycle and interface