#!/usr/bin/env python3
"""bio-tui — Block I/O lifecycle visualisation TUI.

Launches the kernel tracing pipeline and renders a live I/O lifecycle
dashboard.

Usage:
    ./bio-tui                     # start the bpftrace -> bio-state pipeline
    ./bio-tui --replay trace.log  # replay a recorded trace log
"""

import sys
import os
import re
import json
import math
import shlex
import signal
import threading
import subprocess
from collections import deque, defaultdict

from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.widgets import Static


# ── Markup helpers ────────────────────────────────────────────────────────

# A markup tag, optionally preceded by backslashes. This mirrors the pattern
# used by ``rich.markup.escape`` so escaped text round-trips through the
# markup parser unchanged.
_MARKUP_TAG_RE = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])")


def _escape_markup(text: object) -> str:
    """Return *text* escaped so it is rendered literally, not as markup.

    Process names, RWBS strings and error messages come from outside this
    program; an embedded ``[...]`` must not be interpreted (a stray closing
    tag would make rendering raise).
    """
    def _escape(match):
        backslashes, tag = match.groups()
        return f"{backslashes}{backslashes}\\{tag}"

    escaped = _MARKUP_TAG_RE.sub(_escape, str(text))
    # A lone trailing backslash would escape whatever tag follows it.
    if escaped.endswith("\\") and not escaped.endswith("\\\\"):
        escaped += "\\"
    return escaped


# ── Data Model ────────────────────────────────────────────────────────────

