Minor refactor to campaign manager.

- Remove external invocation of worker goroutines into Run()
- Split Run() into smaller functions.
This commit is contained in:
Kailash Nadh 2020-07-06 20:22:57 +05:30
parent 3e755596c7
commit fd044f4cb6
2 changed files with 93 additions and 89 deletions

View File

@ -181,65 +181,19 @@ func (m *Manager) HasMessenger(id string) bool {
return ok return ok
} }
// Run is a blocking function (and hence should be invoked as a goroutine) // Run is a blocking function (that should be invoked as a goroutine)
// that scans the source db at regular intervals for pending campaigns, // that scans the data source at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of // and queues them for processing. The process queue fetches batches of
// subscribers and pushes messages to them for each queued campaign // subscribers and pushes messages to them for each queued campaign
// until all subscribers are exhausted, at which point, a campaign is marked // until all subscribers are exhausted, at which point, a campaign is marked
// as "finished". // as "finished".
func (m *Manager) Run(tick time.Duration) { func (m *Manager) Run(tick time.Duration) {
go func() { go m.scanCampaigns(tick)
t := time.NewTicker(tick)
for {
select {
// Periodically scan the data source for campaigns to process.
case <-t.C:
campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs())
if err != nil {
m.logger.Printf("error fetching campaigns: %v", err)
continue
}
for _, c := range campaigns { // Spawn N message workers.
if err := m.addCampaign(c); err != nil { for i := 0; i < m.cfg.Concurrency; i++ {
m.logger.Printf("error processing campaign (%s): %v", c.Name, err) go m.messageWorker()
continue }
}
m.logger.Printf("start processing campaign (%s)", c.Name)
// If subscriber processing is busy, move on. Blocking and waiting
// can end up in a race condition where the waiting campaign's
// state in the data source has changed.
select {
case m.subFetchQueue <- c:
default:
}
}
// Aggregate errors from sending messages to check against the error threshold
// after which a campaign is paused.
case e := <-m.campMsgErrorQueue:
if m.cfg.MaxSendErrors < 1 {
continue
}
// If the error threshold is met, pause the campaign.
m.campMsgErrorCounts[e.camp.ID]++
if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
m.logger.Printf("error counted exceeded %d. pausing campaign %s",
m.cfg.MaxSendErrors, e.camp.Name)
if m.isCampaignProcessing(e.camp.ID) {
m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
}
delete(m.campMsgErrorCounts, e.camp.ID)
// Notify admins.
m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
}
}
}
}()
// Fetch the next set of subscribers for a campaign and process them. // Fetch the next set of subscribers for a campaign and process them.
for c := range m.subFetchQueue { for c := range m.subFetchQueue {
@ -265,45 +219,41 @@ func (m *Manager) Run(tick time.Duration) {
} }
} }
// SpawnWorkers spawns workers goroutines that push out messages. // messageWorker is a blocking function that listens to the message queue
func (m *Manager) SpawnWorkers() { // and pushes out incoming messages on it to the messenger.
for i := 0; i < m.cfg.Concurrency; i++ { func (m *Manager) messageWorker() {
go func() { // Counter to keep track of the message / sec rate limit.
// Counter to keep track of the message / sec rate limit. numMsg := 0
numMsg := 0 for {
select {
// Campaign message.
case msg := <-m.campMsgQueue:
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)
numMsg = 0
}
numMsg++
err := m.messengers[msg.Campaign.MessengerID].Push(
msg.from, []string{msg.to}, msg.subject, msg.body, nil)
if err != nil {
m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err)
for {
select { select {
// Campaign message. case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
case msg := <-m.campMsgQueue: default:
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)
numMsg = 0
}
numMsg++
err := m.messengers[msg.Campaign.MessengerID].Push(
msg.from, []string{msg.to}, msg.subject, msg.body, nil)
if err != nil {
m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err)
select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
default:
}
}
// Arbitrary message.
case msg := <-m.msgQueue:
err := m.messengers[msg.Messenger].Push(
msg.From, msg.To, msg.Subject, msg.Body, nil)
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}
} }
} }
}()
// Arbitrary message.
case msg := <-m.msgQueue:
err := m.messengers[msg.Messenger].Push(
msg.From, msg.To, msg.Subject, msg.Body, nil)
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}
}
} }
} }
@ -338,6 +288,61 @@ func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap {
} }
} }
// scanCampaigns is a blocking function that periodically scans the data source
// for campaigns to process and dispatches them to the manager.
func (m *Manager) scanCampaigns(tick time.Duration) {
t := time.NewTicker(tick)
for {
select {
// Periodically scan the data source for campaigns to process.
case <-t.C:
campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs())
if err != nil {
m.logger.Printf("error fetching campaigns: %v", err)
continue
}
for _, c := range campaigns {
if err := m.addCampaign(c); err != nil {
m.logger.Printf("error processing campaign (%s): %v", c.Name, err)
continue
}
m.logger.Printf("start processing campaign (%s)", c.Name)
// If subscriber processing is busy, move on. Blocking and waiting
// can end up in a race condition where the waiting campaign's
// state in the data source has changed.
select {
case m.subFetchQueue <- c:
default:
}
}
// Aggregate errors from sending messages to check against the error threshold
// after which a campaign is paused.
case e := <-m.campMsgErrorQueue:
if m.cfg.MaxSendErrors < 1 {
continue
}
// If the error threshold is met, pause the campaign.
m.campMsgErrorCounts[e.camp.ID]++
if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
m.logger.Printf("error counted exceeded %d. pausing campaign %s",
m.cfg.MaxSendErrors, e.camp.Name)
if m.isCampaignProcessing(e.camp.ID) {
m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
}
delete(m.campMsgErrorCounts, e.camp.ID)
// Notify admins.
m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
}
}
}
}
// addCampaign adds a campaign to the process queue. // addCampaign adds a campaign to the process queue.
func (m *Manager) addCampaign(c *models.Campaign) error { func (m *Manager) addCampaign(c *models.Campaign) error {
// Validate messenger. // Validate messenger.

View File

@ -142,7 +142,6 @@ func main() {
// Start the campaign workers. The campaign batches (fetch from DB, push out // Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval. // messages) get processed at the specified interval.
go app.manager.Run(time.Second * 5) go app.manager.Run(time.Second * 5)
app.manager.SpawnWorkers()
// Start and run the app server. // Start and run the app server.
initHTTPServer(app) initHTTPServer(app)