forked from redpanda-data/benthos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patherror.go
120 lines (108 loc) · 3.63 KB
/
error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Package batch contains internal utilities for interacting with message
// batches.
package batch
import (
"errors"
"github.com/redpanda-data/benthos/v4/internal/message"
)
// Error is an error type that also allows storing granular errors for each
// message of a batch.
//
// While no per-message errors have been recorded (partErrors is nil) the walk
// methods report the batch-wide error for every message of the batch.
type Error struct {
// err is the batch-wide error, returned by Unwrap and used for the message
// produced by Error.
err error
// erroredBatch is the batch that this error relates to.
erroredBatch message.Batch
// partErrors maps an index of erroredBatch to the error recorded for that
// message. It is nil until Failed stores the first entry.
partErrors map[int]error
}
// NewError wraps err as a batch-wide error associated with the batch msg.
// Granular errors for individual messages of the batch can be attached
// afterwards by calling Failed on the result.
//
// If err is itself an *Error (checked with a direct type assertion, so an
// error that merely wraps an *Error is stored untouched) then its underlying
// error is used instead. Indexed errors held by that *Error are not carried
// over to the new value.
func NewError(msg message.Batch, err error) *Error {
	cause := err
	if existing, isBatchErr := err.(*Error); isBatchErr {
		// Avoid nesting one batch error inside of another.
		cause = existing.Unwrap()
	}

	batchErr := new(Error)
	batchErr.erroredBatch = msg
	batchErr.err = cause
	return batchErr
}
// Failed stores an error state for a particular message of a batch. Returns
// the receiver, allowing the method to be chained.
//
// The index i refers to a position within the errored batch. Indexes outside
// of that batch (negative, or greater than or equal to its length) are
// ignored and leave the error unchanged.
//
// If Failed is not called then all messages are assumed to have failed. If it
// is called at least once with a valid index then all message indexes that
// aren't explicitly failed are assumed to have been processed successfully.
func (e *Error) Failed(i int, err error) *Error {
	// A negative index must be rejected along with an overly large one.
	// Storing it would allocate partErrors, which flips every message of the
	// batch to "successful" in the walk methods, while the stored entry itself
	// could never be reached by any walk.
	if i < 0 || i >= len(e.erroredBatch) {
		return e
	}
	// The map is created lazily so that a nil map keeps meaning "no indexed
	// errors, the whole batch failed".
	if e.partErrors == nil {
		e.partErrors = make(map[int]error)
	}
	e.partErrors[i] = err
	return e
}
// IndexedErrors reports how many distinct message indexes currently have an
// entry registered against them via Failed. The result is zero when no
// indexed errors have been stored.
func (e *Error) IndexedErrors() int {
	registered := e.partErrors
	return len(registered)
}
// XErroredBatch exposes the batch that is associated with this error. The
// stored batch value is returned directly, no deep copy is made.
func (e *Error) XErroredBatch() message.Batch {
	batch := e.erroredBatch
	return batch
}
// WalkPartsBySource calls fn for each message of the errored batch that can be
// mapped back onto sourceBatch. The mapping is obtained by looking up each
// errored part in sourceSortGroup; parts whose resolved index is negative or
// beyond the end of sourceBatch are skipped.
//
// fn receives the index within sourceBatch, the part found at that index of
// sourceBatch, and the error for the message, which may be nil if the message
// itself was processed successfully. When no indexed errors have been
// registered the batch-wide error is passed for every message. fn returns a
// bool which indicates whether the iteration should be continued.
//
// Important! The order of parts walked is not guaranteed to match that of the
// source batch. It is also possible for any given index to be represented
// zero, one or more times.
func (e *Error) WalkPartsBySource(sourceSortGroup *message.SortGroup, sourceBatch message.Batch, fn func(int, *message.Part, error) bool) {
	visit := func(erroredIdx int, part *message.Part) error {
		sourceIdx := sourceSortGroup.GetIndex(part)
		if sourceIdx < 0 || sourceIdx >= len(sourceBatch) {
			// No usable position within the source batch, skip this part.
			return nil
		}

		// Without indexed errors every part inherits the batch-wide error,
		// otherwise only explicitly failed indexes carry one. Indexed errors
		// are keyed by position within the errored batch, not the source.
		partErr := e.err
		if e.partErrors != nil {
			partErr = e.partErrors[erroredIdx]
		}

		if fn(sourceIdx, sourceBatch[sourceIdx], partErr) {
			return nil
		}
		// The error value only acts as a stop signal; this relies on Iter
		// ending the iteration on the first non-nil error.
		return errors.New("stop")
	}

	// The result is deliberately discarded, the only error produced by the
	// closure is the stop signal.
	_ = e.erroredBatch.Iter(visit)
}
// WalkPartsNaively calls fn for each message of the errored batch. fn receives
// the index of the message within the errored batch, the part itself, and the
// error for the message, which may be nil if the message itself was processed
// successfully. When no indexed errors have been registered the batch-wide
// error is passed for every message. fn returns a bool which indicates whether
// the iteration should be continued.
//
// WARNING: The shape and order of the errored batch is not guaranteed to match
// that of an origin batch and therefore cannot be used to associate batch
// errors with the origin. Instead, use WalkPartsBySource.
func (e *Error) WalkPartsNaively(fn func(int, *message.Part, error) bool) {
	visit := func(idx int, part *message.Part) error {
		// Without indexed errors every part inherits the batch-wide error,
		// otherwise only explicitly failed indexes carry one.
		partErr := e.err
		if e.partErrors != nil {
			partErr = e.partErrors[idx]
		}

		if fn(idx, part, partErr) {
			return nil
		}
		// The error value only acts as a stop signal; this relies on Iter
		// ending the iteration on the first non-nil error.
		return errors.New("stop")
	}

	// The result is deliberately discarded, the only error produced by the
	// closure is the stop signal.
	_ = e.erroredBatch.Iter(visit)
}
// Error implements the common error interface by returning the message of the
// batch-wide error. It panics if the batch-wide error is nil.
func (e *Error) Error() string {
	cause := e.err
	return cause.Error()
}
// Unwrap returns the batch-wide error held by e, which allows helpers such as
// errors.Is and errors.As to inspect it.
func (e *Error) Unwrap() error {
	wrapped := e.err
	return wrapped
}