Add support for campaign error tracking and auto-pause.

When a campaign exceeds N number of message send errors, for instance
SMTP errors, it is now auto-paused until there is manual intervention.

For this, the master goroutine in runner.Run() that was synchronising
between the tick based DB scanner and subscriber fetching has been
split into two. A new queue aggregates send errors from workers
again a threshold after which the campaign is paused.
This commit is contained in:
Kailash Nadh 2018-11-26 16:40:51 +05:30
parent 0577367d4f
commit 1a39ed15ec
5 changed files with 138 additions and 85 deletions

View File

@ -1,4 +1,3 @@
# Application.
[app] [app]
# Interface and port where the app will run its webserver. # Interface and port where the app will run its webserver.
address = "0.0.0.0:9000" address = "0.0.0.0:9000"
@ -27,6 +26,7 @@ asset_path = "frontend/my/build"
# mail server will # mail server will
concurrency = 100 concurrency = 100
# Database. # Database.
[db] [db]
host = "localhost" host = "localhost"

View File

@ -3,13 +3,12 @@ import random
f = open("/tmp/subs.csv", "w+") f = open("/tmp/subs.csv", "w+")
w = csv.writer(f) w = csv.writer(f)
w.writerow(["email", "name", "status", "tags", "attributes"]) w.writerow(["email", "name", "status", "attributes"])
for n in range(0, 100000): for n in range(0, 100000):
w.writerow([ w.writerow([
"user%d@mail.com" % (n,), "user%d@mail.com" % (n,),
"First%d Last%d" % (n, n), "First%d Last%d" % (n, n),
"enabled", "enabled",
"apple|mango|orange",
"{\"age\": %d, \"city\": \"Bangalore\"}" % (random.randint(20,70),) "{\"age\": %d, \"city\": \"Bangalore\"}" % (random.randint(20,70),)
]) ])

View File

@ -226,7 +226,8 @@ func main() {
// Campaign daemon. // Campaign daemon.
r := runner.New(runner.Config{ r := runner.New(runner.Config{
Concurrency: viper.GetInt("app.concurrency"), Concurrency: viper.GetInt("app.concurrency"),
MaxSendErrors: viper.GetInt("app.max_send_errors"),
// url.com/unsubscribe/{campaign_uuid}/{subscriber_uuid} // url.com/unsubscribe/{campaign_uuid}/{subscriber_uuid}
UnsubscribeURL: fmt.Sprintf("%s/unsubscribe/%%s/%%s", app.Constants.RootURL), UnsubscribeURL: fmt.Sprintf("%s/unsubscribe/%%s/%%s", app.Constants.RootURL),
@ -242,7 +243,7 @@ func main() {
// Add messengers. // Add messengers.
app.Messenger = initMessengers(app.Runner) app.Messenger = initMessengers(app.Runner)
go r.Run(time.Duration(time.Second * 2)) go r.Run(time.Duration(time.Second * 5))
r.SpawnWorkers() r.SpawnWorkers()
// Initialize the server. // Initialize the server.

View File

@ -28,9 +28,7 @@ type DataSource interface {
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
NextSubscribers(campID, limit int) ([]*models.Subscriber, error) NextSubscribers(campID, limit int) ([]*models.Subscriber, error)
GetCampaign(campID int) (*models.Campaign, error) GetCampaign(campID int) (*models.Campaign, error)
PauseCampaign(campID int) error UpdateCampaignStatus(campID int, status string) error
CancelCampaign(campID int) error
FinishCampaign(campID int) error
CreateLink(url string) (string, error) CreateLink(url string) (string, error)
} }
@ -51,8 +49,10 @@ type Runner struct {
links map[string]string links map[string]string
linksMutex sync.RWMutex linksMutex sync.RWMutex
msgQueue chan *Message subFetchQueue chan *models.Campaign
subFetchQueue chan *models.Campaign msgQueue chan *Message
msgErrorQueue chan msgError
msgErrorCounts map[int]int
} }
// Message represents an active subscriber that's being processed. // Message represents an active subscriber that's being processed.
@ -61,28 +61,38 @@ type Message struct {
Subscriber *models.Subscriber Subscriber *models.Subscriber
UnsubscribeURL string UnsubscribeURL string
Body []byte Body []byte
from string
to string to string
} }
// Config has parameters for configuring the runner. // Config has parameters for configuring the runner.
type Config struct { type Config struct {
Concurrency int Concurrency int
MaxSendErrors int
RequeueOnError bool
LinkTrackURL string LinkTrackURL string
UnsubscribeURL string UnsubscribeURL string
ViewTrackURL string ViewTrackURL string
} }
type msgError struct {
camp *models.Campaign
err error
}
// New returns a new instance of Mailer. // New returns a new instance of Mailer.
func New(cfg Config, src DataSource, l *log.Logger) *Runner { func New(cfg Config, src DataSource, l *log.Logger) *Runner {
r := Runner{ r := Runner{
cfg: cfg, cfg: cfg,
messengers: make(map[string]messenger.Messenger), src: src,
src: src, logger: l,
camps: make(map[int]*models.Campaign, 0), messengers: make(map[string]messenger.Messenger),
links: make(map[string]string, 0), camps: make(map[int]*models.Campaign, 0),
logger: l, links: make(map[string]string, 0),
subFetchQueue: make(chan *models.Campaign, 100), subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
msgQueue: make(chan *Message, cfg.Concurrency), msgQueue: make(chan *Message, cfg.Concurrency),
msgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
msgErrorCounts: make(map[int]int),
} }
return &r return &r
@ -92,6 +102,7 @@ func New(cfg Config, src DataSource, l *log.Logger) *Runner {
// to message templates while they're compiled. // to message templates while they're compiled.
func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message { func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message {
return &Message{ return &Message{
from: c.FromEmail,
to: s.Email, to: s.Email,
Campaign: c, Campaign: c,
Subscriber: s, Subscriber: s,
@ -133,69 +144,103 @@ func (r *Runner) HasMessenger(id string) bool {
// until all subscribers are exhausted, at which point, a campaign is marked // until all subscribers are exhausted, at which point, a campaign is marked
// as "finished". // as "finished".
func (r *Runner) Run(tick time.Duration) { func (r *Runner) Run(tick time.Duration) {
var ( go func() {
tScanCampaigns = time.NewTicker(tick) t := time.NewTicker(tick)
) for {
select {
for { // Periodically scan the data source for campaigns to process.
select { case <-t.C:
// Fetch all 'running campaigns that aren't being processed. campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs())
case <-tScanCampaigns.C: if err != nil {
campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs()) r.logger.Printf("error fetching campaigns: %v", err)
if err != nil {
r.logger.Printf("error fetching campaigns: %v", err)
return
}
for _, c := range campaigns {
if err := r.addCampaign(c); err != nil {
r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
continue continue
} }
r.logger.Printf("start processing campaign (%s)", c.Name) for _, c := range campaigns {
r.subFetchQueue <- c if err := r.addCampaign(c); err != nil {
} r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
continue
}
r.logger.Printf("start processing campaign (%s)", c.Name)
// Fetch next set of subscribers for the incoming campaign ID // If subscriber processing is busy, move on. Blocking and waiting
// and process them. // can end up in a race condition where the waiting campaign's
case c := <-r.subFetchQueue: // state in the data source has changed.
has, err := r.nextSubscribers(c, batchSize) select {
if err != nil { case r.subFetchQueue <- c:
r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err) default:
} }
}
if has { // Aggregate errors from sending messages to check against the error threshold
// There are more subscribers to fetch. // after which a campaign is paused.
r.subFetchQueue <- c case e := <-r.msgErrorQueue:
} else { if r.cfg.MaxSendErrors < 1 {
// No subscribers. continue
if err := r.processExhaustedCampaign(c); err != nil { }
r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
// If the error threshold is met, pause the campaign.
r.msgErrorCounts[e.camp.ID]++
if r.msgErrorCounts[e.camp.ID] >= r.cfg.MaxSendErrors {
r.logger.Printf("error counted exceeded %d. pausing campaign %s",
r.cfg.MaxSendErrors, e.camp.Name)
if r.isCampaignProcessing(e.camp.ID) {
r.exhaustCampaign(e.camp, models.CampaignStatusPaused)
}
delete(r.msgErrorCounts, e.camp.ID)
} }
} }
} }
}()
// Fetch the next set of subscribers for a campaign and process them.
for c := range r.subFetchQueue {
has, err := r.nextSubscribers(c, batchSize)
if err != nil {
r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
continue
}
if has {
// There are more subscribers to fetch.
r.subFetchQueue <- c
} else {
// There are no more subscribers. Either the campaign status
// has changed or all subscribers have been processed.
if err := r.exhaustCampaign(c, ""); err != nil {
r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
}
}
} }
} }
// SpawnWorkers spawns workers goroutines that push out messages. // SpawnWorkers spawns workers goroutines that push out messages.
func (r *Runner) SpawnWorkers() { func (r *Runner) SpawnWorkers() {
for i := 0; i < r.cfg.Concurrency; i++ { for i := 0; i < r.cfg.Concurrency; i++ {
go func(ch chan *Message) { go func() {
for { for m := range r.msgQueue {
select { if !r.isCampaignProcessing(m.Campaign.ID) {
case m := <-ch: continue
err := r.messengers[m.Campaign.MessengerID].Push( }
m.Campaign.FromEmail,
m.Subscriber.Email, err := r.messengers[m.Campaign.MessengerID].Push(
m.Campaign.Subject, m.from,
m.Body) m.to,
if err != nil { m.Campaign.Subject,
r.logger.Printf("error pushing message: %v", err) m.Body)
if err != nil {
r.logger.Printf("error sending message in campaign %s: %v",
m.Campaign.Name, err)
select {
case r.msgErrorQueue <- msgError{camp: m.Campaign, err: err}:
default:
} }
} }
} }
}(r.msgQueue) }()
} }
} }
@ -203,7 +248,7 @@ func (r *Runner) SpawnWorkers() {
func (r *Runner) addCampaign(c *models.Campaign) error { func (r *Runner) addCampaign(c *models.Campaign) error {
// Validate messenger. // Validate messenger.
if _, ok := r.messengers[c.MessengerID]; !ok { if _, ok := r.messengers[c.MessengerID]; !ok {
r.src.CancelCampaign(c.ID) r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name) return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
} }
@ -252,23 +297,44 @@ func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error
continue continue
} }
// Send the message. // Push the message to the queue while blocking and waiting until
// the queue is drained.
r.msgQueue <- m r.msgQueue <- m
} }
return true, nil return true, nil
} }
func (r *Runner) processExhaustedCampaign(c *models.Campaign) error { // isCampaignProcessing checks if the campaign is bing processed.
func (r *Runner) isCampaignProcessing(id int) bool {
_, ok := r.camps[id]
return ok
}
func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error {
delete(r.camps, c.ID)
// A status has been passed. Change the campaign's status
// without further checks.
if status != "" {
if err := r.src.UpdateCampaignStatus(c.ID, status); err != nil {
r.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
} else {
r.logger.Printf("set campaign (%s) to %s", c.Name, status)
}
return nil
}
// Fetch the up-to-date campaign status from the source.
cm, err := r.src.GetCampaign(c.ID) cm, err := r.src.GetCampaign(c.ID)
if err != nil { if err != nil {
return err return err
} }
// If a running campaign has exhausted subscribers, it's finished. // If a running campaign has exhausted subscribers, it's finished.
// Otherwise, it's paused or cancelled.
if cm.Status == models.CampaignStatusRunning { if cm.Status == models.CampaignStatusRunning {
if err := r.src.FinishCampaign(c.ID); err != nil { if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
r.logger.Printf("error finishing campaign (%s): %v", c.Name, err) r.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
} else { } else {
r.logger.Printf("campaign (%s) finished", c.Name) r.logger.Printf("campaign (%s) finished", c.Name)
@ -277,7 +343,6 @@ func (r *Runner) processExhaustedCampaign(c *models.Campaign) error {
r.logger.Printf("stop processing campaign (%s)", c.Name) r.logger.Printf("stop processing campaign (%s)", c.Name)
} }
delete(r.camps, c.ID)
return nil return nil
} }

View File

@ -42,21 +42,9 @@ func (r *runnerDB) GetCampaign(campID int) (*models.Campaign, error) {
return out, err return out, err
} }
// PauseCampaign marks a campaign as paused. // UpdateCampaignStatus updates a campaign's status.
func (r *runnerDB) PauseCampaign(campID int) error { func (r *runnerDB) UpdateCampaignStatus(campID int, status string) error {
_, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusPaused) _, err := r.queries.UpdateCampaignStatus.Exec(campID, status)
return err
}
// CancelCampaign marks a campaign as cancelled.
func (r *runnerDB) CancelCampaign(campID int) error {
_, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusCancelled)
return err
}
// FinishCampaign marks a campaign as finished.
func (r *runnerDB) FinishCampaign(campID int) error {
_, err := r.queries.UpdateCampaignStatus.Exec(campID, models.CampaignStatusFinished)
return err return err
} }