Fixed bugs in importer

- Added missing per-line validation and error reporting using SubReq
- Roll back and die if there's an error in the insert statement in the importer batch loop. Without this, connections pile up.
This commit is contained in:
Kailash Nadh 2018-10-30 18:17:26 +05:30
parent 52f8217b77
commit 0163cf985f
2 changed files with 39 additions and 15 deletions

View File

@@ -44,8 +44,8 @@ SELECT COUNT(subscribers.id) as num FROM subscribers INNER JOIN subscriber_lists
-- value is overwritten with the incoming value. This is used for insertions and bulk imports. -- value is overwritten with the incoming value. This is used for insertions and bulk imports.
WITH s AS ( WITH s AS (
INSERT INTO subscribers (uuid, email, name, status, attribs) INSERT INTO subscribers (uuid, email, name, status, attribs)
VALUES($1, $2, $3, $4, $5) ON CONFLICT (email) DO UPDATE VALUES($1, $2, $3, (CASE WHEN $4 != '' THEN $4::subscriber_status ELSE 'enabled' END), $5) ON CONFLICT (email) DO UPDATE
SET name=$3, status=(CASE WHEN $6 = true THEN $4 ELSE subscribers.status END), SET name=$3, status=(CASE WHEN $6 IS TRUE THEN $4::subscriber_status ELSE subscribers.status END),
attribs=$5, updated_at=NOW() attribs=$5, updated_at=NOW()
RETURNING id RETURNING id
) INSERT INTO subscriber_lists (subscriber_id, list_id) ) INSERT INTO subscriber_lists (subscriber_id, list_id)
@@ -349,6 +349,22 @@ SELECT * FROM media ORDER BY created_at DESC;
-- name: delete-media -- name: delete-media
DELETE FROM media WHERE id=$1 RETURNING filename; DELETE FROM media WHERE id=$1 RETURNING filename;
-- links
-- name: create-link
INSERT INTO links (uuid, url) VALUES($1, $2) ON CONFLICT (url) DO UPDATE SET url=EXCLUDED.url RETURNING uuid;
-- name: register-link-click
WITH link AS (
SELECT url, links.id AS link_id, campaigns.id as campaign_id, subscribers.id AS subscriber_id FROM links
LEFT JOIN campaigns ON (campaigns.uuid = $1)
LEFT JOIN subscribers ON (subscribers.uuid = $2)
WHERE links.uuid = $3
)
INSERT INTO link_clicks (campaign_id, subscriber_id, link_id)
VALUES((SELECT campaign_id FROM link), (SELECT subscriber_id FROM link), (SELECT link_id FROM link))
RETURNING (SELECT url FROM link);
-- -- name: get-stats -- -- name: get-stats
-- WITH lists AS ( -- WITH lists AS (
-- SELECT type, COUNT(id) AS num FROM lists GROUP BY type -- SELECT type, COUNT(id) AS num FROM lists GROUP BY type

View File

@@ -62,7 +62,7 @@ type Importer struct {
// Session represents a single import session. // Session represents a single import session.
type Session struct { type Session struct {
im *Importer im *Importer
subQueue chan models.Subscriber subQueue chan SubReq
log *log.Logger log *log.Logger
overrideStatus bool overrideStatus bool
@@ -123,7 +123,7 @@ func (im *Importer) NewSession(fName string, overrideStatus bool, listIDs []int)
s := &Session{ s := &Session{
im: im, im: im,
log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime), log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime),
subQueue: make(chan models.Subscriber, commitBatchSize), subQueue: make(chan SubReq, commitBatchSize),
overrideStatus: overrideStatus, overrideStatus: overrideStatus,
listIDs: listIDs, listIDs: listIDs,
} }
@@ -219,7 +219,8 @@ func (s *Session) Start() {
listIDs) listIDs)
if err != nil { if err != nil {
s.log.Printf("error executing insert: %v", err) s.log.Printf("error executing insert: %v", err)
continue tx.Rollback()
break
} }
cur++ cur++
total++ total++
@@ -383,9 +384,11 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error {
var ( var (
lnHdr = len(hdrKeys) lnHdr = len(hdrKeys)
i = 1 i = 0
) )
for { for {
i++
// Check for the stop signal. // Check for the stop signal.
select { select {
case <-s.im.stop: case <-s.im.stop:
@@ -413,17 +416,24 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error {
// Iterate the key map and based on the indices mapped earlier, // Iterate the key map and based on the indices mapped earlier,
// form a map of key: csv_value, eg: email: user@user.com. // form a map of key: csv_value, eg: email: user@user.com.
row := make(map[string]string, lnCols) row := make(map[string]string, lnCols)
for key := range csvHeaders { for key := range hdrKeys {
row[key] = cols[hdrKeys[key]] row[key] = cols[hdrKeys[key]]
} }
sub := SubReq{}
// Lowercase to ensure uniqueness in the DB. // Lowercase to ensure uniqueness in the DB.
row["email"] = strings.ToLower(strings.TrimSpace(row["email"])) sub.Email = strings.ToLower(strings.TrimSpace(row["email"]))
sub.Name = row["name"]
sub := models.Subscriber{ if _, ok := row["status"]; ok {
Email: row["email"], sub.Status = row["status"]
Name: row["name"], } else {
Status: row["status"], sub.Status = models.SubscriberStatusEnabled
}
if err := ValidateFields(sub); err != nil {
s.log.Printf("skipping line %d: %v", i, err)
continue
} }
// JSON attributes. // JSON attributes.
@@ -441,8 +451,6 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error {
// Send the subscriber to the queue. // Send the subscriber to the queue.
s.subQueue <- sub s.subQueue <- sub
i++
} }
close(s.subQueue) close(s.subQueue)
@@ -492,7 +500,7 @@ func ValidateFields(s SubReq) error {
return errors.New("invalid `email`") return errors.New("invalid `email`")
} }
if !govalidator.IsByteLength(s.Name, 1, stdInputMaxLen) { if !govalidator.IsByteLength(s.Name, 1, stdInputMaxLen) {
return errors.New("invalid length for `name`") return errors.New("invalid or empty `name`")
} }
if s.Status != SubscriberStatusEnabled && s.Status != SubscriberStatusDisabled && if s.Status != SubscriberStatusEnabled && s.Status != SubscriberStatusDisabled &&
s.Status != SubscriberStatusBlacklisted { s.Status != SubscriberStatusBlacklisted {