mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	refactor session worker code slightly
This commit is contained in:
		
							parent
							
								
									7a9ad0c8cc
								
							
						
					
					
						commit
						befd1b43a0
					
				
					 1 changed files with 67 additions and 61 deletions
				
			
		| 
						 | 
				
			
			@ -442,50 +442,53 @@ func (sinfo *sessionInfo) recvWorker() {
 | 
			
		|||
	//  Since there's no reason for anywhere else in the session code to need to *read* it...
 | 
			
		||||
	//  Only needs to be updated from the outside if a ping resets it...
 | 
			
		||||
	//  That would get rid of the need to take a mutex for the sessionFunc
 | 
			
		||||
	doRecv := func(p *wire_trafficPacket) {
 | 
			
		||||
		var bs []byte
 | 
			
		||||
		var err error
 | 
			
		||||
		var k crypto.BoxSharedKey
 | 
			
		||||
		sessionFunc := func() {
 | 
			
		||||
			if !sinfo.nonceIsOK(&p.Nonce) {
 | 
			
		||||
				err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			k = sinfo.sharedSesKey
 | 
			
		||||
		}
 | 
			
		||||
		sinfo.doFunc(sessionFunc)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			util.PutBytes(p.Payload)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		var isOK bool
 | 
			
		||||
		bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
 | 
			
		||||
		if !isOK {
 | 
			
		||||
			util.PutBytes(bs)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		sessionFunc = func() {
 | 
			
		||||
			if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) {
 | 
			
		||||
				// The session updated in the mean time, so return an error
 | 
			
		||||
				err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			sinfo.updateNonce(&p.Nonce)
 | 
			
		||||
			sinfo.time = time.Now()
 | 
			
		||||
			sinfo.bytesRecvd += uint64(len(bs))
 | 
			
		||||
		}
 | 
			
		||||
		sinfo.doFunc(sessionFunc)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			// Not sure what else to do with this packet, I guess just drop it
 | 
			
		||||
			util.PutBytes(bs)
 | 
			
		||||
		} else {
 | 
			
		||||
			// Pass the packet to the buffer for Conn.Read
 | 
			
		||||
			sinfo.recv <- bs
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-sinfo.cancel.Finished():
 | 
			
		||||
			return
 | 
			
		||||
		case p := <-sinfo.fromRouter:
 | 
			
		||||
			var bs []byte
 | 
			
		||||
			var err error
 | 
			
		||||
			var k crypto.BoxSharedKey
 | 
			
		||||
			sessionFunc := func() {
 | 
			
		||||
				if !sinfo.nonceIsOK(&p.Nonce) {
 | 
			
		||||
					err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				k = sinfo.sharedSesKey
 | 
			
		||||
			}
 | 
			
		||||
			sinfo.doFunc(sessionFunc)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				util.PutBytes(p.Payload)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			var isOK bool
 | 
			
		||||
			bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
 | 
			
		||||
			if !isOK {
 | 
			
		||||
				util.PutBytes(bs)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			sessionFunc = func() {
 | 
			
		||||
				if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) {
 | 
			
		||||
					// The session updated in the mean time, so return an error
 | 
			
		||||
					err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				sinfo.updateNonce(&p.Nonce)
 | 
			
		||||
				sinfo.time = time.Now()
 | 
			
		||||
				sinfo.bytesRecvd += uint64(len(bs))
 | 
			
		||||
			}
 | 
			
		||||
			sinfo.doFunc(sessionFunc)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				// Not sure what else to do with this packet, I guess just drop it
 | 
			
		||||
				util.PutBytes(bs)
 | 
			
		||||
			} else {
 | 
			
		||||
				// Pass the packet to the buffer for Conn.Read
 | 
			
		||||
				sinfo.recv <- bs
 | 
			
		||||
			}
 | 
			
		||||
			doRecv(p)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -493,33 +496,36 @@ func (sinfo *sessionInfo) recvWorker() {
 | 
			
		|||
func (sinfo *sessionInfo) sendWorker() {
 | 
			
		||||
	// TODO move info that this worker needs here, send updates via a channel
 | 
			
		||||
	//  Otherwise we need to take a mutex to avoid races with update()
 | 
			
		||||
	doSend := func(bs []byte) {
 | 
			
		||||
		var p wire_trafficPacket
 | 
			
		||||
		var k crypto.BoxSharedKey
 | 
			
		||||
		sessionFunc := func() {
 | 
			
		||||
			sinfo.bytesSent += uint64(len(bs))
 | 
			
		||||
			p = wire_trafficPacket{
 | 
			
		||||
				Coords: append([]byte(nil), sinfo.coords...),
 | 
			
		||||
				Handle: sinfo.theirHandle,
 | 
			
		||||
				Nonce:  sinfo.myNonce,
 | 
			
		||||
			}
 | 
			
		||||
			sinfo.myNonce.Increment()
 | 
			
		||||
			k = sinfo.sharedSesKey
 | 
			
		||||
		}
 | 
			
		||||
		// Get the mutex-protected info needed to encrypt the packet
 | 
			
		||||
		sinfo.doFunc(sessionFunc)
 | 
			
		||||
		// Encrypt the packet
 | 
			
		||||
		p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce)
 | 
			
		||||
		packet := p.encode()
 | 
			
		||||
		// Cleanup
 | 
			
		||||
		util.PutBytes(bs)
 | 
			
		||||
		util.PutBytes(p.Payload)
 | 
			
		||||
		// Send the packet
 | 
			
		||||
		sinfo.core.router.out(packet)
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-sinfo.cancel.Finished():
 | 
			
		||||
			return
 | 
			
		||||
		case bs := <-sinfo.send:
 | 
			
		||||
			var p wire_trafficPacket
 | 
			
		||||
			var k crypto.BoxSharedKey
 | 
			
		||||
			sessionFunc := func() {
 | 
			
		||||
				sinfo.bytesSent += uint64(len(bs))
 | 
			
		||||
				p = wire_trafficPacket{
 | 
			
		||||
					Coords: append([]byte(nil), sinfo.coords...),
 | 
			
		||||
					Handle: sinfo.theirHandle,
 | 
			
		||||
					Nonce:  sinfo.myNonce,
 | 
			
		||||
				}
 | 
			
		||||
				sinfo.myNonce.Increment()
 | 
			
		||||
				k = sinfo.sharedSesKey
 | 
			
		||||
			}
 | 
			
		||||
			// Get the mutex-protected info needed to encrypt the packet
 | 
			
		||||
			sinfo.doFunc(sessionFunc)
 | 
			
		||||
			// Encrypt the packet
 | 
			
		||||
			p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce)
 | 
			
		||||
			packet := p.encode()
 | 
			
		||||
			// Cleanup
 | 
			
		||||
			util.PutBytes(bs)
 | 
			
		||||
			util.PutBytes(p.Payload)
 | 
			
		||||
			// Send the packet
 | 
			
		||||
			sinfo.core.router.out(packet)
 | 
			
		||||
			doSend(bs)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue