1. added multipath protocol and schema suport

2. added SCTP protocol and schema support
3. added set of NAS models support (Asustor, ReadyNAS, Drobo, QNAP, WD, Synology, Terramaster)
4. moved to fc00::/7 private segment
5. added Windows, MacOS and Linux UI for peers edit and current status
This commit is contained in:
vadym 2022-10-27 22:03:37 +03:00
parent cfa293d189
commit d8a4000141
198 changed files with 8589 additions and 697 deletions

View file

@ -2,15 +2,22 @@ package core
import (
"crypto/ed25519"
"encoding/json"
"fmt"
"net"
"net/url"
"sync/atomic"
"time"
//"encoding/hex"
"encoding/json"
//"errors"
//"fmt"
"net"
"net/url"
//"sort"
//"time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/RiV-chain/RiV-mesh/src/address"
)
type SelfInfo struct {
@ -146,7 +153,7 @@ func (c *Core) Listen(u *url.URL, sintf string) (*Listener, error) {
}
}
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128
// Address gets the IPv6 address of the Mesh node. This is always a /128
// address. The IPv6 address is only relevant when the node is operating as an
// IP router and often is meaningless when embedded into an application, unless
// that application also implements either VPN functionality or deals with IP
@ -156,7 +163,7 @@ func (c *Core) Address() net.IP {
return addr
}
// Subnet gets the routed IPv6 subnet of the Yggdrasil node. This is always a
// Subnet gets the routed IPv6 subnet of the Mesh node. This is always a
// /64 subnet. The IPv6 subnet is only relevant when the node is operating as an
// IP router and often is meaningless when embedded into an application, unless
// that application also implements either VPN functionality or deals with IP
@ -167,7 +174,7 @@ func (c *Core) Subnet() net.IPNet {
return net.IPNet{IP: subnet, Mask: net.CIDRMask(64, 128)}
}
// SetLogger sets the output logger of the Yggdrasil node after startup. This
// SetLogger sets the output logger of the Mesh node after startup. This
// may be useful if you want to redirect the output later. Note that this
// expects a Logger from the github.com/gologme/log package and not from Go's
// built-in log package.
@ -176,36 +183,29 @@ func (c *Core) SetLogger(log Logger) {
}
// AddPeer adds a peer. This should be specified in the peer URI format, e.g.:
//
// tcp://a.b.c.d:e
// socks://a.b.c.d:e/f.g.h.i:j
//
// tcp://a.b.c.d:e
// socks://a.b.c.d:e/f.g.h.i:j
// This adds the peer to the peer list, so that they will be called again if the
// connection drops.
func (c *Core) AddPeer(uri string, sourceInterface string) error {
var known bool
phony.Block(c, func() {
_, known = c.config._peers[Peer{uri, sourceInterface}]
})
if known {
return fmt.Errorf("peer already configured")
func (c *Core) AddPeer(peer string, intf string) error {
select {
case <-c.ctx.Done():
return nil
default:
}
u, err := url.Parse(uri)
u, err := url.Parse(peer)
if err != nil {
c.log.Errorln("Failed to parse peer url:", peer, err)
return err
}
info, err := c.links.call(u, sourceInterface)
if err != nil {
if err := c.CallPeer(u, intf); err != nil {
c.log.Errorln("Failed to add peer:", err)
return err
}
phony.Block(c, func() {
c.config._peers[Peer{uri, sourceInterface}] = &info
})
return nil
}
// RemovePeer removes a peer. The peer should be specified in URI format, see AddPeer.
// The peer is not disconnected immediately.
func (c *Core) RemovePeer(uri string, sourceInterface string) error {
var err error
phony.Block(c, func() {
@ -227,6 +227,15 @@ func (c *Core) RemovePeer(uri string, sourceInterface string) error {
return err
}
func (c *Core) RemovePeers() error {
c.config._peers = map[Peer]*linkInfo{}
//for k := range c.config.InterfacePeers {
// delete(c.config.InterfacePeers, k)
//}
return nil
}
// CallPeer calls a peer once. This should be specified in the peer URI format,
// e.g.:
//

View file

@ -14,11 +14,11 @@ import (
"github.com/Arceliar/phony"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
"github.com/RiV-chain/RiV-mesh/src/version"
)
// The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run.
// The Core object represents the Mesh node. You should create a Core
// object for each Mesh node you plan to run.
type Core struct {
// This is the main data structure that holds everything else for a node
// We're going to keep our own copy of the provided config - that way we can
@ -34,7 +34,7 @@ type Core struct {
log Logger
addPeerTimer *time.Timer
config struct {
_peers map[Peer]*linkInfo // configurable after startup
_peers map[Peer]*linkInfo // configurable after startup
_listeners map[ListenAddress]struct{} // configurable after startup
nodeinfo NodeInfo // immutable after startup
nodeinfoPrivacy NodeInfoPrivacy // immutable after startup
@ -121,7 +121,7 @@ func (c *Core) _addPeerLoop() {
})
}
// Stop shuts down the Yggdrasil node.
// Stop shuts down the Mesh node.
func (c *Core) Stop() {
phony.Block(c, func() {
c.log.Infoln("Stopping...")

View file

@ -10,191 +10,73 @@ import (
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
)
"sync/atomic"
type links struct {
phony.Inbox
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
_links map[linkInfo]*link // *link is nil if connection in progress
}
"github.com/Arceliar/phony"
"github.com/RiV-chain/RiV-mesh/src/address"
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
)
// linkInfo is used as a map key
type linkInfo struct {
linkType string // Type of link, e.g. TCP, AWDL
local string // Local name or address
remote string // Remote name or address
linkType string // Type of link, e.g. TCP, AWDL
local string // Local name or address
remote string // Remote name or address
}
type link struct {
lname string
links *links
conn *linkConn
options linkOptions
info linkInfo
incoming bool
force bool
lname string
links *links
conn *linkConn
options linkOptions
info linkInfo
incoming bool
force bool
}
type linkOptions struct {
pinnedEd25519Keys map[keyArray]struct{}
pinnedEd25519Keys map[keyArray]struct{}
priority uint8
}
type Listener struct {
net.Listener
closed chan struct{}
net.Listener
closed chan struct{}
}
func (l *Listener) Close() error {
err := l.Listener.Close()
<-l.closed
return err
}
func (l *links) init(c *Core) error {
l.core = c
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l._links = make(map[linkInfo]*link)
var listeners []ListenAddress
phony.Block(c, func() {
listeners = make([]ListenAddress, 0, len(c.config._listeners))
for listener := range c.config._listeners {
listeners = append(listeners, listener)
}
})
return nil
err := l.Listener.Close()
<-l.closed
return err
}
func (l *links) shutdown() {
phony.Block(l.tcp, func() {
for l := range l.tcp._listeners {
_ = l.Close()
}
})
phony.Block(l.tls, func() {
for l := range l.tls._listeners {
_ = l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
_ = l.Close()
}
})
phony.Block(l.tcp, func() {
for l := range l.tcp._listeners {
_ = l.Close()
}
})
phony.Block(l.tls, func() {
for l := range l.tls._listeners {
_ = l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
_ = l.Close()
}
})
}
func (l *links) isConnectedTo(info linkInfo) bool {
var isConnected bool
phony.Block(l, func() {
_, isConnected = l._links[info]
})
return isConnected
}
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
info := linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
return info, nil
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
}
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
return info, fmt.Errorf("pinned key contains invalid hex characters")
}
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
return info, fmt.Errorf("priority invalid: %w", err)
}
options.priority = uint8(pi)
}
switch info.linkType {
case "tcp":
go func() {
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
}
}()
case "socks":
go func() {
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
}
}()
case "tls":
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
tlsSNI = sni
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tlsSNI = host
}
}
go func() {
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
}
}()
case "unix":
go func() {
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
}
}()
default:
return info, errors.New("unknown call scheme: " + u.Scheme)
}
return info, nil
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
var isConnected bool
phony.Block(l, func() {
_, isConnected = l._links[info]
})
return isConnected
}
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
@ -329,6 +211,9 @@ func (intf *link) close() error {
}
func linkInfoFor(linkType, sintf, remote string) linkInfo {
if h, _, err := net.SplitHostPort(remote); err == nil {
remote = h
}
return linkInfo{
linkType: linkType,
local: sintf,

159
src/core/link_linux.go Normal file
View file

@ -0,0 +1,159 @@
//go:build linux
// +build linux
package core
import (
"encoding/hex"
"errors"
"io"
"fmt"
"net"
"net/url"
"strconv"
"github.com/Arceliar/phony"
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
)
type links struct {
phony.Inbox
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
sctp *linkSCTP // SCTP interface support
mpath *linkMPATH // Multipath interface support
_links map[linkInfo]*link // *link is nil if connection in progress
// TODO timeout (to remove from switch), read from config.ReadTimeout
}
func (l *links) init(c *Core) error {
l.core = c
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l.sctp = l.newLinkSCTP()
l.mpath = l.newLinkMPATH()
l._links = make(map[linkInfo]*link)
var listeners []ListenAddress
phony.Block(c, func() {
listeners = make([]ListenAddress, 0, len(c.config._listeners))
for listener := range c.config._listeners {
listeners = append(listeners, listener)
}
})
return nil
}
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
info := linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
return info, nil
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
}
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
return info, fmt.Errorf("pinned key contains invalid hex characters")
}
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
return info, fmt.Errorf("priority invalid: %w", err)
}
options.priority = uint8(pi)
}
switch info.linkType {
case "tcp":
go func() {
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
}
}()
case "socks":
go func() {
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
}
}()
case "tls":
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
tlsSNI = sni
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tlsSNI = host
}
}
go func() {
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
}
}()
case "unix":
go func() {
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
}
}()
case "sctp":
go func() {
if err := l.sctp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SCTP %s: %s\n", u.Host, err)
}
}()
case "mpath":
go func() {
if err := l.mpath.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial MPATH %s: %s\n", u.Host, err)
}
}()
default:
return info, errors.New("unknown call scheme: " + u.Scheme)
}
return info, nil
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
case "sctp":
listener, err = l.sctp.listen(u, sintf)
case "mpath":
listener, err = l.mpath.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}

