A comprehensive, production-ready framework for building intelligent AI agents with advanced capabilities including tool calling, persistent memory, intelligent concurrency, and event-driven observability.
Mobfish is an enterprise-grade AI agent framework that combines the power of Large Language Models with a sophisticated tool ecosystem. Originally designed for Amazon product competitor analysis, it has evolved into a versatile platform supporting multiple domains including e-commerce, finance, and marketing analytics.
- SQLite-based session storage for conversation continuity across restarts
- Cross-session memory access for long-term learning and context retention
- Automatic session management with cleanup and analytics capabilities
- Context preservation maintaining user preferences and learned information
- Automatic readonly tool detection for parallel execution optimization
- Smart execution planning that maintains data consistency while maximizing performance
- Zero-configuration concurrency with built-in safety guarantees
- Performance monitoring with detailed execution time analysis
- Comprehensive event system tracking all agent operations
- Real-time monitoring of tool execution, LLM interactions, and system events
- Pluggable subscribers for custom logging, monitoring, and analytics
- Debug-friendly architecture with full audit trails
- Domain-specific toolsets organized by business function
- Extensible model adapters supporting multiple LLM providers
- Type-safe tool system with automatic schema generation
- Clean separation of concerns enabling easy maintenance and testing
- Multi-domain support: Amazon/E-commerce, Finance, Marketing, and more
- Automatic tool discovery across multiple modules (see the sketch after this list)
- Pydantic-based validation ensuring type safety and error prevention
- Intelligent tool classification for optimal execution strategies
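For a quick taste of the discovery features above, the sketch below combines the discover_tools helper and the create_with_auto_discovery constructor that appear later in this README; it assumes create_with_auto_discovery can be called without an event bus (the observability example further down passes one explicitly).

from mobfish import Orchestrator
from mobfish.resources.tools.discovery import discover_tools

# Discover every decorated tool in a single domain module...
amazon_tools = discover_tools('mobfish.resources.tools.toolsets.amazon')
print(f"Discovered {len(amazon_tools)} Amazon tools")

# ...or let the orchestrator assemble all registered toolsets itself.
agent = Orchestrator.create_with_auto_discovery()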
mobfish/                              # Core framework package
├── bus/                              # Event-driven observability system
│   ├── event_bus.py                  # Central event dispatcher
│   ├── events.py                     # Event type definitions
│   └── subscribers.py                # Event handlers and loggers
├── core/                             # Core orchestration and schemas
│   ├── orchestrator.py               # Main agent controller with concurrency
│   └── schemas.py                    # Pydantic data models
├── reasoning/                        # LLM integration layer
│   └── adapters/                     # Model-specific adapters
│       ├── base.py                   # Abstract adapter interface
│       └── gemini_adapter.py         # Google Gemini implementation
├── resources/                        # Tool ecosystem
│   └── tools/                        # Tool infrastructure
│       ├── definitions.py            # @tool decorator implementation
│       ├── discovery.py              # Automatic tool discovery
│       └── toolsets/                 # Domain-specific tool collections
│           ├── amazon.py             # E-commerce analysis tools
│           ├── finance.py            # Financial analysis tools
│           └── marketing.py          # Marketing analytics tools
└── state/                            # Persistent memory system
    ├── persistence.py                # Storage adapters (SQLite, Memory)
    └── session.py                    # Session management

data/                                 # Sample datasets
├── amazon_kids_clothing_db.json      # Product database
├── price_history.csv                 # Historical pricing data
├── sales_history.csv                 # Sales performance data
├── sales_predictions.csv             # Forecasting data
└── strategy_history.csv              # Marketing strategy tracking
# Demo Applications
main.py # Basic competitor analysis demo
main_concurrency_demo.py # Intelligent concurrency showcase
main_memory_demo.py # Persistent memory demonstration
main_simple_concurrency.py # Simple concurrency example
# Configuration & Compatibility
tools.py # Backward compatibility layer
.env # Environment configuration
mobfish_sessions.db # SQLite session database (auto-created)
# Install required dependencies
pip install google-generativeai pandas pydantic python-dotenv
# Clone or download the project
git clone <repository-url>
cd Mobfish

Create a .env file in the project root:
GOOGLE_API_KEY=your_gemini_api_key_here
GEMINI_MODEL_NAME=gemini-1.5-flash

# Basic competitor analysis
python main.py
# Intelligent concurrency demo
python main_concurrency_demo.py
# Persistent memory showcase
python main_memory_demo.py
# Simple concurrency example
python main_simple_concurrency.py

from mobfish import Orchestrator
from mobfish.resources.tools.toolsets.amazon import (
find_similar_products_by_image,
get_price_history,
analyze_design_patterns
)
# Create agent with tools
agent = Orchestrator(tools=[
find_similar_products_by_image,
get_price_history,
analyze_design_patterns
])
# Run analysis
result = agent.run("Analyze product B0CYG8P329")

from mobfish import Orchestrator
from mobfish.bus import EventBus, LoggingSubscriber
# Set up event-driven observability
event_bus = EventBus()
logging_subscriber = LoggingSubscriber()
event_bus.subscribe(logging_subscriber.handle_event)
# Create agent with auto-discovery and event bus
agent = Orchestrator.create_with_auto_discovery(event_bus=event_bus)
# Run with persistent memory and intelligent concurrency
result = agent.run_with_memory_and_concurrency_sync(
initial_prompt="Analyze multiple products concurrently",
session_id="user_session_001",
max_turns=5
)

from mobfish.resources import tool
from pydantic import BaseModel
from typing import Dict, Any
class ProductInput(BaseModel):
asin: str
include_reviews: bool = True
@tool(pydantic_model=ProductInput)
def analyze_product_sentiment(asin: str, include_reviews: bool = True) -> Dict[str, Any]:
"""Analyze customer sentiment for a product."""
try:
# Your analysis logic here
sentiment_score = perform_sentiment_analysis(asin, include_reviews)
return {
"status": "success",
"data": {
"asin": asin,
"sentiment_score": sentiment_score,
"confidence": 0.95
}
}
except Exception as e:
return {"status": "error", "message": str(e)}# Load tools from multiple domains
from mobfish.resources.tools.toolsets import amazon, finance, marketing
# Create comprehensive agent
all_tools = [
# E-commerce tools
amazon.find_similar_products_by_image,
amazon.get_price_history,
# Financial tools
finance.calculate_profit_margins,
finance.analyze_market_trends,
# Marketing tools
marketing.analyze_customer_sentiment,
marketing.optimize_pricing_strategy
]
agent = Orchestrator(tools=all_tools)
# Run comprehensive business analysis
result = agent.run_with_concurrency(
"Perform a complete business analysis including product research, "
"financial projections, and marketing strategy recommendations"
)

The framework automatically classifies tools as readonly or write operations (see the sketch after this list):
- Readonly tools (data retrieval, analysis) execute in parallel for maximum performance
- Write tools (data modification, external actions) execute serially for data consistency
- Zero configuration required - intelligence is built into the system
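A minimal sketch of what this looks like in practice, assuming get_price_history and find_similar_products_by_image are detected as readonly while optimize_pricing_strategy is treated as a write operation (the classification itself is automatic, so the call site stays unchanged):

from mobfish import Orchestrator
from mobfish.resources.tools.toolsets import amazon, marketing

agent = Orchestrator(tools=[
    amazon.get_price_history,               # readonly: runs in parallel with other reads
    amazon.find_similar_products_by_image,  # readonly: runs in parallel with other reads
    marketing.optimize_pricing_strategy,    # write: executed serially to keep data consistent
])

# No concurrency configuration needed; the orchestrator plans execution itself.
result = agent.run_with_concurrency(
    "Compare price history for B0CYG8P329 against visually similar products, "
    "then propose an updated pricing strategy"
)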
- Session-based memory preserving context across agent restarts (sketched below)
- SQLite storage for reliability and performance
- Cross-session access enabling long-term learning
- Automatic cleanup and session management
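A minimal sketch of resuming a session after a restart, reusing the run_with_memory_and_concurrency_sync entry point and the list_sessions helper shown elsewhere in this README (the session ID is illustrative, and create_with_auto_discovery is assumed to work without an event bus):

from mobfish import Orchestrator

agent = Orchestrator.create_with_auto_discovery()

# First run: findings are stored under this session ID in mobfish_sessions.db.
agent.run_with_memory_and_concurrency_sync(
    initial_prompt="Analyze product B0CYG8P329 and remember the key findings",
    session_id="user_session_001",
    max_turns=5
)

# After a process restart, the same session ID restores the earlier context.
result = agent.run_with_memory_and_concurrency_sync(
    initial_prompt="What did we conclude about that product last time?",
    session_id="user_session_001",
    max_turns=5
)

print(f"Known sessions: {len(agent.list_sessions())}")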
- Comprehensive event tracking for all system operations
- Real-time monitoring of performance and behavior
- Extensible subscriber system for custom analytics
- Debug-friendly logging with full operation audit trails
- Domain-specific organization (Amazon, Finance, Marketing)
- Type-safe interfaces with Pydantic validation
- Automatic schema generation for LLM consumption
- Easy extensibility for new domains and capabilities (see the sketch below)
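Adding a new domain amounts to dropping another decorated module into the toolsets package. A minimal sketch, where the logistics module name, the Pydantic model, and the function are all hypothetical:

# mobfish/resources/tools/toolsets/logistics.py  (hypothetical new domain module)
from typing import Any, Dict

from pydantic import BaseModel

from mobfish.resources import tool


class ShippingInput(BaseModel):
    asin: str
    destination_country: str = "US"


@tool(pydantic_model=ShippingInput)
def estimate_shipping_cost(asin: str, destination_country: str = "US") -> Dict[str, Any]:
    """Estimate the shipping cost for a product to a destination country."""
    # Placeholder logic; a real implementation would call a rates service.
    return {"status": "success", "data": {"asin": asin, "estimated_cost_usd": 4.99}}

Model adapters are extensible in the same spirit; the ModelAdapter stub below shows the interface to implement: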
from mobfish.reasoning.adapters.base import ModelAdapter
class CustomAdapter(ModelAdapter):
def generate_response(self, messages, tools=None, **kwargs):
# Implement your LLM integration
pass
def format_tools_for_model(self, tools):
# Convert tools to your model's format
pass
# Use custom adapter
agent = Orchestrator(tools=my_tools, model_adapter=CustomAdapter())

from mobfish.bus import EventBus
class PerformanceMonitor:
    def handle_event(self, event):
        # Only tool-execution events carry an execution_time attribute
        if hasattr(event, 'execution_time'):
            self.log_performance(event.tool_name, event.execution_time)

    def log_performance(self, tool_name, execution_time):
        print(f"{tool_name} completed in {execution_time:.2f}s")
event_bus = EventBus()
monitor = PerformanceMonitor()
event_bus.subscribe(monitor.handle_event)

from mobfish.state import PersistenceAdapter
class RedisAdapter(PersistenceAdapter):
def save_session(self, session_id: str, data: dict) -> bool:
# Implement Redis storage
pass
def load_session(self, session_id: str) -> dict:
# Implement Redis retrieval
        pass

- Intelligent concurrency with automatic readonly tool detection
- Persistent memory eliminating redundant processing
- Event-driven architecture for minimal overhead observability
- Efficient data structures optimized for agent workloads
- Connection pooling and resource management
- Type safety throughout the entire system
- Comprehensive error handling with graceful degradation (see the sketch after this list)
- Audit trails for all operations and decisions
- Session management with automatic cleanup
- Extensible architecture supporting custom integrations
- Production-ready logging and monitoring
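The graceful-degradation point builds on the status-dict convention used in the custom tool example above; the helper below is purely illustrative (handle_tool_result is not part of the framework) and simply shows how a caller can keep going when one tool reports an error:

from typing import Any, Dict

def handle_tool_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Consume a tool result that follows the {"status": ..., "data"/"message": ...} convention."""
    if result.get("status") == "success":
        return result.get("data", {})
    # Degrade gracefully: record the failure and continue instead of aborting the run.
    print(f"Tool reported an error: {result.get('message')}")
    return {}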
# Run basic functionality test
python main.py
# Test concurrency performance
python main_concurrency_demo.py
# Test persistent memory
python main_memory_demo.py
# Simple concurrency validation
python main_simple_concurrency.py

- Missing API Key

  # Create .env file with your Gemini API key
  echo "GOOGLE_API_KEY=your_key_here" > .env

- Tool Discovery Issues

  # Verify tools are properly decorated
  from mobfish.resources.tools.discovery import discover_tools

  tools = discover_tools('mobfish.resources.tools.toolsets.amazon')
  print(f"Discovered {len(tools)} tools")

- Memory/Session Issues

  # Check session database
  agent = Orchestrator(tools=[])
  sessions = agent.list_sessions()
  print(f"Found {len(sessions)} sessions")

- Performance Issues

  # Enable detailed logging
  from mobfish.bus import EventBus, DetailedLoggingSubscriber

  event_bus = EventBus()
  detailed_logger = DetailedLoggingSubscriber()
  event_bus.subscribe(detailed_logger.handle_event)
GOOGLE_API_KEY=your_production_api_key
GEMINI_MODEL_NAME=gemini-1.5-pro
DATABASE_PATH=/path/to/production/sessions.db
LOG_LEVEL=INFO
MAX_CONCURRENT_TOOLS=10
SESSION_TIMEOUT_HOURS=24

# Production monitoring
from mobfish.bus import EventBus
import logging
class ProductionMonitor:
def __init__(self):
        self.logger = logging.getLogger('mobfish_production')
def handle_event(self, event):
if event.event_type == 'error':
self.logger.error(f"Agent error: {event}")
elif hasattr(event, 'execution_time') and event.execution_time > 10:
self.logger.warning(f"Slow tool execution: {event}")
event_bus = EventBus()
monitor = ProductionMonitor()
event_bus.subscribe(monitor.handle_event)

- Additional LLM Support: OpenAI GPT, Anthropic Claude, local models
- Enhanced Toolsets: More domain-specific tool collections
- Advanced Memory: Vector embeddings, semantic search
- Distributed Execution: Multi-node agent coordination
- Web Interface: Browser-based agent interaction
- API Gateway: RESTful API for agent services
This project is part of a proof-of-concept demonstration and research initiative.
- Lao Wang & Xiao Zhu - Core Architecture & Development
- Version: 1.0.0 - Victory Edition
- Status: Production Ready
We welcome contributions! Please see individual module README files for specific development guidelines and best practices.
Mobfish: Where AI meets enterprise-grade performance and reliability.