Skip to content

Commit

Permalink
Merge pull request #89 from nicolasbock/races
Browse files Browse the repository at this point in the history
Download files in monitor
  • Loading branch information
nicolasbock authored Oct 9, 2023
2 parents ba0e530 + 39bef5e commit 9f2e9df
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/common/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ func (fc *FilesComClient) GetFiles(dirs []string) ([]db.File, error) {
}

func (fc *FilesComClient) Download(toDownload *db.File, downloadPath string) (*files_sdk.File, error) {
return nil, nil
return &files_sdk.File{}, nil
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
PollEvery string `yaml:"poll-every" default:"5"`
FilesDelta string `yaml:"files-delta" default:"10m"`
Filetypes []string `yaml:"filetypes"`
BaseTmpDir string `yaml:"base-tmpdir" default:""`
Directories []string `yaml:"directories"`
ProcessorMap []struct {
Type string `yaml:"type"`
Expand Down
26 changes: 23 additions & 3 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package monitor
import (
"context"
"fmt"
"os"
"regexp"
"sync"
"time"

"github.com/canonical/athena-core/pkg/common"
"github.com/canonical/athena-core/pkg/common/db"
"github.com/canonical/athena-core/pkg/config"
Expand All @@ -11,9 +16,6 @@ import (
"github.com/lileio/pubsub/v2"
"github.com/lileio/pubsub/v2/middleware/defaults"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"time"
)

type Monitor struct {
Expand Down Expand Up @@ -148,6 +150,24 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
log.Infof("File %s already dispatched, skipping", file.Path)
continue
}
log.Infof("Downloading file %s to shared folder", file.Path)
basePath := m.Config.Monitor.BaseTmpDir
if basePath == "" {
basePath = "/tmp"
}
if _, err := os.Stat(basePath); os.IsNotExist(err) {
log.Debugf("Temporary base path '%s' doesn't exist - creating", basePath)
if err = os.MkdirAll(basePath, 0755); err != nil {
log.Errorf("Cannot create temporary base path: %s", err.Error())
}
}
log.Debugf("Using temporary base path: %s", basePath)
fileEntry, err := m.FilesClient.Download(&file, basePath)
if err != nil {
log.Errorf("Failed to download %s: %s", file.Path, err)
}
log.Infof("Downloaded %s", fileEntry.Path)

log.Infof("Sending file: %s to processor: %s", file.Path, processor)
publishResults := pubsub.PublishJSON(*ctx, processor, file)
if publishResults.Err != nil {
Expand Down
18 changes: 10 additions & 8 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,6 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
return nil, err
}

fileEntry, err := fc.Download(file, dir)
if err != nil {
return nil, err
}

reportRunner.Config = cfg
reportRunner.Subscriber = subscriber
reportRunner.Name = name
Expand All @@ -262,9 +257,9 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl

//TODO: document the template variables
tplContext := pongo2.Context{
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": fileEntry, // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(fileEntry.Path)), // directory where the file lives on
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": filepath.Base(file.Path), // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(file.Path)), // directory where the file lives on
}

var scripts = make(map[string]string)
Expand Down Expand Up @@ -300,6 +295,12 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
scripts[scriptName] = fd.Name()
}

log.Infof("Removing previously downloaded file: %s", filepath.Base(file.Path))
err = os.Remove(path.Join(basePath, filepath.Base(file.Path)))
if err != nil {
log.Errorf("Could not remove %s: %s", filepath.Base(file.Path), err.Error())
}

timeout, err := time.ParseDuration(report.Timeout)
if err != nil {
timeout, _ = time.ParseDuration(DefaultExecutionTimeout)
Expand All @@ -311,6 +312,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
reportToExecute.Subscriber = reportRunner.Subscriber
reportToExecute.Name = reportName
reportToExecute.File = file
reportToExecute.FileName = file.Path
reportToExecute.Scripts = scripts
reportRunner.Reports = append(reportRunner.Reports, reportToExecute)
}
Expand Down

0 comments on commit 9f2e9df

Please sign in to comment.