mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	fix bug in switch actor's cleanRoot, strict nonce handling at the session level, and add separate queues per stream to the packetqueue code
This commit is contained in:
		
							parent
							
								
									03a19997b8
								
							
						
					
					
						commit
						09efdfef9a
					
				
					 3 changed files with 105 additions and 26 deletions
				
			
		| 
						 | 
				
			
			@ -1,38 +1,125 @@
 | 
			
		|||
package yggdrasil
 | 
			
		||||
 | 
			
		||||
import "github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TODO take max size from config
 | 
			
		||||
const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB
 | 
			
		||||
const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB
 | 
			
		||||
 | 
			
		||||
type pqStreamID string
 | 
			
		||||
 | 
			
		||||
type pqPacketInfo struct {
 | 
			
		||||
	packet []byte
 | 
			
		||||
	time   time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type pqStream struct {
 | 
			
		||||
	infos []pqPacketInfo
 | 
			
		||||
	size  uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO separate queues per e.g. traffic flow
 | 
			
		||||
type packetQueue struct {
 | 
			
		||||
	packets [][]byte
 | 
			
		||||
	size    uint32
 | 
			
		||||
	streams map[pqStreamID]pqStream
 | 
			
		||||
	size    uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) cleanup() {
 | 
			
		||||
	for q.size > MAX_PACKET_QUEUE_SIZE {
 | 
			
		||||
		if packet, success := q.pop(); success {
 | 
			
		||||
		// TODO? drop from a random stream
 | 
			
		||||
		//  odds proportional to size? bandwidth?
 | 
			
		||||
		//  always using the worst is exploitable -> flood 1 packet per random stream
 | 
			
		||||
		// find the stream that's using the most bandwidth
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		var worst pqStreamID
 | 
			
		||||
		for id := range q.streams {
 | 
			
		||||
			worst = id
 | 
			
		||||
			break // get a random ID to start
 | 
			
		||||
		}
 | 
			
		||||
		worstStream := q.streams[worst]
 | 
			
		||||
		worstSize := float64(worstStream.size)
 | 
			
		||||
		worstAge := now.Sub(worstStream.infos[0].time).Seconds()
 | 
			
		||||
		for id, stream := range q.streams {
 | 
			
		||||
			thisSize := float64(stream.size)
 | 
			
		||||
			thisAge := now.Sub(stream.infos[0].time).Seconds()
 | 
			
		||||
			// cross multiply to avoid division by zero issues
 | 
			
		||||
			if worstSize*thisAge < thisSize*worstAge {
 | 
			
		||||
				// worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth
 | 
			
		||||
				worst = id
 | 
			
		||||
				worstStream = stream
 | 
			
		||||
				worstSize = thisSize
 | 
			
		||||
				worstAge = thisAge
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// Drop the oldest packet from the worst stream
 | 
			
		||||
		packet := worstStream.infos[0].packet
 | 
			
		||||
		worstStream.infos = worstStream.infos[1:]
 | 
			
		||||
		worstStream.size -= uint64(len(packet))
 | 
			
		||||
		q.size -= uint64(len(packet))
 | 
			
		||||
		util.PutBytes(packet)
 | 
			
		||||
		// save the modified stream to queues
 | 
			
		||||
		if len(worstStream.infos) > 0 {
 | 
			
		||||
			q.streams[worst] = worstStream
 | 
			
		||||
		} else {
 | 
			
		||||
			panic("attempted to drop packet from empty queue")
 | 
			
		||||
			break
 | 
			
		||||
			delete(q.streams, worst)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) push(packet []byte) {
 | 
			
		||||
	q.packets = append(q.packets, packet)
 | 
			
		||||
	q.size += uint32(len(packet))
 | 
			
		||||
	if q.streams == nil {
 | 
			
		||||
		q.streams = make(map[pqStreamID]pqStream)
 | 
			
		||||
	}
 | 
			
		||||
	// get stream
 | 
			
		||||
	id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
 | 
			
		||||
	stream := q.streams[id]
 | 
			
		||||
	// update stream
 | 
			
		||||
	stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()})
 | 
			
		||||
	stream.size += uint64(len(packet))
 | 
			
		||||
	// save update to queues
 | 
			
		||||
	q.streams[id] = stream
 | 
			
		||||
	q.size += uint64(len(packet))
 | 
			
		||||
	q.cleanup()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) pop() ([]byte, bool) {
 | 
			
		||||
	if len(q.packets) > 0 {
 | 
			
		||||
		packet := q.packets[0]
 | 
			
		||||
		q.packets = q.packets[1:]
 | 
			
		||||
		q.size -= uint32(len(packet))
 | 
			
		||||
	if len(q.streams) > 0 {
 | 
			
		||||
		// get the stream that uses the least bandwidth
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		var best pqStreamID
 | 
			
		||||
		for id := range q.streams {
 | 
			
		||||
			best = id
 | 
			
		||||
			break // get a random ID to start
 | 
			
		||||
		}
 | 
			
		||||
		bestStream := q.streams[best]
 | 
			
		||||
		bestSize := float64(bestStream.size)
 | 
			
		||||
		bestAge := now.Sub(bestStream.infos[0].time).Seconds()
 | 
			
		||||
		for id, stream := range q.streams {
 | 
			
		||||
			thisSize := float64(stream.size)
 | 
			
		||||
			thisAge := now.Sub(stream.infos[0].time).Seconds()
 | 
			
		||||
			// cross multiply to avoid division by zero issues
 | 
			
		||||
			if bestSize*thisAge > thisSize*bestAge {
 | 
			
		||||
				// bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth
 | 
			
		||||
				best = id
 | 
			
		||||
				bestStream = stream
 | 
			
		||||
				bestSize = thisSize
 | 
			
		||||
				bestAge = thisAge
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// get the oldest packet from the best stream
 | 
			
		||||
		packet := bestStream.infos[0].packet
 | 
			
		||||
		bestStream.infos = bestStream.infos[1:]
 | 
			
		||||
		bestStream.size -= uint64(len(packet))
 | 
			
		||||
		q.size -= uint64(len(packet))
 | 
			
		||||
		// save the modified stream to queues
 | 
			
		||||
		if len(bestStream.infos) > 0 {
 | 
			
		||||
			q.streams[best] = bestStream
 | 
			
		||||
		} else {
 | 
			
		||||
			delete(q.streams, best)
 | 
			
		||||
		}
 | 
			
		||||
		return packet, true
 | 
			
		||||
	}
 | 
			
		||||
	return nil, false
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,9 +16,6 @@ import (
 | 
			
		|||
	"github.com/Arceliar/phony"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery
 | 
			
		||||
const nonceWindow = time.Second
 | 
			
		||||
 | 
			
		||||
// All the information we know about an active session.
 | 
			
		||||
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
 | 
			
		||||
type sessionInfo struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -394,14 +391,9 @@ func (sinfo *sessionInfo) _getMTU() MTU {
 | 
			
		|||
	return sinfo.myMTU
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received.
 | 
			
		||||
// Checks if a packet's nonce is newer than any previously received
 | 
			
		||||
func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool {
 | 
			
		||||
	// The bitmask is to allow for some non-duplicate out-of-order packets
 | 
			
		||||
	if theirNonce.Minus(&sinfo.theirNonce) > 0 {
 | 
			
		||||
		// This is newer than the newest nonce we've seen
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return time.Since(sinfo.time) < nonceWindow
 | 
			
		||||
	return theirNonce.Minus(&sinfo.theirNonce) > 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -227,10 +227,10 @@ func (t *switchTable) _cleanRoot() {
 | 
			
		|||
		t.time = now
 | 
			
		||||
		if t.data.locator.root != t.key {
 | 
			
		||||
			t.data.seq++
 | 
			
		||||
			defer t._updateTable()
 | 
			
		||||
			t.core.router.reset(nil)
 | 
			
		||||
			defer t.core.router.reset(nil)
 | 
			
		||||
		}
 | 
			
		||||
		t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
 | 
			
		||||
		t._updateTable() // updates base copy of switch msg in lookupTable
 | 
			
		||||
		t.core.peers.sendSwitchMsgs(t)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue