NEO Trading Engine — Live Book Capture Spec¶
Design spec for the live XRPL top-of-book capture script and replay adapter. Do not implement until ready — this is the design target.
Purpose¶
The current paper environment uses static market data (mid/bid/ask constant across all ticks). This makes it impossible to test state-aware signals (volatility, spread regime, etc.).
Solution: capture live XRP/RLUSD top-of-book snapshots at 1s resolution, store them, and replay them through a drop-in adapter when running experiment sessions.
Architecture Principle¶
The engine must not know whether market data is static, replayed, or live.
MarketDataAdapter.fetch() is the only interface — swapping the source requires no engine changes.
Capture Script — capture_book.py¶
Standalone script; runs as a separate process, independent of the trading engine.
Usage¶
```shell
python capture_book.py --output data/book_capture_20260410.db
python capture_book.py --output data/book_capture_20260410.db --interval 1.0
```
Config inputs¶
- `xrpl_node_url` — WebSocket node (default: `wss://xrplcluster.com`)
- `base` — `{"currency": "XRP"}`
- `quote` — `{"currency": "RLUSD", "issuer": "<RLUSD_ISSUER_ADDRESS>"}`
- `poll_interval_seconds` — default `1.0`
- `output_db` — path to the output SQLite file
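Collected in one place, the defaults above could be expressed as a module-level config. This is illustrative only; the RLUSD issuer placeholder is carried over as-is from the spec:

```python
# Illustrative capture config mirroring the documented defaults.
# The issuer address is a placeholder to be filled in before running.
CAPTURE_CONFIG = {
    "xrpl_node_url": "wss://xrplcluster.com",
    "base": {"currency": "XRP"},
    "quote": {"currency": "RLUSD", "issuer": "<RLUSD_ISSUER_ADDRESS>"},
    "poll_interval_seconds": 1.0,
    "output_db": "data/book_capture_20260410.db",
}
```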
Core logic¶
```python
import asyncio

from xrpl.asyncio.clients import AsyncWebsocketClient
from xrpl.models.requests import BookOffers


async def capture_loop(output_db: str):
    db = init_db(output_db)
    async with AsyncWebsocketClient(XRPL_NODE) as client:
        print(f"Connected. Writing to {output_db}")
        while True:
            snapshot = await fetch_top_of_book(client)
            ledger_index = await get_current_ledger_index(client)
            if snapshot:
                write_snapshot(db, snapshot, ledger_index)
            else:
                log_skip(db, ledger_index, reason="empty_book")
            await asyncio.sleep(POLL_INTERVAL_SECONDS)


async def fetch_top_of_book(client) -> dict | None:
    """Fetch best bid and ask. Returns None if book is empty or fetch fails."""
    bid_req = BookOffers(taker_gets=BASE, taker_pays=QUOTE, limit=1)
    ask_req = BookOffers(taker_gets=QUOTE, taker_pays=BASE, limit=1)
    bid_resp = await client.request(bid_req)
    ask_resp = await client.request(ask_req)
    best_bid = parse_best_price(bid_resp, side="bid")  # float or None
    best_ask = parse_best_price(ask_resp, side="ask")
    if best_bid is None or best_ask is None:
        return None
    mid = (best_bid + best_ask) / 2
    spread_bps = ((best_ask - best_bid) / mid) * 10_000
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "mid_price": mid,
        "spread_bps": spread_bps,
    }
```
Shutdown summary (on Ctrl+C)¶
Capture complete.
Duration: 00:20:14
Total ticks: 1214
Skipped: 3
Distinct mid vals: 847
Mid range: 0.5571 – 0.5634
Spread range: 4.2 – 31.7 bps
Output: data/book_capture_20260410.db
`distinct_mid_vals` is the key pre-run audit signal: if it comes back as 1, the feed was static and the file is unusable for replay.
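The static-feed check can be run mechanically against a finished capture file. A minimal sketch, assuming the `book_snapshots` table defined in the schema section below:

```python
import sqlite3


def is_replayable(db_path: str) -> bool:
    """True if the capture shows real mid movement (distinct_mid_vals > 1)."""
    con = sqlite3.connect(db_path)
    (distinct_mids,) = con.execute(
        "SELECT COUNT(DISTINCT mid_price) FROM book_snapshots"
    ).fetchone()
    con.close()
    # A static feed collapses every snapshot to a single mid value.
    return distinct_mids > 1
```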
SQLite Schema¶
Same .db pattern as existing engine run files.
```sql
CREATE TABLE book_snapshots (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    captured_at  TEXT NOT NULL,     -- ISO 8601 UTC
    ledger_index INTEGER NOT NULL,
    best_bid     REAL NOT NULL,
    best_ask     REAL NOT NULL,
    mid_price    REAL NOT NULL,
    spread_bps   REAL NOT NULL      -- precomputed at capture time
);

CREATE TABLE capture_meta (
    key   TEXT PRIMARY KEY,
    value TEXT
    -- stores: start_time, node_url, base, quote, poll_interval_seconds
);

CREATE TABLE skipped_ticks (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    skipped_at   TEXT,
    ledger_index INTEGER,
    reason       TEXT
);
```
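The `init_db` helper called by the capture loop is not specified. One minimal sketch applying this schema; `IF NOT EXISTS` is an added assumption so a restarted capture can append to an existing file:

```python
import sqlite3

# DDL mirrors the schema above; IF NOT EXISTS added so reruns can append.
SCHEMA = """
CREATE TABLE IF NOT EXISTS book_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    captured_at TEXT NOT NULL,
    ledger_index INTEGER NOT NULL,
    best_bid REAL NOT NULL,
    best_ask REAL NOT NULL,
    mid_price REAL NOT NULL,
    spread_bps REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS capture_meta (
    key TEXT PRIMARY KEY,
    value TEXT
);
CREATE TABLE IF NOT EXISTS skipped_ticks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    skipped_at TEXT,
    ledger_index INTEGER,
    reason TEXT
);
"""


def init_db(path: str) -> sqlite3.Connection:
    """Open (or create) the capture file and ensure all tables exist."""
    con = sqlite3.connect(path)
    con.executescript(SCHEMA)
    return con
```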
Replay Adapter — ReplayMarketDataAdapter¶
Drop-in replacement for the live MarketDataAdapter.
Implements the same fetch() interface — engine never knows the difference.
```python
class ReplayMarketDataAdapter:
    def __init__(self, db_path: str, start_time: str | None = None):
        self.snapshots = load_snapshots(db_path, from_time=start_time)
        self.cursor = 0

    def fetch(self) -> MarketSnapshot:
        # Wraps around if the session outlasts the capture
        row = self.snapshots[self.cursor % len(self.snapshots)]
        self.cursor += 1
        return MarketSnapshot(
            best_bid=row.best_bid,
            best_ask=row.best_ask,
            mid_price=row.mid_price,
            ledger_index=row.ledger_index,
        )
```
Replay manifest¶
Each replay session should record which capture file was used:
```yaml
replay_manifest:
  source_db: data/book_capture_20260410.db
  start_time: "2026-04-10T09:00:00Z"
  end_time: "2026-04-10T09:20:00Z"
  tick_count: 1214
  distinct_mid_vals: 847
  mid_range: [0.5571, 0.5634]    # price, not bps
  spread_range_bps: [4.2, 31.7]
```
Pre-Run Audit Checklist¶
Before running any experiment session against a replay file, confirm:
- `distinct_mid_vals > 1` — mid is not static
- `spread_range_bps` shows real variance (not a single value)
- `skipped_ticks` count is low relative to total ticks
- `tick_count` is sufficient for the session duration (at 1s capture / 4s engine tick cadence, need at least `session_duration_seconds / 4` snapshots)
- Capture period represents a reasonable market condition (not holiday/weekend/low liquidity)
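The mechanical items in the checklist could be encoded as a single gate; the market-condition judgment stays manual. The `max_skip_ratio` threshold and the 4s engine tick default are illustrative assumptions, not values from the spec:

```python
def passes_pre_run_audit(stats: dict,
                         session_duration_seconds: float,
                         engine_tick_seconds: float = 4.0,
                         max_skip_ratio: float = 0.05) -> bool:
    """Mechanical pre-run audit gate (assumed thresholds, for illustration).

    `stats` is expected to carry tick_count, distinct_mid_vals, skipped,
    and spread_range_bps, as in the replay manifest.
    """
    if stats["distinct_mid_vals"] <= 1:
        return False  # static feed, unusable for replay
    lo, hi = stats["spread_range_bps"]
    if lo == hi:
        return False  # no real spread variance
    if stats["skipped"] > max_skip_ratio * stats["tick_count"]:
        return False  # too many dropped ticks
    # One snapshot consumed per engine tick.
    needed = session_duration_seconds / engine_tick_seconds
    return stats["tick_count"] >= needed
```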
Recommended Sequencing¶
Now
1. Keep Phase 4A baseline unchanged and running
2. Implement capture_book.py and start collecting data
3. Validate first capture file passes pre-run audit
Soon (2–4 weeks)
1. Have multiple capture files across different market periods
2. Build ReplayMarketDataAdapter
3. Re-run Phase 4A baseline over replay to confirm it holds under real movement
Later
1. Re-test Phase 4B candidate signals (volatility, spread regime) with real microstructure
2. Any filter validated across 3+ distinct replay periods is trustworthy for live consideration
What Not To Do¶
- Do not hunt for third-party historical 1s XRP/RLUSD book data — it likely doesn't exist
- Do not attempt ledger reconstruction — too complex for current stage
- Do not let synthetic movement become the primary path unless a short-term unblock is needed
- Do not run experiment sessions against a capture file that fails the pre-run audit
Status¶
Capture script: RUNNING (as of 2026-04-10)
- Validated: distinct mid values > 1, real spread variation confirmed, ledger index advancing, skipped ticks recording cleanly
- Next milestone: accumulate sufficient data across varied market conditions (target: at least one calm period + one volatile period)
Replay adapter: NOT YET BUILT
- Build once capture dataset is sufficient
- Pre-run audit must pass before any experiment session
Spec authored: 2026-04-10.