diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 813e950b..b58c24dc 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -25,6 +25,7 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/admin" "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/dhtcrawler" "github.com/yggdrasil-network/yggdrasil-go/src/module" "github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap" @@ -33,11 +34,12 @@ import ( ) type node struct { - core yggdrasil.Core - state *config.NodeState - tuntap module.Module // tuntap.TunAdapter - multicast module.Module // multicast.Multicast - admin module.Module // admin.AdminSocket + core yggdrasil.Core + state *config.NodeState + tuntap module.Module // tuntap.TunAdapter + multicast module.Module // multicast.Multicast + admin module.Module // admin.AdminSocket + dhtcrawler module.Module // dhtcrawler.Crawler } func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *config.NodeConfig { @@ -284,6 +286,7 @@ func main() { n.admin = &admin.AdminSocket{} n.multicast = &multicast.Multicast{} n.tuntap = &tuntap.TunAdapter{} + n.dhtcrawler = &dhtcrawler.Crawler{} // Start the admin socket n.admin.Init(&n.core, n.state, logger, nil) if err := n.admin.Start(); err != nil { @@ -310,6 +313,10 @@ func main() { } else { logger.Errorln("Unable to get Listener:", err) } + + n.dhtcrawler.Init(&n.core, n.state, logger, nil) + n.dhtcrawler.SetupAdminHandlers(n.admin.(*admin.AdminSocket)) + // Make some nice output that tells us what our IPv6 address and subnet are. // This is just logged to stdout for the user. address := n.core.Address() @@ -337,6 +344,7 @@ func main() { n.core.UpdateConfig(cfg) n.tuntap.UpdateConfig(cfg) n.multicast.UpdateConfig(cfg) + n.dhtcrawler.UpdateConfig(cfg) } else { logger.Errorln("Reloading config at runtime is only possible with -useconffile") } diff --git a/src/dhtcrawler/crawler.go b/src/dhtcrawler/crawler.go new file mode 100644 index 00000000..4f6c15c5 --- /dev/null +++ b/src/dhtcrawler/crawler.go @@ -0,0 +1,273 @@ +package dhtcrawler + +import ( + "encoding/hex" + "encoding/json" + "math/rand" + "net" + "sync" + "time" + + "github.com/gologme/log" + + "github.com/yggdrasil-network/yggdrasil-go/src/address" + "github.com/yggdrasil-network/yggdrasil-go/src/admin" + "github.com/yggdrasil-network/yggdrasil-go/src/config" + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" +) + +const ( + DefaultRetryCount int = 5 + DefaultExpiration time.Duration = 5 * time.Minute +) + +type ApiCache struct { + networkMap admin.Info + networkMapMutex sync.RWMutex + expiration int64 +} + +func (a *ApiCache) Get(crawler *Crawler) admin.Info { + a.networkMapMutex.Lock() + defer a.networkMapMutex.Unlock() + + if a.networkMap == nil || time.Now().UnixNano() > a.expiration { + json_value := crawler.getNetworkMap() + a.networkMap = admin.Info{"networkMap": json_value} + a.expiration = time.Now().Add(DefaultExpiration).UnixNano() + } + + return a.networkMap +} + +type Crawler struct { + core *yggdrasil.Core + config *config.NodeState + log *log.Logger + started bool + + apiCache ApiCache + dhtWaitGroup sync.WaitGroup + dhtVisited map[string]attempt + dhtMutex sync.RWMutex + nodeInfoWaitGroup sync.WaitGroup + nodeInfoVisited map[string]interface{} + nodeInfoMutex sync.RWMutex +} + +// This is the structure that we marshal at the end into JSON results +type results struct { + Meta struct { + GeneratedAtUTC int64 `json:"generated_at_utc"` + NodesAttempted int `json:"nodes_attempted"` + NodesSuccessful int `json:"nodes_successful"` + NodesFailed int `json:"nodes_failed"` + NodeInfoSuccessful int `json:"nodeinfo_successful"` + NodeInfoFailed int `json:"nodeinfo_failed"` + } `json:"meta"` + Topology *map[string]attempt `json:"topology"` + NodeInfo *map[string]interface{} `json:"nodeinfo"` +} + +type attempt struct { + NodeID string `json:"node_id"` // the node ID + IPv6Addr string `json:"ipv6_addr"` // the node address + IPv6Subnet string `json:"ipv6_subnet"` // the node subnet + Coords []uint64 `json:"coords"` // the coordinates of the node + Found bool `json:"found"` // has a search for this node completed successfully? +} + +func (s *Crawler) Init(core *yggdrasil.Core, config *config.NodeState, log *log.Logger, options interface{}) error { + s.core = core + s.config = config + s.log = log + s.started = false + + return nil +} + +func (s *Crawler) Stop() error { + return nil +} + +func (s *Crawler) Start() error { + return nil +} + +func (s *Crawler) IsStarted() bool { + return s.started +} + +func (s *Crawler) UpdateConfig(config *config.NodeConfig) {} + +func (s *Crawler) SetupAdminHandlers(a *admin.AdminSocket) { + a.AddHandler("getNetworkMap", []string{}, func(in admin.Info) (admin.Info, error) { + return s.apiCache.Get(s), nil + }) +} + +func (s *Crawler) getNetworkMap() string { + starttime := time.Now() + s.dhtVisited = make(map[string]attempt) + s.nodeInfoVisited = make(map[string]interface{}) + + if key, err := hex.DecodeString(s.core.EncryptionPublicKey()); err == nil { + var pubkey crypto.BoxPubKey + copy(pubkey[:], key) + s.dhtWaitGroup.Add(1) + go s.dhtPing(pubkey, s.core.Coords()) + } else { + panic("failed to decode pub key") + } + + s.dhtWaitGroup.Wait() + s.nodeInfoWaitGroup.Wait() + + s.dhtMutex.Lock() + defer s.dhtMutex.Unlock() + s.nodeInfoMutex.Lock() + defer s.nodeInfoMutex.Unlock() + + s.log.Infoln("The crawl took", time.Since(starttime)) + + attempted := len(s.dhtVisited) + found := 0 + for _, attempt := range s.dhtVisited { + if attempt.Found { + found++ + } + } + + res := results{ + Topology: &s.dhtVisited, + NodeInfo: &s.nodeInfoVisited, + } + res.Meta.GeneratedAtUTC = time.Now().UTC().Unix() + res.Meta.NodeInfoSuccessful = len(s.nodeInfoVisited) + res.Meta.NodeInfoFailed = found - len(s.nodeInfoVisited) + res.Meta.NodesAttempted = attempted + res.Meta.NodesSuccessful = found + res.Meta.NodesFailed = attempted - found + + if j, err := json.MarshalIndent(res, "", "\t"); err == nil { + return string(j) + } else { + return "json marshaling error" + } +} + +func (s *Crawler) dhtPing(pubkey crypto.BoxPubKey, coords []uint64) { + // Notify the main goroutine that we're done working + defer s.dhtWaitGroup.Done() + + // Generate useful information about the node, such as it's node ID, address + // and subnet + key := hex.EncodeToString(pubkey[:]) + nodeid := crypto.GetNodeID(&pubkey) + addr := net.IP(address.AddrForNodeID(nodeid)[:]) + upper := append(address.SubnetForNodeID(nodeid)[:], 0, 0, 0, 0, 0, 0, 0, 0) + subnet := net.IPNet{IP: upper, Mask: net.CIDRMask(64, 128)} + + // If we already have an entry of this node then we should stop what we're + // doing - it either means that a search is already taking place, or that we + // have already processed this node + s.dhtMutex.RLock() + if info := s.dhtVisited[key]; info.Found { + s.dhtMutex.RUnlock() + return + } + s.dhtMutex.RUnlock() + + // Make a record of this node and the coordinates so that future goroutines + // started by a rumour of this node will not repeat this search + var res yggdrasil.DHTRes + var err error + for idx := 0; idx < DefaultRetryCount; idx++ { + // Randomized delay between attempts, increases exponentially + time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)*(1<