From 0163cf985f616ac79c22565653c82427ca561881 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Tue, 30 Oct 2018 18:17:26 +0530 Subject: [PATCH] Fixed bugs in importer - Added missing per line validation and error reporting using SubReq - Rollback and die if there's an error in the insert statement in the importer batch loop. Without this, connections pile up. --- queries.sql | 20 ++++++++++++++++++-- subimporter/importer.go | 34 +++++++++++++++++++++------------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/queries.sql b/queries.sql index 74197f4..d34d2a5 100644 --- a/queries.sql +++ b/queries.sql @@ -44,8 +44,8 @@ SELECT COUNT(subscribers.id) as num FROM subscribers INNER JOIN subscriber_lists -- value is overwritten with the incoming value. This is used for insertions and bulk imports. WITH s AS ( INSERT INTO subscribers (uuid, email, name, status, attribs) - VALUES($1, $2, $3, $4, $5) ON CONFLICT (email) DO UPDATE - SET name=$3, status=(CASE WHEN $6 = true THEN $4 ELSE subscribers.status END), + VALUES($1, $2, $3, (CASE WHEN $4 != '' THEN $4::subscriber_status ELSE 'enabled' END), $5) ON CONFLICT (email) DO UPDATE + SET name=$3, status=(CASE WHEN $6 IS TRUE THEN $4::subscriber_status ELSE subscribers.status END), attribs=$5, updated_at=NOW() RETURNING id ) INSERT INTO subscriber_lists (subscriber_id, list_id) @@ -349,6 +349,22 @@ SELECT * FROM media ORDER BY created_at DESC; -- name: delete-media DELETE FROM media WHERE id=$1 RETURNING filename; +-- links +-- name: create-link +INSERT INTO links (uuid, url) VALUES($1, $2) ON CONFLICT (url) DO UPDATE SET url=EXCLUDED.url RETURNING uuid; + +-- name: register-link-click +WITH link AS ( + SELECT url, links.id AS link_id, campaigns.id as campaign_id, subscribers.id AS subscriber_id FROM links + LEFT JOIN campaigns ON (campaigns.uuid = $1) + LEFT JOIN subscribers ON (subscribers.uuid = $2) + WHERE links.uuid = $3 +) +INSERT INTO link_clicks (campaign_id, subscriber_id, link_id) + VALUES((SELECT campaign_id FROM link), (SELECT subscriber_id FROM link), (SELECT link_id FROM link)) + RETURNING (SELECT url FROM link); + + -- -- name: get-stats -- WITH lists AS ( -- SELECT type, COUNT(id) AS num FROM lists GROUP BY type diff --git a/subimporter/importer.go b/subimporter/importer.go index a63c4b5..d1de6b7 100644 --- a/subimporter/importer.go +++ b/subimporter/importer.go @@ -62,7 +62,7 @@ type Importer struct { // Session represents a single import session. type Session struct { im *Importer - subQueue chan models.Subscriber + subQueue chan SubReq log *log.Logger overrideStatus bool @@ -123,7 +123,7 @@ func (im *Importer) NewSession(fName string, overrideStatus bool, listIDs []int) s := &Session{ im: im, log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime), - subQueue: make(chan models.Subscriber, commitBatchSize), + subQueue: make(chan SubReq, commitBatchSize), overrideStatus: overrideStatus, listIDs: listIDs, } @@ -219,7 +219,8 @@ func (s *Session) Start() { listIDs) if err != nil { s.log.Printf("error executing insert: %v", err) - continue + tx.Rollback() + break } cur++ total++ @@ -383,9 +384,11 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error { var ( lnHdr = len(hdrKeys) - i = 1 + i = 0 ) for { + i++ + // Check for the stop signal. select { case <-s.im.stop: @@ -413,17 +416,24 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error { // Iterate the key map and based on the indices mapped earlier, // form a map of key: csv_value, eg: email: user@user.com. row := make(map[string]string, lnCols) - for key := range csvHeaders { + for key := range hdrKeys { row[key] = cols[hdrKeys[key]] } + sub := SubReq{} // Lowercase to ensure uniqueness in the DB. - row["email"] = strings.ToLower(strings.TrimSpace(row["email"])) + sub.Email = strings.ToLower(strings.TrimSpace(row["email"])) + sub.Name = row["name"] - sub := models.Subscriber{ - Email: row["email"], - Name: row["name"], - Status: row["status"], + if _, ok := row["status"]; ok { + sub.Status = row["status"] + } else { + sub.Status = models.SubscriberStatusEnabled + } + + if err := ValidateFields(sub); err != nil { + s.log.Printf("skipping line %d: %v", i, err) + continue } // JSON attributes. @@ -441,8 +451,6 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error { // Send the subscriber to the queue. s.subQueue <- sub - - i++ } close(s.subQueue) @@ -492,7 +500,7 @@ func ValidateFields(s SubReq) error { return errors.New("invalid `email`") } if !govalidator.IsByteLength(s.Name, 1, stdInputMaxLen) { - return errors.New("invalid length for `name`") + return errors.New("invalid or empty `name`") } if s.Status != SubscriberStatusEnabled && s.Status != SubscriberStatusDisabled && s.Status != SubscriberStatusBlacklisted {