Remove 'campaignDone?' check that incorrectly ignored queued

messages in a finishing campaign.
This commit is contained in:
Kailash Nadh 2020-04-25 15:54:25 +05:30
parent 0abc1ae05a
commit 6bba55f0eb
3 changed files with 868 additions and 4 deletions

View File

@ -262,10 +262,6 @@ func (m *Manager) SpawnWorkers() {
select {
// Campaign message.
case msg := <-m.campMsgQueue:
if !m.isCampaignProcessing(msg.Campaign.ID) {
continue
}
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)

710
static/queries.sql Normal file
View File

@ -0,0 +1,710 @@
-- subscribers
-- name: get-subscriber
-- Get a single subscriber by id or UUID.
SELECT * FROM subscribers WHERE CASE WHEN $1 > 0 THEN id = $1 ELSE uuid = $2 END;
-- name: subscriber-exists
-- Check if a subscriber exists by id or UUID.
SELECT exists (SELECT true FROM subscribers WHERE CASE WHEN $1 > 0 THEN id = $1 ELSE uuid = $2 END);
-- name: get-subscribers-by-emails
-- Get subscribers by emails.
SELECT * FROM subscribers WHERE email=ANY($1);
-- name: get-subscriber-lists
WITH sub AS (
SELECT id FROM subscribers WHERE CASE WHEN $1 > 0 THEN id = $1 ELSE uuid = $2 END
)
SELECT * FROM lists
LEFT JOIN subscriber_lists ON (lists.id = subscriber_lists.list_id)
WHERE subscriber_id = (SELECT id FROM sub)
-- Optional list IDs or UUIDs to filter.
AND (CASE WHEN $3::INT[] IS NOT NULL THEN id = ANY($3::INT[])
WHEN $4::UUID[] IS NOT NULL THEN uuid = ANY($4::UUID[])
ELSE TRUE
END)
AND (CASE WHEN $5 != '' THEN subscriber_lists.status = $5::subscription_status END)
AND (CASE WHEN $6 != '' THEN lists.optin = $6::list_optin ELSE TRUE END);
-- name: get-subscriber-lists-lazy
-- Get lists associations of subscribers given a list of subscriber IDs.
-- This query is used to lazy load given a list of subscriber IDs.
-- The query returns results in the same order as the given subscriber IDs, and for non-existent subscriber IDs,
-- the query still returns a row with 0 values. Thus, for lazy loading, the application simply iterate on the results in
-- the same order as the list of campaigns it would've queried and attach the results.
WITH subs AS (
SELECT subscriber_id, JSON_AGG(
ROW_TO_JSON(
(SELECT l FROM (SELECT subscriber_lists.status AS subscription_status, lists.*) l)
)
) AS lists FROM lists
LEFT JOIN subscriber_lists ON (subscriber_lists.list_id = lists.id)
WHERE subscriber_lists.subscriber_id = ANY($1)
GROUP BY subscriber_id
)
SELECT id as subscriber_id,
COALESCE(s.lists, '[]') AS lists
FROM (SELECT id FROM UNNEST($1) AS id) x
LEFT JOIN subs AS s ON (s.subscriber_id = id)
ORDER BY ARRAY_POSITION($1, id);
-- name: insert-subscriber
WITH sub AS (
INSERT INTO subscribers (uuid, email, name, status, attribs)
VALUES($1, $2, $3, $4, $5)
returning id
),
listIDs AS (
SELECT id FROM lists WHERE
(CASE WHEN ARRAY_LENGTH($6::INT[], 1) > 0 THEN id=ANY($6)
ELSE uuid=ANY($7::UUID[]) END)
),
subs AS (
INSERT INTO subscriber_lists (subscriber_id, list_id, status)
VALUES(
(SELECT id FROM sub),
UNNEST(ARRAY(SELECT id FROM listIDs)),
(CASE WHEN $4='blacklisted' THEN 'unsubscribed'::subscription_status ELSE 'unconfirmed' END)
)
ON CONFLICT (subscriber_id, list_id) DO UPDATE
SET updated_at=NOW()
)
SELECT id from sub;
-- name: upsert-subscriber
-- Upserts a subscriber where existing subscribers get their names and attributes overwritten.
-- The status field is only updated when $6 = 'override_status'.
WITH sub AS (
INSERT INTO subscribers (uuid, email, name, attribs)
VALUES($1, $2, $3, $4)
ON CONFLICT (email) DO UPDATE
SET name=$3,
attribs=$4,
updated_at=NOW()
RETURNING uuid, id
),
subs AS (
INSERT INTO subscriber_lists (subscriber_id, list_id)
VALUES((SELECT id FROM sub), UNNEST($5::INT[]))
ON CONFLICT (subscriber_id, list_id) DO UPDATE
SET updated_at=NOW()
)
SELECT uuid, id from sub;
-- name: upsert-blacklist-subscriber
-- Upserts a subscriber where the update will only set the status to blacklisted
-- unlike upsert-subscribers where name and attributes are updated. In addition, all
-- existing subscriptions are marked as 'unsubscribed'.
-- This is used in the bulk importer.
WITH sub AS (
INSERT INTO subscribers (uuid, email, name, attribs, status)
VALUES($1, $2, $3, $4, 'blacklisted')
ON CONFLICT (email) DO UPDATE SET status='blacklisted', updated_at=NOW()
RETURNING id
)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE subscriber_id = (SELECT id FROM sub);
-- name: update-subscriber
-- Updates a subscriber's data, and given a list of list_ids, inserts subscriptions
-- for them while deleting existing subscriptions not in the list.
WITH s AS (
UPDATE subscribers SET
email=(CASE WHEN $2 != '' THEN $2 ELSE email END),
name=(CASE WHEN $3 != '' THEN $3 ELSE name END),
status=(CASE WHEN $4 != '' THEN $4::subscriber_status ELSE status END),
attribs=(CASE WHEN $5::TEXT != '' THEN $5::JSONB ELSE attribs END),
updated_at=NOW()
WHERE id = $1 RETURNING id
),
d AS (
DELETE FROM subscriber_lists WHERE subscriber_id = $1 AND list_id != ALL($6)
)
INSERT INTO subscriber_lists (subscriber_id, list_id, status)
VALUES(
(SELECT id FROM s),
UNNEST($6),
(CASE WHEN $4='blacklisted' THEN 'unsubscribed'::subscription_status ELSE 'unconfirmed' END)
)
ON CONFLICT (subscriber_id, list_id) DO UPDATE
SET status = (CASE WHEN $4='blacklisted' THEN 'unsubscribed'::subscription_status ELSE 'unconfirmed' END);
-- name: delete-subscribers
-- Delete one or more subscribers by ID or UUID.
DELETE FROM subscribers WHERE CASE WHEN ARRAY_LENGTH($1::INT[], 1) > 0 THEN id = ANY($1) ELSE uuid = ANY($2::UUID[]) END;
-- name: blacklist-subscribers
WITH b AS (
UPDATE subscribers SET status='blacklisted', updated_at=NOW()
WHERE id = ANY($1::INT[])
)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE subscriber_id = ANY($1::INT[]);
-- name: add-subscribers-to-lists
INSERT INTO subscriber_lists (subscriber_id, list_id)
(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b)
ON CONFLICT (subscriber_id, list_id) DO NOTHING;
-- name: delete-subscriptions
DELETE FROM subscriber_lists
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b);
-- name: confirm-subscription-optin
WITH subID AS (
SELECT id FROM subscribers WHERE uuid = $1::UUID
),
listIDs AS (
SELECT id FROM lists WHERE uuid = ANY($2::UUID[])
)
UPDATE subscriber_lists SET status='confirmed', updated_at=NOW()
WHERE subscriber_id = (SELECT id FROM subID) AND list_id = ANY(SELECT id FROM listIDs);
-- name: unsubscribe-subscribers-from-lists
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b);
-- name: unsubscribe
-- Unsubscribes a subscriber given a campaign UUID (from all the lists in the campaign) and the subscriber UUID.
-- If $3 is TRUE, then all subscriptions of the subscriber is blacklisted
-- and all existing subscriptions, irrespective of lists, unsubscribed.
WITH lists AS (
SELECT list_id FROM campaign_lists
LEFT JOIN campaigns ON (campaign_lists.campaign_id = campaigns.id)
WHERE campaigns.uuid = $1
),
sub AS (
UPDATE subscribers SET status = (CASE WHEN $3 IS TRUE THEN 'blacklisted' ELSE status END)
WHERE uuid = $2 RETURNING id
)
UPDATE subscriber_lists SET status = 'unsubscribed' WHERE
subscriber_id = (SELECT id FROM sub) AND status != 'unsubscribed' AND
-- If $3 is false, unsubscribe from the campaign's lists, otherwise all lists.
CASE WHEN $3 IS FALSE THEN list_id = ANY(SELECT list_id FROM lists) ELSE list_id != 0 END;
-- privacy
-- name: export-subscriber-data
WITH prof AS (
SELECT id, uuid, email, name, attribs, status, created_at, updated_at FROM subscribers WHERE
CASE WHEN $1 > 0 THEN id = $1 ELSE uuid = $2 END
),
subs AS (
SELECT subscriber_lists.status AS subscription_status,
(CASE WHEN lists.type = 'private' THEN 'Private list' ELSE lists.name END) as name,
lists.type, subscriber_lists.created_at
FROM lists
LEFT JOIN subscriber_lists ON (subscriber_lists.list_id = lists.id)
WHERE subscriber_lists.subscriber_id = (SELECT id FROM prof)
),
views AS (
SELECT subject as campaign, COUNT(subscriber_id) as views FROM campaign_views
LEFT JOIN campaigns ON (campaigns.id = campaign_views.campaign_id)
WHERE subscriber_id = (SELECT id FROM prof)
GROUP BY campaigns.id ORDER BY id
),
clicks AS (
SELECT url, COUNT(subscriber_id) as clicks FROM link_clicks
LEFT JOIN links ON (links.id = link_clicks.link_id)
WHERE subscriber_id = (SELECT id FROM prof)
GROUP BY links.id ORDER BY id
)
SELECT (SELECT email FROM prof) as email,
COALESCE((SELECT JSON_AGG(t) FROM prof t), '{}') AS profile,
COALESCE((SELECT JSON_AGG(t) FROM subs t), '[]') AS subscriptions,
COALESCE((SELECT JSON_AGG(t) FROM views t), '[]') AS campaign_views,
COALESCE((SELECT JSON_AGG(t) FROM clicks t), '[]') AS link_clicks;
-- Partial and RAW queries used to construct arbitrary subscriber
-- queries for segmentation follow.
-- name: query-subscribers
-- raw: true
-- Unprepared statement for issuring arbitrary WHERE conditions for
-- searching subscribers. While the results are sliced using offset+limit,
-- there's a COUNT() OVER() that still returns the total result count
-- for pagination in the frontend, albeit being a field that'll repeat
-- with every resultant row.
SELECT COUNT(*) OVER () AS total, subscribers.* FROM subscribers
LEFT JOIN subscriber_lists
ON (
-- Optional list filtering.
(CASE WHEN CARDINALITY($1::INT[]) > 0 THEN true ELSE false END)
AND subscriber_lists.subscriber_id = subscribers.id
)
WHERE subscriber_lists.list_id = ALL($1::INT[])
%s
ORDER BY $2 DESC OFFSET $3 LIMIT $4;
-- name: query-subscribers-template
-- raw: true
-- This raw query is reused in multiple queries (blacklist, add to list, delete)
-- etc., so it's kept has a raw template to be injected into other raw queries,
-- and for the same reason, it is not terminated with a semicolon.
--
-- All queries that embed this query should expect
-- $1=true/false (dry-run or not) and $2=[]INT (option list IDs).
-- That is, their positional arguments should start from $3.
SELECT subscribers.id FROM subscribers
LEFT JOIN subscriber_lists
ON (
-- Optional list filtering.
(CASE WHEN CARDINALITY($2::INT[]) > 0 THEN true ELSE false END)
AND subscriber_lists.subscriber_id = subscribers.id
)
WHERE subscriber_lists.list_id = ALL($2::INT[]) %s
LIMIT (CASE WHEN $1 THEN 1 END)
-- name: delete-subscribers-by-query
-- raw: true
WITH subs AS (%s)
DELETE FROM subscribers WHERE id=ANY(SELECT id FROM subs);
-- name: blacklist-subscribers-by-query
-- raw: true
WITH subs AS (%s),
b AS (
UPDATE subscribers SET status='blacklisted', updated_at=NOW()
WHERE id = ANY(SELECT id FROM subs)
)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE subscriber_id = ANY(SELECT id FROM subs);
-- name: add-subscribers-to-lists-by-query
-- raw: true
WITH subs AS (%s)
INSERT INTO subscriber_lists (subscriber_id, list_id)
(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b)
ON CONFLICT (subscriber_id, list_id) DO NOTHING;
-- name: delete-subscriptions-by-query
-- raw: true
WITH subs AS (%s)
DELETE FROM subscriber_lists
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b);
-- name: unsubscribe-subscribers-from-lists-by-query
-- raw: true
WITH subs AS (%s)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b);
-- lists
-- name: get-lists
SELECT COUNT(*) OVER () AS total, lists.*, COUNT(subscriber_lists.subscriber_id) AS subscriber_count
FROM lists LEFT JOIN subscriber_lists
ON (subscriber_lists.list_id = lists.id AND subscriber_lists.status != 'unsubscribed')
WHERE ($1 = 0 OR id = $1)
GROUP BY lists.id ORDER BY lists.created_at OFFSET $2 LIMIT (CASE WHEN $3 = 0 THEN NULL ELSE $3 END);
-- name: get-lists-by-optin
-- Can have a list of IDs or a list of UUIDs.
SELECT * FROM lists WHERE (CASE WHEN $1 != '' THEN optin=$1::list_optin ELSE TRUE END) AND
(CASE WHEN $2::INT[] IS NOT NULL THEN id = ANY($2::INT[])
WHEN $3::UUID[] IS NOT NULL THEN uuid = ANY($3::UUID[])
END) ORDER BY name;
-- name: create-list
INSERT INTO lists (uuid, name, type, optin, tags) VALUES($1, $2, $3, $4, $5) RETURNING id;
-- name: update-list
UPDATE lists SET
name=(CASE WHEN $2 != '' THEN $2 ELSE name END),
type=(CASE WHEN $3 != '' THEN $3::list_type ELSE type END),
optin=(CASE WHEN $4 != '' THEN $4::list_optin ELSE optin END),
tags=(CASE WHEN ARRAY_LENGTH($5::VARCHAR(100)[], 1) > 0 THEN $5 ELSE tags END),
updated_at=NOW()
WHERE id = $1;
-- name: update-lists-date
UPDATE lists SET updated_at=NOW() WHERE id = ANY($1);
-- name: delete-lists
DELETE FROM lists WHERE id = ALL($1);
-- campaigns
-- name: create-campaign
-- This creates the campaign and inserts campaign_lists relationships.
WITH campLists AS (
-- Get the list_ids and their optin statuses for the campaigns found in the previous step.
SELECT id AS list_id, campaign_id, optin FROM lists
INNER JOIN campaign_lists ON (campaign_lists.list_id = lists.id)
WHERE id=ANY($12::INT[])
),
tpl AS (
-- If there's no template_id given, use the defualt template.
SELECT (CASE WHEN $11 = 0 THEN id ELSE $11 END) AS id FROM templates WHERE is_default IS TRUE
),
counts AS (
SELECT COALESCE(COUNT(id), 0) as to_send, COALESCE(MAX(id), 0) as max_sub_id
FROM subscribers
LEFT JOIN campLists ON (campLists.campaign_id = ANY($12::INT[]))
LEFT JOIN subscriber_lists ON (
subscriber_lists.status != 'unsubscribed' AND
subscribers.id = subscriber_lists.subscriber_id AND
subscriber_lists.list_id = campLists.list_id AND
-- For double opt-in lists, consider only 'confirmed' subscriptions. For single opt-ins,
-- any status except for 'unsubscribed' (already excluded above) works.
(CASE WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed' ELSE true END)
)
WHERE subscriber_lists.list_id=ANY($12::INT[])
AND subscribers.status='enabled'
),
camp AS (
INSERT INTO campaigns (uuid, type, name, subject, from_email, body, content_type, send_at, tags, messenger, template_id, to_send, max_subscriber_id)
SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, (SELECT id FROM tpl), (SELECT to_send FROM counts), (SELECT max_sub_id FROM counts)
RETURNING id
)
INSERT INTO campaign_lists (campaign_id, list_id, list_name)
(SELECT (SELECT id FROM camp), id, name FROM lists WHERE id=ANY($12::INT[]))
RETURNING (SELECT id FROM camp);
-- name: query-campaigns
-- Here, 'lists' is returned as an aggregated JSON array from campaign_lists because
-- the list reference may have been deleted.
-- While the results are sliced using offset+limit,
-- there's a COUNT() OVER() that still returns the total result count
-- for pagination in the frontend, albeit being a field that'll repeat
-- with every resultant row.
SELECT COUNT(*) OVER () AS total, campaigns.*, (
SELECT COALESCE(ARRAY_TO_JSON(ARRAY_AGG(l)), '[]') FROM (
SELECT COALESCE(campaign_lists.list_id, 0) AS id,
campaign_lists.list_name AS name
FROM campaign_lists WHERE campaign_lists.campaign_id = campaigns.id
) l
) AS lists
FROM campaigns
WHERE ($1 = 0 OR id = $1)
AND status=ANY(CASE WHEN ARRAY_LENGTH($2::campaign_status[], 1) != 0 THEN $2::campaign_status[] ELSE ARRAY[status] END)
AND ($3 = '' OR (to_tsvector(name || subject) @@ to_tsquery($3)))
ORDER BY created_at DESC OFFSET $4 LIMIT $5;
-- name: get-campaign
SELECT * FROM campaigns WHERE id = $1;
-- name: get-campaign-stats
-- This query is used to lazy load campaign stats (views, counts, list of lists) given a list of campaign IDs.
-- The query returns results in the same order as the given campaign IDs, and for non-existent campaign IDs,
-- the query still returns a row with 0 values. Thus, for lazy loading, the application simply iterate on the results in
-- the same order as the list of campaigns it would've queried and attach the results.
WITH lists AS (
SELECT campaign_id, JSON_AGG(JSON_BUILD_OBJECT('id', list_id, 'name', list_name)) AS lists FROM campaign_lists
WHERE campaign_id = ANY($1) GROUP BY campaign_id
), views AS (
SELECT campaign_id, COUNT(campaign_id) as num FROM campaign_views
WHERE campaign_id = ANY($1)
GROUP BY campaign_id
),
clicks AS (
SELECT campaign_id, COUNT(campaign_id) as num FROM link_clicks
WHERE campaign_id = ANY($1)
GROUP BY campaign_id
)
SELECT id as campaign_id,
COALESCE(v.num, 0) AS views,
COALESCE(c.num, 0) AS clicks,
COALESCE(l.lists, '[]') AS lists
FROM (SELECT id FROM UNNEST($1) AS id) x
LEFT JOIN lists AS l ON (l.campaign_id = id)
LEFT JOIN views AS v ON (v.campaign_id = id)
LEFT JOIN clicks AS c ON (c.campaign_id = id)
ORDER BY ARRAY_POSITION($1, id);
-- name: get-campaign-for-preview
SELECT campaigns.*, COALESCE(templates.body, (SELECT body FROM templates WHERE is_default = true LIMIT 1)) AS template_body,
(
SELECT COALESCE(ARRAY_TO_JSON(ARRAY_AGG(l)), '[]') FROM (
SELECT COALESCE(campaign_lists.list_id, 0) AS id,
campaign_lists.list_name AS name
FROM campaign_lists WHERE campaign_lists.campaign_id = campaigns.id
) l
) AS lists
FROM campaigns
LEFT JOIN templates ON (templates.id = campaigns.template_id)
WHERE campaigns.id = $1;
-- name: get-campaign-status
SELECT id, status, to_send, sent, started_at, updated_at
FROM campaigns
WHERE status=$1;
-- name: next-campaigns
-- Retreives campaigns that are running (or scheduled and the time's up) and need
-- to be processed. It updates the to_send count and max_subscriber_id of the campaign,
-- that is, the total number of subscribers to be processed across all lists of a campaign.
-- Thus, it has a sideaffect.
-- In addition, it finds the max_subscriber_id, the upper limit across all lists of
-- a campaign. This is used to fetch and slice subscribers for the campaign in next-subscriber-campaigns.
WITH camps AS (
-- Get all running campaigns and their template bodies (if the template's deleted, the default template body instead)
SELECT campaigns.*, COALESCE(templates.body, (SELECT body FROM templates WHERE is_default = true LIMIT 1)) AS template_body
FROM campaigns
LEFT JOIN templates ON (templates.id = campaigns.template_id)
WHERE (status='running' OR (status='scheduled' AND NOW() >= campaigns.send_at))
AND NOT(campaigns.id = ANY($1::INT[]))
),
campLists AS (
-- Get the list_ids and their optin statuses for the campaigns found in the previous step.
SELECT id AS list_id, campaign_id, optin FROM lists
INNER JOIN campaign_lists ON (campaign_lists.list_id = lists.id)
WHERE campaign_lists.campaign_id = ANY(SELECT id FROM camps)
),
counts AS (
-- For each campaign above, get the total number of subscribers and the max_subscriber_id
-- across all its lists.
SELECT id AS campaign_id,
COUNT(DISTINCT(subscriber_lists.subscriber_id)) AS to_send,
COALESCE(MAX(subscriber_lists.subscriber_id), 0) AS max_subscriber_id
FROM camps
LEFT JOIN campLists ON (campLists.campaign_id = camps.id)
LEFT JOIN subscriber_lists ON (
subscriber_lists.list_id = campLists.list_id AND
(CASE
-- For optin campaigns, only e-mail 'unconfirmed' subscribers belonging to 'double' optin lists.
WHEN camps.type = 'optin' THEN subscriber_lists.status = 'unconfirmed' AND campLists.optin = 'double'
-- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers.
WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed'
-- For regular campaigns with non-double optin lists, e-mail everyone
-- except unsubscribed subscribers.
ELSE subscriber_lists.status != 'unsubscribed'
END)
)
GROUP BY camps.id
),
u AS (
-- For each campaign, update the to_send count and set the max_subscriber_id.
UPDATE campaigns AS ca
SET to_send = co.to_send,
status = (CASE WHEN status != 'running' THEN 'running' ELSE status END),
max_subscriber_id = co.max_subscriber_id,
started_at=(CASE WHEN ca.started_at IS NULL THEN NOW() ELSE ca.started_at END)
FROM (SELECT * FROM counts) co
WHERE ca.id = co.campaign_id
)
SELECT * FROM camps;
-- name: next-campaign-subscribers
-- Returns a batch of subscribers in a given campaign starting from the last checkpoint
-- (last_subscriber_id). Every fetch updates the checkpoint and the sent count, which means
-- every fetch returns a new batch of subscribers until all rows are exhausted.
WITH camps AS (
SELECT last_subscriber_id, max_subscriber_id, type
FROM campaigns
WHERE id=$1 AND status='running'
),
campLists AS (
SELECT id AS list_id, optin FROM lists
INNER JOIN campaign_lists ON (campaign_lists.list_id = lists.id)
WHERE campaign_lists.campaign_id = $1
),
subs AS (
SELECT DISTINCT ON(subscribers.id) id AS uniq_id, subscribers.* FROM subscriber_lists
INNER JOIN campLists ON (
campLists.list_id = subscriber_lists.list_id
)
INNER JOIN subscribers ON (
subscribers.status != 'blacklisted' AND
subscribers.id = subscriber_lists.subscriber_id AND
(CASE
-- For optin campaigns, only e-mail 'unconfirmed' subscribers.
WHEN (SELECT type FROM camps) = 'optin' THEN subscriber_lists.status = 'unconfirmed' AND campLists.optin = 'double'
-- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers.
WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed'
-- For regular campaigns with non-double optin lists, e-mail everyone
-- except unsubscribed subscribers.
ELSE subscriber_lists.status != 'unsubscribed'
END)
)
WHERE subscriber_lists.status != 'unsubscribed' AND
id > (SELECT last_subscriber_id FROM camps) AND
id <= (SELECT max_subscriber_id FROM camps)
ORDER BY id LIMIT $2
),
u AS (
UPDATE campaigns
SET last_subscriber_id = (SELECT MAX(id) FROM subs),
sent = sent + (SELECT COUNT(id) FROM subs),
updated_at = NOW()
WHERE (SELECT COUNT(id) FROM subs) > 0 AND id=$1
)
SELECT * FROM subs;
-- name: get-one-campaign-subscriber
SELECT * FROM subscribers
LEFT JOIN subscriber_lists ON (subscribers.id = subscriber_lists.subscriber_id AND subscriber_lists.status != 'unsubscribed')
WHERE subscriber_lists.list_id=ANY(
SELECT list_id FROM campaign_lists where campaign_id=$1 AND list_id IS NOT NULL
)
ORDER BY RANDOM() LIMIT 1;
-- name: update-campaign
WITH camp AS (
UPDATE campaigns SET
name=(CASE WHEN $2 != '' THEN $2 ELSE name END),
subject=(CASE WHEN $3 != '' THEN $3 ELSE subject END),
from_email=(CASE WHEN $4 != '' THEN $4 ELSE from_email END),
body=(CASE WHEN $5 != '' THEN $5 ELSE body END),
content_type=(CASE WHEN $6 != '' THEN $6::content_type ELSE content_type END),
send_at=(CASE WHEN $8 THEN $7::TIMESTAMP WITH TIME ZONE WHEN NOT $8 THEN NULL ELSE send_at END),
tags=(CASE WHEN ARRAY_LENGTH($9::VARCHAR(100)[], 1) > 0 THEN $9 ELSE tags END),
template_id=(CASE WHEN $10 != 0 THEN $10 ELSE template_id END),
updated_at=NOW()
WHERE id = $1 RETURNING id
),
d AS (
-- Reset list relationships
DELETE FROM campaign_lists WHERE campaign_id = $1 AND NOT(list_id = ANY($11))
)
INSERT INTO campaign_lists (campaign_id, list_id, list_name)
(SELECT $1 as campaign_id, id, name FROM lists WHERE id=ANY($11::INT[]))
ON CONFLICT (campaign_id, list_id) DO UPDATE SET list_name = EXCLUDED.list_name;
-- name: update-campaign-counts
UPDATE campaigns SET
to_send=(CASE WHEN $2 != 0 THEN $2 ELSE to_send END),
sent=(CASE WHEN $3 != 0 THEN $3 ELSE sent END),
last_subscriber_id=(CASE WHEN $4 != 0 THEN $4 ELSE last_subscriber_id END),
updated_at=NOW()
WHERE id=$1;
-- name: update-campaign-status
UPDATE campaigns SET status=$2, updated_at=NOW() WHERE id = $1;
-- name: delete-campaign
DELETE FROM campaigns WHERE id=$1 AND (status = 'draft' OR status = 'scheduled');
-- name: register-campaign-view
WITH view AS (
SELECT campaigns.id as campaign_id, subscribers.id AS subscriber_id FROM campaigns
LEFT JOIN subscribers ON (subscribers.uuid = $2)
WHERE campaigns.uuid = $1
)
INSERT INTO campaign_views (campaign_id, subscriber_id)
VALUES((SELECT campaign_id FROM view), (SELECT subscriber_id FROM view));
-- users
-- name: get-users
SELECT * FROM users WHERE $1 = 0 OR id = $1 OFFSET $2 LIMIT $3;
-- name: create-user
INSERT INTO users (email, name, password, type, status) VALUES($1, $2, $3, $4, $5) RETURNING id;
-- name: update-user
UPDATE users SET
email=(CASE WHEN $2 != '' THEN $2 ELSE email END),
name=(CASE WHEN $3 != '' THEN $3 ELSE name END),
password=(CASE WHEN $4 != '' THEN $4 ELSE password END),
type=(CASE WHEN $5 != '' THEN $5::user_type ELSE type END),
status=(CASE WHEN $6 != '' THEN $6::user_status ELSE status END),
updated_at=NOW()
WHERE id = $1;
-- name: delete-user
-- Delete a user, except for the primordial super admin.
DELETE FROM users WHERE $1 != 1 AND id=$1;
-- templates
-- name: get-templates
-- Only if the second param ($2) is true, body is returned.
SELECT id, name, (CASE WHEN $2 = false THEN body ELSE '' END) as body,
is_default, created_at, updated_at
FROM templates WHERE $1 = 0 OR id = $1
ORDER BY created_at;
-- name: create-template
INSERT INTO templates (name, body) VALUES($1, $2) RETURNING id;
-- name: update-template
UPDATE templates SET
name=(CASE WHEN $2 != '' THEN $2 ELSE name END),
body=(CASE WHEN $3 != '' THEN $3 ELSE body END),
updated_at=NOW()
WHERE id = $1;
-- name: set-default-template
WITH u AS (
UPDATE templates SET is_default=true WHERE id=$1 RETURNING id
)
UPDATE templates SET is_default=false WHERE id != $1;
-- name: delete-template
-- Delete a template as long as there's more than one. One deletion, set all campaigns
-- with that template to the default template instead.
WITH tpl AS (
DELETE FROM templates WHERE id = $1 AND (SELECT COUNT(id) FROM templates) > 1 AND is_default = false RETURNING id
),
def AS (
SELECT id FROM templates WHERE is_default = true LIMIT 1
)
UPDATE campaigns SET template_id = (SELECT id FROM def) WHERE (SELECT id FROM tpl) > 0 AND template_id = $1
RETURNING (SELECT id FROM tpl);
-- media
-- name: insert-media
INSERT INTO media (uuid, filename, thumb, width, height, created_at) VALUES($1, $2, $3, $4, $5, NOW());
-- name: get-media
SELECT * FROM media ORDER BY created_at DESC;
-- name: delete-media
DELETE FROM media WHERE id=$1 RETURNING filename;
-- links
-- name: create-link
INSERT INTO links (uuid, url) VALUES($1, $2) ON CONFLICT (url) DO UPDATE SET url=EXCLUDED.url RETURNING uuid;
-- name: register-link-click
WITH link AS (
SELECT url, links.id AS link_id, campaigns.id as campaign_id, subscribers.id AS subscriber_id FROM links
LEFT JOIN campaigns ON (campaigns.uuid = $2)
LEFT JOIN subscribers ON (subscribers.uuid = $3)
WHERE links.uuid = $1
)
INSERT INTO link_clicks (campaign_id, subscriber_id, link_id)
VALUES((SELECT campaign_id FROM link), (SELECT subscriber_id FROM link), (SELECT link_id FROM link))
RETURNING (SELECT url FROM link);
-- name: get-dashboard-stats
WITH lists AS (
SELECT JSON_OBJECT_AGG(type, num) FROM (SELECT type, COUNT(id) AS num FROM lists GROUP BY type) row
),
subs AS (
SELECT JSON_OBJECT_AGG(status, num) FROM (SELECT status, COUNT(id) AS num FROM subscribers GROUP by status) row
),
orphans AS (
SELECT COUNT(id) FROM subscribers LEFT JOIN subscriber_lists ON (subscribers.id = subscriber_lists.subscriber_id)
WHERE subscriber_lists.subscriber_id IS NULL
),
camps AS (
SELECT JSON_OBJECT_AGG(status, num) FROM (SELECT status, COUNT(id) AS num FROM campaigns GROUP by status) row
),
clicks AS (
-- Clicks by day for the last 3 months
SELECT JSON_AGG(ROW_TO_JSON(row))
FROM (SELECT COUNT(*) AS count, created_at::DATE as date
FROM link_clicks GROUP by date ORDER BY date DESC LIMIT 100
) row
),
views AS (
-- Views by day for the last 3 months
SELECT JSON_AGG(ROW_TO_JSON(row))
FROM (SELECT COUNT(*) AS count, created_at::DATE as date
FROM campaign_views GROUP by date ORDER BY date DESC LIMIT 100
) row
)
SELECT JSON_BUILD_OBJECT('lists', COALESCE((SELECT * FROM lists), '[]'),
'subscribers', COALESCE((SELECT * FROM subs), '[]'),
'orphan_subscribers', (SELECT * FROM orphans),
'campaigns', COALESCE((SELECT * FROM camps), '[]'),
'link_clicks', COALESCE((SELECT * FROM clicks), '[]'),
'campaign_views', COALESCE((SELECT * FROM views), '[]')) AS stats;

158
static/schema.sql Normal file
View File

@ -0,0 +1,158 @@
DROP TYPE IF EXISTS list_type CASCADE; CREATE TYPE list_type AS ENUM ('public', 'private', 'temporary');
DROP TYPE IF EXISTS list_optin CASCADE; CREATE TYPE list_optin AS ENUM ('single', 'double');
DROP TYPE IF EXISTS subscriber_status CASCADE; CREATE TYPE subscriber_status AS ENUM ('enabled', 'disabled', 'blacklisted');
DROP TYPE IF EXISTS subscription_status CASCADE; CREATE TYPE subscription_status AS ENUM ('unconfirmed', 'confirmed', 'unsubscribed');
DROP TYPE IF EXISTS campaign_status CASCADE; CREATE TYPE campaign_status AS ENUM ('draft', 'running', 'scheduled', 'paused', 'cancelled', 'finished');
DROP TYPE IF EXISTS campaign_type CASCADE; CREATE TYPE campaign_type AS ENUM ('regular', 'optin');
DROP TYPE IF EXISTS content_type CASCADE; CREATE TYPE content_type AS ENUM ('richtext', 'html', 'plain');
-- subscribers
DROP TABLE IF EXISTS subscribers CASCADE;
CREATE TABLE subscribers (
id SERIAL PRIMARY KEY,
uuid uuid NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
attribs JSONB NOT NULL DEFAULT '{}',
status subscriber_status NOT NULL DEFAULT 'enabled',
campaigns INTEGER[],
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP INDEX IF EXISTS idx_subs_email; CREATE UNIQUE INDEX idx_subs_email ON subscribers(LOWER(email));
DROP INDEX IF EXISTS idx_subs_status; CREATE INDEX idx_subs_status ON subscribers(status);
-- lists
DROP TABLE IF EXISTS lists CASCADE;
CREATE TABLE lists (
id SERIAL PRIMARY KEY,
uuid uuid NOT NULL UNIQUE,
name TEXT NOT NULL,
type list_type NOT NULL,
optin list_optin NOT NULL DEFAULT 'single',
tags VARCHAR(100)[],
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP TABLE IF EXISTS subscriber_lists CASCADE;
CREATE TABLE subscriber_lists (
subscriber_id INTEGER REFERENCES subscribers(id) ON DELETE CASCADE ON UPDATE CASCADE,
list_id INTEGER NULL REFERENCES lists(id) ON DELETE CASCADE ON UPDATE CASCADE,
status subscription_status NOT NULL DEFAULT 'unconfirmed',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY(subscriber_id, list_id)
);
DROP INDEX IF EXISTS idx_sub_lists_sub_id; CREATE INDEX idx_sub_lists_sub_id ON subscriber_lists(subscriber_id);
DROP INDEX IF EXISTS idx_sub_lists_list_id; CREATE INDEX idx_sub_lists_list_id ON subscriber_lists(list_id);
DROP INDEX IF EXISTS idx_sub_lists_status; CREATE INDEX idx_sub_lists_status ON subscriber_lists(status);
-- templates
DROP TABLE IF EXISTS templates CASCADE;
CREATE TABLE templates (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
body TEXT NOT NULL,
is_default BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE UNIQUE INDEX ON templates (is_default) WHERE is_default = true;
-- campaigns
DROP TABLE IF EXISTS campaigns CASCADE;
CREATE TABLE campaigns (
id SERIAL PRIMARY KEY,
uuid uuid NOT NULL UNIQUE,
name TEXT NOT NULL,
subject TEXT NOT NULL,
from_email TEXT NOT NULL,
body TEXT NOT NULL,
content_type content_type NOT NULL DEFAULT 'richtext',
send_at TIMESTAMP WITH TIME ZONE,
status campaign_status NOT NULL DEFAULT 'draft',
tags VARCHAR(100)[],
-- The subscription statuses of subscribers to which a campaign will be sent.
-- For opt-in campaigns, this will be 'unsubscribed'.
type campaign_type DEFAULT 'regular',
-- The ID of the messenger backend used to send this campaign.
messenger TEXT NOT NULL,
template_id INTEGER REFERENCES templates(id) ON DELETE SET DEFAULT DEFAULT 1,
-- Progress and stats.
to_send INT NOT NULL DEFAULT 0,
sent INT NOT NULL DEFAULT 0,
max_subscriber_id INT NOT NULL DEFAULT 0,
last_subscriber_id INT NOT NULL DEFAULT 0,
started_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP TABLE IF EXISTS campaign_lists CASCADE;
CREATE TABLE campaign_lists (
campaign_id INTEGER NOT NULL REFERENCES campaigns(id) ON DELETE CASCADE ON UPDATE CASCADE,
-- Lists may be deleted, so list_id is nullable
-- and a copy of the original list name is maintained here.
list_id INTEGER NULL REFERENCES lists(id) ON DELETE SET NULL ON UPDATE CASCADE,
list_name TEXT NOT NULL DEFAULT ''
);
CREATE UNIQUE INDEX ON campaign_lists (campaign_id, list_id);
DROP INDEX IF EXISTS idx_camp_lists_camp_id; CREATE INDEX idx_camp_lists_camp_id ON campaign_lists(campaign_id);
DROP INDEX IF EXISTS idx_camp_lists_list_id; CREATE INDEX idx_camp_lists_list_id ON campaign_lists(list_id);
DROP TABLE IF EXISTS campaign_views CASCADE;
CREATE TABLE campaign_views (
campaign_id INTEGER NOT NULL REFERENCES campaigns(id) ON DELETE CASCADE ON UPDATE CASCADE,
-- Subscribers may be deleted, but the view counts should remain.
subscriber_id INTEGER NULL REFERENCES subscribers(id) ON DELETE SET NULL ON UPDATE CASCADE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP INDEX IF EXISTS idx_views_camp_id; CREATE INDEX idx_views_camp_id ON campaign_views(campaign_id);
DROP INDEX IF EXISTS idx_views_subscriber_id; CREATE INDEX idx_views_subscriber_id ON campaign_views(subscriber_id);
-- media
DROP TABLE IF EXISTS media CASCADE;
CREATE TABLE media (
id SERIAL PRIMARY KEY,
uuid uuid NOT NULL UNIQUE,
filename TEXT NOT NULL,
thumb TEXT NOT NULL,
width INT NOT NULL DEFAULT 0,
height INT NOT NULL DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- links
DROP TABLE IF EXISTS links CASCADE;
CREATE TABLE links (
id SERIAL PRIMARY KEY,
uuid uuid NOT NULL UNIQUE,
url TEXT NOT NULL UNIQUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP TABLE IF EXISTS link_clicks CASCADE;
CREATE TABLE link_clicks (
campaign_id INTEGER REFERENCES campaigns(id) ON DELETE CASCADE ON UPDATE CASCADE,
link_id INTEGER REFERENCES links(id) ON DELETE CASCADE ON UPDATE CASCADE,
-- Subscribers may be deleted, but the link counts should remain.
subscriber_id INTEGER NULL REFERENCES subscribers(id) ON DELETE SET NULL ON UPDATE CASCADE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
DROP INDEX IF EXISTS idx_clicks_camp_id; CREATE INDEX idx_clicks_camp_id ON link_clicks(campaign_id);
DROP INDEX IF EXISTS idx_clicks_link_id; CREATE INDEX idx_clicks_link_id ON link_clicks(link_id);
DROP INDEX IF EXISTS idx_clicks_sub_id; CREATE INDEX idx_clicks_sub_id ON link_clicks(subscriber_id);