This guide shows how to ingest large-scale vector data from Parquet files and index it into Pinecone for fast semantic search and retrieval.

Overview

This example demonstrates:
  • Reading vector embeddings from Parquet files on S3
  • Processing and filtering vector data
  • Indexing to Pinecone for similarity search
  • Configuring namespaces and metadata

Prerequisites

1. Pinecone Account

Create a free account at Pinecone and get your API key.

2. Create Pinecone Index

Create an index in the Pinecone console:
  • Name: vector
  • Dimensions: Match your embedding size (e.g., 768)
  • Metric: cosine or euclidean

3. Parquet Data

Prepare Parquet files with embeddings:
  • Store in S3 or local filesystem
  • Include pid (document ID) and values (embedding vector) columns

Configuration

Complete Vector Ingestion Config

# Vector ingestion from Parquet to Pinecone

connectors: [
  {
    class: "com.kmwllc.lucille.parquet.connector.ParquetConnector",
    
    # Path to Parquet files
    pathToStorage: ${?PARQUET_PATH},
    
    # Filesystem URI
    fsUri: ${?FS_URI}
    
    # S3 credentials (if using S3)
    s3Key: ${?S3_KEY}
    s3Secret: ${?S3_SECRET}
    
    # ID field in Parquet
    idField: "pid"
    
    name: "connector1"
    pipeline: "pipeline1"
  }
]

pipelines: [
  {
    name: "pipeline1",
    stages: [
      {
        name: "removeEmptyFields",
        class: "com.kmwllc.lucille.stage.RemoveEmptyFields"
      }
    ]
  }
]

indexer {
  batchTimeout: 6000
  batchSize: 500
  type: "pinecone"
  class: "com.kmwllc.lucille.pinecone.indexer.PineconeIndexer"
}

pinecone {
  index: ${?PINECONE_INDEX}
  apiKey: ${?PINECONE_API_KEY}
  
  # Namespace mapping
  namespaces: {
    "kmw_msmarco": "namespace"
  }
  
  # Field containing embedding vector
  defaultEmbeddingField: "values"
  
  # Metadata fields to include
  metadataFields: [
    "docid",
    "title",
    "text"
  ]
}

worker {
  threads: 2
  # maxRetries: 2  # Uncomment if using Zookeeper
}

publisher {
  queueCapacity: 100
}

log {
  seconds: 30  # Status update frequency
}

Environment Variables

# Parquet source
export PARQUET_PATH="bucket/parquet-embeddings"
export FS_URI="s3://"
export S3_KEY="your-s3-access-key"
export S3_SECRET="your-s3-secret-key"

# Pinecone configuration
export PINECONE_INDEX="vector"
export PINECONE_API_KEY="your-pinecone-api-key"
For local Parquet files, use fsUri: "file://" and omit S3 credentials.

Understanding the Configuration

ParquetConnector

Reads Parquet files efficiently:
{
  class: "com.kmwllc.lucille.parquet.connector.ParquetConnector",
  pathToStorage: "bucket/parquet-data",
  fsUri: "s3://"               # Or "file://" for local
  idField: "pid"                # Column to use as document ID
  pipeline: "pipeline1"
}
Key features:
  • Columnar reading for efficiency
  • Supports S3, local filesystem, HDFS
  • Automatic schema detection
  • Handles nested structures

Pinecone Indexer

Indexes vectors to Pinecone:
pinecone {
  index: "vector"                    # Pinecone index name
  apiKey: "your-api-key"             # API key
  defaultEmbeddingField: "values"    # Field containing vector
  
  # Optional namespace mapping
  namespaces: {
    "dataset_name": "namespace_field"
  }
  
  # Metadata to store with vectors
  metadataFields: [
    "docid",
    "title",
    "text"
  ]
}
Namespaces allow you to partition vectors within a single index, useful for multi-tenant applications or separating datasets.

Pipeline Stages for Vector Data

Remove Empty Fields

Clean up sparse data:
{
  name: "removeEmptyFields",
  class: "com.kmwllc.lucille.stage.RemoveEmptyFields"
}

Normalize Vectors

Ensure unit length for cosine similarity:
{
  name: "normalizeVector",
  class: "com.kmwllc.lucille.stage.NormalizeVector"
  vectorField: "values"
}
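For reference, L2 normalization is easy to picture in plain Python. The function below is purely illustrative of what such a stage computes, not the stage's actual implementation:

```python
import math

def l2_normalize(vector):
    """Scale a vector to unit length so dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0:
        return vector  # leave all-zero vectors unchanged
    return [x / norm for x in vector]

print(l2_normalize([3.0, 4.0]))  # → [0.6, 0.8]
```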

Filter by Quality Score

Only index high-quality embeddings:
{
  name: "filterQuality",
  class: "com.kmwllc.lucille.stage.DropDocument"
  conditions: [
    {
      fields: ["quality_score"]
      values: ["0.1", "0.2", "0.3", "0.4"]
      operator: "must"
    }
  ]
}
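The intended effect of the stage above can be sketched in plain Python. This is a rough illustration of the condition semantics, assuming a document is dropped when a listed field holds one of the listed values:

```python
def should_drop(doc, fields, values):
    """Drop the document if any listed field holds one of the listed values."""
    return any(str(doc.get(f)) in values for f in fields)

docs = [
    {"pid": "a", "quality_score": "0.2"},
    {"pid": "b", "quality_score": "0.9"},
]
kept = [d for d in docs
        if not should_drop(d, ["quality_score"], ["0.1", "0.2", "0.3", "0.4"])]
print([d["pid"] for d in kept])  # → ['b']
```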

Add Metadata

Enrich vectors with searchable metadata:
{
  name: "addMetadata",
  class: "com.kmwllc.lucille.stage.SetField"
  fieldName: "source"
  value: "msmarco"
},
{
  name: "addTimestamp",
  class: "com.kmwllc.lucille.stage.AddCurrentDate"
  fieldName: "indexed_at"
}

Running the Ingestion

1. Build the Project

Add Pinecone dependency to pom.xml:
<dependency>
  <groupId>com.kmwllc</groupId>
  <artifactId>lucille-pinecone</artifactId>
  <version>0.8.0-SNAPSHOT</version>
</dependency>
Build:
mvn clean package

2. Set Environment Variables

source .env  # Or export variables manually

3. Run Ingestion

./scripts/run_ingest.sh
Monitor the logs:
INFO  ParquetConnector - Reading bucket/parquet-data
INFO  Publisher - Published 10000 documents
INFO  PineconeIndexer - Indexed 10000 vectors
INFO  Runner - Completed in 45 seconds

4. Verify in Pinecone

Check your index stats in the Pinecone console or via API:
from pinecone import Pinecone  # pinecone-client v3+

pc = Pinecone(api_key="your-api-key")
index = pc.Index("vector")
print(index.describe_index_stats())

Performance Tuning

Batch Size Optimization

indexer {
  batchSize: 500        # Vectors per batch (100-1000)
  batchTimeout: 6000    # Max wait time in ms
}
Pinecone recommendations:
  • Start with 100-500 vectors per batch
  • Increase for higher throughput
  • Monitor for rate limits
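The batching behavior is straightforward to picture: documents accumulate until the batch size (or the timeout) is hit, then the batch is flushed. A minimal sketch of size-based batching, not Lucille's actual indexer code:

```python
def batched(items, batch_size):
    """Yield successive fixed-size batches; the last batch may be smaller."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the remainder

sizes = [len(b) for b in batched(range(1200), 500)]
print(sizes)  # → [500, 500, 200]
```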

Worker Threads

worker {
  threads: 4            # Parallel processing threads
}
More threads = faster ingestion, but higher memory usage.

Publisher Queue

publisher {
  queueCapacity: 1000   # In-flight documents (10-10000)
}
Larger queues improve throughput but use more memory.
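The queue capacity acts as backpressure between the connector and the workers: once the queue is full, the producer blocks until a worker drains it. A stdlib sketch of the idea (not Lucille's actual publisher):

```python
import queue
import threading

q = queue.Queue(maxsize=100)  # bounded, like queueCapacity: 100

def worker():
    while True:
        doc = q.get()
        if doc is None:   # sentinel: stop
            break
        # ... index the document here ...
        q.task_done()

t = threading.Thread(target=worker)
t.start()
for i in range(1000):
    q.put({"id": i})      # blocks whenever the queue is full
q.put(None)               # signal the worker to stop
t.join()
```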

Querying Pinecone

After ingestion, query your vectors:
from pinecone import Pinecone  # pinecone-client v3+

pc = Pinecone(api_key="your-api-key")
index = pc.Index("vector")

# Query with a vector
results = index.query(
    vector=[0.1, 0.2, ...],  # Your query embedding
    top_k=10,
    include_metadata=True,
    namespace="kmw_msmarco"
)

for match in results.matches:
    print(f"Score: {match.score}")
    print(f"Metadata: {match.metadata}")

Advanced Patterns

Multiple Namespaces

Partition vectors by tenant or dataset:
pinecone {
  namespaces: {
    "tenant1": "tenant_id",    # Use tenant_id field value
    "tenant2": "tenant_id",
    "default": "namespace"     # Fallback namespace
  }
}

Hybrid Search Metadata

Store searchable text for hybrid retrieval:
metadataFields: [
  "docid",
  "title",
  "text",           # Full text for re-ranking
  "category",       # For filtering
  "timestamp"       # For temporal filtering
]
Query with filters:
results = index.query(
    vector=[0.1, 0.2, ...],
    filter={"category": {"$eq": "news"}},
    top_k=10
)

Incremental Updates

Update existing vectors:
indexer {
  type: "pinecone"
  updateMode: "upsert"  # Upsert mode (default)
}
Pinecone automatically handles updates based on document ID.
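Upsert semantics make the document ID the unit of identity: re-indexing an existing ID overwrites its vector and metadata rather than creating a duplicate. A toy model of the behavior (Pinecone does this server-side):

```python
# In-memory stand-in for an index keyed by document ID.
index = {}

def upsert(doc_id, vector, metadata):
    """Insert or overwrite the record for doc_id."""
    index[doc_id] = {"values": vector, "metadata": metadata}

upsert("doc1", [0.1, 0.2], {"title": "v1"})
upsert("doc1", [0.3, 0.4], {"title": "v2"})  # same ID: overwrites, no duplicate

print(len(index))                 # → 1
print(index["doc1"]["metadata"])  # → {'title': 'v2'}
```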

Monitoring and Metrics

Enable Detailed Metrics

runner {
  metricsLoggingLevel: "INFO"
}

log {
  seconds: 10  # Log every 10 seconds
}

Key Metrics to Watch

  • Throughput: Vectors indexed per second
  • Latency: Time per batch
  • Error rate: Failed batches
  • Memory usage: JVM heap utilization
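Throughput can be derived from the periodic log lines, e.g. documents published divided by elapsed time. A trivial illustrative helper:

```python
def throughput(vectors_indexed, elapsed_seconds):
    """Vectors indexed per second, guarding against a zero interval."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return vectors_indexed / elapsed_seconds

print(round(throughput(10000, 45)))  # → 222
```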

Troubleshooting

Rate limit errors

Reduce batch size and increase the timeout:
indexer {
  batchSize: 100
  batchTimeout: 10000
}
Or upgrade your Pinecone plan for higher limits.

Dimension mismatch errors

Ensure your embeddings match the index dimension:
# Check your index dimension in Pinecone console
# Verify embedding size in your Parquet data
You may need to recreate the index with the correct dimension.
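A quick sanity check before indexing is to compare the vector length in your data against the index dimension. The helper below is a hypothetical illustration, not part of Lucille:

```python
EXPECTED_DIM = 768  # must match the Pinecone index's dimension

def check_dimensions(rows, expected_dim=EXPECTED_DIM):
    """Return the IDs of rows whose embedding length does not match."""
    return [doc_id for doc_id, vec in rows if len(vec) != expected_dim]

rows = [("doc1", [0.0] * 768), ("doc2", [0.0] * 384)]
print(check_dimensions(rows))  # → ['doc2']
```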

Missing or unreadable source data

Verify S3 credentials and path:
aws s3 ls s3://bucket/parquet-data/
Check Parquet schema:
pip install pyarrow
python -c "import pyarrow.parquet as pq; print(pq.read_schema('file.parquet'))"

Out-of-memory errors

Reduce queue capacity and batch size:
publisher {
  queueCapacity: 50
}
indexer {
  batchSize: 100
}
Or increase JVM memory:
java -Xmx8g -Dconfig.file=conf/parquet-pinecone.conf ...