diff --git a/src/util/util.go b/src/util/util.go index 65e6d463..45be3b19 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func Yield() { @@ -44,3 +45,14 @@ func PutBytes(bs []byte) { default: } } + +// This is a workaround to go's broken timer implementation +func TimerStop(t *time.Timer) bool { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return true +} diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f7319e46..94faba40 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -97,9 +97,7 @@ func (c *Core) DEBUG_getPeers() *peers { } func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { - //in <-chan []byte, - //out chan<- []byte) *peer { - return ps.newPeer(&box, &sig, &link, "(simulator)") //, in, out) + return ps.newPeer(&box, &sig, &link, "(simulator)", nil) } /* diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 2d2155c9..be98dd92 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,6 +20,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface awdl awdl // AWDL interface support + // TODO timeout (to remove from switch), read from config.ReadTimeout } type linkInfo struct { @@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } - //l.interfaces[intf.name] = &intf - //go intf.start() return &intf, nil } @@ -142,7 +141,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) if intf.peer == nil { return errors.New("failed to create peer") } @@ -161,52 +160,57 @@ func (intf *linkInterface) handler() error { themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.peer.close = func() { - intf.msgIO.close() - intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) + defer intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) + signalSent := make(chan bool, 1) + sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) + defer close(signalSent) interval := 4 * time.Second - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } + tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp + defer util.TimerStop(tcpTimer) + send := func(bs []byte) { + intf.msgIO.writeMsg(bs) + select { + case signalSent <- len(bs) > 0: + default: } } - defer clearTimer() for { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - intf.msgIO.writeMsg(msg) + send(msg) continue default: } // No protocol traffic to send, so reset the timer - clearTimer() - timer.Reset(interval) + util.TimerStop(tcpTimer) + tcpTimer.Reset(interval) // Now block until something is ready or the timer triggers keepalive traffic select { - case <-timer.C: - intf.msgIO.writeMsg(nil) + case <-tcpTimer.C: + intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case <-sendAck: + intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) case msg := <-intf.peer.linkOut: intf.msgIO.writeMsg(msg) case msg, ok := <-out: if !ok { return } - intf.msgIO.writeMsg(msg) + send(msg) util.PutBytes(msg) select { case signalReady <- struct{}{}: @@ -217,27 +221,22 @@ func (intf *linkInterface) handler() error { }() //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Used to enable/disable activity in the switch - signalAlive := make(chan struct{}, 1) + signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive defer close(signalAlive) go func() { var isAlive bool var isReady bool - interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - } - defer clearTimer() + var sendTimerRunning bool + var recvTimerRunning bool + recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + sendTime := time.Second + sendTimer := time.NewTimer(sendTime) + defer util.TimerStop(sendTimer) + recvTimer := time.NewTimer(recvTime) + defer util.TimerStop(recvTimer) for { - clearTimer() - timer.Reset(interval) select { - case _, ok := <-signalAlive: + case gotMsg, ok := <-signalAlive: if !ok { return } @@ -249,6 +248,27 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } + if gotMsg && !sendTimerRunning { + // We got a message + // Start a timer, if it expires then send a 0-sized ack to let them know we're alive + util.TimerStop(sendTimer) + sendTimer.Reset(sendTime) + sendTimerRunning = true + } + case sentMsg, ok := <-signalSent: + // Stop any running ack timer + if !ok { + return + } + util.TimerStop(sendTimer) + sendTimerRunning = false + if sentMsg && !recvTimerRunning { + // We sent a message + // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem + util.TimerStop(recvTimer) + recvTimer.Reset(recvTime) + recvTimerRunning = true + } case _, ok := <-signalReady: if !ok { return @@ -260,7 +280,14 @@ func (intf *linkInterface) handler() error { // Keep enabled in the switch intf.link.core.switchTable.idleIn <- intf.peer.port } - case <-timer.C: + case <-sendTimer.C: + // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive + select { + case sendAck <- struct{}{}: + default: + } + case <-recvTimer.C: + // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding isAlive = false } } @@ -275,7 +302,7 @@ func (intf *linkInterface) handler() error { return err } select { - case signalAlive <- struct{}{}: + case signalAlive <- len(msg) > 0: default: } } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad54fbc3..237d6f61 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -113,7 +113,7 @@ type peer struct { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string) *peer { +func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -123,6 +123,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare firstSeen: now, doSend: make(chan struct{}, 1), dinfo: make(chan *dhtInfo, 1), + close: closer, core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d5059369..99e69828 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,7 @@ func (r *router) init(core *Core) { r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)") + p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { // This is to make very sure it never blocks select { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f2adf3fb..db39d010 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -281,6 +281,7 @@ func (t *switchTable) cleanPeers() { if now.Sub(peer.time) > switch_timeout+switch_throttle { // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. delete(t.data.peers, port) + go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe } } if _, isIn := t.data.peers[t.parent]; !isIn {