#!/opt/cloudlinux/venv/bin/python3
# Copyright Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
lvestats-plugin-runner -- long-lived subprocess host for Python LveStatsPlugin classes.

The Rust daemon (lvestats-server) spawns ONE runner for the whole daemon lifetime
and drives it over stdin/stdout with line-delimited JSON. Every Python plugin
discovered in the plugins directory is hosted as an instance inside this single
process, so plugin state (counters, caches, DB engines) survives across ticks,
matching the semantics of the Python lve-stats daemon's in-process executor.

Protocol (one request per line, one response per line, stderr = logs):

    {"op":"list","plugins_dir":"<dir>"}
        -> {"status":"ok","plugins":[{"file":..,"class":..,"order":..,"period":..,"timeout":..}, ...]}

    {"op":"init","class":"<Name>","plugin_config":{...},"db_url":"..."|null,"is_user_plugin":true}
        -> {"status":"ok"} | {"status":"error","message":..,"traceback":..}

    {"op":"execute","class":"<Name>","now":<float>,"lve_data":{...}}
        -> {"status":"ok","lve_data":{...}}
         | {"status":"terminated"}
         | {"status":"db_schema_error","message":..,"traceback":..}
         | {"status":"error","message":..,"traceback":..}

    {"op":"shutdown"}
        -> {"status":"ok"}   (process exits after writing response)

SIGUSR1 dumps stack traces of all threads to stderr.
SIGUSR2 is ignored outside of execute() and raises LveStatsPluginTerminated inside,
so the Rust daemon can interrupt a sleeping plugin during graceful shutdown.
"""

import importlib.util
import inspect
import io
import json
import os
import signal
import sys
import traceback


# Import the real base class and termination exception from the installed
# lve-stats package so a plugin's `except LveStatsPluginTerminated:` catches
# the same class our SIGUSR2 handler raises. Fall back to local stubs when
# `lvestats.core.plugin` cannot be imported (e.g. development / CI
# environments).
# NOTE: the hydrate/dehydrate layer further down imports `lvestat` and
# `lvestats` unconditionally, so the stubs only help when this one import
# fails while those packages are still importable.
try:
    from lvestats.core.plugin import LveStatsPlugin, LveStatsPluginTerminated  # noqa: F401
except ImportError:
    class LveStatsPluginTerminated(Exception):
        """Stub for the exception the SIGUSR2 handler raises to interrupt execute()."""
        pass

    class LveStatsPlugin:
        """Stub base class; plugin discovery keeps only subclasses of this type."""

        # Class-level defaults; discovery reports these three values in each
        # plugin descriptor returned by the `list` op.
        order = 0
        period = None
        timeout = None

        def execute(self, lve_data):
            """No-op; real plugins override this to process one tick of `lve_data`."""
            pass

        def set_config(self, config):
            """No-op; called by `init` with the request's `plugin_config` dict."""
            pass

        def set_db_engine(self, engine):
            """No-op; called by `init` with a SQLAlchemy engine when `db_url` is set."""
            pass


# ---------------------------------------------------------------------------
# lve_data hydrate/dehydrate
#
# Convert `lve_data` between the Rust JSON shape and the rich Python object
# shape that user plugins ported from legacy lve-stats expect.
#
# Legacy lve-stats (Python-only) passed `lve_data` containing rich objects:
#
#     lve_data['stats'][uid]         -> LVEStat               (lvestat module)
#     lve_data['old_stats'][uid]     -> LVEStat
#     lve_data['lve_usage'][uid]     -> AggregatedLveUsage    (lvestats.plugins.generic.aggregators)
#     lve_data['lve_usages_5s'][i]   -> {uid: LVEUsage}       (lvestats.plugins.generic.analyzers)
#
# In lve-stats3 the daemon serializes `LveData` to JSON and we receive a
# plain dict of dicts with string keys. Several field names also differ:
# `ProcLveEntry` and `AggregatedLveUsage` use `*_limit`-suffixed names
# instead of legacy short names like `cpu`, `lep`, `lmem`, `io`. We wrap
# each entry into the original legacy class so plugin code that does
# `isinstance(s, LVEStat)`, `usage.has_interesting_values()`, or simply
# attribute reads/writes keeps working unchanged. The runner is shebanged
# to `/opt/cloudlinux/venv/bin/python3`, where `lvestat` and `lvestats`
# are always installed; we import them at module load and fail fast if
# missing.
#
# Two of the three classes need a small extension because the Rust struct
# carries fields legacy didn't model:
#
#     * LVEStat              + cpu_fault, io_fault, iops_fault, cpu_max_limit
#     * AggregatedLveUsage   + id, count, created
#     * LVEUsage             — no extension (slots match Rust keys 1:1)
#
# The subclasses keep `isinstance(_, LVEStat)` etc. true for plugin code
# that does type checks.
#
# Errors are non-fatal: malformed values are passed through unconverted
# with a warning to stderr — a single bad key from one plugin must not
# corrupt the pipeline for the rest. Matches the policy in
# `crates/lvestats-core/src/types.rs::lve_data_from_python_json`.
# ---------------------------------------------------------------------------

from lvestat import LVEStat  # noqa: E402
from lvestats.plugins.generic.analyzers import LVEUsage, LVEUSAGESLOTS  # noqa: E402
from lvestats.plugins.generic.aggregators import AggregatedLveUsage  # noqa: E402


# Field-name translation tables (Rust JSON key <-> legacy slot name)

# `LveStat` (Rust ProcLveEntry) <-> legacy `LVEStat`
_LVESTAT_RUST_TO_LEGACY = dict((
    ("cpu_limit", "cpu"),            # legacy `cpu` holds the CPU limit
    ("mep_limit", "lep"),
    ("mem_limit", "lmem"),
    ("io_limit", "io"),
    ("memphy_limit", "lmemphy"),
    ("nproc_limit", "lnproc"),
    ("iops_limit", "liops"),
))
# Reverse lookup (legacy slot name -> Rust JSON key), used when serializing.
_LVESTAT_LEGACY_TO_RUST = dict(
    zip(_LVESTAT_RUST_TO_LEGACY.values(), _LVESTAT_RUST_TO_LEGACY.keys())
)

# Rust `LveUsage` and legacy `LVEUsage` share field names, so they need no table.

# Rust `AggregatedLveUsage` (DB shape) <-> legacy `AggregatedLveUsage` slots.
_AGG_RUST_TO_LEGACY = dict((
    ("cpu", "cpu_usage"),            # Rust average cpu -> legacy cpu_usage
    ("cpu_limit", "lcpu"),
    ("mep_limit", "lep"),
    ("io", "io_usage"),              # Rust average io -> legacy io_usage
    ("io_limit", "io"),              # Rust io limit   -> legacy io
    ("mem", "mem_usage"),
    ("mem_limit", "lmem"),
))
# Reverse lookup (legacy slot name -> Rust JSON key), used when serializing.
_AGG_LEGACY_TO_RUST = dict(
    zip(_AGG_RUST_TO_LEGACY.values(), _AGG_RUST_TO_LEGACY.keys())
)


# Extensions of the real legacy classes add the Rust-only fields further down.

# LVEStat slots that are never written back to the Rust ProcLveEntry shape:
# the two helper-method references and the legacy-only fields that have no
# Rust counterpart.
_LVESTAT_NON_WIRE_SLOTS = frozenset({
    "_get_attributes",
    "_set_attributes",
    "ncpu",
    "lcpuw",
})


class _LVEStatExtended(LVEStat):
    """`LVEStat` subclass carrying the extra Rust ProcLveEntry fields.

    Adds the per-resource `*_fault` counters and `cpu_max_limit`, which the
    legacy class has no slots for. Because it subclasses the real class,
    `isinstance(obj, LVEStat)` stays true for plugin code.
    """

    __slots__ = ("cpu_fault", "io_fault", "iops_fault", "cpu_max_limit")

    def __init__(self, version=8):
        # Passing `None` as the first argument skips parsing a /proc/lve line
        # and leaves the base slots zero-initialized.
        super().__init__(None, version)
        # Start every extended field at zero, in declaration order.
        for extra_slot in _LVEStatExtended.__slots__:
            setattr(self, extra_slot, 0)


# Slots emitted on dehydrate: every base `LVEStat` slot that is not listed in
# `_LVESTAT_NON_WIRE_SLOTS`, followed by the four slots `_LVEStatExtended`
# adds. `_lve_stat_to_dict` iterates this tuple in order.
_LVESTAT_WIRE_SLOTS = tuple(
    s for s in LVEStat.__slots__ if s not in _LVESTAT_NON_WIRE_SLOTS
) + _LVEStatExtended.__slots__


# Extra fields lve-stats3's AggregatedLveUsage carries that legacy
# `LVEUSAGESLOTS` doesn't model. Used both as the `__slots__` of
# `_AggregatedExtended` and as the tail of `_AGG_WIRE_SLOTS`.
_AGG_EXTRA_SLOTS = ("id", "count", "created")


class _AggregatedExtended(AggregatedLveUsage):
    """`AggregatedLveUsage` subclass with the lve-stats3 DB-shape extras.

    Adds `id`, `count` and `created` on top of the legacy slots.
    """

    __slots__ = _AGG_EXTRA_SLOTS

    def __init__(self, lve_version=8):
        super().__init__(lve_version=lve_version)
        # Chained assignment binds left to right: id, then count, then created.
        self.id = self.count = self.created = 0


# Slots `_aggregated_to_dict` serializes: the legacy `LVEUSAGESLOTS` followed
# by the lve-stats3 extras.
_AGG_WIRE_SLOTS = tuple(LVEUSAGESLOTS) + _AGG_EXTRA_SLOTS


# Legacy scratch keys user plugins read/write but Rust doesn't model.
# `dehydrate` strips these from the outbound payload and stores them in the
# per-plugin scratch dict; `hydrate` merges them back in on the next tick.
SCRATCH_KEYS = ("old_now", "lve_usages", "lve_active_ids")


def _hydrate_warn(msg):
    """Write one runner-prefixed warning line for `msg` to stderr."""
    line = "".join(("Warning: lvestats-plugin-runner: ", msg, "\n"))
    sys.stderr.write(line)


def _make_lve_stat(d):
    """Return an `_LVEStatExtended` populated from a Rust ProcLveEntry dict.

    Keys listed in `_LVESTAT_RUST_TO_LEGACY` are stored under their legacy
    attribute name; every other key is stored under its own name. A key the
    object rejects with `AttributeError` is dropped with a warning.
    """
    stat = _LVEStatExtended(version=8)
    for wire_key, value in d.items():
        if wire_key in _LVESTAT_RUST_TO_LEGACY:
            attr_name = _LVESTAT_RUST_TO_LEGACY[wire_key]
        else:
            attr_name = wire_key
        try:
            setattr(stat, attr_name, value)
        except AttributeError:
            _hydrate_warn("LVEStat: unknown field %r dropped during hydrate" % (wire_key,))
    return stat


def _lve_stat_to_dict(obj):
    """Return the Rust ProcLveEntry dict for an `LVEStat` (or subclass).

    Walks `_LVESTAT_WIRE_SLOTS` in order, skips slots that are missing or
    hold `None`, and renames legacy slot names to their Rust keys.
    """
    # `getattr` with a default turns a missing slot into None, which the
    # filter below then skips together with slots explicitly set to None.
    slot_values = ((slot, getattr(obj, slot, None)) for slot in _LVESTAT_WIRE_SLOTS)
    return {
        _LVESTAT_LEGACY_TO_RUST.get(slot, slot): value
        for slot, value in slot_values
        if value is not None
    }


def _make_lve_usage(d):
    """Return an `LVEUsage` populated from a Rust LveUsage dict.

    Field names are used as-is. `lve_version` defaults to 8 when the dict
    does not carry it. A key the object rejects with `AttributeError` is
    dropped with a warning.
    """
    usage = LVEUsage(lve_version=d.get("lve_version", 8))
    for field in d:
        try:
            setattr(usage, field, d[field])
        except AttributeError:
            _hydrate_warn("LVEUsage: unknown field %r dropped during hydrate" % (field,))
    return usage


def _lve_usage_to_dict(obj):
    """Return the Rust LveUsage dict for an `LVEUsage` (names unchanged).

    Slots from `LVEUSAGESLOTS` that are missing on `obj` or hold `None`
    are left out of the result.
    """
    wire = {}
    for slot in LVEUSAGESLOTS:
        # A missing slot reads as None and is skipped like an unset one.
        value = getattr(obj, slot, None)
        if value is None:
            continue
        wire[slot] = value
    return wire


def _make_aggregated(d):
    """Return an `_AggregatedExtended` populated from a Rust dict.

    Keys are renamed through `_AGG_RUST_TO_LEGACY` where a mapping exists.
    `lve_version` defaults to 8. A key the object rejects with
    `AttributeError` is dropped with a warning.
    """
    aggregated = _AggregatedExtended(lve_version=d.get("lve_version", 8))
    to_legacy = _AGG_RUST_TO_LEGACY.get
    for wire_key, value in d.items():
        try:
            setattr(aggregated, to_legacy(wire_key, wire_key), value)
        except AttributeError:
            _hydrate_warn("AggregatedLveUsage: unknown field %r dropped during hydrate"
                          % (wire_key,))
    return aggregated


def _aggregated_to_dict(obj):
    """Return the Rust AggregatedLveUsage dict for `obj`.

    Walks `_AGG_WIRE_SLOTS` in order, skips slots that are missing or hold
    `None`, and renames legacy slot names through `_AGG_LEGACY_TO_RUST`.
    """
    wire = {}
    for slot in _AGG_WIRE_SLOTS:
        value = getattr(obj, slot, None)
        if value is not None:
            wire[_AGG_LEGACY_TO_RUST.get(slot, slot)] = value
    return wire


def _to_int_uid(k):
    """Return `k` as an int uid, or None when it cannot be converted.

    Ints are returned unchanged. Anything else goes through `int()`; a value
    `int()` rejects yields None so the caller can drop the key with a warning
    instead of aborting the whole hydrate pass.
    """
    if isinstance(k, int):
        return k
    try:
        return int(k)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); treat it like any other
        # malformed key rather than letting it escape.
        return None


def _hydrate_uid_map(d, builder, label):
    """Turn `{uid_str: dict}` into `{int(uid): builder(dict)}`.

    A non-dict `d` is returned unchanged after a warning. Keys that do not
    convert to int are dropped with a warning; values that are not dicts are
    kept as-is under the converted key, also with a warning. `label` only
    appears in the warning text.
    """
    if not isinstance(d, dict):
        _hydrate_warn("expected dict for %s, got %s" % (label, type(d).__name__))
        return d

    hydrated = {}
    for raw_key, entry in d.items():
        uid = _to_int_uid(raw_key)
        if uid is None:
            _hydrate_warn("non-int %s key %r dropped during hydrate" % (label, raw_key))
            continue
        if not isinstance(entry, dict):
            _hydrate_warn("non-dict %s entry for uid %r passed through" % (label, raw_key))
            hydrated[uid] = entry
            continue
        hydrated[uid] = builder(entry)
    return hydrated


def hydrate(lve_data, scratch=None):
    """Build the rich-object view of a JSON-decoded `lve_data` for plugins.

    - `stats` / `old_stats`: {uid_str: dict} -> {int(uid): LVEStat}
    - `lve_usage`:           {uid_str: dict} -> {int(uid): AggregatedLveUsage}
    - `lve_usages_5s`:       {uid_str: dict} -> [{int(uid): LVEUsage}]
                              (wrapped in a one-element list, the shape the
                              legacy aggregator iterates)
    - every other key is copied through untouched
    - keys from `scratch` are added only where `lve_data` has no such key

    A non-dict `lve_data` is returned as-is. Otherwise the result is a new
    top-level dict; the input dict itself is not modified.
    """
    if not isinstance(lve_data, dict):
        return lve_data

    hydrated = {**lve_data}

    for key in ("stats", "old_stats"):
        if key in hydrated:
            hydrated[key] = _hydrate_uid_map(hydrated[key], _make_lve_stat, key)

    if "lve_usage" in hydrated:
        hydrated["lve_usage"] = _hydrate_uid_map(
            hydrated["lve_usage"], _make_aggregated, "lve_usage"
        )

    if "lve_usages_5s" in hydrated:
        per_uid = _hydrate_uid_map(
            hydrated["lve_usages_5s"], _make_lve_usage, "lve_usages_5s"
        )
        # Legacy aggregator code loops over a list of {uid: usage} dicts, so a
        # successfully hydrated map is wrapped; anything else passes through.
        if isinstance(per_uid, dict):
            per_uid = [per_uid]
        hydrated["lve_usages_5s"] = per_uid

    if scratch:
        for key, value in scratch.items():
            if key not in hydrated:
                hydrated[key] = value

    return hydrated


def _unwrap_stats_value(v):
    """Serialize `v` if it is an `LVEStat`; return anything else unchanged."""
    return _lve_stat_to_dict(v) if isinstance(v, LVEStat) else v


def _unwrap_lve_usage_value(v):
    """Serialize an `lve_usage` entry into the Rust aggregated key namespace.

    Both `AggregatedLveUsage` and a bare `LVEUsage` (a plugin may have
    assigned one where the aggregated type was expected) go through
    `_aggregated_to_dict`; any other value is returned unchanged.
    """
    if not isinstance(v, (AggregatedLveUsage, LVEUsage)):
        return v
    return _aggregated_to_dict(v)


def _unwrap_lve_usages_5s_value(v):
    """Serialize `v` if it is an `LVEUsage`; return anything else unchanged."""
    if not isinstance(v, LVEUsage):
        return v
    return _lve_usage_to_dict(v)


def _dehydrate_uid_map(d, unwrap, label):
    """Turn `{uid: value}` into `{str(uid): unwrap(value)}`.

    A non-dict `d` is returned unchanged after a warning that names `label`.
    """
    if isinstance(d, dict):
        wire = {}
        for uid, entry in d.items():
            wire_entry = unwrap(entry)
            wire[str(uid)] = wire_entry
        return wire

    _hydrate_warn("expected dict for %s during dehydrate, got %s"
                  % (label, type(d).__name__))
    return d


def dehydrate(lve_data, scratch=None):
    """Turn plugin-facing `lve_data` back into the JSON shape Rust expects.

    Reverses `hydrate`: uid keys become strings and rich objects become plain
    dicts. Every key listed in `SCRATCH_KEYS` is removed from the result and,
    when `scratch` is given, stored there so the next tick's hydrate can
    restore it. A non-dict `lve_data` is returned as-is.
    """
    if not isinstance(lve_data, dict):
        return lve_data

    wire = {**lve_data}

    for key in ("stats", "old_stats"):
        if key in wire:
            wire[key] = _dehydrate_uid_map(wire[key], _unwrap_stats_value, key)

    if "lve_usage" in wire:
        wire["lve_usage"] = _dehydrate_uid_map(
            wire["lve_usage"], _unwrap_lve_usage_value, "lve_usage"
        )

    if "lve_usages_5s" in wire:
        snapshots = wire["lve_usages_5s"]
        if isinstance(snapshots, list):
            # Only the last list element is sent back; an empty list or a
            # non-dict last element becomes an empty map.
            latest = snapshots[-1] if snapshots else {}
            if not isinstance(latest, dict):
                wire["lve_usages_5s"] = {}
            else:
                wire["lve_usages_5s"] = _dehydrate_uid_map(
                    latest, _unwrap_lve_usages_5s_value, "lve_usages_5s"
                )
        elif isinstance(snapshots, dict):
            wire["lve_usages_5s"] = _dehydrate_uid_map(
                snapshots, _unwrap_lve_usages_5s_value, "lve_usages_5s"
            )
        # Any other type is left untouched.

    for key in SCRATCH_KEYS:
        if key not in wire:
            continue
        value = wire.pop(key)
        if scratch is not None:
            scratch[key] = value

    return wire


# ---------------------------------------------------------------------------
# Plugin host
# ---------------------------------------------------------------------------


# Schema-level SQLAlchemy exceptions we treat specially so the Rust daemon
# can recreate the schema and retry (mirrors Python daemon's recover_db()
# path in `lvestats/eventloop/plugin_executors.py:106-114`). If SQLAlchemy
# isn't installed, the tuple is empty and no classification happens.
try:
    from sqlalchemy.exc import (
        NoSuchColumnError, NoSuchTableError, NoReferenceError,
    )
    # Exception classes op_execute reports with status "db_schema_error".
    _DB_SCHEMA_ERRORS = (NoSuchColumnError, NoSuchTableError, NoReferenceError)
except ImportError:
    # An empty tuple in an `except` clause matches nothing, so the
    # schema-error branch in op_execute simply never fires.
    _DB_SCHEMA_ERRORS = ()


# Runner state. Requests are handled one at a time by a single-threaded
# loop, so none of these dicts needs locking.
_classes = {}    # class_name -> class object, populated by op_list
_instances = {}  # class_name -> plugin instance, populated by op_init
# Per-instance scratch dict for legacy keys (`old_now`, `lve_usages`,
# `lve_active_ids`) that user plugins read/write but Rust doesn't model.
# Preserved across ticks so legacy semantics hold (e.g. LVEUsageAnalyzer
# stores `self.now` in `lve_data['old_now']` for the next tick to read).
# Cleared by op_list; a plugin's entry is dropped when op_init re-creates it.
_scratch = {}    # class_name -> dict of legacy scratch keys


def _sigusr1_handler(signum, frame):
    """SIGUSR1 handler: write the current stack of every thread to stderr."""
    report = ["--- lvestats-plugin-runner thread traces ---"]
    for thread_id, top_frame in sys._current_frames().items():
        report += ["# Thread %d" % thread_id, *traceback.format_stack(top_frame)]
    stream = sys.stderr
    stream.write("\n".join(report) + "\n")
    stream.flush()


def _sigusr2_handler(signum, frame):
    """SIGUSR2 handler installed only while a plugin's execute() is running.

    Raises `LveStatsPluginTerminated` in the interrupted code; op_execute
    catches it and answers with status "terminated".
    """
    raise LveStatsPluginTerminated("SIGUSR2")


def _is_broken_symlink(path):
    """Return True when `path` is a symlink whose resolved target does not exist."""
    return os.path.islink(path) and not os.path.exists(os.path.realpath(path))


def _discover_classes(plugins_dir):
    """Import every .py file in plugins_dir and collect all LveStatsPlugin subclasses.

    Mirrors lvestats.core.plugin_loader.PluginLoader: loads every subclass by
    issubclass check (not by filename), registers modules in sys.modules so
    sibling imports work, and skips broken symlinks with a warning.

    Returns `(descriptors, classes)`: `descriptors` is a list of dicts with
    the keys `file`, `class`, `order`, `period` and `timeout`; `classes` maps
    class name to class object. Both are empty when `plugins_dir` is not a
    directory. A file that fails to import is skipped with a warning.
    """
    descriptors = []
    classes = {}

    if not os.path.isdir(plugins_dir):
        sys.stderr.write("Warning: plugins directory not found: %s\n" % plugins_dir)
        return descriptors, classes

    # Marks "no module was registered under this name before we tried".
    absent = object()

    for filename in sorted(os.listdir(plugins_dir)):
        if not filename.endswith(".py"):
            continue
        full_path = os.path.join(plugins_dir, filename)
        if _is_broken_symlink(full_path):
            sys.stderr.write("Warning: skipping broken symlink %s\n" % full_path)
            continue

        module_name = filename[:-3]
        # Remember whatever already lives under this name (for example an
        # already-imported module that the plugin file happens to shadow) so a
        # failed import can put it back instead of deleting it.
        previous = sys.modules.get(module_name, absent)
        try:
            spec = importlib.util.spec_from_file_location(module_name, full_path)
            if spec is None or spec.loader is None:
                continue
            module = importlib.util.module_from_spec(spec)
            # Register before exec so sibling modules can find each other.
            sys.modules[module_name] = module
            spec.loader.exec_module(module)
        except Exception as exc:
            sys.stderr.write("Warning: failed to import %s: %s\n" % (full_path, exc))
            if previous is absent:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            continue

        for name, obj in inspect.getmembers(module, inspect.isclass):
            if not issubclass(obj, LveStatsPlugin) or obj is LveStatsPlugin:
                continue
            # Skip classes re-exported from other modules (match plugin_loader.py).
            if getattr(obj, "__module__", None) != module_name:
                continue
            if name in classes:
                # Instances are looked up by class name only, so the later
                # definition wins; say so instead of overwriting silently.
                sys.stderr.write(
                    "Warning: plugin class %s in %s replaces an earlier class "
                    "with the same name\n" % (name, full_path)
                )
            classes[name] = obj
            descriptors.append({
                "file": filename,
                "class": name,
                "order": getattr(obj, "order", 0),
                "period": getattr(obj, "period", None),
                "timeout": getattr(obj, "timeout", None),
            })

    return descriptors, classes


def op_list(req):
    """Handle `list`: (re)discover plugin classes under `req["plugins_dir"]`.

    Replaces the registered classes with the freshly discovered ones and
    drops all existing instances and scratch state. Returns the plugin
    descriptors, or an error response when `plugins_dir` is missing or empty.
    """
    plugins_dir = req.get("plugins_dir")
    if not plugins_dir:
        return {"status": "error", "message": "list: plugins_dir missing"}

    descriptors, discovered = _discover_classes(plugins_dir)

    _classes.clear()
    _classes.update(discovered)
    # Anything built from the previously loaded classes is stale now.
    for stale in (_instances, _scratch):
        stale.clear()

    return {"status": "ok", "plugins": descriptors}


def op_init(req):
    """Handle `init`: instantiate and configure one previously listed class.

    Sets `__is_user_plugin__` on the class, creates the instance, passes it
    `plugin_config` and, when `db_url` is given, a SQLAlchemy engine. The
    instance is stored under the class name and any scratch state kept for
    that name is discarded. Returns `{"status": "ok"}`, or an error response
    (with a traceback when construction or configuration raised).
    """
    class_name = req.get("class")
    if not class_name:
        return {"status": "error", "message": "init: class missing"}

    plugin_cls = _classes.get(class_name)
    if plugin_cls is None:
        message = "class %r not loaded; send list first" % class_name
        return {"status": "error", "message": message}

    try:
        plugin_cls.__is_user_plugin__ = bool(req.get("is_user_plugin", True))
        plugin = plugin_cls()

        if hasattr(plugin, "set_config"):
            plugin.set_config(req.get("plugin_config") or {})

        db_url = req.get("db_url")
        if db_url and hasattr(plugin, "set_db_engine"):
            try:
                from sqlalchemy import create_engine  # noqa: WPS433
                plugin.set_db_engine(create_engine(db_url))
            except Exception as exc:
                # Not fatal here: a plugin that really needs the engine will
                # fail when it runs, so only log the problem.
                sys.stderr.write(
                    "Warning: set_db_engine failed for %s: %s\n" % (class_name, exc)
                )

        _instances[class_name] = plugin
        # A fresh instance starts without scratch state from a previous one.
        _scratch.pop(class_name, None)
    except Exception as exc:
        return {
            "status": "error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }
    return {"status": "ok"}


def op_execute(req):
    """Handle `execute`: run one tick of an initialized plugin.

    Reads `class`, `now` (default 0.0) and `lve_data` (default `{}`) from
    `req`, hydrates `lve_data`, calls the plugin's `execute()` with SIGUSR2
    armed, and dehydrates the result.

    Returns one of:
      - `{"status": "ok", "lve_data": ...}` on success
      - `{"status": "terminated"}` when `LveStatsPluginTerminated` was raised
      - `{"status": "db_schema_error", ...}` for the SQLAlchemy schema errors
        in `_DB_SCHEMA_ERRORS`
      - `{"status": "error", ...}` for any other exception, a missing
        `class`, or a plugin that has not been initialized

    The scratch dict is only updated on the success path, because
    `dehydrate` runs after `execute()` returned normally.
    """
    class_name = req.get("class")
    if not class_name:
        return {"status": "error", "message": "execute: class missing"}
    instance = _instances.get(class_name)
    if instance is None:
        return {
            "status": "error",
            "message": "plugin %r not initialized; send init first" % class_name,
        }

    now = req.get("now", 0.0)
    raw_lve_data = req.get("lve_data") or {}

    # Best-effort: expose the tick time as `instance.now`; an instance that
    # rejects the assignment is executed without it.
    try:
        instance.now = now
    except Exception:
        pass

    # Hydrate dict -> rich-object form so user plugins ported from legacy
    # lve-stats see the LVEStat / LVEUsage / AggregatedLveUsage shape they
    # expect (attribute access, int uid keys, list-shaped lve_usages_5s).
    # Per-plugin scratch dict carries legacy-only keys (old_now, lve_usages,
    # lve_active_ids) across ticks since Rust doesn't model them.
    scratch = _scratch.setdefault(class_name, {})
    lve_data = hydrate(raw_lve_data, scratch=scratch)

    # Arm SIGUSR2 only around execute() so an out-of-band signal between
    # requests doesn't kill the runner.
    signal.signal(signal.SIGUSR2, _sigusr2_handler)
    try:
        instance.execute(lve_data)
    except LveStatsPluginTerminated:
        return {"status": "terminated"}
    except _DB_SCHEMA_ERRORS as exc:
        # Tell the Rust side to run ensure_schema() and respawn us; matches
        # the `recover_db()` flow in Python lve-stats
        # (plugin_executors.py:106-114 + plugin_context.py:34-35).
        return {
            "status": "db_schema_error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }
    finally:
        # Runs on every exit path above, so SIGUSR2 is ignored again before
        # the response is returned.
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)

    # Convert rich objects back to JSON-serializable form. Legacy scratch
    # keys are pulled into `scratch` (already the same dict registered in
    # `_scratch[class_name]`) and stripped from the outbound payload.
    out_lve_data = dehydrate(lve_data, scratch=scratch)
    return {"status": "ok", "lve_data": out_lve_data}


def op_shutdown(_req):
    """Handle `shutdown`: acknowledge and ask the request loop to exit.

    The private `_exit` flag is removed by the request loop before the
    response is written; it only tells the loop to exit afterwards.
    """
    response = {"status": "ok"}
    response["_exit"] = True
    return response


# Maps the request's "op" string to its handler. Each handler takes the
# decoded request dict and returns the response dict; an "op" that is not
# listed here is answered with an "unknown op" error by the request loop.
_DISPATCH = {
    "list": op_list,
    "init": op_init,
    "execute": op_execute,
    "shutdown": op_shutdown,
}


def _handle_line(line):
    """Decode one request line, run its handler, and return the response dict.

    Never raises for bad input: malformed JSON, a request that is not a JSON
    object, an unknown or non-string `op`, and an exception escaping a handler
    all produce an error response instead.
    """
    try:
        req = json.loads(line)
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input.
        return {"status": "error", "message": "invalid json: %s" % exc}

    if not isinstance(req, dict):
        return {
            "status": "error",
            "message": "invalid request: expected a JSON object, got %s"
                       % type(req).__name__,
        }

    op = req.get("op")
    # Only strings can name an op; this also keeps unhashable values (lists,
    # objects) away from the dict lookup.
    handler = _DISPATCH.get(op) if isinstance(op, str) else None
    if handler is None:
        return {"status": "error", "message": "unknown op %r" % (op,)}

    try:
        return handler(req)
    except Exception as exc:
        return {
            "status": "error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }


def _serialize_response(resp):
    """Encode `resp` as one line of JSON.

    When `resp` cannot be encoded (for example a plugin left a set or a
    custom object in `lve_data`), an encoded error response is returned
    instead, so the caller always has a well-formed line to send.
    """
    try:
        return json.dumps(resp)
    except (TypeError, ValueError, RecursionError) as exc:
        return json.dumps({
            "status": "error",
            "message": "response is not JSON-serializable: %s" % exc,
        })


def main():
    """Run the request loop until stdin closes or `shutdown` is received.

    Reads one JSON request per line from stdin and writes exactly one JSON
    response line per non-blank request line to the protocol channel. Exits
    with status 0 after answering a `shutdown` request.
    """
    # Isolate the JSON-RPC channel from anything a plugin (or a library it
    # imports) might write to stdout. The Rust daemon reads this process's
    # stdout line-by-line and parses each line as JSON. A single stray
    # `print(...)`, `os.write(1, ...)`, or C-extension `fprintf(stdout, ...)`
    # corrupts the stream. Clone fd 1 to a private fd used only for protocol
    # writes, then point fd 1 at fd 2 (stderr) so all other writes — including
    # those from C extensions and subprocesses that inherit fd 1 — land in
    # stderr, which the daemon already drains into its logs. Done here (not
    # at import time) so the module can be loaded by tests without clobbering
    # the test process's stdout.
    global _protocol
    _protocol_fd = os.dup(1)
    os.dup2(2, 1)
    _protocol = io.TextIOWrapper(
        os.fdopen(_protocol_fd, "wb", buffering=0),
        encoding="utf-8",
        write_through=True,
    )
    # Rebind Python-level stdout too so `print()` without an explicit `file=`
    # goes to stderr instead of leaving the stream partly redirected.
    sys.stdout = sys.stderr

    signal.signal(signal.SIGUSR1, _sigusr1_handler)
    signal.signal(signal.SIGUSR2, signal.SIG_IGN)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        resp = _handle_line(line)
        # `_exit` is an internal flag set by op_shutdown; it must not be sent.
        exit_after = resp.pop("_exit", False)
        _protocol.write(_serialize_response(resp) + "\n")
        _protocol.flush()
        if exit_after:
            sys.exit(0)


# Start the request loop only when run as a script; importing the module
# (for example from tests) must not redirect stdout or read stdin.
if __name__ == "__main__":
    main()