class IOModel:
    """Aggregated block-I/O state built from pipeline events.

    Events are dicts whose ``type`` is ``"begin"``, ``"end"`` or
    ``"requeue"``; any other type is ignored. Every event type handled here
    requires a ``key`` field (a missing one raises ``KeyError``).
    """

    # Latency histogram buckets: (label, inclusive upper bound in μs).
    _LATENCY_BUCKETS: tuple[tuple[str, float], ...] = (
        ("0-100\u03bcs", 100),
        ("100-500\u03bcs", 500),
        ("0.5-1ms", 1_000),
        ("1-5ms", 5_000),
        ("5-10ms", 10_000),
        ("10-50ms", 50_000),
        ("50ms+", math.inf),
    )

    def __init__(self):
        self.pending = 0          # I/Os begun but not yet ended/requeued
        self.completed = 0        # number of "end" events processed
        self.bytes_read = 0
        self.bytes_written = 0
        self.flushes = 0
        self.errors = 0
        # key -> {"bytes", "rwbs", "comm"} for in-flight I/Os
        self.active_ios: dict[str, dict] = {}
        # most recent completions, newest last (feeds the waterfall)
        self.recent_latencies: deque[dict] = deque(maxlen=200)
        self.latency_buckets: dict[str, int] = {
            label: 0 for label, _ in self._LATENCY_BUCKETS
        }
        # comm -> {"count", "bytes", "total_lat_ns"}
        self.process_stats: dict[str, dict] = defaultdict(
            lambda: {"count": 0, "bytes": 0, "total_lat_ns": 0}
        )
        self.rwbs_stats: dict[str, int] = defaultdict(int)
        self.paused = False
        # (ts, bytes) of recent completions, used for the throughput estimate
        self.throughput_window: deque[tuple[int, int]] = deque(maxlen=100)
        self.status_line = ""     # startup / pipeline error text shown in the UI

    @classmethod
    def _bucket_for(cls, latency_ns) -> str:
        """Return the histogram label for a latency given in nanoseconds."""
        lat_us = latency_ns / 1000.0
        for label, upper_us in cls._LATENCY_BUCKETS:
            if lat_us <= upper_us:
                return label
        # Only reachable for NaN; the original ladder also put it in "50ms+".
        return cls._LATENCY_BUCKETS[-1][0]

    def process_event(self, ev: dict) -> None:
        """Apply one pipeline event to the model (no-op while paused)."""
        if self.paused:
            return
        etype = ev.get("type", "")
        if etype == "begin":
            self._on_begin(ev)
        elif etype == "end":
            self._on_end(ev)
        elif etype == "requeue":
            self._on_requeue(ev)

    def _on_begin(self, ev: dict) -> None:
        """Track a newly issued I/O."""
        key = ev["key"]
        info = {
            "bytes": ev.get("bytes", 0),
            "rwbs": str(ev.get("rwbs", "")),
            "comm": str(ev.get("comm", "")),
        }
        # A repeated begin for an already-active key must not inflate
        # `pending`, otherwise it drifts up forever.
        if key not in self.active_ios:
            self.pending += 1
        self.active_ios[key] = info

    def _on_end(self, ev: dict) -> None:
        """Record a completed I/O and update every aggregate."""
        key = ev["key"]
        latency_ns = ev.get("latency_ns", 0)
        # Classify first: a non-numeric latency raises here, before any
        # counter has been modified.
        bucket = self._bucket_for(latency_ns)

        info = self.active_ios.pop(key, None)
        if info is not None:
            self.pending -= 1
        else:
            # No matching begin was seen; take what the end event carries.
            info = {
                "bytes": ev.get("bytes", 0),
                "rwbs": ev.get("rwbs", ""),
                "comm": ev.get("comm", ""),
            }
        rwbs = str(info.get("rwbs", ""))
        comm = str(info["comm"])
        nbytes = info["bytes"]
        error = ev.get("error", 0)
        is_flush = ev.get("is_flush", False)

        self.completed += 1
        self.recent_latencies.append({
            "rwbs": rwbs,
            "bytes": nbytes,
            "latency_ns": latency_ns,
            "comm": comm,
            "error": error,
            "is_flush": is_flush,
        })
        ts = ev.get("ts")
        # Only numeric timestamps can be used for the throughput estimate.
        if isinstance(ts, (int, float)):
            self.throughput_window.append((ts, nbytes))
        self.latency_buckets[bucket] += 1

        if "R" in rwbs and "W" not in rwbs:
            self.bytes_read += nbytes
        elif "W" in rwbs:
            self.bytes_written += nbytes
        if error != 0:
            self.errors += 1
        if is_flush or "F" in rwbs:
            self.flushes += 1

        stats = self.process_stats[comm]
        stats["count"] += 1
        stats["bytes"] += nbytes
        stats["total_lat_ns"] += latency_ns
        self.rwbs_stats[rwbs] += 1

    def _on_requeue(self, ev: dict) -> None:
        """Forget an in-flight I/O that was requeued."""
        if self.active_ios.pop(ev["key"], None) is not None:
            self.pending -= 1

    def get_throughput_bytes_per_sec(self) -> float:
        """Estimate throughput from the (ts, bytes) window; ts is in ns."""
        if len(self.throughput_window) < 2:
            return 0.0
        first_ts = self.throughput_window[0][0]
        last_ts = self.throughput_window[-1][0]
        if last_ts <= first_ts:
            return 0.0
        duration_s = (last_ts - first_ts) / 1e9
        if duration_s < 0.001:
            return 0.0
        total_bytes = sum(b for _, b in self.throughput_window)
        return total_bytes / duration_s

    def get_top_processes(self, n: int = 8) -> list[tuple[str, dict]]:
        """Return the *n* processes with the most completed I/Os."""
        return sorted(
            self.process_stats.items(),
            key=lambda x: x[1]["count"],
            reverse=True,
        )[:n]

    def reset(self) -> None:
        """Clear all statistics (this also clears `paused` and `status_line`)."""
        self.__init__()


# ── TUI Widgets ───────────────────────────────────────────────────────────

def _fmt_bytes(b: int) -> str:
    """Format a byte count with a decimal G/M/K suffix."""
    if b >= 1e9:
        return f"{b/1e9:.1f}G"
    if b >= 1e6:
        return f"{b/1e6:.1f}M"
    if b >= 1e3:
        return f"{b/1e3:.1f}K"
    return str(int(b))


class StatsBar(Static):
    """One-line header: counters, throughput, byte totals and pause state."""

    def render(self):
        m = self.app.model
        tp = m.get_throughput_bytes_per_sec()
        if tp > 1e6:
            tp_str = f"{tp/1e6:.1f} MB/s"
        elif tp > 1e3:
            tp_str = f"{tp/1e3:.1f} KB/s"
        else:
            tp_str = f"{tp:.0f} B/s"
        pause = " [bold red]PAUSED[/]" if m.paused else ""
        # status_line holds startup errors (arbitrary exception text), so it
        # is escaped before being embedded in markup.
        status = f" [red]{_escape_markup(m.status_line)}[/]" if m.status_line else ""
        return (
            f"[bold] Block I/O 生命周期[/] "
            f"等待:{m.pending} 完成:{m.completed} "
            f"错误:{m.errors} 刷新:{m.flushes} "
            f"{tp_str} "
            f"R:{_fmt_bytes(m.bytes_read)} W:{_fmt_bytes(m.bytes_written)}"
            f"{pause}{status}"
        )


class Waterfall(Static):
    """Log-scaled latency bars for the most recent completions, coloured by type."""

    @staticmethod
    def _color_for(rwbs: str) -> str:
        """Bar colour for an RWBS string (flush > write > read > other)."""
        if "F" in rwbs:
            return "magenta"
        if "W" in rwbs:
            return "yellow"
        if "R" in rwbs:
            return "cyan"
        return "white"

    def render(self):
        items = list(self.app.model.recent_latencies)
        if not items:
            return "[dim] 等待 I/O 事件... (另一终端运行 make load)[/]"
        w = max(10, self.size.width - 3)
        h = max(6, self.size.height - 2)
        recent = items[-w:]
        # Floor the scale at 1μs so a burst of tiny latencies doesn't fill the plot.
        max_lat_ns = max(max(r["latency_ns"] for r in recent), 1000)
        log_max = math.log10(max_lat_ns) if max_lat_ns > 1 else 1
        plot_h = h - 2

        # Each column's bar height and coloured cell are fixed for all rows.
        columns = []
        for r in recent:
            lat = r["latency_ns"]
            norm = 0 if lat <= 0 else max(0, min(1, math.log10(lat) / log_max))
            cell = f"[{self._color_for(r['rwbs'])}]\u2588[/]"
            columns.append((int(norm * plot_h), cell))

        lines = [f"[bold] I/O 延迟瀑布图 (最近{len(recent)})[/]"]
        for row in range(plot_h, -1, -1):  # top row first
            lines.append(
                " " + "".join(cell if row < bar_h else " " for bar_h, cell in columns)
            )
        lines.append("[dim] " + "\u2500" * len(recent) + " \u25b6[/]")
        return "\n".join(lines)


class LatencyHistogram(Static):
    """Horizontal bar chart of the latency buckets (share of all completions)."""

    def render(self):
        m = self.app.model
        buckets = m.latency_buckets
        total = sum(buckets.values())
        if total == 0:
            return "[bold]延迟分布[/]\n  [dim]等待数据...[/]"
        max_count = max(buckets.values()) or 1
        bar_w = 16
        lines = ["[bold]延迟分布[/]"]
        for label, count in buckets.items():
            pct = count / total * 100
            filled = int(count / max_count * bar_w)
            bar = "\u2588" * filled + "\u2591" * (bar_w - filled)
            lines.append(f" {label:<10s} {bar} {pct:5.1f}%")
        return "\n".join(lines)


class ProcessPanel(Static):
    """Top processes by I/O count with their average latency."""

    def render(self):
        m = self.app.model
        top = m.get_top_processes(10)
        if not top:
            return "[bold]进程 I/O[/]\n  [dim]等待数据...[/]"
        bar_w = 14
        max_count = max(stats["count"] for _, stats in top) or 1
        lines = ["[bold]进程 I/O[/]"]
        for comm, stats in top:
            count = stats["count"]
            filled = int(count / max_count * bar_w)
            bar = "\u2588" * filled + "\u2591" * (bar_w - filled)
            avg_lat = stats["total_lat_ns"] / count / 1000 if count > 0 else 0
            # Pad first, then escape: escape backslashes are invisible once
            # rendered, so the visible column width stays 14.
            name = _escape_markup(f"{comm!s:<14}")
            lines.append(f" {name} {bar} {count:5d} avg {avg_lat:.0f}\u03bcs")
        return "\n".join(lines)


class RWBSPanel(Static):
    """Share of completions per RWBS type string."""

    _LABELS = {
        "R": "Read",
        "W": "Write",
        "RM": "Read Meta",
        "WM": "Write Meta",
        "WS": "Write Sync",
        "WSM": "Write Sync Meta",
        "RA": "Read Ahead",
        "FF": "Flush",
    }

    def render(self):
        m = self.app.model
        stats = m.rwbs_stats
        if not stats:
            return "[bold]I/O 类型[/]\n  [dim]等待数据...[/]"
        total = sum(stats.values()) or 1
        sorted_s = sorted(stats.items(), key=lambda x: x[1], reverse=True)[:8]
        bar_w = 12
        lines = ["[bold]I/O 类型[/]"]
        for rwbs, count in sorted_s:
            # Unknown RWBS strings come straight from the trace: escape them.
            label = _escape_markup(f"{self._LABELS.get(rwbs, rwbs)!s:<14}")
            pct = count / total * 100
            filled = int(pct / 100 * bar_w)
            bar = "\u2588" * filled + "\u2591" * (bar_w - filled)
            lines.append(f" {label} {bar} {pct:5.1f}%")
        return "\n".join(lines)


# ── Pipeline management ───────────────────────────────────────────────────

class Pipeline:
    """Manage the ``bpftrace | bio-state`` (or ``cat FILE | bio-state``) subprocess pipeline."""

    def __init__(self, replay_file: str | None = None):
        self.replay_file = replay_file
        self.proc: subprocess.Popen | None = None
        self._running = True
        # Process group of the pipeline when it was started in a group of its
        # own; None means it shares ours and must never be killpg'd.
        self._pgid: int | None = None

    def start(self) -> int | None:
        """Start the pipeline and return the read-end fd of its stdout.

        Raises:
            FileNotFoundError: the replay file does not exist.
            OSError: the shell could not be spawned.
        """
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # Quote every path: the command goes through the shell.
        bio_state = shlex.quote(os.path.join(script_dir, "bio-state"))
        bio_trace = shlex.quote(os.path.join(script_dir, "bio-trace.bt"))

        if self.replay_file:
            if not os.path.isfile(self.replay_file):
                raise FileNotFoundError(f"replay file not found: {self.replay_file}")
            # Replay mode: cat FILE | bio-state  ("--" guards names starting with "-")
            cmd = f"cat -- {shlex.quote(self.replay_file)} | {bio_state}"
        else:
            # Live tracing mode: sudo bpftrace | bio-state
            cmd = f"sudo bpftrace {bio_trace} | {bio_state}"

        # Put the pipeline in its own process group (Python 3.11+) so stop()
        # can signal the whole pipeline without signalling this TUI too.
        # NOTE(review): a background process group presumably cannot prompt on
        # the terminal, so sudo likely needs cached credentials (`sudo -v`).
        group_kwargs = {"process_group": 0} if sys.version_info >= (3, 11) else {}
        self.proc = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            **group_kwargs,
        )
        self._pgid = self.proc.pid if group_kwargs else None
        return self.proc.stdout.fileno()

    def stop(self) -> None:
        """Terminate the pipeline; safe to call more than once."""
        self._running = False
        proc = self.proc
        if proc is None:
            return
        pgid, self._pgid = self._pgid, None
        if pgid is not None:
            try:
                os.killpg(pgid, signal.SIGTERM)
            except OSError:
                pass  # group already gone / not signalable: fall back to the shell below
        if proc.poll() is None:
            proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()


# ── App ───────────────────────────────────────────────────────────────────

class BioTUI(App):
    """Block I/O lifecycle TUI application.

    Keys: ``q`` quit, ``p`` pause/resume, ``c`` clear statistics.
    """

    CSS = """
    #stats-bar {
        height: 1;
        background: $surface;
        padding: 0 1;
    }
    #waterfall {
        height: 1fr;
        background: $surface-darken-1;
        min-height: 10;
        padding: 1;
    }
    #bottom-panels {
        height: 14;
    }
    #latency {
        width: 1fr;
        border: solid $primary;
        padding: 1;
    }
    #process {
        width: 1fr;
        border: solid $primary;
        padding: 1;
    }
    #rwbs {
        width: 1fr;
        border: solid $primary;
        padding: 1;
    }
    """

    def __init__(self, replay_file: str | None = None):
        super().__init__()
        self.model = IOModel()
        self.pipeline = Pipeline(replay_file=replay_file)
        # Deliberately not named `_running`, which may collide with the
        # framework's own internal state.
        self._reader_active = True
        self._pipe_fd: int | None = None
        self._reader_thread: threading.Thread | None = None
        # Events parsed by the reader thread, waiting for the UI thread.
        # deque append/popleft are atomic, so no extra lock is needed.
        self._bio_events: deque[dict] = deque()

    def compose(self) -> ComposeResult:
        yield StatsBar("", id="stats-bar")
        yield Waterfall("", id="waterfall")
        with Horizontal(id="bottom-panels"):
            yield LatencyHistogram("", id="latency")
            yield ProcessPanel("", id="process")
            yield RWBSPanel("", id="rwbs")

    def on_mount(self) -> None:
        # Start the tracing pipeline; any failure is reported in the stats bar.
        try:
            self._pipe_fd = self.pipeline.start()
        except Exception as e:  # UI boundary: show rather than crash
            self.model.status_line = f"启动失败: {e}"
        if self._pipe_fd is not None:
            self._reader_thread = threading.Thread(target=self._read_pipe, daemon=True)
            self._reader_thread.start()
        # Periodically fold queued events into the model and redraw.
        self.set_interval(0.2, self._periodic_refresh)

    def _read_pipe(self) -> None:
        """Reader thread: parse JSON lines from the pipeline and queue them."""
        fd = self._pipe_fd
        if fd is None:
            return
        try:
            # closefd=False: the fd is owned by Popen.stdout, which closes it.
            with os.fdopen(fd, "rb", closefd=False) as f:
                for raw in f:
                    if not self._reader_active:
                        break
                    line = raw.decode("utf-8", errors="replace").strip()
                    if not line:
                        continue
                    try:
                        ev = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(ev, dict) or ev.get("type") == "ready":
                        continue
                    self._bio_events.append(ev)
        except (OSError, ValueError):
            pass  # pipe closed underneath us during shutdown

    def _on_event(self, ev: dict) -> None:
        """Apply one I/O event to the model immediately (UI thread only)."""
        self.model.process_event(ev)

    def _drain_events(self) -> None:
        """Apply the events queued so far; malformed events are skipped."""
        events = self._bio_events
        # Bound the work to what is queued now so a fast producer cannot keep
        # the UI thread in this loop indefinitely.
        for _ in range(len(events)):
            ev = events.popleft()
            try:
                self.model.process_event(ev)
            except (KeyError, TypeError, ValueError):
                continue

    def _periodic_refresh(self) -> None:
        """Fold in new events, then redraw every panel."""
        self._drain_events()
        for w in self.query(Static):
            w.refresh()

    def on_key(self, event) -> None:
        key = event.key
        if key == "q":
            self._reader_active = False
            self.pipeline.stop()
            self.exit()
        elif key == "p":
            self.model.paused = not self.model.paused
        elif key == "c":
            self.model.reset()

    def on_unmount(self) -> None:
        self._reader_active = False
        self.pipeline.stop()


# ── Entry Point ───────────────────────────────────────────────────────────

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Block I/O 生命周期可视化 TUI")
    parser.add_argument("--replay", metavar="FILE", help="回放录制的追踪日志")
    args = parser.parse_args()
    BioTUI(replay_file=args.replay).run()