diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 1b6255dc..400b11b3 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -235,8 +235,8 @@ func (c *Core) PacketConn() *PacketConn { // Resolve takes a masked node ID and performs a search, returning the complete // node ID and the node's public key. func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.NodeID, boxPubKey *crypto.BoxPubKey, err error) { - fmt.Println("**** START RESOLVE") - defer fmt.Println("**** END RESOLVE") + c.log.Debugln("Resolving", nodeID.String()) + defer c.log.Debugln("Finished resolving", nodeID.String()) done := make(chan struct{}) c.router.Act(c, func() { @@ -261,7 +261,7 @@ func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.Node } c.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted).startSearch() } else { - err = errors.New("search already exists") + err = errors.New("a search for this node ID is already in progress") close(done) } }) diff --git a/src/yggdrasil/packetconn.go b/src/yggdrasil/packetconn.go index 0b40c7d6..ad26c9d0 100644 --- a/src/yggdrasil/packetconn.go +++ b/src/yggdrasil/packetconn.go @@ -59,54 +59,74 @@ func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { } // implements net.PacketConn -func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { +func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (n int, err error) { if c.closed { // TODO: unsafe? return 0, PacketConnError{closed: true} } + // Make sure that the net.Addr we were given was actually a + // *crypto.BoxPubKey. If it wasn't then fail. boxPubKey, ok := addr.(*crypto.BoxPubKey) if !ok { return 0, errors.New("expected *crypto.BoxPubKey as net.Addr") } + + // Work out the node ID for the public key we were given. If + // we need to perform a search then we will need to know this. nodeID := crypto.GetNodeID(boxPubKey) nodeMask := &crypto.NodeID{} for i := range nodeMask { nodeMask[i] = 0xFF } - // TODO: This is all a mess - var err error + // Look up if we have an open session for that destination. var session *sessionInfo phony.Block(c.sessions.router, func() { session, ok = c.sessions.getByTheirPerm(boxPubKey) }) + + // If we don't have an open session then we will need to perform + // a search to find the coordinates for the node. Doing this will + // implicitly open a session to the remote node. if !ok { - nodeID, boxPubKey, err = c.sessions.router.core.Resolve(nodeID, nodeMask) - if err == nil { - phony.Block(c.sessions.router, func() { - session, ok = c.sessions.getByTheirPerm(boxPubKey) - }) + // Try and look up the node ID and mask. + if nodeID, boxPubKey, err = c.sessions.router.core.Resolve(nodeID, nodeMask); err != nil { + return 0, fmt.Errorf("search failed: %w", err) + } + + // The previous function will block until it is done and by + // that point we should have a session. Try to retrieve it. + phony.Block(c.sessions.router, func() { + session, ok = c.sessions.getByTheirPerm(boxPubKey) + }) + + // If we still don't have an open session then something's + // gone wrong. Give up at this point. + if !ok || session == nil { + return 0, errors.New("session is not open") } } - if err != nil { - return 0, fmt.Errorf("failed to find session/start search: %w", err) - } - if !ok || session == nil { - return 0, errors.New("expected a session but there was none") - } + // Delegate to the sessions actor to actually send the message + // through the session. We'll wait on the sendErr channel for + // the actor to finish at least the initial checks. sendErr := make(chan error, 1) msg := FlowKeyMessage{Message: b} - session.Act(c, func() { - // Check if the packet is small enough to go through this session + // Check if the packet is small enough to go through this session. + // If it isn't then we'll send back an error with the maximum + // session MTU. The sender can decide what to do with it. sessionMTU := session._getMTU() if types.MTU(len(b)) > sessionMTU { sendErr <- PacketConnError{maxsize: int(sessionMTU)} return } - // Send the packet + // If we got to this point then our initial checks passed - there + // isn't much point blocking the sender any further so release it. + sendErr <- nil + + // Send the packet. session._send(msg) // Session keep-alive, while we wait for the crypto workers from send @@ -115,28 +135,33 @@ func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { if session.time.Before(session.pingTime) && time.Since(session.pingTime) > 6*time.Second { // TODO double check that the above condition is correct c.sessions.router.Act(session, func() { - // Check to see if there is a search already matching the destination + // Check to see if there is a search already matching the + // supplied destination. If there is then don't start another + // one. sinfo, isIn := c.sessions.router.searches.searches[*nodeID] if !isIn { - // Nothing was found, so create a new search + // Nothing was found, so create a new search. searchCompleted := func(sinfo *sessionInfo, e error) {} sinfo = c.sessions.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted) - c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo) - // Start the search + + // Start the search. sinfo.startSearch() + c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo) } }) } else { - session.ping(session) // TODO send from self if this becomes an actor + session._sendPingPong(false) } - case session.reset && session.pingTime.Before(session.time): - session.ping(session) // TODO send from self if this becomes an actor - default: // Don't do anything, to keep traffic throttled - } - sendErr <- nil + case session.reset && session.pingTime.Before(session.time): + session._sendPingPong(false) + + default: + } }) + // Wait for the checks to pass. Then return the success + // values to the caller. err = <-sendErr return len(b), err }