forked from mailru/dbr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbr.go
182 lines (159 loc) · 4.39 KB
/
dbr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package dbr
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/mailru/dbr/dialect"
)
// Open instantiates a Connection for the given database/sql driver name and
// data source name.
//
// log receives events, errors and timings; when it is nil, nullReceiver is
// used instead. The driver name also selects the SQL dialect: "mysql",
// "postgres", "sqlite3" and "clickhouse" are recognised, and any other name
// yields ErrNotSupported. An error from sql.Open is returned unchanged and
// takes precedence over ErrNotSupported.
func Open(driver, dsn string, log EventReceiver) (*Connection, error) {
	if log == nil {
		log = nullReceiver
	}
	conn, err := sql.Open(driver, dsn)
	if err != nil {
		return nil, err
	}
	var d Dialect
	switch driver {
	case "mysql":
		d = dialect.MySQL
	case "postgres":
		d = dialect.PostgreSQL
	case "sqlite3":
		d = dialect.SQLite3
	case "clickhouse":
		d = dialect.ClickHouse
	default:
		// The handle is never handed to the caller on this path, so it must
		// be released here; otherwise the *sql.DB would be leaked. The Close
		// error is deliberately dropped because ErrNotSupported is the error
		// the caller needs to see.
		_ = conn.Close()
		return nil, ErrNotSupported
	}
	return &Connection{DB: conn, EventReceiver: log, Dialect: d}, nil
}
// placeholder is the bind-parameter marker handed to the interpolator when
// exec and query expand a builder into SQL text.
const placeholder = "?"
// Connection is a connection to the database with an EventReceiver
// to send events, errors, and timings to.
//
// It embeds *sql.DB, so the methods of the underlying database handle
// (for example Begin) are promoted onto Connection.
type Connection struct {
// Underlying database/sql handle.
*sql.DB
// Dialect is the SQL dialect chosen for the driver; Open sets it from the driver name.
Dialect Dialect
// Receiver for this connection; sessions created with a nil receiver fall back to it.
EventReceiver
}
// Session represents a business unit of execution for some connection.
//
// A Session pairs a Connection with its own EventReceiver and the
// context.Context it was created with.
type Session struct {
// Connection this session runs against.
*Connection
// Receiver for this session. Being embedded directly, it takes precedence
// over the EventReceiver promoted through the embedded *Connection.
EventReceiver
// ctx is the context supplied to NewSessionContext; Connection.NewSession
// passes context.Background().
ctx context.Context
}
// NewSession instantiates a Session for the Connection, bound to
// context.Background(). It delegates to NewSessionContext, which also
// decides what to do with a nil log.
func (conn *Connection) NewSession(log EventReceiver) *Session {
	ctx := context.Background()
	return conn.NewSessionContext(ctx, log)
}
// NewSessionContext instantiates a Session for the Connection that carries
// ctx. When log is nil the session reports to the connection's own
// EventReceiver.
func (conn *Connection) NewSessionContext(ctx context.Context, log EventReceiver) *Session {
	sess := &Session{
		Connection:    conn,
		EventReceiver: log,
		ctx:           ctx,
	}
	if sess.EventReceiver == nil {
		// No receiver supplied: inherit the parent's instrumentation.
		sess.EventReceiver = conn.EventReceiver
	}
	return sess
}
// NewSession forks the current session: the new Session shares this
// session's Connection and context. When log is nil the fork keeps
// reporting to this session's EventReceiver.
func (sess *Session) NewSession(log EventReceiver) *Session {
	forked := &Session{
		Connection:    sess.Connection,
		EventReceiver: sess.EventReceiver,
		ctx:           sess.ctx,
	}
	if log != nil {
		forked.EventReceiver = log
	}
	return forked
}
// beginTx starts a transaction on the underlying *sql.DB. It calls Begin,
// which takes no context argument.
func (conn *Connection) beginTx() (*sql.Tx, error) {
	return conn.DB.Begin()
}
// SessionRunner can do anything that a Session can except start a transaction.
//
// Each statement kind has two entry points: one that starts a builder from a
// table name (or column list), and a *BySql variant that starts it from a raw
// query string and the values that go with it.
type SessionRunner interface {
// SELECT builders.
Select(column ...string) SelectBuilder
SelectBySql(query string, value ...interface{}) SelectBuilder
// INSERT builders.
InsertInto(table string) InsertBuilder
InsertBySql(query string, value ...interface{}) InsertBuilder
// UPDATE builders.
Update(table string) UpdateBuilder
UpdateBySql(query string, value ...interface{}) UpdateBuilder
// DELETE builders.
DeleteFrom(table string) DeleteBuilder
DeleteBySql(query string, value ...interface{}) DeleteBuilder
}
// runner is the statement-execution surface that exec and query need:
// exec calls Exec and query calls Query. The method signatures match those
// of *sql.DB and *sql.Tx.
type runner interface {
Exec(query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
}
// Executer can execute requests to database.
type Executer interface {
// Exec runs the request and returns the database/sql result and any error.
Exec() (sql.Result, error)
}
// loader groups the methods that load query results into a caller-supplied
// destination value.
//
// NOTE(review): the int results are presumably the number of rows loaded,
// and the Struct/Value variants presumably constrain the destination kind;
// confirm against the implementations, which are not in this file.
type loader interface {
Load(value interface{}) (int, error)
LoadStruct(value interface{}) error
LoadStructs(value interface{}) (int, error)
LoadValue(value interface{}) error
LoadValues(value interface{}) (int, error)
}
// exec interpolates builder for dialect d and runs the resulting statement
// through runner.Exec.
//
// An interpolation failure is reported to log as "dbr.exec.interpolate"
// (with the SQL and arguments produced so far) and yields a nil result.
// Once interpolation succeeds, the elapsed time is always reported as
// "dbr.exec", and an execution failure is reported as "dbr.exec.exec".
// In every failure case the error returned is whatever log.EventErrKv
// returns.
func exec(runner runner, log EventReceiver, builder Builder, d Dialect) (sql.Result, error) {
	interp := interpolator{
		Buffer:       NewBuffer(),
		Dialect:      d,
		IgnoreBinary: true,
	}
	if err := interp.interpolate(placeholder, []interface{}{builder}); err != nil {
		return nil, log.EventErrKv("dbr.exec.interpolate", err, kvs{
			"sql":  interp.String(),
			"args": fmt.Sprint(interp.Value()),
		})
	}
	stmt, args := interp.String(), interp.Value()

	// time.Now() is evaluated when the defer statement runs, so the timing
	// covers everything from here to function return.
	defer func(begin time.Time) {
		log.TimingKv("dbr.exec", time.Since(begin).Nanoseconds(), kvs{
			"sql": stmt,
		})
	}(time.Now())

	res, err := runner.Exec(stmt, args...)
	if err == nil {
		return res, nil
	}
	return res, log.EventErrKv("dbr.exec.exec", err, kvs{
		"sql": stmt,
	})
}
// query interpolates builder for dialect d, runs the resulting statement
// through runner.Query and hands the rows to Load together with dest. It
// returns the count reported by Load.
//
// Failures are reported to log under "dbr.select.interpolate",
// "dbr.select.load.query" or "dbr.select.load.scan" depending on the stage,
// and the function then returns 0 plus whatever log.EventErrKv returns.
// Once interpolation succeeds, the elapsed time is always reported as
// "dbr.select".
//
// NOTE(review): rows is not closed here; presumably Load takes ownership
// and closes it. Confirm against Load's implementation.
func query(runner runner, log EventReceiver, builder Builder, d Dialect, dest interface{}) (int, error) {
	interp := interpolator{
		Buffer:       NewBuffer(),
		Dialect:      d,
		IgnoreBinary: true,
	}
	if err := interp.interpolate(placeholder, []interface{}{builder}); err != nil {
		return 0, log.EventErrKv("dbr.select.interpolate", err, kvs{
			"sql":  interp.String(),
			"args": fmt.Sprint(interp.Value()),
		})
	}
	stmt, args := interp.String(), interp.Value()

	// time.Now() is evaluated when the defer statement runs, so the timing
	// covers both the query and the load.
	defer func(begin time.Time) {
		log.TimingKv("dbr.select", time.Since(begin).Nanoseconds(), kvs{
			"sql": stmt,
		})
	}(time.Now())

	rows, err := runner.Query(stmt, args...)
	if err != nil {
		return 0, log.EventErrKv("dbr.select.load.query", err, kvs{
			"sql": stmt,
		})
	}

	count, err := Load(rows, dest)
	if err != nil {
		return 0, log.EventErrKv("dbr.select.load.scan", err, kvs{
			"sql": stmt,
		})
	}
	return count, nil
}