
Orion Investigation — Directional Drift Guard

Vesper — findings below against Q1–Q5. A few items need rulings before I write code. Branch checked out: feat/directional-drift-guard (stacked on feat/anchor-saturation-guard).


Q1 — Fill event surface in the tick loop

Finding: no surface exists. There is no "fills this tick" list inside the tick loop. Fills enter the system via two asynchronous paths:

  • Live (reconciler) path, inside Step 5: LedgerReconciler.reconcile() → _apply_partial_fill() / _apply_full_fill() → ExecutionEngine.record_partial_fill() / record_full_fill() → ExecutionEngine._persist_fill() → StateManager.record_fill() + InventoryManager.apply_fill(). ReconciliationResult exposes only counts (partial_fills, full_fills), with no side, quantity, or timestamp. The tick loop discards per-fill identity.
  • Paper path, after Step 9: main_loop._simulate_paper_fills() runs per paper order → ExecutionEngine.simulate_paper_full_fill() → same _persist_fill(). Returns only a scalar count.

Both paths write to the fills table. Available fills columns (see neo_engine/models.py:121 + state_manager.py:1194): id, order_id, created_at (ISO 8601), price, quantity, session_id, mid_price_at_fill, .... The fills table does NOT carry side; side lives on the parent orders row, joined via fills.order_id. The relevant helpers today are get_fills_for_order(order_id) and get_fills_since(iso), and neither joins to orders.side.

Proposed surface: add one DB helper — get_fill_events_since(session_id, watermark_iso) — that joins fills ⋈ orders and returns (id, created_at, side, quantity) tuples. On each tick the guard reads everything newer than its watermark, appends to its rolling state, advances the watermark. This works uniformly for both live and paper paths, requires no changes to reconciler / execution_engine / paper fill sim, and uses the existing idx_fills_session_id index plus created_at for the range predicate.
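Here is a minimal sketch of the watermark pattern. It assumes rows arrive as (fill_id, created_at_iso, side, quantity_rlusd) tuples and that every created_at uses the same ISO 8601 format, so text comparison matches time order. `consume_new_fills` and the in-memory `rows` list are hypothetical stand-ins for the proposed DB helper.

```python
from typing import Optional

FillEvent = tuple[str, str, str, float]  # (fill_id, created_at_iso, side, quantity_rlusd)


def consume_new_fills(
    rows: list[FillEvent], watermark_iso: Optional[str]
) -> tuple[list[FillEvent], Optional[str]]:
    """Return fills strictly newer than the watermark, plus the advanced watermark.

    Hypothetical in-memory stand-in for get_fill_events_since(); assumes one
    consistent ISO 8601 format so string comparison matches time order.
    """
    new = [r for r in rows if watermark_iso is None or r[1] > watermark_iso]
    new.sort(key=lambda r: r[1])  # oldest first, as the guard expects
    # Advance to the newest timestamp seen; keep the old watermark if nothing arrived.
    next_watermark = new[-1][1] if new else watermark_iso
    return new, next_watermark


rows: list[FillEvent] = [
    ("f1", "2025-01-01T00:00:01", "buy", 10.0),
    ("f2", "2025-01-01T00:00:05", "buy", 12.5),
]
batch, wm = consume_new_fills(rows, None)        # first tick: everything in the session
rows.append(("f3", "2025-01-01T00:00:09", "sell", 4.0))
batch2, wm2 = consume_new_fills(rows, wm)        # next tick: only f3
```

One caveat for review concerns a watermark on created_at alone with a strict `>`. A fill committed later but stamped with the same created_at as the current watermark would be skipped. If that can happen, a compound (created_at, fill_id) watermark closes the gap, provided ids increase with insertion order.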

Quantity semantics (cross-check): ExecutionEngine._persist_fill() at execution_engine.py:897–946 passes fill_delta (the incremental quantity) straight to record_fill() and also to InventoryManager.apply_fill(fill_quantity_rlusd=fill_delta). InventoryManager.apply_fill() documents the parameter as RLUSD notional (inventory_manager.py:210–234). Therefore fills.quantity is RLUSD notional, which is exactly what Condition B needs — buy_rlusd − sell_rlusd reduces to sign-weighted SUM(quantity) by orders.side.
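The reduction is simple enough to state as code. This sketch assumes side values are exactly 'buy' or 'sell' as stored on orders.side. `net_notional_rlusd` is a hypothetical helper, not an existing function.

```python
from collections.abc import Iterable


def net_notional_rlusd(fills: Iterable[tuple[str, float]]) -> float:
    """Sign-weighted SUM(quantity): BUY spends RLUSD (+), SELL returns RLUSD (-).

    A positive result is buy_rlusd - sell_rlusd, i.e. net RLUSD paid out for XRP.
    """
    total = 0.0
    for side, quantity_rlusd in fills:
        if side == "buy":
            total += quantity_rlusd
        elif side == "sell":
            total -= quantity_rlusd
        else:
            raise ValueError(f"unexpected side: {side!r}")
    return total
```

In SQL, the same reduction over the fills ⋈ orders join would be `SUM(CASE WHEN o.side = 'buy' THEN f.quantity ELSE -f.quantity END)`.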

Flag for ruling: see D1 under "Deviations from spec" below.

Q2 — Fill side convention

Confirmed: OrderSide.BUY = "buy" means buy XRP / pay RLUSD. Evidence:

  • inventory_manager.py:253–260 (docstring + code):
      • BUY: xrp_change = +xrp_quantity, rlusd_change = -fill_quantity_rlusd → XRP balance up, RLUSD balance down.
      • SELL: inverse.
  • S41 on-chain check matches: 3 fills persisted with orders.side='buy', and the wallet gained XRP and lost RLUSD, confirming the convention.

For the drift guard: a "same-side burst of BUY fills" means the engine is accumulating XRP and bleeding RLUSD; a "SELL burst" is the reverse.
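To make the convention concrete, here is a hypothetical helper that restates the BUY/SELL sign rules quoted from inventory_manager.py above. It is not the real InventoryManager.apply_fill, only an illustration of the signs.

```python
def balance_deltas(
    side: str, xrp_quantity: float, fill_quantity_rlusd: float
) -> tuple[float, float]:
    """Return (xrp_change, rlusd_change) for one fill under BUY = buy XRP / pay RLUSD."""
    if side == "buy":
        return +xrp_quantity, -fill_quantity_rlusd   # XRP up, RLUSD down
    if side == "sell":
        return -xrp_quantity, +fill_quantity_rlusd   # the inverse
    raise ValueError(f"unexpected side: {side!r}")
```

Summing these deltas over a run of 'buy' fills gives a growing XRP balance and a shrinking RLUSD balance, which is the drift the guard is meant to catch.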

Q3 — Insertion point

Proposed: Step 8.5b — immediately after Step 8.5 (anchor saturation guard), before the "no intents" log and Step 9 submit.

Order-of-evaluation reasoning:

  • Live mode: fills from this tick are recorded in Step 5 (before Step 8 intents are generated). The drift guard at Step 8.5b sees them in the same tick.
  • Paper mode: fills for this tick are simulated in _simulate_paper_fills after Step 9 (main_loop:2268), so fills persisted this tick become visible to the guard on the next tick (a one-tick lag by construction, ~4 s at current cadence). This is acceptable because the Condition A window is 30 s (≈7 ticks) and the Condition B window is 120 s. Every paper fill is delayed by the same single tick, so the lag shifts when a condition fires by one tick rather than changing whether any of the three conditions fires.

Coexistence with the anchor guard: both guards transition to DEGRADED via _enter_degraded_mode(), which is idempotent. If the anchor guard fires first this tick and clears intents, the drift guard still runs and its state still updates from new fill events. A simultaneous trigger emits one [DRIFT_GUARD] row and WARNING log in addition to [ANCHOR_SAT]. Each guard keeps its own one-shot flag, so their deduplication is independent. No ordering conflict.
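The toy model below checks the coexistence argument. It assumes `_enter_degraded_mode()` is idempotent as described and that each guard owns a separate one-shot flag. `ToyEngine`, its attributes, and the reason string "anchor_saturation_guard" are hypothetical, not NEOEngine's.

```python
class ToyEngine:
    """Two guards sharing one idempotent DEGRADED transition, each with its own flag."""

    def __init__(self) -> None:
        self.degraded = False
        self.degraded_reasons: list[str] = []   # only the first transition is recorded
        self.events: list[str] = []             # stand-in for circuit_breaker_events rows
        self.anchor_fired = False
        self.drift_fired = False

    def _enter_degraded_mode(self, reason: str) -> None:
        if self.degraded:                       # idempotent: repeat calls are no-ops
            return
        self.degraded = True
        self.degraded_reasons.append(reason)

    def anchor_guard(self, trip: bool) -> None:
        if trip and not self.anchor_fired:
            self.anchor_fired = True            # one-shot flag set before side effects
            self.events.append("[ANCHOR_SAT]")
            self._enter_degraded_mode("anchor_saturation_guard")

    def drift_guard(self, trip: bool) -> None:
        if trip and not self.drift_fired:
            self.drift_fired = True
            self.events.append("[DRIFT_GUARD]")
            self._enter_degraded_mode("directional_drift_guard_A")


engine = ToyEngine()
for _ in range(3):                              # both guards trip on the same ticks, repeatedly
    engine.anchor_guard(trip=True)
    engine.drift_guard(trip=True)
```

Each guard records exactly one event. The DEGRADED transition happens once, under whichever guard ran first.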

Q4 — Time source

Proposed: time.time() (Unix epoch seconds) for all wall-clock deques, and a per-tick integer counter for Condition C.

  • Condition A (30 s burst window): append time.time() when recording a fill event; purge entries older than now − burst_window_seconds. Precedent: the participation filter at main_loop.py:2103 uses now_pf = time.time() for its 8-s suppression timer — this branch uses the same time source for consistency with existing loop code.
  • Condition B (120 s rolling net notional window): same time.time() pattern — deque of (ts, signed_rlusd); purge and sum on each evaluation.
  • Condition C (no opposing fills for N ticks): pure tick counter. Increment on every evaluation; reset on an opposing-side fill. The condition is armed only once no_opposing_fill_min_fills fills have been observed. No time involvement.
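The Condition A/B window bookkeeping can be sketched as below. Timestamps default to time.time(), but `now`/`ts` can be passed explicitly so the arithmetic is easy to test. `TimeWindow` is a hypothetical name, and the purge assumes entries are appended in non-decreasing time order.

```python
import time
from collections import deque
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class TimeWindow(Generic[T]):
    """(ts, value) pairs, purged to those no older than now - window_seconds."""

    def __init__(self, window_seconds: float) -> None:
        self.window_seconds = window_seconds
        self._items: deque[tuple[float, T]] = deque()

    def append(self, value: T, ts: Optional[float] = None) -> None:
        # Default timestamp is wall-clock-at-observation, as proposed for Conditions A/B.
        self._items.append((time.time() if ts is None else ts, value))

    def purge(self, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        cutoff = now - self.window_seconds
        # With in-order appends, stale entries are always at the left end.
        while self._items and self._items[0][0] < cutoff:
            self._items.popleft()

    def values(self) -> list[T]:
        return [value for _, value in self._items]


# Condition A: sides seen in the last 30 s.
burst = TimeWindow[str](30.0)
for ts, side in [(100.0, "buy"), (120.0, "buy"), (125.0, "sell")]:
    burst.append(side, ts=ts)
burst.purge(now=140.0)  # cutoff 110.0 drops the fill at ts=100.0

# Condition B: signed RLUSD over the last 120 s.
net = TimeWindow[float](120.0)
for ts, signed in [(0.0, 30.0), (60.0, 25.0), (100.0, -5.0)]:
    net.append(signed, ts=ts)
net.purge(now=150.0)  # cutoff 30.0 drops the +30.0 entry
```

time.time() follows the system clock, so an NTP step can shift the window edge. time.monotonic() would avoid that, at the cost of diverging from the participation-filter precedent.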

MarketSnapshot has no timestamp field (verified: market_data.py:47–78), so the tick snapshot is not an option. datetime.utcnow().isoformat() is used by the reconciler for DB timestamps, but converting back to float each tick is wasteful — epoch seconds is simpler and matches the participation filter precedent.

Test handling: time.time() is monkey-patchable in unit tests (unittest.mock.patch("time.time", side_effect=...)). No extra test infrastructure needed.
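Here is a minimal sketch of that pattern. It assumes the guard reads the clock as `time.time()` through the module. A `from time import time` import would hold a reference to the original function and bypass the patch. `seconds_since` and the test class are hypothetical.

```python
import time
import unittest
from unittest import mock


def seconds_since(start_ts: float) -> float:
    """Hypothetical function under test: reads the wall clock via the time module."""
    return time.time() - start_ts


class WallClockPatchTest(unittest.TestCase):
    def test_fixed_clock(self) -> None:
        # return_value pins every call, so extra clock reads elsewhere are harmless.
        with mock.patch("time.time", return_value=1_000.0):
            self.assertEqual(seconds_since(970.0), 30.0)

    def test_scripted_clock(self) -> None:
        # side_effect yields one value per call and raises StopIteration once exhausted,
        # so the list must cover every time.time() call made inside the block.
        with mock.patch("time.time", side_effect=[100.0, 104.0]):
            self.assertEqual(seconds_since(90.0), 10.0)
            self.assertEqual(seconds_since(90.0), 14.0)
```

Using `return_value` is the safer default. Reserve a `side_effect` list for tests that need the clock to advance between calls.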

Q5 — circuit_breaker_events write pattern

Confirmed: the existing StateManager.record_circuit_breaker_event() (state_manager.py:1635) is fully suitable. No new writer needed. Signature:

record_circuit_breaker_event(
    *, breaker: str,
    triggered_at: Optional[str] = None,
    cooldown_seconds: Optional[int] = None,
    manual_reset_required: bool = True,
    context: Optional[dict] = None,
) -> int

The drift guard will call it once per session trigger with:
  • breaker="directional_drift_guard"
  • manual_reset_required=True (matches the anchor guard: DEGRADED, restart-required recovery)
  • context, which varies by condition:
      • A: {"condition_triggered": "A", "burst_count": int, "burst_window_seconds": float, "side": "buy"|"sell", "fill_sides_in_window": ["buy","buy","buy", ...]}
      • B: {"condition_triggered": "B", "net_notional_rlusd": float, "threshold": float, "window_seconds": float}
      • C: {"condition_triggered": "C", "ticks_since_opposing_fill": int, "threshold_ticks": int, "last_fill_side": "buy"|"sell"}
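To keep the three context shapes consistent, the trigger path could build them in one place and pass the result as `context=` to record_circuit_breaker_event(). This is a sketch using the field names listed above. `build_drift_context` is hypothetical, and the values in the example call are illustrative only.

```python
from typing import Any

# Required context keys per condition, copied from the list above.
_REQUIRED_KEYS: dict[str, tuple[str, ...]] = {
    "A": ("burst_count", "burst_window_seconds", "side", "fill_sides_in_window"),
    "B": ("net_notional_rlusd", "threshold", "window_seconds"),
    "C": ("ticks_since_opposing_fill", "threshold_ticks", "last_fill_side"),
}


def build_drift_context(condition: str, **fields: Any) -> dict[str, Any]:
    """Return the circuit_breaker_events context dict for one drift-guard condition."""
    if condition not in _REQUIRED_KEYS:
        raise ValueError(f"unknown condition: {condition!r}")
    expected = set(_REQUIRED_KEYS[condition])
    if set(fields) != expected:
        raise ValueError(
            f"condition {condition}: expected {sorted(expected)}, got {sorted(fields)}"
        )
    # condition_triggered first, then the condition-specific fields.
    return {"condition_triggered": condition, **fields}


ctx = build_drift_context(
    "B", net_notional_rlusd=62.5, threshold=50.0, window_seconds=120.0
)
```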

session_id is auto-tagged by the writer (Vesper-approved C2 migration on the anchor guard branch), so no extra work here. The ignore_cleanup_errors / sm.close Windows teardown fix from C5a will be replicated in any integration test that opens a real StateManager.

Design sketch (for Vesper's sanity check — not code yet)

New state on NEOEngine.__init__:

# Module scope needs: from collections import deque
self._drift_fill_events: deque[tuple[float, str, float]] = deque()   # (ts, side, rlusd_notional)
self._drift_net_notional_events: deque[tuple[float, float]] = deque() # (ts, signed_rlusd)
self._drift_ticks_since_opposing_fill: int = 0
self._drift_fills_seen_this_session: int = 0
self._drift_last_fill_side: str | None = None
self._drift_last_fill_watermark_iso: str | None = None  # DB watermark
self._drift_guard_triggered_this_session: bool = False  # one-shot, set BEFORE side effects

Per-tick flow at Step 8.5b:
  1. Query get_fill_events_since(session_id=session_id, watermark_iso=self._drift_last_fill_watermark_iso) (keyword-only, per the D1 signature) → list of (id, created_at_iso, side, quantity_rlusd).
  2. For each event: ts = time.time(), NOT parsed from created_at. Wall-clock-at-observation is fine, since fills from the prior tick are ~4 s stale, well inside the 30 s window. Append to both deques. Update the watermark, the counters, and _drift_last_fill_side.
  3. Purge each deque of entries older than its window.
  4. Increment _drift_ticks_since_opposing_fill by 1 (the reset happens in step 2 when an opposing-side fill arrives).
  5. Evaluate A, B, C in that order. OR logic: short-circuit on the first condition that fires.
  6. On the first trigger this session: set the one-shot flag before side effects, emit WARNING, persist one circuit_breaker_events row (best-effort; ERROR log on raise, never blocks DEGRADED), call _enter_degraded_mode("directional_drift_guard_<cond>"), and clear intents.
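To sanity-check the flow end to end, here is a self-contained model of steps 1 to 5 plus the step 6 one-shot flag. Everything in it is hypothetical: `DriftGuardSketch`, the `fetch` callable (standing in for get_fill_events_since), and the parameter names `burst_count` and `no_opposing_fill_ticks`. The firing rules are my reading of the spec rather than its text. A fires when the window holds at least burst_count fills, all on one side. B fires when the absolute net notional is at or above the threshold. The step 6 side effects (WARNING, DB row, _enter_degraded_mode, clearing intents) are left to the caller, which receives the returned context dict.

```python
import time
from collections import deque
from typing import Any, Callable, Optional

FillEvent = tuple[str, str, str, float]  # (fill_id, created_at_iso, side, quantity_rlusd)


class DriftGuardSketch:
    """Hypothetical model of steps 1-5 plus the step 6 one-shot flag (not NEOEngine code)."""

    def __init__(
        self,
        fetch: Callable[[Optional[str]], list[FillEvent]],
        *,
        burst_count: int,
        no_opposing_fill_ticks: int,
        no_opposing_fill_min_fills: int,
        burst_window_seconds: float = 30.0,
        net_window_seconds: float = 120.0,
        net_notional_threshold_rlusd: float = 50.0,
    ) -> None:
        self.fetch = fetch
        self.burst_count = burst_count
        self.no_opposing_fill_ticks = no_opposing_fill_ticks
        self.no_opposing_fill_min_fills = no_opposing_fill_min_fills
        self.burst_window_seconds = burst_window_seconds
        self.net_window_seconds = net_window_seconds
        self.net_notional_threshold_rlusd = net_notional_threshold_rlusd
        self.fill_events: deque[tuple[float, str, float]] = deque()  # (ts, side, rlusd)
        self.net_events: deque[tuple[float, float]] = deque()        # (ts, signed_rlusd)
        self.ticks_since_opposing_fill = 0
        self.fills_seen = 0
        self.last_fill_side: Optional[str] = None
        self.watermark_iso: Optional[str] = None
        self.triggered = False

    def on_tick(self, now: Optional[float] = None) -> Optional[dict[str, Any]]:
        """Return the trigger context on the first trigger this session, else None."""
        now = time.time() if now is None else now
        for _fill_id, created_at, side, qty in self.fetch(self.watermark_iso):  # step 1
            # Step 2: timestamp at observation, not created_at (D3).
            self.fill_events.append((now, side, qty))
            self.net_events.append((now, qty if side == "buy" else -qty))
            if self.last_fill_side is not None and side != self.last_fill_side:
                self.ticks_since_opposing_fill = 0
            self.last_fill_side = side
            self.fills_seen += 1
            self.watermark_iso = created_at
        self._purge(self.fill_events, now - self.burst_window_seconds)  # step 3
        self._purge(self.net_events, now - self.net_window_seconds)
        self.ticks_since_opposing_fill += 1                              # step 4
        if self.triggered:
            return None  # state keeps updating, but the guard fires once per session
        context = self._evaluate()                                       # step 5
        if context is not None:
            self.triggered = True  # step 6 starts here: flag set before any side effect
        return context

    @staticmethod
    def _purge(events: deque, cutoff: float) -> None:
        while events and events[0][0] < cutoff:
            events.popleft()

    def _evaluate(self) -> Optional[dict[str, Any]]:
        sides = [side for _, side, _ in self.fill_events]
        # A (assumed): at least burst_count fills in the window, all on one side.
        if len(sides) >= self.burst_count and len(set(sides)) == 1:
            return {"condition_triggered": "A", "burst_count": len(sides),
                    "burst_window_seconds": self.burst_window_seconds,
                    "side": sides[0], "fill_sides_in_window": sides}
        # B (assumed): |buy_rlusd - sell_rlusd| over the window at or above threshold.
        net = sum(signed for _, signed in self.net_events)
        if abs(net) >= self.net_notional_threshold_rlusd:
            return {"condition_triggered": "B", "net_notional_rlusd": net,
                    "threshold": self.net_notional_threshold_rlusd,
                    "window_seconds": self.net_window_seconds}
        # C: armed after min fills; fires after N ticks without an opposing fill.
        if (self.fills_seen >= self.no_opposing_fill_min_fills
                and self.ticks_since_opposing_fill >= self.no_opposing_fill_ticks):
            return {"condition_triggered": "C",
                    "ticks_since_opposing_fill": self.ticks_since_opposing_fill,
                    "threshold_ticks": self.no_opposing_fill_ticks,
                    "last_fill_side": self.last_fill_side}
        return None
```

Passing `now` explicitly keeps tests deterministic. Inside the engine it would default to time.time(), matching Q4.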

Guard resets at session boundary: the engine rebuilds on startup; deques are initialised empty, counters zero, flag False. Nothing persists across restart (same pattern as anchor guard).

Deviations from spec — flagged for ruling

D1 — New DB helper get_fill_events_since(session_id, watermark_iso). The guard needs fill events with side attached. Today no helper joins fills ⋈ orders for that shape (closest: get_most_recent_buy_fill_with_markout() at state_manager.py:1282, which is side-specific and read-latest-only). Proposed addition to state_manager.py:

def get_fill_events_since(
    self, *, session_id: int, watermark_iso: Optional[str]
) -> list[tuple[str, str, str, float]]:
    """
    Return (fill_id, created_at_iso, side, quantity_rlusd) tuples for
    every fill in this session whose created_at > watermark_iso, oldest
    first. watermark_iso=None returns all fills in the session.
    side is read from the joined orders row ('buy' | 'sell').
    quantity is RLUSD notional (see InventoryManager.apply_fill).
    """

Additive read helper, no schema change, uses existing idx_fills_session_id index. Request ruling: approve as part of this branch, or fold into a follow-up? My recommendation: approve here — the guard can't work without a side-aware fill read.
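The query behind the proposed helper might look like the sketch below. It makes several assumptions: the store is SQLite (via the stdlib sqlite3 module), the join key is fills.order_id = orders.id, and created_at strings share one ISO format so text comparison orders them correctly. The schema is a trimmed, hypothetical subset for a self-contained demo; only the column names quoted in Q1 come from the codebase, and the connection is passed explicitly instead of living on StateManager.

```python
import sqlite3
from typing import Optional


def get_fill_events_since(
    conn: sqlite3.Connection, *, session_id: int, watermark_iso: Optional[str]
) -> list[tuple[str, str, str, float]]:
    """(fill_id, created_at_iso, side, quantity_rlusd), oldest first, created_at > watermark."""
    sql = (
        "SELECT CAST(f.id AS TEXT), f.created_at, o.side, f.quantity "
        "FROM fills AS f JOIN orders AS o ON o.id = f.order_id "
        "WHERE f.session_id = ? AND (? IS NULL OR f.created_at > ?) "
        "ORDER BY f.created_at, f.id"
    )
    return conn.execute(sql, (session_id, watermark_iso, watermark_iso)).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, side TEXT NOT NULL);
    CREATE TABLE fills (
        id INTEGER PRIMARY KEY,
        order_id INTEGER NOT NULL REFERENCES orders(id),
        created_at TEXT NOT NULL,
        quantity REAL NOT NULL,
        session_id INTEGER NOT NULL
    );
    CREATE INDEX idx_fills_session_id ON fills(session_id);
""")
conn.executemany("INSERT INTO orders (id, side) VALUES (?, ?)", [(1, "buy"), (2, "sell")])
conn.executemany(
    "INSERT INTO fills (id, order_id, created_at, quantity, session_id) VALUES (?, ?, ?, ?, ?)",
    [
        (10, 1, "2025-01-01T00:00:01", 10.0, 7),
        (11, 2, "2025-01-01T00:00:05", 4.0, 7),
        (12, 1, "2025-01-01T00:00:09", 6.0, 8),   # different session: excluded
    ],
)
all_fills = get_fill_events_since(conn, session_id=7, watermark_iso=None)
newer = get_fill_events_since(conn, session_id=7, watermark_iso="2025-01-01T00:00:01")
```

The `? IS NULL OR ...` clause implements the "watermark_iso=None returns all fills in the session" case from the docstring with a single statement.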

D2 — Condition B net_notional_threshold_rlusd = 50.0 default. Your tasking already calls out that this default is a proposal pending Atlas calibration. I will wire the value from YAML (no hardcoding), add a comment in config.example.yaml flagging the calibration as TBD, and not tune beyond 50.0 in this branch. After the first live session we can revisit this with data.
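The config class from commit 1 might look like the sketch below. It assumes the YAML section has already been parsed into a dict by the existing loader. `burst_count`, `no_opposing_fill_ticks`, and `net_window_seconds` are hypothetical field names. Fields with no value proposed in this note are left required rather than given invented defaults, and the values in the example call are illustrative only.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass(frozen=True)
class DirectionalDriftGuardConfig:
    # Required: this note proposes no default for these.
    burst_count: int
    no_opposing_fill_ticks: int
    no_opposing_fill_min_fills: int
    # Defaults from the spec windows and the D2 proposal.
    burst_window_seconds: float = 30.0
    net_window_seconds: float = 120.0
    net_notional_threshold_rlusd: float = 50.0  # TBD: pending Atlas calibration

    @classmethod
    def from_mapping(cls, raw: Mapping[str, Any]) -> "DirectionalDriftGuardConfig":
        """Build from the parsed YAML section, rejecting unknown keys (catches typos)."""
        known = {f.name for f in fields(cls)}
        unknown = set(raw) - known
        if unknown:
            raise ValueError(f"unknown directional_drift_guard keys: {sorted(unknown)}")
        return cls(**raw)  # a missing required key raises TypeError here


cfg = DirectionalDriftGuardConfig.from_mapping(
    {"burst_count": 4, "no_opposing_fill_ticks": 30, "no_opposing_fill_min_fills": 5}
)
```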

D3 — Wall-clock time-at-observation (not fills.created_at) for deque timestamps. See Q4. This means paper-mode fills enter the 30 s window ~4 s late (one tick). The spec does not require sub-tick precision and the window is 30 s (≈7 ticks), so this is fine. Flagging explicitly in case you want fill-time ISO parsing instead.
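If fill-time ISO parsing is preferred instead, one gotcha is worth flagging. If fills.created_at is written the same way the reconciler writes timestamps (datetime.utcnow().isoformat()), the string carries no UTC offset. datetime.timestamp() interprets naive values as local time. The sketch below pins naive values to UTC, assuming all stored created_at values are UTC. `created_at_to_epoch` is hypothetical.

```python
from datetime import datetime, timezone


def created_at_to_epoch(created_at_iso: str) -> float:
    """Convert a stored ISO 8601 timestamp to Unix epoch seconds.

    Naive values (no offset, as utcnow().isoformat() produces) are assumed to be
    UTC; values that carry an offset keep it.
    """
    dt = datetime.fromisoformat(created_at_iso)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()
```

Without the `replace(tzinfo=timezone.utc)` step, the same stored string would map to different epoch values on hosts in different time zones.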

No other deviations from the spec.

Commit plan (unchanged from tasking, listing for the record)

  1. feat(config): DirectionalDriftGuardConfig dataclass + YAML defaults
  2. feat(db,main_loop): drift guard state + get_fill_events_since helper (Conditions A/B/C)
  3. feat(main_loop): drift guard evaluation + DEGRADED trigger (Step 8.5b)
  4. feat(db,main_loop): drift guard circuit_breaker_events persistence + WARNING log
  5. test(guard): directional drift guard — ≥10 tests

Unit tests will use unittest.mock.patch("time.time", ...) for wall-clock determinism and MagicMock(spec=NEOEngine) for unbound-method invocation (same pattern as the anchor guard suite). Part C integration test uses a real StateManager with the addCleanup(sm.close) LIFO fix from C5a.
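In the unbound-method pattern, the test calls the real method through the class and passes a spec'd mock as `self`. Only the state the method touches then needs setting up. The sketch below uses a hypothetical `MiniEngine` standing in for NEOEngine. `spec` only allows reading attributes defined on the class, so instance attributes normally set in `__init__` must be assigned on the mock explicitly.

```python
import unittest
from unittest.mock import MagicMock


class MiniEngine:
    """Hypothetical stand-in for NEOEngine with one guard-style method."""

    def __init__(self) -> None:
        self._drift_guard_triggered_this_session = False

    def _enter_degraded_mode(self, reason: str) -> None:
        raise NotImplementedError("real side effects live here")

    def _trip_drift_guard(self, condition: str) -> None:
        if self._drift_guard_triggered_this_session:
            return
        self._drift_guard_triggered_this_session = True   # one-shot, before side effects
        self._enter_degraded_mode(f"directional_drift_guard_{condition}")


class UnboundMethodTest(unittest.TestCase):
    def test_trip_once(self) -> None:
        engine = MagicMock(spec=MiniEngine)
        engine._drift_guard_triggered_this_session = False   # __init__ state set by hand
        MiniEngine._trip_drift_guard(engine, "A")            # real code, mock self
        MiniEngine._trip_drift_guard(engine, "B")            # one-shot: no second call
        engine._enter_degraded_mode.assert_called_once_with("directional_drift_guard_A")
```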

Awaiting your ruling on D1, D2, D3 before I touch code.

— Orion