mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-28 22:25:07 +03:00
Run gofmt -s -w .
This commit is contained in:
parent
ae7b07ae6a
commit
b3ebe76b59
45 changed files with 5037 additions and 4288 deletions
|
@ -11,335 +11,424 @@ import "time"
|
|||
import "sync"
|
||||
import "sync/atomic"
|
||||
import "math"
|
||||
|
||||
//import "fmt"
|
||||
|
||||
type peers struct {
|
||||
core *Core
|
||||
mutex sync.Mutex // Synchronize writes to atomic
|
||||
ports atomic.Value //map[Port]*peer, use CoW semantics
|
||||
//ports map[Port]*peer
|
||||
core *Core
|
||||
mutex sync.Mutex // Synchronize writes to atomic
|
||||
ports atomic.Value //map[Port]*peer, use CoW semantics
|
||||
//ports map[Port]*peer
|
||||
}
|
||||
|
||||
func (ps *peers) init(c *Core) {
|
||||
ps.mutex.Lock()
|
||||
defer ps.mutex.Unlock()
|
||||
ps.putPorts(make(map[switchPort]*peer))
|
||||
ps.core = c
|
||||
ps.mutex.Lock()
|
||||
defer ps.mutex.Unlock()
|
||||
ps.putPorts(make(map[switchPort]*peer))
|
||||
ps.core = c
|
||||
}
|
||||
|
||||
func (ps *peers) getPorts() map[switchPort]*peer {
|
||||
return ps.ports.Load().(map[switchPort]*peer)
|
||||
return ps.ports.Load().(map[switchPort]*peer)
|
||||
}
|
||||
|
||||
func (ps *peers) putPorts(ports map[switchPort]*peer) {
|
||||
ps.ports.Store(ports)
|
||||
ps.ports.Store(ports)
|
||||
}
|
||||
|
||||
type peer struct {
|
||||
// Rolling approximation of bandwidth, in bps, used by switch, updated by tcp
|
||||
// use get/update methods only! (atomic accessors as float64)
|
||||
bandwidth uint64
|
||||
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
|
||||
box boxPubKey
|
||||
sig sigPubKey
|
||||
shared boxSharedKey
|
||||
//in <-chan []byte
|
||||
//out chan<- []byte
|
||||
//in func([]byte)
|
||||
out func([]byte)
|
||||
core *Core
|
||||
port switchPort
|
||||
msgAnc *msgAnnounce
|
||||
msgHops []*msgHop
|
||||
myMsg *switchMessage
|
||||
mySigs []sigInfo
|
||||
// This is used to limit how often we perform expensive operations
|
||||
// Specifically, processing switch messages, signing, and verifying sigs
|
||||
// Resets at the start of each tick
|
||||
throttle uint8
|
||||
// Rolling approximation of bandwidth, in bps, used by switch, updated by tcp
|
||||
// use get/update methods only! (atomic accessors as float64)
|
||||
bandwidth uint64
|
||||
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
|
||||
box boxPubKey
|
||||
sig sigPubKey
|
||||
shared boxSharedKey
|
||||
//in <-chan []byte
|
||||
//out chan<- []byte
|
||||
//in func([]byte)
|
||||
out func([]byte)
|
||||
core *Core
|
||||
port switchPort
|
||||
msgAnc *msgAnnounce
|
||||
msgHops []*msgHop
|
||||
myMsg *switchMessage
|
||||
mySigs []sigInfo
|
||||
// This is used to limit how often we perform expensive operations
|
||||
// Specifically, processing switch messages, signing, and verifying sigs
|
||||
// Resets at the start of each tick
|
||||
throttle uint8
|
||||
}
|
||||
|
||||
const peer_Throttle = 1
|
||||
|
||||
func (p *peer) getBandwidth() float64 {
|
||||
bits := atomic.LoadUint64(&p.bandwidth)
|
||||
return math.Float64frombits(bits)
|
||||
bits := atomic.LoadUint64(&p.bandwidth)
|
||||
return math.Float64frombits(bits)
|
||||
}
|
||||
|
||||
func (p *peer) updateBandwidth(bytes int, duration time.Duration) {
|
||||
if p == nil { return }
|
||||
for ok := false ; !ok ; {
|
||||
oldBits := atomic.LoadUint64(&p.bandwidth)
|
||||
oldBandwidth := math.Float64frombits(oldBits)
|
||||
bandwidth := oldBandwidth * 7 / 8 + float64(bytes)/duration.Seconds()
|
||||
bits := math.Float64bits(bandwidth)
|
||||
ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
|
||||
}
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
for ok := false; !ok; {
|
||||
oldBits := atomic.LoadUint64(&p.bandwidth)
|
||||
oldBandwidth := math.Float64frombits(oldBits)
|
||||
bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds()
|
||||
bits := math.Float64bits(bandwidth)
|
||||
ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *peers) newPeer(box *boxPubKey,
|
||||
sig *sigPubKey) *peer {
|
||||
//in <-chan []byte,
|
||||
//out chan<- []byte) *peer {
|
||||
p := peer{box: *box,
|
||||
sig: *sig,
|
||||
shared: *getSharedKey(&ps.core.boxPriv, box),
|
||||
//in: in,
|
||||
//out: out,
|
||||
core: ps.core}
|
||||
ps.mutex.Lock()
|
||||
defer ps.mutex.Unlock()
|
||||
oldPorts := ps.getPorts()
|
||||
newPorts := make(map[switchPort]*peer)
|
||||
for k,v := range oldPorts{ newPorts[k] = v }
|
||||
for idx := switchPort(0) ; true ; idx++ {
|
||||
if _, isIn := newPorts[idx]; !isIn {
|
||||
p.port = switchPort(idx)
|
||||
newPorts[p.port] = &p
|
||||
break
|
||||
}
|
||||
}
|
||||
ps.putPorts(newPorts)
|
||||
return &p
|
||||
sig *sigPubKey) *peer {
|
||||
//in <-chan []byte,
|
||||
//out chan<- []byte) *peer {
|
||||
p := peer{box: *box,
|
||||
sig: *sig,
|
||||
shared: *getSharedKey(&ps.core.boxPriv, box),
|
||||
//in: in,
|
||||
//out: out,
|
||||
core: ps.core}
|
||||
ps.mutex.Lock()
|
||||
defer ps.mutex.Unlock()
|
||||
oldPorts := ps.getPorts()
|
||||
newPorts := make(map[switchPort]*peer)
|
||||
for k, v := range oldPorts {
|
||||
newPorts[k] = v
|
||||
}
|
||||
for idx := switchPort(0); true; idx++ {
|
||||
if _, isIn := newPorts[idx]; !isIn {
|
||||
p.port = switchPort(idx)
|
||||
newPorts[p.port] = &p
|
||||
break
|
||||
}
|
||||
}
|
||||
ps.putPorts(newPorts)
|
||||
return &p
|
||||
}
|
||||
|
||||
func (p *peer) linkLoop(in <-chan []byte) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case packet, ok := <-in:
|
||||
if !ok { return }
|
||||
p.handleLinkTraffic(packet)
|
||||
case <-ticker.C: {
|
||||
p.throttle = 0
|
||||
if p.port == 0 { continue } // Don't send announces on selfInterface
|
||||
// Maybe we shouldn't time out, and instead wait for a kill signal?
|
||||
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
|
||||
p.sendSwitchAnnounce()
|
||||
}
|
||||
}
|
||||
}
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case packet, ok := <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.handleLinkTraffic(packet)
|
||||
case <-ticker.C:
|
||||
{
|
||||
p.throttle = 0
|
||||
if p.port == 0 {
|
||||
continue
|
||||
} // Don't send announces on selfInterface
|
||||
// Maybe we shouldn't time out, and instead wait for a kill signal?
|
||||
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
|
||||
p.sendSwitchAnnounce()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) handlePacket(packet []byte, linkIn (chan<- []byte)) {
|
||||
pType, pTypeLen := wire_decode_uint64(packet)
|
||||
if pTypeLen==0 { return }
|
||||
switch (pType) {
|
||||
case wire_Traffic: p.handleTraffic(packet, pTypeLen)
|
||||
case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen)
|
||||
case wire_LinkProtocolTraffic: {
|
||||
select {
|
||||
case linkIn<-packet:
|
||||
default:
|
||||
}
|
||||
}
|
||||
default: /*panic(pType) ;*/ return
|
||||
}
|
||||
func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) {
|
||||
pType, pTypeLen := wire_decode_uint64(packet)
|
||||
if pTypeLen == 0 {
|
||||
return
|
||||
}
|
||||
switch pType {
|
||||
case wire_Traffic:
|
||||
p.handleTraffic(packet, pTypeLen)
|
||||
case wire_ProtocolTraffic:
|
||||
p.handleTraffic(packet, pTypeLen)
|
||||
case wire_LinkProtocolTraffic:
|
||||
{
|
||||
select {
|
||||
case linkIn <- packet:
|
||||
default:
|
||||
}
|
||||
}
|
||||
default: /*panic(pType) ;*/
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
||||
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
|
||||
ttlBegin := pTypeLen
|
||||
ttlEnd := pTypeLen+ttlLen
|
||||
coords, coordLen := wire_decode_coords(packet[ttlEnd:])
|
||||
coordEnd := ttlEnd+coordLen
|
||||
if coordEnd == len(packet) { return } // No payload
|
||||
toPort, newTTL := p.core.switchTable.lookup(coords, ttl)
|
||||
if toPort == p.port { return } // FIXME? shouldn't happen, does it? would loop
|
||||
to := p.core.peers.getPorts()[toPort]
|
||||
if to == nil { return }
|
||||
newTTLSlice := wire_encode_uint64(newTTL)
|
||||
// This mutates the packet in-place if the length of the TTL changes!
|
||||
shift := ttlLen - len(newTTLSlice)
|
||||
copy(packet[ttlBegin+shift:], newTTLSlice)
|
||||
copy(packet[shift:], packet[:pTypeLen])
|
||||
packet = packet[shift:]
|
||||
to.sendPacket(packet)
|
||||
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
|
||||
ttlBegin := pTypeLen
|
||||
ttlEnd := pTypeLen + ttlLen
|
||||
coords, coordLen := wire_decode_coords(packet[ttlEnd:])
|
||||
coordEnd := ttlEnd + coordLen
|
||||
if coordEnd == len(packet) {
|
||||
return
|
||||
} // No payload
|
||||
toPort, newTTL := p.core.switchTable.lookup(coords, ttl)
|
||||
if toPort == p.port {
|
||||
return
|
||||
} // FIXME? shouldn't happen, does it? would loop
|
||||
to := p.core.peers.getPorts()[toPort]
|
||||
if to == nil {
|
||||
return
|
||||
}
|
||||
newTTLSlice := wire_encode_uint64(newTTL)
|
||||
// This mutates the packet in-place if the length of the TTL changes!
|
||||
shift := ttlLen - len(newTTLSlice)
|
||||
copy(packet[ttlBegin+shift:], newTTLSlice)
|
||||
copy(packet[shift:], packet[:pTypeLen])
|
||||
packet = packet[shift:]
|
||||
to.sendPacket(packet)
|
||||
}
|
||||
|
||||
func (p *peer) sendPacket(packet []byte) {
|
||||
// Is there ever a case where something more complicated is needed?
|
||||
// What if p.out blocks?
|
||||
p.out(packet)
|
||||
// Is there ever a case where something more complicated is needed?
|
||||
// What if p.out blocks?
|
||||
p.out(packet)
|
||||
}
|
||||
|
||||
func (p *peer) sendLinkPacket(packet []byte) {
|
||||
bs, nonce := boxSeal(&p.shared, packet, nil)
|
||||
linkPacket := wire_linkProtoTrafficPacket{
|
||||
toKey: p.box,
|
||||
fromKey: p.core.boxPub,
|
||||
nonce: *nonce,
|
||||
payload: bs,
|
||||
}
|
||||
packet = linkPacket.encode()
|
||||
p.sendPacket(packet)
|
||||
bs, nonce := boxSeal(&p.shared, packet, nil)
|
||||
linkPacket := wire_linkProtoTrafficPacket{
|
||||
toKey: p.box,
|
||||
fromKey: p.core.boxPub,
|
||||
nonce: *nonce,
|
||||
payload: bs,
|
||||
}
|
||||
packet = linkPacket.encode()
|
||||
p.sendPacket(packet)
|
||||
}
|
||||
|
||||
func (p *peer) handleLinkTraffic(bs []byte) {
|
||||
packet := wire_linkProtoTrafficPacket{}
|
||||
// TODO throttle on returns?
|
||||
if !packet.decode(bs) { return }
|
||||
if packet.toKey != p.core.boxPub { return }
|
||||
if packet.fromKey != p.box { return }
|
||||
payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce)
|
||||
if !isOK { return }
|
||||
pType, pTypeLen := wire_decode_uint64(payload)
|
||||
if pTypeLen == 0 { return }
|
||||
switch pType {
|
||||
case wire_SwitchAnnounce: p.handleSwitchAnnounce(payload)
|
||||
case wire_SwitchHopRequest: p.handleSwitchHopRequest(payload)
|
||||
case wire_SwitchHop: p.handleSwitchHop(payload)
|
||||
}
|
||||
packet := wire_linkProtoTrafficPacket{}
|
||||
// TODO throttle on returns?
|
||||
if !packet.decode(bs) {
|
||||
return
|
||||
}
|
||||
if packet.toKey != p.core.boxPub {
|
||||
return
|
||||
}
|
||||
if packet.fromKey != p.box {
|
||||
return
|
||||
}
|
||||
payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce)
|
||||
if !isOK {
|
||||
return
|
||||
}
|
||||
pType, pTypeLen := wire_decode_uint64(payload)
|
||||
if pTypeLen == 0 {
|
||||
return
|
||||
}
|
||||
switch pType {
|
||||
case wire_SwitchAnnounce:
|
||||
p.handleSwitchAnnounce(payload)
|
||||
case wire_SwitchHopRequest:
|
||||
p.handleSwitchHopRequest(payload)
|
||||
case wire_SwitchHop:
|
||||
p.handleSwitchHop(payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) handleSwitchAnnounce(packet []byte) {
|
||||
//p.core.log.Println("DEBUG: handleSwitchAnnounce")
|
||||
anc := msgAnnounce{}
|
||||
//err := wire_decode_struct(packet, &anc)
|
||||
//if err != nil { return }
|
||||
if !anc.decode(packet) { return }
|
||||
//if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil }
|
||||
if p.msgAnc == nil ||
|
||||
anc.root != p.msgAnc.root ||
|
||||
anc.tstamp != p.msgAnc.tstamp ||
|
||||
anc.seq != p.msgAnc.seq { p.msgHops = nil }
|
||||
p.msgAnc = &anc
|
||||
p.processSwitchMessage()
|
||||
//p.core.log.Println("DEBUG: handleSwitchAnnounce")
|
||||
anc := msgAnnounce{}
|
||||
//err := wire_decode_struct(packet, &anc)
|
||||
//if err != nil { return }
|
||||
if !anc.decode(packet) {
|
||||
return
|
||||
}
|
||||
//if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil }
|
||||
if p.msgAnc == nil ||
|
||||
anc.root != p.msgAnc.root ||
|
||||
anc.tstamp != p.msgAnc.tstamp ||
|
||||
anc.seq != p.msgAnc.seq {
|
||||
p.msgHops = nil
|
||||
}
|
||||
p.msgAnc = &anc
|
||||
p.processSwitchMessage()
|
||||
}
|
||||
|
||||
func (p *peer) requestHop(hop uint64) {
|
||||
//p.core.log.Println("DEBUG requestHop")
|
||||
req := msgHopReq{}
|
||||
req.root = p.msgAnc.root
|
||||
req.tstamp = p.msgAnc.tstamp
|
||||
req.seq = p.msgAnc.seq
|
||||
req.hop = hop
|
||||
packet := req.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
//p.core.log.Println("DEBUG requestHop")
|
||||
req := msgHopReq{}
|
||||
req.root = p.msgAnc.root
|
||||
req.tstamp = p.msgAnc.tstamp
|
||||
req.seq = p.msgAnc.seq
|
||||
req.hop = hop
|
||||
packet := req.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
}
|
||||
|
||||
func (p *peer) handleSwitchHopRequest(packet []byte) {
|
||||
//p.core.log.Println("DEBUG: handleSwitchHopRequest")
|
||||
if p.throttle > peer_Throttle { return }
|
||||
if p.myMsg == nil { return }
|
||||
req := msgHopReq{}
|
||||
if !req.decode(packet) { return }
|
||||
if req.root != p.myMsg.locator.root { return }
|
||||
if req.tstamp != p.myMsg.locator.tstamp { return }
|
||||
if req.seq != p.myMsg.seq { return }
|
||||
if uint64(len(p.myMsg.locator.coords)) <= req.hop { return }
|
||||
res := msgHop{}
|
||||
res.root = p.myMsg.locator.root
|
||||
res.tstamp = p.myMsg.locator.tstamp
|
||||
res.seq = p.myMsg.seq
|
||||
res.hop = req.hop
|
||||
res.port = p.myMsg.locator.coords[res.hop]
|
||||
sinfo := p.getSig(res.hop)
|
||||
//p.core.log.Println("DEBUG sig:", sinfo)
|
||||
res.next = sinfo.next
|
||||
res.sig = sinfo.sig
|
||||
packet = res.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
//p.core.log.Println("DEBUG: handleSwitchHopRequest")
|
||||
if p.throttle > peer_Throttle {
|
||||
return
|
||||
}
|
||||
if p.myMsg == nil {
|
||||
return
|
||||
}
|
||||
req := msgHopReq{}
|
||||
if !req.decode(packet) {
|
||||
return
|
||||
}
|
||||
if req.root != p.myMsg.locator.root {
|
||||
return
|
||||
}
|
||||
if req.tstamp != p.myMsg.locator.tstamp {
|
||||
return
|
||||
}
|
||||
if req.seq != p.myMsg.seq {
|
||||
return
|
||||
}
|
||||
if uint64(len(p.myMsg.locator.coords)) <= req.hop {
|
||||
return
|
||||
}
|
||||
res := msgHop{}
|
||||
res.root = p.myMsg.locator.root
|
||||
res.tstamp = p.myMsg.locator.tstamp
|
||||
res.seq = p.myMsg.seq
|
||||
res.hop = req.hop
|
||||
res.port = p.myMsg.locator.coords[res.hop]
|
||||
sinfo := p.getSig(res.hop)
|
||||
//p.core.log.Println("DEBUG sig:", sinfo)
|
||||
res.next = sinfo.next
|
||||
res.sig = sinfo.sig
|
||||
packet = res.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
}
|
||||
|
||||
func (p *peer) handleSwitchHop(packet []byte) {
|
||||
//p.core.log.Println("DEBUG: handleSwitchHop")
|
||||
if p.throttle > peer_Throttle { return }
|
||||
if p.msgAnc == nil { return }
|
||||
res := msgHop{}
|
||||
if !res.decode(packet) { return }
|
||||
if res.root != p.msgAnc.root { return }
|
||||
if res.tstamp != p.msgAnc.tstamp { return }
|
||||
if res.seq != p.msgAnc.seq { return }
|
||||
if res.hop != uint64(len(p.msgHops)) { return } // always process in order
|
||||
loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)}
|
||||
loc.root = res.root
|
||||
loc.tstamp = res.tstamp
|
||||
for _, hop := range p.msgHops { loc.coords = append(loc.coords, hop.port) }
|
||||
loc.coords = append(loc.coords, res.port)
|
||||
thisHopKey := &res.root
|
||||
if res.hop != 0 { thisHopKey = &p.msgHops[res.hop-1].next }
|
||||
bs := getBytesForSig(&res.next, &loc)
|
||||
if p.core.sigs.check(thisHopKey, &res.sig, bs) {
|
||||
p.msgHops = append(p.msgHops, &res)
|
||||
p.processSwitchMessage()
|
||||
} else {
|
||||
p.throttle++
|
||||
}
|
||||
//p.core.log.Println("DEBUG: handleSwitchHop")
|
||||
if p.throttle > peer_Throttle {
|
||||
return
|
||||
}
|
||||
if p.msgAnc == nil {
|
||||
return
|
||||
}
|
||||
res := msgHop{}
|
||||
if !res.decode(packet) {
|
||||
return
|
||||
}
|
||||
if res.root != p.msgAnc.root {
|
||||
return
|
||||
}
|
||||
if res.tstamp != p.msgAnc.tstamp {
|
||||
return
|
||||
}
|
||||
if res.seq != p.msgAnc.seq {
|
||||
return
|
||||
}
|
||||
if res.hop != uint64(len(p.msgHops)) {
|
||||
return
|
||||
} // always process in order
|
||||
loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)}
|
||||
loc.root = res.root
|
||||
loc.tstamp = res.tstamp
|
||||
for _, hop := range p.msgHops {
|
||||
loc.coords = append(loc.coords, hop.port)
|
||||
}
|
||||
loc.coords = append(loc.coords, res.port)
|
||||
thisHopKey := &res.root
|
||||
if res.hop != 0 {
|
||||
thisHopKey = &p.msgHops[res.hop-1].next
|
||||
}
|
||||
bs := getBytesForSig(&res.next, &loc)
|
||||
if p.core.sigs.check(thisHopKey, &res.sig, bs) {
|
||||
p.msgHops = append(p.msgHops, &res)
|
||||
p.processSwitchMessage()
|
||||
} else {
|
||||
p.throttle++
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) processSwitchMessage() {
|
||||
//p.core.log.Println("DEBUG: processSwitchMessage")
|
||||
if p.throttle > peer_Throttle { return }
|
||||
if p.msgAnc == nil { return }
|
||||
if uint64(len(p.msgHops)) < p.msgAnc.len {
|
||||
p.requestHop(uint64(len(p.msgHops)))
|
||||
return
|
||||
}
|
||||
p.throttle++
|
||||
if p.msgAnc.len != uint64(len(p.msgHops)) { return }
|
||||
msg := switchMessage{}
|
||||
coords := make([]switchPort, 0, len(p.msgHops))
|
||||
sigs := make([]sigInfo, 0, len(p.msgHops))
|
||||
for idx, hop := range p.msgHops {
|
||||
// Consistency checks, should be redundant (already checked these...)
|
||||
if hop.root != p.msgAnc.root { return }
|
||||
if hop.tstamp != p.msgAnc.tstamp { return }
|
||||
if hop.seq != p.msgAnc.seq { return }
|
||||
if hop.hop != uint64(idx) { return }
|
||||
coords = append(coords, hop.port)
|
||||
sigs = append(sigs, sigInfo{next: hop.next, sig: hop.sig})
|
||||
}
|
||||
msg.from = p.sig
|
||||
msg.locator.root = p.msgAnc.root
|
||||
msg.locator.tstamp = p.msgAnc.tstamp
|
||||
msg.locator.coords = coords
|
||||
msg.seq = p.msgAnc.seq
|
||||
//msg.RSeq = p.msgAnc.RSeq
|
||||
//msg.Degree = p.msgAnc.Deg
|
||||
p.core.switchTable.handleMessage(&msg, p.port, sigs)
|
||||
if len(coords) == 0 { return }
|
||||
// Reuse locator, set the coords to the peer's coords, to use in dht
|
||||
msg.locator.coords = coords[:len(coords)-1]
|
||||
// Pass a mesage to the dht informing it that this peer (still) exists
|
||||
dinfo := dhtInfo{
|
||||
key: p.box,
|
||||
coords: msg.locator.getCoords(),
|
||||
}
|
||||
p.core.dht.peers<-&dinfo
|
||||
//p.core.log.Println("DEBUG: processSwitchMessage")
|
||||
if p.throttle > peer_Throttle {
|
||||
return
|
||||
}
|
||||
if p.msgAnc == nil {
|
||||
return
|
||||
}
|
||||
if uint64(len(p.msgHops)) < p.msgAnc.len {
|
||||
p.requestHop(uint64(len(p.msgHops)))
|
||||
return
|
||||
}
|
||||
p.throttle++
|
||||
if p.msgAnc.len != uint64(len(p.msgHops)) {
|
||||
return
|
||||
}
|
||||
msg := switchMessage{}
|
||||
coords := make([]switchPort, 0, len(p.msgHops))
|
||||
sigs := make([]sigInfo, 0, len(p.msgHops))
|
||||
for idx, hop := range p.msgHops {
|
||||
// Consistency checks, should be redundant (already checked these...)
|
||||
if hop.root != p.msgAnc.root {
|
||||
return
|
||||
}
|
||||
if hop.tstamp != p.msgAnc.tstamp {
|
||||
return
|
||||
}
|
||||
if hop.seq != p.msgAnc.seq {
|
||||
return
|
||||
}
|
||||
if hop.hop != uint64(idx) {
|
||||
return
|
||||
}
|
||||
coords = append(coords, hop.port)
|
||||
sigs = append(sigs, sigInfo{next: hop.next, sig: hop.sig})
|
||||
}
|
||||
msg.from = p.sig
|
||||
msg.locator.root = p.msgAnc.root
|
||||
msg.locator.tstamp = p.msgAnc.tstamp
|
||||
msg.locator.coords = coords
|
||||
msg.seq = p.msgAnc.seq
|
||||
//msg.RSeq = p.msgAnc.RSeq
|
||||
//msg.Degree = p.msgAnc.Deg
|
||||
p.core.switchTable.handleMessage(&msg, p.port, sigs)
|
||||
if len(coords) == 0 {
|
||||
return
|
||||
}
|
||||
// Reuse locator, set the coords to the peer's coords, to use in dht
|
||||
msg.locator.coords = coords[:len(coords)-1]
|
||||
// Pass a mesage to the dht informing it that this peer (still) exists
|
||||
dinfo := dhtInfo{
|
||||
key: p.box,
|
||||
coords: msg.locator.getCoords(),
|
||||
}
|
||||
p.core.dht.peers <- &dinfo
|
||||
}
|
||||
|
||||
func (p *peer) sendSwitchAnnounce() {
|
||||
anc := msgAnnounce{}
|
||||
anc.root = p.myMsg.locator.root
|
||||
anc.tstamp = p.myMsg.locator.tstamp
|
||||
anc.seq = p.myMsg.seq
|
||||
anc.len = uint64(len(p.myMsg.locator.coords))
|
||||
//anc.Deg = p.myMsg.Degree
|
||||
//anc.RSeq = p.myMsg.RSeq
|
||||
packet := anc.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
anc := msgAnnounce{}
|
||||
anc.root = p.myMsg.locator.root
|
||||
anc.tstamp = p.myMsg.locator.tstamp
|
||||
anc.seq = p.myMsg.seq
|
||||
anc.len = uint64(len(p.myMsg.locator.coords))
|
||||
//anc.Deg = p.myMsg.Degree
|
||||
//anc.RSeq = p.myMsg.RSeq
|
||||
packet := anc.encode()
|
||||
p.sendLinkPacket(packet)
|
||||
}
|
||||
|
||||
func (p *peer) getSig(hop uint64) sigInfo {
|
||||
//p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop)
|
||||
if hop < uint64(len(p.mySigs)) { return p.mySigs[hop] }
|
||||
bs := getBytesForSig(&p.sig, &p.myMsg.locator)
|
||||
sig := sigInfo{}
|
||||
sig.next = p.sig
|
||||
sig.sig = *sign(&p.core.sigPriv, bs)
|
||||
p.mySigs = append(p.mySigs, sig)
|
||||
//p.core.log.Println("DEBUG sig bs:", bs)
|
||||
return sig
|
||||
//p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop)
|
||||
if hop < uint64(len(p.mySigs)) {
|
||||
return p.mySigs[hop]
|
||||
}
|
||||
bs := getBytesForSig(&p.sig, &p.myMsg.locator)
|
||||
sig := sigInfo{}
|
||||
sig.next = p.sig
|
||||
sig.sig = *sign(&p.core.sigPriv, bs)
|
||||
p.mySigs = append(p.mySigs, sig)
|
||||
//p.core.log.Println("DEBUG sig bs:", bs)
|
||||
return sig
|
||||
}
|
||||
|
||||
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
|
||||
//bs, err := wire_encode_locator(loc)
|
||||
//if err != nil { panic(err) }
|
||||
bs := append([]byte(nil), next[:]...)
|
||||
bs = append(bs, wire_encode_locator(loc)...)
|
||||
//bs := wire_encode_locator(loc)
|
||||
//bs = append(next[:], bs...)
|
||||
return bs
|
||||
//bs, err := wire_encode_locator(loc)
|
||||
//if err != nil { panic(err) }
|
||||
bs := append([]byte(nil), next[:]...)
|
||||
bs = append(bs, wire_encode_locator(loc)...)
|
||||
//bs := wire_encode_locator(loc)
|
||||
//bs = append(next[:], bs...)
|
||||
return bs
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue