Link refactoring, admin socket changes

This commit is contained in:
Neil Alexander 2023-04-06 21:45:49 +01:00
parent c7ee7d9681
commit 7afa23be4c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
32 changed files with 1206 additions and 1130 deletions

View file

@ -2,10 +2,12 @@ package core
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"math"
"net"
"net/url"
"strconv"
@ -17,6 +19,14 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address"
)
type linkType int
const (
linkTypePersistent linkType = iota // Statically configured
linkTypeEphemeral // Multicast discovered
linkTypeIncoming // Incoming connection
)
type links struct {
phony.Inbox
core *Core
@ -27,41 +37,52 @@ type links struct {
_links map[linkInfo]*link // *link is nil if connection in progress
}
type linkProtocol interface {
dial(url *url.URL, info linkInfo, options linkOptions) (net.Conn, error)
listen(ctx context.Context, url *url.URL, sintf string) (net.Listener, error)
}
// linkInfo is used as a map key
type linkInfo struct {
linkType string // Type of link, e.g. TCP, AWDL
local string // Local name or address
remote string // Remote name or address
}
type linkDial struct {
url *url.URL
sintf string
uri string // Peering URI in complete form
sintf string // Peering source interface (i.e. from InterfacePeers)
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
}
// link tracks the state of a connection, either persistent or non-persistent
type link struct {
lname string
links *links
conn *linkConn
options linkOptions
info linkInfo
incoming bool
force bool
phony.Inbox
ctx context.Context //
cancel context.CancelFunc //
kick chan struct{} // Attempt to reconnect now, if backing off
info linkInfo //
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
_conn *linkConn // Connected link, if any, nil if not connected
_err error // Last error on the connection, if any
_errtime time.Time // Last time an error occured
}
type linkOptions struct {
pinnedEd25519Keys map[keyArray]struct{}
priority uint8
tlsSNI string
}
type Listener struct {
net.Listener
closed chan struct{}
listener net.Listener
ctx context.Context
Cancel context.CancelFunc
}
func (l *Listener) Addr() net.Addr {
return l.listener.Addr()
}
func (l *Listener) Close() error {
err := l.Listener.Close()
<-l.closed
l.Cancel()
err := l.listener.Close()
<-l.ctx.Done()
return err
}
@ -105,195 +126,294 @@ func (l *links) shutdown() {
func (l *links) isConnectedTo(info linkInfo) bool {
var isConnected bool
phony.Block(l, func() {
_, isConnected = l._links[info]
link, ok := l._links[info]
if !ok {
return
}
isConnected = link._conn != nil
})
return isConnected
}
func (l *links) call(u *url.URL, sintf string, errch chan<- error) (info linkInfo, err error) {
info = linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
if errch != nil {
close(errch) // already connected, no error
type linkError string
func (e linkError) Error() string { return string(e) }
const ErrLinkAlreadyConfigured = linkError("peer is already configured")
const ErrLinkPriorityInvalid = linkError("priority value is invalid")
const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid")
const ErrLinkUnrecognisedSchema = linkError("link schema unknown")
func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// Generate the link info and see whether we think we already
// have an open peering to this peer.
info := linkInfo{
uri: u.String(),
sintf: sintf,
linkType: linkType,
}
if state, ok := l._links[info]; ok {
select {
case state.kick <- struct{}{}:
default:
}
return info, nil
return ErrLinkAlreadyConfigured
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
// Create the link entry. This will contain the connection
// in progress (if any), any error details and a context that
// lets the link be cancelled later.
ctx, cancel := context.WithCancel(l.core.ctx)
state := &link{
info: info,
linkProto: strings.ToUpper(u.Scheme),
ctx: ctx,
cancel: cancel,
}
// Collect together the link options, these are global options
// that are not specific to any given protocol.
var options linkOptions
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
if errch != nil {
close(errch)
}
return info, fmt.Errorf("pinned key contains invalid hex characters")
return ErrLinkPinnedKeyInvalid
}
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
if options.pinnedEd25519Keys == nil {
options.pinnedEd25519Keys = map[keyArray]struct{}{}
}
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
if errch != nil {
close(errch)
}
return info, fmt.Errorf("priority invalid: %w", err)
return ErrLinkPriorityInvalid
}
options.priority = uint8(pi)
}
switch info.linkType {
case "tcp":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "socks":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
// Store the state of the link, try to connect and then run
// the handler.
phony.Block(l, func() {
l._links[info] = state
})
case "tls":
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
tlsSNI = sni
}
// Track how many consecutive connection failures we have had,
// as we will back off exponentially rather than hammering the
// remote node endlessly.
var backoff int
// backoffNow is called when there's a connection error. It
// will wait for the specified amount of time and then return
// true, unless the peering context was cancelled (due to a
// peer removal most likely), in which case it returns false.
// The caller should check the return value to decide whether
// or not to give up trying.
backoffNow := func() bool {
backoff++
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
select {
case <-time.After(duration):
return true
case <-state.kick:
return true
case <-ctx.Done():
return false
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tlsSNI = host
}
}
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "unix":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
default:
if errch != nil {
close(errch)
}
return info, errors.New("unknown call scheme: " + u.Scheme)
}
return info, nil
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}
func (l *links) create(conn net.Conn, dial *linkDial, name string, info linkInfo, incoming, force bool, options linkOptions) error {
intf := link{
conn: &linkConn{
Conn: conn,
up: time.Now(),
},
lname: name,
links: l,
options: options,
info: info,
incoming: incoming,
force: force,
}
// The goroutine is responsible for attempting the connection
// and then running the handler. If the connection is persistent
// then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry.
go func() {
if err := intf.handler(dial); err != nil {
l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
defer phony.Block(l, func() {
delete(l._links, info)
})
for {
conn, err := l.connect(u, info, options)
if err != nil {
if linkType == linkTypePersistent {
phony.Block(state, func() {
state._err = err
state._errtime = time.Now()
})
if backoffNow() {
continue
} else {
return
}
} else {
break
}
}
lc := &linkConn{
Conn: conn,
up: time.Now(),
}
phony.Block(state, func() {
state._conn = lc
state._err = nil
state._errtime = time.Time{}
})
if err = l.handler(&info, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
} else {
backoff = 0
}
_ = conn.Close()
phony.Block(state, func() {
state._conn = nil
if state._err = err; state._err != nil {
state._errtime = time.Now()
}
})
if linkType == linkTypePersistent {
if backoffNow() {
continue
} else {
return
}
} else {
break
}
}
}()
return nil
}
func (intf *link) handler(dial *linkDial) error {
defer intf.conn.Close() // nolint:errcheck
// Don't connect to this link more than once.
if intf.links.isConnectedTo(intf.info) {
return nil
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx)
var protocol linkProtocol
switch strings.ToLower(u.Scheme) {
case "tcp":
protocol = l.tcp
case "tls":
protocol = l.tls
case "unix":
protocol = l.unix
default:
cancel()
return nil, ErrLinkUnrecognisedSchema
}
listener, err := protocol.listen(ctx, u, sintf)
if err != nil {
cancel()
return nil, err
}
li := &Listener{
listener: listener,
ctx: ctx,
Cancel: cancel,
}
go func() {
l.core.log.Printf("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr())
defer l.core.log.Printf("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr())
for {
conn, err := listener.Accept()
if err != nil {
continue
}
pu := *u
pu.Host = conn.RemoteAddr().String()
info := linkInfo{
uri: pu.String(),
sintf: sintf,
linkType: linkTypeIncoming,
}
if l.isConnectedTo(info) {
_ = conn.Close()
continue
}
state := l._links[info]
if state == nil {
state = &link{
info: info,
}
}
lc := &linkConn{
Conn: conn,
up: time.Now(),
}
var options linkOptions
phony.Block(state, func() {
state._conn = lc
state._err = nil
state.linkProto = strings.ToUpper(u.Scheme)
})
phony.Block(l, func() {
l._links[info] = state
})
if err = l.handler(&info, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", u.Host, err)
}
phony.Block(state, func() {
state._conn = nil
if state._err = err; state._err != nil {
state._errtime = time.Now()
}
})
phony.Block(l, func() {
delete(l._links, info)
})
}
}()
return li, nil
}
// Mark the connection as in progress.
phony.Block(intf.links, func() {
intf.links._links[intf.info] = nil
})
// When we're done, clean up the connection entry.
defer phony.Block(intf.links, func() {
delete(intf.links._links, intf.info)
})
func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
var dialer linkProtocol
switch strings.ToLower(u.Scheme) {
case "tcp":
dialer = l.tcp
case "tls":
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
options.tlsSNI = sni
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if options.tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
options.tlsSNI = host
}
}
dialer = l.tls
case "socks":
dialer = l.socks
case "unix":
dialer = l.unix
default:
return nil, ErrLinkUnrecognisedSchema
}
return dialer.dial(u, info, options)
}
func (l *links) handler(info *linkInfo, options linkOptions, conn net.Conn) error {
meta := version_getBaseMetadata()
meta.publicKey = intf.links.core.public
meta.publicKey = l.core.public
metaBytes := meta.encode()
if err := intf.conn.SetDeadline(time.Now().Add(time.Second * 6)); err != nil {
if err := conn.SetDeadline(time.Now().Add(time.Second * 6)); err != nil {
return fmt.Errorf("failed to set handshake deadline: %w", err)
}
n, err := intf.conn.Write(metaBytes)
n, err := conn.Write(metaBytes)
switch {
case err != nil:
return fmt.Errorf("write handshake: %w", err)
case err == nil && n != len(metaBytes):
return fmt.Errorf("incomplete handshake send")
}
if _, err = io.ReadFull(intf.conn, metaBytes); err != nil {
if _, err = io.ReadFull(conn, metaBytes); err != nil {
return fmt.Errorf("read handshake: %w", err)
}
if err = intf.conn.SetDeadline(time.Time{}); err != nil {
if err = conn.SetDeadline(time.Time{}); err != nil {
return fmt.Errorf("failed to clear handshake deadline: %w", err)
}
meta = version_metadata{}
@ -302,23 +422,14 @@ func (intf *link) handler(dial *linkDial) error {
return errors.New("failed to decode metadata")
}
if !meta.check() {
var connectError string
if intf.incoming {
connectError = "Rejected incoming connection"
} else {
connectError = "Failed to connect"
}
intf.links.core.log.Debugf("%s: %s is incompatible version (local %s, remote %s)",
connectError,
intf.lname,
return fmt.Errorf("remote node incompatible version (local %s, remote %s)",
fmt.Sprintf("%d.%d", base.majorVer, base.minorVer),
fmt.Sprintf("%d.%d", meta.majorVer, meta.minorVer),
)
return errors.New("remote node is incompatible version")
}
// Check if the remote side matches the keys we expected. This is a bit of a weak
// check - in future versions we really should check a signature or something like that.
if pinned := intf.options.pinnedEd25519Keys; len(pinned) > 0 {
if pinned := options.pinnedEd25519Keys; len(pinned) > 0 {
var key keyArray
copy(key[:], meta.publicKey)
if _, allowed := pinned[key]; !allowed {
@ -326,7 +437,10 @@ func (intf *link) handler(dial *linkDial) error {
}
}
// Check if we're authorized to connect to this key / IP
allowed := intf.links.core.config._allowedPublicKeys
var allowed map[[32]byte]struct{}
phony.Block(l.core, func() {
allowed = l.core.config._allowedPublicKeys
})
isallowed := len(allowed) == 0
for k := range allowed {
if bytes.Equal(k[:], meta.publicKey) {
@ -334,73 +448,32 @@ func (intf *link) handler(dial *linkDial) error {
break
}
}
if intf.incoming && !intf.force && !isallowed {
_ = intf.close()
if info.linkType == linkTypeIncoming && !isallowed {
return fmt.Errorf("node public key %q is not in AllowedPublicKeys", hex.EncodeToString(meta.publicKey))
}
phony.Block(intf.links, func() {
intf.links._links[intf.info] = intf
})
dir := "outbound"
if intf.incoming {
if info.linkType == linkTypeIncoming {
dir = "inbound"
}
remoteAddr := net.IP(address.AddrForKey(meta.publicKey)[:]).String()
remoteStr := fmt.Sprintf("%s@%s", remoteAddr, intf.info.remote)
localStr := intf.conn.LocalAddr()
intf.links.core.log.Infof("Connected %s %s: %s, source %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr)
remoteStr := fmt.Sprintf("%s@%s", remoteAddr, conn.RemoteAddr())
localStr := conn.LocalAddr()
l.core.log.Infof("Connected %s: %s, source %s",
dir, remoteStr, localStr)
err = intf.links.core.HandleConn(meta.publicKey, intf.conn, intf.options.priority)
err = l.core.HandleConn(meta.publicKey, conn, options.priority)
switch err {
case io.EOF, net.ErrClosed, nil:
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr)
l.core.log.Infof("Disconnected %s: %s, source %s",
dir, remoteStr, localStr)
default:
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s; error: %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
l.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
dir, remoteStr, localStr, err)
}
if !intf.incoming && dial != nil {
// The connection was one that we dialled, so wait a second and try to
// dial it again.
var retry func(attempt int)
retry = func(attempt int) {
// intf.links.core.log.Infof("Retrying %s (attempt %d of 5)...", dial.url.String(), attempt)
errch := make(chan error, 1)
if _, err := intf.links.call(dial.url, dial.sintf, errch); err != nil {
return
}
if err := <-errch; err != nil {
if attempt < 3 {
time.AfterFunc(time.Second, func() {
retry(attempt + 1)
})
}
}
}
time.AfterFunc(time.Second, func() {
retry(1)
})
}
return nil
}
func (intf *link) close() error {
return intf.conn.Close()
}
func linkInfoFor(linkType, sintf, remote string) linkInfo {
return linkInfo{
linkType: linkType,
local: sintf,
remote: remote,
}
}
type linkConn struct {
// tx and rx are at the beginning of the struct to ensure 64-bit alignment
// on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
@ -421,12 +494,3 @@ func (c *linkConn) Write(p []byte) (n int, err error) {
atomic.AddUint64(&c.tx, uint64(n))
return
}
func linkOptionsForListener(u *url.URL) (l linkOptions) {
if p := u.Query().Get("priority"); p != "" {
if pi, err := strconv.ParseUint(p, 10, 8); err == nil {
l.priority = uint8(pi)
}
}
return
}