mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	Add getSwitchQueues
This commit is contained in:
		
							parent
							
								
									4666b8f6cd
								
							
						
					
					
						commit
						f57567ea56
					
				
					 3 changed files with 79 additions and 17 deletions
				
			
		| 
						 | 
				
			
			@ -90,6 +90,10 @@ func (a *admin) init(c *Core, listenaddr string) {
 | 
			
		|||
		}
 | 
			
		||||
		return admin_info{"switchpeers": switchpeers}, nil
 | 
			
		||||
	})
 | 
			
		||||
	a.addHandler("getSwitchQueues", []string{}, func(in admin_info) (admin_info, error) {
 | 
			
		||||
		queues := a.getData_getSwitchQueues()
 | 
			
		||||
		return admin_info{"switchqueues": queues.asMap()}, nil
 | 
			
		||||
	})
 | 
			
		||||
	a.addHandler("getDHT", []string{}, func(in admin_info) (admin_info, error) {
 | 
			
		||||
		sort := "ip"
 | 
			
		||||
		dht := make(admin_info)
 | 
			
		||||
| 
						 | 
				
			
			@ -510,6 +514,22 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
 | 
			
		|||
	return peerInfos
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getData_getSwitchPeers returns info from Core.switchTable for an admin response.
 | 
			
		||||
func (a *admin) getData_getSwitchQueues() admin_nodeInfo {
 | 
			
		||||
	var peerInfos admin_nodeInfo
 | 
			
		||||
	switchTable := a.core.switchTable
 | 
			
		||||
	getSwitchQueues := func() {
 | 
			
		||||
		peerInfos = admin_nodeInfo{
 | 
			
		||||
			{"queues_count", len(switchTable.queues.bufs)},
 | 
			
		||||
			{"queues_size", switchTable.queues.size},
 | 
			
		||||
			{"max_queues_count", switchTable.queues.maxbufs},
 | 
			
		||||
			{"max_queues_size", switchTable.queues.maxsize},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	a.core.switchTable.doAdmin(getSwitchQueues)
 | 
			
		||||
	return peerInfos
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getData_getDHT returns info from Core.dht for an admin response.
 | 
			
		||||
func (a *admin) getData_getDHT() []admin_nodeInfo {
 | 
			
		||||
	var infos []admin_nodeInfo
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -166,6 +166,8 @@ type switchTable struct {
 | 
			
		|||
	table    atomic.Value    //lookupTable
 | 
			
		||||
	packetIn chan []byte     // Incoming packets for the worker to handle
 | 
			
		||||
	idleIn   chan switchPort // Incoming idle notifications from peer links
 | 
			
		||||
	admin    chan func()     // pass a lambda for the admin socket to query stuff
 | 
			
		||||
	queues   switch_buffers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Initializes the switchTable struct.
 | 
			
		||||
| 
						 | 
				
			
			@ -181,6 +183,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
 | 
			
		|||
	t.drop = make(map[sigPubKey]int64)
 | 
			
		||||
	t.packetIn = make(chan []byte, 1024)
 | 
			
		||||
	t.idleIn = make(chan switchPort, 1024)
 | 
			
		||||
	t.admin = make(chan func())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Safely gets a copy of this node's locator.
 | 
			
		||||
| 
						 | 
				
			
			@ -589,8 +592,10 @@ type switch_buffer struct {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
type switch_buffers struct {
 | 
			
		||||
	bufs map[string]switch_buffer // Buffers indexed by StreamID
 | 
			
		||||
	size uint64                   // Total size of all buffers, in bytes
 | 
			
		||||
	bufs    map[string]switch_buffer // Buffers indexed by StreamID
 | 
			
		||||
	size    uint64                   // Total size of all buffers, in bytes
 | 
			
		||||
	maxbufs int
 | 
			
		||||
	maxsize uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *switch_buffers) cleanup(t *switchTable) {
 | 
			
		||||
| 
						 | 
				
			
			@ -635,16 +640,16 @@ func (b *switch_buffers) cleanup(t *switchTable) {
 | 
			
		|||
// Handles incoming idle notifications
 | 
			
		||||
// Loops over packets and sends the newest one that's OK for this peer to send
 | 
			
		||||
// Returns true if the peer is no longer idle, false if it should be added to the idle list
 | 
			
		||||
func (t *switchTable) handleIdle(port switchPort, bufs *switch_buffers) bool {
 | 
			
		||||
func (t *switchTable) handleIdle(port switchPort) bool {
 | 
			
		||||
	to := t.core.peers.getPorts()[port]
 | 
			
		||||
	if to == nil {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	var best string
 | 
			
		||||
	var bestPriority float64
 | 
			
		||||
	bufs.cleanup(t)
 | 
			
		||||
	t.queues.cleanup(t)
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	for streamID, buf := range bufs.bufs {
 | 
			
		||||
	for streamID, buf := range t.queues.bufs {
 | 
			
		||||
		// Filter over the streams that this node is closer to
 | 
			
		||||
		// Keep the one with the smallest queue
 | 
			
		||||
		packet := buf.packets[0]
 | 
			
		||||
| 
						 | 
				
			
			@ -656,17 +661,17 @@ func (t *switchTable) handleIdle(port switchPort, bufs *switch_buffers) bool {
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if bestPriority != 0 {
 | 
			
		||||
		buf := bufs.bufs[best]
 | 
			
		||||
		buf := t.queues.bufs[best]
 | 
			
		||||
		var packet switch_packetInfo
 | 
			
		||||
		// TODO decide if this should be LIFO or FIFO
 | 
			
		||||
		packet, buf.packets = buf.packets[0], buf.packets[1:]
 | 
			
		||||
		buf.size -= uint64(len(packet.bytes))
 | 
			
		||||
		bufs.size -= uint64(len(packet.bytes))
 | 
			
		||||
		t.queues.size -= uint64(len(packet.bytes))
 | 
			
		||||
		if len(buf.packets) == 0 {
 | 
			
		||||
			delete(bufs.bufs, best)
 | 
			
		||||
			delete(t.queues.bufs, best)
 | 
			
		||||
		} else {
 | 
			
		||||
			// Need to update the map, since buf was retrieved by value
 | 
			
		||||
			bufs.bufs[best] = buf
 | 
			
		||||
			t.queues.bufs[best] = buf
 | 
			
		||||
		}
 | 
			
		||||
		to.sendPacket(packet.bytes)
 | 
			
		||||
		return true
 | 
			
		||||
| 
						 | 
				
			
			@ -677,9 +682,8 @@ func (t *switchTable) handleIdle(port switchPort, bufs *switch_buffers) bool {
 | 
			
		|||
 | 
			
		||||
// The switch worker does routing lookups and sends packets to where they need to be
 | 
			
		||||
func (t *switchTable) doWorker() {
 | 
			
		||||
	var bufs switch_buffers
 | 
			
		||||
	bufs.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
 | 
			
		||||
	idle := make(map[switchPort]struct{})      // this is to deduplicate things
 | 
			
		||||
	t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
 | 
			
		||||
	idle := make(map[switchPort]struct{})          // this is to deduplicate things
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case bytes := <-t.packetIn:
 | 
			
		||||
| 
						 | 
				
			
			@ -688,19 +692,43 @@ func (t *switchTable) doWorker() {
 | 
			
		|||
				// There's nobody free to take it right now, so queue it for later
 | 
			
		||||
				packet := switch_packetInfo{bytes, time.Now()}
 | 
			
		||||
				streamID := switch_getPacketStreamID(packet.bytes)
 | 
			
		||||
				buf := bufs.bufs[streamID]
 | 
			
		||||
				buf, bufExists := t.queues.bufs[streamID]
 | 
			
		||||
				buf.packets = append(buf.packets, packet)
 | 
			
		||||
				buf.size += uint64(len(packet.bytes))
 | 
			
		||||
				bufs.size += uint64(len(packet.bytes))
 | 
			
		||||
				bufs.bufs[streamID] = buf
 | 
			
		||||
				bufs.cleanup(t)
 | 
			
		||||
				t.queues.size += uint64(len(packet.bytes))
 | 
			
		||||
				if t.queues.size > t.queues.maxsize {
 | 
			
		||||
					t.queues.maxsize = t.queues.size
 | 
			
		||||
				}
 | 
			
		||||
				if !bufExists {
 | 
			
		||||
					if len(t.queues.bufs) > t.queues.maxbufs {
 | 
			
		||||
						t.queues.maxbufs = len(t.queues.bufs)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				t.queues.bufs[streamID] = buf
 | 
			
		||||
				t.queues.cleanup(t)
 | 
			
		||||
			}
 | 
			
		||||
		case port := <-t.idleIn:
 | 
			
		||||
			// Try to find something to send to this peer
 | 
			
		||||
			if !t.handleIdle(port, &bufs) {
 | 
			
		||||
			if !t.handleIdle(port) {
 | 
			
		||||
				// Didn't find anything ready to send yet, so stay idle
 | 
			
		||||
				idle[port] = struct{}{}
 | 
			
		||||
			}
 | 
			
		||||
		case f := <-t.admin:
 | 
			
		||||
			f()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Passed a function to call.
 | 
			
		||||
// This will send the function to t.admin and block until it finishes.
 | 
			
		||||
func (t *switchTable) doAdmin(f func()) {
 | 
			
		||||
	// Pass this a function that needs to be run by the router's main goroutine
 | 
			
		||||
	// It will pass the function to the router and wait for the router to finish
 | 
			
		||||
	done := make(chan struct{})
 | 
			
		||||
	newF := func() {
 | 
			
		||||
		f()
 | 
			
		||||
		close(done)
 | 
			
		||||
	}
 | 
			
		||||
	t.admin <- newF
 | 
			
		||||
	<-done
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue