fix actor EnqueueFrom stack overflow (use nil now to send from self) and replace session send/recv workers with actor functions

This commit is contained in:
Arceliar 2019-08-23 22:23:01 -05:00
parent 436c84ca33
commit 533da351f9
7 changed files with 132 additions and 17 deletions

View file

@ -72,8 +72,9 @@ type sessionInfo struct {
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers
fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session
recv chan []byte // Decrypted packets go here, picked up by the associated Conn
send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
toConn chan []byte // Decrypted packets go here, picked up by the associated Conn
fromConn chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
callbacks []chan func() // Finished work from crypto workers
}
// TODO remove this, call SyncExec directly
@ -253,8 +254,8 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.fromRouter = make(chan wire_trafficPacket, 1)
sinfo.recv = make(chan []byte, 32)
sinfo.send = make(chan FlowKeyMessage, 32)
sinfo.toConn = make(chan []byte, 32)
sinfo.fromConn = make(chan FlowKeyMessage, 32)
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go func() {
@ -264,7 +265,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.sessions.removeSession(&sinfo)
})
}()
go sinfo.startWorkers()
//go sinfo.startWorkers()
return &sinfo
}
@ -539,7 +540,7 @@ func (sinfo *sessionInfo) recvWorker() {
select {
case <-sinfo.cancel.Finished():
util.PutBytes(bs)
case sinfo.recv <- bs:
case sinfo.toConn <- bs:
}
}
}
@ -664,15 +665,127 @@ func (sinfo *sessionInfo) sendWorker() {
f()
case <-sinfo.cancel.Finished():
return
case msg := <-sinfo.send:
case msg := <-sinfo.fromConn:
doSend(msg)
}
}
select {
case <-sinfo.cancel.Finished():
return
case bs := <-sinfo.send:
case bs := <-sinfo.fromConn:
doSend(bs)
}
}
}
////////////////////////////////////////////////////////////////////////////////
func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) {
sinfo.EnqueueFrom(from, func() {
sinfo._recvPacket(packet)
})
}
func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
select {
case <-sinfo.init:
default:
// TODO find a better way to drop things until initialized
util.PutBytes(p.Payload)
return
}
switch {
case sinfo._nonceIsOK(&p.Nonce):
case len(sinfo.toConn) < cap(sinfo.toConn):
default:
// We're either full or don't like this nonce
util.PutBytes(p.Payload)
return
}
k := sinfo.sharedSesKey
var isOK bool
var bs []byte
ch := make(chan func(), 1)
poolFunc := func() {
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
callback := func() {
util.PutBytes(p.Payload)
if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
// Either we failed to decrypt, or the session was updated, or we received this packet in the mean time
util.PutBytes(bs)
return
}
sinfo._updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
select {
case sinfo.toConn <- bs:
default:
// We seem to have filled up the buffer in the mean time, so drop it
util.PutBytes(bs)
}
}
ch <- callback
sinfo.checkCallbacks()
}
sinfo.callbacks = append(sinfo.callbacks, ch)
util.WorkerGo(poolFunc)
}
func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
select {
case <-sinfo.init:
default:
// TODO find a better way to drop things until initialized
util.PutBytes(msg.Message)
return
}
sinfo.bytesSent += uint64(len(msg.Message))
coords := append([]byte(nil), sinfo.coords...)
if msg.FlowKey != 0 {
coords = append(coords, 0)
coords = append(coords, wire_encode_uint64(msg.FlowKey)...)
}
p := wire_trafficPacket{
Coords: coords,
Handle: sinfo.theirHandle,
Nonce: sinfo.myNonce,
}
sinfo.myNonce.Increment()
k := sinfo.sharedSesKey
ch := make(chan func(), 1)
poolFunc := func() {
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode()
// Cleanup
util.PutBytes(msg.Message)
util.PutBytes(p.Payload)
// Send the packet
// TODO replace this with a send to the peer struct if that becomes an actor
sinfo.sessions.router.EnqueueFrom(sinfo, func() {
sinfo.sessions.router.out(packet)
})
}
ch <- callback
sinfo.checkCallbacks()
}
sinfo.callbacks = append(sinfo.callbacks, ch)
util.WorkerGo(poolFunc)
}
func (sinfo *sessionInfo) checkCallbacks() {
sinfo.EnqueueFrom(nil, func() {
if len(sinfo.callbacks) > 0 {
select {
case callback := <-sinfo.callbacks[0]:
sinfo.callbacks = sinfo.callbacks[1:]
callback()
sinfo.checkCallbacks()
default:
}
}
})
}