mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 11:15:07 +03:00 
			
		
		
		
	add a helper actor to the link reader to make it play nicer with backpressure
This commit is contained in:
		
							parent
							
								
									99be6b037d
								
							
						
					
					
						commit
						48bbdac9b3
					
				
					 1 changed files with 17 additions and 6 deletions
				
			
		| 
						 | 
					@ -16,6 +16,8 @@ import (
 | 
				
			||||||
	"github.com/yggdrasil-network/yggdrasil-go/src/address"
 | 
						"github.com/yggdrasil-network/yggdrasil-go/src/address"
 | 
				
			||||||
	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
 | 
						"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
 | 
				
			||||||
	"github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
						"github.com/yggdrasil-network/yggdrasil-go/src/util"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/Arceliar/phony"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type link struct {
 | 
					type link struct {
 | 
				
			||||||
| 
						 | 
					@ -388,14 +390,17 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	// Run reader loop
 | 
						// Run reader loop
 | 
				
			||||||
	for {
 | 
						var helper phony.Actor
 | 
				
			||||||
 | 
						done := make(chan struct{})
 | 
				
			||||||
 | 
						var helperFunc func()
 | 
				
			||||||
 | 
						helperFunc = func() {
 | 
				
			||||||
 | 
							// The helper reads in a loop and sends to the peer
 | 
				
			||||||
 | 
							// It loops by sending itself a message, which can be delayed by backpressure
 | 
				
			||||||
 | 
							// So if the peer is busy, backpressure will pause reading until the peer catches up
 | 
				
			||||||
		msg, err := intf.msgIO.readMsg()
 | 
							msg, err := intf.msgIO.readMsg()
 | 
				
			||||||
		if len(msg) > 0 {
 | 
							if len(msg) > 0 {
 | 
				
			||||||
			// TODO rewrite this if the link becomes an actor
 | 
								// TODO rewrite this if the link becomes an actor
 | 
				
			||||||
			// FIXME this could theoretically send traffic faster than the peer can handle
 | 
								intf.peer.handlePacketFrom(&helper, msg)
 | 
				
			||||||
			//  The alternative is to SyncExec, but that causes traffic to block while *other* links work
 | 
					 | 
				
			||||||
			//  Need to figure out why and find a workaround
 | 
					 | 
				
			||||||
			intf.peer.handlePacketFrom(nil, msg)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			if err != io.EOF {
 | 
								if err != io.EOF {
 | 
				
			||||||
| 
						 | 
					@ -404,13 +409,19 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
				default:
 | 
									default:
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			break
 | 
								close(done)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case signalAlive <- len(msg) > 0:
 | 
							case signalAlive <- len(msg) > 0:
 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							// Now try to read again
 | 
				
			||||||
 | 
							helper.EnqueueFrom(nil, helperFunc)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// Start the read loop
 | 
				
			||||||
 | 
						helper.EnqueueFrom(nil, helperFunc)
 | 
				
			||||||
 | 
						<-done // Wait for the helper to exit
 | 
				
			||||||
	////////////////////////////////////////////////////////////////////////////////
 | 
						////////////////////////////////////////////////////////////////////////////////
 | 
				
			||||||
	// Remember to set `err` to something useful before returning
 | 
						// Remember to set `err` to something useful before returning
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue