HostProcessor Thread Safety & Connection Logic Review
HostProcessor Thread Safety & Connection Logic Review
HostProcessor Thread Safety & Connection Logic Review
HostProcessor Thread Safety & Connection Logic Review
Summary of Changes
1. HostProcessor.kt - Major improvements for thread safety and connection handling
Thread Safety Fixes:
- ✅ Changed all
LazyThreadSafetyMode.NONEtoSYNCHRONIZEDfor thread-safe dependency injection across all processors (HostProcessor, NodeEmitProcessor, NodeEmitAppProcessor) - ✅ Added
wireProcessingMutexto prevent race conditions when processing incoming wire messages - ✅ BeaconService already handles beacon listener deduplication via its jobs map (no need for instance-level flags in HostProcessor)
- ✅ Wrapped all critical sections with proper mutex locking
Connection Logic Improvements:
- ✅ Separated wire processing into clear functions:
processWire(),processHostWire(),handleNewPeer(),handleKnownPeer() - ✅ Added proper error handling with try-catch blocks around wire processing
- ✅ Fixed logic for handling known peers with new sessions (server/client restarts)
- ✅ Added explicit logging for all connection state changes
- ✅ Added
shutdown()method for cleanup when host processor is stopped
Missing Conditions Fixed:
Scenario 1: Server Restart
- ✅ When a known server restarts (new sessionId), the client now:
- Updates the session in
peerSessionManager - Updates the node with latest wire data
- Re-establishes trust via
serverHandshakeProcess.trustServer() - If I’m also a server, advertises back
- Updates the session in
Scenario 2: Client Restart
- ✅ When a known client restarts (new sessionId), the server now:
- Updates the session in
peerSessionManager - Updates the node with latest wire data
- Sends a signal to advertise itself to the reconnected client
- Updates the session in
Scenario 3: Duplicate Beacons
- ✅ When receiving a beacon from a known peer with the same session:
- Logs as duplicate/heartbeat (debug level)
- Updates node metadata in case anything changed
- Does NOT re-establish connections (prevents loops)
Scenario 4: App Launch
- ✅ When a new client app launches:
- Creates node stub from wire data
- Sends signal back to advertise the host
- Does NOT start unnecessary handshake processes (clients don’t need full trust)
Scenario 5: Multiple emit() Calls
- ✅ Protected by
emitMutexandbeaconStartedflag - ✅ Only starts beacon service once per host node
- ✅ Subsequent calls are safely skipped with debug logging
Scenario 6: Error Recovery
- ✅ If beacon service fails to start, resets
beaconStartedflag to allow retry - ✅ Wire processing errors are caught and logged without crashing the service
2. PeerSessionManager.kt - Added thread safety
Changes:
- ✅ Added
Mutexfor thread-safe access toknownSessionsmap - ✅ Made all methods
suspendto support mutex locking - ✅ Added
getSession()method for future use (retrieving session by node ID)
3. BeaconService.kt - Improved wire handling
Changes:
- ✅ Changed
discoveredcallback tosuspendfunction - ✅ Removed session checking logic from BeaconService (moved to HostProcessor)
- ✅ Always calls
discovered()callback - let HostProcessor decide what to do - ✅ Added
wireProcessingMutexfor thread-safe wire processing - ✅ Added
stop(nodeId)andstopAll()methods for cleanup - ✅ Added comprehensive error handling with try-catch blocks
- ✅ Added logging for service lifecycle events
Potential Edge Cases Covered
1. Race Condition: Multiple Beacons Arriving Simultaneously
- ✅ Protected by
wireProcessingMutexin both BeaconService and HostProcessor - Each wire is processed sequentially, preventing session state corruption
2. Race Condition: Session Manager Access
- ✅ All
PeerSessionManagermethods use mutex locking - Safe concurrent access from multiple coroutines
3. Error Loop: Server Keeps Restarting
- ✅ Each restart creates a new session ID
- ✅ HostProcessor handles new session properly without creating duplicate listeners
- ✅ Old session is replaced, not duplicated
4. Error Loop: Duplicate Beacon Listeners
- ✅ BeaconService tracks active beacon jobs in its jobs map (keyed by node.id)
- ✅ BeaconService.start() checks if job already exists before creating a new one
- ✅ Thread-safe with beaconJobMutex in BeaconService
5. Memory Leak: Beacon Services Not Cleaned Up
- ✅ Added
shutdown()method to HostProcessor - ✅ Added
stop()andstopAll()methods to BeaconService - Applications should call these on cleanup
6. Dependency Injection Race Condition
- ✅ All
by inject()now useLazyThreadSafetyMode.SYNCHRONIZED - Safe initialization even when accessed from multiple threads
Testing Recommendations
Unit Tests Needed:
- Test multiple concurrent
emit()calls only start one beacon service - Test server restart scenario (known peer, new session)
- Test client restart scenario (known peer, new session)
- Test new peer discovery
- Test duplicate beacon handling (same session)
- Test error recovery when beacon service fails to start
- Test concurrent wire processing from multiple peers
Integration Tests Needed:
- Network partition and reconnection
- Server crash and restart with different data
- Multiple clients connecting simultaneously
- Client app closed and relaunched
- Server under load with many beacon messages
Manual Test Scenarios:
- Scenario A: Start server, start client, verify connection
- Scenario B: Start server, start client, restart server, verify reconnection
- Scenario C: Start server, start client, restart client, verify reconnection
- Scenario D: Start server, start multiple clients simultaneously, verify all connect
- Scenario E: Network blip causing duplicate beacons, verify no error loops
- Scenario F: Kill server process, restart with same data, verify clients reconnect
- Scenario G: Kill server process, restart with different data, verify clients re-download
Remaining Considerations
1. Session Cleanup
- When a peer disconnects (timeout), should we remove its session?
- Currently sessions accumulate forever in
PeerSessionManager - Recommendation: Add a cleanup mechanism or TTL for sessions
2. Beacon Service Lifecycle
shutdown()method exists but needs to be called from application lifecycle hooks- Recommendation: Integrate with application/server shutdown hooks
3. WebSocket Connection Management
- HostProcessor handles discovery, but WebSocket lifecycle is separate
- Recommendation: Ensure WebSocket cleanup is coordinated with session management
4. Node State Synchronization
- When server restarts, clients re-download nodes
- Recommendation: Verify that concurrent node downloads don’t corrupt local state
5. Multi-Server Scenarios
- If there are multiple servers, verify they don’t create beacon loops
- Recommendation: Test server-to-server beacon handling
Code Quality Improvements
What Changed:
- Better separation of concerns (processWire, handleNewPeer, handleKnownPeer)
- More descriptive logging at appropriate levels (i, d, w, e)
- Comprehensive error handling
- Clear comments explaining each condition
- Consistent mutex usage patterns
LazyThreadSafetyMode Justification:
- SYNCHRONIZED: Required because:
- Multiple threads may call
emit()simultaneously - Coroutines run on different threads in multiplatform context
- Koin injection without synchronization can cause race conditions
- Prevents double initialization of expensive resources (BeaconService, etc.)
- Multiple threads may call
Checklist for Deployment
- Update all call sites that use
PeerSessionManagerto handle suspend functions - Add application shutdown hook to call
HostProcessor.shutdown() - Add integration tests for reconnection scenarios
- Monitor logs for any “Beacon service already started” messages (indicates protection working)
- Monitor for any session state corruption (should not occur with mutex)
- Performance test: verify mutex locking doesn’t create bottlenecks
- Add session cleanup mechanism (TTL or explicit disconnect handling)
Metrics to Monitor
- Beacon listener count - Should always be 1 per host node
- Session map size - Should grow with peers, but not unbounded
- Wire processing time - Should be fast (< 100ms typically)
- Connection re-establishment time - When peer restarts
- Error rate - In wire processing and beacon callbacks
This post is licensed under CC BY 4.0 by the author.