start migrating the router to an actor

This commit is contained in:
Arceliar 2019-08-23 18:47:15 -05:00
parent 562a7d1f19
commit 9d7e7288c6
8 changed files with 51 additions and 54 deletions

View file

@ -30,19 +30,19 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony"
)
// The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer.
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
type router struct {
phony.Actor
core *Core
reconfigure chan chan error
addr address.Address
subnet address.Subnet
in <-chan [][]byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
out func([]byte) // packets we're sending to the network, link to peer's "in"
nodeinfo nodeinfo
}
@ -52,7 +52,6 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan [][]byte, 1) // TODO something better than this...
self := linkInterface{
name: "(self)",
info: linkInfo{
@ -62,8 +61,10 @@ func (r *router) init(core *Core) {
},
}
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packets [][]byte) { in <- packets }
r.in = in
p.out = func(packets [][]byte) {
// TODO make peers and/or the switch into actors, have them pass themselves as the from field
r.handlePackets(r, packets)
}
out := make(chan []byte, 32)
go func() {
for packet := range out {
@ -90,8 +91,6 @@ func (r *router) init(core *Core) {
}
}()
r.out = func(packet []byte) { out2 <- packet }
r.reset = make(chan struct{}, 1)
r.admin = make(chan func(), 32)
r.nodeinfo.init(r.core)
r.core.config.Mutex.RLock()
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)
@ -105,42 +104,55 @@ func (r *router) start() error {
return nil
}
// Takes traffic from the adapter and passes it to router.send, or from r.in and handles incoming traffic.
// Also adds new peer info to the DHT.
// Also resets the DHT and sesssions in the event of a coord change.
// Also does periodic maintenance stuff.
// In practice, the switch will call this with 1 packet
func (r *router) handlePackets(from phony.IActor, packets [][]byte) {
r.EnqueueFrom(from, func() {
for _, packet := range packets {
r.handlePacket(packet)
}
})
}
// Insert a peer info into the dht, TODO? make the dht a separate actor
func (r *router) insertPeer(from phony.IActor, info *dhtInfo) {
r.EnqueueFrom(from, func() {
r.core.dht.insertPeer(info)
})
}
// Reset sessions and DHT after the switch sees our coords change
func (r *router) reset(from phony.IActor) {
r.EnqueueFrom(from, func() {
r.core.sessions.reset()
r.core.dht.reset()
})
}
// TODO remove reconfigure so this is just a ticker loop
// and then find something better than a ticker loop to schedule things...
func (r *router) mainLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case ps := <-r.in:
for _, p := range ps {
r.handleIn(p)
}
case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info)
case <-r.reset:
r.core.sessions.reset()
r.core.dht.reset()
case <-ticker.C:
{
r.SyncExec(func() {
// Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance()
r.core.sessions.cleanup()
}
case f := <-r.admin:
f()
})
case e := <-r.reconfigure:
current := r.core.config.GetCurrent()
e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy)
r.SyncExec(func() {
current := r.core.config.GetCurrent()
e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy)
})
}
}
}
// Checks incoming traffic type and passes it to the appropriate handler.
func (r *router) handleIn(packet []byte) {
func (r *router) handlePacket(packet []byte) {
pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen == 0 {
return
@ -263,17 +275,7 @@ func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
r.nodeinfo.handleNodeInfo(&req)
}
// Passed a function to call.
// This will send the function to r.admin and block until it finishes.
// It's used by the admin socket to ask the router mainLoop goroutine about information in the session or dht structs, which cannot be read safely from outside that goroutine.
// TODO remove this, have things either be actors that send message or else call SyncExec directly
func (r *router) doAdmin(f func()) {
// Pass this a function that needs to be run by the router's main goroutine
// It will pass the function to the router and wait for the router to finish
done := make(chan struct{})
newF := func() {
f()
close(done)
}
r.admin <- newF
<-done
r.SyncExec(f)
}