Comprehensive Implementation Guide
for Mac Mini M4 AI Models
This guide provides detailed instructions for implementing three AI models on a Mac
Mini with M4 chip, 24GB RAM, and 512GB storage. The models are designed for college
use and include:
1. RUSH - Interactive Lecture Assistant (Telegram Bot)
2. NAMI - System Control Assistant (Telegram Bot)
3. VEX - Automated Vlog Editor
Table of Contents
1. System Requirements and Preparation
2. Model 1: RUSH - Interactive Lecture Assistant
3. Model 2: NAMI - System Control Assistant
4. Model 3: VEX - Automated Vlog Editor
5. System Integration and Resource Management
6. Troubleshooting and Optimization
7. Alternative Approaches
System Requirements and Preparation
Hardware Requirements
• Mac Mini with M4 chip
• 24GB RAM
• 512GB storage
• External SSD for video storage (recommended)
• Stable internet connection
Software Prerequisites
Before beginning implementation, install the following:
```bash
# Install Homebrew (package manager for macOS)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# Install Python and required tools (ImageMagick is needed by moviepy's TextClip)
brew install python@3.11 ffmpeg node git imagemagick

# Install Python packages (openai and scenedetect are used by the scripts below)
pip3 install python-telegram-bot openai-whisper openai pydub numpy pandas matplotlib torch torchvision opencv-python moviepy pytube elevenlabs paramiko scp scenedetect
```
System Configuration
1. Disable Sleep Mode:
   - Go to System Preferences > Energy Saver
   - Set "Computer sleep" to "Never"
   - Check "Prevent computer from sleeping automatically when the display is off"
2. Set Up Development Environment:

```bash
mkdir -p ~/ai_models/{rush,nami,vex}
mkdir -p ~/ai_models/shared
```
Model 1: RUSH - Interactive Lecture Assistant
RUSH is designed to process lecture recordings, generate questions, and interact with
you using voice synthesis.
Step 1: Create Telegram Bot
1. Open Telegram and search for "BotFather"
2. Start a chat and send /newbot
3. Follow instructions to create a bot named "RUSH"
4. Save the API token provided by BotFather
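Before writing any bot code, you can confirm the token works by calling the Bot API's getMe endpoint directly (the token below is a placeholder):

```python
import requests

TOKEN = "YOUR_TELEGRAM_TOKEN"  # placeholder - paste the token from BotFather

# getMe returns basic info about the bot when the token is valid
resp = requests.get(f"https://api.telegram.org/bot{TOKEN}/getMe")
print(resp.json())  # expect {'ok': True, 'result': {... 'username': 'your_bot'}}
```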
Step 2: Set Up Local Environment
Create a configuration file:
```bash
cd ~/ai_models/rush
touch config.py
```
Edit config.py with the following content:
```python
# Telegram Bot API Token
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"

# User ID (your Telegram user ID)
USER_ID = "YOUR_TELEGRAM_USER_ID"  # You can get this from @userinfobot

# Paths
AUDIO_PATH = "/Users/yourusername/ai_models/rush/audio"
TRANSCRIPT_PATH = "/Users/yourusername/ai_models/rush/transcripts"

# ElevenLabs API (for voice synthesis)
ELEVENLABS_API_KEY = "YOUR_ELEVENLABS_API_KEY"
VOICE_ID = "BAPI_LARRI_VOICE_ID"  # You'll create this voice in ElevenLabs
```
Create necessary directories:
```bash
mkdir -p ~/ai_models/rush/{audio,transcripts,responses}
```
Step 3: Set Up Voice Synthesis with ElevenLabs
1. Create an account at ElevenLabs
2. Generate an API key from your profile settings
3. Create a new voice similar to "Bapi Larri" using their voice cloning feature
4. Note the Voice ID for your configuration
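If you prefer to verify the key and look up the Voice ID programmatically, the ElevenLabs API exposes a voice-listing endpoint (shown here with plain requests; the endpoint path reflects the API as documented at the time of writing):

```python
import requests

API_KEY = "YOUR_ELEVENLABS_API_KEY"  # placeholder

# Each entry in the response includes a voice_id and a display name
resp = requests.get("https://api.elevenlabs.io/v1/voices",
                    headers={"xi-api-key": API_KEY})
for voice in resp.json().get("voices", []):
    print(voice["voice_id"], voice["name"])
```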
Step 4: Implement Audio Processing with Whisper
Create transcribe.py:

```python
import os
import whisper
from config import AUDIO_PATH, TRANSCRIPT_PATH

def transcribe_audio(file_path):
    # Load Whisper model (options: 'tiny', 'base', 'small', 'medium', 'large')
    model = whisper.load_model("medium")

    # Transcribe audio
    result = model.transcribe(file_path)

    # Get filename without extension
    base_name = os.path.splitext(os.path.basename(file_path))[0]

    # Save transcript
    transcript_path = os.path.join(TRANSCRIPT_PATH, f"{base_name}.txt")
    with open(transcript_path, "w") as f:
        f.write(result["text"])

    return transcript_path, result["text"]
```
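As a quick sanity check before wiring this into the bot, you can call the function directly on a sample recording (the file name here is hypothetical):

```python
from transcribe import transcribe_audio

# Transcribe a sample file placed in the audio folder
path, text = transcribe_audio("/Users/yourusername/ai_models/rush/audio/sample.wav")
print(f"Saved transcript to {path}")
print(text[:200])  # preview the first 200 characters
```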
Step 5: Implement Question Generation
Create question_generator.py:

```python
import os
import openai
from config import TRANSCRIPT_PATH

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def generate_questions(transcript_text, num_questions=5):
    """Generate curious questions based on lecture transcript"""
    # Limit the excerpt to 4000 characters to stay within token limits
    prompt = f"""
The following is a transcript from a college lecture:

{transcript_text[:4000]}

Generate {num_questions} curious and insightful questions that would help a student better understand the material. The questions should be thought-provoking and demonstrate curiosity about the subject.

Format each question on a new line with a number.
"""

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a curious and insightful teaching assistant."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content
```
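You can test it against a transcript RUSH has already saved (the file name is an example):

```python
from question_generator import generate_questions

with open("/Users/yourusername/ai_models/rush/transcripts/lecture_1.txt") as f:
    transcript = f.read()

print(generate_questions(transcript, num_questions=3))
```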
Step 6: Implement Voice Synthesis
Create voice_synthesis.py:

```python
import requests
import os
from config import ELEVENLABS_API_KEY, VOICE_ID

def synthesize_speech(text, output_path):
    """Convert text to speech using ElevenLabs API"""
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"

    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY
    }

    data = {
        "text": text,
        "model_id": "eleven_monolingual_v1",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75
        }
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            f.write(response.content)
        return output_path
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None
```
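A minimal test call (the output path is an example):

```python
from voice_synthesis import synthesize_speech

result = synthesize_speech(
    "Hello! RUSH is ready to ask you questions.",
    "/Users/yourusername/ai_models/rush/responses/test.mp3",
)
print("Saved to:", result)  # None indicates the API call failed
```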
Step 7: Create Telegram Bot Handler
Create bot.py:

```python
import os
import logging
import tempfile

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes
from pydub import AudioSegment

from config import TELEGRAM_TOKEN, USER_ID, AUDIO_PATH, TRANSCRIPT_PATH
from transcribe import transcribe_audio
from question_generator import generate_questions
from voice_synthesis import synthesize_speech

# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued."""
    await update.message.reply_text(
        'Hi! I am RUSH, your lecture assistant. Send me audio recordings of your '
        'lectures, and I will ask you curious questions about them.'
    )

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued."""
    await update.message.reply_text(
        'Send me an audio recording of your lecture, and I will transcribe it '
        'and ask you questions about it.'
    )

async def process_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process audio files sent by the user."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Notify user that processing has begun
    await update.message.reply_text("I'm processing your lecture recording. This might take a few minutes...")

    # Get the audio file
    audio_file = await context.bot.get_file(update.message.voice.file_id)

    # Create a temporary file
    with tempfile.NamedTemporaryFile(suffix='.ogg', delete=False) as temp_file:
        temp_path = temp_file.name

    # Download the file
    await audio_file.download_to_drive(temp_path)

    # Convert ogg to wav (Whisper works better with wav)
    audio = AudioSegment.from_ogg(temp_path)
    wav_path = os.path.join(AUDIO_PATH, f"lecture_{update.message.message_id}.wav")
    audio.export(wav_path, format="wav")

    # Remove temporary file
    os.unlink(temp_path)

    # Transcribe audio
    await update.message.reply_text("Transcribing your lecture...")
    transcript_path, transcript_text = transcribe_audio(wav_path)

    # Generate questions
    await update.message.reply_text("Generating curious questions about the lecture...")
    questions = generate_questions(transcript_text)

    # Synthesize speech
    await update.message.reply_text("Creating voice response...")
    voice_path = os.path.join(AUDIO_PATH, f"response_{update.message.message_id}.mp3")
    synthesize_speech(questions, voice_path)

    # Send transcript (read the file once, then truncate to Telegram's message limit)
    with open(transcript_path, 'r') as f:
        transcript = f.read()
    await update.message.reply_text("Here's the transcript of your lecture:")
    await update.message.reply_text(transcript[:4000] + "..." if len(transcript) > 4000 else transcript)

    # Send questions as text
    await update.message.reply_text("Here are some curious questions about the lecture:")
    await update.message.reply_text(questions)

    # Send questions as voice message
    with open(voice_path, 'rb') as voice:
        await update.message.reply_voice(voice)

def main():
    """Start the bot."""
    # Create the Application
    application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(MessageHandler(filters.VOICE, process_audio))

    # Run the bot
    application.run_polling()

if __name__ == '__main__':
    main()
```
Step 8: Create Startup Script
Create start_rush.sh:

```bash
#!/bin/bash
cd ~/ai_models/rush
# Use the full script path so monitoring tools can tell the bots apart
python3 ~/ai_models/rush/bot.py
```

Make it executable:

```bash
chmod +x start_rush.sh
```
Step 9: Set Up Automatic Startup
Create a LaunchAgent to start RUSH automatically:
```bash
mkdir -p ~/Library/LaunchAgents
```

Create ~/Library/LaunchAgents/com.user.rush.plist:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.rush</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/rush/start_rush.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/rush/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/rush/output.log</string>
</dict>
</plist>
```
Load the LaunchAgent:
```bash
launchctl load ~/Library/LaunchAgents/com.user.rush.plist
```
Model 2: NAMI - System Control Assistant
NAMI provides terminal access to your system through Telegram, allowing you to
retrieve files and execute commands remotely.
Step 1: Create Telegram Bot
1. Open Telegram and search for "BotFather"
2. Start a chat and send /newbot
3. Follow instructions to create a bot named "NAMI"
4. Save the API token provided by BotFather
Step 2: Set Up Local Environment
Create a configuration file:
```bash
cd ~/ai_models/nami
touch config.py
```
Edit config.py with the following content:
```python
# Telegram Bot API Token
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"

# User ID (your Telegram user ID)
USER_ID = "YOUR_TELEGRAM_USER_ID"  # You can get this from @userinfobot

# Allowed commands (for security)
ALLOWED_COMMANDS = [
    "ls", "cd", "pwd", "cat", "head", "tail", "grep", "find",
    "cp", "mv", "mkdir", "touch", "rm", "python", "python3"
]

# Paths
DOWNLOAD_PATH = "/Users/yourusername/ai_models/nami/downloads"
UPLOAD_PATH = "/Users/yourusername/ai_models/nami/uploads"
```
Create necessary directories:
```bash
mkdir -p ~/ai_models/nami/{downloads,uploads,logs}
```
Step 3: Implement Command Execution Module
Create command_executor.py:

```python
import subprocess
import os
from config import ALLOWED_COMMANDS

def is_command_allowed(command):
    """Check if the command is in the allowed list"""
    # Extract the base command (before any arguments)
    base_command = command.split()[0]
    return base_command in ALLOWED_COMMANDS

def execute_command(command, working_dir=None):
    """Execute a shell command and return the output"""
    if not is_command_allowed(command):
        return False, f"Command '{command.split()[0]}' is not allowed for security reasons."

    try:
        # Set working directory if provided
        cwd = working_dir if working_dir else os.getcwd()

        # Execute command
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=cwd
        )

        # Get output
        stdout, stderr = process.communicate(timeout=60)  # 60-second timeout

        # Check if there was an error
        if process.returncode != 0:
            return False, f"Error executing command:\n{stderr}"

        return True, stdout

    except subprocess.TimeoutExpired:
        # Kill the runaway process so it doesn't linger after the timeout
        process.kill()
        return False, "Command execution timed out after 60 seconds."
    except Exception as e:
        return False, f"Error executing command: {str(e)}"
```
Step 4: Implement File Transfer Module
Create file_manager.py:

```python
import os
from config import DOWNLOAD_PATH, UPLOAD_PATH

def get_file_path(filename, directory=None):
    """Get the full path for a file"""
    if directory:
        # Make sure the path doesn't escape the allowed directories
        if ".." in directory or directory.startswith("/"):
            return None
        full_path = os.path.join(os.path.expanduser("~"), directory, filename)
    else:
        full_path = os.path.join(UPLOAD_PATH, filename)
    return full_path

def save_file(file_obj, filename):
    """Save a file to the uploads directory"""
    os.makedirs(UPLOAD_PATH, exist_ok=True)
    file_path = os.path.join(UPLOAD_PATH, filename)
    with open(file_path, 'wb') as f:
        f.write(file_obj.read())
    return file_path

def get_file(file_path):
    """Get a file from the system"""
    # Security check to prevent directory traversal
    if ".." in file_path or not os.path.exists(file_path):
        return None
    if os.path.isfile(file_path):
        return file_path
    return None
```
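Substring checks for ".." are easy to bypass (for example through symlinks). A more robust guard, sketched here, resolves the path and confirms it stays inside your home directory:

```python
import os

def is_within_home(path):
    """Resolve symlinks and relative segments, then confirm the result
    lives under the user's home directory."""
    home = os.path.realpath(os.path.expanduser("~"))
    resolved = os.path.realpath(path)
    return resolved == home or resolved.startswith(home + os.sep)
```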
Step 5: Create Natural Language Command Parser
Create nl_parser.py:

```python
import re
import openai

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def parse_natural_language(text):
    """Convert natural language to shell commands"""
    prompt = f"""
Convert the following natural language request into a shell command or a series of shell commands.
Only return the command(s), nothing else.

Request: {text}

Commands:
"""

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that converts natural language into shell commands. Only output the commands, no explanations."},
            {"role": "user", "content": prompt}
        ]
    )

    commands = response.choices[0].message.content.strip()

    # Clean up the response to ensure it's just commands
    commands = re.sub(r'^```bash\n', '', commands)
    commands = re.sub(r'\n```$', '', commands)

    return commands
```
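A round trip looks like this (the exact command returned depends on the model):

```python
from nl_parser import parse_natural_language

command = parse_natural_language("show me the five largest files in my Downloads folder")
print(command)  # e.g. something like: du -ah ~/Downloads | sort -rh | head -n 5
```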
Step 6: Create Telegram Bot Handler
Create bot.py:

```python
import os
import logging

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes

from config import TELEGRAM_TOKEN, USER_ID, DOWNLOAD_PATH, UPLOAD_PATH, ALLOWED_COMMANDS
from command_executor import execute_command
from file_manager import get_file_path, save_file, get_file
from nl_parser import parse_natural_language

# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# Current working directory for each user
user_cwd = {}

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /start is issued."""
    await update.message.reply_text(
        'Hi! I am NAMI, your system control assistant. I can help you access '
        'files and execute commands on your Mac Mini.'
    )

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a message when the command /help is issued."""
    help_text = """
I can help you with the following:

1. Execute shell commands: Just send me a command like 'ls -la' or describe what you want in natural language.
2. Get files: Use /get [path/to/file] to retrieve a file from your Mac.
3. Change directory: Use /cd [directory] to change the working directory.
4. Current directory: Use /pwd to see the current working directory.

For security reasons, I can only execute certain commands and access files within allowed directories.
"""
    await update.message.reply_text(help_text)

async def execute_shell_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Execute a shell command."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the command
    command = update.message.text

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Execute the command
    success, output = execute_command(command, cwd)

    # Update the tracked directory if it was a cd command
    # (a subprocess's cd does not persist, so we track it ourselves)
    if success and command.startswith("cd "):
        new_dir = command[3:].strip()
        if new_dir.startswith("/"):
            # Absolute path
            user_cwd[str(update.message.from_user.id)] = new_dir
        else:
            # Relative path
            user_cwd[str(update.message.from_user.id)] = os.path.normpath(os.path.join(cwd, new_dir))

    # Send the output
    if success:
        # Split long outputs into multiple messages
        if len(output) > 4000:
            for i in range(0, len(output), 4000):
                await update.message.reply_text(output[i:i+4000])
        else:
            await update.message.reply_text(output if output else "Command executed successfully (no output).")
    else:
        await update.message.reply_text(f"Error: {output}")

async def process_natural_language(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process natural language requests."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the text
    text = update.message.text

    # Parse the natural language
    await update.message.reply_text("Processing your request...")
    command = parse_natural_language(text)

    # Show the command before running it
    await update.message.reply_text(f"I'll execute: `{command}`")

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Execute the command
    success, output = execute_command(command, cwd)

    # Send the output
    if success:
        # Split long outputs into multiple messages
        if len(output) > 4000:
            for i in range(0, len(output), 4000):
                await update.message.reply_text(output[i:i+4000])
        else:
            await update.message.reply_text(output if output else "Command executed successfully (no output).")
    else:
        await update.message.reply_text(f"Error: {output}")

async def get_file_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Get a file from the system."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Check if a file path was provided
    if not context.args:
        await update.message.reply_text("Please provide a file path. Usage: /get [path/to/file]")
        return

    # Get the file path
    file_path = " ".join(context.args)

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # If the path is not absolute, make it relative to the current directory
    if not file_path.startswith("/"):
        file_path = os.path.join(cwd, file_path)

    # Check if the file exists
    if not os.path.exists(file_path):
        await update.message.reply_text(f"File not found: {file_path}")
        return

    # Check if it's a file
    if not os.path.isfile(file_path):
        await update.message.reply_text(f"Not a file: {file_path}")
        return

    # Send the file
    try:
        with open(file_path, 'rb') as f:
            await update.message.reply_document(document=f, filename=os.path.basename(file_path))
    except Exception as e:
        await update.message.reply_text(f"Error sending file: {str(e)}")

async def cd_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Change the current working directory."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Check if a directory was provided
    if not context.args:
        # Default to home directory
        user_cwd[str(update.message.from_user.id)] = os.path.expanduser("~")
        await update.message.reply_text(f"Changed directory to: {os.path.expanduser('~')}")
        return

    # Get the directory
    directory = " ".join(context.args)

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # If the path is not absolute, make it relative to the current directory
    if not directory.startswith("/"):
        new_dir = os.path.normpath(os.path.join(cwd, directory))
    else:
        new_dir = directory

    # Check if the directory exists
    if not os.path.exists(new_dir):
        await update.message.reply_text(f"Directory not found: {new_dir}")
        return

    # Check if it's a directory
    if not os.path.isdir(new_dir):
        await update.message.reply_text(f"Not a directory: {new_dir}")
        return

    # Update the current directory
    user_cwd[str(update.message.from_user.id)] = new_dir
    await update.message.reply_text(f"Changed directory to: {new_dir}")

async def pwd_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show the current working directory."""
    # Check if the user is authorized
    if str(update.message.from_user.id) != USER_ID:
        await update.message.reply_text("Sorry, you are not authorized to use this bot.")
        return

    # Get the current working directory for this user
    cwd = user_cwd.get(str(update.message.from_user.id), os.path.expanduser("~"))

    # Send the current directory
    await update.message.reply_text(f"Current directory: {cwd}")

def main():
    """Start the bot."""
    # Create the Application
    application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("get", get_file_command))
    application.add_handler(CommandHandler("cd", cd_command))
    application.add_handler(CommandHandler("pwd", pwd_command))

    # Route messages whose first word is an allowed command to the shell
    # handler; everything else goes to the natural-language handler
    command_pattern = r'^(' + '|'.join(ALLOWED_COMMANDS) + r')(\s|$)'

    application.add_handler(MessageHandler(
        filters.TEXT & ~filters.COMMAND & filters.Regex(command_pattern),
        execute_shell_command
    ))

    application.add_handler(MessageHandler(
        filters.TEXT & ~filters.COMMAND & ~filters.Regex(command_pattern),
        process_natural_language
    ))

    # Run the bot
    application.run_polling()

if __name__ == '__main__':
    main()
```
Step 7: Create Startup Script
Create start_nami.sh :
#!/bin/bash
cd ~/ai_models/nami
python3 bot.py
Make it executable:
chmod +x start_nami.sh
Step 8: Set Up Automatic Startup
Create a LaunchAgent to start NAMI automatically:
```bash
mkdir -p ~/Library/LaunchAgents
```

Create ~/Library/LaunchAgents/com.user.nami.plist:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.nami</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/nami/start_nami.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/nami/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/nami/output.log</string>
</dict>
</plist>
```
Load the LaunchAgent:
```bash
launchctl load ~/Library/LaunchAgents/com.user.nami.plist
```
Model 3: VEX - Automated Vlog Editor
VEX automatically edits vlogs, adds memes, and creates short-form content while you
sleep.
Step 1: Set Up Local Environment
Create a configuration file:
```bash
cd ~/ai_models/vex
touch config.py
```
Edit config.py with the following content:
```python
# Paths
INPUT_PATH = "/Users/yourusername/ai_models/vex/input"
OUTPUT_PATH = "/Users/yourusername/ai_models/vex/output"
TEMP_PATH = "/Users/yourusername/ai_models/vex/temp"
ASSETS_PATH = "/Users/yourusername/ai_models/vex/assets"

# ElevenLabs API (for voice synthesis)
ELEVENLABS_API_KEY = "YOUR_ELEVENLABS_API_KEY"
VOICE_ID = "BAPI_LARRI_VOICE_ID"  # Same voice ID as RUSH

# YouTube API
YOUTUBE_API_KEY = "YOUR_YOUTUBE_API_KEY"

# Processing settings
MAX_PROCESSING_TIME = 7 * 60 * 60  # 7 hours in seconds
TARGET_SHORT_DURATION = 60  # 1 minute in seconds
NUM_SHORTS_TO_CREATE = 4  # Number of shorts to create
```
Create necessary directories:
```bash
mkdir -p ~/ai_models/vex/{input,output,temp,assets,logs}
mkdir -p ~/ai_models/vex/assets/{music,sfx,memes,transitions}
```
Step 2: Implement SSD Detection and Video Import
Create ssd_monitor.py:

```python
import os
import time
import shutil
from config import INPUT_PATH

def get_mounted_volumes():
    """Get a list of mounted volumes by listing /Volumes directly,
    which is simpler and more reliable than parsing diskutil output"""
    return [os.path.join('/Volumes', name) for name in os.listdir('/Volumes')]

def is_video_file(filename):
    """Check if a file is a video file based on extension"""
    video_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.m4v']
    return any(filename.lower().endswith(ext) for ext in video_extensions)

def import_videos_from_ssd():
    """Check for newly connected SSDs and import video files"""
    # Get currently mounted volumes
    volumes = get_mounted_volumes()

    # Create input directory if it doesn't exist
    os.makedirs(INPUT_PATH, exist_ok=True)

    # Check each volume for video files
    imported_files = []
    for volume in volumes:
        # Skip the system volume
        if volume == '/Volumes/Macintosh HD':
            continue

        # Check if the volume exists
        if not os.path.exists(volume):
            continue

        print(f"Checking volume: {volume}")

        # Walk through the volume
        for root, dirs, files in os.walk(volume):
            for file in files:
                if is_video_file(file):
                    source_path = os.path.join(root, file)
                    dest_path = os.path.join(INPUT_PATH, file)

                    # Copy the file if it doesn't already exist
                    if not os.path.exists(dest_path):
                        print(f"Importing: {source_path} -> {dest_path}")
                        shutil.copy2(source_path, dest_path)
                        imported_files.append(dest_path)

    return imported_files

def monitor_for_ssd(check_interval=30):
    """Continuously monitor for newly connected SSDs"""
    while True:
        print("Checking for new SSDs...")
        imported_files = import_videos_from_ssd()

        if imported_files:
            print(f"Imported {len(imported_files)} new video files.")
            return imported_files

        print(f"No new videos found. Checking again in {check_interval} seconds.")
        time.sleep(check_interval)

if __name__ == "__main__":
    monitor_for_ssd()
```
Step 3: Implement Video Analysis
Create video_analyzer.py:

```python
import cv2
import subprocess
import os
from config import TEMP_PATH

def extract_frames(video_path, output_dir, fps=1):
    """Extract frames from video at the specified FPS"""
    os.makedirs(output_dir, exist_ok=True)

    # Get video info
    cap = cv2.VideoCapture(video_path)
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / video_fps

    # Calculate frame interval (at least 1 frame)
    interval = max(1, int(video_fps / fps))

    # Extract frames
    count = 0
    frame_paths = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if count % interval == 0:
            frame_path = os.path.join(output_dir, f"frame_{count:06d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)
        count += 1

    cap.release()
    return frame_paths, duration

def analyze_audio(video_path, output_dir):
    """Extract and analyze audio from video"""
    os.makedirs(output_dir, exist_ok=True)

    # Extract audio
    audio_path = os.path.join(output_dir, "audio.wav")
    subprocess.run([
        "ffmpeg", "-i", video_path, "-q:a", "0", "-map", "a", audio_path, "-y"
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Analyze loudness (loudnorm prints its statistics as JSON on stderr,
    # so capture stderr and save it to the file)
    loudness_data = os.path.join(output_dir, "loudness.json")
    result = subprocess.run([
        "ffmpeg", "-i", audio_path, "-af",
        "loudnorm=print_format=json", "-f", "null", "-"
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    with open(loudness_data, 'w') as f:
        f.write(result.stderr)

    # Detect silence (silencedetect also logs to stderr)
    silence_data = os.path.join(output_dir, "silence.txt")
    with open(silence_data, 'w') as out:
        subprocess.run([
            "ffmpeg", "-i", audio_path, "-af",
            "silencedetect=noise=-30dB:d=0.5", "-f", "null", "-"
        ], stdout=out, stderr=subprocess.STDOUT)

    return audio_path, loudness_data, silence_data

def detect_scenes(video_path, threshold=30):
    """Detect scene changes using the PySceneDetect CLI"""
    # list-scenes writes <video_name>-Scenes.csv into the -o directory
    subprocess.run([
        "scenedetect", "-i", video_path, "detect-content",
        f"--threshold={threshold}", "list-scenes", "-o", TEMP_PATH
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_csv = os.path.join(TEMP_PATH, f"{video_name}-Scenes.csv")

    # Parse the CSV: locate the header row, then read the start/end columns
    scenes = []
    if os.path.exists(output_csv):
        with open(output_csv, 'r') as f:
            lines = f.readlines()
        header_idx = next((i for i, line in enumerate(lines) if "Scene Number" in line), None)
        if header_idx is not None:
            header = [h.strip() for h in lines[header_idx].split(',')]
            start_col = header.index("Start Time (seconds)")
            end_col = header.index("End Time (seconds)")
            for line in lines[header_idx + 1:]:
                parts = line.strip().split(',')
                if len(parts) > max(start_col, end_col):
                    scenes.append((float(parts[start_col]), float(parts[end_col])))
    return scenes

def analyze_video(video_path):
    """Perform a comprehensive analysis of a video"""
    # Create temporary directory
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    temp_dir = os.path.join(TEMP_PATH, video_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Extract frames
    frames_dir = os.path.join(temp_dir, "frames")
    frame_paths, duration = extract_frames(video_path, frames_dir)

    # Analyze audio
    audio_dir = os.path.join(temp_dir, "audio")
    audio_path, loudness_data, silence_data = analyze_audio(video_path, audio_dir)

    # Detect scenes
    scenes = detect_scenes(video_path)

    # Return analysis results
    return {
        "video_path": video_path,
        "duration": duration,
        "frame_paths": frame_paths,
        "audio_path": audio_path,
        "scenes": scenes,
        "temp_dir": temp_dir
    }
```
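The silencedetect filter reports its findings as log lines (silence_start / silence_end) rather than structured data. A small parser, sketched below, turns the silence.txt capture into (start, end) pairs that a later editing step could use to cut dead air:

```python
import re

def parse_silence_log(silence_file):
    """Parse ffmpeg silencedetect output into (start, end) tuples.

    silencedetect logs lines such as:
        [silencedetect @ 0x...] silence_start: 12.34
        [silencedetect @ 0x...] silence_end: 15.67 | silence_duration: 3.33
    """
    with open(silence_file) as f:
        log = f.read()
    starts = [float(m) for m in re.findall(r'silence_start: ([\d.]+)', log)]
    ends = [float(m) for m in re.findall(r'silence_end: ([\d.]+)', log)]
    return list(zip(starts, ends))
```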
Step 4: Implement Trend Research
Create trend_researcher.py:

```python
import os
import json
import requests
from pytube import YouTube, Search
from config import YOUTUBE_API_KEY, ASSETS_PATH

def search_trending_videos(category="entertainment", max_results=10):
    """Search for trending videos on YouTube"""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": "US",
        # 24 = Entertainment, 20 = Gaming
        "videoCategoryId": "24" if category == "entertainment" else "20",
        "maxResults": max_results,
        "key": YOUTUBE_API_KEY
    }

    response = requests.get(url, params=params)
    data = response.json()

    trending_videos = []
    if "items" in data:
        for item in data["items"]:
            video_id = item["id"]
            title = item["snippet"]["title"]
            channel = item["snippet"]["channelTitle"]
            views = item["statistics"]["viewCount"]

            trending_videos.append({
                "id": video_id,
                "title": title,
                "channel": channel,
                "views": views,
                "url": f"https://www.youtube.com/watch?v={video_id}"
            })

    return trending_videos

def search_trending_memes(max_results=10):
    """Search for trending meme videos on YouTube"""
    search_results = Search("trending memes 2023").results

    meme_videos = []
    for i, result in enumerate(search_results):
        if i >= max_results:
            break
        meme_videos.append({
            "id": result.video_id,
            "title": result.title,
            "channel": result.author,
            "url": f"https://www.youtube.com/watch?v={result.video_id}"
        })

    return meme_videos

def download_meme_clips(meme_videos, output_dir):
    """Download short clips from meme videos"""
    os.makedirs(output_dir, exist_ok=True)

    downloaded_clips = []
    for meme in meme_videos:
        try:
            # Download the video
            yt = YouTube(meme["url"])
            stream = yt.streams.filter(progressive=True, file_extension="mp4").first()
            if not stream:
                continue

            # Download to a temporary location
            temp_path = stream.download(output_path=output_dir, filename=f"temp_{meme['id']}.mp4")

            # Get video duration
            duration = yt.length

            # Extract a 3-second clip from the middle
            middle_point = duration / 2
            start_time = max(0, middle_point - 1.5)
            end_time = min(duration, middle_point + 1.5)

            clip_path = os.path.join(output_dir, f"meme_{meme['id']}.mp4")

            # Use ffmpeg to extract the clip
            os.system(f'ffmpeg -i "{temp_path}" -ss {start_time} -to {end_time} -c:v copy -c:a copy "{clip_path}" -y')

            # Remove the temporary file
            os.remove(temp_path)

            downloaded_clips.append({
                "id": meme["id"],
                "title": meme["title"],
                "path": clip_path
            })
        except Exception as e:
            print(f"Error downloading {meme['url']}: {str(e)}")

    return downloaded_clips

def research_trends():
    """Research trending content and download assets"""
    # Create assets directories
    memes_dir = os.path.join(ASSETS_PATH, "memes")
    os.makedirs(memes_dir, exist_ok=True)

    # Get trending videos
    trending_videos = search_trending_videos()

    # Get trending memes
    meme_videos = search_trending_memes()

    # Download meme clips
    meme_clips = download_meme_clips(meme_videos, memes_dir)

    # Save research results
    research_data = {
        "trending_videos": trending_videos,
        "meme_clips": meme_clips
    }

    with open(os.path.join(ASSETS_PATH, "trend_research.json"), 'w') as f:
        json.dump(research_data, f, indent=2)

    return research_data
```
Step 5: Implement Video Editing Pipeline
Create video_editor.py:

```python
import os
import random
from moviepy.editor import VideoFileClip, CompositeVideoClip, TextClip
from config import OUTPUT_PATH, ASSETS_PATH, TARGET_SHORT_DURATION

def create_edit_decision_list(video_analysis, trend_research):
    """Create an edit decision list based on video analysis and trends"""
    scenes = video_analysis["scenes"]
    duration = video_analysis["duration"]

    # If no scenes were detected, create artificial scenes
    if not scenes:
        scene_duration = 10  # 10 seconds per scene
        num_scenes = int(duration / scene_duration)
        scenes = [(i * scene_duration, (i + 1) * scene_duration) for i in range(num_scenes)]

    # Select interesting scenes for shorts
    selected_scenes = []

    # Prioritize scenes in the middle of the video (usually more interesting)
    middle_index = len(scenes) // 2
    priority_scenes = scenes[max(0, middle_index - 5):min(len(scenes), middle_index + 5)]

    # Select scenes that are close to the target duration
    for scene_start, scene_end in priority_scenes:
        scene_duration = scene_end - scene_start
        if 5 <= scene_duration <= 90:  # Between 5 and 90 seconds
            selected_scenes.append((scene_start, scene_end))

    # If not enough scenes, add more from the rest
    if len(selected_scenes) < 4:
        for scene_start, scene_end in scenes:
            if (scene_start, scene_end) not in selected_scenes:
                scene_duration = scene_end - scene_start
                if 5 <= scene_duration <= 90:
                    selected_scenes.append((scene_start, scene_end))
                if len(selected_scenes) >= 4:
                    break

    # Create edit decision list
    edl = []
    for i, (scene_start, scene_end) in enumerate(selected_scenes[:4]):
        # Calculate target duration (around 60 seconds)
        target_duration = min(scene_end - scene_start, TARGET_SHORT_DURATION)

        # Select random meme clips
        meme_clips = []
        if trend_research and "meme_clips" in trend_research:
            available_memes = trend_research["meme_clips"]
            num_memes = min(3, len(available_memes))
            selected_memes = random.sample(available_memes, num_memes)
            meme_clips = [meme["path"] for meme in selected_memes]

        edl.append({
            "output_name": f"short_{i+1}.mp4",
            "source_start": scene_start,
            "source_end": scene_start + target_duration,
            "meme_clips": meme_clips,
            "add_text": True,
            "add_music": True
        })

    return edl

def add_meme(main_clip, meme_path, position="random"):
    """Add a meme clip as an overlay on the main video"""
    meme_clip = VideoFileClip(meme_path)

    # Resize meme to 1/4 of the main video's width
    meme_clip = meme_clip.resize(width=main_clip.w // 4)

    # Determine position
    if position == "random":
        position = random.choice(["top-left", "top-right", "bottom-left", "bottom-right"])

    if position == "top-left":
        x_pos, y_pos = 20, 20
    elif position == "top-right":
        x_pos, y_pos = main_clip.w - meme_clip.w - 20, 20
    elif position == "bottom-left":
        x_pos, y_pos = 20, main_clip.h - meme_clip.h - 20
    else:  # bottom-right
        x_pos = main_clip.w - meme_clip.w - 20
        y_pos = main_clip.h - meme_clip.h - 20

    # Set position
    meme_clip = meme_clip.set_position((x_pos, y_pos))

    # Show the meme at a random time in the second half of the video
    main_duration = main_clip.duration
    start_time = random.uniform(main_duration * 0.5, max(main_duration * 0.5, main_duration - meme_clip.duration))
    end_time = min(start_time + meme_clip.duration, main_duration)

    # Set start and end times
    meme_clip = meme_clip.set_start(start_time).set_end(end_time)

    # Create composite clip
    return CompositeVideoClip([main_clip, meme_clip])

def create_short(video_path, edit_info, output_dir):
    """Create a short video based on the edit decision list"""
    # Load the main video
    main_clip = VideoFileClip(video_path).subclip(edit_info["source_start"], edit_info["source_end"])

    # Add memes if available
    composite_clip = main_clip
    for meme_path in edit_info["meme_clips"]:
        if os.path.exists(meme_path):
            composite_clip = add_meme(composite_clip, meme_path)

    # Add text if requested
    if edit_info["add_text"]:
        # Create a title text (note: TextClip requires ImageMagick to be installed)
        title = "Check out this moment!"
        text_clip = TextClip(title, fontsize=30, color='white', bg_color='black', font='Arial-Bold')
        text_clip = text_clip.set_position(('center', 'bottom')).set_duration(5)

        # Add text to the beginning
        composite_clip = CompositeVideoClip([composite_clip, text_clip.set_start(0)])

    # Set output path
    output_path = os.path.join(output_dir, edit_info["output_name"])

    # Write the result
    composite_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")

    # Close clips to free memory
    composite_clip.close()
    if composite_clip != main_clip:
        main_clip.close()

    return output_path

def edit_vlog(video_path, video_analysis, trend_research):
    """Edit a vlog into multiple shorts"""
    # Create output directory
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_dir = os.path.join(OUTPUT_PATH, video_name)
    os.makedirs(output_dir, exist_ok=True)

    # Create edit decision list
    edl = create_edit_decision_list(video_analysis, trend_research)

    # Create shorts
    output_paths = []
    for edit_info in edl:
        output_path = create_short(video_path, edit_info, output_dir)
        output_paths.append(output_path)

    return output_paths
```
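Tying the pieces together, a single vlog flows through the modules like this (the input path is an example):

```python
from video_analyzer import analyze_video
from trend_researcher import research_trends
from video_editor import edit_vlog

video = "/Users/yourusername/ai_models/vex/input/vlog_day1.mp4"  # example file
analysis = analyze_video(video)
trends = research_trends()
shorts = edit_vlog(video, analysis, trends)
print(f"Created {len(shorts)} shorts: {shorts}")
```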
Step 6: Implement Voice-Over Generation
Create voice_synthesis.py:

```python
import os
import subprocess
import requests
import openai
from config import ELEVENLABS_API_KEY, VOICE_ID

# Set your OpenAI API key
openai.api_key = "YOUR_OPENAI_API_KEY"

def generate_script(video_content):
    """Generate a voice-over script based on video content"""
    prompt = f"""
Create a short, engaging voice-over script for a vlog highlight. The content is about:

{video_content}

The script should be conversational, energetic, and include some humor. It should be about 30 seconds when read aloud.
"""

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a creative content writer for YouTube shorts."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content

def synthesize_voice(text, output_path):
    """Convert text to speech using ElevenLabs API"""
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"

    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY
    }

    data = {
        "text": text,
        "model_id": "eleven_monolingual_v1",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75
        }
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            f.write(response.content)
        return output_path
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

def add_voiceover_to_video(video_path, audio_path, output_path):
    """Mix voice-over audio into a video, ducking the original audio"""
    command = [
        "ffmpeg", "-i", video_path,
        "-i", audio_path,
        "-filter_complex",
        "[0:a]volume=0.3[a1];[1:a]volume=1.0[a2];[a1][a2]amix=inputs=2:duration=longest",
        "-c:v", "copy", output_path, "-y"
    ]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return output_path

def create_voiceover(video_path, video_content, output_dir):
    """Create and add a voice-over to a video"""
    # Generate script
    script = generate_script(video_content)

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Get base filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]

    # Synthesize voice
    audio_path = os.path.join(output_dir, f"{base_name}_voiceover.mp3")
    synthesize_voice(script, audio_path)

    # Add voice-over to video
    output_path = os.path.join(output_dir, f"{base_name}_with_voiceover.mp4")
    add_voiceover_to_video(video_path, audio_path, output_path)

    return output_path, script
```
Step 7: Create Main Processing Script
Create main.py:

```python
import os
import time
import json
import logging
from datetime import datetime

from config import INPUT_PATH, OUTPUT_PATH, TEMP_PATH, MAX_PROCESSING_TIME
from ssd_monitor import monitor_for_ssd
from video_analyzer import analyze_video
from trend_researcher import research_trends
from video_editor import edit_vlog
from voice_synthesis import create_voiceover

# Set up logging (create the logs directory first so FileHandler doesn't fail)
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'vex.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('VEX')

def process_video(video_path):
    """Process a single video file"""
    start_time = time.time()
    logger.info(f"Starting to process video: {video_path}")

    try:
        # Step 1: Analyze the video
        logger.info("Analyzing video...")
        video_analysis = analyze_video(video_path)

        # Step 2: Research trends
        logger.info("Researching trends...")
        trend_research = research_trends()

        # Step 3: Edit the vlog into shorts
        logger.info("Editing vlog into shorts...")
        output_paths = edit_vlog(video_path, video_analysis, trend_research)

        # Step 4: Add voice-overs
        logger.info("Adding voice-overs...")
        final_outputs = []
        for output_path in output_paths:
            # Generate a simple content description
            content_description = "A highlight from a vlog showing interesting moments and activities."

            # Create voice-over
            output_dir = os.path.dirname(output_path)
            final_path, script = create_voiceover(output_path, content_description, output_dir)
            final_outputs.append({"path": final_path, "script": script})

        # Step 5: Create summary file
        summary_path = os.path.join(os.path.dirname(output_paths[0]), "summary.json")
        with open(summary_path, 'w') as f:
            json.dump({
                "original_video": video_path,
                "processing_time": time.time() - start_time,
                "outputs": final_outputs
            }, f, indent=2)

        logger.info(f"Video processing completed in {time.time() - start_time:.2f} seconds")
        logger.info(f"Created {len(final_outputs)} shorts")
        return final_outputs

    except Exception as e:
        logger.error(f"Error processing video: {str(e)}", exc_info=True)
        return []

def main():
    """Main function to monitor for videos and process them"""
    logger.info("Starting VEX - Automated Vlog Editor")

    while True:
        try:
            # Step 1: Monitor for new videos from SSD
            logger.info("Monitoring for new videos from SSD...")
            video_files = monitor_for_ssd()

            if not video_files:
                logger.info("No new videos found. Waiting...")
                time.sleep(60)
                continue

            # Step 2: Process each video
            for video_path in video_files:
                current_time = datetime.now()

                # Only process overnight, between 10 PM and 5 AM
                if 22 <= current_time.hour or current_time.hour < 5:
                    logger.info(f"Processing video: {video_path}")
                    process_video(video_path)
                else:
                    logger.info(f"Skipping processing until night time. Current hour: {current_time.hour}")

            # Step 3: Wait before checking again
            logger.info("Finished processing. Waiting for new videos...")
            time.sleep(300)  # Wait 5 minutes before checking again

        except Exception as e:
            logger.error(f"Error in main loop: {str(e)}", exc_info=True)
            time.sleep(60)  # Wait a minute before retrying

if __name__ == "__main__":
    main()
```
Step 8: Create Startup Script
Create start_vex.sh:

```bash
#!/bin/bash
cd ~/ai_models/vex
# Use the full script path so monitoring tools can identify the process
python3 ~/ai_models/vex/main.py
```

Make it executable:

```bash
chmod +x start_vex.sh
```
Step 9: Set Up Automatic Startup
Create a LaunchAgent to start VEX automatically:
```bash
mkdir -p ~/Library/LaunchAgents
```

Create ~/Library/LaunchAgents/com.user.vex.plist:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.vex</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/yourusername/ai_models/vex/start_vex.sh</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
    <key>StandardErrorPath</key>
    <string>/Users/yourusername/ai_models/vex/logs/error.log</string>
    <key>StandardOutPath</key>
    <string>/Users/yourusername/ai_models/vex/logs/output.log</string>
</dict>
</plist>
```
Load the LaunchAgent:
```bash
launchctl load ~/Library/LaunchAgents/com.user.vex.plist
```
System Integration and Resource Management
Resource Allocation
To ensure all three models can run efficiently on your Mac Mini M4 with 24GB RAM:
1. Memory Management: Add a cap near the top of each model's main script, using the value for that model. Note that macOS enforces RLIMIT_AS only loosely, so treat this as a soft guard rather than a hard ceiling:

```python
# Add to each model's main script
import resource

# Per-model cap: RUSH 6GB, NAMI 4GB, VEX 12GB
LIMIT_BYTES = 6 * 1024 * 1024 * 1024  # adjust per model
resource.setrlimit(resource.RLIMIT_AS, (LIMIT_BYTES, resource.RLIM_INFINITY))
```
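To see what each process actually uses, you can log its peak resident set size from inside the script (note the platform difference in units):

```python
import resource

def log_peak_memory(logger):
    """Log this process's peak resident set size.

    ru_maxrss is reported in bytes on macOS but in kilobytes on Linux.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    logger.info(f"Peak memory: {peak / (1024 ** 3):.2f} GB")
```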
2. Storage Management: Create a cleanup script (cleanup.sh) to manage storage. macOS ships BSD find, which lacks GNU find's -printf, so the script sorts files by modification time with stat -f instead:

```bash
#!/bin/bash
# cleanup.sh

# Set maximum storage usage (in GB)
MAX_STORAGE=400

# Clean up temporary files older than 7 days
find ~/ai_models/*/temp -type f -mtime +7 -delete

# Check current storage usage
CURRENT_USAGE=$(du -sg ~/ai_models | cut -f1)

# If usage exceeds the limit, remove the oldest files
if [ "$CURRENT_USAGE" -gt "$MAX_STORAGE" ]; then
    echo "Storage usage exceeds limit. Cleaning up..."

    # Remove the 10 oldest input videos
    find ~/ai_models/vex/input -type f -exec stat -f '%m %N' {} + | sort -n | head -n 10 | cut -d' ' -f2- | while read -r f; do rm -f "$f"; done

    # Remove the 5 oldest output directories
    find ~/ai_models/vex/output -mindepth 1 -maxdepth 1 -type d -exec stat -f '%m %N' {} + | sort -n | head -n 5 | cut -d' ' -f2- | while read -r d; do rm -rf "$d"; done
fi
```

Add to crontab (crontab -e) to run daily at 4 AM:

```bash
0 4 * * * /Users/yourusername/ai_models/shared/cleanup.sh >> /Users/yourusername/ai_models/shared/cleanup.log 2>&1
```
System Monitoring
Create a monitoring script to ensure all services are running:
```python
# monitor.py
import subprocess
import time
import smtplib
from email.mime.text import MIMEText

def check_process(pattern):
    """Check if a process matching the pattern is running"""
    result = subprocess.run(['pgrep', '-f', pattern], capture_output=True)
    return result.returncode == 0

def restart_process(pattern, start_script):
    """Restart a process"""
    subprocess.run(['pkill', '-f', pattern])
    time.sleep(2)
    # Launch in the background so the monitor doesn't block on it
    subprocess.Popen(['bash', start_script])

def send_notification(subject, message):
    """Send an email notification"""
    # Configure your email settings
    sender = 'your_email@example.com'
    recipient = 'your_email@example.com'
    password = 'your_app_password'

    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = recipient

    try:
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender, password)
        server.send_message(msg)
        server.quit()
    except Exception as e:
        print(f"Failed to send email: {str(e)}")

def main():
    """Main monitoring function"""
    # Match on the full script path: RUSH and NAMI both run a file named
    # bot.py, so matching on 'bot.py' alone would restart (and kill) both.
    # This relies on the start scripts launching the bots by full path.
    processes = [
        {'name': 'rush/bot.py', 'script': '/Users/yourusername/ai_models/rush/start_rush.sh', 'service': 'RUSH'},
        {'name': 'nami/bot.py', 'script': '/Users/yourusername/ai_models/nami/start_nami.sh', 'service': 'NAMI'},
        {'name': 'vex/main.py', 'script': '/Users/yourusername/ai_models/vex/start_vex.sh', 'service': 'VEX'}
    ]

    for process in processes:
        if not check_process(process['name']):
            print(f"{process['service']} is not running. Restarting...")
            restart_process(process['name'], process['script'])
            send_notification(
                f"{process['service']} Restarted",
                f"The {process['service']} service was not running and has been restarted."
            )

if __name__ == '__main__':
    main()
```
Add to crontab to run every 15 minutes:
```bash
*/15 * * * * /usr/bin/python3 /Users/yourusername/ai_models/shared/monitor.py >> /Users/yourusername/ai_models/shared/monitor.log 2>&1
```
Troubleshooting and Optimization
Common Issues and Solutions
1. Telegram Bot Not Responding:
   - Check if the bot is running: ps aux | grep bot.py
   - Verify the internet connection: ping telegram.org
   - Restart the bot: launchctl unload ~/Library/LaunchAgents/com.user.rush.plist && launchctl load ~/Library/LaunchAgents/com.user.rush.plist
2. Video Processing Errors:
   - Check the FFmpeg installation: ffmpeg -version
   - Verify Python dependencies: pip3 list | grep -E 'moviepy|opencv'
   - Check available disk space: df -h
   - Check the log files: tail -n 100 ~/ai_models/vex/logs/vex.log
3. Memory Issues:
   - Monitor memory usage: top -o mem
   - Reduce the Whisper model size (use 'small' instead of 'medium')
   - Adjust the resource limits in the scripts
Performance Optimization
1. Reduce Whisper Model Size: If transcription in RUSH is slow:

```python
# Change in transcribe.py
model = whisper.load_model("small")  # Instead of "medium"
```

2. Optimize Video Processing: To speed up VEX, sample fewer frames:

```python
# Change in video_analyzer.py
def extract_frames(video_path, output_dir, fps=0.5):  # Reduce from 1 to 0.5 fps
```

3. Use Hardware Acceleration: The M4's media engine can handle video decoding and encoding via VideoToolbox:

```bash
# -hwaccel videotoolbox accelerates decoding;
# -c:v h264_videotoolbox encodes on the media engine
ffmpeg -hwaccel videotoolbox -i input.mp4 -c:v h264_videotoolbox output.mp4
```
Alternative Approaches
Cloud-Based Alternative
If the Mac Mini's resources are insufficient, consider these cloud alternatives:
1. RUSH Alternative:
   - Use OpenAI's Whisper API for transcription
   - Use the ChatGPT API for question generation
   - Host the Telegram bot on a small cloud instance (AWS Lambda or Google Cloud Functions)
2. NAMI Alternative:
   - Set up a small cloud VM (AWS EC2 t2.micro)
   - Install an SSH server with key authentication
   - Create a secure tunnel to your Mac Mini
   - Run the Telegram bot on the cloud VM
3. VEX Alternative:
   - Use AWS S3 for video storage
   - Use AWS Elastic Transcoder or Google Cloud Video Intelligence API
   - Set up a processing pipeline with AWS Lambda functions
   - Use Amazon Polly for voice synthesis
Simplified Local Setup
If you want a lighter setup that uses fewer resources:
1. Simplified RUSH:
   - Use pre-recorded questions instead of generating them
   - Skip voice synthesis and just send text responses
   - Use a lighter transcription model
2. Simplified NAMI:
   - Limit to basic file operations only
   - Remove natural language processing
   - Use predefined command templates
3. Simplified VEX:
   - Process videos during the day at scheduled times
   - Create fewer shorts (1-2 instead of 4)
   - Skip trend research and use a fixed set of effects
This comprehensive guide provides all the necessary steps to implement the three AI
models on your Mac Mini M4. The implementation focuses on efficiency, automation,
and resource management to ensure all three models can run simultaneously without
issues.