Krill Connectivity & Synchronization Report

Generated: 2025-12-03
Version: 1.0
Focus Areas: Node synchronization, MQTT, WebSockets, Beacons, StateFlows, Race Conditions


Table of Contents

  1. Executive Summary
  2. Architecture Overview
  3. Connectivity Flows
  4. Critical Issues Identified
  5. Race Conditions
  6. Exponential Execution Risks
  7. StateFlow Usage Issues
  8. MQTT Synchronization Problems
  9. Server-to-Server Synchronization
  10. WASM Client Special Cases
  11. Action Items
  12. Additional Findings
  13. Recommendations
  14. Prompt for Future Updates

Executive Summary

The Krill application is a distributed KMP (Kotlin Multiplatform) system with servers, clients (Android, iOS, Desktop), and WASM apps that synchronize node state across a network. The system uses:

  • Multicast UDP beacons for node discovery
  • MQTT for real-time updates (except WASM)
  • WebSockets for WASM client updates
  • REST APIs for node CRUD operations
  • StateFlows for reactive state management

Key Findings:

  • Good: Multicast beacon discovery enables automatic peer detection
  • Good: Multiple transport mechanisms for different platforms
  • ⚠️ Issues: Multiple race conditions in node updates
  • ⚠️ Issues: Potential for infinite loops in bidirectional synchronization
  • ⚠️ Issues: StateFlow update conflicts
  • ⚠️ Issues: Missing synchronization in critical sections
  • ⚠️ Issues: Broadcast storms possible with server-to-server beacons

Architecture Overview

Node Types

KrillApp (sealed class hierarchy)
├── Server - Hosts other nodes, runs processors
│   ├── Pin - GPIO control (Raspberry Pi)
│   ├── DataPoint - Data collection/processing
│   └── RuleEngine - Automation rules
├── Client - User interface applications
├── SerialDevice - Hardware integrations
└── Project - Organizational nodes

Platform Matrix

| Platform | Beacon Discovery | MQTT | WebSocket | REST API | Node Observer |
|---|---|---|---|---|---|
| Server (JVM) | ✅ Send/Receive | ✅ Broker | ✅ Server | ✅ Host | ServerNodeObserver |
| Android | ✅ Receive | ✅ Client | - | ✅ Client | DefaultNodeObserver |
| iOS | ✅ Receive | ✅ Client | - | ✅ Client | DefaultNodeObserver |
| Desktop (JVM) | ✅ Receive | ✅ Client | - | ✅ Client | DefaultNodeObserver |
| WASM | - | - | ✅ Client | ⚠️ Via WebSocket | No-op observer |

Key Components

  1. NodeManager - Central node registry with MutableMap<String, NodeFlow>
  2. BeaconService - Multicast UDP discovery (239.255.0.69:45317)
  3. MQTTBroker - Server-side message broker (port 8883)
  4. SharedMqttClient - Platform-specific MQTT clients
  5. ServerSocketManager - WebSocket connections for WASM
  6. NodeEventBus - Event distribution within process
  7. NodeObserver - StateFlow collectors for node updates

Connectivity Flows

1. Initial Discovery & Connection Flow

sequenceDiagram
    participant App as Client App
    participant Beacon as BeaconService
    participant NCM as NodeConnectionManager
    participant Server as Krill Server
    participant NM as NodeManager
    
    App->>NM: init()
    App->>Beacon: start() - receive beacons
    Server->>Beacon: sendBeacons(NodeWire)
    Note over Server: Every 2-5 seconds
    
    Beacon->>App: onPeerSeen(NodeWire)
    App->>NCM: connect(peer)
    NCM->>Server: GET /trust (cert)
    NCM->>Server: GET /node/{id}
    Server-->>NCM: Node (full object)
    NCM->>NM: update(node, observe=true)
    NM->>NM: Store in nodes map
    NM->>NM: Start StateFlow observer
    
    Note over App,Server: Connection established

2. Node Update Propagation (Normal Flow)

sequenceDiagram
    participant User as User Action
    participant App as Client App
    participant Server as Krill Server
    participant MQTT as MQTT Broker
    participant Apps as Other Apps
    participant Beacon as BeaconService
    
    User->>App: Edit/Update Node
    App->>App: nm.update(node, post=true)
    App->>Server: POST /node/{id}
    
    Server->>Server: nm.update(node)
    Server->>Server: fileOperations.update(node)
    Server->>Server: NodeEventBus.post(node)
    
    Server->>MQTT: publish(node)
    Server->>Server: ServerSocketManager.broadcast(node)
    Server->>Beacon: sendBeacons(node.toWire())
    
    MQTT-->>Apps: MQTT message
    Apps->>Apps: nm.update(node)
    Apps->>Apps: StateFlow.update()
    
    Note over Apps: UI auto-updates via StateFlow

3. Server-to-Server Synchronization Flow

sequenceDiagram
    participant S1 as Server 1
    participant B1 as Beacon Service
    participant S2 as Server 2
    participant B2 as Beacon Service
    participant NC as NodeConnectionManager
    
    S1->>B1: sendBeacons(self)
    B1-->>B2: Multicast packet
    B2->>S2: onPeerSeen(Server1Wire)
    
    S2->>S2: readNode(Server1.id)
    alt Node not found
        S2->>NC: connect(Server1Wire)
        NC->>S1: GET /node/{id}
        S1-->>NC: Node object
        NC->>S2: nm.update(node, observe=true)
    else Node found but in ERROR
        S2->>S2: update state to PAIRING
        S2->>NC: connect(Server1Wire)
    else Node found and OK
        S2->>S2: Skip connection
    end
    
    Note over S1,S2: ⚠️ ISSUE: No child node sync!

4. WASM Client Update Flow

sequenceDiagram
    participant WASM as WASM App
    participant WS as WebSocket
    participant Server as Server
    participant MQTT as MQTT Broker
    
    WASM->>Server: GET /health
    Server-->>WASM: Host Node
    WASM->>Server: GET /nodes
    Server-->>WASM: List<Node>
    WASM->>WASM: nm.update() for each
    
    WASM->>Server: WebSocket connect /ws
    Server->>WASM: WebSocket established
    
    Note over WASM,Server: WASM cannot make REST calls to other servers (CORS)
    
    loop Node updates
        Server->>MQTT: publish(node)
        Server->>WS: sendSerialized(SocketData(node))
        WS-->>WASM: SocketData
        WASM->>WASM: nm.update(node)
    end
    
    User->>WASM: Edit node
    WASM->>WS: sendSerialized(SocketData(node))
    WS->>Server: Receive update
    Server->>Server: nm.update(node)
    
    Note over Server: Server propagates to swarm

5. Background Processor Execution Flow

sequenceDiagram
    participant Proc as Processor (exec)
    participant Node as Node StateFlow
    participant Obs as ServerNodeObserver
    participant Chain as buildChain()
    participant EB as NodeEventBus
    
    Proc->>Node: update snapshot/value
    Node->>Node: StateFlow.update()
    Node->>Obs: Flow collector triggered
    
    Obs->>Obs: Check state == EXECUTED
    Obs->>Chain: buildChain().invoke(scope)
    
    Chain->>Chain: exec(node) - current processor
    Chain->>Chain: delay(100ms)
    
    loop For each child
        Chain->>Chain: childChain()
        Note over Chain: Recursive execution
    end
    
    Obs->>Obs: nm.update(state=NONE)
    Obs->>EB: NodeEventBus.post(node)
    
    EB->>EB: MQTT publish
    EB->>EB: WebSocket broadcast
    EB->>EB: Beacon send
    
    Note over Proc,EB: ⚠️ ISSUE: Recursive chain can amplify!

6. Node Deletion Cascade Flow

sequenceDiagram
    participant User
    participant App
    participant NM as NodeManager
    participant Server
    participant MQTT
    
    User->>App: Delete node
    App->>Server: DELETE /node/{id}
    Server->>NM: delete(node)
    
    NM->>NM: observer.remove(id)
    NM->>NM: fileOperations.delete(id)
    NM->>NM: nodes.remove(id)
    NM->>Server: nodeHttp.deleteNode(node)
    NM->>NM: NodeEventBus.post(state=DELETING)
    
    NM->>NM: Find children (parent==id)
    
    loop For each child
        NM->>NM: scope.launch { delete(child) }
        Note over NM: ⚠️ Recursive deletion
    end
    
    Server->>MQTT: Publish delete event
    Note over NM: ⚠️ ISSUE: Race with concurrent creates!

Critical Issues Identified

1. StateFlow Update Race Conditions

Location: NodeManager.update()

// Lines 138-148 in NodeManager.kt
if (nodes[node.id] == null) {
    nodes[node.id] = NodeFlow.Success(MutableStateFlow(copy))
    copy.type.exec(scope, copy)
} else {
    if (nodes[node.id] is NodeFlow.Success) {
        (nodes[node.id] as NodeFlow.Success).node.update { copy }
    }
}

Issues:

  1. Check-then-act race: Between checking nodes[node.id] == null and setting it, another thread could insert an entry for the same ID
  2. No mutex protection on the nodes map
  3. StateFlow.update doesn’t guarantee atomic read-modify-write across the map lookup

Impact: Duplicate node entries, lost updates, inconsistent state
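
One way the check-then-set could be made atomic is sketched below, assuming kotlinx.coroutines is on the classpath. DemoNode and DemoRegistry are hypothetical stand-ins, not Krill types; the point is only that the lookup, the insert, and the flow update all happen under a single lock.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the real Node type.
data class DemoNode(val id: String, val value: Int)

class DemoRegistry {
    private val mutex = Mutex()
    private val flows = mutableMapOf<String, MutableStateFlow<DemoNode>>()

    /** Returns true when the node was inserted, false when an existing flow was updated. */
    suspend fun upsert(node: DemoNode): Boolean = mutex.withLock {
        val existing = flows[node.id]
        if (existing == null) {
            // The check and the insert happen under the same lock, so no duplicate can appear.
            flows[node.id] = MutableStateFlow(node)
            true
        } else {
            existing.value = node
            false
        }
    }

    suspend fun read(id: String): DemoNode? = mutex.withLock { flows[id]?.value }

    suspend fun size(): Int = mutex.withLock { flows.size }
}
```

Mutex is not reentrant, so any side effect that calls back into the registry (such as starting a processor) would have to run after the lock is released.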


2. NodeManager.update() Concurrent Modification

Location: NodeManager.update() lines 118-167

Issues:

  1. The nodes map is accessed without synchronization
  2. Multiple concurrent calls to update() for the same node can race
  3. readNode() can return stale data during an update
  4. post flag triggers async POST but doesn’t wait for completion

Scenario:

Thread 1: update(node1, post=true)  -> starts POST
Thread 2: update(node1, post=false) -> reads node before POST completes
Thread 3: MQTT receives node1       -> another update
Result: Last write wins, middle update lost

3. Infinite Beacon Loop Between Servers

Location: Lifecycle.kt lines 84-107 + Beacon.kt

Current Flow:

  1. Server A updates a node
  2. NodeEventBus broadcasts the event
  3. Event handler calls Multicast(scope).sendBeacons(node.toWire())
  4. Server B receives beacon
  5. Server B updates the node via nm.refresh()
  6. Server B’s NodeEventBus broadcasts
  7. Server B sends beacon back to Server A
  8. Infinite loop!

Code Evidence:

// Lifecycle.kt:92
scope.launch { Multicast(scope).sendBeacons(node.toWire()) }

Missing: Beacon deduplication, timestamp checking, or originator tracking
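
A minimal sketch of originator tracking, under the assumption that every update can be tagged with where it entered the server. UpdateSource and shouldSendBeacon are hypothetical names, not existing Krill APIs.

```kotlin
// Hypothetical tag describing where an update entered this server.
enum class UpdateSource { LOCAL, BEACON, MQTT }

/**
 * Decides whether an update should be re-announced by beacon.
 * Only updates that originated on this server, for nodes this server hosts, are announced.
 */
fun shouldSendBeacon(source: UpdateSource, nodeHost: String, localInstallId: String): Boolean =
    source == UpdateSource.LOCAL && nodeHost == localInstallId
```

With this gate, step 7 of the flow above never happens: Server B's update arrives tagged BEACON, so it is stored but not re-announced.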


4. MQTT Publish Without Connection Check

Location: MQTTBroker.publish() lines 61-83

suspend fun publish(node: Node) {
    publishMutex.withLock {
        val clients = broker.getConnectedClientIds()
        if (clients.isEmpty()) {
            Logger.d("MQTT Broker: no clients connected, skipping publish")
            return
        }
        broker.publish(...)
    }
}

Issues:

  1. Check clients.isEmpty() then publish is not atomic
  2. Client can disconnect between check and publish
  3. No retry mechanism for transient failures
  4. Exception handling logs but doesn’t propagate

5. Recursive buildChain() Execution

Location: NodeFunctions.kt lines 10-33

fun NodeFlow.Success.buildChain(): Chain {
    val children = nm.nodes()
        .filter { it.parent == this.node.value.id }
        .map { nm.readNode(it.id) as NodeFlow.Success }
    
    val childChains = children.map { child -> child.buildChain() }
    
    return {
        this@buildChain.exec(this)
        delay(100)
        for (childChain in childChains) {
            childChain()
        }
    }
}

Issues:

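  1. Exponential growth with depth: exec() runs once per descendant, so the work is linear in subtree size, but subtree size grows exponentially with depth (about b^d nodes at depth d for branching factor b)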
  2. No cycle detection: Circular parent references cause infinite recursion
  3. No timeout: Long chains can block indefinitely
  4. Memory: All chains built upfront, not lazily
  5. Concurrent modifications: nm.nodes() can change during iteration

Example:

Server (3 children)
  ├── DataPoint A (2 children)
  │   ├── Trigger 1
  │   └── Trigger 2
  ├── DataPoint B (2 children)
  │   ├── Trigger 3
  │   └── Trigger 4
  └── DataPoint C
  
Execution count: 1 + 3 + 4 = 8 exec() calls (one per node)
With 10 levels of a full binary tree: 2^10 - 1 = 1,023 exec() calls, plus at least 102 seconds of sequential delay(100)

6. Server-to-Server Sync Missing Children

Location: Lifecycle.kt lines 155-166 and NodeConnectionManager.kt

Current Behavior: When Server B discovers Server A via beacon:

  1. It downloads Server A’s host node only
  2. It does NOT download Server A’s child nodes
  3. Child nodes only sync if they individually send beacons or MQTT

Code:

// NodeConnectionManager.kt:24
val host = response.body<Node>()
nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)
// ⚠️ Only updates the server node, not children!

Expected: Should call GET /nodes to fetch all children owned by that server


7. WASM Update Reflection

Location: ServerSocketManager.jvm.kt lines 38-42

val socketData = receiveDeserialized<SocketData>(...)
val host = nm.readNode(socketData.node.host) as NodeFlow.Success

if (host.node.value.id == installId())
    nm.update(socketData.node)

Issues:

  1. Cast without check: as NodeFlow.Success can throw
  2. Only updates if hosted locally: Remote nodes from WASM are ignored
  3. No broadcasting: WASM update isn’t propagated to MQTT/other WebSockets
  4. CORS workaround incomplete: WASM sends to host, but host doesn’t relay to other servers

8. NodeObserver StateFlow Collection Overlap

Location: DefaultNodeObserver.kt lines 19-46

override fun observe(flow: NodeFlow.Success) {
    if (!SystemInfo.isServer() && !jobs.containsKey(flow.node.value.id)) {
        val n = flow.node
        jobs[flow.node.value.id] = scope.launch {
            // ...
            if (jobs.containsKey(n.value.id)) {
                jobs[n.value.id]?.cancel()  // ⚠️ Cancels itself!
            }
            jobs[n.value.id] = scope.launch {
                try {
                    flow.node.collect(collector)
                    // ...
                }
            }
        }
    }
}

Issues:

  1. Self-canceling: New job cancels itself immediately (lines 37-39)
  2. Double launch: Launches a job that launches another job
  3. Key collision: Both use same key n.value.id
  4. Race: jobs.containsKey() check not synchronized with map mutation

9. Beacon Timestamp Not Used

Location: NodeWire.kt and BeaconService.kt

data class NodeWire(
    val timestamp: Long,  // ⚠️ Generated but never checked
    val id: String,
    // ...
)

Issues:

  1. Timestamp is sent but receivers don’t use it for deduplication
  2. Old beacons can overwrite newer state
  3. No expiration of stale beacons
  4. Network delays can cause out-of-order updates
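
The timestamp could be put to use with a small per-node filter, sketched below. BeaconDeduplicator is a hypothetical helper and the 30-second expiry is an arbitrary assumption; @Synchronized makes this version JVM-only.

```kotlin
// Hypothetical helper; not part of the Krill codebase.
class BeaconDeduplicator(private val maxAgeMs: Long = 30_000) {
    private val lastSeen = HashMap<String, Long>()

    /** Returns true only for a beacon that is fresh and newer than the last accepted one for this id. */
    @Synchronized
    fun accept(id: String, timestamp: Long, nowMs: Long): Boolean {
        // Expire stale beacons.
        if (nowMs - timestamp > maxAgeMs) return false
        // Reject duplicates and out-of-order arrivals.
        val previous = lastSeen[id]
        if (previous != null && timestamp <= previous) return false
        lastSeen[id] = timestamp
        return true
    }
}
```

A receiver would call accept(wire.id, wire.timestamp, now) before doing any further work on the beacon. Comparing timestamps across machines assumes their clocks are reasonably synchronized.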

10. File Operations Not Atomic

Location: NodeManager.kt lines 229-234

private suspend fun post(node: Node) {
    if (node.host == installId) {
        fileOperations.update(node)
    } else {
        nodeHttp.postNode(node)
    }
}

Issues:

  1. fileOperations.update() implementations are platform-specific
  2. No guarantee of atomic write
  3. Concurrent writes can corrupt files
  4. No write-ahead log or journaling
  5. Crashes during write leave partial state

Race Conditions

Summary Table

| # | Race Condition | Location | Severity | Impact |
|---|---|---|---|---|
| RC1 | Map check-then-set | NodeManager.update() L138 | HIGH | Duplicate nodes, lost updates |
| RC2 | MQTT client disconnect | MQTTBroker.publish() L64 | MEDIUM | Failed publishes |
| RC3 | NodeObserver job map | DefaultNodeObserver.observe() L36 | HIGH | Observers don’t start |
| RC4 | File write concurrent | FileOperations.update() | MEDIUM | Corrupted persistence |
| RC5 | Swarm set update | NodeManager.updateSwarm() L40 | LOW | Inconsistent swarm view |
| RC6 | Delete vs create | NodeManager.delete() L170 | MEDIUM | Zombie nodes |
| RC7 | StateFlow vs map | NodeManager.update() L145 | HIGH | Flow emits wrong value |
| RC8 | Beacon dedup | BeaconService receive | MEDIUM | Duplicate processing |
| RC9 | Connection set | NodeConnectionManager L15 | LOW | Duplicate connections |
| RC10 | MQTT topic set | SharedMqttClient.jvm.kt L119 | MEDIUM | Duplicate subscriptions |

Exponential Execution Risks

1. buildChain() Recursion

Complexity: O(n) exec() calls for a subtree of n nodes, where n grows as O(2^depth) for a full binary tree (O(b^depth) for branching factor b)

Mitigation Needed:

  • Add max depth limit (e.g., 10 levels)
  • Implement cycle detection
  • Use iterative traversal instead of recursion
  • Add timeout per chain execution
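
The first three mitigations can be combined in an iterative pre-order traversal, sketched below. TreeNode and executionOrder are hypothetical names, and the function only computes the order in which exec() would run; it does not execute anything.

```kotlin
// Hypothetical minimal node shape: only the fields the traversal needs.
data class TreeNode(val id: String, val parent: String?)

/** Returns node ids in parent-before-children order, without recursion. */
fun executionOrder(rootId: String, nodes: List<TreeNode>, maxDepth: Int = 10): List<String> {
    val childrenByParent = nodes.groupBy { it.parent }
    val visited = HashSet<String>()
    val order = ArrayList<String>()
    val stack = ArrayDeque<Pair<String, Int>>()
    stack.addLast(rootId to 0)

    while (stack.isNotEmpty()) {
        val (id, depth) = stack.removeLast()
        // Cycle detection: a node that was already scheduled is skipped.
        if (!visited.add(id)) continue
        // Depth limit: fail fast instead of walking an unbounded tree.
        check(depth <= maxDepth) { "Node $id exceeds max depth $maxDepth" }
        order.add(id)
        // Push children in reverse so they are popped in their original order.
        childrenByParent[id].orEmpty().asReversed().forEach { child ->
            stack.addLast(child.id to depth + 1)
        }
    }
    return order
}
```

For the example tree shown earlier this yields Server, DataPoint A, Trigger 1, Trigger 2, DataPoint B, Trigger 3, Trigger 4, DataPoint C, which matches the order the recursive chain would produce.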

2. Delete Cascade

Location: NodeManager.delete() L183-188

nodes().filter { n -> n.parent == node.id }.forEach { n ->
    scope.launch {
        delete(n)  // ⚠️ Recursive
    }
}

Risk: Deleting a parent with 100 children starts 100 concurrent coroutines, and each child repeats this for its own children, so the number of unbounded concurrent launches equals the number of descendants

Mitigation:

  • Add deletion depth limit
  • Use iterative BFS/DFS
  • Batch deletions

3. Beacon Storm

Scenario:

  • 5 servers on network
  • Node update triggers beacon from each
  • Each server receives 4 beacons
  • Each beacon triggers update + beacon send
  • Without deduplication, roughly 4^n beacon sends in round n (each send reaches the 4 other servers)

Mitigation:

  • Rate limit beacon sends
  • Deduplicate by timestamp
  • Only beacon on user-initiated changes, not MQTT updates

4. MQTT Subscription Explosion

Location: SharedMqttClient.jvm.kt L118

actual override suspend fun subscribe(node: List<Node>) {
    val newTopics = topicMutex.withLock {
        val topics = node.map { "/${it.id}" }
        val toSubscribe = topics.filter { it !in subscribedTopics }
        subscribedTopics.addAll(toSubscribe)
        toSubscribe
    }
    // ...
}

Risk: With 1000 nodes and 10 clients, the broker has to track 10,000 individual subscriptions

Mitigation:

  • Use wildcard topics (e.g., /server-id/#)
  • Subscribe to parent topics only
  • Implement topic hierarchy

StateFlow Usage Issues

1. MutableStateFlow in NodeFlow.Success

Location: NodeFlow.kt L12

data class Success(
    val node: MutableStateFlow<Node>,
    val instance: String = Uuid.random().toString()
) : NodeFlow(...)

Issues:

  1. Exposed mutability: Callers can directly update the flow
  2. No encapsulation: No control over who updates
  3. Equality: Two Success with same node but different instance aren’t equal

Better Design:

class Success(
    private val _node: MutableStateFlow<Node>
) : NodeFlow(...) {
    val node: StateFlow<Node> = _node.asStateFlow()
    
    fun update(transform: (Node) -> Node) {
        _node.update(transform)
    }
}

2. StateFlow.update Without Transaction

Location: NodeManager.update() L145

(nodes[node.id] as NodeFlow.Success).node.update { copy }

Issues:

  1. update lambda receives current value, returns new value
  2. But map lookup nodes[node.id] isn’t atomic with update
  3. Node could be removed from map between lookup and update

Fix:

val nodeFlow = nodes[node.id] as? NodeFlow.Success ?: return
nodeFlow.node.update { copy }

3. Hot StateFlow Collectors

Location: ServerNodeObserver.kt L27-29

n.node.collect { collectedNode ->
    Logger.i("emit ${n.node.subscriptionCount.value} ...")
    // ...
}

Issues:

  1. Nested collection: Collecting inside a collector (L28 collects n.node again)
  2. Infinite recursion risk: If collector updates the same flow
  3. Subscription leak: subscriptionCount.value suggests multiple subscribers but they’re not cleaned up

4. Concurrent StateFlow Updates

Location: ServerNodeObserver.kt L40-44

if (meta.snapshot.timestamp > cnm.snapshot.timestamp) {
    nm.update(
        collectedNode.copy(state = NodeState.NONE, meta = meta),
        post = true
    )
}

Issues:

  1. Check timestamp, then update - not atomic
  2. Another update could occur between check and update
  3. post = true triggers async HTTP call, but StateFlow updated synchronously
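
The timestamp check can be made atomic with the flow update by moving the comparison inside the update lambda, which kotlinx.coroutines retries on contention. The sketch below uses a hypothetical Snapshot type and applyIfNewer extension rather than the real node metadata.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical stand-in for the snapshot stored in node metadata.
data class Snapshot(val value: Double, val timestamp: Long)

/** Replaces the current snapshot only if the candidate is strictly newer. Returns true if applied. */
fun MutableStateFlow<Snapshot>.applyIfNewer(candidate: Snapshot): Boolean {
    var applied = false
    update { current ->
        // Runs again if another writer changed the value in between, so the
        // comparison is always made against the value that is actually replaced.
        applied = candidate.timestamp > current.timestamp
        if (applied) candidate else current
    }
    return applied
}
```

The returned flag could then decide whether the asynchronous POST is worth sending at all.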

MQTT Synchronization Problems

1. Topic Naming Convention

Current: /${node.id} (e.g., /abc-123-def-456)

Issues:

  1. No hierarchy: Can’t subscribe to all nodes from a server
  2. No wildcards: Must subscribe to each node individually
  3. Scale: 1000 nodes = 1000 subscriptions per client

Recommendation:

/server/{server-id}/node/{node-id}
/server/{server-id}/node/+          # All nodes from server
/server/+/node/{specific-node}      # One node across servers
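
To illustrate how the broker would resolve these filters, here is a simplified matcher for MQTT wildcards: + matches exactly one level and # matches all remaining levels. topicMatches is a hypothetical helper for reasoning about the scheme, not a broker API, and it ignores the special handling of topics that start with $.

```kotlin
/** Simplified MQTT topic-filter matching: "+" is one level, "#" is all remaining levels. */
fun topicMatches(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    for (i in f.indices) {
        when (f[i]) {
            // The multi-level wildcard is only valid as the last filter level.
            "#" -> return i == f.lastIndex
            "+" -> if (i >= t.size) return false
            else -> if (i >= t.size || f[i] != t[i]) return false
        }
    }
    return f.size == t.size
}
```

For example, topicMatches("/server/s1/node/+", "/server/s1/node/n42") is true, while the same filter does not match a topic under /server/s2. The leading slash adds an empty first level; it matches consistently, but common MQTT guidance is to omit it.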

2. QoS Level

Current: Qos.AT_MOST_ONCE

Issues:

  1. Message loss: Network issues drop updates
  2. No acknowledgment: Publisher doesn’t know if received
  3. Order not guaranteed: Updates can arrive out of sequence

Recommendation:

  • Use Qos.AT_LEAST_ONCE for critical updates
  • Add sequence numbers to detect gaps
  • Implement periodic full-state sync
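
Gap detection with sequence numbers might look like the sketch below, assuming each publisher attaches a per-node counter that increases by one per update. SeqResult and SequenceTracker are hypothetical names, and the tracker is not thread-safe.

```kotlin
sealed interface SeqResult {
    object InOrder : SeqResult
    object Stale : SeqResult
    data class Gap(val missing: LongRange) : SeqResult
}

// Hypothetical helper; assumes a per-node sequence number that increases by one per update.
class SequenceTracker {
    private val last = HashMap<String, Long>()

    fun observe(nodeId: String, seq: Long): SeqResult {
        val previous = last[nodeId]
        if (previous == null) {
            // First message for this node: nothing to compare against.
            last[nodeId] = seq
            return SeqResult.InOrder
        }
        return when {
            seq == previous + 1 -> { last[nodeId] = seq; SeqResult.InOrder }
            // Messages were lost: report which sequence numbers are missing.
            seq > previous + 1 -> { last[nodeId] = seq; SeqResult.Gap(previous + 1 until seq) }
            // Duplicate or late delivery: the caller should ignore the payload.
            else -> SeqResult.Stale
        }
    }
}
```

A Gap result is the natural trigger for a targeted GET /node/{id} or for the periodic full-state sync.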

3. Retained Messages

Current: retain = false

Issues:

  1. Late joiners miss state: Client connecting after update doesn’t get current value
  2. Requires full sync: Must fetch via HTTP on connect

Recommendation:

  • Use retain = true for state updates
  • Clear retained messages on node delete
  • Implement retention policy (TTL)

4. No Authentication

Current: Broker accepts any client

Issues:

  1. Security: No authentication or authorization
  2. Spoofing: Malicious client can publish fake updates
  3. DoS: Flood broker with messages

Recommendation:

  • Enable username/password authentication
  • Use client certificates
  • Implement ACLs per topic

5. Publish Mutex Doesn’t Prevent Reordering

Location: MQTTBroker.kt L23, L62

private val publishMutex = Mutex()

suspend fun publish(node: Node) {
    publishMutex.withLock {
        // ...
        broker.publish(...)
    }
}

Issues:

  1. Only prevents concurrent publishes to broker
  2. Doesn’t prevent reordering at receiver
  3. Network can deliver out of order
  4. Multiple servers publishing same node ID

Example:

Server A: publish(node v1) at T1
Server B: publish(node v2) at T2
Client receives: v2, then v1 (due to network delay)
Result: Client has old v1 state

6. MQTT Client Reconnection

Location: SharedMqttClient.jvm.kt L22-95

Issues:

  1. No automatic reconnection: If disconnected, stays disconnected
  2. Subscriptions lost: Must resubscribe after reconnect
  3. onDisconnected callback: Called but doesn’t retry
  4. Topic tracking: subscribedTopics.clear() on disconnect (L85) loses subscription intent

Fix Needed:

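private var shouldReconnect = true
private var reconnectJob: Job? = null
private var reconnectDelayMs = 1_000L  // reset to 1_000L in onConnected

override fun onDisconnected(node: Node) {
    if (shouldReconnect) {
        reconnectJob = scope.launch {
            delay(reconnectDelayMs)
            // Exponential backoff: double the wait after each attempt, capped at 60 s
            reconnectDelayMs = (reconnectDelayMs * 2).coerceAtMost(60_000L)
            connect(onConnected, onDisconnected)
        }
    }
}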

Server-to-Server Synchronization

Current State

What Works:

  • Server A broadcasts beacon every 2-5 seconds
  • Server B receives beacon and connects
  • Server B downloads Server A’s host node only
  • Server B observes Server A’s host node

What’s Missing:

  1. Child Node Sync
    // NodeConnectionManager.kt only gets host
    val host = response.body<Node>()
    nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)
    

    Should be:

    val host = response.body<Node>()
    nm.update(node = host, observe = true)
       
    // Fetch all children
    val children = httpClient.get("${peer.url}/nodes")
        .body<List<Node>>()
        .filter { it.host == host.id }
       
    children.forEach { child ->
        nm.update(child, observe = true)
    }
    
  2. MQTT Cross-Server Subscribe
    • Servers don’t subscribe to other servers’ MQTT brokers
    • Updates on Server A don’t reach Server B via MQTT
    • Only beacons propagate (every 2-5 seconds, with dedup issues)
  3. Bidirectional Updates
    graph LR
        S1[Server 1] -->|Beacon| S2[Server 2]
        S2 -->|Beacon| S1
        S1 -->|Update node| S1
        S1 -.->|No MQTT| S2
        S2 -.->|No MQTT| S1
    
  4. Conflict Resolution
    • No vector clocks or version numbers
    • No last-write-wins timestamp enforcement
    • Updates can be lost if both servers modify same node

Synchronization Scenarios

Scenario 1: New Node Created on Server A

1. User on Server A creates DataPoint X
2. Server A: nm.update(X, post=true)
3. Server A: fileOperations.create(X)
4. Server A: NodeEventBus.post(X)
5. Server A: MQTT.publish(X)         → Local clients get update
6. Server A: Beacon.send(X.toWire()) → Sent every 2-5 sec
7. Server B: Beacon.receive(X)
8. Server B: nm.refresh(X)
9. Server B: HTTP GET /node/X        → ✅ Server B gets node
10. Server B: nm.update(X)

Time to propagate: 2-5 seconds (beacon interval)
Issue: Beacon storms if many nodes created

Scenario 2: Node Updated on Server A

1. Background processor on Server A updates DataPoint X
2. Server A: StateFlow.update(X)
3. Server A: NodeEventBus.post(X)
4. Server A: MQTT.publish(X)         → Local clients get update
5. Server A: Beacon.send(X.toWire())
6. Server B: Beacon.receive(X)
7. Server B: nm.refresh(X)
8. Server B: HTTP GET /node/X        → ✅ Gets latest

Issue: Server B doesn’t subscribe to Server A’s MQTT, so only gets updates via beacons (2-5 sec delay)

Scenario 3: Concurrent Updates on Both Servers

1. Server A: DataPoint X = {value: 100, timestamp: T1}
2. Server B: DataPoint X = {value: 200, timestamp: T2}
3. Server A: Beacon.send(X)
4. Server B: Beacon.send(X)
5. Server A receives B's beacon: nm.refresh(X) → HTTP GET → {value: 200, T2}
6. Server B receives A's beacon: nm.refresh(X) → HTTP GET → {value: 100, T1}

Result: Both servers overwrite each other indefinitely (infinite loop)
Fix Needed: Compare timestamps, keep newer value
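
A minimal last-write-wins sketch for this scenario follows. Versioned and resolve are hypothetical names, and the tie-break on the originating server ID is an assumption added so that both servers reach the same answer when timestamps are equal.

```kotlin
// Hypothetical stand-in for a node value with the metadata needed to resolve conflicts.
data class Versioned(val id: String, val value: Int, val timestamp: Long, val origin: String)

/** Last-write-wins: the newer timestamp wins; equal timestamps fall back to the origin id. */
fun resolve(local: Versioned, remote: Versioned): Versioned = when {
    remote.timestamp > local.timestamp -> remote
    remote.timestamp < local.timestamp -> local
    // Deterministic tie-break so both sides pick the same winner.
    else -> if (remote.origin > local.origin) remote else local
}
```

Because resolve(a, b) and resolve(b, a) pick the same winner, Server A and Server B converge after one exchange instead of overwriting each other. The scheme still depends on reasonably synchronized clocks.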

graph TB
    subgraph "Server A"
        A1[NodeManager A]
        A2[MQTT Broker A]
        A3[Beacon Service A]
        A4[StateFlows A]
    end
    
    subgraph "Server B"
        B1[NodeManager B]
        B2[MQTT Broker B]
        B3[Beacon Service B]
        B4[StateFlows B]
    end
    
    A3 -.->|Discover| B3
    B3 -.->|Discover| A3
    A2 <-->|MQTT Bridge| B2
    A1 -->|REST API| B1
    B1 -->|REST API| A1
    
    A4 -.->|Subscribe| B2
    B4 -.->|Subscribe| A2

Components:

  1. MQTT Bridge: Servers subscribe to each other’s MQTT brokers
  2. Timestamp-based Conflict Resolution: Use NodeWire.timestamp
  3. Periodic Full Sync: Every N minutes, exchange all node IDs and checksums
  4. Beacon Deduplication: Track last beacon timestamp per node

WASM Client Special Cases

Architecture Constraints

Why Different:

  1. CORS: Browser security prevents cross-origin REST API calls
  2. No UDP: Browser can’t send/receive multicast beacons
  3. No MQTT: No native MQTT client support in WASM

Current Solution:

  • WebSocket to hosting server
  • All updates routed through host
  • Host acts as proxy

Issues with Current Implementation

1. Single Point of Failure

graph LR
    W[WASM Client] <-->|WebSocket| S1[Host Server]
    S1 -.->|MQTT| S2[Other Servers]
    S2 -.->|No direct path| W

Issue: WASM only gets updates from its host server, not from other servers directly

2. Asymmetric Update Flow

Server → WASM (client receive loop):

// ClientSocketManager.kt
httpClient.webSocket(urlString = ws.toString()) {
    while (isActive) {
        val socketData = receiveDeserialized<SocketData>(...)
        nm.update(socketData.node)
    }
}

WASM → Server (server receive handler):

// ServerSocketManager.jvm.kt
val socketData = receiveDeserialized<SocketData>(...)
if (host.node.value.id == installId())
    nm.update(socketData.node)

Issue: Server only processes WASM updates if it’s the host, doesn’t relay to other servers!

3. WASM Can’t Edit Remote Nodes

Scenario:

  1. WASM client connected to Server A
  2. Node X is hosted on Server B
  3. User edits Node X in WASM
  4. WASM sends update to Server A via WebSocket
  5. Server A checks: if (host.node.value.id == installId()) → FALSE
  6. Update is silently dropped!

Fix Needed:

// ServerSocketManager.jvm.kt
val socketData = receiveDeserialized<SocketData>(...)
// Safe cast: readNode() may return a non-Success flow for an unknown host
val targetHost = nm.readNode(socketData.node.host) as? NodeFlow.Success

if (targetHost?.node?.value?.id == installId()) {
    nm.update(socketData.node)
} else {
    // Relay to the server that hosts the node
    nodeHttp.postNode(socketData.node)
}

4. No Offline Capability

Issue: WASM has no persistence (FileOperations.wasmJs.kt is no-op)

Impact:

  • Refresh page → lose all state
  • Must re-download all nodes on reconnect
  • No caching

Recommendation:

  • Use IndexedDB for persistence
  • Cache nodes in localStorage
  • Implement service worker for offline mode

5. WebSocket Reconnection

Location: ClientSocketManager.kt

Current: Single webSocket() call, no error recovery

Issues:

  1. Connection drops → WASM stops receiving updates
  2. No reconnection logic
  3. No exponential backoff
  4. User must refresh page

Fix:

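suspend fun start() {
    var backoffMs = 1_000L
    while (currentCoroutineContext().isActive) {
        try {
            connectWebSocket()      // returns when the socket closes normally
            backoffMs = 1_000L      // clean close: start over with the shortest wait
        } catch (e: CancellationException) {
            throw e                 // never swallow coroutine cancellation
        } catch (e: Exception) {
            Logger.e("WebSocket failed", e)
            // Exponential backoff, capped at 60 s
            backoffMs = (backoffMs * 2).coerceAtMost(60_000L)
        }
        delay(backoffMs)            // wait before reconnecting
    }
}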

private suspend fun connectWebSocket() {
    httpClient.webSocket(...) {
        // ...
    }
}

Action Items

Critical (Fix Immediately)

  1. [CR-1] Fix NodeManager Map Synchronization
    • Add Mutex around nodes map operations
    • Ensure atomic check-then-set operations
    • File: NodeManager.kt

      ```kotlin
      private val nodesMutex = Mutex()

      suspend fun update(node: Node, …) {
          nodesMutex.withLock {
              // existing logic
          }
      }
      ```

  2. [CR-2] Fix Infinite Beacon Loop
    • Add originator tracking to NodeWire
    • Don’t send beacon for updates received via beacon/MQTT
    • File: Lifecycle.kt, BeaconService.kt

      ```kotlin
      data class NodeWire(
          val timestamp: Long,
          val id: String,
          val type: KrillApp,
          val url: String,
          val host: String,
          val originServer: String // NEW
      )

      // Only beacon if originated locally
      if (node.host == installId()) {
          Multicast(scope).sendBeacons(node.toWire())
      }
      ```

  3. [CR-3] Fix buildChain() Infinite Recursion
    • Add cycle detection
    • Add depth limit (max 20 levels)
    • Add timeout
    • File: NodeFunctions.kt

      fun NodeFlow.Success.buildChain(
          visited: Set<String> = emptySet(),
          depth: Int = 0
      ): Chain {
          // Depth limit: fail fast instead of building an unbounded chain
          check(depth <= 20) { "Max chain depth exceeded at node ${node.value.id}" }
          // Cycle detection: a node already on this path contributes a no-op
          if (node.value.id in visited) {
              return { /* no-op */ }
          }

          val newVisited = visited + node.value.id
          val children = nm.nodes()
              .filter { it.parent == node.value.id }
              // Skip children that cannot be read instead of casting blindly
              .mapNotNull { nm.readNode(it.id) as? NodeFlow.Success }

          val childChains = children.map {
              it.buildChain(newVisited, depth + 1)
          }

          return {
              // Timeouts nest: a parent's 30 s budget covers its whole subtree
              withTimeout(30_000) {
                  this@buildChain.exec(this)
                  delay(100)
                  for (childChain in childChains) {
                      childChain()
                  }
              }
          }
      }
      
  4. [CR-4] Fix NodeObserver Double Launch
    • Remove self-canceling logic
    • Simplify job management
    • File: DefaultNodeObserver.kt
      override fun observe(flow: NodeFlow.Success) {
        val id = flow.node.value.id
        if (!SystemInfo.isServer() && !jobs.containsKey(id)) {
            jobs[id] = scope.launch {
                try {
                    flow.node.collect { collectedNode ->
                        when (collectedNode.state) {
                            NodeState.EXECUTED -> {
                                flow.buildChain().invoke(scope)
                            }
                            else -> {
                                Logger.w("Node not processed: ${collectedNode.state}")
                            }
                        }
                    }
                } finally {
                    Logger.w("Exited node observing job $id")
                }
            }
        }
      }
      
  5. [CR-5] Fix WASM Relay to Remote Servers
    • Make server relay WASM updates to target servers
    • File: ServerSocketManager.jvm.kt
      try {
        while (isActive) {
            val socketData = receiveDeserialized<SocketData>(...)
            val targetHost = nm.readNode(socketData.node.host)
                 
            when (targetHost) {
                is NodeFlow.Success -> {
                    if (targetHost.node.value.id == installId()) {
                        nm.update(socketData.node)
                    } else {
                        // Relay to actual host server
                        scope.launch {
                            nodeHttp.postNode(socketData.node)
                        }
                    }
                }
                is NodeFlow.Error -> {
                    Logger.e("Unknown host for node ${socketData.node.id}")
                }
            }
        }
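      } catch (e: CancellationException) {
        // Rethrow so coroutine cancellation is not swallowed as a socket error
        throw e
      } catch (t: Throwable) {
        Logger.e("WebSocket error", t)
      }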
      

High Priority

  1. [HP-1] Implement Server-to-Server Child Sync
    • Fetch all children when connecting to peer server
    • File: NodeConnectionManager.kt
      
      if (success) {
        // Get host node
        val host = httpClient.get(peer.url).body<Node>()
        nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)
             
        // Get all children
        val allNodes = httpClient.get("${peer.url}/nodes").body<List<Node>>()
        val children = allNodes.filter { it.host == peer.id }
        children.forEach { child ->
            nm.update(child, observe = true)
        }
      }
      
  2. [HP-2] Add Timestamp-Based Conflict Resolution
    • Use NodeWire.timestamp for deduplication
    • File: NodeManager.kt
      
      fun update(node: Node, post: Boolean = false, observe: Boolean = false) {
        val existing = readNode(node.id)
        if (existing is NodeFlow.Success) {
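            val existingTimestamp = (existing.node.value.meta as? TimestampedMetadata)?.timestamp
            val newTimestamp = (node.meta as? TimestampedMetadata)?.timestamp

            // Compare only when both sides carry a timestamp. Defaulting a missing
            // timestamp to 0 would make 0 <= 0 true and block every update to
            // nodes without TimestampedMetadata.
            if (existingTimestamp != null && newTimestamp != null && newTimestamp <= existingTimestamp) {
                Logger.d("Ignoring stale or duplicate update for ${node.id}")
                return
            }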
        }
             
        // ... rest of update logic
      }
      
  3. [HP-3] Add MQTT Reconnection Logic
    • Automatically reconnect on disconnect
    • Resubscribe to topics
    • File: SharedMqttClient.jvm.kt
      
      actual override suspend fun connect(onConnected: (Node) -> Unit, onDisconnected: (Node) -> Unit) {
        val reconnectingOnDisconnected: (Node) -> Unit = { node ->
            onDisconnected(node)
            if (shouldReconnect) {
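                scope.launch {
                    delay(5000)
                    // Pass the original callback. connect() wraps it again on each call,
                    // so passing the wrapper would stack handlers and multiply reconnects;
                    // a lambda also cannot reference the val it is being assigned to.
                    connect(onConnected, onDisconnected)
                }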
            }
        }
             
        // ... existing connection logic with reconnectingOnDisconnected
      }
      
  4. [HP-4] Add WebSocket Reconnection for WASM
    • Retry on connection failure
    • File: ClientSocketManager.kt

      ```kotlin
      fun start() {
          scope.launch {
              while (isActive) {
                  try {
                      connectWebSocket()
                  } catch (e: CancellationException) {
                      throw e // let scope cancellation end the loop
                  } catch (e: Exception) {
                      Logger.e("WebSocket failed, retrying in 5s", e)
                  }
                  // Wait after a failure or a clean close so the loop never spins
                  delay(5000)
              }
          }
      }

      private suspend fun connectWebSocket() {
          httpClient.webSocket(urlString = ws.toString()) {
              while (isActive) {
                  val socketData = receiveDeserialized<SocketData>(...)
                  nm.update(socketData.node)
              }
          }
      }
      ```

  5. [HP-5] Encapsulate MutableStateFlow
    • Hide mutation, expose StateFlow
    • File: NodeFlow.kt

      ```kotlin
      data class Success(
          private val _node: MutableStateFlow<Node>,
          val instance: String = Uuid.random().toString()
      ) : NodeFlow(exec = { scope -> _node.value.type.exec(scope, _node.value) }) {

          // Read-only view for collectors
          val node: StateFlow<Node> = _node.asStateFlow()

          // Single entry point for mutation
          fun updateNode(transform: (Node) -> Node) {
              _node.update(transform)
          }
      }

      // Update all callers to use .updateNode() instead of .node.update()
      ```

Medium Priority

  1. [MP-1] Implement Beacon Deduplication
    • Track last beacon timestamp per node
    • File: BeaconService.kt
  2. [MP-2] Add MQTT Topic Hierarchy
    • Use /server/{id}/node/{nodeId} format
    • Enable wildcard subscriptions
  3. [MP-3] Implement MQTT QoS 1
    • Change from AT_MOST_ONCE to AT_LEAST_ONCE
    • Add sequence numbers
  4. [MP-4] Add Deletion Depth Limit
    • Prevent unbounded recursive deletion
    • File: NodeManager.kt
  5. [MP-5] Add Authentication to MQTT
    • Require username/password
    • Implement ACLs
  6. [MP-6] Implement File Write Atomicity
    • Use temp file + rename pattern
    • Add write-ahead logging
  7. [MP-7] Add WASM Persistence
    • Use IndexedDB for node storage
    • File: FileOperations.wasmJs.kt
  8. [MP-8] Rate Limit Beacon Sends
    • Debounce beacon sends (max 1 per second per node)
    • File: Lifecycle.kt
  9. [MP-9] Add Metrics/Monitoring
    • Track update latency
    • Monitor beacon traffic
    • Log MQTT message rates
  10. [MP-10] Implement Periodic Full Sync
    • Every 5 minutes, compare node checksums between servers
    • Detect and fix drift
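
As an illustration of MP-1 and MP-8, beacon deduplication and rate limiting can share one small gate keyed by node ID. This is a minimal sketch, not code from the Krill repository: `BeaconDeduplicator`, `shouldProcess`, and the one-second default are hypothetical, and `@Synchronized` assumes a JVM target.

```kotlin
// Hypothetical sketch; BeaconDeduplicator is not an existing Krill class.
class BeaconDeduplicator(private val minIntervalMs: Long = 1_000) {
    init {
        require(minIntervalMs > 0) { "minIntervalMs must be positive" }
    }

    // Timestamp of the last accepted beacon per node ID.
    private val lastAccepted = mutableMapOf<String, Long>()

    /**
     * Returns true if the beacon should be processed. Returns false when it is
     * a duplicate, is older than the last accepted beacon, or arrives within
     * [minIntervalMs] of the last accepted beacon for the same node.
     */
    @Synchronized
    fun shouldProcess(nodeId: String, timestampMs: Long): Boolean {
        val previous = lastAccepted[nodeId]
        if (previous != null && timestampMs - previous < minIntervalMs) return false
        lastAccepted[nodeId] = timestampMs
        return true
    }
}
```

A receiver would call `shouldProcess(beaconNodeId, beaconTimestamp)` before touching NodeManager, so repeated beacons from the same server cost one map lookup instead of a full update.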

Additional Findings

1. No Error State Recovery

Observation: Nodes can enter NodeState.ERROR but there’s no automatic recovery

Location: PeerConnector.kt L48 shows reconnection on ERROR, but only for server nodes

Recommendation:

  • Implement periodic health checks
  • Auto-retry failed operations
  • Expose retry action in UI
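
The auto-retry recommendation could be sketched as a helper with capped exponential backoff. The names `backoffDelayMs` and `retryWithBackoff` and all defaults are assumptions for illustration. The `sleep` parameter is injected so the helper can be tested without waiting; in coroutine code it would be replaced by a suspending `delay`.

```kotlin
// Hypothetical helper; names and defaults are illustrative assumptions.

/** Delay before retry number [attempt] (0-based): base * 2^attempt, capped at [maxMs]. */
fun backoffDelayMs(attempt: Int, baseMs: Long = 1_000, maxMs: Long = 30_000): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Cap the shift so the factor stays small enough to multiply safely.
    val factor = 1L shl minOf(attempt, 20)
    return minOf(baseMs * factor, maxMs)
}

/**
 * Runs [operation] up to [maxAttempts] times, sleeping between failures.
 * Returns the first successful result, or rethrows the last failure.
 */
fun <T> retryWithBackoff(
    maxAttempts: Int = 5,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    operation: (attempt: Int) -> T,
): T {
    require(maxAttempts > 0) { "maxAttempts must be positive" }
    var lastError: Exception? = null
    for (attempt in 0 until maxAttempts) {
        try {
            return operation(attempt)
        } catch (e: Exception) {
            lastError = e
            // No sleep after the final attempt; the error is rethrown instead.
            if (attempt < maxAttempts - 1) sleep(backoffDelayMs(attempt))
        }
    }
    throw checkNotNull(lastError)
}
```

Capping the delay keeps a node that has been in ERROR for a long time from waiting longer and longer between health checks.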

2. Memory Leaks in NodeManager

Issue: nodes map grows unbounded, never cleaned up

Impact: Long-running servers accumulate deleted nodes in memory

Fix:

fun cleanup() {
    val staleNodes = nodes.filter { (id, flow) ->
        flow is NodeFlow.Error || 
        (flow is NodeFlow.Success && flow.node.value.state == NodeState.DELETING)
    }
    staleNodes.keys.forEach { nodes.remove(it) }
}

// Call periodically
scope.launch {
    while (isActive) {
        delay(60_000)
        cleanup()
    }
}

3. Missing Logging Context

Issue: Logs don’t include correlation IDs or request traces

Example:

Logger.i("updating node: read node file op")
// Which node? From where? Why?

Recommendation:

  • Add structured logging with context
  • Include node ID, operation, source (MQTT/HTTP/Beacon)
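
One possible shape for such context is a small value object rendered as key=value pairs. This is a sketch only: the Krill `Logger` API is not shown in this report, so `LogContext`, `UpdateSource`, and `formatLog` are hypothetical names, and the formatter does not escape quotes inside the message.

```kotlin
// Hypothetical types; they are not part of the Krill codebase.
enum class UpdateSource { MQTT, HTTP, BEACON, WEBSOCKET }

data class LogContext(
    val correlationId: String,
    val nodeId: String,
    val operation: String,
    val source: UpdateSource,
)

/** Renders a message as key=value pairs so logs can be filtered by field. */
fun formatLog(message: String, ctx: LogContext): String =
    "msg=\"$message\" correlationId=${ctx.correlationId} nodeId=${ctx.nodeId} " +
        "op=${ctx.operation} source=${ctx.source}"
```

A call such as `Logger.i(formatLog("updating node", ctx))` would then answer which node, which operation, and which transport triggered the update.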

4. No Circuit Breaker for External Calls

Issue: HTTP calls to other servers can hang indefinitely

Location: NodeHttp.kt, NodeConnectionManager.kt

Recommendation:

httpClient.get(url) {
    timeout {
        requestTimeoutMillis = 5000
        connectTimeoutMillis = 3000
        socketTimeoutMillis = 5000
    }
}
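
Timeouts bound a single call; a circuit breaker additionally stops calling a peer that keeps failing. The following is a minimal sketch under stated assumptions: `CircuitBreaker`, its thresholds, and the injected `clock` are hypothetical, and `@Synchronized` assumes a JVM target.

```kotlin
// Hypothetical sketch; not an existing Krill or Ktor class.
class CircuitBreaker(
    private val failureThreshold: Int = 3,
    private val openDurationMs: Long = 10_000,
    private val clock: () -> Long = { System.currentTimeMillis() },
) {
    private var consecutiveFailures = 0
    private var openedAt: Long? = null

    /** True while calls should be rejected without contacting the peer. */
    @Synchronized
    fun isOpen(): Boolean {
        val opened = openedAt ?: return false
        if (clock() - opened >= openDurationMs) {
            // Cool-down elapsed: allow a trial call. One more failure reopens.
            openedAt = null
            consecutiveFailures = failureThreshold - 1
            return false
        }
        return true
    }

    @Synchronized
    fun recordSuccess() {
        consecutiveFailures = 0
        openedAt = null
    }

    @Synchronized
    fun recordFailure() {
        consecutiveFailures++
        if (consecutiveFailures >= failureThreshold) openedAt = clock()
    }

    /** Runs [call] unless the breaker is open, in which case it returns null. */
    fun <T> execute(call: () -> T): T? {
        if (isOpen()) return null
        return try {
            call().also { recordSuccess() }
        } catch (e: Exception) {
            recordFailure()
            throw e
        }
    }
}
```

Keeping one breaker per peer server would let a single unreachable host fail fast without slowing down calls to healthy peers.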

5. No Backpressure Handling

Issue: Fast producers (MQTT messages) can overwhelm slow consumers (StateFlow collectors)

Recommendation:

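  • StateFlow is already conflated, so buffer() and conflate() have no effect on it; slow collectors skip intermediate values rather than queue them. Where every update matters, feed MQTT messages through a bounded Channel or a SharedFlow with an explicit buffer size and overflow policy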
  • Implement rate limiting on MQTT processing
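
One way to bound the work a consumer does per pass is to coalesce pending MQTT updates per node before applying them. The sketch below assumes that only the latest state of a node matters; `NodeUpdate` and `CoalescingBuffer` are hypothetical names, and `@Synchronized` assumes a JVM target.

```kotlin
// Hypothetical sketch; NodeUpdate and CoalescingBuffer are illustrative names.
data class NodeUpdate(val nodeId: String, val payload: String)

/**
 * Keeps only the most recent pending update per node, so a burst of
 * messages for one node costs the consumer a single unit of work.
 */
class CoalescingBuffer {
    // LinkedHashMap preserves the order in which nodes first became pending.
    private val pending = LinkedHashMap<String, NodeUpdate>()

    @Synchronized
    fun offer(update: NodeUpdate) {
        pending[update.nodeId] = update
    }

    /** Removes and returns all pending updates. */
    @Synchronized
    fun drain(): List<NodeUpdate> {
        val batch = pending.values.toList()
        pending.clear()
        return batch
    }
}
```

The MQTT callback would call `offer` and return immediately, while a single consumer drains the buffer on a fixed interval, which also gives a natural place to enforce a processing rate.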

6. Hardcoded Configuration

Locations:

  • DiscoveryConfig.PORT = 45317
  • MQTTBroker port = 8883
  • ServerMetaData.port

Issue: Can’t run multiple servers on same host for testing

Recommendation:

  • Use environment variables
  • Support config files
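
A minimal sketch of the environment-variable approach follows. The variable names `KRILL_DISCOVERY_PORT` and `KRILL_MQTT_PORT` and the `PortConfig` type are assumptions, not existing Krill settings; the defaults are the ports listed above. `System.getenv()` assumes a JVM target.

```kotlin
// Hypothetical sketch; the environment variable names are assumptions.
data class PortConfig(val discoveryPort: Int, val mqttPort: Int)

/** Reads a port from [env], falling back to [fallback] when the key is unset. */
fun portFrom(env: Map<String, String>, key: String, fallback: Int): Int {
    val raw = env[key] ?: return fallback
    val port = raw.toIntOrNull()
    if (port == null || port !in 1..65535) {
        throw IllegalArgumentException("$key must be a port in 1..65535, got '$raw'")
    }
    return port
}

/** Builds the port configuration; [env] is injectable for tests. */
fun loadPortConfig(env: Map<String, String> = System.getenv()): PortConfig =
    PortConfig(
        discoveryPort = portFrom(env, "KRILL_DISCOVERY_PORT", fallback = 45317),
        mqttPort = portFrom(env, "KRILL_MQTT_PORT", fallback = 8883),
    )
```

Two test servers on one host could then be started with different values for each variable, while an unset environment keeps today's behavior.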

7. No Schema Versioning

Issue: Node serialization format has no version field

Impact: Breaking changes to Node/NodeMetaData break compatibility

Recommendation:

@Serializable
data class Node(
    val schemaVersion: Int = 1,  // NEW
    val id: String,
    // ...
)
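
A version field is only useful if receivers act on it. The sketch below shows one way a receiver might classify an incoming payload; `SchemaCheck`, `checkSchema`, and the version constant are hypothetical and not part of the Krill codebase.

```kotlin
// Hypothetical sketch; the version constant and result types are assumptions.
const val CURRENT_SCHEMA_VERSION = 1

sealed class SchemaCheck {
    object Compatible : SchemaCheck()
    data class NeedsMigration(val from: Int, val to: Int) : SchemaCheck()
    data class TooNew(val received: Int, val supported: Int) : SchemaCheck()
}

/** Decides how to handle a payload based on its schemaVersion field. */
fun checkSchema(received: Int, supported: Int = CURRENT_SCHEMA_VERSION): SchemaCheck = when {
    received == supported -> SchemaCheck.Compatible
    received < supported -> SchemaCheck.NeedsMigration(from = received, to = supported)
    else -> SchemaCheck.TooNew(received, supported)
}
```

Older payloads can be upgraded in place, while a `TooNew` result gives the receiver a chance to reject the node explicitly instead of failing during deserialization.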

8. Certificate Management

Issue: TLS certificates hardcoded at /etc/krill/certs/krill.crt

Security: Private key password is "changeit" (L28 in MQTTBroker.kt)

Recommendation:

  • Use environment variable for cert path
  • Support cert rotation
  • Use secure secret management (not hardcoded password)

9. No Transaction Support

Issue: Multi-step operations aren’t atomic

Example: Creating node + children can partially fail

Recommendation:

  • Implement transaction boundaries
  • Add rollback capability
  • Use database for persistence instead of files
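
Until a database is in place, rollback can be approximated with compensating actions. This is a sketch under that assumption, using hypothetical names (`Transaction`, `step`, `transaction`); it does not make the steps atomic with respect to concurrent readers or crashes.

```kotlin
// Hypothetical sketch; not an existing Krill API.
class Transaction {
    private val undoSteps = ArrayDeque<() -> Unit>()

    /** Runs [action] and remembers [undo] in case a later step fails. */
    fun step(undo: () -> Unit, action: () -> Unit) {
        action()
        undoSteps.addFirst(undo)
    }

    /** Undoes completed steps, most recent first. */
    fun rollback() {
        while (undoSteps.isNotEmpty()) undoSteps.removeFirst().invoke()
    }
}

/** Runs [block]; if it throws, completed steps are rolled back and the error is rethrown. */
fun transaction(block: Transaction.() -> Unit) {
    val tx = Transaction()
    try {
        tx.block()
    } catch (e: Exception) {
        tx.rollback()
        throw e
    }
}
```

For the node-plus-children example, each created node would register its own deletion as the undo step, so a failure on the third child removes the parent and the first two children again.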

10. Platform-Specific Code Duplication

Observation: MQTT clients have duplicate logic across platforms

Files:

  • SharedMqttClient.jvm.kt
  • SharedMqttClient.android.kt
  • SharedMqttClient.ios.kt

Recommendation:

  • Extract common logic to BaseMqttClient
  • Reduce duplication
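
The extraction could follow a template-method shape, with shared bookkeeping in the base class and only transport calls left to each platform. This is a sketch, not the real `SharedMqttClient` API, which is not shown in this report: the hook names `platformConnect` and `platformSubscribe` are hypothetical.

```kotlin
// Hypothetical sketch; the real SharedMqttClient API is not shown in this report.
abstract class BaseMqttClient {
    private val subscriptions = linkedSetOf<String>()
    private var connected = false

    // Platform-specific transport hooks, implemented per target.
    protected abstract fun platformConnect()
    protected abstract fun platformSubscribe(topic: String)

    /** Shared logic: remember the topic so it can be restored after a reconnect. */
    fun subscribe(topic: String) {
        val added = subscriptions.add(topic)
        if (added && connected) platformSubscribe(topic)
    }

    /** Shared logic: connect, then resubscribe to every remembered topic. */
    fun connect() {
        platformConnect()
        connected = true
        subscriptions.forEach { platformSubscribe(it) }
    }
}
```

With this shape, resubscription after a reconnect is written once in the base class instead of separately in the JVM, Android, and iOS clients.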

Recommendations

Short Term (1-2 Weeks)

  1. Fix Critical Race Conditions
    • Add mutexes to NodeManager
    • Fix observer double launch
    • Stop infinite beacon loop
  2. Add Defensive Programming
    • Null checks before casts
    • Timeout all network calls
    • Add max depth limits
  3. Improve Observability
    • Add structured logging
    • Log correlation IDs
    • Add performance metrics

Medium Term (1-2 Months)

  1. Redesign Server-to-Server Sync
    • Implement MQTT bridge between servers
    • Add timestamp-based conflict resolution
    • Fetch all children on peer discovery
  2. Harden MQTT
    • Upgrade to QoS 1
    • Add authentication
    • Implement topic hierarchy
  3. Improve WASM Support
    • Add IndexedDB persistence
    • Implement proper relaying
    • Add WebSocket reconnection

Long Term (3-6 Months)

  1. Replace File Persistence with Database
    • Use SQLite for local storage
    • Add transaction support
    • Enable atomic updates
  2. Implement Event Sourcing
    • Log all node changes as events
    • Replay for consistency
    • Enable audit trail
  3. Add Consensus Protocol
    • Use Raft or similar for multi-server consistency
    • Elect leader for coordination
    • Handle network partitions
  4. Performance Optimization
    • Batch MQTT publishes
    • Implement caching layer
    • Optimize StateFlow collectors

Prompt for Future Updates

Use this prompt to update this report in the future:

This KMP codebase has apps for different platforms and a Ktor server. Apps launch, initialize, and wait for peers to advertise themselves as servers on the same network using UDP multicast beacons. When an app sees a beacon from a server (see KrillApp.Server), it downloads and displays all child nodes that server owns in a graph of connected nodes.

Apps connect over MQTT for real-time updates. If a user edits or deletes a node, we POST to the server host node, which processes the change from the REST POST/DELETE, broadcasts the change to other connected apps over MQTT, and advertises it via BeaconService. The server also runs background processes that update and execute chains of nodes and cleanly informs clients of new info over MQTT.

The WASM module is an exception: it doesn't use MQTT and can't POST to other servers due to CORS. There's a dedicated WebSocket connection for the WASM client to get updates from the hosting server. Instead of making REST calls to servers, WASM sends changes to its host, which informs the rest of the swarm.

**Server nodes also act as apps** and should get beacons from other servers so everyone has a synchronized list of nodes in NodeManager.

**Your Task:**
1. Do a deep dive into the connectivity and synchronization process
2. Update ConnectivityReport.md with your findings
3. Use Mermaid syntax for diagrams
4. Analyze flows and identify issues that would cause:
   - Updates to go wrong
   - Race conditions
   - Exponential executions
   - Infinite loops
5. Create Mermaid diagrams showing logic errors
6. Provide a list of action items to harden the flow
7. Include findings, recommendations, and notes
8. Update this prompt based on what you learn

**Key Files to Review:**
- `krill-sdk/src/commonMain/kotlin/krill/zone/KrillApp.kt` - Application architecture
- `krill-sdk/src/commonMain/kotlin/krill/zone/node/NodeManager.kt` - Node management and StateFlows
- `krill-sdk/src/commonMain/kotlin/krill/zone/beacon/BeaconService.kt` - Multicast discovery
- `krill-sdk/src/jvmMain/kotlin/krill/zone/io/SharedMqttClient.jvm.kt` - MQTT client
- `server/src/main/kotlin/krill/zone/server/mqtt/MQTTBroker.kt` - MQTT broker
- `krill-sdk/src/commonMain/kotlin/krill/zone/io/ClientSocketManager.kt` - WASM WebSocket
- `shared/src/jvmMain/kotlin/krill/zone/server/ServerSocketManager.jvm.kt` - Server WebSocket
- `server/src/main/kotlin/krill/zone/server/Routes.kt` - REST API
- `server/src/main/kotlin/krill/zone/server/Lifecycle.kt` - Server lifecycle and events

**Focus Areas:**
- StateFlow usage and concurrent updates
- MQTT message handling and synchronization
- WebSocket communication for WASM
- Server-to-server synchronization via beacons
- Race conditions in NodeManager
- Infinite loops in beacon/MQTT propagation
- buildChain() recursive execution
- Node deletion cascades
- Error handling and recovery

This code is in early development, so look for logic flaws and bugs.

Conclusion

The Krill connectivity system has a solid foundation with multi-transport support (beacons, MQTT, WebSockets, REST) and cross-platform capabilities. However, it suffers from several critical issues related to concurrency, synchronization, and distributed system consistency.

Immediate Action Required:

  1. Fix race conditions in NodeManager
  2. Stop infinite beacon loops
  3. Add recursion limits to buildChain()
  4. Fix WASM relay logic

Architecture Improvements Needed:

  1. Proper server-to-server synchronization
  2. Timestamp-based conflict resolution
  3. MQTT reconnection and QoS upgrades
  4. Transaction boundaries for multi-step operations

Long-Term Vision: Consider evolving toward a more robust distributed architecture with:

  • Consensus protocol for multi-server deployments
  • Event sourcing for auditability
  • Database-backed persistence
  • Proper circuit breakers and backpressure

The current implementation works for single-server or trusted network scenarios but needs hardening for production use with multiple servers and untrusted networks.


Report Version: 1.0
Last Updated: 2025-12-03
Next Review: After implementing critical action items

This post is licensed under CC BY 4.0 by the author.