mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	have Conn use Cancellation instead of manually setting up timers
This commit is contained in:
		
							parent
							
								
									6bf182e341
								
							
						
					
					
						commit
						cf3ebe04a7
					
				
					 2 changed files with 48 additions and 37 deletions
				
			
		| 
						 | 
					@ -75,6 +75,8 @@ func CancellationChild(parent Cancellation) Cancellation {
 | 
				
			||||||
	return child
 | 
						return child
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var CancellationTimeoutError = errors.New("timeout")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation {
 | 
					func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation {
 | 
				
			||||||
	child := CancellationChild(parent)
 | 
						child := CancellationChild(parent)
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
| 
						 | 
					@ -83,7 +85,7 @@ func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancell
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-child.Finished():
 | 
							case <-child.Finished():
 | 
				
			||||||
		case <-timer.C:
 | 
							case <-timer.C:
 | 
				
			||||||
			child.Cancel(errors.New("timeout"))
 | 
								child.Cancel(CancellationTimeoutError)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	return child
 | 
						return child
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -46,13 +46,13 @@ func (e *ConnError) Closed() bool {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Conn struct {
 | 
					type Conn struct {
 | 
				
			||||||
	core          *Core
 | 
						core          *Core
 | 
				
			||||||
	nodeID        *crypto.NodeID
 | 
					 | 
				
			||||||
	nodeMask      *crypto.NodeID
 | 
					 | 
				
			||||||
	mutex         sync.RWMutex
 | 
					 | 
				
			||||||
	close         chan bool
 | 
					 | 
				
			||||||
	session       *sessionInfo
 | 
					 | 
				
			||||||
	readDeadline  atomic.Value // time.Time // TODO timer
 | 
						readDeadline  atomic.Value // time.Time // TODO timer
 | 
				
			||||||
	writeDeadline atomic.Value // time.Time // TODO timer
 | 
						writeDeadline atomic.Value // time.Time // TODO timer
 | 
				
			||||||
 | 
						cancel        util.Cancellation
 | 
				
			||||||
 | 
						mutex         sync.RWMutex // protects the below
 | 
				
			||||||
 | 
						nodeID        *crypto.NodeID
 | 
				
			||||||
 | 
						nodeMask      *crypto.NodeID
 | 
				
			||||||
 | 
						session       *sessionInfo
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO func NewConn() that initializes additional fields as needed
 | 
					// TODO func NewConn() that initializes additional fields as needed
 | 
				
			||||||
| 
						 | 
					@ -62,12 +62,14 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
 | 
				
			||||||
		nodeID:   nodeID,
 | 
							nodeID:   nodeID,
 | 
				
			||||||
		nodeMask: nodeMask,
 | 
							nodeMask: nodeMask,
 | 
				
			||||||
		session:  session,
 | 
							session:  session,
 | 
				
			||||||
		close:    make(chan bool),
 | 
							cancel:   util.NewCancellation(),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &conn
 | 
						return &conn
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Conn) String() string {
 | 
					func (c *Conn) String() string {
 | 
				
			||||||
 | 
						c.mutex.RLock()
 | 
				
			||||||
 | 
						defer c.mutex.RUnlock()
 | 
				
			||||||
	return fmt.Sprintf("conn=%p", c)
 | 
						return fmt.Sprintf("conn=%p", c)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -111,28 +113,31 @@ func (c *Conn) search() error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getDeadlineTimer(value *atomic.Value) *time.Timer {
 | 
					func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation {
 | 
				
			||||||
	timer := time.NewTimer(24 * 365 * time.Hour) // FIXME for some reason setting this to 0 doesn't always let it stop and drain the channel correctly
 | 
					 | 
				
			||||||
	util.TimerStop(timer)
 | 
					 | 
				
			||||||
	if deadline, ok := value.Load().(time.Time); ok {
 | 
						if deadline, ok := value.Load().(time.Time); ok {
 | 
				
			||||||
		timer.Reset(time.Until(deadline))
 | 
							// A deadline is set, so return a Cancellation that uses it
 | 
				
			||||||
 | 
							return util.CancellationWithDeadline(c.cancel, deadline)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							// No cancellation was set, so return a child cancellation with no timeout
 | 
				
			||||||
 | 
							return util.CancellationChild(c.cancel)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return timer
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Conn) Read(b []byte) (int, error) {
 | 
					func (c *Conn) Read(b []byte) (int, error) {
 | 
				
			||||||
	// Take a copy of the session object
 | 
						// Take a copy of the session object
 | 
				
			||||||
	sinfo := c.session
 | 
						sinfo := c.session
 | 
				
			||||||
	timer := getDeadlineTimer(&c.readDeadline)
 | 
						cancel := c.getDeadlineCancellation(&c.readDeadline)
 | 
				
			||||||
	defer util.TimerStop(timer)
 | 
						defer cancel.Cancel(nil)
 | 
				
			||||||
	var bs []byte
 | 
						var bs []byte
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		// Wait for some traffic to come through from the session
 | 
							// Wait for some traffic to come through from the session
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-c.close:
 | 
							case <-cancel.Finished():
 | 
				
			||||||
			return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
								if cancel.Error() == util.CancellationTimeoutError {
 | 
				
			||||||
		case <-timer.C:
 | 
									return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
				
			||||||
			return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
								} else {
 | 
				
			||||||
 | 
									return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		case p, ok := <-sinfo.recv:
 | 
							case p, ok := <-sinfo.recv:
 | 
				
			||||||
			// If the session is closed then do nothing
 | 
								// If the session is closed then do nothing
 | 
				
			||||||
			if !ok {
 | 
								if !ok {
 | 
				
			||||||
| 
						 | 
					@ -172,18 +177,22 @@ func (c *Conn) Read(b []byte) (int, error) {
 | 
				
			||||||
			// Send to worker
 | 
								// Send to worker
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case sinfo.worker <- workerFunc:
 | 
								case sinfo.worker <- workerFunc:
 | 
				
			||||||
			case <-c.close:
 | 
								case <-cancel.Finished():
 | 
				
			||||||
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
									if cancel.Error() == util.CancellationTimeoutError {
 | 
				
			||||||
			case <-timer.C:
 | 
										return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
				
			||||||
				return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
									} else {
 | 
				
			||||||
 | 
										return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// Wait for the worker to finish
 | 
								// Wait for the worker to finish
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
 | 
								case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
 | 
				
			||||||
			case <-c.close:
 | 
								case <-cancel.Finished():
 | 
				
			||||||
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
									if cancel.Error() == util.CancellationTimeoutError {
 | 
				
			||||||
			case <-timer.C:
 | 
										return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
				
			||||||
				return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
 | 
									} else {
 | 
				
			||||||
 | 
										return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// Something went wrong in the session worker so abort
 | 
								// Something went wrong in the session worker so abort
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -256,8 +265,8 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Set up a timer so this doesn't block forever
 | 
						// Set up a timer so this doesn't block forever
 | 
				
			||||||
	timer := getDeadlineTimer(&c.writeDeadline)
 | 
						cancel := c.getDeadlineCancellation(&c.writeDeadline)
 | 
				
			||||||
	defer util.TimerStop(timer)
 | 
						defer cancel.Cancel(nil)
 | 
				
			||||||
	// Hand over to the session worker
 | 
						// Hand over to the session worker
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
		if recover() != nil {
 | 
							if recover() != nil {
 | 
				
			||||||
| 
						 | 
					@ -267,8 +276,12 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
 | 
				
			||||||
	}() // In case we're racing with a close
 | 
						}() // In case we're racing with a close
 | 
				
			||||||
	select { // Send to worker
 | 
						select { // Send to worker
 | 
				
			||||||
	case sinfo.worker <- workerFunc:
 | 
						case sinfo.worker <- workerFunc:
 | 
				
			||||||
	case <-timer.C:
 | 
						case <-cancel.Finished():
 | 
				
			||||||
		return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
 | 
							if cancel.Error() == util.CancellationTimeoutError {
 | 
				
			||||||
 | 
								return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								return 0, ConnError{errors.New("session closed"), false, false, true, 0}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
 | 
						// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
 | 
				
			||||||
	<-done
 | 
						<-done
 | 
				
			||||||
| 
						 | 
					@ -287,13 +300,9 @@ func (c *Conn) Close() (err error) {
 | 
				
			||||||
		// Close the session, if it hasn't been closed already
 | 
							// Close the session, if it hasn't been closed already
 | 
				
			||||||
		c.core.router.doAdmin(c.session.close)
 | 
							c.core.router.doAdmin(c.session.close)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	func() {
 | 
						if e := c.cancel.Cancel(errors.New("connection closed")); e != nil {
 | 
				
			||||||
		defer func() {
 | 
							err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
 | 
				
			||||||
			recover()
 | 
						}
 | 
				
			||||||
			err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
 | 
					 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
		close(c.close) // Closes reader/writer goroutines
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue