forked from knadh/listmonk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.go
349 lines (297 loc) · 9.38 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package runner
import (
"bytes"
"fmt"
"html/template"
"log"
"sync"
"time"
"github.com/knadh/listmonk/messenger"
"github.com/knadh/listmonk/models"
)
const (
batchSize = 10000
// BaseTPL is the name of the base template.
BaseTPL = "base"
// ContentTpl is the name of the compiled message.
ContentTpl = "content"
)
// DataSource represents a data backend, such as a database,
// that provides subscriber and campaign records.
type DataSource interface {
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
NextSubscribers(campID, limit int) ([]*models.Subscriber, error)
GetCampaign(campID int) (*models.Campaign, error)
PauseCampaign(campID int) error
CancelCampaign(campID int) error
FinishCampaign(campID int) error
CreateLink(url string) (string, error)
}
// Runner handles the scheduling, processing, and queuing of campaigns
// and message pushes.
type Runner struct {
cfg Config
src DataSource
messengers map[string]messenger.Messenger
logger *log.Logger
// Campaigns that are currently running.
camps map[int]*models.Campaign
// Links generated using Track() are cached here so as to not query
// the database for the link UUID for every message sent. This has to
// be locked as it may be used externally when previewing campaigns.
links map[string]string
linksMutex sync.RWMutex
msgQueue chan *Message
subFetchQueue chan *models.Campaign
}
// Message represents an active subscriber that's being processed.
type Message struct {
Campaign *models.Campaign
Subscriber *models.Subscriber
UnsubscribeURL string
Body []byte
to string
}
// Config has parameters for configuring the runner.
type Config struct {
Concurrency int
LinkTrackURL string
UnsubscribeURL string
}
// New returns a new instance of Mailer.
func New(cfg Config, src DataSource, l *log.Logger) *Runner {
r := Runner{
cfg: cfg,
messengers: make(map[string]messenger.Messenger),
src: src,
camps: make(map[int]*models.Campaign, 0),
links: make(map[string]string, 0),
logger: l,
subFetchQueue: make(chan *models.Campaign, 100),
msgQueue: make(chan *Message, cfg.Concurrency),
}
return &r
}
// NewMessage creates and returns a Message that is made available
// to message templates while they're compiled.
func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message {
return &Message{
to: s.Email,
Campaign: c,
Subscriber: s,
UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID),
}
}
// AddMessenger adds a Messenger messaging backend to the runner process.
func (r *Runner) AddMessenger(msg messenger.Messenger) error {
id := msg.Name()
if _, ok := r.messengers[id]; ok {
return fmt.Errorf("messenger '%s' is already loaded", id)
}
r.messengers[id] = msg
return nil
}
// GetMessengerNames returns the list of registered messengers.
func (r *Runner) GetMessengerNames() []string {
var names []string
for n := range r.messengers {
names = append(names, n)
}
return names
}
// HasMessenger checks if a given messenger is registered.
func (r *Runner) HasMessenger(id string) bool {
_, ok := r.messengers[id]
return ok
}
// Run is a blocking function (and hence should be invoked as a goroutine)
// that scans the source db at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of
// subscribers and pushes messages to them for each queued campaign
// until all subscribers are exhausted, at which point, a campaign is marked
// as "finished".
func (r *Runner) Run(tick time.Duration) {
var (
tScanCampaigns = time.NewTicker(tick)
)
for {
select {
// Fetch all 'running campaigns that aren't being processed.
case <-tScanCampaigns.C:
campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs())
if err != nil {
r.logger.Printf("error fetching campaigns: %v", err)
return
}
for _, c := range campaigns {
if err := r.addCampaign(c); err != nil {
r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
continue
}
r.logger.Printf("start processing campaign (%s)", c.Name)
r.subFetchQueue <- c
}
// Fetch next set of subscribers for the incoming campaign ID
// and process them.
case c := <-r.subFetchQueue:
has, err := r.nextSubscribers(c, batchSize)
if err != nil {
r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
}
if has {
// There are more subscribers to fetch.
r.subFetchQueue <- c
} else {
// No subscribers.
if err := r.processExhaustedCampaign(c); err != nil {
r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
}
}
}
}
}
// SpawnWorkers spawns workers goroutines that push out messages.
func (r *Runner) SpawnWorkers() {
for i := 0; i < r.cfg.Concurrency; i++ {
go func(ch chan *Message) {
for {
select {
case m := <-ch:
r.messengers[m.Campaign.MessengerID].Push(
m.Campaign.FromEmail,
m.Subscriber.Email,
m.Campaign.Subject,
m.Body)
}
}
}(r.msgQueue)
}
}
// TemplateFuncs returns the template functions to be applied into
// compiled campaign templates.
func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
return template.FuncMap{
"Track": func(url, campUUID, subUUID string) string {
return r.trackLink(url, campUUID, subUUID)
},
}
}
// addCampaign adds a campaign to the process queue.
func (r *Runner) addCampaign(c *models.Campaign) error {
// Validate messenger.
if _, ok := r.messengers[c.MessengerID]; !ok {
r.src.CancelCampaign(c.ID)
return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
}
// Load the template.
if err := c.CompileTemplate(r.TemplateFuncs(c)); err != nil {
return err
}
// Add the campaign to the active map.
r.camps[c.ID] = c
return nil
}
// getPendingCampaignIDs returns the IDs of campaigns currently being processed.
func (r *Runner) getPendingCampaignIDs() []int64 {
// Needs to return an empty slice in case there are no campaigns.
ids := make([]int64, 0)
for _, c := range r.camps {
ids = append(ids, int64(c.ID))
}
return ids
}
// nextSubscribers processes the next batch of subscribers in a given campaign.
// If returns a bool indicating whether there any subscribers were processed
// in the current batch or not. This can happen when all the subscribers
// have been processed, or if a campaign has been paused or cancelled abruptly.
func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
// Fetch a batch of subscribers.
subs, err := r.src.NextSubscribers(c.ID, batchSize)
if err != nil {
return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
}
// There are no subscribers.
if len(subs) == 0 {
return false, nil
}
// Push messages.
for _, s := range subs {
m := r.NewMessage(c, s)
if err := m.Render(); err != nil {
r.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
continue
}
// Send the message.
r.msgQueue <- m
}
return true, nil
}
func (r *Runner) processExhaustedCampaign(c *models.Campaign) error {
cm, err := r.src.GetCampaign(c.ID)
if err != nil {
return err
}
// If a running campaign has exhausted subscribers, it's finished.
// Otherwise, it's paused or cancelled.
if cm.Status == models.CampaignStatusRunning {
if err := r.src.FinishCampaign(c.ID); err != nil {
r.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
} else {
r.logger.Printf("campaign (%s) finished", c.Name)
}
} else {
r.logger.Printf("stop processing campaign (%s)", c.Name)
}
delete(r.camps, c.ID)
return nil
}
// Render takes a Message, executes its pre-compiled Campaign.Tpl
// and applies the resultant bytes to Message.body to be used in messages.
func (m *Message) Render() error {
out := bytes.Buffer{}
if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
return err
}
m.Body = out.Bytes()
return nil
}
// trackLink register a URL and return its UUID to be used in message templates
// for tracking links.
func (r *Runner) trackLink(url, campUUID, subUUID string) string {
r.linksMutex.RLock()
if uu, ok := r.links[url]; ok {
return uu
}
r.linksMutex.RUnlock()
// Register link.
uu, err := r.src.CreateLink(url)
if err != nil {
r.logger.Printf("error registering tracking for link '%s': %v", url, err)
// If the registration fails, fail over to the original URL.
return url
}
r.linksMutex.Lock()
r.links[url] = uu
r.linksMutex.Unlock()
return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
}
// CompileMessageTemplate takes a base template body string and a child (message) template
// body string, compiles both and inserts the child template as the named template "content"
// and returns the resultant template.
func CompileMessageTemplate(baseBody, childBody string) (*template.Template, error) {
// Compile the base template.
baseTPL, err := template.New(BaseTPL).Parse(baseBody)
if err != nil {
return nil, fmt.Errorf("error compiling base template: %v", err)
}
// Compile the campaign message.
msgTpl, err := template.New(ContentTpl).Parse(childBody)
if err != nil {
return nil, fmt.Errorf("error compiling message: %v", err)
}
out, err := baseTPL.AddParseTree(ContentTpl, msgTpl.Tree)
if err != nil {
return nil, fmt.Errorf("error inserting child template: %v", err)
}
return out, nil
}