From 22322c972721308f2b0ff6c5df4c85fc1b6ae23e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 2 Sep 2019 18:46:41 +0100 Subject: [PATCH] Remove NodeConfig references from packages --- src/admin/admin.go | 41 +++++------ src/multicast/multicast.go | 27 ++++--- src/tuntap/ckr.go | 82 ++++++++++++--------- src/tuntap/tun.go | 29 ++------ src/yggdrasil/api.go | 59 ++++++++++++--- src/yggdrasil/conn.go | 2 +- src/yggdrasil/core.go | 145 +++++++++++++++---------------------- src/yggdrasil/dht.go | 18 ++--- src/yggdrasil/link.go | 9 +-- src/yggdrasil/nodeinfo.go | 4 +- src/yggdrasil/peer.go | 60 ++++++++------- src/yggdrasil/router.go | 32 ++------ src/yggdrasil/search.go | 6 +- src/yggdrasil/session.go | 24 ++---- src/yggdrasil/switch.go | 38 ++++------ src/yggdrasil/tcp.go | 24 +++--- 16 files changed, 286 insertions(+), 314 deletions(-) diff --git a/src/admin/admin.go b/src/admin/admin.go index fdc005ff..1a4cac4a 100644 --- a/src/admin/admin.go +++ b/src/admin/admin.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/Arceliar/phony" "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -24,12 +25,12 @@ import ( // TODO: Add authentication type AdminSocket struct { - core *yggdrasil.Core - log *log.Logger - reconfigure chan chan error - listenaddr string - listener net.Listener - handlers map[string]handler + phony.Inbox + core *yggdrasil.Core + log *log.Logger + listenaddr string + listener net.Listener + handlers map[string]handler } // Info refers to information that is returned to the admin socket handler. @@ -56,20 +57,7 @@ func (a *AdminSocket) AddHandler(name string, args []string, handlerfunc func(In func (a *AdminSocket) Init(c *yggdrasil.Core, log *log.Logger, options interface{}) { a.core = c a.log = log - a.reconfigure = make(chan chan error, 1) a.handlers = make(map[string]handler) - go func() { - for { - e := <-a.reconfigure - if newlistenaddr := c.GetConfig().AdminListen; newlistenaddr != a.listenaddr { - a.listenaddr = newlistenaddr - a.Stop() - a.Start() - } - e <- nil - } - }() - a.listenaddr = c.GetConfig().AdminListen a.AddHandler("list", []string{}, func(in Info) (Info, error) { handlers := make(map[string]interface{}) for handlername, handler := range a.handlers { @@ -309,13 +297,22 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, log *log.Logger, options interface }) } -// start runs the admin API socket to listen for / respond to admin API calls. -func (a *AdminSocket) Start() error { +// Start runs the admin API socket to listen for / respond to admin API calls. +// You should provide a listen address in tcp://a.b.c.d:e, tcp://[a::b:c:d]:e or +// unix:///path/to/yggdrasil.sock format. +func (a *AdminSocket) Start(listenaddr string) error { if a.core == nil { return errors.New("admin socket has not been initialised, call Init first") } - if a.listenaddr != "none" && a.listenaddr != "" { + if listenaddr == "none" || listenaddr == "" { + return errors.New("admin socket listen address not provided") + } + if a.listenaddr != listenaddr { + if a.listener != nil { + a.listener.Close() + } go a.listen() + a.listenaddr = listenaddr } return nil } diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 36cd71f1..b3d92ad4 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -8,6 +8,7 @@ import ( "regexp" "time" + "github.com/Arceliar/phony" "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" @@ -19,10 +20,12 @@ import ( // configured multicast interface, Yggdrasil will attempt to peer with that node // automatically. type Multicast struct { + phony.Inbox core *yggdrasil.Core log *log.Logger sock *ipv6.PacketConn groupAddr string + interfaces []string listeners map[string]*yggdrasil.TcpListener listenPort uint16 isOpen bool @@ -33,11 +36,18 @@ func (m *Multicast) Init(core *yggdrasil.Core, log *log.Logger, options interfac m.core = core m.log = log m.listeners = make(map[string]*yggdrasil.TcpListener) - m.listenPort = m.core.GetConfig().LinkLocalTCPPort m.groupAddr = "[ff02::114]:9001" return nil } +// SetLinkLocalTCPPort lets you decide which port will be used when listening on +// multicast interfaces. If not set, a random port number will be selected. +func (m *Multicast) SetLinkLocalTCPPort(port uint16) { + m.Inbox.Act(m, func() { + m.listenPort = port + }) +} + // Start starts the multicast interface. This launches goroutines which will // listen for multicast beacons from other hosts and will advertise multicast // beacons out to the network. @@ -77,21 +87,16 @@ func (m *Multicast) Stop() error { return nil } -// UpdateConfig updates the multicast module with the provided config.NodeConfig -// and then signals the various module goroutines to reconfigure themselves if -// needed. -func (m *Multicast) UpdateConfig() { - -} - -// GetInterfaces returns the currently known/enabled multicast interfaces. It is +// Interfaces returns the currently known/enabled multicast interfaces. It is // expected that UpdateInterfaces has been called at least once before calling // this method. func (m *Multicast) Interfaces() map[string]net.Interface { interfaces := make(map[string]net.Interface) // Get interface expressions from config - current := m.core.GetConfig() - exprs := current.MulticastInterfaces + var exprs []string + phony.Block(m, func() { + exprs = m.interfaces + }) // Ask the system for network interfaces allifaces, err := net.Interfaces() if err != nil { diff --git a/src/tuntap/ckr.go b/src/tuntap/ckr.go index 9018c099..7aa0565c 100644 --- a/src/tuntap/ckr.go +++ b/src/tuntap/ckr.go @@ -44,53 +44,65 @@ func (c *cryptokey) init(tun *TunAdapter) { // Configure the CKR routes. This should only ever be ran by the TUN/TAP actor. func (c *cryptokey) configure() { - current := c.tun.core.GetConfig() + /* + current := c.tun.core.GetConfig() - // Set enabled/disabled state - c.setEnabled(current.TunnelRouting.Enable) + // Set enabled/disabled state + c.setEnabled(current.TunnelRouting.Enable) + + // Clear out existing routes + c.mutexremotes.Lock() + c.ipv6remotes = make([]cryptokey_route, 0) + c.ipv4remotes = make([]cryptokey_route, 0) + c.mutexremotes.Unlock() + + // Add IPv6 routes + for ipv6, pubkey := range current.TunnelRouting.IPv6RemoteSubnets { + if err := c.addRemoteSubnet(ipv6, pubkey); err != nil { + c.tun.log.Errorln("Error adding CKR IPv6 remote subnet:", err) + } + } + + // Add IPv4 routes + for ipv4, pubkey := range current.TunnelRouting.IPv4RemoteSubnets { + if err := c.addRemoteSubnet(ipv4, pubkey); err != nil { + c.tun.log.Errorln("Error adding CKR IPv4 remote subnet:", err) + } + } + + // Clear out existing sources + c.mutexlocals.Lock() + c.ipv6locals = make([]net.IPNet, 0) + c.ipv4locals = make([]net.IPNet, 0) + c.mutexlocals.Unlock() + + // Add IPv6 sources + c.ipv6locals = make([]net.IPNet, 0) + for _, source := range current.TunnelRouting.IPv6LocalSubnets { + if err := c.addLocalSubnet(source); err != nil { + c.tun.log.Errorln("Error adding CKR IPv6 local subnet:", err) + } + } + + // Add IPv4 sources + c.ipv4locals = make([]net.IPNet, 0) + for _, source := range current.TunnelRouting.IPv4LocalSubnets { + if err := c.addLocalSubnet(source); err != nil { + c.tun.log.Errorln("Error adding CKR IPv4 local subnet:", err) + } + } + */ - // Clear out existing routes c.mutexremotes.Lock() c.ipv6remotes = make([]cryptokey_route, 0) c.ipv4remotes = make([]cryptokey_route, 0) c.mutexremotes.Unlock() - // Add IPv6 routes - for ipv6, pubkey := range current.TunnelRouting.IPv6RemoteSubnets { - if err := c.addRemoteSubnet(ipv6, pubkey); err != nil { - c.tun.log.Errorln("Error adding CKR IPv6 remote subnet:", err) - } - } - - // Add IPv4 routes - for ipv4, pubkey := range current.TunnelRouting.IPv4RemoteSubnets { - if err := c.addRemoteSubnet(ipv4, pubkey); err != nil { - c.tun.log.Errorln("Error adding CKR IPv4 remote subnet:", err) - } - } - - // Clear out existing sources c.mutexlocals.Lock() c.ipv6locals = make([]net.IPNet, 0) c.ipv4locals = make([]net.IPNet, 0) c.mutexlocals.Unlock() - // Add IPv6 sources - c.ipv6locals = make([]net.IPNet, 0) - for _, source := range current.TunnelRouting.IPv6LocalSubnets { - if err := c.addLocalSubnet(source); err != nil { - c.tun.log.Errorln("Error adding CKR IPv6 local subnet:", err) - } - } - - // Add IPv4 sources - c.ipv4locals = make([]net.IPNet, 0) - for _, source := range current.TunnelRouting.IPv4LocalSubnets { - if err := c.addLocalSubnet(source); err != nil { - c.tun.log.Errorln("Error adding CKR IPv4 local subnet:", err) - } - } - // Wipe the caches c.mutexcaches.Lock() c.ipv4cache = make(map[address.Address]cryptokey_route, 0) diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index afb8439c..ff6369c0 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -9,7 +9,6 @@ package tuntap // TODO: Don't block in reader on writes that are pending searches import ( - "encoding/hex" "errors" "fmt" "net" @@ -38,7 +37,6 @@ type TunAdapter struct { writer tunWriter reader tunReader log *log.Logger - reconfigure chan chan error listener *yggdrasil.Listener dialer *yggdrasil.Dialer addr address.Address @@ -123,31 +121,25 @@ func (tun *TunAdapter) Init(c *yggdrasil.Core, log *log.Logger, listener *yggdra // Start the setup process for the TUN/TAP adapter. If successful, starts the // reader actor to handle packets on that interface. -func (tun *TunAdapter) Start() error { +func (tun *TunAdapter) Start(ifname string, ifmtu int, iftapmode bool) error { var err error phony.Block(tun, func() { - err = tun._start() + err = tun._start(ifname, ifmtu, iftapmode) }) return err } -func (tun *TunAdapter) _start() error { - current := tun.core.GetConfig() +func (tun *TunAdapter) _start(ifname string, ifmtu int, iftapmode bool) error { if tun.core == nil || tun.listener == nil || tun.dialer == nil { return errors.New("TUN/TAP has not been initialised, call Init first") } - var boxPub crypto.BoxPubKey - boxPubHex, err := hex.DecodeString(current.EncryptionPublicKey) - if err != nil { - return err + if tun.isOpen { + return errors.New("TUN/TAP has already been started") } - copy(boxPub[:], boxPubHex) - nodeID := crypto.GetNodeID(&boxPub) + nodeID := tun.core.NodeID() tun.addr = *address.AddrForNodeID(nodeID) tun.subnet = *address.SubnetForNodeID(nodeID) - tun.mtu = current.IfMTU - ifname := current.IfName - iftapmode := current.IfTAPMode + tun.mtu = ifmtu addr := fmt.Sprintf("%s/%d", net.IP(tun.addr[:]).String(), 8*len(address.GetPrefix())-1) if ifname != "none" { if err := tun.setup(ifname, iftapmode, addr, tun.mtu); err != nil { @@ -159,13 +151,6 @@ func (tun *TunAdapter) _start() error { return nil } tun.isOpen = true - tun.reconfigure = make(chan chan error) - go func() { - for { - e := <-tun.reconfigure - e <- nil - } - }() go tun.handler() tun.reader.Act(nil, tun.reader._read) // Start the reader tun.icmpv6.Init(tun) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index e1911e65..1a7c3d96 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -267,22 +267,22 @@ func (c *Core) ListenTCP(uri string) (*TcpListener, error) { // NodeID gets the node ID. func (c *Core) NodeID() *crypto.NodeID { - return crypto.GetNodeID(&c.boxPub) + return crypto.GetNodeID(&c.router.boxPub) } // TreeID gets the tree ID. func (c *Core) TreeID() *crypto.TreeID { - return crypto.GetTreeID(&c.sigPub) + return crypto.GetTreeID(&c.switchTable.sigPub) } // SigningPublicKey gets the node's signing public key. func (c *Core) SigningPublicKey() string { - return hex.EncodeToString(c.sigPub[:]) + return hex.EncodeToString(c.switchTable.sigPub[:]) } // EncryptionPublicKey gets the node's encryption public key. func (c *Core) EncryptionPublicKey() string { - return hex.EncodeToString(c.boxPub[:]) + return hex.EncodeToString(c.router.boxPub[:]) } // Coords returns the current coordinates of the node. @@ -361,6 +361,19 @@ func (c *Core) SetLogger(log *log.Logger) { c.log = log } +// SetSwitchMaxTotalQueueSize sets the maximum allowed size that switch queues +// can occupy in memory. +func (c *Core) SetSwitchMaxTotalQueueSize(value uint64) (err error) { + phony.Block(&c.switchTable, func() { + if value > SwitchQueueTotalMinSize { + c.switchTable.queues.totalMaxSize = value + } else { + err = fmt.Errorf("queue total size minimum is", SwitchQueueTotalMinSize) + } + }) + return +} + // AddPeer adds a peer. This should be specified in the peer URI format, e.g.: // tcp://a.b.c.d:e // socks://a.b.c.d:e/f.g.h.i:j @@ -370,12 +383,13 @@ func (c *Core) AddPeer(addr string, sintf string) error { if err := c.CallPeer(addr, sintf); err != nil { return err } - config := c.GetConfig() + // TODO: PERSISTENCE! + /*config := c.GetConfig() if sintf == "" { config.Peers = append(config.Peers, addr) } else { config.InterfacePeers[sintf] = append(config.InterfacePeers[sintf], addr) - } + }*/ return nil } @@ -407,21 +421,44 @@ func (c *Core) DisconnectPeer(port uint64) error { // GetAllowedEncryptionPublicKeys returns the public keys permitted for incoming // peer connections. func (c *Core) GetAllowedEncryptionPublicKeys() []string { - return c.peers.getAllowedEncryptionPublicKeys() + var strs []string + phony.Block(&c.peers, func() { + keys := c.peers._getAllowedEncryptionPublicKeys() + for _, v := range keys { + strs = append(strs, hex.EncodeToString(v[:])) + } + }) + return strs } // AddAllowedEncryptionPublicKey whitelists a key for incoming peer connections. func (c *Core) AddAllowedEncryptionPublicKey(bstr string) (err error) { - c.peers.addAllowedEncryptionPublicKey(bstr) - return nil + key, err := hex.DecodeString(bstr) + if err != nil { + return + } + phony.Block(&c.peers, func() { + var k crypto.BoxPubKey + copy(k[:], key[:]) + err = c.peers._addAllowedEncryptionPublicKey(&k) + }) + return } // RemoveAllowedEncryptionPublicKey removes a key from the whitelist for // incoming peer connections. If none are set, an empty list permits all // incoming connections. func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) { - c.peers.removeAllowedEncryptionPublicKey(bstr) - return nil + key, err := hex.DecodeString(bstr) + if err != nil { + return + } + phony.Block(&c.peers, func() { + var k crypto.BoxPubKey + copy(k[:], key[:]) + err = c.peers._removeAllowedEncryptionPublicKey(&k) + }) + return } // DHTPing sends a DHT ping to the node with the provided key and coords, diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 352cf9d9..74c1d743 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -315,7 +315,7 @@ func (c *Conn) Close() (err error) { } func (c *Conn) LocalAddr() crypto.NodeID { - return *crypto.GetNodeID(&c.core.boxPub) + return *crypto.GetNodeID(&c.core.router.boxPub) } func (c *Conn) RemoteAddr() crypto.NodeID { diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 3f216eba..5f2baef6 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -1,16 +1,11 @@ package yggdrasil import ( - "encoding/hex" - "errors" "io/ioutil" - "sync/atomic" - "time" "github.com/Arceliar/phony" "github.com/gologme/log" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/version" ) @@ -22,11 +17,6 @@ type Core struct { // We're going to keep our own copy of the provided config - that way we can // guarantee that it will be covered by the mutex phony.Inbox - config atomic.Value // *config.NodeConfig - boxPub crypto.BoxPubKey - boxPriv crypto.BoxPrivKey - sigPub crypto.SigPubKey - sigPriv crypto.SigPrivKey switchTable switchTable peers peers router router @@ -34,7 +24,7 @@ type Core struct { log *log.Logger } -func (c *Core) _init() error { +func (c *Core) _init(boxPrivKey *crypto.BoxPrivKey, sigPrivKey *crypto.SigPrivKey) error { // TODO separate init and start functions // Init sets up structs // Start launches goroutines that depend on structs being set up @@ -43,42 +33,49 @@ func (c *Core) _init() error { c.log = log.New(ioutil.Discard, "", 0) } - current := c.config.Load().(*config.NodeConfig) + /* + current := c.config.Load().(*config.NodeConfig) - boxPrivHex, err := hex.DecodeString(current.EncryptionPrivateKey) - if err != nil { - return err - } - if len(boxPrivHex) < crypto.BoxPrivKeyLen { - return errors.New("EncryptionPrivateKey is incorrect length") - } + boxPrivHex, err := hex.DecodeString(current.EncryptionPrivateKey) + if err != nil { + return err + } + if len(boxPrivHex) < crypto.BoxPrivKeyLen { + return errors.New("EncryptionPrivateKey is incorrect length") + } - sigPrivHex, err := hex.DecodeString(current.SigningPrivateKey) - if err != nil { - return err - } - if len(sigPrivHex) < crypto.SigPrivKeyLen { - return errors.New("SigningPrivateKey is incorrect length") - } + sigPrivHex, err := hex.DecodeString(current.SigningPrivateKey) + if err != nil { + return err + } + if len(sigPrivHex) < crypto.SigPrivKeyLen { + return errors.New("SigningPrivateKey is incorrect length") + } - copy(c.boxPriv[:], boxPrivHex) - copy(c.sigPriv[:], sigPrivHex) + copy(c.boxPriv[:], boxPrivHex) + copy(c.sigPriv[:], sigPrivHex) + */ + /* + copy(c.boxPriv[:], boxPrivKey[:]) + copy(c.sigPriv[:], sigPrivKey[:]) - boxPub, sigPub := c.boxPriv.Public(), c.sigPriv.Public() + boxPub, sigPub := c.boxPriv.Public(), c.sigPriv.Public() - copy(c.boxPub[:], boxPub[:]) - copy(c.sigPub[:], sigPub[:]) - - if bp := hex.EncodeToString(c.boxPub[:]); current.EncryptionPublicKey != bp { - c.log.Warnln("EncryptionPublicKey in config is incorrect, should be", bp) - } - if sp := hex.EncodeToString(c.sigPub[:]); current.SigningPublicKey != sp { - c.log.Warnln("SigningPublicKey in config is incorrect, should be", sp) - } + copy(c.boxPub[:], boxPub[:]) + copy(c.sigPub[:], sigPub[:]) + */ + /* + if bp := hex.EncodeToString(c.boxPub[:]); current.EncryptionPublicKey != bp { + c.log.Warnln("EncryptionPublicKey in config is incorrect, should be", bp) + } + if sp := hex.EncodeToString(c.sigPub[:]); current.SigningPublicKey != sp { + c.log.Warnln("SigningPublicKey in config is incorrect, should be", sp) + } + */ c.peers.init(c) - c.router.init(c) - c.switchTable.init(c) // TODO move before peers? before router? + c.router.init(c, *boxPrivKey) + c.switchTable.init(c, *sigPrivKey) // TODO move before peers? before router? return nil } @@ -87,52 +84,30 @@ func (c *Core) _init() error { // configure them. The loop ensures that disconnected peers will eventually // be reconnected with. func (c *Core) _addPeerLoop() { - // Get the peers from the config - these could change! - current := c.GetConfig() + // TODO: PERSISTENCE + /* + // Get the peers from the config - these could change! + current := c.GetConfig() - // Add peers from the Peers section - for _, peer := range current.Peers { - go c.AddPeer(peer, "") // TODO: this should be acted and not in a goroutine? - time.Sleep(time.Second) - } - - // Add peers from the InterfacePeers section - for intf, intfpeers := range current.InterfacePeers { - for _, peer := range intfpeers { - go c.AddPeer(peer, intf) // TODO: this should be acted and not in a goroutine? + // Add peers from the Peers section + for _, peer := range current.Peers { + go c.AddPeer(peer, "") // TODO: this should be acted and not in a goroutine? time.Sleep(time.Second) } - } - // Sit for a while - time.AfterFunc(time.Minute, func() { - c.Act(c, c._addPeerLoop) - }) -} + // Add peers from the InterfacePeers section + for intf, intfpeers := range current.InterfacePeers { + for _, peer := range intfpeers { + go c.AddPeer(peer, intf) // TODO: this should be acted and not in a goroutine? + time.Sleep(time.Second) + } + } -// GetConfig atomically returns the current active node configuration. -func (c *Core) GetConfig() *config.NodeConfig { - return c.config.Load().(*config.NodeConfig) -} - -// UpdateConfig updates the configuration in Core with the provided -// config.NodeConfig and then signals the various module goroutines to -// reconfigure themselves if needed. -func (c *Core) UpdateConfig(config *config.NodeConfig) { - c.Act(nil, func() { - c.log.Debugln("Reloading node configuration...") - - new, old := config, c.GetConfig() - c.config.Store(new) - - // Notify the router and switch about the new configuration - c.router.Act(c, func() { - c.router.reconfigure(new, old) + // Sit for a while + time.AfterFunc(time.Minute, func() { + c.Act(c, c._addPeerLoop) }) - c.switchTable.Act(c, func() { - c.switchTable.reconfigure(new, old) - }) - }) + */ } // Start starts up Yggdrasil using the provided config.NodeConfig, and outputs @@ -140,17 +115,17 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { // TCP and UDP sockets, a multicast discovery socket, an admin socket, router, // switch and DHT node. A config.NodeState is returned which contains both the // current and previous configurations (from reconfigures). -func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (err error) { +func (c *Core) Start(boxPrivKey *crypto.BoxPrivKey, sigPrivKey *crypto.SigPrivKey, log *log.Logger) (err error) { phony.Block(c, func() { - err = c._start(nc, log) + err = c._start(boxPrivKey, sigPrivKey, log) }) return } // This function is unsafe and should only be ran by the core actor. -func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) error { +func (c *Core) _start(boxPrivKey *crypto.BoxPrivKey, sigPrivKey *crypto.SigPrivKey, log *log.Logger) error { c.log = log - c.config.Store(nc) + //c.config.Store(nc) if name := version.BuildName(); name != "unknown" { c.log.Infoln("Build name:", name) @@ -160,7 +135,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) error { } c.log.Infoln("Starting up...") - c._init() + c._init(boxPrivKey, sigPrivKey) if err := c.link.init(c); err != nil { c.log.Errorln("Failed to start link interfaces") diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 575c8b1a..ee8bd2ee 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -82,10 +82,6 @@ func (t *dht) init(r *router) { t.reset() } -func (t *dht) reconfigure() { - // This is where reconfiguration would go, if we had anything to do -} - // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { @@ -189,7 +185,7 @@ func (t *dht) handleReq(req *dhtReq) { loc := t.router.core.switchTable.getLocator() coords := loc.getCoords() res := dhtRes{ - Key: t.router.core.boxPub, + Key: t.router.boxPub, Coords: coords, Dest: req.Dest, Infos: t.lookup(&req.Dest, false), @@ -217,12 +213,12 @@ func (t *dht) handleReq(req *dhtReq) { func (t *dht) sendRes(res *dhtRes, req *dhtReq) { // Send a reply for a dhtReq bs := res.encode() - shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &req.Key) + shared := t.router.sessions.getSharedKey(&t.router.boxPriv, &req.Key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: req.Coords, ToKey: req.Key, - FromKey: t.router.core.boxPub, + FromKey: t.router.boxPub, Nonce: *nonce, Payload: payload, } @@ -281,12 +277,12 @@ func (t *dht) handleRes(res *dhtRes) { func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { // Send a dhtReq to the node in dhtInfo bs := req.encode() - shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &dest.key) + shared := t.router.sessions.getSharedKey(&t.router.boxPriv, &dest.key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: dest.coords, ToKey: dest.key, - FromKey: t.router.core.boxPub, + FromKey: t.router.boxPub, Nonce: *nonce, Payload: payload, } @@ -305,7 +301,7 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) { loc := t.router.core.switchTable.getLocator() coords := loc.getCoords() req := dhtReq{ - Key: t.router.core.boxPub, + Key: t.router.boxPub, Coords: coords, Dest: *target, } @@ -409,7 +405,7 @@ func (t *dht) getImportant() []*dhtInfo { // Returns true if this is a node we need to keep track of for the DHT to work. func (t *dht) isImportant(ninfo *dhtInfo) bool { - if ninfo.key == t.router.core.boxPub { + if ninfo.key == t.router.boxPub { return false } important := t.getImportant() diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 4ee579dd..06da862d 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -14,7 +14,6 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" @@ -80,10 +79,6 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) reconfigure(current, previous *config.NodeConfig) { - l.tcp.reconfigure(current, previous) -} - func (l *link) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { @@ -139,8 +134,8 @@ func (intf *linkInterface) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() - meta.box = intf.link.core.boxPub - meta.sig = intf.link.core.sigPub + meta.box = intf.link.core.router.boxPub + meta.sig = intf.link.core.switchTable.sigPub meta.link = *myLinkPub metaBytes := meta.encode() // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 8a5d7872..382a6e28 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -172,12 +172,12 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse NodeInfo: m.getNodeInfo(), } bs := nodeinfo.encode() - shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key) + shared := m.core.router.sessions.getSharedKey(&m.core.router.boxPriv, &key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: coords, ToKey: key, - FromKey: m.core.boxPub, + FromKey: m.core.router.boxPub, Nonce: *nonce, Payload: payload, } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 70ff2ea6..e09d2c29 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -5,12 +5,12 @@ package yggdrasil // Live code should be better commented import ( - "encoding/hex" + "bytes" + "errors" "sync" "sync/atomic" "time" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" @@ -22,9 +22,11 @@ import ( // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { - core *Core - mutex sync.Mutex // Synchronize writes to atomic - ports atomic.Value //map[switchPort]*peer, use CoW semantics + phony.Inbox + core *Core + mutex sync.Mutex // Synchronize writes to atomic + ports atomic.Value // map[switchPort]*peer, use CoW semantics + allowedBoxPubKeys []crypto.BoxPubKey // protected by actor } // Initializes the peers struct. @@ -35,42 +37,50 @@ func (ps *peers) init(c *Core) { ps.core = c } -func (ps *peers) reconfigure(current, previous *config.NodeConfig) { - // This is where reconfiguration would go, if we had anything to do +// AddAllowedEncryptionPublicKey whitelists a key for incoming peer connections. +func (ps *peers) isAllowedEncryptionPublicKey(key *crypto.BoxPubKey) (allowed bool) { + phony.Block(ps, func() { + allowed = ps._isAllowedEncryptionPublicKey(key) + }) + return } // Returns true if an incoming peer connection to a key is allowed, either // because the key is in the whitelist or because the whitelist is empty. -func (ps *peers) isAllowedEncryptionPublicKey(box *crypto.BoxPubKey) bool { - boxstr := hex.EncodeToString(box[:]) - current := ps.core.GetConfig() - for _, v := range current.AllowedEncryptionPublicKeys { - if v == boxstr { +func (ps *peers) _isAllowedEncryptionPublicKey(key *crypto.BoxPubKey) bool { + for _, v := range ps.allowedBoxPubKeys { + if bytes.Equal(v[:], key[:]) { return true } } - return len(current.AllowedEncryptionPublicKeys) == 0 + return len(ps.allowedBoxPubKeys) == 0 } // Adds a key to the whitelist. -func (ps *peers) addAllowedEncryptionPublicKey(box string) { - current := ps.core.GetConfig() - current.AllowedEncryptionPublicKeys = append(current.AllowedEncryptionPublicKeys, box) +func (ps *peers) _addAllowedEncryptionPublicKey(key *crypto.BoxPubKey) error { + for _, v := range ps.allowedBoxPubKeys { + if bytes.Equal(v[:], key[:]) { + return errors.New("public key already allowed") + } + } + ps.allowedBoxPubKeys = append(ps.allowedBoxPubKeys, *key) + return nil } // Removes a key from the whitelist. -func (ps *peers) removeAllowedEncryptionPublicKey(box string) { - current := ps.core.GetConfig() - for k, v := range current.AllowedEncryptionPublicKeys { - if v == box { - current.AllowedEncryptionPublicKeys = append(current.AllowedEncryptionPublicKeys[:k], current.AllowedEncryptionPublicKeys[k+1:]...) +func (ps *peers) _removeAllowedEncryptionPublicKey(key *crypto.BoxPubKey) error { + for k, v := range ps.allowedBoxPubKeys { + if bytes.Equal(v[:], key[:]) { + ps.allowedBoxPubKeys = append(ps.allowedBoxPubKeys[:k], ps.allowedBoxPubKeys[k+1:]...) + return nil } } + return errors.New("public key already not allowed") } // Gets the whitelist of allowed keys for incoming connections. -func (ps *peers) getAllowedEncryptionPublicKeys() []string { - return ps.core.GetConfig().AllowedEncryptionPublicKeys +func (ps *peers) _getAllowedEncryptionPublicKeys() []crypto.BoxPubKey { + return ps.allowedBoxPubKeys } // Atomically gets a map[switchPort]*peer of known peers. @@ -110,7 +120,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare now := time.Now() p := peer{box: *box, sig: *sig, - shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), + shared: *crypto.GetSharedKey(&ps.core.router.boxPriv, box), linkShared: *linkShared, firstSeen: now, done: make(chan struct{}), @@ -314,7 +324,7 @@ func (p *peer) _sendSwitchMsg() { msg.Hops = append(msg.Hops, switchMsgHop{ Port: p.port, Next: p.sig, - Sig: *crypto.Sign(&p.core.sigPriv, bs), + Sig: *crypto.Sign(&p.core.switchTable.sigPriv, bs), }) packet := msg.encode() p._sendLinkPacket(packet) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index ed080c01..0541153a 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -28,7 +28,6 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" @@ -40,6 +39,8 @@ import ( type router struct { phony.Inbox core *Core + boxPub crypto.BoxPubKey + boxPriv crypto.BoxPrivKey addr address.Address subnet address.Subnet out func([]byte) // packets we're sending to the network, link to peer's "in" @@ -50,7 +51,7 @@ type router struct { } // Initializes the router struct, which includes setting up channels to/from the adapter. -func (r *router) init(core *Core) { +func (r *router) init(core *Core, boxPrivKey crypto.BoxPrivKey) { r.core = core r.addr = *address.AddrForNodeID(&r.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) @@ -62,32 +63,17 @@ func (r *router) init(core *Core) { linkType: "self", }, } - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) + p := r.core.peers.newPeer(&r.boxPub, &r.core.switchTable.sigPub, &crypto.BoxSharedKey{}, &self, nil) p.out = func(packets [][]byte) { r.handlePackets(p, packets) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) - current := r.core.GetConfig() - r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) + //current := r.core.GetConfig() + //r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) r.dht.init(r) r.searches.init(r) r.sessions.init(r) } -// Reconfigures the router and any child modules. This should only ever be run -// by the router actor. -func (r *router) reconfigure(current, previous *config.NodeConfig) { - // Reconfigure the router - if err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy); err != nil { - r.core.log.Errorln("Error reloading NodeInfo:", err) - } else { - r.core.log.Infoln("NodeInfo updated") - } - // Reconfigure children - r.dht.reconfigure() - r.searches.reconfigure() - r.sessions.reconfigure() -} - // Starts the tickerLoop goroutine. func (r *router) start() error { r.core.log.Infoln("Starting router") @@ -119,8 +105,6 @@ func (r *router) reset(from phony.Actor) { }) } -// TODO remove reconfigure so this is just a ticker loop -// and then find something better than a ticker loop to schedule things... func (r *router) doMaintenance() { phony.Block(r, func() { // Any periodic maintenance stuff goes here @@ -171,9 +155,9 @@ func (r *router) _handleProto(packet []byte) { } // Now try to open the payload var sharedKey *crypto.BoxSharedKey - if p.ToKey == r.core.boxPub { + if p.ToKey == r.boxPub { // Try to open using our permanent key - sharedKey = r.sessions.getSharedKey(&r.core.boxPriv, &p.FromKey) + sharedKey = r.sessions.getSharedKey(&r.boxPriv, &p.FromKey) } else { return } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 322131ed..bbf5db42 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -55,10 +55,6 @@ func (s *searches) init(r *router) { s.searches = make(map[crypto.NodeID]*searchInfo) } -func (s *searches) reconfigure() { - // This is where reconfiguration would go, if we had anything to do -} - // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { info := searchInfo{ @@ -169,7 +165,7 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb sinfo.visited = make(map[crypto.NodeID]bool) loc := s.router.core.switchTable.getLocator() sinfo.toVisit = append(sinfo.toVisit, &dhtInfo{ - key: s.router.core.boxPub, + key: s.router.boxPub, coords: loc.getCoords(), }) // Start the search by asking ourself, useful if we're the destination return sinfo diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 9e20ca74..de901cbe 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -55,10 +55,6 @@ type sessionInfo struct { callbacks []chan func() // Finished work from crypto workers } -func (sinfo *sessionInfo) reconfigure() { - // This is where reconfiguration would go, if we had anything to do -} - // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. type sessionPing struct { SendPermPub crypto.BoxPubKey // Sender's permanent key @@ -135,12 +131,6 @@ func (ss *sessions) init(r *router) { ss.lastCleanup = time.Now() } -func (ss *sessions) reconfigure() { - for _, session := range ss.sinfos { - session.reconfigure() - } -} - // Determines whether the session with a given publickey is allowed based on // session firewall rules. func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { @@ -181,13 +171,13 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo := sessionInfo{} sinfo.sessions = ss sinfo.theirPermPub = *theirPermKey - sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub) + sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.boxPriv, &sinfo.theirPermPub) pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub sinfo.mySesPriv = *priv sinfo.myNonce = *crypto.NewBoxNonce() sinfo.theirMTU = 1280 - sinfo.myMTU = uint16(ss.router.core.GetConfig().IfMTU) + //sinfo.myMTU = uint16(ss.router.core.GetConfig().IfMTU) now := time.Now() sinfo.timeOpened = now sinfo.time = now @@ -197,11 +187,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.init = make(chan struct{}) sinfo.cancel = util.NewCancellation() higher := false - for idx := range ss.router.core.boxPub { - if ss.router.core.boxPub[idx] > sinfo.theirPermPub[idx] { + for idx := range ss.router.boxPub { + if ss.router.boxPub[idx] > sinfo.theirPermPub[idx] { higher = true break - } else if ss.router.core.boxPub[idx] < sinfo.theirPermPub[idx] { + } else if ss.router.boxPub[idx] < sinfo.theirPermPub[idx] { break } } @@ -267,7 +257,7 @@ func (sinfo *sessionInfo) _getPing() sessionPing { loc := sinfo.sessions.router.core.switchTable.getLocator() coords := loc.getCoords() ping := sessionPing{ - SendPermPub: sinfo.sessions.router.core.boxPub, + SendPermPub: sinfo.sessions.router.boxPub, Handle: sinfo.myHandle, SendSesPub: sinfo.mySesPub, Tstamp: time.Now().Unix(), @@ -318,7 +308,7 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { p := wire_protoTrafficPacket{ Coords: sinfo.coords, ToKey: sinfo.theirPermPub, - FromKey: sinfo.sessions.router.core.boxPub, + FromKey: sinfo.sessions.router.boxPub, Nonce: *nonce, Payload: payload, } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index e01147a2..5945789f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -17,7 +17,6 @@ import ( "sync/atomic" "time" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" @@ -166,7 +165,8 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core - key crypto.SigPubKey // Our own key + sigPub crypto.SigPubKey // Our own public key + sigPriv crypto.SigPrivKey // Our own private key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root mutex sync.RWMutex // Lock for reads/writes of switchData @@ -183,34 +183,24 @@ type switchTable struct { const SwitchQueueTotalMinSize = 4 * 1024 * 1024 // Initializes the switchTable struct. -func (t *switchTable) init(core *Core) { +func (t *switchTable) init(core *Core, sigPriv crypto.SigPrivKey) { now := time.Now() t.core = core - t.key = t.core.sigPub - locator := switchLocator{root: t.key, tstamp: now.Unix()} + t.sigPriv = sigPriv + t.sigPub = t.sigPriv.Public() + locator := switchLocator{root: t.sigPub, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[crypto.SigPubKey]int64) phony.Block(t, func() { - current := t.core.GetConfig() - if current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize { - t.queues.totalMaxSize = current.SwitchOptions.MaxTotalQueueSize - } else { - t.queues.totalMaxSize = SwitchQueueTotalMinSize - } + t.queues.totalMaxSize = SwitchQueueTotalMinSize t.queues.bufs = make(map[string]switch_buffer) t.idle = make(map[switchPort]time.Time) }) } -func (t *switchTable) reconfigure(current, previous *config.NodeConfig) { - // This is where reconfiguration would go, if we had anything useful to do. - t.core.link.reconfigure(current, previous) - t.core.peers.reconfigure(current, previous) -} - // Safely gets a copy of this node's locator. func (t *switchTable) getLocator() switchLocator { t.mutex.RLock() @@ -240,23 +230,23 @@ func (t *switchTable) cleanRoot() { doUpdate = true } // Or, if we're better than our root, root ourself - if firstIsBetter(&t.key, &t.data.locator.root) { + if firstIsBetter(&t.sigPub, &t.data.locator.root) { doUpdate = true } // Or, if we are the root, possibly update our timestamp - if t.data.locator.root == t.key && + if t.data.locator.root == t.sigPub && now.Sub(t.time) > switch_updateInterval { doUpdate = true } if doUpdate { t.parent = switchPort(0) t.time = now - if t.data.locator.root != t.key { + if t.data.locator.root != t.sigPub { t.data.seq++ t.updater.Store(&sync.Once{}) t.core.router.reset(nil) } - t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} + t.data.locator = switchLocator{root: t.sigPub, tstamp: now.Unix()} t.core.peers.sendSwitchMsgs(t) } } @@ -334,7 +324,7 @@ func (t *switchTable) getMsg() *switchMsg { t.mutex.RLock() defer t.mutex.RUnlock() if t.parent == 0 { - return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} + return &switchMsg{Root: t.sigPub, TStamp: t.data.locator.tstamp} } else if parent, isIn := t.data.peers[t.parent]; isIn { msg := parent.msg msg.Hops = append([]switchMsgHop(nil), msg.Hops...) @@ -454,11 +444,11 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep noParent := !isIn noLoop := func() bool { for idx := 0; idx < len(msg.Hops)-1; idx++ { - if msg.Hops[idx].Next == t.core.sigPub { + if msg.Hops[idx].Next == t.sigPub { return false } } - if sender.locator.root == t.core.sigPub { + if sender.locator.root == t.sigPub { return false } return true diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index d2feb3e0..23cb349f 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -24,9 +24,6 @@ import ( "time" "golang.org/x/net/proxy" - - "github.com/yggdrasil-network/yggdrasil-go/src/config" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) const default_timeout = 6 * time.Second @@ -81,19 +78,21 @@ func (t *tcp) init(l *link) error { t.conns = make(map[linkInfo](chan struct{})) t.listeners = make(map[string]*TcpListener) t.mutex.Unlock() - - for _, listenaddr := range t.link.core.GetConfig().Listen { - if listenaddr[:6] != "tcp://" { - continue + /* + for _, listenaddr := range t.link.core.GetConfig().Listen { + if listenaddr[:6] != "tcp://" { + continue + } + if _, err := t.listen(listenaddr[6:]); err != nil { + return err + } } - if _, err := t.listen(listenaddr[6:]); err != nil { - return err - } - } - + */ return nil } +/* +TODO: move this logic to cmd/yggdrasil func (t *tcp) reconfigure(current, previous *config.NodeConfig) { added := util.Difference(current.Listen, previous.Listen) deleted := util.Difference(previous.Listen, current.Listen) @@ -123,6 +122,7 @@ func (t *tcp) reconfigure(current, previous *config.NodeConfig) { } } } +*/ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { var err error