Content is user-generated and unverified.
""" Qdrant Document-Level Aggregation for RAG Applications PROBLEM STATEMENT: ================ In a RAG (Retrieval Augmented Generation) system, documents are typically chunked into smaller pieces for embedding and storage in vector databases like Qdrant. However, when analyzing or reporting on the data, users want to know about complete documents, not individual chunks. BUSINESS CONTEXT: ================ - You have a large document corpus (5000+ companies) stored in Qdrant - Each document is split into multiple chunks (e.g., "IP March 2024" → 50 chunks) - Documents have metadata: company_name, doc_type (IP, AR, CC), doc_id - Users select a subset of companies (e.g., 5 out of 5000) - You need to tell users: "I analyzed 247 documents (89 IP, 94 AR, 64 CC) for your 5 companies" TECHNICAL CHALLENGE: ================== - Naive approach: Pull all chunks matching company filter → deduplicate in Python - Problems: Massive data transfer, inefficient client-side processing, high memory usage - Solution: Use Qdrant's search_groups to perform server-side deduplication by doc_id KEY INSIGHT: =========== search_groups with group_by="doc_id" tells Qdrant to: 1. Find all chunks matching your filter (company_name in selected companies) 2. Group chunks by document ID on the server 3. Return only 1 representative chunk per unique document 4. Transfer minimal data while preserving document-level metadata EFFICIENCY GAINS: ================ Before: 1,000 documents × 50 chunks = 50,000 network transfers + Python deduplication After: 1,000 documents = 1,000 network transfers + server-side grouping Result: 50x less data transfer, orders of magnitude faster processing """ from collections import defaultdict from qdrant_client import QdrantClient from qdrant_client.http.models import Filter, FieldCondition, MatchAny def get_document_counts_optimized(client: QdrantClient, collection_name: str, target_companies: list[str], vector_dimension: int = 1536): """ CORE FUNCTION: Get document-level counts by type for selected companies PURPOSE: -------- Given a subset of companies (e.g., 5 out of 5000), count how many unique documents of each type (IP, AR, CC) exist for those companies. This tells users the scope of work: "I analyzed 247 documents across your 5 companies." HOW IT WORKS: ------------ 1. Filter chunks by company_name matching target companies 2. Use search_groups with group_by="doc_id" to deduplicate chunks into documents 3. Qdrant returns 1 representative chunk per unique document (server-side grouping) 4. Count documents by doc_type to get final aggregation EFFICIENCY: ---------- - Server-side deduplication in optimized Rust code - Minimal data transfer (1 chunk per document, not all chunks) - No client-side memory overhead from redundant chunks Args: client: QdrantClient instance collection_name: Name of the Qdrant collection target_companies: List of company names to filter by (e.g., ["Company1", "Company2"]) vector_dimension: Dimension of vectors in your collection (adjust to your model) Returns: Dict with: - total_documents: Total unique documents found - documents_by_type: {"IP": 89, "AR": 94, "CC": 64} - target_companies: Echo of input companies Example: results = get_document_counts_optimized(client, "docs", ["AAPL", "MSFT"], 1536) # Returns: {"total_documents": 247, "documents_by_type": {"IP": 89, "AR": 94, "CC": 64}} """ # Create filter for target companies company_filter = Filter( must=[ FieldCondition( key="company_name", match=MatchAny(any=target_companies) ) ] ) # Use search_groups to let Qdrant handle deduplication by doc_id # This returns only one representative chunk per unique document response = client.search_groups( collection_name=collection_name, query_vector=[0.0] * vector_dimension, # Dummy vector - not used for filtering group_by="doc_id", # Group chunks by document ID query_filter=company_filter, # Apply company filter group_size=1, # Only need 1 chunk per document limit=10000, # Max number of unique documents with_payload=["doc_type"] # Only fetch the payload we need ) # Aggregate results from unique documents type_counts = defaultdict(int) for group in response.groups: # Each group contains one hit because group_size=1 doc_type = group.hits[0].payload["doc_type"] type_counts[doc_type] += 1 return { "total_documents": len(response.groups), "documents_by_type": dict(type_counts), "target_companies": target_companies } def get_document_counts_with_pagination(client: QdrantClient, collection_name: str, target_companies: list[str], vector_dimension: int = 1536, batch_size: int = 5000): """ Handle large collections by paginating through unique documents using search_groups. This version supports pagination when you have more unique documents than can be retrieved in a single request. The pagination applies to unique documents, not chunks. Args: client: QdrantClient instance collection_name: Name of the Qdrant collection target_companies: List of company names to filter by vector_dimension: Dimension of vectors in your collection batch_size: Number of unique documents to retrieve per batch Returns: Dict with total document count and breakdown by document type """ company_filter = Filter( must=[ FieldCondition( key="company_name", match=MatchAny(any=target_companies) ) ] ) all_type_counts = defaultdict(int) offset = 0 total_documents = 0 print(f"Starting document aggregation for {len(target_companies)} companies...") while True: # Get batch of unique documents response = client.search_groups( collection_name=collection_name, query_vector=[0.0] * vector_dimension, group_by="doc_id", query_filter=company_filter, group_size=1, limit=batch_size, offset=offset, with_payload=["doc_type"] ) # Process this batch of unique documents batch_count = 0 for group in response.groups: doc_type = group.hits[0].payload["doc_type"] all_type_counts[doc_type] += 1 batch_count += 1 total_documents += batch_count print(f"Processed batch: {batch_count} documents (Total: {total_documents})") # Check if we've retrieved all documents if batch_count < batch_size: break offset += batch_size return { "total_documents": total_documents, "documents_by_type": dict(all_type_counts), "target_companies": target_companies, "batches_processed": (offset // batch_size) + 1 } def get_detailed_document_breakdown(client: QdrantClient, collection_name: str, target_companies: list[str], vector_dimension: int = 1536): """ Get detailed breakdown showing documents by company AND type. This version retrieves additional metadata to show how documents are distributed across both companies and document types. Args: client: QdrantClient instance collection_name: Name of the Qdrant collection target_companies: List of company names to filter by vector_dimension: Dimension of vectors in your collection Returns: Dict with detailed breakdown by company and type """ company_filter = Filter( must=[ FieldCondition( key="company_name", match=MatchAny(any=target_companies) ) ] ) response = client.search_groups( collection_name=collection_name, query_vector=[0.0] * vector_dimension, group_by="doc_id", query_filter=company_filter, group_size=1, limit=10000, with_payload=["doc_type", "company_name"] # Get both fields ) # Aggregate by type and by company+type type_counts = defaultdict(int) company_type_counts = defaultdict(lambda: defaultdict(int)) for group in response.groups: hit = group.hits[0] doc_type = hit.payload["doc_type"] company_name = hit.payload["company_name"] type_counts[doc_type] += 1 company_type_counts[company_name][doc_type] += 1 return { "total_documents": len(response.groups), "documents_by_type": dict(type_counts), "documents_by_company_and_type": { company: dict(types) for company, types in company_type_counts.items() }, "target_companies": target_companies } # Example usage and testing if __name__ == "__main__": # Initialize client (adjust connection details as needed) client = QdrantClient("localhost", port=6333) # Example companies to analyze target_companies = ["Company1", "Company2", "Company3", "Company4", "Company5"] # Basic aggregation print("=== Basic Document Counts ===") results = get_document_counts_optimized( client=client, collection_name="your_collection_name", target_companies=target_companies, vector_dimension=1536 # Adjust to your model's dimension ) print(f"Total documents analyzed: {results['total_documents']}") print(f"Documents by type: {results['documents_by_type']}") # With pagination for large collections print("\n=== With Pagination ===") paginated_results = get_document_counts_with_pagination( client=client, collection_name="your_collection_name", target_companies=target_companies, vector_dimension=1536, batch_size=2000 ) print(f"Total documents: {paginated_results['total_documents']}") print(f"Documents by type: {paginated_results['documents_by_type']}") print(f"Processed in {paginated_results['batches_processed']} batches") # Detailed breakdown print("\n=== Detailed Breakdown ===") detailed_results = get_detailed_document_breakdown( client=client, collection_name="your_collection_name", target_companies=target_companies, vector_dimension=1536 ) print(f"Total documents: {detailed_results['total_documents']}") print(f"By type: {detailed_results['documents_by_type']}") print("By company and type:") for company, types in detailed_results['documents_by_company_and_type'].items(): print(f" {company}: {types}") # User-friendly summary print(f"\n=== Summary for Users ===") total = detailed_results['total_documents'] by_type = detailed_results['documents_by_type'] type_summary = ", ".join([f"{count} {doc_type}" for doc_type, count in by_type.items()]) print(f"Analysis complete: I processed {total} documents ({type_summary}) ") print(f"across {len(target_companies)} companies to provide your insights.")
Content is user-generated and unverified.
    Optimized Qdrant Document Aggregation with search_groups | Claude