From c24c19b12015c2aba803d97bbc25f7215b52cf3a Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Wed, 28 Nov 2018 13:29:57 +0530 Subject: [PATCH] Add admin e-mail notifications. - Add notifications for campaign state change - Add notifications for import state change Related changes. - Add a new 'templates' directory with HTML templates - Move the static campaign template as a .tpl file into it - Change Messenger.Push() to accept multiple recipients - Change exhaustCampaign()'s behaviour to pass metadata to admin emails --- campaigns.go | 2 +- config.toml.sample | 5 + import.go | 4 +- install.go | 2 +- main.go | 45 ++++--- messenger/emailer.go | 4 +- messenger/messenger.go | 2 +- models/models.go | 4 + notifications.go | 32 +++++ public/static/logo.svg | 111 ++++++++++++++++++ runner/runner.go | 47 ++++++-- subimporter/importer.go | 37 +++++- templates/base.html | 83 +++++++++++++ templates/campaign-status.html | 25 ++++ .../default.tpl | 0 templates/import-status.html | 19 +++ 16 files changed, 386 insertions(+), 36 deletions(-) create mode 100644 notifications.go create mode 100644 public/static/logo.svg create mode 100644 templates/base.html create mode 100644 templates/campaign-status.html rename default-template.html => templates/default.tpl (100%) create mode 100644 templates/import-status.html diff --git a/campaigns.go b/campaigns.go index 9445be3..980716f 100644 --- a/campaigns.go +++ b/campaigns.go @@ -477,7 +477,7 @@ func sendTestMessage(sub *models.Subscriber, camp *models.Campaign, app *App) er fmt.Sprintf("Error rendering message: %v", err)) } - if err := app.Messenger.Push(camp.FromEmail, sub.Email, camp.Subject, m.Body); err != nil { + if err := app.Messenger.Push(camp.FromEmail, []string{sub.Email}, camp.Subject, m.Body); err != nil { return err } diff --git a/config.toml.sample b/config.toml.sample index e97ace0..337c6d5 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -9,6 +9,11 @@ root = "http://listmonk.mysite.com" # The default 'from' e-mail for outgoing e-mail campaigns. from_email = "listmonk " +# List of e-mail addresses to which admin notifications such as +# import updates, campaign completion, failure etc. should be sent. +# To disable notifications, set an empty list, eg: notify_emails = [] +notify_emails = ["admin1@mysite.com", "admin2@mysite.com"] + # Path to the uploads directory where media will be uploaded. upload_path = "uploads" diff --git a/import.go b/import.go index ccf1711..ed63ceb 100644 --- a/import.go +++ b/import.go @@ -110,8 +110,8 @@ func handleGetImportSubscribers(c echo.Context) error { return c.JSON(http.StatusOK, okResp{s}) } -// handleGetImportSubscriberLogs returns import statistics. -func handleGetImportSubscriberLogs(c echo.Context) error { +// handleGetImportSubscriberStats returns import statistics. +func handleGetImportSubscriberStats(c echo.Context) error { app := c.Get("app").(*App) return c.JSON(http.StatusOK, okResp{string(app.Importer.GetLogs())}) } diff --git a/install.go b/install.go index 2759c42..d262d8a 100644 --- a/install.go +++ b/install.go @@ -129,7 +129,7 @@ func install(app *App, qMap goyesql.Queries) { } // Default template. - tplBody, err := ioutil.ReadFile("default-template.html") + tplBody, err := ioutil.ReadFile("templates/default.tpl") if err != nil { tplBody = []byte(tplTag) } diff --git a/main.go b/main.go index 971ee16..a7b8bc3 100644 --- a/main.go +++ b/main.go @@ -20,14 +20,13 @@ import ( "github.com/spf13/viper" ) -var logger *log.Logger - type constants struct { - AssetPath string `mapstructure:"asset_path"` - RootURL string `mapstructure:"root"` - UploadPath string `mapstructure:"upload_path"` - UploadURI string `mapstructure:"upload_uri"` - FromEmail string `mapstructure:"from_email"` + AssetPath string `mapstructure:"asset_path"` + RootURL string `mapstructure:"root"` + UploadPath string `mapstructure:"upload_path"` + UploadURI string `mapstructure:"upload_uri"` + FromEmail string `mapstructure:"from_email"` + NotifyEmails []string `mapstructure:"notify_emails"` } // App contains the "global" components that are @@ -39,10 +38,12 @@ type App struct { Importer *subimporter.Importer Runner *runner.Runner Logger *log.Logger - + NotifTpls *template.Template Messenger messenger.Messenger } +var logger *log.Logger + func init() { logger = log.New(os.Stdout, "SYS: ", log.Ldate|log.Ltime|log.Lshortfile) @@ -94,7 +95,7 @@ func registerHandlers(e *echo.Echo) { e.POST("/api/subscribers/lists", handleQuerySubscribersIntoLists) e.GET("/api/import/subscribers", handleGetImportSubscribers) - e.GET("/api/import/subscribers/logs", handleGetImportSubscriberLogs) + e.GET("/api/import/subscribers/logs", handleGetImportSubscriberStats) e.POST("/api/import/subscribers", handleImportSubscribers) e.DELETE("/api/import/subscribers", handleStopImportSubscribers) @@ -158,7 +159,6 @@ func initMessengers(r *runner.Runner) messenger.Messenger { var s messenger.Server viper.UnmarshalKey("smtp."+name, &s) - s.Name = name s.SendTimeout = s.SendTimeout * time.Millisecond srv = append(srv, s) @@ -170,7 +170,6 @@ func initMessengers(r *runner.Runner) messenger.Messenger { if err != nil { logger.Fatalf("error loading e-mail messenger: %v", err) } - if err := r.AddMessenger(msgr); err != nil { logger.Printf("error registering messenger %s", err) } @@ -220,14 +219,32 @@ func main() { if err := scanQueriesToStruct(q, qMap, db.Unsafe()); err != nil { logger.Fatalf("no SQL queries loaded: %v", err) } - app.Queries = q - app.Importer = subimporter.New(q.UpsertSubscriber.Stmt, q.BlacklistSubscriber.Stmt, db.DB) + + // Importer. + importNotifCB := func(subject string, data map[string]interface{}) error { + return sendNotification(notifTplImport, subject, data, app) + } + app.Importer = subimporter.New(q.UpsertSubscriber.Stmt, + q.BlacklistSubscriber.Stmt, + db.DB, + importNotifCB) + + // System e-mail templates. + notifTpls, err := template.ParseGlob("templates/*.html") + if err != nil { + logger.Fatalf("error loading system templates: %v", err) + } + app.NotifTpls = notifTpls // Campaign daemon. + campNotifCB := func(subject string, data map[string]interface{}) error { + return sendNotification(notifTplCampaign, subject, data, app) + } r := runner.New(runner.Config{ Concurrency: viper.GetInt("app.concurrency"), MaxSendErrors: viper.GetInt("app.max_send_errors"), + FromEmail: app.Constants.FromEmail, // url.com/unsubscribe/{campaign_uuid}/{subscriber_uuid} UnsubscribeURL: fmt.Sprintf("%s/unsubscribe/%%s/%%s", app.Constants.RootURL), @@ -237,7 +254,7 @@ func main() { // url.com/campaign/{campaign_uuid}/{subscriber_uuid}/px.png ViewTrackURL: fmt.Sprintf("%s/campaign/%%s/%%s/px.png", app.Constants.RootURL), - }, newRunnerDB(q), logger) + }, newRunnerDB(q), campNotifCB, logger) app.Runner = r // Add messengers. diff --git a/messenger/emailer.go b/messenger/emailer.go index cb0dbc4..8e03093 100644 --- a/messenger/emailer.go +++ b/messenger/emailer.go @@ -66,7 +66,7 @@ func (e *emailer) Name() string { } // Push pushes a message to the server. -func (e *emailer) Push(fromAddr, toAddr, subject string, m []byte) error { +func (e *emailer) Push(fromAddr string, toAddr []string, subject string, m []byte) error { var key string // If there are more than one SMTP servers, send to a random @@ -80,7 +80,7 @@ func (e *emailer) Push(fromAddr, toAddr, subject string, m []byte) error { srv := e.servers[key] err := srv.mailer.Send(&email.Email{ From: fromAddr, - To: []string{toAddr}, + To: toAddr, Subject: subject, HTML: m, }, srv.SendTimeout) diff --git a/messenger/messenger.go b/messenger/messenger.go index 278f862..c1883af 100644 --- a/messenger/messenger.go +++ b/messenger/messenger.go @@ -5,6 +5,6 @@ package messenger type Messenger interface { Name() string - Push(fromAddr, toAddr, subject string, message []byte) error + Push(fromAddr string, toAddr []string, subject string, message []byte) error Flush() error } diff --git a/models/models.go b/models/models.go index 2400222..c84fcf3 100644 --- a/models/models.go +++ b/models/models.go @@ -58,6 +58,10 @@ var ( regexpViewTagReplace = `{{ TrackView .Campaign.UUID .Subscriber.UUID }}` ) +// AdminNotifCallback is a callback function that's called +// when a campaign's status changes. +type AdminNotifCallback func(subject string, data map[string]interface{}) error + // Base holds common fields shared across models. type Base struct { ID int `db:"id" json:"id"` diff --git a/notifications.go b/notifications.go new file mode 100644 index 0000000..21a9fa2 --- /dev/null +++ b/notifications.go @@ -0,0 +1,32 @@ +package main + +import ( + "bytes" +) + +const ( + notifTplImport = "import-status" + notifTplCampaign = "campaign-status" +) + +// sendNotification sends out an e-mail notification to admins. +func sendNotification(tpl, subject string, data map[string]interface{}, app *App) error { + data["RootURL"] = app.Constants.RootURL + + var b bytes.Buffer + err := app.NotifTpls.ExecuteTemplate(&b, tpl, data) + if err != nil { + return err + } + + err = app.Messenger.Push(app.Constants.FromEmail, + app.Constants.NotifyEmails, + subject, + b.Bytes()) + if err != nil { + app.Logger.Printf("error sending admin notification (%s): %v", subject, err) + return err + } + + return nil +} diff --git a/public/static/logo.svg b/public/static/logo.svg new file mode 100644 index 0000000..4a41d57 --- /dev/null +++ b/public/static/logo.svg @@ -0,0 +1,111 @@ + + + + + + + + + + image/svg+xml + + + + + + + + listmonk + + + + diff --git a/runner/runner.go b/runner/runner.go index ea5247b..8e9acb5 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -5,6 +5,7 @@ import ( "fmt" "html/template" "log" + "strings" "sync" "time" @@ -38,6 +39,7 @@ type Runner struct { cfg Config src DataSource messengers map[string]messenger.Messenger + notifCB models.AdminNotifCallback logger *log.Logger // Campaigns that are currently running. @@ -70,6 +72,7 @@ type Config struct { Concurrency int MaxSendErrors int RequeueOnError bool + FromEmail string LinkTrackURL string UnsubscribeURL string ViewTrackURL string @@ -81,10 +84,11 @@ type msgError struct { } // New returns a new instance of Mailer. -func New(cfg Config, src DataSource, l *log.Logger) *Runner { +func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Runner { r := Runner{ cfg: cfg, src: src, + notifCB: notifCB, logger: l, messengers: make(map[string]messenger.Messenger), camps: make(map[int]*models.Campaign, 0), @@ -189,6 +193,11 @@ func (r *Runner) Run(tick time.Duration) { r.exhaustCampaign(e.camp, models.CampaignStatusPaused) } delete(r.msgErrorCounts, e.camp.ID) + + // Notify admins. + r.sendNotif(e.camp, + models.CampaignStatusPaused, + "Too many errors") } } } @@ -205,12 +214,15 @@ func (r *Runner) Run(tick time.Duration) { if has { // There are more subscribers to fetch. r.subFetchQueue <- c - } else { + } else if r.isCampaignProcessing(c.ID) { // There are no more subscribers. Either the campaign status // has changed or all subscribers have been processed. - if err := r.exhaustCampaign(c, ""); err != nil { + newC, err := r.exhaustCampaign(c, "") + if err != nil { r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err) + continue } + r.sendNotif(newC, newC.Status, "") } } @@ -227,7 +239,7 @@ func (r *Runner) SpawnWorkers() { err := r.messengers[m.Campaign.MessengerID].Push( m.from, - m.to, + []string{m.to}, m.Campaign.Subject, m.Body) if err != nil { @@ -311,7 +323,7 @@ func (r *Runner) isCampaignProcessing(id int) bool { return ok } -func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error { +func (r *Runner) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) { delete(r.camps, c.ID) // A status has been passed. Change the campaign's status @@ -322,18 +334,18 @@ func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error { } else { r.logger.Printf("set campaign (%s) to %s", c.Name, status) } - - return nil + return c, nil } // Fetch the up-to-date campaign status from the source. cm, err := r.src.GetCampaign(c.ID) if err != nil { - return err + return nil, err } // If a running campaign has exhausted subscribers, it's finished. if cm.Status == models.CampaignStatusRunning { + cm.Status = models.CampaignStatusFinished if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil { r.logger.Printf("error finishing campaign (%s): %v", c.Name, err) } else { @@ -343,7 +355,7 @@ func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error { r.logger.Printf("stop processing campaign (%s)", c.Name) } - return nil + return cm, nil } // Render takes a Message, executes its pre-compiled Campaign.Tpl @@ -383,6 +395,23 @@ func (r *Runner) trackLink(url, campUUID, subUUID string) string { return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID) } +// sendNotif sends a notification to registered admin e-mails. +func (r *Runner) sendNotif(c *models.Campaign, status, reason string) error { + var ( + subject = fmt.Sprintf("%s: %s", strings.Title(status), c.Name) + data = map[string]interface{}{ + "ID": c.ID, + "Name": c.Name, + "Status": status, + "Sent": c.Sent, + "ToSend": c.ToSend, + "Reason": reason, + } + ) + + return r.notifCB(subject, data) +} + // TemplateFuncs returns the template functions to be applied into // compiled campaign templates. func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap { diff --git a/subimporter/importer.go b/subimporter/importer.go index dbb2d12..112f121 100644 --- a/subimporter/importer.go +++ b/subimporter/importer.go @@ -13,6 +13,7 @@ import ( "encoding/csv" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "log" @@ -53,13 +54,14 @@ const ( // Importer represents the bulk CSV subscriber import system. type Importer struct { - upsert *sql.Stmt - blacklist *sql.Stmt - db *sql.DB + upsert *sql.Stmt + blacklist *sql.Stmt + db *sql.DB + notifCB models.AdminNotifCallback + isImporting bool stop chan bool - - status *Status + status *Status sync.RWMutex } @@ -99,12 +101,13 @@ var ( ) // New returns a new instance of Importer. -func New(upsert *sql.Stmt, blacklist *sql.Stmt, db *sql.DB) *Importer { +func New(upsert *sql.Stmt, blacklist *sql.Stmt, db *sql.DB, notifCB models.AdminNotifCallback) *Importer { im := Importer{ upsert: upsert, blacklist: blacklist, stop: make(chan bool, 1), db: db, + notifCB: notifCB, status: &Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)}, } @@ -183,6 +186,24 @@ func (im *Importer) incrementImportCount(n int) { im.Unlock() } +// sendNotif sends admin notifications for import completions. +func (im *Importer) sendNotif(status string) error { + var ( + s = im.GetStats() + data = map[string]interface{}{ + "Name": s.Name, + "Status": status, + "Imported": s.Imported, + "Total": s.Total, + } + subject = fmt.Sprintf("%s: %s import", + strings.Title(status), + s.Name) + ) + + return im.notifCB(subject, data) +} + // Start is a blocking function that selects on a channel queue until all // subscriber entries in the import session are imported. It should be // invoked as a goroutine. @@ -249,6 +270,8 @@ func (s *Session) Start() { if cur == 0 { s.im.setStatus(StatusFinished) s.log.Printf("imported finished") + s.im.sendNotif(StatusFinished) + return } @@ -257,12 +280,14 @@ func (s *Session) Start() { tx.Rollback() s.im.setStatus(StatusFailed) s.log.Printf("error committing to DB: %v", err) + s.im.sendNotif(StatusFailed) return } s.im.incrementImportCount(cur) s.im.setStatus(StatusFinished) s.log.Printf("imported finished") + s.im.sendNotif(StatusFinished) } // ExtractZIP takes a ZIP file's path and extracts all .csv files in it to diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..467434e --- /dev/null +++ b/templates/base.html @@ -0,0 +1,83 @@ +{{ define "header" }} + + + + + + + + + + +
 
+
+
+ listmonk +
+{{ end }} + +{{ define "footer" }} +
+ + +
 
+ + +{{ end }} diff --git a/templates/campaign-status.html b/templates/campaign-status.html new file mode 100644 index 0000000..d174c02 --- /dev/null +++ b/templates/campaign-status.html @@ -0,0 +1,25 @@ +{{ define "campaign-status" }} +{{ template "header" . }} +

Campaign update

+ + + + + + + + + + + + + + {{ if ne (index . "Reason") "" }} + + + + + {{ end }} +
Campaign{{ index . "Name" }}
Status{{ index . "Status" }}
Sent{{ index . "Sent" }} / {{ index . "ToSend" }}
Reason{{ index . "Reason" }}
+{{ template "footer" }} +{{ end }} \ No newline at end of file diff --git a/default-template.html b/templates/default.tpl similarity index 100% rename from default-template.html rename to templates/default.tpl diff --git a/templates/import-status.html b/templates/import-status.html new file mode 100644 index 0000000..1c4ff91 --- /dev/null +++ b/templates/import-status.html @@ -0,0 +1,19 @@ +{{ define "import-status" }} +{{ template "header" . }} +

Import update

+ + + + + + + + + + + + + +
File{{ .Name }}
Status{{ .Status }}
Records{{ .Imported }} / {{ .Total }}
+{{ template "footer" }} +{{ end }} \ No newline at end of file