From 61f8fae50d35833f9411041ca9b2a872ff2285f6 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Sun, 5 Jul 2020 21:35:17 +0530 Subject: [PATCH] Add 'overwrite?' option to bulk import. - Fix minor UI inconsitency on import states. - Minor refactor to importer initialisation. --- frontend/src/views/Import.vue | 22 ++++++++---- import.go | 9 ++--- init.go | 18 +++++----- install.go | 4 +-- internal/subimporter/importer.go | 62 +++++++++++++++++--------------- queries.sql | 11 +++--- 6 files changed, 72 insertions(+), 54 deletions(-) diff --git a/frontend/src/views/Import.vue b/frontend/src/views/Import.vue index a2381dd..8879f33 100644 --- a/frontend/src/views/Import.vue +++ b/frontend/src/views/Import.vue @@ -16,7 +16,15 @@ - +
+ +
+ + + - +

@@ -50,12 +56,12 @@

Upload
-
+

Instructions
@@ -145,6 +151,7 @@ export default Vue.extend({ mode: 'subscribe', delim: ',', lists: [], + overwrite: true, file: null, }, @@ -247,6 +254,7 @@ export default Vue.extend({ this.isProcessing = true; this.$api.stopImport().then(() => { this.pollStatus(); + this.form.file = null; }); }, @@ -259,6 +267,7 @@ export default Vue.extend({ mode: this.form.mode, delim: this.form.delim, lists: this.form.lists.map((l) => l.id), + overwrite: this.form.overwrite, })); params.set('file', this.form.file); @@ -275,6 +284,7 @@ export default Vue.extend({ this.pollStatus(); }, () => { this.isProcessing = false; + this.form.file = null; }); }, }, diff --git a/import.go b/import.go index 566486c..8efb7b0 100644 --- a/import.go +++ b/import.go @@ -14,9 +14,10 @@ import ( // reqImport represents file upload import params. type reqImport struct { - Mode string `json:"mode"` - Delim string `json:"delim"` - ListIDs []int `json:"lists"` + Mode string `json:"mode"` + Overwrite bool `json:"overwrite"` + Delim string `json:"delim"` + ListIDs []int `json:"lists"` } // handleImportSubscribers handles the uploading and bulk importing of @@ -71,7 +72,7 @@ func handleImportSubscribers(c echo.Context) error { } // Start the importer session. - impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.ListIDs) + impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.Overwrite, r.ListIDs) if err != nil { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Error starting import session: %v", err)) diff --git a/init.go b/init.go index 4c8fb83..e3a3732 100644 --- a/init.go +++ b/init.go @@ -211,14 +211,16 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager { // initImporter initializes the bulk subscriber importer. func initImporter(q *Queries, db *sqlx.DB, app *App) *subimporter.Importer { - return subimporter.New(q.UpsertSubscriber.Stmt, - q.UpsertBlacklistSubscriber.Stmt, - q.UpdateListsDate.Stmt, - db.DB, - func(subject string, data interface{}) error { - app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data) - return nil - }) + return subimporter.New( + subimporter.Options{ + UpsertStmt: q.UpsertSubscriber.Stmt, + BlacklistStmt: q.UpsertBlacklistSubscriber.Stmt, + UpdateListDateStmt: q.UpdateListsDate.Stmt, + NotifCB: func(subject string, data interface{}) error { + app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data) + return nil + }, + }, db.DB) } // initMessengers initializes various messenger backends. diff --git a/install.go b/install.go index 7ce85f9..53527b1 100644 --- a/install.go +++ b/install.go @@ -82,7 +82,7 @@ func install(db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) { "John Doe", `{"type": "known", "good": true, "city": "Bengaluru"}`, pq.Int64Array{int64(defList)}, - ); err != nil { + true); err != nil { lo.Fatalf("Error creating subscriber: %v", err) } if _, err := q.UpsertSubscriber.Exec( @@ -91,7 +91,7 @@ func install(db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) { "Anon Doe", `{"type": "unknown", "good": true, "city": "Bengaluru"}`, pq.Int64Array{int64(optinList)}, - ); err != nil { + true); err != nil { lo.Fatalf("Error creating subscriber: %v", err) } diff --git a/internal/subimporter/importer.go b/internal/subimporter/importer.go index 1ff7816..33f4184 100644 --- a/internal/subimporter/importer.go +++ b/internal/subimporter/importer.go @@ -49,25 +49,31 @@ const ( // Importer represents the bulk CSV subscriber import system. type Importer struct { - upsert *sql.Stmt - blacklist *sql.Stmt - updateListDate *sql.Stmt - db *sql.DB - notifCB models.AdminNotifCallback + opt Options + db *sql.DB stop chan bool status Status sync.RWMutex } +// Options represents inport options. +type Options struct { + UpsertStmt *sql.Stmt + BlacklistStmt *sql.Stmt + UpdateListDateStmt *sql.Stmt + NotifCB models.AdminNotifCallback +} + // Session represents a single import session. type Session struct { im *Importer subQueue chan SubReq log *log.Logger - mode string - listIDs []int + mode string + overwrite bool + listIDs []int } // Status reporesents statistics from an ongoing import session. @@ -98,7 +104,8 @@ var ( // import is already running. ErrIsImporting = errors.New("import is already running") - csvHeaders = map[string]bool{"email": true, + csvHeaders = map[string]bool{ + "email": true, "name": true, "attributes": true} @@ -109,23 +116,19 @@ var ( ) // New returns a new instance of Importer. -func New(upsert *sql.Stmt, blacklist *sql.Stmt, updateListDate *sql.Stmt, - db *sql.DB, notifCB models.AdminNotifCallback) *Importer { +func New(opt Options, db *sql.DB) *Importer { im := Importer{ - upsert: upsert, - blacklist: blacklist, - updateListDate: updateListDate, - stop: make(chan bool, 1), - db: db, - notifCB: notifCB, - status: Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)}, + opt: opt, + stop: make(chan bool, 1), + db: db, + status: Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)}, } return &im } // NewSession returns an new instance of Session. It takes the name // of the uploaded file, but doesn't do anything with it but retains it for stats. -func (im *Importer) NewSession(fName, mode string, listIDs []int) (*Session, error) { +func (im *Importer) NewSession(fName, mode string, overWrite bool, listIDs []int) (*Session, error) { if im.getStatus() != StatusNone { return nil, errors.New("an import is already running") } @@ -137,11 +140,12 @@ func (im *Importer) NewSession(fName, mode string, listIDs []int) (*Session, err im.Unlock() s := &Session{ - im: im, - log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime), - subQueue: make(chan SubReq, commitBatchSize), - mode: mode, - listIDs: listIDs, + im: im, + log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime), + subQueue: make(chan SubReq, commitBatchSize), + mode: mode, + overwrite: overWrite, + listIDs: listIDs, } s.log.Printf("processing '%s'", fName) @@ -218,7 +222,7 @@ func (im *Importer) sendNotif(status string) error { strings.Title(status), s.Name) ) - return im.notifCB(subject, out) + return im.opt.NotifCB(subject, out) } // Start is a blocking function that selects on a channel queue until all @@ -249,9 +253,9 @@ func (s *Session) Start() { } if s.mode == ModeSubscribe { - stmt = tx.Stmt(s.im.upsert) + stmt = tx.Stmt(s.im.opt.UpsertStmt) } else { - stmt = tx.Stmt(s.im.blacklist) + stmt = tx.Stmt(s.im.opt.BlacklistStmt) } } @@ -263,7 +267,7 @@ func (s *Session) Start() { } if s.mode == ModeSubscribe { - _, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs) + _, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs, s.overwrite) } else if s.mode == ModeBlacklist { _, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs) } @@ -293,7 +297,7 @@ func (s *Session) Start() { if cur == 0 { s.im.setStatus(StatusFinished) s.log.Printf("imported finished") - if _, err := s.im.updateListDate.Exec(listIDs); err != nil { + if _, err := s.im.opt.UpdateListDateStmt.Exec(listIDs); err != nil { s.log.Printf("error updating lists date: %v", err) } s.im.sendNotif(StatusFinished) @@ -312,7 +316,7 @@ func (s *Session) Start() { s.im.incrementImportCount(cur) s.im.setStatus(StatusFinished) s.log.Printf("imported finished") - if _, err := s.im.updateListDate.Exec(listIDs); err != nil { + if _, err := s.im.opt.UpdateListDateStmt.Exec(listIDs); err != nil { s.log.Printf("error updating lists date: %v", err) } s.im.sendNotif(StatusFinished) diff --git a/queries.sql b/queries.sql index 6542525..c1e2820 100644 --- a/queries.sql +++ b/queries.sql @@ -73,13 +73,14 @@ SELECT id from sub; -- name: upsert-subscriber -- Upserts a subscriber where existing subscribers get their names and attributes overwritten. --- The status field is only updated when $6 = 'override_status'. +-- If $6 = true, update values, otherwise, skip. WITH sub AS ( - INSERT INTO subscribers (uuid, email, name, attribs) + INSERT INTO subscribers as s (uuid, email, name, attribs) VALUES($1, $2, $3, $4) - ON CONFLICT (email) DO UPDATE - SET name=$3, - attribs=$4, + ON CONFLICT (email) + DO UPDATE SET + name=(CASE WHEN $6 THEN $3 ELSE s.name END), + attribs=(CASE WHEN $6 THEN $4 ELSE s.attribs END), updated_at=NOW() RETURNING uuid, id ),