#!/usr/bin/env python3
"""bio-state — Block I/O state-machine builder.

Reads raw kernel trace records produced by bpftrace (JSON lines on stdin),
correlates the I/O lifecycle events, and emits structured I/O state-change
events (JSON lines on stdout).

Input:  {"type":"I",...} / {"type":"D",...} / {"type":"C",...} / {"type":"R",...}
Output: {"type":"begin",...} / {"type":"end",...} / {"type":"requeue",...}

Usage:
    sudo bpftrace bio-trace.bt | python3 bio-state | python3 bio-tui
"""

import json
import os
import sys
from collections import deque
from collections.abc import Iterable, Iterator

# Sector values that, together with an "F" in rwbs, mark a data-less flush
# request. 18446744073709551615 == 2**64 - 1 (an all-ones u64).
FLUSH_SECTORS = (0, 18446744073709551615)

# Default number of (dev, sector, ts) completion tags remembered for
# duplicate-completion suppression.
SEEN_COMPLETIONS_LIMIT = 100000

# Event types handled by the state machine; every other type (e.g. the
# "ready"/"done" markers) is ignored.
_LIFECYCLE_TYPES = ("I", "D", "C", "R")

# Fields a lifecycle record must carry to be processed.
_REQUIRED_FIELDS = ("dev", "sector", "ts")


def build_devmap():
    """Build a dev_t -> device-name map from /sys/block.

    Each /sys/block/<name>/dev file holds "MAJOR:MINOR"; the key is encoded
    as (major << 20) | minor — presumably the kernel-internal dev_t layout
    that the trace reports in "dev" (confirm against bio-trace.bt). Entries
    that cannot be read or parsed are skipped individually; if /sys/block
    itself is unavailable an empty map is returned.
    """
    devmap = {}
    try:
        names = os.listdir("/sys/block")
    except OSError:
        return devmap
    for name in names:
        dev_path = f"/sys/block/{name}/dev"
        try:
            with open(dev_path) as f:
                major_minor = f.read().strip().split(":")
            dev_t = (int(major_minor[0]) << 20) | int(major_minor[1])
        except (OSError, ValueError, IndexError):
            continue
        devmap[dev_t] = name
    return devmap


def dev_to_name(dev_t, devmap):
    """Return the device name for dev_t, or "dev-<dev_t>" when unknown."""
    return devmap.get(dev_t, f"dev-{dev_t}")


def _parse_event(line):
    """Decode one trace line into a lifecycle record dict, or None to skip.

    Skips blank lines, non-JSON text, JSON values that are not objects,
    records whose "type" is not a lifecycle type, and records missing any
    of the required fields.
    """
    line = line.strip()
    if not line:
        return None
    try:
        ev = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(ev, dict) or ev.get("type") not in _LIFECYCLE_TYPES:
        return None
    if any(field not in ev for field in _REQUIRED_FIELDS):
        return None
    return ev


def iter_state_events(
    lines: Iterable[str],
    devmap: dict[int, str],
    *,
    seen_limit: int = SEEN_COMPLETIONS_LIMIT,
) -> Iterator[dict]:
    """Turn raw trace lines into I/O state-change events.

    The generator is lazy: each output event is yielded as soon as the input
    line producing it has been consumed, so it can be driven from a live
    stream.

    Args:
        lines: Raw JSON-lines trace records. Invalid or irrelevant lines are
            skipped (see _parse_event).
        devmap: dev_t -> device-name map (see build_devmap).
        seen_limit: How many of the most recent completion tags
            (dev, sector, ts) to remember for dropping duplicate "C" records.

    Yields:
        "begin" dicts on the first dispatch of a request, "end" dicts on
        completion (latency_ns is measured from insert, or from dispatch
        when no insert was seen), and "requeue" dicts.
    """
    # (dev, sector) -> {"insert_ts", "bytes", "comm", ["issue_ts"], ["reported"]}
    in_flight: dict[tuple[int, int], dict] = {}
    # Insertion-ordered dict used as a bounded FIFO set of completion tags.
    seen_completions: dict[tuple[int, int, int], None] = {}
    # dev -> outstanding data-less flushes, oldest first.
    flushes: dict[int, deque] = {}

    for line in lines:
        ev = _parse_event(line)
        if ev is None:
            continue

        etype = ev["type"]
        dev = ev["dev"]
        sector = ev["sector"]
        ts_ns = ev["ts"]
        rwbs = ev.get("rwbs", "")
        devname = dev_to_name(dev, devmap)
        key = (dev, sector)
        # NOTE(review): a data write carrying a preflush to sector 0 also has
        # "F" in rwbs and would be treated as a data-less flush here — confirm
        # whether the trace can produce such records.
        is_flush_marker = "F" in rwbs and sector in FLUSH_SECTORS

        if etype == "I":
            # Request inserted into the queue.
            in_flight[key] = {
                "insert_ts": ts_ns,
                "bytes": ev.get("bytes", 0),
                "comm": ev.get("comm", ""),
            }

        elif etype == "D":
            # Request dispatched to the driver.
            if is_flush_marker:
                # Data-less flush: no corresponding insert; track per device.
                flushes.setdefault(dev, deque()).append(
                    {"issue_ts": ts_ns, "comm": ev.get("comm", "")}
                )
                continue
            info = in_flight.get(key)
            if info is None:
                # Dispatch without a preceding insert: start tracking here.
                info = in_flight[key] = {
                    "insert_ts": ts_ns,
                    "bytes": ev.get("bytes", 0),
                    "comm": ev.get("comm", ""),
                }
            info["issue_ts"] = ts_ns
            if not info.get("reported"):
                # Only the first dispatch emits "begin"; a re-dispatch after
                # a requeue does not.
                info["reported"] = True
                yield {
                    "type": "begin",
                    "ts": ts_ns,
                    "dev": dev,
                    "sector": sector,
                    "key": f"{dev}:{sector}",
                    "bytes": info["bytes"],
                    "rwbs": rwbs,
                    "comm": info["comm"],
                    "devname": devname,
                }

        elif etype == "C":
            # Request completed.
            if is_flush_marker:
                # Match the oldest outstanding flush on this device.
                pending = flushes.get(dev)
                if not pending:
                    continue
                flush = pending.popleft()
                yield {
                    "type": "end",
                    "ts": ts_ns,
                    "dev": dev,
                    "sector": -1,
                    "key": f"{dev}:flush",
                    "bytes": 0,
                    "rwbs": rwbs,
                    "latency_ns": ts_ns - flush["issue_ts"],
                    "error": ev.get("error", 0),
                    "comm": flush.get("comm", ""),
                    "devname": devname,
                    "is_flush": True,
                }
                continue

            # Drop duplicate completion records.
            comp_tag = (dev, sector, ts_ns)
            if comp_tag in seen_completions:
                continue
            seen_completions[comp_tag] = None
            if len(seen_completions) > seen_limit:
                # Evict only the oldest tag so the most recent seen_limit
                # tags stay remembered and memory stays bounded.
                del seen_completions[next(iter(seen_completions))]

            info = in_flight.pop(key, None)
            if info is None:
                continue
            yield {
                "type": "end",
                "ts": ts_ns,
                "dev": dev,
                "sector": sector,
                "key": f"{dev}:{sector}",
                "bytes": info["bytes"],
                "rwbs": rwbs,
                "latency_ns": ts_ns - info["insert_ts"],
                "error": ev.get("error", 0),
                "comm": info["comm"],
                "devname": devname,
                "is_flush": False,
            }

        else:
            # "R": request requeued; its in-flight state is kept.
            yield {
                "type": "requeue",
                "ts": ts_ns,
                "dev": dev,
                "sector": sector,
                "key": f"{dev}:{sector}",
                "devname": devname,
                "rwbs": rwbs,
            }


def main():
    """Stream stdin through the state machine and print JSON lines."""
    devmap = build_devmap()
    try:
        for obj in iter_state_events(sys.stdin, devmap):
            # Flush per event so the downstream consumer sees it immediately.
            print(json.dumps(obj), flush=True)
    except BrokenPipeError:
        # Downstream reader exited. Point stdout at /dev/null so the
        # interpreter's final flush at shutdown does not raise again.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)
    except KeyboardInterrupt:
        # Ctrl-C on the pipeline: exit quietly instead of dumping a traceback.
        sys.exit(130)


if __name__ == "__main__":
    main()