Clean up a bit

This commit is contained in:
Neil Alexander 2020-07-07 13:45:09 +01:00
parent ae8099819e
commit 6b2028f781
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 55 additions and 30 deletions

View file

@ -235,8 +235,8 @@ func (c *Core) PacketConn() *PacketConn {
// Resolve takes a masked node ID and performs a search, returning the complete // Resolve takes a masked node ID and performs a search, returning the complete
// node ID and the node's public key. // node ID and the node's public key.
func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.NodeID, boxPubKey *crypto.BoxPubKey, err error) { func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.NodeID, boxPubKey *crypto.BoxPubKey, err error) {
fmt.Println("**** START RESOLVE") c.log.Debugln("Resolving", nodeID.String())
defer fmt.Println("**** END RESOLVE") defer c.log.Debugln("Finished resolving", nodeID.String())
done := make(chan struct{}) done := make(chan struct{})
c.router.Act(c, func() { c.router.Act(c, func() {
@ -261,7 +261,7 @@ func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.Node
} }
c.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted).startSearch() c.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted).startSearch()
} else { } else {
err = errors.New("search already exists") err = errors.New("a search for this node ID is already in progress")
close(done) close(done)
} }
}) })

View file

@ -59,54 +59,74 @@ func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
} }
// implements net.PacketConn // implements net.PacketConn
func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (n int, err error) {
if c.closed { // TODO: unsafe? if c.closed { // TODO: unsafe?
return 0, PacketConnError{closed: true} return 0, PacketConnError{closed: true}
} }
// Make sure that the net.Addr we were given was actually a
// *crypto.BoxPubKey. If it wasn't then fail.
boxPubKey, ok := addr.(*crypto.BoxPubKey) boxPubKey, ok := addr.(*crypto.BoxPubKey)
if !ok { if !ok {
return 0, errors.New("expected *crypto.BoxPubKey as net.Addr") return 0, errors.New("expected *crypto.BoxPubKey as net.Addr")
} }
// Work out the node ID for the public key we were given. If
// we need to perform a search then we will need to know this.
nodeID := crypto.GetNodeID(boxPubKey) nodeID := crypto.GetNodeID(boxPubKey)
nodeMask := &crypto.NodeID{} nodeMask := &crypto.NodeID{}
for i := range nodeMask { for i := range nodeMask {
nodeMask[i] = 0xFF nodeMask[i] = 0xFF
} }
// TODO: This is all a mess // Look up if we have an open session for that destination.
var err error
var session *sessionInfo var session *sessionInfo
phony.Block(c.sessions.router, func() { phony.Block(c.sessions.router, func() {
session, ok = c.sessions.getByTheirPerm(boxPubKey) session, ok = c.sessions.getByTheirPerm(boxPubKey)
}) })
// If we don't have an open session then we will need to perform
// a search to find the coordinates for the node. Doing this will
// implicitly open a session to the remote node.
if !ok { if !ok {
nodeID, boxPubKey, err = c.sessions.router.core.Resolve(nodeID, nodeMask) // Try and look up the node ID and mask.
if err == nil { if nodeID, boxPubKey, err = c.sessions.router.core.Resolve(nodeID, nodeMask); err != nil {
return 0, fmt.Errorf("search failed: %w", err)
}
// The previous function will block until it is done and by
// that point we should have a session. Try to retrieve it.
phony.Block(c.sessions.router, func() { phony.Block(c.sessions.router, func() {
session, ok = c.sessions.getByTheirPerm(boxPubKey) session, ok = c.sessions.getByTheirPerm(boxPubKey)
}) })
}
} // If we still don't have an open session then something's
if err != nil { // gone wrong. Give up at this point.
return 0, fmt.Errorf("failed to find session/start search: %w", err)
}
if !ok || session == nil { if !ok || session == nil {
return 0, errors.New("expected a session but there was none") return 0, errors.New("session is not open")
}
} }
// Delegate to the sessions actor to actually send the message
// through the session. We'll wait on the sendErr channel for
// the actor to finish at least the initial checks.
sendErr := make(chan error, 1) sendErr := make(chan error, 1)
msg := FlowKeyMessage{Message: b} msg := FlowKeyMessage{Message: b}
session.Act(c, func() { session.Act(c, func() {
// Check if the packet is small enough to go through this session // Check if the packet is small enough to go through this session.
// If it isn't then we'll send back an error with the maximum
// session MTU. The sender can decide what to do with it.
sessionMTU := session._getMTU() sessionMTU := session._getMTU()
if types.MTU(len(b)) > sessionMTU { if types.MTU(len(b)) > sessionMTU {
sendErr <- PacketConnError{maxsize: int(sessionMTU)} sendErr <- PacketConnError{maxsize: int(sessionMTU)}
return return
} }
// Send the packet // If we got to this point then our initial checks passed - there
// isn't much point blocking the sender any further so release it.
sendErr <- nil
// Send the packet.
session._send(msg) session._send(msg)
// Session keep-alive, while we wait for the crypto workers from send // Session keep-alive, while we wait for the crypto workers from send
@ -115,28 +135,33 @@ func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
if session.time.Before(session.pingTime) && time.Since(session.pingTime) > 6*time.Second { if session.time.Before(session.pingTime) && time.Since(session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct // TODO double check that the above condition is correct
c.sessions.router.Act(session, func() { c.sessions.router.Act(session, func() {
// Check to see if there is a search already matching the destination // Check to see if there is a search already matching the
// supplied destination. If there is then don't start another
// one.
sinfo, isIn := c.sessions.router.searches.searches[*nodeID] sinfo, isIn := c.sessions.router.searches.searches[*nodeID]
if !isIn { if !isIn {
// Nothing was found, so create a new search // Nothing was found, so create a new search.
searchCompleted := func(sinfo *sessionInfo, e error) {} searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.sessions.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted) sinfo = c.sessions.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted)
c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo)
// Start the search // Start the search.
sinfo.startSearch() sinfo.startSearch()
c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo)
} }
}) })
} else { } else {
session.ping(session) // TODO send from self if this becomes an actor session._sendPingPong(false)
}
case session.reset && session.pingTime.Before(session.time):
session.ping(session) // TODO send from self if this becomes an actor
default: // Don't do anything, to keep traffic throttled
} }
sendErr <- nil case session.reset && session.pingTime.Before(session.time):
session._sendPingPong(false)
default:
}
}) })
// Wait for the checks to pass. Then return the success
// values to the caller.
err = <-sendErr err = <-sendErr
return len(b), err return len(b), err
} }