282
src/core/link_mpath.go Normal file
View file

@ -0,0 +1,282 @@
//go:build !android
// +build !android
package core
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"github.com/getlantern/multipath"
"github.com/Arceliar/phony"
)
type linkMPATH struct {
phony.Inbox
*links
listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
}
func (l *links) newLinkMPATH() *linkMPATH {
lt := &linkMPATH{
links: l,
listener: &net.ListenConfig{
KeepAlive: -1,
},
_listeners: map[*Listener]context.CancelFunc{},
}
lt.listener.Control = lt.tcpContext
return lt
}
func (l *linkMPATH) dial(url *url.URL, options linkOptions, sintf string) error {
info := linkInfoFor("mpath", sintf, strings.SplitN(url.Host, "%", 2)[0])
conn, err := l.connFor(url, sintf)
if err != nil {
return err
}
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
return l.handler(uri, info, conn, options, false, false)
}
func (l *linkMPATH) listen(url *url.URL, sintf string) (*Listener, error) {
hostport := url.Host
if sintf != "" {
if host, port, err := net.SplitHostPort(hostport); err == nil {
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
}
}
_, cancel := context.WithCancel(l.core.ctx)
listener, err := l.listenFor(url, sintf)
if err != nil {
cancel()
return nil, err
}
entry := &Listener{
Listener: listener,
closed: make(chan struct{}),
}
phony.Block(l, func() {
l._listeners[entry] = cancel
})
l.core.log.Printf("Multipath listener started on %s", listener.Addr())
go func() {
defer phony.Block(l, func() {
delete(l._listeners, entry)
})
for {
conn, err := listener.Accept()
if err != nil {
cancel()
break
}
addr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("mpath://%s", addr)
info := linkInfoFor("mpath", sintf, strings.SplitN(addr.IP.String(), "%", 2)[0])
if err = l.handler(name, info, conn, linkOptions{}, true, addr.IP.IsLinkLocalUnicast()); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
_ = listener.Close()
close(entry.closed)
l.core.log.Printf("Multipath listener stopped on %s", listener.Addr())
}()
return entry, nil
}
func (l *linkMPATH) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
return l.links.create(
conn, // connection
name, // connection name
info, // connection info
incoming, // not incoming
force, // not forced
options, // connection options
)
}
// Returns the address of the listener.
func (l *linkMPATH) getAddr() *net.TCPAddr {
// TODO: Fix this, because this will currently only give a single address
// to multicast.go, which obviously is not great, but right now multicast.go
// doesn't have the ability to send more than one address in a packet either
var addr *net.TCPAddr
phony.Block(l, func() {
for listener := range l._listeners {
addr = listener.Addr().(*net.TCPAddr)
}
})
return addr
}
func (l *linkMPATH) connFor(url *url.URL, sinterfaces string) (net.Conn, error) {
//Peer url has following format: mpath://host-1:port-1/host-2:port-2.../host-n:port-n
hosts := strings.Split(url.String(), "/")[2:]
remoteTargets := make([]net.Addr, 0)
for _, host := range hosts {
dst, err := net.ResolveTCPAddr("tcp", host)
info := linkInfoFor("tcp", sinterfaces, dst.String())
if l.links.isConnectedTo(info) {
continue
}
if err != nil {
l.core.log.Errorln("could not resolve host ", dst.String())
continue
}
if dst.IP.IsLinkLocalUnicast() {
dst.Zone = sinterfaces
if dst.Zone == "" {
l.core.log.Errorln("link-local address requires a zone in ", dst.String())
continue
}
}
remoteTargets = append(remoteTargets, dst)
}
if len(remoteTargets) == 0 {
return nil, fmt.Errorf("no valid target hosts given")
}
dialers := make([]multipath.Dialer, 0)
trackers := make([]multipath.StatsTracker, 0)
if sinterfaces != "" {
sintfarray := strings.Split(sinterfaces, ",")
for _, dst := range remoteTargets {
for _, sintf := range sintfarray {
ief, err := net.InterfaceByName(sintf)
if err != nil {
l.core.log.Errorln("interface %s not found", sintf)
continue
}
if ief.Flags&net.FlagUp == 0 {
l.core.log.Errorln("interface %s is not up", sintf)
continue
}
addrs, err := ief.Addrs()
if err != nil {
l.core.log.Errorln("interface %s addresses not available: %w", sintf, err)
continue
}
dstIp := dst.(*net.TCPAddr).IP
for addrindex, addr := range addrs {
src, _, err := net.ParseCIDR(addr.String())
if err != nil {
continue
}
if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() {
continue
}
bothglobal := src.IsGlobalUnicast() == dstIp.IsGlobalUnicast()
bothlinklocal := src.IsLinkLocalUnicast() == dstIp.IsLinkLocalUnicast()
if !bothglobal && !bothlinklocal {
continue
}
if (src.To4() != nil) != (dstIp.To4() != nil) {
continue
}
if bothglobal || bothlinklocal || addrindex == len(addrs)-1 {
td := newOutboundDialer(src, dst)
dialers = append(dialers, td)
trackers = append(trackers, multipath.NullTracker{})
l.core.log.Printf("added outbound dialer for %s->%s", src.String(), dst.String())
break
}
}
}
}
} else {
star := net.ParseIP("0.0.0.0")
for _, dst := range remoteTargets {
td := newOutboundDialer(star, dst)
dialers = append(dialers, td)
trackers = append(trackers, multipath.NullTracker{})
l.core.log.Printf("added outbound dialer for %s", dst.String())
}
}
if len(dialers) == 0 {
return nil, fmt.Errorf("no suitable source address found on interface %q", sinterfaces)
}
dialer := multipath.NewDialer("mpath", dialers)
//conn, err := dialer.DialContext(l.core.ctx, "tcp", remoteTargets[0].String())
conn, err := dialer.DialContext(l.core.ctx)
if err != nil {
return nil, err
}
return conn, nil
}
func (l *linkMPATH) listenFor(url *url.URL, sintf string) (net.Listener, error) {
//Public node url has following format: mpath://ip-1:port-1/ip-2:port-2.../ip-n:port-n
hosts := strings.Split(url.String(), "/")[2:]
localTargets := make([]string, 0)
for _, host := range hosts {
dst, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
l.core.log.Errorln("could not resolve host ", dst.String())
continue
}
if dst.IP.IsLinkLocalUnicast() {
dst.Zone = sintf
if dst.Zone == "" {
l.core.log.Errorln("link-local address requires a zone in ", dst.String())
continue
}
}
localTargets = append(localTargets, host)
}
if len(localTargets) == 0 {
return nil, fmt.Errorf("no valid target hosts given")
}
listeners := make([]net.Listener, 0)
trackers := make([]multipath.StatsTracker, 0)
for _, lT := range localTargets {
l, err := l.listener.Listen(l.core.ctx, "tcp", lT)
if err != nil {
continue
}
listeners = append(listeners, l)
trackers = append(trackers, multipath.NullTracker{})
}
listener := multipath.NewListener(listeners, trackers)
return listener, nil
}
type targetedDailer struct {
localDialer net.Dialer
remoteAddr net.Addr
}
func newOutboundDialer(inputLocalAddr net.IP, inputRemoteAddr net.Addr) *targetedDailer {
td := &targetedDailer{
localDialer: net.Dialer{
LocalAddr: &net.TCPAddr{
IP: inputLocalAddr,
Port: 0,
},
},
remoteAddr: inputRemoteAddr,
}
return td
}
func (td *targetedDailer) DialContext(ctx context.Context) (net.Conn, error) {
conn, err := td.localDialer.DialContext(ctx, "tcp", td.remoteAddr.String())
if err != nil {
//l.core.log.Errorln("failed to dial to %v: %v", td.remoteAddr.String(), err)
return nil, err
}
//l.core.log.Printf("Dialed to %v->%v", conn.LocalAddr(), td.remoteAddr.String())
return conn, err
}
func (td *targetedDailer) Label() string {
return fmt.Sprintf("%s|%s", td.localDialer.LocalAddr, td.remoteAddr)
}

View file

@ -0,0 +1,270 @@
//go:build android
// +build android
package core
import (
"context"
"fmt"
"net"
"net/url"
"net/netip"
"strings"
"github.com/getlantern/multipath"
"github.com/Arceliar/phony"
)
type linkMPATH struct {
phony.Inbox
*links
listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
}
func (l *links) newLinkMPATH() *linkMPATH {
lt := &linkMPATH{
links: l,
listener: &net.ListenConfig{
KeepAlive: -1,
},
_listeners: map[*Listener]context.CancelFunc{},
}
lt.listener.Control = lt.tcpContext
return lt
}
func (l *linkMPATH) dial(url *url.URL, options linkOptions, sintf string) error {
info := linkInfoFor("mpath", sintf, strings.SplitN(url.Host, "%", 2)[0])
if l.links.isConnectedTo(info) {
return fmt.Errorf("duplicate connection attempt")
}
conn, err := l.connFor(url, sintf)
if err != nil {
return err
}
return l.handler(url.String(), info, conn, options, false)
}
func (l *linkMPATH) listen(url *url.URL, sintf string) (*Listener, error) {
hostport := url.Host
if sintf != "" {
if host, port, err := net.SplitHostPort(hostport); err == nil {
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
}
}
_, cancel := context.WithCancel(l.core.ctx)
listener, err := l.listenFor(url, sintf)
if err != nil {
cancel()
return nil, err
}
entry := &Listener{
Listener: listener,
closed: make(chan struct{}),
}
phony.Block(l, func() {
l._listeners[entry] = cancel
})
l.core.log.Printf("Multipath listener started on %s", listener.Addr())
go func() {
defer phony.Block(l, func() {
delete(l._listeners, entry)
})
for {
conn, err := listener.Accept()
if err != nil {
cancel()
break
}
addr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("mpath://%s", addr)
info := linkInfoFor("mpath", sintf, strings.SplitN(addr.IP.String(), "%", 2)[0])
if err = l.handler(name, info, conn, linkOptions{}, true); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
_ = listener.Close()
close(entry.closed)
l.core.log.Printf("Multipath listener stopped on %s", listener.Addr())
}()
return entry, nil
}
func (l *linkMPATH) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
return l.links.create(
conn, // connection
name, // connection name
info, // connection info
incoming, // not incoming
false, // not forced
options, // connection options
)
}
// Returns the address of the listener.
func (l *linkMPATH) getAddr() *net.TCPAddr {
// TODO: Fix this, because this will currently only give a single address
// to multicast.go, which obviously is not great, but right now multicast.go
// doesn't have the ability to send more than one address in a packet either
var addr *net.TCPAddr
phony.Block(l, func() {
for listener := range l._listeners {
addr = listener.Addr().(*net.TCPAddr)
}
})
return addr
}
func (l *linkMPATH) connFor(url *url.URL, sinterfaces string) (net.Conn, error) {
//Peer url has following format: mpath://host-1:port-1/host-2:port-2.../host-n:port-n
hosts := strings.Split(url.String(), "/")[2:]
remoteTargets := make([]net.Addr, 0)
for _, host := range hosts {
dst, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
l.core.log.Errorln("could not resolve host ", dst.String())
continue
}
if dst.IP.IsLinkLocalUnicast() {
dst.Zone = sinterfaces
if dst.Zone == "" {
l.core.log.Errorln("link-local address requires a zone in ", dst.String())
continue
}
}
remoteTargets = append(remoteTargets, dst)
}
if len(remoteTargets) == 0 {
return nil, fmt.Errorf("no valid target hosts given")
}
dialers := make([]multipath.Dialer, 0)
trackers := make([]multipath.StatsTracker, 0)
if sinterfaces != "" {
sintfarray := strings.Split(sinterfaces, ",")
for _, dst := range remoteTargets {
for _, sintf := range sintfarray {
addr, err := netip.ParseAddr(sintf)
if err != nil {
l.core.log.Errorln("interface %s address incorrect: %w", sintf, err)
continue
}
src := net.ParseIP(addr.WithZone("").String())
dstIp := dst.(*net.TCPAddr).IP
if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() {
continue
}
bothglobal := src.IsGlobalUnicast() == dstIp.IsGlobalUnicast()
bothlinklocal := src.IsLinkLocalUnicast() == dstIp.IsLinkLocalUnicast()
if !bothglobal && !bothlinklocal {
continue
}
if (src.To4() != nil) != (dstIp.To4() != nil) {
continue
}
if bothglobal || bothlinklocal {
td := newOutboundDialer(src, dst)
dialers = append(dialers, td)
trackers = append(trackers, multipath.NullTracker{})
l.core.log.Printf("added outbound dialer for %s->%s", src.String(), dst.String())
}
}
}
} else {
star := net.ParseIP("0.0.0.0")
for _, dst := range remoteTargets {
td := newOutboundDialer(star, dst)
dialers = append(dialers, td)
trackers = append(trackers, multipath.NullTracker{})
l.core.log.Printf("added outbound dialer for %s", dst.String())
}
}
if len(dialers) == 0 {
return nil, fmt.Errorf("no suitable source address found on interface %q", sinterfaces)
}
dialer := multipath.NewDialer("mpath", dialers)
//conn, err := dialer.DialContext(l.core.ctx, "tcp", remoteTargets[0].String())
conn, err := dialer.DialContext(l.core.ctx)
if err != nil {
return nil, err
}
return conn, nil
}
func (l *linkMPATH) listenFor(url *url.URL, sintf string) (net.Listener, error) {
//Public node url has following format: mpath://ip-1:port-1/ip-2:port-2.../ip-n:port-n
hosts := strings.Split(url.String(), "/")[2:]
localTargets := make([]string, 0)
for _, host := range hosts {
dst, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
l.core.log.Errorln("could not resolve host ", dst.String())
continue
}
if dst.IP.IsLinkLocalUnicast() {
dst.Zone = sintf
if dst.Zone == "" {
l.core.log.Errorln("link-local address requires a zone in ", dst.String())
continue
}
}
localTargets = append(localTargets, host)
}
if len(localTargets) == 0 {
return nil, fmt.Errorf("no valid target hosts given")
}
listeners := make([]net.Listener, 0)
trackers := make([]multipath.StatsTracker, 0)
for _, lT := range localTargets {
l, err := l.listener.Listen(l.core.ctx, "tcp", lT)
if err != nil {
continue
}
listeners = append(listeners, l)
trackers = append(trackers, multipath.NullTracker{})
}
listener := multipath.NewListener(listeners, trackers)
return listener, nil
}
type targetedDailer struct {
localDialer net.Dialer
remoteAddr net.Addr
}
func newOutboundDialer(inputLocalAddr net.IP, inputRemoteAddr net.Addr) *targetedDailer {
td := &targetedDailer{
localDialer: net.Dialer{
LocalAddr: &net.TCPAddr{
IP: inputLocalAddr,
Port: 0,
},
},
remoteAddr: inputRemoteAddr,
}
return td
}
func (td *targetedDailer) DialContext(ctx context.Context) (net.Conn, error) {
conn, err := td.localDialer.DialContext(ctx, "tcp", td.remoteAddr.String())
if err != nil {
//l.core.log.Errorln("failed to dial to %v: %v", td.remoteAddr.String(), err)
return nil, err
}
//l.core.log.Printf("Dialed to %v->%v", conn.LocalAddr(), td.remoteAddr.String())
return conn, err
}
func (td *targetedDailer) Label() string {
return fmt.Sprintf("%s|%s", td.localDialer.LocalAddr, td.remoteAddr)
}

