Skip to content

A comprehensive, production-ready framework for building intelligent AI agents with advanced capabilities including tool calling, persistent memory, intelligent concurrency, and event-driven observability

License

Notifications You must be signed in to change notification settings

mobfish-ai/mobfish-agent

Repository files navigation

Mobfish: Enterprise AI Agent Framework

A comprehensive, production-ready framework for building intelligent AI agents with advanced capabilities including tool calling, persistent memory, intelligent concurrency, and event-driven observability.

🌟 Overview

Mobfish is an enterprise-grade AI agent framework that combines the power of Large Language Models with a sophisticated tool ecosystem. Originally designed for Amazon product competitor analysis, it has evolved into a versatile platform supporting multiple domains including e-commerce, finance, and marketing analytics.

πŸš€ Key Features

🧠 Persistent Memory System

  • SQLite-based session storage for conversation continuity across restarts
  • Cross-session memory access for long-term learning and context retention
  • Automatic session management with cleanup and analytics capabilities
  • Context preservation maintaining user preferences and learned information

⚑ Intelligent Concurrency Engine

  • Automatic readonly tool detection for parallel execution optimization
  • Smart execution planning that maintains data consistency while maximizing performance
  • Zero-configuration concurrency with built-in safety guarantees
  • Performance monitoring with detailed execution time analysis

🎯 Event-Driven Observability

  • Comprehensive event system tracking all agent operations
  • Real-time monitoring of tool execution, LLM interactions, and system events
  • Pluggable subscribers for custom logging, monitoring, and analytics
  • Debug-friendly architecture with full audit trails

πŸ”§ Modular Architecture

  • Domain-specific toolsets organized by business function
  • Extensible model adapters supporting multiple LLM providers
  • Type-safe tool system with automatic schema generation
  • Clean separation of concerns enabling easy maintenance and testing

πŸ› οΈ Advanced Tool System

  • Multi-domain support: Amazon/E-commerce, Finance, Marketing, and more
  • Automatic tool discovery across multiple modules
  • Pydantic-based validation ensuring type safety and error prevention
  • Intelligent tool classification for optimal execution strategies

πŸ“ Project Structure

mobfish/                    # Core framework package
β”œβ”€β”€ bus/                          # Event-driven observability system
β”‚   β”œβ”€β”€ event_bus.py             # Central event dispatcher
β”‚   β”œβ”€β”€ events.py                # Event type definitions
β”‚   └── subscribers.py           # Event handlers and loggers
β”œβ”€β”€ core/                        # Core orchestration and schemas
β”‚   β”œβ”€β”€ orchestrator.py          # Main agent controller with concurrency
β”‚   └── schemas.py               # Pydantic data models
β”œβ”€β”€ reasoning/                   # LLM integration layer
β”‚   └── adapters/                # Model-specific adapters
β”‚       β”œβ”€β”€ base.py              # Abstract adapter interface
β”‚       └── gemini_adapter.py    # Google Gemini implementation
β”œβ”€β”€ resources/                   # Tool ecosystem
β”‚   └── tools/                   # Tool infrastructure
β”‚       β”œβ”€β”€ definitions.py       # @tool decorator implementation
β”‚       β”œβ”€β”€ discovery.py         # Automatic tool discovery
β”‚       └── toolsets/            # Domain-specific tool collections
β”‚           β”œβ”€β”€ amazon.py        # E-commerce analysis tools
β”‚           β”œβ”€β”€ finance.py       # Financial analysis tools
β”‚           └── marketing.py     # Marketing analytics tools
└── state/                       # Persistent memory system
    β”œβ”€β”€ persistence.py           # Storage adapters (SQLite, Memory)
    └── session.py               # Session management

data/                            # Sample datasets
β”œβ”€β”€ amazon_kids_clothing_db.json # Product database
β”œβ”€β”€ price_history.csv           # Historical pricing data
β”œβ”€β”€ sales_history.csv           # Sales performance data
β”œβ”€β”€ sales_predictions.csv       # Forecasting data
└── strategy_history.csv        # Marketing strategy tracking

# Demo Applications
main.py                         # Basic competitor analysis demo
main_concurrency_demo.py        # Intelligent concurrency showcase
main_memory_demo.py             # Persistent memory demonstration
main_simple_concurrency.py     # Simple concurrency example

# Configuration & Compatibility
tools.py                        # Backward compatibility layer
.env                           # Environment configuration
mobfish_sessions.db          # SQLite session database (auto-created)

