This guide shows how to ingest large-scale vector data from Parquet files and index it into Pinecone for fast semantic search and retrieval.
## Overview

This example demonstrates:

- Reading vector embeddings from Parquet files on S3
- Processing and filtering vector data
- Indexing to Pinecone for similarity search
- Configuring namespaces and metadata
## Prerequisites

### Pinecone Account

Create a free account at Pinecone and get your API key.

### Create Pinecone Index

Create an index in the Pinecone console:

- **Name**: `vector`
- **Dimensions**: match your embedding size (e.g., 768)
- **Metric**: `cosine` or `euclidean`

### Parquet Data

Prepare Parquet files with embeddings:

- Store them in S3 or on the local filesystem
- Include `pid` (document ID) and `values` (embedding vector) columns
## Configuration

### Complete Vector Ingestion Config

`conf/parquet-pinecone.conf`:

```hocon
# Vector ingestion from Parquet to Pinecone
connectors: [
  {
    class: "com.kmwllc.lucille.parquet.connector.ParquetConnector",

    # Path to Parquet files
    pathToStorage: ${?PARQUET_PATH}

    # Filesystem URI
    fsUri: ${?FS_URI}

    # S3 credentials (if using S3)
    s3Key: ${?S3_KEY}
    s3Secret: ${?S3_SECRET}

    # ID field in Parquet
    idField: "pid"

    name: "connector1"
    pipeline: "pipeline1"
  }
]

pipelines: [
  {
    name: "pipeline1",
    stages: [
      {
        name: "removeEmptyFields",
        class: "com.kmwllc.lucille.stage.RemoveEmptyFields"
      }
    ]
  }
]

indexer {
  batchTimeout: 6000
  batchSize: 500
  type: "pinecone"
  class: "com.kmwllc.lucille.pinecone.indexer.PineconeIndexer"
}

pinecone {
  index: ${?PINECONE_INDEX}
  apiKey: ${?PINECONE_API_KEY}

  # Namespace mapping
  namespaces: {
    "kmw_msmarco": "namespace"
  }

  # Field containing embedding vector
  defaultEmbeddingField: "values"

  # Metadata fields to include
  metadataFields: [
    "docid",
    "title",
    "text"
  ]
}

worker {
  threads: 2
  # maxRetries: 2 # Uncomment if using Zookeeper
}

publisher {
  queueCapacity: 100
}

log {
  seconds: 30 # Status update frequency
}
```
### Environment Variables

```bash
# Parquet source
export PARQUET_PATH="bucket/parquet-embeddings"
export FS_URI="s3://"
export S3_KEY="your-s3-access-key"
export S3_SECRET="your-s3-secret-key"

# Pinecone configuration
export PINECONE_INDEX="vector"
export PINECONE_API_KEY="your-pinecone-api-key"
```

For local Parquet files, use `fsUri: "file://"` and omit the S3 credentials.
## Understanding the Configuration

### ParquetConnector

Reads Parquet files efficiently:

```hocon
{
  class: "com.kmwllc.lucille.parquet.connector.ParquetConnector",
  pathToStorage: "bucket/parquet-data",
  fsUri: "s3://"  # Or "file://" for local files
  idField: "pid"  # Column to use as the document ID
  pipeline: "pipeline1"
}
```

Key features:

- Columnar reading for efficiency
- Supports S3, the local filesystem, and HDFS
- Automatic schema detection
- Handles nested structures
### Pinecone Indexer

Indexes vectors to Pinecone:

```hocon
pinecone {
  index: "vector"                  # Pinecone index name
  apiKey: "your-api-key"           # API key
  defaultEmbeddingField: "values"  # Field containing the vector

  # Optional namespace mapping
  namespaces: {
    "dataset_name": "namespace_field"
  }

  # Metadata to store with vectors
  metadataFields: [
    "docid",
    "title",
    "text"
  ]
}
```

Namespaces allow you to partition vectors within a single index, which is useful for multi-tenant applications or for separating datasets.
## Pipeline Stages for Vector Data

### Remove Empty Fields

Clean up sparse data:

```hocon
{
  name: "removeEmptyFields",
  class: "com.kmwllc.lucille.stage.RemoveEmptyFields"
}
```

### Normalize Vectors

Ensure unit length for cosine similarity:

```hocon
{
  name: "normalizeVector",
  class: "com.kmwllc.lucille.stage.NormalizeVector"
  vectorField: "values"
}
```
### Filter by Quality Score

Drop documents whose embeddings fall below a quality threshold, so only high-quality embeddings are indexed:

```hocon
{
  name: "filterQuality",
  class: "com.kmwllc.lucille.stage.DropDocument"
  conditions: [
    {
      fields: ["quality_score"]
      values: ["0.1", "0.2", "0.3", "0.4"]
      operator: "must"
    }
  ]
}
```
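The condition lists the `quality_score` values that cause a document to be dropped. A plain-Python sketch of the value-matching idea (illustrative only; the stage's exact condition semantics are defined by Lucille):

```python
def should_drop(doc, fields, values):
    """Return True if any of the listed fields holds one of the listed values."""
    return any(str(doc.get(field)) in values for field in fields)

drop_values = ["0.1", "0.2", "0.3", "0.4"]

should_drop({"quality_score": "0.2"}, ["quality_score"], drop_values)  # dropped
should_drop({"quality_score": "0.9"}, ["quality_score"], drop_values)  # kept
```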
### Add Metadata

Enrich vectors with searchable metadata:

```hocon
{
  name: "addMetadata",
  class: "com.kmwllc.lucille.stage.SetField"
  fieldName: "source"
  value: "msmarco"
},
{
  name: "addTimestamp",
  class: "com.kmwllc.lucille.stage.AddCurrentDate"
  fieldName: "indexed_at"
}
```
## Running the Ingestion

### Build the Project

Add the Pinecone dependency to `pom.xml`:

```xml
<dependency>
  <groupId>com.kmwllc</groupId>
  <artifactId>lucille-pinecone</artifactId>
  <version>0.8.0-SNAPSHOT</version>
</dependency>
```
Build:
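A standard Maven build works here (the exact invocation is an assumption; adjust to your project's setup):

```bash
mvn clean install
```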
### Set Environment Variables

```bash
source .env  # Or export the variables manually
```
### Run Ingestion
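Start the run by pointing the JVM at the config file (the `Runner` main class below follows Lucille's conventions; verify the class name for your version):

```bash
java -Dconfig.file=conf/parquet-pinecone.conf com.kmwllc.lucille.core.Runner
```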
Monitor the logs:

```text
INFO ParquetConnector - Reading bucket/parquet-data
INFO Publisher - Published 10000 documents
INFO PineconeIndexer - Indexed 10000 vectors
INFO Runner - Completed in 45 seconds
```
### Verify in Pinecone

Check your index stats in the Pinecone console or via the API:

```python
import pinecone

pinecone.init(api_key="your-api-key")
index = pinecone.Index("vector")
print(index.describe_index_stats())
```
### Batch Size Optimization

```hocon
indexer {
  batchSize: 500      # Vectors per batch (100-1000)
  batchTimeout: 6000  # Max wait time in ms
}
```

Pinecone recommendations:

- Start with 100-500 vectors per batch
- Increase for higher throughput
- Monitor for rate limits
### Worker Threads

```hocon
worker {
  threads: 4  # Parallel processing threads
}
```

More threads mean faster ingestion, but higher memory usage.
### Publisher Queue

```hocon
publisher {
  queueCapacity: 1000  # In-flight documents (10-10000)
}
```

Larger queues improve throughput but use more memory.
## Querying Pinecone

After ingestion, query your vectors:

```python
import pinecone

pinecone.init(api_key="your-api-key")
index = pinecone.Index("vector")

# Query with a vector
results = index.query(
    vector=[0.1, 0.2, ...],  # Your query embedding
    top_k=10,
    include_metadata=True,
    namespace="kmw_msmarco"
)

for match in results['matches']:
    print(f"Score: {match['score']}")
    print(f"Metadata: {match['metadata']}")
```
## Advanced Patterns

### Multiple Namespaces

Partition vectors by tenant or dataset:

```hocon
pinecone {
  namespaces: {
    "tenant1": "tenant_id",  # Use the tenant_id field value
    "tenant2": "tenant_id",
    "default": "namespace"   # Fallback namespace
  }
}
```
### Rich Metadata

Store searchable text for hybrid retrieval:

```hocon
metadataFields: [
  "docid",
  "title",
  "text",       # Full text for re-ranking
  "category",   # For filtering
  "timestamp"   # For temporal filtering
]
```

Query with filters:

```python
results = index.query(
    vector=[0.1, 0.2, ...],
    filter={"category": {"$eq": "news"}},
    top_k=10
)
```
### Incremental Updates

Update existing vectors:

```hocon
indexer {
  type: "pinecone"
  updateMode: "upsert"  # Upsert mode (default)
}
```

Pinecone automatically handles updates based on the document ID.
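Upsert semantics mean that re-sending a vector with an existing ID overwrites it rather than creating a duplicate. A dictionary-backed sketch of the idea (not Pinecone's implementation):

```python
class TinyVectorStore:
    """Illustrates upsert-by-ID: an existing ID is overwritten, a new ID is inserted."""

    def __init__(self):
        self.vectors = {}

    def upsert(self, doc_id, values, metadata=None):
        # Writing to the same key replaces the old entry in place.
        self.vectors[doc_id] = {"values": values, "metadata": metadata or {}}

store = TinyVectorStore()
store.upsert("doc-1", [0.1, 0.2])
store.upsert("doc-1", [0.3, 0.4])  # overwrites doc-1; the store still holds one vector
```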
## Monitoring and Metrics

### Enable Detailed Metrics

```hocon
runner {
  metricsLoggingLevel: "INFO"
}

log {
  seconds: 10  # Log every 10 seconds
}
```

### Key Metrics to Watch

- **Throughput**: vectors indexed per second
- **Latency**: time per batch
- **Error rate**: failed batches
- **Memory usage**: JVM heap utilization
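As a quick sanity check, throughput can be computed directly from the run logs. Using the sample log output above (10000 vectors in 45 seconds):

```python
# Throughput from the sample log output: vectors indexed per second.
vectors_indexed = 10_000
elapsed_seconds = 45
throughput = vectors_indexed / elapsed_seconds
print(f"{throughput:.0f} vectors/sec")  # about 222 vectors/sec
```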
## Troubleshooting

### Rate Limit Errors

Reduce the batch size and increase the timeout:

```hocon
indexer {
  batchSize: 100
  batchTimeout: 10000
}
```

Or upgrade your Pinecone plan for higher limits.

### Dimension Mismatch

Ensure your embeddings match the index dimension: check the index dimension in the Pinecone console, and verify the embedding size in your Parquet data. You may need to recreate the index with the correct dimension.

### S3 Access Issues

Verify your S3 credentials and path:

```bash
aws s3 ls s3://bucket/parquet-data/
```

Check the Parquet schema:

```bash
pip install pyarrow
python -c "import pyarrow.parquet as pq; print(pq.read_schema('file.parquet'))"
```

### Out-of-Memory Errors

Reduce the queue capacity and batch size:

```hocon
publisher {
  queueCapacity: 50
}

indexer {
  batchSize: 100
}
```

Or increase JVM memory:

```bash
java -Xmx8g -Dconfig.file=conf/parquet-pinecone.conf ...
```