mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-28 22:25:07 +03:00
rename src/yggdrasil to src/core
This commit is contained in:
parent
0343dad934
commit
018f35d9a2
17 changed files with 29 additions and 29 deletions
228
src/core/api.go
Normal file
228
src/core/api.go
Normal file
|
@ -0,0 +1,228 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
//"encoding/hex"
|
||||
"errors"
|
||||
//"fmt"
|
||||
"net"
|
||||
//"sort"
|
||||
//"time"
|
||||
|
||||
"github.com/gologme/log"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
//"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
//"github.com/Arceliar/phony"
|
||||
)
|
||||
|
||||
type Self struct {
|
||||
Key ed25519.PublicKey
|
||||
Root ed25519.PublicKey
|
||||
Coords []uint64
|
||||
}
|
||||
|
||||
type Peer struct {
|
||||
Key ed25519.PublicKey
|
||||
Root ed25519.PublicKey
|
||||
Coords []uint64
|
||||
Port uint64
|
||||
}
|
||||
|
||||
type DHTEntry struct {
|
||||
Key ed25519.PublicKey
|
||||
Port uint64
|
||||
Rest uint64
|
||||
}
|
||||
|
||||
type PathEntry struct {
|
||||
Key ed25519.PublicKey
|
||||
Path []uint64
|
||||
}
|
||||
|
||||
type Session struct {
|
||||
Key ed25519.PublicKey
|
||||
}
|
||||
|
||||
func (c *Core) GetSelf() Self {
|
||||
var self Self
|
||||
s := c.PacketConn.PacketConn.Debug.GetSelf()
|
||||
self.Key = s.Key
|
||||
self.Root = s.Root
|
||||
self.Coords = s.Coords
|
||||
return self
|
||||
}
|
||||
|
||||
func (c *Core) GetPeers() []Peer {
|
||||
var peers []Peer
|
||||
ps := c.PacketConn.PacketConn.Debug.GetPeers()
|
||||
for _, p := range ps {
|
||||
var info Peer
|
||||
info.Key = p.Key
|
||||
info.Root = p.Root
|
||||
info.Coords = p.Coords
|
||||
info.Port = p.Port
|
||||
peers = append(peers, info)
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
func (c *Core) GetDHT() []DHTEntry {
|
||||
var dhts []DHTEntry
|
||||
ds := c.PacketConn.PacketConn.Debug.GetDHT()
|
||||
for _, d := range ds {
|
||||
var info DHTEntry
|
||||
info.Key = d.Key
|
||||
info.Port = d.Port
|
||||
info.Rest = d.Rest
|
||||
dhts = append(dhts, info)
|
||||
}
|
||||
return dhts
|
||||
}
|
||||
|
||||
func (c *Core) GetPaths() []PathEntry {
|
||||
var paths []PathEntry
|
||||
ps := c.PacketConn.PacketConn.Debug.GetPaths()
|
||||
for _, p := range ps {
|
||||
var info PathEntry
|
||||
info.Key = p.Key
|
||||
info.Path = p.Path
|
||||
paths = append(paths, info)
|
||||
}
|
||||
return paths
|
||||
}
|
||||
|
||||
func (c *Core) GetSessions() []Session {
|
||||
var sessions []Session
|
||||
ss := c.PacketConn.Debug.GetSessions()
|
||||
for _, s := range ss {
|
||||
var info Session
|
||||
info.Key = s.Key
|
||||
sessions = append(sessions, info)
|
||||
}
|
||||
return sessions
|
||||
}
|
||||
|
||||
// ListenTCP starts a new TCP listener. The input URI should match that of the
|
||||
// "Listen" configuration item, e.g.
|
||||
// tcp://a.b.c.d:e
|
||||
func (c *Core) ListenTCP(uri string) (*TcpListener, error) {
|
||||
return c.links.tcp.listen(uri, nil)
|
||||
}
|
||||
|
||||
// ListenTLS starts a new TLS listener. The input URI should match that of the
|
||||
// "Listen" configuration item, e.g.
|
||||
// tls://a.b.c.d:e
|
||||
func (c *Core) ListenTLS(uri string) (*TcpListener, error) {
|
||||
return c.links.tcp.listen(uri, c.links.tcp.tls.forListener)
|
||||
}
|
||||
|
||||
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128
|
||||
// address. The IPv6 address is only relevant when the node is operating as an
|
||||
// IP router and often is meaningless when embedded into an application, unless
|
||||
// that application also implements either VPN functionality or deals with IP
|
||||
// packets specifically.
|
||||
func (c *Core) Address() net.IP {
|
||||
addr := net.IP(address.AddrForKey(c.public)[:])
|
||||
return addr
|
||||
}
|
||||
|
||||
// Subnet gets the routed IPv6 subnet of the Yggdrasil node. This is always a
|
||||
// /64 subnet. The IPv6 subnet is only relevant when the node is operating as an
|
||||
// IP router and often is meaningless when embedded into an application, unless
|
||||
// that application also implements either VPN functionality or deals with IP
|
||||
// packets specifically.
|
||||
func (c *Core) Subnet() net.IPNet {
|
||||
subnet := address.SubnetForKey(c.public)[:]
|
||||
subnet = append(subnet, 0, 0, 0, 0, 0, 0, 0, 0)
|
||||
return net.IPNet{IP: subnet, Mask: net.CIDRMask(64, 128)}
|
||||
}
|
||||
|
||||
// SetLogger sets the output logger of the Yggdrasil node after startup. This
|
||||
// may be useful if you want to redirect the output later. Note that this
|
||||
// expects a Logger from the github.com/gologme/log package and not from Go's
|
||||
// built-in log package.
|
||||
func (c *Core) SetLogger(log *log.Logger) {
|
||||
c.log = log
|
||||
}
|
||||
|
||||
// AddPeer adds a peer. This should be specified in the peer URI format, e.g.:
|
||||
// tcp://a.b.c.d:e
|
||||
// socks://a.b.c.d:e/f.g.h.i:j
|
||||
// This adds the peer to the peer list, so that they will be called again if the
|
||||
// connection drops.
|
||||
func (c *Core) AddPeer(addr string, sintf string) error {
|
||||
if err := c.CallPeer(addr, sintf); err != nil {
|
||||
// TODO: We maybe want this to write the peer to the persistent
|
||||
// configuration even if a connection attempt fails, but first we'll need to
|
||||
// move the code to check the peer URI so that we don't deliberately save a
|
||||
// peer with a known bad URI. Loading peers from config should really do the
|
||||
// same thing too but I don't think that happens today
|
||||
return err
|
||||
}
|
||||
c.config.Mutex.Lock()
|
||||
defer c.config.Mutex.Unlock()
|
||||
if sintf == "" {
|
||||
for _, peer := range c.config.Current.Peers {
|
||||
if peer == addr {
|
||||
return errors.New("peer already added")
|
||||
}
|
||||
}
|
||||
c.config.Current.Peers = append(c.config.Current.Peers, addr)
|
||||
} else {
|
||||
if _, ok := c.config.Current.InterfacePeers[sintf]; ok {
|
||||
for _, peer := range c.config.Current.InterfacePeers[sintf] {
|
||||
if peer == addr {
|
||||
return errors.New("peer already added")
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, ok := c.config.Current.InterfacePeers[sintf]; !ok {
|
||||
c.config.Current.InterfacePeers[sintf] = []string{addr}
|
||||
} else {
|
||||
c.config.Current.InterfacePeers[sintf] = append(c.config.Current.InterfacePeers[sintf], addr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (c *Core) RemovePeer(addr string, sintf string) error {
|
||||
if sintf == "" {
|
||||
for i, peer := range c.config.Current.Peers {
|
||||
if peer == addr {
|
||||
c.config.Current.Peers = append(c.config.Current.Peers[:i], c.config.Current.Peers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if _, ok := c.config.Current.InterfacePeers[sintf]; ok {
|
||||
for i, peer := range c.config.Current.InterfacePeers[sintf] {
|
||||
if peer == addr {
|
||||
c.config.Current.InterfacePeers[sintf] = append(c.config.Current.InterfacePeers[sintf][:i], c.config.Current.InterfacePeers[sintf][i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
panic("TODO") // Get the net.Conn to this peer (if any) and close it
|
||||
c.peers.Act(nil, func() {
|
||||
ports := c.peers.ports
|
||||
for _, peer := range ports {
|
||||
if addr == peer.intf.name() {
|
||||
c.peers._removePeer(peer)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
|
||||
// CallPeer calls a peer once. This should be specified in the peer URI format,
|
||||
// e.g.:
|
||||
// tcp://a.b.c.d:e
|
||||
// socks://a.b.c.d:e/f.g.h.i:j
|
||||
// This does not add the peer to the peer list, so if the connection drops, the
|
||||
// peer will not be called again automatically.
|
||||
func (c *Core) CallPeer(addr string, sintf string) error {
|
||||
return c.links.call(addr, sintf)
|
||||
}
|
168
src/core/core.go
Normal file
168
src/core/core.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
iw "github.com/Arceliar/ironwood/encrypted"
|
||||
"github.com/Arceliar/phony"
|
||||
"github.com/gologme/log"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||
//"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/version"
|
||||
)
|
||||
|
||||
// The Core object represents the Yggdrasil node. You should create a Core
|
||||
// object for each Yggdrasil node you plan to run.
|
||||
type Core struct {
|
||||
// This is the main data structure that holds everything else for a node
|
||||
// We're going to keep our own copy of the provided config - that way we can
|
||||
// guarantee that it will be covered by the mutex
|
||||
phony.Inbox
|
||||
*iw.PacketConn
|
||||
config config.NodeState // Config
|
||||
secret ed25519.PrivateKey
|
||||
public ed25519.PublicKey
|
||||
links links
|
||||
log *log.Logger
|
||||
addPeerTimer *time.Timer
|
||||
}
|
||||
|
||||
func (c *Core) _init() error {
|
||||
// TODO separate init and start functions
|
||||
// Init sets up structs
|
||||
// Start launches goroutines that depend on structs being set up
|
||||
// This is pretty much required to completely avoid race conditions
|
||||
if c.log == nil {
|
||||
c.log = log.New(ioutil.Discard, "", 0)
|
||||
}
|
||||
|
||||
current := c.config.GetCurrent()
|
||||
|
||||
sigPriv, err := hex.DecodeString(current.PrivateKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(sigPriv) < ed25519.PrivateKeySize {
|
||||
return errors.New("PrivateKey is incorrect length")
|
||||
}
|
||||
|
||||
c.secret = ed25519.PrivateKey(sigPriv)
|
||||
c.public = c.secret.Public().(ed25519.PublicKey)
|
||||
// TODO check public against current.PublicKey, error if they don't match
|
||||
|
||||
c.PacketConn, err = iw.NewPacketConn(c.secret)
|
||||
return err
|
||||
}
|
||||
|
||||
// If any static peers were provided in the configuration above then we should
|
||||
// configure them. The loop ensures that disconnected peers will eventually
|
||||
// be reconnected with.
|
||||
func (c *Core) _addPeerLoop() {
|
||||
// Get the peers from the config - these could change!
|
||||
current := c.config.GetCurrent()
|
||||
|
||||
// Add peers from the Peers section
|
||||
for _, peer := range current.Peers {
|
||||
go func(peer, intf string) {
|
||||
if err := c.CallPeer(peer, intf); err != nil {
|
||||
c.log.Errorln("Failed to add peer:", err)
|
||||
}
|
||||
}(peer, "") // TODO: this should be acted and not in a goroutine?
|
||||
}
|
||||
|
||||
// Add peers from the InterfacePeers section
|
||||
for intf, intfpeers := range current.InterfacePeers {
|
||||
for _, peer := range intfpeers {
|
||||
go func(peer, intf string) {
|
||||
if err := c.CallPeer(peer, intf); err != nil {
|
||||
c.log.Errorln("Failed to add peer:", err)
|
||||
}
|
||||
}(peer, intf) // TODO: this should be acted and not in a goroutine?
|
||||
}
|
||||
}
|
||||
|
||||
c.addPeerTimer = time.AfterFunc(time.Minute, func() {
|
||||
c.Act(nil, c._addPeerLoop)
|
||||
})
|
||||
}
|
||||
|
||||
// Start starts up Yggdrasil using the provided config.NodeConfig, and outputs
|
||||
// debug logging through the provided log.Logger. The started stack will include
|
||||
// TCP and UDP sockets, a multicast discovery socket, an admin socket, router,
|
||||
// switch and DHT node. A config.NodeState is returned which contains both the
|
||||
// current and previous configurations (from reconfigures).
|
||||
func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (conf *config.NodeState, err error) {
|
||||
phony.Block(c, func() {
|
||||
conf, err = c._start(nc, log)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// This function is unsafe and should only be ran by the core actor.
|
||||
func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState, error) {
|
||||
c.log = log
|
||||
|
||||
c.config = config.NodeState{
|
||||
Current: *nc,
|
||||
Previous: *nc,
|
||||
}
|
||||
|
||||
if name := version.BuildName(); name != "unknown" {
|
||||
c.log.Infoln("Build name:", name)
|
||||
}
|
||||
if version := version.BuildVersion(); version != "unknown" {
|
||||
c.log.Infoln("Build version:", version)
|
||||
}
|
||||
|
||||
c.log.Infoln("Starting up...")
|
||||
if err := c._init(); err != nil {
|
||||
c.log.Errorln("Failed to initialize core")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.links.init(c); err != nil {
|
||||
c.log.Errorln("Failed to start link interfaces")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//if err := c.switchTable.start(); err != nil {
|
||||
// c.log.Errorln("Failed to start switch")
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
//if err := c.router.start(); err != nil {
|
||||
// c.log.Errorln("Failed to start router")
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
c.Act(c, c._addPeerLoop)
|
||||
|
||||
c.log.Infoln("Startup complete")
|
||||
return &c.config, nil
|
||||
}
|
||||
|
||||
// Stop shuts down the Yggdrasil node.
|
||||
func (c *Core) Stop() {
|
||||
phony.Block(c, c._stop)
|
||||
}
|
||||
|
||||
// This function is unsafe and should only be ran by the core actor.
|
||||
func (c *Core) _stop() {
|
||||
c.PacketConn.Close()
|
||||
c.log.Infoln("Stopping...")
|
||||
if c.addPeerTimer != nil {
|
||||
c.addPeerTimer.Stop()
|
||||
}
|
||||
c.links.stop()
|
||||
/* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown
|
||||
for _, peer := range c.GetPeers() {
|
||||
c.DisconnectPeer(peer.Port)
|
||||
}
|
||||
*/
|
||||
c.log.Infoln("Stopped")
|
||||
}
|
204
src/core/core_test.go
Normal file
204
src/core/core_test.go
Normal file
|
@ -0,0 +1,204 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gologme/log"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||
)
|
||||
|
||||
// GenerateConfig produces default configuration with suitable modifications for tests.
|
||||
func GenerateConfig() *config.NodeConfig {
|
||||
cfg := config.GenerateConfig()
|
||||
cfg.AdminListen = "none"
|
||||
cfg.Listen = []string{"tcp://127.0.0.1:0"}
|
||||
cfg.IfName = "none"
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
// GetLoggerWithPrefix creates a new logger instance with prefix.
|
||||
// If verbose is set to true, three log levels are enabled: "info", "warn", "error".
|
||||
func GetLoggerWithPrefix(prefix string, verbose bool) *log.Logger {
|
||||
l := log.New(os.Stderr, prefix, log.Flags())
|
||||
if !verbose {
|
||||
return l
|
||||
}
|
||||
l.EnableLevel("info")
|
||||
l.EnableLevel("warn")
|
||||
l.EnableLevel("error")
|
||||
return l
|
||||
}
|
||||
|
||||
// CreateAndConnectTwo creates two nodes. nodeB connects to nodeA.
|
||||
// Verbosity flag is passed to logger.
|
||||
func CreateAndConnectTwo(t testing.TB, verbose bool) (nodeA *Core, nodeB *Core) {
|
||||
nodeA = new(Core)
|
||||
_, err := nodeA.Start(GenerateConfig(), GetLoggerWithPrefix("A: ", verbose))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nodeB = new(Core)
|
||||
_, err = nodeB.Start(GenerateConfig(), GetLoggerWithPrefix("B: ", verbose))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = nodeB.AddPeer("tcp://"+nodeA.link.tcp.getAddr().String(), "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if l := len(nodeA.GetPeers()); l != 1 {
|
||||
t.Fatal("unexpected number of peers", l)
|
||||
}
|
||||
if l := len(nodeB.GetPeers()); l != 1 {
|
||||
t.Fatal("unexpected number of peers", l)
|
||||
}
|
||||
|
||||
return nodeA, nodeB
|
||||
}
|
||||
|
||||
// WaitConnected blocks until either nodes negotiated DHT or 5 seconds passed.
|
||||
func WaitConnected(nodeA, nodeB *Core) bool {
|
||||
// It may take up to 3 seconds, but let's wait 5.
|
||||
for i := 0; i < 50; i++ {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if len(nodeA.GetSwitchPeers()) > 0 && len(nodeB.GetSwitchPeers()) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// CreateEchoListener creates a routine listening on nodeA. It expects repeats messages of length bufLen.
|
||||
// It returns a channel used to synchronize the routine with caller.
|
||||
func CreateEchoListener(t testing.TB, nodeA *Core, bufLen int, repeats int) chan struct{} {
|
||||
// Listen. Doing it here guarantees that there will be something to try to connect when it returns.
|
||||
listener, err := nodeA.ConnListen()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Start routine
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer listener.Close()
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
buf := make([]byte, bufLen)
|
||||
|
||||
for i := 0; i < repeats; i++ {
|
||||
n, err := conn.Read(buf)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if n != bufLen {
|
||||
t.Error("missing data")
|
||||
return
|
||||
}
|
||||
_, err = conn.Write(buf)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
return done
|
||||
}
|
||||
|
||||
// TestCore_Start_Connect checks if two nodes can connect together.
|
||||
func TestCore_Start_Connect(t *testing.T) {
|
||||
CreateAndConnectTwo(t, true)
|
||||
}
|
||||
|
||||
// TestCore_Start_Transfer checks that messages can be passed between nodes (in both directions).
|
||||
func TestCore_Start_Transfer(t *testing.T) {
|
||||
nodeA, nodeB := CreateAndConnectTwo(t, true)
|
||||
|
||||
msgLen := 1500
|
||||
done := CreateEchoListener(t, nodeA, msgLen, 1)
|
||||
|
||||
if !WaitConnected(nodeA, nodeB) {
|
||||
t.Fatal("nodes did not connect")
|
||||
}
|
||||
|
||||
// Dial
|
||||
dialer, err := nodeB.ConnDialer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn, err := dialer.Dial("nodeid", nodeA.NodeID().String())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
msg := make([]byte, msgLen)
|
||||
rand.Read(msg)
|
||||
conn.Write(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
buf := make([]byte, msgLen)
|
||||
_, err = conn.Read(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(msg, buf) != 0 {
|
||||
t.Fatal("expected echo")
|
||||
}
|
||||
<-done
|
||||
}
|
||||
|
||||
// BenchmarkCore_Start_Transfer estimates the possible transfer between nodes (in MB/s).
|
||||
func BenchmarkCore_Start_Transfer(b *testing.B) {
|
||||
nodeA, nodeB := CreateAndConnectTwo(b, false)
|
||||
|
||||
msgLen := 1500 // typical MTU
|
||||
done := CreateEchoListener(b, nodeA, msgLen, b.N)
|
||||
|
||||
if !WaitConnected(nodeA, nodeB) {
|
||||
b.Fatal("nodes did not connect")
|
||||
}
|
||||
|
||||
// Dial
|
||||
dialer, err := nodeB.ConnDialer()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
conn, err := dialer.Dial("nodeid", nodeA.NodeID().String())
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
msg := make([]byte, msgLen)
|
||||
rand.Read(msg)
|
||||
buf := make([]byte, msgLen)
|
||||
|
||||
b.SetBytes(int64(b.N * msgLen))
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
conn.Write(msg)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
_, err = conn.Read(buf)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
<-done
|
||||
}
|
33
src/core/debug.go
Normal file
33
src/core/debug.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
// +build debug
|
||||
|
||||
package core
|
||||
|
||||
import "fmt"
|
||||
|
||||
import _ "net/http/pprof"
|
||||
import "net/http"
|
||||
import "runtime"
|
||||
import "os"
|
||||
|
||||
import "github.com/gologme/log"
|
||||
|
||||
// Start the profiler in debug builds, if the required environment variable is set.
|
||||
func init() {
|
||||
envVarName := "PPROFLISTEN"
|
||||
hostPort := os.Getenv(envVarName)
|
||||
switch {
|
||||
case hostPort == "":
|
||||
fmt.Fprintf(os.Stderr, "DEBUG: %s not set, profiler not started.\n", envVarName)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "DEBUG: Starting pprof on %s\n", hostPort)
|
||||
go func() { fmt.Println(http.ListenAndServe(hostPort, nil)) }()
|
||||
}
|
||||
}
|
||||
|
||||
// Starts the function profiler. This is only supported when built with
|
||||
// '-tags build'.
|
||||
func StartProfiler(log *log.Logger) error {
|
||||
runtime.SetBlockProfileRate(1)
|
||||
go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }()
|
||||
return nil
|
||||
}
|
176
src/core/doc.go
Normal file
176
src/core/doc.go
Normal file
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
Package core implements the core functionality of the Yggdrasil Network.
|
||||
|
||||
Introduction
|
||||
|
||||
Yggdrasil is a proof-of-concept mesh network which provides end-to-end encrypted
|
||||
communication between nodes in a decentralised fashion. The network is arranged
|
||||
using a globally-agreed spanning tree which provides each node with a locator
|
||||
(coordinates relative to the root) and a distributed hash table (DHT) mechanism
|
||||
for finding other nodes.
|
||||
|
||||
Each node also implements a router, which is responsible for encryption of
|
||||
traffic, searches and connections, and a switch, which is responsible ultimately
|
||||
for forwarding traffic across the network.
|
||||
|
||||
While many Yggdrasil nodes in existence today are IP nodes - that is, they are
|
||||
transporting IPv6 packets, like a kind of mesh VPN - it is also possible to
|
||||
integrate Yggdrasil into your own applications and use it as a generic data
|
||||
transport, similar to UDP.
|
||||
|
||||
This library is what you need to integrate and use Yggdrasil in your own
|
||||
application.
|
||||
|
||||
Basics
|
||||
|
||||
In order to start an Yggdrasil node, you should start by generating node
|
||||
configuration, which amongst other things, includes encryption keypairs which
|
||||
are used to generate the node's identity, and supply a logger which Yggdrasil's
|
||||
output will be written to.
|
||||
|
||||
This may look something like this:
|
||||
|
||||
import (
|
||||
"os"
|
||||
"github.com/gologme/log"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/core"
|
||||
)
|
||||
|
||||
type node struct {
|
||||
core core.Core
|
||||
config *config.NodeConfig
|
||||
log *log.Logger
|
||||
}
|
||||
|
||||
You then can supply node configuration and a logger:
|
||||
|
||||
n := node{}
|
||||
n.log = log.New(os.Stdout, "", log.Flags())
|
||||
n.config = config.GenerateConfig()
|
||||
|
||||
In the above example, we ask the config package to supply new configuration each
|
||||
time, which results in fresh encryption keys and therefore a new identity. It is
|
||||
normally preferable in most cases to persist node configuration onto the
|
||||
filesystem or into some configuration store so that the node's identity does not
|
||||
change each time that the program starts. Note that Yggdrasil will automatically
|
||||
fill in any missing configuration items with sane defaults.
|
||||
|
||||
Once you have supplied a logger and some node configuration, you can then start
|
||||
the node:
|
||||
|
||||
n.core.Start(n.config, n.log)
|
||||
|
||||
Add some peers to connect to the network:
|
||||
|
||||
n.core.AddPeer("tcp://some-host.net:54321", "")
|
||||
n.core.AddPeer("tcp://[2001::1:2:3]:54321", "")
|
||||
n.core.AddPeer("tcp://1.2.3.4:54321", "")
|
||||
|
||||
You can also ask the API for information about our node:
|
||||
|
||||
n.log.Println("My node ID is", n.core.NodeID())
|
||||
n.log.Println("My public key is", n.core.EncryptionPublicKey())
|
||||
n.log.Println("My coords are", n.core.Coords())
|
||||
|
||||
Incoming Connections
|
||||
|
||||
Once your node is started, you can then listen for connections from other nodes
|
||||
by asking the API for a Listener:
|
||||
|
||||
listener, err := n.core.ConnListen()
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
The Listener has a blocking Accept function which will wait for incoming
|
||||
connections from remote nodes. It will return a Conn when a connection is
|
||||
received. If the node never receives any incoming connections then this function
|
||||
can block forever, so be prepared for that, perhaps by listening in a separate
|
||||
goroutine.
|
||||
|
||||
Assuming that you have defined a myConnectionHandler function to deal with
|
||||
incoming connections:
|
||||
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
// We've got a new connection
|
||||
go myConnectionHandler(conn)
|
||||
}
|
||||
|
||||
Outgoing Connections
|
||||
|
||||
If you know the node ID of the remote node that you want to talk to, you can
|
||||
dial an outbound connection to it. To do this, you should first ask the API for
|
||||
a Dialer:
|
||||
|
||||
dialer, err := n.core.ConnDialer()
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
You can then dial using the node's public key in hexadecimal format, for example:
|
||||
|
||||
conn, err := dialer.Dial("curve25519", "55071be281f50d0abbda63aadc59755624280c44b2f1f47684317aa4e0325604")
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
Using Connections
|
||||
|
||||
Conn objects are implementations of io.ReadWriteCloser, and as such, you can
|
||||
Read, Write and Close them as necessary.
|
||||
|
||||
Each Read or Write operation can deal with a buffer with a maximum size of 65535
|
||||
bytes - any bigger than this and the operation will return an error.
|
||||
|
||||
For example, to write to the Conn from the supplied buffer:
|
||||
|
||||
buf := []byte{1, 2, 3, 4, 5}
|
||||
w, err := conn.Write(buf)
|
||||
if err != nil {
|
||||
// ...
|
||||
} else {
|
||||
// written w bytes
|
||||
}
|
||||
|
||||
Reading from the Conn into the supplied buffer:
|
||||
|
||||
buf := make([]byte, 65535)
|
||||
r, err := conn.Read(buf)
|
||||
if err != nil {
|
||||
// ...
|
||||
} else {
|
||||
// read r bytes
|
||||
}
|
||||
|
||||
When you are happy that a connection is no longer required, you can discard it:
|
||||
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
Limitations
|
||||
|
||||
You should be aware of the following limitations when working with the Yggdrasil
|
||||
library:
|
||||
|
||||
Individual messages written through Yggdrasil connections can not exceed 65535
|
||||
bytes in size. Yggdrasil has no concept of fragmentation, so if you try to send
|
||||
a message that exceeds 65535 bytes in size, it will be dropped altogether and
|
||||
an error will be returned.
|
||||
|
||||
Yggdrasil connections are unreliable by nature. Messages are delivered on a
|
||||
best-effort basis, and employs congestion control where appropriate to ensure
|
||||
that congestion does not affect message transport, but Yggdrasil will not
|
||||
retransmit any messages that have been lost. If reliable delivery is important
|
||||
then you should manually implement acknowledgement and retransmission of
|
||||
messages.
|
||||
|
||||
*/
|
||||
package core
|
283
src/core/link.go
Normal file
283
src/core/link.go
Normal file
|
@ -0,0 +1,283 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
//"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
"golang.org/x/net/proxy"
|
||||
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
|
||||
)
|
||||
|
||||
type keyArray [ed25519.PublicKeySize]byte
|
||||
|
||||
type links struct {
|
||||
core *Core
|
||||
mutex sync.RWMutex // protects links below
|
||||
links map[linkInfo]*link
|
||||
tcp tcp // TCP interface support
|
||||
stopped chan struct{}
|
||||
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
||||
}
|
||||
|
||||
// linkInfo is used as a map key
|
||||
type linkInfo struct {
|
||||
key keyArray
|
||||
linkType string // Type of link, e.g. TCP, AWDL
|
||||
local string // Local name or address
|
||||
remote string // Remote name or address
|
||||
}
|
||||
|
||||
type link struct {
|
||||
lname string
|
||||
links *links
|
||||
conn net.Conn
|
||||
options linkOptions
|
||||
info linkInfo
|
||||
incoming bool
|
||||
force bool
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
type linkOptions struct {
|
||||
pinnedEd25519Keys map[keyArray]struct{}
|
||||
}
|
||||
|
||||
func (l *links) init(c *Core) error {
|
||||
l.core = c
|
||||
l.mutex.Lock()
|
||||
l.links = make(map[linkInfo]*link)
|
||||
l.mutex.Unlock()
|
||||
l.stopped = make(chan struct{})
|
||||
|
||||
if err := l.tcp.init(l); err != nil {
|
||||
c.log.Errorln("Failed to start TCP interface")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *links) reconfigure() {
|
||||
l.tcp.reconfigure()
|
||||
}
|
||||
|
||||
func (l *links) call(uri string, sintf string) error {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
|
||||
}
|
||||
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||
tcpOpts := tcpOptions{}
|
||||
if pubkeys, ok := u.Query()["ed25519"]; ok && len(pubkeys) > 0 {
|
||||
tcpOpts.pinnedEd25519Keys = make(map[keyArray]struct{})
|
||||
for _, pubkey := range pubkeys {
|
||||
if sigPub, err := hex.DecodeString(pubkey); err == nil {
|
||||
var sigPubKey keyArray
|
||||
copy(sigPubKey[:], sigPub)
|
||||
tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
l.tcp.call(u.Host, tcpOpts, sintf)
|
||||
case "socks":
|
||||
tcpOpts.socksProxyAddr = u.Host
|
||||
if u.User != nil {
|
||||
tcpOpts.socksProxyAuth = &proxy.Auth{}
|
||||
tcpOpts.socksProxyAuth.User = u.User.Username()
|
||||
tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
|
||||
}
|
||||
l.tcp.call(pathtokens[0], tcpOpts, sintf)
|
||||
case "tls":
|
||||
tcpOpts.upgrade = l.tcp.tls.forDialer
|
||||
l.tcp.call(u.Host, tcpOpts, sintf)
|
||||
default:
|
||||
return errors.New("unknown call scheme: " + u.Scheme)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *links) listen(uri string) error {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err)
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
_, err := l.tcp.listen(u.Host, nil)
|
||||
return err
|
||||
case "tls":
|
||||
_, err := l.tcp.listen(u.Host, l.tcp.tls.forListener)
|
||||
return err
|
||||
default:
|
||||
return errors.New("unknown listen scheme: " + u.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *links) create(conn net.Conn, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
|
||||
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
|
||||
intf := link{
|
||||
conn: conn,
|
||||
lname: name,
|
||||
links: l,
|
||||
options: options,
|
||||
info: linkInfo{
|
||||
linkType: linkType,
|
||||
local: local,
|
||||
remote: remote,
|
||||
},
|
||||
incoming: incoming,
|
||||
force: force,
|
||||
}
|
||||
return &intf, nil
|
||||
}
|
||||
|
||||
func (l *links) stop() error {
|
||||
close(l.stopped)
|
||||
if err := l.tcp.stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (intf *link) handler() (chan struct{}, error) {
|
||||
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
|
||||
defer intf.conn.Close()
|
||||
meta := version_getBaseMetadata()
|
||||
meta.key = intf.links.core.public
|
||||
metaBytes := meta.encode()
|
||||
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
|
||||
var err error
|
||||
if !util.FuncTimeout(30*time.Second, func() {
|
||||
var n int
|
||||
n, err = intf.conn.Write(metaBytes)
|
||||
if err == nil && n != len(metaBytes) {
|
||||
err = errors.New("incomplete metadata send")
|
||||
}
|
||||
}) {
|
||||
return nil, errors.New("timeout on metadata send")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !util.FuncTimeout(30*time.Second, func() {
|
||||
var n int
|
||||
n, err = io.ReadFull(intf.conn, metaBytes)
|
||||
if err == nil && n != len(metaBytes) {
|
||||
err = errors.New("incomplete metadata recv")
|
||||
}
|
||||
}) {
|
||||
return nil, errors.New("timeout on metadata recv")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
meta = version_metadata{}
|
||||
base := version_getBaseMetadata()
|
||||
if !meta.decode(metaBytes) {
|
||||
return nil, errors.New("failed to decode metadata")
|
||||
}
|
||||
if !meta.check() {
|
||||
intf.links.core.log.Errorf("Failed to connect to node: %s is incompatible version (local %s, remote %s)",
|
||||
intf.lname,
|
||||
fmt.Sprintf("%d.%d", base.ver, base.minorVer),
|
||||
fmt.Sprintf("%d.%d", meta.ver, meta.minorVer),
|
||||
)
|
||||
return nil, errors.New("remote node is incompatible version")
|
||||
}
|
||||
// Check if the remote side matches the keys we expected. This is a bit of a weak
|
||||
// check - in future versions we really should check a signature or something like that.
|
||||
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
|
||||
var key keyArray
|
||||
copy(key[:], meta.key)
|
||||
if _, allowed := pinned[key]; !allowed {
|
||||
intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
|
||||
return nil, fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
|
||||
}
|
||||
}
|
||||
// Check if we're authorized to connect to this key / IP
|
||||
allowed := intf.links.core.config.GetCurrent().AllowedPublicKeys
|
||||
isallowed := len(allowed) == 0
|
||||
for _, k := range allowed {
|
||||
if k == hex.EncodeToString(meta.key) { // TODO: this is yuck
|
||||
isallowed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if intf.incoming && !intf.force && !isallowed {
|
||||
intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
|
||||
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.key))
|
||||
intf.close()
|
||||
return nil, nil
|
||||
}
|
||||
// Check if we already have a link to this node
|
||||
copy(intf.info.key[:], meta.key)
|
||||
intf.links.mutex.Lock()
|
||||
if oldIntf, isIn := intf.links.links[intf.info]; isIn {
|
||||
intf.links.mutex.Unlock()
|
||||
// FIXME we should really return an error and let the caller block instead
|
||||
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
|
||||
intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name)
|
||||
return oldIntf.closed, nil
|
||||
} else {
|
||||
intf.closed = make(chan struct{})
|
||||
intf.links.links[intf.info] = intf
|
||||
defer func() {
|
||||
intf.links.mutex.Lock()
|
||||
delete(intf.links.links, intf.info)
|
||||
intf.links.mutex.Unlock()
|
||||
close(intf.closed)
|
||||
}()
|
||||
intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name)
|
||||
}
|
||||
intf.links.mutex.Unlock()
|
||||
themAddr := address.AddrForKey(ed25519.PublicKey(intf.info.key[:]))
|
||||
themAddrString := net.IP(themAddr[:]).String()
|
||||
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
||||
intf.links.core.log.Infof("Connected %s: %s, source %s",
|
||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||
// Run the handler
|
||||
err = intf.links.core.PacketConn.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
|
||||
// TODO don't report an error if it's just a 'use of closed network connection'
|
||||
if err != nil {
|
||||
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
||||
} else {
|
||||
intf.links.core.log.Infof("Disconnected %s: %s, source %s",
|
||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (intf *link) close() {
|
||||
intf.conn.Close()
|
||||
}
|
||||
|
||||
func (intf *link) name() string {
|
||||
return intf.lname
|
||||
}
|
||||
|
||||
func (intf *link) local() string {
|
||||
return intf.info.local
|
||||
}
|
||||
|
||||
func (intf *link) remote() string {
|
||||
return intf.info.remote
|
||||
}
|
||||
|
||||
func (intf *link) interfaceType() string {
|
||||
return intf.info.linkType
|
||||
}
|
433
src/core/tcp.go
Normal file
433
src/core/tcp.go
Normal file
|
@ -0,0 +1,433 @@
|
|||
package core
|
||||
|
||||
// This sends packets to peers using TCP as a transport
|
||||
// It's generally better tested than the UDP implementation
|
||||
// Using it regularly is insane, but I find TCP easier to test/debug with it
|
||||
// Updating and optimizing the UDP version is a higher priority
|
||||
|
||||
// TODO:
|
||||
// Something needs to make sure we're getting *valid* packets
|
||||
// Could be used to DoS (connect, give someone else's keys, spew garbage)
|
||||
// I guess the "peer" part should watch for link packets, disconnect?
|
||||
|
||||
// TCP connections start with a metadata exchange.
|
||||
// It involves exchanging version numbers and crypto keys
|
||||
// See version.go for version metadata format
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/proxy"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
)
|
||||
|
||||
const default_timeout = 6 * time.Second
|
||||
const tcp_ping_interval = (default_timeout * 2 / 3)
|
||||
|
||||
// The TCP listener and information about active TCP connections, to avoid duplication.
|
||||
type tcp struct {
|
||||
links *links
|
||||
waitgroup sync.WaitGroup
|
||||
mutex sync.Mutex // Protecting the below
|
||||
listeners map[string]*TcpListener
|
||||
calls map[string]struct{}
|
||||
conns map[linkInfo](chan struct{})
|
||||
tls tcptls
|
||||
}
|
||||
|
||||
// TcpListener is a stoppable TCP listener interface. These are typically
|
||||
// returned from calls to the ListenTCP() function and are also used internally
|
||||
// to represent listeners created by the "Listen" configuration option and for
|
||||
// multicast interfaces.
|
||||
type TcpListener struct {
|
||||
Listener net.Listener
|
||||
upgrade *TcpUpgrade
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
type TcpUpgrade struct {
|
||||
upgrade func(c net.Conn) (net.Conn, error)
|
||||
name string
|
||||
}
|
||||
|
||||
type tcpOptions struct {
|
||||
linkOptions
|
||||
upgrade *TcpUpgrade
|
||||
socksProxyAddr string
|
||||
socksProxyAuth *proxy.Auth
|
||||
socksPeerAddr string
|
||||
}
|
||||
|
||||
func (l *TcpListener) Stop() {
|
||||
defer func() { recover() }()
|
||||
close(l.stop)
|
||||
}
|
||||
|
||||
// Wrapper function to set additional options for specific connection types.
|
||||
func (t *tcp) setExtraOptions(c net.Conn) {
|
||||
switch sock := c.(type) {
|
||||
case *net.TCPConn:
|
||||
sock.SetNoDelay(true)
|
||||
// TODO something for socks5
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the address of the listener.
|
||||
func (t *tcp) getAddr() *net.TCPAddr {
|
||||
// TODO: Fix this, because this will currently only give a single address
|
||||
// to multicast.go, which obviously is not great, but right now multicast.go
|
||||
// doesn't have the ability to send more than one address in a packet either
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
for _, l := range t.listeners {
|
||||
return l.Listener.Addr().(*net.TCPAddr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initializes the struct.
|
||||
func (t *tcp) init(l *links) error {
|
||||
t.links = l
|
||||
t.tls.init(t)
|
||||
t.mutex.Lock()
|
||||
t.calls = make(map[string]struct{})
|
||||
t.conns = make(map[linkInfo](chan struct{}))
|
||||
t.listeners = make(map[string]*TcpListener)
|
||||
t.mutex.Unlock()
|
||||
|
||||
t.links.core.config.Mutex.RLock()
|
||||
defer t.links.core.config.Mutex.RUnlock()
|
||||
for _, listenaddr := range t.links.core.config.Current.Listen {
|
||||
switch listenaddr[:6] {
|
||||
case "tcp://":
|
||||
if _, err := t.listen(listenaddr[6:], nil); err != nil {
|
||||
return err
|
||||
}
|
||||
case "tls://":
|
||||
if _, err := t.listen(listenaddr[6:], t.tls.forListener); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
t.links.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tcp) stop() error {
|
||||
t.mutex.Lock()
|
||||
for _, listener := range t.listeners {
|
||||
listener.Stop()
|
||||
}
|
||||
t.mutex.Unlock()
|
||||
t.waitgroup.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tcp) reconfigure() {
|
||||
t.links.core.config.Mutex.RLock()
|
||||
added := util.Difference(t.links.core.config.Current.Listen, t.links.core.config.Previous.Listen)
|
||||
deleted := util.Difference(t.links.core.config.Previous.Listen, t.links.core.config.Current.Listen)
|
||||
t.links.core.config.Mutex.RUnlock()
|
||||
if len(added) > 0 || len(deleted) > 0 {
|
||||
for _, a := range added {
|
||||
switch a[:6] {
|
||||
case "tcp://":
|
||||
if _, err := t.listen(a[6:], nil); err != nil {
|
||||
t.links.core.log.Errorln("Error adding TCP", a[6:], "listener:", err)
|
||||
}
|
||||
case "tls://":
|
||||
if _, err := t.listen(a[6:], t.tls.forListener); err != nil {
|
||||
t.links.core.log.Errorln("Error adding TLS", a[6:], "listener:", err)
|
||||
}
|
||||
default:
|
||||
t.links.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring")
|
||||
}
|
||||
}
|
||||
for _, d := range deleted {
|
||||
if d[:6] != "tcp://" && d[:6] != "tls://" {
|
||||
t.links.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring")
|
||||
continue
|
||||
}
|
||||
t.mutex.Lock()
|
||||
if listener, ok := t.listeners[d[6:]]; ok {
|
||||
t.mutex.Unlock()
|
||||
listener.Stop()
|
||||
t.links.core.log.Infoln("Stopped TCP listener:", d[6:])
|
||||
} else {
|
||||
t.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tcp) listen(listenaddr string, upgrade *TcpUpgrade) (*TcpListener, error) {
|
||||
var err error
|
||||
|
||||
ctx := context.Background()
|
||||
lc := net.ListenConfig{
|
||||
Control: t.tcpContext,
|
||||
}
|
||||
listener, err := lc.Listen(ctx, "tcp", listenaddr)
|
||||
if err == nil {
|
||||
l := TcpListener{
|
||||
Listener: listener,
|
||||
upgrade: upgrade,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
t.waitgroup.Add(1)
|
||||
go t.listener(&l, listenaddr)
|
||||
return &l, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Runs the listener, which spawns off goroutines for incoming connections.
|
||||
func (t *tcp) listener(l *TcpListener, listenaddr string) {
|
||||
defer t.waitgroup.Done()
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
// Track the listener so that we can find it again in future
|
||||
t.mutex.Lock()
|
||||
if _, isIn := t.listeners[listenaddr]; isIn {
|
||||
t.mutex.Unlock()
|
||||
l.Listener.Close()
|
||||
return
|
||||
}
|
||||
t.listeners[listenaddr] = l
|
||||
t.mutex.Unlock()
|
||||
// And here we go!
|
||||
defer func() {
|
||||
t.links.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String())
|
||||
l.Listener.Close()
|
||||
t.mutex.Lock()
|
||||
delete(t.listeners, listenaddr)
|
||||
t.mutex.Unlock()
|
||||
}()
|
||||
t.links.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String())
|
||||
go func() {
|
||||
<-l.stop
|
||||
l.Listener.Close()
|
||||
}()
|
||||
defer l.Stop()
|
||||
for {
|
||||
sock, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
t.links.core.log.Errorln("Failed to accept connection:", err)
|
||||
select {
|
||||
case <-l.stop:
|
||||
return
|
||||
default:
|
||||
}
|
||||
time.Sleep(time.Second) // So we don't busy loop
|
||||
continue
|
||||
}
|
||||
t.waitgroup.Add(1)
|
||||
options := tcpOptions{
|
||||
upgrade: l.upgrade,
|
||||
}
|
||||
go t.handler(sock, true, options)
|
||||
}
|
||||
}
|
||||
|
||||
// Checks if we already are calling this address
|
||||
func (t *tcp) startCalling(saddr string) bool {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
_, isIn := t.calls[saddr]
|
||||
t.calls[saddr] = struct{}{}
|
||||
return !isIn
|
||||
}
|
||||
|
||||
// Checks if a connection already exists.
|
||||
// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address.
|
||||
// If the dial is successful, it launches the handler.
|
||||
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
|
||||
// This all happens in a separate goroutine that it spawns.
|
||||
func (t *tcp) call(saddr string, options tcpOptions, sintf string) {
|
||||
go func() {
|
||||
callname := saddr
|
||||
callproto := "TCP"
|
||||
if options.upgrade != nil {
|
||||
callproto = strings.ToUpper(options.upgrade.name)
|
||||
}
|
||||
if sintf != "" {
|
||||
callname = fmt.Sprintf("%s/%s/%s", callproto, saddr, sintf)
|
||||
}
|
||||
if !t.startCalling(callname) {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
// Block new calls for a little while, to mitigate livelock scenarios
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
delay := default_timeout + time.Duration(rand.Intn(10000))*time.Millisecond
|
||||
time.Sleep(delay)
|
||||
t.mutex.Lock()
|
||||
delete(t.calls, callname)
|
||||
t.mutex.Unlock()
|
||||
}()
|
||||
var conn net.Conn
|
||||
var err error
|
||||
if options.socksProxyAddr != "" {
|
||||
if sintf != "" {
|
||||
return
|
||||
}
|
||||
dialerdst, er := net.ResolveTCPAddr("tcp", options.socksProxyAddr)
|
||||
if er != nil {
|
||||
return
|
||||
}
|
||||
var dialer proxy.Dialer
|
||||
dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), options.socksProxyAuth, proxy.Direct)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
conn, err = dialer.Dial("tcp", saddr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
t.waitgroup.Add(1)
|
||||
options.socksPeerAddr = conn.RemoteAddr().String()
|
||||
if ch := t.handler(conn, false, options); ch != nil {
|
||||
<-ch
|
||||
}
|
||||
} else {
|
||||
dst, err := net.ResolveTCPAddr("tcp", saddr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if dst.IP.IsLinkLocalUnicast() {
|
||||
dst.Zone = sintf
|
||||
if dst.Zone == "" {
|
||||
return
|
||||
}
|
||||
}
|
||||
dialer := net.Dialer{
|
||||
Control: t.tcpContext,
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
if sintf != "" {
|
||||
dialer.Control = t.getControl(sintf)
|
||||
ief, err := net.InterfaceByName(sintf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if ief.Flags&net.FlagUp == 0 {
|
||||
return
|
||||
}
|
||||
addrs, err := ief.Addrs()
|
||||
if err == nil {
|
||||
for addrindex, addr := range addrs {
|
||||
src, _, err := net.ParseCIDR(addr.String())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if src.Equal(dst.IP) {
|
||||
continue
|
||||
}
|
||||
if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() {
|
||||
continue
|
||||
}
|
||||
bothglobal := src.IsGlobalUnicast() == dst.IP.IsGlobalUnicast()
|
||||
bothlinklocal := src.IsLinkLocalUnicast() == dst.IP.IsLinkLocalUnicast()
|
||||
if !bothglobal && !bothlinklocal {
|
||||
continue
|
||||
}
|
||||
if (src.To4() != nil) != (dst.IP.To4() != nil) {
|
||||
continue
|
||||
}
|
||||
if bothglobal || bothlinklocal || addrindex == len(addrs)-1 {
|
||||
dialer.LocalAddr = &net.TCPAddr{
|
||||
IP: src,
|
||||
Port: 0,
|
||||
Zone: sintf,
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if dialer.LocalAddr == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
conn, err = dialer.Dial("tcp", dst.String())
|
||||
if err != nil {
|
||||
t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err)
|
||||
return
|
||||
}
|
||||
t.waitgroup.Add(1)
|
||||
if ch := t.handler(conn, false, options); ch != nil {
|
||||
<-ch
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) chan struct{} {
|
||||
defer t.waitgroup.Done() // Happens after sock.close
|
||||
defer sock.Close()
|
||||
t.setExtraOptions(sock)
|
||||
var upgraded bool
|
||||
if options.upgrade != nil {
|
||||
var err error
|
||||
if sock, err = options.upgrade.upgrade(sock); err != nil {
|
||||
t.links.core.log.Errorln("TCP handler upgrade failed:", err)
|
||||
return nil
|
||||
}
|
||||
upgraded = true
|
||||
}
|
||||
var name, proto, local, remote string
|
||||
if options.socksProxyAddr != "" {
|
||||
name = "socks://" + sock.RemoteAddr().String() + "/" + options.socksPeerAddr
|
||||
proto = "socks"
|
||||
local, _, _ = net.SplitHostPort(sock.LocalAddr().String())
|
||||
remote, _, _ = net.SplitHostPort(options.socksPeerAddr)
|
||||
} else {
|
||||
if upgraded {
|
||||
proto = options.upgrade.name
|
||||
name = proto + "://" + sock.RemoteAddr().String()
|
||||
} else {
|
||||
proto = "tcp"
|
||||
name = proto + "://" + sock.RemoteAddr().String()
|
||||
}
|
||||
local, _, _ = net.SplitHostPort(sock.LocalAddr().String())
|
||||
remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
|
||||
}
|
||||
localIP := net.ParseIP(local)
|
||||
if localIP = localIP.To16(); localIP != nil {
|
||||
var laddr address.Address
|
||||
var lsubnet address.Subnet
|
||||
copy(laddr[:], localIP)
|
||||
copy(lsubnet[:], localIP)
|
||||
if laddr.IsValid() || lsubnet.IsValid() {
|
||||
// The local address is with the network address/prefix range
|
||||
// This would route ygg over ygg, which we don't want
|
||||
// FIXME ideally this check should happen outside of the core library
|
||||
// Maybe dial/listen at the application level
|
||||
// Then pass a net.Conn to the core library (after these kinds of checks are done)
|
||||
t.links.core.log.Debugln("Dropping ygg-tunneled connection", local, remote)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
|
||||
link, err := t.links.create(sock, name, proto, local, remote, incoming, force, options.linkOptions)
|
||||
if err != nil {
|
||||
t.links.core.log.Println(err)
|
||||
panic(err)
|
||||
}
|
||||
t.links.core.log.Debugln("DEBUG: starting handler for", name)
|
||||
ch, err := link.handler()
|
||||
t.links.core.log.Debugln("DEBUG: stopped handler for", name, err)
|
||||
return ch
|
||||
}
|
32
src/core/tcp_darwin.go
Normal file
32
src/core/tcp_darwin.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
// +build darwin
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
||||
|
||||
func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
||||
var control error
|
||||
var recvanyif error
|
||||
|
||||
control = c.Control(func(fd uintptr) {
|
||||
// sys/socket.h: #define SO_RECV_ANYIF 0x1104
|
||||
recvanyif = unix.SetsockoptInt(int(fd), syscall.SOL_SOCKET, 0x1104, 1)
|
||||
})
|
||||
|
||||
switch {
|
||||
case recvanyif != nil:
|
||||
return recvanyif
|
||||
default:
|
||||
return control
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error {
|
||||
return t.tcpContext
|
||||
}
|
45
src/core/tcp_linux.go
Normal file
45
src/core/tcp_linux.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
// +build linux
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
||||
|
||||
func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
||||
var control error
|
||||
var bbr error
|
||||
|
||||
control = c.Control(func(fd uintptr) {
|
||||
bbr = unix.SetsockoptString(int(fd), unix.IPPROTO_TCP, unix.TCP_CONGESTION, "bbr")
|
||||
})
|
||||
|
||||
// Log any errors
|
||||
if bbr != nil {
|
||||
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr)
|
||||
}
|
||||
if control != nil {
|
||||
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control)
|
||||
}
|
||||
|
||||
// Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error {
|
||||
return func(network, address string, c syscall.RawConn) error {
|
||||
var err error
|
||||
btd := func(fd uintptr) {
|
||||
err = unix.BindToDevice(int(fd), sintf)
|
||||
}
|
||||
c.Control(btd)
|
||||
if err != nil {
|
||||
t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf)
|
||||
}
|
||||
return t.tcpContext(network, address, c)
|
||||
}
|
||||
}
|
17
src/core/tcp_other.go
Normal file
17
src/core/tcp_other.go
Normal file
|
@ -0,0 +1,17 @@
|
|||
// +build !darwin,!linux
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
||||
|
||||
func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error {
|
||||
return t.tcpContext
|
||||
}
|
93
src/core/tls.go
Normal file
93
src/core/tls.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ed25519"
|
||||
"crypto/rand"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/hex"
|
||||
"encoding/pem"
|
||||
"log"
|
||||
"math/big"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
type tcptls struct {
|
||||
tcp *tcp
|
||||
config *tls.Config
|
||||
forDialer *TcpUpgrade
|
||||
forListener *TcpUpgrade
|
||||
}
|
||||
|
||||
func (t *tcptls) init(tcp *tcp) {
|
||||
t.tcp = tcp
|
||||
t.forDialer = &TcpUpgrade{
|
||||
upgrade: t.upgradeDialer,
|
||||
name: "tls",
|
||||
}
|
||||
t.forListener = &TcpUpgrade{
|
||||
upgrade: t.upgradeListener,
|
||||
name: "tls",
|
||||
}
|
||||
|
||||
edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize)
|
||||
copy(edpriv[:], tcp.links.core.secret[:])
|
||||
|
||||
certBuf := &bytes.Buffer{}
|
||||
|
||||
// TODO: because NotAfter is finite, we should add some mechanism to regenerate the certificate and restart the listeners periodically for nodes with very high uptimes. Perhaps regenerate certs and restart listeners every few months or so.
|
||||
pubtemp := x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
Subject: pkix.Name{
|
||||
CommonName: hex.EncodeToString(tcp.links.core.public[:]),
|
||||
},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
|
||||
BasicConstraintsValid: true,
|
||||
}
|
||||
|
||||
derbytes, err := x509.CreateCertificate(rand.Reader, &pubtemp, &pubtemp, edpriv.Public(), edpriv)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create certificate: %s", err)
|
||||
}
|
||||
|
||||
if err := pem.Encode(certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derbytes}); err != nil {
|
||||
panic("failed to encode certificate into PEM")
|
||||
}
|
||||
|
||||
cpool := x509.NewCertPool()
|
||||
cpool.AppendCertsFromPEM(derbytes)
|
||||
|
||||
t.config = &tls.Config{
|
||||
RootCAs: cpool,
|
||||
Certificates: []tls.Certificate{
|
||||
{
|
||||
Certificate: [][]byte{derbytes},
|
||||
PrivateKey: edpriv,
|
||||
},
|
||||
},
|
||||
InsecureSkipVerify: true,
|
||||
MinVersion: tls.VersionTLS13,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tcptls) upgradeListener(c net.Conn) (net.Conn, error) {
|
||||
conn := tls.Server(c, t.config)
|
||||
if err := conn.Handshake(); err != nil {
|
||||
return c, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (t *tcptls) upgradeDialer(c net.Conn) (net.Conn, error) {
|
||||
conn := tls.Client(c, t.config)
|
||||
if err := conn.Handshake(); err != nil {
|
||||
return c, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
68
src/core/version.go
Normal file
68
src/core/version.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package core
|
||||
|
||||
// This file contains the version metadata struct
|
||||
// Used in the initial connection setup and key exchange
|
||||
// Some of this could arguably go in wire.go instead
|
||||
|
||||
import "crypto/ed25519"
|
||||
|
||||
// This is the version-specific metadata exchanged at the start of a connection.
|
||||
// It must always begin with the 4 bytes "meta" and a wire formatted uint64 major version number.
|
||||
// The current version also includes a minor version number, and the box/sig/link keys that need to be exchanged to open a connection.
|
||||
type version_metadata struct {
|
||||
meta [4]byte
|
||||
ver uint8 // 1 byte in this version
|
||||
// Everything after this point potentially depends on the version number, and is subject to change in future versions
|
||||
minorVer uint8 // 1 byte in this version
|
||||
key ed25519.PublicKey
|
||||
}
|
||||
|
||||
// Gets a base metadata with no keys set, but with the correct version numbers.
|
||||
func version_getBaseMetadata() version_metadata {
|
||||
return version_metadata{
|
||||
meta: [4]byte{'m', 'e', 't', 'a'},
|
||||
ver: 0,
|
||||
minorVer: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Gets the length of the metadata for this version, used to know how many bytes to read from the start of a connection.
|
||||
func version_getMetaLength() (mlen int) {
|
||||
mlen += 4 // meta
|
||||
mlen++ // ver, as long as it's < 127, which it is in this version
|
||||
mlen++ // minorVer, as long as it's < 127, which it is in this version
|
||||
mlen += ed25519.PublicKeySize // key
|
||||
return
|
||||
}
|
||||
|
||||
// Encodes version metadata into its wire format.
|
||||
func (m *version_metadata) encode() []byte {
|
||||
bs := make([]byte, 0, version_getMetaLength())
|
||||
bs = append(bs, m.meta[:]...)
|
||||
bs = append(bs, m.ver)
|
||||
bs = append(bs, m.minorVer)
|
||||
bs = append(bs, m.key[:]...)
|
||||
if len(bs) != version_getMetaLength() {
|
||||
panic("Inconsistent metadata length")
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
// Decodes version metadata from its wire format into the struct.
|
||||
func (m *version_metadata) decode(bs []byte) bool {
|
||||
if len(bs) != version_getMetaLength() {
|
||||
return false
|
||||
}
|
||||
offset := 0
|
||||
offset += copy(m.meta[:], bs[offset:])
|
||||
m.ver, offset = bs[offset], offset+1
|
||||
m.minorVer, offset = bs[offset], offset+1
|
||||
m.key = append([]byte(nil), bs[offset:]...)
|
||||
return true
|
||||
}
|
||||
|
||||
// Checks that the "meta" bytes and the version numbers are the expected values.
|
||||
func (m *version_metadata) check() bool {
|
||||
base := version_getBaseMetadata()
|
||||
return base.meta == m.meta && base.ver == m.ver && base.minorVer == m.minorVer
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue