Snapshot Data Lifecycle Deep Dive

Comprehensive analysis of Snapshot data lifecycle including client-server communication, processing flows, deduplication mechanisms, and identified bugs with fixes

Overview

This document analyzes how Snapshot data flows through the Krill system: from user input on the client, to storage on the server, and back to all connected clients via WebSocket. It identifies critical bugs and redundancies, and provides actionable TODO items with agent prompts for the fixes.

Core Components

Snapshot Data Class

data class Snapshot(
    val timestamp: Long = 0L,
    val value: String = ""
)

The Snapshot class represents a point-in-time value for a DataPoint. It contains:

  • timestamp: Epoch milliseconds when the value was captured
  • value: String representation of the data (can be parsed as Double for numeric types)
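
Because value is stored as a String, numeric consumers have to parse it. The following is a minimal sketch, assuming a hypothetical helper named numericValueOrNull (it does not appear in the Krill code quoted in this post) that returns null instead of throwing when the value is empty or not numeric:

```kotlin
// Snapshot as defined above, repeated so the sketch is self-contained.
data class Snapshot(
    val timestamp: Long = 0L,
    val value: String = ""
)

// Hypothetical helper (an assumption of this sketch): parse the value as a
// Double, or return null when it is empty or not numeric. The default value
// is "", which is not parseable.
fun Snapshot.numericValueOrNull(): Double? = value.toDoubleOrNull()
```

With this helper, a snapshot holding "21.5" parses to 21.5, while the default Snapshot() yields null rather than the NumberFormatException that value.toDouble() would throw on an empty string.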

NodeState Enum (Key States)

enum class NodeState {
    // ... other states ...
    EXECUTED,        // Node action requested (e.g., toggle BOOL, run trigger)
    SNAPSHOT_UPDATE, // Data value changed (e.g., sensor reading, user input)
    // ... other states ...
}

Key Classes

| Class | Purpose |
|---|---|
| Snapshot | Point-in-time value with timestamp |
| SnapshotProcessor | Queue for pending snapshots before storage |
| SnapshotTracker | Tracks processed timestamps to detect stale data |
| SnapshotQueueService | Drains queue and persists to DataStore |
| DataPointMetaData | Contains the current snapshot value |
| ServerDataPointProcessor | Processes DataPoint state changes |
| NodeTrafficControl | Transaction ID deduplication |

Client-Server Snapshot Flow

User Input to Server Storage

sequenceDiagram
    participant User
    participant UI as Compose UI
    participant CNM as ClientNodeManager
    participant TC as TrafficControl
    participant HTTP as NodeHttp
    participant Server as Ktor Server
    participant SNM as ServerNodeManager
    participant NE as NodeEventBus
    participant SDP as ServerDataPointProcessor
    participant SP as SnapshotProcessor
    participant SQS as SnapshotQueueService
    participant DS as DataStore

    User->>UI: Enter new value
    UI->>CNM: nodeManager.updateSnapshot(node)
    CNM->>CNM: Generate tid, set SNAPSHOT_UPDATE
    CNM->>TC: trafficControl.record(tid)
    CNM->>HTTP: postNode(host, node)
    HTTP->>Server: POST /node/{id}
    Server->>TC: Check tid exists
    alt Duplicate TID
        Server-->>HTTP: 409 Conflict
    else New TID
        Server->>TC: record(tid)
        Server->>SNM: updateSnapshot(node)
        SNM->>SNM: updateInternal() - verify()
        SNM->>SDP: emit via StateFlow
        SDP->>SDP: processMyDataPoint()
        SDP->>SP: enqueue(node, snapshot)
        SP->>SQS: size.collect triggers drain
        SQS->>DS: post(node) - persist to file
        SNM->>NE: broadcast(node)
        NE->>WebSocket: sendSerialized(node)
        Server-->>HTTP: 200 OK
    end
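
The server-side tid branch in the diagram above (409 Conflict for a duplicate, 200 OK otherwise) can be sketched as follows. TidRegistry and statusForPost are hypothetical names standing in for NodeTrafficControl and the Ktor route handler; the sketch is not thread-safe and only illustrates the decision:

```kotlin
// Hypothetical stand-in for NodeTrafficControl: remembers tids already seen.
class TidRegistry {
    private val seen = mutableSetOf<String>()

    // Returns true if the tid was new and has now been recorded,
    // false if it had been recorded before.
    fun recordIfNew(tid: String): Boolean = seen.add(tid)
}

// Returns the HTTP status the diagram describes for a POST carrying this tid:
// 200 for a new tid, 409 for a duplicate.
fun statusForPost(registry: TidRegistry, tid: String): Int =
    if (registry.recordIfNew(tid)) 200 else 409
```

Combining the check and the record into one call is a design choice of this sketch: it leaves no gap between "Check tid exists" and "record(tid)" in which a second request with the same tid could slip through.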

Server to Client WebSocket Broadcast

sequenceDiagram
    participant SNM as ServerNodeManager
    participant NE as NodeEventBus
    participant SM as ServerSocketManager
    participant WS as WebSocket
    participant CSM as ClientSocketManager
    participant TC as TrafficControl
    participant CNM as ClientNodeManager
    participant UI as Compose UI

    SNM->>NE: broadcast(node)
    NE->>SM: broadcast(node)
    SM->>WS: sendSerialized(node)
    WS->>CSM: receiveDeserialized()
    CSM->>TC: contains(tid)?
    alt TID is mine
        CSM->>CSM: Skip (ack of my POST)
    else TID is new
        CSM->>CNM: update(node)
        CNM->>CNM: Check timestamp > existing
        CNM->>UI: StateFlow.update()
        UI->>UI: Recompose with new value
    end
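
The client-side handling in the diagram above combines two checks: skip broadcasts that carry one of my own tids, then apply the rest only if they are newer than what is held locally. A minimal sketch under those assumptions, with NodeUpdate and ClientStateSketch as hypothetical stand-ins for the real node and ClientNodeManager types:

```kotlin
// Minimal stand-in for a node: only the fields this sketch needs.
data class NodeUpdate(val id: String, val tid: String, val timestamp: Long)

class ClientStateSketch(private val myTids: Set<String>) {
    private val nodes = mutableMapOf<String, NodeUpdate>()

    // Returns true when the broadcast was applied to local state.
    fun onBroadcast(update: NodeUpdate): Boolean {
        // Echo of my own POST: treat it as an acknowledgement and skip it.
        if (update.tid in myTids) return false
        // Only accept data that is strictly newer than what is already held.
        val existing = nodes[update.id]
        if (existing != null && update.timestamp <= existing.timestamp) return false
        nodes[update.id] = update
        return true
    }

    fun current(id: String): NodeUpdate? = nodes[id]
}
```

In the real client the final step would update a StateFlow so the Compose UI recomposes; here the map assignment stands in for that.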

Server-Side Processing Flow

SNAPSHOT_UPDATE Processing

flowchart TD
    A[Node StateFlow emits] --> B{Check node.state}
    B -->|SNAPSHOT_UPDATE| C{node.isMine()?}
    C -->|Yes| D[processMyDataPoint]
    C -->|No| E[process - send to external server]
    
    D --> F{Duplicate snapshot?}
    F -->|Yes| G[Skip - return true]
    F -->|No| H{Snapshot stale?}
    H -->|Yes| I[Skip - return true]
    H -->|No| J[snapshotTracker.post]
    J --> K[nodeManager.running]
    K --> L[snapshotProcessor.enqueue]
    L --> M[handleSpecialEvents]
    M --> N{Parent is Zigbee?}
    N -->|Yes| O[zigbee.sendCommand]
    N -->|No| P[Return success]
    O --> P
    
    E --> Q[Find host server]
    Q --> R[nodeHttp.postNode]
    R --> S[nodeManager.complete]

EXECUTED Processing (DataPoint with BOOL Type)

flowchart TD
    A[Node StateFlow emits] --> B{Check node.state}
    B -->|EXECUTED| C[executeDataPoint]
    C --> D{meta.dataType?}
    D -->|BOOL| E{snapshot.value <= 0?}
    E -->|Yes - is OFF| F["updateSnapshot(value=1)"]
    E -->|No - is ON| G["updateSnapshot(value=0)"]
    F --> H[SNAPSHOT_UPDATE triggers]
    G --> H
    D -->|Other| I[Return true - no action]
    
    style F fill:#f96,stroke:#333
    style G fill:#f96,stroke:#333

⚠️ BUG IDENTIFIED: This toggle logic causes problems when a Connection executor sets a target DataPoint to EXECUTED state with a specific value - the value gets toggled instead of preserved!


Connection Executor Flow

Current (Buggy) Behavior

flowchart TD
    A[Connection EXECUTED] --> B[Get source snapshot]
    B --> C[Get target node]
    C --> D["Copy snapshot to target"]
    D --> E["target.state = EXECUTED ⚠️"]
    E --> F[ServerDataPointProcessor.post]
    F --> G{DataType.BOOL?}
    G -->|Yes| H["executeDataPoint TOGGLES value! ⚠️"]
    G -->|No| I[No toggle - OK]
    H --> J[Value is now OPPOSITE]
    
    style E fill:#f96,stroke:#333
    style H fill:#f00,stroke:#333

Problem Scenario: Zigbee Plug Control

sequenceDiagram
    participant Plug as Zigbee Plug
    participant ZB as ServerZigbeeBoss
    participant DP1 as DataPoint (Zigbee)
    participant CONN as Connection
    participant DP2 as Target DataPoint
    participant SDP as ServerDataPointProcessor

    Note over Plug: User turns ON plug manually
    Plug->>ZB: ReportAttributesCommand (value=1)
    ZB->>DP1: updateSnapshot(value="1")
    Note over DP1: State: SNAPSHOT_UPDATE
    DP1->>CONN: Execute child (Connection)
    CONN->>DP2: Copy value="1", state=EXECUTED
    Note over DP2: State: EXECUTED, value="1"
    DP2->>SDP: post(node)
    SDP->>SDP: executeDataPoint() - BOOL toggle!
    SDP->>DP2: updateSnapshot(value="0") ⚠️
    Note over DP2: Now value="0" (toggled!)
    DP2->>ZB: handleSpecialEvents
    ZB->>Plug: turnOff() ⚠️
    Note over Plug: Plug turns OFF immediately!

The plug immediately turns off because the Connection executor sets EXECUTED state, which triggers the toggle logic!
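
The difference between the two states can be reduced to a small model. This sketch is not the Krill implementation: NodeState is cut down to the two states discussed here, resultingBoolValue is a hypothetical function, and treating an unparseable value as OFF is an assumption made to keep the example total:

```kotlin
// Reduced to the two states relevant to this bug.
enum class NodeState { EXECUTED, SNAPSHOT_UPDATE }

// Models the value a BOOL DataPoint ends up holding when a node arrives
// carrying `incoming` as its snapshot value in the given state.
fun resultingBoolValue(state: NodeState, incoming: String): String = when (state) {
    // EXECUTED: the value carried by the node is toggled.
    // Unparseable values count as OFF (an assumption of this sketch).
    NodeState.EXECUTED ->
        if ((incoming.toDoubleOrNull() ?: 0.0) <= 0.0) "1" else "0"
    // SNAPSHOT_UPDATE: the value is stored as-is.
    NodeState.SNAPSHOT_UPDATE -> incoming
}
```

Copying "1" with EXECUTED leaves the target at "0", which is the plug turning off; copying "1" with SNAPSHOT_UPDATE leaves it at "1".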


Transaction ID (tid) Deduplication

Purpose

The tid (Transaction ID) prevents duplicate processing in the following round trip:

  1. Client POST reaches server, server broadcasts update
  2. Client receives its own update back via WebSocket
  3. Without a tid check, the client would re-POST its own update, creating a loop

Flow

flowchart TD
    A[Client: Generate tid] --> B[trafficControl.record tid]
    B --> C[POST to server]
    C --> D[Server: Check tid exists]
    D -->|Exists| E[409 Conflict - duplicate]
    D -->|New| F[Server: record tid]
    F --> G[Process node]
    G --> H[Broadcast via WebSocket]
    H --> I[Client receives broadcast]
    I --> J[trafficControl.contains tid?]
    J -->|Yes - my tid| K[Skip - this is ack of my POST]
    J -->|No| L[Update local node]
    
    style K fill:#9f9,stroke:#333

Cleanup

// NodeTrafficControl - auto-cleanup after 20 seconds
scope.launch {
    delay(20000)
    mutex.withLock {
        map.remove(tid)
    }
}
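
For comparison, the same 20-second lifetime can be expressed without a coroutine per tid by storing the time each tid was recorded and purging expired entries on access. This is a different technique from the delay-based cleanup above, not a description of NodeTrafficControl. ExpiringTidSet is a hypothetical class, the default clock uses the JVM's System.currentTimeMillis(), and the sketch is not thread-safe:

```kotlin
// Hypothetical timestamp-based variant of the 20-second cleanup.
// Expired entries are removed lazily whenever the set is accessed.
class ExpiringTidSet(
    private val ttlMillis: Long = 20_000L,
    // Injectable clock so the expiry can be tested deterministically.
    private val now: () -> Long = { System.currentTimeMillis() }
) {
    private val recordedAt = mutableMapOf<String, Long>()

    fun record(tid: String) {
        purgeExpired()
        recordedAt[tid] = now()
    }

    fun contains(tid: String): Boolean {
        purgeExpired()
        return tid in recordedAt
    }

    // Drops every tid whose age has reached ttlMillis.
    private fun purgeExpired() {
        val cutoff = now() - ttlMillis
        recordedAt.entries.removeAll { it.value <= cutoff }
    }
}
```

The trade-off is that nothing is freed until the next call to record() or contains(), whereas the coroutine version removes each tid on schedule.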

Snapshot Queue Processing

Queue Architecture

flowchart LR
    subgraph Input
        A[processMyDataPoint] --> B[snapshotProcessor.enqueue]
    end
    
    subgraph Queue
        B --> C[SnapshotQueue]
        C --> D[Deduplicate by timestamp]
        D --> E[Check if stale]
    end
    
    subgraph Service
        E --> F[SnapshotQueueService]
        F --> G[size StateFlow triggers]
        G --> H[drainProcessableSnapshots]
        H --> I[Sort by timestamp]
        I --> J[dataStore.post each]
    end
    
    subgraph Storage
        J --> K[ServerDataStore]
        K --> L[File: data/YYYY/MM/DD/nodeId]
    end
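
The queue stage of the diagram above (deduplicate, then drain sorted by timestamp) can be sketched as below. QueuedSnapshot and SnapshotQueueSketch are hypothetical names, and keying the duplicate check on node ID plus timestamp is an assumption; the diagram only says "Deduplicate by timestamp":

```kotlin
// Minimal stand-in for a queued (node, snapshot) pair.
data class QueuedSnapshot(val nodeId: String, val timestamp: Long, val value: String)

class SnapshotQueueSketch {
    private val pending = mutableListOf<QueuedSnapshot>()

    // Queue-level dedup: reject an entry whose node ID and timestamp are
    // already queued. Returns true if the item was added.
    fun add(item: QueuedSnapshot): Boolean {
        val duplicate = pending.any {
            it.nodeId == item.nodeId && it.timestamp == item.timestamp
        }
        if (duplicate) return false
        pending.add(item)
        return true
    }

    // Drain everything currently queued, oldest first, and empty the queue.
    fun drainSorted(): List<QueuedSnapshot> {
        val drained = pending.sortedBy { it.timestamp }
        pending.clear()
        return drained
    }
}
```

In the real service each drained item would be passed to dataStore.post; the sketch returns the sorted list so the ordering is easy to inspect.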

SnapshotTracker Staleness Check

The SnapshotTracker maintains a map of node IDs to their last processed timestamps:

// map: MutableMap<String, Long> - tracks last processed timestamp per node ID
suspend fun stale(node: Node): Boolean {
    val meta = node.meta as DataPointMetaData
    if (meta.snapshot.timestamp <= 0L) return true  // Invalid timestamp
    if (map[node.id] == null) return false           // Never seen - not stale
    return map[node.id]!! >= meta.snapshot.timestamp // Already processed newer/equal
}
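
The three rules in stale() are easiest to see with concrete timestamps. The sketch below is a non-suspending stand-in that takes the node ID and snapshot timestamp directly; StalenessSketch is a hypothetical name, and the behaviour of post() (remember the timestamp as processed) is assumed from its description above:

```kotlin
// Non-suspending stand-in for SnapshotTracker, keyed by node ID.
class StalenessSketch {
    private val lastProcessed = mutableMapOf<String, Long>()

    fun stale(nodeId: String, snapshotTimestamp: Long): Boolean {
        if (snapshotTimestamp <= 0L) return true          // invalid timestamp
        val last = lastProcessed[nodeId] ?: return false   // never seen: not stale
        return last >= snapshotTimestamp                   // already processed newer/equal
    }

    // Assumed behaviour of post(): remember this timestamp as processed.
    fun post(nodeId: String, snapshotTimestamp: Long) {
        lastProcessed[nodeId] = snapshotTimestamp
    }
}
```

After post("n1", 100), timestamps 99 and 100 are stale for that node and 101 is not. A timestamp of 0 is always stale, which matters for any Snapshot built with the default timestamp.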

Identified Bugs and Issues

Bug 1: BOOL Toggle on Connection Copy

Location: ServerConnectionProcessor.process() method, in the main snapshot copy block
Root Cause: Setting state = NodeState.EXECUTED on target triggers toggle
Impact: Zigbee plugs turn off immediately when turned on manually

flowchart TD
    A["Connection.process()"] --> B["target.state = EXECUTED"]
    B --> C["ServerDataPointProcessor sees EXECUTED"]
    C --> D["executeDataPoint() toggles BOOL"]
    D --> E["Value becomes opposite ⚠️"]
    
    style B fill:#f96,stroke:#333
    style D fill:#f00,stroke:#333

Fix: Use NodeState.SNAPSHOT_UPDATE instead of NodeState.EXECUTED

Bug 2: executeDataPoint Return Value Inconsistency

Location: ServerDataPointProcessor.executeDataPoint() method, BOOL toggle logic
Issue: Returns true when toggling to 1, false when toggling to 0
Impact: May cause false error states

return if (meta.snapshot.value.toDouble() <= 0) {
    nodeManager.updateSnapshot(/*...*/ value = "1")
    true  // ✅ Success
} else {
    nodeManager.updateSnapshot(/*...*/ value = "0")
    false  // ❌ False means ERROR state!
}

Fix: Return true in both cases since both are successful operations

Bug 3: ConnectionResolver.execute vs ServerConnectionProcessor.process Inconsistency

Location: Two different implementations for Connection execution
Issue: ConnectionResolver.execute() uses SNAPSHOT_UPDATE (correct), but ServerConnectionProcessor.process() uses EXECUTED (incorrect)

// ConnectionResolver.execute() - CORRECT
nodeManager.updateSnapshot(targetNode.copy(...))

// ServerConnectionProcessor.process() - INCORRECT  
nodeManager.update(target.value.copy(state = NodeState.EXECUTED, ...))

Bug 4: Potential Race in SnapshotTracker

Location: SnapshotTracker class, stale() and post() methods
Issue: The stale() check and the post() update are not atomic as a pair, so two concurrent callers can both pass the check before either records its timestamp
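
One way to close that gap is to perform the check and the update under a single lock. The sketch below is an illustration, not the project's code: AtomicSnapshotTracker and checkAndPost are hypothetical names, and it uses a JVM ReentrantLock to stay dependency-free, whereas the multiplatform code would more likely use a coroutine Mutex:

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class AtomicSnapshotTracker {
    private val lock = ReentrantLock()
    private val lastProcessed = mutableMapOf<String, Long>()

    // Returns true and records the timestamp only if the snapshot is not stale.
    // The check and the update happen under the same lock, so two callers with
    // the same timestamp cannot both succeed.
    fun checkAndPost(nodeId: String, snapshotTimestamp: Long): Boolean = lock.withLock {
        if (snapshotTimestamp <= 0L) return@withLock false
        val last = lastProcessed[nodeId]
        if (last != null && last >= snapshotTimestamp) return@withLock false
        lastProcessed[nodeId] = snapshotTimestamp
        true
    }
}
```

Callers would replace the stale() then post() sequence with a single checkAndPost() call and skip processing when it returns false.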

Bug 5: Memory Leak in processedSnapshots Cleanup

Location: ServerDataPointProcessor.processMyDataPoint() method, processedSnapshots cleanup block
Issue: Cleanup removes oldest entries but set operations may be slow


Redundancy Analysis

1. Duplicate Snapshot Checking

Snapshots are checked for duplicates in multiple places:

| Location | Method | Purpose |
|---|---|---|
| ServerDataPointProcessor | processedSnapshots set | Dedup by timestamp |
| SnapshotTracker | stale() check | Detect out-of-order |
| SnapshotQueue | add() duplicate check | Queue-level dedup |
| ServerNodeManager | verify() timestamp compare | Reject older data |

Recommendation: Consolidate into single deduplication point

2. State Transition Duplication

Multiple places set state and call update:

// BaseNodeProcessor.post()
nodeManager.complete(node)

// NodeProcessExecutor.handleBaseOperations()
fileOperations.update(node.copy(state = NodeState.NONE))

// ServerDataPointProcessor.processMyDataPoint()
nodeManager.running(node)

3. Traffic Control vs Timestamp Deduplication

Both NodeTrafficControl (tid-based) and timestamp comparisons are used for deduplication. Consider whether both are necessary.


TODO Items with Agent Prompts

TODO 1: Fix Connection Executor State Bug

Priority: HIGH
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/executor/connection/ServerConnectionProcessor.kt

Agent Prompt:

In ServerConnectionProcessor.kt, the process() method incorrectly sets the target 
DataPoint to NodeState.EXECUTED when copying a snapshot value. This causes BOOL 
DataPoints to toggle their value instead of receiving the copied value.

Change the main snapshot copy block from:
  nodeManager.update(target.value.copy(
      timestamp = Clock.System.now().toEpochMilliseconds(),
      meta = targetMeta.copy(snapshot = newSnapshot),
      state = NodeState.EXECUTED
  ))

To:
  nodeManager.updateSnapshot(target.value.copy(
      meta = targetMeta.copy(snapshot = newSnapshot)
  ))

Note: updateSnapshot() automatically sets state to SNAPSHOT_UPDATE and generates 
a new timestamp, so we don't need to specify them explicitly. This avoids the 
toggle logic that executes for EXECUTED state.

TODO 2: Fix executeDataPoint Return Value

Priority: MEDIUM
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/datapoint/ServerDataPointProcessor.kt

Agent Prompt:

In ServerDataPointProcessor.kt, the executeDataPoint() method returns false when 
toggling a BOOL value from 1 to 0. This is incorrect because both toggle directions 
are successful operations.

Change the BOOL toggle logic in executeDataPoint() from:
  DataType.BOOL -> {
      return if (meta.snapshot.value.toDouble() <= 0) {
          nodeManager.updateSnapshot(/*...*/ value = "1")
          true
      } else {
          nodeManager.updateSnapshot(/*...*/ value = "0")
          false  // <-- BUG: should be true
      }
  }

To:
  DataType.BOOL -> {
      val newValue = if (meta.snapshot.value.toDouble() <= 0) "1" else "0"
      // updateSnapshot() sets the node timestamp, tid, and SNAPSHOT_UPDATE state.
      // The Snapshot timestamp is set explicitly: it defaults to 0L, and
      // SnapshotTracker.stale() treats a timestamp <= 0L as stale.
      nodeManager.updateSnapshot(node.copy(
          meta = meta.copy(snapshot = Snapshot(
              timestamp = Clock.System.now().toEpochMilliseconds(),
              value = newValue
          ))
      ))
      return true  // Both directions are success
  }

Note: updateSnapshot() automatically assigns a new node timestamp and tid and 
sets the state to SNAPSHOT_UPDATE. As quoted here, it sets the node-level 
timestamp only, so the Snapshot is given its own timestamp above. 
This is handled in BaseNodeManager.updateSnapshot():
  update(node.copy(state = NodeState.SNAPSHOT_UPDATE, 
                   timestamp = Clock.System.now().toEpochMilliseconds(), 
                   tid = Uuid.random().toString()))

TODO 3: Add Unit Test for Connection to BOOL DataPoint

Priority: HIGH
Files: New test file in krill-sdk/src/test/kotlin/

Agent Prompt:

Create a unit test that verifies when a Connection executor copies a value "1" 
from a source DataPoint to a target DataPoint with DataType.BOOL, the target 
receives value "1" (not toggled to "0").

Test should:
1. Create source DataPoint with snapshot value "1"
2. Create target DataPoint with DataType.BOOL and snapshot value "0"
3. Create Connection with source and target
4. Execute the Connection
5. Assert target snapshot value is "1" (copied) not "0" (toggled)

TODO 4: Consolidate Snapshot Deduplication

Priority: LOW
Files: Multiple - analysis required

Agent Prompt:

Analyze the snapshot deduplication logic across these files:
- ServerDataPointProcessor.kt (processedSnapshots set)
- SnapshotTracker.kt (stale check)
- SnapshotQueue.kt (add duplicate check)
- ServerNodeManager.kt (verify timestamp compare)

Create a proposal to consolidate these into a single, clear deduplication 
strategy. Consider:
1. What is the source of truth for "already processed"?
2. Where should deduplication happen (earliest possible point)?
3. How to handle out-of-order arrivals?
4. Memory management for tracking processed timestamps

TODO 5: Document EXECUTED vs SNAPSHOT_UPDATE Contract

Priority: MEDIUM
Files: Documentation

Agent Prompt:

Create documentation clarifying when to use EXECUTED vs SNAPSHOT_UPDATE states:

EXECUTED:
- Trigger an action (run cron, execute lambda, toggle BOOL)
- Implies "do something" based on current state
- For BOOL DataPoints: toggles the value

SNAPSHOT_UPDATE:
- Update data value without triggering action logic
- Implies "store this value"
- For BOOL DataPoints: stores value as-is without toggle

Add examples showing correct usage in Connection, Calculation, Lambda executors.

TODO 6: Fix Race Condition in SnapshotTracker

Priority: LOW
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/datapoint/SnapshotTracker.kt

Agent Prompt:

The SnapshotTracker has a potential race condition between stale() check and 
post() update. Both use mutex.withLock individually, but a sequence of 
stale() -> post() is not atomic.

Consider:
1. Adding a single atomic checkAndPost() method
2. Using a concurrent map with atomic operations
3. Documenting that callers must ensure exclusive access

Best Practices for Snapshot Handling

✅ DO

  1. Use updateSnapshot() for data updates - Ensures SNAPSHOT_UPDATE state
  2. Use execute() for action triggers - Correctly triggers executeDataPoint
  3. Always check isMine() before processing - Respect node ownership
  4. Use tid for cross-process deduplication - Prevents client-server echo loops
  5. Compare timestamps before updates - Reject stale data

❌ DON’T

  1. Don’t set EXECUTED state when copying data - Causes BOOL toggle
  2. Don’t use update() with manual state changes - Use helper methods
  3. Don’t skip tid recording on HTTP calls - Breaks deduplication
  4. Don’t process DELETING state nodes - Already cleaned up

This post is licensed under CC BY 4.0 by the author.