πŸš€ Quick Start

1. Installation

# Install required dependencies
pip install google-generativeai pandas pydantic python-dotenv

# Clone or download the project
git clone <repository-url>
cd Mobfish

2. Configuration

Create a .env file in the project root:

GOOGLE_API_KEY=your_gemini_api_key_here
GEMINI_MODEL_NAME=gemini-1.5-flash

3. Run Demonstrations

# Basic competitor analysis
python main.py

# Intelligent concurrency demo
python main_concurrency_demo.py

# Persistent memory showcase
python main_memory_demo.py

# Simple concurrency example
python main_simple_concurrency.py

πŸ’‘ Usage Examples

Basic Agent Setup

from mobfish import Orchestrator
from mobfish.resources.tools.toolsets.amazon import (
    find_similar_products_by_image,
    get_price_history,
    analyze_design_patterns
)

# Create agent with tools
agent = Orchestrator(tools=[
    find_similar_products_by_image,
    get_price_history,
    analyze_design_patterns
])

# Run analysis
result = agent.run("Analyze product B0CYG8P329")

Advanced: Memory + Concurrency + Observability

from mobfish import Orchestrator
from mobfish.bus import EventBus, LoggingSubscriber

# Set up event-driven observability
event_bus = EventBus()
logging_subscriber = LoggingSubscriber()
event_bus.subscribe(logging_subscriber.handle_event)

# Create agent with auto-discovery and event bus
agent = Orchestrator.create_with_auto_discovery(event_bus=event_bus)

# Run with persistent memory and intelligent concurrency
result = agent.run_with_memory_and_concurrency_sync(
    initial_prompt="Analyze multiple products concurrently",
    session_id="user_session_001",
    max_turns=5
)

Creating Custom Tools

from mobfish.resources import tool
from pydantic import BaseModel
from typing import Dict, Any

class ProductInput(BaseModel):
    asin: str
    include_reviews: bool = True

@tool(pydantic_model=ProductInput)
def analyze_product_sentiment(asin: str, include_reviews: bool = True) -> Dict[str, Any]:
    """Analyze customer sentiment for a product."""
    try:
        # Your analysis logic here
        sentiment_score = perform_sentiment_analysis(asin, include_reviews)
        
        return {
            "status": "success",
            "data": {
                "asin": asin,
                "sentiment_score": sentiment_score,
                "confidence": 0.95
            }
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

Multi-Domain Analysis

# Load tools from multiple domains
from mobfish.resources.tools.toolsets import amazon, finance, marketing

# Create comprehensive agent
all_tools = [
    # E-commerce tools
    amazon.find_similar_products_by_image,
    amazon.get_price_history,
    
    # Financial tools
    finance.calculate_profit_margins,
    finance.analyze_market_trends,
    
    # Marketing tools
    marketing.analyze_customer_sentiment,
    marketing.optimize_pricing_strategy
]

agent = Orchestrator(tools=all_tools)

# Run comprehensive business analysis
result = agent.run_with_concurrency(
    "Perform a complete business analysis including product research, "
    "financial projections, and marketing strategy recommendations"
)

πŸ—οΈ Architecture Highlights

Intelligent Concurrency System

The framework automatically classifies tools as readonly or write operations:

  • Readonly tools (data retrieval, analysis) execute in parallel for maximum performance
  • Write tools (data modification, external actions) execute serially for data consistency
  • Zero configuration required - intelligence is built into the system

Persistent Memory Brain

  • Session-based memory preserving context across agent restarts
  • SQLite storage for reliability and performance
  • Cross-session access enabling long-term learning
  • Automatic cleanup and session management

Event-Driven Observability

  • Comprehensive event tracking for all system operations
  • Real-time monitoring of performance and behavior
  • Extensible subscriber system for custom analytics
  • Debug-friendly logging with full operation audit trails

Modular Tool Ecosystem

  • Domain-specific organization (Amazon, Finance, Marketing)
  • Type-safe interfaces with Pydantic validation
  • Automatic schema generation for LLM consumption
  • Easy extensibility for new domains and capabilities

πŸ”§ Advanced Configuration

Custom Model Adapters

from mobfish.reasoning.adapters.base import ModelAdapter

class CustomAdapter(ModelAdapter):
    def generate_response(self, messages, tools=None, **kwargs):
        # Implement your LLM integration
        pass
    
    def format_tools_for_model(self, tools):
        # Convert tools to your model's format
        pass

# Use custom adapter
agent = Orchestrator(tools=my_tools, model_adapter=CustomAdapter())

Custom Event Subscribers

from mobfish.bus import EventBus

class PerformanceMonitor:
    def handle_event(self, event):
        if hasattr(event, 'execution_time'):
            self.log_performance(event.tool_name, event.execution_time)

event_bus = EventBus()
monitor = PerformanceMonitor()
event_bus.subscribe(monitor.handle_event)

Custom Storage Backends

from mobfish.state import PersistenceAdapter

class RedisAdapter(PersistenceAdapter):
    def save_session(self, session_id: str, data: dict) -> bool:
        # Implement Redis storage
        pass
    
    def load_session(self, session_id: str) -> dict:
        # Implement Redis retrieval
        pass

πŸ“Š Performance Features

  • Intelligent concurrency with automatic readonly tool detection
  • Persistent memory eliminating redundant processing
  • Event-driven architecture for minimal overhead observability
  • Efficient data structures optimized for agent workloads
  • Connection pooling and resource management

πŸ›‘οΈ Enterprise Features

  • Type safety throughout the entire system
  • Comprehensive error handling with graceful degradation
  • Audit trails for all operations and decisions
  • Session management with automatic cleanup
  • Extensible architecture supporting custom integrations
  • Production-ready logging and monitoring

πŸ§ͺ Testing & Development

# Run basic functionality test
python main.py

# Test concurrency performance
python main_concurrency_demo.py

# Test persistent memory
python main_memory_demo.py

# Simple concurrency validation
python main_simple_concurrency.py

πŸ” Troubleshooting

Common Issues

  1. Missing API Key

    # Create .env file with your Gemini API key
    echo "GOOGLE_API_KEY=your_key_here" > .env
  2. Tool Discovery Issues

    # Verify tools are properly decorated
    from mobfish.resources.tools.discovery import discover_tools
    tools = discover_tools('mobfish.resources.tools.toolsets.amazon')
    print(f"Discovered {len(tools)} tools")
  3. Memory/Session Issues

    # Check session database
    agent = Orchestrator(tools=[])
    sessions = agent.list_sessions()
    print(f"Found {len(sessions)} sessions")
  4. Performance Issues

    # Enable detailed logging
    from mobfish.bus import EventBus, DetailedLoggingSubscriber
    event_bus = EventBus()
    detailed_logger = DetailedLoggingSubscriber()
    event_bus.subscribe(detailed_logger.handle_event)

πŸš€ Production Deployment

Environment Variables

GOOGLE_API_KEY=your_production_api_key
GEMINI_MODEL_NAME=gemini-1.5-pro
DATABASE_PATH=/path/to/production/sessions.db
LOG_LEVEL=INFO
MAX_CONCURRENT_TOOLS=10
SESSION_TIMEOUT_HOURS=24

Monitoring Setup

# Production monitoring
from mobfish.bus import EventBus
import logging

class ProductionMonitor:
    def __init__(self):
        self.logger = logging.getLogger('aetherium_production')
    
    def handle_event(self, event):
        if event.event_type == 'error':
            self.logger.error(f"Agent error: {event}")
        elif hasattr(event, 'execution_time') and event.execution_time > 10:
            self.logger.warning(f"Slow tool execution: {event}")

event_bus = EventBus()
monitor = ProductionMonitor()
event_bus.subscribe(monitor.handle_event)

πŸ“ˆ Roadmap & Extensions

  • Additional LLM Support: OpenAI GPT, Anthropic Claude, local models
  • Enhanced Toolsets: More domain-specific tool collections
  • Advanced Memory: Vector embeddings, semantic search
  • Distributed Execution: Multi-node agent coordination
  • Web Interface: Browser-based agent interaction
  • API Gateway: RESTful API for agent services

πŸ“„ License

This project is part of a proof-of-concept demonstration and research initiative.

πŸ‘₯ Authors & Contributors

  • Lao Wang & Xiao Zhu - Core Architecture & Development
  • Version: 1.0.0 - Victory Edition
  • Status: Production Ready

🀝 Contributing

We welcome contributions! Please see individual module README files for specific development guidelines and best practices.


Mobfish: Where AI meets enterprise-grade performance and reliability. πŸš€

About

A comprehensive, production-ready framework for building intelligent AI agents with advanced capabilities including tool calling, persistent memory, intelligent concurrency, and event-driven observability

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages