mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	Handle errors from reconfigure tasks
This commit is contained in:
		
							parent
							
								
									2925920c70
								
							
						
					
					
						commit
						7fae1c993a
					
				
					 9 changed files with 64 additions and 72 deletions
				
			
		| 
						 | 
				
			
			@ -23,7 +23,7 @@ import (
 | 
			
		|||
 | 
			
		||||
type admin struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	listenaddr  string
 | 
			
		||||
	listener    net.Listener
 | 
			
		||||
	handlers    []admin_handlerInfo
 | 
			
		||||
| 
						 | 
				
			
			@ -54,18 +54,17 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info)
 | 
			
		|||
// init runs the initial admin setup.
 | 
			
		||||
func (a *admin) init(c *Core) {
 | 
			
		||||
	a.core = c
 | 
			
		||||
	a.reconfigure = make(chan bool, 1)
 | 
			
		||||
	a.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case _ = <-a.reconfigure:
 | 
			
		||||
			case e := <-a.reconfigure:
 | 
			
		||||
				a.core.configMutex.RLock()
 | 
			
		||||
				a.core.log.Println("Notified: admin")
 | 
			
		||||
				if a.core.config.AdminListen != a.core.configOld.AdminListen {
 | 
			
		||||
					a.core.log.Println("AdminListen has changed!")
 | 
			
		||||
				}
 | 
			
		||||
				a.core.configMutex.RUnlock()
 | 
			
		||||
				continue
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -107,14 +107,24 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
 | 
			
		|||
	c.config = *config
 | 
			
		||||
	c.configMutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	c.admin.reconfigure <- true
 | 
			
		||||
	c.searches.reconfigure <- true
 | 
			
		||||
	c.dht.reconfigure <- true
 | 
			
		||||
	c.sessions.reconfigure <- true
 | 
			
		||||
	c.multicast.reconfigure <- true
 | 
			
		||||
	c.peers.reconfigure <- true
 | 
			
		||||
	c.router.reconfigure <- true
 | 
			
		||||
	c.switchTable.reconfigure <- true
 | 
			
		||||
	components := []chan chan error{
 | 
			
		||||
		c.admin.reconfigure,
 | 
			
		||||
		c.searches.reconfigure,
 | 
			
		||||
		c.dht.reconfigure,
 | 
			
		||||
		c.sessions.reconfigure,
 | 
			
		||||
		c.multicast.reconfigure,
 | 
			
		||||
		c.peers.reconfigure,
 | 
			
		||||
		c.router.reconfigure,
 | 
			
		||||
		c.switchTable.reconfigure,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, component := range components {
 | 
			
		||||
		response := make(chan error)
 | 
			
		||||
		component <- response
 | 
			
		||||
		if err := <-response; err != nil {
 | 
			
		||||
			c.log.Println(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetBuildName gets the current build name. This is usually injected if built
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -66,7 +66,7 @@ type dhtReqKey struct {
 | 
			
		|||
// The main DHT struct.
 | 
			
		||||
type dht struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	nodeID      crypto.NodeID
 | 
			
		||||
	peers       chan *dhtInfo                  // other goroutines put incoming dht updates here
 | 
			
		||||
	reqs        map[dhtReqKey]time.Time        // Keeps track of recent outstanding requests
 | 
			
		||||
| 
						 | 
				
			
			@ -79,15 +79,12 @@ type dht struct {
 | 
			
		|||
// Initializes the DHT.
 | 
			
		||||
func (t *dht) init(c *Core) {
 | 
			
		||||
	t.core = c
 | 
			
		||||
	t.reconfigure = make(chan bool, 1)
 | 
			
		||||
	t.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case _ = <-t.reconfigure:
 | 
			
		||||
				t.core.configMutex.RLock()
 | 
			
		||||
				t.core.log.Println("Notified: dht")
 | 
			
		||||
				t.core.configMutex.RUnlock()
 | 
			
		||||
				continue
 | 
			
		||||
			case e := <-t.reconfigure:
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,22 +11,19 @@ import (
 | 
			
		|||
 | 
			
		||||
type multicast struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	sock        *ipv6.PacketConn
 | 
			
		||||
	groupAddr   string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *multicast) init(core *Core) {
 | 
			
		||||
	m.core = core
 | 
			
		||||
	m.reconfigure = make(chan bool, 1)
 | 
			
		||||
	m.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case _ = <-m.reconfigure:
 | 
			
		||||
				m.core.configMutex.RLock()
 | 
			
		||||
				m.core.log.Println("Notified: multicast")
 | 
			
		||||
				m.core.configMutex.RUnlock()
 | 
			
		||||
				continue
 | 
			
		||||
			case e := <-m.reconfigure:
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,7 +19,7 @@ import (
 | 
			
		|||
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
 | 
			
		||||
type peers struct {
 | 
			
		||||
	core                        *Core
 | 
			
		||||
	reconfigure                 chan bool
 | 
			
		||||
	reconfigure                 chan chan error
 | 
			
		||||
	mutex                       sync.Mutex   // Synchronize writes to atomic
 | 
			
		||||
	ports                       atomic.Value //map[switchPort]*peer, use CoW semantics
 | 
			
		||||
	authMutex                   sync.RWMutex
 | 
			
		||||
| 
						 | 
				
			
			@ -32,15 +32,12 @@ func (ps *peers) init(c *Core) {
 | 
			
		|||
	defer ps.mutex.Unlock()
 | 
			
		||||
	ps.putPorts(make(map[switchPort]*peer))
 | 
			
		||||
	ps.core = c
 | 
			
		||||
	ps.reconfigure = make(chan bool, 1)
 | 
			
		||||
	ps.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case _ = <-ps.reconfigure:
 | 
			
		||||
				ps.core.configMutex.RLock()
 | 
			
		||||
				ps.core.log.Println("Notified: peers")
 | 
			
		||||
				ps.core.configMutex.RUnlock()
 | 
			
		||||
				continue
 | 
			
		||||
			case e := <-ps.reconfigure:
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -38,7 +38,7 @@ import (
 | 
			
		|||
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
 | 
			
		||||
type router struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	addr        address.Address
 | 
			
		||||
	subnet      address.Subnet
 | 
			
		||||
	in          <-chan []byte          // packets we received from the network, link to peer's "out"
 | 
			
		||||
| 
						 | 
				
			
			@ -62,7 +62,7 @@ type router_recvPacket struct {
 | 
			
		|||
// Initializes the router struct, which includes setting up channels to/from the tun/tap.
 | 
			
		||||
func (r *router) init(core *Core) {
 | 
			
		||||
	r.core = core
 | 
			
		||||
	r.reconfigure = make(chan bool, 1)
 | 
			
		||||
	r.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
 | 
			
		||||
	r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
 | 
			
		||||
	in := make(chan []byte, 32) // TODO something better than this...
 | 
			
		||||
| 
						 | 
				
			
			@ -126,11 +126,8 @@ func (r *router) mainLoop() {
 | 
			
		|||
			}
 | 
			
		||||
		case f := <-r.admin:
 | 
			
		||||
			f()
 | 
			
		||||
		case _ = <-r.reconfigure:
 | 
			
		||||
			r.core.configMutex.RLock()
 | 
			
		||||
			r.core.log.Println("Notified: router")
 | 
			
		||||
			r.core.configMutex.RUnlock()
 | 
			
		||||
			continue
 | 
			
		||||
		case e := <-r.reconfigure:
 | 
			
		||||
			e <- nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -43,22 +43,19 @@ type searchInfo struct {
 | 
			
		|||
// This stores a map of active searches.
 | 
			
		||||
type searches struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	searches    map[crypto.NodeID]*searchInfo
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Intializes the searches struct.
 | 
			
		||||
func (s *searches) init(core *Core) {
 | 
			
		||||
	s.core = core
 | 
			
		||||
	s.reconfigure = make(chan bool, 1)
 | 
			
		||||
	s.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case _ = <-s.reconfigure:
 | 
			
		||||
				s.core.configMutex.RLock()
 | 
			
		||||
				s.core.log.Println("Notified: searches")
 | 
			
		||||
				s.core.configMutex.RUnlock()
 | 
			
		||||
				continue
 | 
			
		||||
			case e := <-s.reconfigure:
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,7 +18,7 @@ import (
 | 
			
		|||
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
 | 
			
		||||
type sessionInfo struct {
 | 
			
		||||
	core         *Core
 | 
			
		||||
	reconfigure  chan bool
 | 
			
		||||
	reconfigure  chan chan error
 | 
			
		||||
	theirAddr    address.Address
 | 
			
		||||
	theirSubnet  address.Subnet
 | 
			
		||||
	theirPermPub crypto.BoxPubKey
 | 
			
		||||
| 
						 | 
				
			
			@ -102,7 +102,7 @@ func (s *sessionInfo) timedout() bool {
 | 
			
		|||
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
 | 
			
		||||
type sessions struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	reconfigure chan bool
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	lastCleanup time.Time
 | 
			
		||||
	// Maps known permanent keys to their shared key, used by DHT a lot
 | 
			
		||||
	permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey
 | 
			
		||||
| 
						 | 
				
			
			@ -126,19 +126,23 @@ type sessions struct {
 | 
			
		|||
// Initializes the session struct.
 | 
			
		||||
func (ss *sessions) init(core *Core) {
 | 
			
		||||
	ss.core = core
 | 
			
		||||
	ss.reconfigure = make(chan bool, 1)
 | 
			
		||||
	ss.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case newConfig := <-ss.reconfigure:
 | 
			
		||||
				ss.core.configMutex.RLock()
 | 
			
		||||
				ss.core.log.Println("Notified: sessions")
 | 
			
		||||
				ss.core.configMutex.RUnlock()
 | 
			
		||||
 | 
			
		||||
				for _, sinfo := range ss.sinfos {
 | 
			
		||||
					sinfo.reconfigure <- newConfig
 | 
			
		||||
			case e := <-ss.reconfigure:
 | 
			
		||||
				responses := make(map[crypto.Handle]chan error)
 | 
			
		||||
				for index, session := range ss.sinfos {
 | 
			
		||||
					responses[index] = make(chan error)
 | 
			
		||||
					session.reconfigure <- responses[index]
 | 
			
		||||
				}
 | 
			
		||||
				continue
 | 
			
		||||
				for _, response := range responses {
 | 
			
		||||
					if err := <-response; err != nil {
 | 
			
		||||
						e <- err
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				e <- nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
| 
						 | 
				
			
			@ -289,7 +293,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
 | 
			
		|||
	}
 | 
			
		||||
	sinfo := sessionInfo{}
 | 
			
		||||
	sinfo.core = ss.core
 | 
			
		||||
	sinfo.reconfigure = make(chan bool, 1)
 | 
			
		||||
	sinfo.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	sinfo.theirPermPub = *theirPermKey
 | 
			
		||||
	pub, priv := crypto.NewBoxKeys()
 | 
			
		||||
	sinfo.mySesPub = *pub
 | 
			
		||||
| 
						 | 
				
			
			@ -558,11 +562,8 @@ func (sinfo *sessionInfo) doWorker() {
 | 
			
		|||
			} else {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		case _ = <-sinfo.reconfigure:
 | 
			
		||||
			sinfo.core.configMutex.RLock()
 | 
			
		||||
			sinfo.core.log.Println("Notified: sessionInfo")
 | 
			
		||||
			sinfo.core.configMutex.RUnlock()
 | 
			
		||||
			continue
 | 
			
		||||
		case e := <-sinfo.reconfigure:
 | 
			
		||||
			e <- nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -162,7 +162,7 @@ type switchData struct {
 | 
			
		|||
// All the information stored by the switch.
 | 
			
		||||
type switchTable struct {
 | 
			
		||||
	core              *Core
 | 
			
		||||
	reconfigure       chan bool
 | 
			
		||||
	reconfigure       chan chan error
 | 
			
		||||
	key               crypto.SigPubKey           // Our own key
 | 
			
		||||
	time              time.Time                  // Time when locator.tstamp was last updated
 | 
			
		||||
	drop              map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
 | 
			
		||||
| 
						 | 
				
			
			@ -185,7 +185,7 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024
 | 
			
		|||
func (t *switchTable) init(core *Core) {
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	t.core = core
 | 
			
		||||
	t.reconfigure = make(chan bool, 1)
 | 
			
		||||
	t.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	t.core.configMutex.RLock()
 | 
			
		||||
	t.key = t.core.sigPub
 | 
			
		||||
	t.core.configMutex.RUnlock()
 | 
			
		||||
| 
						 | 
				
			
			@ -812,11 +812,8 @@ func (t *switchTable) doWorker() {
 | 
			
		|||
			}
 | 
			
		||||
		case f := <-t.admin:
 | 
			
		||||
			f()
 | 
			
		||||
		case _ = <-t.reconfigure:
 | 
			
		||||
			t.core.configMutex.RLock()
 | 
			
		||||
			t.core.log.Println("Notified: switchTable")
 | 
			
		||||
			t.core.configMutex.RUnlock()
 | 
			
		||||
			continue
 | 
			
		||||
		case e := <-t.reconfigure:
 | 
			
		||||
			e <- nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue