Skip to content

Commit

Permalink
Download files in monitor
Browse files Browse the repository at this point in the history
This change tried to address potential race conditions between Athena
and the file mover. The idea is to download new sos reports in the
monitor and park them in a common filesystem. The processor eventually
proceeds with processing the files from the common filesystem.

Signed-off-by: Nicolas Bock <nicolas.bock@canonical.com>
  • Loading branch information
nicolasbock committed Oct 4, 2023
1 parent a6485f6 commit a517209
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/common/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ func (fc *FilesComClient) GetFiles(dirs []string) ([]db.File, error) {
}

func (fc *FilesComClient) Download(toDownload *db.File, downloadPath string) (*files_sdk.File, error) {
return nil, nil
return &files_sdk.File{}, nil
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
PollEvery string `yaml:"poll-every" default:"5"`
FilesDelta string `yaml:"files-delta" default:"10m"`
Filetypes []string `yaml:"filetypes"`
BaseTmpDir string `yaml:"base-tmpdir" default:""`
Directories []string `yaml:"directories"`
ProcessorMap []struct {
Type string `yaml:"type"`
Expand Down
26 changes: 23 additions & 3 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package monitor
import (
"context"
"fmt"
"os"
"regexp"
"sync"
"time"

"github.com/canonical/athena-core/pkg/common"
"github.com/canonical/athena-core/pkg/common/db"
"github.com/canonical/athena-core/pkg/config"
Expand All @@ -11,9 +16,6 @@ import (
"github.com/lileio/pubsub/v2"
"github.com/lileio/pubsub/v2/middleware/defaults"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"time"
)

type Monitor struct {
Expand Down Expand Up @@ -148,6 +150,24 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
log.Infof("File %s already dispatched, skipping", file.Path)
continue
}
log.Infof("Downloading file %s to shared folder", file.Path)
basePath := m.Config.Monitor.BaseTmpDir
if basePath == "" {
basePath = "/tmp"
}
log.Debugf("Using temporary base path: %s", basePath)
fileEntry, err := m.FilesClient.Download(&file, basePath)
if err != nil {
log.Errorf("Failed to download %s: %s", file.Path, err)
}
log.Infof("Downloaded file to %s", fileEntry.Path)
if _, err := os.Stat(basePath); os.IsNotExist(err) {
log.Debugf("Temporary base path '%s' doesn't exist - creating", basePath)
if err = os.MkdirAll(basePath, 0755); err != nil {
log.Errorf("Cannot create temporary base path: %s", err.Error())
}
}

log.Infof("Sending file: %s to processor: %s", file.Path, processor)
publishResults := pubsub.PublishJSON(*ctx, processor, file)
if publishResults.Err != nil {
Expand Down
17 changes: 9 additions & 8 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,6 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
return nil, err
}

fileEntry, err := fc.Download(file, dir)
if err != nil {
return nil, err
}

reportRunner.Config = cfg
reportRunner.Subscriber = subscriber
reportRunner.Name = name
Expand All @@ -262,9 +257,9 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl

//TODO: document the template variables
tplContext := pongo2.Context{
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": fileEntry, // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(fileEntry.Path)), // directory where the file lives on
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": filepath.Base(file.Path), // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(file.Path)), // directory where the file lives on
}

var scripts = make(map[string]string)
Expand Down Expand Up @@ -300,6 +295,12 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
scripts[scriptName] = fd.Name()
}

log.Infof("Removing previously downloaded file: %s", filepath.Base(file.Path))
err = os.Remove(path.Join(basePath, filepath.Base(file.Path)))
if err != nil {
log.Errorf("Could not remove %s: %s", filepath.Base(file.Path), err.Error())
}

timeout, err := time.ParseDuration(report.Timeout)
if err != nil {
timeout, _ = time.ParseDuration(DefaultExecutionTimeout)
Expand Down

0 comments on commit a517209

Please sign in to comment.