The Aleatoric Engine Drivers
Unified Driver Architecture for Batch and Streaming Data
The Aleatoric Engine utilizes a Dual-Driver Architecture to ensure that data generation is identical regardless of the consumption mode. This guarantees that datasets used for training ML models (Batch) are mathematically identical to the data streams used for live testing (Stream).
Driver Functions
Located in `src/aleatoric/drivers.py`, the driver module provides two main entry points:
run_batch()
High-throughput batch generation with optional multiprocessing.
```python
def run_batch(
    config: SimulationManifest,
    duration_seconds: float,
    chunk_size: int = 1000,
    multiprocess: bool = False,
    workers: Optional[int] = None,
    window_seconds: Optional[float] = None,
    max_retries: int = 3,
    backoff_seconds: float = 0.5,
) -> Tuple[str, int]:
    """
    Returns: (file_path, row_count)
    """
```

run_stream()
Real-time streaming with optional wall-clock timing.
```python
async def run_stream(
    config: SimulationManifest,
    duration_seconds: Optional[float] = None,
    real_time: bool = True,
) -> AsyncGenerator[Tuple[str, dict], None]:
    """
    Yields: (event_type, event_data) tuples
    """
```

1. Batch Mode
- Purpose: High-throughput generation for historical analysis and ML training.
- Mechanism:
  - Instantiates `HyperSynthReactor` with the provided config.
  - Iterates through the market generator as fast as the CPU allows.
  - Accumulates events in chunks (configurable via `chunk_size`).
  - Writes chunks to a Parquet file via PyArrow (with a Polars fallback); see the sketch after this list.
- Output: File path to the generated Parquet file and row count.
- Performance: Capable of generating millions of events per second (CPU bound).
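
The chunked write described in the mechanism above can be pictured as follows. This is a minimal sketch, assuming a plain iterable of `(event_type, event_data)` pairs and PyArrow only; the helper `write_in_chunks` is illustrative, not the actual `drivers.py` implementation (which also falls back to Polars).

```python
from typing import Iterable, Tuple

import pyarrow as pa
import pyarrow.parquet as pq


def write_in_chunks(events: Iterable[Tuple[str, dict]], out_path: str, chunk_size: int = 1000) -> int:
    """Accumulate (event_type, event_data) pairs and flush them to Parquet in chunks."""
    writer = None
    buffer = []
    rows = 0

    def flush():
        nonlocal writer, rows
        if not buffer:
            return
        table = pa.Table.from_pylist(buffer)  # convert the chunk to an Arrow table
        if writer is None:
            writer = pq.ParquetWriter(out_path, table.schema)
        writer.write_table(table)
        rows += len(buffer)
        buffer.clear()

    for event_type, event_data in events:
        buffer.append({"event_type": event_type, **event_data})
        if len(buffer) >= chunk_size:
            flush()
    flush()  # write any trailing partial chunk
    if writer is not None:
        writer.close()
    return rows
```

Flushing in fixed-size chunks keeps memory bounded regardless of the total simulated duration.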
2. Batch Mode with Multiprocessing
- Purpose: Scale batch generation across multiple CPU cores.
- Mechanism:
  - Divides total duration into time windows.
  - Spawns ProcessPool workers (configurable count).
  - Each worker generates its window with deterministic per-window seeding.
  - Results are merged into a single Parquet file.
- Configuration:
  - `multiprocess=True`: Enable multiprocessing.
  - `workers`: Number of parallel workers (default: `min(4, CPU count)`).
  - `window_seconds`: Duration per window (auto-calculated if not specified).
  - `max_retries`: Retry failed windows (default: 3).
  - `backoff_seconds`: Backoff between retries (default: 0.5 s).
- Determinism: Preserved via per-window seed derivation from the base seed (one such scheme is sketched below).
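
The engine's exact derivation is not reproduced here; the sketch below shows one common way to derive stable per-window seeds from a base seed and a window index, which is what makes the window-to-worker assignment irrelevant to the output. The function name `derive_window_seed` is hypothetical.

```python
import hashlib


def derive_window_seed(base_seed: int, window_index: int) -> int:
    """Derive a stable 64-bit seed for one time window from the base seed."""
    digest = hashlib.sha256(f"{base_seed}:{window_index}".encode()).digest()
    return int.from_bytes(digest[:8], "big")


# Example: window 3 of a run seeded with 42 always gets the same seed,
# no matter which worker process generates it.
assert derive_window_seed(42, 3) == derive_window_seed(42, 3)
```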
3. Stream Mode
- Purpose: Real-time simulation for bot testing, UI development, and system integration.
- Mechanism:
  - Instantiates `HyperSynthReactor` with the provided config.
  - Iterates through the market generator.
  - When `real_time=True`, calculates the `time_delta` between events.
  - Suspends execution using `await asyncio.sleep(delta)` to match wall-clock time (see the sketch after this list).
  - Yields `(event_type, event_data)` tuples immediately.
- Output: An `AsyncGenerator` yielding event tuples.
- Behavior: Effectively "plays back" the simulation in real time.
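
A minimal sketch of this pacing behaviour, assuming each event payload carries a `timestamp` field expressed in seconds; the helper `pace_events` and the field name are illustrative, not the library's internals.

```python
import asyncio
from typing import AsyncGenerator, Iterable, Tuple


async def pace_events(
    events: Iterable[Tuple[str, dict]],
    real_time: bool = True,
) -> AsyncGenerator[Tuple[str, dict], None]:
    """Yield events, sleeping for the inter-event time delta when real_time is on."""
    prev_ts = None
    for event_type, event_data in events:
        ts = event_data.get("timestamp")  # simulated time of this event, in seconds
        if real_time and prev_ts is not None and ts is not None:
            await asyncio.sleep(max(0.0, ts - prev_ts))  # match wall-clock spacing
        prev_ts = ts
        yield event_type, event_data
```

With `real_time=False` the sleep is skipped, so the stream is emitted as fast as the consumer can drain it.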
Usage Examples
Running a Batch Job (Single Process)

```python
from aleatoric.drivers import run_batch
from aleatoric.core.config import SimulationManifest

config = SimulationManifest(symbol="ETH", seed=123)
file_path, row_count = run_batch(config, duration_seconds=3600)
print(f"Generated {row_count} events to: {file_path}")
```

Running a Batch Job (Multiprocess)
```python
from aleatoric.drivers import run_batch
from aleatoric.core.config import SimulationManifest

config = SimulationManifest(symbol="BTC", seed=42)
file_path, row_count = run_batch(
    config,
    duration_seconds=86400,  # 24 hours
    multiprocess=True,
    workers=8,
    window_seconds=3600,  # 1-hour windows
)
print(f"Generated {row_count} events using 8 workers")
```

Running a Stream
```python
import asyncio

from aleatoric.drivers import run_stream
from aleatoric.core.config import SimulationManifest

async def consume_stream():
    config = SimulationManifest(symbol="ETH", seed=123)
    async for event_type, event_data in run_stream(config, duration_seconds=60):
        print(f"[{event_type}] {event_data}")

asyncio.run(consume_stream())
```

Environment Configuration
Driver behavior can be configured via environment variables:
| Variable | Default | Description |
|---|---|---|
| `ALEATORIC_DRIVER_ENABLE_MULTIPROCESS` | `false` | Enable multiprocessing by default |
| `ALEATORIC_DRIVER_MAX_WORKERS` | auto | Maximum worker processes |
| `ALEATORIC_DRIVER_WINDOW_SECONDS` | auto | Window duration for multiprocess |
| `ALEATORIC_DRIVER_MAX_RETRIES` | 3 | Retries for failed windows |
| `ALEATORIC_DRIVER_BACKOFF_SECONDS` | 0.5 | Backoff between retries |
| `ALEATORIC_BATCH_CHUNK_SIZE` | 1000 | Events per chunk before flush |
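
As a rough illustration, these variables could be resolved into driver defaults as sketched below; the helper `_env_bool` and the constant names are hypothetical, not the library's actual internals.

```python
import os


def _env_bool(name: str, default: bool) -> bool:
    """Interpret common truthy strings; fall back to the given default."""
    return os.getenv(name, str(default)).strip().lower() in {"1", "true", "yes"}


DEFAULT_MULTIPROCESS = _env_bool("ALEATORIC_DRIVER_ENABLE_MULTIPROCESS", False)
DEFAULT_MAX_RETRIES = int(os.getenv("ALEATORIC_DRIVER_MAX_RETRIES", "3"))
DEFAULT_BACKOFF_SECONDS = float(os.getenv("ALEATORIC_DRIVER_BACKOFF_SECONDS", "0.5"))
DEFAULT_CHUNK_SIZE = int(os.getenv("ALEATORIC_BATCH_CHUNK_SIZE", "1000"))
# The "auto" settings (ALEATORIC_DRIVER_MAX_WORKERS, ALEATORIC_DRIVER_WINDOW_SECONDS)
# are left unset here and resolved at run time when not provided explicitly.
```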
Determinism Guarantee
Both batch and stream modes guarantee bit-for-bit reproducibility given the same seed:
```python
# These will produce identical event sequences:
run_batch(SimulationManifest(seed=42), duration_seconds=100)
run_stream(SimulationManifest(seed=42), duration_seconds=100, real_time=False)

# Multiprocess also preserves determinism:
run_batch(config, duration_seconds=100, multiprocess=False)
run_batch(config, duration_seconds=100, multiprocess=True, workers=4)
# ^ Both produce identical output
```

Last Updated: 2026-01-13 · Version: 0.4.5