Update addPeer behavior and peer loop for URIs that may have transforms (like srv:// and txt://)

This commit is contained in:
Neil Alexander 2019-03-26 19:15:59 +00:00
parent 246c60ae4d
commit a642c6009f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 123 additions and 67 deletions

View file

@ -108,10 +108,13 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo
// Check to see if the peers are in a parsable format, if not then default // Check to see if the peers are in a parsable format, if not then default
// them to the TCP scheme // them to the TCP scheme
if peers, ok := dat["Peers"].([]interface{}); ok { if peers, ok := dat["Peers"].([]interface{}); ok {
peerloop:
for index, peer := range peers { for index, peer := range peers {
uri := peer.(string) uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { for _, prefix := range []string{"tcp://", "socks://", "srv://", "txt://"} {
continue if strings.HasPrefix(uri, prefix) {
continue peerloop
}
} }
if strings.HasPrefix(uri, "tcp:") { if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:] uri = uri[4:]
@ -121,11 +124,14 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo
} }
// Now do the same with the interface peers // Now do the same with the interface peers
if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok {
interfacepeerloop:
for intf, peers := range interfacepeers { for intf, peers := range interfacepeers {
for index, peer := range peers.([]interface{}) { for index, peer := range peers.([]interface{}) {
uri := peer.(string) uri := peer.(string)
if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { for _, prefix := range []string{"tcp://", "socks://", "srv://", "txt://"} {
continue if strings.HasPrefix(uri, prefix) {
continue interfacepeerloop
}
} }
if strings.HasPrefix(uri, "tcp:") { if strings.HasPrefix(uri, "tcp:") {
uri = uri[4:] uri = uri[4:]

View file

@ -591,7 +591,11 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
// addPeer triggers a connection attempt to a node. // addPeer triggers a connection attempt to a node.
func (a *admin) addPeer(addr string, sintf string) error { func (a *admin) addPeer(addr string, sintf string) error {
err := a.core.link.call(addr, sintf) entry, err := a.core.resolvePeerEntry(addr, sintf)
if err != nil {
return err
}
err = a.core.link.call(entry.uri, entry.sintf)
if err != nil { if err != nil {
return err return err
} }

View file

@ -2,8 +2,11 @@ package yggdrasil
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
"io/ioutil" "io/ioutil"
"net" "net"
"net/url"
"strings"
"sync" "sync"
"time" "time"
@ -23,29 +26,36 @@ type module interface {
start() error start() error
} }
type peerEntry struct {
uri string
sintf string
}
// The Core object represents the Yggdrasil node. You should create a Core // The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run. // object for each Yggdrasil node you plan to run.
type Core struct { type Core struct {
// This is the main data structure that holds everything else for a node // This is the main data structure that holds everything else for a node
// We're going to keep our own copy of the provided config - that way we can // We're going to keep our own copy of the provided config - that way we can
// guarantee that it will be covered by the mutex // guarantee that it will be covered by the mutex
config config.NodeConfig // Active config config config.NodeConfig // Active config
configOld config.NodeConfig // Previous config configOld config.NodeConfig // Previous config
configMutex sync.RWMutex // Protects both config and configOld configMutex sync.RWMutex // Protects both config and configOld
boxPub crypto.BoxPubKey configPeers []peerEntry // Transformed peer list
boxPriv crypto.BoxPrivKey configPeersMutex sync.RWMutex // Protects configPeers
sigPub crypto.SigPubKey boxPub crypto.BoxPubKey
sigPriv crypto.SigPrivKey boxPriv crypto.BoxPrivKey
switchTable switchTable sigPub crypto.SigPubKey
peers peers sigPriv crypto.SigPrivKey
sessions sessions switchTable switchTable
router router peers peers
dht dht sessions sessions
admin admin router router
searches searches dht dht
multicast multicast admin admin
link link searches searches
log *log.Logger multicast multicast
link link
log *log.Logger
} }
func (c *Core) init() error { func (c *Core) init() error {
@ -94,33 +104,42 @@ func (c *Core) init() error {
// If any static peers were provided in the configuration above then we should // If any static peers were provided in the configuration above then we should
// configure them. The loop ensures that disconnected peers will eventually // configure them. The loop ensures that disconnected peers will eventually
// be reconnected with. // be reconnected with.
func (c *Core) addPeerLoop() { func (c *Core) runPeerLoop() {
for { for {
// Get the peers from the config - these could change! // Get the peers from the config - these could change!
c.configMutex.RLock() c.configPeersMutex.RLock()
peers := c.config.Peers for _, peer := range c.configPeers {
interfacepeers := c.config.InterfacePeers c.AddPeer(peer.uri, peer.sintf)
c.configMutex.RUnlock()
// Add peers from the Peers section
for _, peer := range peers {
c.AddPeer(peer, "")
time.Sleep(time.Second) time.Sleep(time.Second)
} }
c.configPeersMutex.RUnlock()
// Add peers from the InterfacePeers section
for intf, intfpeers := range interfacepeers {
for _, peer := range intfpeers {
c.AddPeer(peer, intf)
time.Sleep(time.Second)
}
}
// Sit for a while // Sit for a while
time.Sleep(time.Minute) time.Sleep(time.Minute)
} }
} }
// When we start up or reconfigure, draw up the peer loop
func (c *Core) updatePeerLoop() {
c.configMutex.RLock()
c.configPeersMutex.Lock()
c.configPeers = []peerEntry{}
for _, peer := range c.config.Peers {
if entry, err := c.resolvePeerEntry(peer, ""); err == nil {
c.configPeers = append(c.configPeers, entry)
}
}
for intf, intfpeers := range c.config.InterfacePeers {
for _, peer := range intfpeers {
if entry, err := c.resolvePeerEntry(peer, intf); err == nil {
c.configPeers = append(c.configPeers, entry)
}
}
}
c.configPeersMutex.Unlock()
c.configMutex.RUnlock()
}
// UpdateConfig updates the configuration in Core and then signals the // UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed // various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) { func (c *Core) UpdateConfig(config *config.NodeConfig) {
@ -161,6 +180,8 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
} else { } else {
c.log.Infoln("Configuration reloaded successfully") c.log.Infoln("Configuration reloaded successfully")
} }
c.updatePeerLoop()
} }
// GetBuildName gets the current build name. This is usually injected if built // GetBuildName gets the current build name. This is usually injected if built
@ -238,7 +259,8 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err return err
} }
go c.addPeerLoop() c.updatePeerLoop()
go c.runPeerLoop()
c.log.Infoln("Startup complete") c.log.Infoln("Startup complete")
return nil return nil
@ -350,3 +372,54 @@ func (c *Core) GetTUNIfName() string {
func (c *Core) GetTUNIfMTU() int { func (c *Core) GetTUNIfMTU() int {
return c.router.tun.mtu return c.router.tun.mtu
} }
// Resolves the peer URI that may require a DNS lookup (or some other transform
// into a peer URI that can be given directly to link.go
func (c *Core) resolvePeerEntry(uri, sintf string) (peerEntry, error) {
u, err := url.Parse(uri)
if err != nil {
return peerEntry{}, err
}
switch u.Scheme {
case "srv":
for _, proto := range []string{"tcp"} {
if cname, srv, err := net.LookupSRV("yggdrasil", proto, u.Host); err == nil {
for _, record := range srv {
c.log.Traceln("SRV lookup for", u.Host, "found:", cname, record.Target, record.Port)
switch proto {
case "tcp":
return peerEntry{
uri: fmt.Sprintf("tcp://%s:%d", record.Target, record.Port),
sintf: sintf,
}, nil
}
}
} else {
c.log.Debugln("SRV lookup for", u.Host, "failed:", err)
return peerEntry{}, err
}
}
case "txt":
recordname := fmt.Sprintf("_yggdrasil.%s", u.Host)
if records, err := net.LookupTXT(recordname); err == nil {
for _, record := range records {
c.log.Traceln("Found TXT record:", record)
if !strings.HasPrefix(record, "txt://") {
return peerEntry{
uri: fmt.Sprintf("%s", record),
sintf: sintf,
}, nil
}
}
} else {
c.log.Debugln("TXT lookup failed:", err)
return peerEntry{}, err
}
default:
break
}
return peerEntry{
uri: uri,
sintf: sintf,
}, nil
}

View file

@ -102,33 +102,6 @@ func (l *link) call(uri string, sintf string) error {
} }
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
switch u.Scheme { switch u.Scheme {
case "srv":
for _, proto := range []string{"tcp"} {
if cname, srv, err := net.LookupSRV("yggdrasil", proto, u.Host); err == nil {
for _, record := range srv {
l.core.log.Debugln("SRV lookup for", u.Host, "found:", cname, record.Target, record.Port)
switch proto {
case "tcp":
saddr := fmt.Sprintf("%s:%d", record.Target, record.Port)
l.tcp.call(saddr, nil, sintf)
}
}
} else {
l.core.log.Debugln("SRV lookup for", u.Host, "failed:", err)
}
}
case "txt":
recordname := fmt.Sprintf("_yggdrasil.%s", u.Host)
if records, err := net.LookupTXT(recordname); err == nil {
for _, record := range records {
l.core.log.Debugln("Found TXT record:", record)
if !strings.HasPrefix(record, "txt://") {
l.call(record, sintf)
}
}
} else {
l.core.log.Debugln("TXT lookup failed:", err)
}
case "tcp": case "tcp":
l.tcp.call(u.Host, nil, sintf) l.tcp.call(u.Host, nil, sintf)
case "socks": case "socks":