mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	Redial failed connections if possible (#983)
This commit is contained in:
		
							parent
							
								
									0da871f528
								
							
						
					
					
						commit
						7efd66932f
					
				
					 9 changed files with 124 additions and 28 deletions
				
			
		| 
						 | 
				
			
			@ -194,7 +194,7 @@ func (c *Core) AddPeer(uri string, sourceInterface string) error {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	info, err := c.links.call(u, sourceInterface)
 | 
			
		||||
	info, err := c.links.call(u, sourceInterface, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -236,7 +236,7 @@ func (c *Core) RemovePeer(uri string, sourceInterface string) error {
 | 
			
		|||
// This does not add the peer to the peer list, so if the connection drops, the
 | 
			
		||||
// peer will not be called again automatically.
 | 
			
		||||
func (c *Core) CallPeer(u *url.URL, sintf string) error {
 | 
			
		||||
	_, err := c.links.call(u, sintf)
 | 
			
		||||
	_, err := c.links.call(u, sintf, nil)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -121,6 +121,13 @@ func (c *Core) _addPeerLoop() {
 | 
			
		|||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Core) RetryPeersNow() {
 | 
			
		||||
	if c.addPeerTimer != nil && !c.addPeerTimer.Stop() {
 | 
			
		||||
		<-c.addPeerTimer.C
 | 
			
		||||
	}
 | 
			
		||||
	c.Act(nil, c._addPeerLoop)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Stop shuts down the Yggdrasil node.
 | 
			
		||||
func (c *Core) Stop() {
 | 
			
		||||
	phony.Block(c, func() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,6 +34,11 @@ type linkInfo struct {
 | 
			
		|||
	remote   string // Remote name or address
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type linkDial struct {
 | 
			
		||||
	url   *url.URL
 | 
			
		||||
	sintf string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type link struct {
 | 
			
		||||
	lname    string
 | 
			
		||||
	links    *links
 | 
			
		||||
| 
						 | 
				
			
			@ -105,9 +110,10 @@ func (l *links) isConnectedTo(info linkInfo) bool {
 | 
			
		|||
	return isConnected
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
 | 
			
		||||
	info := linkInfoFor(u.Scheme, sintf, u.Host)
 | 
			
		||||
func (l *links) call(u *url.URL, sintf string, errch chan<- error) (info linkInfo, err error) {
 | 
			
		||||
	info = linkInfoFor(u.Scheme, sintf, u.Host)
 | 
			
		||||
	if l.isConnectedTo(info) {
 | 
			
		||||
		close(errch) // already connected, no error
 | 
			
		||||
		return info, nil
 | 
			
		||||
	}
 | 
			
		||||
	options := linkOptions{
 | 
			
		||||
| 
						 | 
				
			
			@ -116,6 +122,7 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
 | 
			
		|||
	for _, pubkey := range u.Query()["key"] {
 | 
			
		||||
		sigPub, err := hex.DecodeString(pubkey)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			close(errch)
 | 
			
		||||
			return info, fmt.Errorf("pinned key contains invalid hex characters")
 | 
			
		||||
		}
 | 
			
		||||
		var sigPubKey keyArray
 | 
			
		||||
| 
						 | 
				
			
			@ -125,6 +132,7 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
 | 
			
		|||
	if p := u.Query().Get("priority"); p != "" {
 | 
			
		||||
		pi, err := strconv.ParseUint(p, 10, 8)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			close(errch)
 | 
			
		||||
			return info, fmt.Errorf("priority invalid: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
		options.priority = uint8(pi)
 | 
			
		||||
| 
						 | 
				
			
			@ -132,15 +140,27 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
 | 
			
		|||
	switch info.linkType {
 | 
			
		||||
	case "tcp":
 | 
			
		||||
		go func() {
 | 
			
		||||
			if errch != nil {
 | 
			
		||||
				defer close(errch)
 | 
			
		||||
			}
 | 
			
		||||
			if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
 | 
			
		||||
				l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
 | 
			
		||||
				if errch != nil {
 | 
			
		||||
					errch <- err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
	case "socks":
 | 
			
		||||
		go func() {
 | 
			
		||||
			if errch != nil {
 | 
			
		||||
				defer close(errch)
 | 
			
		||||
			}
 | 
			
		||||
			if err := l.socks.dial(u, options); err != nil && err != io.EOF {
 | 
			
		||||
				l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
 | 
			
		||||
				if errch != nil {
 | 
			
		||||
					errch <- err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -163,19 +183,32 @@ func (l *links) call(u *url.URL, sintf string) (linkInfo, error) {
 | 
			
		|||
			}
 | 
			
		||||
		}
 | 
			
		||||
		go func() {
 | 
			
		||||
			if errch != nil {
 | 
			
		||||
				defer close(errch)
 | 
			
		||||
			}
 | 
			
		||||
			if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
 | 
			
		||||
				l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
 | 
			
		||||
				if errch != nil {
 | 
			
		||||
					errch <- err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
	case "unix":
 | 
			
		||||
		go func() {
 | 
			
		||||
			if errch != nil {
 | 
			
		||||
				defer close(errch)
 | 
			
		||||
			}
 | 
			
		||||
			if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
 | 
			
		||||
				l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
 | 
			
		||||
				if errch != nil {
 | 
			
		||||
					errch <- err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
	default:
 | 
			
		||||
		close(errch)
 | 
			
		||||
		return info, errors.New("unknown call scheme: " + u.Scheme)
 | 
			
		||||
	}
 | 
			
		||||
	return info, nil
 | 
			
		||||
| 
						 | 
				
			
			@ -197,7 +230,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
 | 
			
		|||
	return listener, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error {
 | 
			
		||||
func (l *links) create(conn net.Conn, dial *linkDial, name string, info linkInfo, incoming, force bool, options linkOptions) error {
 | 
			
		||||
	intf := link{
 | 
			
		||||
		conn: &linkConn{
 | 
			
		||||
			Conn: conn,
 | 
			
		||||
| 
						 | 
				
			
			@ -211,14 +244,14 @@ func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, forc
 | 
			
		|||
		force:    force,
 | 
			
		||||
	}
 | 
			
		||||
	go func() {
 | 
			
		||||
		if err := intf.handler(); err != nil {
 | 
			
		||||
		if err := intf.handler(dial); err != nil {
 | 
			
		||||
			l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (intf *link) handler() error {
 | 
			
		||||
func (intf *link) handler(dial *linkDial) error {
 | 
			
		||||
	defer intf.conn.Close() // nolint:errcheck
 | 
			
		||||
 | 
			
		||||
	// Don't connect to this link more than once.
 | 
			
		||||
| 
						 | 
				
			
			@ -321,6 +354,30 @@ func (intf *link) handler() error {
 | 
			
		|||
		intf.links.core.log.Infof("Disconnected %s %s: %s, source %s; error: %s",
 | 
			
		||||
			dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !intf.incoming && dial != nil {
 | 
			
		||||
		// The connection was one that we dialled, so wait a second and try to
 | 
			
		||||
		// dial it again.
 | 
			
		||||
		var retry func(attempt int)
 | 
			
		||||
		retry = func(attempt int) {
 | 
			
		||||
			// intf.links.core.log.Infof("Retrying %s (attempt %d of 5)...", dial.url.String(), attempt)
 | 
			
		||||
			errch := make(chan error, 1)
 | 
			
		||||
			if _, err := intf.links.call(dial.url, dial.sintf, errch); err != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if err := <-errch; err != nil {
 | 
			
		||||
				if attempt < 3 {
 | 
			
		||||
					time.AfterFunc(time.Second, func() {
 | 
			
		||||
						retry(attempt + 1)
 | 
			
		||||
					})
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		time.AfterFunc(time.Second, func() {
 | 
			
		||||
			retry(1)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,16 +37,20 @@ func (l *linkSOCKS) dial(url *url.URL, options linkOptions) error {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(url.String(), info, conn, options, false)
 | 
			
		||||
	dial := &linkDial{
 | 
			
		||||
		url: url,
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(dial, info, conn, options, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkSOCKS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
 | 
			
		||||
func (l *linkSOCKS) handler(dial *linkDial, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
 | 
			
		||||
	return l.links.create(
 | 
			
		||||
		conn,     // connection
 | 
			
		||||
		name,     // connection name
 | 
			
		||||
		info,     // connection info
 | 
			
		||||
		incoming, // not incoming
 | 
			
		||||
		false,    // not forced
 | 
			
		||||
		options,  // connection options
 | 
			
		||||
		conn,              // connection
 | 
			
		||||
		dial,              // connection URL
 | 
			
		||||
		dial.url.String(), // connection name
 | 
			
		||||
		info,              // connection info
 | 
			
		||||
		incoming,          // not incoming
 | 
			
		||||
		false,             // not forced
 | 
			
		||||
		options,           // connection options
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,8 +47,12 @@ func (l *linkTCP) dial(url *url.URL, options linkOptions, sintf string) error {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
 | 
			
		||||
	return l.handler(uri, info, conn, options, false, false)
 | 
			
		||||
	name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
 | 
			
		||||
	dial := &linkDial{
 | 
			
		||||
		url:   url,
 | 
			
		||||
		sintf: sintf,
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(dial, name, info, conn, options, false, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -86,7 +90,7 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
 | 
			
		|||
			raddr := conn.RemoteAddr().(*net.TCPAddr)
 | 
			
		||||
			name := fmt.Sprintf("tcp://%s", raddr)
 | 
			
		||||
			info := linkInfoFor("tcp", sintf, tcpIDFor(laddr, raddr))
 | 
			
		||||
			if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
 | 
			
		||||
			if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
 | 
			
		||||
				l.core.log.Errorln("Failed to create inbound link:", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -97,9 +101,10 @@ func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) {
 | 
			
		|||
	return entry, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
 | 
			
		||||
func (l *linkTCP) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
 | 
			
		||||
	return l.links.create(
 | 
			
		||||
		conn,     // connection
 | 
			
		||||
		dial,     // connection URL
 | 
			
		||||
		name,     // connection name
 | 
			
		||||
		info,     // connection info
 | 
			
		||||
		incoming, // not incoming
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -69,8 +69,12 @@ func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) err
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	uri := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
 | 
			
		||||
	return l.handler(uri, info, conn, options, false, false)
 | 
			
		||||
	name := strings.TrimRight(strings.SplitN(url.String(), "?", 2)[0], "/")
 | 
			
		||||
	dial := &linkDial{
 | 
			
		||||
		url:   url,
 | 
			
		||||
		sintf: sintf,
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(dial, name, info, conn, options, false, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -109,7 +113,7 @@ func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) {
 | 
			
		|||
			raddr := conn.RemoteAddr().(*net.TCPAddr)
 | 
			
		||||
			name := fmt.Sprintf("tls://%s", raddr)
 | 
			
		||||
			info := linkInfoFor("tls", sintf, tcpIDFor(laddr, raddr))
 | 
			
		||||
			if err = l.handler(name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
 | 
			
		||||
			if err = l.handler(nil, name, info, conn, linkOptionsForListener(url), true, raddr.IP.IsLinkLocalUnicast()); err != nil {
 | 
			
		||||
				l.core.log.Errorln("Failed to create inbound link:", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -165,6 +169,6 @@ func (l *linkTLS) generateConfig() (*tls.Config, error) {
 | 
			
		|||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkTLS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
 | 
			
		||||
	return l.tcp.handler(name, info, conn, options, incoming, force)
 | 
			
		||||
func (l *linkTLS) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming, force bool) error {
 | 
			
		||||
	return l.tcp.handler(dial, name, info, conn, options, incoming, force)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,7 +45,10 @@ func (l *linkUNIX) dial(url *url.URL, options linkOptions, _ string) error {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(url.String(), info, conn, options, false)
 | 
			
		||||
	dial := &linkDial{
 | 
			
		||||
		url: url,
 | 
			
		||||
	}
 | 
			
		||||
	return l.handler(dial, url.String(), info, conn, options, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +77,7 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
 | 
			
		|||
				break
 | 
			
		||||
			}
 | 
			
		||||
			info := linkInfoFor("unix", "", url.String())
 | 
			
		||||
			if err = l.handler(url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
 | 
			
		||||
			if err = l.handler(nil, url.String(), info, conn, linkOptionsForListener(url), true); err != nil {
 | 
			
		||||
				l.core.log.Errorln("Failed to create inbound link:", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -85,9 +88,10 @@ func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) {
 | 
			
		|||
	return entry, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *linkUNIX) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
 | 
			
		||||
func (l *linkUNIX) handler(dial *linkDial, name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error {
 | 
			
		||||
	return l.links.create(
 | 
			
		||||
		conn,     // connection
 | 
			
		||||
		dial,     // connection URL
 | 
			
		||||
		name,     // connection name
 | 
			
		||||
		info,     // connection info
 | 
			
		||||
		incoming, // not incoming
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue