🏗️ SoRag Indexing System Architecture
Document Processing Pipeline - From Raw Files to Vector Search
🎯 Indexing System Overview
SoRag Indexing is a complete document processing pipeline that converts raw documents into searchable vector database entries.
- Architecture pattern: LangGraph linear pipeline + factory pattern + message queue
- Processing stages: 6 main steps (Init → Parse → Chunk → Embed → Upsert → Commit)
- Component count: 59 Python files covering parsing, chunking, embedding, and storage
- Storage support: multiple vector databases (FAISS, Qdrant, Pinecone) + sparse search (Elasticsearch)
- File formats: PDF, images, text documents, and other formats
📖 Legend
- 📄 Node - Processing node
- 🏭 Factory - Component factory
- 🔧 Component - Functional component
- 💾 Storage - Storage system
- 🔄 Queue - Message queue
- 🔀 Flow - Data flow
Complete Indexing Pipeline Architecture
```mermaid
graph TB
    %% Style definitions
    classDef workflow fill:#667eea,stroke:#fff,stroke-width:3px,color:#fff,font-weight:bold
    classDef node fill:#764ba2,stroke:#fff,stroke-width:2px,color:#fff
    classDef factory fill:#f093fb,stroke:#333,stroke-width:2px,color:#333
    classDef component fill:#4ecdc4,stroke:#333,stroke-width:1px,color:#333
    classDef storage fill:#ff6b6b,stroke:#333,stroke-width:2px,color:#333
    classDef queue fill:#ffe66d,stroke:#333,stroke-width:2px,color:#333
    classDef startEnd fill:#4CAF50,stroke:#fff,stroke-width:2px,color:#fff

    %% Input Sources
    DOCUMENTS["📄 Source Documents
    PDF, Images, Text"]:::startEnd
    TASK_QUEUE["🔄 Task Queue
    Database Tasks"]:::queue

    %% Main Pipeline Nodes
    INDEXING_WF["🏗️ INDEXING WORKFLOW
    LangGraph Linear Pipeline"]:::workflow
    TASK_INIT["🚀 Task Init Node
    Load task & metadata"]:::node
    PARSE_NODE["📄 Parse Node
    Extract content"]:::node
    CHUNK_NODE["✂️ Chunk Node
    Split into segments"]:::node
    EMBED_NODE["🧠 Embed Node
    Generate embeddings"]:::node
    UPSERT_NODE["⬆️ Upsert Node
    Store in vector DBs"]:::node
    COMMIT_NODE["✅ Commit Node
    Finalize & cleanup"]:::node

    %% Factory Components
    subgraph FACTORIES["Component Factories"]
        PARSER_FACTORY["🏭 Parser Factory
        4 file formats"]:::factory
        CHUNKER_FACTORY["🏭 Chunker Factory
        4 chunking strategies"]:::factory
        EMBEDDER_FACTORY["🏭 Embedder Factory
        Dense + Sparse"]:::factory
        UPSERTER_FACTORY["🏭 Upserter Factory
        Vector + Sparse stores"]:::factory
    end

    %% Parser Components
    subgraph PARSERS["Document Parsers (4 types)"]
        PDF_PARSER["📄 PDF Parser
        PyPDF2, OCR"]:::component
        IMG_PARSER["🖼️ Image Parser
        OCR, Vision models"]:::component
        TEXT_PARSER["📝 Text Parser
        Plain text, markdown"]:::component
        DOC_PARSER["📄 Document Parser
        DOCX, other formats"]:::component
    end

    %% Chunker Components
    subgraph CHUNKERS["Text Chunkers (4 types)"]
        FIXED_CHUNKER["📏 Fixed Size Chunker
        Character/token based"]:::component
        SEMANTIC_CHUNKER["🧠 Semantic Chunker
        AI-based splitting"]:::component
        PAGE_CHUNKER["📄 Page Aware Chunker
        Preserve page structure"]:::component
        SMART_CHUNKER["🎯 Smart Chunker
        Adaptive algorithms"]:::component
    end

    %% Embedder Components
    subgraph EMBEDDERS["Embedding Models (3 types)"]
        BGE_EMBEDDER["🇨🇳 BGE Dense Embedder
        Chinese optimized"]:::component
        OPENAI_EMBEDDER["🤖 OpenAI Embedder
        text-embedding-ada-002"]:::component
        BM25_EMBEDDER["🔍 BM25 Sparse Embedder
        Traditional TF-IDF"]:::component
    end

    %% Storage Systems
    subgraph VECTOR_STORES["Vector Storage (5 systems)"]
        FAISS_DB["📁 FAISS
        Local vector search"]:::storage
        QDRANT_DB["☁️ Qdrant
        Cloud vector DB"]:::storage
        PINECONE_DB["🌲 Pinecone
        Managed vector DB"]:::storage
        ELASTICSEARCH["🔍 Elasticsearch
        Sparse search"]:::storage
        MOCK_STORE["🧪 Mock Store
        Testing purposes"]:::storage
    end

    %% Message Queue System
    subgraph MESSAGING["Message Queue System"]
        BROKER["📡 Message Broker
        CAP Event Sourcing"]:::queue
        CONSUMERS["👷 Queue Consumers
        Worker processes"]:::queue
        WORKERS["🔧 Processing Workers
        Parallel execution"]:::queue
    end

    %% Main Flow
    DOCUMENTS --> TASK_QUEUE
    TASK_QUEUE --> INDEXING_WF

    %% Pipeline Flow
    INDEXING_WF --> TASK_INIT
    TASK_INIT --> PARSE_NODE
    PARSE_NODE --> CHUNK_NODE
    CHUNK_NODE --> EMBED_NODE
    EMBED_NODE --> UPSERT_NODE
    UPSERT_NODE --> COMMIT_NODE

    %% Factory Connections
    PARSE_NODE ==> PARSER_FACTORY
    CHUNK_NODE ==> CHUNKER_FACTORY
    EMBED_NODE ==> EMBEDDER_FACTORY
    UPSERT_NODE ==> UPSERTER_FACTORY

    %% Component Connections
    PARSER_FACTORY ==> PARSERS
    CHUNKER_FACTORY ==> CHUNKERS
    EMBEDDER_FACTORY ==> EMBEDDERS
    UPSERTER_FACTORY ==> VECTOR_STORES

    %% Alternative Message Queue Path
    TASK_QUEUE -.-> MESSAGING
    MESSAGING -.-> PARSERS
    MESSAGING -.-> CHUNKERS
    MESSAGING -.-> EMBEDDERS
    MESSAGING -.-> VECTOR_STORES

    %% Success/Error Paths
    COMMIT_NODE --> SUCCESS["✅ INDEXING COMPLETE"]:::startEnd
    TASK_INIT -.->|error| ERROR["❌ INDEXING FAILED"]:::startEnd
    PARSE_NODE -.->|error| ERROR
    CHUNK_NODE -.->|error| ERROR
    EMBED_NODE -.->|error| ERROR
    UPSERT_NODE -.->|error| ERROR
```
Data Flow and State Management
```mermaid
graph LR
    %% State Flow
    subgraph STATE_FLOW["IndexingState Data Flow"]
        INIT_STATE["Initial State
        task_id, trace_id"]
        PARSE_STATE["Parse State
        + content, metadata"]
        CHUNK_STATE["Chunk State
        + chunk_ids, token_count"]
        EMBED_STATE["Embed State
        + embedding_ids"]
        UPSERT_STATE["Upsert State
        + mapping_ids"]
        FINAL_STATE["Final State
        + statistics, status"]
    end

    %% Repository Layer
    subgraph REPOS["Database Repositories"]
        TASK_REPO["TaskRepository
        Task management"]
        CHUNK_REPO["ChunkRepository
        Text chunks"]
        EMBED_REPO["EmbeddingRepository
        Vector embeddings"]
        MAPPING_REPO["MappingRepository
        Vector store mapping"]
        EVENT_REPO["EventRepository
        Audit trail"]
    end

    %% Configuration
    subgraph CONFIG["Configuration System"]
        SETTINGS["Settings
        System configuration"]
        TENANT_CTX["TenantContext
        Multi-tenant isolation"]
        UOW["UnitOfWork
        Transaction management"]
    end

    %% State progression
    INIT_STATE --> PARSE_STATE
    PARSE_STATE --> CHUNK_STATE
    CHUNK_STATE --> EMBED_STATE
    EMBED_STATE --> UPSERT_STATE
    UPSERT_STATE --> FINAL_STATE

    %% Database interactions
    INIT_STATE -.-> TASK_REPO
    PARSE_STATE -.-> EVENT_REPO
    CHUNK_STATE -.-> CHUNK_REPO
    EMBED_STATE -.-> EMBED_REPO
    UPSERT_STATE -.-> MAPPING_REPO
    FINAL_STATE -.-> TASK_REPO

    %% Configuration usage
    CONFIG ==> STATE_FLOW
    CONFIG ==> REPOS

    %% Styling
    classDef stateNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef repoNode fill:#fff3e0,stroke:#e65100,stroke-width:2px
    classDef configNode fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    class INIT_STATE,PARSE_STATE,CHUNK_STATE,EMBED_STATE,UPSERT_STATE,FINAL_STATE stateNode
    class TASK_REPO,CHUNK_REPO,EMBED_REPO,MAPPING_REPO,EVENT_REPO repoNode
    class SETTINGS,TENANT_CTX,UOW configNode
```
📄 Document Parsers
- PDF Parser: PyPDF2 + OCR for scanned PDFs
- Image Parser: OCR + Vision models for images
- Text Parser: Plain text, Markdown, CSV
- Document Parser: DOCX, RTF, other formats
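The parsers above are wired up through the Parser Factory; the sketch below shows one plausible extension-based dispatch (the registry and `select_parser` names are illustrative, not the actual SoRag API):

```python
from pathlib import Path

# Hypothetical registry mapping file extensions to parser names; the real
# factory registers parser classes, but the dispatch idea is the same.
PARSER_REGISTRY = {
    ".pdf": "pdf_parser",
    ".png": "image_parser",
    ".jpg": "image_parser",
    ".txt": "text_parser",
    ".md": "text_parser",
    ".docx": "document_parser",
}

def select_parser(file_path: str) -> str:
    """Pick a parser name based on the file's extension (case-insensitive)."""
    ext = Path(file_path).suffix.lower()
    try:
        return PARSER_REGISTRY[ext]
    except KeyError:
        raise ValueError(f"Unsupported file format: {ext}")
```

Unknown extensions fail fast here, which mirrors the pipeline's error path: a parse failure routes the task to the error handler rather than silently skipping the file.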
✂️ Text Chunkers
- Fixed Size: Character/token-based chunking
- Semantic: AI-based intelligent splitting
- Page Aware: Preserve page boundaries
- Smart Chunker: Adaptive algorithms
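As a concrete illustration of the Fixed Size strategy, here is a minimal character-based chunker with overlap; the real chunkers also record token counts and page metadata, which this sketch omits:

```python
def fixed_size_chunks(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character chunks with overlap between segments."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        # Step forward by less than the chunk size so neighbors share context
        start += size - overlap
    return chunks
```

The overlap keeps sentences that straddle a boundary retrievable from either neighboring chunk, at the cost of some duplicated storage.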
🧠 Embedding Models
- BGE Dense: Chinese-optimized embeddings
- OpenAI: text-embedding-ada-002
- BM25 Sparse: Traditional TF-IDF vectors
- Custom Models: Pluggable architecture
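To make the dense-vs-sparse distinction concrete, the sketch below computes BM25 term weights for one document; a production BM25 embedder works over a trained vocabulary rather than a raw in-memory corpus, so treat this as illustrative only:

```python
import math
from collections import Counter

def bm25_weights(doc_tokens, corpus, k1=1.5, b=0.75):
    """Sparse BM25 weights (term -> score) for one document against a corpus."""
    n = len(corpus)
    avg_len = sum(len(d) for d in corpus) / n
    tf = Counter(doc_tokens)
    weights = {}
    for term, freq in tf.items():
        df = sum(1 for d in corpus if term in d)           # document frequency
        idf = math.log((n - df + 0.5) / (df + 0.5) + 1)    # rarity bonus
        denom = freq + k1 * (1 - b + b * len(doc_tokens) / avg_len)
        weights[term] = idf * (freq * (k1 + 1)) / denom
    return weights
```

Unlike a dense embedding, the result is a sparse mapping over vocabulary terms, which is what gets upserted into Elasticsearch for keyword-style retrieval.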
💾 Vector Storage
- FAISS: Local high-performance search
- Qdrant: Cloud-native vector DB
- Pinecone: Managed vector service
- Elasticsearch: Sparse search + BM25
- Mock Store: Testing and development
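The Mock Store's role is easiest to see in code. This hypothetical in-memory store supports the two operations every upserter target needs, insert-or-update by id and similarity search (method names are illustrative, not the real upserter interface):

```python
import math

class MockVectorStore:
    """In-memory stand-in for FAISS/Qdrant/Pinecone used for testing."""

    def __init__(self):
        self._vectors: dict[str, list[float]] = {}

    def upsert(self, vector_id: str, vector: list[float]) -> None:
        # Insert-or-update: a repeated id overwrites the stored vector
        self._vectors[vector_id] = vector

    def search(self, query: list[float], top_k: int = 3) -> list[tuple[str, float]]:
        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            na = math.sqrt(sum(x * x for x in a))
            nb = math.sqrt(sum(x * x for x in b))
            return dot / (na * nb) if na and nb else 0.0
        scored = [(vid, cosine(query, v)) for vid, v in self._vectors.items()]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:top_k]
```

Swapping this in during tests exercises the full pipeline without network calls or index files, which is exactly why the factory pattern pays off here.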
🔄 Message Queue (CAP)
- Broker: Event-driven architecture
- Consumers: Worker process management
- Workers: Parallel task processing
- CAP Handler: Event sourcing pattern
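The consumer/worker split can be sketched with Python's standard library; the real system uses a CAP-style broker rather than an in-process queue, so this only illustrates the fan-out pattern:

```python
import queue
import threading

def run_workers(tasks, handler, num_workers=2):
    """Drain a task queue with a pool of worker threads and collect results."""
    q: queue.Queue = queue.Queue()
    for task in tasks:
        q.put(task)
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return  # queue drained, worker exits
            outcome = handler(task)
            with lock:  # results list is shared across workers
                results.append(outcome)
            q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Note that results arrive in completion order, not submission order; a real consumer would carry the task id alongside each outcome.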
🔧 Infrastructure
- Unit of Work: Transaction management
- Tenant Context: Multi-tenant isolation
- Factory Pattern: Component registration
- Repository Pattern: Data access layer
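A minimal Unit of Work sketch, assuming a database session object with `commit`/`rollback` (the actual SoRag class will differ in detail): commit when the block succeeds, roll back when any stage raises:

```python
class UnitOfWork:
    """Context manager that scopes one transaction around a pipeline stage."""

    def __init__(self, session):
        self.session = session

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.session.commit()
        else:
            self.session.rollback()
        return False  # re-raise the original exception after rollback
```

Because every repository shares the session held by the unit of work, a failure in, say, the upsert stage rolls back chunk and embedding rows written earlier in the same transaction.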
| Pipeline Stage | Input | Processing | Output | Components Used |
|---|---|---|---|---|
| 1. Task Init | Task ID | Load task metadata & config | Task state + file info | TaskRepository, Settings |
| 2. Parse | File path + metadata | Extract text from documents | Raw content + metadata | PDF/Image/Text/Doc Parsers |
| 3. Chunk | Raw content | Split into manageable segments | Chunk list + token counts | Fixed/Semantic/Page/Smart Chunkers |
| 4. Embed | Text chunks | Generate vector embeddings | Dense + sparse embeddings | BGE/OpenAI/BM25 Embedders |
| 5. Upsert | Embeddings + chunks | Store in vector databases | Storage mapping IDs | FAISS/Qdrant/Pinecone/ES Upserters |
| 6. Commit | All processing results | Finalize task & cleanup | Success status + statistics | TaskRepository, EventRepository |
Key Architecture Patterns
```python
from typing import Optional, TypedDict

from langgraph.graph import StateGraph, START

# LangGraph linear pipeline
workflow = StateGraph(IndexingState)
workflow.add_edge(START, "task_init")
workflow.add_conditional_edges("task_init", check_success, {"success": "parse", "error": "error"})
workflow.add_conditional_edges("parse", check_success, {"success": "chunk", "error": "error"})
# ... continue for all stages

# Factory pattern for components
parser_factory = get_parser_factory()
chunker_factory = get_chunker_factory()
dense_embedder_factory = get_dense_embedder_factory()
sparse_embedder_factory = get_sparse_embedder_factory()
vector_upserter_factory = get_vector_upserter_factory()
sparse_upserter_factory = get_sparse_upserter_factory()

# Dependency injection in nodes
parse_node = ParseNode(uow)
chunk_node = ChunkNode(uow, settings, chunker_factory)
embed_node = EmbedNode(uow, settings, dense_embedder_factory, sparse_embedder_factory)
upsert_node = UpsertNode(uow, settings, vector_upserter_factory, sparse_upserter_factory)

# State management
class IndexingState(TypedDict):
    task_id: int
    content: Optional[str]
    chunk_ids: Optional[list]
    dense_embedding_ids: Optional[list]
    sparse_embedding_ids: Optional[list]
    vector_mapping_ids: Optional[list]
    # ... 30+ fields tracking complete pipeline state

# Error handling & recovery
if result.success and result.data:
    state.update(result.data)
else:
    state["error_message"] = result.error_message
    state["failed_at_stage"] = result.stage
    # Pipeline automatically routes to error handler
```
🏗️ Architecture Highlights
- ✅ Linear Pipeline Design: LangGraph-based sequential processing with error handling
- ✅ Factory Pattern: Pluggable components for parsers, chunkers, embedders, upserters
- ✅ Multi-Storage Support: FAISS, Qdrant, Pinecone for vectors + Elasticsearch for sparse
- ✅ Comprehensive State Tracking: 30+ fields track complete pipeline progress
- ✅ Error Recovery: Conditional edges route to error handler at each stage
- ✅ Multi-Tenant Isolation: Tenant context propagated through entire pipeline
- ✅ Message Queue Alternative: CAP-based async processing for high-throughput scenarios
- ✅ Performance Monitoring: Detailed statistics and timing for each stage
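The per-stage timing mentioned above can be collected with a small context manager; this is an illustrative helper, not the project's actual monitoring code:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed_stage(stats: dict, stage: str):
    """Record the wall-clock duration of a pipeline stage into a stats dict."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even when the stage raises, so failed runs are timed too
        stats[f"{stage}_seconds"] = time.perf_counter() - start
```

The resulting dict slots naturally into the final state's statistics that the commit node persists.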