Thread Safety Patterns Applied to HostProcessor

Pattern 1: Thread-Safe Dependency Injection

Before:

private val nodeManager : NodeManager by inject(mode = LazyThreadSafetyMode.NONE)

After:

private val nodeManager : NodeManager by inject(mode = LazyThreadSafetyMode.SYNCHRONIZED)

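Why: In multi-threaded environments, multiple threads calling emit() can hit the lazy delegate at the same time. With LazyThreadSafetyMode.NONE no locking is used, so concurrent first access may run the initializer more than once and the behavior is undefined. SYNCHRONIZED takes a lock so the dependency is initialized exactly once.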
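
The effect can be checked with the standard library's lazy delegate, which accepts the same LazyThreadSafetyMode values. This is a minimal sketch, not the project's injection setup: ExpensiveDependency, initCount, and touchFromManyThreads are hypothetical names used only for illustration.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical stand-in for an injected dependency such as NodeManager.
class ExpensiveDependency(val instanceNumber: Int)

// Counts how many times the lazy initializer actually runs.
val initCount = AtomicInteger(0)

// SYNCHRONIZED: the initializer runs at most once, even under contention.
val dependency: ExpensiveDependency by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
    ExpensiveDependency(initCount.incrementAndGet())
}

// Reads the lazy value from many threads at once and returns the distinct
// instance numbers that were observed.
fun touchFromManyThreads(threadCount: Int): Set<Int> {
    val startGate = CountDownLatch(1)
    val seen = Collections.synchronizedSet(mutableSetOf<Int>())
    val workers = List(threadCount) {
        thread {
            startGate.await()  // hold every thread until all are ready
            seen.add(dependency.instanceNumber)
        }
    }
    startGate.countDown()
    workers.forEach { it.join() }
    return seen
}

fun main() {
    val seen = touchFromManyThreads(16)
    println("initializer runs: ${initCount.get()}, distinct instances: ${seen.size}")
}
```

With SYNCHRONIZED every thread observes the same instance. Switching the mode to NONE removes that guarantee, although a small test like this one may not reproduce the failure on every run.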


Pattern 2: Mutex-Protected Shared State

PeerSessionManager

Before:

class PeerSessionManager {
    private val knownSessions = mutableMapOf<String, String>()
    
    fun add(id: String, session: String) {
        knownSessions[id] = session  // RACE CONDITION!
    }
    
    fun isKnownSession(sessionId: String) = knownSessions.values.contains(sessionId)  // RACE CONDITION!
}

After:

class PeerSessionManager {
    private val knownSessions = mutableMapOf<String, String>()
    private val mutex = Mutex()
    
    suspend fun add(id: String, session: String) {
        mutex.withLock {
            knownSessions[id] = session  // THREAD SAFE
        }
    }
    
    suspend fun isKnownSession(sessionId: String): Boolean {
        return mutex.withLock {
            knownSessions.values.contains(sessionId)  // THREAD SAFE
        }
    }
}

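Why: The map returned by mutableMapOf() is not thread-safe. Coroutines running on different threads could read and write it at the same time, which can corrupt the map or lose entries. Mutex.withLock suspends instead of blocking a thread, which is why both methods become suspend functions.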
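
To see the pattern hold under load, the sketch below hammers a mutex-guarded map from several coroutines on Dispatchers.Default. It assumes kotlinx.coroutines is on the classpath; SessionRegistry and addConcurrently are illustrative names, not the post's actual classes.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical reduced version of a session manager: every map access
// happens while holding the mutex.
class SessionRegistry {
    private val sessionsById = mutableMapOf<String, String>()
    private val mutex = Mutex()

    suspend fun add(id: String, sessionId: String) {
        mutex.withLock { sessionsById[id] = sessionId }
    }

    suspend fun isKnownSession(sessionId: String): Boolean =
        mutex.withLock { sessionId in sessionsById.values }

    suspend fun size(): Int = mutex.withLock { sessionsById.size }
}

// Launches `writers` coroutines on a multi-threaded dispatcher; each one adds
// `perWriter` unique entries. Returns once every writer has finished.
suspend fun addConcurrently(registry: SessionRegistry, writers: Int, perWriter: Int) {
    coroutineScope {
        repeat(writers) { w ->
            launch(Dispatchers.Default) {
                repeat(perWriter) { i -> registry.add("peer-$w-$i", "session-$w-$i") }
            }
        }
    }
}

fun main(): Unit = runBlocking {
    val registry = SessionRegistry()
    addConcurrently(registry, writers = 8, perWriter = 1_000)
    println("entries stored: ${registry.size()}")  // all 8,000 writes are retained
}
```

Without the mutex, the same workload can lose entries or throw from inside the map implementation. Note that the kotlinx.coroutines Mutex is not reentrant, so a method that already holds the lock must not call another locked method on the same instance.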


Pattern 3: Wire Processing Serialization

Before:

beaconService.start(node) { wire ->
    // Process immediately - multiple wires could process concurrently
    when (wire.type) {
        KrillApp.Server -> {
            peerSessionManager.add(wire.id, wire.sessionId)  // RACE!
            nodeManager.update(wire.toNode())  // RACE!
        }
    }
}

After:

private val wireProcessingMutex = Mutex()

beaconService.start(node) { wire ->
    scope.launch {
        try {
            processWire(wire, node)  // Serialized processing
        } catch (e: CancellationException) {
            throw e  // Let coroutine cancellation propagate
        } catch (e: Exception) {
            logger.e("Error processing wire: ${e.message}", e)
        }
    }
}

private suspend fun processWire(wire: NodeWire, node: Node) {
    wireProcessingMutex.withLock {
        // Only one wire processed at a time
        processHostWire(wire, node)
    }
}

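Why: Multiple beacon messages can arrive at nearly the same time. Without serialization their handlers interleave, and the check-then-update sequence on session and node state can be corrupted. The mutex lets one wire finish processing before the next one starts.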
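
One way to confirm that processing really is serialized is to count how many handlers are inside the critical section at once. The following is a sketch under the assumption that kotlinx.coroutines is available; SerializedProcessor and processConcurrently are hypothetical names, and delay(1) stands in for real suspending work.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SerializedProcessor {
    private val mutex = Mutex()
    private val inFlight = AtomicInteger(0)

    // Highest number of handlers ever observed inside the critical section.
    val maxInFlight = AtomicInteger(0)

    // Plain list: only touched while the mutex is held.
    val processed = mutableListOf<String>()

    suspend fun process(wireId: String) {
        mutex.withLock {
            val now = inFlight.incrementAndGet()
            maxInFlight.updateAndGet { current -> maxOf(current, now) }
            delay(1)  // suspending inside the lock does not release it
            processed.add(wireId)
            inFlight.decrementAndGet()
        }
    }
}

// Launches one coroutine per wire on a multi-threaded dispatcher and waits
// for all of them to finish.
suspend fun processConcurrently(processor: SerializedProcessor, wireCount: Int) {
    coroutineScope {
        repeat(wireCount) { i ->
            launch(Dispatchers.Default) { processor.process("wire-$i") }
        }
    }
}

fun main(): Unit = runBlocking {
    val processor = SerializedProcessor()
    processConcurrently(processor, wireCount = 50)
    println("processed=${processor.processed.size}, maxInFlight=${processor.maxInFlight.get()}")
}
```

The trade-off is throughput: a slow handler delays every wire queued behind it, so work that does not touch shared state is better kept outside the lock.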


Pattern 4: Separation of Concerns

Before: Monolithic Logic

when (check) {
    is NodeFlow.Error -> {
        // 20 lines of logic
        peerSessionManager.add(...)
        nodeManager.update(...)
        if (wire.type == KrillApp.Server) {
            serverHandshakeProcess.trustServer(wire)
        } else if (wire.type == KrillApp.Client) {
            beaconManager.sendSignal(node)
        }
    }
    is NodeFlow.Success -> {
        // Another 20 lines
        if (!peerSessionManager.isKnownSession(wire.sessionId)) {
            // More nested logic
        }
    }
}

After: Clear Functions

when (check) {
    is NodeFlow.Error -> {
        handleNewPeer(wire, node, isKnownSession)
    }
    is NodeFlow.Success -> {
        handleKnownPeer(wire, node, check.node.value, isKnownSession)
    }
}

private suspend fun handleNewPeer(wire: NodeWire, node: Node, isKnownSession: Boolean) {
    // Clear, focused logic for new peer discovery
}

private suspend fun handleKnownPeer(wire: NodeWire, node: Node, existingNode: Node, isKnownSession: Boolean) {
    if (!isKnownSession) {
        // Handle reconnection
    } else {
        // Handle duplicate beacon
    }
}

Why: Easier to understand, test, and maintain. Each function has a single responsibility.


Pattern 5: Deduplication at Service Level

Architecture:

HostProcessor (Created per emit() call)
    ↓
BeaconService (Singleton)
    ├── jobs: MutableMap<String, Job>
    ├── beaconJobMutex: Mutex
    └── start(node) checks jobs.containsKey(node.id)

BeaconService Implementation:

private val jobs: MutableMap<String, Job> = mutableMapOf()
private val beaconJobMutex = Mutex()

suspend fun start(node: Node, discovered: suspend (NodeWire) -> Unit) {
    beaconJobMutex.withLock {
        if (!jobs.containsKey(node.id)) {
            // Only create job if not already running
            jobs[node.id] = scope.launch {
                multicast.receiveBeacons { wire ->
                    // Process beacons
                }
            }
        }
    }
}

Why: Since HostProcessor instances are created fresh on each emit(), instance-level flags wouldn’t persist. Deduplication must be at singleton level.
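
The sketch below illustrates why the check has to live in the long-lived object: ten short-lived processors ask the same registry to start a listener for one node, and only one job is launched. ListenerRegistry and ShortLivedProcessor are hypothetical stand-ins for BeaconService and HostProcessor, and awaitCancellation() replaces the real beacon receive loop.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Long-lived, shared object: owns the job map and the mutex guarding it.
class ListenerRegistry(private val scope: CoroutineScope) {
    private val jobs = mutableMapOf<String, Job>()
    private val mutex = Mutex()

    // Number of listener jobs actually launched; only written under the mutex.
    var launches = 0
        private set

    // Returns true if a new listener was started, false if one already existed.
    suspend fun start(nodeId: String): Boolean = mutex.withLock {
        if (nodeId in jobs) return@withLock false
        launches++
        jobs[nodeId] = scope.launch { awaitCancellation() }  // placeholder listener
        true
    }
}

// Short-lived object created per call: it holds no state of its own, so any
// "already started" flag stored here would be lost immediately.
class ShortLivedProcessor(private val registry: ListenerRegistry) {
    suspend fun emit(nodeId: String): Boolean = registry.start(nodeId)
}

fun main(): Unit = runBlocking {
    val listenerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val registry = ListenerRegistry(listenerScope)
    val results = (1..10).map {
        async(Dispatchers.Default) { ShortLivedProcessor(registry).emit("node-1") }
    }.awaitAll()
    println("started=${results.count { it }}, launches=${registry.launches}")
    listenerScope.cancel()  // stop the placeholder listener
}
```

The check and the insert must happen under the same lock. Checking containsKey outside the mutex and inserting inside it would reopen the race between two concurrent callers.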


Pattern 6: Error Isolation

Before:

beaconService.start(node) { wire ->
    // If this throws, entire beacon service could crash
    peerSessionManager.add(wire.id, wire.sessionId)
}

After:

beaconService.start(node) { wire ->
    scope.launch {
        try {
            processWire(wire, node)
        } catch (e: CancellationException) {
            throw e  // Never swallow coroutine cancellation
        } catch (e: Exception) {
            // Log and continue - one bad wire doesn't crash service
            logger.e("Error processing wire from ${wire.id}: ${e.message}", e)
        }
    }
}

Why: Network issues or malformed messages shouldn’t crash the entire discovery service.
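
A small, self-contained sketch of the same idea: each message is handled in its own coroutine, failures are recorded instead of propagated, and cancellation is rethrown so that shutting down the scope still works. The handle and processAll functions and the "bad" prefix convention are assumptions made up for this example.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical handler: ids starting with "bad" simulate a malformed message.
fun handle(wireId: String) {
    if (wireId.startsWith("bad")) throw IllegalStateException("malformed wire $wireId")
}

// Handles every wire in its own coroutine and returns (succeeded, failed) ids.
suspend fun processAll(wireIds: List<String>): Pair<List<String>, List<String>> {
    val succeeded = ConcurrentLinkedQueue<String>()
    val failed = ConcurrentLinkedQueue<String>()
    coroutineScope {
        wireIds.forEach { id ->
            launch(Dispatchers.Default) {
                try {
                    handle(id)
                    succeeded.add(id)
                } catch (e: CancellationException) {
                    throw e  // cancellation is not an error; let it propagate
                } catch (e: Exception) {
                    failed.add(id)  // isolate the failure to this one wire
                }
            }
        }
    }
    // coroutineScope has returned, so every child coroutine is complete.
    return succeeded.sorted() to failed.sorted()
}

fun main(): Unit = runBlocking {
    val (succeeded, failed) = processAll(listOf("a", "bad-1", "b"))
    println("succeeded=$succeeded failed=$failed")
}
```

If the try/catch were removed, the first exception would cancel the enclosing coroutineScope and with it the sibling coroutines, which is the failure mode this pattern is meant to avoid.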


Pattern 7: Idempotent State Updates

NodeManager.update() Implementation:

override suspend fun update(node: Node) {
    val existing = nodes[node.id]
    
    // Check if already equal (idempotent)
    if (existing is NodeFlow.Success && node == existing.node.value) {
        return  // No-op if nothing changed
    }
    
    when (existing) {
        is NodeFlow.Error, null -> {
            // Create new
            nodes[node.id] = NodeFlow.Success(MutableStateFlow(node))
        }
        is NodeFlow.Success -> {
            // Update existing flow (existing is smart-cast to Success here)
            existing.node.update { node }
        }
    }
}

Why: Repeated calls with the same data don’t trigger unnecessary flow updates or notify observers again.
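
The idempotence property is easy to test when update reports whether anything changed. The sketch below assumes kotlinx.coroutines for MutableStateFlow; NodeSnapshot and NodeStore are simplified, hypothetical types, and the store is deliberately single-threaded (a real implementation would also guard the map, as in Pattern 2).

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical value type; data-class equality drives the "nothing changed" check.
data class NodeSnapshot(val id: String, val host: String)

class NodeStore {
    private val nodes = mutableMapOf<String, MutableStateFlow<NodeSnapshot>>()

    // Returns true only when the stored state actually changed.
    fun update(node: NodeSnapshot): Boolean {
        val existing = nodes[node.id]
        return when {
            existing == null -> {
                nodes[node.id] = MutableStateFlow(node)  // first sighting: create
                true
            }
            existing.value == node -> false  // same data: no-op
            else -> {
                existing.value = node  // changed data: push to observers
                true
            }
        }
    }
}

fun main() {
    val store = NodeStore()
    val node = NodeSnapshot(id = "n1", host = "10.0.0.1")
    println(store.update(node))                          // true  (created)
    println(store.update(node))                          // false (unchanged)
    println(store.update(node.copy(host = "10.0.0.2")))  // true  (updated)
}
```

MutableStateFlow already skips emissions when the new value equals the current one, so the explicit equality check mainly avoids the surrounding work and makes the no-op visible to callers.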


Pattern 8: Defensive Logging

Structured by Severity:

logger.d("...")  // Debug: Normal operation details
logger.i("...")  // Info: Significant events (connection established)
logger.w("...")  // Warning: Unexpected but handled (unknown wire type)
logger.e("...", exception)  // Error: Failures that need attention

Context-Rich Messages:

// Bad
logger.e("Error")

// Good
logger.e("Error processing wire from ${wire.id}: ${e.message}", e)

Why: When things go wrong in production, detailed logs are essential for debugging connection issues.


Summary of Thread Safety Guarantees

| Component | Thread Safety Mechanism | Protected Operations |
| --- | --- | --- |
| PeerSessionManager | Mutex | add(), remove(), isKnownSession() |
| BeaconService | beaconJobMutex | jobs map access |
| HostProcessor | wireProcessingMutex | Wire processing logic |
| NodeManager | nodesMutex | nodes map, swarm updates |
| Dependency Injection | SYNCHRONIZED | All by inject() calls |

Result: Safe concurrent access from multiple threads/coroutines without race conditions.

This post is licensed under CC BY 4.0 by the author.