-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathmain.go
172 lines (144 loc) · 5.19 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
package main
import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
eorc "github.com/crphang/orc"
"github.com/gorilla/mux"
"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/config/env"
"github.com/kelindar/talaria/internal/config/s3"
"github.com/kelindar/talaria/internal/config/static"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/logging"
"github.com/kelindar/talaria/internal/monitor/statsd"
"github.com/kelindar/talaria/internal/server"
"github.com/kelindar/talaria/internal/server/cluster"
"github.com/kelindar/talaria/internal/storage"
"github.com/kelindar/talaria/internal/storage/disk"
"github.com/kelindar/talaria/internal/storage/writer"
"github.com/kelindar/talaria/internal/table"
"github.com/kelindar/talaria/internal/table/log"
"github.com/kelindar/talaria/internal/table/nodes"
"github.com/kelindar/talaria/internal/table/timeseries"
)
const (
logTag = "main"
)
// main wires the process together: it loads configuration, sets up gossip,
// monitoring and the configured tables, starts the server and finally waits
// for the signal-driven shutdown to complete before exiting.
func main() {
	// Use a compression chunk size 16 times larger than the ORC package default.
	eorc.DefaultCompressionChunkSize *= 16

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Configuration is assembled from the static, environment ("TALARIA") and
	// S3 configurers. The S3 configurer starts with a plain standard logger and
	// is switched to the composite logger further down.
	// NOTE(review): 60s is presumably the configuration reload interval - confirm.
	s3Conf := s3.New(logging.NewStandard())
	configure := config.Load(ctx, 60*time.Second, static.New(), env.New("TALARIA"), s3Conf)
	conf := configure()

	// Setup gossip
	// NOTE(review): 7946 is presumably the gossip port - confirm.
	gossip := cluster.New(7946)

	// The log table is created with a monitor that only logs through the
	// standard logger, because the composite logger below needs the log table
	// to exist first.
	stats := statsd.New(conf.Statsd.Host, int(conf.Statsd.Port))
	bootMonitor := monitor.New(logging.NewStandard(), stats, conf.AppName, conf.Env)
	logTable := log.New(configure, gossip, bootMonitor)

	// Final logger (log table + standard logger) and the monitor built on it.
	logger := logging.NewComposite(logTable, logging.NewStandard())
	mon := monitor.New(logger, stats, conf.AppName, conf.Env)

	// From here on, logs emitted by the S3 configurer go through the composite
	// logger as well.
	s3Conf.SetLogger(logger)

	// The nodes table and the log table are always present; every configured
	// table is opened and appended after them.
	tables := []table.Table{nodes.New(gossip), logTable}
	for name, tableConf := range conf.Tables {
		tables = append(tables, openTable(name, conf.Storage, tableConf, gossip, mon))
	}

	// Create the server
	srv := server.New(configure, mon, gossip, tables...)
	waitExit := make(chan struct{}, 1)

	// Shutdown sequence, run when an OS-level signal is received: cancel the
	// context, close gossip, close the server, then release the main goroutine.
	// NOTE(review): onSignal invokes this callback for every received signal, so
	// a second signal would run the sequence again and close waitExit twice.
	onSignal(func(sig os.Signal) {
		mon.Info("received signal %v, start graceful shutdown", sig)
		cancel()
		mon.Info("Cancel ctx done")
		gossip.Close()
		mon.Info("Close gossip done")
		srv.Close(mon) // per the original note: waits for in-flight RPC/SQS ingestion to finish
		mon.Info("Graceful shutdown finished, ready to exit")
		close(waitExit)
	})

	// Join the cluster
	mon.Info("server: joining cluster on %s...", conf.Domain)
	gossip.JoinHostname(conf.Domain)

	// Serve the readiness/liveness probes over HTTP only when k8s config is set
	if conf.K8s != nil {
		startHTTPServerAsync(conf.K8s.ProbePort)
	}

	// Start listening; an error returned from Listen is fatal.
	mon.Info("server: starting...")
	mon.Count1(logTag, "start")
	err := srv.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port)
	if err != nil {
		panic(err)
	}

	// Listen has returned without an error. If the context was cancelled, a
	// shutdown is in progress and we wait for the signal callback to finish;
	// otherwise we exit right away.
	select {
	case <-ctx.Done(): // graceful shutdown
		mon.Info("server: wait exit...")
		<-waitExit
		mon.Info("server: exit after ctx is done...")
	default:
		mon.Info("server: exit...")
	}

	// NOTE(review): the exit status is 1 on both paths, including a graceful
	// shutdown - confirm this is intended. os.Exit also skips the deferred cancel.
	os.Exit(1)
}
// openTable builds the timeseries table called name: it opens the disk storage
// for it, wraps that storage for compaction when tableConf.Compact is set, and
// attaches the streamers created from tableConf.Streams. Any error while
// setting up compaction or streaming causes a panic.
func openTable(name string, storageConf config.Storage, tableConf config.Table, cluster cluster.Membership, monitor monitor.Monitor) table.Table {
	monitor.Info("server: opening table %s...", name)

	// Disk-backed storage is always the base layer.
	store := storage.Storage(disk.Open(storageConf.Directory, name, monitor, storageConf.Badger))

	// Compaction is optional and, when configured, wraps the base layer.
	if compaction := tableConf.Compact; compaction != nil {
		compacted, err := writer.ForCompaction(compaction, monitor, store)
		if err != nil {
			panic(err)
		}
		store = compacted
	}

	// Per the original note, ForStreaming is expected to yield a no-op streamer
	// when no streams are configured.
	streams, err := writer.ForStreaming(tableConf.Streams, monitor)
	if err != nil {
		panic(err)
	}

	return timeseries.New(name, cluster, monitor, store, &tableConf, streams)
}
// onSignal registers callback to be invoked, on a background goroutine, each
// time the process receives SIGINT or SIGTERM. It returns immediately; the
// goroutine it starts is never stopped.
func onSignal(callback func(sig os.Signal)) {
	signals := make(chan os.Signal, 1)

	// SIGQUIT is deliberately not hooked: per the original note, it is sent
	// by people most of the time.
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		// The channel is never closed, so this loop simply dispatches every
		// delivered signal to the callback, one at a time.
		for {
			callback(<-signals)
		}
	}()
}
// startHTTPServerAsync starts, on a background goroutine, an HTTP server bound
// to portNum on all interfaces. It serves:
//   - GET/HEAD /healthz, answering with a fixed body, for readiness and
//     liveness probes;
//   - everything under /debug/pprof/, delegated to http.DefaultServeMux (this
//     file blank-imports net/http/pprof, which registers its handlers there).
//
// The function returns immediately. If the server stops for any reason,
// including failing to bind the port, the goroutine panics.
func startHTTPServerAsync(portNum int32) {
	go func() {
		handler := mux.NewRouter()
		handler.HandleFunc("/healthz", func(resp http.ResponseWriter, req *http.Request) {
			// Best-effort write: there is nothing useful to do if the prober
			// has already gone away.
			_, _ = resp.Write([]byte(`talaria-health-check`))
		}).Methods(http.MethodGet, http.MethodHead)
		handler.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)

		srv := &http.Server{
			Addr:    fmt.Sprintf(":%d", portNum),
			Handler: handler,
			// Bound the time a client may take to send its request headers so
			// that stalled connections cannot be held open forever. No
			// WriteTimeout is set on purpose: it would cap long-running pprof
			// requests such as CPU profiles and traces.
			ReadHeaderTimeout: 10 * time.Second,
		}
		if err := srv.ListenAndServe(); err != nil {
			panic(err)
		}
	}()
}