π Now with Multi-MCP Support & Configurable System Prompts!
Intelligent Kafka ecosystem management powered by AI, integrating both Schema Registry and Kafka Brokers MCP servers with fully customizable AI behavior through configuration files.
- YAML-based Configuration: Customize AI behavior without code changes
- Environment-Specific Prompts: Different AI personalities for dev/staging/prod
- Organization Customization: Adapt to your company's terminology and policies
- Dynamic Reloading: Update prompts at runtime
- Prompt Templates: Reusable templates for common operations
- Risk Tolerance Settings: Configurable safety levels per environment
- Dual MCP Servers: Seamlessly integrates Kafka Schema Registry MCP and Kafka Brokers MCP
- Unified Management: Single interface for schemas, topics, brokers, and consumer groups
- Cross-Domain Analysis: Correlate schemas with topics, analyze data pipelines
- π Schema Evolution Monitoring - Track changes with customizable analysis criteria
- π‘οΈ Breaking Change Prevention - Configurable compatibility checking
- π Auto-Documentation - Generate docs with organization-specific templates
- π¬ Natural Language Queries - Customizable intent recognition and responses
- π€ Multi-LLM Support - Anthropic, OpenAI, Google, or self-hosted models
Configure AI behavior in prompts.yaml:
# Customize the AI's role and expertise
global:
role: |
You are an expert Kafka architect for ACME Corp specializing in:
- E-commerce event streaming
- Real-time fraud detection
- PCI and GDPR compliance
# Environment-specific behavior
environments:
production:
tone_adjustment: |
Be extremely cautious. Always provide rollback procedures.
risk_tolerance: "low"
development:
tone_adjustment: |
Be educational and detailed. Explain the reasoning.
risk_tolerance: "high"
# Operation-specific prompts
operations:
schema_analysis:
evaluation_criteria: |
Evaluate based on:
1. PCI compliance for payment fields
2. GDPR requirements for PII
3. Company naming standards (ACME-SCHEMA-XXX)
4. Performance SLA requirements- Global Settings: Define AI role, tone, and constraints
- Operation Prompts: Customize prompts for each operation type
- Environment Overrides: Different behavior for dev/staging/prod
- Custom Templates: Create reusable prompt templates
- Dynamic Variables: Template substitution with context variables
- Chain-of-Thought: Configurable reasoning strategies
# Clone repository
git clone https://github.com/aywengo/kafka-ai-agent.git
cd kafka-ai-agent
# Install dependencies
pip install -r requirements.txt
# Install BOTH MCP servers
npm install -g @aywengo/kafka-schema-reg-mcp
npm install -g @aywengo/kafka-brokers-mcp
# Configure environment
cp .env.example .env
# Edit .env with your settings# See how configurable prompts work
python examples/prompt_customization_demo.pyThis interactive demo shows:
- Environment-specific prompt behavior
- Organization customization
- Dynamic prompt building
- Interactive configuration
- Prompt testing and validation
from kafka_ai_agent_configurable import ConfigurableKafkaAIAgent
# Initialize with custom prompts
agent = ConfigurableKafkaAIAgent(
config_path="config.yaml",
prompts_path="prompts.yaml" # Your custom prompts
)
await agent.initialize()
# Customize for your organization
org_config = {
"global": {
"role": "You are a Kafka expert for FinTech Corp..."
},
"operations": {
"schema_analysis": {
"evaluation_criteria": "Focus on financial compliance..."
}
}
}
await agent.customize_prompts(org_config)
# Use with customized behavior
result = await agent.analyze_schema_with_prompts(
"payment-events",
"production"
)# Analyze with environment-specific prompts
python cli_enhanced.py ecosystem --environment prod
# The AI will automatically adjust its behavior based on:
# - Environment (cautious in prod, educational in dev)
# - Risk tolerance settings
# - Organization-specific requirements# Start API server
uvicorn api_enhanced:app --reload
# The API automatically uses configured prompts for all operations# Global AI configuration
global:
role: "AI expertise and background"
tone: "Communication style"
constraints: "Safety and compliance rules"
# Operation-specific prompts
operations:
schema_analysis:
base: "Main prompt for analysis"
evaluation_criteria: "What to evaluate"
output_format: "How to structure response"
context_variables: ["list", "of", "variables"]
schema_evolution:
compatibility_analysis: "How to check compatibility"
migration_planning: "How to create migration plans"
risk_assessment: "How to assess risks"
# Environment configurations
environments:
production:
tone_adjustment: "Production-specific tone"
risk_tolerance: "low"
additional_context: "Production considerations"
development:
tone_adjustment: "Dev-specific tone"
risk_tolerance: "high"
additional_context: "Development considerations"
# Custom templates
templates:
error_analysis: |
Analyze error: ${error_message}
Context: ${error_context}
capacity_planning: |
Current: ${current_throughput}
Growth: ${growth_rate}
# Settings
settings:
max_tokens: 2000
temperature: 0.7
use_chain_of_thought: true
require_confirmation_for:
- production_changes
- delete_operations- schema_analysis - Schema evaluation and recommendations
- schema_evolution - Evolution planning and migration
- breaking_changes - Detection and fixes
- documentation - Auto-documentation generation
- ecosystem_analysis - Complete health assessment
- pipeline_analysis - Data flow validation
- nlp_query - Natural language processing
- compatibility_fix - Auto-fix strategies
- consumer_impact - Impact assessment
Use template variables in prompts:
operations:
schema_analysis:
base: |
Analyze schema: ${schema_name}
Version: ${version}
Environment: ${environment}
Consumers: ${downstream_consumers}# Programmatically customize for your organization
ui = PromptConfigurationUI(agent)
# Update AI role
await ui.update_global_role(
"You are a Kafka expert for Healthcare Corp, "
"specializing in HIPAA compliance and patient data..."
)
# Set production tone
await ui.set_environment_tone(
"production",
"Be extremely cautious with patient data. "
"Always consider HIPAA requirements."
)
# Add custom template
await ui.add_custom_template(
"hipaa_check",
"Verify HIPAA compliance for: ${data_fields}"
)global:
constraints: |
- Always check GDPR compliance for EU data
- Verify PCI DSS for payment fields
- Ensure SOC2 audit requirementsoperations:
schema_analysis:
evaluation_criteria: |
- Latency impact (target: <1ms)
- Message size optimization
- Compression efficiency
- Serialization performanceenvironments:
team_analytics:
tone_adjustment: "Focus on data science requirements"
team_platform:
tone_adjustment: "Focus on infrastructure and reliability"
team_product:
tone_adjustment: "Focus on feature delivery speed"- Comprehensive Analysis: Full ecosystem health with custom scoring
- Pipeline Validation: End-to-end data flow verification
- Topic-Schema Alignment: Automated correlation and validation
- Consumer Intelligence: Impact analysis with custom criteria
- Evolution Planning: Customizable migration strategies
- Compatibility Checking: Configurable rule sets
- Auto-Documentation: Template-based generation
- Breaking Change Detection: Custom detection rules
- Real-time Monitoring: WebSocket updates with custom alerts
- Health Scoring: Configurable scoring algorithms
- Risk Assessment: Environment-specific thresholds
- Alert Routing: Custom notification channels
# Test prompts before deployment
ui = PromptConfigurationUI(agent)
prompt = await ui.test_prompt(
PromptOperation.SCHEMA_EVOLUTION,
"Test input for evolution",
environment="production"
)
print(f"Generated prompt: {prompt}")# Reload prompts without restart
await agent.reload_prompts()from prompt_manager import PromptBuilder
builder = PromptBuilder(agent.prompt_manager)
prompt = (builder
.with_operation(PromptOperation.SCHEMA_EVOLUTION)
.with_environment("production")
.with_variables(
subject="critical-events",
consumer_count=50,
business_criticality="CRITICAL"
)
.add_section("Special considerations...")
.build("Evaluate schema change"))- No Code Changes: Modify AI behavior through configuration
- Environment Awareness: Different behavior for dev/staging/prod
- Organization Alignment: Match your company's terminology and policies
- Compliance Ready: Built-in support for regulatory requirements
- Team Customization: Different prompts for different teams
- Rapid Iteration: Test and refine prompts without deployment
- Consistency: Standardized AI responses across the organization
- Run the Demo:
python examples/prompt_customization_demo.py - Customize prompts.yaml: Add your organization's requirements
- Test Your Prompts: Use the testing utilities
- Deploy: Start using customized AI behavior
Contributions welcome! See CONTRIBUTING.md.
MIT License - see LICENSE file.
- Kafka Schema Registry MCP Server
- Kafka Brokers MCP Server
- Model Context Protocol (MCP) by Anthropic
- LLM providers: Anthropic, OpenAI, Google
β Star this repo if you find it helpful!
π₯ The most flexible and customizable Kafka AI Agent available!