mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	Merge branch 'future' of https://github.com/yggdrasil-network/yggdrasil-go into develop-future
This commit is contained in:
		
						commit
						b6c894bc01
					
				
					 2 changed files with 58 additions and 34 deletions
				
			
		| 
						 | 
					@ -64,8 +64,9 @@ type link struct {
 | 
				
			||||||
	keepAliveTimer *time.Timer // Fires to send keep-alive traffic
 | 
						keepAliveTimer *time.Timer // Fires to send keep-alive traffic
 | 
				
			||||||
	stallTimer     *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
 | 
						stallTimer     *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
 | 
				
			||||||
	closeTimer     *time.Timer // Fires when the link has been idle so long we need to close it
 | 
						closeTimer     *time.Timer // Fires when the link has been idle so long we need to close it
 | 
				
			||||||
	isSending      bool        // True between a notifySending and a notifySent
 | 
						readUnblocked  bool        // True if we've sent a read message unblocking this peer in the switch
 | 
				
			||||||
	blocked        bool        // True if we've blocked the peer in the switch
 | 
						writeUnblocked bool        // True if we've sent a write message unblocking this peer in the swithc
 | 
				
			||||||
 | 
						shutdown       bool        // True if we're shutting down, avoids sending some messages that could race with new peers being crated in the same port
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type linkOptions struct {
 | 
					type linkOptions struct {
 | 
				
			||||||
| 
						 | 
					@ -285,7 +286,10 @@ func (intf *link) handler() error {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
		// More cleanup can go here
 | 
							// More cleanup can go here
 | 
				
			||||||
		intf.peer.Act(nil, intf.peer._removeSelf)
 | 
							intf.Act(nil, func() {
 | 
				
			||||||
 | 
								intf.shutdown = true
 | 
				
			||||||
 | 
								intf.peer.Act(intf, intf.peer._removeSelf)
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
 | 
						themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
 | 
				
			||||||
	themAddrString := net.IP(themAddr[:]).String()
 | 
						themAddrString := net.IP(themAddr[:]).String()
 | 
				
			||||||
| 
						 | 
					@ -385,7 +389,6 @@ const (
 | 
				
			||||||
// notify the intf that we're currently sending
 | 
					// notify the intf that we're currently sending
 | 
				
			||||||
func (intf *link) notifySending(size int) {
 | 
					func (intf *link) notifySending(size int) {
 | 
				
			||||||
	intf.Act(&intf.writer, func() {
 | 
						intf.Act(&intf.writer, func() {
 | 
				
			||||||
		intf.isSending = true
 | 
					 | 
				
			||||||
		intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 | 
							intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 | 
				
			||||||
		if intf.keepAliveTimer != nil {
 | 
							if intf.keepAliveTimer != nil {
 | 
				
			||||||
			intf.keepAliveTimer.Stop()
 | 
								intf.keepAliveTimer.Stop()
 | 
				
			||||||
| 
						 | 
					@ -400,10 +403,14 @@ func (intf *link) notifySending(size int) {
 | 
				
			||||||
// through other links, if alternatives exist
 | 
					// through other links, if alternatives exist
 | 
				
			||||||
func (intf *link) notifyBlockedSend() {
 | 
					func (intf *link) notifyBlockedSend() {
 | 
				
			||||||
	intf.Act(nil, func() {
 | 
						intf.Act(nil, func() {
 | 
				
			||||||
		if intf.sendTimer != nil && !intf.blocked {
 | 
							if intf.sendTimer != nil {
 | 
				
			||||||
			//As far as we know, we're still trying to send, and the timer fired.
 | 
								//As far as we know, we're still trying to send, and the timer fired.
 | 
				
			||||||
			intf.blocked = true
 | 
								intf.sendTimer.Stop()
 | 
				
			||||||
			intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
 | 
								intf.sendTimer = nil
 | 
				
			||||||
 | 
								if !intf.shutdown && intf.writeUnblocked {
 | 
				
			||||||
 | 
									intf.writeUnblocked = false
 | 
				
			||||||
 | 
									intf.links.core.switchTable.blockPeer(intf, intf.peer.port, true)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -421,10 +428,13 @@ func (intf *link) notifySent(size int) {
 | 
				
			||||||
			intf.keepAliveTimer = nil
 | 
								intf.keepAliveTimer = nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		intf._notifyIdle()
 | 
							intf._notifyIdle()
 | 
				
			||||||
		intf.isSending = false
 | 
					 | 
				
			||||||
		if size > 0 && intf.stallTimer == nil {
 | 
							if size > 0 && intf.stallTimer == nil {
 | 
				
			||||||
			intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
 | 
								intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							if !intf.shutdown && !intf.writeUnblocked {
 | 
				
			||||||
 | 
								intf.writeUnblocked = true
 | 
				
			||||||
 | 
								intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, true)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -439,9 +449,9 @@ func (intf *link) notifyStalled() {
 | 
				
			||||||
		if intf.stallTimer != nil {
 | 
							if intf.stallTimer != nil {
 | 
				
			||||||
			intf.stallTimer.Stop()
 | 
								intf.stallTimer.Stop()
 | 
				
			||||||
			intf.stallTimer = nil
 | 
								intf.stallTimer = nil
 | 
				
			||||||
			if !intf.blocked {
 | 
								if !intf.shutdown && intf.readUnblocked {
 | 
				
			||||||
				intf.blocked = true
 | 
									intf.readUnblocked = false
 | 
				
			||||||
				intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
 | 
									intf.links.core.switchTable.blockPeer(intf, intf.peer.port, false)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
| 
						 | 
					@ -467,9 +477,9 @@ func (intf *link) notifyRead(size int) {
 | 
				
			||||||
		if size > 0 && intf.keepAliveTimer == nil {
 | 
							if size > 0 && intf.keepAliveTimer == nil {
 | 
				
			||||||
			intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
 | 
								intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if intf.blocked {
 | 
							if !intf.shutdown && !intf.readUnblocked {
 | 
				
			||||||
			intf.blocked = false
 | 
								intf.readUnblocked = true
 | 
				
			||||||
			intf.links.core.switchTable.unblockPeer(intf, intf.peer.port)
 | 
								intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, false)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -143,7 +143,12 @@ type peerInfo struct {
 | 
				
			||||||
	faster     map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
 | 
						faster     map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
 | 
				
			||||||
	port       switchPort            // Interface number of this peer
 | 
						port       switchPort            // Interface number of this peer
 | 
				
			||||||
	msg        switchMsg             // The wire switchMsg used
 | 
						msg        switchMsg             // The wire switchMsg used
 | 
				
			||||||
	blocked bool                  // True if the link is blocked, used to avoid parenting a blocked link
 | 
						readBlock  bool                  // True if the link notified us of a read that blocked too long
 | 
				
			||||||
 | 
						writeBlock bool                  // True of the link notified us of a write that blocked too long
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pinfo *peerInfo) blocked() bool {
 | 
				
			||||||
 | 
						return pinfo.readBlock || pinfo.writeBlock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// This is just a uint64 with a named type for clarity reasons.
 | 
					// This is just a uint64 with a named type for clarity reasons.
 | 
				
			||||||
| 
						 | 
					@ -250,15 +255,19 @@ func (t *switchTable) _cleanRoot() {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Blocks and, if possible, unparents a peer
 | 
					// Blocks and, if possible, unparents a peer
 | 
				
			||||||
func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
 | 
					func (t *switchTable) blockPeer(from phony.Actor, port switchPort, isWrite bool) {
 | 
				
			||||||
	t.Act(from, func() {
 | 
						t.Act(from, func() {
 | 
				
			||||||
		peer, isIn := t.data.peers[port]
 | 
							peer, isIn := t.data.peers[port]
 | 
				
			||||||
		if !isIn || peer.blocked {
 | 
							switch {
 | 
				
			||||||
 | 
							case isIn && !isWrite && !peer.readBlock:
 | 
				
			||||||
 | 
								peer.readBlock = true
 | 
				
			||||||
 | 
							case isIn && isWrite && !peer.writeBlock:
 | 
				
			||||||
 | 
								peer.writeBlock = true
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		peer.blocked = true
 | 
					 | 
				
			||||||
		t.data.peers[port] = peer
 | 
							t.data.peers[port] = peer
 | 
				
			||||||
		t._updateTable()
 | 
							defer t._updateTable()
 | 
				
			||||||
		if port != t.parent {
 | 
							if port != t.parent {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -273,13 +282,17 @@ func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *switchTable) unblockPeer(from phony.Actor, port switchPort) {
 | 
					func (t *switchTable) unblockPeer(from phony.Actor, port switchPort, isWrite bool) {
 | 
				
			||||||
	t.Act(from, func() {
 | 
						t.Act(from, func() {
 | 
				
			||||||
		peer, isIn := t.data.peers[port]
 | 
							peer, isIn := t.data.peers[port]
 | 
				
			||||||
		if !isIn || !peer.blocked {
 | 
							switch {
 | 
				
			||||||
 | 
							case isIn && !isWrite && peer.readBlock:
 | 
				
			||||||
 | 
								peer.readBlock = false
 | 
				
			||||||
 | 
							case isIn && isWrite && peer.writeBlock:
 | 
				
			||||||
 | 
								peer.writeBlock = false
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		peer.blocked = false
 | 
					 | 
				
			||||||
		t.data.peers[port] = peer
 | 
							t.data.peers[port] = peer
 | 
				
			||||||
		t._updateTable()
 | 
							t._updateTable()
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
| 
						 | 
					@ -422,7 +435,8 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
 | 
				
			||||||
	if reprocessing {
 | 
						if reprocessing {
 | 
				
			||||||
		sender.faster = oldSender.faster
 | 
							sender.faster = oldSender.faster
 | 
				
			||||||
		sender.time = oldSender.time
 | 
							sender.time = oldSender.time
 | 
				
			||||||
		sender.blocked = oldSender.blocked
 | 
							sender.readBlock = oldSender.readBlock
 | 
				
			||||||
 | 
							sender.writeBlock = oldSender.writeBlock
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
 | 
							sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
 | 
				
			||||||
		for port, peer := range t.data.peers {
 | 
							for port, peer := range t.data.peers {
 | 
				
			||||||
| 
						 | 
					@ -445,7 +459,7 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if sender.blocked != oldSender.blocked {
 | 
						if sender.blocked() != oldSender.blocked() {
 | 
				
			||||||
		doUpdate = true
 | 
							doUpdate = true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Update sender
 | 
						// Update sender
 | 
				
			||||||
| 
						 | 
					@ -485,10 +499,10 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
 | 
				
			||||||
	case sender.faster[t.parent] >= switch_faster_threshold:
 | 
						case sender.faster[t.parent] >= switch_faster_threshold:
 | 
				
			||||||
		// The is reliably faster than the current parent.
 | 
							// The is reliably faster than the current parent.
 | 
				
			||||||
		updateRoot = true
 | 
							updateRoot = true
 | 
				
			||||||
	case !sender.blocked && oldParent.blocked:
 | 
						case !sender.blocked() && oldParent.blocked():
 | 
				
			||||||
		// Replace a blocked parent
 | 
							// Replace a blocked parent
 | 
				
			||||||
		updateRoot = true
 | 
							updateRoot = true
 | 
				
			||||||
	case reprocessing && sender.blocked && !oldParent.blocked:
 | 
						case reprocessing && sender.blocked() && !oldParent.blocked():
 | 
				
			||||||
		// Don't replace an unblocked parent when reprocessing
 | 
							// Don't replace an unblocked parent when reprocessing
 | 
				
			||||||
	case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
 | 
						case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
 | 
				
			||||||
		// The sender seems to be reliably faster than the current parent, so switch to them instead.
 | 
							// The sender seems to be reliably faster than the current parent, so switch to them instead.
 | 
				
			||||||
| 
						 | 
					@ -545,7 +559,7 @@ func (t *switchTable) _updateTable() {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	newTable._init()
 | 
						newTable._init()
 | 
				
			||||||
	for _, pinfo := range t.data.peers {
 | 
						for _, pinfo := range t.data.peers {
 | 
				
			||||||
		if pinfo.blocked || pinfo.locator.root != newTable.self.root {
 | 
							if pinfo.blocked() || pinfo.locator.root != newTable.self.root {
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		loc := pinfo.locator.clone()
 | 
							loc := pinfo.locator.clone()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue