diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 5b72620c..38aa34fa 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -598,11 +598,18 @@ type switch_buffer struct { count uint64 // Total queue size, including dropped packets } -func (b *switch_buffer) dropTimedOut() { +// Clean up old packets from buffers, to help keep latency within some reasonable bound +func (t *switchTable) cleanBuffer(b *switch_buffer) { // TODO figure out what timeout makes sense + if len(b.packets) == 0 { + return + } + coords := switch_getPacketCoords(b.packets[0].bytes) + dropAll := t.selfIsClosest(coords) const timeout = 25 * time.Millisecond now := time.Now() - for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout { + for len(b.packets) > 0 && (dropAll || len(b.packets) > 32 || (len(b.packets) > 1 && now.Sub(b.packets[0].time) > timeout)) { + t.core.log.Println("DEBUG:", len(b.packets), now.Sub(b.packets[0].time)) util_putBytes(b.packets[0].bytes) b.packets = b.packets[1:] } @@ -621,7 +628,7 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer for streamID, buf := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - buf.dropTimedOut() + t.cleanBuffer(&buf) if len(buf.packets) == 0 { delete(buffs, streamID) continue @@ -662,9 +669,20 @@ func (t *switchTable) doWorker() { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later + // First drop random queues if we're already tracking too much, to prevent OOM DoS + for streamID, buf := range buffs { + if len(buffs) < 32 { + break + } + for _, packet := range buf.packets { + util_putBytes(packet.bytes) + } + delete(buffs, streamID) + } + // Now add the packet to the appropriate queue streamID := switch_getPacketStreamID(packet) buf := buffs[streamID] - buf.dropTimedOut() + t.cleanBuffer(&buf) pinfo := switch_packetInfo{packet, time.Now()} buf.packets = append(buf.packets, pinfo) buf.count++