mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-28 22:25:07 +03:00
minor documentation updates, code comments, and a couple of bugfixes that I noticed when going through the code to comment it
This commit is contained in:
parent
b0acc19e3d
commit
56802d569e
15 changed files with 436 additions and 85 deletions
|
@ -4,19 +4,16 @@ package yggdrasil
|
|||
// Commented code should be removed
|
||||
// Live code should be better commented
|
||||
|
||||
// FIXME (!) this part may be at least sligtly vulnerable to replay attacks
|
||||
// The switch message part should catch / drop old tstamps
|
||||
// So the damage is limited
|
||||
// But you could still mess up msgAnc / msgHops and break some things there
|
||||
// It needs to ignore messages with a lower seq
|
||||
// Probably best to start setting seq to a timestamp in that case...
|
||||
|
||||
import "time"
|
||||
import "sync"
|
||||
import "sync/atomic"
|
||||
|
||||
//import "fmt"
|
||||
|
||||
// The peers struct represents peers with an active connection.
|
||||
// Incomping packets are passed to the corresponding peer, which handles them somehow.
|
||||
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
|
||||
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
|
||||
type peers struct {
|
||||
core *Core
|
||||
mutex sync.Mutex // Synchronize writes to atomic
|
||||
|
@ -26,6 +23,7 @@ type peers struct {
|
|||
allowedEncryptionPublicKeys map[boxPubKey]struct{}
|
||||
}
|
||||
|
||||
// Initializes the peers struct.
|
||||
func (ps *peers) init(c *Core) {
|
||||
ps.mutex.Lock()
|
||||
defer ps.mutex.Unlock()
|
||||
|
@ -34,6 +32,7 @@ func (ps *peers) init(c *Core) {
|
|||
ps.allowedEncryptionPublicKeys = make(map[boxPubKey]struct{})
|
||||
}
|
||||
|
||||
// Returns true if an incoming peer connection to a key is allowed, either because the key is in the whitelist or because the whitelist is empty.
|
||||
func (ps *peers) isAllowedEncryptionPublicKey(box *boxPubKey) bool {
|
||||
ps.authMutex.RLock()
|
||||
defer ps.authMutex.RUnlock()
|
||||
|
@ -41,18 +40,21 @@ func (ps *peers) isAllowedEncryptionPublicKey(box *boxPubKey) bool {
|
|||
return isIn || len(ps.allowedEncryptionPublicKeys) == 0
|
||||
}
|
||||
|
||||
// Adds a key to the whitelist.
|
||||
func (ps *peers) addAllowedEncryptionPublicKey(box *boxPubKey) {
|
||||
ps.authMutex.Lock()
|
||||
defer ps.authMutex.Unlock()
|
||||
ps.allowedEncryptionPublicKeys[*box] = struct{}{}
|
||||
}
|
||||
|
||||
// Removes a key from the whitelist.
|
||||
func (ps *peers) removeAllowedEncryptionPublicKey(box *boxPubKey) {
|
||||
ps.authMutex.Lock()
|
||||
defer ps.authMutex.Unlock()
|
||||
delete(ps.allowedEncryptionPublicKeys, *box)
|
||||
}
|
||||
|
||||
// Gets the whitelist of allowed keys for incoming connections.
|
||||
func (ps *peers) getAllowedEncryptionPublicKeys() []boxPubKey {
|
||||
ps.authMutex.RLock()
|
||||
defer ps.authMutex.RUnlock()
|
||||
|
@ -63,14 +65,17 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []boxPubKey {
|
|||
return keys
|
||||
}
|
||||
|
||||
// Atomically gets a map[switchPort]*peer of known peers.
|
||||
func (ps *peers) getPorts() map[switchPort]*peer {
|
||||
return ps.ports.Load().(map[switchPort]*peer)
|
||||
}
|
||||
|
||||
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
|
||||
func (ps *peers) putPorts(ports map[switchPort]*peer) {
|
||||
ps.ports.Store(ports)
|
||||
}
|
||||
|
||||
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure.
|
||||
type peer struct {
|
||||
queueSize int64 // used to track local backpressure
|
||||
bytesSent uint64 // To track bandwidth usage for getPeers
|
||||
|
@ -90,14 +95,17 @@ type peer struct {
|
|||
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
|
||||
}
|
||||
|
||||
// Size of the queue of packets to be sent to the node.
|
||||
func (p *peer) getQueueSize() int64 {
|
||||
return atomic.LoadInt64(&p.queueSize)
|
||||
}
|
||||
|
||||
// Used to increment or decrement the queue.
|
||||
func (p *peer) updateQueueSize(delta int64) {
|
||||
atomic.AddInt64(&p.queueSize, delta)
|
||||
}
|
||||
|
||||
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number.
|
||||
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer {
|
||||
now := time.Now()
|
||||
p := peer{box: *box,
|
||||
|
@ -125,12 +133,13 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKe
|
|||
return &p
|
||||
}
|
||||
|
||||
// Removes a peer for a given port, if one exists.
|
||||
func (ps *peers) removePeer(port switchPort) {
|
||||
if port == 0 {
|
||||
return
|
||||
} // Can't remove self peer
|
||||
ps.core.router.doAdmin(func() {
|
||||
ps.core.switchTable.removePeer(port)
|
||||
ps.core.switchTable.unlockedRemovePeer(port)
|
||||
})
|
||||
ps.mutex.Lock()
|
||||
oldPorts := ps.getPorts()
|
||||
|
@ -150,6 +159,8 @@ func (ps *peers) removePeer(port switchPort) {
|
|||
}
|
||||
}
|
||||
|
||||
// If called, sends a notification to each peer that they should send a new switch message.
|
||||
// Mainly called by the switch after an update.
|
||||
func (ps *peers) sendSwitchMsgs() {
|
||||
ports := ps.getPorts()
|
||||
for _, p := range ports {
|
||||
|
@ -163,6 +174,8 @@ func (ps *peers) sendSwitchMsgs() {
|
|||
}
|
||||
}
|
||||
|
||||
// This must be launched in a separate goroutine by whatever sets up the peer struct.
|
||||
// It handles link protocol traffic.
|
||||
func (p *peer) linkLoop() {
|
||||
go func() { p.doSend <- struct{}{} }()
|
||||
tick := time.NewTicker(time.Second)
|
||||
|
@ -182,6 +195,8 @@ func (p *peer) linkLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// Called to handle incoming packets.
|
||||
// Passes the packet to a handler for that packet type.
|
||||
func (p *peer) handlePacket(packet []byte) {
|
||||
// TODO See comment in sendPacket about atomics technically being done wrong
|
||||
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet)))
|
||||
|
@ -197,10 +212,12 @@ func (p *peer) handlePacket(packet []byte) {
|
|||
case wire_LinkProtocolTraffic:
|
||||
p.handleLinkTraffic(packet)
|
||||
default:
|
||||
return
|
||||
util_putBytes(packet)
|
||||
}
|
||||
}
|
||||
|
||||
// Called to handle traffic or protocolTraffic packets.
|
||||
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
|
||||
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
||||
if p.port != 0 && p.dinfo == nil {
|
||||
// Drop traffic until the peer manages to send us at least one good switchMsg
|
||||
|
@ -221,14 +238,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
|||
to.sendPacket(packet)
|
||||
}
|
||||
|
||||
// This just calls p.out(packet) for now.
|
||||
func (p *peer) sendPacket(packet []byte) {
|
||||
// Is there ever a case where something more complicated is needed?
|
||||
// What if p.out blocks?
|
||||
p.out(packet)
|
||||
// TODO this should really happen at the interface, to account for LIFO packet drops and additional per-packet/per-message overhead, but this should be pretty close... better to move it to the tcp/udp stuff *after* rewriting both to give a common interface
|
||||
atomic.AddUint64(&p.bytesSent, uint64(len(packet)))
|
||||
}
|
||||
|
||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
||||
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
||||
func (p *peer) sendLinkPacket(packet []byte) {
|
||||
innerPayload, innerNonce := boxSeal(&p.linkShared, packet, nil)
|
||||
innerLinkPacket := wire_linkProtoTrafficPacket{
|
||||
|
@ -245,6 +263,8 @@ func (p *peer) sendLinkPacket(packet []byte) {
|
|||
p.linkOut <- packet
|
||||
}
|
||||
|
||||
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
|
||||
// Identifies the link traffic type and calls the appropriate handler.
|
||||
func (p *peer) handleLinkTraffic(bs []byte) {
|
||||
packet := wire_linkProtoTrafficPacket{}
|
||||
if !packet.decode(bs) {
|
||||
|
@ -269,10 +289,12 @@ func (p *peer) handleLinkTraffic(bs []byte) {
|
|||
switch pType {
|
||||
case wire_SwitchMsg:
|
||||
p.handleSwitchMsg(payload)
|
||||
default: // TODO?...
|
||||
default:
|
||||
util_putBytes(bs)
|
||||
}
|
||||
}
|
||||
|
||||
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
|
||||
func (p *peer) sendSwitchMsg() {
|
||||
msg := p.core.switchTable.getMsg()
|
||||
if msg == nil {
|
||||
|
@ -290,6 +312,8 @@ func (p *peer) sendSwitchMsg() {
|
|||
p.sendLinkPacket(packet)
|
||||
}
|
||||
|
||||
// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch.
|
||||
// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins).
|
||||
func (p *peer) handleSwitchMsg(packet []byte) {
|
||||
var msg switchMsg
|
||||
if !msg.decode(packet) {
|
||||
|
@ -330,6 +354,8 @@ func (p *peer) handleSwitchMsg(packet []byte) {
|
|||
p.dinfo = &dinfo
|
||||
}
|
||||
|
||||
// This generates the bytes that we sign or check the signature of for a switchMsg.
|
||||
// It begins with the next node's key, followed by the root and the timetsamp, followed by coords being advertised to the next node.
|
||||
func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte {
|
||||
var loc switchLocator
|
||||
for _, hop := range msg.Hops {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue