Skip to content

5. MVP Data Flow

Overview

Data flows through 4 bounded contexts: Configuration → Railway Infrastructure → External Trains → Retrofit Workflow.

End-to-End Flow

┌─────────────────────────────────────────┐
│   1. Configuration Context              │
│   Load & validate scenario              │
└──────────────┬──────────────────────────┘
               │ Scenario
               ├──────────────────────────┐
               │                          │
┌──────────────▼──────────┐  ┌───────────▼──────────┐
│ 2. Railway Infrastructure│  │ 3. External Trains   │
│    Build tracks          │  │    Schedule arrivals │
└──────────────┬──────────┘  └───────────┬──────────┘
               │ Track state              │ Events
               │                          │
               └──────────┬───────────────┘
                          │
               ┌──────────▼──────────────────────┐
               │ 4. Retrofit Workflow            │
               │    Execute simulation           │
               │    Collect metrics              │
               └──────────┬──────────────────────┘
                          │ Results
                          ▼
                    CSV + Charts

Phase 1: Configuration Loading

Input

scenario_dir/
├── scenario.json
├── tracks.json
├── workshops.json
├── locomotives.json
├── routes.json
└── trains.csv

Process

from contexts.configuration.domain.configuration_builder import ConfigurationBuilder

builder = ConfigurationBuilder(Path("scenario_dir"))
scenario = builder.build()

Output

Scenario(
    id="demo",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 2),
    trains=[...],
    tracks=[...],
    workshops=[...],
    locomotives=[...],
    routes=[...]
)

Phase 2: Railway Infrastructure Setup

Input: Scenario

Process

from contexts.railway_infrastructure.application.railway_context import RailwayContext

railway = RailwayContext(scenario)
# Builds track groups, initializes capacity management

Output

RailwayContext(
    track_groups={
        'collection': TrackGroup([Track(...), Track(...)]),
        'retrofit': TrackGroup([Track(...)]),
        'parking': TrackGroup([Track(...)])
    },
    track_selector=TrackSelector(...),
    capacity_service=CapacityService(...)
)

Phase 3: External Trains Initialization

Input: Scenario

Process

from contexts.external_trains.application.external_trains_context import ExternalTrainsContext

external_trains = ExternalTrainsContext(env, scenario, event_bus)
external_trains.initialize_arrivals()
# Schedules train arrival events

Output

Scheduled SimPy processes that publish TrainArrivedEvent at specified times.

Phase 4: Retrofit Workflow Execution

Input: Scenario + Railway + External Trains

Process

from contexts.retrofit_workflow.application.retrofit_workflow_context import RetrofitWorkflowContext

workflow = RetrofitWorkflowContext(env, scenario, railway, external_trains)
workflow.run()

Data Flow Within Workflow

TrainArrivedEvent
    ↓
ArrivalCoordinator
    ↓ (classify wagons)
CollectionCoordinator
    ↓ (form batches)
WorkshopCoordinator
    ↓ (retrofit)
ParkingCoordinator
    ↓ (to parking)
Metrics Collection

Output

{
    'wagon_events': [...],
    'locomotive_events': [...],
    'workshop_events': [...]
}

Phase 5: Results Export

Input: Metrics

Process

metrics = workflow.get_metrics()
workflow.export_events(output_path)

Output

output/
├── wagon_events.csv
├── locomotive_events.csv
├── workshop_events.csv
└── charts/
    ├── throughput.png
    └── utilization.png

Data Transformations

Phase Input Output Context
1 JSON/CSV files Scenario Configuration
2 Scenario Track infrastructure Railway Infrastructure
3 Scenario Scheduled arrivals External Trains
4 All above Metrics Retrofit Workflow
5 Metrics CSV/Charts Retrofit Workflow

Event Flow

sequenceDiagram
    participant ET as External Trains
    participant EB as Event Bus
    participant AC as ArrivalCoordinator
    participant CC as CollectionCoordinator
    participant WC as WorkshopCoordinator
    participant PC as ParkingCoordinator

    ET->>EB: TrainArrivedEvent
    EB->>AC: Deliver event
    AC->>AC: Classify wagons
    AC->>CC: Queue wagons
    CC->>CC: Form batch
    CC->>WC: Transport to workshop
    WC->>WC: Execute retrofit
    WC->>PC: Move to parking
    PC->>PC: Complete

Error Handling

try:
    # Load configuration
    scenario = builder.build()
except ValidationError as e:
    logger.error(f"Configuration error: {e}")
    sys.exit(1)

try:
    # Run simulation
    workflow.run()
except SimulationError as e:
    logger.error(f"Simulation error: {e}")
    # Export partial results
    workflow.export_events(output_path)
    sys.exit(2)