"""
Qdrant Document-Level Aggregation for RAG Applications
PROBLEM STATEMENT:
================
In a RAG (Retrieval Augmented Generation) system, documents are typically chunked into smaller pieces
for embedding and storage in vector databases like Qdrant. However, when analyzing or reporting on
the data, users want to know about complete documents, not individual chunks.
BUSINESS CONTEXT:
================
- You have a large document corpus (5000+ companies) stored in Qdrant
- Each document is split into multiple chunks (e.g., "IP March 2024" → 50 chunks)
- Documents have metadata: company_name, doc_type (IP, AR, CC), doc_id
- Users select a subset of companies (e.g., 5 out of 5000)
- You need to tell users: "I analyzed 247 documents (89 IP, 94 AR, 64 CC) for your 5 companies"
TECHNICAL CHALLENGE:
==================
- Naive approach: Pull all chunks matching company filter → deduplicate in Python
- Problems: Massive data transfer, inefficient client-side processing, high memory usage
- Solution: Use Qdrant's search_groups to perform server-side deduplication by doc_id
KEY INSIGHT:
===========
search_groups with group_by="doc_id" tells Qdrant to:
1. Find all chunks matching your filter (company_name in selected companies)
2. Group chunks by document ID on the server
3. Return only 1 representative chunk per unique document
4. Transfer minimal data while preserving document-level metadata
EFFICIENCY GAINS:
================
Before: 1,000 documents × 50 chunks = 50,000 network transfers + Python deduplication
After: 1,000 documents = 1,000 network transfers + server-side grouping
Result: 50x less data transfer, orders of magnitude faster processing
"""
from collections import defaultdict
from qdrant_client import QdrantClient
from qdrant_client.http.models import Filter, FieldCondition, MatchAny
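
# Assumed payload schema for each chunk (point) in the collection. This is a
# hypothetical illustration of the fields the functions below rely on; adjust
# the names and values to match your own ingestion pipeline.
EXAMPLE_CHUNK_PAYLOAD = {
    "company_name": "AAPL",       # used for filtering
    "doc_type": "IP",             # IP / AR / CC - used for aggregation
    "doc_id": "AAPL-IP-2024-03",  # used as the server-side group key
}
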
def get_document_counts_optimized(client: QdrantClient, collection_name: str,
                                  target_companies: list[str], vector_dimension: int = 1536):
    """
    CORE FUNCTION: Get document-level counts by type for selected companies

    PURPOSE:
    --------
    Given a subset of companies (e.g., 5 out of 5,000), count how many unique documents
    of each type (IP, AR, CC) exist for those companies. This tells users the scope
    of work: "I analyzed 247 documents across your 5 companies."

    HOW IT WORKS:
    ------------
    1. Filter chunks by company_name matching the target companies
    2. Use search_groups with group_by="doc_id" to deduplicate chunks into documents
    3. Qdrant returns 1 representative chunk per unique document (server-side grouping)
    4. Count documents by doc_type to get the final aggregation

    EFFICIENCY:
    ----------
    - Server-side deduplication in optimized Rust code
    - Minimal data transfer (1 chunk per document, not all chunks)
    - No client-side memory overhead from redundant chunks

    Args:
        client: QdrantClient instance
        collection_name: Name of the Qdrant collection
        target_companies: List of company names to filter by (e.g., ["Company1", "Company2"])
        vector_dimension: Dimension of vectors in your collection (adjust to your model)

    Returns:
        Dict with:
        - total_documents: Total unique documents found
        - documents_by_type: {"IP": 89, "AR": 94, "CC": 64}
        - target_companies: Echo of input companies

    Example:
        results = get_document_counts_optimized(client, "docs", ["AAPL", "MSFT"], 1536)
        # Returns: {"total_documents": 247, "documents_by_type": {"IP": 89, "AR": 94, "CC": 64}}
    """
    # Create filter for target companies
    company_filter = Filter(
        must=[
            FieldCondition(
                key="company_name",
                match=MatchAny(any=target_companies)
            )
        ]
    )

    # Use search_groups so Qdrant deduplicates chunks by doc_id on the server and
    # returns only one representative chunk per unique document. (Recent qdrant-client
    # releases also expose query_points_groups as the newer equivalent of this call.)
    response = client.search_groups(
        collection_name=collection_name,
        query_vector=[0.0] * vector_dimension,  # Dummy vector: ranking is irrelevant, we only count groups
        group_by="doc_id",                      # Group chunks by document ID
        query_filter=company_filter,            # Restrict to the selected companies
        group_size=1,                           # Only need 1 chunk per document
        limit=10000,                            # Max unique documents; use the paginated variant below for more
        with_payload=["doc_type"]               # Only fetch the payload field we need
    )

    # Aggregate results from unique documents
    type_counts = defaultdict(int)
    for group in response.groups:
        # Each group contains exactly one hit because group_size=1
        doc_type = group.hits[0].payload["doc_type"]
        type_counts[doc_type] += 1

    return {
        "total_documents": len(response.groups),
        "documents_by_type": dict(type_counts),
        "target_companies": target_companies
    }
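

def ensure_payload_indexes(client: QdrantClient, collection_name: str):
    """
    One-time setup sketch: filtering on company_name and grouping on doc_id are
    typically much faster when those payload fields carry keyword indexes. The
    field names follow the assumed schema above; adjust them to your collection.
    """
    for field in ("company_name", "doc_id", "doc_type"):
        client.create_payload_index(
            collection_name=collection_name,
            field_name=field,
            field_schema="keyword",
        )
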
def get_document_counts_with_pagination(client: QdrantClient, collection_name: str, target_companies: list[str],
                                        vector_dimension: int = 1536, batch_size: int = 5000):
    """
    Handle large collections by retrieving unique documents in batches via search_groups.

    Use this when you have more unique documents than can be retrieved in a single
    request. The batching applies to unique documents, not chunks. Note that Qdrant's
    group-search API does not accept an offset parameter, so classic offset pagination
    is not possible; instead, each batch adds the doc_ids it has already seen to a
    must_not condition so that every subsequent request returns only new documents.
    The exclusion filter grows with each batch, so for very large result sets a single
    high-limit request may be more practical.

    Args:
        client: QdrantClient instance
        collection_name: Name of the Qdrant collection
        target_companies: List of company names to filter by
        vector_dimension: Dimension of vectors in your collection
        batch_size: Number of unique documents to retrieve per batch

    Returns:
        Dict with total document count and breakdown by document type
    """
    all_type_counts = defaultdict(int)
    seen_doc_ids = set()
    batches_processed = 0

    print(f"Starting document aggregation for {len(target_companies)} companies...")
    while True:
        # Rebuild the filter each pass, excluding documents that are already counted
        batch_filter = Filter(
            must=[
                FieldCondition(
                    key="company_name",
                    match=MatchAny(any=target_companies)
                )
            ],
            must_not=[
                FieldCondition(
                    key="doc_id",
                    match=MatchAny(any=list(seen_doc_ids))
                )
            ] if seen_doc_ids else None
        )

        # Get the next batch of unique documents
        response = client.search_groups(
            collection_name=collection_name,
            query_vector=[0.0] * vector_dimension,
            group_by="doc_id",
            query_filter=batch_filter,
            group_size=1,
            limit=batch_size,
            with_payload=["doc_type"]
        )

        # Process this batch; group.id is the doc_id the chunks were grouped by
        batch_count = 0
        for group in response.groups:
            seen_doc_ids.add(group.id)
            doc_type = group.hits[0].payload["doc_type"]
            all_type_counts[doc_type] += 1
            batch_count += 1
        batches_processed += 1
        print(f"Processed batch: {batch_count} documents (Total: {len(seen_doc_ids)})")

        # A short batch means there are no more unique documents to fetch
        if batch_count < batch_size:
            break

    return {
        "total_documents": len(seen_doc_ids),
        "documents_by_type": dict(all_type_counts),
        "target_companies": target_companies,
        "batches_processed": batches_processed
    }
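

def get_chunk_count(client: QdrantClient, collection_name: str, target_companies: list[str]) -> int:
    """
    Companion sketch: count the raw chunks (points) that match the company filter
    using Qdrant's count API. Comparing this number with total_documents from the
    functions above shows how much transfer the server-side grouping saves.
    """
    company_filter = Filter(
        must=[
            FieldCondition(
                key="company_name",
                match=MatchAny(any=target_companies)
            )
        ]
    )
    result = client.count(
        collection_name=collection_name,
        count_filter=company_filter,
        exact=True,  # exact count; set False for a faster approximation
    )
    return result.count
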
def get_detailed_document_breakdown(client: QdrantClient, collection_name: str, target_companies: list[str],
                                    vector_dimension: int = 1536):
    """
    Get a detailed breakdown showing documents by company AND type.

    This version retrieves additional metadata to show how documents are distributed
    across both companies and document types.

    Args:
        client: QdrantClient instance
        collection_name: Name of the Qdrant collection
        target_companies: List of company names to filter by
        vector_dimension: Dimension of vectors in your collection

    Returns:
        Dict with detailed breakdown by company and type
    """
    company_filter = Filter(
        must=[
            FieldCondition(
                key="company_name",
                match=MatchAny(any=target_companies)
            )
        ]
    )

    response = client.search_groups(
        collection_name=collection_name,
        query_vector=[0.0] * vector_dimension,
        group_by="doc_id",
        query_filter=company_filter,
        group_size=1,
        limit=10000,
        with_payload=["doc_type", "company_name"]  # Get both fields
    )

    # Aggregate by type and by company+type
    type_counts = defaultdict(int)
    company_type_counts = defaultdict(lambda: defaultdict(int))
    for group in response.groups:
        hit = group.hits[0]
        doc_type = hit.payload["doc_type"]
        company_name = hit.payload["company_name"]
        type_counts[doc_type] += 1
        company_type_counts[company_name][doc_type] += 1

    return {
        "total_documents": len(response.groups),
        "documents_by_type": dict(type_counts),
        "documents_by_company_and_type": {
            company: dict(types) for company, types in company_type_counts.items()
        },
        "target_companies": target_companies
    }

# Example usage and testing
if __name__ == "__main__":
    # Initialize client (adjust connection details as needed)
    client = QdrantClient("localhost", port=6333)

    # Example companies to analyze
    target_companies = ["Company1", "Company2", "Company3", "Company4", "Company5"]

    # Basic aggregation
    print("=== Basic Document Counts ===")
    results = get_document_counts_optimized(
        client=client,
        collection_name="your_collection_name",
        target_companies=target_companies,
        vector_dimension=1536  # Adjust to your model's dimension
    )
    print(f"Total documents analyzed: {results['total_documents']}")
    print(f"Documents by type: {results['documents_by_type']}")

    # With pagination for large collections
    print("\n=== With Pagination ===")
    paginated_results = get_document_counts_with_pagination(
        client=client,
        collection_name="your_collection_name",
        target_companies=target_companies,
        vector_dimension=1536,
        batch_size=2000
    )
    print(f"Total documents: {paginated_results['total_documents']}")
    print(f"Documents by type: {paginated_results['documents_by_type']}")
    print(f"Processed in {paginated_results['batches_processed']} batches")

    # Detailed breakdown
    print("\n=== Detailed Breakdown ===")
    detailed_results = get_detailed_document_breakdown(
        client=client,
        collection_name="your_collection_name",
        target_companies=target_companies,
        vector_dimension=1536
    )
    print(f"Total documents: {detailed_results['total_documents']}")
    print(f"By type: {detailed_results['documents_by_type']}")
    print("By company and type:")
    for company, types in detailed_results['documents_by_company_and_type'].items():
        print(f"  {company}: {types}")

    # User-friendly summary
    print("\n=== Summary for Users ===")
    total = detailed_results['total_documents']
    by_type = detailed_results['documents_by_type']
    type_summary = ", ".join(f"{count} {doc_type}" for doc_type, count in by_type.items())
    print(f"Analysis complete: I processed {total} documents ({type_summary}) "
          f"across {len(target_companies)} companies to provide your insights.")