From 894ede561a45a5e8218613e0c68e1e557f665b6f Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Thu, 2 Apr 2020 18:22:27 +0530 Subject: [PATCH] Add support for message throughput rate limiting --- config.toml.sample | 15 +++++++++++---- init.go | 9 +++++++++ internal/manager/manager.go | 11 +++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/config.toml.sample b/config.toml.sample index fb0bd2f..5172914 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -25,10 +25,17 @@ from_email = "listmonk " notify_emails = ["admin1@mysite.com", "admin2@mysite.com"] # Maximum concurrent workers that will attempt to send messages -# simultaneously. This should depend on the number of CPUs the -# machine has and also the number of simultaenous e-mails the -# mail server will -concurrency = 100 +# simultaneously. This should ideally depend on the number of CPUs +# available, and should be based on the maximum number of messages +# a target SMTP server will accept. +concurrency = 5 + +# Maximum number of messages to be sent out per second per worker. +# If concurrency = 10 and message_rate = 10, then up to 10x10=100 messages +# may be pushed out every second. This, along with concurrency, should be +# tweaked to keep the net messages going out per second under the target +# SMTP's rate limits, if any. +message_rate = 5 # The number of errors (eg: SMTP timeouts while e-mailing) a running # campaign should tolerate before it is paused for manual diff --git a/init.go b/init.go index 9c4754a..b85d14f 100644 --- a/init.go +++ b/init.go @@ -176,8 +176,17 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager { campNotifCB := func(subject string, data interface{}) error { return app.sendNotification(cs.NotifyEmails, subject, notifTplCampaign, data) } + + if ko.Int("app.concurrency") < 1 { + log.Fatal("app.concurrency should be at least 1") + } + if ko.Int("app.message_rate") < 1 { + log.Fatal("app.message_rate should be at least 1") + } + return manager.New(manager.Config{ Concurrency: ko.Int("app.concurrency"), + MessageRate: ko.Int("app.message_rate"), MaxSendErrors: ko.Int("app.max_send_errors"), FromEmail: cs.FromEmail, UnsubURL: cs.UnsubURL, diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 930ac7a..499feab 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -84,6 +84,7 @@ type Message struct { // Config has parameters for configuring the manager. type Config struct { Concurrency int + MessageRate int MaxSendErrors int RequeueOnError bool FromEmail string @@ -254,6 +255,9 @@ func (m *Manager) Run(tick time.Duration) { func (m *Manager) SpawnWorkers() { for i := 0; i < m.cfg.Concurrency; i++ { go func() { + // Counter to keep track of the message / sec rate limit. + numMsg := 0 + for { select { // Campaign message. @@ -262,6 +266,13 @@ func (m *Manager) SpawnWorkers() { continue } + // Pause on hitting the message rate. + if numMsg >= m.cfg.MessageRate { + time.Sleep(time.Second) + numMsg = 0 + } + numMsg++ + err := m.messengers[msg.Campaign.MessengerID].Push( msg.from, []string{msg.to}, msg.Campaign.Subject, msg.body, nil) if err != nil {