diff --git a/config.toml.sample b/config.toml.sample index b6607fb..d5ad4ab 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -42,6 +42,11 @@ message_rate = 5 # investigation or intervention. Set to 0 to never pause. max_send_errors = 1000 +# The number of subscribers to pull from the databse in a single iteration. +# Each iteration pulls subscribers from the database, sends messages to them, +# and then moves on to the next iteration to pull the next batch. +# This should ideally be higher than the maximum achievable throughput (concurrency * message_rate) +batch_size = 1000 [privacy] # Allow subscribers to unsubscribe from all mailing lists and mark themselves diff --git a/init.go b/init.go index 4e6f926..4c8fb83 100644 --- a/init.go +++ b/init.go @@ -195,6 +195,7 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager { } return manager.New(manager.Config{ + BatchSize: ko.Int("app.batch_size"), Concurrency: ko.Int("app.concurrency"), MessageRate: ko.Int("app.message_rate"), MaxSendErrors: ko.Int("app.max_send_errors"), diff --git a/internal/manager/manager.go b/internal/manager/manager.go index e9804f3..1ddcc14 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -15,8 +15,6 @@ import ( ) const ( - batchSize = 10000 - // BaseTPL is the name of the base template. BaseTPL = "base" @@ -84,6 +82,9 @@ type Message struct { // Config has parameters for configuring the manager. type Config struct { + // Number of subscribers to pull from the DB in a single iteration. + BatchSize int + Concurrency int MessageRate int MaxSendErrors int @@ -103,6 +104,16 @@ type msgError struct { // New returns a new instance of Mailer. func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager { + if cfg.BatchSize < 1 { + cfg.BatchSize = 1000 + } + if cfg.Concurrency < 1 { + cfg.Concurrency = 1 + } + if cfg.MessageRate < 1 { + cfg.MessageRate = 1 + } + return &Manager{ cfg: cfg, src: src, @@ -232,7 +243,7 @@ func (m *Manager) Run(tick time.Duration) { // Fetch the next set of subscribers for a campaign and process them. for c := range m.subFetchQueue { - has, err := m.nextSubscribers(c, batchSize) + has, err := m.nextSubscribers(c, m.cfg.BatchSize) if err != nil { m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err) continue