mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	WIP have peer actors queue packets, temporarily a single simple FIFO queue with head drop
This commit is contained in:
		
							parent
							
								
									9834f222db
								
							
						
					
					
						commit
						945930aa2c
					
				
					 6 changed files with 91 additions and 338 deletions
				
			
		| 
						 | 
				
			
			@ -199,35 +199,6 @@ func (c *Core) GetDHT() []DHTEntry {
 | 
			
		|||
	return dhtentries
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetSwitchQueues returns information about the switch queues that are
 | 
			
		||||
// currently in effect. These values can change within an instant.
 | 
			
		||||
func (c *Core) GetSwitchQueues() SwitchQueues {
 | 
			
		||||
	var switchqueues SwitchQueues
 | 
			
		||||
	switchTable := &c.switchTable
 | 
			
		||||
	getSwitchQueues := func() {
 | 
			
		||||
		switchqueues = SwitchQueues{
 | 
			
		||||
			Count:        uint64(len(switchTable.queues.bufs)),
 | 
			
		||||
			Size:         switchTable.queues.size,
 | 
			
		||||
			HighestCount: uint64(switchTable.queues.maxbufs),
 | 
			
		||||
			HighestSize:  switchTable.queues.maxsize,
 | 
			
		||||
			MaximumSize:  switchTable.queues.totalMaxSize,
 | 
			
		||||
		}
 | 
			
		||||
		for port, pbuf := range switchTable.queues.bufs {
 | 
			
		||||
			for k, v := range pbuf {
 | 
			
		||||
				queue := SwitchQueue{
 | 
			
		||||
					ID:      k,
 | 
			
		||||
					Size:    v.size,
 | 
			
		||||
					Packets: uint64(len(v.packets)),
 | 
			
		||||
					Port:    uint64(port),
 | 
			
		||||
				}
 | 
			
		||||
				switchqueues.Queues = append(switchqueues.Queues, queue)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	phony.Block(&c.switchTable, getSwitchQueues)
 | 
			
		||||
	return switchqueues
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetSessions returns a list of open sessions from this node to other nodes.
 | 
			
		||||
func (c *Core) GetSessions() []Session {
 | 
			
		||||
	var sessions []Session
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -62,7 +62,7 @@ type linkInterface struct {
 | 
			
		|||
	keepAliveTimer *time.Timer // Fires to send keep-alive traffic
 | 
			
		||||
	stallTimer     *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
 | 
			
		||||
	closeTimer     *time.Timer // Fires when the link has been idle so long we need to close it
 | 
			
		||||
	inSwitch       bool        // True if the switch is tracking this link
 | 
			
		||||
	isIdle         bool        // True if the peer actor knows the link is idle
 | 
			
		||||
	stalled        bool        // True if we haven't been receiving any response traffic
 | 
			
		||||
	unstalled      bool        // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -278,7 +278,7 @@ const (
 | 
			
		|||
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
 | 
			
		||||
	intf.Act(&intf.writer, func() {
 | 
			
		||||
		if !isLinkTraffic {
 | 
			
		||||
			intf.inSwitch = false
 | 
			
		||||
			intf.isIdle = false
 | 
			
		||||
		}
 | 
			
		||||
		intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 | 
			
		||||
		intf._cancelStallTimer()
 | 
			
		||||
| 
						 | 
				
			
			@ -311,7 +311,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 | 
			
		|||
		intf.sendTimer.Stop()
 | 
			
		||||
		intf.sendTimer = nil
 | 
			
		||||
		if !isLinkTraffic {
 | 
			
		||||
			intf._notifySwitch()
 | 
			
		||||
			intf._notifyIdle()
 | 
			
		||||
		}
 | 
			
		||||
		if size > 0 && intf.stallTimer == nil {
 | 
			
		||||
			intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
 | 
			
		||||
| 
						 | 
				
			
			@ -320,15 +320,13 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
 | 
			
		||||
func (intf *linkInterface) _notifySwitch() {
 | 
			
		||||
	if !intf.inSwitch {
 | 
			
		||||
func (intf *linkInterface) _notifyIdle() {
 | 
			
		||||
	if !intf.isIdle {
 | 
			
		||||
		if intf.stalled {
 | 
			
		||||
			intf.unstalled = false
 | 
			
		||||
		} else {
 | 
			
		||||
			intf.inSwitch = true
 | 
			
		||||
			intf.link.core.switchTable.Act(intf, func() {
 | 
			
		||||
				intf.link.core.switchTable._idleIn(intf.peer.port)
 | 
			
		||||
			})
 | 
			
		||||
			intf.isIdle = true
 | 
			
		||||
			intf.peer.Act(intf, intf.peer._handleIdle)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -364,7 +362,7 @@ func (intf *linkInterface) notifyRead(size int) {
 | 
			
		|||
		}
 | 
			
		||||
		intf.stalled = false
 | 
			
		||||
		if !intf.unstalled {
 | 
			
		||||
			intf._notifySwitch()
 | 
			
		||||
			intf._notifyIdle()
 | 
			
		||||
			intf.unstalled = true
 | 
			
		||||
		}
 | 
			
		||||
		if size > 0 && intf.stallTimer == nil {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										39
									
								
								src/yggdrasil/packetqueue.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								src/yggdrasil/packetqueue.go
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,39 @@
 | 
			
		|||
package yggdrasil
 | 
			
		||||
 | 
			
		||||
import "github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
			
		||||
 | 
			
		||||
// TODO take max size from config
 | 
			
		||||
const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB
 | 
			
		||||
 | 
			
		||||
// TODO separate queues per e.g. traffic flow
 | 
			
		||||
type packetQueue struct {
 | 
			
		||||
	packets [][]byte
 | 
			
		||||
	size    uint32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) cleanup() {
 | 
			
		||||
	for q.size > MAX_PACKET_QUEUE_SIZE {
 | 
			
		||||
		if packet, success := q.pop(); success {
 | 
			
		||||
			util.PutBytes(packet)
 | 
			
		||||
		} else {
 | 
			
		||||
			panic("attempted to drop packet from empty queue")
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) push(packet []byte) {
 | 
			
		||||
	q.packets = append(q.packets, packet)
 | 
			
		||||
	q.size += uint32(len(packet))
 | 
			
		||||
	q.cleanup()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *packetQueue) pop() ([]byte, bool) {
 | 
			
		||||
	if len(q.packets) > 0 {
 | 
			
		||||
		packet := q.packets[0]
 | 
			
		||||
		q.packets = q.packets[1:]
 | 
			
		||||
		q.size -= uint32(len(packet))
 | 
			
		||||
		return packet, true
 | 
			
		||||
	}
 | 
			
		||||
	return nil, false
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -100,6 +100,8 @@ type peer struct {
 | 
			
		|||
	bytesRecvd uint64
 | 
			
		||||
	ports      map[switchPort]*peer
 | 
			
		||||
	table      *lookupTable
 | 
			
		||||
	queue      packetQueue
 | 
			
		||||
	idle       bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
 | 
			
		||||
| 
						 | 
				
			
			@ -243,6 +245,13 @@ func (p *peer) _handlePacket(packet []byte) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get the coords of a packet without decoding
 | 
			
		||||
func peer_getPacketCoords(packet []byte) []byte {
 | 
			
		||||
	_, pTypeLen := wire_decode_uint64(packet)
 | 
			
		||||
	coords, _ := wire_decode_coords(packet[pTypeLen:])
 | 
			
		||||
	return coords
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Called to handle traffic or protocolTraffic packets.
 | 
			
		||||
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
 | 
			
		||||
func (p *peer) _handleTraffic(packet []byte) {
 | 
			
		||||
| 
						 | 
				
			
			@ -250,7 +259,7 @@ func (p *peer) _handleTraffic(packet []byte) {
 | 
			
		|||
		// Drop traffic if the peer isn't in the switch
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	coords := switch_getPacketCoords(packet)
 | 
			
		||||
	coords := peer_getPacketCoords(packet)
 | 
			
		||||
	next := p.table.lookup(coords)
 | 
			
		||||
	if nPeer, isIn := p.ports[next]; isIn {
 | 
			
		||||
		nPeer.sendPacketsFrom(p, [][]byte{packet})
 | 
			
		||||
| 
						 | 
				
			
			@ -264,17 +273,33 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
 | 
			
		|||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This just calls p.out(packet) for now.
 | 
			
		||||
func (p *peer) _sendPackets(packets [][]byte) {
 | 
			
		||||
	// Is there ever a case where something more complicated is needed?
 | 
			
		||||
	// What if p.out blocks?
 | 
			
		||||
	var size int
 | 
			
		||||
	for _, packet := range packets {
 | 
			
		||||
		size += len(packet)
 | 
			
		||||
		p.queue.push(packet)
 | 
			
		||||
	}
 | 
			
		||||
	if p.idle {
 | 
			
		||||
		p.idle = false
 | 
			
		||||
		p._handleIdle()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *peer) _handleIdle() {
 | 
			
		||||
	var packets [][]byte
 | 
			
		||||
	var size uint64
 | 
			
		||||
	for size < 65535 {
 | 
			
		||||
		if packet, success := p.queue.pop(); success {
 | 
			
		||||
			packets = append(packets, packet)
 | 
			
		||||
			size += uint64(len(packet))
 | 
			
		||||
		} else {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(packets) > 0 {
 | 
			
		||||
		p.bytesSent += uint64(size)
 | 
			
		||||
		p.out(packets)
 | 
			
		||||
	} else {
 | 
			
		||||
		p.idle = true
 | 
			
		||||
	}
 | 
			
		||||
	p.bytesSent += uint64(size)
 | 
			
		||||
	// FIXME need to manage queues here or else things can block!
 | 
			
		||||
	p.out(packets)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -67,7 +67,14 @@ func (r *router) init(core *Core) {
 | 
			
		|||
		// FIXME don't block here!
 | 
			
		||||
		p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
 | 
			
		||||
	})
 | 
			
		||||
	p.out = func(packets [][]byte) { r.handlePackets(p, packets) }
 | 
			
		||||
	p.out = func(packets [][]byte) {
 | 
			
		||||
		r.handlePackets(p, packets)
 | 
			
		||||
		r.Act(p, func() {
 | 
			
		||||
			// after the router handle the packets, notify the peer that it's ready for more
 | 
			
		||||
			p.Act(r, p._handleIdle)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	p.Act(r, p._handleIdle)
 | 
			
		||||
	r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
 | 
			
		||||
	r.nodeinfo.init(r.core)
 | 
			
		||||
	r.core.config.Mutex.RLock()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -164,13 +164,11 @@ type switchData struct {
 | 
			
		|||
type switchTable struct {
 | 
			
		||||
	core        *Core
 | 
			
		||||
	key         crypto.SigPubKey           // Our own key
 | 
			
		||||
	phony.Inbox                            // Owns the below
 | 
			
		||||
	time        time.Time                  // Time when locator.tstamp was last updated
 | 
			
		||||
	drop        map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
 | 
			
		||||
	parent      switchPort                 // Port of whatever peer is our parent, or self if we're root
 | 
			
		||||
	data        switchData                 //
 | 
			
		||||
	phony.Inbox                            // Owns the below
 | 
			
		||||
	queues      switch_buffers             // Queues - not atomic so ONLY use through the actor
 | 
			
		||||
	idle        map[switchPort]struct{}    // idle peers - not atomic so ONLY use through the actor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Minimum allowed total size of switch queues.
 | 
			
		||||
| 
						 | 
				
			
			@ -185,18 +183,7 @@ func (t *switchTable) init(core *Core) {
 | 
			
		|||
	peers := make(map[switchPort]peerInfo)
 | 
			
		||||
	t.data = switchData{locator: locator, peers: peers}
 | 
			
		||||
	t.drop = make(map[crypto.SigPubKey]int64)
 | 
			
		||||
	phony.Block(t, func() {
 | 
			
		||||
		core.config.Mutex.RLock()
 | 
			
		||||
		if core.config.Current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize {
 | 
			
		||||
			t.queues.totalMaxSize = core.config.Current.SwitchOptions.MaxTotalQueueSize
 | 
			
		||||
		} else {
 | 
			
		||||
			t.queues.totalMaxSize = SwitchQueueTotalMinSize
 | 
			
		||||
		}
 | 
			
		||||
		core.config.Mutex.RUnlock()
 | 
			
		||||
		t.queues.bufs = make(map[switchPort]map[string]switch_buffer)
 | 
			
		||||
		t.idle = make(map[switchPort]struct{})
 | 
			
		||||
	})
 | 
			
		||||
	t._updateTable()
 | 
			
		||||
	phony.Block(t, t._updateTable)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *switchTable) reconfigure() {
 | 
			
		||||
| 
						 | 
				
			
			@ -557,73 +544,6 @@ func (t *switchTable) start() error {
 | 
			
		|||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type closerInfo struct {
 | 
			
		||||
	elem tableElem
 | 
			
		||||
	dist int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Return a map of ports onto distance, keeping only ports closer to the destination than this node
 | 
			
		||||
// If the map is empty (or nil), then no peer is closer
 | 
			
		||||
/*
 | 
			
		||||
func (t *switchTable) getCloser(dest []byte) []closerInfo {
 | 
			
		||||
	table := t.getTable()
 | 
			
		||||
	myDist := table.self.dist(dest)
 | 
			
		||||
	if myDist == 0 {
 | 
			
		||||
		// Skip the iteration step if it's impossible to be closer
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	var closer []closerInfo
 | 
			
		||||
	for _, info := range table.elems {
 | 
			
		||||
		dist := info.locator.dist(dest)
 | 
			
		||||
		if dist < myDist {
 | 
			
		||||
			closer = append(closer, closerInfo{info, dist})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return closer
 | 
			
		||||
}
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// Returns true if the peer is closer to the destination than ourself
 | 
			
		||||
/*
 | 
			
		||||
func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
 | 
			
		||||
	table := t.getTable()
 | 
			
		||||
	if info, isIn := table.elems[port]; isIn {
 | 
			
		||||
		theirDist := info.locator.dist(dest)
 | 
			
		||||
		myDist := table.self.dist(dest)
 | 
			
		||||
		return theirDist < myDist
 | 
			
		||||
	} else {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// Get the coords of a packet without decoding
 | 
			
		||||
func switch_getPacketCoords(packet []byte) []byte {
 | 
			
		||||
	_, pTypeLen := wire_decode_uint64(packet)
 | 
			
		||||
	coords, _ := wire_decode_coords(packet[pTypeLen:])
 | 
			
		||||
	return coords
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Returns a unique string for each stream of traffic
 | 
			
		||||
// Equal to coords
 | 
			
		||||
// The sender may append arbitrary info to the end of coords (as long as it's begins with a 0x00) to designate separate traffic streams
 | 
			
		||||
// Currently, it's the IPv6 next header type and the first 2 uint16 of the next header
 | 
			
		||||
// This is equivalent to the TCP/UDP protocol numbers and the source / dest ports
 | 
			
		||||
// TODO figure out if something else would make more sense (other transport protocols?)
 | 
			
		||||
func switch_getPacketStreamID(packet []byte) string {
 | 
			
		||||
	return string(switch_getPacketCoords(packet))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Returns the flowlabel from a given set of coords
 | 
			
		||||
func switch_getFlowLabelFromCoords(in []byte) []byte {
 | 
			
		||||
	for i, v := range in {
 | 
			
		||||
		if v == 0 {
 | 
			
		||||
			return in[i+1:]
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return []byte{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Find the best port to forward to for a given set of coords
 | 
			
		||||
func (t *lookupTable) lookup(coords []byte) switchPort {
 | 
			
		||||
	var bestPort switchPort
 | 
			
		||||
| 
						 | 
				
			
			@ -660,210 +580,3 @@ func (t *lookupTable) lookup(coords []byte) switchPort {
 | 
			
		|||
	}
 | 
			
		||||
	return bestPort
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Handle an incoming packet
 | 
			
		||||
// Either send it to ourself, or to the first idle peer that's free
 | 
			
		||||
// Returns true if the packet has been handled somehow, false if it should be queued
 | 
			
		||||
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) {
 | 
			
		||||
	/*
 | 
			
		||||
		coords := switch_getPacketCoords(packet)
 | 
			
		||||
		table := t.getTable()
 | 
			
		||||
		port := table.lookup(coords)
 | 
			
		||||
		ports := t.core.peers.getPorts()
 | 
			
		||||
		peer := ports[port]
 | 
			
		||||
		if peer == nil {
 | 
			
		||||
			// FIXME hack, if the peer disappeared durring a race then don't buffer
 | 
			
		||||
			return true, 0
 | 
			
		||||
		}
 | 
			
		||||
		if _, isIdle := idle[port]; isIdle || port == 0 {
 | 
			
		||||
			// Either no closer peers, or the closest peer is idle
 | 
			
		||||
			delete(idle, port)
 | 
			
		||||
			peer.sendPacketsFrom(t, [][]byte{packet})
 | 
			
		||||
			return true, port
 | 
			
		||||
		}
 | 
			
		||||
		// There's a closer peer, but it's not idle, so buffer it
 | 
			
		||||
		return false, port
 | 
			
		||||
	*/
 | 
			
		||||
	return true, 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Info about a buffered packet
 | 
			
		||||
type switch_packetInfo struct {
 | 
			
		||||
	bytes []byte
 | 
			
		||||
	time  time.Time // Timestamp of when the packet arrived
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Used to keep track of buffered packets
 | 
			
		||||
type switch_buffer struct {
 | 
			
		||||
	packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
 | 
			
		||||
	size    uint64              // Total queue size in bytes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type switch_buffers struct {
 | 
			
		||||
	totalMaxSize uint64
 | 
			
		||||
	bufs         map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID
 | 
			
		||||
	size         uint64                                  // Total size of all buffers, in bytes
 | 
			
		||||
	maxbufs      int
 | 
			
		||||
	maxsize      uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *switch_buffers) _cleanup(t *switchTable) {
 | 
			
		||||
	/*
 | 
			
		||||
		for port, pbufs := range b.bufs {
 | 
			
		||||
			for streamID, buf := range pbufs {
 | 
			
		||||
				// Remove queues for which we have no next hop
 | 
			
		||||
				packet := buf.packets[0]
 | 
			
		||||
				coords := switch_getPacketCoords(packet.bytes)
 | 
			
		||||
				if len(t.getCloser(coords)) == 0 {
 | 
			
		||||
					for _, packet := range buf.packets {
 | 
			
		||||
						util.PutBytes(packet.bytes)
 | 
			
		||||
					}
 | 
			
		||||
					b.size -= buf.size
 | 
			
		||||
					delete(pbufs, streamID)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if len(pbufs) == 0 {
 | 
			
		||||
				delete(b.bufs, port)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for b.size > b.totalMaxSize {
 | 
			
		||||
			// Drop a random queue
 | 
			
		||||
			target := rand.Uint64() % b.size
 | 
			
		||||
			var size uint64 // running total
 | 
			
		||||
			for port, pbufs := range b.bufs {
 | 
			
		||||
				for streamID, buf := range pbufs {
 | 
			
		||||
					size += buf.size
 | 
			
		||||
					if size < target {
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
					var packet switch_packetInfo
 | 
			
		||||
					packet, buf.packets = buf.packets[0], buf.packets[1:]
 | 
			
		||||
					buf.size -= uint64(len(packet.bytes))
 | 
			
		||||
					b.size -= uint64(len(packet.bytes))
 | 
			
		||||
					util.PutBytes(packet.bytes)
 | 
			
		||||
					if len(buf.packets) == 0 {
 | 
			
		||||
						delete(pbufs, streamID)
 | 
			
		||||
						if len(pbufs) == 0 {
 | 
			
		||||
							delete(b.bufs, port)
 | 
			
		||||
						}
 | 
			
		||||
					} else {
 | 
			
		||||
						// Need to update the map, since buf was retrieved by value
 | 
			
		||||
						pbufs[streamID] = buf
 | 
			
		||||
					}
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	*/
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Handles incoming idle notifications
 | 
			
		||||
// Loops over packets and sends the newest one that's OK for this peer to send
 | 
			
		||||
// Returns true if the peer is no longer idle, false if it should be added to the idle list
 | 
			
		||||
func (t *switchTable) _handleIdle(port switchPort) bool {
 | 
			
		||||
	// TODO? only send packets for which this is the best next hop that isn't currently blocked sending
 | 
			
		||||
	/*
 | 
			
		||||
		to := t.core.peers.getPorts()[port]
 | 
			
		||||
		if to == nil {
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
		var packets [][]byte
 | 
			
		||||
		var psize int
 | 
			
		||||
		t.queues._cleanup(t)
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		pbufs := t.queues.bufs[port]
 | 
			
		||||
		for psize < 65535 {
 | 
			
		||||
			var best *string
 | 
			
		||||
			var bestPriority float64
 | 
			
		||||
			for streamID, buf := range pbufs {
 | 
			
		||||
				// Filter over the streams that this node is closer to
 | 
			
		||||
				// Keep the one with the smallest queue
 | 
			
		||||
				packet := buf.packets[0]
 | 
			
		||||
				priority := float64(now.Sub(packet.time)) / float64(buf.size)
 | 
			
		||||
				if priority >= bestPriority {
 | 
			
		||||
					b := streamID // copy since streamID is mutated in the loop
 | 
			
		||||
					best = &b
 | 
			
		||||
					bestPriority = priority
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if best != nil {
 | 
			
		||||
				buf := pbufs[*best]
 | 
			
		||||
				var packet switch_packetInfo
 | 
			
		||||
				// TODO decide if this should be LIFO or FIFO
 | 
			
		||||
				packet, buf.packets = buf.packets[0], buf.packets[1:]
 | 
			
		||||
				buf.size -= uint64(len(packet.bytes))
 | 
			
		||||
				t.queues.size -= uint64(len(packet.bytes))
 | 
			
		||||
				if len(buf.packets) == 0 {
 | 
			
		||||
					delete(pbufs, *best)
 | 
			
		||||
					if len(pbufs) == 0 {
 | 
			
		||||
						delete(t.queues.bufs, port)
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					// Need to update the map, since buf was retrieved by value
 | 
			
		||||
					pbufs[*best] = buf
 | 
			
		||||
 | 
			
		||||
				}
 | 
			
		||||
				packets = append(packets, packet.bytes)
 | 
			
		||||
				psize += len(packet.bytes)
 | 
			
		||||
			} else {
 | 
			
		||||
				// Finished finding packets
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if len(packets) > 0 {
 | 
			
		||||
			to.sendPacketsFrom(t, packets)
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
		return false
 | 
			
		||||
	*/
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
 | 
			
		||||
	t.Act(from, func() {
 | 
			
		||||
		t._packetIn(bytes)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *switchTable) _packetIn(bytes []byte) {
 | 
			
		||||
	// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
 | 
			
		||||
	if sent, best := t._handleIn(bytes, t.idle); !sent {
 | 
			
		||||
		// There's nobody free to take it right now, so queue it for later
 | 
			
		||||
		packet := switch_packetInfo{bytes, time.Now()}
 | 
			
		||||
		streamID := switch_getPacketStreamID(packet.bytes)
 | 
			
		||||
		if _, isIn := t.queues.bufs[best]; !isIn {
 | 
			
		||||
			t.queues.bufs[best] = make(map[string]switch_buffer)
 | 
			
		||||
		}
 | 
			
		||||
		buf, bufExists := t.queues.bufs[best][streamID]
 | 
			
		||||
		buf.packets = append(buf.packets, packet)
 | 
			
		||||
		buf.size += uint64(len(packet.bytes))
 | 
			
		||||
		t.queues.size += uint64(len(packet.bytes))
 | 
			
		||||
		// Keep a track of the max total queue size
 | 
			
		||||
		if t.queues.size > t.queues.maxsize {
 | 
			
		||||
			t.queues.maxsize = t.queues.size
 | 
			
		||||
		}
 | 
			
		||||
		t.queues.bufs[best][streamID] = buf
 | 
			
		||||
		if !bufExists {
 | 
			
		||||
			// Keep a track of the max total queue count. Only recalculate this
 | 
			
		||||
			// when the queue is new because otherwise repeating len(dict) might
 | 
			
		||||
			// cause unnecessary processing overhead
 | 
			
		||||
			var count int
 | 
			
		||||
			for _, pbufs := range t.queues.bufs {
 | 
			
		||||
				count += len(pbufs)
 | 
			
		||||
			}
 | 
			
		||||
			if count > t.queues.maxbufs {
 | 
			
		||||
				t.queues.maxbufs = count
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		t.queues._cleanup(t)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *switchTable) _idleIn(port switchPort) {
 | 
			
		||||
	// Try to find something to send to this peer
 | 
			
		||||
	if !t._handleIdle(port) {
 | 
			
		||||
		// Didn't find anything ready to send yet, so stay idle
 | 
			
		||||
		t.idle[port] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue