-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprovider.go
140 lines (113 loc) · 2.5 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package mail
import (
"net/mail"
"sync"
"github.com/pkg/errors"
"github.com/sourcegraph/conc/pool"
)
const (
contentTypeText = "text/plain"
contentTypeHTML = "text/html"
defaultMaxWorkers = 3
)
// BaseProvider is a base implementation of the Provider interface
type BaseProvider struct {
from mail.Address
subjPrefix string
maxWorkers int
queue chan Message
watchOnce sync.Once
}
func NewBaseProvider(
from mail.Address,
opts ...Option,
) (*BaseProvider, <-chan error) {
config := providerConfig{
subjPrefix: "",
maxWorkers: defaultMaxWorkers,
}
for _, opt := range opts {
if opt != nil {
opt(&config)
}
}
p := &BaseProvider{
from: from,
subjPrefix: config.subjPrefix,
maxWorkers: config.maxWorkers,
}
return p, p.init()
}
func (p *BaseProvider) init() <-chan error {
errC := make(chan error)
p.watchOnce.Do(func() {
p.queue = make(chan Message, p.maxWorkers)
go p.watch(errC)
})
return errC
}
func (p *BaseProvider) watch(errC chan<- error) {
wp := pool.
New().
WithMaxGoroutines(p.maxWorkers)
for message := range p.queue {
msg := message
wp.Go(func() {
if err := p.render(&msg); err != nil {
errC <- err
return
}
if err := p.send(msg); err != nil {
errC <- errors.Wrapf(err, "sending email %s", msg)
}
})
}
wp.Wait()
close(errC)
}
func (p *BaseProvider) render(msg *Message) error {
if !msg.HasRecipients() {
return errors.Errorf("no recipients for email %s", msg)
}
if err := msg.Render(); err != nil {
return errors.Wrapf(err, "rendering email %s", msg)
}
if !(msg.HasContent() || msg.HasAttachments()) {
return errors.Errorf("no content or attachments for email %s", msg)
}
return nil
}
func (p *BaseProvider) send(msg Message) error {
panic("implement me")
}
// SendMessage pushes messages to the queue
func (p *BaseProvider) SendMessage(messages ...Message) {
for _, msg := range messages {
p.queue <- msg
}
}
// Close closes the queue
func (p *BaseProvider) Close() {
close(p.queue)
}
type providerConfig struct {
subjPrefix string
maxWorkers int
}
// Option is a configuration option for the provider
type Option func(*providerConfig)
// WithSubjectPrefix sets the default subject prefix for all messages
func WithSubjectPrefix(prefix string) Option {
return func(o *providerConfig) {
o.subjPrefix = prefix
}
}
// WithMaxWorkers sets the maximum number of workers
func WithMaxWorkers(max uint) Option {
if max == 0 {
return nil
}
return func(o *providerConfig) {
o.maxWorkers = int(max)
}
}