This repository has been archived by the owner on Nov 7, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaccept_tasks.go
126 lines (110 loc) · 3.17 KB
/
accept_tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"fmt"
"time"
"github.com/datatogether/task_mgmt/taskdefs/gist"
"github.com/datatogether/task_mgmt/taskdefs/ipfs"
"github.com/datatogether/task_mgmt/taskdefs/kiwix"
"github.com/datatogether/task_mgmt/taskdefs/pod"
"github.com/datatogether/task_mgmt/taskdefs/sciencebase"
"github.com/datatogether/task_mgmt/tasks"
"github.com/streadway/amqp"
)
func configureTasks() {
tasks.RegisterTaskdef("ipfs.addurl", ipfs.NewTaskAdd)
tasks.RegisterTaskdef("ipfs.addcollection", ipfs.NewAddCollection)
tasks.RegisterTaskdef("kiwix.updateSources", kiwix.NewTaskUpdateSources)
tasks.RegisterTaskdef("pod.addcatalog", pod.NewAddCatalog)
tasks.RegisterTaskdef("sb.addCatalogTree", sciencebase.NewAddCatalogTree)
tasks.RegisterTaskdef("gist.createCollection", gist.NewCollectionFromGist)
// Must set api server url to make ipfs tasks work
ipfs.IpfsApiServerUrl = cfg.IpfsApiUrl
pod.IpfsApiServerUrl = cfg.IpfsApiUrl
sciencebase.IpfsApiServerUrl = cfg.IpfsApiUrl
}
// start accepting tasks from the queue, if setup doesn't error,
// it returns a stop channel writing to stop will teardown the
// func and stop accepting tasks
func acceptTasks() (stop chan bool, err error) {
stop = make(chan bool)
if cfg.AmqpUrl == "" {
log.Infoln("no amqp url specified, queue listening disabled")
return stop, nil
}
log.Printf("connecting to: %s", cfg.AmqpUrl)
var conn *amqp.Connection
for i := 0; i <= 1000; i++ {
conn, err = amqp.Dial(cfg.AmqpUrl)
if err != nil {
log.Infof("Failed to connect to amqp server: %s", err.Error())
time.Sleep(time.Second)
continue
}
break
}
// if the connection is still nil after 1000 tries, time to bail
if conn == nil {
return nil, fmt.Errorf("Failed to connect to amqp server")
}
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("Failed to open a channel: %s", err.Error())
}
q, err := ch.QueueDeclare(
"tasks", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Error declaring que: %s", err.Error())
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("", err)
}
go func() {
for msg := range msgs {
// tasks.Tas
task, err := tasks.TaskFromDelivery(store, msg)
if err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
continue
}
tc := make(chan *tasks.Task, 10)
// accept tasks
go func() {
for t := range tc {
if err := PublishTaskProgress(rpool, t); err != nil && err != ErrNoRedisConn {
log.Infoln(err.Error())
}
}
}()
log.Infof("starting task %s,%s", task.Id, task.Type)
if err := task.Do(store, tc); err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
} else {
log.Infof("completed task: %s, %s", task.Id, msg.Type)
msg.Ack(false)
}
}
// TODO - figure out a way to bail out of the above loop
// if stop is ever published to
<-stop
ch.Close()
conn.Close()
}()
return stop, nil
}