
Drivers

Unified Driver Architecture for Batch and Streaming Data

The Aleatoric Engine uses a Dual-Driver Architecture so that data generation is identical regardless of how the data is consumed. Given the same configuration and seed, the datasets used to train ML models (batch mode) contain exactly the same events as the streams used for live testing (stream mode).
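The guarantee rests on both drivers consuming one seeded generator and differing only in how they deliver events. Below is a minimal sketch of that idea. `market_events`, `batch_driver` and `stream_driver` are hypothetical stand-ins, not Aleatoric's internals, which wrap HyperSynthReactor.

```python
import asyncio
import random
from typing import AsyncIterator, Iterator, List, Tuple

Event = Tuple[str, dict]


def market_events(seed: int, n: int) -> Iterator[Event]:
    """Toy stand-in for the shared market generator (hypothetical)."""
    rng = random.Random(seed)  # all randomness flows from one seeded RNG
    price = 100.0
    for i in range(n):
        price += rng.gauss(0.0, 1.0)
        kind = "trade" if rng.random() < 0.7 else "quote"
        yield kind, {"seq": i, "price": round(price, 6)}


def batch_driver(seed: int, n: int) -> List[Event]:
    """Batch mode: drain the generator as fast as possible."""
    return list(market_events(seed, n))


async def stream_driver(seed: int, n: int) -> AsyncIterator[Event]:
    """Stream mode: yield the same events one at a time."""
    for event in market_events(seed, n):
        yield event
        await asyncio.sleep(0)  # hand control back to the event loop


async def collect(seed: int, n: int) -> List[Event]:
    return [event async for event in stream_driver(seed, n)]


batch = batch_driver(seed=42, n=1_000)
streamed = asyncio.run(collect(seed=42, n=1_000))
print(batch == streamed)  # True: both drivers consume the same generator
```

Because the drivers never touch the RNG themselves, pacing or buffering choices cannot change which events are produced.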


The driver module, located in src/aleatoric/drivers.py, provides two main entry points: run_batch() and run_stream().

run_batch(): High-throughput batch generation with optional multiprocessing.

```python
from typing import Optional, Tuple

from aleatoric.core.config import SimulationManifest


def run_batch(
    config: SimulationManifest,
    duration_seconds: float,
    chunk_size: int = 1000,
    multiprocess: bool = False,
    workers: Optional[int] = None,
    window_seconds: Optional[float] = None,
    max_retries: int = 3,
    backoff_seconds: float = 0.5,
) -> Tuple[str, int]:
    """
    Returns: (file_path, row_count)
    """
```

run_stream(): Real-time streaming with optional wall-clock timing.

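```python
from typing import AsyncGenerator, Optional, Tuple

from aleatoric.core.config import SimulationManifest


async def run_stream(
    config: SimulationManifest,
    duration_seconds: Optional[float] = None,
    real_time: bool = True,
) -> AsyncGenerator[Tuple[str, dict], None]:
    """
    Yields: (event_type, event_data) tuples
    """
```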

Batch Mode (run_batch)

  • Purpose: High-throughput generation for historical analysis and ML training.
  • Mechanism:
    1. Instantiates HyperSynthReactor with the provided config.
    2. Iterates through the market generator as fast as the CPU allows.
    3. Accumulates events into chunks of chunk_size events.
    4. Writes each chunk to a Parquet file via PyArrow (with a Polars fallback).
  • Output: A (file_path, row_count) tuple: the path to the generated Parquet file and the number of rows written.
  • Performance: CPU-bound; capable of generating millions of events per second.
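Steps 3 and 4 amount to a buffer-and-flush loop. The sketch below shows that loop. `write_in_chunks` and its `write_chunk` callback are hypothetical; the callback stands in for the Parquet writer, and the final partial chunk is flushed at the end.

```python
from typing import Callable, Iterable, List, Tuple

Event = Tuple[str, dict]


def write_in_chunks(
    events: Iterable[Event],
    write_chunk: Callable[[List[Event]], None],
    chunk_size: int = 1000,
) -> int:
    """Buffer events, flush them in chunks of chunk_size, and return the row count.

    write_chunk is a hypothetical callback standing in for the Parquet writer.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    buffer: List[Event] = []
    rows = 0
    for event in events:
        buffer.append(event)
        if len(buffer) >= chunk_size:
            write_chunk(buffer)
            rows += len(buffer)
            buffer = []  # fresh list so the flushed chunk is never mutated
    if buffer:  # flush the final, partial chunk
        write_chunk(buffer)
        rows += len(buffer)
    return rows


chunks: List[List[Event]] = []
events = (("trade", {"seq": i}) for i in range(2500))
row_count = write_in_chunks(events, chunks.append, chunk_size=1000)
print(row_count, [len(c) for c in chunks])  # 2500 [1000, 1000, 500]
```

With PyArrow, such a callback could convert each chunk to a `pyarrow.Table` and pass it to `pyarrow.parquet.ParquetWriter.write_table`. Whether the driver does exactly this is not stated here. Bounding the buffer at chunk_size keeps memory flat no matter how long the simulated duration is.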

Multiprocess Batch Mode (run_batch with multiprocess=True)

  • Purpose: Scale batch generation across multiple CPU cores.
  • Mechanism:
    1. Divides the total duration into time windows.
    2. Spawns a pool of worker processes (configurable count).
    3. Each worker generates its window using a seed derived deterministically from the base seed.
    4. Merges the results into a single Parquet file.
  • Configuration:
    • multiprocess=True: Enables multiprocessing.
    • workers: Number of parallel workers (default: min(4, CPU count)).
    • window_seconds: Duration of each window (auto-calculated if not specified).
    • max_retries: Number of times a failed window is retried (default: 3).
    • backoff_seconds: Delay between retries (default: 0.5 s).
  • Determinism: Preserved because each window's seed is derived from the base seed, so the merged output does not depend on which worker generates which window.
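The windowing, seeding and retry steps can be sketched as follows. The SHA-256 seed derivation, the exponential backoff schedule, the "max_retries counts retries after the first attempt" reading, and the helper names (`plan_windows`, `derive_seed`, `run_with_retries`) are all assumptions. The documentation only states that per-window seeds come from the base seed and that failed windows are retried with a backoff. The `min(4, CPU count)` worker default is taken from the list above.

```python
import hashlib
import math
import os
import time
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


def default_workers() -> int:
    # Documented default: min(4, CPU count)
    return min(4, os.cpu_count() or 1)


def plan_windows(duration_seconds: float, window_seconds: float) -> List[Tuple[int, float, float]]:
    """Split [0, duration_seconds) into (index, start, end) windows; the last may be shorter."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    count = math.ceil(duration_seconds / window_seconds)
    return [
        (i, i * window_seconds, min((i + 1) * window_seconds, duration_seconds))
        for i in range(count)
    ]


def derive_seed(base_seed: int, window_index: int) -> int:
    """Hypothetical derivation: hash (base seed, window index) down to a 64-bit seed."""
    digest = hashlib.sha256(f"{base_seed}:{window_index}".encode()).digest()
    return int.from_bytes(digest[:8], "big")


def run_with_retries(fn: Callable[[], T], max_retries: int = 3, backoff_seconds: float = 0.5) -> T:
    """Call fn, retrying up to max_retries times with exponential backoff (assumed schedule)."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            time.sleep(backoff_seconds * 2 ** attempt)
    raise AssertionError("unreachable")


windows = plan_windows(86400, 3600)  # 24 one-hour windows
seeds = [derive_seed(42, i) for i, _, _ in windows]

attempts = {"n": 0}


def flaky_window() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:  # fail twice, then succeed
        raise RuntimeError("transient failure")
    return "ok"


result = run_with_retries(flaky_window, max_retries=3, backoff_seconds=0.01)
print(len(windows), result, attempts["n"])  # 24 ok 3
```

Each seed depends only on (base_seed, window_index), so a retried window regenerates exactly the same events as its failed attempt would have.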

Stream Mode (run_stream)

  • Purpose: Real-time simulation for bot testing, UI development, and system integration.
  • Mechanism:
    1. Instantiates HyperSynthReactor with the provided config.
    2. Iterates through the market generator.
    3. When real_time=True, calculates the time delta between consecutive events.
    4. Suspends execution with await asyncio.sleep(delta) so that event spacing matches wall-clock time.
    5. Yields each (event_type, event_data) tuple; with real_time=False, events are yielded without waiting.
  • Output: An AsyncGenerator yielding event tuples.
  • Behavior: Effectively “plays back” the simulation in real time.
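The pacing in steps 3 to 5 can be sketched as an async generator over timestamped events. The `(timestamp, event_type, event_data)` input shape and the name `play_back` are hypothetical; only the sleep-between-deltas behaviour comes from the description above.

```python
import asyncio
import time
from typing import AsyncIterator, Iterable, List, Optional, Tuple

# Hypothetical input shape: (simulated_timestamp_seconds, event_type, event_data)
TimedEvent = Tuple[float, str, dict]


async def play_back(
    events: Iterable[TimedEvent], real_time: bool = True
) -> AsyncIterator[Tuple[str, dict]]:
    """Yield (event_type, event_data), sleeping between events when real_time is True."""
    previous_ts: Optional[float] = None
    for ts, event_type, event_data in events:
        if real_time and previous_ts is not None:
            delta = ts - previous_ts
            if delta > 0:
                await asyncio.sleep(delta)  # match wall-clock spacing to simulated spacing
        previous_ts = ts
        yield event_type, event_data


async def demo(real_time: bool) -> Tuple[List[Tuple[str, dict]], float]:
    events = [
        (0.00, "trade", {"seq": 0}),
        (0.05, "quote", {"seq": 1}),
        (0.10, "trade", {"seq": 2}),
    ]
    start = time.perf_counter()
    received = [event async for event in play_back(events, real_time=real_time)]
    return received, time.perf_counter() - start


received, elapsed = asyncio.run(demo(real_time=True))
print([kind for kind, _ in received])  # ['trade', 'quote', 'trade']
```

Sleeping for each delta lets small scheduling overheads accumulate over long runs. An alternative, not described in the docs, is to compute each event's target time relative to a fixed start using `loop.time()` and sleep until that point.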

Usage Examples

Basic batch generation (one hour of simulated data):

```python
from aleatoric.drivers import run_batch
from aleatoric.core.config import SimulationManifest

config = SimulationManifest(symbol="ETH", seed=123)
file_path, row_count = run_batch(config, duration_seconds=3600)
print(f"Generated {row_count} events to: {file_path}")
```
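
Multiprocess batch generation (24 hours split into 1-hour windows across 8 workers):

```python
from aleatoric.drivers import run_batch
from aleatoric.core.config import SimulationManifest

# Guard the entry point: with the "spawn" start method (the default on Windows
# and macOS), worker processes re-import the main module.
if __name__ == "__main__":
    config = SimulationManifest(symbol="BTC", seed=42)
    file_path, row_count = run_batch(
        config,
        duration_seconds=86400,  # 24 hours
        multiprocess=True,
        workers=8,
        window_seconds=3600,  # 1-hour windows
    )
    print(f"Generated {row_count} events using 8 workers")
```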

Streaming (real_time defaults to True, so this runs for about 60 seconds of wall-clock time):

```python
import asyncio

from aleatoric.drivers import run_stream
from aleatoric.core.config import SimulationManifest


async def consume_stream():
    config = SimulationManifest(symbol="ETH", seed=123)
    async for event_type, event_data in run_stream(config, duration_seconds=60):
        print(f"[{event_type}] {event_data}")


asyncio.run(consume_stream())
```

Driver behavior can be configured via environment variables:

| Variable | Default | Description |
|---|---|---|
| ALEATORIC_DRIVER_ENABLE_MULTIPROCESS | false | Enable multiprocessing by default |
| ALEATORIC_DRIVER_MAX_WORKERS | auto | Maximum number of worker processes |
| ALEATORIC_DRIVER_WINDOW_SECONDS | auto | Window duration for multiprocess mode, in seconds |
| ALEATORIC_DRIVER_MAX_RETRIES | 3 | Retries for failed windows |
| ALEATORIC_DRIVER_BACKOFF_SECONDS | 0.5 | Backoff between retries, in seconds |
| ALEATORIC_BATCH_CHUNK_SIZE | 1000 | Events per chunk before flushing to disk |
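The sketch below reads these variables into a settings object, treating `auto` or an unset variable as "let the driver decide". `DriverSettings`, `from_env` and the accepted boolean spellings (`1`, `true`, `yes`, `on`) are hypothetical. The actual parsing in aleatoric may differ.

```python
import os
from dataclasses import dataclass
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


def _flag(value: str) -> bool:
    # Assumed spellings for a true boolean flag
    return value.strip().lower() in {"1", "true", "yes", "on"}


def _optional(value: Optional[str], cast: Callable[[str], T]) -> Optional[T]:
    # "auto" or unset means: leave the value for the driver to compute
    if value is None or value.strip().lower() == "auto":
        return None
    return cast(value)


@dataclass(frozen=True)
class DriverSettings:
    enable_multiprocess: bool = False
    max_workers: Optional[int] = None
    window_seconds: Optional[float] = None
    max_retries: int = 3
    backoff_seconds: float = 0.5
    chunk_size: int = 1000

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "DriverSettings":
        return cls(
            enable_multiprocess=_flag(env.get("ALEATORIC_DRIVER_ENABLE_MULTIPROCESS", "false")),
            max_workers=_optional(env.get("ALEATORIC_DRIVER_MAX_WORKERS"), int),
            window_seconds=_optional(env.get("ALEATORIC_DRIVER_WINDOW_SECONDS"), float),
            max_retries=int(env.get("ALEATORIC_DRIVER_MAX_RETRIES", "3")),
            backoff_seconds=float(env.get("ALEATORIC_DRIVER_BACKOFF_SECONDS", "0.5")),
            chunk_size=int(env.get("ALEATORIC_BATCH_CHUNK_SIZE", "1000")),
        )


settings = DriverSettings.from_env(
    {"ALEATORIC_DRIVER_ENABLE_MULTIPROCESS": "true", "ALEATORIC_DRIVER_MAX_WORKERS": "8"}
)
print(settings.enable_multiprocess, settings.max_workers, settings.chunk_size)  # True 8 1000
```

Passing the environment as a mapping, rather than reading `os.environ` directly inside the parser, keeps the parsing easy to test.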

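Both batch and stream modes guarantee bit-for-bit reproducibility given the same configuration and seed:

```python
import asyncio
from aleatoric.drivers import run_batch, run_stream
from aleatoric.core.config import SimulationManifest

async def collect(config):
    return [e async for e in run_stream(config, duration_seconds=100, real_time=False)]

if __name__ == "__main__":
    config = SimulationManifest(seed=42)
    # These produce identical event sequences (run_stream yields only when iterated):
    run_batch(config, duration_seconds=100)
    asyncio.run(collect(config))
    # Multiprocess also preserves determinism; both calls produce identical output:
    run_batch(config, duration_seconds=100, multiprocess=False)
    run_batch(config, duration_seconds=100, multiprocess=True, workers=4)
```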
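One way to check the guarantee in your own pipeline is to compare order-sensitive fingerprints of two runs. The sketch below uses a toy window generator and a thread pool, standing in for the driver's process pool, to show why per-window seeding makes the merged output independent of worker count. `window_events`, `fingerprint` and `generate` are hypothetical and not part of aleatoric.drivers.

```python
import hashlib
import json
import random
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List, Tuple

Event = Tuple[str, dict]


def window_events(base_seed: int, index: int, n: int = 100) -> List[Event]:
    """Toy window generator seeded only by (base_seed, index) (hypothetical)."""
    rng = random.Random(f"{base_seed}:{index}")  # str seeds are reproducible across runs
    return [("trade", {"window": index, "price": rng.random()}) for _ in range(n)]


def fingerprint(events: Iterable[Event]) -> str:
    """SHA-256 over a canonical JSON encoding of each event, in order."""
    h = hashlib.sha256()
    for event_type, data in events:
        h.update(json.dumps([event_type, data], sort_keys=True).encode())
        h.update(b"\n")
    return h.hexdigest()


def generate(base_seed: int, windows: int, workers: int = 1) -> List[Event]:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() returns results in input order, so the merge ignores completion order
        parts = pool.map(lambda i: window_events(base_seed, i), range(windows))
        return [event for part in parts for event in part]


sequential = fingerprint(generate(42, windows=8, workers=1))
parallel = fingerprint(generate(42, windows=8, workers=4))
print(sequential == parallel)  # True
```

To apply this to real output, you could fingerprint the rows read back from the Parquet file and the tuples collected from run_stream(..., real_time=False), after converting both to the same (event_type, event_data) shape. The Parquet column layout is not documented here.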

Last Updated: 2026-01-13 Version: 0.4.5