Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions devops_agent/core/master_agent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import os
from pathlib import Path
from typing import Any

from agno.knowledge import Knowledge
from agno.models.openai import OpenAIChat
Expand All @@ -18,10 +17,9 @@
from devops_agent.core.devops_agent import execute_devops_agent
from devops_agent.core.kubernetes_agent import execute_k8s_agent
from devops_agent.core.terraform_agent import execute_terraform_agent

from rich.console import Console

from dotenv import load_dotenv, find_dotenv
from devops_agent.utils.stream_handler import StreamingResponseHandler

load_dotenv(find_dotenv())

Expand Down Expand Up @@ -81,14 +79,26 @@ def execute_master_agent(provider: str, user_query: str = None, log_file: Path =
enable_agentic_memory=True,
markdown=True
)
response = devops_team.run(user_query, stream_intermediate_steps=True, retry=3)
# response = devops_team.run(user_query, stream_intermediate_steps=True, retry=3)

handler = StreamingResponseHandler(
console=console,
show_message=True,
show_reasoning=True,
show_tool_calls=True,
show_member_responses=True,
markdown=True
)

# Assuming you have a team object
handler.handle_stream(devops_team, input=user_query)

response = handler.response_content

# saved the response to knowledge in async mode
asyncio.run(
knowledge.add_content_async(text_content=f"question: {user_query}, Assistant: {response.content}",
skip_if_exists=False,
metadata={"agent_id": response.team_id, "session_id": response.session_id,
"run_id": response.run_id})
)
knowledge.add_content_async(text_content=f"question: {user_query}, Assistant: {response}",
skip_if_exists=False)

return response.content
)
return response
318 changes: 318 additions & 0 deletions devops_agent/utils/stream_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
import textwrap
from typing import Any, Dict, List, Optional, Set, Union
from rich.console import Console, Group
from rich.live import Live
from rich.panel import Panel
from rich.status import Status
from rich.text import Text
from rich.markdown import Markdown
import time


class StreamingResponseHandler:
"""Handler for streaming team responses with rich console output"""

def __init__(
self,
console: Optional[Console] = None,
show_message: bool = True,
show_reasoning: bool = True,
show_tool_calls: bool = True,
show_member_responses: bool = True,
markdown: bool = False,
):
self.console = console or Console()
self.show_message = show_message
self.show_reasoning = show_reasoning
self.show_tool_calls = show_tool_calls
self.show_member_responses = show_member_responses
self.markdown = markdown

# Content trackers
self.response_content = ""
self.reasoning_content = ""
self.input_content = ""

# Tool call trackers
self.team_tool_calls = []
self.member_tool_calls = {}
self.processed_tool_calls = set()

# Member response trackers
self.member_responses = {}

# Timing
self.start_time = time.time()

def _get_elapsed_time(self) -> float:
"""Get elapsed time since start"""
return time.time() - self.start_time

def _create_panel(
self,
content: Union[str, Text, Markdown],
title: str,
border_style: str = "blue"
) -> Panel:
"""Create a styled panel"""
return Panel(
content,
title=title,
border_style=border_style,
padding=(1, 2),
)

def _format_tool_call(self, tool: Any) -> str:
"""Format a tool call for display"""
if tool is None:
return "Unknown Tool"

tool_name = getattr(tool, 'tool_name', None) or getattr(tool, 'name', str(tool))
tool_args = getattr(tool, 'tool_args', None) or getattr(tool, 'arguments', {})

if tool_args and isinstance(tool_args, dict):
try:
args_str = ", ".join(f"{k}={v}" for k, v in tool_args.items())
return f"{tool_name}({args_str})"
except Exception:
return f"{tool_name}()"
return f"{tool_name}()"

def _add_tool_call(self, tool: Any, member_id: Optional[str] = None):
"""Add a tool call, avoiding duplicates"""
if tool is None:
return

# Generate unique ID for this tool call
tool_id = getattr(tool, 'tool_call_id', None) or str(hash(str(tool)))

if tool_id not in self.processed_tool_calls:
self.processed_tool_calls.add(tool_id)

if member_id:
if member_id not in self.member_tool_calls:
self.member_tool_calls[member_id] = []
self.member_tool_calls[member_id].append(tool)
else:
self.team_tool_calls.append(tool)

def _build_panels(self) -> List[Panel]:
"""Build all panels for current state"""
panels = []
elapsed = self._get_elapsed_time()

# Message panel
if self.input_content and self.show_message:
message_panel = self._create_panel(
Text(self.input_content, style="green"),
"Message",
border_style="cyan"
)
panels.append(message_panel)

# Reasoning panel
if self.reasoning_content and self.show_reasoning:
thinking_panel = self._create_panel(
Text(self.reasoning_content),
f"Thinking ({elapsed:.1f}s)",
border_style="green"
)
panels.append(thinking_panel)

