
A Code Implementation for Designing Intelligent Multi-Agent Workflows with the BeeAI Framework

In this tutorial, we explore the power and flexibility of the beeai-framework by building a fully functional multi-agent system from the ground up. We walk through the essential components, custom agents, tools, memory management, and event monitoring, to show how BeeAI simplifies the development of intelligent, cooperative agents. Along the way, we demonstrate how these agents can perform complex tasks, such as market research, code analysis, and strategic planning, using a modular, production-ready pattern.

import subprocess
import sys
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
import os


def install_packages():
    packages = [
        "beeai-framework",
        "requests",
        "beautifulsoup4",
        "numpy",
        "pandas",
        "pydantic"
    ]
   
    print("Putting in required packages...")
    for package deal in packages:
        strive:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✅ {package deal} put in efficiently")
        besides subprocess.CalledProcessError as e:
            print(f"❌ Failed to put in {package deal}: {e}")
    print("Set up full!")


install_packages()


try:
    from beeai_framework import ChatModel
    from beeai_framework.agents import Agent
    from beeai_framework.tools import Tool
    from beeai_framework.workflows import Workflow
    BEEAI_AVAILABLE = True
    print("✅ BeeAI Framework imported successfully")
except ImportError as e:
    print(f"⚠️ BeeAI Framework import failed: {e}")
    print("Falling back to custom implementation...")
    BEEAI_AVAILABLE = False

We begin by installing all the required packages, including the beeai-framework, to ensure our environment is ready for multi-agent development. Once installed, we attempt to import BeeAI's core modules; if the import fails, we gracefully fall back to a custom implementation so the workflow keeps functioning.
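
Because the rest of the tutorial only needs an object exposing an async generate() method, the branching on BEEAI_AVAILABLE can be isolated in one small helper. The following is a minimal sketch, not part of the original code: it deliberately leaves the real BeeAI model construction as a placeholder rather than guessing at that API, and it relies on the MockChatModel defined next.

def get_chat_model():
    """Pick a chat model for the current environment (illustrative sketch)."""
    if BEEAI_AVAILABLE:
        # Construct a real BeeAI ChatModel here, following the official docs;
        # the exact constructor is intentionally not assumed in this sketch.
        raise NotImplementedError("Wire up your preferred LLM backend here")
    return MockChatModel()  # mirrors the async generate() interface used below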

class MockChatModel:
    """Mock LLM for demonstration purposes"""
    def __init__(self, model_name: str = "mock-llm"):
        self.model_name = model_name

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        """Generate a mock response"""
        last_message = messages[-1]['content'] if messages else ""

        if "market" in last_message.lower():
            return "Market analysis shows strong growth in AI frameworks with 42% YoY increase. Key competitors include LangChain, CrewAI, and AutoGen."
        elif "code" in last_message.lower():
            return "Code analysis reveals good structure with async patterns. Consider adding more error handling and documentation."
        elif "strategy" in last_message.lower():
            return "Strategic recommendation: Focus on ease of use, strong documentation, and enterprise features to compete effectively."
        else:
            return f"Analyzed: {last_message[:100]}... Recommendation: Implement best practices for scalability and maintainability."


class CustomTool:
    """Base class for custom tools"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    async def run(self, input_data: str) -> str:
        """Override this method in subclasses"""
        raise NotImplementedError

We define a MockChatModel to simulate LLM behavior when BeeAI is unavailable, allowing us to test and prototype workflows without relying on external APIs. Alongside it, we create a CustomTool base class, which serves as a blueprint for task-specific tools that our agents can use, laying the foundation for modular, tool-augmented agent capabilities.
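
To make the contract concrete, here is a throwaway subclass (hypothetical, not part of the tutorial's toolset) showing the minimal work needed to add a new capability: subclass CustomTool, set a name and description, and override the async run() method.

class WordCountTool(CustomTool):
    """Toy example of the CustomTool contract: counts words in the input."""

    def __init__(self):
        super().__init__(
            name="word_count",
            description="Counts the words in a piece of text"
        )

    async def run(self, input_data: str) -> str:
        return f"Word count: {len(input_data.split())}"

# Smoke test (in a script; inside a notebook, await the coroutine instead):
# print(asyncio.run(WordCountTool().run("multi-agent systems are modular")))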

class MarketResearchTool(CustomTool):
    """Custom tool for market research and competitor analysis"""

    def __init__(self):
        super().__init__(
            name="market_research",
            description="Analyzes market trends and competitor information"
        )
        self.market_data = {
            "AI_frameworks": {
                "competitors": ["LangChain", "CrewAI", "AutoGen", "Haystack", "Semantic Kernel"],
                "market_size": "$2.8B",
                "growth_rate": "42% YoY",
                "key_trends": ["Multi-agent systems", "Production deployment", "Tool integration", "Enterprise adoption"]
            },
            "enterprise_adoption": {
                "rate": "78%",
                "top_use_cases": ["Customer support", "Data analysis", "Code generation", "Document processing"],
                "challenges": ["Reliability", "Cost control", "Integration complexity", "Governance"]
            }
        }

    async def run(self, query: str) -> str:
        """Simulate market research based on query"""
        query_lower = query.lower()

        if "competitor" in query_lower or "competition" in query_lower:
            data = self.market_data["AI_frameworks"]
            return f"""Market Analysis Results:

Key Competitors: {', '.join(data['competitors'])}
Market Size: {data['market_size']}
Growth Rate: {data['growth_rate']}
Key Trends: {', '.join(data['key_trends'])}


Recommendation: Focus on differentiating features like simplified deployment, better debugging tools, and enterprise-grade security."""

        elif "adoption" in query_lower or "enterprise" in query_lower:
            data = self.market_data["enterprise_adoption"]
            return f"""Enterprise Adoption Analysis:

Adoption Rate: {data['rate']}
Top Use Cases: {', '.join(data['top_use_cases'])}
Main Challenges: {', '.join(data['challenges'])}


Recommendation: Address reliability and cost control concerns through better monitoring and resource management features."""

        else:
            return "Market research available for: competitor analysis, enterprise adoption, or specific trend analysis. Please specify your focus area."

We implement the MarketResearchTool as a specialized extension of our CustomTool base class. This tool simulates real-world market intelligence by returning pre-defined insights on AI framework trends, key competitors, adoption rates, and industry challenges. With this, we equip our agents to make informed, data-driven recommendations during workflow execution.
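
Since run() is a coroutine, exercising the tool on its own requires an event loop. A quick smoke test (the query string is illustrative) might look like this:

# Standalone check of the market research tool
market_result = asyncio.run(
    MarketResearchTool().run("competitor analysis for AI frameworks")
)
print(market_result)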

class CodeAnalysisTool(CustomTool):
    """Custom tool for analyzing code patterns and suggesting improvements"""

    def __init__(self):
        super().__init__(
            name="code_analysis",
            description="Analyzes code structure and suggests improvements"
        )

    async def run(self, code_snippet: str) -> str:
        """Analyze code and provide insights"""
        analysis = {
            "lines": len(code_snippet.split('\n')),
            "complexity": "High" if len(code_snippet) > 500 else "Medium" if len(code_snippet) > 200 else "Low",
            "async_usage": "Yes" if "async" in code_snippet or "await" in code_snippet else "No",
            "error_handling": "Present" if "try:" in code_snippet or "except:" in code_snippet else "Missing",
            "documentation": "Good" if '"""' in code_snippet or "'''" in code_snippet else "Needs improvement",
            "imports": "Present" if "import " in code_snippet else "None detected",
            "classes": len([line for line in code_snippet.split('\n') if line.strip().startswith('class ')]),
            "functions": len([line for line in code_snippet.split('\n') if line.strip().startswith('def ') or line.strip().startswith('async def ')])
        }

        suggestions = []
        if analysis["error_handling"] == "Missing":
            suggestions.append("Add try-except blocks for error handling")
        if analysis["documentation"] == "Needs improvement":
            suggestions.append("Add docstrings and comments")
        if "print(" in code_snippet:
            suggestions.append("Consider using proper logging instead of print statements")
        if analysis["async_usage"] == "Yes" and "await" not in code_snippet:
            suggestions.append("Ensure proper await usage with async functions")
        if analysis["complexity"] == "High":
            suggestions.append("Consider breaking down into smaller functions")

        return f"""Code Analysis Report:

Structure:
- Lines of code: {analysis['lines']}
- Complexity: {analysis['complexity']}
- Classes: {analysis['classes']}
- Functions: {analysis['functions']}


Quality Metrics:
- Async usage: {analysis['async_usage']}
- Error handling: {analysis['error_handling']}
- Documentation: {analysis['documentation']}


Suggestions:
{chr(10).join(f"• {suggestion}" for suggestion in suggestions) if suggestions else "• Code looks good! Following best practices."}


Overall Score: {10 - len(suggestions) * 2}/10"""


class CustomAgent:
    """Custom agent implementation"""

    def __init__(self, name: str, role: str, instructions: str, tools: List[CustomTool], llm=None):
        self.name = name
        self.role = role
        self.instructions = instructions
        self.tools = tools
        self.llm = llm or MockChatModel()
        self.memory = []

    async def run(self, task: str) -> Dict[str, Any]:
        """Execute agent task"""
        print(f"🤖 {self.name} ({self.role}) processing task...")

        self.memory.append({"type": "task", "content": task, "timestamp": datetime.now()})

        task_lower = task.lower()
        tool_used = None
        tool_result = None

        for tool in self.tools:
            if tool.name == "market_research" and ("market" in task_lower or "competitor" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
            elif tool.name == "code_analysis" and ("code" in task_lower or "analyze" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break

        messages = [
            {"role": "system", "content": f"You are {self.role}. {self.instructions}"},
            {"role": "user", "content": task}
        ]

        if tool_result:
            messages.append({"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"})

        response = await self.llm.generate(messages)

        self.memory.append({"type": "response", "content": response, "timestamp": datetime.now()})

        return {
            "agent": self.name,
            "task": task,
            "tool_used": tool_used,
            "tool_result": tool_result,
            "response": response,
            "success": True
        }

We now implement the CodeAnalysisTool, which enables our agents to assess code snippets based on structure, complexity, documentation, and error handling, and to generate actionable suggestions for improving code quality. We also define the CustomAgent class, equipping each agent with its own role, instructions, memory, tools, and access to an LLM. This design lets each agent decide intelligently whether a tool is needed and then synthesize a response from both tool output and LLM reasoning, ensuring adaptable, context-aware behavior.
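
Before composing agents into a workflow, it helps to exercise one agent end to end. The sketch below (agent name and task are illustrative, not part of the tutorial's demos) shows the automatic tool selection, the LLM synthesis step, and the memory trail left behind:

async def single_agent_check():
    analyst = CustomAgent(
        name="SoloAnalyst",
        role="Market Research Analyst",
        instructions="Provide concise, data-driven market insights.",
        tools=[MarketResearchTool()],
    )  # llm defaults to MockChatModel()
    result = await analyst.run("Assess the competitor landscape for AI frameworks")
    print("Tool used:", result["tool_used"])       # -> market_research
    print("Response:", result["response"])
    print("Memory entries:", len(analyst.memory))  # task + response

# asyncio.run(single_agent_check())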

class WorkflowMonitor:
    """Monitor and log workflow events"""

    def __init__(self):
        self.events = []
        self.start_time = datetime.now()

    def log_event(self, event_type: str, data: Dict[str, Any]):
        """Log workflow events"""
        timestamp = datetime.now()
        self.events.append({
            "timestamp": timestamp,
            "duration": (timestamp - self.start_time).total_seconds(),
            "event_type": event_type,
            "data": data
        })
        print(f"[{timestamp.strftime('%H:%M:%S')}] {event_type}: {data.get('agent', 'System')}")

    def get_summary(self):
        """Get monitoring summary"""
        return {
            "total_events": len(self.events),
            "total_duration": (datetime.now() - self.start_time).total_seconds(),
            "event_types": list(set([e["event_type"] for e in self.events])),
            "events": self.events
        }


class CustomWorkflow:
    """Custom workflow implementation"""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.agents = []
        self.monitor = WorkflowMonitor()

    def add_agent(self, agent: CustomAgent):
        """Add agent to workflow"""
        self.agents.append(agent)
        self.monitor.log_event("agent_added", {"agent": agent.name, "role": agent.role})

    async def run(self, tasks: List[str]) -> Dict[str, Any]:
        """Execute workflow with tasks"""
        self.monitor.log_event("workflow_started", {"tasks": len(tasks)})

        results = []
        context = {"shared_insights": []}

        for i, task in enumerate(tasks):
            agent = self.agents[i % len(self.agents)]

            if context["shared_insights"]:
                enhanced_task = f"{task}\n\nContext from previous analysis:\n" + "\n".join(context["shared_insights"][-2:])
            else:
                enhanced_task = task

            result = await agent.run(enhanced_task)
            results.append(result)

            context["shared_insights"].append(f"{agent.name}: {result['response'][:200]}...")

            self.monitor.log_event("task_completed", {
                "agent": agent.name,
                "task_index": i,
                "success": result["success"]
            })

        self.monitor.log_event("workflow_completed", {"total_tasks": len(tasks)})

        return {
            "workflow": self.name,
            "results": results,
            "context": context,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: List[Dict[str, Any]]) -> str:
        """Generate workflow summary"""
        summary_parts = []

        for result in results:
            summary_parts.append(f"• {result['agent']}: {result['response'][:150]}...")

        return f"""Workflow Summary for {self.name}:


{chr(10).join(summary_parts)}


Key Insights:
• Market opportunities identified in AI framework space
• Technical architecture recommendations provided
• Strategic implementation plan outlined
• Multi-agent collaboration demonstrated successfully"""

We implement the WorkflowMonitor to log and track events throughout execution, giving us real-time visibility into each agent's actions. With the CustomWorkflow class, we orchestrate the entire multi-agent process: assigning tasks round-robin across agents, preserving shared context between them, and capturing all relevant insights. This structure ensures that we execute tasks in a coordinated, transparent way and also generate a comprehensive summary highlighting collaboration and key outcomes.
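
As a minimal usage sketch before the full demo, a one-agent, one-task workflow is enough to see the round-robin assignment, the shared-context plumbing, and the monitor's event log in action (names here are illustrative):

async def mini_workflow():
    wf = CustomWorkflow(
        name="Smoke Test",
        description="Single-agent sanity check of the workflow plumbing"
    )
    wf.add_agent(CustomAgent(
        name="Checker",
        role="Generalist Analyst",
        instructions="Answer briefly.",
        tools=[],
    ))
    output = await wf.run(["Outline a strategy for entering the AI framework market"])
    print(output["summary"])
    # Expect 4 events: agent_added, workflow_started, task_completed, workflow_completed
    print("Events logged:", wf.monitor.get_summary()["total_events"])

# asyncio.run(mini_workflow())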

async def advanced_workflow_demo():
    """Demonstrate advanced multi-agent workflow"""

    print("🚀 Advanced Multi-Agent Workflow Demo")
    print("=" * 50)

    workflow = CustomWorkflow(
        name="Advanced Business Intelligence System",
        description="Multi-agent system for comprehensive business analysis"
    )

    market_agent = CustomAgent(
        name="MarketAnalyst",
        role="Senior Market Research Analyst",
        instructions="Analyze market trends, competitor landscape, and business opportunities. Provide data-driven insights with actionable recommendations.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    )

    tech_agent = CustomAgent(
        name="TechArchitect",
        role="Technical Architecture Specialist",
        instructions="Evaluate technical solutions, code quality, and architectural decisions. Focus on scalability, maintainability, and best practices.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )

    strategy_agent = CustomAgent(
        name="StrategicPlanner",
        role="Strategic Business Planner",
        instructions="Synthesize market and technical insights into comprehensive strategic recommendations. Focus on ROI, risk assessment, and implementation roadmaps.",
        tools=[],
        llm=MockChatModel()
    )

    workflow.add_agent(market_agent)
    workflow.add_agent(tech_agent)
    workflow.add_agent(strategy_agent)

    tasks = [
        "Analyze the current AI framework market landscape and identify key opportunities for a new multi-agent framework targeting enterprise users.",
        """Analyze this code architecture pattern and provide technical assessment:


async def multi_agent_workflow():
    agents = [ResearchAgent(), AnalysisAgent(), SynthesisAgent()]
    context = SharedContext()

    for agent in agents:
        try:
            result = await agent.run(context.get_task())
            if result.success:
                context.add_insight(result.data)
            else:
                context.add_error(result.error)
        except Exception as e:
            logger.error(f"Agent {agent.name} failed: {e}")

    return context.synthesize_recommendations()""",
        "Based on the market analysis and technical assessment, create a comprehensive strategic plan for launching a competitive AI framework with focus on multi-agent capabilities and enterprise adoption."
    ]

    print("\n🔄 Executing Advanced Workflow...")
    result = await workflow.run(tasks)

    print("\n✅ Workflow Completed Successfully!")
    print("=" * 50)
    print("📊 COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 50)
    print(result["summary"])

    print("\n📈 WORKFLOW MONITORING SUMMARY")
    print("=" * 30)
    summary = workflow.monitor.get_summary()
    print(f"Total Events: {summary['total_events']}")
    print(f"Total Duration: {summary['total_duration']:.2f} seconds")
    print(f"Event Types: {', '.join(summary['event_types'])}")

    return workflow, result


async def simple_tool_demo():
    """Demonstrate individual tool functionality"""

    print("\n🛠️ Individual Tool Demo")
    print("=" * 30)

    market_tool = MarketResearchTool()
    code_tool = CodeAnalysisTool()

    print("Available Tools:")
    print(f"• {market_tool.name}: {market_tool.description}")
    print(f"• {code_tool.name}: {code_tool.description}")

    print("\n🔍 Market Research Analysis:")
    market_result = await market_tool.run("competitor analysis in AI frameworks")
    print(market_result)

    print("\n🔍 Code Analysis:")
    sample_code = '''
import asyncio
from typing import List, Dict


class AgentManager:
    """Manages multiple AI agents"""

    def __init__(self):
        self.agents = []
        self.results = []

    async def add_agent(self, agent):
        """Add agent to manager"""
        self.agents.append(agent)

    async def run_all(self, task: str) -> List[Dict]:
        """Run task on all agents"""
        results = []
        for agent in self.agents:
            try:
                result = await agent.execute(task)
                results.append(result)
            except Exception as e:
                print(f"Agent failed: {e}")
                results.append({"error": str(e)})
        return results
'''

    code_result = await code_tool.run(sample_code)
    print(code_result)

We demonstrate two workflows. First, in the individual tool demo, we directly test the capabilities of our MarketResearchTool and CodeAnalysisTool, verifying that they generate relevant insights independently. Then, we bring everything together in the advanced workflow demo, where we deploy three specialized agents, MarketAnalyst, TechArchitect, and StrategicPlanner, to tackle business analysis tasks collaboratively.

async def main():
    """Main demo function"""

    print("🐝 Advanced BeeAI Framework Tutorial")
    print("=" * 40)
    print("This tutorial demonstrates:")
    print("• Multi-agent workflows")
    print("• Custom tool development")
    print("• Memory management")
    print("• Event monitoring")
    print("• Production-ready patterns")

    if BEEAI_AVAILABLE:
        print("• Using real BeeAI Framework")
    else:
        print("• Using custom implementation (BeeAI not available)")

    print("=" * 40)

    await simple_tool_demo()

    print("\n" + "=" * 50)
    await advanced_workflow_demo()

    print("\n🎉 Tutorial Complete!")
    print("\nNext Steps:")
    print("1. Install BeeAI Framework properly: pip install beeai-framework")
    print("2. Configure your preferred LLM (OpenAI, Anthropic, local models)")
    print("3. Explore the official BeeAI documentation")
    print("4. Build custom agents for your specific use case")
    print("5. Deploy to production with proper monitoring")


if __name__ == "__main__":
    try:
        import nest_asyncio
        nest_asyncio.apply()
        print("✅ Applied nest_asyncio for Colab compatibility")
    except ImportError:
        print("⚠️ nest_asyncio not available - may not work in some environments")

    asyncio.run(main())

We wrap up the tutorial with the main() function, which ties together everything we have built, demonstrating both tool-level capabilities and a full multi-agent business intelligence workflow. Whether we run BeeAI natively or use the fallback setup, we ensure compatibility with environments like Google Colab via nest_asyncio. With this structure in place, we are ready to scale our agent systems, explore deeper use cases, and deploy production-ready AI workflows with confidence.
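
One practical note on event loops, since this often trips up notebook users: asyncio.run() raises a RuntimeError inside an already-running loop (as in Jupyter/Colab), which is exactly what nest_asyncio patches. A small sketch of the two entry points (the demo coroutine is a stand-in for main()):

import asyncio

async def entry_point_demo():
    """Tiny coroutine standing in for main() in this illustration."""
    return "ok"

# Plain script: creating a fresh event loop is fine.
print(asyncio.run(entry_point_demo()))

# Notebook cell (loop already running): asyncio.run() would raise
# RuntimeError without nest_asyncio; there you could instead write:
#     await entry_point_demo()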

In conclusion, we have built and executed a robust multi-agent workflow using the BeeAI framework (or a custom equivalent), showcasing its potential in real-world business intelligence applications. We have seen how easy it is to create agents with specific roles, attach tools for task augmentation, and monitor execution in a transparent way.


Check out the Codes. All credit for this research goes to the researchers of this project.


Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.
