
Comprehensive Implementation Guide for Mac Mini M4 AI Models


This guide provides detailed instructions for implementing three AI models on a Mac Mini with an M4 chip, 24GB RAM, and 512GB storage. The models are designed for college use and include:

1. RUSH - Interactive Lecture Assistant (Telegram Bot)
2. NAMI - System Control Assistant (Telegram Bot)
3. VEX - Automated Vlog Editor

Table of Contents
1. System Requirements and Preparation
2. Model 1: RUSH - Interactive Lecture Assistant
3. Model 2: NAMI - System Control Assistant
4. Model 3: VEX - Automated Vlog Editor
5. System Integration and Resource Management
6. Troubleshooting and Optimization
7. Alternative Approaches

System Requirements and Preparation

Hardware Requirements

• Mac Mini with M4 chip


• 24GB RAM
• 512GB storage
• External SSD for video storage (recommended)
• Stable internet connection

Software Prerequisites

Before beginning implementation, install the following:

# Install Homebrew (package manager for macOS)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# Install Python and required tools
brew install python@3.11 ffmpeg node git

# Install Python packages (openai, requests, and scenedetect are used by the scripts below)
pip3 install python-telegram-bot openai-whisper openai requests pydub numpy pandas matplotlib torch torchvision opencv-python moviepy pytube elevenlabs paramiko scp scenedetect

System Configuration

1. Disable Sleep Mode:
   • Go to System Preferences > Energy Saver
   • Set "Computer sleep" to "Never"
   • Check "Prevent computer from sleeping automatically when the display is off"

2. Set Up Development Environment:

mkdir -p ~/ai_models/{rush,nami,vex}
mkdir -p ~/ai_models/shared

Model 1: RUSH - Interactive Lecture Assistant


RUSH is designed to process lecture recordings, generate questions, and interact with
you using voice synthesis.

Step 1: Create Telegram Bot

1. Open Telegram and search for "BotFather"
2. Start a chat and send /newbot
3. Follow instructions to create a bot named "RUSH"
4. Save the API token provided by BotFather (you can verify it with the snippet below)
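To confirm the token works before writing any bot code, you can call Telegram's getMe endpoint; this is a minimal check (substitute the token BotFather gave you):

import requests

TOKEN = "YOUR_TELEGRAM_TOKEN"  # placeholder

# getMe returns the bot's identity if the token is valid
resp = requests.get(f"https://api.telegram.org/bot{TOKEN}/getMe", timeout=10)
print(resp.json())  # expect {"ok": true, "result": {..., "username": "..."}}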

Step 2: Set Up Local Environment

Create a configuration file:

cd ~/ai_models/rush
touch config.py

Edit config.py with the following content:

# Telegram Bot API Token
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"

# User ID (your Telegram user ID)
USER_ID = "YOUR_TELEGRAM_USER_ID"  # You can get this from @userinfobot

# Paths
AUDIO_PATH = "/Users/yourusername/ai_models/rush/audio"
TRANSCRIPT_PATH = "/Users/yourusername/ai_models/rush/transcripts"

# ElevenLabs API (for voice synthesis)
ELEVENLABS_API_KEY = "YOUR_ELEVENLABS_API_KEY"
VOICE_ID = "BAPI_LARRI_VOICE_ID"  # You'll create this voice in ElevenLabs

Create necessary directories:

mkdir -p ~/ai_models/rush/{audio,transcripts,responses}

Step 3: Set Up Voice Synthesis with ElevenLabs

1. Create an account at ElevenLabs
2. Generate an API key from your profile settings
3. Create a new voice similar to "Bapi Larri" using their voice cloning feature
4. Note the Voice ID for your configuration (the lookup snippet below lists the voices on your account)
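If you are unsure which Voice ID to copy, ElevenLabs exposes a voices endpoint; this small sketch lists every voice on your account (it assumes the API key from config.py):

import requests
from config import ELEVENLABS_API_KEY

# Print the ID and name of each voice on the account
resp = requests.get(
    "https://api.elevenlabs.io/v1/voices",
    headers={"xi-api-key": ELEVENLABS_API_KEY},
    timeout=30,
)
for voice in resp.json()["voices"]:
    print(voice["voice_id"], "-", voice["name"])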

Step 4: Implement Audio Processing with Whisper

Create transcribe.py :

import os
import whisper
from config import AUDIO_PATH, TRANSCRIPT_PATH

def transcribe_audio(file_path):
    # Load Whisper model (can be 'tiny', 'base', 'small', 'medium', or 'large')
    model = whisper.load_model("medium")

    # Transcribe audio
    result = model.transcribe(file_path)

    # Get filename without extension
    base_name = os.path.splitext(os.path.basename(file_path))[0]

    # Save transcript
    transcript_path = os.path.join(TRANSCRIPT_PATH, f"{base_name}.txt")
    with open(transcript_path, "w") as f:
        f.write(result["text"])

    return transcript_path, result["text"]
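As a quick sanity check before wiring this into the bot, you can call the function directly on a short recording (the file path here is only an example):

from transcribe import transcribe_audio

# Transcribe a test file and preview the first 200 characters
path, text = transcribe_audio("audio/test_lecture.wav")  # example file
print(f"Saved transcript to {path}")
print(text[:200])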


Step 5: Implement Question Generation

Create question_generator.py :

import os
import openai
from config import TRANSCRIPT_PATH

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def generate_questions(transcript_text, num_questions=5):
    """Generate curious questions based on lecture transcript"""
    # Limit to 4000 characters to avoid token limits
    prompt = f"""
The following is a transcript from a college lecture:

{transcript_text[:4000]}

Generate {num_questions} curious and insightful questions that would help a student better understand the material. The questions should be thought-provoking and demonstrate curiosity about the subject.
Format each question on a new line with a number.
"""

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a curious and insightful teaching assistant."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content

Step 6: Implement Voice Synthesis

Create voice_synthesis.py :

import requests
import os
from config import ELEVENLABS_API_KEY, VOICE_ID

def synthesize_speech(text, output_path):
    """Convert text to speech using ElevenLabs API"""
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"

    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY
    }

    data = {
        "text": text,
        "model_id": "eleven_monolingual_v1",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75
        }
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            f.write(response.content)
        return output_path
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

Step 7: Create Telegram Bot Handler

Create bot.py :

import os
import logging
import tempfile

from pydub import AudioSegment
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes

from config import TELEGRAM_TOKEN, USER_ID, AUDIO_PATH, TRANSCRIPT_PATH
from transcribe import transcribe_audio
from question_generator import generate_questions
from voice_synthesis import synthesize_speech

# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued."""
    await update.message.reply_text('Hi! I am RUSH, your lecture assistant. Send me audio recordings of your lectures, and I will ask you curious questions about them.')

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued."""
    await update.message.reply_text('Send me an audio recording of your lecture, and I will transcribe it and ask you questions about it.')

async def process_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process audio files sent by the user."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Notify user that processing has begun
    await update.message.reply_text("I'm processing your lecture recording. This might take a few minutes...")

    # Get the audio file
    audio_file = await context.bot.get_file(update.message.voice.file_id)

    # Create a temporary file
    with tempfile.NamedTemporaryFile(suffix='.ogg', delete=False) as temp_file:
        temp_path = temp_file.name

    # Download the file
    await audio_file.download_to_drive(temp_path)

    # Convert ogg to wav (Whisper works better with wav)
    audio = AudioSegment.from_ogg(temp_path)
    wav_path = os.path.join(AUDIO_PATH, f"lecture_{update.message.message_id}.wav")
    audio.export(wav_path, format="wav")

    # Remove temporary file
    os.unlink(temp_path)

    # Transcribe audio
    await update.message.reply_text("Transcribing your lecture...")
    transcript_path, transcript_text = transcribe_audio(wav_path)

    # Generate questions
    await update.message.reply_text("Generating curious questions about the lecture...")
    questions = generate_questions(transcript_text)

    # Synthesize speech
    await update.message.reply_text("Creating voice response...")
    voice_path = os.path.join(AUDIO_PATH, f"response_{update.message.message_id}.mp3")
    synthesize_speech(questions, voice_path)

    # Send transcript (read the file once, then truncate if needed)
    with open(transcript_path, 'r') as f:
        transcript = f.read()
    await update.message.reply_text("Here's the transcript of your lecture:")
    await update.message.reply_text(transcript[:4000] + "..." if len(transcript) > 4000 else transcript)

    # Send questions as text
    await update.message.reply_text("Here are some curious questions about the lecture:")
    await update.message.reply_text(questions)

    # Send questions as voice message
    with open(voice_path, 'rb') as voice:
        await update.message.reply_voice(voice)

def main():
    """Start the bot."""
    # Create the Application
    application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(MessageHandler(filters.VOICE, process_audio))

    # Run the bot
    application.run_polling()

if __name__ == '__main__':
    main()

Step 8: Create Startup Script

Create start_rush.sh :

#!/bin/bash
cd ~/ai_models/rush
python3 bot.py

Make it executable:
chmod +x start_rush.sh

Step 9: Set Up Automatic Startup

Create a LaunchAgent to start RUSH automatically:

mkdir -p ~/Library/LaunchAgents

Create ~/Library/LaunchAgents/com.user.rush.plist :

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.rush</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/rush/start_rush.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/rush/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/rush/output.log</string>
</dict>
</plist>

Load the LaunchAgent:

launchctl load ~/Library/LaunchAgents/com.user.rush.plist

Model 2: NAMI - System Control Assistant


NAMI provides terminal access to your system through Telegram, allowing you to retrieve files and execute commands remotely.

Step 1: Create Telegram Bot

1. Open Telegram and search for "BotFather"
2. Start a chat and send /newbot
3. Follow instructions to create a bot named "NAMI"
4. Save the API token provided by BotFather

Step 2: Set Up Local Environment

Create a configuration file:

cd ~/ai_models/nami
touch config.py

Edit config.py with the following content:

# Telegram Bot API Token
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"

# User ID (your Telegram user ID)
USER_ID = "YOUR_TELEGRAM_USER_ID"  # You can get this from @userinfobot

# Allowed commands (for security)
ALLOWED_COMMANDS = [
    "ls", "cd", "pwd", "cat", "head", "tail", "grep", "find",
    "cp", "mv", "mkdir", "touch", "rm", "python", "python3"
]

# Paths
DOWNLOAD_PATH = "/Users/yourusername/ai_models/nami/downloads"
UPLOAD_PATH = "/Users/yourusername/ai_models/nami/uploads"

Create necessary directories:

mkdir -p ~/ai_models/nami/{downloads,uploads,logs}

Step 3: Implement Command Execution Module

Create command_executor.py :

import subprocess
import os
from config import ALLOWED_COMMANDS

def is_command_allowed(command):
    """Check if the command is in the allowed list"""
    # Extract the base command (before any arguments)
    base_command = command.split()[0]
    return base_command in ALLOWED_COMMANDS

def execute_command(command, working_dir=None):
    """Execute a shell command and return the output"""
    if not is_command_allowed(command):
        return False, f"Command '{command.split()[0]}' is not allowed for security reasons."

    try:
        # Set working directory if provided
        cwd = working_dir if working_dir else os.getcwd()

        # Execute command
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=cwd
        )

        # Get output
        stdout, stderr = process.communicate(timeout=60)  # 60-second timeout

        # Check if there was an error
        if process.returncode != 0:
            return False, f"Error executing command:\n{stderr}"

        return True, stdout

    except subprocess.TimeoutExpired:
        process.kill()  # make sure the hung process doesn't linger
        return False, "Command execution timed out after 60 seconds."
    except Exception as e:
        return False, f"Error executing command: {str(e)}"

Step 4: Implement File Transfer Module

Create file_manager.py :

import os
from config import DOWNLOAD_PATH, UPLOAD_PATH

def get_file_path(filename, directory=None):
    """Get the full path for a file"""
    if directory:
        # Make sure the path doesn't escape the allowed directories
        if ".." in directory or directory.startswith("/"):
            return None
        full_path = os.path.join(os.path.expanduser("~"), directory, filename)
    else:
        full_path = os.path.join(UPLOAD_PATH, filename)

    return full_path

def save_file(file_obj, filename):
    """Save a file to the uploads directory"""
    os.makedirs(UPLOAD_PATH, exist_ok=True)
    file_path = os.path.join(UPLOAD_PATH, filename)

    with open(file_path, 'wb') as f:
        f.write(file_obj.read())

    return file_path

def get_file(file_path):
    """Get a file from the system"""
    # Security check to prevent directory traversal
    if ".." in file_path or not os.path.exists(file_path):
        return None

    if os.path.isfile(file_path):
        return file_path

    return None
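The ".." substring test in get_file is easy to sidestep (absolute paths such as /etc/hosts contain no dots, and symlinks are not resolved). A more robust pattern, sketched here under the assumption that served files should live in your home directory, resolves the path first and then checks the prefix:

import os

ALLOWED_ROOT = os.path.expanduser("~")  # assumption: only serve files under $HOME

def is_path_allowed(path):
    """Resolve symlinks and relative segments, then confine access to ALLOWED_ROOT."""
    real = os.path.realpath(path)
    return os.path.commonpath([real, ALLOWED_ROOT]) == ALLOWED_ROOT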

Step 5: Create Natural Language Command Parser

Create nl_parser.py :

import re
import openai

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def parse_natural_language(text):
    """Convert natural language to shell commands"""
    prompt = f"""
Convert the following natural language request into a shell command or a series of shell commands.
Only return the command(s), nothing else.

Request: {text}

Commands:
"""

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that converts natural language into shell commands. Only output the commands, no explanations."},
            {"role": "user", "content": prompt}
        ]
    )

    commands = response.choices[0].message.content.strip()

    # Strip a Markdown code fence if the model added one
    commands = re.sub(r'^```(?:bash)?\n', '', commands)
    commands = re.sub(r'\n```$', '', commands)

    return commands
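A typical round-trip looks like this (the request string and the model's reply are illustrative; the exact command returned will vary):

from nl_parser import parse_natural_language

# The model would typically return something like:
#   find ~/Documents -name "*.pdf"
command = parse_natural_language("list all PDF files in my Documents folder")
print(command)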

Step 6: Create Telegram Bot Handler

Create bot.py :

import os
import logging
import tempfile

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes

from config import TELEGRAM_TOKEN, USER_ID, DOWNLOAD_PATH, UPLOAD_PATH
from command_executor import execute_command
from file_manager import get_file_path, save_file, get_file
from nl_parser import parse_natural_language

# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# Current working directory for each user
user_cwd = {}

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued."""
    await update.message.reply_text('Hi! I am NAMI, your system control assistant. I can help you access files and execute commands on your Mac Mini.')

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued."""
    help_text = """
I can help you with the following:

1. Execute shell commands: Just send me a command like 'ls -la' or describe what you want in natural language.
2. Get files: Use /get [path/to/file] to retrieve a file from your Mac.
3. Change directory: Use /cd [directory] to change the working directory.
4. Current directory: Use /pwd to see the current working directory.

For security reasons, I can only execute certain commands and access files within allowed directories.
"""
    await update.message.reply_text(help_text)

async def execute_shell_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Execute a shell command."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the command
    command = update.message.text

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Execute the command
    success, output = execute_command(command, cwd)

    # Update the current directory if it was a cd command
    if success and command.startswith("cd "):
        new_dir = command[3:].strip()
        if new_dir.startswith("/"):
            # Absolute path
            user_cwd[str(update.message.from_user.id)] = new_dir
        else:
            # Relative path
            user_cwd[str(update.message.from_user.id)] = os.path.normpath(os.path.join(cwd, new_dir))

    # Send the output
    if success:
        # Split long outputs into multiple messages
        if len(output) > 4000:
            for i in range(0, len(output), 4000):
                await update.message.reply_text(output[i:i+4000])
        else:
            await update.message.reply_text(output if output else "Command executed successfully (no output).")
    else:
        await update.message.reply_text(f"Error: {output}")

async def process_natural_language(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process natural language requests."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the text
    text = update.message.text

    # Parse the natural language
    await update.message.reply_text("Processing your request...")
    command = parse_natural_language(text)

    # Send the command for confirmation
    await update.message.reply_text(f"I'll execute: `{command}`")

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Execute the command
    success, output = execute_command(command, cwd)

    # Send the output
    if success:
        # Split long outputs into multiple messages
        if len(output) > 4000:
            for i in range(0, len(output), 4000):
                await update.message.reply_text(output[i:i+4000])
        else:
            await update.message.reply_text(output if output else "Command executed successfully (no output).")
    else:
        await update.message.reply_text(f"Error: {output}")

async def get_file_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Get a file from the system."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Check if a file path was provided
    if not context.args:
        await update.message.reply_text("Please provide a file path. Usage: /get [path/to/file]")
        return

    # Get the file path
    file_path = " ".join(context.args)

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # If the path is not absolute, make it relative to the current directory
    if not file_path.startswith("/"):
        file_path = os.path.join(cwd, file_path)

    # Check if the file exists
    if not os.path.exists(file_path):
        await update.message.reply_text(f"File not found: {file_path}")
        return

    # Check if it's a file
    if not os.path.isfile(file_path):
        await update.message.reply_text(f"Not a file: {file_path}")
        return

    # Send the file
    try:
        with open(file_path, 'rb') as f:
            await update.message.reply_document(document=f, filename=os.path.basename(file_path))
    except Exception as e:
        await update.message.reply_text(f"Error sending file: {str(e)}")

async def cd_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Change the current working directory."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Check if a directory was provided
    if not context.args:
        # Default to home directory
        user_cwd[str(update.message.from_user.id)] = os.path.expanduser("~")
        await update.message.reply_text(f"Changed directory to: {os.path.expanduser('~')}")
        return

    # Get the directory
    directory = " ".join(context.args)

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # If the path is not absolute, make it relative to the current directory
    if not directory.startswith("/"):
        new_dir = os.path.normpath(os.path.join(cwd, directory))
    else:
        new_dir = directory

    # Check if the directory exists
    if not os.path.exists(new_dir):
        await update.message.reply_text(f"Directory not found: {new_dir}")
        return

    # Check if it's a directory
    if not os.path.isdir(new_dir):
        await update.message.reply_text(f"Not a directory: {new_dir}")
        return

    # Update the current directory
    user_cwd[str(update.message.from_user.id)] = new_dir
    await update.message.reply_text(f"Changed directory to: {new_dir}")

async def pwd_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show the current working directory."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Send the current directory
    await update.message.reply_text(f"Current directory: {cwd}")

def main():
    """Start the bot."""
    # Create the Application
    application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("get", get_file_command))
    application.add_handler(CommandHandler("cd", cd_command))
    application.add_handler(CommandHandler("pwd", pwd_command))

    # Handle shell commands (messages that start with a command word)
    application.add_handler(MessageHandler(
        filters.TEXT & ~filters.COMMAND & filters.Regex(r'^[a-zA-Z0-9_]+\s'),
        execute_shell_command
    ))

    # Handle natural language requests
    application.add_handler(MessageHandler(
        filters.TEXT & ~filters.COMMAND & ~filters.Regex(r'^[a-zA-Z0-9_]+\s'),
        process_natural_language
    ))

    # Run the bot
    application.run_polling()

if __name__ == '__main__':
    main()

Step 7: Create Startup Script

Create start_nami.sh :

#!/bin/bash
cd ~/ai_models/nami
python3 bot.py

Make it executable:

chmod +x start_nami.sh

Step 8: Set Up Automatic Startup

Create a LaunchAgent to start NAMI automatically:

mkdir -p ~/Library/LaunchAgents

Create ~/Library/LaunchAgents/com.user.nami.plist :

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.nami</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/nami/start_nami.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/nami/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/nami/output.log</string>
</dict>
</plist>

Load the LaunchAgent:

launchctl load ~/Library/LaunchAgents/com.user.nami.plist

Model 3: VEX - Automated Vlog Editor


VEX automatically edits vlogs, adds memes, and creates short-form content while you
sleep.

Step 1: Set Up Local Environment

Create a configuration file:

cd ~/ai_models/vex
touch config.py

Edit config.py with the following content:

# Paths
INPUT_PATH = "/Users/yourusername/ai_models/vex/input"
OUTPUT_PATH = "/Users/yourusername/ai_models/vex/output"
TEMP_PATH = "/Users/yourusername/ai_models/vex/temp"
ASSETS_PATH = "/Users/yourusername/ai_models/vex/assets"

# ElevenLabs API (for voice synthesis)
ELEVENLABS_API_KEY = "YOUR_ELEVENLABS_API_KEY"
VOICE_ID = "BAPI_LARRI_VOICE_ID"  # Same voice ID as RUSH

# YouTube API
YOUTUBE_API_KEY = "YOUR_YOUTUBE_API_KEY"

# Processing settings
MAX_PROCESSING_TIME = 7 * 60 * 60  # 7 hours in seconds
TARGET_SHORT_DURATION = 60  # 1 minute in seconds
NUM_SHORTS_TO_CREATE = 4  # Number of shorts to create

Create necessary directories:

mkdir -p ~/ai_models/vex/{input,output,temp,assets,logs}
mkdir -p ~/ai_models/vex/assets/{music,sfx,memes,transitions}

Step 2: Implement SSD Detection and Video Import

Create ssd_monitor.py :

import os
import time
import shutil
from config import INPUT_PATH

def get_mounted_volumes():
    """Get a list of mounted volumes under /Volumes"""
    # Every mounted external volume appears as a directory in /Volumes
    return [os.path.join("/Volumes", name) for name in os.listdir("/Volumes")]

def is_video_file(filename):
    """Check if a file is a video file based on extension"""
    video_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.m4v']
    return any(filename.lower().endswith(ext) for ext in video_extensions)

def import_videos_from_ssd():
    """Check for newly connected SSDs and import video files"""
    # Get currently mounted volumes
    volumes = get_mounted_volumes()

    # Create input directory if it doesn't exist
    os.makedirs(INPUT_PATH, exist_ok=True)

    # Check each volume for video files
    imported_files = []
    for volume in volumes:
        # Skip system volumes
        if volume == '/Volumes/Macintosh HD':
            continue

        # Check if the volume exists
        if not os.path.exists(volume):
            continue

        print(f"Checking volume: {volume}")

        # Walk through the volume
        for root, dirs, files in os.walk(volume):
            for file in files:
                if is_video_file(file):
                    source_path = os.path.join(root, file)
                    dest_path = os.path.join(INPUT_PATH, file)

                    # Copy the file if it doesn't already exist
                    if not os.path.exists(dest_path):
                        print(f"Importing: {source_path} -> {dest_path}")
                        shutil.copy2(source_path, dest_path)
                        imported_files.append(dest_path)

    return imported_files

def monitor_for_ssd(check_interval=30):
    """Continuously monitor for newly connected SSDs"""
    while True:
        print("Checking for new SSDs...")
        imported_files = import_videos_from_ssd()

        if imported_files:
            print(f"Imported {len(imported_files)} new video files.")
            return imported_files

        print(f"No new videos found. Checking again in {check_interval} seconds.")
        time.sleep(check_interval)

if __name__ == "__main__":
    monitor_for_ssd()

Step 3: Implement Video Analysis

Create video_analyzer.py :

import cv2
import subprocess
import os
from config import TEMP_PATH

def extract_frames(video_path, output_dir, fps=1):
    """Extract frames from video at specified FPS"""
    os.makedirs(output_dir, exist_ok=True)

    # Get video info
    cap = cv2.VideoCapture(video_path)
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / video_fps

    # Calculate frame interval
    interval = int(video_fps / fps)

    # Extract frames
    count = 0
    frame_paths = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if count % interval == 0:
            frame_path = os.path.join(output_dir, f"frame_{count:06d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)

        count += 1

    cap.release()

    return frame_paths, duration

def analyze_audio(video_path, output_dir):
    """Extract and analyze audio from video"""
    os.makedirs(output_dir, exist_ok=True)

    # Extract audio
    audio_path = os.path.join(output_dir, "audio.wav")
    subprocess.run([
        "ffmpeg", "-i", video_path, "-q:a", "0", "-map", "a", audio_path, "-y"
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Analyze audio for loudness (loudnorm prints its JSON report to stderr)
    loudness_data = os.path.join(output_dir, "loudness.json")
    result = subprocess.run([
        "ffmpeg", "-i", audio_path, "-af",
        "loudnorm=print_format=json", "-f", "null", "-"
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    with open(loudness_data, 'w') as f:
        f.write(result.stderr)

    # Analyze audio for silence
    silence_data = os.path.join(output_dir, "silence.txt")
    with open(silence_data, 'w') as f:
        subprocess.run([
            "ffmpeg", "-i", audio_path, "-af",
            "silencedetect=noise=-30dB:d=0.5", "-f", "null", "-"
        ], stdout=f, stderr=subprocess.STDOUT)

    return audio_path, loudness_data, silence_data

def detect_scenes(video_path, threshold=30):
    """Detect scene changes in video"""
    # Use PySceneDetect via command line; list-scenes writes its CSV into -o
    subprocess.run([
        "scenedetect", "-i", video_path, "detect-content",
        f"--threshold={threshold}", "list-scenes", "-o", TEMP_PATH
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # list-scenes names the file <video>-Scenes.csv by default
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_csv = os.path.join(TEMP_PATH, f"{video_name}-Scenes.csv")

    # Parse the CSV (first line is the timecode list, second is the header)
    scenes = []
    if os.path.exists(output_csv):
        with open(output_csv, 'r') as f:
            lines = f.readlines()
        for line in lines[2:]:
            parts = line.strip().split(',')
            if len(parts) >= 7:
                start_time = float(parts[3])  # "Start Time (seconds)" column
                end_time = float(parts[6])    # "End Time (seconds)" column
                scenes.append((start_time, end_time))

    return scenes

def analyze_video(video_path):
    """Perform comprehensive analysis of a video"""
    # Create temporary directory
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    temp_dir = os.path.join(TEMP_PATH, video_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Extract frames
    frames_dir = os.path.join(temp_dir, "frames")
    frame_paths, duration = extract_frames(video_path, frames_dir)

    # Analyze audio
    audio_dir = os.path.join(temp_dir, "audio")
    audio_path, loudness_data, silence_data = analyze_audio(video_path, audio_dir)

    # Detect scenes
    scenes = detect_scenes(video_path)

    # Return analysis results
    return {
        "video_path": video_path,
        "duration": duration,
        "frame_paths": frame_paths,
        "audio_path": audio_path,
        "scenes": scenes,
        "temp_dir": temp_dir
    }
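To check the analysis on a single file before wiring it into the pipeline, something like this works (the path is an example):

from video_analyzer import analyze_video

# Analyze one clip and summarize what was found
analysis = analyze_video("input/test_vlog.mp4")  # example file
print(f"Duration: {analysis['duration']:.1f}s")
print(f"Frames extracted: {len(analysis['frame_paths'])}")
print(f"Scenes detected: {len(analysis['scenes'])}")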

Step 4: Implement Trend Research

Create trend_researcher.py :

import os
import json
import requests
from pytube import YouTube, Search
from config import YOUTUBE_API_KEY, ASSETS_PATH

def search_trending_videos(category="entertainment", max_results=10):
    """Search for trending videos on YouTube"""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": "US",
        # 24 = Entertainment, 20 = Gaming
        "videoCategoryId": "24" if category == "entertainment" else "20",
        "maxResults": max_results,
        "key": YOUTUBE_API_KEY
    }

    response = requests.get(url, params=params)
    data = response.json()

    trending_videos = []
    if "items" in data:
        for item in data["items"]:
            video_id = item["id"]
            title = item["snippet"]["title"]
            channel = item["snippet"]["channelTitle"]
            views = item["statistics"]["viewCount"]

            trending_videos.append({
                "id": video_id,
                "title": title,
                "channel": channel,
                "views": views,
                "url": f"https://www.youtube.com/watch?v={video_id}"
            })

    return trending_videos

def search_trending_memes(max_results=10):
    """Search for trending meme videos on YouTube"""
    search_results = Search("trending memes 2023").results

    meme_videos = []
    for i, result in enumerate(search_results):
        if i >= max_results:
            break

        meme_videos.append({
            "id": result.video_id,
            "title": result.title,
            "channel": result.author,
            "url": f"https://www.youtube.com/watch?v={result.video_id}"
        })

    return meme_videos

def download_meme_clips(meme_videos, output_dir):
    """Download short clips from meme videos"""
    os.makedirs(output_dir, exist_ok=True)

    downloaded_clips = []
    for meme in meme_videos:
        try:
            # Download the video
            yt = YouTube(meme["url"])
            stream = yt.streams.filter(progressive=True, file_extension="mp4").first()

            if not stream:
                continue

            # Download to temporary location
            temp_path = stream.download(output_path=output_dir, filename=f"temp_{meme['id']}.mp4")

            # Get video duration
            duration = yt.length

            # Extract a 3-second clip from the middle
            middle_point = duration / 2
            start_time = max(0, middle_point - 1.5)
            end_time = min(duration, middle_point + 1.5)
            clip_path = os.path.join(output_dir, f"meme_{meme['id']}.mp4")

            # Use ffmpeg to extract the clip
            os.system(f'ffmpeg -i "{temp_path}" -ss {start_time} -to {end_time} -c:v copy -c:a copy "{clip_path}" -y')

            # Remove the temporary file
            os.remove(temp_path)

            downloaded_clips.append({
                "id": meme["id"],
                "title": meme["title"],
                "path": clip_path
            })

        except Exception as e:
            print(f"Error downloading {meme['url']}: {str(e)}")

    return downloaded_clips

def research_trends():
    """Research trending content and download assets"""
    # Create assets directories
    memes_dir = os.path.join(ASSETS_PATH, "memes")
    os.makedirs(memes_dir, exist_ok=True)

    # Get trending videos
    trending_videos = search_trending_videos()

    # Get trending memes
    meme_videos = search_trending_memes()

    # Download meme clips
    meme_clips = download_meme_clips(meme_videos, memes_dir)

    # Save research results
    research_data = {
        "trending_videos": trending_videos,
        "meme_clips": meme_clips
    }

    with open(os.path.join(ASSETS_PATH, "trend_research.json"), 'w') as f:
        json.dump(research_data, f, indent=2)

    return research_data

Step 5: Implement Video Editing Pipeline

Create video_editor.py :

import os
import random
from moviepy.editor import VideoFileClip, CompositeVideoClip, TextClip
from config import OUTPUT_PATH, ASSETS_PATH, TARGET_SHORT_DURATION

def create_edit_decision_list(video_analysis, trend_research):
    """Create an edit decision list based on video analysis and trends"""
    scenes = video_analysis["scenes"]
    duration = video_analysis["duration"]

    # If no scenes detected, create artificial scenes
    if not scenes:
        scene_duration = 10  # 10 seconds per scene
        num_scenes = int(duration / scene_duration)
        scenes = [(i * scene_duration, (i + 1) * scene_duration) for i in range(num_scenes)]

    # Select interesting scenes for shorts
    selected_scenes = []

    # Prioritize scenes in the middle of the video (usually more interesting)
    middle_index = len(scenes) // 2
    priority_scenes = scenes[max(0, middle_index - 5):min(len(scenes), middle_index + 5)]

    # Select scenes that are close to the target duration
    for scene_start, scene_end in priority_scenes:
        scene_duration = scene_end - scene_start
        if 5 <= scene_duration <= 90:  # Between 5 and 90 seconds
            selected_scenes.append((scene_start, scene_end))

    # If not enough scenes, add more from the rest
    if len(selected_scenes) < 4:
        for scene_start, scene_end in scenes:
            if (scene_start, scene_end) not in selected_scenes:
                scene_duration = scene_end - scene_start
                if 5 <= scene_duration <= 90:
                    selected_scenes.append((scene_start, scene_end))
                    if len(selected_scenes) >= 4:
                        break

    # Create edit decision list
    edl = []
    for i, (scene_start, scene_end) in enumerate(selected_scenes[:4]):
        # Calculate target duration (around 60 seconds)
        target_duration = min(scene_end - scene_start, TARGET_SHORT_DURATION)

        # Select random meme clips
        meme_clips = []
        if trend_research and "meme_clips" in trend_research:
            available_memes = trend_research["meme_clips"]
            num_memes = min(3, len(available_memes))
            selected_memes = random.sample(available_memes, num_memes)
            meme_clips = [meme["path"] for meme in selected_memes]

        edl.append({
            "output_name": f"short_{i+1}.mp4",
            "source_start": scene_start,
            "source_end": scene_start + target_duration,
            "meme_clips": meme_clips,
            "add_text": True,
            "add_music": True
        })

    return edl

def add_meme(main_clip, meme_path, position="random"):
    """Add a meme clip to the main video"""
    meme_clip = VideoFileClip(meme_path)

    # Resize meme to 1/4 of the main video
    meme_clip = meme_clip.resize(width=main_clip.w // 4)

    # Determine position
    if position == "random":
        position = random.choice(["top-left", "top-right", "bottom-left", "bottom-right"])

    if position == "top-left":
        x_pos = 20
        y_pos = 20
    elif position == "top-right":
        x_pos = main_clip.w - meme_clip.w - 20
        y_pos = 20
    elif position == "bottom-left":
        x_pos = 20
        y_pos = main_clip.h - meme_clip.h - 20
    else:  # bottom-right
        x_pos = main_clip.w - meme_clip.w - 20
        y_pos = main_clip.h - meme_clip.h - 20

    # Set position
    meme_clip = meme_clip.set_position((x_pos, y_pos))

    # Determine when to show the meme (random time in the second half of the video)
    main_duration = main_clip.duration
    start_time = random.uniform(main_duration * 0.5, max(main_duration * 0.5, main_duration - meme_clip.duration))
    end_time = min(start_time + meme_clip.duration, main_duration)

    # Set start and end times
    meme_clip = meme_clip.set_start(start_time).set_end(end_time)

    # Create composite clip
    composite_clip = CompositeVideoClip([main_clip, meme_clip])

    return composite_clip

def create_short(video_path, edit_info, output_dir):
    """Create a short video based on edit decision list"""
    # Load the main video
    main_clip = VideoFileClip(video_path).subclip(edit_info["source_start"], edit_info["source_end"])

    # Add memes if available
    composite_clip = main_clip
    for meme_path in edit_info["meme_clips"]:
        if os.path.exists(meme_path):
            composite_clip = add_meme(composite_clip, meme_path)

    # Add text if requested
    if edit_info["add_text"]:
        # Create a title text
        title = "Check out this moment!"
        text_clip = TextClip(title, fontsize=30, color='white', bg_color='black', font='Arial-Bold')
        text_clip = text_clip.set_position(('center', 'bottom')).set_duration(5)

        # Add text to the beginning
        composite_clip = CompositeVideoClip([composite_clip, text_clip.set_start(0)])

    # Set output path
    output_path = os.path.join(output_dir, edit_info["output_name"])

    # Write the result
    composite_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")

    # Close clips to free memory
    composite_clip.close()
    if composite_clip != main_clip:
        main_clip.close()

    return output_path

def edit_vlog(video_path, video_analysis, trend_research):
    """Edit a vlog into multiple shorts"""
    # Create output directory
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_dir = os.path.join(OUTPUT_PATH, video_name)
    os.makedirs(output_dir, exist_ok=True)

    # Create edit decision list
    edl = create_edit_decision_list(video_analysis, trend_research)

    # Create shorts
    output_paths = []
    for edit_info in edl:
        output_path = create_short(video_path, edit_info, output_dir)
        output_paths.append(output_path)

    return output_paths
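Putting the pieces together for a single video looks roughly like this (the path is illustrative):

from video_analyzer import analyze_video
from trend_researcher import research_trends
from video_editor import edit_vlog

# Analyze the footage, fetch trend assets, then cut the vlog into shorts
analysis = analyze_video("input/test_vlog.mp4")  # example file
trends = research_trends()
shorts = edit_vlog("input/test_vlog.mp4", analysis, trends)
print(f"Created {len(shorts)} shorts: {shorts}")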

Step 6: Implement Voice-Over Generation

Create voice_synthesis.py :

import requests
import os
import subprocess
import openai
from config import ELEVENLABS_API_KEY, VOICE_ID

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def generate_script(video_content):
    """Generate a voice-over script based on video content"""
    prompt = f"""
Create a short, engaging voice-over script for a vlog highlight. The content is about:

{video_content}

The script should be conversational, energetic, and include some humor. It should be about 30 seconds when read aloud.
"""

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a creative content writer for YouTube shorts."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content

def synthesize_voice(text, output_path):
    """Convert text to speech using ElevenLabs API"""
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"

    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY
    }

    data = {
        "text": text,
        "model_id": "eleven_monolingual_v1",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75
        }
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            f.write(response.content)
        return output_path
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

def add_voiceover_to_video(video_path, audio_path, output_path):
    """Add voice-over audio to a video"""
    # Use ffmpeg to duck the original audio and mix in the voice-over
    command = [
        "ffmpeg", "-i", video_path,
        "-i", audio_path,
        "-filter_complex", "[0:a]volume=0.3[a1];[1:a]volume=1.0[a2];[a1][a2]amix=inputs=2:duration=longest",
        "-c:v", "copy", output_path, "-y"
    ]

    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    return output_path

def create_voiceover(video_path, video_content, output_dir):
    """Create and add voice-over to a video"""
    # Generate script
    script = generate_script(video_content)

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Get base filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]

    # Synthesize voice
    audio_path = os.path.join(output_dir, f"{base_name}_voiceover.mp3")
    synthesize_voice(script, audio_path)

    # Add voice-over to video
    output_path = os.path.join(output_dir, f"{base_name}_with_voiceover.mp4")
    add_voiceover_to_video(video_path, audio_path, output_path)

    return output_path, script

Step 7: Create Main Processing Script

Create main.py :

import os
import time
import json
import logging
from datetime import datetime

from config import INPUT_PATH, OUTPUT_PATH, TEMP_PATH, MAX_PROCESSING_TIME
from ssd_monitor import monitor_for_ssd
from video_analyzer import analyze_video
from trend_researcher import research_trends
from video_editor import edit_vlog
from voice_synthesis import create_voiceover

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs', 'vex.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('VEX')

def process_video(video_path):
    """Process a single video file"""
    start_time = time.time()
    logger.info(f"Starting to process video: {video_path}")

    try:
        # Step 1: Analyze the video
        logger.info("Analyzing video...")
        video_analysis = analyze_video(video_path)

        # Step 2: Research trends
        logger.info("Researching trends...")
        trend_research = research_trends()

        # Step 3: Edit the vlog into shorts
        logger.info("Editing vlog into shorts...")
        output_paths = edit_vlog(video_path, video_analysis, trend_research)

        # Step 4: Add voice-overs
        logger.info("Adding voice-overs...")
        final_outputs = []
        for output_path in output_paths:
            # Generate simple content description
            content_description = "A highlight from a vlog showing interesting moments and activities."

            # Create voice-over
            output_dir = os.path.dirname(output_path)
            final_path, script = create_voiceover(output_path, content_description, output_dir)
            final_outputs.append({"path": final_path, "script": script})

        # Step 5: Create summary file
        summary_path = os.path.join(os.path.dirname(output_paths[0]), "summary.json")
        with open(summary_path, 'w') as f:
            json.dump({
                "original_video": video_path,
                "processing_time": time.time() - start_time,
                "outputs": final_outputs
            }, f, indent=2)

        logger.info(f"Video processing completed in {time.time() - start_time:.2f} seconds")
        logger.info(f"Created {len(final_outputs)} shorts")

        return final_outputs

    except Exception as e:
        logger.error(f"Error processing video: {str(e)}", exc_info=True)
        return []

def main():
    """Main function to monitor for videos and process them"""
    logger.info("Starting VEX - Automated Vlog Editor")

    while True:
        try:
            # Step 1: Monitor for new videos from SSD
            logger.info("Monitoring for new videos from SSD...")
            video_files = monitor_for_ssd()

            if not video_files:
                logger.info("No new videos found. Waiting...")
                time.sleep(60)
                continue

            # Step 2: Process each video
            for video_path in video_files:
                # Only process overnight (between 10 PM and 5 AM)
                current_time = datetime.now()
                if 22 <= current_time.hour or current_time.hour < 5:
                    logger.info(f"Processing video: {video_path}")
                    process_video(video_path)
                else:
                    logger.info(f"Skipping processing until night time. Current hour: {current_time.hour}")

            # Step 3: Wait before checking again
            logger.info("Finished processing. Waiting for new videos...")
            time.sleep(300)  # Wait 5 minutes before checking again

        except Exception as e:
            logger.error(f"Error in main loop: {str(e)}", exc_info=True)
            time.sleep(60)  # Wait a minute before retrying

if __name__ == "__main__":
    main()

Step 8: Create Startup Script

Create start_vex.sh :

#!/bin/bash
cd ~/ai_models/vex
python3 main.py

Make it executable:

chmod +x start_vex.sh

Step 9: Set Up Automatic Startup

Create a LaunchAgent to start VEX automatically:

mkdir -p ~/Library/LaunchAgents

Create ~/Library/LaunchAgents/com.user.vex.plist :

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.vex</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/vex/start_vex.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/vex/logs/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/vex/logs/output.log</string>
</dict>
</plist>

Load the LaunchAgent:

launchctl load ~/Library/LaunchAgents/com.user.vex.plist

System Integration and Resource Management

Resource Allocation

To ensure all three models can run efficiently on your Mac Mini M4 with 24GB RAM:

1. Memory Management:

# Add to each model's main script
import resource

# RUSH: Limit to 6GB
resource.setrlimit(resource.RLIMIT_AS, (6 * 1024 * 1024 * 1024, -1))

# NAMI: Limit to 4GB
resource.setrlimit(resource.RLIMIT_AS, (4 * 1024 * 1024 * 1024, -1))

# VEX: Limit to 12GB
resource.setrlimit(resource.RLIMIT_AS, (12 * 1024 * 1024 * 1024, -1))
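macOS does not always enforce RLIMIT_AS strictly, so it is worth logging actual usage as a cross-check; a minimal sketch (note that ru_maxrss is reported in bytes on macOS, unlike Linux, where it is kilobytes):

import resource

# Peak resident memory of the current process (bytes on macOS)
peak_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"Peak memory usage: {peak_bytes / (1024 ** 3):.2f} GB")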

2. Storage Management: create a cleanup script to manage storage:

#!/bin/bash
# cleanup.sh

# Set maximum storage usage (in GB)
MAX_STORAGE=400

# Clean up temporary files older than 7 days
find ~/ai_models/*/temp -type f -mtime +7 -delete

# Check current storage usage (in GB)
CURRENT_USAGE=$(du -sg ~/ai_models | cut -f1)

# If usage exceeds limit, remove oldest files
# (macOS find has no -printf, so use stat -f to prepend modification times)
if [ "$CURRENT_USAGE" -gt "$MAX_STORAGE" ]; then
    echo "Storage usage exceeds limit. Cleaning up..."

    # Remove the 10 oldest input videos
    find ~/ai_models/vex/input -type f -exec stat -f '%m %N' {} + | sort -n | head -n 10 | cut -d' ' -f2- | while read -r f; do rm -f "$f"; done

    # Remove the 5 oldest output directories
    find ~/ai_models/vex/output -mindepth 1 -maxdepth 1 -type d -exec stat -f '%m %N' {} + | sort -n | head -n 5 | cut -d' ' -f2- | while read -r d; do rm -rf "$d"; done
fi

Add to crontab to run daily at 4 AM:

0 4 * * * /Users/yourusername/ai_models/shared/cleanup.sh >> /Users/yourusername/ai_models/shared/cleanup.log 2>&1

System Monitoring

Create a monitoring script to ensure all services are running:

# monitor.py
import subprocess
import time
import smtplib
from email.mime.text import MIMEText

def check_process(process_name):
    """Check if a process is running"""
    result = subprocess.run(['pgrep', '-f', process_name], capture_output=True)
    return result.returncode == 0

def restart_process(process_name, start_script):
    """Restart a process"""
    subprocess.run(['pkill', '-f', process_name])
    time.sleep(2)
    subprocess.run(['bash', start_script])

def send_notification(subject, message):
    """Send an email notification"""
    # Configure your email settings
    sender = 'your_email@example.com'
    recipient = 'your_email@example.com'
    password = 'your_app_password'

    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = recipient

    try:
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender, password)
        server.send_message(msg)
        server.quit()
    except Exception as e:
        print(f"Failed to send email: {str(e)}")

def main():
    """Main monitoring function"""
    # Note: RUSH and NAMI both run as "bot.py", so a bare "bot.py" pattern
    # matches either one; launch them with full paths (or rename the scripts)
    # if you need pgrep/pkill to tell them apart.
    processes = [
        {'name': 'bot.py', 'script': '/Users/yourusername/ai_models/rush/start_rush.sh', 'service': 'RUSH'},
        {'name': 'bot.py', 'script': '/Users/yourusername/ai_models/nami/start_nami.sh', 'service': 'NAMI'},
        {'name': 'main.py', 'script': '/Users/yourusername/ai_models/vex/start_vex.sh', 'service': 'VEX'}
    ]

    for process in processes:
        if not check_process(process['name']):
            print(f"{process['service']} is not running. Restarting...")
            restart_process(process['name'], process['script'])
            send_notification(
                f"{process['service']} Restarted",
                f"The {process['service']} service was not running and has been restarted."
            )

if __name__ == '__main__':
    main()

Add to crontab to run every 15 minutes:

*/15 * * * * /usr/bin/python3 /Users/yourusername/ai_models/shared/monitor.py >> /Users/yourusername/ai_models/shared/monitor.log 2>&1

Troubleshooting and Optimization

Common Issues and Solutions

1. Telegram Bot Not Responding:
   • Check if the bot is running: ps aux | grep bot.py
   • Verify internet connection: ping telegram.org
   • Restart the bot: launchctl unload ~/Library/LaunchAgents/com.user.rush.plist && launchctl load ~/Library/LaunchAgents/com.user.rush.plist

2. Video Processing Errors:
   • Check FFmpeg installation: ffmpeg -version
   • Verify Python dependencies: pip3 list | grep -E 'moviepy|opencv'
   • Check available disk space: df -h
   • Check log files: tail -n 100 ~/ai_models/vex/logs/vex.log

3. Memory Issues:
   • Monitor memory usage: top -o mem
   • Reduce model sizes in Whisper (use 'small' instead of 'medium')
   • Adjust resource limits in the scripts

Performance Optimization

1. Reduce Whisper Model Size: For RUSH, if transcription is slow, change in transcribe.py:

model = whisper.load_model("small")  # Instead of "medium"

2. Optimize Video Processing: For VEX, to speed up processing, change in video_analyzer.py:

def extract_frames(video_path, output_dir, fps=0.5):  # Reduce from 1 to 0.5 fps

3. Use Hardware Acceleration: For video encoding/decoding, add to ffmpeg commands:

-hwaccel videotoolbox
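For example, a re-encode that decodes through VideoToolbox and encodes with ffmpeg's hardware H.264 encoder might look like the sketch below (file names are placeholders, and h264_videotoolbox assumes an Apple-silicon build of ffmpeg):

import subprocess

# Hardware-accelerated decode (-hwaccel) and encode (h264_videotoolbox)
subprocess.run([
    "ffmpeg", "-hwaccel", "videotoolbox",
    "-i", "input.mp4",
    "-c:v", "h264_videotoolbox", "-b:v", "6000k",
    "-c:a", "copy",
    "output.mp4", "-y"
], check=True)
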
Alternative Approaches

Cloud-Based Alternative

If the Mac Mini's resources are insufficient, consider these cloud alternatives:

1. RUSH Alternative:
   • Use OpenAI's Whisper API for transcription (see the sketch below)
   • Use ChatGPT API for question generation
   • Host the Telegram bot on a small cloud instance (AWS Lambda or Google Cloud Functions)
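As a sketch of the first bullet, OpenAI's hosted Whisper endpoint replaces the local model with one call (this assumes the same pre-1.0 openai package used elsewhere in this guide; the file name is an example):

import openai

openai.api_key = "YOUR_OPENAI_API_KEY"

# Send the audio file to the hosted whisper-1 model instead of transcribing locally
with open("lecture.wav", "rb") as audio_file:
    result = openai.Audio.transcribe("whisper-1", audio_file)
print(result["text"])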

2. NAMI Alternative:
   • Set up a small cloud VM (AWS EC2 t2.micro)
   • Install an SSH server with key authentication
   • Create a secure tunnel to your Mac Mini
   • Run the Telegram bot on the cloud VM

3. VEX Alternative:
   • Use AWS S3 for video storage
   • Use AWS Elastic Transcoder or Google Cloud Video Intelligence API
   • Set up a processing pipeline with AWS Lambda functions
   • Use Amazon Polly for voice synthesis

Simplified Local Setup

If you want a lighter setup that uses fewer resources:

1. Simplified RUSH:
   • Use pre-recorded questions instead of generating them
   • Skip voice synthesis and just send text responses
   • Use a lighter transcription model

2. Simplified NAMI:
   • Limit to basic file operations only
   • Remove natural language processing
   • Use predefined command templates

3. Simplified VEX:
   • Process videos during the day at scheduled times
   • Create fewer shorts (1-2 instead of 4)
   • Skip trend research and use a fixed set of effects

This comprehensive guide provides all the necessary steps to implement the three AI
models on your Mac Mini M4. The implementation focuses on efficiency, automation,
and resource management to ensure all three models can run simultaneously without
issues.