# Member tool calls and responses
for member_id in sorted(self.member_tool_calls.keys()):
# Member tool calls panel
if self.show_tool_calls and self.member_tool_calls[member_id]:
tool_calls_text = self._format_tool_calls_list(
self.member_tool_calls[member_id]
)
member_name = member_id # You can map this to actual names

tool_panel = self._create_panel(
tool_calls_text,
f"{member_name} Tool Calls",
border_style="yellow"
)
panels.append(tool_panel)

# Member response panel
if self.show_member_responses and member_id in self.member_responses:
response_content = self.member_responses[member_id]
if self.markdown:
response_content = Markdown(response_content)
else:
response_content = Text(response_content)

member_panel = self._create_panel(
response_content,
f"{member_id} Response",
border_style="magenta"
)
panels.append(member_panel)

# Team tool calls panel
if self.show_tool_calls and self.team_tool_calls:
tool_calls_text = self._format_tool_calls_list(self.team_tool_calls)
team_tool_panel = self._create_panel(
tool_calls_text,
"Team Tool Calls",
border_style="yellow"
)
panels.append(team_tool_panel)

# Team response panel
if self.response_content:
response_content = self.response_content
if self.markdown:
response_content = Markdown(response_content)
else:
response_content = Text(response_content)

response_panel = self._create_panel(
response_content,
f"Response ({elapsed:.1f}s)",
border_style="blue"
)
panels.append(response_panel)

return panels

def _format_tool_calls_list(self, tool_calls: List[Any]) -> str:
"""Format a list of tool calls with wrapping"""
console_width = self.console.width
panel_width = console_width - 10 # Account for panel borders

lines = []
for tool in tool_calls:
formatted = self._format_tool_call(tool)
wrapped = textwrap.fill(
f"• {formatted}",
width=panel_width,
subsequent_indent=" "
)
lines.append(wrapped)

return "\n\n".join(lines)

def handle_stream(
self,
team: Any,
input: str,
**kwargs
):
"""Handle streaming response from team"""
self.input_content = input
self.start_time = time.time()

with Live(console=self.console, refresh_per_second=10) as live:
# Initial status
status = Status(
"Thinking...",
spinner="aesthetic",
speed=0.4
)
live.update(status)

# Get streaming response
response_stream = team.run(
input=input,
stream=True,
stream_intermediate_steps=True,
**kwargs
)

# Process events
for event in response_stream:
try:
event_type = getattr(event, 'event', None)

# Handle different event types
if event_type == "TeamRunContent":
# Main response content
content = getattr(event, 'content', '')
if isinstance(content, str):
self.response_content += content

elif event_type == "run_content":
# Alternative content event
content = getattr(event, 'content', '')
if isinstance(content, str):
self.response_content += content

elif event_type == "TeamReasoningStep":
# Reasoning content
reasoning = getattr(event, 'content', '')
if reasoning:
self.reasoning_content += reasoning

elif event_type == "reasoning_content":
# Alternative reasoning event
reasoning = getattr(event, 'reasoning_content', '')
if reasoning:
self.reasoning_content += reasoning

elif event_type == "TeamToolCallStarted":
# Team tool call started
tool = getattr(event, 'tool', None)
if tool:
self._add_tool_call(tool)

elif event_type == "tool_call_completed":
# Team tool call completed
tool = getattr(event, 'tool', None)
if tool:
self._add_tool_call(tool)

elif event_type == "ToolCallStarted":
# Member tool call started
tool = getattr(event, 'tool', None)
member_id = getattr(event, 'member_id', 'unknown')
if tool:
self._add_tool_call(tool, member_id)

elif event_type == "ToolCallCompleted":
# Member tool call completed
tool = getattr(event, 'tool', None)
member_id = getattr(event, 'member_id', 'unknown')
if tool:
self._add_tool_call(tool, member_id)

# Handle member responses
if hasattr(event, 'member_responses') and event.member_responses:
for member_response in event.member_responses:
member_id = getattr(
member_response,
'agent_id',
getattr(member_response, 'team_id', None)
)

if member_id:
# Extract member content
content = getattr(member_response, 'content', '')
if content and member_id not in self.member_responses:
self.member_responses[member_id] = content
elif content:
self.member_responses[member_id] += content

# Extract member tools
tools = getattr(member_response, 'tools', None)
if tools is not None:
for tool in tools:
self._add_tool_call(tool, member_id)

# Update display
panels = self._build_panels()
if panels:
live.update(Group(*panels))
else:
live.update(status)

except Exception as e:
# Log error but continue processing
self.console.print(f"[red]Error processing event: {e}[/red]")
continue

# Final update without status
panels = self._build_panels()
if panels:
live.update(Group(*panels))