Mirror of https://github.com/yggdrasil-network/yggdrasil-go.git, synced 2025-11-04 03:05:07 +03:00
	use backpressure instead of estimated bandwidth, sorted by uptime to break ties
parent 707e23d392
commit 38e7704161

4 changed files with 27 additions and 51 deletions
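In brief: instead of scoring each candidate next hop by estimated link bandwidth divided by distance, the switch now picks the hop that minimizes distance plus the number of packets queued to that peer, and the lookup table is pre-sorted by peer uptime so ties go to the longest-lived link. A minimal standalone sketch of that selection rule, using a hypothetical candidate type rather than the repo's actual switch structs:

package main

import "fmt"

type candidate struct {
	port      int   // which peer/port this candidate is
	dist      int64 // tree distance to the destination via this peer
	queueSize int64 // packets currently queued to this peer
}

// pickNextHop returns the candidate with the lowest cost, where
// cost = dist + queueSize. Candidates are assumed pre-sorted by uptime
// (oldest first), so the strict "<" comparison breaks ties in favor of
// the longest-lived peer.
func pickNextHop(cands []candidate) (best int, ok bool) {
	bestCost := int64(^uint64(0) >> 1) // max int64
	for _, c := range cands {
		cost := c.dist + c.queueSize
		if cost < bestCost {
			best, bestCost, ok = c.port, cost, true
		}
	}
	return best, ok
}

func main() {
	cands := []candidate{
		{port: 1, dist: 2, queueSize: 5}, // cost 7
		{port: 2, dist: 3, queueSize: 1}, // cost 4: wins
		{port: 3, dist: 3, queueSize: 1}, // cost 4: loses tie (younger)
	}
	best, _ := pickNextHop(cands)
	fmt.Println("next hop:", best) // next hop: 2
}

The diff below wires the same idea through the peer counters, the switch lookup, and the TCP/UDP send paths.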
				
			
@@ -25,7 +25,6 @@ package yggdrasil
 import "time"
 import "sync"
 import "sync/atomic"
-import "math"
 
 //import "fmt"
 
@@ -86,7 +85,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
 type peer struct {
 	// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
 	// use get/update methods only! (atomic accessors as float64)
-	bandwidth  uint64
+	queueSize  int64
 	bytesSent  uint64 // To track bandwidth usage for getPeers
 	bytesRecvd uint64 // To track bandwidth usage for getPeers
 	// BUG: sync/atomic, 32 bit platforms need the above to be the first element
@@ -116,22 +115,12 @@ type peer struct {
 
 const peer_Throttle = 1
 
-func (p *peer) getBandwidth() float64 {
-	bits := atomic.LoadUint64(&p.bandwidth)
-	return math.Float64frombits(bits)
+func (p *peer) getQueueSize() int64 {
+	return atomic.LoadInt64(&p.queueSize)
 }
 
-func (p *peer) updateBandwidth(bytes int, duration time.Duration) {
-	if p == nil {
-		return
-	}
-	for ok := false; !ok; {
-		oldBits := atomic.LoadUint64(&p.bandwidth)
-		oldBandwidth := math.Float64frombits(oldBits)
-		bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds()
-		bits := math.Float64bits(bandwidth)
-		ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
-	}
+func (p *peer) updateQueueSize(delta int64) {
+	atomic.AddInt64(&p.queueSize, delta)
 }
 
 func (ps *peers) newPeer(box *boxPubKey,
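The old bandwidth estimate was a float64 EWMA, which has no atomic add and so needed the compare-and-swap loop above; a queue depth is a plain signed counter, so LoadInt64/AddInt64 suffice. A self-contained sketch of the replacement accessor pattern, with a hypothetical counter type:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter mirrors the queueSize field pattern: all access goes through
// atomic load/add, so concurrent readers and writers need no mutex.
type counter struct{ n int64 }

func (c *counter) get() int64     { return atomic.LoadInt64(&c.n) }
func (c *counter) update(d int64) { atomic.AddInt64(&c.n, d) }

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.update(1)  // e.g. packet enqueued
			c.update(-1) // e.g. packet sent or dropped
		}()
	}
	wg.Wait()
	fmt.Println(c.get()) // always 0: adds and subtracts balance
}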
@@ -12,6 +12,7 @@ package yggdrasil
 //  A little annoying to do with constant changes from bandwidth estimates
 
 import "time"
+import "sort"
 import "sync"
 import "sync/atomic"
 
@@ -397,37 +398,36 @@ func (t *switchTable) updateTable() {
 			port:    pinfo.port,
 		})
 	}
+	sort.SliceStable(newTable.elems, func(i, j int) bool {
+		return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
+	})
 	t.table.Store(newTable)
 }
 
 func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
 	t.updater.Load().(*sync.Once).Do(t.updateTable)
 	table := t.table.Load().(lookupTable)
-	ports := t.core.peers.getPorts()
-	getBandwidth := func(port switchPort) float64 {
-		var bandwidth float64
-		if p, isIn := ports[port]; isIn {
-			bandwidth = p.getBandwidth()
-		}
-		return bandwidth
-	}
-	var best switchPort
 	myDist := table.self.dist(dest) //getDist(table.self.coords)
 	if !(uint64(myDist) < ttl) {
 		return 0, 0
 	}
-	// score is in units of bandwidth / distance
-	bestScore := float64(-1)
+	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
+	ports := t.core.peers.getPorts()
+	var best switchPort
+	bestCost := int64(^uint64(0) >> 1)
 	for _, info := range table.elems {
 		dist := info.locator.dist(dest) //getDist(info.locator.coords)
 		if !(dist < myDist) {
 			continue
 		}
-		score := getBandwidth(info.port)
-		score /= float64(1 + dist)
-		if score > bestScore {
+		p, isIn := ports[info.port]
+		if !isIn {
+			continue
+		}
+		cost := int64(dist) + p.getQueueSize()
+		if cost < bestCost {
 			best = info.port
-			bestScore = score
+			bestCost = cost
 		}
 	}
 	//t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best))
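Worth noting how the tiebreak works: updateTable stable-sorts elems by each peer's firstSeen, and the lookup loop above only replaces best on a strictly lower cost, so among equal-cost candidates the oldest peer wins. A small sketch of that ordering, with a hypothetical elem type standing in for the real table entries:

package main

import (
	"fmt"
	"sort"
	"time"
)

type elem struct {
	port      int
	firstSeen time.Time
}

func main() {
	now := time.Now()
	elems := []elem{
		{port: 3, firstSeen: now},
		{port: 1, firstSeen: now.Add(-time.Hour)},   // oldest peer
		{port: 2, firstSeen: now.Add(-time.Minute)},
	}
	// Oldest first: a later strict "<" scan over these entries keeps the
	// earliest equal-cost match, i.e. the longest-uptime peer.
	sort.SliceStable(elems, func(i, j int) bool {
		return elems[i].firstSeen.Before(elems[j].firstSeen)
	})
	fmt.Println(elems[0].port, elems[1].port, elems[2].port) // 1 2 3
}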
@@ -190,26 +190,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 	buf := bufio.NewWriterSize(sock, tcp_msgSize)
 	send := func(msg []byte) {
 		msgLen := wire_encode_uint64(uint64(len(msg)))
-		before := buf.Buffered()
-		start := time.Now()
 		buf.Write(tcp_msg[:])
 		buf.Write(msgLen)
 		buf.Write(msg)
-		timed := time.Since(start)
-		after := buf.Buffered()
-		written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after
-		if written > 0 {
-			p.updateBandwidth(written, timed)
-		}
+		p.updateQueueSize(-1)
 		util_putBytes(msg)
 	}
-	flush := func() {
-		size := buf.Buffered()
-		start := time.Now()
-		buf.Flush()
-		timed := time.Since(start)
-		p.updateBandwidth(size, timed)
-	}
 	go func() {
 		var stack [][]byte
 		put := func(msg []byte) {
@@ -217,6 +203,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 			for len(stack) > 32 {
 				util_putBytes(stack[0])
 				stack = stack[1:]
+				p.updateQueueSize(-1)
 			}
 		}
 		for msg := range out {
@@ -226,7 +213,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 				select {
 				case msg, ok := <-out:
 					if !ok {
-						flush()
+						buf.Flush()
 						return
 					}
 					put(msg)
@@ -236,13 +223,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 					send(msg)
 				}
 			}
-			flush()
+			buf.Flush()
 		}
 	}()
 	p.out = func(msg []byte) {
 		defer func() { recover() }()
 		select {
 		case out <- msg:
+			p.updateQueueSize(1)
 		default:
 			util_putBytes(msg)
 		}
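The TCP accounting pattern in miniature: the counter goes up only when a non-blocking send into the writer's channel succeeds, and comes back down when the writer drains a message (the real code also decrements when an old message is dropped from the LIFO stack). A self-contained sketch under those assumptions, with hypothetical names:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var queueSize int64
	out := make(chan []byte, 32)

	enqueue := func(msg []byte) {
		select {
		case out <- msg:
			atomic.AddInt64(&queueSize, 1) // counted: msg is now queued
		default:
			// channel full: message dropped, queue size unchanged
		}
	}

	enqueue([]byte("a"))
	enqueue([]byte("b"))

	for i := 0; i < 2; i++ {
		msg := <-out
		_ = msg // would be written to the socket here
		atomic.AddInt64(&queueSize, -1) // drained: no longer queued
	}
	fmt.Println(atomic.LoadInt64(&queueSize)) // 0
}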
@@ -265,6 +265,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
 			defer func() { recover() }()
 			select {
 			case conn.out <- msg:
+				conn.peer.updateQueueSize(1)
 			default:
 				util_putBytes(msg)
 			}
@@ -282,16 +283,14 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
 				if len(chunks) > 255 {
 					continue
 				}
-				start := time.Now()
 				for idx, bs := range chunks {
 					nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut
 					out = udp_encode(out[:0], nChunks, nChunk, count, bs)
 					//iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs))
 					iface.sock.WriteToUDP(out, udpAddr)
 				}
-				timed := time.Since(start)
 				conn.countOut += 1
-				conn.peer.updateBandwidth(len(msg), timed)
+				conn.peer.updateQueueSize(-1)
 				util_putBytes(msg)
 			}
 		}()