View file

@ -0,0 +1,33 @@
//go:build darwin
// +build darwin
package core
import (
"syscall"
"golang.org/x/sys/unix"
)
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
func (t *linkMPATH) tcpContext(network, address string, c syscall.RawConn) error {
var control error
var recvanyif error
control = c.Control(func(fd uintptr) {
// sys/socket.h: #define SO_RECV_ANYIF 0x1104
recvanyif = unix.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 0x1104, 1)
})
switch {
case recvanyif != nil:
return recvanyif
default:
return control
}
}
func (t *linkMPATH) getControl(sintf string) func(string, string, syscall.RawConn) error {
return t.tcpContext
}

View file

@ -0,0 +1,46 @@
//go:build linux
// +build linux
package core
import (
"syscall"
"golang.org/x/sys/unix"
)
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
func (t *linkMPATH) tcpContext(network, address string, c syscall.RawConn) error {
var control error
var bbr error
control = c.Control(func(fd uintptr) {
bbr = unix.SetsockoptString(int(fd), unix.IPPROTO_TCP, unix.TCP_CONGESTION, "bbr")
})
// Log any errors
if bbr != nil {
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr)
}
if control != nil {
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control)
}
// Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal
return nil
}
func (t *linkMPATH) getControl(sintf string) func(string, string, syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
var err error
btd := func(fd uintptr) {
err = unix.BindToDevice(int(fd), sintf)
}
_ = c.Control(btd)
if err != nil {
t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf)
}
return t.tcpContext(network, address, c)
}
}

View file

@ -0,0 +1,18 @@
//go:build !darwin && !linux
// +build !darwin,!linux
package core
import (
"syscall"
)
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
func (t *linkMPATH) tcpContext(network, address string, c syscall.RawConn) error {
return nil
}
func (t *linkMPATH) getControl(sintf string) func(string, string, syscall.RawConn) error {
return t.tcpContext
}

147
src/core/link_other.go Normal file
View file

@ -0,0 +1,147 @@
//go:build !linux
// +build !linux
package core
import (
"encoding/hex"
"errors"
"fmt"
"net"
"net/url"
"github.com/Arceliar/phony"
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
)
type links struct {
phony.Inbox
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
mpath *linkMPATH // Multipath interface support
_links map[linkInfo]*link // *link is nil if connection in progress
// TODO timeout (to remove from switch), read from config.ReadTimeout
}
func (l *links) init(c *Core) error {
l.core = c
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l.mpath = l.newLinkMPATH()
l._links = make(map[linkInfo]*link)
var listeners []ListenAddress
phony.Block(c, func() {
listeners = make([]ListenAddress, 0, len(c.config._listeners))
for listener := range c.config._listeners {
listeners = append(listeners, listener)
}
})
return nil
}
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
info := linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
return nil
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
}
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
return fmt.Errorf("pinned key contains invalid hex characters")
}
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
return info, fmt.Errorf("priority invalid: %w", err)
}
options.priority = uint8(pi)
switch info.linkType {
case "tcp":
go func() {
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
}
}()
case "socks":
go func() {
if err := l.socks.dial(u, options); err != nil && err != io.EOF
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
}
}()
case "tls":
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
if sni := u.Query().Get("sni"); sni != "" {
if net.ParseIP(sni) == nil {
tlsSNI = sni
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tlsSNI = host
}
}
go func() {
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
}
}()
case "unix":
go func() {
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
if err := l.unix.dial(u, options, sintf); err != nil {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
}
}()
case "mpath":
go func() {
if err := l.mpath.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial MPATH %s: %s\n", u.Host, err)
}
}()
default:
return errors.New("unknown call scheme: " + u.Scheme)
}
return nil
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
case "mpath":
listener, err = l.mpath.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}

