NodeManager StateFlow Architecture Review
Comprehensive analysis of NodeManager's StateFlow-based architecture, identifying inefficiencies and proposing improvements for better reactive UI integration
NodeManager StateFlow Architecture Review
Current Architecture Analysis
Core Data Structure
The NodeManager stores nodes in a map with the following structure:
1
2
3
4
5
6
7
8
9
protected val nodes: MutableMap<String, NodeFlow> = mutableMapOf()
sealed class NodeFlow {
data class Success(
val node: MutableStateFlow<Node>, // ⚠️ Mutable state
val instance: String = Uuid.random().toString()
) : NodeFlow()
data class Error(val msg: String, val exception: Exception?) : NodeFlow()
}
Current Workflow Pattern
Throughout the codebase, the typical pattern is:
- Read node from manager → Get
NodeFlow.SuccesscontainingMutableStateFlow<Node> - Extract snapshot value → Access
node.valueto read current state - Modify node → Create copy with changes
- Call update() → Post back to NodeManager via
nodeManager.update(modifiedNode) - NodeManager updates source → Updates the
MutableStateFlowviaflow.node.update { node } - Observers react → NodeObserver collects emissions and triggers
node.type.emit(node)
Key Components
1. NodeManager (Client vs Server)
Server (ServerNodeManager):
- Actor-based serialization for thread-safe updates
- File persistence for durability
- Observes only nodes owned by this server (
node.isMine()) - Full cluster state management
Client (ClientNodeManager):
- No actor pattern (single-threaded UI context)
- No file operations
- Observes ALL nodes for UI reactivity
- Updates posted to server via HTTP
2. NodeObserver
1
2
3
4
5
6
7
8
9
10
class DefaultNodeObserver {
suspend fun observe(nodeFlow: NodeFlow.Success) {
scope.launch {
nodeFlow.node.collect { node ->
// Emit to processor for state-specific handling
node.type.emit(node)
}
}
}
}
Server-Side Processors
Processors follow similar pattern:
- Read node via
nodeManager.readNode() - Extract value, compute result
- Update via
nodeManager.update(node.copy(...))
Critical Analysis
✅ What Works Well
- Single Source of Truth: The map in NodeManager is the canonical state
- Clear Separation: Server vs Client implementations handle different concerns
- Thread Safety: Server uses actor pattern effectively
- Reactive Updates: StateFlow emissions propagate changes throughout system
- Error Handling:
NodeFlowsealed class provides type-safe error states
⚠️ Concerns & Inefficiencies
1. Double Wrapping Anti-Pattern
You’re storing MutableStateFlow<Node> in a map, then wrapping it in NodeFlow.Success, then returning it. This creates unnecessary indirection:
1
Map<String, NodeFlow.Success(MutableStateFlow<Node>)>
The NodeFlow wrapper adds little value since you’re storing in a map anyway (nullability is handled by map lookup).
2. Read-Modify-Update Race Conditions (Client)
In the client, this pattern is dangerous:
1
2
3
val node = (nodeManager.readNode(id) as NodeFlow.Success).node.value
// ... time passes, user clicks button ...
nodeManager.update(node.copy(state = NodeState.EXECUTED))
Problem: The node is a snapshot. If the StateFlow updated between read and update, those changes are lost. You’re overwriting with stale data.
Current Mitigation: The client implementation checks for duplicates:
1
2
3
if (existing is NodeFlow.Success && node == existing.node.value) {
return // Ignore duplicate
}
But this doesn’t prevent lost updates if fields differ.
3. Composables Don’t Subscribe to Source StateFlow
Most composables do this:
1
2
val nodeFlow = nodeManager.readNode(nodeId)
val node = (nodeFlow as? NodeFlow.Success)?.node?.value
They take a snapshot but don’t subscribe. To get updates, they must:
- Re-read on recomposition (inefficient)
- Or manually manage StateFlow subscription (boilerplate)
The MutableStateFlow exists but isn’t being leveraged in UI!
4. NodeObserver Redundancy
The NodeObserver collects from each MutableStateFlow and calls node.type.emit(). This is an extra layer of indirection. Why not have processors/UI subscribe directly to the StateFlow?
5. Update Function Semantic Overload
nodeManager.update() does multiple things:
- Create new node (if doesn’t exist)
- Update existing node
- Trigger HTTP post to server (client-side user edits)
- Trigger file persistence (server-side)
- Call
observe()to start collection
This violates Single Responsibility Principle.
6. Subscription Count Warning Ignored
1
2
3
if (flow.node.subscriptionCount.value > 1) {
logger.e("node has multiple observers - probably a bug")
}
This warning suggests the architecture doesn’t expect multiple collectors. But with Compose, you WANT multiple subscribers!
Proposed Improvements
Option A: Pure StateFlow Repository Pattern (Recommended)
Embrace StateFlow fully and eliminate redundant layers.
Architecture Changes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Simplified - no NodeFlow wrapper needed
*/
class ClientNodeManager(...) {
// Exposed as StateFlow for UI consumption
private val nodes: MutableMap<String, MutableStateFlow<Node>> = mutableMapOf()
// For UI to discover available nodes
private val _nodeIds = MutableStateFlow<Set<String>>(emptySet())
val nodeIds: StateFlow<Set<String>> = _nodeIds
/**
* Get or create a StateFlow for a node.
* UI can subscribe directly and get automatic updates.
*/
fun getNodeFlow(id: String): StateFlow<Node>? {
return nodes[id]
}
/**
* Update via transformation function - safer than copy/replace
*/
suspend fun updateNode(id: String, transform: (Node) -> Node) {
nodes[id]?.update(transform)
// Post to server if needed
val updated = nodes[id]?.value
if (updated?.state == NodeState.USER_EDIT) {
scope.launch {
findServer(updated)?.let { server ->
nodeHttp.postNode(host = server, node = updated)
}
}
}
}
/**
* Overload for full replacement when you have the new node
*/
suspend fun updateNode(node: Node) {
updateNode(node.id) { node }
}
}
Composable Usage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Composable
fun NodeCard(nodeId: String) {
val nodeManager: NodeManager = koinInject()
// Direct subscription - recomposes automatically on updates!
val node by nodeManager.getNodeFlow(nodeId)
?.collectAsState()
?: return Text("Node not found")
// Render using current node state
Card {
Text(node.name())
Text("State: ${node.state}")
Button(onClick = {
scope.launch {
// Transform function ensures we're updating latest state
nodeManager.updateNode(nodeId) { current ->
current.copy(state = NodeState.EXECUTED)
}
}
}) {
Text("Execute")
}
}
}
Processor Usage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ServerCalculationProcessor {
suspend fun process(node: Node): Boolean {
val meta = node.meta as CalculationMetaData
// Get target StateFlow
val targetFlow = nodeManager.getNodeFlow(meta.target) ?: return false
// Transform with latest state
nodeManager.updateNode(meta.target) { current ->
val tm = current.meta as DataPointMetaData
val newSnapshot = Snapshot(now(), computedValue)
current.copy(
meta = tm.copy(snapshot = newSnapshot),
state = NodeState.EXECUTED
)
}
return true
}
}
Benefits
✅ No more read-update races: Transform function always works with latest state
✅ Direct StateFlow subscription: Composables recompose automatically
✅ Simpler code: Eliminate NodeFlow wrapper, NodeObserver indirection
✅ Better Compose integration: Natural use of collectAsState()
✅ Less boilerplate: No manual LaunchedEffect for loading
✅ Multiple subscribers welcome: StateFlow is designed for this!
Option B: Immutable Repository with Event Channel
If you want to avoid mutable state in the map:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ClientNodeManager(...) {
// Immutable snapshots
private val nodes: MutableMap<String, Node> = mutableMapOf()
// Single event channel for all updates
private val _updates = MutableSharedFlow<NodeUpdate>()
val updates: SharedFlow<NodeUpdate> = _updates
data class NodeUpdate(val id: String, val node: Node, val updateType: UpdateType)
suspend fun updateNode(node: Node) {
nodes[node.id] = node
_updates.emit(NodeUpdate(node.id, node, UpdateType.MODIFIED))
}
fun getNode(id: String): Node? = nodes[id]
}
Composable Usage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Composable
fun NodeCard(nodeId: String) {
val nodeManager: NodeManager = koinInject()
var node by remember { mutableStateOf(nodeManager.getNode(nodeId)) }
// Subscribe to updates
LaunchedEffect(nodeId) {
nodeManager.updates
.filter { it.id == nodeId }
.collect { update ->
node = update.node
}
}
// Render...
}
Trade-offs
✅ True immutability - easier to reason about
✅ Clear event stream for debugging
❌ More boilerplate in composables
❌ Filtering updates per-node is less efficient than per-StateFlow subscription
Option C: Hybrid - Keep Current, Fix Usage
Keep the existing architecture but fix how it’s used:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Add convenience extension to NodeManager
fun NodeManager.observeNodeAsState(id: String): StateFlow<Node>? {
return when (val flow = readNode(id)) {
is NodeFlow.Success -> flow.node
is NodeFlow.Error -> null
}
}
// Add atomic update operation
suspend fun NodeManager.atomicUpdate(id: String, transform: (Node) -> Node) {
when (val flow = readNode(id)) {
is NodeFlow.Success -> {
flow.node.update { current ->
val updated = transform(current)
// Side effects (HTTP post, etc.) handled here
update(updated) // Triggers existing machinery
updated
}
}
is NodeFlow.Error -> throw IllegalStateException("Node not found: $id")
}
}
Composable Usage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Composable
fun NodeCard(nodeId: String) {
val nodeManager: NodeManager = koinInject()
// Subscribe directly to the StateFlow!
val node by nodeManager.observeNodeAsState(nodeId)
?.collectAsState()
?: return Text("Node not found")
Button(onClick = {
scope.launch {
nodeManager.atomicUpdate(nodeId) { current ->
current.copy(state = NodeState.EXECUTED)
}
}
}) {
Text("Execute")
}
}
Benefits
✅ Minimal changes to existing code
✅ Fixes the race condition issue
✅ Enables direct StateFlow subscription
❌ Keeps the complex multi-layer architecture
❌ Doesn’t address NodeObserver redundancy
Recommendations
Immediate (Low-Effort Wins)
- Add
observeNodeAsState()helper - Let composables subscribe to StateFlow directly - Add
atomicUpdate()helper - Prevent read-update races with transform function - Document the pattern - Make it clear composables should subscribe, not snapshot
Short-Term (Moderate Refactor)
- Simplify client-side to Option A - Remove NodeFlow wrapper on client
- Make NodeObserver opt-in - Only for nodes that need type-specific processing
- Split update() into create/update/sync - Clear separation of concerns
Long-Term (If Needed)
- Consider Option B for server - If immutability is important for distributed consistency
- Add optimistic updates - Client updates immediately, rollback on server conflict
- Event sourcing - Store node changes as events for better debugging/replay
Key Insight: You Already Have the Infrastructure!
The MutableStateFlow<Node> in your map IS the reactive source of truth.
Your composables just need to subscribe to it instead of taking snapshots. Once you do that:
- ✅ Updates flow automatically to UI
- ✅ No need to re-read on recomposition
- ✅ No race conditions with transform-based updates
- ✅ Multiple composables can observe same node (what StateFlow is designed for!)
The problem isn’t the architecture - it’s that you’re not fully leveraging the StateFlow you already have.
Example Migration
Before (Current Pattern)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Composable
fun MyScreen() {
val nodeManager: NodeManager = koinInject()
var nodeFlow by remember { mutableStateOf<NodeFlow?>(null) }
LaunchedEffect(nodeId) {
nodeFlow = nodeManager.readNode(nodeId)
}
val node = (nodeFlow as? NodeFlow.Success)?.node?.value
node?.let {
Text(it.name())
Button(onClick = {
scope.launch {
nodeManager.update(it.copy(state = NodeState.EXECUTED))
}
}) { Text("Execute") }
}
}
After (Leveraging StateFlow)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Composable
fun MyScreen() {
val nodeManager: NodeManager = koinInject()
// Direct subscription - automatically recomposes
val node by nodeManager.observeNodeAsState(nodeId)
?.collectAsState()
?: return Text("Loading...")
Text(node.name())
Button(onClick = {
scope.launch {
// Atomic update with latest state
nodeManager.atomicUpdate(nodeId) { current ->
current.copy(state = NodeState.EXECUTED)
}
}
}) { Text("Execute") }
}
Difference:
- ✅ Less code
- ✅ No manual state management
- ✅ No LaunchedEffect needed
- ✅ Automatic recomposition
- ✅ No race conditions
Conclusion
Your architecture is fundamentally sound - you chose StateFlow, which is perfect for this. The issue is that you’re fighting against it by taking snapshots instead of subscribing.
Recommended Path Forward:
- Add the helper functions (
observeNodeAsState,atomicUpdate) - 1 hour - Migrate a few composables to the new pattern - verify it works - 2 hours
- Update documentation and examples - 1 hour
- Gradually migrate rest of codebase - ongoing
This will give you:
- Cleaner code
- Better Compose integration
- No race conditions
- Less boilerplate
- Proper reactive architecture
The infrastructure is already there - you just need to use it properly! 🚀