
The QUAD Project: Your Personal AI

Ecosystem on Mac Mini M4


(Comprehensive Guide)

Upgraded from TRIO 2.0


Introduction: Building Your Intelligent Assistant Suite from
Scratch

Welcome, developer and creator! This comprehensive guide is your blueprint for constructing
and understanding the QUAD Project – a suite of four specialized AI assistants (NAMI, RUSH,
VEX, HUSK) – specifically tailored and optimized for your Mac Mini M4 (24GB RAM / 512GB
Storage variant). We embrace the "from scratch" philosophy, empowering you to build,
understand, customize, and potentially expand upon your very own personal AI ecosystem.

This guide focuses on using exclusively Free and Open Source Software (FOSS), ensuring
transparency, control, and cost-effectiveness (your only expense being the electricity bill!). We
will delve deep into the code, providing exhaustive line-by-line explanations for maximum
clarity.

What You Will Build:

The QUAD Project consists of:

1. NAMI (Network and Machine Intelligence): Your system control and interaction hub, managed via Telegram.
2. RUSH (Recording and Understanding Speech Helper): Processes audio, performs high-performance transcription using whisper.cpp with Metal acceleration, and enables content analysis (with hooks for local LLMs).
3. VEX (Video Exploration Helper): Analyzes video files, automatically detects scenes using PySceneDetect, and extracts relevant clips using ffmpeg.
4. HUSK (Helpful Understanding & Study Knowledge): Analyzes academic PDFs and documents, detects code and academic terminology, and provides study assistance through Telegram.

Core Architectural Pillars & Features:

• Mac Mini M4 Optimization: Tailored setup leveraging Metal acceleration (MPS) and efficient resource management for the 24GB RAM model.
• 100% FOSS Foundation: Built entirely with open-source tools (whisper.cpp, macOS say, ChromaDB, Sentence Transformers, Flask, PySceneDetect, ffmpeg, launchd, psutil, watchdog, python-telegram-bot, etc.).
• Modular Assistants: Independent yet cooperative assistants managed by a central framework.
• Centralized Core Framework: Robust StateManager, MemoryManager, CheckpointManager, and ModelController for lifecycle, resource awareness, and pause/resume.
• Cross-Assistant Knowledge Sharing: A ChromaDB vector database allows assistants to store and query information gathered by others, accessed via NAMI (see the sketch below).
• Unified Web Dashboard: A Flask-based interface to monitor system resources (CPU, RAM, Memory Pressure) and assistant status, with basic controls.
• Reliable Scheduling: Utilizes macOS launchd for robust, system-integrated process management and scheduling.
• Detailed Guidance: Step-by-step instructions, complete code blocks, exhaustive line-by-line explanations, troubleshooting tips, and performance optimization notes.
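To make the knowledge-sharing pillar concrete, here is a minimal sketch of one assistant writing to and another reading from a shared ChromaDB collection. The collection name shared_knowledge, the stored note, and the shared database path are illustrative assumptions, not part of the QUAD code you will build below.

# Minimal sketch of cross-assistant knowledge sharing via ChromaDB.
# The collection name, note text, and shared path are illustrative assumptions.
import os
import chromadb
from chromadb.utils import embedding_functions

client = chromadb.PersistentClient(
    path=os.path.expanduser("~/trio_project_m4/shared/vector_db")
)
embed = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)
collection = client.get_or_create_collection(
    name="shared_knowledge", embedding_function=embed
)

# One assistant stores what it learned (upsert so re-runs do not error)...
collection.upsert(
    ids=["rush_note_001"],
    documents=["Lecture 3 covered Dijkstra's algorithm and priority queues."],
    metadatas=[{"source": "RUSH"}],
)

# ...and another queries it semantically later.
results = collection.query(query_texts=["shortest path algorithms"], n_results=3)
for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
    print(meta["source"], "->", doc)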

How to Use This Guide:

This guide is structured as a step-by-step build process. Follow the sections sequentially,
carefully copying the code into the specified files and executing the commands. The detailed
explanations accompanying each code line will illuminate the purpose and function of every
component. By the end, you will have not only a working QUAD Project but also a deep
understanding of its inner workings.

Prepare your editor and terminal. Let the development commence!


Part 9: HUSK Assistant Implementation

HUSK (Helpful Understanding & Study Knowledge) is the academic lecture assistant, designed
to analyze educational content, process academic PDFs, and provide study assistance. This
component extends the TRIO ecosystem into a QUAD system, leveraging the existing
infrastructure while adding specialized capabilities for academic contexts.

9.1 HUSK Overview and Role

Operating during typical study hours (e.g., 7:00 AM - 12:00 AM), HUSK aims to:

• Process Academic Content: Analyze PDFs, documents, and text files containing academic
material, with special attention to code snippets, formulas, and technical terminology.
• Integrate with RUSH: Leverage RUSH's transcription capabilities to process lecture
recordings and connect them with relevant study materials.
• Provide Study Assistance: Generate explanations, summaries, and study aids based on
processed content.
• Telegram Integration: Offer a conversational interface through Telegram, extending NAMI's
existing bot infrastructure.
• Knowledge Management: Maintain a specialized academic knowledge base that integrates
with the broader system.

9.2 HUSK Setup and Configuration

9.2.1 Dependencies

HUSK requires several FOSS libraries beyond the core system requirements:

# Install HUSK-specific dependencies
pip3 install PyPDF2 pdfminer.six python-docx nltk scikit-learn gensim

• PyPDF2: A pure-Python library for PDF document manipulation.


• pdfminer.six: A tool for extracting information from PDF documents.
• python-docx: For processing Microsoft Word documents.
• nltk: Natural Language Toolkit for text processing and analysis.
• scikit-learn: Machine learning library for text classification.
• gensim: Topic modeling and document similarity analysis.
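As an optional sanity check, the following short snippet simply imports each dependency and prints its version where one is exposed; note that pdfminer.six, python-docx, and scikit-learn import under the module names pdfminer, docx, and sklearn.

# Optional sanity check: verify the HUSK dependencies import cleanly.
import importlib

for name in ["PyPDF2", "pdfminer", "docx", "nltk", "sklearn", "gensim"]:
    try:
        mod = importlib.import_module(name)
        print(f"{name}: OK ({getattr(mod, '__version__', 'version unknown')})")
    except ImportError as e:
        print(f"{name}: MISSING ({e})")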
9.2.2 Configuration File ( husk_config.json )

Create a configuration file at ~/trio_project_m4/config/husk_config.json :

{
  "general": {
    "name": "HUSK",
    "description": "Helpful Understanding & Study Knowledge",
    "version": "1.0.0",
    "active_hours": {
      "start": "07:00",
      "end": "00:00"
    },
    "working_directory": "~/trio_project_m4/husk",
    "log_directory": "~/trio_project_m4/logs/husk"
  },
  "processing": {
    "max_file_size_mb": 50,
    "supported_formats": ["pdf", "docx", "txt", "md"],
    "academic_keywords": [
      "code", "algorithm", "function", "class", "method",
      "theorem", "proof", "equation", "formula", "definition",
      "example", "figure", "table", "reference", "citation"
    ],
    "extraction_chunk_size": 1000,
    "max_concurrent_files": 2
  },
  "telegram": {
    "commands": {
      "analyze": "Analyze an academic document",
      "explain": "Request explanation of a concept",
      "summarize": "Generate a summary of processed content",
      "connect": "Connect document with RUSH transcriptions",
      "status": "Check processing status",
      "help": "Show available commands"
    },
    "message_templates": {
      "welcome": "Welcome to HUSK, your academic assistant. Upload a document to begin analysis.",
      "processing": "Processing your document. This may take a few minutes depending on size and complexity.",
      "complete": "Analysis complete! Use /explain or /summarize to interact with the content.",
      "error": "An error occurred while processing your document. Please try again."
    }
  },
  "integration": {
    "rush_data_path": "~/trio_project_m4/shared/rush_transcriptions",
    "knowledge_base_path": "~/trio_project_m4/shared/knowledge/academic",
    "vector_db_path": "~/trio_project_m4/shared/vector_db/academic"
  },
  "resources": {
    "max_ram_usage_mb": 2048,
    "max_cpu_percent": 70,
    "checkpoint_interval_min": 15
  }
}

• general: Basic configuration including name, description, active hours, and directories.
• processing: Settings for document processing, including file size limits, supported formats,
and academic keywords.
• telegram: Configuration for the Telegram bot interface, including commands and message
templates.
• integration: Paths for integration with other components, particularly RUSH.
• resources: Resource limits and checkpoint settings.
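If you want to catch configuration mistakes before launching the assistant, a short check like the one below validates the top-level sections; the required key names simply mirror the file above.

# Minimal config validation sketch for husk_config.json.
import json
import os

config_path = os.path.expanduser("~/trio_project_m4/config/husk_config.json")
with open(config_path) as f:
    config = json.load(f)

required = ["general", "processing", "telegram", "integration", "resources"]
missing = [section for section in required if section not in config]
if missing:
    raise SystemExit(f"husk_config.json is missing sections: {missing}")
print("Configuration OK:", config["general"]["name"], config["general"]["version"])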

9.2.3 Directory Structure

Create the necessary directories for HUSK:

# Create HUSK directories
mkdir -p ~/trio_project_m4/husk/{processors,knowledge,telegram,models,cache,temp}
mkdir -p ~/trio_project_m4/logs/husk
mkdir -p ~/trio_project_m4/shared/knowledge/academic
mkdir -p ~/trio_project_m4/shared/vector_db/academic

(The knowledge and telegram subdirectories hold the modules created in sections 9.5 and 9.6.)

9.3 HUSK Main Script ( husk_main.py )

Create the main HUSK script at ~/trio_project_m4/husk/husk_main.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
HUSK (Helpful Understanding & Study Knowledge) - Academic Lecture Assistant
Part of the QUAD Project for Mac Mini M4

This module serves as the main entry point for the HUSK assistant, handling
initialization, configuration, and the main processing loop.
"""

import os
import sys
import json
import time
import logging
import signal
import argparse
import threading
from datetime import datetime

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import core framework components
from core.state_manager import StateManager
from core.memory_manager import MemoryManager
from core.checkpoint_manager import CheckpointManager
from core.controller import ModelController

# Import HUSK-specific modules
from husk.processors.pdf_processor import PDFProcessor
from husk.processors.docx_processor import DocxProcessor
from husk.processors.text_processor import TextProcessor
from husk.knowledge.academic_kb import AcademicKnowledgeBase
from husk.telegram.husk_telegram import HuskTelegramBot


class HUSKAssistant:
    """
    Main HUSK Assistant class that coordinates all academic content processing,
    knowledge management, and user interaction components.
    """

    def __init__(self, config_path=None, debug=False):
        """
        Initialize the HUSK Assistant with configuration and core components.

        Args:
            config_path (str): Path to the configuration file
            debug (bool): Enable debug logging if True
        """
        # Setup logging
        log_level = logging.DEBUG if debug else logging.INFO
        self.setup_logging(log_level)
        self.logger = logging.getLogger("HUSK")
        self.logger.info("Initializing HUSK Assistant...")

        # Load configuration
        self.config_path = config_path or os.path.expanduser(
            "~/trio_project_m4/config/husk_config.json"
        )
        self.load_configuration()

        # Initialize core framework components
        self.state_manager = StateManager("HUSK")
        self.memory_manager = MemoryManager(
            max_memory_mb=self.config["resources"]["max_ram_usage_mb"],
            component_name="HUSK"
        )
        self.checkpoint_manager = CheckpointManager(
            component_name="HUSK",
            checkpoint_dir=os.path.expanduser("~/trio_project_m4/checkpoints/husk"),
            interval_minutes=self.config["resources"]["checkpoint_interval_min"]
        )
        self.model_controller = ModelController(
            component_name="HUSK",
            max_cpu_percent=self.config["resources"]["max_cpu_percent"]
        )

        # Initialize document processors
        self.processors = {
            "pdf": PDFProcessor(),
            "docx": DocxProcessor(),
            "txt": TextProcessor(),
            "md": TextProcessor()
        }

        # Initialize knowledge base
        self.knowledge_base = AcademicKnowledgeBase(
            vector_db_path=os.path.expanduser(self.config["integration"]["vector_db_path"]),
            academic_keywords=self.config["processing"]["academic_keywords"]
        )

        # Initialize Telegram bot if enabled
        self.telegram_bot = None
        if "telegram" in self.config:
            self.telegram_bot = HuskTelegramBot(
                self.config["telegram"],
                self.process_document,
                self.knowledge_base
            )

        # Setup signal handlers
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)

        # Track active processing tasks
        self.active_tasks = {}

        self.logger.info("HUSK Assistant initialized successfully")

    def setup_logging(self, log_level):
        """Configure logging for the HUSK assistant."""
        log_dir = os.path.expanduser("~/trio_project_m4/logs/husk")
        os.makedirs(log_dir, exist_ok=True)

        log_file = os.path.join(
            log_dir, f"husk_{datetime.now().strftime('%Y%m%d')}.log"
        )

        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )

    def load_configuration(self):
        """Load configuration from JSON file."""
        try:
            with open(self.config_path, 'r') as f:
                self.config = json.load(f)

            # Expand user paths in configuration
            for section in ["general", "integration"]:
                if section in self.config:
                    for key, value in self.config[section].items():
                        if isinstance(value, str) and "~" in value:
                            self.config[section][key] = os.path.expanduser(value)

            # Create necessary directories
            os.makedirs(self.config["general"]["working_directory"], exist_ok=True)
            os.makedirs(self.config["general"]["log_directory"], exist_ok=True)

        except Exception as e:
            self.logger.error(f"Failed to load configuration: {str(e)}")
            raise

    def is_within_active_hours(self):
        """Check if current time is within configured active hours."""
        now = datetime.now().time()
        start_time = datetime.strptime(
            self.config["general"]["active_hours"]["start"], "%H:%M"
        ).time()
        end_time = datetime.strptime(
            self.config["general"]["active_hours"]["end"], "%H:%M"
        ).time()

        # Handle case where end time is on the next day
        if end_time < start_time:
            return now >= start_time or now <= end_time
        else:
            return start_time <= now <= end_time

    def process_document(self, file_path, user_id=None, callback=None):
        """
        Process an academic document and store its content in the knowledge base.

        Args:
            file_path (str): Path to the document file
            user_id (str): Identifier for the requesting user
            callback (callable): Function to call with updates

        Returns:
            str: Task ID for tracking the processing
        """
        try:
            # Check if within active hours
            if not self.is_within_active_hours():
                message = "Outside of active hours. Try again during active hours."
                if callback:
                    callback({"status": "error", "message": message})
                return None

            # Check file size
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            if file_size_mb > self.config["processing"]["max_file_size_mb"]:
                message = (
                    f"File too large ({file_size_mb:.1f} MB). Maximum size is "
                    f"{self.config['processing']['max_file_size_mb']} MB."
                )
                if callback:
                    callback({"status": "error", "message": message})
                return None

            # Check file format
            file_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
            if file_ext not in self.config["processing"]["supported_formats"]:
                message = (
                    f"Unsupported file format: {file_ext}. Supported formats: "
                    f"{', '.join(self.config['processing']['supported_formats'])}"
                )
                if callback:
                    callback({"status": "error", "message": message})
                return None

            # Check concurrent processing limit
            if len(self.active_tasks) >= self.config["processing"]["max_concurrent_files"]:
                message = "Maximum number of concurrent processing tasks reached. Try again later."
                if callback:
                    callback({"status": "error", "message": message})
                return None

            # Generate task ID and start processing
            task_id = f"task_{int(time.time())}_{os.path.basename(file_path)}"

            # Update state
            self.state_manager.set_state("processing")

            # Start processing in a separate thread
            thread = threading.Thread(
                target=self._process_document_thread,
                args=(task_id, file_path, file_ext, user_id, callback)
            )
            thread.daemon = True
            thread.start()

            # Track the task
            self.active_tasks[task_id] = {
                "file_path": file_path,
                "start_time": time.time(),
                "status": "processing",
                "user_id": user_id
            }

            return task_id

        except Exception as e:
            self.logger.error(f"Error starting document processing: {str(e)}")
            if callback:
                callback({"status": "error", "message": f"Processing error: {str(e)}"})
            return None

    def _process_document_thread(self, task_id, file_path, file_ext, user_id, callback):
        """Thread function to process a document."""
        try:
            # Update callback with starting status
            if callback:
                callback({"status": "processing", "message": "Starting document processing..."})

            # Get appropriate processor
            processor = self.processors.get(file_ext)
            if not processor:
                raise ValueError(f"No processor available for {file_ext} files")

            # Process the document
            self.logger.info(f"Processing document: {file_path}")

            # Extract content
            content = processor.extract_content(file_path)

            # Process academic content
            academic_content = processor.extract_academic_content(
                content,
                self.config["processing"]["academic_keywords"]
            )

            # Store in knowledge base
            doc_id = self.knowledge_base.add_document(
                content=content,
                academic_content=academic_content,
                metadata={
                    "file_name": os.path.basename(file_path),
                    "file_type": file_ext,
                    "processed_date": datetime.now().isoformat(),
                    "user_id": user_id
                }
            )

            # Check for RUSH transcriptions to connect
            self._connect_with_rush_transcriptions(doc_id, content)

            # Update task status
            self.active_tasks[task_id]["status"] = "completed"
            self.active_tasks[task_id]["completion_time"] = time.time()
            self.active_tasks[task_id]["document_id"] = doc_id

            # Create checkpoint
            self.checkpoint_manager.create_checkpoint({
                "active_tasks": self.active_tasks,
                "knowledge_base_status": self.knowledge_base.get_status()
            })

            # Update callback with completion status
            if callback:
                callback({
                    "status": "completed",
                    "message": "Document processing completed",
                    "document_id": doc_id
                })

            self.logger.info(f"Document processing completed: {file_path}")

        except Exception as e:
            self.logger.error(f"Error in document processing thread: {str(e)}")
            self.active_tasks[task_id]["status"] = "error"
            self.active_tasks[task_id]["error"] = str(e)

            if callback:
                callback({"status": "error", "message": f"Processing error: {str(e)}"})

        finally:
            # Check if all tasks are complete
            active_processing = any(
                task["status"] == "processing" for task in self.active_tasks.values()
            )
            if not active_processing:
                self.state_manager.set_state("idle")

    def _connect_with_rush_transcriptions(self, doc_id, content):
        """Connect document with relevant RUSH transcriptions."""
        rush_dir = os.path.expanduser(self.config["integration"]["rush_data_path"])
        if not os.path.exists(rush_dir):
            self.logger.warning(f"RUSH transcription directory not found: {rush_dir}")
            return

        try:
            # Find relevant transcriptions
            relevant_transcriptions = self.knowledge_base.find_related_transcriptions(
                content, rush_dir
            )

            if relevant_transcriptions:
                self.logger.info(
                    f"Found {len(relevant_transcriptions)} relevant RUSH transcriptions"
                )

                # Create connections in knowledge base
                for trans_id, similarity in relevant_transcriptions:
                    self.knowledge_base.create_connection(
                        doc_id,
                        trans_id,
                        {"type": "transcription_relation", "similarity": similarity}
                    )

        except Exception as e:
            self.logger.error(f"Error connecting with RUSH transcriptions: {str(e)}")

    def get_task_status(self, task_id):
        """Get the status of a processing task."""
        return self.active_tasks.get(task_id, {"status": "not_found"})

    def run(self):
        """Run the main HUSK assistant loop."""
        self.logger.info("Starting HUSK assistant main loop")
        self.state_manager.set_state("starting")

        try:
            # Restore from checkpoint if available
            checkpoint_data = self.checkpoint_manager.load_latest_checkpoint()
            if checkpoint_data:
                self.logger.info("Restoring from checkpoint")
                if "active_tasks" in checkpoint_data:
                    self.active_tasks = checkpoint_data["active_tasks"]
                    # Filter out completed tasks older than 24 hours
                    current_time = time.time()
                    self.active_tasks = {
                        task_id: task_data
                        for task_id, task_data in self.active_tasks.items()
                        if task_data["status"] != "completed"
                        or current_time - task_data.get("completion_time", 0) < 86400
                    }

            # Initialize knowledge base
            self.knowledge_base.initialize()

            # Start Telegram bot if configured
            if self.telegram_bot:
                self.telegram_bot.start()

            # Set state to idle
            self.state_manager.set_state("idle")

            # Main loop
            while True:
                # Check if within active hours
                if not self.is_within_active_hours():
                    if self.state_manager.get_state() != "sleeping":
                        self.logger.info("Outside active hours, entering sleep mode")
                        self.state_manager.set_state("sleeping")

                        # Pause resource-intensive operations
                        self.model_controller.unload_models()

                        # Create checkpoint before sleeping
                        self.checkpoint_manager.create_checkpoint({
                            "active_tasks": self.active_tasks,
                            "knowledge_base_status": self.knowledge_base.get_status()
                        })
                else:
                    if self.state_manager.get_state() == "sleeping":
                        self.logger.info("Entering active hours, resuming operations")
                        self.state_manager.set_state("idle")

                # Perform periodic maintenance
                self.memory_manager.check_memory_usage()

                # Create periodic checkpoint
                if self.checkpoint_manager.should_create_checkpoint():
                    self.checkpoint_manager.create_checkpoint({
                        "active_tasks": self.active_tasks,
                        "knowledge_base_status": self.knowledge_base.get_status()
                    })

                # Sleep to prevent CPU hogging
                time.sleep(10)

        except KeyboardInterrupt:
            self.logger.info("Keyboard interrupt received, shutting down")
            self.handle_shutdown(None, None)

        except Exception as e:
            self.logger.error(f"Error in main loop: {str(e)}")
            self.handle_shutdown(None, None)

    def handle_shutdown(self, signum, frame):
        """Handle graceful shutdown on signals."""
        self.logger.info("Shutdown signal received, cleaning up...")

        # Update state
        self.state_manager.set_state("shutting_down")

        # Stop Telegram bot
        if self.telegram_bot:
            self.telegram_bot.stop()

        # Create final checkpoint
        self.checkpoint_manager.create_checkpoint({
            "active_tasks": self.active_tasks,
            "knowledge_base_status": self.knowledge_base.get_status()
        })

        # Clean up resources
        self.model_controller.unload_models()
        self.knowledge_base.close()

        self.logger.info("HUSK assistant shutdown complete")
        sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="HUSK Academic Assistant")
    parser.add_argument("--config", help="Path to configuration file")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    husk = HUSKAssistant(config_path=args.config, debug=args.debug)
    husk.run()

This main script initializes the HUSK assistant, sets up logging, loads configuration, and
manages the main processing loop. It handles document processing, integration with RUSH
transcriptions, and provides a Telegram interface for user interaction.
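For a quick smoke test outside of Telegram, you can drive process_document directly from a Python shell. This is only a hedged sketch: the sample PDF path is a placeholder, the core framework modules must be importable, and the call will be refused outside the configured active hours.

# Hedged smoke test: process one document directly (sample path is a placeholder).
from husk.husk_main import HUSKAssistant

husk = HUSKAssistant(debug=True)
task_id = husk.process_document(
    "/Users/me/Documents/sample_lecture.pdf",  # placeholder path
    user_id="local_test",
    callback=lambda update: print("update:", update),
)
print("task:", task_id, "status:", husk.get_task_status(task_id))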

9.4 PDF Processor ( processors/pdf_processor.py )

Create the PDF processor module at ~/trio_project_m4/husk/processors/pdf_processor.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
PDF Processor for HUSK Assistant
Handles extraction and analysis of academic content from PDF files
"""

import os
import re
import logging
from PyPDF2 import PdfReader
from pdfminer.high_level import extract_text as pdfminer_extract_text


class PDFProcessor:
    """
    Processor for PDF documents, extracting text and academic content
    with special handling for code blocks, formulas, and technical terminology.
    """

    def __init__(self):
        """Initialize the PDF processor."""
        self.logger = logging.getLogger("HUSK.PDFProcessor")
        self.logger.info("Initializing PDF Processor")

        # Regular expressions for academic content detection.
        # Note: groups are non-capturing (?:...) so that findall() on the
        # combined pattern returns whole matches rather than group tuples.
        self.code_block_patterns = [
            r'```[\s\S]*?```',                                # Markdown-style code blocks
            r'def\s+\w+\s*\([^)]*\)\s*:',                     # Python function definitions
            r'function\s+\w+\s*\([^)]*\)\s*{',                # JavaScript function definitions
            r'class\s+\w+(?:\s+extends\s+\w+)?\s*{',          # Class definitions
            r'(?:public|private|protected)\s+\w+\s+\w+\s*\(', # Java/C# method definitions
            r'#include\s+[<"][\w.]+[>"]',                     # C/C++ include statements
            r'import\s+[\w.]+;?',                             # Java/Python import statements
            r'for\s*\([^)]+\)\s*{',                           # For loops
            r'while\s*\([^)]+\)\s*{',                         # While loops
            r'if\s*\([^)]+\)\s*{',                            # If statements
        ]

        self.formula_patterns = [
            r'\$\$[\s\S]*?\$\$',                          # LaTeX display equations
            r'\$[\s\S]*?\$',                              # LaTeX inline equations
            r'\\begin{equation}[\s\S]*?\\end{equation}',  # LaTeX equation environment
            r'\\begin{align}[\s\S]*?\\end{align}',        # LaTeX align environment
        ]

        # Compile patterns for efficiency
        self.code_block_regex = re.compile('|'.join(self.code_block_patterns))
        self.formula_regex = re.compile('|'.join(self.formula_patterns))

    def extract_content(self, file_path):
        """
        Extract text content from a PDF file.

        Args:
            file_path (str): Path to the PDF file

        Returns:
            dict: Extracted content with page numbers
        """
        self.logger.info(f"Extracting content from PDF: {file_path}")

        try:
            # Try PyPDF2 first
            content = self._extract_with_pypdf2(file_path)

            # If PyPDF2 extraction is too small, try PDFMiner
            if len(''.join(content.values())) < 100:
                self.logger.info("PyPDF2 extraction yielded minimal content, trying PDFMiner")
                content = self._extract_with_pdfminer(file_path)

            return content

        except Exception as e:
            self.logger.error(f"Error extracting PDF content: {str(e)}")
            raise

    def _extract_with_pypdf2(self, file_path):
        """Extract text using PyPDF2."""
        content = {}
        try:
            with open(file_path, 'rb') as file:
                reader = PdfReader(file)
                for i, page in enumerate(reader.pages):
                    text = page.extract_text()
                    if text:
                        content[f"page_{i+1}"] = text
            return content
        except Exception as e:
            self.logger.warning(f"PyPDF2 extraction failed: {str(e)}")
            return {}

    def _extract_with_pdfminer(self, file_path):
        """Extract text using PDFMiner."""
        content = {}
        try:
            # Extract all text first
            full_text = pdfminer_extract_text(file_path)

            # Try to split by page if possible
            if "Page " in full_text:
                # Attempt to split by page markers
                page_splits = re.split(r'Page \d+', full_text)
                if len(page_splits) > 1:
                    for i, page_text in enumerate(page_splits[1:]):  # Skip first empty split
                        content[f"page_{i+1}"] = page_text.strip()
                else:
                    content["full_text"] = full_text
            else:
                content["full_text"] = full_text

            return content
        except Exception as e:
            self.logger.warning(f"PDFMiner extraction failed: {str(e)}")
            return {"error": f"Extraction failed: {str(e)}"}

    def extract_academic_content(self, content, academic_keywords):
        """
        Extract academic content from the extracted text.

        Args:
            content (dict): Extracted text content by page
            academic_keywords (list): List of academic keywords to detect

        Returns:
            dict: Extracted academic content by category
        """
        academic_content = {
            "code_blocks": [],
            "formulas": [],
            "definitions": [],
            "key_terms": {},
            "references": []
        }

        # Process each page
        for page_num, text in content.items():
            # Extract code blocks
            code_blocks = self.code_block_regex.findall(text)
            for block in code_blocks:
                academic_content["code_blocks"].append({
                    "page": page_num,
                    "content": block
                })

            # Extract formulas
            formulas = self.formula_regex.findall(text)
            for formula in formulas:
                academic_content["formulas"].append({
                    "page": page_num,
                    "content": formula
                })

            # Extract definitions (lines containing "Definition" or similar)
            definition_lines = re.findall(
                r'(?:Definition|Theorem|Lemma|Corollary|Proposition)\s*(?:\d+\.?\d*)?[:\.]?\s*([^\n]+)',
                text
            )
            for definition in definition_lines:
                academic_content["definitions"].append({
                    "page": page_num,
                    "content": definition.strip()
                })

            # Extract references
            reference_lines = re.findall(r'\[\d+\].*?(?:\n|$)', text)
            for reference in reference_lines:
                academic_content["references"].append({
                    "page": page_num,
                    "content": reference.strip()
                })

            # Count occurrences of academic keywords
            for keyword in academic_keywords:
                count = len(re.findall(r'\b' + re.escape(keyword) + r'\b', text, re.IGNORECASE))
                if count > 0:
                    if keyword not in academic_content["key_terms"]:
                        academic_content["key_terms"][keyword] = 0
                    academic_content["key_terms"][keyword] += count

        return academic_content

    def extract_images(self, file_path, output_dir):
        """
        Extract images from a PDF file.

        Args:
            file_path (str): Path to the PDF file
            output_dir (str): Directory to save extracted images

        Returns:
            list: Paths to extracted images
        """
        # This is a placeholder for image extraction functionality.
        # An actual implementation would use a library like PyMuPDF (fitz).
        self.logger.info("Image extraction not fully implemented")
        return []

This module handles the extraction and analysis of academic content from PDF files, with
special attention to code blocks, formulas, and technical terminology.
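A short, hedged usage sketch of the class defined above (the sample path is a placeholder):

# Hedged usage sketch for PDFProcessor (sample path is a placeholder).
import logging
from husk.processors.pdf_processor import PDFProcessor

logging.basicConfig(level=logging.INFO)
processor = PDFProcessor()

content = processor.extract_content("/Users/me/Documents/lecture01.pdf")
academic = processor.extract_academic_content(content, ["algorithm", "theorem", "proof"])

print("pages extracted:", len(content))
print("code blocks found:", len(academic["code_blocks"]))
print("key terms:", academic["key_terms"])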

9.5 Academic Knowledge Base ( knowledge/academic_kb.py )

Create the academic knowledge base module at ~/trio_project_m4/husk/knowledge/academic_kb.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Academic Knowledge Base for HUSK Assistant
Manages storage and retrieval of academic content using vector embeddings
"""

import os
import json
import logging
import uuid
from datetime import datetime

import numpy as np

# Import ChromaDB for vector storage
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions


class AcademicKnowledgeBase:
    """
    Knowledge base for academic content using ChromaDB for vector storage
    and retrieval of document content and academic elements.
    """

    def __init__(self, vector_db_path, academic_keywords=None):
        """
        Initialize the academic knowledge base.

        Args:
            vector_db_path (str): Path to store the vector database
            academic_keywords (list): List of academic keywords to track
        """
        self.logger = logging.getLogger("HUSK.AcademicKB")
        self.logger.info("Initializing Academic Knowledge Base")

        self.vector_db_path = vector_db_path
        self.academic_keywords = academic_keywords or []

        # Ensure directory exists
        os.makedirs(self.vector_db_path, exist_ok=True)

        # Initialize ChromaDB client
        self.client = chromadb.PersistentClient(
            path=self.vector_db_path,
            settings=Settings(anonymized_telemetry=False)
        )

        # Initialize embedding function (using sentence-transformers)
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="all-MiniLM-L6-v2"  # Lightweight model suitable for Mac Mini M4
        )

        # Collections for different types of content
        self.collections = {}

        # Connection tracking
        self.connections = {}
        self.connection_file = os.path.join(self.vector_db_path, "connections.json")

    def initialize(self):
        """Initialize or load the knowledge base collections."""
        try:
            # Create or get collections
            self.collections["documents"] = self.client.get_or_create_collection(
                name="academic_documents",
                embedding_function=self.embedding_function
            )

            self.collections["chunks"] = self.client.get_or_create_collection(
                name="academic_chunks",
                embedding_function=self.embedding_function
            )

            self.collections["code_blocks"] = self.client.get_or_create_collection(
                name="code_blocks",
                embedding_function=self.embedding_function
            )

            self.collections["formulas"] = self.client.get_or_create_collection(
                name="formulas",
                embedding_function=self.embedding_function
            )

            # Load connections if file exists
            if os.path.exists(self.connection_file):
                with open(self.connection_file, 'r') as f:
                    self.connections = json.load(f)

            self.logger.info("Knowledge base collections initialized")

        except Exception as e:
            self.logger.error(f"Error initializing knowledge base: {str(e)}")
            raise

    def add_document(self, content, academic_content, metadata=None):
        """
        Add a document to the knowledge base.

        Args:
            content (dict): Document content by page
            academic_content (dict): Extracted academic content
            metadata (dict): Additional metadata

        Returns:
            str: Document ID
        """
        try:
            # Generate document ID
            doc_id = f"doc_{uuid.uuid4().hex}"

            # Prepare metadata
            doc_metadata = metadata or {}
            doc_metadata["added_at"] = datetime.now().isoformat()
            doc_metadata["doc_id"] = doc_id

            # Add full document text
            full_text = "\n\n".join(content.values())
            self.collections["documents"].add(
                ids=[doc_id],
                documents=[full_text],
                metadatas=[doc_metadata]
            )

            # Add document chunks for better retrieval
            self._add_document_chunks(doc_id, content, doc_metadata)

            # Add academic content
            self._add_academic_elements(doc_id, academic_content, doc_metadata)

            self.logger.info(f"Added document {doc_id} to knowledge base")
            return doc_id

        except Exception as e:
            self.logger.error(f"Error adding document to knowledge base: {str(e)}")
            raise

    def _add_document_chunks(self, doc_id, content, doc_metadata):
        """Add document chunks to the knowledge base."""
        chunk_ids = []
        chunk_texts = []
        chunk_metadatas = []

        # Process each page as a chunk
        for page_num, text in content.items():
            # Skip empty pages
            if not text.strip():
                continue

            chunk_id = f"{doc_id}_chunk_{page_num}"
            chunk_metadata = doc_metadata.copy()
            chunk_metadata["chunk_id"] = chunk_id
            chunk_metadata["page"] = page_num

            chunk_ids.append(chunk_id)
            chunk_texts.append(text)
            chunk_metadatas.append(chunk_metadata)

        # Add chunks to collection
        if chunk_ids:
            self.collections["chunks"].add(
                ids=chunk_ids,
                documents=chunk_texts,
                metadatas=chunk_metadatas
            )

    def _add_academic_elements(self, doc_id, academic_content, doc_metadata):
        """Add academic elements to specialized collections."""
        # Add code blocks
        code_block_ids = []
        code_block_texts = []
        code_block_metadatas = []

        for i, code_block in enumerate(academic_content.get("code_blocks", [])):
            block_id = f"{doc_id}_code_{i}"
            block_metadata = doc_metadata.copy()
            block_metadata["block_id"] = block_id
            block_metadata["page"] = code_block.get("page", "unknown")

            code_block_ids.append(block_id)
            code_block_texts.append(code_block["content"])
            code_block_metadatas.append(block_metadata)

        if code_block_ids:
            self.collections["code_blocks"].add(
                ids=code_block_ids,
                documents=code_block_texts,
                metadatas=code_block_metadatas
            )

        # Add formulas
        formula_ids = []
        formula_texts = []
        formula_metadatas = []

        for i, formula in enumerate(academic_content.get("formulas", [])):
            formula_id = f"{doc_id}_formula_{i}"
            formula_metadata = doc_metadata.copy()
            formula_metadata["formula_id"] = formula_id
            formula_metadata["page"] = formula.get("page", "unknown")

            formula_ids.append(formula_id)
            formula_texts.append(formula["content"])
            formula_metadatas.append(formula_metadata)

        if formula_ids:
            self.collections["formulas"].add(
                ids=formula_ids,
                documents=formula_texts,
                metadatas=formula_metadatas
            )

    def query_documents(self, query_text, limit=5):
        """
        Query the knowledge base for relevant documents.

        Args:
            query_text (str): Query text
            limit (int): Maximum number of results

        Returns:
            list: Relevant document IDs and metadata
        """
        try:
            results = self.collections["chunks"].query(
                query_texts=[query_text],
                n_results=limit
            )

            # Process results
            documents = []
            if results["ids"] and results["ids"][0]:
                for i, doc_id in enumerate(results["ids"][0]):
                    doc_metadata = results["metadatas"][0][i]
                    similarity = results["distances"][0][i] if "distances" in results else None

                    # Extract original document ID from chunk ID
                    original_doc_id = doc_metadata.get("doc_id", doc_id.split("_chunk_")[0])
                    documents.append({
                        "doc_id": original_doc_id,
                        "metadata": doc_metadata,
                        "similarity": similarity
                    })

            return documents

        except Exception as e:
            self.logger.error(f"Error querying documents: {str(e)}")
            return []

    def query_code_blocks(self, query_text, limit=5):
        """Query the knowledge base for relevant code blocks."""
        try:
            results = self.collections["code_blocks"].query(
                query_texts=[query_text],
                n_results=limit
            )

            # Process results
            code_blocks = []
            if results["ids"] and results["ids"][0]:
                for i, block_id in enumerate(results["ids"][0]):
                    block_metadata = results["metadatas"][0][i]
                    block_content = results["documents"][0][i]
                    similarity = results["distances"][0][i] if "distances" in results else None

                    code_blocks.append({
                        "block_id": block_id,
                        "content": block_content,
                        "metadata": block_metadata,
                        "similarity": similarity
                    })

            return code_blocks

        except Exception as e:
            self.logger.error(f"Error querying code blocks: {str(e)}")
            return []

    def find_related_transcriptions(self, content, transcription_dir):
        """
        Find RUSH transcriptions related to the document content.

        Args:
            content (dict): Document content
            transcription_dir (str): Directory containing RUSH transcriptions

        Returns:
            list: Tuples of (transcription_id, similarity_score)
        """
        try:
            # Check if directory exists
            if not os.path.exists(transcription_dir):
                return []

            # Get full text
            full_text = "\n\n".join(content.values())

            # Find transcription files
            transcription_files = []
            for root, _, files in os.walk(transcription_dir):
                for file in files:
                    if file.endswith(".txt") or file.endswith(".json"):
                        transcription_files.append(os.path.join(root, file))

            if not transcription_files:
                return []

            # Load transcriptions and calculate similarity
            related_transcriptions = []
            for file_path in transcription_files:
                try:
                    # Load transcription content
                    trans_content = ""
                    if file_path.endswith(".json"):
                        with open(file_path, 'r') as f:
                            trans_data = json.load(f)
                        if "text" in trans_data:
                            trans_content = trans_data["text"]
                        elif "segments" in trans_data:
                            segments = trans_data.get("segments", [])
                            trans_content = " ".join(seg.get("text", "") for seg in segments)
                    else:  # .txt file
                        with open(file_path, 'r') as f:
                            trans_content = f.read()

                    if not trans_content:
                        continue

                    # Calculate similarity using vector embeddings
                    doc_embedding = self.embedding_function([full_text])[0]
                    trans_embedding = self.embedding_function([trans_content])[0]

                    # Calculate cosine similarity
                    similarity = np.dot(doc_embedding, trans_embedding) / (
                        np.linalg.norm(doc_embedding) * np.linalg.norm(trans_embedding)
                    )

                    # Use filename as ID
                    trans_id = os.path.basename(file_path)

                    # Add if similarity is above threshold
                    if similarity > 0.5:  # Adjust threshold as needed
                        related_transcriptions.append((trans_id, float(similarity)))

                except Exception as e:
                    self.logger.warning(f"Error processing transcription {file_path}: {str(e)}")

            # Sort by similarity (highest first)
            related_transcriptions.sort(key=lambda x: x[1], reverse=True)

            return related_transcriptions

        except Exception as e:
            self.logger.error(f"Error finding related transcriptions: {str(e)}")
            return []

    def create_connection(self, source_id, target_id, metadata=None):
        """
        Create a connection between two items in the knowledge base.

        Args:
            source_id (str): Source item ID
            target_id (str): Target item ID
            metadata (dict): Connection metadata
        """
        connection_id = f"conn_{uuid.uuid4().hex}"

        connection = {
            "id": connection_id,
            "source": source_id,
            "target": target_id,
            "created_at": datetime.now().isoformat(),
            "metadata": metadata or {}
        }

        self.connections[connection_id] = connection

        # Save connections to file
        self._save_connections()

        return connection_id

    def _save_connections(self):
        """Save connections to file."""
        try:
            with open(self.connection_file, 'w') as f:
                json.dump(self.connections, f)
        except Exception as e:
            self.logger.error(f"Error saving connections: {str(e)}")

    def get_connections(self, item_id):
        """Get all connections for an item."""
        related_connections = []

        for conn_id, conn in self.connections.items():
            if conn["source"] == item_id or conn["target"] == item_id:
                related_connections.append(conn)

        return related_connections

    def get_status(self):
        """Get status information about the knowledge base."""
        status = {
            "collections": {},
            "connection_count": len(self.connections)
        }

        for name, collection in self.collections.items():
            try:
                status["collections"][name] = collection.count()
            except Exception:
                status["collections"][name] = "error"

        return status

    def close(self):
        """Close the knowledge base and save any pending changes."""
        self._save_connections()
        self.logger.info("Knowledge base closed")

This module manages the storage and retrieval of academic content using vector embeddings,
allowing for semantic search and connections between documents and RUSH transcriptions.
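A hedged end-to-end sketch of the knowledge base API defined above (the document content is invented for illustration):

# Hedged usage sketch for AcademicKnowledgeBase (content is invented for illustration).
import os
from husk.knowledge.academic_kb import AcademicKnowledgeBase

kb = AcademicKnowledgeBase(
    vector_db_path=os.path.expanduser("~/trio_project_m4/shared/vector_db/academic"),
    academic_keywords=["algorithm", "theorem"],
)
kb.initialize()

doc_id = kb.add_document(
    content={"page_1": "Dijkstra's algorithm computes shortest paths..."},
    academic_content={"code_blocks": [], "formulas": []},
    metadata={"file_name": "graphs.pdf", "file_type": "pdf"},
)

for hit in kb.query_documents("shortest path algorithms", limit=3):
    print(hit["doc_id"], hit["similarity"])
kb.close()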

9.6 Telegram Interface ( telegram/husk_telegram.py )

Create the Telegram interface module at ~/trio_project_m4/husk/telegram/husk_telegram.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Telegram Interface for HUSK Assistant
Extends NAMI's Telegram bot with academic assistance capabilities
"""

import os
import json
import time
import asyncio
import logging
import threading
from datetime import datetime

# Import telegram library (python-telegram-bot v20+)
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder, CommandHandler, MessageHandler,
    CallbackQueryHandler, ContextTypes, filters
)


class HuskTelegramBot:
    """
    Telegram bot interface for HUSK assistant, extending NAMI's capabilities
    with academic content processing and assistance.
    """

    def __init__(self, config, process_document_func, knowledge_base):
        """
        Initialize the HUSK Telegram bot.

        Args:
            config (dict): Telegram configuration
            process_document_func (callable): Function to process documents
            knowledge_base (AcademicKnowledgeBase): Knowledge base instance
        """
        self.logger = logging.getLogger("HUSK.TelegramBot")
        self.logger.info("Initializing HUSK Telegram Bot")

        self.config = config
        self.process_document = process_document_func
        self.knowledge_base = knowledge_base

        # Get token from NAMI's configuration
        self.token = self._get_nami_token()
        if not self.token:
            self.logger.error("Failed to get Telegram token from NAMI configuration")
            raise ValueError("Telegram token not found")

        # Initialize application
        self.application = None
        self.bot_loop = None  # Event loop of the bot thread
        self.running = False

        # Track active tasks by user
        self.user_tasks = {}

    def _get_nami_token(self):
        """Get Telegram token from NAMI's configuration."""
        try:
            nami_config_path = os.path.expanduser(
                "~/trio_project_m4/config/nami_config.json"
            )
            if os.path.exists(nami_config_path):
                with open(nami_config_path, 'r') as f:
                    nami_config = json.load(f)
                return nami_config.get("telegram", {}).get("token")
            return None
        except Exception as e:
            self.logger.error(f"Error reading NAMI configuration: {str(e)}")
            return None

    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /start command."""
        welcome_message = self.config["message_templates"]["welcome"]
        await update.message.reply_text(welcome_message)

    async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /help command."""
        commands = self.config["commands"]
        help_text = "HUSK Academic Assistant Commands:\n\n"
        for cmd, desc in commands.items():
            help_text += f"/{cmd} - {desc}\n"
        help_text += "\nYou can also send PDF documents directly for analysis."

        await update.message.reply_text(help_text)

    async def analyze_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /analyze command."""
        await update.message.reply_text(
            "Please upload an academic document (PDF, DOCX, TXT) for analysis."
        )

    async def explain_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /explain command."""
        if not context.args:
            await update.message.reply_text(
                "Please specify what you'd like me to explain. For example:\n"
                "/explain python decorators"
            )
            return

        query = " ".join(context.args)
        await update.message.reply_text(f"Searching for information about: {query}")

        # Search in knowledge base
        results = self.knowledge_base.query_documents(query, limit=3)
        code_results = self.knowledge_base.query_code_blocks(query, limit=2)

        if not results and not code_results:
            await update.message.reply_text(
                "I don't have enough information about this topic yet. "
                "Try uploading relevant academic materials first."
            )
            return

        # Prepare response
        response = f"Here's what I found about '{query}':\n\n"

        # Add document results
        if results:
            response += "📚 Related Documents:\n"
            for i, result in enumerate(results):
                metadata = result["metadata"]
                doc_name = metadata.get("file_name", "Unknown document")
                page = metadata.get("page", "")
                response += f"{i+1}. {doc_name} {page}\n"
            response += "\n"

        # Add code examples if relevant
        if code_results:
            response += "💻 Related Code Examples:\n"
            for i, block in enumerate(code_results):
                content = block["content"]
                # Truncate long code blocks
                if len(content) > 300:
                    content = content[:297] + "..."
                response += f"Example {i+1}:\n```\n{content}\n```\n"

        await update.message.reply_text(response)

    async def summarize_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /summarize command."""
        user_id = str(update.effective_user.id)

        # Check if user has processed documents
        if user_id not in self.user_tasks or not self.user_tasks[user_id]:
            await update.message.reply_text(
                "You haven't uploaded any documents for me to summarize yet. "
                "Please upload an academic document first."
            )
            return

        # Get the most recent task
        recent_task_id = list(self.user_tasks[user_id].keys())[-1]
        task_info = self.user_tasks[user_id][recent_task_id]

        if task_info.get("status") != "completed":
            await update.message.reply_text(
                "Your document is still being processed. Please try again later."
            )
            return

        doc_id = task_info.get("document_id")
        if not doc_id:
            await update.message.reply_text(
                "I couldn't find the document information. Please try uploading again."
            )
            return

        await update.message.reply_text("Generating summary of your document...")

        # This would typically involve more sophisticated summarization.
        # For now, we'll just return some basic information.

        # Get document connections (e.g., with RUSH transcriptions)
        connections = self.knowledge_base.get_connections(doc_id)

        summary = "📝 Document Summary:\n\n"
        summary += f"Document ID: {doc_id}\n"
        summary += f"Processed on: {task_info.get('completion_time_str', 'Unknown')}\n"

        if connections:
            summary += "\n🔗 Connected Resources:\n"
            for conn in connections:
                conn_type = conn.get("metadata", {}).get("type", "Unknown")
                target = conn.get("target", "Unknown")
                similarity = conn.get("metadata", {}).get("similarity", 0)
                summary += f"- {conn_type.capitalize()}: {target} (Similarity: {similarity:.2f})\n"

        summary += "\nUse /explain followed by a specific topic to get more detailed information."

        await update.message.reply_text(summary)

    async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /status command."""
        user_id = str(update.effective_user.id)

        if user_id not in self.user_tasks or not self.user_tasks[user_id]:
            await update.message.reply_text("You have no active or recent processing tasks.")
            return

        status_text = "📊 Your Processing Tasks:\n\n"

        for task_id, task_info in self.user_tasks[user_id].items():
            status = task_info.get("status", "unknown")
            file_name = task_info.get("file_name", "Unknown file")

            status_emoji = "⏳"
            if status == "completed":
                status_emoji = "✅"
            elif status == "error":
                status_emoji = "❌"

            status_text += f"{status_emoji} {file_name}: {status.capitalize()}\n"

            if status == "error" and "error_message" in task_info:
                status_text += f"   Error: {task_info['error_message']}\n"

            if "start_time_str" in task_info:
                status_text += f"   Started: {task_info['start_time_str']}\n"

            if status == "completed" and "completion_time_str" in task_info:
                status_text += f"   Completed: {task_info['completion_time_str']}\n"

            status_text += "\n"

        await update.message.reply_text(status_text)

    async def connect_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /connect command to connect documents with RUSH transcriptions."""
        user_id = str(update.effective_user.id)

        if user_id not in self.user_tasks or not self.user_tasks[user_id]:
            await update.message.reply_text(
                "You haven't uploaded any documents to connect with transcriptions. "
                "Please upload an academic document first."
            )
            return

        # Get completed tasks
        completed_tasks = {
            task_id: info for task_id, info in self.user_tasks[user_id].items()
            if info.get("status") == "completed" and "document_id" in info
        }

        if not completed_tasks:
            await update.message.reply_text(
                "You don't have any completed document processing tasks to connect."
            )
            return

        # If there's only one completed task, use it
        if len(completed_tasks) == 1:
            task_id = list(completed_tasks.keys())[0]
            task_info = completed_tasks[task_id]
            doc_id = task_info["document_id"]

            await update.message.reply_text(
                f"Connecting document '{task_info.get('file_name', 'Unknown')}' "
                "with RUSH transcriptions..."
            )

            # This would typically involve more sophisticated connection logic.
            # For now, we'll just simulate the process.

            await update.message.reply_text(
                "Connection process complete. Use /summarize to see the connections."
            )
            return

        # If there are multiple completed tasks, ask the user to choose
        keyboard = []
        for task_id, info in completed_tasks.items():
            file_name = info.get("file_name", "Unknown file")
            keyboard.append([
                InlineKeyboardButton(file_name, callback_data=f"connect_{task_id}")
            ])

        reply_markup = InlineKeyboardMarkup(keyboard)
        await update.message.reply_text(
            "Which document would you like to connect with RUSH transcriptions?",
            reply_markup=reply_markup
        )

    async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle callback queries from inline keyboards."""
        query = update.callback_query
        await query.answer()

        data = query.data
        user_id = str(update.effective_user.id)

        if data.startswith("connect_"):
            task_id = data[8:]  # Remove "connect_" prefix

            if user_id in self.user_tasks and task_id in self.user_tasks[user_id]:
                task_info = self.user_tasks[user_id][task_id]
                doc_id = task_info.get("document_id")

                if doc_id:
                    await query.edit_message_text(
                        f"Connecting document '{task_info.get('file_name', 'Unknown')}' "
                        "with RUSH transcriptions..."
                    )

                    # This would typically involve more sophisticated connection logic.
                    # For now, we'll just simulate the process.

                    await context.bot.send_message(
                        chat_id=update.effective_chat.id,
                        text="Connection process complete. Use /summarize to see the connections."
                    )
                else:
                    await query.edit_message_text(
                        "Could not find document information. Please try uploading again."
                    )
            else:
                await query.edit_message_text(
                    "Task information not found. Please try again."
                )

    async def handle_document(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle document uploads."""
        user_id = str(update.effective_user.id)

        # Check if document is provided
        if not update.message.document:
            await update.message.reply_text("Please upload a document file.")
            return

        document = update.message.document
        file_name = document.file_name
        file_ext = os.path.splitext(file_name)[1].lower().lstrip('.')

        # Check file extension. Note: this bot is constructed with the "telegram"
        # section of husk_config.json, so the processing limits may not be present
        # here; fall back to the formats documented in section 9.2.2.
        supported_formats = self.config.get("processing", {}).get(
            "supported_formats", ["pdf", "docx", "txt", "md"]
        )
        if file_ext not in supported_formats:
            await update.message.reply_text(
                f"Unsupported file format: {file_ext}. "
                f"Supported formats: {', '.join(supported_formats)}"
            )
            return

        # Download file
        await update.message.reply_text(self.config["message_templates"]["processing"])

        new_file = await context.bot.get_file(document.file_id)

        # Download into HUSK's temp directory (created in section 9.2.3) rather
        # than an auto-deleting TemporaryDirectory: processing runs on a
        # background thread that may outlive this handler, so the file must
        # survive until it has been read.
        temp_dir = os.path.expanduser("~/trio_project_m4/husk/temp")
        os.makedirs(temp_dir, exist_ok=True)
        local_file_path = os.path.join(temp_dir, file_name)
        await new_file.download_to_drive(local_file_path)

        # Process document
        self.logger.info(f"Processing document: {file_name} for user {user_id}")

        chat_id = update.effective_chat.id
        task_id = None  # Assigned below; error callbacks may fire before then

        # Create callback for updates (runs on HUSK's worker thread)
        def process_callback(update_info):
            # Synchronous error callbacks can arrive before a task ID exists
            if task_id is None:
                message = update_info.get("message", "Processing failed to start.")
            else:
                # Store update in user tasks
                if user_id not in self.user_tasks:
                    self.user_tasks[user_id] = {}

                if task_id not in self.user_tasks[user_id]:
                    self.user_tasks[user_id][task_id] = {
                        "file_name": file_name,
                        "start_time": time.time(),
                        "start_time_str": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    }

                # Update task info
                self.user_tasks[user_id][task_id].update(update_info)

                if update_info["status"] == "completed":
                    self.user_tasks[user_id][task_id]["completion_time"] = time.time()
                    self.user_tasks[user_id][task_id]["completion_time_str"] = \
                        datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                message = update_info.get("message", f"Status: {update_info['status']}")

            async def send_update():
                try:
                    await context.bot.send_message(chat_id=chat_id, text=message)

                    if update_info["status"] == "completed":
                        keyboard = [
                            [InlineKeyboardButton(
                                "Summarize",
                                callback_data=f"summarize_{task_id}")],
                            [InlineKeyboardButton(
                                "Connect with Transcriptions",
                                callback_data=f"connect_{task_id}")]
                        ]
                        reply_markup = InlineKeyboardMarkup(keyboard)

                        await context.bot.send_message(
                            chat_id=chat_id,
                            text="What would you like to do with this document?",
                            reply_markup=reply_markup
                        )
                except Exception as e:
                    self.logger.error(f"Error sending update to user: {str(e)}")

            # The callback runs outside the bot's event loop, so schedule the
            # coroutine onto that loop in a thread-safe way.
            if self.bot_loop:
                asyncio.run_coroutine_threadsafe(send_update(), self.bot_loop)

        # Start processing
        task_id = self.process_document(
            local_file_path,
            user_id=user_id,
            callback=process_callback
        )

        if not task_id:
            await update.message.reply_text(
                "Failed to start document processing. Please try again later."
            )
            return

        # Initialize task tracking
        if user_id not in self.user_tasks:
            self.user_tasks[user_id] = {}

        self.user_tasks[user_id][task_id] = {
            "task_id": task_id,
            "file_name": file_name,
            "status": "processing",
            "start_time": time.time(),
            "start_time_str": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    async def handle_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle text messages."""
        text = update.message.text

        # Check if it looks like a question. Note: context.args is only
        # populated for commands, so explain_command will prompt the user
        # to use /explain explicitly.
        if text.strip().endswith("?"):
            await self.explain_command(update, context)
        else:
            await update.message.reply_text(
                "I'm designed to help with academic content. You can:\n"
                "- Upload documents for analysis\n"
                "- Ask questions with /explain\n"
                "- Summarize documents with /summarize\n"
                "- Check processing status with /status\n"
                "Use /help for more information."
            )

    def start(self):
        """Start the Telegram bot."""
        if self.running:
            return

        try:
            self.logger.info("Starting HUSK Telegram bot")

            # Create application
            self.application = ApplicationBuilder().token(self.token).build()

            # Add handlers
            self.application.add_handler(CommandHandler("start", self.start_command))
            self.application.add_handler(CommandHandler("help", self.help_command))
            self.application.add_handler(CommandHandler("analyze", self.analyze_command))
            self.application.add_handler(CommandHandler("explain", self.explain_command))
            self.application.add_handler(CommandHandler("summarize", self.summarize_command))
            self.application.add_handler(CommandHandler("status", self.status_command))
            self.application.add_handler(CommandHandler("connect", self.connect_command))

            self.application.add_handler(CallbackQueryHandler(self.handle_callback))

            # In python-telegram-bot v20+, document uploads are matched with
            # filters.Document.ALL (there is no filters.DOCUMENT).
            self.application.add_handler(
                MessageHandler(filters.Document.ALL, self.handle_document)
            )
            self.application.add_handler(
                MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_text)
            )

            # Start the bot in a separate thread
            self.bot_thread = threading.Thread(target=self._run_bot)
            self.bot_thread.daemon = True
            self.bot_thread.start()

            self.running = True
            self.logger.info("HUSK Telegram bot started")

        except Exception as e:
            self.logger.error(f"Error starting Telegram bot: {str(e)}")
            raise

    def _run_bot(self):
        """Run the bot in a separate thread."""
        try:
            # Create a new event loop for this thread and remember it so that
            # worker-thread callbacks can schedule coroutines onto it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.bot_loop = loop

            # Run the bot. Signal handlers can only be installed on the main
            # thread, so disable them here (stop_signals=None).
            self.application.run_polling(stop_signals=None)

        except Exception as e:
            self.logger.error(f"Error in bot thread: {str(e)}")

    def stop(self):
        """Stop the Telegram bot."""
        if not self.running:
            return

        try:
            self.logger.info("Stopping HUSK Telegram bot")

            if self.application:
                # stop_running() asks run_polling() to shut down; available in
                # recent python-telegram-bot releases.
                self.application.stop_running()

            self.running = False
            self.logger.info("HUSK Telegram bot stopped")

        except Exception as e:
            self.logger.error(f"Error stopping Telegram bot: {str(e)}")

This module extends NAMI's Telegram bot with HUSK-specific commands and handlers for
academic content processing and assistance.
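If you want to exercise the bot without the rest of HUSK, a hedged standalone harness like the one below can stand in for the assistant. It assumes NAMI's config file with a valid bot token exists; the fake processor and knowledge base stubs are purely illustrative.

# Hedged standalone harness for HuskTelegramBot (fake processor is illustrative).
import json
import os
import time

from husk.telegram.husk_telegram import HuskTelegramBot

with open(os.path.expanduser("~/trio_project_m4/config/husk_config.json")) as f:
    telegram_config = json.load(f)["telegram"]

def fake_process(file_path, user_id=None, callback=None):
    # Pretend every document processes instantly.
    if callback:
        callback({"status": "completed", "message": "Fake processing done",
                  "document_id": "doc_fake"})
    return f"task_{int(time.time())}"

class FakeKB:
    def query_documents(self, q, limit=5): return []
    def query_code_blocks(self, q, limit=5): return []
    def get_connections(self, doc_id): return []

bot = HuskTelegramBot(telegram_config, fake_process, FakeKB())
bot.start()
input("Bot running; press Enter to stop.\n")
bot.stop()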

9.7 Integration with Core Framework

9.7.1 Launchd Configuration ( husk.plist )

Create a launchd configuration file at ~/Library/LaunchAgents/com.trio_project_m4.husk.plist :

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.trio_project_m4.husk</string>
    <key>ProgramArguments</key>
    <array>
        <string>/usr/bin/python3</string>
        <string>/Users/YOUR_USERNAME/trio_project_m4/husk/husk_main.py</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/YOUR_USERNAME/trio_project_m4/logs/husk/husk_error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/YOUR_USERNAME/trio_project_m4/logs/husk/husk_output.log</string>
    <key>EnvironmentVariables</key>
    <dict>
        <key>PATH</key>
        <string>/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/homebrew/bin</string>
    </dict>
    <key>WorkingDirectory</key>
    <string>/Users/YOUR_USERNAME/trio_project_m4</string>
</dict>
</plist>

Replace YOUR_USERNAME with your actual macOS username.
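Before loading the agent, it is worth validating the plist syntax; plutil ships with macOS:

# Validate the plist syntax (plutil is part of macOS)
plutil -lint ~/Library/LaunchAgents/com.trio_project_m4.husk.plist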

9.7.2 Dashboard Integration

Update the dashboard configuration at ~/trio_project_m4/config/dashboard_config.json to include HUSK:

{
"components": [
{
"name": "NAMI",
"description": "Network and Machine Intelligence",
"status_endpoint": "/api/status/nami",
"color": "#4285F4"
},
{
"name": "RUSH",
"description": "Recording and Understanding Speech Helper",
"status_endpoint": "/api/status/rush",
"color": "#EA4335"
},
{
"name": "VEX",
"description": "Video Exploration Helper",
"status_endpoint": "/api/status/vex",
"color": "#FBBC05"
},
{
"name": "HUSK",
"description": "Helpful Understanding & Study Knowledge",
"status_endpoint": "/api/status/husk",
"color": "#34A853"
}
],
"refresh_interval_seconds": 5,
"port": 5000,
"host": "0.0.0.0"
}
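The dashboard's Flask app must also expose the /api/status/husk endpoint this entry points at. Exactly how statuses are read depends on your StateManager implementation, so the following is only a hedged sketch with an assumed read_husk_state helper:

# Hedged sketch of the HUSK status endpoint for the Flask dashboard.
# read_husk_state() is an assumed helper; wire it to your StateManager.
from flask import Flask, jsonify

app = Flask(__name__)

def read_husk_state():
    # Assumption: StateManager persists state somewhere readable (e.g. a file).
    # Replace with a real lookup against your core framework.
    return {"name": "HUSK", "state": "idle", "active_tasks": 0}

@app.route("/api/status/husk")
def husk_status():
    return jsonify(read_husk_state())

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)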
9.8 Running and Testing HUSK

9.8.1 Manual Startup

To start HUSK manually for testing:

# Navigate to the project directory
cd ~/trio_project_m4

# Run HUSK with debug logging
python3 husk/husk_main.py --debug

9.8.2 Launchd Integration

To integrate with launchd for automatic startup:

# Load the launchd configuration
launchctl load ~/Library/LaunchAgents/com.trio_project_m4.husk.plist

# Check status
launchctl list | grep husk
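After editing the plist or the HUSK code, reload the agent; launchctl will not pick up changes on its own:

# Reload after changes: unload, then load again
launchctl unload ~/Library/LaunchAgents/com.trio_project_m4.husk.plist
launchctl load ~/Library/LaunchAgents/com.trio_project_m4.husk.plist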

9.8.3 Testing Document Processing

1. Start a conversation with the NAMI/HUSK Telegram bot
2. Upload an academic PDF document
3. Use the /analyze command to process the document
4. Use /explain to query information from the document
5. Use /summarize to get a summary of the document
6. Use /connect to link with RUSH transcriptions

9.9 Troubleshooting

9.9.1 Common Issues

1. Telegram Bot Not Responding:
   ◦ Check if NAMI is running and the Telegram bot is active
   ◦ Verify that the token is correctly shared between NAMI and HUSK
   ◦ Check the HUSK logs for connection errors
2. Document Processing Failures:
   ◦ Verify that all required Python packages are installed
   ◦ Check file permissions for temporary directories
   ◦ Ensure the document format is supported
   ◦ Check memory usage and resource constraints
3. Integration Issues:
   ◦ Verify that shared directories exist and are accessible
   ◦ Check that RUSH transcriptions are in the expected format
   ◦ Ensure the ChromaDB vector database is properly initialized

9.9.2 Log Analysis

Check the HUSK logs for detailed error information:

# View the most recent log entries
tail -n 100 ~/trio_project_m4/logs/husk/husk_$(date +%Y%m%d).log

# Search for errors
grep -i error ~/trio_project_m4/logs/husk/husk_*.log

9.10 Performance Optimization

9.10.1 Memory Usage

HUSK is configured to use a maximum of 2GB RAM by default. Adjust the max_ram_usage_mb setting in the configuration file based on your Mac Mini M4's available resources and the needs of other assistants.

9.10.2 Embedding Model Selection

The default embedding model ( all-MiniLM-L6-v2 ) is chosen for its balance of performance
and resource usage. For higher accuracy at the cost of more memory, consider using larger
models like all-mpnet-base-v2 .
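Swapping models is a one-line change in AcademicKnowledgeBase (sentence-transformers downloads the heavier model on first use; the size figures below are approximate):

# Swap in a larger embedding model for higher accuracy (uses more RAM).
from chromadb.utils import embedding_functions

embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-mpnet-base-v2"  # roughly 420 MB vs roughly 80 MB for all-MiniLM-L6-v2
)

Note that existing collections were embedded with the old model; rebuild the vector database after switching, since embeddings from different models are not comparable.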

9.10.3 Concurrent Processing

The default configuration limits concurrent document processing to 2 files. Adjust the
max_concurrent_files setting based on your system's capabilities and typical usage
patterns.
9.11 Future Enhancements

Potential future enhancements for HUSK include:

1. Advanced Document Analysis: Implement more sophisticated academic content extraction, including figures, tables, and citations.
2. Multi-Document Synthesis: Enable connections and synthesis across multiple academic documents.
3. Interactive Learning: Develop features for quiz generation and interactive learning based on processed content.
4. Collaborative Study: Add support for shared document analysis and collaborative study sessions.
5. Custom Domain Adaptation: Allow specialization for specific academic domains like computer science, mathematics, or medicine.

(End of Part 9: HUSK Assistant Implementation)


Conclusion: Your QUAD AI Ecosystem

Congratulations! You have successfully built and configured the QUAD Project, a
comprehensive AI assistant ecosystem optimized for your Mac Mini M4. This system leverages
the power of Free and Open Source Software to provide a suite of specialized assistants that
work together while maintaining independent functionality.

The QUAD Project now consists of:

• NAMI: Your central control and interaction hub
• RUSH: Your audio processing and transcription assistant
• VEX: Your video analysis and scene detection assistant
• HUSK: Your academic content analysis and study assistant

These four assistants form a powerful ecosystem that can handle a wide range of tasks, from
system control to media processing to academic assistance. The modular architecture allows
you to use each assistant independently or in combination, sharing knowledge and capabilities
across the system.

As you continue to work with your QUAD Project, remember that the entire system is built on
open-source principles, giving you complete control and the ability to customize and extend its
functionality to meet your specific needs.

Enjoy your personal AI ecosystem!
