mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	get rid of session workers, new util.PutBytes/GetBytes logic
This commit is contained in:
		
							parent
							
								
									39245f8134
								
							
						
					
					
						commit
						e0a3055c2f
					
				
					 6 changed files with 100 additions and 109 deletions
				
			
		| 
						 | 
				
			
			@ -13,33 +13,25 @@ type Cancellation interface {
 | 
			
		|||
	Error() error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var CancellationFinalized = errors.New("finalizer called")
 | 
			
		||||
var CancellationTimeoutError = errors.New("timeout")
 | 
			
		||||
 | 
			
		||||
func CancellationFinalizer(c Cancellation) {
 | 
			
		||||
	c.Cancel(errors.New("finalizer called"))
 | 
			
		||||
	c.Cancel(CancellationFinalized)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type cancellation struct {
 | 
			
		||||
	signal chan error
 | 
			
		||||
	cancel chan struct{}
 | 
			
		||||
	errMtx sync.RWMutex
 | 
			
		||||
	mutex  sync.RWMutex
 | 
			
		||||
	err    error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *cancellation) worker() {
 | 
			
		||||
	// Launch this in a separate goroutine when creating a cancellation
 | 
			
		||||
	err := <-c.signal
 | 
			
		||||
	c.errMtx.Lock()
 | 
			
		||||
	c.err = err
 | 
			
		||||
	c.errMtx.Unlock()
 | 
			
		||||
	close(c.cancel)
 | 
			
		||||
	done   bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewCancellation() Cancellation {
 | 
			
		||||
	c := cancellation{
 | 
			
		||||
		signal: make(chan error),
 | 
			
		||||
		cancel: make(chan struct{}),
 | 
			
		||||
	}
 | 
			
		||||
	runtime.SetFinalizer(&c, CancellationFinalizer)
 | 
			
		||||
	go c.worker()
 | 
			
		||||
	return &c
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -48,18 +40,22 @@ func (c *cancellation) Finished() <-chan struct{} {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (c *cancellation) Cancel(err error) error {
 | 
			
		||||
	select {
 | 
			
		||||
	case c.signal <- err:
 | 
			
		||||
	c.mutex.Lock()
 | 
			
		||||
	defer c.mutex.Unlock()
 | 
			
		||||
	if c.done {
 | 
			
		||||
		return c.err
 | 
			
		||||
	} else {
 | 
			
		||||
		c.err = err
 | 
			
		||||
		c.done = true
 | 
			
		||||
		close(c.cancel)
 | 
			
		||||
		return nil
 | 
			
		||||
	case <-c.cancel:
 | 
			
		||||
		return c.Error()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *cancellation) Error() error {
 | 
			
		||||
	c.errMtx.RLock()
 | 
			
		||||
	c.mutex.RLock()
 | 
			
		||||
	err := c.err
 | 
			
		||||
	c.errMtx.RUnlock()
 | 
			
		||||
	c.mutex.RUnlock()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -75,8 +71,6 @@ func CancellationChild(parent Cancellation) Cancellation {
 | 
			
		|||
	return child
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var CancellationTimeoutError = errors.New("timeout")
 | 
			
		||||
 | 
			
		||||
func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation {
 | 
			
		||||
	child := CancellationChild(parent)
 | 
			
		||||
	go func() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,6 +3,7 @@ package util
 | 
			
		|||
// These are misc. utility functions that didn't really fit anywhere else
 | 
			
		||||
 | 
			
		||||
import "runtime"
 | 
			
		||||
import "sync"
 | 
			
		||||
import "time"
 | 
			
		||||
 | 
			
		||||
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
 | 
			
		||||
| 
						 | 
				
			
			@ -21,29 +22,27 @@ func UnlockThread() {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
 | 
			
		||||
// It's used like a sync.Pool, but with a fixed size and typechecked without type casts to/from interface{} (which were making the profiles look ugly).
 | 
			
		||||
var byteStore chan []byte
 | 
			
		||||
var byteStoreMutex sync.Mutex
 | 
			
		||||
var byteStore [][]byte
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	byteStore = make(chan []byte, 32)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Gets an empty slice from the byte store, if one is available, or else returns a new nil slice.
 | 
			
		||||
// Gets an empty slice from the byte store.
 | 
			
		||||
func GetBytes() []byte {
 | 
			
		||||
	select {
 | 
			
		||||
	case bs := <-byteStore:
 | 
			
		||||
		return bs[:0]
 | 
			
		||||
	default:
 | 
			
		||||
	byteStoreMutex.Lock()
 | 
			
		||||
	defer byteStoreMutex.Unlock()
 | 
			
		||||
	if len(byteStore) > 0 {
 | 
			
		||||
		var bs []byte
 | 
			
		||||
		bs, byteStore = byteStore[len(byteStore)-1][:0], byteStore[:len(byteStore)-1]
 | 
			
		||||
		return bs
 | 
			
		||||
	} else {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Puts a slice in the store, if there's room, or else returns and lets the slice get collected.
 | 
			
		||||
// Puts a slice in the store.
 | 
			
		||||
func PutBytes(bs []byte) {
 | 
			
		||||
	select {
 | 
			
		||||
	case byteStore <- bs:
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
	byteStoreMutex.Lock()
 | 
			
		||||
	defer byteStoreMutex.Unlock()
 | 
			
		||||
	byteStore = append(byteStore, bs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This is a workaround to go's broken timer implementation
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -230,7 +230,7 @@ func (c *Core) GetSessions() []Session {
 | 
			
		|||
						skip = true
 | 
			
		||||
					}
 | 
			
		||||
				}()
 | 
			
		||||
				sinfo.doWorker(workerFunc)
 | 
			
		||||
				sinfo.doFunc(workerFunc)
 | 
			
		||||
			}()
 | 
			
		||||
			if skip {
 | 
			
		||||
				continue
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -145,9 +145,9 @@ func (c *Conn) Read(b []byte) (int, error) {
 | 
			
		|||
			}
 | 
			
		||||
			defer util.PutBytes(p.Payload)
 | 
			
		||||
			var err error
 | 
			
		||||
			done := make(chan struct{})
 | 
			
		||||
			//done := make(chan struct{})
 | 
			
		||||
			workerFunc := func() {
 | 
			
		||||
				defer close(done)
 | 
			
		||||
				//defer close(done)
 | 
			
		||||
				// If the nonce is bad then drop the packet and return an error
 | 
			
		||||
				if !sinfo.nonceIsOK(&p.Nonce) {
 | 
			
		||||
					err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
 | 
			
		||||
| 
						 | 
				
			
			@ -167,33 +167,36 @@ func (c *Conn) Read(b []byte) (int, error) {
 | 
			
		|||
				sinfo.time = time.Now()
 | 
			
		||||
				sinfo.bytesRecvd += uint64(len(bs))
 | 
			
		||||
			}
 | 
			
		||||
			// Hand over to the session worker
 | 
			
		||||
			defer func() {
 | 
			
		||||
				if recover() != nil {
 | 
			
		||||
					err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0}
 | 
			
		||||
					close(done)
 | 
			
		||||
			sinfo.doFunc(workerFunc)
 | 
			
		||||
			/*
 | 
			
		||||
				// Hand over to the session worker
 | 
			
		||||
				defer func() {
 | 
			
		||||
					if recover() != nil {
 | 
			
		||||
						err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0}
 | 
			
		||||
						close(done)
 | 
			
		||||
					}
 | 
			
		||||
				}() // In case we're racing with a close
 | 
			
		||||
				// Send to worker
 | 
			
		||||
				select {
 | 
			
		||||
				case sinfo.worker <- workerFunc:
 | 
			
		||||
				case <-cancel.Finished():
 | 
			
		||||
					if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
						return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
			
		||||
					} else {
 | 
			
		||||
						return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}() // In case we're racing with a close
 | 
			
		||||
			// Send to worker
 | 
			
		||||
			select {
 | 
			
		||||
			case sinfo.worker <- workerFunc:
 | 
			
		||||
			case <-cancel.Finished():
 | 
			
		||||
				if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
					return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
			
		||||
				} else {
 | 
			
		||||
					return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
				// Wait for the worker to finish
 | 
			
		||||
				select {
 | 
			
		||||
				case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
 | 
			
		||||
				case <-cancel.Finished():
 | 
			
		||||
					if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
						return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
			
		||||
					} else {
 | 
			
		||||
						return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			// Wait for the worker to finish
 | 
			
		||||
			select {
 | 
			
		||||
			case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
 | 
			
		||||
			case <-cancel.Finished():
 | 
			
		||||
				if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
					return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
			
		||||
				} else {
 | 
			
		||||
					return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			*/
 | 
			
		||||
			// Something went wrong in the session worker so abort
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if ce, ok := err.(*ConnError); ok && ce.Temporary() {
 | 
			
		||||
| 
						 | 
				
			
			@ -214,10 +217,10 @@ func (c *Conn) Read(b []byte) (int, error) {
 | 
			
		|||
func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
 | 
			
		||||
	sinfo := c.session
 | 
			
		||||
	var packet []byte
 | 
			
		||||
	done := make(chan struct{})
 | 
			
		||||
	//done := make(chan struct{})
 | 
			
		||||
	written := len(b)
 | 
			
		||||
	workerFunc := func() {
 | 
			
		||||
		defer close(done)
 | 
			
		||||
		//defer close(done)
 | 
			
		||||
		// Does the packet exceed the permitted size for the session?
 | 
			
		||||
		if uint16(len(b)) > sinfo.getMTU() {
 | 
			
		||||
			written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())}
 | 
			
		||||
| 
						 | 
				
			
			@ -264,27 +267,30 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
 | 
			
		|||
		default: // Don't do anything, to keep traffic throttled
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Set up a timer so this doesn't block forever
 | 
			
		||||
	cancel := c.getDeadlineCancellation(&c.writeDeadline)
 | 
			
		||||
	defer cancel.Cancel(nil)
 | 
			
		||||
	// Hand over to the session worker
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if recover() != nil {
 | 
			
		||||
			err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0}
 | 
			
		||||
			close(done)
 | 
			
		||||
	sinfo.doFunc(workerFunc)
 | 
			
		||||
	/*
 | 
			
		||||
		// Set up a timer so this doesn't block forever
 | 
			
		||||
		cancel := c.getDeadlineCancellation(&c.writeDeadline)
 | 
			
		||||
		defer cancel.Cancel(nil)
 | 
			
		||||
		// Hand over to the session worker
 | 
			
		||||
		defer func() {
 | 
			
		||||
			if recover() != nil {
 | 
			
		||||
				err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0}
 | 
			
		||||
				close(done)
 | 
			
		||||
			}
 | 
			
		||||
		}() // In case we're racing with a close
 | 
			
		||||
		select { // Send to worker
 | 
			
		||||
		case sinfo.worker <- workerFunc:
 | 
			
		||||
		case <-cancel.Finished():
 | 
			
		||||
			if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
				return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
 | 
			
		||||
			} else {
 | 
			
		||||
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}() // In case we're racing with a close
 | 
			
		||||
	select { // Send to worker
 | 
			
		||||
	case sinfo.worker <- workerFunc:
 | 
			
		||||
	case <-cancel.Finished():
 | 
			
		||||
		if cancel.Error() == util.CancellationTimeoutError {
 | 
			
		||||
			return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
 | 
			
		||||
		} else {
 | 
			
		||||
			return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
 | 
			
		||||
	<-done
 | 
			
		||||
		// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
 | 
			
		||||
		<-done
 | 
			
		||||
	*/
 | 
			
		||||
	// Give the packet to the router
 | 
			
		||||
	if written > 0 {
 | 
			
		||||
		sinfo.core.router.out(packet)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,6 +16,7 @@ import (
 | 
			
		|||
// All the information we know about an active session.
 | 
			
		||||
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
 | 
			
		||||
type sessionInfo struct {
 | 
			
		||||
	mutex          sync.Mutex               // Protects all of the below, use it any time you read/chance the contents of a session
 | 
			
		||||
	core           *Core                    //
 | 
			
		||||
	reconfigure    chan chan error          //
 | 
			
		||||
	theirAddr      address.Address          //
 | 
			
		||||
| 
						 | 
				
			
			@ -43,24 +44,14 @@ type sessionInfo struct {
 | 
			
		|||
	tstamp         int64                    // ATOMIC - tstamp from their last session ping, replay attack mitigation
 | 
			
		||||
	bytesSent      uint64                   // Bytes of real traffic sent in this session
 | 
			
		||||
	bytesRecvd     uint64                   // Bytes of real traffic received in this session
 | 
			
		||||
	worker         chan func()              // Channel to send work to the session worker
 | 
			
		||||
	recv           chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
 | 
			
		||||
	init           chan struct{}            // Closed when the first session pong arrives, used to signal that the session is ready for initial use
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sinfo *sessionInfo) doWorker(f func()) {
 | 
			
		||||
	done := make(chan struct{})
 | 
			
		||||
	sinfo.worker <- func() {
 | 
			
		||||
		f()
 | 
			
		||||
		close(done)
 | 
			
		||||
	}
 | 
			
		||||
	<-done
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sinfo *sessionInfo) workerMain() {
 | 
			
		||||
	for f := range sinfo.worker {
 | 
			
		||||
		f()
 | 
			
		||||
	}
 | 
			
		||||
func (sinfo *sessionInfo) doFunc(f func()) {
 | 
			
		||||
	sinfo.mutex.Lock()
 | 
			
		||||
	defer sinfo.mutex.Unlock()
 | 
			
		||||
	f()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
 | 
			
		||||
| 
						 | 
				
			
			@ -231,11 +222,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
 | 
			
		|||
	sinfo.myHandle = *crypto.NewHandle()
 | 
			
		||||
	sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
 | 
			
		||||
	sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
 | 
			
		||||
	sinfo.worker = make(chan func(), 1)
 | 
			
		||||
	sinfo.recv = make(chan *wire_trafficPacket, 32)
 | 
			
		||||
	ss.sinfos[sinfo.myHandle] = &sinfo
 | 
			
		||||
	ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
 | 
			
		||||
	go sinfo.workerMain()
 | 
			
		||||
	return &sinfo
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -267,14 +256,12 @@ func (ss *sessions) cleanup() {
 | 
			
		|||
	ss.lastCleanup = time.Now()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Closes a session, removing it from sessions maps and killing the worker goroutine.
 | 
			
		||||
// Closes a session, removing it from sessions maps.
 | 
			
		||||
func (sinfo *sessionInfo) close() {
 | 
			
		||||
	if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo {
 | 
			
		||||
		delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
 | 
			
		||||
		delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
 | 
			
		||||
	}
 | 
			
		||||
	defer func() { recover() }()
 | 
			
		||||
	close(sinfo.worker)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Returns a session ping appropriate for the given session info.
 | 
			
		||||
| 
						 | 
				
			
			@ -372,7 +359,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
 | 
			
		|||
		}
 | 
			
		||||
		ss.listenerMutex.Unlock()
 | 
			
		||||
	}
 | 
			
		||||
	sinfo.doWorker(func() {
 | 
			
		||||
	sinfo.doFunc(func() {
 | 
			
		||||
		// Update the session
 | 
			
		||||
		if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
 | 
			
		||||
			return
 | 
			
		||||
| 
						 | 
				
			
			@ -426,7 +413,7 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
 | 
			
		|||
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
 | 
			
		||||
func (ss *sessions) reset() {
 | 
			
		||||
	for _, sinfo := range ss.sinfos {
 | 
			
		||||
		sinfo.doWorker(func() {
 | 
			
		||||
		sinfo.doFunc(func() {
 | 
			
		||||
			sinfo.reset = true
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue