mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	work-in-progress heap-based queue structure
This commit is contained in:
		
							parent
							
								
									761ae531cb
								
							
						
					
					
						commit
						1f65ffb310
					
				
					 1 changed files with 70 additions and 19 deletions
				
			
		| 
						 | 
					@ -1,15 +1,13 @@
 | 
				
			||||||
package yggdrasil
 | 
					package yggdrasil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"math/rand"
 | 
						"container/heap"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
*/
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO separate queues per e.g. traffic flow
 | 
					// TODO separate queues per e.g. traffic flow
 | 
				
			||||||
//  For now, we put everything in queue
 | 
					//  For now, we put everything in queue
 | 
				
			||||||
/*
 | 
					
 | 
				
			||||||
type pqStreamID string
 | 
					type pqStreamID string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type pqPacketInfo struct {
 | 
					type pqPacketInfo struct {
 | 
				
			||||||
| 
						 | 
					@ -18,15 +16,13 @@ type pqPacketInfo struct {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type pqStream struct {
 | 
					type pqStream struct {
 | 
				
			||||||
  id   string
 | 
						id    pqStreamID
 | 
				
			||||||
	infos []pqPacketInfo
 | 
						infos []pqPacketInfo
 | 
				
			||||||
	size  int
 | 
						size  uint64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
*/
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
type packetQueue struct {
 | 
					type packetQueue struct {
 | 
				
			||||||
	//streams []pqStream
 | 
						streams []pqStream
 | 
				
			||||||
	packets [][]byte
 | 
					 | 
				
			||||||
	size    uint64
 | 
						size    uint64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -36,24 +32,79 @@ func (q *packetQueue) drop() bool {
 | 
				
			||||||
	if q.size == 0 {
 | 
						if q.size == 0 {
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	packet := q.packets[0]
 | 
						var longestIdx int
 | 
				
			||||||
	q.packets = q.packets[1:]
 | 
						for idx := range q.streams {
 | 
				
			||||||
	q.size -= uint64(len(packet))
 | 
							if q.streams[idx].size > q.streams[longestIdx].size {
 | 
				
			||||||
	pool_putBytes(packet)
 | 
								longestIdx = idx
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						stream := heap.Remove(q, longestIdx).(pqStream)
 | 
				
			||||||
 | 
						info := stream.infos[0]
 | 
				
			||||||
 | 
						if len(stream.infos) > 1 {
 | 
				
			||||||
 | 
							stream.infos = stream.infos[1:]
 | 
				
			||||||
 | 
							stream.size -= uint64(len(info.packet))
 | 
				
			||||||
 | 
							heap.Push(q, stream)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pool_putBytes(info.packet)
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (q *packetQueue) push(packet []byte) {
 | 
					func (q *packetQueue) push(packet []byte) {
 | 
				
			||||||
	q.packets = append(q.packets, packet)
 | 
						id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
 | 
				
			||||||
 | 
						info := pqPacketInfo{packet: packet, time: time.Now()}
 | 
				
			||||||
 | 
						for idx := range q.streams {
 | 
				
			||||||
 | 
							if q.streams[idx].id == id {
 | 
				
			||||||
 | 
								q.streams[idx].infos = append(q.streams[idx].infos, info)
 | 
				
			||||||
 | 
								q.streams[idx].size += uint64(len(packet))
 | 
				
			||||||
			q.size += uint64(len(packet))
 | 
								q.size += uint64(len(packet))
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						stream := pqStream{id: id, size: uint64(len(packet))}
 | 
				
			||||||
 | 
						stream.infos = append(stream.infos, info)
 | 
				
			||||||
 | 
						heap.Push(q, stream)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (q *packetQueue) pop() ([]byte, bool) {
 | 
					func (q *packetQueue) pop() ([]byte, bool) {
 | 
				
			||||||
	if q.size > 0 {
 | 
						if q.size > 0 {
 | 
				
			||||||
		packet := q.packets[0]
 | 
							stream := heap.Pop(q).(pqStream)
 | 
				
			||||||
		q.packets = q.packets[1:]
 | 
							info := stream.infos[0]
 | 
				
			||||||
		q.size -= uint64(len(packet))
 | 
							if len(stream.infos) > 1 {
 | 
				
			||||||
		return packet, true
 | 
								stream.infos = stream.infos[1:]
 | 
				
			||||||
 | 
								stream.size -= uint64(len(info.packet))
 | 
				
			||||||
 | 
								heap.Push(q, stream)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return info.packet, true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil, false
 | 
						return nil, false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					////////////////////////////////////////////////////////////////////////////////
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Interface methods for packetQueue to satisfy heap.Interface
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Len() int {
 | 
				
			||||||
 | 
						return len(q.streams)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Less(i, j int) bool {
 | 
				
			||||||
 | 
						return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Swap(i, j int) {
 | 
				
			||||||
 | 
						q.streams[i], q.streams[j] = q.streams[j], q.streams[i]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Push(x interface{}) {
 | 
				
			||||||
 | 
						stream := x.(pqStream)
 | 
				
			||||||
 | 
						q.streams = append(q.streams, stream)
 | 
				
			||||||
 | 
						q.size += stream.size
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *packetQueue) Pop() interface{} {
 | 
				
			||||||
 | 
						idx := len(q.streams) - 1
 | 
				
			||||||
 | 
						stream := q.streams[idx]
 | 
				
			||||||
 | 
						q.streams = q.streams[:idx]
 | 
				
			||||||
 | 
						q.size -= stream.size
 | 
				
			||||||
 | 
						return stream
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue