diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 1ddcc14..0c6e7b2 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -181,65 +181,19 @@ func (m *Manager) HasMessenger(id string) bool { return ok } -// Run is a blocking function (and hence should be invoked as a goroutine) -// that scans the source db at regular intervals for pending campaigns, +// Run is a blocking function (that should be invoked as a goroutine) +// that scans the data source at regular intervals for pending campaigns, // and queues them for processing. The process queue fetches batches of // subscribers and pushes messages to them for each queued campaign // until all subscribers are exhausted, at which point, a campaign is marked // as "finished". func (m *Manager) Run(tick time.Duration) { - go func() { - t := time.NewTicker(tick) - for { - select { - // Periodically scan the data source for campaigns to process. - case <-t.C: - campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs()) - if err != nil { - m.logger.Printf("error fetching campaigns: %v", err) - continue - } + go m.scanCampaigns(tick) - for _, c := range campaigns { - if err := m.addCampaign(c); err != nil { - m.logger.Printf("error processing campaign (%s): %v", c.Name, err) - continue - } - m.logger.Printf("start processing campaign (%s)", c.Name) - - // If subscriber processing is busy, move on. Blocking and waiting - // can end up in a race condition where the waiting campaign's - // state in the data source has changed. - select { - case m.subFetchQueue <- c: - default: - } - } - - // Aggregate errors from sending messages to check against the error threshold - // after which a campaign is paused. - case e := <-m.campMsgErrorQueue: - if m.cfg.MaxSendErrors < 1 { - continue - } - - // If the error threshold is met, pause the campaign. - m.campMsgErrorCounts[e.camp.ID]++ - if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors { - m.logger.Printf("error counted exceeded %d. pausing campaign %s", - m.cfg.MaxSendErrors, e.camp.Name) - - if m.isCampaignProcessing(e.camp.ID) { - m.exhaustCampaign(e.camp, models.CampaignStatusPaused) - } - delete(m.campMsgErrorCounts, e.camp.ID) - - // Notify admins. - m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors") - } - } - } - }() + // Spawn N message workers. + for i := 0; i < m.cfg.Concurrency; i++ { + go m.messageWorker() + } // Fetch the next set of subscribers for a campaign and process them. for c := range m.subFetchQueue { @@ -265,45 +219,41 @@ func (m *Manager) Run(tick time.Duration) { } } -// SpawnWorkers spawns workers goroutines that push out messages. -func (m *Manager) SpawnWorkers() { - for i := 0; i < m.cfg.Concurrency; i++ { - go func() { - // Counter to keep track of the message / sec rate limit. - numMsg := 0 +// messageWorker is a blocking function that listens to the message queue +// and pushes out incoming messages on it to the messenger. +func (m *Manager) messageWorker() { + // Counter to keep track of the message / sec rate limit. + numMsg := 0 + for { + select { + // Campaign message. + case msg := <-m.campMsgQueue: + // Pause on hitting the message rate. + if numMsg >= m.cfg.MessageRate { + time.Sleep(time.Second) + numMsg = 0 + } + numMsg++ + + err := m.messengers[msg.Campaign.MessengerID].Push( + msg.from, []string{msg.to}, msg.subject, msg.body, nil) + if err != nil { + m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err) - for { select { - // Campaign message. - case msg := <-m.campMsgQueue: - // Pause on hitting the message rate. - if numMsg >= m.cfg.MessageRate { - time.Sleep(time.Second) - numMsg = 0 - } - numMsg++ - - err := m.messengers[msg.Campaign.MessengerID].Push( - msg.from, []string{msg.to}, msg.subject, msg.body, nil) - if err != nil { - m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err) - - select { - case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: - default: - } - } - - // Arbitrary message. - case msg := <-m.msgQueue: - err := m.messengers[msg.Messenger].Push( - msg.From, msg.To, msg.Subject, msg.Body, nil) - if err != nil { - m.logger.Printf("error sending message '%s': %v", msg.Subject, err) - } + case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: + default: } } - }() + + // Arbitrary message. + case msg := <-m.msgQueue: + err := m.messengers[msg.Messenger].Push( + msg.From, msg.To, msg.Subject, msg.Body, nil) + if err != nil { + m.logger.Printf("error sending message '%s': %v", msg.Subject, err) + } + } } } @@ -338,6 +288,61 @@ func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap { } } +// scanCampaigns is a blocking function that periodically scans the data source +// for campaigns to process and dispatches them to the manager. +func (m *Manager) scanCampaigns(tick time.Duration) { + t := time.NewTicker(tick) + for { + select { + // Periodically scan the data source for campaigns to process. + case <-t.C: + campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs()) + if err != nil { + m.logger.Printf("error fetching campaigns: %v", err) + continue + } + + for _, c := range campaigns { + if err := m.addCampaign(c); err != nil { + m.logger.Printf("error processing campaign (%s): %v", c.Name, err) + continue + } + m.logger.Printf("start processing campaign (%s)", c.Name) + + // If subscriber processing is busy, move on. Blocking and waiting + // can end up in a race condition where the waiting campaign's + // state in the data source has changed. + select { + case m.subFetchQueue <- c: + default: + } + } + + // Aggregate errors from sending messages to check against the error threshold + // after which a campaign is paused. + case e := <-m.campMsgErrorQueue: + if m.cfg.MaxSendErrors < 1 { + continue + } + + // If the error threshold is met, pause the campaign. + m.campMsgErrorCounts[e.camp.ID]++ + if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors { + m.logger.Printf("error counted exceeded %d. pausing campaign %s", + m.cfg.MaxSendErrors, e.camp.Name) + + if m.isCampaignProcessing(e.camp.ID) { + m.exhaustCampaign(e.camp, models.CampaignStatusPaused) + } + delete(m.campMsgErrorCounts, e.camp.ID) + + // Notify admins. + m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors") + } + } + } +} + // addCampaign adds a campaign to the process queue. func (m *Manager) addCampaign(c *models.Campaign) error { // Validate messenger. diff --git a/main.go b/main.go index 5d86fcd..03ec0c8 100644 --- a/main.go +++ b/main.go @@ -142,7 +142,6 @@ func main() { // Start the campaign workers. The campaign batches (fetch from DB, push out // messages) get processed at the specified interval. go app.manager.Run(time.Second * 5) - app.manager.SpawnWorkers() // Start and run the app server. initHTTPServer(app)