Add support for message throughput rate limiting

This commit is contained in:
Kailash Nadh 2020-04-02 18:22:27 +05:30
parent 21ffe5fb02
commit 894ede561a
3 changed files with 31 additions and 4 deletions

View File

@ -25,10 +25,17 @@ from_email = "listmonk <from@mail.com>"
notify_emails = ["admin1@mysite.com", "admin2@mysite.com"] notify_emails = ["admin1@mysite.com", "admin2@mysite.com"]
# Maximum concurrent workers that will attempt to send messages # Maximum concurrent workers that will attempt to send messages
# simultaneously. This should depend on the number of CPUs the # simultaneously. This should ideally depend on the number of CPUs
# machine has and also the number of simultaenous e-mails the # available, and should be based on the maximum number of messages
# mail server will # a target SMTP server will accept.
concurrency = 100 concurrency = 5
# Maximum number of messages to be sent out per second per worker.
# If concurrency = 10 and message_rate = 10, then up to 10x10=100 messages
# may be pushed out every second. This, along with concurrency, should be
# tweaked to keep the net messages going out per second under the target
# SMTP's rate limits, if any.
message_rate = 5
# The number of errors (eg: SMTP timeouts while e-mailing) a running # The number of errors (eg: SMTP timeouts while e-mailing) a running
# campaign should tolerate before it is paused for manual # campaign should tolerate before it is paused for manual

View File

@ -176,8 +176,17 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
campNotifCB := func(subject string, data interface{}) error { campNotifCB := func(subject string, data interface{}) error {
return app.sendNotification(cs.NotifyEmails, subject, notifTplCampaign, data) return app.sendNotification(cs.NotifyEmails, subject, notifTplCampaign, data)
} }
if ko.Int("app.concurrency") < 1 {
log.Fatal("app.concurrency should be at least 1")
}
if ko.Int("app.message_rate") < 1 {
log.Fatal("app.message_rate should be at least 1")
}
return manager.New(manager.Config{ return manager.New(manager.Config{
Concurrency: ko.Int("app.concurrency"), Concurrency: ko.Int("app.concurrency"),
MessageRate: ko.Int("app.message_rate"),
MaxSendErrors: ko.Int("app.max_send_errors"), MaxSendErrors: ko.Int("app.max_send_errors"),
FromEmail: cs.FromEmail, FromEmail: cs.FromEmail,
UnsubURL: cs.UnsubURL, UnsubURL: cs.UnsubURL,

View File

@ -84,6 +84,7 @@ type Message struct {
// Config has parameters for configuring the manager. // Config has parameters for configuring the manager.
type Config struct { type Config struct {
Concurrency int Concurrency int
MessageRate int
MaxSendErrors int MaxSendErrors int
RequeueOnError bool RequeueOnError bool
FromEmail string FromEmail string
@ -254,6 +255,9 @@ func (m *Manager) Run(tick time.Duration) {
func (m *Manager) SpawnWorkers() { func (m *Manager) SpawnWorkers() {
for i := 0; i < m.cfg.Concurrency; i++ { for i := 0; i < m.cfg.Concurrency; i++ {
go func() { go func() {
// Counter to keep track of the message / sec rate limit.
numMsg := 0
for { for {
select { select {
// Campaign message. // Campaign message.
@ -262,6 +266,13 @@ func (m *Manager) SpawnWorkers() {
continue continue
} }
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)
numMsg = 0
}
numMsg++
err := m.messengers[msg.Campaign.MessengerID].Push( err := m.messengers[msg.Campaign.MessengerID].Push(
msg.from, []string{msg.to}, msg.Campaign.Subject, msg.body, nil) msg.from, []string{msg.to}, msg.Campaign.Subject, msg.body, nil)
if err != nil { if err != nil {