mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	allow links to send multiple packets at once, currently we still only bother to send 1 at a time from the switch level
This commit is contained in:
		
							parent
							
								
									009d9c9ec0
								
							
						
					
					
						commit
						62337bcd64
					
				
					 5 changed files with 48 additions and 50 deletions
				
			
		| 
						 | 
					@ -37,7 +37,7 @@ type linkInfo struct {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type linkInterfaceMsgIO interface {
 | 
					type linkInterfaceMsgIO interface {
 | 
				
			||||||
	readMsg() ([]byte, error)
 | 
						readMsg() ([]byte, error)
 | 
				
			||||||
	writeMsg([]byte) (int, error)
 | 
						writeMsgs([][]byte) (int, error)
 | 
				
			||||||
	close() error
 | 
						close() error
 | 
				
			||||||
	// These are temporary workarounds to stream semantics
 | 
						// These are temporary workarounds to stream semantics
 | 
				
			||||||
	_sendMetaBytes([]byte) error
 | 
						_sendMetaBytes([]byte) error
 | 
				
			||||||
| 
						 | 
					@ -207,11 +207,11 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
		intf.link.core.peers.removePeer(intf.peer.port)
 | 
							intf.link.core.peers.removePeer(intf.peer.port)
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	// Finish setting up the peer struct
 | 
						// Finish setting up the peer struct
 | 
				
			||||||
	out := make(chan []byte, 1)
 | 
						out := make(chan [][]byte, 1)
 | 
				
			||||||
	defer close(out)
 | 
						defer close(out)
 | 
				
			||||||
	intf.peer.out = func(msg []byte) {
 | 
						intf.peer.out = func(msgs [][]byte) {
 | 
				
			||||||
		defer func() { recover() }()
 | 
							defer func() { recover() }()
 | 
				
			||||||
		out <- msg
 | 
							out <- msgs
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	intf.peer.linkOut = make(chan []byte, 1)
 | 
						intf.peer.linkOut = make(chan []byte, 1)
 | 
				
			||||||
	themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
 | 
						themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
 | 
				
			||||||
| 
						 | 
					@ -234,12 +234,12 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
		interval := 4 * time.Second
 | 
							interval := 4 * time.Second
 | 
				
			||||||
		tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
 | 
							tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
 | 
				
			||||||
		defer util.TimerStop(tcpTimer)
 | 
							defer util.TimerStop(tcpTimer)
 | 
				
			||||||
		send := func(bs []byte) {
 | 
							send := func(bss [][]byte) {
 | 
				
			||||||
			sendBlocked.Reset(time.Second)
 | 
								sendBlocked.Reset(time.Second)
 | 
				
			||||||
			intf.msgIO.writeMsg(bs)
 | 
								size, _ := intf.msgIO.writeMsgs(bss)
 | 
				
			||||||
			util.TimerStop(sendBlocked)
 | 
								util.TimerStop(sendBlocked)
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case signalSent <- len(bs) > 0:
 | 
								case signalSent <- size > 0:
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -247,7 +247,7 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
			// First try to send any link protocol traffic
 | 
								// First try to send any link protocol traffic
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case msg := <-intf.peer.linkOut:
 | 
								case msg := <-intf.peer.linkOut:
 | 
				
			||||||
				send(msg)
 | 
									send([][]byte{msg})
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
| 
						 | 
					@ -259,19 +259,21 @@ func (intf *linkInterface) handler() error {
 | 
				
			||||||
			case <-tcpTimer.C:
 | 
								case <-tcpTimer.C:
 | 
				
			||||||
				intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
 | 
									intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
 | 
				
			||||||
					strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 | 
										strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 | 
				
			||||||
				send(nil)
 | 
									send([][]byte{nil})
 | 
				
			||||||
			case <-sendAck:
 | 
								case <-sendAck:
 | 
				
			||||||
				intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
 | 
									intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
 | 
				
			||||||
					strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 | 
										strings.ToUpper(intf.info.linkType), themString, intf.info.local)
 | 
				
			||||||
				send(nil)
 | 
									send([][]byte{nil})
 | 
				
			||||||
			case msg := <-intf.peer.linkOut:
 | 
								case msg := <-intf.peer.linkOut:
 | 
				
			||||||
				send(msg)
 | 
									send([][]byte{msg})
 | 
				
			||||||
			case msg, ok := <-out:
 | 
								case msgs, ok := <-out:
 | 
				
			||||||
				if !ok {
 | 
									if !ok {
 | 
				
			||||||
					return
 | 
										return
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				send(msg)
 | 
									send(msgs)
 | 
				
			||||||
 | 
									for _, msg := range msgs {
 | 
				
			||||||
					util.PutBytes(msg)
 | 
										util.PutBytes(msg)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
				select {
 | 
									select {
 | 
				
			||||||
				case signalReady <- struct{}{}:
 | 
									case signalReady <- struct{}{}:
 | 
				
			||||||
				default:
 | 
									default:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -109,7 +109,7 @@ type peer struct {
 | 
				
			||||||
	linkOut    (chan []byte)   // used for protocol traffic (to bypass queues)
 | 
						linkOut    (chan []byte)   // used for protocol traffic (to bypass queues)
 | 
				
			||||||
	doSend     (chan struct{}) // tell the linkLoop to send a switchMsg
 | 
						doSend     (chan struct{}) // tell the linkLoop to send a switchMsg
 | 
				
			||||||
	dinfo      (chan *dhtInfo) // used to keep the DHT working
 | 
						dinfo      (chan *dhtInfo) // used to keep the DHT working
 | 
				
			||||||
	out        func([]byte)    // Set up by whatever created the peers struct, used to send packets to other nodes
 | 
						out        func([][]byte)  // Set up by whatever created the peers struct, used to send packets to other nodes
 | 
				
			||||||
	close      func()          // Called when a peer is removed, to close the underlying connection, or via admin api
 | 
						close      func()          // Called when a peer is removed, to close the underlying connection, or via admin api
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -250,11 +250,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// This just calls p.out(packet) for now.
 | 
					// This just calls p.out(packet) for now.
 | 
				
			||||||
func (p *peer) sendPacket(packet []byte) {
 | 
					func (p *peer) sendPackets(packets [][]byte) {
 | 
				
			||||||
	// Is there ever a case where something more complicated is needed?
 | 
						// Is there ever a case where something more complicated is needed?
 | 
				
			||||||
	// What if p.out blocks?
 | 
						// What if p.out blocks?
 | 
				
			||||||
	atomic.AddUint64(&p.bytesSent, uint64(len(packet)))
 | 
						var size int
 | 
				
			||||||
	p.out(packet)
 | 
						for _, packet := range packets {
 | 
				
			||||||
 | 
							size += len(packet)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						atomic.AddUint64(&p.bytesSent, uint64(size))
 | 
				
			||||||
 | 
						p.out(packets)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
 | 
					// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -39,7 +39,7 @@ type router struct {
 | 
				
			||||||
	reconfigure chan chan error
 | 
						reconfigure chan chan error
 | 
				
			||||||
	addr        address.Address
 | 
						addr        address.Address
 | 
				
			||||||
	subnet      address.Subnet
 | 
						subnet      address.Subnet
 | 
				
			||||||
	in          <-chan []byte // packets we received from the network, link to peer's "out"
 | 
						in          <-chan [][]byte // packets we received from the network, link to peer's "out"
 | 
				
			||||||
	out         func([]byte)    // packets we're sending to the network, link to peer's "in"
 | 
						out         func([]byte)    // packets we're sending to the network, link to peer's "in"
 | 
				
			||||||
	reset       chan struct{}   // signal that coords changed (re-init sessions/dht)
 | 
						reset       chan struct{}   // signal that coords changed (re-init sessions/dht)
 | 
				
			||||||
	admin       chan func()     // pass a lambda for the admin socket to query stuff
 | 
						admin       chan func()     // pass a lambda for the admin socket to query stuff
 | 
				
			||||||
| 
						 | 
					@ -52,7 +52,7 @@ func (r *router) init(core *Core) {
 | 
				
			||||||
	r.reconfigure = make(chan chan error, 1)
 | 
						r.reconfigure = make(chan chan error, 1)
 | 
				
			||||||
	r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
 | 
						r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
 | 
				
			||||||
	r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
 | 
						r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
 | 
				
			||||||
	in := make(chan []byte, 1) // TODO something better than this...
 | 
						in := make(chan [][]byte, 1) // TODO something better than this...
 | 
				
			||||||
	self := linkInterface{
 | 
						self := linkInterface{
 | 
				
			||||||
		name: "(self)",
 | 
							name: "(self)",
 | 
				
			||||||
		info: linkInfo{
 | 
							info: linkInfo{
 | 
				
			||||||
| 
						 | 
					@ -62,7 +62,7 @@ func (r *router) init(core *Core) {
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
 | 
						p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
 | 
				
			||||||
	p.out = func(packet []byte) { in <- packet }
 | 
						p.out = func(packets [][]byte) { in <- packets }
 | 
				
			||||||
	r.in = in
 | 
						r.in = in
 | 
				
			||||||
	out := make(chan []byte, 32)
 | 
						out := make(chan []byte, 32)
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
| 
						 | 
					@ -114,8 +114,10 @@ func (r *router) mainLoop() {
 | 
				
			||||||
	defer ticker.Stop()
 | 
						defer ticker.Stop()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case p := <-r.in:
 | 
							case ps := <-r.in:
 | 
				
			||||||
 | 
								for _, p := range ps {
 | 
				
			||||||
				r.handleIn(p)
 | 
									r.handleIn(p)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		case info := <-r.core.dht.peers:
 | 
							case info := <-r.core.dht.peers:
 | 
				
			||||||
			r.core.dht.insertPeer(info)
 | 
								r.core.dht.insertPeer(info)
 | 
				
			||||||
		case <-r.reset:
 | 
							case <-r.reset:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -35,29 +35,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// writeMsg writes a message with stream padding, and is *not* thread safe.
 | 
					// writeMsg writes a message with stream padding, and is *not* thread safe.
 | 
				
			||||||
func (s *stream) writeMsg(bs []byte) (int, error) {
 | 
					func (s *stream) writeMsgs(bss [][]byte) (int, error) {
 | 
				
			||||||
	buf := s.outputBuffer[:0]
 | 
						buf := s.outputBuffer[:0]
 | 
				
			||||||
 | 
						var written int
 | 
				
			||||||
 | 
						for _, bs := range bss {
 | 
				
			||||||
		buf = append(buf, streamMsg[:])
 | 
							buf = append(buf, streamMsg[:])
 | 
				
			||||||
	l := wire_put_uint64(uint64(len(bs)), util.GetBytes())
 | 
							buf = append(buf, wire_encode_uint64(uint64(len(bs))))
 | 
				
			||||||
	defer util.PutBytes(l)
 | 
					 | 
				
			||||||
	buf = append(buf, l)
 | 
					 | 
				
			||||||
	padLen := len(buf[0]) + len(buf[1])
 | 
					 | 
				
			||||||
		buf = append(buf, bs)
 | 
							buf = append(buf, bs)
 | 
				
			||||||
	totalLen := padLen + len(bs)
 | 
							written += len(bs)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
 | 
						s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
 | 
				
			||||||
	var bn int
 | 
						_, err := buf.WriteTo(s.rwc)
 | 
				
			||||||
	for bn < totalLen {
 | 
						// TODO only include number of bytes from bs *successfully* written?
 | 
				
			||||||
		n, err := buf.WriteTo(s.rwc)
 | 
						return written, err
 | 
				
			||||||
		bn += int(n)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			l := bn - padLen
 | 
					 | 
				
			||||||
			if l < 0 {
 | 
					 | 
				
			||||||
				l = 0
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			return l, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return len(bs), nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
 | 
					// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -709,7 +709,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
 | 
				
			||||||
	if best != nil {
 | 
						if best != nil {
 | 
				
			||||||
		// Send to the best idle next hop
 | 
							// Send to the best idle next hop
 | 
				
			||||||
		delete(idle, best.port)
 | 
							delete(idle, best.port)
 | 
				
			||||||
		best.sendPacket(packet)
 | 
							best.sendPackets([][]byte{packet})
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Didn't find anyone idle to send it to
 | 
						// Didn't find anyone idle to send it to
 | 
				
			||||||
| 
						 | 
					@ -812,7 +812,7 @@ func (t *switchTable) handleIdle(port switchPort) bool {
 | 
				
			||||||
			// Need to update the map, since buf was retrieved by value
 | 
								// Need to update the map, since buf was retrieved by value
 | 
				
			||||||
			t.queues.bufs[best] = buf
 | 
								t.queues.bufs[best] = buf
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		to.sendPacket(packet.bytes)
 | 
							to.sendPackets([][]byte{packet.bytes})
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
| 
						 | 
					@ -826,7 +826,7 @@ func (t *switchTable) doWorker() {
 | 
				
			||||||
		// Keep sending packets to the router
 | 
							// Keep sending packets to the router
 | 
				
			||||||
		self := t.core.peers.getPorts()[0]
 | 
							self := t.core.peers.getPorts()[0]
 | 
				
			||||||
		for bs := range sendingToRouter {
 | 
							for bs := range sendingToRouter {
 | 
				
			||||||
			self.sendPacket(bs)
 | 
								self.sendPackets([][]byte{bs})
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue