-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
144 lines (119 loc) · 2.34 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package main
import (
"fmt"
"sync"
"time"
)
type ConfigFunc func(*PoolConfig)
type PoolConfig struct {
InitWorkers int
MaxWorkers int
MinWorkers int
Poll_Period int
Scaling bool
}
type Pool struct {
Config PoolConfig
JobQueue chan *Job
Workers chan *Worker
Current int
KillChan chan struct{}
Wg sync.WaitGroup
Poller *Poller
IsAlive bool
}
func defaultConfig() PoolConfig {
return PoolConfig{
InitWorkers: 2,
MaxWorkers: 4,
MinWorkers: 1,
Poll_Period: 10,
Scaling: false,
}
}
func withMaxWorkers(n int) ConfigFunc {
return func(pc *PoolConfig) {
pc.MaxWorkers = n
}
}
func withInitWorkers(n int) ConfigFunc {
return func(pc *PoolConfig) {
pc.InitWorkers = n
}
}
func withPollPeriod(n int) ConfigFunc {
return func(pc *PoolConfig) {
pc.Poll_Period = n
}
}
func withMinWorkers(n int) ConfigFunc {
return func(pc *PoolConfig) {
pc.MinWorkers = n
}
}
func withScaling() ConfigFunc {
return func(pc *PoolConfig) {
pc.Scaling = true
}
}
func GetPool(confs ...ConfigFunc) *Pool {
dconf := defaultConfig()
for _, cfn := range confs {
cfn(&dconf)
}
workers := make(chan *Worker, dconf.MaxWorkers)
return &Pool{
Config: dconf,
JobQueue: make(chan *Job, dconf.MaxWorkers),
KillChan: make(chan struct{}),
Workers: workers,
Current: 0,
Poller: &Poller{
Ticker: time.NewTicker(time.Duration(dconf.Poll_Period) * time.Second),
quitCh: make(chan struct{}),
},
IsAlive: true,
}
}
func (P *Pool) Start() {
fmt.Println("Spawning the Initial Workers")
for i := 0; i < P.Config.InitWorkers; i++ {
w := SpawnWorker()
P.Workers <- w
go w.Start(&P.Wg, P.Workers)
P.Current += 1
}
go listen(P)
go P.PollStatus()
}
func listen(P *Pool) {
for {
select {
case job := <-P.JobQueue:
Schedule(P, job)
case <-P.KillChan:
fmt.Println("Kill Signal Recieved...Waiting for the workers to finish the Job")
P.Poller.quitCh <- struct{}{}
P.Wg.Wait()
fmt.Println("Pool Killed")
P.IsAlive = false
return
}
}
}
func (P *Pool) Kill() {
P.KillChan <- struct{}{}
}
func (P *Pool) AddJob(J *Job) {
if !P.IsAlive {
fmt.Println("Pool is not alive. Cannot add a Job")
return
}
P.JobQueue <- J
}
func Schedule(P *Pool, j *Job) {
worker := GetAvailableWorker(P)
fmt.Println("Choosen Worker:", worker.ID, " for the Job", j.ID)
worker.JobChan <- j
P.Wg.Add(1)
}