mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	resolve merge conflicts
This commit is contained in:
		
						commit
						07206b5d46
					
				
					 4 changed files with 91 additions and 29 deletions
				
			
		| 
						 | 
				
			
			@ -145,7 +145,8 @@ func (c *Conn) search() error {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// Used in session keep-alive traffic
 | 
			
		||||
func (c *Conn) doSearch() {
 | 
			
		||||
func (c *Conn) _doSearch() {
 | 
			
		||||
	s := fmt.Sprintf("conn=%p", c)
 | 
			
		||||
	routerWork := func() {
 | 
			
		||||
		// Check to see if there is a search already matching the destination
 | 
			
		||||
		sinfo, isIn := c.core.router.searches.searches[*c.nodeID]
 | 
			
		||||
| 
						 | 
				
			
			@ -153,7 +154,7 @@ func (c *Conn) doSearch() {
 | 
			
		|||
			// Nothing was found, so create a new search
 | 
			
		||||
			searchCompleted := func(sinfo *sessionInfo, e error) {}
 | 
			
		||||
			sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
 | 
			
		||||
			c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
 | 
			
		||||
			c.core.log.Debugf("%s DHT search started: %p", s, sinfo)
 | 
			
		||||
			// Start the search
 | 
			
		||||
			sinfo.startSearch()
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -268,7 +269,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
 | 
			
		|||
		case time.Since(c.session.time) > 6*time.Second:
 | 
			
		||||
			if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
 | 
			
		||||
				// TODO double check that the above condition is correct
 | 
			
		||||
				c.doSearch()
 | 
			
		||||
				c._doSearch()
 | 
			
		||||
			} else {
 | 
			
		||||
				c.session.ping(c.session) // TODO send from self if this becomes an actor
 | 
			
		||||
			}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,6 +16,7 @@ import (
 | 
			
		|||
	"github.com/yggdrasil-network/yggdrasil-go/src/address"
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
			
		||||
	"golang.org/x/net/proxy"
 | 
			
		||||
 | 
			
		||||
	"github.com/Arceliar/phony"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -50,6 +51,7 @@ type link struct {
 | 
			
		|||
	lname          string
 | 
			
		||||
	links          *links
 | 
			
		||||
	peer           *peer
 | 
			
		||||
	options        linkOptions
 | 
			
		||||
	msgIO          linkMsgIO
 | 
			
		||||
	info           linkInfo
 | 
			
		||||
	incoming       bool
 | 
			
		||||
| 
						 | 
				
			
			@ -66,6 +68,11 @@ type link struct {
 | 
			
		|||
	blocked        bool        // True if we've blocked the peer in the switch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type linkOptions struct {
 | 
			
		||||
	pinnedCurve25519Keys map[crypto.BoxPubKey]struct{}
 | 
			
		||||
	pinnedEd25519Keys    map[crypto.SigPubKey]struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *links) init(c *Core) error {
 | 
			
		||||
	l.core = c
 | 
			
		||||
	l.mutex.Lock()
 | 
			
		||||
| 
						 | 
				
			
			@ -91,13 +98,41 @@ func (l *links) call(uri string, sintf string) error {
 | 
			
		|||
		return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
 | 
			
		||||
	}
 | 
			
		||||
	pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
 | 
			
		||||
	tcpOpts := tcpOptions{}
 | 
			
		||||
	if pubkeys, ok := u.Query()["curve25519"]; ok && len(pubkeys) > 0 {
 | 
			
		||||
		tcpOpts.pinnedCurve25519Keys = make(map[crypto.BoxPubKey]struct{})
 | 
			
		||||
		for _, pubkey := range pubkeys {
 | 
			
		||||
			if boxPub, err := hex.DecodeString(pubkey); err == nil {
 | 
			
		||||
				var boxPubKey crypto.BoxPubKey
 | 
			
		||||
				copy(boxPubKey[:], boxPub)
 | 
			
		||||
				tcpOpts.pinnedCurve25519Keys[boxPubKey] = struct{}{}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if pubkeys, ok := u.Query()["ed25519"]; ok && len(pubkeys) > 0 {
 | 
			
		||||
		tcpOpts.pinnedEd25519Keys = make(map[crypto.SigPubKey]struct{})
 | 
			
		||||
		for _, pubkey := range pubkeys {
 | 
			
		||||
			if sigPub, err := hex.DecodeString(pubkey); err == nil {
 | 
			
		||||
				var sigPubKey crypto.SigPubKey
 | 
			
		||||
				copy(sigPubKey[:], sigPub)
 | 
			
		||||
				tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	switch u.Scheme {
 | 
			
		||||
	case "tcp":
 | 
			
		||||
		l.tcp.call(u.Host, nil, sintf, nil)
 | 
			
		||||
		l.tcp.call(u.Host, tcpOpts, sintf)
 | 
			
		||||
	case "socks":
 | 
			
		||||
		l.tcp.call(pathtokens[0], u.Host, sintf, nil)
 | 
			
		||||
		tcpOpts.socksProxyAddr = u.Host
 | 
			
		||||
		if u.User != nil {
 | 
			
		||||
			tcpOpts.socksProxyAuth = &proxy.Auth{}
 | 
			
		||||
			tcpOpts.socksProxyAuth.User = u.User.Username()
 | 
			
		||||
			tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
 | 
			
		||||
		}
 | 
			
		||||
		l.tcp.call(pathtokens[0], tcpOpts, sintf)
 | 
			
		||||
	case "tls":
 | 
			
		||||
		l.tcp.call(u.Host, nil, sintf, l.tcp.tls.forDialer)
 | 
			
		||||
		tcpOpts.upgrade = l.tcp.tls.forDialer
 | 
			
		||||
		l.tcp.call(u.Host, tcpOpts, sintf)
 | 
			
		||||
	default:
 | 
			
		||||
		return errors.New("unknown call scheme: " + u.Scheme)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -121,12 +156,13 @@ func (l *links) listen(uri string) error {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*link, error) {
 | 
			
		||||
func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
 | 
			
		||||
	// Technically anything unique would work for names, but let's pick something human readable, just for debugging
 | 
			
		||||
	intf := link{
 | 
			
		||||
		lname: name,
 | 
			
		||||
		links: l,
 | 
			
		||||
		msgIO: msgIO,
 | 
			
		||||
		lname:   name,
 | 
			
		||||
		links:   l,
 | 
			
		||||
		options: options,
 | 
			
		||||
		msgIO:   msgIO,
 | 
			
		||||
		info: linkInfo{
 | 
			
		||||
			linkType: linkType,
 | 
			
		||||
			local:    local,
 | 
			
		||||
| 
						 | 
				
			
			@ -190,6 +226,20 @@ func (intf *link) handler() error {
 | 
			
		|||
		intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
 | 
			
		||||
		return errors.New("failed to connect: wrong version")
 | 
			
		||||
	}
 | 
			
		||||
	// Check if the remote side matches the keys we expected. This is a bit of a weak
 | 
			
		||||
	// check - in future versions we really should check a signature or something like that.
 | 
			
		||||
	if pinned := intf.options.pinnedCurve25519Keys; pinned != nil {
 | 
			
		||||
		if _, allowed := pinned[meta.box]; !allowed {
 | 
			
		||||
			intf.links.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name)
 | 
			
		||||
			return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
 | 
			
		||||
		if _, allowed := pinned[meta.sig]; !allowed {
 | 
			
		||||
			intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
 | 
			
		||||
			return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Check if we're authorized to connect to this key / IP
 | 
			
		||||
	if intf.incoming && !intf.force && !intf.links.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
 | 
			
		||||
		intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,7 +58,7 @@ func (c *Core) NewSimlink() *Simlink {
 | 
			
		|||
	s := &Simlink{rch: make(chan []byte, 1)}
 | 
			
		||||
	n := "Simlink"
 | 
			
		||||
	var err error
 | 
			
		||||
	s.link, err = c.links.create(s, n, n, n, n, false, true)
 | 
			
		||||
	s.link, err = c.links.create(s, n, n, n, n, false, true, linkOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -57,6 +57,14 @@ type TcpUpgrade struct {
 | 
			
		|||
	name    string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type tcpOptions struct {
 | 
			
		||||
	linkOptions
 | 
			
		||||
	upgrade        *TcpUpgrade
 | 
			
		||||
	socksProxyAddr string
 | 
			
		||||
	socksProxyAuth *proxy.Auth
 | 
			
		||||
	socksPeerAddr  string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *TcpListener) Stop() {
 | 
			
		||||
	defer func() { recover() }()
 | 
			
		||||
	close(l.stop)
 | 
			
		||||
| 
						 | 
				
			
			@ -221,7 +229,10 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
 | 
			
		|||
			return
 | 
			
		||||
		}
 | 
			
		||||
		t.waitgroup.Add(1)
 | 
			
		||||
		go t.handler(sock, true, nil, l.upgrade)
 | 
			
		||||
		options := tcpOptions{
 | 
			
		||||
			upgrade: l.upgrade,
 | 
			
		||||
		}
 | 
			
		||||
		go t.handler(sock, true, options)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -239,12 +250,12 @@ func (t *tcp) startCalling(saddr string) bool {
 | 
			
		|||
// If the dial is successful, it launches the handler.
 | 
			
		||||
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
 | 
			
		||||
// This all happens in a separate goroutine that it spawns.
 | 
			
		||||
func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *TcpUpgrade) {
 | 
			
		||||
func (t *tcp) call(saddr string, options tcpOptions, sintf string) {
 | 
			
		||||
	go func() {
 | 
			
		||||
		callname := saddr
 | 
			
		||||
		callproto := "TCP"
 | 
			
		||||
		if upgrade != nil {
 | 
			
		||||
			callproto = strings.ToUpper(upgrade.name)
 | 
			
		||||
		if options.upgrade != nil {
 | 
			
		||||
			callproto = strings.ToUpper(options.upgrade.name)
 | 
			
		||||
		}
 | 
			
		||||
		if sintf != "" {
 | 
			
		||||
			callname = fmt.Sprintf("%s/%s/%s", callproto, saddr, sintf)
 | 
			
		||||
| 
						 | 
				
			
			@ -263,17 +274,16 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
 | 
			
		|||
		}()
 | 
			
		||||
		var conn net.Conn
 | 
			
		||||
		var err error
 | 
			
		||||
		socksaddr, issocks := options.(string)
 | 
			
		||||
		if issocks {
 | 
			
		||||
		if options.socksProxyAddr != "" {
 | 
			
		||||
			if sintf != "" {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			dialerdst, er := net.ResolveTCPAddr("tcp", socksaddr)
 | 
			
		||||
			dialerdst, er := net.ResolveTCPAddr("tcp", options.socksProxyAddr)
 | 
			
		||||
			if er != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			var dialer proxy.Dialer
 | 
			
		||||
			dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), nil, proxy.Direct)
 | 
			
		||||
			dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), options.socksProxyAuth, proxy.Direct)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
| 
						 | 
				
			
			@ -282,7 +292,8 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
 | 
			
		|||
				return
 | 
			
		||||
			}
 | 
			
		||||
			t.waitgroup.Add(1)
 | 
			
		||||
			t.handler(conn, false, saddr, nil)
 | 
			
		||||
			options.socksPeerAddr = conn.RemoteAddr().String()
 | 
			
		||||
			t.handler(conn, false, options)
 | 
			
		||||
		} else {
 | 
			
		||||
			dst, err := net.ResolveTCPAddr("tcp", saddr)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -348,19 +359,19 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
 | 
			
		|||
				return
 | 
			
		||||
			}
 | 
			
		||||
			t.waitgroup.Add(1)
 | 
			
		||||
			t.handler(conn, false, nil, upgrade)
 | 
			
		||||
			t.handler(conn, false, options)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade *TcpUpgrade) {
 | 
			
		||||
func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
 | 
			
		||||
	defer t.waitgroup.Done() // Happens after sock.close
 | 
			
		||||
	defer sock.Close()
 | 
			
		||||
	t.setExtraOptions(sock)
 | 
			
		||||
	var upgraded bool
 | 
			
		||||
	if upgrade != nil {
 | 
			
		||||
	if options.upgrade != nil {
 | 
			
		||||
		var err error
 | 
			
		||||
		if sock, err = upgrade.upgrade(sock); err != nil {
 | 
			
		||||
		if sock, err = options.upgrade.upgrade(sock); err != nil {
 | 
			
		||||
			t.links.core.log.Errorln("TCP handler upgrade failed:", err)
 | 
			
		||||
			return
 | 
			
		||||
		} else {
 | 
			
		||||
| 
						 | 
				
			
			@ -370,14 +381,14 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade
 | 
			
		|||
	stream := stream{}
 | 
			
		||||
	stream.init(sock)
 | 
			
		||||
	var name, proto, local, remote string
 | 
			
		||||
	if socksaddr, issocks := options.(string); issocks {
 | 
			
		||||
		name = "socks://" + sock.RemoteAddr().String() + "/" + socksaddr
 | 
			
		||||
	if options.socksProxyAddr != "" {
 | 
			
		||||
		name = "socks://" + sock.RemoteAddr().String() + "/" + options.socksPeerAddr
 | 
			
		||||
		proto = "socks"
 | 
			
		||||
		local, _, _ = net.SplitHostPort(sock.LocalAddr().String())
 | 
			
		||||
		remote, _, _ = net.SplitHostPort(socksaddr)
 | 
			
		||||
		remote, _, _ = net.SplitHostPort(options.socksPeerAddr)
 | 
			
		||||
	} else {
 | 
			
		||||
		if upgraded {
 | 
			
		||||
			proto = upgrade.name
 | 
			
		||||
			proto = options.upgrade.name
 | 
			
		||||
			name = proto + "://" + sock.RemoteAddr().String()
 | 
			
		||||
		} else {
 | 
			
		||||
			proto = "tcp"
 | 
			
		||||
| 
						 | 
				
			
			@ -387,7 +398,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade
 | 
			
		|||
		remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
 | 
			
		||||
	}
 | 
			
		||||
	force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
 | 
			
		||||
	link, err := t.links.create(&stream, name, proto, local, remote, incoming, force)
 | 
			
		||||
	link, err := t.links.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.links.core.log.Println(err)
 | 
			
		||||
		panic(err)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue