Add generic HTTP postback `Messenger` support.

This is a major feature that builds upon the `Messenger` interface
that has been in listmonk since its inception (with SMTP as the only
messenger). This commit introduces a new Messenger implementation, an
HTTP "postback", that can post campaign messages as a standard JSON
payload to arbitrary HTTP servers. These servers can in turn push them
to FCM, SMS, or any or any such upstream, enabling listmonk to be a
generic campaign messenger for any type of communication, not just
e-mails.

Postback HTTP endpoints can be defined in settings and they can be
selected on campaigns.
This commit is contained in:
Kailash Nadh 2020-09-20 16:31:24 +05:30
parent be9fbcd542
commit 6cf43ea674
23 changed files with 944 additions and 119 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"syscall"
"time"
@ -29,11 +30,22 @@ func handleGetConfigScript(c echo.Context) error {
out = configScript{
RootURL: app.constants.RootURL,
FromEmail: app.constants.FromEmail,
Messengers: app.manager.GetMessengerNames(),
MediaProvider: app.constants.MediaProvider,
}
)
// Sort messenger names with `email` always as the first item.
var names []string
for name := range app.messengers {
if name == emailMsgr {
continue
}
names = append(names, name)
}
sort.Strings(names)
out.Messengers = append(out.Messengers, emailMsgr)
out.Messengers = append(out.Messengers, names...)
app.Lock()
out.NeedsRestart = app.needsRestart
out.Update = app.update

View File

@ -220,11 +220,6 @@ func handleCreateCampaign(c echo.Context) error {
o = c
}
if !app.manager.HasMessenger(o.MessengerID) {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Unknown messenger %s", o.MessengerID))
}
uu, err := uuid.NewV4()
if err != nil {
app.log.Printf("error generating UUID: %v", err)
@ -243,7 +238,7 @@ func handleCreateCampaign(c echo.Context) error {
o.ContentType,
o.SendAt,
pq.StringArray(normalizeTags(o.Tags)),
"email",
o.Messenger,
o.TemplateID,
o.ListIDs,
); err != nil {
@ -312,6 +307,7 @@ func handleUpdateCampaign(c echo.Context) error {
o.SendAt,
o.SendLater,
pq.StringArray(normalizeTags(o.Tags)),
o.Messenger,
o.TemplateID,
o.ListIDs)
if err != nil {
@ -492,6 +488,7 @@ func handleTestCampaign(c echo.Context) error {
if err := c.Bind(&req); err != nil {
return err
}
// Validate.
if c, err := validateCampaignFields(req, app); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
@ -532,6 +529,9 @@ func handleTestCampaign(c echo.Context) error {
camp.Subject = req.Subject
camp.FromEmail = req.FromEmail
camp.Body = req.Body
camp.Messenger = req.Messenger
camp.ContentType = req.ContentType
camp.TemplateID = req.TemplateID
// Send the test messages.
for _, s := range subs {
@ -560,11 +560,14 @@ func sendTestMessage(sub models.Subscriber, camp *models.Campaign, app *App) err
fmt.Sprintf("Error rendering message: %v", err))
}
return app.messenger.Push(messenger.Message{
From: camp.FromEmail,
To: []string{sub.Email},
Subject: m.Subject(),
Body: m.Body(),
return app.messengers[camp.Messenger].Push(messenger.Message{
From: camp.FromEmail,
To: []string{sub.Email},
Subject: m.Subject(),
ContentType: camp.ContentType,
Body: m.Body(),
Subscriber: sub,
Campaign: camp,
})
}
@ -600,9 +603,13 @@ func validateCampaignFields(c campaignReq, app *App) (campaignReq, error) {
return c, errors.New("no lists selected")
}
if !app.manager.HasMessenger(c.Messenger) {
return c, fmt.Errorf("unknown messenger %s", c.Messenger)
}
camp := models.Campaign{Body: c.Body, TemplateBody: tplTag}
if err := c.CompileTemplate(app.manager.TemplateFuncs(&camp)); err != nil {
return c, fmt.Errorf("Error compiling campaign body: %v", err)
return c, fmt.Errorf("error compiling campaign body: %v", err)
}
return c, nil

View File

@ -25,6 +25,8 @@ import (
"github.com/knadh/listmonk/internal/media/providers/filesystem"
"github.com/knadh/listmonk/internal/media/providers/s3"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/internal/messenger/email"
"github.com/knadh/listmonk/internal/messenger/postback"
"github.com/knadh/listmonk/internal/subimporter"
"github.com/knadh/stuffbin"
"github.com/labstack/echo"
@ -290,11 +292,11 @@ func initImporter(q *Queries, db *sqlx.DB, app *App) *subimporter.Importer {
}, db.DB)
}
// initMessengers initializes various messenger backends.
func initMessengers(m *manager.Manager) messenger.Messenger {
// initSMTPMessenger initializes the SMTP messenger.
func initSMTPMessenger(m *manager.Manager) messenger.Messenger {
var (
mapKeys = ko.MapKeys("smtp")
servers = make([]messenger.Server, 0, len(mapKeys))
servers = make([]email.Server, 0, len(mapKeys))
)
items := ko.Slices("smtp")
@ -302,37 +304,71 @@ func initMessengers(m *manager.Manager) messenger.Messenger {
lo.Fatalf("no SMTP servers found in config")
}
// Load the default SMTP messengers.
// Load the config for multipme SMTP servers.
for _, item := range items {
if !item.Bool("enabled") {
continue
}
// Read the SMTP config.
var s messenger.Server
var s email.Server
if err := item.UnmarshalWithConf("", &s, koanf.UnmarshalConf{Tag: "json"}); err != nil {
lo.Fatalf("error loading SMTP: %v", err)
lo.Fatalf("error reading SMTP config: %v", err)
}
servers = append(servers, s)
lo.Printf("loaded SMTP: %s@%s", item.String("username"), item.String("host"))
lo.Printf("loaded email (SMTP) messenger: %s@%s",
item.String("username"), item.String("host"))
}
if len(servers) == 0 {
lo.Fatalf("no SMTP servers enabled in settings")
}
// Initialize the default e-mail messenger.
msgr, err := messenger.NewEmailer(servers...)
// Initialize the e-mail messenger with multiple SMTP servers.
msgr, err := email.New(servers...)
if err != nil {
lo.Fatalf("error loading e-mail messenger: %v", err)
}
if err := m.AddMessenger(msgr); err != nil {
lo.Printf("error registering messenger %s", err)
}
return msgr
}
// initPostbackMessengers initializes and returns all the enabled
// HTTP postback messenger backends.
func initPostbackMessengers(m *manager.Manager) []messenger.Messenger {
items := ko.Slices("messengers")
if len(items) == 0 {
return nil
}
var out []messenger.Messenger
for _, item := range items {
if !item.Bool("enabled") {
continue
}
// Read the Postback server config.
var (
name = item.String("name")
o postback.Options
)
if err := item.UnmarshalWithConf("", &o, koanf.UnmarshalConf{Tag: "json"}); err != nil {
lo.Fatalf("error reading Postback config: %v", err)
}
// Initialize the Messenger.
p, err := postback.New(o)
if err != nil {
lo.Fatalf("error initializing Postback messenger %s: %v", name, err)
}
out = append(out, p)
lo.Printf("loaded Postback messenger: %s", name)
}
return out
}
// initMediaStore initializes Upload manager with a custom backend.
func initMediaStore() media.Store {
switch provider := ko.String("upload.provider"); provider {

View File

@ -123,7 +123,7 @@ func install(lastVer string, db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
"richtext",
nil,
pq.StringArray{"test-campaign"},
"email",
emailMsgr,
1,
pq.Int64Array{1},
); err != nil {

View File

@ -22,19 +22,23 @@ import (
"github.com/knadh/stuffbin"
)
const (
emailMsgr = "email"
)
// App contains the "global" components that are
// passed around, especially through HTTP handlers.
type App struct {
fs stuffbin.FileSystem
db *sqlx.DB
queries *Queries
constants *constants
manager *manager.Manager
importer *subimporter.Importer
messenger messenger.Messenger
media media.Store
notifTpls *template.Template
log *log.Logger
fs stuffbin.FileSystem
db *sqlx.DB
queries *Queries
constants *constants
manager *manager.Manager
importer *subimporter.Importer
messengers map[string]messenger.Messenger
media media.Store
notifTpls *template.Template
log *log.Logger
// Channel for passing reload signals.
sigChan chan os.Signal
@ -122,18 +126,31 @@ func main() {
// Initialize the main app controller that wraps all of the app's
// components. This is passed around HTTP handlers.
app := &App{
fs: fs,
db: db,
constants: initConstants(),
media: initMediaStore(),
log: lo,
fs: fs,
db: db,
constants: initConstants(),
media: initMediaStore(),
messengers: make(map[string]messenger.Messenger),
log: lo,
}
_, app.queries = initQueries(queryFilePath, db, fs, true)
app.manager = initCampaignManager(app.queries, app.constants, app)
app.importer = initImporter(app.queries, db, app)
app.messenger = initMessengers(app.manager)
app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.constants)
// Initialize the default SMTP (`email`) messenger.
app.messengers[emailMsgr] = initSMTPMessenger(app.manager)
// Initialize any additional postback messengers.
for _, m := range initPostbackMessengers(app.manager) {
app.messengers[m.Name()] = m
}
// Attach all messengers to the campaign manager.
for _, m := range app.messengers {
app.manager.AddMessenger(m)
}
// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run(time.Second * 5)
@ -164,7 +181,9 @@ func main() {
app.db.DB.Close()
// Close the messenger pool.
app.messenger.Close()
for _, m := range app.messengers {
m.Close()
}
// Signal the close.
closerWait <- true

View File

@ -28,14 +28,13 @@ func (app *App) sendNotification(toEmails []string, subject, tplName string, dat
return err
}
err := app.manager.PushMessage(manager.Message{
From: app.constants.FromEmail,
To: toEmails,
Subject: subject,
Body: b.Bytes(),
Messenger: "email",
})
if err != nil {
m := manager.Message{}
m.From = app.constants.FromEmail
m.To = toEmails
m.Subject = subject
m.Body = b.Bytes()
m.Messenger = emailMsgr
if err := app.manager.PushMessage(m); err != nil {
app.log.Printf("error sending admin notification (%s): %v", subject, err)
return err
}

View File

@ -367,7 +367,7 @@ func handleSelfExportSubscriberData(c echo.Context) error {
// Send the data as a JSON attachment to the subscriber.
const fname = "data.json"
if err := app.messenger.Push(messenger.Message{
if err := app.messengers[emailMsgr].Push(messenger.Message{
From: app.constants.FromEmail,
To: []string{data.Email},
Subject: "Your data",

View File

@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"syscall"
"time"
@ -22,14 +24,24 @@ type settings struct {
AppMaxSendErrors int `json:"app.max_send_errors"`
AppMessageRate int `json:"app.message_rate"`
Messengers []interface{} `json:"messengers"`
PrivacyUnsubHeader bool `json:"privacy.unsubscribe_header"`
PrivacyAllowBlocklist bool `json:"privacy.allow_blocklist"`
PrivacyAllowExport bool `json:"privacy.allow_export"`
PrivacyAllowWipe bool `json:"privacy.allow_wipe"`
PrivacyExportable []string `json:"privacy.exportable"`
UploadProvider string `json:"upload.provider"`
UploadFilesystemUploadPath string `json:"upload.filesystem.upload_path"`
UploadFilesystemUploadURI string `json:"upload.filesystem.upload_uri"`
UploadS3AwsAccessKeyID string `json:"upload.s3.aws_access_key_id"`
UploadS3AwsDefaultRegion string `json:"upload.s3.aws_default_region"`
UploadS3AwsSecretAccessKey string `json:"upload.s3.aws_secret_access_key,omitempty"`
UploadS3Bucket string `json:"upload.s3.bucket"`
UploadS3BucketDomain string `json:"upload.s3.bucket_domain"`
UploadS3BucketPath string `json:"upload.s3.bucket_path"`
UploadS3BucketType string `json:"upload.s3.bucket_type"`
UploadS3Expiry string `json:"upload.s3.expiry"`
SMTP []struct {
Enabled bool `json:"enabled"`
Host string `json:"host"`
@ -47,21 +59,22 @@ type settings struct {
TLSSkipVerify bool `json:"tls_skip_verify"`
} `json:"smtp"`
UploadProvider string `json:"upload.provider"`
UploadFilesystemUploadPath string `json:"upload.filesystem.upload_path"`
UploadFilesystemUploadURI string `json:"upload.filesystem.upload_uri"`
UploadS3AwsAccessKeyID string `json:"upload.s3.aws_access_key_id"`
UploadS3AwsDefaultRegion string `json:"upload.s3.aws_default_region"`
UploadS3AwsSecretAccessKey string `json:"upload.s3.aws_secret_access_key,omitempty"`
UploadS3Bucket string `json:"upload.s3.bucket"`
UploadS3BucketDomain string `json:"upload.s3.bucket_domain"`
UploadS3BucketPath string `json:"upload.s3.bucket_path"`
UploadS3BucketType string `json:"upload.s3.bucket_type"`
UploadS3Expiry string `json:"upload.s3.expiry"`
Messengers []struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
RootURL string `json:"root_url"`
Username string `json:"username"`
Password string `json:"password,omitempty"`
MaxConns int `json:"max_conns"`
Timeout string `json:"timeout"`
MaxMsgRetries int `json:"max_msg_retries"`
} `json:"messengers"`
}
var (
reAlphaNum = regexp.MustCompile(`[^a-z0-9\-]`)
)
// handleGetSettings returns settings from the DB.
func handleGetSettings(c echo.Context) error {
app := c.Get("app").(*App)
@ -75,6 +88,9 @@ func handleGetSettings(c echo.Context) error {
for i := 0; i < len(s.SMTP); i++ {
s.SMTP[i].Password = ""
}
for i := 0; i < len(s.Messengers); i++ {
s.Messengers[i].Password = ""
}
s.UploadS3AwsSecretAccessKey = ""
return c.JSON(http.StatusOK, okResp{s})
@ -111,13 +127,43 @@ func handleUpdateSettings(c echo.Context) error {
if len(cur.SMTP) > i &&
set.SMTP[i].Host == cur.SMTP[i].Host &&
set.SMTP[i].Username == cur.SMTP[i].Username {
// Copy the existing password as password's needn't be
// sent from the frontend for updating entries.
set.SMTP[i].Password = cur.SMTP[i].Password
}
}
}
if !has {
return echo.NewHTTPError(http.StatusBadRequest,
"Minimum one SMTP block should be enabled.")
"At least one SMTP block should be enabled.")
}
// Validate and sanitize postback Messenger names. Duplicates are disallowed
// and "email" is a reserved name.
names := map[string]bool{emailMsgr: true}
for i := range set.Messengers {
if set.Messengers[i].Password == "" {
if len(cur.Messengers) > i &&
set.Messengers[i].RootURL == cur.Messengers[i].RootURL &&
set.Messengers[i].Username == cur.Messengers[i].Username {
// Copy the existing password as password's needn't be
// sent from the frontend for updating entries.
set.Messengers[i].Password = cur.Messengers[i].Password
}
}
name := reAlphaNum.ReplaceAllString(strings.ToLower(set.Messengers[i].Name), "")
if _, ok := names[name]; ok {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Duplicate messenger name `%s`.", name))
}
if len(name) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid messenger name.")
}
set.Messengers[i].Name = name
names[name] = true
}
// S3 password?

View File

@ -7,7 +7,7 @@
<link rel="icon" href="<%= BASE_URL %>frontend/favicon.png" />
<link href="https://fonts.googleapis.com/css?family=Inter:400,600" rel="stylesheet" />
<title><%= htmlWebpackPlugin.options.title %></title>
<script src="<%= BASE_URL %>api/config.js"></script>
<script src="<%= BASE_URL %>api/config.js" id="server-config"></script>
</head>
<body>
<noscript>

View File

@ -11,11 +11,14 @@
<b-radio v-model="form.radioFormat"
@input="onChangeFormat" :disabled="disabled" name="format"
native-value="html">Raw HTML</b-radio>
<b-radio v-model="form.radioFormat"
@input="onChangeFormat" :disabled="disabled" name="format"
native-value="plain">Plain text</b-radio>
</div>
</b-field>
</div>
<div class="column is-6 has-text-right">
<b-button @click="togglePreview" type="is-primary"
<b-button @click="onTogglePreview" type="is-primary"
icon-left="file-find-outline">Preview</b-button>
</div>
</div>
@ -37,9 +40,13 @@
<div v-if="form.format === 'html'"
ref="htmlEditor" id="html-editor" class="html-editor"></div>
<!-- plain text editor //-->
<b-input v-if="form.format === 'plain'" v-model="form.body" @input="onEditorChange"
type="textarea" ref="plainEditor" class="plain-editor" />
<!-- campaign preview //-->
<campaign-preview v-if="isPreviewing"
@close="togglePreview"
@close="onTogglePreview"
type='campaign'
:id='id'
:title='title'
@ -136,7 +143,7 @@ export default {
esc: {
key: 27,
handler: () => {
this.toggleFullscreen(true);
this.onToggleFullscreen(true);
},
},
},
@ -163,8 +170,8 @@ export default {
],
handlers: {
image: this.toggleMedia,
fullscreen: () => this.toggleFullscreen(false),
image: this.onToggleMedia,
fullscreen: () => this.onToggleFullscreen(false),
},
},
},
@ -227,16 +234,16 @@ export default {
});
},
togglePreview() {
onTogglePreview() {
this.isPreviewing = !this.isPreviewing;
},
toggleMedia() {
onToggleMedia() {
this.lastSel = this.$refs.quill.quill.getSelection();
this.isMediaVisible = !this.isMediaVisible;
},
toggleFullscreen(onlyMinimize) {
onToggleFullscreen(onlyMinimize) {
if (onlyMinimize) {
if (!this.isEditorFullscreen) {
return;

View File

@ -16,6 +16,22 @@ Vue.config.productionTip = false;
Vue.prototype.$api = api;
Vue.prototype.$utils = utils;
Vue.prototype.$reloadServerConfig = () => {
// Get the config.js <script> tag, remove it, and re-add it.
let s = document.querySelector('#server-config');
const url = s.getAttribute('src');
s.remove();
s = document.createElement('script');
s.setAttribute('src', url);
s.setAttribute('id', 'server-config');
s.onload = () => {
store.commit('setModelResponse',
{ model: models.serverConfig, data: humps.camelizeKeys(window.CONFIG) });
};
document.body.appendChild(s);
};
// window.CONFIG is loaded from /api/config.js directly in a <script> tag.
if (window.CONFIG) {
store.commit('setModelResponse',

View File

@ -69,6 +69,14 @@
</b-select>
</b-field>
<b-field label="Messenger" label-position="on-border">
<b-select placeholder="Messenger" v-model="form.messenger"
:disabled="!canEdit" required>
<option v-for="m in serverConfig.messengers"
:value="m" :key="m">{{ m }}</option>
</b-select>
</b-field>
<b-field label="Tags" label-position="on-border">
<b-taginput v-model="form.tags" :disabled="!canEdit"
ellipsis icon="tag-outline" placeholder="Tags"></b-taginput>
@ -206,12 +214,12 @@ export default Vue.extend({
subject: this.form.subject,
lists: this.form.lists.map((l) => l.id),
from_email: this.form.fromEmail,
content_type: 'richtext',
messenger: 'email',
messenger: this.form.messenger,
type: 'regular',
tags: this.form.tags,
template_id: this.form.templateId,
body: this.form.body,
content_type: this.form.content.contentType,
body: this.form.content.body,
subscribers: this.form.testEmails,
};
@ -255,7 +263,7 @@ export default Vue.extend({
subject: this.form.subject,
lists: this.form.lists.map((l) => l.id),
from_email: this.form.fromEmail,
messenger: 'email',
messenger: this.form.messenger,
type: 'regular',
tags: this.form.tags,
send_later: this.form.sendLater,
@ -305,7 +313,7 @@ export default Vue.extend({
},
computed: {
...mapState(['lists', 'templates', 'loading']),
...mapState(['serverConfig', 'loading', 'lists', 'templates']),
canEdit() {
return this.isNew
@ -353,6 +361,8 @@ export default Vue.extend({
this.activeTab = 1;
}
});
} else {
this.form.messenger = 'email';
}
this.$nextTick(() => {

View File

@ -183,7 +183,7 @@ export default Vue.extend({
},
computed: {
...mapState(['lists', 'serverConfig', 'loading']),
...mapState(['serverConfig', 'loading', 'lists']),
},
mounted() {

View File

@ -330,8 +330,7 @@
</div>
<div class="column is-3">
<b-field label="Retries" label-position="on-border"
message="The number of times a message should be retried
if sending fails.">
message="Number of times to rety when a message fails.">
<b-numberinput v-model="item.max_msg_retries" name="max_msg_retries"
type="is-light"
controls-position="compact"
@ -378,7 +377,95 @@
<b-button @click="addSMTP" icon-left="plus" type="is-primary">Add new</b-button>
</b-tab-item><!-- mail servers -->
<b-tab-item label="Messengers">
<div class="items messengers">
<div class="block box" v-for="(item, n) in form.messengers" :key="n">
<div class="columns">
<div class="column is-2">
<b-field label="Enabled">
<b-switch v-model="item.enabled" name="enabled"
:native-value="true" />
</b-field>
<b-field>
<a @click.prevent="$utils.confirm(null, () => removeMessenger(n))"
href="#" class="is-size-7">
<b-icon icon="trash-can-outline" size="is-small" /> Delete
</a>
</b-field>
</div><!-- first column -->
<div class="column" :class="{'disabled': !item.enabled}">
<div class="columns">
<div class="column is-4">
<b-field label="Name" label-position="on-border"
message="eg: my-sms. Alphanumeric / dash.">
<b-input v-model="item.name" name="name"
placeholder='mymessenger' :maxlength="200" />
</b-field>
</div>
<div class="column is-8">
<b-field label="URL" label-position="on-border"
message="Root URL of the Postback server.">
<b-input v-model="item.root_url" name="root_url"
placeholder='https://postback.messenger.net/path' :maxlength="200" />
</b-field>
</div>
</div><!-- host -->
<div class="columns">
<div class="column">
<b-field grouped>
<b-field label="Username" label-position="on-border" expanded>
<b-input v-model="item.username" name="username" :maxlength="200" />
</b-field>
<b-field label="Password" label-position="on-border" expanded
message="Enter a value to change.">
<b-input v-model="item.password"
name="password" type="password" placeholder="Enter to change"
:maxlength="200" />
</b-field>
</b-field>
</div>
</div><!-- auth -->
<hr />
<div class="columns">
<div class="column is-4">
<b-field label="Max. connections" label-position="on-border"
message="Maximum concurrent connections to the server.">
<b-numberinput v-model="item.max_conns" name="max_conns" type="is-light"
controls-position="compact"
placeholder="25" min="1" max="65535" />
</b-field>
</div>
<div class="column is-4">
<b-field label="Retries" label-position="on-border"
message="Number of times to rety when a message fails.">
<b-numberinput v-model="item.max_msg_retries" name="max_msg_retries"
type="is-light"
controls-position="compact"
placeholder="2" min="1" max="1000" />
</b-field>
</div>
<div class="column is-4">
<b-field label="Request imeout" label-position="on-border"
message="Request timeout duration (s for second, m for minute).">
<b-input v-model="item.timeout" name="timeout"
placeholder="5s" :pattern="regDuration" :maxlength="10" />
</b-field>
</div>
</div>
<hr />
</div>
</div><!-- second container column -->
</div><!-- block -->
</div><!-- mail-servers -->
<b-button @click="addMessenger" icon-left="plus" type="is-primary">Add new</b-button>
</b-tab-item><!-- messengers -->
</b-tabs>
</form>
</section>
</section>
@ -421,6 +508,24 @@ export default Vue.extend({
this.form.smtp.splice(i, 1, s);
},
addMessenger() {
this.form.messengers.push({
enabled: true,
root_url: '',
name: '',
username: '',
password: '',
max_conns: 25,
max_msg_retries: 2,
timeout: '5s',
});
},
removeMessenger(i) {
this.form.messengers.splice(i, 1);
},
onSubmit() {
const form = JSON.parse(JSON.stringify(this.form));
@ -442,6 +547,13 @@ export default Vue.extend({
form['upload.s3.aws_secret_access_key'] = '';
}
for (let i = 0; i < form.messengers.length; i += 1) {
// If it's the dummy UI password placeholder, ignore it.
if (form.messengers[i].password === dummyPassword) {
form.messengers[i].password = '';
}
}
this.isLoading = true;
this.$api.updateSettings(form).then((data) => {
if (data.needsRestart) {
@ -461,6 +573,7 @@ export default Vue.extend({
this.$api.getHealth().then(() => {
clearInterval(pollId);
this.getSettings();
this.$reloadServerConfig();
});
}, 500);
}, () => {
@ -480,6 +593,12 @@ export default Vue.extend({
d.smtp[i].password = dummyPassword;
}
for (let i = 0; i < d.messengers.length; i += 1) {
// The backend doesn't send passwords, so add a dummy so that it
// the password looks filled on the UI.
d.messengers[i].password = dummyPassword;
}
if (d['upload.provider'] === 's3') {
d['upload.s3.aws_secret_access_key'] = dummyPassword;
}

1
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/lib/pq v1.3.0
github.com/mailru/easyjson v0.7.6
github.com/nats-io/nats-server/v2 v2.1.7 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/olekukonko/tablewriter v0.0.4 // indirect

4
go.sum
View File

@ -34,6 +34,8 @@ github.com/jaytaylor/html2text v0.0.0-20200220170450-61d9dc4d7195 h1:j0UEFmS7wSj
github.com/jaytaylor/html2text v0.0.0-20200220170450-61d9dc4d7195/go.mod h1:CVKlgaMiht+LXvHG173ujK6JUhZXKb2u/BQtjPDIvyk=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/knadh/email v0.0.0-20200206100304-6d2c7064c2e8 h1:HVq7nA5uWjpo93WsWjva1YIBuQrr8UkWQEUbzg1DX+E=
github.com/knadh/email v0.0.0-20200206100304-6d2c7064c2e8/go.mod h1:Fy2gCFfZhay8jplf/Csj6cyH/oshQTkLQYZbKkcV+SY=
github.com/knadh/goyesql v2.0.0+incompatible h1:hJFJrU8kaiLmvYt9I/1k1AB7q+qRhHs/afzTfQ3eGqk=
@ -63,6 +65,8 @@ github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=

View File

@ -74,10 +74,10 @@ type CampaignMessage struct {
// Message represents a generic message to be pushed to a messenger.
type Message struct {
From string
To []string
Subject string
Body []byte
messenger.Message
Subscriber models.Subscriber
// Messenger is the messenger backend to use: email|postback.
Messenger string
}
@ -171,15 +171,6 @@ func (m *Manager) PushMessage(msg Message) error {
return nil
}
// GetMessengerNames returns the list of registered messengers.
func (m *Manager) GetMessengerNames() []string {
names := make([]string, 0, len(m.messengers))
for n := range m.messengers {
names = append(names, n)
}
return names
}
// HasMessenger checks if a given messenger is registered.
func (m *Manager) HasMessenger(id string) bool {
_, ok := m.messengers[id]
@ -253,10 +244,13 @@ func (m *Manager) messageWorker() {
// Outgoing message.
out := messenger.Message{
From: msg.from,
To: []string{msg.to},
Subject: msg.subject,
Body: msg.body,
From: msg.from,
To: []string{msg.to},
Subject: msg.subject,
ContentType: msg.Campaign.ContentType,
Body: msg.body,
Subscriber: msg.Subscriber,
Campaign: msg.Campaign,
}
// Attach List-Unsubscribe headers?
@ -267,7 +261,7 @@ func (m *Manager) messageWorker() {
out.Headers = h
}
if err := m.messengers[msg.Campaign.MessengerID].Push(out); err != nil {
if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err)
select {
@ -283,10 +277,13 @@ func (m *Manager) messageWorker() {
}
err := m.messengers[msg.Messenger].Push(messenger.Message{
From: msg.From,
To: msg.To,
Subject: msg.Subject,
Body: msg.Body,
From: msg.From,
To: msg.To,
Subject: msg.Subject,
ContentType: msg.ContentType,
Body: msg.Body,
Subscriber: msg.Subscriber,
Campaign: msg.Campaign,
})
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
@ -394,9 +391,9 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
// addCampaign adds a campaign to the process queue.
func (m *Manager) addCampaign(c *models.Campaign) error {
// Validate messenger.
if _, ok := m.messengers[c.MessengerID]; !ok {
if _, ok := m.messengers[c.Messenger]; !ok {
m.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
return fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
}
// Load the template.

View File

@ -1,4 +1,4 @@
package messenger
package email
import (
"crypto/tls"
@ -8,6 +8,7 @@ import (
"net/textproto"
"github.com/jaytaylor/html2text"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/smtppool"
)
@ -35,9 +36,8 @@ type Emailer struct {
servers []*Server
}
// NewEmailer creates and returns an e-mail Messenger backend.
// It takes multiple SMTP configurations.
func NewEmailer(servers ...Server) (*Emailer, error) {
// New returns an SMTP e-mail Messenger backend with a the given SMTP servers.
func New(servers ...Server) (*Emailer, error) {
e := &Emailer{
servers: make([]*Server, 0, len(servers)),
}
@ -86,7 +86,7 @@ func (e *Emailer) Name() string {
}
// Push pushes a message to the server.
func (e *Emailer) Push(m Message) error {
func (e *Emailer) Push(m messenger.Message) error {
// If there are more than one SMTP servers, send to a random
// one from the list.
var (

View File

@ -1,6 +1,10 @@
package messenger
import "net/textproto"
import (
"net/textproto"
"github.com/knadh/listmonk/models"
)
// Messenger is an interface for a generic messaging backend,
// for instance, e-mail, SMS etc.
@ -16,9 +20,15 @@ type Message struct {
From string
To []string
Subject string
ContentType string
Body []byte
Headers textproto.MIMEHeader
Attachments []Attachment
Subscriber models.Subscriber
// Campaign is generally the same instance for a large number of subscribers.
Campaign *models.Campaign
}
// Attachment represents a file or blob attachment that can be

View File

@ -0,0 +1,183 @@
package postback
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
)
// postback is the payload that's posted as JSON to the HTTP Postback server.
//easyjson:json
type postback struct {
Subject string `json:"subject"`
ContentType string `json:"content_type"`
Body string `json:"body"`
Recipients []recipient `json:"recipients"`
Campaign *campaign `json:"campaign"`
}
type campaign struct {
UUID string `db:"uuid" json:"uuid"`
Name string `db:"name" json:"name"`
Tags []string `db:"tags" json:"tags"`
}
type recipient struct {
UUID string `db:"uuid" json:"uuid"`
Email string `db:"email" json:"email"`
Name string `db:"name" json:"name"`
Attribs models.SubscriberAttribs `db:"attribs" json:"attribs"`
Status string `db:"status" json:"status"`
}
// Options represents HTTP Postback server options.
type Options struct {
Name string `json:"name"`
Username string `json:"username"`
Password string `json:"password"`
RootURL string `json:"root_url"`
MaxConns int `json:"max_conns"`
Retries int `json:"retries"`
Timeout time.Duration `json:"timeout"`
}
// Postback represents an HTTP Message server.
type Postback struct {
authStr string
o Options
c *http.Client
}
// New returns a new instance of the HTTP Postback messenger.
func New(o Options) (*Postback, error) {
authStr := ""
if o.Username != "" && o.Password != "" {
authStr = fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(
[]byte(o.Username+":"+o.Password)))
}
return &Postback{
authStr: authStr,
o: o,
c: &http.Client{
Timeout: o.Timeout,
Transport: &http.Transport{
MaxIdleConnsPerHost: o.MaxConns,
MaxConnsPerHost: o.MaxConns,
ResponseHeaderTimeout: o.Timeout,
IdleConnTimeout: o.Timeout,
},
},
}, nil
}
// Name returns the messenger's name.
func (p *Postback) Name() string {
return p.o.Name
}
// Push pushes a message to the server.
func (p *Postback) Push(m messenger.Message) error {
pb := postback{
Subject: m.Subject,
ContentType: m.ContentType,
Body: string(m.Body),
Recipients: []recipient{{
UUID: m.Subscriber.UUID,
Email: m.Subscriber.Email,
Name: m.Subscriber.Name,
Status: m.Subscriber.Status,
Attribs: m.Subscriber.Attribs,
}},
}
if m.Campaign != nil {
pb.Campaign = &campaign{
UUID: m.Campaign.UUID,
Name: m.Campaign.Name,
Tags: m.Campaign.Tags,
}
}
b, err := pb.MarshalJSON()
if err != nil {
return err
}
return p.exec(http.MethodPost, p.o.RootURL, b, nil)
}
// Flush flushes the message queue to the server.
func (p *Postback) Flush() error {
return nil
}
// Close closes idle HTTP connections.
func (p *Postback) Close() error {
p.c.CloseIdleConnections()
return nil
}
func (p *Postback) exec(method, rURL string, reqBody []byte, headers http.Header) error {
var (
err error
postBody io.Reader
)
// Encode POST / PUT params.
if method == http.MethodPost || method == http.MethodPut {
postBody = bytes.NewReader(reqBody)
}
req, err := http.NewRequest(method, rURL, postBody)
if err != nil {
return err
}
if headers != nil {
req.Header = headers
} else {
req.Header = http.Header{}
}
req.Header.Set("User-Agent", "listmonk")
// Optional BasicAuth.
if p.authStr != "" {
req.Header.Set("Authorization", p.authStr)
}
// If a content-type isn't set, set the default one.
if req.Header.Get("Content-Type") == "" {
if method == http.MethodPost || method == http.MethodPut {
req.Header.Add("Content-Type", "application/json")
}
}
// If the request method is GET or DELETE, add the params as QueryString.
if method == http.MethodGet || method == http.MethodDelete {
req.URL.RawQuery = string(reqBody)
}
r, err := p.c.Do(req)
if err != nil {
return err
}
defer func() {
// Drain and close the body to let the Transport reuse the connection
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
}()
if r.StatusCode != http.StatusOK {
return fmt.Errorf("non-OK response from Postback server: %d", r.StatusCode)
}
return nil
}

View File

@ -0,0 +1,358 @@
// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT.
package postback
import (
json "encoding/json"
models "github.com/knadh/listmonk/models"
easyjson "github.com/mailru/easyjson"
jlexer "github.com/mailru/easyjson/jlexer"
jwriter "github.com/mailru/easyjson/jwriter"
)
// suppress unused package warning
var (
_ *json.RawMessage
_ *jlexer.Lexer
_ *jwriter.Writer
_ easyjson.Marshaler
)
func easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback(in *jlexer.Lexer, out *postback) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "subject":
out.Subject = string(in.String())
case "content_type":
out.ContentType = string(in.String())
case "body":
out.Body = string(in.String())
case "recipients":
if in.IsNull() {
in.Skip()
out.Recipients = nil
} else {
in.Delim('[')
if out.Recipients == nil {
if !in.IsDelim(']') {
out.Recipients = make([]recipient, 0, 0)
} else {
out.Recipients = []recipient{}
}
} else {
out.Recipients = (out.Recipients)[:0]
}
for !in.IsDelim(']') {
var v1 recipient
easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback1(in, &v1)
out.Recipients = append(out.Recipients, v1)
in.WantComma()
}
in.Delim(']')
}
case "campaign":
if in.IsNull() {
in.Skip()
out.Campaign = nil
} else {
if out.Campaign == nil {
out.Campaign = new(campaign)
}
easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback2(in, out.Campaign)
}
default:
in.SkipRecursive()
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
func easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback(out *jwriter.Writer, in postback) {
out.RawByte('{')
first := true
_ = first
{
const prefix string = ",\"subject\":"
out.RawString(prefix[1:])
out.String(string(in.Subject))
}
{
const prefix string = ",\"content_type\":"
out.RawString(prefix)
out.String(string(in.ContentType))
}
{
const prefix string = ",\"body\":"
out.RawString(prefix)
out.String(string(in.Body))
}
{
const prefix string = ",\"recipients\":"
out.RawString(prefix)
if in.Recipients == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 {
out.RawString("null")
} else {
out.RawByte('[')
for v2, v3 := range in.Recipients {
if v2 > 0 {
out.RawByte(',')
}
easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback1(out, v3)
}
out.RawByte(']')
}
}
{
const prefix string = ",\"campaign\":"
out.RawString(prefix)
if in.Campaign == nil {
out.RawString("null")
} else {
easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback2(out, *in.Campaign)
}
}
out.RawByte('}')
}
// MarshalJSON supports json.Marshaler interface
func (v postback) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v postback) MarshalEasyJSON(w *jwriter.Writer) {
easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *postback) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *postback) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback(l, v)
}
func easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback2(in *jlexer.Lexer, out *campaign) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "uuid":
out.UUID = string(in.String())
case "name":
out.Name = string(in.String())
case "tags":
if in.IsNull() {
in.Skip()
out.Tags = nil
} else {
in.Delim('[')
if out.Tags == nil {
if !in.IsDelim(']') {
out.Tags = make([]string, 0, 4)
} else {
out.Tags = []string{}
}
} else {
out.Tags = (out.Tags)[:0]
}
for !in.IsDelim(']') {
var v4 string
v4 = string(in.String())
out.Tags = append(out.Tags, v4)
in.WantComma()
}
in.Delim(']')
}
default:
in.SkipRecursive()
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
func easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback2(out *jwriter.Writer, in campaign) {
out.RawByte('{')
first := true
_ = first
{
const prefix string = ",\"uuid\":"
out.RawString(prefix[1:])
out.String(string(in.UUID))
}
{
const prefix string = ",\"name\":"
out.RawString(prefix)
out.String(string(in.Name))
}
{
const prefix string = ",\"tags\":"
out.RawString(prefix)
if in.Tags == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 {
out.RawString("null")
} else {
out.RawByte('[')
for v5, v6 := range in.Tags {
if v5 > 0 {
out.RawByte(',')
}
out.String(string(v6))
}
out.RawByte(']')
}
}
out.RawByte('}')
}
func easyjsonDf11841fDecodeGithubComKnadhListmonkInternalMessengerPostback1(in *jlexer.Lexer, out *recipient) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "uuid":
out.UUID = string(in.String())
case "email":
out.Email = string(in.String())
case "name":
out.Name = string(in.String())
case "attribs":
if in.IsNull() {
in.Skip()
} else {
in.Delim('{')
out.Attribs = make(models.SubscriberAttribs)
for !in.IsDelim('}') {
key := string(in.String())
in.WantColon()
var v7 interface{}
if m, ok := v7.(easyjson.Unmarshaler); ok {
m.UnmarshalEasyJSON(in)
} else if m, ok := v7.(json.Unmarshaler); ok {
_ = m.UnmarshalJSON(in.Raw())
} else {
v7 = in.Interface()
}
(out.Attribs)[key] = v7
in.WantComma()
}
in.Delim('}')
}
case "status":
out.Status = string(in.String())
default:
in.SkipRecursive()
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
func easyjsonDf11841fEncodeGithubComKnadhListmonkInternalMessengerPostback1(out *jwriter.Writer, in recipient) {
out.RawByte('{')
first := true
_ = first
{
const prefix string = ",\"uuid\":"
out.RawString(prefix[1:])
out.String(string(in.UUID))
}
{
const prefix string = ",\"email\":"
out.RawString(prefix)
out.String(string(in.Email))
}
{
const prefix string = ",\"name\":"
out.RawString(prefix)
out.String(string(in.Name))
}
{
const prefix string = ",\"attribs\":"
out.RawString(prefix)
if in.Attribs == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 {
out.RawString(`null`)
} else {
out.RawByte('{')
v8First := true
for v8Name, v8Value := range in.Attribs {
if v8First {
v8First = false
} else {
out.RawByte(',')
}
out.String(string(v8Name))
out.RawByte(':')
if m, ok := v8Value.(easyjson.Marshaler); ok {
m.MarshalEasyJSON(out)
} else if m, ok := v8Value.(json.Marshaler); ok {
out.Raw(m.MarshalJSON())
} else {
out.Raw(json.Marshal(v8Value))
}
}
out.RawByte('}')
}
}
{
const prefix string = ",\"status\":"
out.RawString(prefix)
out.String(string(in.Status))
}
out.RawByte('}')
}

View File

@ -164,7 +164,7 @@ type Campaign struct {
ContentType string `db:"content_type" json:"content_type"`
Tags pq.StringArray `db:"tags" json:"tags"`
TemplateID int `db:"template_id" json:"template_id"`
MessengerID string `db:"messenger" json:"messenger"`
Messenger string `db:"messenger" json:"messenger"`
// TemplateBody is joined in from templates by the next-campaigns query.
TemplateBody string `db:"template_body" json:"-"`

View File

@ -561,16 +561,17 @@ WITH camp AS (
send_at=(CASE WHEN $8 THEN $7::TIMESTAMP WITH TIME ZONE WHEN NOT $8 THEN NULL ELSE send_at END),
status=(CASE WHEN NOT $8 THEN 'draft' ELSE status END),
tags=$9::VARCHAR(100)[],
template_id=(CASE WHEN $10 != 0 THEN $10 ELSE template_id END),
messenger=(CASE WHEN $10 != '' THEN $10 ELSE messenger END),
template_id=(CASE WHEN $11 != 0 THEN $11 ELSE template_id END),
updated_at=NOW()
WHERE id = $1 RETURNING id
),
d AS (
-- Reset list relationships
DELETE FROM campaign_lists WHERE campaign_id = $1 AND NOT(list_id = ANY($11))
DELETE FROM campaign_lists WHERE campaign_id = $1 AND NOT(list_id = ANY($12))
)
INSERT INTO campaign_lists (campaign_id, list_id, list_name)
(SELECT $1 as campaign_id, id, name FROM lists WHERE id=ANY($11::INT[]))
(SELECT $1 as campaign_id, id, name FROM lists WHERE id=ANY($12::INT[]))
ON CONFLICT (campaign_id, list_id) DO UPDATE SET list_name = EXCLUDED.list_name;
-- name: update-campaign-counts