mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	
						commit
						a1856258a9
					
				
					 1 changed files with 73 additions and 67 deletions
				
			
		| 
						 | 
					@ -1,10 +1,13 @@
 | 
				
			||||||
package yggdrasil
 | 
					package yggdrasil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"math/rand"
 | 
						"container/heap"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TODO separate queues per e.g. traffic flow
 | 
				
			||||||
 | 
					//  For now, we put everything in queue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type pqStreamID string
 | 
					type pqStreamID string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type pqPacketInfo struct {
 | 
					type pqPacketInfo struct {
 | 
				
			||||||
| 
						 | 
					@ -13,13 +16,13 @@ type pqPacketInfo struct {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type pqStream struct {
 | 
					type pqStream struct {
 | 
				
			||||||
 | 
						id    pqStreamID
 | 
				
			||||||
	infos []pqPacketInfo
 | 
						infos []pqPacketInfo
 | 
				
			||||||
	size  uint64
 | 
						size  uint64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO separate queues per e.g. traffic flow
 | 
					 | 
				
			||||||
type packetQueue struct {
 | 
					type packetQueue struct {
 | 
				
			||||||
	streams map[pqStreamID]pqStream
 | 
						streams []pqStream
 | 
				
			||||||
	size    uint64
 | 
						size    uint64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -29,84 +32,87 @@ func (q *packetQueue) drop() bool {
 | 
				
			||||||
	if q.size == 0 {
 | 
						if q.size == 0 {
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// select a random stream, odds based on stream size
 | 
						var longestIdx int
 | 
				
			||||||
	offset := rand.Uint64() % q.size
 | 
						for idx := range q.streams {
 | 
				
			||||||
	var worst pqStreamID
 | 
							if q.streams[idx].size > q.streams[longestIdx].size {
 | 
				
			||||||
	var size uint64
 | 
								longestIdx = idx
 | 
				
			||||||
	for id, stream := range q.streams {
 | 
					 | 
				
			||||||
		worst = id
 | 
					 | 
				
			||||||
		size += stream.size
 | 
					 | 
				
			||||||
		if size >= offset {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// drop the oldest packet from the stream
 | 
						stream := q.streams[longestIdx]
 | 
				
			||||||
	worstStream := q.streams[worst]
 | 
						info := stream.infos[0]
 | 
				
			||||||
	packet := worstStream.infos[0].packet
 | 
						if len(stream.infos) > 1 {
 | 
				
			||||||
	worstStream.infos = worstStream.infos[1:]
 | 
							stream.infos = stream.infos[1:]
 | 
				
			||||||
	worstStream.size -= uint64(len(packet))
 | 
							stream.size -= uint64(len(info.packet))
 | 
				
			||||||
	q.size -= uint64(len(packet))
 | 
							q.streams[longestIdx] = stream
 | 
				
			||||||
	pool_putBytes(packet)
 | 
							q.size -= uint64(len(info.packet))
 | 
				
			||||||
	// save the modified stream to queues
 | 
							heap.Fix(q, longestIdx)
 | 
				
			||||||
	if len(worstStream.infos) > 0 {
 | 
					 | 
				
			||||||
		q.streams[worst] = worstStream
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		delete(q.streams, worst)
 | 
							heap.Remove(q, longestIdx)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						pool_putBytes(info.packet)
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (q *packetQueue) push(packet []byte) {
 | 
					func (q *packetQueue) push(packet []byte) {
 | 
				
			||||||
	if q.streams == nil {
 | 
					 | 
				
			||||||
		q.streams = make(map[pqStreamID]pqStream)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// get stream
 | 
					 | 
				
			||||||
	id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
 | 
						id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
 | 
				
			||||||
	stream := q.streams[id]
 | 
						info := pqPacketInfo{packet: packet, time: time.Now()}
 | 
				
			||||||
	// update stream
 | 
						for idx := range q.streams {
 | 
				
			||||||
	stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()})
 | 
							if q.streams[idx].id == id {
 | 
				
			||||||
	stream.size += uint64(len(packet))
 | 
								q.streams[idx].infos = append(q.streams[idx].infos, info)
 | 
				
			||||||
	// save update to queues
 | 
								q.streams[idx].size += uint64(len(packet))
 | 
				
			||||||
	q.streams[id] = stream
 | 
					 | 
				
			||||||
			q.size += uint64(len(packet))
 | 
								q.size += uint64(len(packet))
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						stream := pqStream{id: id, size: uint64(len(packet))}
 | 
				
			||||||
 | 
						stream.infos = append(stream.infos, info)
 | 
				
			||||||
 | 
						heap.Push(q, stream)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (q *packetQueue) pop() ([]byte, bool) {
 | 
					func (q *packetQueue) pop() ([]byte, bool) {
 | 
				
			||||||
	if len(q.streams) > 0 {
 | 
						if q.size > 0 {
 | 
				
			||||||
		// get the stream that uses the least bandwidth
 | 
							stream := q.streams[0]
 | 
				
			||||||
		now := time.Now()
 | 
							info := stream.infos[0]
 | 
				
			||||||
		var best pqStreamID
 | 
							if len(stream.infos) > 1 {
 | 
				
			||||||
		for id := range q.streams {
 | 
								stream.infos = stream.infos[1:]
 | 
				
			||||||
			best = id
 | 
								stream.size -= uint64(len(info.packet))
 | 
				
			||||||
			break // get a random ID to start
 | 
								q.streams[0] = stream
 | 
				
			||||||
		}
 | 
								q.size -= uint64(len(info.packet))
 | 
				
			||||||
		bestStream := q.streams[best]
 | 
								heap.Fix(q, 0)
 | 
				
			||||||
		bestSize := float64(bestStream.size)
 | 
					 | 
				
			||||||
		bestAge := now.Sub(bestStream.infos[0].time).Seconds()
 | 
					 | 
				
			||||||
		for id, stream := range q.streams {
 | 
					 | 
				
			||||||
			thisSize := float64(stream.size)
 | 
					 | 
				
			||||||
			thisAge := now.Sub(stream.infos[0].time).Seconds()
 | 
					 | 
				
			||||||
			// cross multiply to avoid division by zero issues
 | 
					 | 
				
			||||||
			if bestSize*thisAge > thisSize*bestAge {
 | 
					 | 
				
			||||||
				// bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth
 | 
					 | 
				
			||||||
				best = id
 | 
					 | 
				
			||||||
				bestStream = stream
 | 
					 | 
				
			||||||
				bestSize = thisSize
 | 
					 | 
				
			||||||
				bestAge = thisAge
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// get the oldest packet from the best stream
 | 
					 | 
				
			||||||
		packet := bestStream.infos[0].packet
 | 
					 | 
				
			||||||
		bestStream.infos = bestStream.infos[1:]
 | 
					 | 
				
			||||||
		bestStream.size -= uint64(len(packet))
 | 
					 | 
				
			||||||
		q.size -= uint64(len(packet))
 | 
					 | 
				
			||||||
		// save the modified stream to queues
 | 
					 | 
				
			||||||
		if len(bestStream.infos) > 0 {
 | 
					 | 
				
			||||||
			q.streams[best] = bestStream
 | 
					 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			delete(q.streams, best)
 | 
								heap.Remove(q, 0)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return packet, true
 | 
							return info.packet, true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil, false
 | 
						return nil, false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					////////////////////////////////////////////////////////////////////////////////
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Interface methods for packetQueue to satisfy heap.Interface
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Len() int {
 | 
				
			||||||
 | 
						return len(q.streams)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Less(i, j int) bool {
 | 
				
			||||||
 | 
						return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Swap(i, j int) {
 | 
				
			||||||
 | 
						q.streams[i], q.streams[j] = q.streams[j], q.streams[i]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Push(x interface{}) {
 | 
				
			||||||
 | 
						stream := x.(pqStream)
 | 
				
			||||||
 | 
						q.streams = append(q.streams, stream)
 | 
				
			||||||
 | 
						q.size += stream.size
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Pop() interface{} {
 | 
				
			||||||
 | 
						idx := len(q.streams) - 1
 | 
				
			||||||
 | 
						stream := q.streams[idx]
 | 
				
			||||||
 | 
						q.streams = q.streams[:idx]
 | 
				
			||||||
 | 
						q.size -= stream.size
 | 
				
			||||||
 | 
						return stream
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue