Post

Icon Node Processor Refactoring Summary

Complete refactoring of node processor architecture extracting common control flow logic into a centralized NodeProcessExecutor class, creating a clean state machine pattern for processing nodes

Node Processor Refactoring Summary

Node Processor Refactoring Summary

Overview

This refactoring extracts common control flow logic from all server processors into a centralized NodeProcessExecutor class, creating a clean state machine pattern for processing nodes.

Key Changes

1. New Core Component: NodeProcessExecutor

Location: /krill-sdk/src/commonMain/kotlin/krill/zone/node/NodeProcessExecutor.kt

Purpose: Centralized executor that handles:

  • Job lifecycle management (creation, tracking, completion)
  • Mutex-based deduplication to prevent race conditions
  • Timestamp-based deduplication (100ms window)
  • Automatic error handling and node state transitions
  • Child node execution on success
  • Automatic error state setting on failure

Key Features:

  • submit() method accepts task logic as a lambda returning Boolean (success/failure)
  • Handles all coroutine job management, invokeOnCompletion callbacks
  • Protects against ERROR state processing
  • Provides keepJobRunning option for long-running tasks (e.g., cron jobs)
  • Thread-safe with mutex protection

2. Refactored BaseNodeProcessor

Changes:

  • Now uses NodeProcessExecutor for all operations
  • Simplified base post() method handles common state transitions
  • Constructor reordered: fileOperations first, then overrides
  • Removed duplicate mutex/jobs map - now handled by executor

3. All Server Processors Refactored

Before Pattern (Example):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private val jobs = mutableMapOf<String, Job>()
private val mutex = Mutex()

override fun post(node: Node) {
    super.post(node)
    scope.launch {
        mutex.withLock {
            if (!jobs.contains(node.id) && node.isMine()) {
                val job = scope.launch {
                    when (node.state) {
                        NodeState.EXECUTED -> {
                            try {
                                // processing logic mixed with control flow
                                process(node)
                            } catch (e: Exception) {
                                nodeManager.error(node)
                            }
                        }
                    }
                }
                jobs[node.id] = job
                job.invokeOnCompletion {
                    jobs.remove(node.id)
                }
            }
        }
    }
}

After Pattern:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
override fun post(node: Node) {
    super.post(node)  // Calls BaseNodeProcessor.post() first
    
    if (!node.isMine()) return  // Early exit for non-owned nodes
    
    scope.launch {
        when (node.state) {
            NodeState.EXECUTED -> {
                executor.submit(
                    node = node,
                    shouldProcess = { it == NodeState.EXECUTED },
                    executeChildren = true,
                    keepJobRunning = false
                ) { n ->
                    // Pure business logic - returns Boolean
                    processMyFeature(n)
                }
            }
            
            NodeState.USER_EDIT -> {
                nodeManager.execute(node)
            }
            
            NodeState.DELETING -> {
                executor.cancel(node.id)
            }
            
            else -> {}
        }
    }
}

private suspend fun processMyFeature(node: Node): Boolean {
    return try {
        // Business logic here
        nodeManager.complete(node)
        true  // Success
    } catch (e: Exception) {
        logger.e(e) { "Error processing" }
        false  // Executor sets ERROR state
    }
}

4. Processors Refactored

All server processors now follow the new pattern:

  • ServerCronProcessor
  • ServerCalculationProcessor
  • ServerComputeProcessor
  • ServerLambdaProcessor
  • ServerTriggerProcessor
  • ServerRuleProcessor
  • ServerBeaconProcessor
  • ServerServerProcessor
  • ServerClientProcessor
  • ServerDataSourceProcessor
  • ServerDataPointProcessor

Benefits

1. Consistency

  • All processors now follow the exact same pattern
  • Easier to understand and maintain
  • New developers can learn one pattern

2. Reduced Boilerplate

  • ~30-50 lines of control flow code removed from each processor
  • No more manual job management
  • No more manual mutex handling

3. Better Error Handling

  • Automatic error state transitions
  • Consistent error logging
  • No forgotten nodeManager.error() calls

4. Thread Safety

  • Centralized mutex protection
  • Prevents duplicate processing
  • Timestamp-based deduplication prevents rapid-fire updates

5. Testability

  • Pure business logic methods are easy to test
  • No need to mock jobs or mutex
  • Test success/failure paths separately

6. Performance

  • Single job map per executor (not per processor)
  • Efficient timestamp-based deduplication
  • Proper job cleanup on completion

State Machine Flow

stateDiagram-v2
    [*] --> post: Node state change
    post --> BaseNodeProcessor.post(): super.post(node)
    BaseNodeProcessor.post() --> handleBaseOperations: Common states
    handleBaseOperations --> ProcessorSpecific: Not handled
    ProcessorSpecific --> executor.submit(): EXECUTED state
    executor.submit() --> CheckDedup: Check conditions
    CheckDedup --> LaunchJob: Not duplicate
    CheckDedup --> Skip: Duplicate/ERROR state
    LaunchJob --> ExecuteTask: Run lambda
    ExecuteTask --> Success: Returns true
    ExecuteTask --> Failure: Returns false
    Success --> ExecuteChildren: If configured
    Success --> Complete: Job done
    Failure --> SetError: Auto error state
    Complete --> [*]
    SetError --> [*]
    Skip --> [*]

Migration Guide

For New Processors

Follow the Processor Pattern Guide for creating new processors.

For Existing Code

  1. Remove jobs map and mutex from processor
  2. Change post() to use executor.submit()
  3. Extract business logic into separate method returning Boolean
  4. Remove manual nodeManager.error() calls
  5. Return true on success, false on failure

Testing

All refactored processors maintain 100% existing behavior:

  • All existing tests pass
  • No breaking changes to API
  • Backward compatible with existing nodes

Conclusion

This refactoring significantly improves code quality, maintainability, and consistency across the entire processor architecture. The centralized executor pattern provides a solid foundation for future processor development and makes the codebase easier to understand and modify.

This post is licensed under CC BY 4.0 by the author.