-
-
Notifications
You must be signed in to change notification settings - Fork 21
/
callbacks.go
191 lines (172 loc) · 4.91 KB
/
callbacks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package utp
/*
#include "utp.h"
*/
import "C"
import (
"net"
"reflect"
"strings"
"sync/atomic"
"unsafe"
"github.com/anacrolix/log"
)
type utpCallbackArguments C.utp_callback_arguments
func (a *utpCallbackArguments) goContext() *utpContext {
return (*utpContext)(a.context)
}
func (a *utpCallbackArguments) bufBytes() []byte {
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
uintptr(unsafe.Pointer(a.buf)),
int(a.len),
int(a.len),
}))
}
func (a *utpCallbackArguments) state() C.int {
return *(*C.int)(unsafe.Pointer(&a.anon0))
}
func (a *utpCallbackArguments) error_code() C.int {
return *(*C.int)(unsafe.Pointer(&a.anon0))
}
func (a *utpCallbackArguments) address() *C.struct_sockaddr {
return *(**C.struct_sockaddr)(unsafe.Pointer(&a.anon0[0]))
}
func (a *utpCallbackArguments) addressLen() C.socklen_t {
return *(*C.socklen_t)(unsafe.Pointer(&a.anon1[0]))
}
var sends int64
//export sendtoCallback
func sendtoCallback(a *utpCallbackArguments) (ret C.uint64) {
s := getSocketForLibContext(a.goContext())
b := a.bufBytes()
var sendToUdpAddr net.UDPAddr
if err := structSockaddrToUDPAddr(a.address(), &sendToUdpAddr); err != nil {
panic(err)
}
newSends := atomic.AddInt64(&sends, 1)
if logCallbacks {
s.logger.Printf("sending %d bytes, %d packets", len(b), newSends)
}
expMap.Add("socket PacketConn writes", 1)
n, err := s.pc.WriteTo(b, &sendToUdpAddr)
c := s.conns[a.socket]
if err != nil {
expMap.Add("socket PacketConn write errors", 1)
if c != nil && c.userOnError != nil {
go c.userOnError(err)
} else if c != nil &&
(strings.Contains(err.Error(), "can't assign requested address") ||
strings.Contains(err.Error(), "invalid argument")) {
// Should be an bad argument or network configuration problem we
// can't recover from.
c.onError(err)
} else if c != nil && strings.Contains(err.Error(), "operation not permitted") {
// Rate-limited. Probably Linux. The implementation might try
// again later.
} else {
s.logger.Levelf(log.Debug, "error sending packet: %v", err)
}
return
}
if n != len(b) {
expMap.Add("socket PacketConn short writes", 1)
s.logger.Printf("expected to send %d bytes but only sent %d", len(b), n)
}
return
}
//export errorCallback
func errorCallback(a *utpCallbackArguments) C.uint64 {
s := getSocketForLibContext(a.goContext())
err := errorForCode(a.error_code())
if logCallbacks {
s.logger.Printf("error callback: socket %p: %s", a.socket, err)
}
libContextToSocket[a.goContext()].conns[a.socket].onError(err)
return 0
}
//export logCallback
func logCallback(a *utpCallbackArguments) C.uint64 {
s := getSocketForLibContext(a.goContext())
s.logger.Printf("libutp: %s", C.GoString((*C.char)(unsafe.Pointer(a.buf))))
return 0
}
//export stateChangeCallback
func stateChangeCallback(a *utpCallbackArguments) C.uint64 {
s := libContextToSocket[a.goContext()]
c := s.conns[a.socket]
if logCallbacks {
s.logger.Printf("state changed: conn %p: %s", c, libStateName(a.state()))
}
switch a.state() {
case C.UTP_STATE_CONNECT:
c.setConnected()
// A dialled connection will not tell the remote it's ready until it
// writes. If the dialer has no intention of writing, this will stall
// everything. We do an empty write to get things rolling again. This
// circumstance occurs when c1 in the RacyRead nettest is the dialer.
C.utp_write(a.socket, nil, 0)
case C.UTP_STATE_WRITABLE:
c.cond.Broadcast()
case C.UTP_STATE_EOF:
c.setGotEOF()
case C.UTP_STATE_DESTROYING:
c.onDestroyed()
s.onLibSocketDestroyed(a.socket)
default:
panic(a.state)
}
return 0
}
//export readCallback
func readCallback(a *utpCallbackArguments) C.uint64 {
s := libContextToSocket[a.goContext()]
c := s.conns[a.socket]
b := a.bufBytes()
if logCallbacks {
s.logger.Printf("read callback: conn %p: %d bytes", c, len(b))
}
if len(b) == 0 {
panic("that will break the read drain invariant")
}
c.readBuf.Write(b)
c.cond.Broadcast()
return 0
}
//export acceptCallback
func acceptCallback(a *utpCallbackArguments) C.uint64 {
s := getSocketForLibContext(a.goContext())
if logCallbacks {
s.logger.Printf("accept callback: %#v", *a)
}
c := s.newConn(a.socket)
c.setRemoteAddr()
s.pushBacklog(c)
return 0
}
//export getReadBufferSizeCallback
func getReadBufferSizeCallback(a *utpCallbackArguments) (ret C.uint64) {
s := libContextToSocket[a.goContext()]
c := s.conns[a.socket]
if c == nil {
// socket hasn't been added to the Socket.conns yet. The read buffer
// starts out empty, and the default implementation for this callback
// returns 0, so we'll return that.
return 0
}
ret = C.uint64(c.readBuf.Len())
return
}
//export firewallCallback
func firewallCallback(a *utpCallbackArguments) C.uint64 {
s := getSocketForLibContext(a.goContext())
if s.syncFirewallCallback != nil {
var addr net.UDPAddr
structSockaddrToUDPAddr(a.address(), &addr)
if s.syncFirewallCallback(&addr) {
return 1
}
} else if s.asyncBlock {
return 1
}
return 0
}