mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	Add a rumor mill to throttle dht maintenance traffic
This commit is contained in:
		
							parent
							
								
									032ad3a162
								
							
						
					
					
						commit
						da44ec282f
					
				
					 3 changed files with 66 additions and 39 deletions
				
			
		| 
						 | 
				
			
			@ -369,3 +369,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
 | 
			
		|||
	go p.linkLoop(plinkIn)
 | 
			
		||||
	go q.linkLoop(qlinkIn)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Core) DEBUG_simFixMTU() {
 | 
			
		||||
	c.tun.mtu = 65535
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -62,6 +62,11 @@ type dhtRes struct {
 | 
			
		|||
	infos  []*dhtInfo // response
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type dht_rumor struct {
 | 
			
		||||
	info   *dhtInfo
 | 
			
		||||
	target *NodeID
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type dht struct {
 | 
			
		||||
	core           *Core
 | 
			
		||||
	nodeID         NodeID
 | 
			
		||||
| 
						 | 
				
			
			@ -69,6 +74,7 @@ type dht struct {
 | 
			
		|||
	peers          chan *dhtInfo             // other goroutines put incoming dht updates here
 | 
			
		||||
	reqs           map[boxPubKey]map[NodeID]time.Time
 | 
			
		||||
	offset         int
 | 
			
		||||
	rumorMill      []dht_rumor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *dht) init(c *Core) {
 | 
			
		||||
| 
						 | 
				
			
			@ -143,7 +149,7 @@ func (t *dht) handleRes(res *dhtRes) {
 | 
			
		|||
		if b.contains(info) {
 | 
			
		||||
			continue
 | 
			
		||||
		} // wait for maintenance cycle to get them
 | 
			
		||||
		t.ping(info, info.getNodeID())
 | 
			
		||||
		t.addToMill(info, info.getNodeID())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -379,6 +385,14 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
 | 
			
		|||
	t.sendReq(&req, info)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *dht) addToMill(info *dhtInfo, target *NodeID) {
 | 
			
		||||
	rumor := dht_rumor{
 | 
			
		||||
		info:   info,
 | 
			
		||||
		target: target,
 | 
			
		||||
	}
 | 
			
		||||
	t.rumorMill = append(t.rumorMill, rumor)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *dht) doMaintenance() {
 | 
			
		||||
	// First clean up reqs
 | 
			
		||||
	for key, reqs := range t.reqs {
 | 
			
		||||
| 
						 | 
				
			
			@ -391,36 +405,43 @@ func (t *dht) doMaintenance() {
 | 
			
		|||
			delete(t.reqs, key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Ping the least recently contacted node
 | 
			
		||||
	//  This is to make sure we eventually notice when someone times out
 | 
			
		||||
	var oldest *dhtInfo
 | 
			
		||||
	last := 0
 | 
			
		||||
	for bidx := 0; bidx < t.nBuckets(); bidx++ {
 | 
			
		||||
		b := t.getBucket(bidx)
 | 
			
		||||
		if !b.isEmpty() {
 | 
			
		||||
			last = bidx
 | 
			
		||||
			toPing := b.nextToPing()
 | 
			
		||||
			if toPing == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			} // We've recently pinged everyone in b
 | 
			
		||||
			if oldest == nil || toPing.recv.Before(oldest.recv) {
 | 
			
		||||
				oldest = toPing
 | 
			
		||||
	if len(t.rumorMill) == 0 {
 | 
			
		||||
		// Ping the least recently contacted node
 | 
			
		||||
		//  This is to make sure we eventually notice when someone times out
 | 
			
		||||
		var oldest *dhtInfo
 | 
			
		||||
		last := 0
 | 
			
		||||
		for bidx := 0; bidx < t.nBuckets(); bidx++ {
 | 
			
		||||
			b := t.getBucket(bidx)
 | 
			
		||||
			if !b.isEmpty() {
 | 
			
		||||
				last = bidx
 | 
			
		||||
				toPing := b.nextToPing()
 | 
			
		||||
				if toPing == nil {
 | 
			
		||||
					continue
 | 
			
		||||
				} // We've recently pinged everyone in b
 | 
			
		||||
				if oldest == nil || toPing.recv.Before(oldest.recv) {
 | 
			
		||||
					oldest = toPing
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if oldest != nil {
 | 
			
		||||
			t.addToMill(oldest, nil)
 | 
			
		||||
		} // if the DHT isn't empty
 | 
			
		||||
		// Refresh buckets
 | 
			
		||||
		if t.offset > last {
 | 
			
		||||
			t.offset = 0
 | 
			
		||||
		}
 | 
			
		||||
		target := t.getTarget(t.offset)
 | 
			
		||||
		for _, info := range t.lookup(target) {
 | 
			
		||||
			t.addToMill(info, target)
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		t.offset++
 | 
			
		||||
	}
 | 
			
		||||
	if oldest != nil {
 | 
			
		||||
		t.ping(oldest, nil)
 | 
			
		||||
	} // if the DHT isn't empty
 | 
			
		||||
	// Refresh buckets
 | 
			
		||||
	if t.offset > last {
 | 
			
		||||
		t.offset = 0
 | 
			
		||||
	if len(t.rumorMill) > 0 {
 | 
			
		||||
		var rumor dht_rumor
 | 
			
		||||
		rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
 | 
			
		||||
		t.ping(rumor.info, rumor.target)
 | 
			
		||||
	}
 | 
			
		||||
	target := t.getTarget(t.offset)
 | 
			
		||||
	for _, info := range t.lookup(target) {
 | 
			
		||||
		t.ping(info, target)
 | 
			
		||||
		break
 | 
			
		||||
	}
 | 
			
		||||
	t.offset++
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func dht_firstCloserThanThird(first *NodeID,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue