mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-06-17 14:45:06 +03:00
Link refactoring
This commit is contained in:
parent
88a393a7b3
commit
496eed7974
12 changed files with 441 additions and 601 deletions
120
src/core/link.go
120
src/core/link.go
|
@ -2,6 +2,7 @@ package core
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
|
@ -20,15 +21,15 @@ import (
|
|||
"github.com/Arceliar/phony"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
"golang.org/x/net/proxy"
|
||||
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
|
||||
)
|
||||
|
||||
type links struct {
|
||||
core *Core
|
||||
tcp *linkTCP // TCP interface support
|
||||
tls *linkTLS // TLS interface support
|
||||
mutex sync.RWMutex // protects links below
|
||||
links map[linkInfo]*link
|
||||
tcp tcp // TCP interface support
|
||||
stopped chan struct{}
|
||||
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
||||
}
|
||||
|
@ -56,8 +57,16 @@ type linkOptions struct {
|
|||
pinnedEd25519Keys map[keyArray]struct{}
|
||||
}
|
||||
|
||||
type Listener struct {
|
||||
net.Listener
|
||||
Close context.CancelFunc // deliberately replaces net.Listener.Close()
|
||||
}
|
||||
|
||||
func (l *links) init(c *Core) error {
|
||||
l.core = c
|
||||
l.tcp = l.newLinkTCP()
|
||||
l.tls = l.newLinkTLS(l.tcp)
|
||||
|
||||
l.mutex.Lock()
|
||||
l.links = make(map[linkInfo]*link)
|
||||
l.mutex.Unlock()
|
||||
|
@ -70,41 +79,46 @@ func (l *links) init(c *Core) error {
|
|||
listeners = append(listeners, listener)
|
||||
}
|
||||
})
|
||||
if err := l.tcp.init(l, listeners); err != nil {
|
||||
c.log.Errorln("Failed to start TCP interface")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *links) call(u *url.URL, sintf string) error {
|
||||
tcpOpts := tcpOptions{}
|
||||
if pubkeys, ok := u.Query()["key"]; ok && len(pubkeys) > 0 {
|
||||
tcpOpts.pinnedEd25519Keys = make(map[keyArray]struct{})
|
||||
for _, pubkey := range pubkeys {
|
||||
if sigPub, err := hex.DecodeString(pubkey); err == nil {
|
||||
var sigPubKey keyArray
|
||||
copy(sigPubKey[:], sigPub)
|
||||
tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
|
||||
}
|
||||
// TODO: don't dial duplicates here
|
||||
tcpOpts := tcpOptions{
|
||||
linkOptions: linkOptions{
|
||||
pinnedEd25519Keys: map[keyArray]struct{}{},
|
||||
},
|
||||
}
|
||||
for _, pubkey := range u.Query()["key"] {
|
||||
if sigPub, err := hex.DecodeString(pubkey); err == nil {
|
||||
var sigPubKey keyArray
|
||||
copy(sigPubKey[:], sigPub)
|
||||
tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
|
||||
}
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
l.tcp.call(u.Host, tcpOpts, sintf)
|
||||
case "socks":
|
||||
tcpOpts.socksProxyAddr = u.Host
|
||||
if u.User != nil {
|
||||
tcpOpts.socksProxyAuth = &proxy.Auth{}
|
||||
tcpOpts.socksProxyAuth.User = u.User.Username()
|
||||
tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
|
||||
}
|
||||
tcpOpts.upgrade = l.tcp.tls.forDialer // TODO make this configurable
|
||||
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||
l.tcp.call(pathtokens[0], tcpOpts, sintf)
|
||||
go func() {
|
||||
if _, err := l.tcp.dial(u, tcpOpts, sintf); err != nil {
|
||||
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
|
||||
}
|
||||
}()
|
||||
|
||||
/*
|
||||
case "socks":
|
||||
tcpOpts.socksProxyAddr = u.Host
|
||||
if u.User != nil {
|
||||
tcpOpts.socksProxyAuth = &proxy.Auth{}
|
||||
tcpOpts.socksProxyAuth.User = u.User.Username()
|
||||
tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
|
||||
}
|
||||
tcpOpts.upgrade = l.tcp.tls.forDialer // TODO make this configurable
|
||||
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||
go l.tcp.call(pathtokens[0], tcpOpts, sintf)
|
||||
*/
|
||||
|
||||
case "tls":
|
||||
tcpOpts.upgrade = l.tcp.tls.forDialer
|
||||
// SNI headers must contain hostnames and not IP addresses, so we must make sure
|
||||
// that we do not populate the SNI with an IP literal. We do this by splitting
|
||||
// the host-port combo from the query option and then seeing if it parses to an
|
||||
|
@ -121,7 +135,12 @@ func (l *links) call(u *url.URL, sintf string) error {
|
|||
tcpOpts.tlsSNI = host
|
||||
}
|
||||
}
|
||||
l.tcp.call(u.Host, tcpOpts, sintf)
|
||||
go func() {
|
||||
if _, err := l.tls.dial(u, tcpOpts, sintf); err != nil {
|
||||
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
|
||||
}
|
||||
}()
|
||||
|
||||
default:
|
||||
return errors.New("unknown call scheme: " + u.Scheme)
|
||||
}
|
||||
|
@ -130,33 +149,37 @@ func (l *links) call(u *url.URL, sintf string) error {
|
|||
|
||||
func (l *links) create(conn net.Conn, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
|
||||
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
|
||||
info := linkInfo{
|
||||
linkType: linkType,
|
||||
local: local,
|
||||
remote: remote,
|
||||
}
|
||||
l.mutex.RLock()
|
||||
_, isIn := l.links[info]
|
||||
l.mutex.RUnlock()
|
||||
if isIn {
|
||||
return nil, fmt.Errorf("duplicate")
|
||||
}
|
||||
intf := link{
|
||||
conn: &linkConn{
|
||||
Conn: conn,
|
||||
up: time.Now(),
|
||||
},
|
||||
lname: name,
|
||||
links: l,
|
||||
options: options,
|
||||
info: linkInfo{
|
||||
linkType: linkType,
|
||||
local: local,
|
||||
remote: remote,
|
||||
},
|
||||
lname: name,
|
||||
links: l,
|
||||
options: options,
|
||||
info: info,
|
||||
incoming: incoming,
|
||||
force: force,
|
||||
}
|
||||
go func() {
|
||||
if _, err := intf.handler(); err != nil {
|
||||
l.core.log.Warnf("Handler error (incoming %v): %s\n", incoming, err)
|
||||
}
|
||||
}()
|
||||
return &intf, nil
|
||||
}
|
||||
|
||||
func (l *links) stop() error {
|
||||
close(l.stopped)
|
||||
if err := l.tcp.stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (intf *link) handler() (chan struct{}, error) {
|
||||
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
|
||||
defer intf.conn.Close()
|
||||
|
@ -175,7 +198,7 @@ func (intf *link) handler() (chan struct{}, error) {
|
|||
return nil, errors.New("timeout on metadata send")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("write handshake: %w", err)
|
||||
}
|
||||
if !util.FuncTimeout(30*time.Second, func() {
|
||||
var n int
|
||||
|
@ -187,7 +210,7 @@ func (intf *link) handler() (chan struct{}, error) {
|
|||
return nil, errors.New("timeout on metadata recv")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("read handshake: %w", err)
|
||||
}
|
||||
meta = version_metadata{}
|
||||
base := version_getBaseMetadata()
|
||||
|
@ -211,7 +234,7 @@ func (intf *link) handler() (chan struct{}, error) {
|
|||
}
|
||||
// Check if the remote side matches the keys we expected. This is a bit of a weak
|
||||
// check - in future versions we really should check a signature or something like that.
|
||||
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
|
||||
if pinned := intf.options.pinnedEd25519Keys; len(pinned) > 0 {
|
||||
var key keyArray
|
||||
copy(key[:], meta.key)
|
||||
if _, allowed := pinned[key]; !allowed {
|
||||
|
@ -262,6 +285,9 @@ func (intf *link) handler() (chan struct{}, error) {
|
|||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||
// Run the handler
|
||||
err = intf.links.core.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("connection: %w", err)
|
||||
}
|
||||
// TODO don't report an error if it's just a 'use of closed network connection'
|
||||
if err != nil {
|
||||
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue