Refactor and fix importer state bugs
- Fixed invalid file uploads leaving the importer in a hanging state without exiting - Make the import UI always show the upload status page so that error logs are visible
This commit is contained in:
parent
cfec13c589
commit
3bf405fc1c
|
@ -79,6 +79,7 @@ class TheFormDef extends React.PureComponent {
|
||||||
message: "Error",
|
message: "Error",
|
||||||
description: e.message
|
description: e.message
|
||||||
})
|
})
|
||||||
|
this.props.fetchimportState()
|
||||||
this.setState({ formLoading: false })
|
this.setState({ formLoading: false })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -289,7 +290,7 @@ class Importing extends React.PureComponent {
|
||||||
)}
|
)}
|
||||||
|
|
||||||
<Row className="import-container">
|
<Row className="import-container">
|
||||||
<Col span="10" offset="3">
|
<Col span={10} offset={3}>
|
||||||
<div className="stats center">
|
<div className="stats center">
|
||||||
<div>
|
<div>
|
||||||
<Progress type="line" percent={progressPercent} />
|
<Progress type="line" percent={progressPercent} />
|
||||||
|
|
|
@ -90,10 +90,7 @@ func handleImportSubscribers(c echo.Context) error {
|
||||||
dir, files, err := impSess.ExtractZIP(out.Name(), 1)
|
dir, files, err := impSess.ExtractZIP(out.Name(), 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return echo.NewHTTPError(http.StatusInternalServerError,
|
return echo.NewHTTPError(http.StatusInternalServerError,
|
||||||
fmt.Sprintf("Error extracting ZIP file: %v", err))
|
fmt.Sprintf("Error processing ZIP file: %v", err))
|
||||||
} else if len(files) == 0 {
|
|
||||||
return echo.NewHTTPError(http.StatusBadRequest,
|
|
||||||
"No CSV files found to import.")
|
|
||||||
}
|
}
|
||||||
go impSess.LoadCSV(dir+"/"+files[0], rune(r.Delim[0]))
|
go impSess.LoadCSV(dir+"/"+files[0], rune(r.Delim[0]))
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,9 +59,8 @@ type Importer struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
notifCB models.AdminNotifCallback
|
notifCB models.AdminNotifCallback
|
||||||
|
|
||||||
isImporting bool
|
stop chan bool
|
||||||
stop chan bool
|
status Status
|
||||||
status *Status
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +107,7 @@ func New(upsert *sql.Stmt, blacklist *sql.Stmt, db *sql.DB, notifCB models.Admin
|
||||||
stop: make(chan bool, 1),
|
stop: make(chan bool, 1),
|
||||||
db: db,
|
db: db,
|
||||||
notifCB: notifCB,
|
notifCB: notifCB,
|
||||||
status: &Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)},
|
status: Status{Status: StatusNone, logBuf: bytes.NewBuffer(nil)},
|
||||||
}
|
}
|
||||||
|
|
||||||
return &im
|
return &im
|
||||||
|
@ -122,7 +121,7 @@ func (im *Importer) NewSession(fName, mode string, listIDs []int) (*Session, err
|
||||||
}
|
}
|
||||||
|
|
||||||
im.Lock()
|
im.Lock()
|
||||||
im.status = &Status{Status: StatusImporting,
|
im.status = Status{Status: StatusImporting,
|
||||||
Name: fName,
|
Name: fName,
|
||||||
logBuf: bytes.NewBuffer(nil)}
|
logBuf: bytes.NewBuffer(nil)}
|
||||||
im.Unlock()
|
im.Unlock()
|
||||||
|
@ -170,7 +169,7 @@ func (im *Importer) setStatus(status string) {
|
||||||
im.Unlock()
|
im.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// setStatus get's the Importer's status.
|
// getStatus get's the Importer's status.
|
||||||
func (im *Importer) getStatus() string {
|
func (im *Importer) getStatus() string {
|
||||||
im.RLock()
|
im.RLock()
|
||||||
status := im.status.Status
|
status := im.status.Status
|
||||||
|
@ -179,6 +178,18 @@ func (im *Importer) getStatus() string {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isDone returns true if the importer is working (importing|stopping).
|
||||||
|
func (im *Importer) isDone() bool {
|
||||||
|
s := true
|
||||||
|
im.RLock()
|
||||||
|
if im.getStatus() == StatusImporting || im.getStatus() == StatusStopping {
|
||||||
|
s = false
|
||||||
|
}
|
||||||
|
im.RUnlock()
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
// incrementImportCount sets the Importer's "imported" counter.
|
// incrementImportCount sets the Importer's "imported" counter.
|
||||||
func (im *Importer) incrementImportCount(n int) {
|
func (im *Importer) incrementImportCount(n int) {
|
||||||
im.Lock()
|
im.Lock()
|
||||||
|
@ -290,14 +301,26 @@ func (s *Session) Start() {
|
||||||
s.im.sendNotif(StatusFinished)
|
s.im.sendNotif(StatusFinished)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop stops an active import session.
|
||||||
|
func (s *Session) Stop() {
|
||||||
|
close(s.subQueue)
|
||||||
|
}
|
||||||
|
|
||||||
// ExtractZIP takes a ZIP file's path and extracts all .csv files in it to
|
// ExtractZIP takes a ZIP file's path and extracts all .csv files in it to
|
||||||
// a temporary directory, and returns the name of the temp directory and the
|
// a temporary directory, and returns the name of the temp directory and the
|
||||||
// list of extracted .csv files.
|
// list of extracted .csv files.
|
||||||
func (s *Session) ExtractZIP(srcPath string, maxCSVs int) (string, []string, error) {
|
func (s *Session) ExtractZIP(srcPath string, maxCSVs int) (string, []string, error) {
|
||||||
if s.im.isImporting {
|
if s.im.isDone() {
|
||||||
return "", nil, ErrIsImporting
|
return "", nil, ErrIsImporting
|
||||||
}
|
}
|
||||||
|
|
||||||
|
failed := true
|
||||||
|
defer func() {
|
||||||
|
if failed {
|
||||||
|
s.im.setStatus(StatusFailed)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
z, err := zip.OpenReader(srcPath)
|
z, err := zip.OpenReader(srcPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return "", nil, err
|
||||||
|
@ -355,12 +378,18 @@ func (s *Session) ExtractZIP(srcPath string, maxCSVs int) (string, []string, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(files) == 0 {
|
||||||
|
s.log.Println("no CSV files found in the ZIP")
|
||||||
|
return "", nil, errors.New("no CSV files found in the ZIP")
|
||||||
|
}
|
||||||
|
|
||||||
|
failed = false
|
||||||
return dir, files, nil
|
return dir, files, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadCSV loads a CSV file and validates and imports the subscriber entries in it.
|
// LoadCSV loads a CSV file and validates and imports the subscriber entries in it.
|
||||||
func (s *Session) LoadCSV(srcPath string, delim rune) error {
|
func (s *Session) LoadCSV(srcPath string, delim rune) error {
|
||||||
if s.im.isImporting {
|
if s.im.isDone() {
|
||||||
return ErrIsImporting
|
return ErrIsImporting
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -488,11 +517,11 @@ func (s *Session) LoadCSV(srcPath string, delim rune) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop sends a signal to stop all existing imports.
|
// Stop sends a signal to stop the existing import.
|
||||||
func (im *Importer) Stop() {
|
func (im *Importer) Stop() {
|
||||||
if im.getStatus() != StatusImporting {
|
if im.getStatus() != StatusImporting {
|
||||||
im.Lock()
|
im.Lock()
|
||||||
im.status = &Status{Status: StatusNone}
|
im.status = Status{Status: StatusNone}
|
||||||
im.Unlock()
|
im.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -526,10 +555,10 @@ func (s *Session) mapCSVHeaders(csvHdrs []string, knownHdrs map[string]bool) map
|
||||||
// ValidateFields validates incoming subscriber field values.
|
// ValidateFields validates incoming subscriber field values.
|
||||||
func ValidateFields(s SubReq) error {
|
func ValidateFields(s SubReq) error {
|
||||||
if !govalidator.IsEmail(s.Email) {
|
if !govalidator.IsEmail(s.Email) {
|
||||||
return errors.New("invalid `email`")
|
return errors.New(`invalid email "` + s.Email + `"`)
|
||||||
}
|
}
|
||||||
if !govalidator.IsByteLength(s.Name, 1, stdInputMaxLen) {
|
if !govalidator.IsByteLength(s.Name, 1, stdInputMaxLen) {
|
||||||
return errors.New("invalid or empty `name`")
|
return errors.New(`invalid or empty name "` + s.Name + `"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue