storage.go
package gcp

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sync"

	"cloud.google.com/go/storage"
	"github.com/reugn/go-streams"
	"github.com/reugn/go-streams/flow"
	"google.golang.org/api/iterator"
)

// StorageSourceConfig represents the configuration options for the GCP Storage
// source connector.
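//
// For instance, a minimal sketch of a configuration that lists only the
// objects directly under "invoices/", treating "/" as a directory separator
// (the bucket and prefix names are illustrative, not defaults of this package):
//
//	cfg := &StorageSourceConfig{
//		Bucket:    "my-bucket",
//		Prefix:    "invoices/",
//		Delimiter: "/",
//	}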
type StorageSourceConfig struct {
// The name of the storage bucket to read from.
Bucket string
// The path within the bucket to use. If empty, the root of the
// bucket will be used.
Prefix string
// Delimiter can be used to restrict the results to only the objects in
// the given "directory". Without the delimiter, the entire tree under
// the prefix is returned.
Delimiter string
}

// StorageSource represents the Google Cloud Storage source connector.
type StorageSource struct {
client *storage.Client
config *StorageSourceConfig
out chan any
logger *slog.Logger
}

var _ streams.Source = (*StorageSource)(nil)

// NewStorageSource returns a new [StorageSource].
// The connector reads all objects within the configured path and transmits
// them as a [StorageObject] through the output channel.
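//
// A minimal pipeline sketch (storage.NewClient is the standard client
// constructor; processObject and objectSink are placeholders for application
// code, not part of this package):
//
//	ctx := context.Background()
//	client, err := storage.NewClient(ctx)
//	if err != nil {
//		// handle the error
//	}
//	source := NewStorageSource(ctx, client, &StorageSourceConfig{
//		Bucket: "my-bucket",
//		Prefix: "invoices/",
//	}, nil)
//	source.
//		Via(flow.NewMap(processObject, 1)).
//		To(objectSink)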
func NewStorageSource(ctx context.Context, client *storage.Client,
config *StorageSourceConfig, logger *slog.Logger) *StorageSource {
if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "gcp.storage"),
slog.String("type", "source")))
storageSource := &StorageSource{
client: client,
config: config,
out: make(chan any),
logger: logger,
}
// read objects and send them downstream
go storageSource.readObjects(ctx)
return storageSource
}

func (s *StorageSource) readObjects(ctx context.Context) {
defer func() {
s.logger.Info("Closing connector")
close(s.out)
}()
bucketHandle := s.client.Bucket(s.config.Bucket)
// check if the bucket exists
if _, err := bucketHandle.Attrs(ctx); err != nil {
s.logger.Error("Failed to get bucket attrs",
slog.String("bucket", s.config.Bucket), slog.Any("error", err))
return
}
// create objects iterator
it := bucketHandle.Objects(ctx, &storage.Query{
Prefix: s.config.Prefix,
Delimiter: s.config.Delimiter,
})
for { // iterate over the objects in the bucket
objectAttrs, err := it.Next()
if err != nil {
if !errors.Is(err, iterator.Done) {
s.logger.Error("Failed to read object from bucket",
slog.String("bucket", s.config.Bucket), slog.Any("error", err))
}
// If the previous call to Next returned an error other than iterator.Done,
// all subsequent calls will return the same error. To continue iteration,
// a new `ObjectIterator` must be created.
return
}
// create a new reader to read the contents of the object
reader, err := bucketHandle.Object(objectAttrs.Name).NewReader(ctx)
if err != nil {
s.logger.Error("Failed to create reader from object",
slog.String("object", objectAttrs.Name),
slog.String("bucket", s.config.Bucket), slog.Any("error", err))
continue
}
select {
// send the object downstream
case s.out <- &StorageObject{
Key: objectAttrs.Name,
Data: reader,
}:
case <-ctx.Done():
s.logger.Debug("Object reading terminated", slog.Any("error", ctx.Err()))
return
}
}
}

// Via streams data to a specified operator and returns it.
func (s *StorageSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(s, operator)
return operator
}

// Out returns the output channel of the StorageSource connector.
func (s *StorageSource) Out() <-chan any {
return s.out
}

// StorageObject contains details of the GCP Storage object.
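//
// A sketch of constructing one from an in-memory payload for the sink
// connector (payload is an assumed []byte; io.NopCloser and bytes.NewReader
// are from the standard library):
//
//	object := &StorageObject{
//		Key:  "reports/2024/summary.json",
//		Data: io.NopCloser(bytes.NewReader(payload)),
//	}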
type StorageObject struct {
// Key is the object name including any subdirectories.
// For example, "directory/file.json".
Key string
// Data is an [io.ReadCloser] representing the binary content of the object.
Data io.ReadCloser
}

// StorageSinkConfig represents the configuration options for the GCP Storage
// sink connector.
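//
// For example (Parallelism left at its zero value is normalized to a single
// worker by NewStorageSink; the bucket name is illustrative):
//
//	cfg := &StorageSinkConfig{Bucket: "my-bucket"}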
type StorageSinkConfig struct {
// The name of the GCP Storage bucket to write to.
Bucket string
// The number of concurrent workers to use when writing data to GCP Storage.
// The default is 1.
Parallelism int
}

// StorageSink represents the Google Cloud Storage sink connector.
type StorageSink struct {
client *storage.Client
config *StorageSinkConfig
in chan any
logger *slog.Logger
}

var _ streams.Sink = (*StorageSink)(nil)

// NewStorageSink returns a new [StorageSink].
// Incoming elements are expected to be of the [StorageObject] type. These will
// be uploaded to the configured bucket using their Key field as the path.
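//
// A minimal usage sketch (upstreamSource, toStorageObject, ctx, and client are
// placeholders assumed from surrounding application code):
//
//	sink := NewStorageSink(ctx, client, &StorageSinkConfig{
//		Bucket:      "my-bucket",
//		Parallelism: 4,
//	}, nil)
//	upstreamSource.
//		Via(flow.NewMap(toStorageObject, 4)).
//		To(sink)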
func NewStorageSink(ctx context.Context, client *storage.Client,
config *StorageSinkConfig, logger *slog.Logger) *StorageSink {
if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "gcp.storage"),
slog.String("type", "sink")))
if config.Parallelism < 1 {
config.Parallelism = 1
}
storageSink := &StorageSink{
client: client,
config: config,
in: make(chan any, config.Parallelism),
logger: logger,
}
// start writing incoming data
go storageSink.writeObjects(ctx)
return storageSink
}

// writeObjects writes incoming stream data elements to GCP Storage using the
// configured parallelism.
func (s *StorageSink) writeObjects(ctx context.Context) {
bucketHandle := s.client.Bucket(s.config.Bucket)
var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range s.in {
var err error
switch object := data.(type) {
case StorageObject:
err = s.writeObject(ctx, bucketHandle, &object)
case *StorageObject:
err = s.writeObject(ctx, bucketHandle, object)
default:
s.logger.Error("Unsupported data type",
slog.String("type", fmt.Sprintf("%T", object)))
}
if err != nil {
s.logger.Error("Error writing object",
slog.Any("error", err))
}
}
}()
}
// wait for all writers to exit
wg.Wait()
s.logger.Info("All object writers exited")
}

// writeObject writes a single object to GCP Storage.
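//
// Uploads go through a [storage.Writer]. A hedged tuning sketch, should larger
// upload buffers ever be needed (ChunkSize is a standard field on
// storage.Writer; setting it is not part of this connector):
//
//	writer := bucketHandle.Object(object.Key).NewWriter(ctx)
//	writer.ChunkSize = 8 << 20 // buffer uploads in 8 MiB chunks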
func (s *StorageSink) writeObject(ctx context.Context, bucketHandle *storage.BucketHandle,
object *StorageObject) error {
defer object.Data.Close()
// writes will be retried on transient errors from the server
writer := bucketHandle.Object(object.Key).NewWriter(ctx)
if _, err := io.Copy(writer, object.Data); err != nil {
return fmt.Errorf("failed to write object %s: %w", object.Key, err)
}
if err := writer.Close(); err != nil {
return fmt.Errorf("failed to close writer %s: %w", object.Key, err)
}
s.logger.Debug("Object successfully written", slog.String("key", object.Key))
return nil
}

// In returns the input channel of the StorageSink connector.
func (s *StorageSink) In() chan<- any {
return s.in
}