mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	start migrating tunConn to the actor model
This commit is contained in:
		
							parent
							
								
									b582c444f8
								
							
						
					
					
						commit
						4893a07696
					
				
					 3 changed files with 192 additions and 4 deletions
				
			
		| 
						 | 
				
			
			@ -5,6 +5,7 @@ import (
 | 
			
		|||
	"errors"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/Arceliar/phony"
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/address"
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
			
		||||
| 
						 | 
				
			
			@ -16,6 +17,7 @@ import (
 | 
			
		|||
const tunConnTimeout = 2 * time.Minute
 | 
			
		||||
 | 
			
		||||
type tunConn struct {
 | 
			
		||||
	phony.Actor
 | 
			
		||||
	tun   *TunAdapter
 | 
			
		||||
	conn  *yggdrasil.Conn
 | 
			
		||||
	addr  address.Address
 | 
			
		||||
| 
						 | 
				
			
			@ -45,6 +47,83 @@ func (s *tunConn) _close_nomutex() {
 | 
			
		|||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *tunConn) _read(bs []byte) (err error) {
 | 
			
		||||
	select {
 | 
			
		||||
	case <-s.stop:
 | 
			
		||||
		err = errors.New("session was already closed")
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		return
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
	if len(bs) == 0 {
 | 
			
		||||
		err = errors.New("read packet with 0 size")
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
 | 
			
		||||
	ipv6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
 | 
			
		||||
	isCGA := true
 | 
			
		||||
	// Check source addresses
 | 
			
		||||
	switch {
 | 
			
		||||
	case ipv6 && bs[8] == 0x02 && bytes.Equal(s.addr[:16], bs[8:24]): // source
 | 
			
		||||
	case ipv6 && bs[8] == 0x03 && bytes.Equal(s.snet[:8], bs[8:16]): // source
 | 
			
		||||
	default:
 | 
			
		||||
		isCGA = false
 | 
			
		||||
	}
 | 
			
		||||
	// Check destiantion addresses
 | 
			
		||||
	switch {
 | 
			
		||||
	case ipv6 && bs[24] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[24:40]): // destination
 | 
			
		||||
	case ipv6 && bs[24] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[24:32]): // destination
 | 
			
		||||
	default:
 | 
			
		||||
		isCGA = false
 | 
			
		||||
	}
 | 
			
		||||
	// Decide how to handle the packet
 | 
			
		||||
	var skip bool
 | 
			
		||||
	switch {
 | 
			
		||||
	case isCGA: // Allowed
 | 
			
		||||
	case s.tun.ckr.isEnabled() && (ipv4 || ipv6):
 | 
			
		||||
		var srcAddr address.Address
 | 
			
		||||
		var dstAddr address.Address
 | 
			
		||||
		var addrlen int
 | 
			
		||||
		if ipv4 {
 | 
			
		||||
			copy(srcAddr[:], bs[12:16])
 | 
			
		||||
			copy(dstAddr[:], bs[16:20])
 | 
			
		||||
			addrlen = 4
 | 
			
		||||
		}
 | 
			
		||||
		if ipv6 {
 | 
			
		||||
			copy(srcAddr[:], bs[8:24])
 | 
			
		||||
			copy(dstAddr[:], bs[24:40])
 | 
			
		||||
			addrlen = 16
 | 
			
		||||
		}
 | 
			
		||||
		if !s.tun.ckr.isValidLocalAddress(dstAddr, addrlen) {
 | 
			
		||||
			// The destination address isn't in our CKR allowed range
 | 
			
		||||
			skip = true
 | 
			
		||||
		} else if key, err := s.tun.ckr.getPublicKeyForAddress(srcAddr, addrlen); err == nil {
 | 
			
		||||
			srcNodeID := crypto.GetNodeID(&key)
 | 
			
		||||
			if s.conn.RemoteAddr() == *srcNodeID {
 | 
			
		||||
				// This is the one allowed CKR case, where source and destination addresses are both good
 | 
			
		||||
			} else {
 | 
			
		||||
				// The CKR key associated with this address doesn't match the sender's NodeID
 | 
			
		||||
				skip = true
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			// We have no CKR route for this source address
 | 
			
		||||
			skip = true
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		skip = true
 | 
			
		||||
	}
 | 
			
		||||
	if skip {
 | 
			
		||||
		err = errors.New("address not allowed")
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// FIXME this send can block if the tuntap isn't running, which isn't really safe...
 | 
			
		||||
	s.tun.send <- bs
 | 
			
		||||
	s.stillAlive()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *tunConn) reader() (err error) {
 | 
			
		||||
	select {
 | 
			
		||||
	case _, ok := <-s.stop:
 | 
			
		||||
| 
						 | 
				
			
			@ -137,6 +216,110 @@ func (s *tunConn) reader() (err error) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *tunConn) _write(bs []byte) (err error) {
 | 
			
		||||
	select {
 | 
			
		||||
	case <-s.stop:
 | 
			
		||||
		err = errors.New("session was already closed")
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		return
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
	v4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
 | 
			
		||||
	v6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
 | 
			
		||||
	isCGA := true
 | 
			
		||||
	// Check source addresses
 | 
			
		||||
	switch {
 | 
			
		||||
	case v6 && bs[8] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[8:24]): // source
 | 
			
		||||
	case v6 && bs[8] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[8:16]): // source
 | 
			
		||||
	default:
 | 
			
		||||
		isCGA = false
 | 
			
		||||
	}
 | 
			
		||||
	// Check destiantion addresses
 | 
			
		||||
	switch {
 | 
			
		||||
	case v6 && bs[24] == 0x02 && bytes.Equal(s.addr[:16], bs[24:40]): // destination
 | 
			
		||||
	case v6 && bs[24] == 0x03 && bytes.Equal(s.snet[:8], bs[24:32]): // destination
 | 
			
		||||
	default:
 | 
			
		||||
		isCGA = false
 | 
			
		||||
	}
 | 
			
		||||
	// Decide how to handle the packet
 | 
			
		||||
	var skip bool
 | 
			
		||||
	switch {
 | 
			
		||||
	case isCGA: // Allowed
 | 
			
		||||
	case s.tun.ckr.isEnabled() && (v4 || v6):
 | 
			
		||||
		var srcAddr address.Address
 | 
			
		||||
		var dstAddr address.Address
 | 
			
		||||
		var addrlen int
 | 
			
		||||
		if v4 {
 | 
			
		||||
			copy(srcAddr[:], bs[12:16])
 | 
			
		||||
			copy(dstAddr[:], bs[16:20])
 | 
			
		||||
			addrlen = 4
 | 
			
		||||
		}
 | 
			
		||||
		if v6 {
 | 
			
		||||
			copy(srcAddr[:], bs[8:24])
 | 
			
		||||
			copy(dstAddr[:], bs[24:40])
 | 
			
		||||
			addrlen = 16
 | 
			
		||||
		}
 | 
			
		||||
		if !s.tun.ckr.isValidLocalAddress(srcAddr, addrlen) {
 | 
			
		||||
			// The source address isn't in our CKR allowed range
 | 
			
		||||
			skip = true
 | 
			
		||||
		} else if key, err := s.tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil {
 | 
			
		||||
			dstNodeID := crypto.GetNodeID(&key)
 | 
			
		||||
			if s.conn.RemoteAddr() == *dstNodeID {
 | 
			
		||||
				// This is the one allowed CKR case, where source and destination addresses are both good
 | 
			
		||||
			} else {
 | 
			
		||||
				// The CKR key associated with this address doesn't match the sender's NodeID
 | 
			
		||||
				skip = true
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			// We have no CKR route for this destination address... why do we have the packet in the first place?
 | 
			
		||||
			skip = true
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		skip = true
 | 
			
		||||
	}
 | 
			
		||||
	if skip {
 | 
			
		||||
		err = errors.New("address not allowed")
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	msg := yggdrasil.FlowKeyMessage{
 | 
			
		||||
		FlowKey: util.GetFlowKey(bs),
 | 
			
		||||
		Message: bs,
 | 
			
		||||
	}
 | 
			
		||||
	s.conn.WriteFrom(s, msg, func(err error) {
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			// No point in wasting resources to send back an error if there was none
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		s.EnqueueFrom(s.conn, func() {
 | 
			
		||||
			if e, eok := err.(yggdrasil.ConnError); !eok {
 | 
			
		||||
				if e.Closed() {
 | 
			
		||||
					s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
 | 
			
		||||
				} else {
 | 
			
		||||
					s.tun.log.Errorln(s.conn.String(), "TUN/TAP generic write error:", err)
 | 
			
		||||
				}
 | 
			
		||||
			} else if e.PacketTooBig() {
 | 
			
		||||
				// TODO: This currently isn't aware of IPv4 for CKR
 | 
			
		||||
				ptb := &icmp.PacketTooBig{
 | 
			
		||||
					MTU:  int(e.PacketMaximumSize()),
 | 
			
		||||
					Data: bs[:900],
 | 
			
		||||
				}
 | 
			
		||||
				if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
 | 
			
		||||
					s.tun.send <- packet
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				if e.Closed() {
 | 
			
		||||
					s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn write debug:", err)
 | 
			
		||||
				} else {
 | 
			
		||||
					s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	})
 | 
			
		||||
	s.stillAlive()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *tunConn) writer() error {
 | 
			
		||||
	select {
 | 
			
		||||
	case _, ok := <-s.stop:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -232,7 +232,7 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
 | 
			
		|||
				if tc != nil {
 | 
			
		||||
					for _, packet := range packets {
 | 
			
		||||
						p := packet // Possibly required because of how range
 | 
			
		||||
						tc.send <- p
 | 
			
		||||
						<-tc.SyncExec(func() { tc._write(p) })
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}()
 | 
			
		||||
| 
						 | 
				
			
			@ -242,7 +242,7 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
 | 
			
		|||
		}
 | 
			
		||||
		// If we have a connection now, try writing to it
 | 
			
		||||
		if isIn && session != nil {
 | 
			
		||||
			session.send <- bs
 | 
			
		||||
			<-session.SyncExec(func() { session._write(bs) })
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -257,8 +257,13 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
 | 
			
		|||
	tun.addrToConn[s.addr] = &s
 | 
			
		||||
	tun.subnetToConn[s.snet] = &s
 | 
			
		||||
	// Start the connection goroutines
 | 
			
		||||
	go s.reader()
 | 
			
		||||
	go s.writer()
 | 
			
		||||
	conn.SetReadCallback(func(bs []byte) {
 | 
			
		||||
		s.EnqueueFrom(conn, func() {
 | 
			
		||||
			s._read(bs)
 | 
			
		||||
		})
 | 
			
		||||
	})
 | 
			
		||||
	//go s.reader()
 | 
			
		||||
	//go s.writer()
 | 
			
		||||
	go s.checkForTimeouts()
 | 
			
		||||
	// Return
 | 
			
		||||
	return c, err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue