From 99616b967a45b04190fa70153b833523760f6ed8 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 4 Jul 2018 18:35:54 -0500 Subject: [PATCH 1/5] limit maximum queue number and size. don't cause the last packet in a queue to drop due to timeout (makes sure *some* packets can get through for every stream, even if delayed) --- src/yggdrasil/switch.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 5b72620c..38aa34fa 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -598,11 +598,18 @@ type switch_buffer struct { count uint64 // Total queue size, including dropped packets } -func (b *switch_buffer) dropTimedOut() { +// Clean up old packets from buffers, to help keep latency within some reasonable bound +func (t *switchTable) cleanBuffer(b *switch_buffer) { // TODO figure out what timeout makes sense + if len(b.packets) == 0 { + return + } + coords := switch_getPacketCoords(b.packets[0].bytes) + dropAll := t.selfIsClosest(coords) const timeout = 25 * time.Millisecond now := time.Now() - for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout { + for len(b.packets) > 0 && (dropAll || len(b.packets) > 32 || (len(b.packets) > 1 && now.Sub(b.packets[0].time) > timeout)) { + t.core.log.Println("DEBUG:", len(b.packets), now.Sub(b.packets[0].time)) util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] } @@ -621,7 +628,7 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer for streamID, buf := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - buf.dropTimedOut() + t.cleanBuffer(&buf) if len(buf.packets) == 0 { delete(buffs, streamID) continue @@ -662,9 +669,20 @@ func (t *switchTable) doWorker() { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later + // First drop random queues if we're already tracking too much, to prevent OOM DoS + for streamID, buf := range buffs { + if len(buffs) < 32 { + break + } + for _, packet := range buf.packets { + util_putBytes(packet.bytes) + } + delete(buffs, streamID) + } + // Now add the packet to the appropriate queue streamID := switch_getPacketStreamID(packet) buf := buffs[streamID] - buf.dropTimedOut() + t.cleanBuffer(&buf) pinfo := switch_packetInfo{packet, time.Now()} buf.packets = append(buf.packets, pinfo) buf.count++ From 42a6c24a843918ef0fd1662d8f1781f29e9192d2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 4 Jul 2018 20:06:18 -0500 Subject: [PATCH 2/5] remove debug output --- src/yggdrasil/switch.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 38aa34fa..ae904306 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -609,7 +609,6 @@ func (t *switchTable) cleanBuffer(b *switch_buffer) { const timeout = 25 * time.Millisecond now := time.Now() for len(b.packets) > 0 && (dropAll || len(b.packets) > 32 || (len(b.packets) > 1 && now.Sub(b.packets[0].time) > timeout)) { - t.core.log.Println("DEBUG:", len(b.packets), now.Sub(b.packets[0].time)) util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] } From 7539c0e4fee16386f56778a18b5b87de8470b55b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 4 Jul 2018 21:34:26 -0500 Subject: [PATCH 3/5] remove queue timeout, don't decrement queue size count when sending a packet (so flooding connections eventually need to yield) --- src/yggdrasil/switch.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ae904306..ff2229c5 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,7 +589,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Info about a buffered packet type switch_packetInfo struct { bytes []byte - time time.Time // Timestamp of when the packet arrived + //time time.Time // Timestamp of when the packet arrived } // Used to keep track of buffered packets @@ -600,15 +600,8 @@ type switch_buffer struct { // Clean up old packets from buffers, to help keep latency within some reasonable bound func (t *switchTable) cleanBuffer(b *switch_buffer) { - // TODO figure out what timeout makes sense - if len(b.packets) == 0 { - return - } - coords := switch_getPacketCoords(b.packets[0].bytes) - dropAll := t.selfIsClosest(coords) - const timeout = 25 * time.Millisecond - now := time.Now() - for len(b.packets) > 0 && (dropAll || len(b.packets) > 32 || (len(b.packets) > 1 && now.Sub(b.packets[0].time) > timeout)) { + // TODO sane maximum buffer size, or else CoDel-like maximum time + for len(b.packets) > 32 || (len(b.packets) > 0 && t.selfIsClosest(switch_getPacketCoords(b.packets[0].bytes))) { util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] } @@ -641,11 +634,12 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer } } if bestSize != 0 { + t.core.log.Println("DEBUG:", []byte(best), bestSize) buf := buffs[best] var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.count-- + //buf.count-- // Force flooding connections to eventually yield if len(buf.packets) == 0 { delete(buffs, best) } else { @@ -682,7 +676,7 @@ func (t *switchTable) doWorker() { streamID := switch_getPacketStreamID(packet) buf := buffs[streamID] t.cleanBuffer(&buf) - pinfo := switch_packetInfo{packet, time.Now()} + pinfo := switch_packetInfo{packet/*, time.Now()*/} buf.packets = append(buf.packets, pinfo) buf.count++ buffs[streamID] = buf From 5261bac1d8d8567489822b43dedc03dcdd04a12f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 4 Jul 2018 21:43:42 -0500 Subject: [PATCH 4/5] cleanup --- src/yggdrasil/switch.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ff2229c5..05aa18c5 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,7 +589,6 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Info about a buffered packet type switch_packetInfo struct { bytes []byte - //time time.Time // Timestamp of when the packet arrived } // Used to keep track of buffered packets @@ -600,7 +599,7 @@ type switch_buffer struct { // Clean up old packets from buffers, to help keep latency within some reasonable bound func (t *switchTable) cleanBuffer(b *switch_buffer) { - // TODO sane maximum buffer size, or else CoDel-like maximum time + // TODO sane maximum buffer size, or else CoDel-like maximum time for len(b.packets) > 32 || (len(b.packets) > 0 && t.selfIsClosest(switch_getPacketCoords(b.packets[0].bytes))) { util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] @@ -634,12 +633,10 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer } } if bestSize != 0 { - t.core.log.Println("DEBUG:", []byte(best), bestSize) buf := buffs[best] var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:] - //buf.count-- // Force flooding connections to eventually yield if len(buf.packets) == 0 { delete(buffs, best) } else { @@ -676,7 +673,7 @@ func (t *switchTable) doWorker() { streamID := switch_getPacketStreamID(packet) buf := buffs[streamID] t.cleanBuffer(&buf) - pinfo := switch_packetInfo{packet/*, time.Now()*/} + pinfo := switch_packetInfo{packet} buf.packets = append(buf.packets, pinfo) buf.count++ buffs[streamID] = buf From 432b2f7bdda97bcff7140291ca238edfb0ff2202 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 4 Jul 2018 22:07:01 -0500 Subject: [PATCH 5/5] use a stupidly large FIFO --- src/yggdrasil/switch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 05aa18c5..37b828de 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -600,7 +600,7 @@ type switch_buffer struct { // Clean up old packets from buffers, to help keep latency within some reasonable bound func (t *switchTable) cleanBuffer(b *switch_buffer) { // TODO sane maximum buffer size, or else CoDel-like maximum time - for len(b.packets) > 32 || (len(b.packets) > 0 && t.selfIsClosest(switch_getPacketCoords(b.packets[0].bytes))) { + for len(b.packets) > 1024 || (len(b.packets) > 0 && t.selfIsClosest(switch_getPacketCoords(b.packets[0].bytes))) { util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] }