-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfinitemachine.go
271 lines (238 loc) · 9.01 KB
/
finitemachine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package statemachine
import (
"fmt"
"runtime"
)
// Context represents the context of the state machine.
type Context struct {
InputState State `json:"inputState"`
EventEmitted Event `json:"eventEmitted"`
InputArbitraryData map[string]interface{} `json:"inputArbitraryData"`
OutputArbitraryData map[string]interface{} `json:"outputArbitraryData"`
Handler StepHandler `json:"-"`
Callbacks map[string]StateCallbacks `json:"-"`
StepNumber int `json:"stepNumber"`
TransitionHistory []TransitionHistory `json:"transitionHistory"`
StateMachine *StateMachine `json:"-"`
}
// StateMachineError is a base struct for custom errors in the state machine package.
type StateMachineError struct {
Msg string
}
func (e *StateMachineError) Error() string {
return e.Msg
}
// UnexpectedEventTypeError indicates an unexpected event type was encountered.
type UnexpectedEventTypeError struct {
StateMachineError
EventType string
Inner error
}
func NewUnexpectedEventTypeError(inner error, eventType string) *UnexpectedEventTypeError {
return &UnexpectedEventTypeError{
StateMachineError: StateMachineError{Msg: fmt.Sprintf("unexpected event type: %s", eventType)},
EventType: eventType,
Inner: inner,
}
}
func (e *UnexpectedEventTypeError) Unwrap() error {
return e.Inner
}
// RestartExecutionError indicates an error in restarting execution from a terminal state.
type RestartExecutionError struct {
StateMachineError
State State
}
func NewRestartExecutionError(state State) *RestartExecutionError {
return &RestartExecutionError{
StateMachineError: StateMachineError{Msg: fmt.Sprintf("cannot restart execution from terminal state: %s", state.String())},
State: state,
}
}
// ExecutionActionError indicates an error in determining the execution action.
type ExecutionActionError struct {
StateMachineError
Inner error
}
func NewExecutionActionError(inner error, msg string) *ExecutionActionError {
return &ExecutionActionError{
StateMachineError: StateMachineError{Msg: fmt.Sprintf("Execution Action Error: %s", msg)},
Inner: inner,
}
}
func (e *ExecutionActionError) Unwrap() error {
return e.Inner
}
// HandlingContextError indicates an error in handling the context.
type HandlingContextError struct {
StateMachineError
Inner error
}
func NewHandlingContextError(inner error, msg string) *HandlingContextError {
return &HandlingContextError{
StateMachineError: StateMachineError{Msg: fmt.Sprintf("Handling Context Error: %s", msg)},
Inner: inner,
}
}
func (smCtx *Context) finishHandlingContext(event Event, input, output map[string]interface{}) error {
smCtx.EventEmitted = event
smCtx.InputArbitraryData = input
smCtx.OutputArbitraryData = output
// Trigger after-event callback
if callbacks, ok := smCtx.Callbacks[smCtx.InputState.String()]; ok {
cb := callbacks.AfterAnEvent
if err := cb(smCtx.StateMachine, smCtx); err != nil {
return err
}
}
return nil
}
func DetermineTerminalStateEvent(smCtx *Context) (executionEvent Event) {
switch smCtx.InputState {
case StateCompleted:
executionEvent = OnAlreadyCompleted
// Skip this step
case StateFailed:
executionEvent = OnFailed
// Skip this step
case StateRollbackCompleted:
executionEvent = OnAlreadyRollbackCompleted
// Skip this step
case StateRollbackFailed:
executionEvent = OnRollbackFailed
// Skip this step
case StateCancelled:
executionEvent = OnCancelled
// Skip this step
}
return executionEvent
}
// DetermineExecutionAction determines the execution action based on the input state.
func DetermineExecutionAction(inputArbitraryData map[string]interface{}, smCtx *Context) (convertedEvent Event, err error) {
if IsTerminalState(smCtx.InputState) {
return DetermineTerminalStateEvent(smCtx), nil
}
var outputData map[string]interface{}
var executionEvent interface{}
switch smCtx.InputState {
case StateRetry:
lastState := getLastNonRetryState(smCtx.TransitionHistory)
return restartExecutionFromState(lastState, smCtx)
case StateParked:
// If parked, try to restart execution based on previous state
lastState := getLastNonParkedState(smCtx.TransitionHistory)
return restartExecutionFromState(lastState, smCtx)
case StateSleeping, StatePending, StateOpen:
executionEvent, outputData, err = smCtx.Handler.ExecuteForward(inputArbitraryData, smCtx.TransitionHistory)
// assert that executionevent is a forwardEven
if forwardEvent, ok := executionEvent.(ForwardEvent); ok {
// forwardEvent is now the concrete type
// Convert it to a generic event
convertedEvent = forwardEvent.ToEvent()
} else {
return OnError, NewUnexpectedEventTypeError(err, fmt.Sprintf("%T", executionEvent))
}
case StatePaused:
executionEvent, outputData, err = smCtx.Handler.ExecutePause(inputArbitraryData, smCtx.TransitionHistory)
// assert that executionevent is a PauseEvent
if pauseEvent, ok := executionEvent.(PauseEvent); ok {
// pauseEvent is now the concrete type
// Convert it to a generic event
convertedEvent = pauseEvent.ToEvent()
} else {
// Handle the error or unexpected type
return OnError, NewUnexpectedEventTypeError(err, fmt.Sprintf("%T", executionEvent))
}
case StateRollback:
executionEvent, outputData, err = smCtx.Handler.ExecuteBackward(inputArbitraryData, smCtx.TransitionHistory)
// assert that executionevent is a BackwardEvent
if backwardEvent, ok := executionEvent.(BackwardEvent); ok {
// backwardEvent is now the concrete type
// Convert it to a generic event
convertedEvent = backwardEvent.ToEvent()
} else {
// Handle the error or unexpected type
return OnError, NewUnexpectedEventTypeError(err, fmt.Sprintf("%T", executionEvent))
}
case StateResume:
executionEvent, outputData, err = smCtx.Handler.ExecuteResume(inputArbitraryData, smCtx.TransitionHistory)
// assert that executionevent is a ResumeEvent
if resumeEvent, ok := executionEvent.(ResumeEvent); ok {
// resumeEvent is now the concrete type
// Convert it to a generic event
convertedEvent = resumeEvent.ToEvent()
} else {
// Handle the error or unexpected type
return OnError, NewUnexpectedEventTypeError(err, fmt.Sprintf("%T", executionEvent))
}
default: // not sure what happened so let's park it
convertedEvent = OnParked
}
smCtx.OutputArbitraryData = outputData
return convertedEvent, err
}
func restartExecutionFromState(state State, smCtx *Context) (Event, error) {
switch state {
case StateSleeping, StatePending, StateOpen:
executionEvent, _, err := smCtx.Handler.ExecuteForward(smCtx.InputArbitraryData, smCtx.TransitionHistory)
return executionEvent.ToEvent(), err
case StatePaused:
executionEvent, _, err := smCtx.Handler.ExecutePause(smCtx.InputArbitraryData, smCtx.TransitionHistory)
return executionEvent.ToEvent(), err
case StateRollback:
executionEvent, _, err := smCtx.Handler.ExecuteBackward(smCtx.InputArbitraryData, smCtx.TransitionHistory)
return executionEvent.ToEvent(), err
case StateResume:
executionEvent, _, err := smCtx.Handler.ExecuteResume(smCtx.InputArbitraryData, smCtx.TransitionHistory)
return executionEvent.ToEvent(), err
default:
if IsTerminalState(state) {
return DetermineTerminalStateEvent(smCtx), NewRestartExecutionError(state)
}
return OnFailed, nil
}
}
func getLastNonParkedState(history []TransitionHistory) State {
for i := len(history) - 1; i >= 0; i-- {
if history[i].ModifiedState != StateParked {
return history[i].ModifiedState
}
}
return StateUnknown // Or some default state
}
func getLastNonRetryState(history []TransitionHistory) State {
for i := len(history) - 1; i >= 0; i-- {
if history[i].ModifiedState != StateRetry {
return history[i].ModifiedState
}
}
return StateUnknown // Or some default state
}
func (smCtx *Context) Handle() (executionEvent Event, err error) {
inputArbitraryData := CopyMap(smCtx.InputArbitraryData) // this will end up with the same value as out
outputArbitraryData := inputArbitraryData
defer func() {
if r := recover(); r != nil {
// Capture the stack trace
stackTrace := make([]byte, 1024)
length := runtime.Stack(stackTrace, false)
stackTrace = stackTrace[:length]
// Convert the panic to an error and wrap it, including the stack trace
err = fmt.Errorf("panic occurred in Handle: %v\nStack Trace: %s", r, stackTrace)
executionEvent = OnError
}
if finishErr := smCtx.finishHandlingContext(executionEvent, smCtx.InputArbitraryData, outputArbitraryData); finishErr != nil {
// Wrap the error from finishHandlingContext with custom error type
err = NewHandlingContextError(finishErr, finishErr.Error())
executionEvent = OnError
}
}()
executionEvent, err = DetermineExecutionAction(inputArbitraryData, smCtx)
if err != nil {
// Wrap the error from DetermineExecutionAction with custom error type
err = NewExecutionActionError(err, err.Error())
return executionEvent, err
}
outputArbitraryData = CopyMap(smCtx.OutputArbitraryData)
return executionEvent, err
}