Merge branch 'develop' into neilalexander/tryall

This commit is contained in:
Neil Alexander 2022-11-26 16:00:46 +00:00
commit 1adc88ec77
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
13 changed files with 190 additions and 50 deletions

View file

@ -194,7 +194,7 @@ func (c *Core) AddPeer(uri string, sourceInterface string) error {
if err != nil {
return err
}
info, err := c.links.call(u, sourceInterface)
info, err := c.links.call(u, sourceInterface, nil)
if err != nil {
return err
}
@ -236,7 +236,7 @@ func (c *Core) RemovePeer(uri string, sourceInterface string) error {
// This does not add the peer to the peer list, so if the connection drops, the
// peer will not be called again automatically.
func (c *Core) CallPeer(u *url.URL, sintf string) error {
_, err := c.links.call(u, sintf)
_, err := c.links.call(u, sintf, nil)
return err
}

View file

@ -121,6 +121,13 @@ func (c *Core) _addPeerLoop() {
})
}
func (c *Core) RetryPeersNow() {
if c.addPeerTimer != nil && !c.addPeerTimer.Stop() {
<-c.addPeerTimer.C
}
c.Act(nil, c._addPeerLoop)
}
// Stop shuts down the Yggdrasil node.
func (c *Core) Stop() {
phony.Block(c, func() {

View file

@ -34,6 +34,11 @@ type linkInfo struct {
remote string // Remote name or address
}
type linkDial struct {
url *url.URL
sintf string
}
type link struct {
lname string
links *links
@ -105,9 +110,12 @@ func (l *links) isConnectedTo(info linkInfo) bool {
return isConnected
}
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
info := linkInfoFor(u.Scheme, sintf, u.Host)
func (l *links) call(u *url.URL, sintf string, errch chan<- error) (info linkInfo, err error) {
info = linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
if errch != nil {
close(errch) // already connected, no error
}
return info, nil
}
options := linkOptions{
@ -116,6 +124,9 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
if errch != nil {
close(errch)
}
return info, fmt.Errorf("pinned key contains invalid hex characters")
}
var sigPubKey keyArray
@ -125,6 +136,9 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
if errch != nil {
close(errch)
}
return info, fmt.Errorf("priority invalid: %w", err)
}
options.priority = uint8(pi)
@ -132,15 +146,27 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
switch info.linkType {
case "tcp":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "socks":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
@ -163,19 +189,34 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
}
}
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "unix":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
default:
if errch != nil {
close(errch)
}
return info, errors.New("unknown call scheme: " + u.Scheme)
}
return info, nil
@ -197,7 +238,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
return listener, err
}
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
func (l *links) create(conn net.Conn, dial *linkDial, name string, info linkInfo, incoming, force bool, options linkOptions) error {
intf := link{
conn: &linkConn{
Conn: conn,
@ -211,14 +252,14 @@ func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, forc
force: force,
}
go func() {
if err := intf.handler(); err != nil {
if err := intf.handler(dial); err != nil {
l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
}
}()
return nil
}
func (intf *link) handler() error {
func (intf *link) handler(dial *linkDial) error {
defer intf.conn.Close() // nolint:errcheck
// Don't connect to this link more than once.
@ -321,6 +362,30 @@ func (intf *link) handler() error {
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s; error: %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
}
if !intf.incoming && dial != nil {
// The connection was one that we dialled, so wait a second and try to
// dial it again.
var retry func(attempt int)
retry = func(attempt int) {
// intf.links.core.log.Infof("Retrying %s (attempt %d of 5)...", dial.url.String(), attempt)
errch := make(chan error, 1)
if _, err := intf.links.call(dial.url, dial.sintf, errch); err != nil {
return
}
if err := <-errch; err != nil {
if attempt < 3 {
time.AfterFunc(time.Second, func() {
retry(attempt + 1)
})
}
}
}
time.AfterFunc(time.Second, func() {
retry(1)
})
}
return nil
}

View file

@ -37,16 +37,20 @@ func (l *linkSOCKS) dial(url *url.URL, options linkOptions) error {
if err != nil {
return err
}
return l.handler(url.String(), info, conn, options, false)
dial := &linkDial{
url: url,
}
return l.handler(dial, info, conn, options, false)
}
func (l *linkSOCKS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
func (l *linkSOCKS) handler(dial *linkDial, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
return l.links.create(
conn, // connection
name, // connection name
info, // connection info
incoming, // not incoming
false, // not forced
options, // connection options
conn, // connection
dial, // connection URL
dial.url.String(), // connection name
info, // connection info
incoming, // not incoming
false, // not forced
options, // connection options
)
}

View file

@ -32,10 +32,6 @@ func (l *links) newLinkTCP() *linkTCP {
}
func (l *linkTCP) dial(url *url.URL, options linkOptions, sintf string) error {
info := linkInfoFor("tcp", sintf, url.Host)
if l.links.isConnectedTo(info) {
return nil
}
host, p, err := net.SplitHostPort(url.Host)
if err != nil {
return err
@ -57,12 +53,20 @@ func (l *linkTCP) dial(url *url.URL, options linkOptions, sintf string) error {
if err != nil {
continue
}
info := linkInfoFor("tcp", sintf, tcpIDFor(dialer.LocalAddr, addr))
if l.links.isConnectedTo(info) {
return nil
}
conn, err := dialer.DialContext(l.core.ctx, "tcp", addr.String())
if err != nil {
continue
}
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
return l.handler(uri, info, conn, options, false, false)
name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
dial := &linkDial{
url: url,
sintf: sintf,
}
return l.handler(dial, name, info, conn, options, false, false)
}
return fmt.Errorf("failed to connect via %d addresses", len(ips))
}
@ -98,10 +102,11 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
cancel()
break
}
laddr := conn.LocalAddr().(*net.TCPAddr)
raddr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("tcp://%s", raddr)
info := linkInfoFor("tcp", sintf, raddr.String())
if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
info := linkInfoFor("tcp", sintf, tcpIDFor(laddr, raddr))
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
@ -112,9 +117,10 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
return entry, nil
}
func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
func (l *linkTCP) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
return l.links.create(
conn, // connection
dial, // connection URL
name, // connection name
info, // connection info
incoming, // not incoming
@ -195,3 +201,16 @@ func (l *linkTCP) dialerFor(dst *net.TCPAddr, sintf string) (*net.Dialer, error)
}
return dialer, nil
}
func tcpIDFor(local net.Addr, remoteAddr *net.TCPAddr) string {
if localAddr, ok := local.(*net.TCPAddr); ok && localAddr.IP.Equal(remoteAddr.IP) {
// Nodes running on the same host — include both the IP and port.
return remoteAddr.String()
}
if remoteAddr.IP.IsLinkLocalUnicast() {
// Nodes discovered via multicast — include the IP only.
return remoteAddr.IP.String()
}
// Nodes connected remotely — include both the IP and port.
return remoteAddr.String()
}

View file

@ -48,10 +48,6 @@ func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS {
}
func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) error {
info := linkInfoFor("tls", sintf, url.Host)
if l.links.isConnectedTo(info) {
return nil
}
host, p, err := net.SplitHostPort(url.Host)
if err != nil {
return err
@ -73,6 +69,10 @@ func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) err
if err != nil {
continue
}
info := linkInfoFor("tls", sintf, tcpIDFor(dialer.LocalAddr, addr))
if l.links.isConnectedTo(info) {
return nil
}
tlsconfig := l.config.Clone()
tlsconfig.ServerName = sni
tlsdialer := &tls.Dialer{
@ -83,8 +83,12 @@ func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) err
if err != nil {
continue
}
uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
return l.handler(uri, info, conn, options, false, false)
name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
dial := &linkDial{
url: url,
sintf: sintf,
}
return l.handler(dial, name, info, conn, options, false, false)
}
return fmt.Errorf("failed to connect via %d addresses", len(ips))
}
@ -121,10 +125,11 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
cancel()
break
}
laddr := conn.LocalAddr().(*net.TCPAddr)
raddr := conn.RemoteAddr().(*net.TCPAddr)
name := fmt.Sprintf("tls://%s", raddr)
info := linkInfoFor("tls", sintf, raddr.String())
if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
info := linkInfoFor("tls", sintf, tcpIDFor(laddr, raddr))
if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
@ -135,20 +140,18 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
return entry, nil
}
// RFC5280 section 4.1.2.5
var notAfterNeverExpires = time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)
func (l *linkTLS) generateConfig() (*tls.Config, error) {
certBuf := &bytes.Buffer{}
// TODO: because NotAfter is finite, we should add some mechanism to
// regenerate the certificate and restart the listeners periodically
// for nodes with very high uptimes. Perhaps regenerate certs and restart
// listeners every few months or so.
cert := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
CommonName: hex.EncodeToString(l.links.core.public[:]),
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
NotAfter: notAfterNeverExpires,
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
@ -182,6 +185,6 @@ func (l *linkTLS) generateConfig() (*tls.Config, error) {
}, nil
}
func (l *linkTLS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
return l.tcp.handler(name, info, conn, options, incoming, force)
func (l *linkTLS) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
return l.tcp.handler(dial, name, info, conn, options, incoming, force)
}

View file

@ -45,7 +45,10 @@ func (l *linkUNIX) dial(url *url.URL, options linkOptions, _ string) error {
if err != nil {
return err
}
return l.handler(url.String(), info, conn, options, false)
dial := &linkDial{
url: url,
}
return l.handler(dial, url.String(), info, conn, options, false)
}
func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
@ -74,7 +77,7 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
break
}
info := linkInfoFor("unix", "", url.String())
if err = l.handler(url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
if err = l.handler(nil, url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
l.core.log.Errorln("Failed to create inbound link:", err)
}
}
@ -85,9 +88,10 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
return entry, nil
}
func (l *linkUNIX) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
func (l *linkUNIX) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
return l.links.create(
conn, // connection
dial, // connection URL
name, // connection name
info, // connection info
incoming, // not incoming

View file

@ -30,6 +30,7 @@ type Multicast struct {
_isOpen bool
_listeners map[string]*listenerInfo
_interfaces map[string]*interfaceInfo
_timer *time.Timer
config struct {
_groupAddr GroupAddress
_interfaces map[MulticastInterface]struct{}
@ -207,6 +208,15 @@ func (m *Multicast) _getAllowedInterfaces() map[string]*interfaceInfo {
return interfaces
}
func (m *Multicast) AnnounceNow() {
phony.Block(m, func() {
if m._timer != nil && !m._timer.Stop() {
<-m._timer.C
}
m.Act(nil, m._announce)
})
}
func (m *Multicast) _announce() {
if !m._isOpen {
return
@ -329,7 +339,7 @@ func (m *Multicast) _announce() {
break
}
}
time.AfterFunc(time.Second, func() {
m._timer = time.AfterFunc(time.Second, func() {
m.Act(nil, m._announce)
})
}