Link refactor (#941)

* Link refactoring

* More refactoring

* More tweaking

* Cleaner shutdowns, UNIX socket support, more tweaks

* Actorise links, remove mutex

* SOCKS support
This commit is contained in:
Neil Alexander 2022-09-17 20:07:00 +01:00 committed by GitHub
parent 414aaf6eb9
commit 5ef61faeff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 738 additions and 698 deletions

View file

@ -2,7 +2,6 @@ package core
import (
"bytes"
"crypto/ed25519"
"encoding/hex"
"errors"
"fmt"
@ -10,7 +9,6 @@ import (
"net"
"net/url"
"strings"
"sync"
//"sync/atomic"
"time"
@ -20,22 +18,22 @@ import (
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"golang.org/x/net/proxy"
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
)
type links struct {
core *Core
mutex sync.RWMutex // protects links below
links map[linkInfo]*link
tcp tcp // TCP interface support
stopped chan struct{}
phony.Inbox
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
_links map[linkInfo]*link // *link is nil if connection in progress
// TODO timeout (to remove from switch), read from config.ReadTimeout
}
// linkInfo is used as a map key
type linkInfo struct {
key keyArray
linkType string // Type of link, e.g. TCP, AWDL
local string // Local name or address
remote string // Remote name or address
@ -49,19 +47,30 @@ type link struct {
info linkInfo
incoming bool
force bool
closed chan struct{}
}
type linkOptions struct {
pinnedEd25519Keys map[keyArray]struct{}
}
type Listener struct {
net.Listener
closed chan struct{}
}
func (l *Listener) Close() error {
err := l.Listener.Close()
<-l.closed
return err
}
func (l *links) init(c *Core) error {
l.core = c
l.mutex.Lock()
l.links = make(map[linkInfo]*link)
l.mutex.Unlock()
l.stopped = make(chan struct{})
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l._links = make(map[linkInfo]*link)
var listeners []ListenAddress
phony.Block(c, func() {
@ -70,96 +79,160 @@ func (l *links) init(c *Core) error {
listeners = append(listeners, listener)
}
})
if err := l.tcp.init(l, listeners); err != nil {
c.log.Errorln("Failed to start TCP interface")
return err
}
return nil
}
func (l *links) shutdown() error {
phony.Block(l.tcp, func() {
for l := range l.tcp._listeners {
l.Close()
}
})
phony.Block(l.tls, func() {
for l := range l.tls._listeners {
l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
l.Close()
}
})
return nil
}
func (l *links) isConnectedTo(info linkInfo) bool {
var isConnected bool
phony.Block(l, func() {
_, isConnected = l._links[info]
})
return isConnected
}
func (l *links) call(u *url.URL, sintf string) error {
tcpOpts := tcpOptions{}
if pubkeys, ok := u.Query()["key"]; ok && len(pubkeys) > 0 {
tcpOpts.pinnedEd25519Keys = make(map[keyArray]struct{})
for _, pubkey := range pubkeys {
if sigPub, err := hex.DecodeString(pubkey); err == nil {
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
info := linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
return fmt.Errorf("already connected to this node")
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
}
for _, pubkey := range u.Query()["key"] {
if sigPub, err := hex.DecodeString(pubkey); err == nil {
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
}
switch u.Scheme {
switch info.linkType {
case "tcp":
l.tcp.call(u.Host, tcpOpts, sintf)
go func() {
if err := l.tcp.dial(u, options, sintf); err != nil {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
}
}()
case "socks":
tcpOpts.socksProxyAddr = u.Host
if u.User != nil {
tcpOpts.socksProxyAuth = &proxy.Auth{}
tcpOpts.socksProxyAuth.User = u.User.Username()
tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
}
tcpOpts.upgrade = l.tcp.tls.forDialer // TODO make this configurable
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
l.tcp.call(pathtokens[0], tcpOpts, sintf)
go func() {
if err := l.socks.dial(u, options); err != nil {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
}
}()
case "tls":
tcpOpts.upgrade = l.tcp.tls.forDialer
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
tcpOpts.tlsSNI = sni
tlsSNI = sni
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tcpOpts.tlsSNI == "" {
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tcpOpts.tlsSNI = host
tlsSNI = host
}
}
l.tcp.call(u.Host, tcpOpts, sintf)
go func() {
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
}
}()
case "unix":
go func() {
if err := l.unix.dial(u, options, sintf); err != nil {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
}
}()
default:
return errors.New("unknown call scheme: " + u.Scheme)
}
return nil
}
func (l *links) create(conn net.Conn, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
intf := link{
conn: &linkConn{
Conn: conn,
up: time.Now(),
},
lname: name,
links: l,
options: options,
info: linkInfo{
linkType: linkType,
local: local,
remote: remote,
},
lname: name,
links: l,
options: options,
info: info,
incoming: incoming,
force: force,
}
return &intf, nil
}
func (l *links) stop() error {
close(l.stopped)
if err := l.tcp.stop(); err != nil {
return err
}
go func() {
if err := intf.handler(); err != nil {
l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
}
}()
return nil
}
func (intf *link) handler() (chan struct{}, error) {
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
func (intf *link) handler() error {
defer intf.conn.Close()
// Don't connect to this link more than once.
if intf.links.isConnectedTo(intf.info) {
return fmt.Errorf("already connected to this node")
}
// Mark the connection as in progress.
phony.Block(intf.links, func() {
intf.links._links[intf.info] = nil
})
// When we're done, clean up the connection entry.
defer phony.Block(intf.links, func() {
delete(intf.links._links, intf.info)
})
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
meta := version_getBaseMetadata()
meta.key = intf.links.core.public
metaBytes := meta.encode()
@ -172,10 +245,10 @@ func (intf *link) handler() (chan struct{}, error) {
err = errors.New("incomplete metadata send")
}
}) {
return nil, errors.New("timeout on metadata send")
return errors.New("timeout on metadata send")
}
if err != nil {
return nil, err
return fmt.Errorf("write handshake: %w", err)
}
if !util.FuncTimeout(30*time.Second, func() {
var n int
@ -184,15 +257,15 @@ func (intf *link) handler() (chan struct{}, error) {
err = errors.New("incomplete metadata recv")
}
}) {
return nil, errors.New("timeout on metadata recv")
return errors.New("timeout on metadata recv")
}
if err != nil {
return nil, err
return fmt.Errorf("read handshake: %w", err)
}
meta = version_metadata{}
base := version_getBaseMetadata()
if !meta.decode(metaBytes) {
return nil, errors.New("failed to decode metadata")
return errors.New("failed to decode metadata")
}
if !meta.check() {
var connectError string
@ -207,16 +280,16 @@ func (intf *link) handler() (chan struct{}, error) {
fmt.Sprintf("%d.%d", base.ver, base.minorVer),
fmt.Sprintf("%d.%d", meta.ver, meta.minorVer),
)
return nil, errors.New("remote node is incompatible version")
return errors.New("remote node is incompatible version")
}
// Check if the remote side matches the keys we expected. This is a bit of a weak
// check - in future versions we really should check a signature or something like that.
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
if pinned := intf.options.pinnedEd25519Keys; len(pinned) > 0 {
var key keyArray
copy(key[:], meta.key)
if _, allowed := pinned[key]; !allowed {
intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name())
return nil, fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
}
}
// Check if we're authorized to connect to this key / IP
@ -232,55 +305,50 @@ func (intf *link) handler() (chan struct{}, error) {
intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.key))
intf.close()
return nil, nil
return fmt.Errorf("forbidden connection")
}
// Check if we already have a link to this node
copy(intf.info.key[:], meta.key)
intf.links.mutex.Lock()
if oldIntf, isIn := intf.links.links[intf.info]; isIn {
intf.links.mutex.Unlock()
// FIXME we should really return an error and let the caller block instead
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name())
return oldIntf.closed, nil
} else {
intf.closed = make(chan struct{})
intf.links.links[intf.info] = intf
defer func() {
intf.links.mutex.Lock()
delete(intf.links.links, intf.info)
intf.links.mutex.Unlock()
close(intf.closed)
}()
intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name())
}
intf.links.mutex.Unlock()
themAddr := address.AddrForKey(ed25519.PublicKey(intf.info.key[:]))
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
phony.Block(intf.links, func() {
intf.links._links[intf.info] = intf
})
remoteAddr := net.IP(address.AddrForKey(meta.key)[:]).String()
remoteStr := fmt.Sprintf("%s@%s", remoteAddr, intf.info.remote)
localStr := intf.conn.LocalAddr()
intf.links.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Run the handler
err = intf.links.core.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
strings.ToUpper(intf.info.linkType), remoteStr, localStr)
// TODO don't report an error if it's just a 'use of closed network connection'
if err != nil {
if err = intf.links.core.HandleConn(meta.key, intf.conn); err != nil && err != io.EOF {
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
} else {
intf.links.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
strings.ToUpper(intf.info.linkType), remoteStr, localStr)
}
return nil, err
return nil
}
func (intf *link) close() {
intf.conn.Close()
func (intf *link) close() error {
return intf.conn.Close()
}
func (intf *link) name() string {
return intf.lname
}
func linkInfoFor(linkType, sintf, remote string) linkInfo {
if h, _, err := net.SplitHostPort(remote); err == nil {
remote = h
}
return linkInfo{
linkType: linkType,
local: sintf,
remote: remote,
}
}
type linkConn struct {
// tx and rx are at the beginning of the struct to ensure 64-bit alignment
// on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG