Skip to content

Commit

Permalink
Add generic HTTP postback Messenger support.
Browse files Browse the repository at this point in the history
This is a major feature that builds upon the `Messenger` interface
that has been in listmonk since its inception (with SMTP as the only
messenger). This commit introduces a new Messenger implementation, an
HTTP "postback", that can post campaign messages as a standard JSON
payload to arbitrary HTTP servers. These servers can in turn push them
to FCM, SMS, or any or any such upstream, enabling listmonk to be a
generic campaign messenger for any type of communication, not just
e-mails.

Postback HTTP endpoints can be defined in settings and they can be
selected on campaigns.
  • Loading branch information
knadh committed Oct 10, 2020
1 parent be9fbcd commit 6cf43ea
Show file tree
Hide file tree
Showing 23 changed files with 944 additions and 119 deletions.
14 changes: 13 additions & 1 deletion cmd/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"syscall"
"time"

Expand All @@ -29,11 +30,22 @@ func handleGetConfigScript(c echo.Context) error {
out = configScript{
RootURL: app.constants.RootURL,
FromEmail: app.constants.FromEmail,
Messengers: app.manager.GetMessengerNames(),
MediaProvider: app.constants.MediaProvider,
}
)

// Sort messenger names with `email` always as the first item.
var names []string
for name := range app.messengers {
if name == emailMsgr {
continue
}
names = append(names, name)
}
sort.Strings(names)
out.Messengers = append(out.Messengers, emailMsgr)
out.Messengers = append(out.Messengers, names...)

app.Lock()
out.NeedsRestart = app.needsRestart
out.Update = app.update
Expand Down
31 changes: 19 additions & 12 deletions cmd/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,6 @@ func handleCreateCampaign(c echo.Context) error {
o = c
}

if !app.manager.HasMessenger(o.MessengerID) {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Unknown messenger %s", o.MessengerID))
}

uu, err := uuid.NewV4()
if err != nil {
app.log.Printf("error generating UUID: %v", err)
Expand All @@ -243,7 +238,7 @@ func handleCreateCampaign(c echo.Context) error {
o.ContentType,
o.SendAt,
pq.StringArray(normalizeTags(o.Tags)),
"email",
o.Messenger,
o.TemplateID,
o.ListIDs,
); err != nil {
Expand Down Expand Up @@ -312,6 +307,7 @@ func handleUpdateCampaign(c echo.Context) error {
o.SendAt,
o.SendLater,
pq.StringArray(normalizeTags(o.Tags)),
o.Messenger,
o.TemplateID,
o.ListIDs)
if err != nil {
Expand Down Expand Up @@ -492,6 +488,7 @@ func handleTestCampaign(c echo.Context) error {
if err := c.Bind(&req); err != nil {
return err
}

// Validate.
if c, err := validateCampaignFields(req, app); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -532,6 +529,9 @@ func handleTestCampaign(c echo.Context) error {
camp.Subject = req.Subject
camp.FromEmail = req.FromEmail
camp.Body = req.Body
camp.Messenger = req.Messenger
camp.ContentType = req.ContentType
camp.TemplateID = req.TemplateID

// Send the test messages.
for _, s := range subs {
Expand Down Expand Up @@ -560,11 +560,14 @@ func sendTestMessage(sub models.Subscriber, camp *models.Campaign, app *App) err
fmt.Sprintf("Error rendering message: %v", err))
}

return app.messenger.Push(messenger.Message{
From: camp.FromEmail,
To: []string{sub.Email},
Subject: m.Subject(),
Body: m.Body(),
return app.messengers[camp.Messenger].Push(messenger.Message{
From: camp.FromEmail,
To: []string{sub.Email},
Subject: m.Subject(),
ContentType: camp.ContentType,
Body: m.Body(),
Subscriber: sub,
Campaign: camp,
})
}

Expand Down Expand Up @@ -600,9 +603,13 @@ func validateCampaignFields(c campaignReq, app *App) (campaignReq, error) {
return c, errors.New("no lists selected")
}

if !app.manager.HasMessenger(c.Messenger) {
return c, fmt.Errorf("unknown messenger %s", c.Messenger)
}

camp := models.Campaign{Body: c.Body, TemplateBody: tplTag}
if err := c.CompileTemplate(app.manager.TemplateFuncs(&camp)); err != nil {
return c, fmt.Errorf("Error compiling campaign body: %v", err)
return c, fmt.Errorf("error compiling campaign body: %v", err)
}

return c, nil
Expand Down
60 changes: 48 additions & 12 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/knadh/listmonk/internal/media/providers/filesystem"
"github.com/knadh/listmonk/internal/media/providers/s3"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/messenger/email"
"github.com/knadh/listmonk/internal/messenger/postback"
"github.com/knadh/listmonk/internal/subimporter"
"github.com/knadh/stuffbin"
"github.com/labstack/echo"
Expand Down Expand Up @@ -290,49 +292,83 @@ func initImporter(q *Queries, db *sqlx.DB, app *App) *subimporter.Importer {
}, db.DB)
}

// initMessengers initializes various messenger backends.
func initMessengers(m *manager.Manager) messenger.Messenger {
// initSMTPMessenger initializes the SMTP messenger.
func initSMTPMessenger(m *manager.Manager) messenger.Messenger {
var (
mapKeys = ko.MapKeys("smtp")
servers = make([]messenger.Server, 0, len(mapKeys))
servers = make([]email.Server, 0, len(mapKeys))
)

items := ko.Slices("smtp")
if len(items) == 0 {
lo.Fatalf("no SMTP servers found in config")
}

// Load the default SMTP messengers.
// Load the config for multipme SMTP servers.
for _, item := range items {
if !item.Bool("enabled") {
continue
}

// Read the SMTP config.
var s messenger.Server
var s email.Server
if err := item.UnmarshalWithConf("", &s, koanf.UnmarshalConf{Tag: "json"}); err != nil {
lo.Fatalf("error loading SMTP: %v", err)
lo.Fatalf("error reading SMTP config: %v", err)
}

servers = append(servers, s)
lo.Printf("loaded SMTP: %s@%s", item.String("username"), item.String("host"))
lo.Printf("loaded email (SMTP) messenger: %s@%s",
item.String("username"), item.String("host"))
}
if len(servers) == 0 {
lo.Fatalf("no SMTP servers enabled in settings")
}

// Initialize the default e-mail messenger.
msgr, err := messenger.NewEmailer(servers...)
// Initialize the e-mail messenger with multiple SMTP servers.
msgr, err := email.New(servers...)
if err != nil {
lo.Fatalf("error loading e-mail messenger: %v", err)
}
if err := m.AddMessenger(msgr); err != nil {
lo.Printf("error registering messenger %s", err)
}

return msgr
}

// initPostbackMessengers initializes and returns all the enabled
// HTTP postback messenger backends.
func initPostbackMessengers(m *manager.Manager) []messenger.Messenger {
items := ko.Slices("messengers")
if len(items) == 0 {
return nil
}

var out []messenger.Messenger
for _, item := range items {
if !item.Bool("enabled") {
continue
}

// Read the Postback server config.
var (
name = item.String("name")
o postback.Options
)
if err := item.UnmarshalWithConf("", &o, koanf.UnmarshalConf{Tag: "json"}); err != nil {
lo.Fatalf("error reading Postback config: %v", err)
}

// Initialize the Messenger.
p, err := postback.New(o)
if err != nil {
lo.Fatalf("error initializing Postback messenger %s: %v", name, err)
}
out = append(out, p)

lo.Printf("loaded Postback messenger: %s", name)
}

return out
}

// initMediaStore initializes Upload manager with a custom backend.
func initMediaStore() media.Store {
switch provider := ko.String("upload.provider"); provider {
Expand Down
2 changes: 1 addition & 1 deletion cmd/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func install(lastVer string, db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
"richtext",
nil,
pq.StringArray{"test-campaign"},
"email",
emailMsgr,
1,
pq.Int64Array{1},
); err != nil {
Expand Down
53 changes: 36 additions & 17 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ import (
"github.com/knadh/stuffbin"
)

const (
emailMsgr = "email"
)

// App contains the "global" components that are
// passed around, especially through HTTP handlers.
type App struct {
fs stuffbin.FileSystem
db *sqlx.DB
queries *Queries
constants *constants
manager *manager.Manager
importer *subimporter.Importer
messenger messenger.Messenger
media media.Store
notifTpls *template.Template
log *log.Logger
fs stuffbin.FileSystem
db *sqlx.DB
queries *Queries
constants *constants
manager *manager.Manager
importer *subimporter.Importer
messengers map[string]messenger.Messenger
media media.Store
notifTpls *template.Template
log *log.Logger

// Channel for passing reload signals.
sigChan chan os.Signal
Expand Down Expand Up @@ -122,18 +126,31 @@ func main() {
// Initialize the main app controller that wraps all of the app's
// components. This is passed around HTTP handlers.
app := &App{
fs: fs,
db: db,
constants: initConstants(),
media: initMediaStore(),
log: lo,
fs: fs,
db: db,
constants: initConstants(),
media: initMediaStore(),
messengers: make(map[string]messenger.Messenger),
log: lo,
}
_, app.queries = initQueries(queryFilePath, db, fs, true)
app.manager = initCampaignManager(app.queries, app.constants, app)
app.importer = initImporter(app.queries, db, app)
app.messenger = initMessengers(app.manager)
app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.constants)

// Initialize the default SMTP (`email`) messenger.
app.messengers[emailMsgr] = initSMTPMessenger(app.manager)

// Initialize any additional postback messengers.
for _, m := range initPostbackMessengers(app.manager) {
app.messengers[m.Name()] = m
}

// Attach all messengers to the campaign manager.
for _, m := range app.messengers {
app.manager.AddMessenger(m)
}

// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run(time.Second * 5)
Expand Down Expand Up @@ -164,7 +181,9 @@ func main() {
app.db.DB.Close()

// Close the messenger pool.
app.messenger.Close()
for _, m := range app.messengers {
m.Close()
}

// Signal the close.
closerWait <- true
Expand Down
15 changes: 7 additions & 8 deletions cmd/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ func (app *App) sendNotification(toEmails []string, subject, tplName string, dat
return err
}

err := app.manager.PushMessage(manager.Message{
From: app.constants.FromEmail,
To: toEmails,
Subject: subject,
Body: b.Bytes(),
Messenger: "email",
})
if err != nil {
m := manager.Message{}
m.From = app.constants.FromEmail
m.To = toEmails
m.Subject = subject
m.Body = b.Bytes()
m.Messenger = emailMsgr
if err := app.manager.PushMessage(m); err != nil {
app.log.Printf("error sending admin notification (%s): %v", subject, err)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func handleSelfExportSubscriberData(c echo.Context) error {

// Send the data as a JSON attachment to the subscriber.
const fname = "data.json"
if err := app.messenger.Push(messenger.Message{
if err := app.messengers[emailMsgr].Push(messenger.Message{
From: app.constants.FromEmail,
To: []string{data.Email},
Subject: "Your data",
Expand Down
Loading

0 comments on commit 6cf43ea

Please sign in to comment.