Add 'overwrite?' option to bulk import.

- Fix minor UI inconsitency on import states.
- Minor refactor to importer initialisation.
This commit is contained in:
Kailash Nadh 2020-07-05 21:35:17 +05:30
parent 79dd916d09
commit 61f8fae50d
6 changed files with 72 additions and 54 deletions

View File

@ -16,7 +16,15 @@
</div> </div>
</b-field> </b-field>
<list-selector <b-field v-if="form.mode === 'subscribe'"
label="Overwrite?"
message="Overwrite name and attribs of existing subscribers?">
<div>
<b-switch v-model="form.overwrite" name="overwrite" />
</div>
</b-field>
<list-selector v-if="form.mode === 'subscribe'"
label="Lists" label="Lists"
placeholder="Lists to subscribe to" placeholder="Lists to subscribe to"
message="Lists to subscribe to." message="Lists to subscribe to."
@ -31,9 +39,7 @@
placeholder="," maxlength="1" required /> placeholder="," maxlength="1" required />
</b-field> </b-field>
<b-field label="CSV or ZIP file" <b-field label="CSV or ZIP file">
message="For existing subscribers, the names and attributes
will be overwritten with the values in the CSV.">
<b-upload v-model="form.file" drag-drop expanded required> <b-upload v-model="form.file" drag-drop expanded required>
<div class="has-text-centered section"> <div class="has-text-centered section">
<p> <p>
@ -50,12 +56,12 @@
</div> </div>
<div class="buttons"> <div class="buttons">
<b-button native-type="submit" type="is-primary" <b-button native-type="submit" type="is-primary"
:disabled="form.lists.length === 0" :disabled="!form.file || (form.mode === 'subscribe' && form.lists.length === 0)"
:loading="isProcessing">Upload</b-button> :loading="isProcessing">Upload</b-button>
</div> </div>
</div> </div>
</form> </form>
<hr /> <br /><br />
<div class="import-help"> <div class="import-help">
<h5 class="title is-size-6">Instructions</h5> <h5 class="title is-size-6">Instructions</h5>
@ -145,6 +151,7 @@ export default Vue.extend({
mode: 'subscribe', mode: 'subscribe',
delim: ',', delim: ',',
lists: [], lists: [],
overwrite: true,
file: null, file: null,
}, },
@ -247,6 +254,7 @@ export default Vue.extend({
this.isProcessing = true; this.isProcessing = true;
this.$api.stopImport().then(() => { this.$api.stopImport().then(() => {
this.pollStatus(); this.pollStatus();
this.form.file = null;
}); });
}, },
@ -259,6 +267,7 @@ export default Vue.extend({
mode: this.form.mode, mode: this.form.mode,
delim: this.form.delim, delim: this.form.delim,
lists: this.form.lists.map((l) => l.id), lists: this.form.lists.map((l) => l.id),
overwrite: this.form.overwrite,
})); }));
params.set('file', this.form.file); params.set('file', this.form.file);
@ -275,6 +284,7 @@ export default Vue.extend({
this.pollStatus(); this.pollStatus();
}, () => { }, () => {
this.isProcessing = false; this.isProcessing = false;
this.form.file = null;
}); });
}, },
}, },

View File

@ -14,9 +14,10 @@ import (
// reqImport represents file upload import params. // reqImport represents file upload import params.
type reqImport struct { type reqImport struct {
Mode string `json:"mode"` Mode string `json:"mode"`
Delim string `json:"delim"` Overwrite bool `json:"overwrite"`
ListIDs []int `json:"lists"` Delim string `json:"delim"`
ListIDs []int `json:"lists"`
} }
// handleImportSubscribers handles the uploading and bulk importing of // handleImportSubscribers handles the uploading and bulk importing of
@ -71,7 +72,7 @@ func handleImportSubscribers(c echo.Context) error {
} }
// Start the importer session. // Start the importer session.
impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.ListIDs) impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.Overwrite, r.ListIDs)
if err != nil { if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error starting import session: %v", err)) fmt.Sprintf("Error starting import session: %v", err))

18
init.go
View File

@ -211,14 +211,16 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
// initImporter initializes the bulk subscriber importer. // initImporter initializes the bulk subscriber importer.
func initImporter(q *Queries, db *sqlx.DB, app *App) *subimporter.Importer { func initImporter(q *Queries, db *sqlx.DB, app *App) *subimporter.Importer {
return subimporter.New(q.UpsertSubscriber.Stmt, return subimporter.New(
q.UpsertBlacklistSubscriber.Stmt, subimporter.Options{
q.UpdateListsDate.Stmt, UpsertStmt: q.UpsertSubscriber.Stmt,
db.DB, BlacklistStmt: q.UpsertBlacklistSubscriber.Stmt,
func(subject string, data interface{}) error { UpdateListDateStmt: q.UpdateListsDate.Stmt,
app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data) NotifCB: func(subject string, data interface{}) error {
return nil app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data)
}) return nil
},
}, db.DB)
} }
// initMessengers initializes various messenger backends. // initMessengers initializes various messenger backends.

View File

@ -82,7 +82,7 @@ func install(db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
"John Doe", "John Doe",
`{"type": "known", "good": true, "city": "Bengaluru"}`, `{"type": "known", "good": true, "city": "Bengaluru"}`,
pq.Int64Array{int64(defList)}, pq.Int64Array{int64(defList)},
); err != nil { true); err != nil {
lo.Fatalf("Error creating subscriber: %v", err) lo.Fatalf("Error creating subscriber: %v", err)
} }
if _, err := q.UpsertSubscriber.Exec( if _, err := q.UpsertSubscriber.Exec(
@ -91,7 +91,7 @@ func install(db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
"Anon Doe", "Anon Doe",
`{"type": "unknown", "good": true, "city": "Bengaluru"}`, `{"type": "unknown", "good": true, "city": "Bengaluru"}`,
pq.Int64Array{int64(optinList)}, pq.Int64Array{int64(optinList)},
); err != nil { true); err != nil {
lo.Fatalf("Error creating subscriber: %v", err) lo.Fatalf("Error creating subscriber: %v", err)
} }

View File

@ -49,25 +49,31 @@ const (
// Importer represents the bulk CSV subscriber import system. // Importer represents the bulk CSV subscriber import system.
type Importer struct { type Importer struct {
upsert *sql.Stmt opt Options
blacklist *sql.Stmt db *sql.DB
updateListDate *sql.Stmt
db *sql.DB
notifCB models.AdminNotifCallback
stop chan bool stop chan bool
status Status status Status
sync.RWMutex sync.RWMutex
} }
// Options represents inport options.
type Options struct {
UpsertStmt *sql.Stmt
BlacklistStmt *sql.Stmt
UpdateListDateStmt *sql.Stmt
NotifCB models.AdminNotifCallback
}
// Session represents a single import session. // Session represents a single import session.
type Session struct { type Session struct {
im *Importer im *Importer
subQueue chan SubReq subQueue chan SubReq
log *log.Logger log *log.Logger
mode string mode string
listIDs []int overwrite bool
listIDs []int
} }
// Status reporesents statistics from an ongoing import session. // Status reporesents statistics from an ongoing import session.
@ -98,7 +104,8 @@ var (
// import is already running. // import is already running.
ErrIsImporting = errors.New("import is already running") ErrIsImporting = errors.New("import is already running")
csvHeaders = map[string]bool{"email": true, csvHeaders = map[string]bool{
"email": true,
"name": true, "name": true,
"attributes": true} "attributes": true}
@ -109,23 +116,19 @@ var (
) )
// New returns a new instance of Importer. // New returns a new instance of Importer.
func New(upsert *sql.Stmt, blacklist *sql.Stmt, updateListDate *sql.Stmt, func New(opt Options, db *sql.DB) *Importer {
db *sql.DB, notifCB models.AdminNotifCallback) *Importer {
im := Importer{ im := Importer{
upsert: upsert, opt: opt,
blacklist: blacklist, stop: make(chan bool, 1),
updateListDate: updateListDate, db: db,
stop: make(chan bool, 1), status: Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)},
db: db,
notifCB: notifCB,
status: Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)},
} }
return &im return &im
} }
// NewSession returns an new instance of Session. It takes the name // NewSession returns an new instance of Session. It takes the name
// of the uploaded file, but doesn't do anything with it but retains it for stats. // of the uploaded file, but doesn't do anything with it but retains it for stats.
func (im *Importer) NewSession(fName, mode string, listIDs []int) (*Session, error) { func (im *Importer) NewSession(fName, mode string, overWrite bool, listIDs []int) (*Session, error) {
if im.getStatus() != StatusNone { if im.getStatus() != StatusNone {
return nil, errors.New("an import is already running") return nil, errors.New("an import is already running")
} }
@ -137,11 +140,12 @@ func (im *Importer) NewSession(fName, mode string, listIDs []int) (*Session, err
im.Unlock() im.Unlock()
s := &Session{ s := &Session{
im: im, im: im,
log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime), log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime),
subQueue: make(chan SubReq, commitBatchSize), subQueue: make(chan SubReq, commitBatchSize),
mode: mode, mode: mode,
listIDs: listIDs, overwrite: overWrite,
listIDs: listIDs,
} }
s.log.Printf("processing '%s'", fName) s.log.Printf("processing '%s'", fName)
@ -218,7 +222,7 @@ func (im *Importer) sendNotif(status string) error {
strings.Title(status), strings.Title(status),
s.Name) s.Name)
) )
return im.notifCB(subject, out) return im.opt.NotifCB(subject, out)
} }
// Start is a blocking function that selects on a channel queue until all // Start is a blocking function that selects on a channel queue until all
@ -249,9 +253,9 @@ func (s *Session) Start() {
} }
if s.mode == ModeSubscribe { if s.mode == ModeSubscribe {
stmt = tx.Stmt(s.im.upsert) stmt = tx.Stmt(s.im.opt.UpsertStmt)
} else { } else {
stmt = tx.Stmt(s.im.blacklist) stmt = tx.Stmt(s.im.opt.BlacklistStmt)
} }
} }
@ -263,7 +267,7 @@ func (s *Session) Start() {
} }
if s.mode == ModeSubscribe { if s.mode == ModeSubscribe {
_, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs) _, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs, s.overwrite)
} else if s.mode == ModeBlacklist { } else if s.mode == ModeBlacklist {
_, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs) _, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs)
} }
@ -293,7 +297,7 @@ func (s *Session) Start() {
if cur == 0 { if cur == 0 {
s.im.setStatus(StatusFinished) s.im.setStatus(StatusFinished)
s.log.Printf("imported finished") s.log.Printf("imported finished")
if _, err := s.im.updateListDate.Exec(listIDs); err != nil { if _, err := s.im.opt.UpdateListDateStmt.Exec(listIDs); err != nil {
s.log.Printf("error updating lists date: %v", err) s.log.Printf("error updating lists date: %v", err)
} }
s.im.sendNotif(StatusFinished) s.im.sendNotif(StatusFinished)
@ -312,7 +316,7 @@ func (s *Session) Start() {
s.im.incrementImportCount(cur) s.im.incrementImportCount(cur)
s.im.setStatus(StatusFinished) s.im.setStatus(StatusFinished)
s.log.Printf("imported finished") s.log.Printf("imported finished")
if _, err := s.im.updateListDate.Exec(listIDs); err != nil { if _, err := s.im.opt.UpdateListDateStmt.Exec(listIDs); err != nil {
s.log.Printf("error updating lists date: %v", err) s.log.Printf("error updating lists date: %v", err)
} }
s.im.sendNotif(StatusFinished) s.im.sendNotif(StatusFinished)

View File

@ -73,13 +73,14 @@ SELECT id from sub;
-- name: upsert-subscriber -- name: upsert-subscriber
-- Upserts a subscriber where existing subscribers get their names and attributes overwritten. -- Upserts a subscriber where existing subscribers get their names and attributes overwritten.
-- The status field is only updated when $6 = 'override_status'. -- If $6 = true, update values, otherwise, skip.
WITH sub AS ( WITH sub AS (
INSERT INTO subscribers (uuid, email, name, attribs) INSERT INTO subscribers as s (uuid, email, name, attribs)
VALUES($1, $2, $3, $4) VALUES($1, $2, $3, $4)
ON CONFLICT (email) DO UPDATE ON CONFLICT (email)
SET name=$3, DO UPDATE SET
attribs=$4, name=(CASE WHEN $6 THEN $3 ELSE s.name END),
attribs=(CASE WHEN $6 THEN $4 ELSE s.attribs END),
updated_at=NOW() updated_at=NOW()
RETURNING uuid, id RETURNING uuid, id
), ),