Add bulk subscriber querying, segmentation, and management features

- Add a name / e-mail "quicksearch" input to the UI
- Implement row selection and aggregation at table level and a "select all" that selects all rows at the query level
- On selected subscribers, add bulk list management (add / remove / unsubscribe), blacklist, and delete
This commit is contained in:
Kailash Nadh 2018-12-18 10:54:55 +05:30
parent 755d3d2630
commit 5f0e3acfb9
8 changed files with 629 additions and 236 deletions

View File

@ -1,6 +1,6 @@
import React from "react"
import { Link } from "react-router-dom"
import { Row, Col, Modal, Form, Input, Select, Button, Table, Icon, Tooltip, Tag, Popconfirm, Spin, notification } from "antd"
import { Row, Col, Modal, Form, Input, Select, Button, Table, Icon, Tooltip, Tag, Popconfirm, Spin, notification, Radio } from "antd"
import Utils from "./utils"
import * as cs from "./constants"
@ -129,7 +129,7 @@ class CreateFormDef extends React.PureComponent {
<div id="modal-alert-container"></div>
<Spin spinning={ this.props.reqStates[cs.ModelSubscribers] === cs.StatePending }>
<Form onSubmit={this.handleSubmit}>
<Form onSubmit={ this.handleSubmit }>
<Form.Item {...formItemLayout} label="E-mail">
{getFieldDecorator("email", {
initialValue: record.email,
@ -187,13 +187,101 @@ class CreateFormDef extends React.PureComponent {
}
}
class ListsFormDef extends React.PureComponent {
state = {
modalWaiting: false
}
// Handle create / edit form submission.
handleSubmit = (e) => {
e.preventDefault()
var err = null, values = {}
this.props.form.validateFields((e, v) => {
err = e
values = v
})
if(err) {
return
}
if(this.props.allRowsSelected) {
values["list_ids"] = this.props.listIDs
values["query"] = this.props.query
} else {
values["ids"] = this.props.selectedRows.map(r => r.id)
}
this.setState({ modalWaiting: true })
this.props.request(!this.props.allRowsSelected ? cs.Routes.AddSubscribersToLists : cs.Routes.AddSubscribersToListsByQuery,
cs.MethodPut, values).then(() => {
notification["success"]({ message: "Lists changed",
description: `Lists changed for selected subscribers` })
this.props.clearSelectedRows()
this.props.fetchRecords()
this.setState({ modalWaiting: false })
this.props.onClose()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
this.setState({ modalWaiting: false })
})
}
render() {
const { getFieldDecorator } = this.props.form
const formItemLayout = {
labelCol: { xs: { span: 16 }, sm: { span: 4 } },
wrapperCol: { xs: { span: 16 }, sm: { span: 18 } }
}
return (
<Modal visible={ true } width="750px"
className="subscriber-lists-modal"
title="Manage lists"
okText="Ok"
confirmLoading={ this.state.modalWaiting }
onCancel={ this.props.onClose }
onOk={ this.handleSubmit }>
<Form onSubmit={ this.handleSubmit }>
<Form.Item {...formItemLayout} label="Action">
{getFieldDecorator("action", {
initialValue: "add",
rules: [{ required: true }]
})(
<Radio.Group>
<Radio value="add">Add</Radio>
<Radio value="remove">Remove</Radio>
<Radio value="unsubscribe">Mark as unsubscribed</Radio>
</Radio.Group>
)}
</Form.Item>
<Form.Item {...formItemLayout} label="Lists">
{getFieldDecorator("target_list_ids", { rules:[{ required: true }] })(
<Select mode="multiple">
{[...this.props.lists].map((v, i) =>
<Select.Option value={ v.id } key={ v.id }>
{ v.name }
</Select.Option>
)}
</Select>
)}
</Form.Item>
</Form>
</Modal>
)
}
}
const CreateForm = Form.create()(CreateFormDef)
const ListsForm = Form.create()(ListsFormDef)
class Subscribers extends React.PureComponent {
defaultPerPage = 20
state = {
formType: null,
listsFormVisible: false,
record: {},
queryParams: {
page: 1,
@ -204,9 +292,9 @@ class Subscribers extends React.PureComponent {
query: null,
targetLists: []
},
listAddVisible: false,
listModalVisible: false,
allRowsSelected: false,
rowsSelected: []
selectedRows: []
}
// Pagination config.
@ -374,6 +462,57 @@ class Subscribers extends React.PureComponent {
})
}
handleDeleteRecords = (records) => {
this.props.modelRequest(cs.ModelSubscribers, cs.Routes.DeleteSubscribers, cs.MethodDelete, { id: records.map(r => r.id) })
.then(() => {
notification["success"]({ message: "Subscriber(s) deleted", description: "Selected subscribers deleted" })
// Reload the table.
this.fetchRecords()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
})
}
handleBlacklistSubscribers = (records) => {
this.props.request(cs.Routes.BlacklistSubscribers, cs.MethodPut, { ids: records.map(r => r.id) })
.then(() => {
notification["success"]({ message: "Subscriber(s) blacklisted", description: "Selected subscribers blacklisted" })
// Reload the table.
this.fetchRecords()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
})
}
// Arbitrary query based calls.
handleDeleteRecordsByQuery = (listIDs, query) => {
this.props.modelRequest(cs.ModelSubscribers, cs.Routes.DeleteSubscribersByQuery, cs.MethodPost,
{ list_ids: listIDs, query: query })
.then(() => {
notification["success"]({ message: "Subscriber(s) deleted", description: "Selected subscribers have been deleted" })
// Reload the table.
this.fetchRecords()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
})
}
handleBlacklistSubscribersByQuery = (listIDs, query) => {
this.props.request(cs.Routes.BlacklistSubscribersByQuery, cs.MethodPut,
{ list_ids: listIDs, query: query })
.then(() => {
notification["success"]({ message: "Subscriber(s) blacklisted", description: "Selected subscribers have been blacklisted" })
// Reload the table.
this.fetchRecords()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
})
}
handleQuerySubscribersIntoLists = (query, sourceList, targetLists) => {
let params = {
query: query,
@ -383,7 +522,7 @@ class Subscribers extends React.PureComponent {
this.props.request(cs.Routes.QuerySubscribersIntoLists, cs.MethodPost, params).then((res) => {
notification["success"]({ message: "Subscriber(s) added", description: `${ res.data.data.count } added` })
this.handleToggleListAdd()
this.handleToggleListModal()
}).catch(e => {
notification["error"]({ message: "Error", description: e.message })
})
@ -401,6 +540,10 @@ class Subscribers extends React.PureComponent {
this.setState({ formType: cs.FormEdit, record: record })
}
handleToggleListsForm = () => {
this.setState({ listsFormVisible: !this.state.listsFormVisible })
}
handleSearch = (q) => {
q = q.trim().toLowerCase()
if(q === "") {
@ -413,16 +556,25 @@ class Subscribers extends React.PureComponent {
this.fetchRecords({ query: query })
}
handleRowSelection = (_, records) => {
this.setState({ allRowsSelected: false, rowsSelected: records.map(r => r.id) })
handleSelectRow = (_, records) => {
this.setState({ allRowsSelected: false, selectedRows: records })
}
handleSelectAllRows = () => {
this.setState({ allRowsSelected: true,
selectedRows: this.props.data[cs.ModelSubscribers].results })
}
clearSelectedRows = (_, records) => {
this.setState({ allRowsSelected: false, selectedRows: [] })
}
handleToggleQueryForm = () => {
this.setState({ queryFormVisible: !this.state.queryFormVisible })
}
handleToggleListAdd = () => {
this.setState({ listAddVisible: !this.state.listAddVisible })
handleToggleListModal = () => {
this.setState({ listModalVisible: !this.state.listModalVisible })
}
render() {
@ -506,22 +658,44 @@ class Subscribers extends React.PureComponent {
}
</Col>
<Col span={14}>
{ this.state.rowsSelected.length > 0 &&
{ this.state.selectedRows.length > 0 &&
<nav className="table-options">
<p>
<strong>{ this.state.allRowsSelected ? this.state.queryParams.total : this.state.rowsSelected.length }</strong>
<strong>{ this.state.allRowsSelected ? this.state.queryParams.total : this.state.selectedRows.length }</strong>
{" "} subscriber(s) selected
{ !this.state.allRowsSelected &&
<span> &mdash; <a role="button" onClick={ () => { this.setState({ allRowsSelected: true })
}}>Select all { this.state.queryParams.total }?</a>
{ !this.state.allRowsSelected && this.state.queryParams.total > this.state.queryParams.perPage &&
<span> &mdash; <a role="button" onClick={ this.handleSelectAllRows }>
Select all { this.state.queryParams.total }?</a>
</span>
}
</p>
<p>
<a role="button"><Icon type="bars" /> Manage lists</a>
<a role="button" onClick={ this.handleToggleListsForm }>
<Icon type="bars" /> Manage lists
</a>
<a role="button"><Icon type="rocket" /> Send campaign</a>
<a role="button"><Icon type="delete" /> Delete</a>
<a role="button"><Icon type="close" /> Blacklist</a>
<Popconfirm title="Are you sure?" onConfirm={() => {
if(this.state.allRowsSelected) {
this.handleDeleteRecordsByQuery(this.state.queryParams.listID ? [this.state.queryParams.listID] : [], this.state.queryParams.query)
this.clearSelectedRows()
} else {
this.handleDeleteRecords(this.state.selectedRows)
this.clearSelectedRows()
}
}}>
<a role="button"><Icon type="delete" /> Delete</a>
</Popconfirm>
<Popconfirm title="Are you sure?" onConfirm={() => {
if(this.state.allRowsSelected) {
this.handleBlacklistSubscribersByQuery(this.state.queryParams.listID ? [this.state.queryParams.listID] : [], this.state.queryParams.query)
this.clearSelectedRows()
} else {
this.handleBlacklistSubscribers(this.state.selectedRows)
this.clearSelectedRows()
}
}}>
<a role="button"><Icon type="close" /> Blacklist</a>
</Popconfirm>
</p>
</nav>
}
@ -531,13 +705,14 @@ class Subscribers extends React.PureComponent {
<Table
columns={ this.columns }
rowKey={ record => `${record.id}-${record.email}` }
rowKey={ record => `sub-${record.id}` }
dataSource={ this.props.data[cs.ModelSubscribers].results }
loading={ this.props.reqStates[cs.ModelSubscribers] !== cs.StateDone }
pagination={ pagination }
rowSelection = {{
columnWidth: "5%",
onChange: this.handleRowSelection
onChange: this.handleSelectRow,
selectedRowKeys: this.state.selectedRows.map(r => `sub-${r.id}`)
}}
/>
@ -551,35 +726,16 @@ class Subscribers extends React.PureComponent {
onClose={ this.handleHideForm } />
}
<Modal visible={ this.state.listAddVisible } width="750px"
className="list-add-modal"
title={ "Add " + this.props.data[cs.ModelSubscribers].total + " subscriber(s) to lists" }
okText="Add"
onCancel={ this.handleToggleListAdd }
onOk={() => {
if(this.state.queryParams.targetLists.length == 0) {
notification["warning"]({
message: "No lists selected",
description: "Select one or more lists"
})
return false
}
this.handleQuerySubscribersIntoLists(
this.state.queryParams.query,
this.state.queryParams.listID,
this.state.queryParams.targetLists
)
}}
okButtonProps={{ disabled: this.props.reqStates[cs.ModelSubscribers] === cs.StatePending }}>
<Select mode="multiple" style={{ width: "100%" }} onChange={(lists) => {
this.setState({ queryParams: { ...this.state.queryParams, targetLists: lists} })
}}>
{ this.props.data[cs.ModelLists].map((v, i) =>
<Select.Option value={ v.id } key={ v.id }>{ v.name }</Select.Option>
)}
</Select>
</Modal>
{ this.state.listsFormVisible && <ListsForm {...this.props}
lists={ this.props.data[cs.ModelLists] }
allRowsSelected={ this.state.allRowsSelected }
selectedRows={ this.state.selectedRows }
selectedLists={ this.state.queryParams.listID ? [this.state.queryParams.listID] : []}
clearSelectedRows={ this.clearSelectedRows }
query={ this.state.queryParams.query }
fetchRecords={ this.fetchRecords }
onClose={ this.handleToggleListsForm } />
}
</section>
)
}

View File

@ -67,7 +67,14 @@ export const Routes = {
UpdateSubscriber: "/api/subscribers/:id",
DeleteSubscriber: "/api/subscribers/:id",
DeleteSubscribers: "/api/subscribers",
QuerySubscribersIntoLists: "/api/subscribers/lists",
BlacklistSubscriber: "/api/subscribers/:id/blacklist",
BlacklistSubscribers: "/api/subscribers/blacklist",
AddSubscriberToLists: "/api/subscribers/lists/:id",
AddSubscribersToLists: "/api/subscribers/lists",
DeleteSubscribersByQuery: "/api/subscribers/query/delete",
BlacklistSubscribersByQuery: "/api/subscribers/query/blacklist",
AddSubscribersToListsByQuery: "/api/subscribers/query/lists",
ViewCampaigns: "/campaigns",
ViewCampaign: "/campaigns/:id",

16
main.go
View File

@ -87,12 +87,22 @@ func registerHandlers(e *echo.Echo) {
e.DELETE("/api/users/:id", handleDeleteUser)
e.GET("/api/subscribers/:id", handleGetSubscriber)
e.GET("/api/subscribers", handleQuerySubscribers)
e.POST("/api/subscribers", handleCreateSubscriber)
e.PUT("/api/subscribers/:id", handleUpdateSubscriber)
e.PUT("/api/subscribers/blacklist", handleBlacklistSubscribers)
e.PUT("/api/subscribers/:id/blacklist", handleBlacklistSubscribers)
e.PUT("/api/subscribers/lists/:id", handleManageSubscriberLists)
e.PUT("/api/subscribers/lists", handleManageSubscriberLists)
e.DELETE("/api/subscribers/:id", handleDeleteSubscribers)
e.DELETE("/api/subscribers", handleDeleteSubscribers)
e.POST("/api/subscribers/lists", handleQuerySubscribersIntoLists)
// Subscriber operations based on arbitrary SQL queries.
// These aren't very REST-like.
e.POST("/api/subscribers/query/delete", handleDeleteSubscribersByQuery)
e.PUT("/api/subscribers/query/blacklist", handleBlacklistSubscribersByQuery)
e.PUT("/api/subscribers/query/lists", handleManageSubscriberListsByQuery)
e.GET("/api/subscribers", handleQuerySubscribers)
e.GET("/api/import/subscribers", handleGetImportSubscribers)
e.GET("/api/import/subscribers/logs", handleGetImportSubscriberStats)
@ -226,7 +236,7 @@ func main() {
return sendNotification(notifTplImport, subject, data, app)
}
app.Importer = subimporter.New(q.UpsertSubscriber.Stmt,
q.BlacklistSubscriber.Stmt,
q.UpsertBlacklistSubscriber.Stmt,
db.DB,
importNotifCB)

View File

@ -91,6 +91,10 @@ type Subscriber struct {
Status string `db:"status" json:"status"`
CampaignIDs pq.Int64Array `db:"campaigns" json:"-"`
Lists []List `json:"lists"`
// Pseudofield for getting the total number of subscribers
// in searches and queries.
Total int `db:"total" json:"-"`
}
// SubscriberAttribs is the map of key:value attributes of a subscriber.

View File

@ -1,8 +1,12 @@
package main
import (
"context"
"database/sql"
"fmt"
"github.com/lib/pq"
"github.com/jmoiron/sqlx"
)
@ -10,20 +14,28 @@ import (
type Queries struct {
GetDashboardStats *sqlx.Stmt `query:"get-dashboard-stats"`
InsertSubscriber *sqlx.Stmt `query:"insert-subscriber"`
UpsertSubscriber *sqlx.Stmt `query:"upsert-subscriber"`
BlacklistSubscriber *sqlx.Stmt `query:"blacklist-subscriber"`
GetSubscriber *sqlx.Stmt `query:"get-subscriber"`
GetSubscribersByEmails *sqlx.Stmt `query:"get-subscribers-by-emails"`
GetSubscriberLists *sqlx.Stmt `query:"get-subscriber-lists"`
QuerySubscribers string `query:"query-subscribers"`
QuerySubscribersCount string `query:"query-subscribers-count"`
QuerySubscribersByList string `query:"query-subscribers-by-list"`
QuerySubscribersByListCount string `query:"query-subscribers-by-list-count"`
UpdateSubscriber *sqlx.Stmt `query:"update-subscriber"`
DeleteSubscribers *sqlx.Stmt `query:"delete-subscribers"`
Unsubscribe *sqlx.Stmt `query:"unsubscribe"`
QuerySubscribersIntoLists string `query:"query-subscribers-into-lists"`
InsertSubscriber *sqlx.Stmt `query:"insert-subscriber"`
UpsertSubscriber *sqlx.Stmt `query:"upsert-subscriber"`
UpsertBlacklistSubscriber *sqlx.Stmt `query:"upsert-blacklist-subscriber"`
GetSubscriber *sqlx.Stmt `query:"get-subscriber"`
GetSubscribersByEmails *sqlx.Stmt `query:"get-subscribers-by-emails"`
GetSubscriberLists *sqlx.Stmt `query:"get-subscriber-lists"`
UpdateSubscriber *sqlx.Stmt `query:"update-subscriber"`
BlacklistSubscribers *sqlx.Stmt `query:"blacklist-subscribers"`
AddSubscribersToLists *sqlx.Stmt `query:"add-subscribers-to-lists"`
DeleteSubscriptions *sqlx.Stmt `query:"delete-subscriptions"`
UnsubscribeSubscribersFromLists *sqlx.Stmt `query:"unsubscribe-subscribers-from-lists"`
DeleteSubscribers *sqlx.Stmt `query:"delete-subscribers"`
Unsubscribe *sqlx.Stmt `query:"unsubscribe"`
// Non-prepared arbitrary subscriber queries.
QuerySubscribers string `query:"query-subscribers"`
QuerySubscribersTpl string `query:"query-subscribers-template"`
DeleteSubscribersByQuery string `query:"delete-subscribers-by-query"`
AddSubscribersToListsByQuery string `query:"add-subscribers-to-lists-by-query"`
BlacklistSubscribersByQuery string `query:"blacklist-subscribers-by-query"`
DeleteSubscriptionsByQuery string `query:"delete-subscriptions-by-query"`
UnsubscribeSubscribersFromListsByQuery string `query:"unsubscribe-subscribers-from-lists-by-query"`
CreateList *sqlx.Stmt `query:"create-list"`
GetLists *sqlx.Stmt `query:"get-lists"`
@ -74,3 +86,49 @@ func connectDB(host string, port int, user, pwd, dbName string) (*sqlx.DB, error
return db, nil
}
// compileSubscriberQueryTpl takes a arbitrary WHERE expressions
// to filter subscribers from the subscribers table and prepares a query
// out of it using the raw `query-subscribers-template` query template.
// While doing this, a readonly transaction is created and the query is
// dry run on it to ensure that it is indeed readonly.
func (q *Queries) compileSubscriberQueryTpl(exp string, db *sqlx.DB) (string, error) {
tx, err := db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
return "", err
}
// Perform the dry run.
if exp != "" {
exp = " AND " + exp
}
stmt := fmt.Sprintf(q.QuerySubscribersTpl, exp)
if _, err := tx.Exec(stmt, true, pq.Int64Array{}); err != nil {
tx.Rollback()
return "", err
}
return stmt, nil
}
// compileSubscriberQueryTpl takes a arbitrary WHERE expressions and a subscriber
// query template that depends on the filter (eg: delete by query, blacklist by query etc.)
// combines and executes them.
func (q *Queries) execSubscriberQueryTpl(exp, tpl string, listIDs []int64, db *sqlx.DB, args ...interface{}) error {
// Perform a dry run.
filterExp, err := q.compileSubscriberQueryTpl(exp, db)
if err != nil {
return err
}
if len(listIDs) == 0 {
listIDs = pq.Int64Array{}
}
// First argument is the boolean indicating if the query is a dry run.
a := append([]interface{}{false, pq.Int64Array(listIDs)}, args...)
if _, err := db.Exec(fmt.Sprintf(tpl, filterExp), a...); err != nil {
return err
}
return nil
}

View File

@ -14,31 +14,6 @@ SELECT lists.*, subscriber_lists.subscriber_id, subscriber_lists.status AS subsc
LEFT JOIN subscriber_lists ON (subscriber_lists.list_id = lists.id)
WHERE subscriber_lists.subscriber_id = ANY($1::INT[]);
-- name: query-subscribers
-- raw: true
-- Unprepared statement for issuring arbitrary WHERE conditions.
SELECT * FROM subscribers WHERE 1=1 %s order by updated_at DESC OFFSET %d LIMIT %d;
-- name: query-subscribers-count
-- raw: true
SELECT COUNT(id) as num FROM subscribers WHERE 1=1 %s;
-- name: query-subscribers-by-list
-- raw: true
-- Unprepared statement for issuring arbitrary WHERE conditions.
SELECT subscribers.* FROM subscribers INNER JOIN subscriber_lists
ON (subscriber_lists.subscriber_id = subscribers.id)
WHERE subscriber_lists.list_id = %d
%s
ORDER BY id DESC OFFSET %d LIMIT %d;
-- name: query-subscribers-by-list-count
-- raw: true
SELECT COUNT(subscribers.id) as num FROM subscribers INNER JOIN subscriber_lists
ON (subscriber_lists.subscriber_id = subscribers.id)
WHERE subscriber_lists.list_id = %d
%s;
-- name: insert-subscriber
WITH sub AS (
INSERT INTO subscribers (uuid, email, name, status, attribs)
@ -77,10 +52,11 @@ subs AS (
)
SELECT uuid, id from sub;
-- name: blacklist-subscriber
-- name: upsert-blacklist-subscriber
-- Upserts a subscriber where the update will only set the status to blacklisted
-- unlike upsert-subscribers where name and attributes are updated. In addition, all
-- existing subscriptions are marked as 'unsubscribed'.
-- This is used in the bulk importer.
WITH sub AS (
INSERT INTO subscribers (uuid, email, name, attribs, status)
VALUES($1, $2, $3, $4, 'blacklisted')
@ -116,7 +92,28 @@ INSERT INTO subscriber_lists (subscriber_id, list_id, status)
-- name: delete-subscribers
-- Delete one or more subscribers.
DELETE FROM subscribers WHERE id = ALL($1);
DELETE FROM subscribers WHERE id = ANY($1);
-- name: blacklist-subscribers
WITH b AS (
UPDATE subscribers SET status='blacklisted', updated_at=NOW()
WHERE id = ANY($1::INT[])
)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE subscriber_id = ANY($1::INT[]);
-- name: add-subscribers-to-lists
INSERT INTO subscriber_lists (subscriber_id, list_id)
(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b)
ON CONFLICT (subscriber_id, list_id) DO NOTHING;
-- name: delete-subscriptions
DELETE FROM subscriber_lists
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b);
-- name: unsubscribe-subscribers-from-lists
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST($1::INT[]) a, UNNEST($2::INT[]) b);
-- name: unsubscribe
-- Unsubscribes a subscriber given a campaign UUID (from all the lists in the campaign) and the subscriber UUID.
@ -136,17 +133,81 @@ UPDATE subscriber_lists SET status = 'unsubscribed' WHERE
-- If $3 is false, unsubscribe from the campaign's lists, otherwise all lists.
CASE WHEN $3 IS FALSE THEN list_id = ANY(SELECT list_id FROM lists) ELSE list_id != 0 END;
-- name: query-subscribers-into-lists
-- Partial and RAW queries used to construct arbitrary subscriber
-- queries for segmentation follow.
-- name: query-subscribers
-- raw: true
-- Unprepared statement for issuring arbitrary WHERE conditions and getting
-- the resultant subscriber IDs into subscriber_lists.
WITH subs AS (
SELECT id FROM subscribers WHERE status != 'blacklisted' %s
-- Unprepared statement for issuring arbitrary WHERE conditions for
-- searching subscribers. While the results are sliced using offset+limit,
-- there's a COUNT() OVER() that still returns the total result count
-- for pagination in the frontend, albeit being a field that'll repeat
-- with every resultant row.
SELECT COUNT(*) OVER () AS total, subscribers.* FROM subscribers
LEFT JOIN subscriber_lists
ON (
-- Optional list filtering.
(CASE WHEN CARDINALITY($1::INT[]) > 0 THEN true ELSE false END)
AND subscriber_lists.subscriber_id = subscribers.id
)
WHERE subscriber_lists.list_id = ALL($1::INT[])
%s
ORDER BY $2 DESC OFFSET $3 LIMIT $4;
-- name: query-subscribers-template
-- raw: true
-- This raw query is reused in multiple queries (blacklist, add to list, delete)
-- etc., so it's kept has a raw template to be injected into other raw queries,
-- and for the same reason, it is not terminated with a semicolon.
--
-- All queries that embed this query should expect
-- $1=true/false (dry-run or not) and $2=[]INT (option list IDs).
-- That is, their positional arguments should start from $3.
SELECT subscribers.id FROM subscribers
LEFT JOIN subscriber_lists
ON (
-- Optional list filtering.
(CASE WHEN CARDINALITY($2::INT[]) > 0 THEN true ELSE false END)
AND subscriber_lists.subscriber_id = subscribers.id
)
WHERE subscriber_lists.list_id = ALL($2::INT[]) %s
LIMIT (CASE WHEN $1 THEN 1 END)
-- name: delete-subscribers-by-query
-- raw: true
WITH subs AS (%s)
DELETE FROM subscribers WHERE id=ANY(SELECT id FROM subs);
-- name: blacklist-subscribers-by-query
-- raw: true
WITH subs AS (%s),
b AS (
UPDATE subscribers SET status='blacklisted', updated_at=NOW()
WHERE id = ANY(SELECT id FROM subs)
)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE subscriber_id = ANY(SELECT id FROM subs);
-- name: add-subscribers-to-lists-by-query
-- raw: true
WITH subs AS (%s)
INSERT INTO subscriber_lists (subscriber_id, list_id)
(SELECT id, UNNEST($1::INT[]) FROM subs)
(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b)
ON CONFLICT (subscriber_id, list_id) DO NOTHING;
-- name: delete-subscriptions-by-query
-- raw: true
WITH subs AS (%s)
DELETE FROM subscriber_lists
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b);
-- name: unsubscribe-subscribers-from-lists-by-query
-- raw: true
WITH subs AS (%s)
UPDATE subscriber_lists SET status='unsubscribed', updated_at=NOW()
WHERE (subscriber_id, list_id) = ANY(SELECT a, b FROM UNNEST(ARRAY(SELECT id FROM subs)) a, UNNEST($3::INT[]) b);
-- lists
-- name: get-lists
SELECT lists.*, COUNT(subscriber_lists.subscriber_id) AS subscriber_count

View File

@ -16,6 +16,16 @@ import (
uuid "github.com/satori/go.uuid"
)
// subQueryReq is a "catch all" struct for reading various
// subscriber related requests.
type subQueryReq struct {
Query string `json:"query"`
ListIDs pq.Int64Array `json:"list_ids"`
TargetListIDs pq.Int64Array `json:"target_list_ids"`
SubscriberIDs pq.Int64Array `json:"ids"`
Action string `json:"action"`
}
type subsWrap struct {
Results models.Subscribers `json:"results"`
@ -25,26 +35,12 @@ type subsWrap struct {
Page int `json:"page"`
}
type queryAddResp struct {
Count int64 `json:"count"`
var dummySubscriber = models.Subscriber{
Email: "dummy@listmonk.app",
Name: "Dummy Subscriber",
UUID: "00000000-0000-0000-0000-000000000000",
}
type queryAddReq struct {
Query string `json:"query"`
SourceList int `json:"source_list"`
TargetLists pq.Int64Array `json:"target_lists"`
}
var (
jsonMap = []byte("{}")
dummySubscriber = models.Subscriber{
Email: "dummy@listmonk.app",
Name: "Dummy Subscriber",
UUID: "00000000-0000-0000-0000-000000000000",
}
)
// handleGetSubscriber handles the retrieval of a single subscriber by ID.
func handleGetSubscriber(c echo.Context) error {
var (
@ -70,7 +66,7 @@ func handleGetSubscriber(c echo.Context) error {
return c.JSON(http.StatusOK, okResp{out[0]})
}
// handleQuerySubscribers handles querying subscribers based on arbitrary conditions in SQL.
// handleQuerySubscribers handles querying subscribers based on an arbitrary SQL expression.
func handleQuerySubscribers(c echo.Context) error {
var (
app = c.Get("app").(*App)
@ -78,18 +74,17 @@ func handleQuerySubscribers(c echo.Context) error {
// Limit the subscribers to a particular list?
listID, _ = strconv.Atoi(c.FormValue("list_id"))
hasList bool
// The "WHERE ?" bit.
query = c.FormValue("query")
out subsWrap
out subsWrap
)
listIDs := pq.Int64Array{}
if listID < 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid `list_id`.")
} else if listID > 0 {
hasList = true
listIDs = append(listIDs, int64(listID))
}
// There's an arbitrary query condition from the frontend.
@ -98,23 +93,7 @@ func handleQuerySubscribers(c echo.Context) error {
cond = " AND " + query
}
// The SQL queries to be executed are different for global subscribers
// and subscribers belonging to a specific list.
var (
stmt = ""
stmtCount = ""
)
if hasList {
stmt = fmt.Sprintf(app.Queries.QuerySubscribersByList,
listID, cond, pg.Offset, pg.Limit)
stmtCount = fmt.Sprintf(app.Queries.QuerySubscribersByListCount,
listID, cond)
} else {
stmt = fmt.Sprintf(app.Queries.QuerySubscribers,
cond, pg.Offset, pg.Limit)
stmtCount = fmt.Sprintf(app.Queries.QuerySubscribersCount, cond)
}
stmt := fmt.Sprintf(app.Queries.QuerySubscribers, cond)
// Create a readonly transaction to prevent mutations.
tx, err := app.DB.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
@ -122,26 +101,13 @@ func handleQuerySubscribers(c echo.Context) error {
fmt.Sprintf("Error preparing query: %v", pqErrMsg(err)))
}
// Run the actual query.
if err := tx.Select(&out.Results, stmt); err != nil {
// Run the query.
if err := tx.Select(&out.Results, stmt, listIDs, "id", pg.Offset, pg.Limit); err != nil {
tx.Rollback()
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error querying subscribers: %v", pqErrMsg(err)))
}
// Run the query count.
if err := tx.Get(&out.Total, stmtCount); err != nil {
tx.Rollback()
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error running count query: %v", pqErrMsg(err)))
}
if err := tx.Commit(); err != nil {
tx.Rollback()
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error in subscriber query transaction: %v", pqErrMsg(err)))
}
// Lazy load lists for each subscriber.
if err := out.Results.LoadLists(app.Queries.GetSubscriberLists); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
@ -155,13 +121,14 @@ func handleQuerySubscribers(c echo.Context) error {
}
// Meta.
out.Total = out.Results[0].Total
out.Page = pg.Page
out.PerPage = pg.PerPage
return c.JSON(http.StatusOK, okResp{out})
}
// handleCreateSubscriber handles subscriber creation.
// handleCreateSubscriber handles the creation of a new subscriber.
func handleCreateSubscriber(c echo.Context) error {
var (
app = c.Get("app").(*App)
@ -199,7 +166,7 @@ func handleCreateSubscriber(c echo.Context) error {
return c.JSON(http.StatusOK, handleGetSubscriber(c))
}
// handleUpdateSubscriber handles subscriber modification.
// handleUpdateSubscriber handles modification of a subscriber.
func handleUpdateSubscriber(c echo.Context) error {
var (
app = c.Get("app").(*App)
@ -236,109 +203,217 @@ func handleUpdateSubscriber(c echo.Context) error {
return handleGetSubscriber(c)
}
// handleDeleteSubscribers handles subscriber deletion,
// either a single one (ID in the URI), or a list.
func handleDeleteSubscribers(c echo.Context) error {
// handleBlacklistSubscribers handles the blacklisting of one or more subscribers.
// It takes either an ID in the URI, or a list of IDs in the request body.
func handleBlacklistSubscribers(c echo.Context) error {
var (
app = c.Get("app").(*App)
id, _ = strconv.ParseInt(c.Param("id"), 10, 64)
ids pq.Int64Array
app = c.Get("app").(*App)
pID = c.Param("id")
IDs pq.Int64Array
)
// Read the list IDs if they were sent in the body.
c.Bind(&ids)
if id < 1 && len(ids) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid ID.")
// Is it a /:id call?
if pID != "" {
id, _ := strconv.ParseInt(pID, 10, 64)
if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid ID.")
}
IDs = append(IDs, id)
} else {
// Multiple IDs.
var req subQueryReq
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("One or more invalid IDs given: %v", err))
}
if len(req.SubscriberIDs) == 0 {
return echo.NewHTTPError(http.StatusBadRequest,
"No IDs given.")
}
IDs = req.SubscriberIDs
}
if id > 0 {
ids = append(ids, id)
}
if _, err := app.Queries.DeleteSubscribers.Exec(ids); err != nil {
if _, err := app.Queries.BlacklistSubscribers.Exec(IDs); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Delete failed: %v", err))
fmt.Sprintf("Error blacklisting: %v", err))
}
return c.JSON(http.StatusOK, okResp{true})
}
// handleQuerySubscribersIntoLists handles querying subscribers based on arbitrary conditions in SQL
// and adding them to given lists.
func handleQuerySubscribersIntoLists(c echo.Context) error {
// handleManageSubscriberLists handles bulk addition or removal of subscribers
// from or to one or more target lists.
// It takes either an ID in the URI, or a list of IDs in the request body.
func handleManageSubscriberLists(c echo.Context) error {
var (
app = c.Get("app").(*App)
req queryAddReq
pID = c.Param("id")
IDs pq.Int64Array
)
// Get and validate fields.
// Is it a /:id call?
if pID != "" {
id, _ := strconv.ParseInt(pID, 10, 64)
if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid ID.")
}
IDs = append(IDs, id)
}
var req subQueryReq
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error parsing request: %v", err))
fmt.Sprintf("One or more invalid IDs given: %v", err))
}
if len(req.SubscriberIDs) == 0 {
return echo.NewHTTPError(http.StatusBadRequest,
"No IDs given.")
}
if len(IDs) == 0 {
IDs = req.SubscriberIDs
}
if len(req.TargetListIDs) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "No lists given.")
}
if len(req.TargetLists) < 1 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid `target_lists`.")
// Action.
var err error
switch req.Action {
case "add":
_, err = app.Queries.AddSubscribersToLists.Exec(IDs, req.TargetListIDs)
case "remove":
_, err = app.Queries.DeleteSubscriptions.Exec(IDs, req.TargetListIDs)
case "unsubscribe":
_, err = app.Queries.UnsubscribeSubscribersFromLists.Exec(IDs, req.TargetListIDs)
default:
return echo.NewHTTPError(http.StatusBadRequest, "Invalid action.")
}
if req.SourceList < 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid `source_list`.")
}
if req.Query == "" {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid subscriber `query`.")
}
cond := " AND " + req.Query
// The SQL queries to be executed are different for global subscribers
// and subscribers belonging to a specific list.
var (
stmt = ""
stmtDry = ""
)
if req.SourceList > 0 {
stmt = fmt.Sprintf(app.Queries.QuerySubscribersByList, req.SourceList, cond)
stmtDry = fmt.Sprintf(app.Queries.QuerySubscribersByList, req.SourceList, cond, 0, 1)
} else {
stmt = fmt.Sprintf(app.Queries.QuerySubscribersIntoLists, cond)
stmtDry = fmt.Sprintf(app.Queries.QuerySubscribers, cond, 0, 1)
}
// Create a readonly transaction to prevent mutations.
// This is used to dry-run the arbitrary query before it's used to
// insert subscriptions.
tx, err := app.DB.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error preparing query (dry-run): %v", pqErrMsg(err)))
fmt.Sprintf("Error processing lists: %v", err))
}
// Perform the dry run.
if _, err := tx.Exec(stmtDry); err != nil {
tx.Rollback()
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error querying (dry-run) subscribers: %v", pqErrMsg(err)))
}
if err := tx.Commit(); err != nil {
tx.Rollback()
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error in subscriber dry-run query transaction: %v", pqErrMsg(err)))
}
// Prepare the query.
q, err := app.DB.Preparex(stmt)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error preparing query: %v", pqErrMsg(err)))
}
// Run the query.
res, err := q.Exec(req.TargetLists)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error adding subscribers to lists: %v", pqErrMsg(err)))
}
num, _ := res.RowsAffected()
return c.JSON(http.StatusOK, okResp{queryAddResp{num}})
return c.JSON(http.StatusOK, okResp{true})
}
// handleDeleteSubscribers handles subscriber deletion.
// It takes either an ID in the URI, or a list of IDs in the request body.
func handleDeleteSubscribers(c echo.Context) error {
var (
app = c.Get("app").(*App)
pID = c.Param("id")
IDs pq.Int64Array
)
// Is it an /:id call?
if pID != "" {
id, _ := strconv.ParseInt(pID, 10, 64)
if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid ID.")
}
IDs = append(IDs, id)
} else {
// Multiple IDs.
i, err := parseStringIDs(c.Request().URL.Query()["id"])
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("One or more invalid IDs given: %v", err))
}
if len(i) == 0 {
return echo.NewHTTPError(http.StatusBadRequest,
"No IDs given.")
}
IDs = i
}
if _, err := app.Queries.DeleteSubscribers.Exec(IDs); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error deleting: %v", err))
}
return c.JSON(http.StatusOK, okResp{true})
}
// handleDeleteSubscribersByQuery bulk deletes based on an
// arbitrary SQL expression.
func handleDeleteSubscribersByQuery(c echo.Context) error {
var (
app = c.Get("app").(*App)
req subQueryReq
)
if err := c.Bind(&req); err != nil {
return err
}
err := app.Queries.execSubscriberQueryTpl(req.Query,
app.Queries.DeleteSubscribersByQuery,
req.ListIDs, app.DB)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error: %v", err))
}
return c.JSON(http.StatusOK, okResp{true})
}
// handleBlacklistSubscribersByQuery bulk blacklists subscribers
// based on an arbitrary SQL expression.
func handleBlacklistSubscribersByQuery(c echo.Context) error {
var (
app = c.Get("app").(*App)
req subQueryReq
)
if err := c.Bind(&req); err != nil {
return err
}
err := app.Queries.execSubscriberQueryTpl(req.Query,
app.Queries.BlacklistSubscribersByQuery,
req.ListIDs, app.DB)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error: %v", err))
}
return c.JSON(http.StatusOK, okResp{true})
}
// handleBlacklistSubscribersByQuery bulk adds/removes/unsubscribers subscribers
// from one or more lists based on an arbitrary SQL expression.
func handleManageSubscriberListsByQuery(c echo.Context) error {
var (
app = c.Get("app").(*App)
req subQueryReq
)
if err := c.Bind(&req); err != nil {
return err
}
if len(req.TargetListIDs) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "No lists given.")
}
// Action.
var stmt string
switch req.Action {
case "add":
stmt = app.Queries.AddSubscribersToListsByQuery
case "remove":
stmt = app.Queries.DeleteSubscriptionsByQuery
case "unsubscribe":
stmt = app.Queries.UnsubscribeSubscribersFromListsByQuery
default:
return echo.NewHTTPError(http.StatusBadRequest, "Invalid action.")
}
err := app.Queries.execSubscriberQueryTpl(req.Query, stmt, req.ListIDs, app.DB, req.TargetListIDs)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error: %v", err))
}
return c.JSON(http.StatusOK, okResp{true})
}

View File

@ -259,3 +259,25 @@ func makeErrorTpl(pageTitle, heading, desc string) errorTpl {
return err
}
// parseStringIDs takes a slice of numeric string IDs and
// parses each number into an int64 and returns a slice of the
// resultant values.
func parseStringIDs(s []string) ([]int64, error) {
var vals []int64
for _, v := range s {
i, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return nil, err
}
if i < 1 {
return nil, fmt.Errorf("%d is not a valid ID", i)
}
vals = append(vals, i)
}
return vals, nil
}