Skip to content

Commit

Permalink
Download files in monitor
Browse files Browse the repository at this point in the history
This change tried to address potential race conditions between Athena
and the file mover. The idea is to download new sos reports in the
monitor and park them in a common filesystem. The processor eventually
proceeds with processing the files from the common filesystem.

Signed-off-by: Nicolas Bock <nicolas.bock@canonical.com>
  • Loading branch information
nicolasbock committed Oct 4, 2023
1 parent a6485f6 commit 884cd27
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
PollEvery string `yaml:"poll-every" default:"5"`
FilesDelta string `yaml:"files-delta" default:"10m"`
Filetypes []string `yaml:"filetypes"`
BaseTmpDir string `yaml:"base-tmpdir" default:""`
Directories []string `yaml:"directories"`
ProcessorMap []struct {
Type string `yaml:"type"`
Expand Down
26 changes: 23 additions & 3 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package monitor
import (
"context"
"fmt"
"os"
"regexp"
"sync"
"time"

"github.com/canonical/athena-core/pkg/common"
"github.com/canonical/athena-core/pkg/common/db"
"github.com/canonical/athena-core/pkg/config"
Expand All @@ -11,9 +16,6 @@ import (
"github.com/lileio/pubsub/v2"
"github.com/lileio/pubsub/v2/middleware/defaults"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"time"
)

type Monitor struct {
Expand Down Expand Up @@ -148,6 +150,24 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
log.Infof("File %s already dispatched, skipping", file.Path)
continue
}
log.Infof("Downloading file %s to shared folder", file.Path)
basePath := m.Config.Monitor.BaseTmpDir
if basePath == "" {
basePath = "/tmp"
}
log.Debugf("Using temporary base path: %s", basePath)
fileEntry, err := m.FilesClient.Download(&file, basePath)
if err != nil {
log.Errorf("Failed to download %s: %s", file.Path, err)
}
log.Infof("Downloaded file to %s", fileEntry.Path)
if _, err := os.Stat(basePath); os.IsNotExist(err) {
log.Debugf("Temporary base path '%s' doesn't exist - creating", basePath)
if err = os.MkdirAll(basePath, 0755); err != nil {
log.Errorf("Cannot create temporary base path: %s", err.Error())
}
}

log.Infof("Sending file: %s to processor: %s", file.Path, processor)
publishResults := pubsub.PublishJSON(*ctx, processor, file)
if publishResults.Err != nil {
Expand Down
11 changes: 3 additions & 8 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,6 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
return nil, err
}

fileEntry, err := fc.Download(file, dir)
if err != nil {
return nil, err
}

reportRunner.Config = cfg
reportRunner.Subscriber = subscriber
reportRunner.Name = name
Expand All @@ -262,9 +257,9 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl

//TODO: document the template variables
tplContext := pongo2.Context{
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": fileEntry, // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(fileEntry.Path)), // directory where the file lives on
"basedir": reportRunner.Basedir, // base dir used to generate reports
"file": filepath.Base(file.Path), // file entry as returned by the files.com api client
"filepath": path.Join(reportRunner.Basedir, filepath.Base(file.Path)), // directory where the file lives on
}

var scripts = make(map[string]string)
Expand Down

0 comments on commit 884cd27

Please sign in to comment.