imabhisht/NCIIPC_AI_Grand_Challange

NCIIPC AI Grand Challenge - RAG Framework

A modular, interactive CLI framework for building and managing Retrieval-Augmented Generation (RAG) systems. Built with Python, this tool provides an easy-to-use interface for processing documents, creating embeddings, and running RAG modules with real-time progress tracking.

Features

  • Interactive CLI Interface: Navigate through document collections and RAG modules using arrow keys
  • Modular Architecture: Easily add new RAG modules without modifying core code
  • Multiprocessing Support: Parallel processing for improved performance
  • Real-time Progress Tracking: Live updates during processing with ETA calculations
  • Error Handling & Logging: Comprehensive error logging and user-friendly error messages
  • Cross-platform: Works on Linux, macOS, and Windows
  • Configurable: Customize RAG parameters for each module

Project Structure

NCIIPC_AI_Grand_Challange/
├── main.py                    # Main CLI application entry point
├── pyproject.toml            # Project configuration and dependencies
├── uv.lock                   # Dependency lock file
├── error_logs.txt            # Error log file
├── README.md                 # This file
├── data/
│   ├── input/               # Input document collections directory
│   │   └── <collection_name>/  # Document collection folders
│   └── output/              # Processed output directory
│       └── <collection_name>/
├── modules/                 # RAG modules directory
│   └── chunking_engine/     # Example module
│       ├── main.py          # Module core logic
│       ├── batch_processor.py
│       └── batch_processor_app.py
├── tests/                   # Unit tests
├── utils/                   # Utility modules
│   └── console.py           # Console formatting and interaction utilities
└── Infrastructure/          # Infrastructure-related files

Quick Start

Prerequisites

  • Python 3.8+
  • uv package manager (recommended) or pip

Installation

  1. Clone the repository:

    git clone <repository-url>
    cd NCIIPC_AI_Grand_Challange
  2. Install dependencies:

    # Using uv (recommended)
    uv sync
    
    # Or using pip
    pip install -r requirements.txt
  3. Run the application:

    # Using uv
    uv run python main.py
    
    # Or directly
    python main.py

CSV and JSON Row-Based Chunking

The chunking engine now supports row-based chunking for CSV and JSON files, which creates one chunk per row/record. This is ideal for structured data where each row represents a complete, logical record.

Features

  • CSV Files: Each row becomes a separate chunk with structured text representation
  • JSON Files: Each array element or object key becomes a separate chunk
  • Data Integrity: Preserves all original data in each chunk
  • Flexible Configuration: Choose between row-based or traditional text chunking

Configuration Options

Set these environment variables to control chunking behavior:

# Enable/disable row-based chunking
CSV_CHUNK_BY_ROW=true          # Default: true (one chunk per CSV row)
JSON_CHUNK_BY_RECORD=true      # Default: true (one chunk per JSON record/key)

# Traditional chunking parameters (when row-based is disabled)
CHUNK_SIZE=400                 # Maximum characters per chunk
CHUNK_OVERLAP=100              # Characters to overlap between chunks
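
The README does not show how the chunking engine reads these variables, so the following is only a minimal sketch of one way to parse them with the documented defaults. The helper names `env_flag`, `env_int`, and `load_chunking_settings` are hypothetical and are not part of the project.

```python
import os


def env_flag(name: str, default: bool) -> bool:
    """Read a boolean environment variable (true/1/yes/on, case-insensitive)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"true", "1", "yes", "on"}


def env_int(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to the default if unset."""
    raw = os.environ.get(name)
    return int(raw) if raw is not None else default


def load_chunking_settings() -> dict:
    """Collect the four documented settings (hypothetical helper)."""
    # Defaults mirror the values listed above.
    return {
        "csv_chunk_by_row": env_flag("CSV_CHUNK_BY_ROW", True),
        "json_chunk_by_record": env_flag("JSON_CHUNK_BY_RECORD", True),
        "chunk_size": env_int("CHUNK_SIZE", 400),
        "chunk_overlap": env_int("CHUNK_OVERLAP", 100),
    }
```

Parsing booleans explicitly avoids the common pitfall that any non-empty string, including "false", is truthy in Python.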

Example Usage

CSV Row-Based Chunking

For a CSV file with product data:

product_id,name,color,description,price
123,"T-Shirt","Blue","Soft cotton shirt",29.99
124,"Jeans","Black","Slim fit jeans",79.99

Row-based chunking creates:

  • Chunk 1: product_id: 123, name: T-Shirt, color: Blue, description: Soft cotton shirt, price: 29.99
  • Chunk 2: product_id: 124, name: Jeans, color: Black, description: Slim fit jeans, price: 79.99
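
The mapping from rows to chunks shown above can be sketched with the standard `csv` module. This is an illustrative approximation, not the engine's actual code; the function name `csv_rows_to_chunks` is hypothetical, while the `row_data` and `text` keys follow the Programmatic Usage example in this README.

```python
import csv
import io


def csv_rows_to_chunks(csv_text: str) -> list:
    """Turn each CSV row into one chunk with a 'field: value' text form."""
    reader = csv.DictReader(io.StringIO(csv_text))
    chunks = []
    for row in reader:
        # Join every column of the row so the chunk carries the full record.
        text = ", ".join(f"{key}: {value}" for key, value in row.items())
        chunks.append({"row_data": dict(row), "text": text})
    return chunks


sample = (
    "product_id,name,color,description,price\n"
    '123,"T-Shirt","Blue","Soft cotton shirt",29.99\n'
    '124,"Jeans","Black","Slim fit jeans",79.99\n'
)

for chunk in csv_rows_to_chunks(sample):
    print(chunk["text"])
```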

JSON Record-Based Chunking

For a JSON array of employees:

[
  {"id": 1, "name": "John", "department": "Engineering"},
  {"id": 2, "name": "Jane", "department": "Marketing"}
]

Record-based chunking creates:

  • Chunk 1: id: 1, name: John, department: Engineering
  • Chunk 2: id: 2, name: Jane, department: Marketing
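
A comparable sketch for JSON, assuming one chunk per array element and, for a top-level object, one chunk per key. The function names are hypothetical and the engine's real handling of nested values may differ.

```python
import json


def _format_value(value) -> str:
    """Render a record as 'key: value' pairs; other values fall back to str()."""
    if isinstance(value, dict):
        return ", ".join(f"{k}: {v}" for k, v in value.items())
    return str(value)


def json_to_chunks(json_text: str) -> list:
    """Return one text chunk per array element or per top-level object key."""
    data = json.loads(json_text)
    if isinstance(data, list):
        return [_format_value(item) for item in data]
    if isinstance(data, dict):
        return [f"{key}: {_format_value(value)}" for key, value in data.items()]
    # A bare scalar document becomes a single chunk.
    return [str(data)]


employees = (
    '[{"id": 1, "name": "John", "department": "Engineering"},'
    ' {"id": 2, "name": "Jane", "department": "Marketing"}]'
)

for chunk in json_to_chunks(employees):
    print(chunk)
```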

Benefits of Row-Based Chunking

  1. High Data Integrity: Each chunk corresponds to a single, logical record
  2. Precise Search Results: Queries match complete records exactly
  3. Structured Context: All fields from a row are included in search context
  4. No Data Loss: Records are never split across chunk boundaries, so no overlap is needed to keep a row's fields together

Programmatic Usage

from modules.chunking_engine.main import ChunkingEngine, ChunkingConfig

# Enable row-based chunking
config = ChunkingConfig(
    csv_chunk_by_row=True,      # One chunk per CSV row
    json_chunk_by_record=True,  # One chunk per JSON record
    debug=True
)

engine = ChunkingEngine()
result = engine.process_file(config, 'data/products.csv')

# Each chunk contains the full row data
for chunk in result['data']['chunks']:
    print(f"Row data: {chunk['row_data']}")  # For CSV files
    print(f"Text: {chunk['text']}")          # Structured text representation

Usage

Basic Workflow

  1. Launch the CLI: Run python main.py
  2. Select Document Collection: Choose from available collections in data/input/
  3. Select Module: Choose a RAG module
  4. Configure: Set module-specific parameters
  5. Run: Execute the processing with real-time progress tracking

Adding Document Collections

Place your document collection folders in the data/input/ directory:

data/input/
├── my_documents/
│   ├── doc1.txt
│   ├── doc2.txt
│   └── ...
└── knowledge_base/
    └── ...

The tool automatically detects all subdirectories in data/input/ as available document collections.
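
The detection logic in main.py is not shown in this README. A minimal sketch of how subdirectories could be listed as collections, with `list_collections` as a hypothetical helper name:

```python
from pathlib import Path


def list_collections(input_root: str = "data/input") -> list:
    """Return the names of all immediate subdirectories of the input root."""
    root = Path(input_root)
    if not root.is_dir():
        return []
    # Only directories count as collections; loose files are ignored.
    return sorted(p.name for p in root.iterdir() if p.is_dir())
```

With the layout shown above, `list_collections()` would return `['knowledge_base', 'my_documents']`.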

Adding New RAG Modules

Module Structure

Each module should follow this structure:

modules/
└── your_module_name/
    ├── main.py              # Core module logic and classes
    ├── batch_processor.py   # Batch processing logic for RAG operations (optional)
    └── batch_processor_app.py  # CLI interface for the module

Step-by-Step Guide

1. Create Module Directory

mkdir modules/your_module_name

2. Implement Core Logic (main.py)

Create the main module file with your RAG processing logic:

"""
Your Module Name - Core RAG Processing Logic
"""
import os
from pathlib import Path
from typing import Dict, Any, List

class YourModuleConfig:
    """Configuration class for your module"""
    def __init__(self, param1: int = 100, param2: str = "default"):
        self.param1 = param1
        self.param2 = param2

class YourModule:
    """Main processing class"""
    
    def process_file(self, config: YourModuleConfig, file_path: str) -> Dict[str, Any]:
        """
        Process a single file
        
        Args:
            config: Module configuration
            file_path: Path to the file to process
            
        Returns:
            Dict containing processing results
        """
        try:
            # Your RAG processing logic here
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Process the content
            result = {
                "file_name": os.path.basename(file_path),
                "processed_content": self._process_content(content, config),
                "metadata": {
                    "param1": config.param1,
                    "param2": config.param2,
                    "processing_timestamp": "timestamp_here"
                }
            }
            
            return result
            
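        except Exception as e:
            # Chain the original exception so the traceback is preserved
            raise Exception(f"Error processing {file_path}: {str(e)}") from e
    
    def _process_content(self, content: str, config: YourModuleConfig) -> Any:
        """Your content processing logic for RAG"""
        # Placeholder: returns the content unchanged; replace with your RAG processing logic
        processed_content = content
        return processed_content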

3. Create Batch Processor (batch_processor_app.py)

Create the CLI interface that integrates with the main framework:

import os
import json
import sys
import logging
import time
import multiprocessing
from multiprocessing import Process, Manager
from pathlib import Path

# Add project root to path for imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Import centralized console utilities
from utils.console import (
    clear, format_time, check_quit, print_header, 
    print_footer, print_error, print_success, print_warning,
    wait_for_enter, LiveConsole, Colors
)

# Import your module
try:
    from modules.your_module_name.main import YourModule, YourModuleConfig
except ImportError:
    print_error("Could not import YourModule. Please ensure main.py exists.")
    sys.exit(1)

# Set up logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('error_logs.txt', mode='a'),
    ]
)
logger = logging.getLogger(__name__)

def process_files_chunk(process_id, file_chunk, config, output_dir, progress_dict, total_files_dict, failed_dict):
    """Process a chunk of files in a separate process"""
    processor = YourModule()
    processed = 0
    failed = 0
    
    for file_path in file_chunk:
        try:
            result = processor.process_file(config, file_path)
            
            file_name = os.path.basename(file_path)
            output_file = f"{output_dir}/{file_name}.json"
            
            os.makedirs(output_dir, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=4)
            
            processed += 1
            
        except Exception as e:
            failed += 1
            logger.error(f"Process {process_id}: Error processing {file_path}: {str(e)}")
        
        # Update progress
        progress_dict[process_id] = processed
        failed_dict[process_id] = failed

def display_progress_with_header(collection_name, module_name, progress_dict, failed_dict, 
                                total_files_dict, num_processes, start_time, total_files):
    """Display live progress updates with consistent header"""
    
    while True:
        clear()
        
        # Use centralized header
        print_header(collection_name, module_name)
        
        current_time = time.time()
        elapsed_time = current_time - start_time
        
        # Module-specific content
        print(f"\n{Colors.BOLD}{Colors.CYAN}BATCH PROCESSING STATUS{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
        print(f"{Colors.WHITE}Active Processes: {Colors.BOLD}{num_processes}{Colors.END}")
        print(f"{Colors.WHITE}Elapsed Time: {Colors.BOLD}{format_time(elapsed_time)}{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        
        total_processed = 0
        total_failed = 0
        all_completed = True
        
        # Process status
        for i in range(1, num_processes + 1):
            processed = progress_dict.get(i, 0)
            failed = failed_dict.get(i, 0)
            target = total_files_dict.get(i, 0)
            
            total_processed += processed
            total_failed += failed
            
            if processed + failed < target:
                all_completed = False
            
            status = f"{Colors.GREEN}COMPLETED{Colors.END}" if processed + failed >= target else f"{Colors.YELLOW}PROCESSING{Colors.END}"
            print(f"{Colors.WHITE}Process {i}: {Colors.BOLD}{processed}/{target}{Colors.END} Files Processed [{status}]")
        
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Overall Progress: {Colors.BOLD}{total_processed + total_failed}/{total_files}{Colors.END} files")
        
        # Progress calculation (failed files count as finished work, so the ETA still updates)
        completed_files = total_processed + total_failed
        if completed_files > 0:
            avg_time_per_file = elapsed_time / completed_files
            remaining_files = total_files - completed_files
            if remaining_files > 0:
                eta = remaining_files * avg_time_per_file
                print(f"{Colors.CYAN}Estimated Time Remaining: {Colors.BOLD}{format_time(eta)}{Colors.END}")
        
        # Use centralized footer
        print_footer()
        
        if all_completed:
            break
        
        # Check for quit using centralized function
        if check_quit():
            print_warning("Processing terminated by user!")
            return False
            
        time.sleep(1)  # Update every second
    
    return True

def split_list(lst, n):
    """Split list into n approximately equal parts"""
    k, m = divmod(len(lst), n)
    return [lst[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]

def main(input_path, num_processes=4):
    """
    Process all .txt files recursively from the input path using multiprocessing
    """
    # Get document collection and module names from environment or use defaults
    collection_name = os.environ.get('DATASET_NAME', os.path.basename(input_path))
    module_name = os.environ.get('MODULE_NAME', 'Your Module Name')
    
    start_time = time.time()
    
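    # Setup: mirror the collection path under data/output/ using OS-native separators
    # (also matches relative paths such as data/input/<collection_name>)
    input_dir = os.path.join("data", "input")
    output_root = os.path.normpath(input_path).replace(input_dir, os.path.join("data", "output"), 1)
    output_dir = os.path.join(output_root, "processed_data")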
    config = YourModuleConfig(param1=100, param2="default")  # Configure as needed
    
    # Collect all txt files
    txt_files = []
    for root, dirs, files in os.walk(input_path):
        for file in files:
            if file.endswith('.txt'):  # Adjust file extension as needed
                txt_files.append(os.path.join(root, file))
    
    total_files = len(txt_files)
    
    if total_files == 0:
        clear()
        print_header(collection_name, module_name)
        print_error("No files found in the specified directory.")
        print_footer()
        wait_for_enter("Press Enter to return to main menu...")
        return
    
    # Adjust processes
    if total_files < num_processes:
        num_processes = total_files
    
    # Split files
    file_chunks = split_list(txt_files, num_processes)
    
    # Initial display using LiveConsole
    with LiveConsole(collection_name, module_name) as console:
        print(f"\n{Colors.BOLD}{Colors.CYAN}INITIALIZATION{Colors.END}")
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
        print(f"{Colors.WHITE}Processes: {Colors.BOLD}{num_processes}{Colors.END}")
        
        for i, chunk in enumerate(file_chunks, 1):
            print(f"{Colors.WHITE}Process {i}: {Colors.BOLD}{len(chunk)}{Colors.END} files")
        
        print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
        print(f"{Colors.YELLOW}Starting processing in 3 seconds...{Colors.END}")
        time.sleep(3)
    
    # Multiprocessing setup
    manager = Manager()
    progress_dict = manager.dict()
    total_files_dict = manager.dict()
    failed_dict = manager.dict()
    
    # Initialize dictionaries
    for i in range(1, num_processes + 1):
        progress_dict[i] = 0
        failed_dict[i] = 0
        total_files_dict[i] = len(file_chunks[i-1])
    
    # Start processes
    processes = []
    for i, file_chunk in enumerate(file_chunks, 1):
        p = Process(target=process_files_chunk, 
                   args=(i, file_chunk, config, output_dir, progress_dict, total_files_dict, failed_dict))
        p.start()
        processes.append(p)
    
    # Display progress with header
    completed_normally = display_progress_with_header(
        collection_name, module_name, progress_dict, failed_dict, 
        total_files_dict, num_processes, start_time, total_files
    )
    
    # Handle termination
    if not completed_normally:
        print("Terminating all processes...")
        for p in processes:
            if p.is_alive():
                p.terminate()
        
        for p in processes:
            p.join(timeout=5)
            if p.is_alive():
                p.kill()
    else:
        # Wait for completion
        for p in processes:
            p.join()
    
    # Final results
    end_time = time.time()
    total_time = end_time - start_time
    
    total_successful = sum(progress_dict.values())
    total_failed = sum(failed_dict.values())
    
    # Final display with consistent formatting
    clear()
    print_header(collection_name, module_name)
    print(f"\n{Colors.BOLD}{Colors.CYAN}PROCESSING RESULTS{Colors.END}")
    print(f"{Colors.BLUE}{'-' * 60}{Colors.END}")
    
    if completed_normally:
        print_success("Processing completed successfully!")
    else:
        print_warning("Processing was terminated by user")
    
    print(f"\n{Colors.WHITE}Total Files: {Colors.BOLD}{total_files}{Colors.END}")
    print(f"{Colors.WHITE}Successfully Processed: {Colors.BOLD}{Colors.GREEN}{total_successful}{Colors.END}")
    print(f"{Colors.WHITE}Failed: {Colors.BOLD}{Colors.RED}{total_failed}{Colors.END}")
    print(f"{Colors.WHITE}Total Time: {Colors.BOLD}{format_time(total_time)}{Colors.END}")
    
    if total_files > 0:
        success_rate = (total_successful / total_files) * 100
        color = Colors.GREEN if success_rate >= 90 else Colors.YELLOW if success_rate >= 75 else Colors.RED
        print(f"{Colors.WHITE}Success Rate: {Colors.BOLD}{color}{success_rate:.2f}%{Colors.END}")
    
    if total_failed > 0:
        print_warning("Error details logged to 'error_logs.txt'")
    
    print_footer()
    
    # Wait for user
    wait_for_enter("Press Enter to return to main menu...")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python batch_processor_app.py <input_path> [num_processes]")
        print("Example: python batch_processor_app.py /path/to/input 4")
        sys.exit(1)
    
    input_path = sys.argv[1]
    num_processes = int(sys.argv[2]) if len(sys.argv) > 2 else 4
    
    # Limit to reasonable number of processes
    max_processes = multiprocessing.cpu_count()
    if num_processes > max_processes:
        print_warning(f"Requested {num_processes} processes, but only {max_processes} CPU cores available.")
        print(f"Using {max_processes} processes instead.")
        num_processes = max_processes
    
    main(input_path, num_processes)
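
To see how the template's `split_list` helper distributes files across worker processes, here is a small standalone check. The helper is copied from the template above; the file names are made up for illustration.

```python
def split_list(lst, n):
    """Split list into n approximately equal parts (copied from the template)."""
    k, m = divmod(len(lst), n)
    return [lst[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]


# Ten hypothetical files spread over four processes.
files = [f"doc{i}.txt" for i in range(10)]
chunks = split_list(files, 4)

print([len(c) for c in chunks])  # [3, 3, 2, 2]
```

The first `len(lst) % n` parts receive one extra item, so part sizes never differ by more than one. Note that `n` must be at least 1; passing 0 raises `ZeroDivisionError`.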

4. Register Module in Main CLI

Edit main.py to add your new module to the modules dictionary:

# In main.py, inside the InteractiveCLI class
self.modules = {
    "chunking_engine": {
        "name": "Chunking Engine",
        "script": "modules/chunking_engine/batch_processor_app.py",
        "description": "Process and chunk text files",
        "processes": 4
    },
    "your_module_name": {
        "name": "Your Module Display Name",
        "script": "modules/your_module_name/batch_processor_app.py",
        "description": "Brief description of what your module does",
        "processes": 4  # Default number of processes
    }
    # Add more modules here in the future
}
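
How main.py launches a registered script is not shown here. Judging from the template, which reads `DATASET_NAME` and `MODULE_NAME` from the environment and takes `<input_path> [num_processes]` as arguments, a launch could plausibly be assembled as in this sketch. `build_launch` is a hypothetical helper, not the project's API.

```python
import os
import sys


def build_launch(module_entry: dict, collection_path: str) -> tuple:
    """Build the command line and environment for a module script (assumed convention)."""
    cmd = [
        sys.executable,
        module_entry["script"],
        collection_path,
        str(module_entry["processes"]),
    ]
    # The template reads these two variables to label its header.
    env = dict(
        os.environ,
        DATASET_NAME=os.path.basename(os.path.normpath(collection_path)),
        MODULE_NAME=module_entry["name"],
    )
    return cmd, env


entry = {
    "name": "Chunking Engine",
    "script": "modules/chunking_engine/batch_processor_app.py",
    "description": "Process and chunk text files",
    "processes": 4,
}
cmd, env = build_launch(entry, "data/input/my_documents")
# The pair could then be passed to subprocess.run(cmd, env=env).
```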

5. Add Module-Specific Configuration (Optional)

If your module needs user-configurable parameters, add a configuration method to the InteractiveCLI class:

def configure_module(self) -> Dict:
    """Configure module parameters"""
    if self.selected_module == "your_module_name":
        clear()
        print_box(f"CONFIGURE {self.modules[self.selected_module]['name'].upper()}")
        print()
        
        # Get user input for your parameters
        while True:
            try:
                print_info("Enter param1 value (1-1000):")
                param1 = input("> ").strip()
                
                if not param1:
                    param1 = 100  # default
                else:
                    param1 = int(param1)
                
                if 1 <= param1 <= 1000:
                    break
                else:
                    print_error("Please enter a value between 1 and 1000")
            except ValueError:
                print_error("Please enter a valid number")
            except KeyboardInterrupt:
                print("\n\nExiting...")
                sys.exit(0)
        
        # Add more parameter inputs as needed
        
        return {"param1": param1, "param2": "configured_value"}
    
    # Handle other modules...
    elif self.selected_module == "chunking_engine":
        pass  # existing configuration goes here
    
    return {}

Example: Chunking Engine Module

The current implementation includes a complete example module called "Chunking Engine". Here's how it's structured:

Core Logic (modules/chunking_engine/main.py)

# Example implementation (simplified)
import os

class ChunkingConfig:
    def __init__(self, chunk_size=400, chunk_overlap=100, enable_semantic=False, debug=False):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.enable_semantic = enable_semantic
        self.debug = debug

class ChunkingEngine:
    def process_file(self, config, file_path):
        # Implementation for text chunking
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # Chunking logic here...
        chunks = self.chunk_text(content, config)
        
        return {
            "file_name": os.path.basename(file_path),
            "chunks": chunks,
            "metadata": {
                "chunk_size": config.chunk_size,
                "chunk_overlap": config.chunk_overlap,
                "total_chunks": len(chunks)
            }
        }
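
The simplified example calls `chunk_text` without showing it. One common way to implement character-based chunking with overlap is a sliding window, sketched below as a standalone function. This is an assumption about the approach, not the engine's actual implementation.

```python
def chunk_text(text: str, chunk_size: int = 400, chunk_overlap: int = 100) -> list:
    """Split text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))
# ['abcd', 'defg', 'ghij']
```

Requiring the overlap to be smaller than the chunk size guarantees that the window always moves forward.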

CLI Integration (modules/chunking_engine/batch_processor_app.py)

This file handles:

  • Multiprocessing setup
  • Progress tracking with live updates
  • Error handling and logging
  • Integration with the main CLI framework

Registration in Main CLI

The module is registered in main.py as:

"chunking_engine": {
    "name": "Chunking Engine",
    "script": "modules/chunking_engine/batch_processor_app.py",
    "description": "Process and chunk text files",
    "processes": 4
}

Best Practices

Module Development

  1. Follow the Structure: Keep your module's files organized as shown
  2. Use Centralized Utilities: Import from utils.console for consistent UI
  3. Handle Errors Gracefully: Log errors and provide user-friendly messages
  4. Support Configuration: Allow users to customize processing parameters
  5. Document Your Code: Add docstrings and comments for maintainability

Performance

  1. Multiprocessing: Use multiple processes for CPU-intensive tasks
  2. File I/O: Minimize disk access by processing in memory when possible
  3. Progress Tracking: Update progress frequently for better user experience
  4. Resource Management: Clean up resources properly in case of interruption

Error Handling

  1. Logging: Use the provided logging setup for error tracking
  2. User Feedback: Provide clear error messages to users
  3. Graceful Degradation: Handle failures without crashing the entire process
  4. Recovery: Allow users to resume interrupted processing when possible
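
As one possible approach to the recovery point above, a module could skip files whose output already exists. The sketch assumes the template's naming convention of `<file_name>.json` inside the output directory; `pending_files` is a hypothetical helper.

```python
import os


def pending_files(input_files: list, output_dir: str) -> list:
    """Return the input files that do not yet have a JSON output file."""
    pending = []
    for file_path in input_files:
        # Template convention: output is named after the input file plus ".json".
        output_file = os.path.join(output_dir, os.path.basename(file_path) + ".json")
        if not os.path.exists(output_file):
            pending.append(file_path)
    return pending
```

This check treats any existing output file as complete, so a file that was only partially written before an interruption would be skipped. Writing to a temporary name and renaming on success is one way to avoid that.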

Troubleshooting

Common Issues

  1. Module Not Found: Ensure your module files are in the correct directory structure
  2. Import Errors: Check that all dependencies are installed and paths are correct
  3. Permission Errors: Ensure write permissions for output directories
  4. Memory Issues: Reduce the number of processes or document sizes for large collections

Debug Mode

Enable debug logging by modifying the logging level in your module:

logging.basicConfig(
    level=logging.DEBUG,  # Change from ERROR to DEBUG
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('error_logs.txt', mode='a'),
        logging.StreamHandler()  # Also log to console
    ]
)

Contributing

  1. Follow the module structure guidelines
  2. Add comprehensive documentation
  3. Include unit tests in the tests/ directory
  4. Update this README with your new module information

License

[Add your license information here]

Support

For support and questions:

  • Check the error logs in error_logs.txt
  • Review the troubleshooting section above
  • Create an issue in the repository

Happy RAG Building! 🚀