176
src/core/link_sctp_linux.go Normal file
View file

@ -0,0 +1,176 @@
//go:build linux
// +build linux
package core
import (
"context"
"fmt"
"net"
"net/url"
"encoding/json"
"strings"
"strconv"
"github.com/Arceliar/phony"
sctp "github.com/vikulin/sctp"
)
type linkSCTP struct {
phony.Inbox
*links
listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
}
func (l *links) newLinkSCTP() *linkSCTP {
lt := &linkSCTP{
links: l,
listener: &net.ListenConfig{
KeepAlive: -1,
},
_listeners: map[*Listener]context.CancelFunc{},
}
return lt
}
func (l *linkSCTP) dial(url *url.URL, options linkOptions, sintf string) error {
info := linkInfoFor("sctp", sintf, strings.SplitN(url.Host, "%", 2)[0])
if l.links.isConnectedTo(info) {
return nil
}
host, port, err := net.SplitHostPort(url.Host)
if err != nil {
return err
}
dst, err := net.ResolveIPAddr("ip", host)
if err != nil {
return err
}
raddress := l.getAddress(dst.String()+":"+port)
var conn net.Conn
laddress := l.getAddress("0.0.0.0:0")
conn, err = sctp.NewSCTPConnection(laddress, laddress.AddressFamily, sctp.InitMsg{NumOstreams: 2, MaxInstreams: 2, MaxAttempts: 2, MaxInitTimeout: 5}, sctp.OneToOne, false)
if err != nil {
return err
}
err = conn.(*sctp.SCTPConn).Connect(raddress)
//conn.(*sctp.SCTPConn).SetWriteBuffer(324288)
//conn.(*sctp.SCTPConn).SetReadBuffer(324288)
//wbuf, _ := conn.(*sctp.SCTPConn).GetWriteBuffer()
//rbuf, _ := conn.(*sctp.SCTPConn).GetReadBuffer()
//l.core.log.Printf("Read buffer %d", rbuf)
//l.core.log.Printf("Write buffer %d", wbuf)
conn.(*sctp.SCTPConn).SetEvents(sctp.SCTP_EVENT_DATA_IO)
return l.handler(url.String(), info, conn, options, false)
}
func (l *linkSCTP) listen(url *url.URL, sintf string) (*Listener, error) {
//_, cancel := context.WithCancel(l.core.ctx)
hostport := url.Host
if sintf != "" {
if host, port, err := net.SplitHostPort(hostport); err == nil {
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
}
}
addr := l.getAddress(url.Host)
listener, err := sctp.NewSCTPListener(addr, sctp.InitMsg{NumOstreams: 2, MaxInstreams: 2, MaxAttempts: 2, MaxInitTimeout: 5}, sctp.OneToOne, false)
if err != nil {
//cancel()
return nil, err
}
listener.SetEvents(sctp.SCTP_EVENT_DATA_IO)
entry := &Listener{
Listener: listener,
closed: make(chan struct{}),
}
//phony.Block(l, func() {
// l._listeners[entry] = cancel
//})
l.core.log.Printf("SCTP listener started on %s", listener.Addr())
go func() {
defer phony.Block(l, func() {
delete(l._listeners, entry)
})
for {
conn, err := listener.Accept()
if err != nil {
//cancel()
break
}
addr := conn.RemoteAddr().(*sctp.SCTPAddr)
ips, err := json.Marshal(addr.IPAddrs)
if err != nil {
break
}
name := fmt.Sprintf("sctp://%s", ips)
info := linkInfoFor("sctp", sintf, string(ips))
//conn.(*sctp.SCTPConn).SetWriteBuffer(324288)
//conn.(*sctp.SCTPConn).SetReadBuffer(324288)
wbuf, _ := conn.(*sctp.SCTPConn).GetWriteBuffer()
rbuf, _ := conn.(*sctp.SCTPConn).GetReadBuffer()
l.core.log.Printf("Read buffer %d", rbuf)
l.core.log.Printf("Write buffer %d", wbuf)
if err = l.handler(name, info, conn, linkOptions{}, true); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
_ = listener.Close()
close(entry.closed)
l.core.log.Printf("SCTP listener stopped on %s", listener.Addr())
}()
return entry, nil
}
func (l *linkSCTP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
return l.links.create(
conn, // connection
name, // connection name
info, // connection info
incoming, // not incoming
false, // not forced
options, // connection options
)
}
// Returns the address of the listener.
func (l *linkSCTP) getAddr() *net.TCPAddr {
// TODO: Fix this, because this will currently only give a single address
// to multicast.go, which obviously is not great, but right now multicast.go
// doesn't have the ability to send more than one address in a packet either
var addr *net.TCPAddr
phony.Block(l, func() {
for listener := range l._listeners {
addr = listener.Addr().(*net.TCPAddr)
}
})
return addr
}
//SCTP infrastructure
func (l *linkSCTP) getAddress(host string) *sctp.SCTPAddr {
//sctp supports multihoming but current implementation reuires only one path
ips := []net.IPAddr{}
ip, port, err := net.SplitHostPort(host)
if err != nil {
panic(err)
}
for _, i := range strings.Split(ip, ",") {
if a, err := net.ResolveIPAddr("ip", i); err == nil {
fmt.Sprintf("Resolved address '%s' to %s", i, a)
ips = append(ips, *a)
} else {
l.core.log.Errorln("Error resolving address '%s': %v", i, err)
}
}
p, _ := strconv.Atoi(port)
addr := &sctp.SCTPAddr{
IPAddrs: ips,
Port: p,
}
return addr
}

View file

@ -181,6 +181,7 @@ func (l *linkTCP) dialerFor(dst *net.TCPAddr, sintf string) (*net.Dialer, error)
return dialer, nil
}
func tcpIDFor(local net.Addr, remoteAddr *net.TCPAddr) string {
if localAddr, ok := local.(*net.TCPAddr); ok && localAddr.IP.Equal(remoteAddr.IP) {
// Nodes running on the same host — include both the IP and port.

View file

@ -10,7 +10,10 @@ import (
iwt "github.com/Arceliar/ironwood/types"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
//"github.com/RiV-chain/RiV-mesh/src/crypto"
"github.com/RiV-chain/RiV-mesh/src/version"
)
type nodeinfo struct {

View file

@ -12,7 +12,7 @@ import (
iwt "github.com/Arceliar/ironwood/types"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/RiV-chain/RiV-mesh/src/address"
)
const (

View file

@ -20,7 +20,7 @@ type version_metadata struct {
// Gets a base metadata with no keys set, but with the correct version numbers.
func version_getBaseMetadata() version_metadata {
return version_metadata{
meta: [4]byte{'m', 'e', 't', 'a'},
meta: [4]byte{'z', 'e', 't', 'a'},
ver: 0,
minorVer: 4,
}