
Overview

The OpenSearchIndexer sends documents to an OpenSearch cluster using the official Java client. It supports bulk operations, partial updates, routing, and external versioning.

Java Class: com.kmwllc.lucille.indexer.OpenSearchIndexer
Source: OpenSearchIndexer.java

Configuration

Basic Configuration

indexer {
  type: "opensearch"
  
  opensearch {
    url: "https://localhost:9200"
    index: "documents"
  }
}

With Authentication

indexer {
  type: "opensearch"
  
  opensearch {
    url: "https://search.example.com:9200"
    index: "my_index"
    userName: "admin"
    password: "admin_password"
  }
}

Parameters

index
string
required
Target OpenSearch index name where documents will be stored. Example: "documents", "logs-2024"
url
string
required
OpenSearch HTTP endpoint including protocol and port. Example: "https://localhost:9200", "http://opensearch.local:9200"
update
boolean
default:"false"
Use the partial update API instead of index/replace operations. When true, only specified fields are updated; existing fields are preserved.
acceptInvalidCert
boolean
default:"false"
Allow invalid TLS certificates. Only use for development/testing environments.
indexer.routingField
string
Document field that supplies the routing key. Used to control which shard stores the document. Example: "user_id", "tenant_id"
indexer.versionType
string
Versioning type when using external version control. Options:
  • External: Use external version number
  • ExternalGte: External version must be >= current version
Requires documents to be KafkaDocument instances with offset information.

Features

Index Override

Route documents to different indices dynamically:
indexer {
  indexOverrideField: "target_index"
  
  opensearch {
    index: "default_index"  # Fallback index
  }
}
Documents with a target_index field will be sent to that index instead.
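The fallback behavior can be sketched in a few lines. This is a minimal illustration, assuming a document can be modeled as a plain map of field names to values; the class and method names (IndexOverrideSketch, resolveIndex) are hypothetical and not part of Lucille's API, and treating a blank override value as "not set" is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical illustration only; not the Lucille implementation.
class IndexOverrideSketch {

  // Returns the override field's value when it is present and non-blank,
  // otherwise the configured default index.
  static String resolveIndex(Map<String, Object> doc, String overrideField, String defaultIndex) {
    if (overrideField == null) {
      return defaultIndex; // no indexOverrideField configured
    }
    Object value = doc.get(overrideField);
    if (value == null || value.toString().isBlank()) {
      return defaultIndex; // assumption: blank values fall back to the default
    }
    return value.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> routed = Map.of("id", "1", "target_index", "archive");
    Map<String, Object> plain = Map.of("id", "2");
    System.out.println(resolveIndex(routed, "target_index", "default_index"));
    System.out.println(resolveIndex(plain, "target_index", "default_index"));
  }
}
```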

Partial Updates

Use the update API to modify only specific fields:
opensearch {
  update: true
}
Behavior:
  • update: false (default): The document is replaced entirely (index operation)
  • update: true: Only the provided fields are modified; other fields remain unchanged
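The difference between the two modes can be illustrated with plain maps standing in for stored documents. This is a simplified sketch, not the indexer's code: the class UpdateModeSketch is hypothetical, and the merge shown here is shallow, whereas OpenSearch merges nested objects recursively during a partial update.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of index vs. update semantics using plain maps.
class UpdateModeSketch {

  // Index operation: the stored document is discarded and replaced by the incoming one.
  static Map<String, Object> index(Map<String, Object> stored, Map<String, Object> incoming) {
    return new HashMap<>(incoming);
  }

  // Update operation: incoming fields overwrite or extend the stored document,
  // and fields absent from the incoming document are preserved (shallow merge).
  static Map<String, Object> update(Map<String, Object> stored, Map<String, Object> incoming) {
    Map<String, Object> merged = new HashMap<>(stored);
    merged.putAll(incoming);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> stored = Map.of("title", "Guide", "views", 1);
    Map<String, Object> incoming = Map.of("views", 2);
    System.out.println("index:  " + index(stored, incoming));
    System.out.println("update: " + update(stored, incoming));
  }
}
```

With update mode the title field survives even though the incoming document omits it; with index mode it is lost.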

Routing Control

Control document placement across shards:
indexer {
  routingField: "tenant_id"
}
All documents with the same tenant_id value will be stored on the same shard, improving query performance for tenant-specific searches.
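To see why equal routing values land on the same shard, consider a simplified model in which the shard is chosen by hashing the routing value and taking it modulo the shard count. OpenSearch uses its own hash function and formula internally; String.hashCode() below is only a stand-in, and the class RoutingSketch is hypothetical.

```java
// Hypothetical illustration: shard selection as a deterministic function of the routing value.
class RoutingSketch {

  // Maps a routing value to a shard number in [0, numShards).
  // String.hashCode() is a stand-in for the hash OpenSearch actually uses.
  static int shardFor(String routingValue, int numShards) {
    return Math.floorMod(routingValue.hashCode(), numShards);
  }

  public static void main(String[] args) {
    int shards = 5;
    // The same tenant_id always yields the same shard number.
    System.out.println(shardFor("tenant-42", shards));
    System.out.println(shardFor("tenant-42", shards));
    // A different tenant may or may not share that shard.
    System.out.println(shardFor("tenant-7", shards));
  }
}
```

Because the mapping is deterministic, a query that supplies the same routing value only needs to visit one shard.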

External Versioning

Use Kafka offsets or other external version numbers:
indexer {
  versionType: "External"
}
Requires documents to be KafkaDocument instances. The Kafka offset is used as the version number, so OpenSearch rejects stale or replayed writes. This makes reprocessing safe and helps approximate exactly-once indexing.
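The acceptance rule for the two version types can be sketched as a comparison between the stored version and the incoming one. The class and enum below are hypothetical names used for illustration; the actual check is performed by OpenSearch, not by the indexer.

```java
// Hypothetical illustration of how external version types accept or reject a write.
class ExternalVersionSketch {

  enum VersionType { EXTERNAL, EXTERNAL_GTE }

  // Returns true when a write carrying incomingVersion would be accepted
  // over a document currently stored at storedVersion.
  static boolean accepts(VersionType type, long storedVersion, long incomingVersion) {
    switch (type) {
      case EXTERNAL:
        return incomingVersion > storedVersion;   // strictly newer only
      case EXTERNAL_GTE:
        return incomingVersion >= storedVersion;  // same version may be re-applied
      default:
        throw new IllegalArgumentException("Unknown version type: " + type);
    }
  }

  public static void main(String[] args) {
    long storedOffset = 100L;
    // Replaying the same Kafka offset: rejected by External, accepted by ExternalGte.
    System.out.println(accepts(VersionType.EXTERNAL, storedOffset, 100L));
    System.out.println(accepts(VersionType.EXTERNAL_GTE, storedOffset, 100L));
    // An older offset is rejected by both.
    System.out.println(accepts(VersionType.EXTERNAL_GTE, storedOffset, 99L));
  }
}
```

A rejected write surfaces as a version conflict, so replays of an already-indexed offset produce conflicts under External but succeed under ExternalGte.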

Deletion Support

indexer {
  deletionMarkerField: "deleted"
  deletionMarkerFieldValue: "true"
}
Documents whose deletionMarkerField holds the configured deletionMarkerFieldValue are deleted by ID using bulk delete operations.
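The marker check amounts to comparing one field against the configured value. The sketch below assumes a document modeled as a plain map and a string comparison on the field's value; DeletionMarkerSketch and isMarkedForDeletion are hypothetical names, not Lucille's API.

```java
import java.util.Map;

// Hypothetical illustration of the deletion-marker check.
class DeletionMarkerSketch {

  // A document is treated as a delete request when the marker field is present
  // and its string form equals the configured marker value.
  static boolean isMarkedForDeletion(Map<String, Object> doc, String markerField, String markerValue) {
    if (markerField == null || markerValue == null) {
      return false; // deletion support not configured
    }
    Object value = doc.get(markerField);
    return value != null && markerValue.equals(value.toString());
  }

  public static void main(String[] args) {
    Map<String, Object> removed = Map.of("id", "1", "deleted", "true");
    Map<String, Object> kept = Map.of("id", "2", "deleted", "false");
    System.out.println(isMarkedForDeletion(removed, "deleted", "true"));
    System.out.println(isMarkedForDeletion(kept, "deleted", "true"));
  }
}
```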

Bulk Request Processing

The indexer prepares each bulk request in four steps:
  1. Batch accumulation: Documents collected up to batchSize
  2. Grouping by operation: Separate uploads from deletes
  3. Index grouping: Operations grouped by target index
  4. Conflict resolution:
    • If a document ID appears in both upload and delete, the most recent operation wins
    • Uploads remove previous delete requests
    • Deletes remove previous upload requests
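The "most recent operation wins" rule can be sketched with two insertion-ordered collections keyed by document ID. This is a simplified model under stated assumptions: BatchResolutionSketch is a hypothetical class, document bodies are represented as strings, and grouping by target index is omitted.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of per-batch conflict resolution between uploads and deletes.
class BatchResolutionSketch {

  final Map<String, String> uploads = new LinkedHashMap<>(); // document ID -> body
  final Set<String> deletes = new LinkedHashSet<>();         // document IDs to delete

  // A later upload cancels any pending delete for the same ID.
  void addUpload(String id, String body) {
    deletes.remove(id);
    uploads.put(id, body);
  }

  // A later delete cancels any pending upload for the same ID.
  void addDelete(String id) {
    uploads.remove(id);
    deletes.add(id);
  }

  public static void main(String[] args) {
    BatchResolutionSketch batch = new BatchResolutionSketch();
    batch.addUpload("a", "v1");
    batch.addDelete("a");       // delete wins for "a"
    batch.addDelete("c");
    batch.addUpload("c", "v2"); // upload wins for "c"
    System.out.println("uploads: " + batch.uploads.keySet());
    System.out.println("deletes: " + batch.deletes);
  }
}
```

Each ID ends up in exactly one collection, so the bulk request never contains contradictory operations for the same document.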

Connection Validation

Startup validation ensures OpenSearch is accessible:
boolean response = client.ping().value();
If the ping fails, the pipeline will not start.

Error Handling

Failed documents are tracked with detailed error messages:
Set<Pair<Document, String>> failedDocs;
// Each pair contains the document and the error reason
Common errors:
  • Index not found
  • Mapping type errors
  • Routing value missing when required
  • Version conflict

Example Configurations

Development setup that accepts a self-signed certificate:
indexer {
  type: "opensearch"
  batchSize: 500
  
  opensearch {
    url: "https://localhost:9200"
    index: "documents"
    acceptInvalidCert: true  # Dev only
  }
}

Multi-tenant production setup with authentication and routing:
indexer {
  type: "opensearch"
  routingField: "tenant_id"
  batchSize: 1000
  
  opensearch {
    url: "https://opensearch.prod.example.com:9200"
    index: "tenant_data"
    userName: "indexer_user"
    password: "${OPENSEARCH_PASSWORD}"
  }
}

Partial updates with deletion support:
indexer {
  type: "opensearch"
  deletionMarkerField: "status"
  deletionMarkerFieldValue: "deleted"
  
  opensearch {
    url: "http://localhost:9200"
    index: "products"
    update: true  # Use partial updates
  }
}

External versioning for Kafka-sourced documents:
indexer {
  type: "opensearch"
  versionType: "External"
  routingField: "partition_key"
  
  opensearch {
    url: "https://opensearch:9200"
    index: "kafka_events"
  }
}

Best Practices

Setting routingField ensures all documents for a tenant are on the same shard:
  • Faster tenant-specific queries
  • Efficient deletion of tenant data
  • Better cache utilization
Choose between index mode and update mode based on the data you have:
  • Index mode (update=false): Faster, replaces entire document
  • Update mode (update=true): Slower, preserves unspecified fields
Use update mode when you only have partial document data.
Tune batchSize to your document size:
  • Default: 100 documents
  • Increase to 500-1000 for small documents
  • Decrease for large documents with many fields
  • Monitor OpenSearch heap and bulk queue
When using external versioning:
  • Ensure Kafka offsets are monotonically increasing
  • Handle version conflict errors appropriately
  • Consider using ExternalGte instead of External

Troubleshooting

If the connection to OpenSearch fails:
  • Verify OpenSearch is running and accessible
  • Check network connectivity and firewall rules
  • Confirm correct URL and port
  • Test with: curl -k https://localhost:9200
If the index is not found, create it in OpenSearch before indexing:
curl -X PUT "https://localhost:9200/documents" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "title": {"type": "text"},
      "timestamp": {"type": "date"}
    }
  }
}'
If documents fail with mapping errors:
  • Ensure index mapping supports your document fields
  • Use ignoreFields to exclude incompatible fields
  • Check for type mismatches (e.g., sending string to long field)
If the index has required routing:
  • Set routingField in configuration
  • Ensure all documents have the routing field
  • Check OpenSearch index settings

Child Documents

The indexer processes child documents from the children field:
List<Document> children = doc.getChildren();
Note: Child document support is currently limited. Full nested document support is planned for future releases.

See Also