A modular, interactive CLI framework for building and managing Retrieval-Augmented Generation (RAG) systems. Built with Python, this tool provides an easy-to-use interface for processing documents, creating embeddings, and running RAG modules with real-time progress tracking.
- Interactive CLI Interface: Navigate through document collections and RAG modules using arrow keys
- Modular Architecture: Easily add new RAG modules without modifying core code
- Multiprocessing Support: Parallel processing for improved performance
- Real-time Progress Tracking: Live updates during processing with ETA calculations
- Error Handling & Logging: Comprehensive error logging and user-friendly error messages
- Cross-platform: Works on Linux, macOS, and Windows
- Configurable: Customize RAG parameters for each module
NCIIPC_AI_Grand_Challange/
├── main.py                      # Main CLI application entry point
├── pyproject.toml               # Project configuration and dependencies
├── uv.lock                      # Dependency lock file
├── error_logs.txt               # Error log file
├── README.md                    # This file
├── data/
│   ├── input/                   # Input document collections directory
│   │   └── <collection_name>/   # Document collection folders
│   └── output/                  # Processed output directory
│       └── <collection_name>/
├── modules/                     # RAG modules directory
│   └── chunking_engine/         # Example module
│       ├── main.py              # Module core logic
│       ├── batch_processor.py
│       └── batch_processor_app.py
├── tests/                       # Unit tests
├── utils/                       # Utility modules
│   └── console.py               # Console formatting and interaction utilities
└── Infrastructure/              # Infrastructure-related files
- Python 3.8+
- uv package manager (recommended) or pip
- Clone the repository:
  git clone <repository-url>
  cd NCIIPC_AI_Grand_Challange
- Install dependencies:
  # Using uv (recommended)
  uv sync
  # Or using pip
  pip install -r requirements.txt
- Run the application:
  # Using uv
  uv run python main.py
  # Or directly
  python main.py
The chunking engine now supports row-based chunking for CSV and JSON files, which creates one chunk per row/record. This is ideal for structured data where each row represents a complete, logical record.
- CSV Files: Each row becomes a separate chunk with structured text representation
- JSON Files: Each array element or object key becomes a separate chunk
- Data Integrity: Preserves all original data in each chunk
- Flexible Configuration: Choose between row-based or traditional text chunking
Set these environment variables to control chunking behavior:
# Enable/disable row-based chunking
CSV_CHUNK_BY_ROW=true # Default: true (one chunk per CSV row)
JSON_CHUNK_BY_RECORD=true # Default: true (one chunk per JSON record/key)
# Traditional chunking parameters (when row-based is disabled)
CHUNK_SIZE=400 # Maximum characters per chunk
CHUNK_OVERLAP=100 # Characters to overlap between chunks
For a CSV file with product data:
product_id,name,color,description,price
123,"T-Shirt","Blue","Soft cotton shirt",29.99
124,"Jeans","Black","Slim fit jeans",79.99
Row-based chunking creates:
- Chunk 1: product_id: 123, name: T-Shirt, color: Blue, description: Soft cotton shirt, price: 29.99
- Chunk 2: product_id: 124, name: Jeans, color: Black, description: Slim fit jeans, price: 79.99
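The row-to-chunk mapping above can be sketched with the standard library alone. This is a minimal illustration, not the chunking engine's actual implementation: the helper name `chunk_csv_rows` is hypothetical, and the `row_data` and `text` keys are assumed from the usage example later in this README.

```python
import csv
import io
from typing import Dict, List


def chunk_csv_rows(csv_text: str) -> List[Dict[str, object]]:
    """Return one chunk per CSV row (hypothetical helper, not the engine's API)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    chunks = []
    for row in reader:
        # Render the row as "column: value" pairs, in header order
        text = ", ".join(f"{key}: {value}" for key, value in row.items())
        chunks.append({"row_data": dict(row), "text": text})
    return chunks


sample = (
    "product_id,name,color,description,price\n"
    '123,"T-Shirt","Blue","Soft cotton shirt",29.99\n'
    '124,"Jeans","Black","Slim fit jeans",79.99\n'
)

for chunk in chunk_csv_rows(sample):
    print(chunk["text"])
```

Run on the sample data, this prints the same two chunk texts listed above, one per row.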
For a JSON array of employees:
[
  {"id": 1, "name": "John", "department": "Engineering"},
  {"id": 2, "name": "Jane", "department": "Marketing"}
]
Record-based chunking creates:
- Chunk 1: id: 1, name: John, department: Engineering
- Chunk 2: id: 2, name: Jane, department: Marketing
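A comparable sketch for JSON input, under the assumption that an array yields one chunk per element and a top-level object yields one chunk per key. The function names and the `record` key are hypothetical; the real engine may structure its chunks differently.

```python
import json
from typing import Any, Dict, List


def _flatten(record: Any) -> str:
    """Render a record as "key: value" pairs; non-dict values fall back to str()."""
    if isinstance(record, dict):
        return ", ".join(f"{key}: {value}" for key, value in record.items())
    return str(record)


def chunk_json_records(json_text: str) -> List[Dict[str, Any]]:
    """Return one chunk per array element or per top-level object key (assumed behavior)."""
    data = json.loads(json_text)
    if isinstance(data, list):
        records = data
    elif isinstance(data, dict):
        # One single-key record per top-level key
        records = [{key: value} for key, value in data.items()]
    else:
        # A bare scalar becomes a single chunk
        records = [data]
    return [{"record": record, "text": _flatten(record)} for record in records]


employees = """
[
  {"id": 1, "name": "John", "department": "Engineering"},
  {"id": 2, "name": "Jane", "department": "Marketing"}
]
"""

for chunk in chunk_json_records(employees):
    print(chunk["text"])
```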
- High Data Integrity: Each chunk corresponds to a single, logical record
- Precise Search Results: Queries match complete records exactly
- Structured Context: All fields from a row are included in search context
- No Data Loss: A record is never split across chunk boundaries, so no overlap is needed to keep its fields together
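The environment variables described earlier could be read as follows, with a character-based fallback for when row-based chunking is disabled. This is a sketch under assumptions: `env_flag`, `load_settings`, and `chunk_text` are hypothetical names, the set of strings treated as "true" is a guess, and the engine's own parsing and chunking logic may differ.

```python
import os
from typing import List, Mapping


def env_flag(env: Mapping[str, str], name: str, default: bool) -> bool:
    """Interpret an environment value as a boolean (accepted spellings are an assumption)."""
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the chunking settings, using the defaults documented above."""
    return {
        "csv_chunk_by_row": env_flag(env, "CSV_CHUNK_BY_ROW", True),
        "json_chunk_by_record": env_flag(env, "JSON_CHUNK_BY_RECORD", True),
        "chunk_size": int(env.get("CHUNK_SIZE", "400")),
        "chunk_overlap": int(env.get("CHUNK_OVERLAP", "100")),
    }


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # The last window already reaches the end of the text
    return chunks


settings = load_settings({"CSV_CHUNK_BY_ROW": "false", "CHUNK_SIZE": "4", "CHUNK_OVERLAP": "1"})
print(chunk_text("abcdefghij", settings["chunk_size"], settings["chunk_overlap"]))
```

With a size of 4 and an overlap of 1, each window starts three characters after the previous one, so neighbouring chunks share exactly one character.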
from modules.chunking_engine.main import ChunkingEngine, ChunkingConfig

# Enable row-based chunking
config = ChunkingConfig(
    csv_chunk_by_row=True,       # One chunk per CSV row
    json_chunk_by_record=True,   # One chunk per JSON record
    debug=True
)
engine = ChunkingEngine()
result = engine.process_file(config, 'data/products.csv')

# Each chunk contains the full row data
for chunk in result['data']['chunks']:
    print(f"Row data: {chunk['row_data']}")  # For CSV files
    print(f"Text: {chunk['text']}")          # Structured text representation

- Launch the CLI: Run python main.py
- Select Document Collection: Choose from available collections in data/input/
- Select Module: Choose a RAG module
- Configure: Set module-specific parameters
- Run: Execute the processing with real-time progress tracking
Place your document collection folders in the data/input/ directory:
data/input/
├── my_documents/
│   ├── doc1.txt
│   ├── doc2.txt
│   └── ...
└── knowledge_base/
    └── ...
The tool automatically detects all subdirectories in data/input/ as available document collections.
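Collection discovery of this kind can be sketched in a few lines. The function name `list_collections` is hypothetical and the CLI's own detection code may differ, for example in how it orders the results or treats hidden folders.

```python
from pathlib import Path
from typing import List


def list_collections(input_root: str = "data/input") -> List[str]:
    """Return the names of all immediate subdirectories of input_root, sorted."""
    root = Path(input_root)
    if not root.is_dir():
        return []  # No input directory yet, so there are no collections
    return sorted(entry.name for entry in root.iterdir() if entry.is_dir())


print(list_collections())
```

Plain files placed directly in data/input/ are ignored here, since only folders count as collections.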
Each module should follow this structure:
modules/
└── your_module_name/
    ├── main.py                  # Core module logic and classes
    ├── batch_processor.py       # Batch processing logic for RAG operations (optional)
    └── batch_processor_app.py   # CLI interface for the module
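Before registering a module, it can help to confirm that the required files from the layout above are present. The checker below is a hypothetical convenience, not part of the framework; it treats main.py and batch_processor_app.py as required and batch_processor.py as optional, as the layout indicates.

```python
from pathlib import Path
from typing import List

# batch_processor.py is optional according to the layout above
REQUIRED_FILES = ("main.py", "batch_processor_app.py")


def missing_module_files(module_dir: str) -> List[str]:
    """Return the required file names that are absent from module_dir."""
    base = Path(module_dir)
    return [name for name in REQUIRED_FILES if not (base / name).is_file()]


missing = missing_module_files("modules/your_module_name")
if missing:
    print("Missing files:", ", ".join(missing))
```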
mkdir modules/your_module_name
Create the main module file (main.py) with your RAG processing logic:
"""
Your Module Name - Core RAG Processing Logic
"""
import os
from datetime import datetime, timezone
from typing import Any, Dict


class YourModuleConfig:
    """Configuration class for your module"""

    def __init__(self, param1: int = 100, param2: str = "default"):
        self.param1 = param1
        self.param2 = param2


class YourModule:
    """Main processing class"""

    def process_file(self, config: YourModuleConfig, file_path: str) -> Dict[str, Any]:
        """
        Process a single file

        Args:
            config: Module configuration
            file_path: Path to the file to process

        Returns:
            Dict containing processing results
        """
        try:
            # Your RAG processing logic here
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Process the content
            result = {
                "file_name": os.path.basename(file_path),
                "processed_content": self._process_content(content, config),
                "metadata": {
                    "param1": config.param1,
                    "param2": config.param2,
                    "processing_timestamp": datetime.now(timezone.utc).isoformat()
                }
            }
            return result
        except Exception as e:
            # Chain the original exception so its traceback is preserved
            raise Exception(f"Error processing {file_path}: {e}") from e

    def _process_content(self, content: str, config: YourModuleConfig) -> Any:
        """Your content processing logic for RAG"""
        # Placeholder: replace with your RAG processing logic.
        # Returning the content unchanged keeps the template runnable.
        processed_content = content
        return processed_content
Create the CLI interface (batch_processor_app.py) that integrates with the main framework:
import os
import json
import sys
import logging
import time
import multiprocessing
from multiprocessing import Process, Manager
from pathlib import Path

# Add project root to path for imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Import centralized console utilities
from utils.console import (
    clear, format_time, check_quit, print_header,
    print_footer, print_error, print_success, print_warning,
    wait_for_enter, LiveConsole, Colors
)

# Import your module
try:
    from modules.your_module_name.main import YourModule, YourModuleConfig
except ImportError:
    print_error("Could not import YourModule. Please ensure main.py exists.")
    sys.exit(1)

# Set up logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('error_logs.txt', mode='a'),
    ]
)
logger = logging.getLogger(__name__)

def process_files_chunk(process_id, file_chunk, config, output_dir, progress_dict, total_files_dict, failed_dict):
    """Process a chunk of files in a separate process"""
    processor = YourModule()
    processed = 0
    failed = 0
    for file_path in file_chunk:
        try:
            result = processor.process_file(config, file_path)
            file_name = os.path.basename(file_path)
            output_file = f"{output_dir}/{file_name}.json"
            os.makedirs(output_dir, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=4)
            processed += 1
        except Exception as e:
            failed += 1
            logger.error(f"Process {process_id}: Error processing {file_path}: {str(e)}")
        # Update progress after every file so the live display stays current
        progress_dict[process_id] = processed
        failed_dict[process_id] = failed

def display_progress_with_header(collection_name, module_name, progress_dict, failed_dict,
                                 total_files_dict, num_processes, start_time, total_files):
    """Display live progress updates with consistent header"""
    while True:
        clear()
        # Use centralized header
        print_header(collection_name, module_name)
        current_time = time.time()
        elapsed_time = current_time - start_time

        # Module-specific content
        print(f"\n{Colors.BOLD}{Colors.CYAN}BATCH PROCESSING STATUS{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
        print(f"{Colors.WHITE}Active Processes: {Colors.BOLD}{num_processes}{Colors.END}")
        print(f"{Colors.WHITE}Elapsed Time: {Colors.BOLD}{format_time(elapsed_time)}{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")

        total_processed = 0
        total_failed = 0
        all_completed = True

        # Process status
        for i in range(1, num_processes + 1):
            processed = progress_dict.get(i, 0)
            failed = failed_dict.get(i, 0)
            target = total_files_dict.get(i, 0)
            total_processed += processed
            total_failed += failed
            if processed + failed < target:
                all_completed = False
            status = f"{Colors.GREEN}COMPLETED{Colors.END}" if processed + failed >= target else f"{Colors.YELLOW}PROCESSING{Colors.END}"
            print(f"{Colors.WHITE}Process {i}: {Colors.BOLD}{processed}/{target}{Colors.END} Files Processed [{status}]")

        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Overall Progress: {Colors.BOLD}{total_processed + total_failed}/{total_files}{Colors.END} files")

        # Progress calculation
        if total_processed > 0:
            avg_time_per_file = elapsed_time / (total_processed + total_failed)
            remaining_files = total_files - (total_processed + total_failed)
            if remaining_files > 0:
                eta = remaining_files * avg_time_per_file
                print(f"{Colors.CYAN}Estimated Time Remaining: {Colors.BOLD}{format_time(eta)}{Colors.END}")

        # Use centralized footer
        print_footer()

        if all_completed:
            break

        # Check for quit using centralized function
        if check_quit():
            print_warning("Processing terminated by user!")
            return False

        time.sleep(1)  # Update every second
    return True

def split_list(lst, n):
    """Split list into n approximately equal parts"""
    k, m = divmod(len(lst), n)
    return [lst[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]

def main(input_path, num_processes=4):
    """
    Process all .txt files recursively from the input path using multiprocessing
    """
    # Get document collection and module names from environment or use defaults
    collection_name = os.environ.get('DATASET_NAME', os.path.basename(input_path))
    module_name = os.environ.get('MODULE_NAME', 'Your Module Name')
    start_time = time.time()

    # Setup
    output_dir = input_path.replace("/data/input/", "/data/output/") + "/processed_data"
    config = YourModuleConfig(param1=100, param2="default")  # Configure as needed

    # Collect all txt files
    txt_files = []
    for root, dirs, files in os.walk(input_path):
        for file in files:
            if file.endswith('.txt'):  # Adjust file extension as needed
                txt_files.append(os.path.join(root, file))

    total_files = len(txt_files)
    if total_files == 0:
        clear()
        print_header(collection_name, module_name)
        print_error("No files found in the specified directory.")
        print_footer()
        wait_for_enter("Press Enter to return to main menu...")
        return

    # Adjust processes
    if total_files < num_processes:
        num_processes = total_files

    # Split files
    file_chunks = split_list(txt_files, num_processes)

    # Initial display using LiveConsole
    with LiveConsole(collection_name, module_name) as console:
        print(f"\n{Colors.BOLD}{Colors.CYAN}INITIALIZATION{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
        print(f"{Colors.WHITE}Processes: {Colors.BOLD}{num_processes}{Colors.END}")
        for i, chunk in enumerate(file_chunks, 1):
            print(f"{Colors.WHITE}Process {i}: {Colors.BOLD}{len(chunk)}{Colors.END} files")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.YELLOW}Starting processing in 3 seconds...{Colors.END}")
        time.sleep(3)

    # Multiprocessing setup
    manager = Manager()
    progress_dict = manager.dict()
    total_files_dict = manager.dict()
    failed_dict = manager.dict()

    # Initialize dictionaries
    for i in range(1, num_processes + 1):
        progress_dict[i] = 0
        failed_dict[i] = 0
        total_files_dict[i] = len(file_chunks[i-1])

    # Start processes
    processes = []
    for i, file_chunk in enumerate(file_chunks, 1):
        p = Process(target=process_files_chunk,
                    args=(i, file_chunk, config, output_dir, progress_dict, total_files_dict, failed_dict))
        p.start()
        processes.append(p)

    # Display progress with header
    completed_normally = display_progress_with_header(
        collection_name, module_name, progress_dict, failed_dict,
        total_files_dict, num_processes, start_time, total_files
    )

    # Handle termination
    if not completed_normally:
        print("Terminating all processes...")
        for p in processes:
            if p.is_alive():
                p.terminate()
        for p in processes:
            p.join(timeout=5)
            if p.is_alive():
                p.kill()
    else:
        # Wait for completion
        for p in processes:
            p.join()

    # Final results
    end_time = time.time()
    total_time = end_time - start_time
    total_successful = sum(progress_dict.values())
    total_failed = sum(failed_dict.values())

    # Final display with consistent formatting
    clear()
    print_header(collection_name, module_name)
    print(f"\n{Colors.BOLD}{Colors.CYAN}PROCESSING RESULTS{Colors.END}")
    print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
    if completed_normally:
        print_success("Processing completed successfully!")
    else:
        print_warning("Processing was terminated by user")
    print(f"\n{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
    print(f"{Colors.WHITE}Successfully Processed: {Colors.BOLD}{Colors.GREEN}{total_successful}{Colors.END}")
    print(f"{Colors.WHITE}Failed: {Colors.BOLD}{Colors.RED}{total_failed}{Colors.END}")
    print(f"{Colors.WHITE}Total Time: {Colors.BOLD}{format_time(total_time)}{Colors.END}")
    if total_files > 0:
        success_rate = (total_successful / total_files) * 100
        color = Colors.GREEN if success_rate >= 90 else Colors.YELLOW if success_rate >= 75 else Colors.RED
        print(f"{Colors.WHITE}Success Rate: {Colors.BOLD}{color}{success_rate:.2f}%{Colors.END}")
    if total_failed > 0:
        print_warning("Error details logged to 'error_logs.txt'")
    print_footer()

    # Wait for user
    wait_for_enter("Press Enter to return to main menu...")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python batch_processor_app.py <input_path> [num_processes]")
        print("Example: python batch_processor_app.py /path/to/input 4")
        sys.exit(1)

    input_path = sys.argv[1]
    num_processes = int(sys.argv[2]) if len(sys.argv) > 2 else 4

    # Limit to reasonable number of processes
    max_processes = multiprocessing.cpu_count()
    if num_processes > max_processes:
        print_warning(f"Requested {num_processes} processes, but only {max_processes} CPU cores available.")
        print(f"Using {max_processes} processes instead.")
        num_processes = max_processes

    main(input_path, num_processes)
Edit main.py to add your new module to the modules dictionary:
# In main.py, inside the InteractiveCLI class
self.modules = {
    "chunking_engine": {
        "name": "Chunking Engine",
        "script": "modules/chunking_engine/batch_processor_app.py",
        "description": "Process and chunk text files",
        "processes": 4
    },
    "your_module_name": {
        "name": "Your Module Display Name",
        "script": "modules/your_module_name/batch_processor_app.py",
        "description": "Brief description of what your module does",
        "processes": 4  # Default number of processes
    }
    # Add more modules here in the future
}
If your module needs user-configurable parameters, add a configuration method to the InteractiveCLI class:
def configure_module(self) -> Dict:
    """Configure module parameters"""
    if self.selected_module == "your_module_name":
        clear()
        print_box(f"CONFIGURE {self.modules[self.selected_module]['name'].upper()}")
        print()
        # Get user input for your parameters
        while True:
            try:
                print_info("Enter param1 value (1-1000):")
                param1 = input("> ").strip()
                if not param1:
                    param1 = 100  # default
                else:
                    param1 = int(param1)
                if 1 <= param1 <= 1000:
                    break
                else:
                    print_error("Please enter a value between 1 and 1000")
            except ValueError:
                print_error("Please enter a valid number")
            except KeyboardInterrupt:
                print("\n\nExiting...")
                sys.exit(0)
        # Add more parameter inputs as needed
        return {"param1": param1, "param2": "configured_value"}
    # Handle other modules...
    elif self.selected_module == "chunking_engine":
        # existing configuration...
        pass
    return {}
The current implementation includes a complete example module called "Chunking Engine". Here's how it's structured:
# Example implementation (simplified)
import os


class ChunkingConfig:
    def __init__(self, chunk_size=400, chunk_overlap=100, enable_semantic=False, debug=False):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.enable_semantic = enable_semantic
        self.debug = debug


class ChunkingEngine:
    def process_file(self, config, file_path):
        # Implementation for text chunking
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Chunking logic here (chunk_text is omitted from this simplified example)
        chunks = self.chunk_text(content, config)
        return {
            "file_name": os.path.basename(file_path),
            "chunks": chunks,
            "metadata": {
                "chunk_size": config.chunk_size,
                "chunk_overlap": config.chunk_overlap,
                "total_chunks": len(chunks)
            }
        }
The module's batch_processor_app.py handles:
- Multiprocessing setup
- Progress tracking with live updates
- Error handling and logging
- Integration with the main CLI framework
The module is registered in main.py as:
"chunking_engine": {
    "name": "Chunking Engine",
    "script": "modules/chunking_engine/batch_processor_app.py",
    "description": "Process and chunk text files",
    "processes": 4
}
- Follow the Structure: Keep your module's files organized as shown
- Use Centralized Utilities: Import from utils.console for consistent UI
- Handle Errors Gracefully: Log errors and provide user-friendly messages
- Support Configuration: Allow users to customize processing parameters
- Document Your Code: Add docstrings and comments for maintainability
- Multiprocessing: Use multiple processes for CPU-intensive tasks
- File I/O: Minimize disk access by processing in memory when possible
- Progress Tracking: Update progress frequently for better user experience
- Resource Management: Clean up resources properly in case of interruption
- Logging: Use the provided logging setup for error tracking
- User Feedback: Provide clear error messages to users
- Graceful Degradation: Handle failures without crashing the entire process
- Recovery: Allow users to resume interrupted processing when possible
- Module Not Found: Ensure your module files are in the correct directory structure
- Import Errors: Check that all dependencies are installed and paths are correct
- Permission Errors: Ensure write permissions for output directories
- Memory Issues: Reduce the number of processes or document sizes for large collections
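The Recovery practice listed above (resuming interrupted processing) could be approached by skipping inputs that already have an output file. The sketch below assumes the `<file_name>.json` naming used by the batch processor template in this README; `pending_files` is a hypothetical helper, not something the framework provides.

```python
import os
from typing import Iterable, List


def pending_files(file_paths: Iterable[str], output_dir: str) -> List[str]:
    """Return the inputs that do not yet have a '<file_name>.json' output in output_dir."""
    pending = []
    for path in file_paths:
        # Mirrors the template's naming: output is the input's base name plus ".json"
        output_file = os.path.join(output_dir, os.path.basename(path) + ".json")
        if not os.path.exists(output_file):
            pending.append(path)
    return pending


todo = pending_files(["docs/a.txt", "docs/b.txt"], "data/output/my_documents/processed_data")
print(f"{len(todo)} file(s) still to process")
```

Because outputs are keyed by base name only, two inputs with the same file name in different subfolders would be treated as one. An existing output file is also taken as proof of success, so a partially written file from a killed process would be skipped.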
Enable debug logging by modifying the logging level in your module:
logging.basicConfig(
    level=logging.DEBUG,  # Change from ERROR to DEBUG
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('error_logs.txt', mode='a'),
        logging.StreamHandler()  # Also log to console
    ]
)
- Follow the module structure guidelines
- Add comprehensive documentation
- Include unit tests in the tests/ directory
- Update this README with your new module information
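As an illustration of the unit-test guideline, here is a small test for the `split_list` helper from the batch processor template. The file name tests/test_split_list.py is hypothetical, and the helper is copied inline so the example stays self-contained; in a real test you would import it from your module instead.

```python
import unittest


def split_list(lst, n):
    """Split list into n approximately equal parts (copied from the template above)."""
    k, m = divmod(len(lst), n)
    return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]


class SplitListTests(unittest.TestCase):
    def test_part_sizes_differ_by_at_most_one(self):
        parts = split_list(list(range(10)), 4)
        self.assertEqual([len(part) for part in parts], [3, 3, 2, 2])

    def test_preserves_order_and_items(self):
        items = list(range(7))
        parts = split_list(items, 3)
        self.assertEqual([x for part in parts for x in part], items)
```

Saved under tests/, this can be run with python -m unittest discover tests.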
[Add your license information here]
For support and questions:
- Check the error logs in error_logs.txt
- Review the troubleshooting section above
- Create an issue in the repository
Happy RAG Building! 🚀