Cleaner shutdowns, UNIX socket support, more tweaks

This commit is contained in:
Neil Alexander 2022-09-04 17:35:06 +01:00
parent a4c80626f4
commit 3ff2b83e76
8 changed files with 212 additions and 43 deletions

View file

@ -322,7 +322,9 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
if n.admin, err = admin.New(n.core, logger, options...); err != nil { if n.admin, err = admin.New(n.core, logger, options...); err != nil {
panic(err) panic(err)
} }
n.admin.SetupAdminHandlers() if n.admin != nil {
n.admin.SetupAdminHandlers()
}
} }
// Setup the multicast module. // Setup the multicast module.
@ -339,7 +341,7 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
if n.multicast, err = multicast.New(n.core, logger, options...); err != nil { if n.multicast, err = multicast.New(n.core, logger, options...); err != nil {
panic(err) panic(err)
} }
if n.admin != nil { if n.admin != nil && n.multicast != nil {
n.multicast.SetupAdminHandlers(n.admin) n.multicast.SetupAdminHandlers(n.admin)
} }
} }
@ -353,7 +355,7 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) {
if n.tuntap, err = tuntap.New(ipv6rwc.NewReadWriteCloser(n.core), logger, options...); err != nil { if n.tuntap, err = tuntap.New(ipv6rwc.NewReadWriteCloser(n.core), logger, options...); err != nil {
panic(err) panic(err)
} }
if n.admin != nil { if n.admin != nil && n.tuntap != nil {
n.tuntap.SetupAdminHandlers(n.admin) n.tuntap.SetupAdminHandlers(n.admin)
} }
} }

View file

@ -175,6 +175,9 @@ func (a *AdminSocket) IsStarted() bool {
// Stop will stop the admin API and close the socket. // Stop will stop the admin API and close the socket.
func (a *AdminSocket) Stop() error { func (a *AdminSocket) Stop() error {
if a == nil {
return nil
}
if a.listener != nil { if a.listener != nil {
select { select {
case <-a.done: case <-a.done:

View file

@ -143,6 +143,8 @@ func (c *Core) Listen(u *url.URL, sintf string) (*Listener, error) {
return c.links.tcp.listen(u, sintf) return c.links.tcp.listen(u, sintf)
case "tls": case "tls":
return c.links.tls.listen(u, sintf) return c.links.tls.listen(u, sintf)
case "unix":
return c.links.unix.listen(u, sintf)
default: default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme) return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
} }

View file

@ -48,6 +48,12 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C
c := &Core{ c := &Core{
log: logger, log: logger,
} }
if name := version.BuildName(); name != "unknown" {
c.log.Infoln("Build name:", name)
}
if version := version.BuildVersion(); version != "unknown" {
c.log.Infoln("Build version:", version)
}
c.ctx, c.cancel = context.WithCancel(context.Background()) c.ctx, c.cancel = context.WithCancel(context.Background())
// Take a copy of the private key so that it is in our own memory space. // Take a copy of the private key so that it is in our own memory space.
if len(secret) != ed25519.PrivateKeySize { if len(secret) != ed25519.PrivateKeySize {
@ -76,15 +82,17 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C
if err := c.proto.nodeinfo.setNodeInfo(c.config.nodeinfo, bool(c.config.nodeinfoPrivacy)); err != nil { if err := c.proto.nodeinfo.setNodeInfo(c.config.nodeinfo, bool(c.config.nodeinfoPrivacy)); err != nil {
return nil, fmt.Errorf("error setting node info: %w", err) return nil, fmt.Errorf("error setting node info: %w", err)
} }
c.addPeerTimer = time.AfterFunc(time.Minute, func() { for listenaddr := range c.config._listeners {
c.Act(nil, c._addPeerLoop) u, err := url.Parse(string(listenaddr))
}) if err != nil {
if name := version.BuildName(); name != "unknown" { c.log.Errorf("Invalid listener URI %q specified, ignoring\n", listenaddr)
c.log.Infoln("Build name:", name) continue
} }
if version := version.BuildVersion(); version != "unknown" { if _, err = c.links.listen(u, ""); err != nil {
c.log.Infoln("Build version:", version) c.log.Errorf("Failed to start listener %q: %s\n", listenaddr, err)
}
} }
c.Act(nil, c._addPeerLoop)
return c, nil return c, nil
} }
@ -92,10 +100,11 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C
// configure them. The loop ensures that disconnected peers will eventually // configure them. The loop ensures that disconnected peers will eventually
// be reconnected with. // be reconnected with.
func (c *Core) _addPeerLoop() { func (c *Core) _addPeerLoop() {
if c.addPeerTimer == nil { select {
case <-c.ctx.Done():
return return
default:
} }
// Add peers from the Peers section // Add peers from the Peers section
for peer := range c.config._peers { for peer := range c.config._peers {
go func(peer string, intf string) { go func(peer string, intf string) {
@ -126,6 +135,7 @@ func (c *Core) Stop() {
// This function is unsafe and should only be ran by the core actor. // This function is unsafe and should only be ran by the core actor.
func (c *Core) _close() error { func (c *Core) _close() error {
c.cancel() c.cancel()
_ = c.links.shutdown()
err := c.PacketConn.Close() err := c.PacketConn.Close()
if c.addPeerTimer != nil { if c.addPeerTimer != nil {
c.addPeerTimer.Stop() c.addPeerTimer.Stop()

View file

@ -2,7 +2,6 @@ package core
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -27,6 +26,7 @@ type links struct {
core *Core core *Core
tcp *linkTCP // TCP interface support tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
mutex sync.RWMutex // protects links below mutex sync.RWMutex // protects links below
links map[linkInfo]*link // *link is nil if connection in progress links map[linkInfo]*link // *link is nil if connection in progress
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
@ -55,13 +55,20 @@ type linkOptions struct {
type Listener struct { type Listener struct {
net.Listener net.Listener
Close context.CancelFunc // deliberately replaces net.Listener.Close() closed chan struct{}
}
func (l *Listener) Close() error {
err := l.Listener.Close()
<-l.closed
return err
} }
func (l *links) init(c *Core) error { func (l *links) init(c *Core) error {
l.core = c l.core = c
l.tcp = l.newLinkTCP() l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp) l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.mutex.Lock() l.mutex.Lock()
l.links = make(map[linkInfo]*link) l.links = make(map[linkInfo]*link)
@ -78,6 +85,25 @@ func (l *links) init(c *Core) error {
return nil return nil
} }
func (l *links) shutdown() error {
phony.Block(l.tcp, func() {
for l := range l.tcp._listeners {
l.Close()
}
})
phony.Block(l.tls, func() {
for l := range l.tls._listeners {
l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
l.Close()
}
})
return nil
}
func (l *links) isConnectedTo(info linkInfo) bool { func (l *links) isConnectedTo(info linkInfo) bool {
l.mutex.RLock() l.mutex.RLock()
defer l.mutex.RUnlock() defer l.mutex.RUnlock()
@ -146,12 +172,35 @@ func (l *links) call(u *url.URL, sintf string) error {
} }
}() }()
case "unix":
go func() {
if err := l.unix.dial(u, tcpOpts.linkOptions, sintf); err != nil {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
}
}()
default: default:
return errors.New("unknown call scheme: " + u.Scheme) return errors.New("unknown call scheme: " + u.Scheme)
} }
return nil return nil
} }
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error { func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
intf := link{ intf := link{
conn: &linkConn{ conn: &linkConn{
@ -167,7 +216,7 @@ func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, forc
} }
go func() { go func() {
if err := intf.handler(); err != nil { if err := intf.handler(); err != nil {
l.core.log.Errorf("Link handler error (%s): %s", conn.RemoteAddr(), err) l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
} }
}() }()
return nil return nil
@ -178,7 +227,7 @@ func (intf *link) handler() error {
// Don't connect to this link more than once. // Don't connect to this link more than once.
if intf.links.isConnectedTo(intf.info) { if intf.links.isConnectedTo(intf.info) {
return fmt.Errorf("already connected to %+v", intf.info) return fmt.Errorf("already connected to this node")
} }
// Mark the connection as in progress. // Mark the connection as in progress.
@ -280,7 +329,7 @@ func (intf *link) handler() error {
strings.ToUpper(intf.info.linkType), remoteStr, localStr) strings.ToUpper(intf.info.linkType), remoteStr, localStr)
// TODO don't report an error if it's just a 'use of closed network connection' // TODO don't report an error if it's just a 'use of closed network connection'
if err = intf.links.core.HandleConn(meta.key, intf.conn); err != nil { if err = intf.links.core.HandleConn(meta.key, intf.conn); err != nil && err != io.EOF {
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s", intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), remoteStr, localStr, err) strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
} else { } else {

View file

@ -16,7 +16,7 @@ type linkTCP struct {
phony.Inbox phony.Inbox
*links *links
listener *net.ListenConfig listener *net.ListenConfig
_listeners map[net.Listener]context.CancelFunc _listeners map[*Listener]context.CancelFunc
} }
type tcpOptions struct { type tcpOptions struct {
@ -33,7 +33,7 @@ func (l *links) newLinkTCP() *linkTCP {
listener: &net.ListenConfig{ listener: &net.ListenConfig{
KeepAlive: -1, KeepAlive: -1,
}, },
_listeners: map[net.Listener]context.CancelFunc{}, _listeners: map[*Listener]context.CancelFunc{},
} }
lt.listener.Control = lt.tcpContext lt.listener.Control = lt.tcpContext
return lt return lt
@ -64,8 +64,7 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx) ctx, cancel := context.WithCancel(l.core.ctx)
hostport := url.Host hostport := url.Host
if sintf != "" { if sintf != "" {
host, port, err := net.SplitHostPort(hostport) if host, port, err := net.SplitHostPort(hostport); err == nil {
if err == nil {
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port) hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
} }
} }
@ -74,18 +73,23 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
cancel() cancel()
return nil, err return nil, err
} }
entry := &Listener{
Listener: listener,
closed: make(chan struct{}),
}
phony.Block(l, func() { phony.Block(l, func() {
l._listeners[listener] = cancel l._listeners[entry] = cancel
}) })
l.core.log.Printf("TCP listener started on %s", listener.Addr())
go func() { go func() {
defer phony.Block(l, func() { defer phony.Block(l, func() {
delete(l._listeners, listener) delete(l._listeners, entry)
}) })
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
cancel() cancel()
return break
} }
addr := conn.RemoteAddr().(*net.TCPAddr) addr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("tls://%s", addr) name := fmt.Sprintf("tls://%s", addr)
@ -94,11 +98,11 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
l.core.log.Errorln("Failed to create inbound link:", err) l.core.log.Errorln("Failed to create inbound link:", err)
} }
} }
listener.Close()
close(entry.closed)
l.core.log.Printf("TCP listener stopped on %s", listener.Addr())
}() }()
return &Listener{ return entry, nil
Listener: listener,
Close: cancel,
}, nil
} }
func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options tcpOptions, incoming bool) error { func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options tcpOptions, incoming bool) error {
@ -160,9 +164,6 @@ func (l *linkTCP) dialerFor(saddr, sintf string) (*net.Dialer, error) {
if err != nil { if err != nil {
continue continue
} }
if src.Equal(dst.IP) {
continue
}
if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() { if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() {
continue continue
} }

View file

@ -25,7 +25,7 @@ type linkTLS struct {
tcp *linkTCP tcp *linkTCP
listener *net.ListenConfig listener *net.ListenConfig
config *tls.Config config *tls.Config
_listeners map[net.Listener]context.CancelFunc _listeners map[*Listener]context.CancelFunc
} }
func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS { func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS {
@ -36,7 +36,7 @@ func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS {
Control: tcp.tcpContext, Control: tcp.tcpContext,
KeepAlive: -1, KeepAlive: -1,
}, },
_listeners: map[net.Listener]context.CancelFunc{}, _listeners: map[*Listener]context.CancelFunc{},
} }
var err error var err error
lt.config, err = lt.generateConfig() lt.config, err = lt.generateConfig()
@ -75,8 +75,7 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx) ctx, cancel := context.WithCancel(l.core.ctx)
hostport := url.Host hostport := url.Host
if sintf != "" { if sintf != "" {
host, port, err := net.SplitHostPort(hostport) if host, port, err := net.SplitHostPort(hostport); err == nil {
if err == nil {
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port) hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
} }
} }
@ -86,18 +85,23 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
return nil, err return nil, err
} }
tlslistener := tls.NewListener(listener, l.config) tlslistener := tls.NewListener(listener, l.config)
entry := &Listener{
Listener: tlslistener,
closed: make(chan struct{}),
}
phony.Block(l, func() { phony.Block(l, func() {
l._listeners[tlslistener] = cancel l._listeners[entry] = cancel
}) })
l.core.log.Printf("TLS listener started on %s", listener.Addr())
go func() { go func() {
defer phony.Block(l, func() { defer phony.Block(l, func() {
delete(l._listeners, tlslistener) delete(l._listeners, entry)
}) })
for { for {
conn, err := tlslistener.Accept() conn, err := tlslistener.Accept()
if err != nil { if err != nil {
cancel() cancel()
return break
} }
addr := conn.RemoteAddr().(*net.TCPAddr) addr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("tls://%s", addr) name := fmt.Sprintf("tls://%s", addr)
@ -106,11 +110,11 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
l.core.log.Errorln("Failed to create inbound link:", err) l.core.log.Errorln("Failed to create inbound link:", err)
} }
} }
tlslistener.Close()
close(entry.closed)
l.core.log.Printf("TLS listener stopped on %s", listener.Addr())
}() }()
return &Listener{ return entry, nil
Listener: tlslistener,
Close: cancel,
}, nil
} }
func (l *linkTLS) generateConfig() (*tls.Config, error) { func (l *linkTLS) generateConfig() (*tls.Config, error) {

98
src/core/link_unix.go Normal file
View file

@ -0,0 +1,98 @@
package core
import (
"context"
"fmt"
"net"
"net/url"
"time"
"github.com/Arceliar/phony"
)
type linkUNIX struct {
phony.Inbox
*links
dialer *net.Dialer
listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
}
func (l *links) newLinkUNIX() *linkUNIX {
lt := &linkUNIX{
links: l,
dialer: &net.Dialer{
Timeout: time.Second * 5,
KeepAlive: -1,
},
listener: &net.ListenConfig{
KeepAlive: -1,
},
_listeners: map[*Listener]context.CancelFunc{},
}
return lt
}
func (l *linkUNIX) dial(url *url.URL, options linkOptions, _ string) error {
info := linkInfoFor("unix", "", url.Path)
if l.links.isConnectedTo(info) {
return fmt.Errorf("duplicate connection attempt")
}
addr, err := net.ResolveUnixAddr("unix", url.Path)
if err != nil {
return err
}
conn, err := l.dialer.DialContext(l.core.ctx, "unix", addr.String())
if err != nil {
return err
}
return l.handler(url.String(), info, conn, options, false)
}
func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx)
listener, err := l.listener.Listen(ctx, "unix", url.Path)
if err != nil {
cancel()
return nil, err
}
entry := &Listener{
Listener: listener,
closed: make(chan struct{}),
}
phony.Block(l, func() {
l._listeners[entry] = cancel
})
l.core.log.Printf("UNIX listener started on %s", listener.Addr())
go func() {
defer phony.Block(l, func() {
delete(l._listeners, entry)
})
for {
conn, err := listener.Accept()
if err != nil {
cancel()
break
}
info := linkInfoFor("unix", "", url.String())
if err = l.handler(url.String(), info, conn, linkOptions{}, true); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
listener.Close()
close(entry.closed)
l.core.log.Printf("UNIX listener stopped on %s", listener.Addr())
}()
return entry, nil
}
func (l *linkUNIX) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
return l.links.create(
conn, // connection
name, // connection name
info, // connection info
incoming, // not incoming
false, // not forced
options, // connection options
)
}