From 1a39ed15eca0a38d8808c58d09b04d69ef3f7799 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Mon, 26 Nov 2018 16:40:51 +0530 Subject: [PATCH] Add support for campaign error tracking and auto-pause. When a campaign exceeds N number of message send errors, for instance SMTP errors, it is now auto-paused until there is manual intervention. For this, the master goroutine in runner.Run() that was synchronising between the tick based DB scanner and subscriber fetching has been split into two. A new queue aggregates send errors from workers again a threshold after which the campaign is paused. --- config.toml.sample | 2 +- generate-subs.py | 3 +- main.go | 5 +- runner/runner.go | 195 ++++++++++++++++++++++++++++++--------------- runner_db.go | 18 +---- 5 files changed, 138 insertions(+), 85 deletions(-) diff --git a/config.toml.sample b/config.toml.sample index 15f57f8..edbe78a 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -1,4 +1,3 @@ -# Application. [app] # Interface and port where the app will run its webserver. address = "0.0.0.0:9000" @@ -27,6 +26,7 @@ asset_path = "frontend/my/build" # mail server will concurrency = 100 + # Database. [db] host = "localhost" diff --git a/generate-subs.py b/generate-subs.py index 48b90ce..a979630 100644 --- a/generate-subs.py +++ b/generate-subs.py @@ -3,13 +3,12 @@ import random f = open("/tmp/subs.csv", "w+") w = csv.writer(f) -w.writerow(["email", "name", "status", "tags", "attributes"]) +w.writerow(["email", "name", "status", "attributes"]) for n in range(0, 100000): w.writerow([ "user%d@mail.com" % (n,), "First%d Last%d" % (n, n), "enabled", - "apple|mango|orange", "{\"age\": %d, \"city\": \"Bangalore\"}" % (random.randint(20,70),) ]) diff --git a/main.go b/main.go index 21e5f23..971ee16 100644 --- a/main.go +++ b/main.go @@ -226,7 +226,8 @@ func main() { // Campaign daemon. r := runner.New(runner.Config{ - Concurrency: viper.GetInt("app.concurrency"), + Concurrency: viper.GetInt("app.concurrency"), + MaxSendErrors: viper.GetInt("app.max_send_errors"), // url.com/unsubscribe/{campaign_uuid}/{subscriber_uuid} UnsubscribeURL: fmt.Sprintf("%s/unsubscribe/%%s/%%s", app.Constants.RootURL), @@ -242,7 +243,7 @@ func main() { // Add messengers. app.Messenger = initMessengers(app.Runner) - go r.Run(time.Duration(time.Second * 2)) + go r.Run(time.Duration(time.Second * 5)) r.SpawnWorkers() // Initialize the server. diff --git a/runner/runner.go b/runner/runner.go index ea118fd..ea5247b 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -28,9 +28,7 @@ type DataSource interface { NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) NextSubscribers(campID, limit int) ([]*models.Subscriber, error) GetCampaign(campID int) (*models.Campaign, error) - PauseCampaign(campID int) error - CancelCampaign(campID int) error - FinishCampaign(campID int) error + UpdateCampaignStatus(campID int, status string) error CreateLink(url string) (string, error) } @@ -51,8 +49,10 @@ type Runner struct { links map[string]string linksMutex sync.RWMutex - msgQueue chan *Message - subFetchQueue chan *models.Campaign + subFetchQueue chan *models.Campaign + msgQueue chan *Message + msgErrorQueue chan msgError + msgErrorCounts map[int]int } // Message represents an active subscriber that's being processed. @@ -61,28 +61,38 @@ type Message struct { Subscriber *models.Subscriber UnsubscribeURL string Body []byte + from string to string } // Config has parameters for configuring the runner. type Config struct { Concurrency int + MaxSendErrors int + RequeueOnError bool LinkTrackURL string UnsubscribeURL string ViewTrackURL string } +type msgError struct { + camp *models.Campaign + err error +} + // New returns a new instance of Mailer. func New(cfg Config, src DataSource, l *log.Logger) *Runner { r := Runner{ - cfg: cfg, - messengers: make(map[string]messenger.Messenger), - src: src, - camps: make(map[int]*models.Campaign, 0), - links: make(map[string]string, 0), - logger: l, - subFetchQueue: make(chan *models.Campaign, 100), - msgQueue: make(chan *Message, cfg.Concurrency), + cfg: cfg, + src: src, + logger: l, + messengers: make(map[string]messenger.Messenger), + camps: make(map[int]*models.Campaign, 0), + links: make(map[string]string, 0), + subFetchQueue: make(chan *models.Campaign, cfg.Concurrency), + msgQueue: make(chan *Message, cfg.Concurrency), + msgErrorQueue: make(chan msgError, cfg.MaxSendErrors), + msgErrorCounts: make(map[int]int), } return &r @@ -92,6 +102,7 @@ func New(cfg Config, src DataSource, l *log.Logger) *Runner { // to message templates while they're compiled. func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message { return &Message{ + from: c.FromEmail, to: s.Email, Campaign: c, Subscriber: s, @@ -133,69 +144,103 @@ func (r *Runner) HasMessenger(id string) bool { // until all subscribers are exhausted, at which point, a campaign is marked // as "finished". func (r *Runner) Run(tick time.Duration) { - var ( - tScanCampaigns = time.NewTicker(tick) - ) - - for { - select { - // Fetch all 'running campaigns that aren't being processed. - case <-tScanCampaigns.C: - campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs()) - if err != nil { - r.logger.Printf("error fetching campaigns: %v", err) - return - } - - for _, c := range campaigns { - if err := r.addCampaign(c); err != nil { - r.logger.Printf("error processing campaign (%s): %v", c.Name, err) + go func() { + t := time.NewTicker(tick) + for { + select { + // Periodically scan the data source for campaigns to process. + case <-t.C: + campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs()) + if err != nil { + r.logger.Printf("error fetching campaigns: %v", err) continue } - r.logger.Printf("start processing campaign (%s)", c.Name) - r.subFetchQueue <- c - } + for _, c := range campaigns { + if err := r.addCampaign(c); err != nil { + r.logger.Printf("error processing campaign (%s): %v", c.Name, err) + continue + } + r.logger.Printf("start processing campaign (%s)", c.Name) - // Fetch next set of subscribers for the incoming campaign ID - // and process them. - case c := <-r.subFetchQueue: - has, err := r.nextSubscribers(c, batchSize) - if err != nil { - r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err) - } + // If subscriber processing is busy, move on. Blocking and waiting + // can end up in a race condition where the waiting campaign's + // state in the data source has changed. + select { + case r.subFetchQueue <- c: + default: + } + } - if has { - // There are more subscribers to fetch. - r.subFetchQueue <- c - } else { - // No subscribers. - if err := r.processExhaustedCampaign(c); err != nil { - r.logger.Printf("error processing campaign (%s): %v", c.Name, err) + // Aggregate errors from sending messages to check against the error threshold + // after which a campaign is paused. + case e := <-r.msgErrorQueue: + if r.cfg.MaxSendErrors < 1 { + continue + } + + // If the error threshold is met, pause the campaign. + r.msgErrorCounts[e.camp.ID]++ + if r.msgErrorCounts[e.camp.ID] >= r.cfg.MaxSendErrors { + r.logger.Printf("error counted exceeded %d. pausing campaign %s", + r.cfg.MaxSendErrors, e.camp.Name) + + if r.isCampaignProcessing(e.camp.ID) { + r.exhaustCampaign(e.camp, models.CampaignStatusPaused) + } + delete(r.msgErrorCounts, e.camp.ID) } } } + }() + + // Fetch the next set of subscribers for a campaign and process them. + for c := range r.subFetchQueue { + has, err := r.nextSubscribers(c, batchSize) + if err != nil { + r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err) + continue + } + + if has { + // There are more subscribers to fetch. + r.subFetchQueue <- c + } else { + // There are no more subscribers. Either the campaign status + // has changed or all subscribers have been processed. + if err := r.exhaustCampaign(c, ""); err != nil { + r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err) + } + } } + } // SpawnWorkers spawns workers goroutines that push out messages. func (r *Runner) SpawnWorkers() { for i := 0; i < r.cfg.Concurrency; i++ { - go func(ch chan *Message) { - for { - select { - case m := <-ch: - err := r.messengers[m.Campaign.MessengerID].Push( - m.Campaign.FromEmail, - m.Subscriber.Email, - m.Campaign.Subject, - m.Body) - if err != nil { - r.logger.Printf("error pushing message: %v", err) + go func() { + for m := range r.msgQueue { + if !r.isCampaignProcessing(m.Campaign.ID) { + continue + } + + err := r.messengers[m.Campaign.MessengerID].Push( + m.from, + m.to, + m.Campaign.Subject, + m.Body) + if err != nil { + r.logger.Printf("error sending message in campaign %s: %v", + m.Campaign.Name, err) + + select { + case r.msgErrorQueue <- msgError{camp: m.Campaign, err: err}: + default: } } } - }(r.msgQueue) + }() } } @@ -203,7 +248,7 @@ func (r *Runner) SpawnWorkers() { func (r *Runner) addCampaign(c *models.Campaign) error { // Validate messenger. if _, ok := r.messengers[c.MessengerID]; !ok { - r.src.CancelCampaign(c.ID) + r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled) return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name) } @@ -252,23 +297,44 @@ func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error continue } - // Send the message. + // Push the message to the queue while blocking and waiting until + // the queue is drained. r.msgQueue <- m } return true, nil } -func (r *Runner) processExhaustedCampaign(c *models.Campaign) error { +// isCampaignProcessing checks if the campaign is bing processed. +func (r *Runner) isCampaignProcessing(id int) bool { + _, ok := r.camps[id] + return ok +} + +func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error { + delete(r.camps, c.ID) + + // A status has been passed. Change the campaign's status + // without further checks. + if status != "" { + if err := r.src.UpdateCampaignStatus(c.ID, status); err != nil { + r.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err) + } else { + r.logger.Printf("set campaign (%s) to %s", c.Name, status) + } + + return nil + } + + // Fetch the up-to-date campaign status from the source. cm, err := r.src.GetCampaign(c.ID) if err != nil { return err } // If a running campaign has exhausted subscribers, it's finished. - // Otherwise, it's paused or cancelled. if cm.Status == models.CampaignStatusRunning { - if err := r.src.FinishCampaign(c.ID); err != nil { + if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil { r.logger.Printf("error finishing campaign (%s): %v", c.Name, err) } else { r.logger.Printf("campaign (%s) finished", c.Name) @@ -277,7 +343,6 @@ func (r *Runner) processExhaustedCampaign(c *models.Campaign) error { r.logger.Printf("stop processing campaign (%s)", c.Name) } - delete(r.camps, c.ID) return nil } diff --git a/runner_db.go b/runner_db.go index 12bd0f1..1b5aa82 100644 --- a/runner_db.go +++ b/runner_db.go @@ -42,21 +42,9 @@ func (r *runnerDB) GetCampaign(campID int) (*models.Campaign, error) { return out, err } -// PauseCampaign marks a campaign as paused. -func (r *runnerDB) PauseCampaign(campID int) error { - _, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusPaused) - return err -} - -// CancelCampaign marks a campaign as cancelled. -func (r *runnerDB) CancelCampaign(campID int) error { - _, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusCancelled) - return err -} - -// FinishCampaign marks a campaign as finished. -func (r *runnerDB) FinishCampaign(campID int) error { - _, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusFinished) +// UpdateCampaignStatus updates a campaign's status. +func (r *runnerDB) UpdateCampaignStatus(campID int, status string) error { + _, err := r.queries.UpdateCampaignStatus.Exec(campID, status) return err }