mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-29 06:35:07 +03:00
make sure the only place traffic is ever dropped is in the switch. this currently disables the dedicated crypto workers
This commit is contained in:
parent
68dce0dd74
commit
042adb0516
4 changed files with 46 additions and 21 deletions
|
@ -68,17 +68,34 @@ func (r *router) init(core *Core) {
|
|||
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
|
||||
in := make(chan []byte, 32) // TODO something better than this...
|
||||
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
|
||||
p.out = func(packet []byte) {
|
||||
// This is to make very sure it never blocks
|
||||
select {
|
||||
case in <- packet:
|
||||
return
|
||||
default:
|
||||
util.PutBytes(packet)
|
||||
}
|
||||
}
|
||||
p.out = func(packet []byte) { in <- packet }
|
||||
r.in = in
|
||||
r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block
|
||||
out := make(chan []byte, 32)
|
||||
go func() {
|
||||
for packet := range out {
|
||||
p.handlePacket(packet)
|
||||
}
|
||||
}()
|
||||
out2 := make(chan []byte, 32)
|
||||
go func() {
|
||||
// This worker makes sure r.out never blocks
|
||||
// It will buffer traffic long enough for the switch worker to take it
|
||||
// If (somehow) you can send faster than the switch can receive, then this would use unbounded memory
|
||||
// But crypto slows sends enough that the switch should always be able to take the packets...
|
||||
var buf [][]byte
|
||||
for {
|
||||
buf = append(buf, <-out2)
|
||||
for len(buf) > 0 {
|
||||
select {
|
||||
case bs := <-out2:
|
||||
buf = append(buf, bs)
|
||||
case out <- buf[0]:
|
||||
buf = buf[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
r.out = func(packet []byte) { out2 <- packet }
|
||||
r.toRecv = make(chan router_recvPacket, 32)
|
||||
recv := make(chan []byte, 32)
|
||||
send := make(chan []byte, 32)
|
||||
|
@ -306,6 +323,8 @@ func (r *router) sendPacket(bs []byte) {
|
|||
return
|
||||
}
|
||||
|
||||
sinfo.doSend(bs)
|
||||
return
|
||||
sinfo.send <- bs
|
||||
}
|
||||
}
|
||||
|
@ -385,6 +404,8 @@ func (r *router) handleTraffic(packet []byte) {
|
|||
if !isIn {
|
||||
return
|
||||
}
|
||||
sinfo.doRecv(&p)
|
||||
return
|
||||
sinfo.recv <- &p
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue