ADR-014: Wagon Tracking and Queue Management Architecture¶
Status¶
IMPLEMENTED - Resolved in MVP
Context¶
PopUpSim had a fundamental issue with wagon tracking and queue management that caused wagons to be "lost" during the simulation workflow. The root cause was a dual-purpose wagons_queue that served both as a processing queue and a global wagon registry, leading to data inconsistency.
Problem (Resolved)¶
- W07 stuck on retrofit track: Wagons moved to retrofitted track were not added back to
wagons_queue, making them invisible tomove_to_parkingprocess - Dual-purpose queue:
popupsim.wagons_queueused for both processing workflow and wagon tracking - Broken workflow chain: Train → Collection → Retrofit → Workshop → Retrofitted → ❌ LOST → Parking
- Data inconsistency: Wagons removed from queue during processing but needed for later stages
Current Architecture Issues¶
# Problematic dual-purpose usage
self.wagons_queue: list[Wagon] = [] # Both processing queue AND global registry
# Broken lookup in move_to_parking
wagons_on_retrofitted = [
w for w in popupsim.wagons_queue # ❌ Wagons not in queue anymore!
if w.track == retrofitted_track.id and w.status == WagonStatus.RETROFITTED
]
Decision¶
IMPLEMENTED: Option 1 (Separation of Concerns - Registry + Queues) with SimPy store integration.
The MVP implements a hybrid approach combining: - WagonStateManager: Handles wagon state transitions and tracking - SimPy Stores: Separate workflow coordination stores for each stage - TrackCapacityManager: Physical capacity management separate from workflow
Implementation in MVP¶
class WorkshopOrchestrator:
def __init__(self, sim, scenario):
# Separate stores for workflow coordination (no dual-purpose queue)
self.retrofitted_wagons_ready = sim.create_store()
self.wagons_ready_for_stations = {}
self.wagons_completed = {}
# State management (replaces dual-purpose wagons_queue)
self.wagon_state = WagonStateManager()
self.track_capacity = TrackCapacityManager()
def put_wagon_if_fits_retrofitted(self, wagon):
"""Capacity-validated workflow coordination."""
if self.track_capacity.can_add_wagon(track_id, wagon.length):
yield self.retrofitted_wagons_ready.put(wagon)
return True
return False
Result: No more lost wagons - complete workflow chain with proper separation of concerns.
Decision Options (Evaluated)¶
Option 1: Separation of Concerns - Registry + Queues¶
Principle: Single Responsibility - separate tracking from workflow coordination
class WagonTracker:
"""Single responsibility: Track all wagons in system"""
def __init__(self):
self.registry: dict[str, Wagon] = {} # Never remove wagons
def get_wagons_by_track(self, track_id: str) -> list[Wagon]: ...
def get_wagons_by_status(self, status: WagonStatus) -> list[Wagon]: ...
class WorkflowQueues:
"""Single responsibility: Manage processing workflows"""
def __init__(self, sim):
self.collection_queue: list[Wagon] = []
self.retrofit_queue: list[Wagon] = []
self.retrofitted_ready = sim.create_store() # For move_to_parking
self.parking_queue: list[Wagon] = []
Pros: - Clear separation of concerns - No data loss - registry never loses wagons - Easy debugging - clear data ownership - Solves W07 problem directly
Cons: - More complexity - multiple data structures - Need to maintain consistency across structures
Option 2: Event-Driven State Machine¶
Principle: Reactive architecture with event-driven workflow coordination
class WagonEventManager:
def __init__(self, sim):
self.wagon_states: dict[str, Wagon] = {}
self.wagon_events: dict[WagonStatus, SimPy.Store] = {
WagonStatus.RETROFITTED: sim.create_store(),
WagonStatus.READY_FOR_PARKING: sim.create_store(),
}
def handle_wagon_retrofitted(self, wagon: Wagon) -> None:
self.wagon_states[wagon.id] = wagon
self.wagon_events[WagonStatus.RETROFITTED].put(wagon)
Pros: - Reactive - no polling required - Clear workflow triggers - Natural event integration - Scalable architecture
Cons: - More complex event handling - Still needs state storage for analytics - Higher learning curve
Option 3: Single Registry + Status-Based Queries¶
Principle: Single source of truth with query-based access
class WagonRegistry:
def __init__(self):
self.all_wagons: dict[str, Wagon] = {}
def get_wagons_by_status_and_track(self, status: WagonStatus, track_id: str) -> list[Wagon]:
return [w for w in self.all_wagons.values()
if w.status == status and w.track == track_id]
Pros: - Simple - single source of truth - No data synchronization issues - Easy to understand and maintain
Cons: - O(n) queries - potential performance issues - No workflow coordination mechanism - Polling-based approach
Option 4: Track-Based Wagon Management¶
Principle: Each track manages its own wagons
class TrackBasedWagonManager:
def __init__(self):
self.track_wagons: dict[str, list[Wagon]] = {
"collection": [],
"retrofit": [],
"retrofitted": [],
"parking": []
}
def move_wagon(self, wagon: Wagon, from_track: str, to_track: str): ...
Pros: - Natural organization by physical location - Efficient lookups by track - Mirrors real-world track organization
Cons: - Need to maintain consistency across moves - Complex cross-track operations - Potential data duplication
Option 5: Resource State Machines¶
Principle: Explicit state management with automatic transitions and side effects
class WagonStateMachine:
states = [
'ARRIVING', 'SELECTING', 'ON_COLLECTION', 'MOVING_TO_RETROFIT',
'ON_RETROFIT', 'RETROFITTING', 'RETROFITTED', 'MOVING_TO_PARKING', 'PARKING'
]
def on_enter_RETROFITTED(self, wagon: Wagon):
# Automatic queue management
self.orchestrator.parking_queue.put(wagon)
# Automatic event firing
self.metrics.record_event(WagonRetrofittedEvent.create(wagon_id=wagon.id))
def can_move_to_retrofit(self, wagon: Wagon) -> bool:
return (self.workshop_capacity.has_available_stations() and
self.track_capacity.can_add_wagon(wagon.length))
Pros: - Explicit state management - no "lost" wagons - Automatic queue coordination - Built-in validation with guards - Natural event integration - Excellent debugging capabilities - Prevents invalid state transitions
Cons: - Added complexity and abstraction - Learning curve for state machine concepts - Potential over-engineering for simple workflows
Analytics Considerations¶
All options must support analytics requirements:
# Analytics needs access to:
wagon.retrofit_start_time # When did retrofit begin?
wagon.retrofit_end_time # When did it complete?
wagon.waiting_time # How long did wagon wait?
wagon.track # Where is wagon now?
wagon.status # What state is it in?
Analytics Compatibility: - Option 1: ✅ Complete state in registry - Option 2: ⚠️ Needs separate state storage - Option 3: ✅ Direct state access - Option 4: ✅ State distributed across tracks - Option 5: ✅ State in machines + automatic events
Implementation Phases¶
MVP Phase (Immediate)¶
Goal: Solve W07 problem with minimal changes - Recommended: Option 1 (Registry + Queues) or Option 5 (Wagon State Machine) - Rationale: Clear separation, solves immediate problem, foundation for future
Post-MVP Phase (Enhanced)¶
Goal: Improved analytics and workflow coordination - Recommended: Hybrid of Option 1 + Option 2 (Registry + Events) - Add: Enhanced event projections, real-time analytics
Full Version (Enterprise)¶
Goal: Scalable, enterprise-grade architecture - Recommended: Event Sourcing + CQRS + State Machines - Features: Time-travel debugging, what-if analysis, microservices ready
Migration Strategy¶
Phase 1: MVP → Registry + Queues (solve W07)
Phase 2: Enhanced → Add event projections
Phase 3: Full → Event sourcing + CQRS
Phase 4: Enterprise → Microservices + real-time analytics
Open Questions¶
- Performance vs. Simplicity: How important is O(1) lookup performance vs. code simplicity?
- State Machine Scope: Should state machines apply to all resources or just critical ones (Wagon, Locomotive)?
- Event Integration: How tightly should new architecture integrate with existing event system?
- Migration Path: Should we implement incrementally or do a complete refactor?
- Testing Strategy: How do we ensure no regressions during architecture changes?
Implementation Results¶
Achieved in MVP¶
- ✅ W07 Problem Solved: Wagons no longer get lost during workflow
- ✅ Separation of Concerns: WagonStateManager handles state, SimPy stores handle workflow
- ✅ No Dual-Purpose Queue: Clear separation between tracking and coordination
- ✅ Complete Analytics: All wagon states tracked for metrics and visualization
- ✅ Event-Driven Architecture: SimPy stores eliminate polling and manual searches
Files Implementing This Decision¶
workshop_operations/application/orchestrator.py- Workflow coordination with separate storesworkshop_operations/domain/services/wagon_operations.py- WagonStateManager and WagonSelectoranalytics/domain/collectors/wagon_collector.py- Wagon state tracking for analytics
References¶
- Current issue: W07 wagon stuck on retrofit track
- Existing events:
WagonDeliveredEvent,WagonRetrofittedEvent - Analytics requirements: KPI calculation, throughput metrics
- MVP constraints: Direct context calls, file-based configuration
Authors: Development Team
Date: 2024-12-19
Review Date: TBD