In this tutorial, we build the entire Agentic UI stack from the ground up using plain Python, without relying on external frameworks to abstract away the core ideas. We implement the AG-UI event stream to make agent behavior observable in real time, and we bring in A2UI as a declarative layer that allows interfaces to be defined as structured JSON rather than executable code. As we progress, we enable an LLM to generate full user interfaces from natural language, synchronize agent and UI state through JSON Patch updates, and enforce human-in-the-loop safety for critical actions. Also, we gain a clear, end-to-end understanding of how agent reasoning transforms into interactive, protocol-compliant user interfaces.
import subprocess, sys
for pkg in ["openai", "rich", "pydantic"]:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])
import os, getpass
if os.environ.get("OPENAI_API_KEY"):
API_KEY = os.environ["OPENAI_API_KEY"]
print(" Using OPENAI_API_KEY from environment.")
else:
try:
from google.colab import userdata
API_KEY = userdata.get("OPENAI_API_KEY")
print("
Using OPENAI_API_KEY from Colab Secrets.")
except Exception:
API_KEY = getpass.getpass("
Enter your OpenAI API key (hidden): ")
print("
API key received.")
BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
import json, re, time, uuid, copy, textwrap
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Any, Optional, Generator
from pydantic import BaseModel, Field
from openai import OpenAI
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
from rich.text import Text
from rich.markdown import Markdown
from rich import box
console = Console(width=105)
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
def llm(messages, **kw):
try:
return client.chat.completions.create(model=MODEL, messages=messages, temperature=0.2, **kw)
except Exception as e:
console.print(f"[red]LLM error: {e}[/]")
return None
def hdr(n, title, sub=""):
console.print()
console.rule(f"[bold cyan]SECTION {n}", style="cyan")
body = f"[bold white]{title}[/]\n[dim]{sub}[/]" if sub else f"[bold white]{title}[/]"
console.print(Panel(body, border_style="cyan", padding=(1, 2)))
hdr(1, "AG-UI Protocol — Event System",
"The real AG-UI protocol uses ~16 event types streamed via SSE.\n"
"We implement all core event types and a streaming emitter in pure Python.")
class AGUIEventType(str, Enum):
RUN_STARTED = "RUN_STARTED"
RUN_FINISHED = "RUN_FINISHED"
RUN_ERROR = "RUN_ERROR"
TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
TEXT_MESSAGE_END = "TEXT_MESSAGE_END"
TOOL_CALL_START = "TOOL_CALL_START"
TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
TOOL_CALL_RESULT = "TOOL_CALL_RESULT"
TOOL_CALL_END = "TOOL_CALL_END"
STATE_SNAPSHOT = "STATE_SNAPSHOT"
STATE_DELTA = "STATE_DELTA"
INTERRUPT = "INTERRUPT"
CUSTOM = "CUSTOM"
STEP_STARTED = "STEP_STARTED"
STEP_FINISHED = "STEP_FINISHED"
@dataclass
class AGUIEvent:
type: AGUIEventType
data: dict = field(default_factory=dict)
event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
def to_sse(self) -> str:
payload = {"type": self.type.value, "id": self.event_id, **self.data}
return f"event: ag-ui\ndata: {json.dumps(payload)}\n\n"
def to_json(self) -> dict:
return {"type": self.type.value, "id": self.event_id, "ts": self.timestamp, **self.data}
class AGUIEventStream:
def __init__(self):
self.events: list[AGUIEvent] = []
self.listeners: list = []
def emit(self, event: AGUIEvent):
self.events.append(event)
for listener in self.listeners:
listener(event)
def on(self, callback):
self.listeners.append(callback)
def replay(self) -> list[dict]:
return [e.to_json() for e in self.events]
def demo_agui_lifecycle():
stream = AGUIEventStream()
event_colors = {
"RUN_": "bold green", "TEXT_": "cyan", "TOOL_": "magenta",
"STATE_": "yellow", "INTERRUPT": "bold red", "STEP_": "dim",
}
def frontend_listener(event: AGUIEvent):
color = "white"
for prefix, c in event_colors.items():
if event.type.value.startswith(prefix):
color = c
break
detail = json.dumps(event.data)[:80] if event.data else ""
console.print(f" [{color}]
{event.type.value:.<28}[/] {detail}")
stream.on(frontend_listener)
run_id = str(uuid.uuid4())[:8]
console.print("[bold]Simulating full AG-UI agent run...[/]\n")
stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id}))
stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "analyzing_query", "label": "Understanding request"}))
stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "analyzing_query"}))
msg_id = str(uuid.uuid4())[:8]
stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START, {"message_id": msg_id, "role": "assistant"}))
for chunk in ["I'll ", "look up ", "the data ", "and build ", "a dashboard ", "for you."]:
stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT, {"message_id": msg_id, "delta": chunk}))
stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id}))
tool_id = str(uuid.uuid4())[:8]
stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_START, {"tool_call_id": tool_id, "name": "query_database"}))
stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_ARGS, {"tool_call_id": tool_id, "args_delta": '{"query": "SELECT revenue FROM sales"}'}))
stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_RESULT, {"tool_call_id": tool_id, "result": [{"month": "Jan", "revenue": 42000}, {"month": "Feb", "revenue": 58000}]}))
stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_END, {"tool_call_id": tool_id}))
stream.emit(AGUIEvent(AGUIEventType.STATE_SNAPSHOT, {
"state": {"active_agent": "DataAnalyst", "stage": "rendering", "progress": 0.75}
}))
stream.emit(AGUIEvent(AGUIEventType.STATE_DELTA, {
"delta": [{"op": "replace", "path": "/progress", "value": 1.0}]
}))
stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
"reason": "high_risk_action",
"description": "Agent wants to send an email to all 5,000 customers.",
"options": ["approve", "reject", "modify"],
}))
stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "completed"}))
console.print(Panel(
stream.events[3].to_sse(),
title="[bold]Example SSE wire format (how it looks on the network)",
border_style="dim",
))
table = Table(title="AG-UI Event Stream Summary", box=box.ROUNDED)
table.add_column("Category", style="cyan", width=15)
table.add_column("Events", justify="center", style="green")
counts = {}
for e in stream.events:
cat = e.type.value.rsplit("_", 1)[0] if "_" in e.type.value else e.type.value
counts[cat] = counts.get(cat, 0) + 1
for cat, n in counts.items():
table.add_row(cat, str(n))
table.add_row("[bold]TOTAL", f"[bold]{len(stream.events)}")
console.print(table)
demo_agui_lifecycle()
We start by building the backbone of every agentic frontend: the AG-UI event stream. We implement all 16 event types from the real AG-UI specification, lifecycle events, token-by-token text streaming, streamed tool calls, state snapshots, deltas, and interrupt signals, and serialize them into the SSE wire format that production systems use over HTTP. We then wire up a frontend listener that reacts to each event as it arrives, simulating the exact experience a React or Flutter app would have consuming this stream.
hdr(2, "A2UI — Declarative Component Trees",
"Google's A2UI spec: agents emit flat JSON component lists with ID refs.\n"
"The client's widget registry maps types → native widgets.\n"
"Safe like data, expressive like code. No executable code sent.")
class A2UIMessageType(str, Enum):
CREATE_SURFACE = "createSurface"
UPDATE_COMPONENTS = "updateComponents"
UPDATE_DATA_MODEL = "updateDataModel"
DELETE_SURFACE = "deleteSurface"
@dataclass
class A2UIComponent:
id: str
type: str
properties: dict = field(default_factory=dict)
children: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
d = {"id": self.id, "type": self.type, **self.properties}
if self.children:
d["children"] = self.children
return d
@dataclass
class A2UIDataModel:
data: dict = field(default_factory=dict)
def get_binding(self, path: str) -> Any:
parts = [p for p in path.split("/") if p]
val = self.data
for p in parts:
if isinstance(val, dict):
val = val.get(p)
elif isinstance(val, list) and p.isdigit():
val = val[int(p)]
else:
return None
return val
@dataclass
class A2UISurface:
surface_id: str
components: list[A2UIComponent] = field(default_factory=list)
data_model: A2UIDataModel = field(default_factory=A2UIDataModel)
def to_messages(self) -> list[dict]:
msgs = []
msgs.append({
"type": A2UIMessageType.CREATE_SURFACE.value,
"surfaceId": self.surface_id,
})
msgs.append({
"type": A2UIMessageType.UPDATE_COMPONENTS.value,
"surfaceId": self.surface_id,
"components": [c.to_dict() for c in self.components],
})
if self.data_model.data:
msgs.append({
"type": A2UIMessageType.UPDATE_DATA_MODEL.value,
"surfaceId": self.surface_id,
"dataModel": self.data_model.data,
})
return msgs
class WidgetRegistry:
def __init__(self):
self._renderers = {}
def register(self, component_type: str, render_fn):
self._renderers[component_type] = render_fn
def render(self, component: A2UIComponent, surface: A2UISurface, indent: int = 0):
fn = self._renderers.get(component.type)
if fn:
fn(component, surface, indent)
else:
pad = " " * indent
console.print(f"{pad}[dim]⟨{component.type} id={component.id}⟩ (no renderer)[/]")
def render_tree(self, surface: A2UISurface):
comp_map = {c.id: c for c in surface.components}
all_children = set()
for c in surface.components:
all_children.update(c.children)
roots = [c for c in surface.components if c.id not in all_children]
def _render(comp_id: str, indent: int):
comp = comp_map.get(comp_id)
if not comp:
return
self.render(comp, surface, indent)
for child_id in comp.children:
_render(child_id, indent + 1)
for root in roots:
_render(root.id, 0)
registry = WidgetRegistry()
def _resolve(comp, surface, key, default=None):
val = comp.properties.get(key, default)
binding = comp.properties.get("dataBinding")
if binding and isinstance(binding, str) and binding.startswith("/"):
resolved = surface.data_model.get_binding(binding)
if resolved is not None:
return resolved
if isinstance(val, str) and val.startswith("/") and "/" in val[1:]:
resolved = surface.data_model.get_binding(val)
if resolved is not None:
return resolved
return val
def _to_float(val, default=0.0):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
cleaned = val.strip().rstrip("%")
try:
f = float(cleaned)
if "%" in val or f > 1:
return f / 100.0
return f
except ValueError:
return default
return default
def render_card(comp, surface, indent):
pad = " " * indent
title = str(_resolve(comp, surface, "title", "Card"))
console.print(f"{pad}┌─{'─' * 50}─┐")
console.print(f"{pad}│ [bold]{title:^50}[/] │")
console.print(f"{pad}├─{'─' * 50}─┤")
if not comp.children:
subtitle = str(_resolve(comp, surface, "subtitle", ""))
if subtitle:
console.print(f"{pad}│ {subtitle:<49}│")
console.print(f"{pad}└─{'─' * 50}─┘")
def render_text(comp, surface, indent):
pad = " " * indent
text = _resolve(comp, surface, "text", "")
style = comp.properties.get("style", "body")
styles = {"headline": "bold white", "body": "white", "caption": "dim", "label": "bold cyan"}
console.print(f"{pad}[{styles.get(style, 'white')}]{text}[/]")
def render_button(comp, surface, indent):
pad = " " * indent
label = str(_resolve(comp, surface, "label", "Button"))
variant = comp.properties.get("variant", "primary")
colors = {"primary": "bold white on blue", "secondary": "white on grey30", "danger": "bold white on red"}
console.print(f"{pad} [{colors.get(variant, 'white')}] {label} [/]")
def render_text_field(comp, surface, indent):
pad = " " * indent
label = comp.properties.get("label", "Input")
placeholder = comp.properties.get("placeholder", "")
console.print(f"{pad} {label}: [dim]┌──────────────────────────┐[/]")
console.print(f"{pad} [dim]│ {placeholder:<25}│[/]")
console.print(f"{pad} [dim]└──────────────────────────┘[/]")
def render_row(comp, surface, indent):
pass
def render_column(comp, surface, indent):
pass
def render_image(comp, surface, indent):
pad = " " * indent
alt = comp.properties.get("alt", "image")
console.print(f"{pad} [dim]
[{alt}][/]")
def render_divider(comp, surface, indent):
pad = " " * indent
console.print(f"{pad} {'─' * 50}")
def render_chip(comp, surface, indent):
pad = " " * indent
label = str(_resolve(comp, surface, "label", ""))
console.print(f"{pad} [on grey23] {label} [/]")
def render_progress(comp, surface, indent):
pad = " " * indent
raw_value = _resolve(comp, surface, "value", 0)
value = max(0.0, min(1.0, _to_float(raw_value, 0.0)))
label = str(_resolve(comp, surface, "label", ""))
bar_len = int(value * 40)
bar = f"[green]{'█' * bar_len}[/][dim]{'░' * (40 - bar_len)}[/]"
console.print(f"{pad} {label}: {bar} {value*100:.0f}%")
for name, fn in [
("card", render_card), ("text", render_text), ("button", render_button),
("text-field", render_text_field), ("row", render_row), ("column", render_column),
("image", render_image), ("divider", render_divider), ("chip", render_chip),
("progress-bar", render_progress),
]:
registry.register(name, fn)
console.print("\n[bold]Demo: A2UI booking form — agent generates a restaurant reservation UI[/]\n")
booking_surface = A2UISurface(
surface_id="booking-form-1",
components=[
A2UIComponent("root", "card", {"title": "
Reserve a Table"}, children=["c1", "c2", "c3", "c4", "c5", "c6"]),
A2UIComponent("c1", "text", {"text": "", "dataBinding": "/restaurant/name", "style": "headline"}),
A2UIComponent("c2", "text", {"text": "", "dataBinding": "/restaurant/cuisine", "style": "caption"}),
A2UIComponent("c3", "divider", {}),
A2UIComponent("c4", "text-field", {"label": "Date", "placeholder": "YYYY-MM-DD"}),
A2UIComponent("c5", "text-field", {"label": "Guests", "placeholder": "1-12"}),
A2UIComponent("c6", "button", {"label": "Reserve Now", "variant": "primary", "action": "submit_booking"}),
],
data_model=A2UIDataModel({"restaurant": {"name": "Chez Laurent", "cuisine": "French Contemporary • $$$$"}})
)
console.print(Panel(
"\n".join(json.dumps(m, indent=2)[:200] for m in booking_surface.to_messages()),
title="[bold]A2UI JSONL stream (what goes over the wire)",
border_style="yellow",
))
console.print("[bold]Rendered by client widget registry:[/]\n")
registry.render_tree(booking_surface)
console.print()
t = Table(title="A2UI Flat Component List (Adjacency Model)", box=box.ROUNDED)
t.add_column("ID", style="cyan", width=8)
t.add_column("Type", style="green", width=14)
t.add_column("Children", style="yellow", width=20)
t.add_column("Bindings", style="magenta", width=25)
for c in booking_surface.components:
binding = c.properties.get("dataBinding", "")
t.add_row(c.id, c.type, ", ".join(c.children) if c.children else "—", binding or "—")
console.print(t)
We implement Google’s A2UI specification: a flat adjacency-list model where components reference children by ID rather than nesting, making the format trivially streamable and easy for LLMs to generate incrementally. We build a client-side Widget Registry that maps abstract type strings like “card”, “text-field”, and “progress-bar” to concrete terminal renderers, mirroring how a production app maps them to React components or Flutter widgets. We demonstrate the full cycle with a restaurant booking form, complete with data model bindings that decouple dynamic values from UI structure, exactly as the A2UI spec prescribes.
hdr(3, "Generative UI — LLM Produces Live Interfaces",
"The agent generates A2UI component trees dynamically based on the query.\n"
"This is the core of 'Generative UI' — context-adaptive interfaces\n"
"that go far beyond text-only chat responses.")
A2UI_GENERATION_PROMPT = """\
You are an A2UI Generative UI agent. Given a user query, you generate a rich \
interactive interface — NOT text. You output an A2UI component tree as JSON.
RULES:
1. Output a flat list of components using the adjacency model (children = list of IDs).
2. Available component types: card, text, button, text-field, row, column, divider, chip, image, progress-bar, select, date-picker, data-table
3. Include a separate "dataModel" object for dynamic values. Use "/path/to/value" bindings.
4. The ROOT component should be a "card" with all others as descendants.
5. Think about what UI BEST serves the user — forms for input, tables for data, \
progress bars for status, chips for tags, buttons for actions.
OUTPUT FORMAT (strict JSON, nothing else):
{
"surfaceId": "unique-id",
"components": [
{"id": "root", "type": "card", "title": "...", "children": ["c1", "c2"]},
{"id": "c1", "type": "text", "text": "...", "style": "headline"},
...
],
"dataModel": { ... }
}
"""
def generate_ui(user_query: str) -> Optional[A2UISurface]:
console.print(f" [dim]Generating UI for:[/] [bold]{user_query}[/]")
response = llm([
{"role": "system", "content": A2UI_GENERATION_PROMPT},
{"role": "user", "content": user_query},
], max_tokens=1200)
if not response:
return None
raw = response.choices[0].message.content
try:
cleaned = re.sub(r'```json\s*|\s*```', '', raw).strip()
spec = json.loads(cleaned)
except json.JSONDecodeError:
console.print(f"[red]Failed to parse generated UI: {raw[:200]}[/]")
return None
components = []
for c in spec.get("components", []):
components.append(A2UIComponent(
id=c.get("id", str(uuid.uuid4())[:6]),
type=c.get("type", "text"),
properties={k: v for k, v in c.items() if k not in ("id", "type", "children")},
children=c.get("children", []),
))
surface = A2UISurface(
surface_id=spec.get("surfaceId", f"gen-{uuid.uuid4().hex[:6]}"),
components=components,
data_model=A2UIDataModel(spec.get("dataModel", {})),
)
return surface
def demo_generative_ui(query: str):
surface = generate_ui(query)
if surface:
console.print(f"\n[bold green]Generated {len(surface.components)} components:[/]")
registry.render_tree(surface)
console.print()
types = {}
for c in surface.components:
types[c.type] = types.get(c.type, 0) + 1
console.print(" [dim]Component types used:[/] " + ", ".join(f"[cyan]{t}[/]×{n}" for t, n in types.items()))
if surface.data_model.data:
console.print(f" [dim]Data model keys:[/] {list(surface.data_model.data.keys())}")
console.print()
console.print("\n[bold]Demo 1: Agent generates an onboarding form[/]")
demo_generative_ui(
"Create a user onboarding flow: collect name, email, role (dropdown), "
"preferred notification method (chips), and a 'Get Started' button."
)
console.print("\n[bold]Demo 2: Agent generates a data dashboard[/]")
demo_generative_ui(
"Show a project status dashboard with: project name 'Atlas v2', "
"4 team members, sprint progress at 68%, 3 blockers flagged as critical, "
"and action buttons for 'View Backlog' and 'Schedule Standup'."
)
console.print("\n[bold]Demo 3: Agent generates a confirmation dialog[/]")
demo_generative_ui(
"Show a payment confirmation: $2,450 charge to Visa ending 4242, "
"order #ORD-8891, with Approve and Decline buttons."
)
We hand the keys to the LLM and let it generate complete A2UI component trees at runtime from plain English descriptions, this is Generative UI in its purest form. We prompt the model with the A2UI schema and component catalog, and it produces fully structured surfaces with cards, forms, chips, progress bars, and data bindings, choosing the best UI pattern for each query. We run three demos, an onboarding flow, a project dashboard, and a payment confirmation, showing how the same agent adapts its interface to wildly different contexts without a single hardcoded layout.
hdr(4, "State Synchronization — Shared State Between Agent & UI",
"AG-UI syncs state bidirectionally using STATE_SNAPSHOT and STATE_DELTA.\n"
"The agent IS the state machine; the UI IS the renderer.\n"
"JSON Patch diffs keep updates minimal and efficient.")
class SharedState:
def __init__(self, initial: dict = None):
self.state: dict = initial or {}
self.history: list[dict] = []
self.version: int = 0
def snapshot(self) -> AGUIEvent:
return AGUIEvent(AGUIEventType.STATE_SNAPSHOT, {"state": copy.deepcopy(self.state), "version": self.version})
def apply_delta(self, operations: list[dict]) -> AGUIEvent:
for op in operations:
path_parts = [p for p in op["path"].split("/") if p]
target = self.state
for part in path_parts[:-1]:
if isinstance(target, dict):
target = target.setdefault(part, {})
elif isinstance(target, list) and part.isdigit():
target = target[int(part)]
key = path_parts[-1] if path_parts else None
if key is None:
continue
if op["op"] == "replace":
target[key] = op["value"]
elif op["op"] == "add":
if isinstance(target, list) and key.isdigit():
target.insert(int(key), op["value"])
else:
target[key] = op["value"]
elif op["op"] == "remove":
if isinstance(target, dict):
target.pop(key, None)
self.version += 1
self.history.append({"version": self.version, "ops": operations})
return AGUIEvent(AGUIEventType.STATE_DELTA, {"delta": operations, "version": self.version})
console.print("\n[bold]Demo: Document review pipeline — 3 agents, shared state[/]\n")
stream = AGUIEventStream()
state = SharedState({
"document": {"title": "Q4 Strategy Report", "status": "draft", "word_count": 2840},
"pipeline": {"stage": "research", "progress": 0.0},
"agents": {"active": "Researcher", "queue": ["Editor", "Reviewer"]},
"feedback": [],
})
def log_event(event: AGUIEvent):
if event.type in (AGUIEventType.STATE_SNAPSHOT, AGUIEventType.STATE_DELTA):
if event.type == AGUIEventType.STATE_DELTA:
ops = event.data.get("delta", [])
for op in ops:
console.print(f" [yellow]STATE_DELTA[/] v{event.data.get('version')}: "
f"[cyan]{op['op']}[/] {op['path']} → {op.get('value', '∅')}")
else:
console.print(f" [yellow]STATE_SNAPSHOT[/] v{event.data.get('version')}: {list(event.data['state'].keys())}")
stream.on(log_event)
stream.emit(state.snapshot())
console.print("\n[bold green]▸ Researcher agent working...[/]")
stream.emit(state.apply_delta([
{"op": "replace", "path": "/pipeline/stage", "value": "research_complete"},
{"op": "replace", "path": "/pipeline/progress", "value": 0.33},
{"op": "add", "path": "/feedback/0", "value": {"agent": "Researcher", "note": "Added 4 new data sources"}},
]))
console.print("\n[bold green]▸ Editor agent working...[/]")
stream.emit(state.apply_delta([
{"op": "replace", "path": "/agents/active", "value": "Editor"},
{"op": "replace", "path": "/pipeline/stage", "value": "editing"},
{"op": "replace", "path": "/pipeline/progress", "value": 0.66},
{"op": "replace", "path": "/document/word_count", "value": 3150},
]))
console.print("\n[bold green]▸ Reviewer agent working...[/]")
stream.emit(state.apply_delta([
{"op": "replace", "path": "/agents/active", "value": "Reviewer"},
{"op": "replace", "path": "/pipeline/stage", "value": "review_complete"},
{"op": "replace", "path": "/pipeline/progress", "value": 1.0},
{"op": "replace", "path": "/document/status", "value": "approved"},
]))
console.print(Panel(
json.dumps(state.state, indent=2),
title="[bold]Final shared state after pipeline",
border_style="green",
))
t = Table(title="State History (versions)", box=box.ROUNDED)
t.add_column("Version", style="cyan", justify="center")
t.add_column("Operations", style="yellow")
for h in state.history:
ops_summary = "; ".join(f"{o['op']} {o['path']}" for o in h["ops"])
t.add_row(str(h["version"]), ops_summary[:70])
console.print(t)
hdr(5, "Human-in-the-Loop — AG-UI INTERRUPT Events",
"When an agent hits a high-stakes action, it emits an INTERRUPT event.\n"
"The frontend renders an approval UI. Execution pauses until the human\n"
"approves, rejects, or modifies. State is preserved throughout.")
@dataclass
class InterruptRequest:
interrupt_id: str
action_description: str
risk_level: str
affected_resources: list[str]
proposed_changes: dict
options: list[str] = field(default_factory=lambda: ["approve", "reject", "modify"])
@dataclass
class InterruptResponse:
interrupt_id: str
decision: str
modifications: Optional[dict] = None
class InterruptableAgent:
RISK_RULES = {
"delete": "critical",
"payment": "critical",
"email_all": "high",
"publish": "high",
"update": "medium",
"read": "low",
}
def __init__(self):
self.stream = AGUIEventStream()
self.pending_interrupts: dict[str, InterruptRequest] = {}
def assess_and_maybe_interrupt(self, action: str, details: dict) -> Optional[InterruptRequest]:
risk = "low"
for keyword, level in self.RISK_RULES.items():
if keyword in action.lower():
risk = level
break
if risk in ("critical", "high"):
interrupt = InterruptRequest(
interrupt_id=str(uuid.uuid4())[:8],
action_description=action,
risk_level=risk,
affected_resources=details.get("resources", []),
proposed_changes=details.get("changes", {}),
)
self.pending_interrupts[interrupt.interrupt_id] = interrupt
self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
"interrupt_id": interrupt.interrupt_id,
"reason": risk,
"description": interrupt.action_description,
"affected_resources": interrupt.affected_resources,
"proposed_changes": interrupt.proposed_changes,
"options": interrupt.options,
}))
return interrupt
return None
def resolve_interrupt(self, response: InterruptResponse) -> str:
interrupt = self.pending_interrupts.pop(response.interrupt_id, None)
if not interrupt:
return "No pending interrupt found."
if response.decision == "approve":
return f"
APPROVED: '{interrupt.action_description}' executing now."
elif response.decision == "reject":
return f"
REJECTED: '{interrupt.action_description}' cancelled."
elif response.decision == "modify":
return f"
MODIFIED: '{interrupt.action_description}' updated with: {response.modifications}"
return f"Unknown decision: {response.decision}"
console.print("\n[bold]Demo: Agent encounters actions of varying risk levels[/]\n")
agent = InterruptableAgent()
actions = [
("Read user profile", {"resources": ["user:123"]}),
("Update user preferences", {"resources": ["user:123"], "changes": {"theme": "dark"}}),
("Delete user account", {"resources": ["user:123", "data:all"], "changes": {"action": "permanent_delete"}}),
("Email all 12,000 users", {"resources": ["email:newsletter"], "changes": {"subject": "Big Announcement"}}),
("Publish blog post", {"resources": ["post:draft-42"], "changes": {"status": "public"}}),
]
def event_logger(event):
if event.type == AGUIEventType.INTERRUPT:
d = event.data
risk_style = {"critical": "bold red", "high": "bold yellow"}.get(d["reason"], "white")
console.print(f"\n [bold]
INTERRUPT EVENT[/]")
console.print(f" Risk: [{risk_style}]{d['reason'].upper()}[/]")
console.print(f" Action: {d['description']}")
console.print(f" Affected: {d['affected_resources']}")
console.print(f" Options: {d['options']}")
agent.stream.on(event_logger)
results = []
for action_desc, details in actions:
interrupt = agent.assess_and_maybe_interrupt(action_desc, details)
if interrupt:
decision = "reject" if interrupt.risk_level == "critical" else "approve"
result = agent.resolve_interrupt(InterruptResponse(interrupt.interrupt_id, decision))
else:
result = f"
AUTO-EXECUTED: '{action_desc}' (low risk, no approval needed)"
results.append((action_desc, result))
console.print()
t = Table(title="Execution Results", box=box.ROUNDED, show_lines=True)
t.add_column("Action", style="white", width=28)
t.add_column("Outcome", style="dim", width=55)
for action_desc, result in results:
t.add_row(action_desc, result)
console.print(t)
We build a SharedState engine that emits AG-UI STATE_SNAPSHOT and STATE_DELTA events using JSON Patch operations, keeping the agent backend and the frontend UI perfectly synchronized through every mutation. We demonstrate this with a three-agent document review pipeline in which a Researcher, Editor, and Reviewer each modify the shared state in sequence, and the frontend sees every change the instant it occurs. We then implement the AG-UI INTERRUPT pattern, in which the agent assesses risk levels for proposed actions, emits interrupt events for any dangerous actions, and pauses execution until a human approves, rejects, or modifies the plan.
hdr(6, "Full Pipeline — LLM-Driven Adaptive UI",
"The complete Agentic UI architecture in one pipeline:\n"
" User query → Intent analysis → UI pattern selection →\n"
" A2UI generation → AG-UI event streaming → State sync → Render")
UI_ROUTER_PROMPT = """\
You are a UI routing agent. Given a user query, decide what type of UI to generate.
RESPOND IN JSON ONLY:
{
"intent": "form | dashboard | confirmation | list | detail | wizard | error",
"reasoning": "why this UI pattern fits",
"ui_complexity": "simple | moderate | complex",
"needs_approval": true/false,
"data_requirements": ["what data the UI needs"]
}
"""
class AgenticUIPipeline:
def __init__(self):
self.stream = AGUIEventStream()
self.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0})
self.interrupt_agent = InterruptableAgent()
def route(self, query: str) -> dict:
resp = llm([
{"role": "system", "content": UI_ROUTER_PROMPT},
{"role": "user", "content": query},
], max_tokens=300)
if not resp:
return {"intent": "dashboard", "reasoning": "fallback", "ui_complexity": "simple",
"needs_approval": False, "data_requirements": []}
raw = resp.choices[0].message.content
try:
return json.loads(re.sub(r'```json\s*|\s*```', '', raw).strip())
except json.JSONDecodeError:
return {"intent": "dashboard", "reasoning": "parse_fallback", "ui_complexity": "simple",
"needs_approval": False, "data_requirements": []}
def run(self, user_query: str):
run_id = str(uuid.uuid4())[:8]
console.print(Panel(f"[bold]{user_query}[/]", title="
User Query", border_style="white"))
self.stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id}))
self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "routing"}))
routing = self.route(user_query)
console.print(f"\n [bold cyan]
Router Decision:[/]")
console.print(f" Intent: [green]{routing.get('intent')}[/] | "
f"Complexity: [yellow]{routing.get('ui_complexity')}[/] | "
f"Approval: {'
' if routing.get('needs_approval') else '
'}")
console.print(f" Reasoning: [dim]{routing.get('reasoning', '')}[/]")
self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "routing", "result": routing}))
self.state.apply_delta([
{"op": "replace", "path": "/pipeline/stage", "value": "generating"},
])
self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "generating_ui"}))
msg_id = str(uuid.uuid4())[:8]
self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START, {"message_id": msg_id}))
self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT, {
"message_id": msg_id,
"delta": f"Building a {routing.get('intent')} interface for you..."
}))
self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id}))
surface = generate_ui(user_query)
self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "generating_ui"}))
if routing.get("needs_approval") and surface:
self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
"reason": "ui_confirmation",
"description": f"Generated {len(surface.components)} component UI. Render it?",
"options": ["render", "regenerate", "cancel"],
}))
console.print("\n [bold yellow]
INTERRUPT:[/] UI generated, awaiting human approval...")
console.print(" [green]→ Auto-approving for demo...[/]")
if surface:
self.state.apply_delta([
{"op": "replace", "path": "/pipeline/stage", "value": "rendering"},
{"op": "replace", "path": "/renders", "value": self.state.state.get("renders", 0) + 1},
])
console.print(f"\n[bold green]
Rendered Interface ({len(surface.components)} components):[/]\n")
registry.render_tree(surface)
self.stream.emit(AGUIEvent(AGUIEventType.CUSTOM, {
"subtype": "a2ui_surface",
"surface": surface.to_messages(),
}))
self.state.apply_delta([{"op": "replace", "path": "/pipeline/stage", "value": "complete"}])
self.stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "success"}))
console.print()
event_counts = {}
for e in self.stream.events:
event_counts[e.type.value] = event_counts.get(e.type.value, 0) + 1
t = Table(title="Pipeline Event Summary", box=box.ROUNDED)
t.add_column("Event Type", style="cyan")
t.add_column("Count", justify="center", style="green")
for etype, count in sorted(event_counts.items()):
t.add_row(etype, str(count))
console.print(t)
pipeline = AgenticUIPipeline()
console.print("\n[bold]Demo 1: Agent builds a settings form[/]")
pipeline.run(
"Create a notification settings panel where I can toggle email/SMS/push, "
"set quiet hours, and pick a notification sound."
)
pipeline.stream = AGUIEventStream()
pipeline.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0})
console.print("\n[bold]Demo 2: Agent builds an order tracking dashboard[/]")
pipeline.run(
"Show order #ORD-7742 status: shipped via FedEx, tracking 789456123, "
"estimated delivery March 24, 2 of 3 items delivered. Show a progress bar "
"and action buttons for 'Contact Support' and 'Request Refund'."
)
hdr(7, "Incremental UI Updates — Live Surface Modification",
"A2UI surfaces are incrementally updateable. The agent can add, remove,\n"
"or modify components and data bindings on a live surface without\n"
"regenerating the whole tree. Essential for real-time collaboration.")
class LiveSurface:
def __init__(self, surface: A2UISurface):
self.surface = surface
self.update_log: list[str] = []
def add_component(self, component: A2UIComponent, parent_id: Optional[str] = None):
self.surface.components.append(component)
if parent_id:
for c in self.surface.components:
if c.id == parent_id:
c.children.append(component.id)
break
self.update_log.append(f"ADD {component.type}#{component.id} → parent:{parent_id or 'root'}")
def update_component(self, component_id: str, new_props: dict):
for c in self.surface.components:
if c.id == component_id:
c.properties.update(new_props)
self.update_log.append(f"UPD #{component_id} props: {list(new_props.keys())}")
return
self.update_log.append(f"ERR #{component_id} not found")
def remove_component(self, component_id: str):
self.surface.components = [c for c in self.surface.components if c.id != component_id]
for c in self.surface.components:
if component_id in c.children:
c.children.remove(component_id)
self.update_log.append(f"DEL #{component_id}")
def update_data(self, path: str, value: Any):
self.surface.data_model.data = _set_nested(self.surface.data_model.data, path, value)
self.update_log.append(f"DATA {path} = {value}")
def _set_nested(d: dict, path: str, value: Any) -> dict:
parts = [p for p in path.split("/") if p]
d = copy.deepcopy(d)
current = d
for p in parts[:-1]:
current = current.setdefault(p, {})
if parts:
current[parts[-1]] = value
return d
console.print("\n[bold]Demo: Live collaborative editing — agent modifies UI in real-time[/]\n")
initial = A2UISurface(
surface_id="task-board",
components=[
A2UIComponent("board", "card", {"title": "
Sprint Board"}, children=["t1", "t2", "t3"]),
A2UIComponent("t1", "chip", {"label": "AUTH-101: Login flow", "variant": "in_progress"}),
A2UIComponent("t2", "chip", {"label": "AUTH-102: OAuth setup", "variant": "todo"}),
A2UIComponent("t3", "chip", {"label": "AUTH-103: 2FA", "variant": "todo"}),
],
data_model=A2UIDataModel({"sprint": {"name": "Sprint 14", "velocity": 21}}),
)
live = LiveSurface(initial)
console.print("[bold]Initial board:[/]")
registry.render_tree(live.surface)
console.print("\n[bold yellow]Agent updating board in real-time...[/]\n")
live.update_component("t1", {"variant": "done", "label": "
AUTH-101: Login flow"})
live.update_component("t2", {"variant": "in_progress", "label": "
AUTH-102: OAuth setup"})
live.add_component(
A2UIComponent("t4", "chip", {"label": "AUTH-104: Password reset", "variant": "todo"}),
parent_id="board"
)
live.update_data("/sprint/velocity", 25)
live.remove_component("t3")
console.print("[bold]Updated board:[/]")
registry.render_tree(live.surface)
console.print()
t = Table(title="Incremental Update Log", box=box.ROUNDED)
t.add_column("#", style="cyan", width=4, justify="center")
t.add_column("Operation", style="yellow")
for i, entry in enumerate(live.update_log, 1):
t.add_row(str(i), entry)
console.print(t)
We wire every piece together into a single AgenticUIPipeline class that takes a user query, classifies its intent with an LLM router, selects the right UI pattern, generates an A2UI surface, streams the entire process over AG-UI events, manages shared state, and renders the result, the complete architecture in one run. We then build a LiveSurface class that supports incremental A2UI updates: adding, modifying, and removing components on an already-rendered surface without regenerating the whole tree, which is essential for real-time collaborative experiences. We demo this with a sprint board that an agent updates live, marking tasks complete, adding new ones, and adjusting data model values, all tracked in a detailed operation log.
hdr(8, "Reference — The Agentic UI Protocol Stack",
"How AG-UI, A2UI, MCP, and A2A fit together in the modern agent architecture.")
console.print(Panel("""\
[bold white]THE AGENTIC UI STACK (2026)[/]
┌──────────────────────────────────────────────────────┐
│ [bold cyan]USER INTERFACE[/] (React, Flutter, SwiftUI, Terminal) │
│ Renders native widgets from component specs │
└────────────────────────┬─────────────────────────────┘
│ A2UI component trees (JSON)
│ AG-UI events (SSE / WebSocket)
┌────────────────────────┴─────────────────────────────┐
│ [bold yellow]AG-UI PROTOCOL[/] (Agent
User Interaction) │
│ • Event streaming (TEXT, TOOL_CALL, STATE, INTERRUPT)│
│ • Bidirectional state sync (SNAPSHOT + DELTA) │
│ • Human-in-the-loop (INTERRUPT → approval flow) │
└────────────────────────┬─────────────────────────────┘
│
┌────────────────────────┴─────────────────────────────┐
│ [bold magenta]AGENT RUNTIME[/] (LangGraph, CrewAI, custom, etc.) │
│ • Generates A2UI surfaces (Generative UI) │
│ • Manages shared state │
│ • Orchestrates sub-agents via A2A protocol │
│ • Accesses tools via MCP protocol │
└────────────────────────┬─────────────────────────────┘
│
┌────────────────────────┴─────────────────────────────┐
│ [bold green]LLM BACKBONE[/] (GPT, Claude, Gemini, etc.) │
│ • Generates component trees as structured output │
│ • Reasons about UI patterns per context │
│ • Streams tokens for real-time rendering │
└──────────────────────────────────────────────────────┘
[dim]Protocol roles:
AG-UI = Agent
User (streaming events, state, HITL)
A2UI = Agent → UI (declarative component specs)
A2A = Agent → Agent (delegation, sub-agents)
MCP = Agent → Tools (function calling, context)[/]
""", title="[bold]Architecture Reference", border_style="cyan"))
ref = Table(title="Agentic UI Concepts — Quick Reference", box=box.DOUBLE_EDGE, show_lines=True)
ref.add_column("Concept", style="bold cyan", width=22)
ref.add_column("What It Does", style="white", width=35)
ref.add_column("Key Mechanism", style="yellow", width=28)
ref.add_row("AG-UI Events", "Stream agent actions to frontend in real-time", "SSE/WebSocket + ~16 event types")
ref.add_row("A2UI Components", "Declarative UI trees — safe, portable, native", "Flat JSON + widget registry")
ref.add_row("State Sync", "Keep agent & UI state in lockstep", "STATE_SNAPSHOT + STATE_DELTA")
ref.add_row("Generative UI", "LLM generates UI at runtime, not just text", "A2UI JSON as structured output")
ref.add_row("INTERRUPT (HITL)", "Pause execution for human approval", "INTERRUPT event → approval flow")
ref.add_row("Incremental Update","Modify live surfaces without full regeneration", "A2UI updateComponents message")
ref.add_row("Data Binding", "UI reads from a shared data model", "JSON Pointer paths (/path/to/val)")
ref.add_row("Widget Registry", "Client maps abstract types to native widgets", "Catalog of trusted components")
console.print(ref)
console.print(Panel(
"[bold green]Tutorial complete![/]\n\n"
"[dim]What you built:[/]\n"
" • A full AG-UI event system with all 16 event types\n"
" • An A2UI renderer with flat adjacency-list components + data binding\n"
" • LLM-powered Generative UI that creates interfaces from natural language\n"
" • Bidirectional state sync with JSON Patch deltas\n"
" • Human-in-the-loop interrupt and approval flows\n"
" • Incremental live surface updates\n\n"
"[dim]To go further:[/]\n"
" • Serve AG-UI events over real SSE with FastAPI\n"
" • Connect to CopilotKit React components for a real frontend\n"
" • Use Pydantic AI's AGUIAdapter for production agent hosting\n"
" • Add A2A protocol for multi-agent delegation\n"
" • Deploy on AWS Bedrock AgentCore with native AG-UI support",
title="[bold]
What's Next?",
border_style="green",
padding=(1, 2),
))
We close with a visual protocol stack diagram showing exactly how AG-UI, A2UI, A2A, and MCP fit together in the modern agentic architecture, from the LLM backbone at the bottom to the native UI at the top. We provide a quick-reference table mapping every concept we built, event streaming, component trees, state sync, generative UI, interrupts, incremental updates, data binding, and widget registries, to their core mechanisms. We point the way forward to production: serving AG-UI events over real SSE with FastAPI, connecting to CopilotKit React components, using Pydantic AI’s AGUIAdapter, and deploying on AWS Bedrock AgentCore.
In conclusion, we have a fully functional Agentic UI pipeline that takes a simple natural-language query and transforms it into a structured, interactive interface powered by an intelligent agent. We do not just assemble components; we understand how each layer operates and connects, from real-time AG-UI event streaming and declarative A2UI interface definitions to state synchronization through JSON Patch and enforced human-in-the-loop safety mechanisms. This clarity allows us to reason about system behavior, debug effectively, and extend functionality without relying on black-box abstractions. Also, we leave with the ability to design our own agent-driven UI systems, adapt them to different use cases, and confidently build production-ready experiences where agents and interfaces evolve together in a controlled, transparent, and scalable manner.
Check out the Full Codes with Notebook here. Also, feel free to follow us on Twitter and don’t forget to join our 130k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us
The post A Coding Deep Dive into Agentic UI, Generative UI, State Synchronization, and Interrupt-Driven Approval Flows appeared first on MarkTechPost.
Credit: Source link

























