๐Ÿ—๏ธ SoRag Indexing System Architecture

Document Processing Pipeline - From Raw Files to Vector Search

🎯 Indexing System Overview

SoRag Indexing is a complete document-processing pipeline responsible for transforming raw documents into searchable vector database entries.

  • 6 Pipeline Stages
  • 4 Chunker Types
  • 3 Embedder Models
  • 5 Vector Stores
  • 4 File Parsers
  • 59 Total Files

📊 Legend

  • 📋 Node - processing node
  • 🏭 Factory - component factory
  • 🔧 Component - functional component
  • 💾 Storage - storage system
  • 📊 Queue - message queue
  • 🔄 Flow - data flow

Complete Indexing Pipeline Architecture

graph TB
    %% Style definitions
    classDef workflow fill:#667eea,stroke:#fff,stroke-width:3px,color:#fff,font-weight:bold
    classDef node fill:#764ba2,stroke:#fff,stroke-width:2px,color:#fff
    classDef factory fill:#f093fb,stroke:#333,stroke-width:2px,color:#333
    classDef component fill:#4ecdc4,stroke:#333,stroke-width:1px,color:#333
    classDef storage fill:#ff6b6b,stroke:#333,stroke-width:2px,color:#333
    classDef queue fill:#ffe66d,stroke:#333,stroke-width:2px,color:#333
    classDef startEnd fill:#4CAF50,stroke:#fff,stroke-width:2px,color:#fff

    %% Input Sources
    DOCUMENTS["📄 Source Documents<br/>PDF, Images, Text"]:::startEnd
    TASK_QUEUE["📊 Task Queue<br/>Database Tasks"]:::queue

    %% Main Pipeline Nodes
    INDEXING_WF["🏗️ INDEXING WORKFLOW<br/>LangGraph Linear Pipeline"]:::workflow
    TASK_INIT["📋 Task Init Node<br/>Load task & metadata"]:::node
    PARSE_NODE["📝 Parse Node<br/>Extract content"]:::node
    CHUNK_NODE["✂️ Chunk Node<br/>Split into segments"]:::node
    EMBED_NODE["🧠 Embed Node<br/>Generate embeddings"]:::node
    UPSERT_NODE["⬆️ Upsert Node<br/>Store in vector DBs"]:::node
    COMMIT_NODE["✅ Commit Node<br/>Finalize & cleanup"]:::node

    %% Factory Components
    subgraph FACTORIES["Component Factories"]
        PARSER_FACTORY["🏭 Parser Factory<br/>4 file formats"]:::factory
        CHUNKER_FACTORY["🏭 Chunker Factory<br/>4 chunking strategies"]:::factory
        EMBEDDER_FACTORY["🏭 Embedder Factory<br/>Dense + Sparse"]:::factory
        UPSERTER_FACTORY["🏭 Upserter Factory<br/>Vector + Sparse stores"]:::factory
    end

    %% Parser Components
    subgraph PARSERS["Document Parsers (4 types)"]
        PDF_PARSER["📕 PDF Parser<br/>PyPDF2, OCR"]:::component
        IMG_PARSER["🖼️ Image Parser<br/>OCR, Vision models"]:::component
        TEXT_PARSER["📝 Text Parser<br/>Plain text, markdown"]:::component
        DOC_PARSER["📄 Document Parser<br/>DOCX, other formats"]:::component
    end

    %% Chunker Components
    subgraph CHUNKERS["Text Chunkers (4 types)"]
        FIXED_CHUNKER["📏 Fixed Size Chunker<br/>Character/token based"]:::component
        SEMANTIC_CHUNKER["🧠 Semantic Chunker<br/>AI-based splitting"]:::component
        PAGE_CHUNKER["📄 Page Aware Chunker<br/>Preserve page structure"]:::component
        SMART_CHUNKER["🎯 Smart Chunker<br/>Adaptive algorithms"]:::component
    end

    %% Embedder Components
    subgraph EMBEDDERS["Embedding Models (3 types)"]
        BGE_EMBEDDER["🇨🇳 BGE Dense Embedder<br/>Chinese optimized"]:::component
        OPENAI_EMBEDDER["🤖 OpenAI Embedder<br/>text-embedding-ada-002"]:::component
        BM25_EMBEDDER["📊 BM25 Sparse Embedder<br/>Traditional TF-IDF"]:::component
    end

    %% Storage Systems
    subgraph VECTOR_STORES["Vector Storage (5 systems)"]
        FAISS_DB["🔍 FAISS<br/>Local vector search"]:::storage
        QDRANT_DB["☁️ Qdrant<br/>Cloud vector DB"]:::storage
        PINECONE_DB["🌲 Pinecone<br/>Managed vector DB"]:::storage
        ELASTICSEARCH["🔎 Elasticsearch<br/>Sparse search"]:::storage
        MOCK_STORE["🧪 Mock Store<br/>Testing purposes"]:::storage
    end

    %% Message Queue System
    subgraph MESSAGING["Message Queue System"]
        BROKER["📡 Message Broker<br/>CAP Event Sourcing"]:::queue
        CONSUMERS["👷 Queue Consumers<br/>Worker processes"]:::queue
        WORKERS["🔧 Processing Workers<br/>Parallel execution"]:::queue
    end

    %% Main Flow
    DOCUMENTS --> TASK_QUEUE
    TASK_QUEUE --> INDEXING_WF

    %% Pipeline Flow
    INDEXING_WF --> TASK_INIT
    TASK_INIT --> PARSE_NODE
    PARSE_NODE --> CHUNK_NODE
    CHUNK_NODE --> EMBED_NODE
    EMBED_NODE --> UPSERT_NODE
    UPSERT_NODE --> COMMIT_NODE

    %% Factory Connections
    PARSE_NODE ==> PARSER_FACTORY
    CHUNK_NODE ==> CHUNKER_FACTORY
    EMBED_NODE ==> EMBEDDER_FACTORY
    UPSERT_NODE ==> UPSERTER_FACTORY

    %% Component Connections
    PARSER_FACTORY ==> PARSERS
    CHUNKER_FACTORY ==> CHUNKERS
    EMBEDDER_FACTORY ==> EMBEDDERS
    UPSERTER_FACTORY ==> VECTOR_STORES

    %% Alternative Message Queue Path
    TASK_QUEUE -.-> MESSAGING
    MESSAGING -.-> PARSERS
    MESSAGING -.-> CHUNKERS
    MESSAGING -.-> EMBEDDERS
    MESSAGING -.-> VECTOR_STORES

    %% Success/Error Paths
    COMMIT_NODE --> SUCCESS["✅ INDEXING COMPLETE"]:::startEnd
    TASK_INIT -.->|error| ERROR["❌ INDEXING FAILED"]:::startEnd
    PARSE_NODE -.->|error| ERROR
    CHUNK_NODE -.->|error| ERROR
    EMBED_NODE -.->|error| ERROR
    UPSERT_NODE -.->|error| ERROR

Data Flow and State Management

graph LR
    %% State Flow
    subgraph STATE_FLOW["IndexingState Data Flow"]
        INIT_STATE["Initial State<br/>task_id, trace_id"]
        PARSE_STATE["Parse State<br/>+ content, metadata"]
        CHUNK_STATE["Chunk State<br/>+ chunk_ids, token_count"]
        EMBED_STATE["Embed State<br/>+ embedding_ids"]
        UPSERT_STATE["Upsert State<br/>+ mapping_ids"]
        FINAL_STATE["Final State<br/>+ statistics, status"]
    end

    %% Repository Layer
    subgraph REPOS["Database Repositories"]
        TASK_REPO["TaskRepository<br/>Task management"]
        CHUNK_REPO["ChunkRepository<br/>Text chunks"]
        EMBED_REPO["EmbeddingRepository<br/>Vector embeddings"]
        MAPPING_REPO["MappingRepository<br/>Vector store mapping"]
        EVENT_REPO["EventRepository<br/>Audit trail"]
    end

    %% Configuration
    subgraph CONFIG["Configuration System"]
        SETTINGS["Settings<br/>System configuration"]
        TENANT_CTX["TenantContext<br/>Multi-tenant isolation"]
        UOW["UnitOfWork<br/>Transaction management"]
    end

    %% State progression
    INIT_STATE --> PARSE_STATE
    PARSE_STATE --> CHUNK_STATE
    CHUNK_STATE --> EMBED_STATE
    EMBED_STATE --> UPSERT_STATE
    UPSERT_STATE --> FINAL_STATE

    %% Database interactions
    INIT_STATE -.-> TASK_REPO
    PARSE_STATE -.-> EVENT_REPO
    CHUNK_STATE -.-> CHUNK_REPO
    EMBED_STATE -.-> EMBED_REPO
    UPSERT_STATE -.-> MAPPING_REPO
    FINAL_STATE -.-> TASK_REPO

    %% Configuration usage
    CONFIG ==> STATE_FLOW
    CONFIG ==> REPOS

    %% Styling
    classDef stateNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef repoNode fill:#fff3e0,stroke:#e65100,stroke-width:2px
    classDef configNode fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    class INIT_STATE,PARSE_STATE,CHUNK_STATE,EMBED_STATE,UPSERT_STATE,FINAL_STATE stateNode
    class TASK_REPO,CHUNK_REPO,EMBED_REPO,MAPPING_REPO,EVENT_REPO repoNode
    class SETTINGS,TENANT_CTX,UOW configNode

๐Ÿ“ Document Parsers

  • PDF Parser: PyPDF2 + OCR for scanned PDFs
  • Image Parser: OCR + Vision models for images
  • Text Parser: Plain text, Markdown, CSV
  • Document Parser: DOCX, RTF, other formats
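Parser selection by file type is the kind of lookup the Parser Factory performs. The sketch below illustrates one common way to implement it, a decorator-based extension registry; the names (`PARSER_REGISTRY`, `register_parser`, `get_parser`) are hypothetical and not SoRag's actual API.

```python
from pathlib import Path
from typing import Callable, Dict

# Hypothetical registry mapping file extensions to parser callables.
PARSER_REGISTRY: Dict[str, Callable[[str], str]] = {}

def register_parser(*extensions: str):
    """Decorator that registers a parser under one or more extensions."""
    def wrap(fn: Callable[[str], str]) -> Callable[[str], str]:
        for ext in extensions:
            PARSER_REGISTRY[ext.lower()] = fn
        return fn
    return wrap

@register_parser(".txt", ".md")
def parse_text(path: str) -> str:
    # Plain text and Markdown need no extraction step.
    return Path(path).read_text(encoding="utf-8")

def get_parser(path: str) -> Callable[[str], str]:
    """Resolve a parser from the file extension, failing loudly if none fits."""
    ext = Path(path).suffix.lower()
    if ext not in PARSER_REGISTRY:
        raise ValueError(f"No parser registered for {ext!r}")
    return PARSER_REGISTRY[ext]
```

A PDF or image parser would register the same way, keeping format support open for extension without touching the dispatch code.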

โœ‚๏ธ Text Chunkers

  • Fixed Size: Character/token-based chunking
  • Semantic: AI-based intelligent splitting
  • Page Aware: Preserve page boundaries
  • Smart Chunker: Adaptive algorithms
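Of the four strategies, fixed-size chunking is the simplest to pin down. Here is a minimal character-based sketch with overlap between neighbouring chunks (overlap preserves context across boundaries); the function name and defaults are illustrative, not SoRag's actual chunker.

```python
from typing import List

def fixed_size_chunks(text: str, size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into fixed-size character chunks with a sliding overlap."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    chunks: List[str] = []
    step = size - overlap  # advance by size minus overlap each iteration
    for start in range(0, len(text), step):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # the last chunk already reached the end of the text
    return chunks
```

A token-based variant would count tokenizer output instead of characters; the semantic and page-aware chunkers replace the fixed `step` with model- or layout-driven split points.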

🧠 Embedding Models

  • BGE Dense: Chinese-optimized embeddings
  • OpenAI: text-embedding-ada-002
  • BM25 Sparse: Traditional TF-IDF vectors
  • Custom Models: Pluggable architecture
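The "pluggable architecture" point amounts to a shared embedding interface that dense and sparse models both satisfy. The sketch below shows one way to express that in Python, with a toy hashed term-frequency embedder standing in for the BM25 component; the `SparseEmbedder` protocol and class names are assumptions, and real BM25 additionally weights terms by IDF and document length.

```python
from collections import Counter
from typing import Dict, Protocol

class SparseEmbedder(Protocol):
    # Hypothetical plug-in contract; SoRag's factory interface may differ.
    def embed(self, text: str) -> Dict[int, float]: ...

class HashedTfEmbedder:
    """Toy sparse embedder: term frequencies hashed into a fixed vocabulary."""

    def __init__(self, dim: int = 1024) -> None:
        self.dim = dim

    def embed(self, text: str) -> Dict[int, float]:
        counts = Counter(text.lower().split())
        total = sum(counts.values()) or 1
        # Sparse vector as {dimension index: normalized term frequency}.
        return {hash(tok) % self.dim: n / total for tok, n in counts.items()}
```

A dense embedder (BGE, OpenAI) would implement the same `embed` signature but return a fixed-length float vector, so the Embed Node can treat both uniformly.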

💾 Vector Storage

  • FAISS: Local high-performance search
  • Qdrant: Cloud-native vector DB
  • Pinecone: Managed vector service
  • Elasticsearch: Sparse search + BM25
  • Mock Store: Testing and development
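The Mock Store exists so the pipeline can exercise the upsert contract without a live backend. A minimal in-memory stand-in might look like this; the method names (`upsert`, `count`) are assumptions for illustration, not SoRag's actual upserter interface.

```python
from typing import Dict, List, Sequence

class MockVectorStore:
    """In-memory stand-in for FAISS/Qdrant/Pinecone-style upserters."""

    def __init__(self) -> None:
        self._vectors: Dict[str, List[float]] = {}

    def upsert(self, ids: Sequence[str], vectors: Sequence[List[float]]) -> List[str]:
        """Insert new vectors or overwrite existing ones; return the stored IDs."""
        if len(ids) != len(vectors):
            raise ValueError("ids and vectors must be the same length")
        for vid, vec in zip(ids, vectors):
            self._vectors[vid] = list(vec)  # upsert = insert-or-replace
        return list(ids)

    def count(self) -> int:
        return len(self._vectors)
```

Because upsert is idempotent per ID, retrying a failed Upsert Node stage cannot create duplicates, which is what makes the error-recovery path above safe.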

📊 Message Queue (CAP)

  • Broker: Event-driven architecture
  • Consumers: Worker process management
  • Workers: Parallel task processing
  • CAP Handler: Event sourcing pattern

🔧 Infrastructure

  • Unit of Work: Transaction management
  • Tenant Context: Multi-tenant isolation
  • Factory Pattern: Component registration
  • Repository Pattern: Data access layer
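The Unit of Work pattern groups all repository writes in a pipeline stage into one transaction: either everything commits or everything rolls back. A minimal sketch over `sqlite3` (SoRag's `UnitOfWork` wraps its own session and repositories, which are not shown here):

```python
import sqlite3
from typing import Optional

class UnitOfWork:
    """Context manager committing on success and rolling back on any error."""

    def __init__(self, dsn: str = ":memory:") -> None:
        self.dsn = dsn
        self.conn: Optional[sqlite3.Connection] = None

    def __enter__(self) -> "UnitOfWork":
        self.conn = sqlite3.connect(self.dsn)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        assert self.conn is not None
        if exc_type is None:
            self.conn.commit()    # all repository writes land together
        else:
            self.conn.rollback()  # any failure undoes the whole stage
        self.conn.close()
```

Repositories would share `uow.conn`, so a failure in, say, the Embed stage leaves no half-written chunk or embedding rows behind.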
| Pipeline Stage | Input | Processing | Output | Components Used |
|---|---|---|---|---|
| 1. Task Init | Task ID | Load task metadata & config | Task state + file info | TaskRepository, Settings |
| 2. Parse | File path + metadata | Extract text from documents | Raw content + metadata | PDF/Image/Text/Doc Parsers |
| 3. Chunk | Raw content | Split into manageable segments | Chunk list + token counts | Fixed/Semantic/Page/Smart Chunkers |
| 4. Embed | Text chunks | Generate vector embeddings | Dense + sparse embeddings | BGE/OpenAI/BM25 Embedders |
| 5. Upsert | Embeddings + chunks | Store in vector databases | Storage mapping IDs | FAISS/Qdrant/Pinecone/ES Upserters |
| 6. Commit | All processing results | Finalize task & cleanup | Success status + statistics | TaskRepository, EventRepository |

Key Architecture Patterns

# LangGraph Linear Pipeline
from langgraph.graph import StateGraph, START

workflow = StateGraph(IndexingState)
workflow.add_edge(START, "task_init")
workflow.add_conditional_edges("task_init", check_success, {"success": "parse", "error": "error"})
workflow.add_conditional_edges("parse", check_success, {"success": "chunk", "error": "error"})
# ... continue for all stages

# Factory Pattern for Components
parser_factory = get_parser_factory()
chunker_factory = get_chunker_factory()
dense_embedder_factory = get_dense_embedder_factory()
sparse_embedder_factory = get_sparse_embedder_factory()
vector_upserter_factory = get_vector_upserter_factory()
sparse_upserter_factory = get_sparse_upserter_factory()

# Dependency Injection in Nodes
parse_node = ParseNode(uow)
chunk_node = ChunkNode(uow, settings, chunker_factory)
embed_node = EmbedNode(uow, settings, dense_embedder_factory, sparse_embedder_factory)
upsert_node = UpsertNode(uow, settings, vector_upserter_factory, sparse_upserter_factory)

# State Management
from typing import Optional, TypedDict

class IndexingState(TypedDict):
    task_id: int
    content: Optional[str]
    chunk_ids: Optional[list]
    dense_embedding_ids: Optional[list]
    sparse_embedding_ids: Optional[list]
    vector_mapping_ids: Optional[list]
    # ... 30+ fields tracking complete pipeline state

# Error Handling & Recovery
if result.success and result.data:
    state.update(result.data)
else:
    state["error_message"] = result.error_message
    state["failed_at_stage"] = result.stage
    # Pipeline automatically routes to error handler
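The `check_success` router used in the conditional edges above can be a plain function over the state. This is a sketch under the assumption that failed nodes record an `error_message` field, as in the error-handling snippet:

```python
from typing import Literal

def check_success(state: dict) -> Literal["success", "error"]:
    """Route to the error handler as soon as any node records a failure."""
    return "error" if state.get("error_message") else "success"
```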
            

๐Ÿ—๏ธ Architecture Highlights