Event-Driven Agent Architecture

[[TOC]]

Component Diagram

┌─────────────────────────────────────────────────────────┐
│                   Grafana Alert Fired                   │
└────────────────────┬────────────────────────────────────┘
                     │ HTTP POST (webhook)

┌─────────────────────────────────────────────────────────┐
│                 Kafka Bridge (FastAPI)                  │
│                                                         │
│ • Receive webhook                                       │
│ • Parse alert payload                                   │
│ • Route to topic by severity                            │
│ • Return 202 Accepted                                   │
└────────────────────┬────────────────────────────────────┘
                     │ Produce to Redpanda

┌─────────────────────────────────────────────────────────┐
│                     Redpanda Topics                     │
│                                                         │
│ • sre-critical   (tier 0 + tier 1 critical)             │
│ • sre-warning    (tier 1 warnings)                      │
│ • sre-info       (tier 0 info)                          │
│ • agent-thoughts (SSE stream for UI)                    │
│ • agent-actions  (audit log)                            │
└────────────────────┬────────────────────────────────────┘
                     │ Consumer poll

┌─────────────────────────────────────────────────────────┐
│             SRE Agent (LangGraph + Python)              │
│                                                         │
│ ┌──────────────────────────────────────────────┐        │
│ │              State Machine Flow              │        │
│ │                                              │        │
│ │  1. consume_event                            │        │
│ │  2. create_gitlab_issue                      │        │
│ │  3. enrich_context (Loki + Prometheus)       │        │
│ │  4. analyze_root_cause (LLM)                 │        │
│ │  5. match_runbook                            │        │
│ │  6. execute_remediation                      │        │
│ │  7. verify_success                           │        │
│ │  8. close_or_escalate                        │        │
│ └──────────────────────────────────────────────┘        │
│                                                         │
│ MCP Clients:                                            │
│ • Grafana (query Loki/Prometheus)                       │
│ • GitLab  (issue management)                            │
│ • Docker  (container control)                           │
└─────────────────────────────────────────────────────────┘
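
The bridge assumes the topics above already exist. Below is a minimal one-time setup sketch using kafka-python's admin client (Redpanda speaks the Kafka protocol, so "rpk topic create" works just as well); the partition and replication counts are illustrative, not part of this design.

# One-time topic setup (sketch). Partition/replication values are
# illustrative; adjust for your Redpanda cluster.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["redpanda:9092"])
admin.create_topics(new_topics=[
    NewTopic(name=name, num_partitions=3, replication_factor=1)
    for name in ["sre-critical", "sre-warning", "sre-info",
                 "agent-thoughts", "agent-actions"]
])
admin.close()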

Kafka Bridge Implementation

File: kafka-bridge/main.py

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from kafka import KafkaProducer
from typing import List, Dict, Any
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Grafana → Kafka Bridge")

# Kafka producer (lazy init)
producer = None

def get_producer():
    global producer
    if producer is None:
        producer = KafkaProducer(
            bootstrap_servers=['redpanda:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            retries=3,
            max_in_flight_requests_per_connection=1  # keep ordering across retries
        )
    return producer

class GrafanaAlert(BaseModel):
    receiver: str
    status: str
    alerts: List[Dict[str, Any]]
    groupLabels: Dict[str, str]
    commonLabels: Dict[str, str]
    commonAnnotations: Dict[str, str]
    externalURL: str
    version: str = "4"
    groupKey: str
    truncatedAlerts: int = 0

@app.post("/alerts", status_code=202)
async def ingest_alerts(payload: GrafanaAlert, bg: BackgroundTasks):
    logger.info(f"Received {len(payload.alerts)} alert(s) from Grafana")

    events_produced = 0

    for alert in payload.alerts:
        # Resolved alerts need no action from the agent.
        if alert.get("status") == "resolved":
            continue

        labels = alert.get("labels", {})
        annotations = alert.get("annotations", {})
        tier = labels.get("tier", "unknown")
        severity = labels.get("severity", "info")

        # Alerts marked human-only never reach the agent.
        if tier == "human_only":
            continue

        event = {
            "source": "grafana",
            "fingerprint": alert.get("fingerprint"),
            "status": alert.get("status"),
            "labels": labels,
            "annotations": annotations,
            "startsAt": alert.get("startsAt"),
            "endsAt": alert.get("endsAt"),
            "generatorURL": alert.get("generatorURL"),
            "received_at": datetime.utcnow().isoformat(),
            "agent_action": labels.get("agent_action", "investigate"),
            "runbook_id": labels.get("runbook_id"),
            "escalate_threshold": float(labels.get("escalate_threshold", "0.7")),
        }

        topic = route_to_topic(tier, severity)
        bg.add_task(produce_event, topic, event)
        events_produced += 1

    return {"status": "accepted", "queued": events_produced}

def route_to_topic(tier: str, severity: str) -> str:
    if severity == "critical" and tier in ("autonomous", "supervised"):
        return "sre-critical"
    if severity == "warning":
        return "sre-warning"
    return "sre-info"

def produce_event(topic: str, event: dict):
    try:
        future = get_producer().send(topic, value=event)
        record_metadata = future.get(timeout=10)
        logger.info(f"Produced to {topic} (partition={record_metadata.partition})")
    except Exception as e:
        logger.error(f"Failed to produce to {topic}: {e}")

@app.get("/health")
async def health_check():
    try:
        # bootstrap_connected() returns a bool; treat False as unhealthy.
        if not get_producer().bootstrap_connected():
            raise RuntimeError("no broker connection")
        return {"status": "healthy", "kafka": "connected"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Kafka unavailable: {e}")
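
To smoke-test the bridge locally, POST a payload shaped like Grafana's webhook body to the /alerts endpoint. A sketch using requests; every value below is invented for the example, and the bridge address (localhost:8000) is an assumption about how the service is exposed.

# Illustrative smoke test: POST one firing alert to the bridge the way
# Grafana's webhook contact point would. All values are made up.
import requests

payload = {
    "receiver": "sre-agent",
    "status": "firing",
    "alerts": [{
        "status": "firing",
        "fingerprint": "abc123",
        "labels": {
            "alertname": "HighMemoryUsage",
            "container": "api",
            "severity": "critical",
            "tier": "autonomous",
        },
        "annotations": {"summary": "api container memory above 90%"},
        "startsAt": "2025-01-01T00:00:00Z",
        "endsAt": "0001-01-01T00:00:00Z",
        "generatorURL": "http://grafana:3000/alerting",
    }],
    "groupLabels": {"alertname": "HighMemoryUsage"},
    "commonLabels": {"severity": "critical"},
    "commonAnnotations": {},
    "externalURL": "http://grafana:3000",
    "groupKey": "{}:{alertname=\"HighMemoryUsage\"}",
}

resp = requests.post("http://localhost:8000/alerts", json=payload, timeout=5)
print(resp.status_code, resp.json())  # expect 202 {"status": "accepted", "queued": 1}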

SRE Agent State Machine

File: sre-agent/agent.py

from typing import TypedDict, Optional, List
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from datetime import datetime
import logging
import json

logger = logging.getLogger(__name__)

class SREState(TypedDict):
    # Event metadata
    event: dict
    fingerprint: str
    alert_name: str
    service: str
    severity: str

    # Context enrichment
    logs: Optional[str]
    metrics: Optional[dict]
    recent_events: Optional[List[dict]]

    # Analysis
    root_cause: Optional[str]
    confidence: Optional[float]
    recommended_action: Optional[str]

    # Runbook execution
    runbook_id: Optional[str]
    runbook_steps: Optional[List[dict]]
    actions_taken: List[str]

    # GitLab tracking
    gitlab_issue_iid: Optional[int]

    # Routing
    escalate: bool
    escalation_reason: Optional[str]
    status: str  # investigating | remediating | resolved | escalated

llm = ChatAnthropic(model="claude-sonnet-4-20250514", temperature=0)

def consume_event(state: SREState) -> SREState:
    event = state["event"]
    return {
        **state,
        "fingerprint": event["fingerprint"],
        "alert_name": event["labels"]["alertname"],
        "service": event["labels"].get("container", event["labels"].get("service")),
        "severity": event["labels"]["severity"],
        "actions_taken": [],
        "escalate": False,
        "status": "investigating"
    }

def create_gitlab_issue(state: SREState) -> SREState:
    from mcp_clients.gitlab import GitLabClient
    gitlab = GitLabClient()
    issue_iid = gitlab.create_incident(
        alert=state["event"],
        initial_analysis={"pattern": "Investigating...", "service": state["service"]}
    )
    logger.info(f"Created GitLab issue #{issue_iid}")
    return {**state, "gitlab_issue_iid": issue_iid}

def enrich_context(state: SREState) -> SREState:
    from mcp_clients.grafana import GrafanaClient
    from datetime import timedelta

    grafana = GrafanaClient()
    alert_time = datetime.fromisoformat(state["event"]["startsAt"].replace("Z", "+00:00"))
    start = alert_time - timedelta(minutes=5)
    end = alert_time + timedelta(minutes=1)

    logs_query = f'{{container="{state["service"]}"}}'
    logs = grafana.query_loki(query=logs_query, start=start, end=end, limit=500)

    metrics = grafana.query_prometheus_range(
        query=f'container_memory_usage_bytes{{container="{state["service"]}"}}',
        start=start, end=end, step="15s"
    )

    return {
        **state,
        "logs": "\n".join([log["line"] for log in logs[:100]]),
        "metrics": metrics
    }

def analyze_root_cause(state: SREState) -> SREState:
    prompt = f"""
You are an SRE agent analyzing an incident.

**Alert:** {state["alert_name"]}
**Service:** {state["service"]}
**Severity:** {state["severity"]}

**Recent Logs:**
{state["logs"]}

**Metrics:**
{state["metrics"]}

**Task:** Identify root cause, recommend action, provide confidence.

**Output (JSON):**
{{
  "root_cause": "Brief explanation",
  "recommended_action": "restart|scale|investigate|escalate",
  "confidence": 0.85,
  "reasoning": "Why this is the likely cause"
}}
"""
    response = llm.invoke(prompt)
    analysis = json.loads(response.content)

    # Below the alert's escalate_threshold the agent does not act on its own;
    # it hands the incident to a human instead of guessing at remediation.
    threshold = state["event"].get("escalate_threshold", 0.7)
    confident = analysis["confidence"] >= threshold

    return {
        **state,
        "root_cause": analysis["root_cause"],
        "recommended_action": analysis["recommended_action"],
        "confidence": analysis["confidence"],
        "escalate": not confident,
        "escalation_reason": None if confident else "low_confidence",
        "status": "remediating" if confident else "investigating"
    }

def match_runbook(state: SREState) -> SREState:
    from runbook_registry import RunbookRegistry
    registry = RunbookRegistry()

    if state["event"]["labels"].get("runbook_id"):
        runbook = registry.get_by_id(state["event"]["labels"]["runbook_id"])
    else:
        runbook = registry.match_by_pattern(state["root_cause"])

    if runbook:
        return {**state, "runbook_id": runbook.id, "runbook_steps": runbook.steps}
    else:
        return {**state, "escalate": True, "escalation_reason": "no_runbook_matched"}

def execute_remediation(state: SREState) -> SREState:
    from mcp_clients.docker import DockerClient
    from mcp_clients.gitlab import GitLabClient

    docker = DockerClient()
    gitlab = GitLabClient()

    for step in state["runbook_steps"]:
        logger.info(f"Executing step: {step['action']}")

        if step["action"] == "restart_container":
            result = docker.restart_container(state["service"])
        elif step["action"] == "scale_container":
            result = docker.scale_container(state["service"], step["params"]["replicas"])
        elif step["action"] == "check_health":
            result = docker.health_check(state["service"])
        else:
            result = {"success": False, "error": f"Unknown action: {step['action']}"}

        gitlab.add_timeline_entry(
            issue_iid=state["gitlab_issue_iid"],
            step={"action": step["action"], "timestamp": datetime.utcnow().isoformat()},
            result=result
        )

        state["actions_taken"].append(step["action"])

        if not result["success"] and step.get("required", True):
            return {
                **state,
                "escalate": True,
                "escalation_reason": f"step_failed: {step['action']}",
                "status": "escalated"
            }

    return {**state, "status": "resolved"}

def verify_success(state: SREState) -> SREState:
    from mcp_clients.grafana import GrafanaClient

    grafana = GrafanaClient()
    # query_prometheus_instant is assumed to reduce the result to a single float.
    error_rate = grafana.query_prometheus_instant(
        query=f'rate({{container="{state["service"]}", status=~"5.."}}[2m])'
    )

    if error_rate > 0.01:
        return {
            **state,
            "escalate": True,
            "escalation_reason": "remediation_ineffective",
            "status": "escalated"
        }

    return state

def close_or_escalate(state: SREState) -> SREState:
    from mcp_clients.gitlab import GitLabClient
    gitlab = GitLabClient()

    if state["escalate"]:
        gitlab.escalate_incident(
            issue_iid=state["gitlab_issue_iid"],
            reason=state["escalation_reason"],
            oncall_user="platform-oncall"
        )
    else:
        gitlab.resolve_incident(
            issue_iid=state["gitlab_issue_iid"],
            resolution={
                "runbook_id": state["runbook_id"],
                "root_cause": state["root_cause"]
            }
        )

    return state

def route_next_step(state: SREState) -> str:
    # Shared router for the conditional edges. Escalations short-circuit to
    # closeout; otherwise status plus runbook progress picks the next node.
    if state.get("escalate"):
        return "close_or_escalate"
    if state["status"] == "remediating" and not state.get("runbook_steps"):
        return "match_runbook"
    if state["status"] == "remediating":
        return "execute_remediation"
    # "resolved" (verification passed) falls through to closeout.
    return "close_or_escalate"

def build_sre_graph():
    graph = StateGraph(SREState)

    graph.add_node("consume_event", consume_event)
    graph.add_node("create_gitlab_issue", create_gitlab_issue)
    graph.add_node("enrich_context", enrich_context)
    graph.add_node("analyze_root_cause", analyze_root_cause)
    graph.add_node("match_runbook", match_runbook)
    graph.add_node("execute_remediation", execute_remediation)
    graph.add_node("verify_success", verify_success)
    graph.add_node("close_or_escalate", close_or_escalate)

    graph.set_entry_point("consume_event")
    graph.add_edge("consume_event", "create_gitlab_issue")
    graph.add_edge("create_gitlab_issue", "enrich_context")
    graph.add_edge("enrich_context", "analyze_root_cause")
    graph.add_conditional_edges("analyze_root_cause", route_next_step)
    graph.add_conditional_edges("match_runbook", route_next_step)
    graph.add_edge("execute_remediation", "verify_success")
    graph.add_conditional_edges("verify_success", route_next_step)
    graph.add_edge("close_or_escalate", END)

    return graph.compile()
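
match_runbook imports a RunbookRegistry that is implemented elsewhere. The sketch below only pins down the interface the agent code relies on (get_by_id, match_by_pattern, and Runbook objects exposing id and steps); the keyword matching and the example runbook are placeholders, not the real registry.

# runbook_registry.py (interface sketch, not the real implementation)
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Runbook:
    id: str
    keywords: List[str]                 # crude matching hook for this sketch
    steps: List[dict] = field(default_factory=list)

class RunbookRegistry:
    def __init__(self):
        # Example entry; real runbooks would be loaded from files or a DB.
        self._runbooks = {
            "restart-on-oom": Runbook(
                id="restart-on-oom",
                keywords=["out of memory", "oom"],
                steps=[
                    {"action": "restart_container", "required": True},
                    {"action": "check_health", "required": True},
                ],
            ),
        }

    def get_by_id(self, runbook_id: str) -> Optional[Runbook]:
        return self._runbooks.get(runbook_id)

    def match_by_pattern(self, root_cause: str) -> Optional[Runbook]:
        text = (root_cause or "").lower()
        for rb in self._runbooks.values():
            if any(kw in text for kw in rb.keywords):
                return rb
        return None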

Kafka Consumer Loop

File: sre-agent/consumer.py

from kafka import KafkaConsumer
from agent import build_sre_graph
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

sre_graph = build_sre_graph()

def main():
    consumer = KafkaConsumer(
        'sre-critical',
        'sre-warning',
        bootstrap_servers=['redpanda:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        group_id='sre-agent-group',
        auto_offset_reset='latest',
        enable_auto_commit=True,
        max_poll_records=10
    )

    logger.info("SRE Agent consumer started. Listening for events...")

    for message in consumer:
        event = message.value
        logger.info(f"Received event: {event['labels']['alertname']}")

        try:
            result = sre_graph.invoke({"event": event})
            logger.info(f"Agent execution completed. Status: {result['status']}")
        except Exception as e:
            logger.error(f"Agent execution failed: {e}", exc_info=True)

if __name__ == "__main__":
    main()
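
With enable_auto_commit=True, offsets can be committed before an event is fully handled, so a crash mid-run can silently drop that incident. If at-least-once handling is preferred, the loop can commit manually after each successful graph run. A sketch of that variant, replacing the consumer setup and loop inside main() above; downstream nodes must then tolerate the occasional replayed event.

# At-least-once variant (sketch): commit offsets only after the graph
# finishes, accepting that a crash mid-run replays the event on restart.
consumer = KafkaConsumer(
    'sre-critical',
    'sre-warning',
    bootstrap_servers=['redpanda:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='sre-agent-group',
    auto_offset_reset='latest',
    enable_auto_commit=False,
)

for message in consumer:
    try:
        sre_graph.invoke({"event": message.value})
        consumer.commit()  # synchronous commit after a successful run
    except Exception:
        logger.exception("Agent run failed; offset left uncommitted for replay")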

Next: [[AIOps-GitLab-Integration]] - GitLab MCP client implementation