mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	Try and solidify multicast interface behavior
This commit is contained in:
		
							parent
							
								
									2fd3ac6837
								
							
						
					
					
						commit
						54f1804101
					
				
					 3 changed files with 76 additions and 73 deletions
				
			
		| 
						 | 
				
			
			@ -5,7 +5,7 @@ import "github.com/yggdrasil-network/yggdrasil-go/src/admin"
 | 
			
		|||
func (m *Multicast) SetupAdminHandlers(a *admin.AdminSocket) {
 | 
			
		||||
	a.AddHandler("getMulticastInterfaces", []string{}, func(in admin.Info) (admin.Info, error) {
 | 
			
		||||
		var intfs []string
 | 
			
		||||
		for _, v := range m.interfaces() {
 | 
			
		||||
		for _, v := range m.GetInterfaces() {
 | 
			
		||||
			intfs = append(intfs, v.Name)
 | 
			
		||||
		}
 | 
			
		||||
		return admin.Info{"multicast_interfaces": intfs}, nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,6 +5,7 @@ import (
 | 
			
		|||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/gologme/log"
 | 
			
		||||
| 
						 | 
				
			
			@ -19,14 +20,16 @@ import (
 | 
			
		|||
// configured multicast interface, Yggdrasil will attempt to peer with that node
 | 
			
		||||
// automatically.
 | 
			
		||||
type Multicast struct {
 | 
			
		||||
	core        *yggdrasil.Core
 | 
			
		||||
	config      *config.NodeState
 | 
			
		||||
	log         *log.Logger
 | 
			
		||||
	reconfigure chan chan error
 | 
			
		||||
	sock        *ipv6.PacketConn
 | 
			
		||||
	groupAddr   string
 | 
			
		||||
	listeners   map[string]*yggdrasil.TcpListener
 | 
			
		||||
	listenPort  uint16
 | 
			
		||||
	core            *yggdrasil.Core
 | 
			
		||||
	config          *config.NodeState
 | 
			
		||||
	log             *log.Logger
 | 
			
		||||
	sock            *ipv6.PacketConn
 | 
			
		||||
	groupAddr       string
 | 
			
		||||
	listeners       map[string]*yggdrasil.TcpListener
 | 
			
		||||
	listenPort      uint16
 | 
			
		||||
	interfaces      map[string]net.Interface
 | 
			
		||||
	interfacesMutex sync.RWMutex
 | 
			
		||||
	interfacesTime  time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Init prepares the multicast interface for use.
 | 
			
		||||
| 
						 | 
				
			
			@ -34,25 +37,24 @@ func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log
 | 
			
		|||
	m.core = core
 | 
			
		||||
	m.config = state
 | 
			
		||||
	m.log = log
 | 
			
		||||
	m.reconfigure = make(chan chan error, 1)
 | 
			
		||||
	m.listeners = make(map[string]*yggdrasil.TcpListener)
 | 
			
		||||
	m.interfaces = make(map[string]net.Interface)
 | 
			
		||||
	current, _ := m.config.Get()
 | 
			
		||||
	m.listenPort = current.LinkLocalTCPPort
 | 
			
		||||
	m.groupAddr = "[ff02::114]:9001"
 | 
			
		||||
	// Perform our first check for multicast interfaces
 | 
			
		||||
	if count := m.UpdateInterfaces(); count != 0 {
 | 
			
		||||
		m.log.Infoln("Found", count, "multicast interface(s)")
 | 
			
		||||
	} else {
 | 
			
		||||
		m.log.Infoln("Multicast is not enabled on any interfaces")
 | 
			
		||||
	}
 | 
			
		||||
	// Keep checking quietly every minute in case they change
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			e := <-m.reconfigure
 | 
			
		||||
			// There's nothing particularly to do here because the multicast module
 | 
			
		||||
			// already consults the config.NodeState when enumerating multicast
 | 
			
		||||
			// interfaces on each pass. We just need to return nil so that the
 | 
			
		||||
			// reconfiguration doesn't block indefinitely
 | 
			
		||||
			e <- nil
 | 
			
		||||
			time.Sleep(time.Minute)
 | 
			
		||||
			m.UpdateInterfaces()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	m.groupAddr = "[ff02::114]:9001"
 | 
			
		||||
	// Check if we've been given any expressions
 | 
			
		||||
	if count := len(m.interfaces()); count != 0 {
 | 
			
		||||
		m.log.Infoln("Found", count, "multicast interface(s)")
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -60,32 +62,27 @@ func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log
 | 
			
		|||
// listen for multicast beacons from other hosts and will advertise multicast
 | 
			
		||||
// beacons out to the network.
 | 
			
		||||
func (m *Multicast) Start() error {
 | 
			
		||||
	current, _ := m.config.Get()
 | 
			
		||||
	if len(current.MulticastInterfaces) == 0 {
 | 
			
		||||
		m.log.Infoln("Multicast discovery is disabled")
 | 
			
		||||
	} else {
 | 
			
		||||
		m.log.Infoln("Multicast discovery is enabled")
 | 
			
		||||
		addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		listenString := fmt.Sprintf("[::]:%v", addr.Port)
 | 
			
		||||
		lc := net.ListenConfig{
 | 
			
		||||
			Control: m.multicastReuse,
 | 
			
		||||
		}
 | 
			
		||||
		conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		m.sock = ipv6.NewPacketConn(conn)
 | 
			
		||||
		if err = m.sock.SetControlMessage(ipv6.FlagDst, true); err != nil {
 | 
			
		||||
			// Windows can't set this flag, so we need to handle it in other ways
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		go m.multicastStarted()
 | 
			
		||||
		go m.listen()
 | 
			
		||||
		go m.announce()
 | 
			
		||||
	addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	listenString := fmt.Sprintf("[::]:%v", addr.Port)
 | 
			
		||||
	lc := net.ListenConfig{
 | 
			
		||||
		Control: m.multicastReuse,
 | 
			
		||||
	}
 | 
			
		||||
	conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	m.sock = ipv6.NewPacketConn(conn)
 | 
			
		||||
	if err = m.sock.SetControlMessage(ipv6.FlagDst, true); err != nil {
 | 
			
		||||
		// Windows can't set this flag, so we need to handle it in other ways
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go m.multicastStarted()
 | 
			
		||||
	go m.listen()
 | 
			
		||||
	go m.announce()
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -102,34 +99,37 @@ func (m *Multicast) UpdateConfig(config *config.NodeConfig) {
 | 
			
		|||
 | 
			
		||||
	m.config.Replace(*config)
 | 
			
		||||
 | 
			
		||||
	errors := 0
 | 
			
		||||
	m.log.Infoln("Multicast configuration reloaded successfully")
 | 
			
		||||
 | 
			
		||||
	components := []chan chan error{
 | 
			
		||||
		m.reconfigure,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, component := range components {
 | 
			
		||||
		response := make(chan error)
 | 
			
		||||
		component <- response
 | 
			
		||||
		if err := <-response; err != nil {
 | 
			
		||||
			m.log.Errorln(err)
 | 
			
		||||
			errors++
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if errors > 0 {
 | 
			
		||||
		m.log.Warnln(errors, "multicast module(s) reported errors during configuration reload")
 | 
			
		||||
	if count := m.UpdateInterfaces(); count != 0 {
 | 
			
		||||
		m.log.Infoln("Found", count, "multicast interface(s)")
 | 
			
		||||
	} else {
 | 
			
		||||
		m.log.Infoln("Multicast configuration reloaded successfully")
 | 
			
		||||
		m.log.Infoln("Multicast is not enabled on any interfaces")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *Multicast) interfaces() map[string]net.Interface {
 | 
			
		||||
// GetInterfaces returns the currently known/enabled multicast interfaces. It is
 | 
			
		||||
// expected that UpdateInterfaces has been called at least once before calling
 | 
			
		||||
// this method.
 | 
			
		||||
func (m *Multicast) GetInterfaces() map[string]net.Interface {
 | 
			
		||||
	m.interfacesMutex.RLock()
 | 
			
		||||
	defer m.interfacesMutex.RUnlock()
 | 
			
		||||
	return m.interfaces
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateInterfaces re-enumerates the available multicast interfaces on the
 | 
			
		||||
// system, using the current MulticastInterfaces config option as a template.
 | 
			
		||||
// The number of selected interfaces is returned.
 | 
			
		||||
func (m *Multicast) UpdateInterfaces() int {
 | 
			
		||||
	m.interfacesMutex.Lock()
 | 
			
		||||
	defer m.interfacesMutex.Unlock()
 | 
			
		||||
	// Get interface expressions from config
 | 
			
		||||
	current, _ := m.config.Get()
 | 
			
		||||
	exprs := current.MulticastInterfaces
 | 
			
		||||
	// Ask the system for network interfaces
 | 
			
		||||
	interfaces := make(map[string]net.Interface)
 | 
			
		||||
	for i := range m.interfaces {
 | 
			
		||||
		delete(m.interfaces, i)
 | 
			
		||||
	}
 | 
			
		||||
	allifaces, err := net.Interfaces()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
| 
						 | 
				
			
			@ -156,11 +156,12 @@ func (m *Multicast) interfaces() map[string]net.Interface {
 | 
			
		|||
			}
 | 
			
		||||
			// Does the interface match the regular expression? Store it if so
 | 
			
		||||
			if e.MatchString(iface.Name) {
 | 
			
		||||
				interfaces[iface.Name] = iface
 | 
			
		||||
				m.interfaces[iface.Name] = iface
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return interfaces
 | 
			
		||||
	m.interfacesTime = time.Now()
 | 
			
		||||
	return len(m.interfaces)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *Multicast) announce() {
 | 
			
		||||
| 
						 | 
				
			
			@ -173,7 +174,7 @@ func (m *Multicast) announce() {
 | 
			
		|||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		interfaces := m.interfaces()
 | 
			
		||||
		interfaces := m.GetInterfaces()
 | 
			
		||||
		// There might be interfaces that we configured listeners for but are no
 | 
			
		||||
		// longer up - if that's the case then we should stop the listeners
 | 
			
		||||
		for name, listener := range m.listeners {
 | 
			
		||||
| 
						 | 
				
			
			@ -307,9 +308,11 @@ func (m *Multicast) listen() {
 | 
			
		|||
		if addr.IP.String() != from.IP.String() {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		addr.Zone = ""
 | 
			
		||||
		if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil {
 | 
			
		||||
			m.log.Debugln("Call from multicast failed:", err)
 | 
			
		||||
		if _, ok := m.GetInterfaces()[from.Zone]; ok {
 | 
			
		||||
			addr.Zone = ""
 | 
			
		||||
			if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil {
 | 
			
		||||
				m.log.Debugln("Call from multicast failed:", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -35,12 +35,12 @@ func (m *Multicast) multicastStarted() {
 | 
			
		|||
	if awdlGoroutineStarted {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	m.log.Infoln("Multicast discovery will wake up AWDL if required")
 | 
			
		||||
	awdlGoroutineStarted = true
 | 
			
		||||
	for {
 | 
			
		||||
		C.StopAWDLBrowsing()
 | 
			
		||||
		for _, intf := range m.interfaces() {
 | 
			
		||||
		for _, intf := range m.GetInterfaces() {
 | 
			
		||||
			if intf.Name == "awdl0" {
 | 
			
		||||
				m.log.Infoln("Multicast discovery is using AWDL discovery")
 | 
			
		||||
				C.StartAWDLBrowsing()
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue