use a session worker to try to avoid mutex hell. compiles, but incomplete and doesn't work yet

This commit is contained in:
Arceliar 2019-04-21 20:38:14 -05:00
parent 0b8f5b5dda
commit 5dada3952c
4 changed files with 191 additions and 151 deletions

View file

@ -18,38 +18,50 @@ import (
// All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct {
core *Core //
reconfigure chan chan error //
theirAddr address.Address //
theirSubnet address.Subnet //
theirPermPub crypto.BoxPubKey //
theirSesPub crypto.BoxPubKey //
mySesPub crypto.BoxPubKey //
mySesPriv crypto.BoxPrivKey //
sharedSesKey crypto.BoxSharedKey // derived from session keys
theirHandle crypto.Handle //
myHandle crypto.Handle //
theirNonce crypto.BoxNonce //
theirNonceMask uint64 //
theirNonceMutex sync.Mutex // protects the above
myNonce crypto.BoxNonce //
myNonceMutex sync.Mutex // protects the above
theirMTU uint16 //
myMTU uint16 //
wasMTUFixed bool // Was the MTU fixed by a receive error?
time atomic.Value // time.Time // Time we last received a packet
mtuTime atomic.Value // time.Time // time myMTU was last changed
pingTime atomic.Value // time.Time // time the first ping was sent since the last received packet
pingSend atomic.Value // time.Time // time the last ping was sent
coords []byte // coords of destination
packet []byte // a buffered packet, sent immediately on ping/pong
init atomic.Value // bool // Reset if coords change
send chan []byte //
recv chan *wire_trafficPacket //
closed chan interface{} //
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session
core *Core //
reconfigure chan chan error //
theirAddr address.Address //
theirSubnet address.Subnet //
theirPermPub crypto.BoxPubKey //
theirSesPub crypto.BoxPubKey //
mySesPub crypto.BoxPubKey //
mySesPriv crypto.BoxPrivKey //
sharedSesKey crypto.BoxSharedKey // derived from session keys
theirHandle crypto.Handle //
myHandle crypto.Handle //
theirNonce crypto.BoxNonce //
theirNonceMask uint64 //
myNonce crypto.BoxNonce //
theirMTU uint16 //
myMTU uint16 //
wasMTUFixed bool // Was the MTU fixed by a receive error?
time time.Time // Time we last received a packet
mtuTime time.Time // time myMTU was last changed
pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent
coords []byte // coords of destination
packet []byte // a buffered packet, sent immediately on ping/pong
init bool // Reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session
worker chan func() // Channel to send work to the session worker
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
}
func (sinfo *sessionInfo) doWorker(f func()) {
done := make(chan struct{})
sinfo.worker <- func() {
f()
close(done)
}
<-done
}
func (sinfo *sessionInfo) workerMain() {
for f := range sinfo.worker {
f()
}
}
// Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -89,16 +101,19 @@ func (s *sessionInfo) update(p *sessionPing) bool {
// allocate enough space for additional coords
s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...)
}
now := time.Now()
s.time.Store(now)
atomic.StoreInt64(&s.tstamp, p.Tstamp)
s.init.Store(true)
s.time = time.Now()
s.tstamp = p.Tstamp
s.init = true
return true
}
// Returns true if the session has been idle for longer than the allowed timeout.
func (s *sessionInfo) timedout() bool {
return time.Since(s.time.Load().(time.Time)) > time.Minute
var timedout bool
s.doWorker(func() {
timedout = time.Since(s.time) > time.Minute
})
return timedout
}
// Struct of all active sessions.
@ -282,10 +297,10 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.theirMTU = 1280
sinfo.myMTU = 1280
now := time.Now()
sinfo.time.Store(now)
sinfo.mtuTime.Store(now)
sinfo.pingTime.Store(now)
sinfo.pingSend.Store(now)
sinfo.time = now
sinfo.mtuTime = now
sinfo.pingTime = now
sinfo.pingSend = now
higher := false
for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@ -305,14 +320,13 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.send = make(chan []byte, 32)
sinfo.recv = make(chan *wire_trafficPacket, 32)
sinfo.closed = make(chan interface{})
sinfo.worker = make(chan func(), 1)
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub
ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub
go sinfo.workerMain()
return &sinfo
}
@ -366,14 +380,12 @@ func (ss *sessions) cleanup() {
// Closes a session, removing it from sessions maps and killing the worker goroutine.
func (sinfo *sessionInfo) close() {
close(sinfo.closed)
delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byMySes, sinfo.mySesPub)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr)
delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet)
close(sinfo.send)
close(sinfo.recv)
close(sinfo.worker)
}
// Returns a session ping appropriate for the given session info.
@ -436,7 +448,7 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
packet := p.encode()
ss.core.router.out(packet)
if !isPong {
sinfo.pingSend.Store(time.Now())
sinfo.pingSend = time.Now()
}
}
@ -468,29 +480,36 @@ func (ss *sessions) handlePing(ping *sessionPing) {
mutex: &sync.RWMutex{},
nodeID: crypto.GetNodeID(&sinfo.theirPermPub),
nodeMask: &crypto.NodeID{},
recv: make(chan *wire_trafficPacket, 32),
}
for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF
}
sinfo.recv = conn.recv
ss.listener.conn <- conn
} else {
ss.core.log.Debugln("Received new session but there is no listener, ignoring")
}
ss.listenerMutex.Unlock()
}
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
return
}
if !ping.IsPong {
ss.sendPingPong(sinfo, true)
}
if sinfo.packet != nil {
// send
var bs []byte
bs, sinfo.packet = sinfo.packet, nil
ss.core.router.sendPacket(bs)
}
sinfo.doWorker(func() {
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
return
}
if !ping.IsPong {
ss.sendPingPong(sinfo, true)
}
if sinfo.packet != nil {
/* FIXME this needs to live in the net.Conn or something, needs work in Write
// send
var bs []byte
bs, sinfo.packet = sinfo.packet, nil
ss.core.router.sendPacket(bs) // FIXME this needs to live in the net.Conn or something, needs work in Write
*/
sinfo.packet = nil
}
})
}
// Get the MTU of the session.
@ -536,6 +555,8 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) resetInits() {
for _, sinfo := range ss.sinfos {
sinfo.init.Store(false)
sinfo.doWorker(func() {
sinfo.init = false
})
}
}