From d520a8a1d509f0eab670a34b763392451ace1781 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 16:10:32 -0600 Subject: [PATCH 1/8] refactor dht code to call arbitrary callbacks instead of only searches.checkDHTRes, and add admin API fuction to dhtPing a node (with an optional target NodeID) --- src/yggdrasil/admin.go | 73 +++++++++++++++++++++++++++++++++++++++++ src/yggdrasil/dht.go | 68 +++++++++++++++++++++----------------- src/yggdrasil/search.go | 2 ++ 3 files changed, 112 insertions(+), 31 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 1e85907c..ca3baa27 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -302,6 +302,24 @@ func (a *admin) init(c *Core, listenaddr string) { return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["destPubKey"].(string))}}, errors.New("Failed to remove route") } }) + a.addHandler("dhtPing", []string{"key", "coords", "[target]"}, func(in admin_info) (admin_info, error) { + if in["target"] == nil { + in["target"] = "none" + } + result, err := a.admin_dhtPing(in["key"].(string), in["coords"].(string), in["target"].(string)) + if err == nil { + var infos []map[string]string + for _, dinfo := range result.Infos { + info := make(map[string]string) + info["key"] = hex.EncodeToString(dinfo.key[:]) + info["coords"] = fmt.Sprintf("%v", dinfo.coords) + infos = append(infos, info) + } + return admin_info{"nodes": infos}, nil + } else { + return admin_info{}, err + } + }) } // start runs the admin API socket to listen for / respond to admin API calls. @@ -536,6 +554,7 @@ func (a *admin) getData_getSelf() *admin_nodeInfo { table := a.core.switchTable.table.Load().(lookupTable) coords := table.self.getCoords() self := admin_nodeInfo{ + {"key", hex.EncodeToString(a.core.boxPub[:])}, {"ip", a.core.GetAddress().String()}, {"subnet", a.core.GetSubnet().String()}, {"coords", fmt.Sprint(coords)}, @@ -702,6 +721,60 @@ func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) { return } +// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID. +func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) { + var key boxPubKey + if keyBytes, err := hex.DecodeString(keyString); err != nil { + return dhtRes{}, err + } else { + copy(key[:], keyBytes) + } + var coords []byte + for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { + if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { + return dhtRes{}, err + } else { + coords = append(coords, uint8(u64)) + } + } + resCh := make(chan *dhtRes) + info := dhtInfo{ + key: key, + coords: coords, + } + target := *info.getNodeID() + if targetString == "none" { + // Leave the default target in place + } else if targetBytes, err := hex.DecodeString(targetString); err != nil { + return dhtRes{}, err + } else if len(targetBytes) != len(target) { + return dhtRes{}, errors.New("Incorrect target NodeID length") + } else { + target = NodeID{} + copy(target[:], targetBytes) + } + rq := dhtReqKey{info.key, target} + sendPing := func() { + a.core.dht.addCallback(&rq, func(res *dhtRes) { + defer func() { recover() }() + select { + case resCh <- res: + default: + } + }) + a.core.dht.ping(&info, &target) + } + a.core.router.doAdmin(sendPing) + go func() { + time.Sleep(6 * time.Second) + close(resCh) + }() + for res := range resCh { + return *res, nil + } + return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) +} + // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // The graph is structured as a tree with directed links leading away from the root. diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e7815b7a..fd5ca585 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -49,12 +49,19 @@ type dhtRes struct { Infos []*dhtInfo // response } +// Parts of a DHT req usable as a key in a map. +type dhtReqKey struct { + key boxPubKey + dest NodeID +} + // The main DHT struct. type dht struct { - core *Core - nodeID NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[boxPubKey]map[NodeID]time.Time + core *Core + nodeID NodeID + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey][]func(*dhtRes) // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[NodeID]*dhtInfo imp []*dhtInfo @@ -65,13 +72,14 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) + t.callbacks = make(map[dhtReqKey][]func(*dhtRes)) t.reset() } // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { - t.reqs = make(map[boxPubKey]map[NodeID]time.Time) + t.reqs = make(map[dhtReqKey]time.Time) t.table = make(map[NodeID]*dhtInfo) t.imp = nil } @@ -194,19 +202,31 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { t.core.router.out(packet) } +// Adds a callback and removes it after some timeout. +func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { + t.callbacks[*rq] = append(t.callbacks[*rq], callback) + go func() { + time.Sleep(6 * time.Second) + t.core.router.admin <- func() { + delete(t.callbacks, *rq) + } + }() +} + // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { + rq := dhtReqKey{res.Key, res.Dest} + for _, callback := range t.callbacks[rq] { + callback(res) + } + delete(t.callbacks, rq) t.core.searches.handleDHTRes(res) - reqs, isIn := t.reqs[res.Key] + _, isIn := t.reqs[rq] if !isIn { return } - _, isIn = reqs[res.Dest] - if !isIn { - return - } - delete(reqs, res.Dest) + delete(t.reqs, rq) rinfo := dhtInfo{ key: res.Key, coords: res.Coords, @@ -243,15 +263,8 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { } packet := p.encode() t.core.router.out(packet) - reqsToDest, isIn := t.reqs[dest.key] - if !isIn { - t.reqs[dest.key] = make(map[NodeID]time.Time) - reqsToDest, isIn = t.reqs[dest.key] - if !isIn { - panic("This should never happen") - } - } - reqsToDest[req.Dest] = time.Now() + rq := dhtReqKey{req.Key, req.Dest} + t.reqs[rq] = time.Now() } // Sends a lookup to this info, looking for the target. @@ -273,17 +286,10 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { // Periodic maintenance work to keep important DHT nodes alive. func (t *dht) doMaintenance() { now := time.Now() - newReqs := make(map[boxPubKey]map[NodeID]time.Time, len(t.reqs)) - for key, dests := range t.reqs { - newDests := make(map[NodeID]time.Time, len(dests)) - for nodeID, start := range dests { - if now.Sub(start) > 6*time.Second { - continue - } - newDests[nodeID] = start - } - if len(newDests) > 0 { - newReqs[key] = newDests + newReqs := make(map[dhtReqKey]time.Time, len(t.reqs)) + for key, start := range t.reqs { + if now.Sub(start) < 6*time.Second { + newReqs[key] = start } } t.reqs = newReqs diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 21694907..be156dc6 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -132,6 +132,8 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] + rq := dhtReqKey{next.key, sinfo.dest} + s.core.dht.addCallback(&rq, s.handleDHTRes) s.core.dht.ping(next, &sinfo.dest) } } From 12e635f9464360187c8c6324717c3dbbbe9ced97 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 16:16:06 -0600 Subject: [PATCH 2/8] adjust dhtPing response so 'nodes' defaults to an empty list instead of null --- src/yggdrasil/admin.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index ca3baa27..fa8f8dd8 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -308,11 +308,12 @@ func (a *admin) init(c *Core, listenaddr string) { } result, err := a.admin_dhtPing(in["key"].(string), in["coords"].(string), in["target"].(string)) if err == nil { - var infos []map[string]string + infos := make([]map[string]string, 0, len(result.Infos)) for _, dinfo := range result.Infos { - info := make(map[string]string) - info["key"] = hex.EncodeToString(dinfo.key[:]) - info["coords"] = fmt.Sprintf("%v", dinfo.coords) + info := map[string]string{ + "key": hex.EncodeToString(dinfo.key[:]), + "coords": fmt.Sprintf("%v", dinfo.coords), + } infos = append(infos, info) } return admin_info{"nodes": infos}, nil From 9937a6102e91daa80e82e30820128287af92b72f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 16:29:47 -0600 Subject: [PATCH 3/8] add callbacks to maintenance map cleanup --- src/yggdrasil/dht.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index fd5ca585..e49e343a 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -293,6 +293,11 @@ func (t *dht) doMaintenance() { } } t.reqs = newReqs + newCallbacks := make(map[dhtReqKey][]func(*dhtRes), len(t.callbacks)) + for key, callback := range t.callbacks { + newCallbacks[key] = callback + } + t.callbacks = newCallbacks for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) From 7954fa3c33cbb7632a2bd24caf414221925440ef Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 17:08:45 -0600 Subject: [PATCH 4/8] store one callback instead of many, needed to prevent search failures if there are multiple outstanding packets --- src/yggdrasil/dht.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e49e343a..6e08ad23 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -59,9 +59,9 @@ type dhtReqKey struct { type dht struct { core *Core nodeID NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey][]func(*dhtRes) // Search and admin lookup callbacks + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[NodeID]*dhtInfo imp []*dhtInfo @@ -72,7 +72,7 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) - t.callbacks = make(map[dhtReqKey][]func(*dhtRes)) + t.callbacks = make(map[dhtReqKey]dht_callbackInfo) t.reset() } @@ -202,26 +202,25 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { t.core.router.out(packet) } +type dht_callbackInfo struct { + f func(*dhtRes) + time time.Time +} + // Adds a callback and removes it after some timeout. func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { - t.callbacks[*rq] = append(t.callbacks[*rq], callback) - go func() { - time.Sleep(6 * time.Second) - t.core.router.admin <- func() { - delete(t.callbacks, *rq) - } - }() + info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} + t.callbacks[*rq] = info } // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { rq := dhtReqKey{res.Key, res.Dest} - for _, callback := range t.callbacks[rq] { - callback(res) + if callback, isIn := t.callbacks[rq]; isIn { + callback.f(res) + delete(t.callbacks, rq) } - delete(t.callbacks, rq) - t.core.searches.handleDHTRes(res) _, isIn := t.reqs[rq] if !isIn { return @@ -263,7 +262,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { } packet := p.encode() t.core.router.out(packet) - rq := dhtReqKey{req.Key, req.Dest} + rq := dhtReqKey{dest.key, req.Dest} t.reqs[rq] = time.Now() } @@ -293,9 +292,11 @@ func (t *dht) doMaintenance() { } } t.reqs = newReqs - newCallbacks := make(map[dhtReqKey][]func(*dhtRes), len(t.callbacks)) + newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) for key, callback := range t.callbacks { - newCallbacks[key] = callback + if now.Before(callback.time) { + newCallbacks[key] = callback + } } t.callbacks = newCallbacks for infoID, info := range t.table { From d253bb750c8435d48e064e547e71940e05a62982 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 17:50:56 -0600 Subject: [PATCH 5/8] yggdrasilctl support --- yggdrasilctl.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/yggdrasilctl.go b/yggdrasilctl.go index a3e24096..a2b0731f 100644 --- a/yggdrasilctl.go +++ b/yggdrasilctl.go @@ -296,6 +296,17 @@ func main() { fmt.Println("-", v) } } + case "dhtping": + if _, ok := res["nodes"]; !ok { + fmt.Println("No nodes found") + } else if res["nodes"] == nil { + fmt.Println("No nodes found") + } else { + for _, v := range res["nodes"].([]interface{}) { + m := v.(map[string]interface{}) + fmt.Println("-", m["key"], m["coords"]) + } + } default: if json, err := json.MarshalIndent(recv["response"], "", " "); err == nil { fmt.Println(string(json)) From a34ca40594f70ef16786ecdd3e562f061df7163f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 17:59:36 -0600 Subject: [PATCH 6/8] use a buffered channel to avoid races, and run gofmt --- src/yggdrasil/admin.go | 2 +- yggdrasilctl.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index fa8f8dd8..00625e14 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -738,7 +738,7 @@ func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtR coords = append(coords, uint8(u64)) } } - resCh := make(chan *dhtRes) + resCh := make(chan *dhtRes, 1) info := dhtInfo{ key: key, coords: coords, diff --git a/yggdrasilctl.go b/yggdrasilctl.go index a2b0731f..4a76361f 100644 --- a/yggdrasilctl.go +++ b/yggdrasilctl.go @@ -303,7 +303,7 @@ func main() { fmt.Println("No nodes found") } else { for _, v := range res["nodes"].([]interface{}) { - m := v.(map[string]interface{}) + m := v.(map[string]interface{}) fmt.Println("-", m["key"], m["coords"]) } } From 0ec6207e054554e36f443c5b0633372d09cf32f0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 18:25:31 -0600 Subject: [PATCH 7/8] better response format and yggdrasilctl printing --- src/yggdrasil/admin.go | 5 +++-- yggdrasilctl.go | 13 +------------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 00625e14..2aee240b 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -308,13 +308,14 @@ func (a *admin) init(c *Core, listenaddr string) { } result, err := a.admin_dhtPing(in["key"].(string), in["coords"].(string), in["target"].(string)) if err == nil { - infos := make([]map[string]string, 0, len(result.Infos)) + infos := make(map[string]map[string]string, len(result.Infos)) for _, dinfo := range result.Infos { info := map[string]string{ "key": hex.EncodeToString(dinfo.key[:]), "coords": fmt.Sprintf("%v", dinfo.coords), } - infos = append(infos, info) + addr := net.IP(address_addrForNodeID(getNodeID(&dinfo.key))[:]).String() + infos[addr] = info } return admin_info{"nodes": infos}, nil } else { diff --git a/yggdrasilctl.go b/yggdrasilctl.go index 4a76361f..b3b1cef0 100644 --- a/yggdrasilctl.go +++ b/yggdrasilctl.go @@ -107,7 +107,7 @@ func main() { switch strings.ToLower(req["request"].(string)) { case "dot": fmt.Println(res["dot"]) - case "help", "getpeers", "getswitchpeers", "getdht", "getsessions": + case "help", "getpeers", "getswitchpeers", "getdht", "getsessions", "dhtping": maxWidths := make(map[string]int) var keyOrder []string keysOrdered := false @@ -296,17 +296,6 @@ func main() { fmt.Println("-", v) } } - case "dhtping": - if _, ok := res["nodes"]; !ok { - fmt.Println("No nodes found") - } else if res["nodes"] == nil { - fmt.Println("No nodes found") - } else { - for _, v := range res["nodes"].([]interface{}) { - m := v.(map[string]interface{}) - fmt.Println("-", m["key"], m["coords"]) - } - } default: if json, err := json.MarshalIndent(recv["response"], "", " "); err == nil { fmt.Println(string(json)) From d8d1e63c36735ac8eb097aa43e31c69c347beb12 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 20:33:33 -0600 Subject: [PATCH 8/8] fix infinite loop from interaction between dht.isImportant and dht.insert --- src/yggdrasil/dht.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 6e08ad23..aa694f7a 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -361,6 +361,9 @@ func (t *dht) getImportant() []*dhtInfo { // Returns true if this is a node we need to keep track of for the DHT to work. func (t *dht) isImportant(ninfo *dhtInfo) bool { + if ninfo.key == t.core.boxPub { + return false + } important := t.getImportant() // Check if ninfo is of equal or greater importance to what we already know loc := t.core.switchTable.getLocator()