mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-29 06:35:07 +03:00
Merge branch 'develop' of https://github.com/yggdrasil-network/yggdrasil-go into yggdrasil-v0.4.7-develop
This commit is contained in:
commit
35cc180647
20 changed files with 309 additions and 178 deletions
|
@ -7,83 +7,83 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestAddress_Address_IsValid(t *testing.T) {
|
||||
func (c *Core) TestAddress_Address_IsValid(t *testing.T) {
|
||||
var address Address
|
||||
rand.Read(address[:])
|
||||
|
||||
address[0] = 0
|
||||
|
||||
if address.IsValid() {
|
||||
if c.IsValidAddress(address) {
|
||||
t.Fatal("invalid address marked as valid")
|
||||
}
|
||||
|
||||
address[0] = 0xfd
|
||||
|
||||
if address.IsValid() {
|
||||
if c.IsValidAddress(address) {
|
||||
t.Fatal("invalid address marked as valid")
|
||||
}
|
||||
|
||||
address[0] = 0xfc
|
||||
|
||||
if !address.IsValid() {
|
||||
if !c.IsValidAddress(address) {
|
||||
t.Fatal("valid address marked as invalid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress_Subnet_IsValid(t *testing.T) {
|
||||
func (c *Core) TestAddress_Subnet_IsValid(t *testing.T) {
|
||||
var subnet Subnet
|
||||
rand.Read(subnet[:])
|
||||
|
||||
subnet[0] = 0
|
||||
|
||||
if subnet.IsValid() {
|
||||
if c.IsValidSubnet(subnet) {
|
||||
t.Fatal("invalid subnet marked as valid")
|
||||
}
|
||||
|
||||
subnet[0] = 0xfc
|
||||
|
||||
if subnet.IsValid() {
|
||||
if c.IsValidSubnet(subnet) {
|
||||
t.Fatal("invalid subnet marked as valid")
|
||||
}
|
||||
|
||||
subnet[0] = 0xfd
|
||||
|
||||
if !subnet.IsValid() {
|
||||
if !c.IsValidSubnet(subnet) {
|
||||
t.Fatal("valid subnet marked as invalid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress_AddrForKey(t *testing.T) {
|
||||
func (c *Core) TestAddress_AddrForKey(t *testing.T) {
|
||||
publicKey := ed25519.PublicKey{
|
||||
189, 186, 207, 216, 34, 64, 222, 61, 205, 18, 57, 36, 203, 181, 82, 86,
|
||||
251, 141, 171, 8, 170, 152, 227, 5, 82, 138, 184, 79, 65, 158, 110, 251,
|
||||
}
|
||||
|
||||
expectedAddress := Address{
|
||||
f, c, 0, 132, 138, 96, 79, 187, 126, 67, 132, 101, 219, 141, 182, 104, 149,
|
||||
0xfc, 0, 132, 138, 96, 79, 187, 126, 67, 132, 101, 219, 141, 182, 104, 149,
|
||||
}
|
||||
|
||||
if *AddrForKey(publicKey) != expectedAddress {
|
||||
if *c.AddrForKey(publicKey) != expectedAddress {
|
||||
t.Fatal("invalid address returned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress_SubnetForKey(t *testing.T) {
|
||||
func (c *Core) TestAddress_SubnetForKey(t *testing.T) {
|
||||
publicKey := ed25519.PublicKey{
|
||||
189, 186, 207, 216, 34, 64, 222, 61, 205, 18, 57, 36, 203, 181, 82, 86,
|
||||
251, 141, 171, 8, 170, 152, 227, 5, 82, 138, 184, 79, 65, 158, 110, 251,
|
||||
}
|
||||
|
||||
expectedSubnet := Subnet{f, d, 0, 132, 138, 96, 79, 187, 126}
|
||||
expectedSubnet := Subnet{0xfd, 0, 132, 138, 96, 79, 187, 126}
|
||||
|
||||
if *SubnetForKey(publicKey) != expectedSubnet {
|
||||
if *c.SubnetForKey(publicKey) != expectedSubnet {
|
||||
t.Fatal("invalid subnet returned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress_Address_GetKey(t *testing.T) {
|
||||
func (c *Core) TestAddress_Address_GetKey(t *testing.T) {
|
||||
address := Address{
|
||||
f, c, 0, 132, 138, 96, 79, 187, 126, 67, 132, 101, 219, 141, 182, 104, 149,
|
||||
0xfc, 0, 132, 138, 96, 79, 187, 126, 67, 132, 101, 219, 141, 182, 104, 149,
|
||||
}
|
||||
|
||||
expectedPublicKey := ed25519.PublicKey{
|
||||
|
@ -93,13 +93,13 @@ func TestAddress_Address_GetKey(t *testing.T) {
|
|||
255, 255, 255, 255, 255, 255, 255, 255,
|
||||
}
|
||||
|
||||
if !bytes.Equal(address.GetKey(), expectedPublicKey) {
|
||||
if !bytes.Equal(c.GetAddressKey(address), expectedPublicKey) {
|
||||
t.Fatal("invalid public key returned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress_Subnet_GetKey(t *testing.T) {
|
||||
subnet := Subnet{f, d, 0, 132, 138, 96, 79, 187, 126}
|
||||
func (c *Core) TestAddress_Subnet_GetKey(t *testing.T) {
|
||||
subnet := Subnet{0xfd, 0, 132, 138, 96, 79, 187, 126}
|
||||
|
||||
expectedPublicKey := ed25519.PublicKey{
|
||||
189, 186, 207, 216, 34, 64, 255, 255,
|
||||
|
@ -108,7 +108,7 @@ func TestAddress_Subnet_GetKey(t *testing.T) {
|
|||
255, 255, 255, 255, 255, 255, 255, 255,
|
||||
}
|
||||
|
||||
if !bytes.Equal(subnet.GetKey(), expectedPublicKey) {
|
||||
if !bytes.Equal(c.GetSubnetKey(subnet), expectedPublicKey) {
|
||||
t.Fatal("invalid public key returned")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -188,21 +188,25 @@ func (c *Core) SetLogger(log Logger) {
|
|||
// This adds the peer to the peer list, so that they will be called again if the
|
||||
// connection drops.
|
||||
|
||||
func (c *Core) AddPeer(peer string, intf string) error {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
func (c *Core) AddPeer(uri string, sourceInterface string) error {
|
||||
var known bool
|
||||
phony.Block(c, func() {
|
||||
_, known = c.config._peers[Peer{uri, sourceInterface}]
|
||||
})
|
||||
if known {
|
||||
return fmt.Errorf("peer already configured")
|
||||
}
|
||||
u, err := url.Parse(peer)
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
c.log.Errorln("Failed to parse peer url:", peer, err)
|
||||
return err
|
||||
}
|
||||
if err := c.CallPeer(u, intf); err != nil {
|
||||
c.log.Errorln("Failed to add peer:", err)
|
||||
info, err := c.links.call(u, sourceInterface, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
phony.Block(c, func() {
|
||||
c.config._peers[Peer{uri, sourceInterface}] = &info
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -245,7 +249,7 @@ func (c *Core) RemovePeers() error {
|
|||
// This does not add the peer to the peer list, so if the connection drops, the
|
||||
// peer will not be called again automatically.
|
||||
func (c *Core) CallPeer(u *url.URL, sintf string) error {
|
||||
_, err := c.links.call(u, sintf)
|
||||
_, err := c.links.call(u, sintf, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
// The Core object represents the Mesh node. You should create a Core
|
||||
// object for each Mesh node you plan to run.
|
||||
type Core struct {
|
||||
address Address
|
||||
address Address
|
||||
// This is the main data structure that holds everything else for a node
|
||||
// We're going to keep our own copy of the provided config - that way we can
|
||||
// guarantee that it will be covered by the mutex
|
||||
|
@ -123,6 +123,13 @@ func (c *Core) _addPeerLoop() {
|
|||
})
|
||||
}
|
||||
|
||||
func (c *Core) RetryPeersNow() {
|
||||
if c.addPeerTimer != nil && !c.addPeerTimer.Stop() {
|
||||
<-c.addPeerTimer.C
|
||||
}
|
||||
c.Act(nil, c._addPeerLoop)
|
||||
}
|
||||
|
||||
// Stop shuts down the Mesh node.
|
||||
func (c *Core) Stop() {
|
||||
phony.Block(c, func() {
|
||||
|
|
107
src/core/link.go
107
src/core/link.go
|
@ -22,64 +22,69 @@ import (
|
|||
|
||||
// linkInfo is used as a map key
|
||||
type linkInfo struct {
|
||||
linkType string // Type of link, e.g. TCP, AWDL
|
||||
local string // Local name or address
|
||||
remote string // Remote name or address
|
||||
linkType string // Type of link, e.g. TCP, AWDL
|
||||
local string // Local name or address
|
||||
remote string // Remote name or address
|
||||
}
|
||||
|
||||
type linkDial struct {
|
||||
url *url.URL
|
||||
sintf string
|
||||
}
|
||||
|
||||
type link struct {
|
||||
lname string
|
||||
links *links
|
||||
conn *linkConn
|
||||
options linkOptions
|
||||
info linkInfo
|
||||
incoming bool
|
||||
force bool
|
||||
lname string
|
||||
links *links
|
||||
conn *linkConn
|
||||
options linkOptions
|
||||
info linkInfo
|
||||
incoming bool
|
||||
force bool
|
||||
}
|
||||
|
||||
type linkOptions struct {
|
||||
pinnedEd25519Keys map[keyArray]struct{}
|
||||
pinnedEd25519Keys map[keyArray]struct{}
|
||||
priority uint8
|
||||
}
|
||||
|
||||
type Listener struct {
|
||||
net.Listener
|
||||
closed chan struct{}
|
||||
net.Listener
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
func (l *Listener) Close() error {
|
||||
err := l.Listener.Close()
|
||||
<-l.closed
|
||||
return err
|
||||
err := l.Listener.Close()
|
||||
<-l.closed
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *links) shutdown() {
|
||||
phony.Block(l.tcp, func() {
|
||||
for l := range l.tcp._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
phony.Block(l.tls, func() {
|
||||
for l := range l.tls._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
phony.Block(l.unix, func() {
|
||||
for l := range l.unix._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
phony.Block(l.tcp, func() {
|
||||
for l := range l.tcp._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
phony.Block(l.tls, func() {
|
||||
for l := range l.tls._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
phony.Block(l.unix, func() {
|
||||
for l := range l.unix._listeners {
|
||||
_ = l.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (l *links) isConnectedTo(info linkInfo) bool {
|
||||
var isConnected bool
|
||||
phony.Block(l, func() {
|
||||
_, isConnected = l._links[info]
|
||||
})
|
||||
return isConnected
|
||||
var isConnected bool
|
||||
phony.Block(l, func() {
|
||||
_, isConnected = l._links[info]
|
||||
})
|
||||
return isConnected
|
||||
}
|
||||
|
||||
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
|
||||
func (l *links) create(conn net.Conn, dial *linkDial, name string, info linkInfo, incoming, force bool, options linkOptions) error {
|
||||
intf := link{
|
||||
conn: &linkConn{
|
||||
Conn: conn,
|
||||
|
@ -93,14 +98,14 @@ func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, forc
|
|||
force: force,
|
||||
}
|
||||
go func() {
|
||||
if err := intf.handler(); err != nil {
|
||||
if err := intf.handler(dial); err != nil {
|
||||
l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (intf *link) handler() error {
|
||||
func (intf *link) handler(dial *linkDial) error {
|
||||
defer intf.conn.Close() // nolint:errcheck
|
||||
|
||||
// Don't connect to this link more than once.
|
||||
|
@ -203,6 +208,30 @@ func (intf *link) handler() error {
|
|||
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s; error: %s",
|
||||
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
|
||||
}
|
||||
|
||||
if !intf.incoming && dial != nil {
|
||||
// The connection was one that we dialled, so wait a second and try to
|
||||
// dial it again.
|
||||
var retry func(attempt int)
|
||||
retry = func(attempt int) {
|
||||
// intf.links.core.log.Infof("Retrying %s (attempt %d of 5)...", dial.url.String(), attempt)
|
||||
errch := make(chan error, 1)
|
||||
if _, err := intf.links.call(dial.url, dial.sintf, errch); err != nil {
|
||||
return
|
||||
}
|
||||
if err := <-errch; err != nil {
|
||||
if attempt < 3 {
|
||||
time.AfterFunc(time.Second, func() {
|
||||
retry(attempt + 1)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
time.AfterFunc(time.Second, func() {
|
||||
retry(1)
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,8 +6,8 @@ package core
|
|||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
@ -51,9 +51,12 @@ func (l *links) init(c *Core) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
|
||||
info := linkInfoFor(u.Scheme, sintf, u.Host)
|
||||
func (l *links) call(u *url.URL, sintf string, errch chan<- error) (info linkInfo, err error) {
|
||||
info = linkInfoFor(u.Scheme, sintf, u.Host)
|
||||
if l.isConnectedTo(info) {
|
||||
if errch != nil {
|
||||
close(errch) // already connected, no error
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
options := linkOptions{
|
||||
|
@ -62,6 +65,9 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
|
|||
for _, pubkey := range u.Query()["key"] {
|
||||
sigPub, err := hex.DecodeString(pubkey)
|
||||
if err != nil {
|
||||
if errch != nil {
|
||||
close(errch)
|
||||
}
|
||||
return info, fmt.Errorf("pinned key contains invalid hex characters")
|
||||
}
|
||||
var sigPubKey keyArray
|
||||
|
@ -71,6 +77,9 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
|
|||
if p := u.Query().Get("priority"); p != "" {
|
||||
pi, err := strconv.ParseUint(p, 10, 8)
|
||||
if err != nil {
|
||||
if errch != nil {
|
||||
close(errch)
|
||||
}
|
||||
return info, fmt.Errorf("priority invalid: %w", err)
|
||||
}
|
||||
options.priority = uint8(pi)
|
||||
|
@ -78,15 +87,27 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
|
|||
switch info.linkType {
|
||||
case "tcp":
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
case "socks":
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -109,30 +130,57 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
|
|||
}
|
||||
}
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
case "unix":
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
case "sctp":
|
||||
go func() {
|
||||
case "sctp":
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.sctp.dial(u, options, sintf); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial SCTP %s: %s\n", u.Host, err)
|
||||
}
|
||||
}()
|
||||
l.core.log.Warnf("Failed to dial SCTP %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
case "mpath":
|
||||
go func() {
|
||||
if errch != nil {
|
||||
defer close(errch)
|
||||
}
|
||||
if err := l.mpath.dial(u, options, sintf); err != nil && err != io.EOF {
|
||||
l.core.log.Warnf("Failed to dial MPATH %s: %s\n", u.Host, err)
|
||||
if errch != nil {
|
||||
errch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
default:
|
||||
if errch != nil {
|
||||
close(errch)
|
||||
}
|
||||
return info, errors.New("unknown call scheme: " + u.Scheme)
|
||||
}
|
||||
return info, nil
|
||||
|
@ -148,8 +196,8 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
|
|||
listener, err = l.tls.listen(u, sintf)
|
||||
case "unix":
|
||||
listener, err = l.unix.listen(u, sintf)
|
||||
case "sctp":
|
||||
listener, err = l.sctp.listen(u, sintf)
|
||||
case "sctp":
|
||||
listener, err = l.sctp.listen(u, sintf)
|
||||
case "mpath":
|
||||
listener, err = l.mpath.listen(u, sintf)
|
||||
default:
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/getlantern/multipath"
|
||||
|
||||
"github.com/Arceliar/phony"
|
||||
|
@ -39,8 +40,12 @@ func (l *linkMPATH) dial(url *url.URL, options linkOptions, sintf string) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
return l.handler(uri, info, conn, options, false, false)
|
||||
name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
sintf: sintf,
|
||||
}
|
||||
return l.handler(dial, name, info, conn, options, false, false)
|
||||
}
|
||||
|
||||
func (l *linkMPATH) listen(url *url.URL, sintf string) (*Listener, error) {
|
||||
|
@ -74,10 +79,10 @@ func (l *linkMPATH) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
cancel()
|
||||
break
|
||||
}
|
||||
addr := conn.RemoteAddr().(*net.TCPAddr)
|
||||
name := fmt.Sprintf("mpath://%s", addr)
|
||||
info := linkInfoFor("mpath", sintf, strings.SplitN(addr.IP.String(), "%", 2)[0])
|
||||
if err = l.handler(name, info, conn, linkOptions{}, true, addr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
raddr := conn.RemoteAddr().(*net.TCPAddr)
|
||||
name := fmt.Sprintf("mpath://%s", raddr)
|
||||
info := linkInfoFor("mpath", sintf, strings.SplitN(raddr.IP.String(), "%", 2)[0])
|
||||
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
l.core.log.Errorln("Failed to create inbound link:", err)
|
||||
}
|
||||
}
|
||||
|
@ -88,9 +93,10 @@ func (l *linkMPATH) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
return entry, nil
|
||||
}
|
||||
|
||||
func (l *linkMPATH) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
func (l *linkMPATH) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
return l.links.create(
|
||||
conn, // connection
|
||||
dial, // connection URL
|
||||
name, // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
|
@ -146,7 +152,7 @@ func (l *linkMPATH) connFor(url *url.URL, sinterfaces string) (net.Conn, error)
|
|||
if sinterfaces != "" {
|
||||
sintfarray := strings.Split(sinterfaces, ",")
|
||||
for _, dst := range remoteTargets {
|
||||
for _, sintf := range sintfarray {
|
||||
for _, sintf := range sintfarray {
|
||||
ief, err := net.InterfaceByName(sintf)
|
||||
if err != nil {
|
||||
l.core.log.Errorln("interface %s not found", sintf)
|
||||
|
|
|
@ -5,12 +5,12 @@ package core
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/Arceliar/phony"
|
||||
sctp "github.com/vikulin/sctp"
|
||||
|
@ -47,7 +47,7 @@ func (l *linkSCTP) dial(url *url.URL, options linkOptions, sintf string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
raddress := l.getAddress(dst.String()+":"+port)
|
||||
raddress := l.getAddress(dst.String() + ":" + port)
|
||||
var conn net.Conn
|
||||
laddress := l.getAddress("0.0.0.0:0")
|
||||
conn, err = sctp.NewSCTPConnection(laddress, laddress.AddressFamily, sctp.InitMsg{NumOstreams: 2, MaxInstreams: 2, MaxAttempts: 2, MaxInitTimeout: 5}, sctp.OneToOne, false)
|
||||
|
@ -63,7 +63,11 @@ func (l *linkSCTP) dial(url *url.URL, options linkOptions, sintf string) error {
|
|||
//l.core.log.Printf("Read buffer %d", rbuf)
|
||||
//l.core.log.Printf("Write buffer %d", wbuf)
|
||||
conn.(*sctp.SCTPConn).SetEvents(sctp.SCTP_EVENT_DATA_IO)
|
||||
return l.handler(url.String(), info, conn, options, false, false)
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
sintf: sintf,
|
||||
}
|
||||
return l.handler(dial, url.String(), info, conn, options, false, false)
|
||||
}
|
||||
|
||||
func (l *linkSCTP) listen(url *url.URL, sintf string) (*Listener, error) {
|
||||
|
@ -102,9 +106,9 @@ func (l *linkSCTP) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
}
|
||||
addr := conn.RemoteAddr().(*sctp.SCTPAddr)
|
||||
ips, err := json.Marshal(addr.IPAddrs)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
name := fmt.Sprintf("sctp://%s", ips)
|
||||
info := linkInfoFor("sctp", sintf, string(ips))
|
||||
//conn.(*sctp.SCTPConn).SetWriteBuffer(324288)
|
||||
|
@ -113,8 +117,8 @@ func (l *linkSCTP) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
rbuf, _ := conn.(*sctp.SCTPConn).GetReadBuffer()
|
||||
|
||||
l.core.log.Printf("Read buffer %d", rbuf)
|
||||
l.core.log.Printf("Write buffer %d", wbuf)
|
||||
if err = l.handler(name, info, conn, linkOptions{}, true, addr.IPAddrs[0].IP.IsLinkLocalUnicast()); err != nil {
|
||||
l.core.log.Printf("Write buffer %d", wbuf)
|
||||
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, addr.IPAddrs[0].IP.IsLinkLocalUnicast()); err != nil {
|
||||
l.core.log.Errorln("Failed to create inbound link:", err)
|
||||
}
|
||||
}
|
||||
|
@ -125,9 +129,10 @@ func (l *linkSCTP) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
return entry, nil
|
||||
}
|
||||
|
||||
func (l *linkSCTP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool, force bool) error {
|
||||
func (l *linkSCTP) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
return l.links.create(
|
||||
conn, // connection
|
||||
dial, // connection URL
|
||||
name, // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
|
@ -161,7 +166,7 @@ func (l *linkSCTP) getAddress(host string) *sctp.SCTPAddr {
|
|||
}
|
||||
for _, i := range strings.Split(ip, ",") {
|
||||
if a, err := net.ResolveIPAddr("ip", i); err == nil {
|
||||
fmt.Sprintf("Resolved address '%s' to %s", i, a)
|
||||
l.core.log.Printf("Resolved address '%s' to %s", i, a)
|
||||
ips = append(ips, *a)
|
||||
} else {
|
||||
l.core.log.Errorln("Error resolving address '%s': %v", i, err)
|
||||
|
|
|
@ -37,16 +37,20 @@ func (l *linkSOCKS) dial(url *url.URL, options linkOptions) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return l.handler(url.String(), info, conn, options, false)
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
}
|
||||
return l.handler(dial, info, conn, options, false)
|
||||
}
|
||||
|
||||
func (l *linkSOCKS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
|
||||
func (l *linkSOCKS) handler(dial *linkDial, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
|
||||
return l.links.create(
|
||||
conn, // connection
|
||||
name, // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
false, // not forced
|
||||
options, // connection options
|
||||
conn, // connection
|
||||
dial, // connection URL
|
||||
dial.url.String(), // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
false, // not forced
|
||||
options, // connection options
|
||||
)
|
||||
}
|
||||
|
|
|
@ -47,8 +47,12 @@ func (l *linkTCP) dial(url *url.URL, options linkOptions, sintf string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
return l.handler(uri, info, conn, options, false, false)
|
||||
name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
sintf: sintf,
|
||||
}
|
||||
return l.handler(dial, name, info, conn, options, false, false)
|
||||
}
|
||||
|
||||
func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
|
||||
|
@ -86,7 +90,7 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
raddr := conn.RemoteAddr().(*net.TCPAddr)
|
||||
name := fmt.Sprintf("tcp://%s", raddr)
|
||||
info := linkInfoFor("tcp", sintf, tcpIDFor(laddr, raddr))
|
||||
if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
l.core.log.Errorln("Failed to create inbound link:", err)
|
||||
}
|
||||
}
|
||||
|
@ -97,9 +101,10 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
return entry, nil
|
||||
}
|
||||
|
||||
func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
func (l *linkTCP) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
return l.links.create(
|
||||
conn, // connection
|
||||
dial, // connection URL
|
||||
name, // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
|
|
|
@ -69,8 +69,12 @@ func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) err
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
return l.handler(uri, info, conn, options, false, false)
|
||||
name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
sintf: sintf,
|
||||
}
|
||||
return l.handler(dial, name, info, conn, options, false, false)
|
||||
}
|
||||
|
||||
func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
|
||||
|
@ -109,7 +113,7 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
raddr := conn.RemoteAddr().(*net.TCPAddr)
|
||||
name := fmt.Sprintf("tls://%s", raddr)
|
||||
info := linkInfoFor("tls", sintf, tcpIDFor(laddr, raddr))
|
||||
if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
|
||||
l.core.log.Errorln("Failed to create inbound link:", err)
|
||||
}
|
||||
}
|
||||
|
@ -120,20 +124,18 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
|
|||
return entry, nil
|
||||
}
|
||||
|
||||
// RFC5280 section 4.1.2.5
|
||||
var notAfterNeverExpires = time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)
|
||||
|
||||
func (l *linkTLS) generateConfig() (*tls.Config, error) {
|
||||
certBuf := &bytes.Buffer{}
|
||||
|
||||
// TODO: because NotAfter is finite, we should add some mechanism to
|
||||
// regenerate the certificate and restart the listeners periodically
|
||||
// for nodes with very high uptimes. Perhaps regenerate certs and restart
|
||||
// listeners every few months or so.
|
||||
cert := x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
Subject: pkix.Name{
|
||||
CommonName: hex.EncodeToString(l.links.core.public[:]),
|
||||
},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||
NotAfter: notAfterNeverExpires,
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
|
||||
BasicConstraintsValid: true,
|
||||
|
@ -167,6 +169,6 @@ func (l *linkTLS) generateConfig() (*tls.Config, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (l *linkTLS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
return l.tcp.handler(name, info, conn, options, incoming, force)
|
||||
func (l *linkTLS) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
|
||||
return l.tcp.handler(dial, name, info, conn, options, incoming, force)
|
||||
}
|
||||
|
|
|
@ -45,7 +45,10 @@ func (l *linkUNIX) dial(url *url.URL, options linkOptions, _ string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return l.handler(url.String(), info, conn, options, false)
|
||||
dial := &linkDial{
|
||||
url: url,
|
||||
}
|
||||
return l.handler(dial, url.String(), info, conn, options, false)
|
||||
}
|
||||
|
||||
func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
|
||||
|
@ -74,7 +77,7 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
|
|||
break
|
||||
}
|
||||
info := linkInfoFor("unix", "", url.String())
|
||||
if err = l.handler(url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
|
||||
if err = l.handler(nil, url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
|
||||
l.core.log.Errorln("Failed to create inbound link:", err)
|
||||
}
|
||||
}
|
||||
|
@ -85,9 +88,10 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
|
|||
return entry, nil
|
||||
}
|
||||
|
||||
func (l *linkUNIX) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
|
||||
func (l *linkUNIX) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
|
||||
return l.links.create(
|
||||
conn, // connection
|
||||
dial, // connection URL
|
||||
name, // connection name
|
||||
info, // connection info
|
||||
incoming, // not incoming
|
||||
|
|
|
@ -30,6 +30,7 @@ type Multicast struct {
|
|||
_isOpen bool
|
||||
_listeners map[string]*listenerInfo
|
||||
_interfaces map[string]*interfaceInfo
|
||||
_timer *time.Timer
|
||||
config struct {
|
||||
_groupAddr GroupAddress
|
||||
_interfaces map[MulticastInterface]struct{}
|
||||
|
@ -206,6 +207,15 @@ func (m *Multicast) _getAllowedInterfaces() map[string]*interfaceInfo {
|
|||
return interfaces
|
||||
}
|
||||
|
||||
func (m *Multicast) AnnounceNow() {
|
||||
phony.Block(m, func() {
|
||||
if m._timer != nil && !m._timer.Stop() {
|
||||
<-m._timer.C
|
||||
}
|
||||
m.Act(nil, m._announce)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Multicast) _announce() {
|
||||
if !m._isOpen {
|
||||
return
|
||||
|
@ -328,7 +338,7 @@ func (m *Multicast) _announce() {
|
|||
break
|
||||
}
|
||||
}
|
||||
time.AfterFunc(time.Second, func() {
|
||||
m._timer = time.AfterFunc(time.Second, func() {
|
||||
m.Act(nil, m._announce)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue