Add SwitchOptions and MaxTotalQueueSize

This commit is contained in:
Neil Alexander 2018-12-02 22:49:27 +00:00
parent 05b07adba2
commit 86da073226
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 34 additions and 22 deletions

View file

@ -156,19 +156,20 @@ type switchData struct {
// All the information stored by the switch.
type switchTable struct {
core *Core
key sigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
parent switchPort // Port of whatever peer is our parent, or self if we're root
data switchData //
updater atomic.Value // *sync.Once
table atomic.Value // lookupTable
packetIn chan []byte // Incoming packets for the worker to handle
idleIn chan switchPort // Incoming idle notifications from peer links
admin chan func() // Pass a lambda for the admin socket to query stuff
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
core *Core
key sigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
parent switchPort // Port of whatever peer is our parent, or self if we're root
data switchData //
updater atomic.Value // *sync.Once
table atomic.Value // lookupTable
packetIn chan []byte // Incoming packets for the worker to handle
idleIn chan switchPort // Incoming idle notifications from peer links
admin chan func() // Pass a lambda for the admin socket to query stuff
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
queuetotalmaxsize uint64 // Maximum combined size of queues
}
// Initializes the switchTable struct.
@ -620,8 +621,6 @@ type switch_packetInfo struct {
time time.Time // Timestamp of when the packet arrived
}
const switch_buffer_maxSize = 4 * 1048576 // Maximum 4 MB
// Used to keep track of buffered packets
type switch_buffer struct {
packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
@ -629,10 +628,11 @@ type switch_buffer struct {
}
type switch_buffers struct {
bufs map[string]switch_buffer // Buffers indexed by StreamID
size uint64 // Total size of all buffers, in bytes
maxbufs int
maxsize uint64
switchTable *switchTable
bufs map[string]switch_buffer // Buffers indexed by StreamID
size uint64 // Total size of all buffers, in bytes
maxbufs int
maxsize uint64
}
func (b *switch_buffers) cleanup(t *switchTable) {
@ -649,7 +649,7 @@ func (b *switch_buffers) cleanup(t *switchTable) {
}
}
for b.size > switch_buffer_maxSize {
for b.size > b.switchTable.queuetotalmaxsize {
// Drop a random queue
target := rand.Uint64() % b.size
var size uint64 // running total
@ -719,6 +719,7 @@ func (t *switchTable) handleIdle(port switchPort) bool {
// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
t.queues.switchTable = t
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
idle := make(map[switchPort]struct{}) // this is to deduplicate things
for {