first code/readme/license commit

This commit is contained in:
Arceliar 2017-12-28 22:16:20 -06:00
parent 35852be36d
commit d7e6d814a0
60 changed files with 9768 additions and 2 deletions

108
src/yggdrasil/address.go Normal file
View file

@ -0,0 +1,108 @@
package yggdrasil
type address [16]byte // IPv6 address within the network
type subnet [8]byte // It's a /64
var address_prefix = [...]byte{0xfd} // For node addresses + local subnets
func (a *address) isValid() bool {
for idx := range address_prefix {
if (*a)[idx] != address_prefix[idx] { return false }
}
return (*a)[len(address_prefix)] & 0x80 == 0
}
func (s *subnet) isValid() bool {
for idx := range address_prefix {
if (*s)[idx] != address_prefix[idx] { return false }
}
return (*s)[len(address_prefix)] & 0x80 != 0
}
func address_addrForNodeID(nid *NodeID) *address {
// 128 bit address
// Begins with prefix
// Next bit is a 0
// Next 7 bits, interpreted as a uint, are # of leading 1s in the NodeID
// Leading 1s and first leading 0 of the NodeID are truncated off
// The rest is appended to the IPv6 address (truncated to 128 bits total)
var addr address
var temp []byte
done := false
ones := byte(0)
bits := byte(0)
nBits := 0
for idx := 0 ; idx < 8*len(nid) ; idx++ {
bit := (nid[idx/8] & (0x80 >> byte(idx % 8))) >> byte(7 - (idx % 8))
if !done && bit != 0 {
ones++
continue
}
if !done && bit == 0 {
done = true
continue // FIXME this assumes that ones <= 127
}
bits = (bits << 1) | bit
nBits++
if nBits == 8 {
nBits = 0
temp = append(temp, bits)
}
}
copy(addr[:], address_prefix[:])
addr[len(address_prefix)] = ones & 0x7f
copy(addr[len(address_prefix)+1:], temp)
return &addr
}
func address_subnetForNodeID(nid *NodeID) *subnet {
// Exactly as the address version, with two exceptions:
// 1) The first bit after the fixed prefix is a 1 instead of a 0
// 2) It's truncated to a subnet prefix length instead of 128 bits
addr := *address_addrForNodeID(nid)
var snet subnet
copy(snet[:], addr[:])
snet[len(address_prefix)] |= 0x80
return &snet
}
func (a *address) getNodeIDandMask() (*NodeID, *NodeID) {
// Mask is a bitmask to mark the bits visible from the address
// This means truncated leading 1s, first leading 0, and visible part of addr
var nid NodeID
var mask NodeID
ones := int(a[len(address_prefix)] & 0x7f)
for idx := 0 ; idx < ones ; idx++ { nid[idx/8] |= 0x80 >> byte(idx % 8) }
nidOffset := ones+1
addrOffset := 8*len(address_prefix)+8
for idx := addrOffset ; idx < 8*len(a) ; idx++ {
bits := a[idx/8] & (0x80 >> byte(idx % 8))
bits <<= byte(idx % 8)
nidIdx := nidOffset + (idx - addrOffset)
bits >>= byte(nidIdx % 8)
nid[nidIdx/8] |= bits
}
maxMask := 8*(len(a) - len(address_prefix) - 1) + ones + 1
for idx := 0 ; idx < maxMask ; idx++ { mask[idx/8] |= 0x80 >> byte(idx % 8) }
return &nid, &mask
}
func (s *subnet) getNodeIDandMask() (*NodeID, *NodeID) {
// As witht he address version, but visible parts of the subnet prefix instead
var nid NodeID
var mask NodeID
ones := int(s[len(address_prefix)] & 0x7f)
for idx := 0 ; idx < ones ; idx++ { nid[idx/8] |= 0x80 >> byte(idx % 8) }
nidOffset := ones+1
addrOffset := 8*len(address_prefix)+8
for idx := addrOffset ; idx < 8*len(s) ; idx++ {
bits := s[idx/8] & (0x80 >> byte(idx % 8))
bits <<= byte(idx % 8)
nidIdx := nidOffset + (idx - addrOffset)
bits >>= byte(nidIdx % 8)
nid[nidIdx/8] |= bits
}
maxMask := 8*(len(s) - len(address_prefix) - 1) + ones + 1
for idx := 0 ; idx < maxMask ; idx++ { mask[idx/8] |= 0x80 >> byte(idx % 8) }
return &nid, &mask
}

64
src/yggdrasil/core.go Normal file
View file

@ -0,0 +1,64 @@
package yggdrasil
import "io/ioutil"
import "log"
type Core struct {
// This is the main data structure that holds everything else for a node
// TODO? move keys out of core and into something more appropriate
// e.g. box keys live in sessions
// sig keys live in peers or sigs (or wherever signing/validating logic is)
boxPub boxPubKey
boxPriv boxPrivKey
sigPub sigPubKey
sigPriv sigPrivKey
switchTable switchTable
peers peers
sigs sigManager
sessions sessions
router router
dht dht
tun tunDevice
searches searches
tcp *tcpInterface
udp *udpInterface
log *log.Logger
}
func (c *Core) Init() {
// Only called by the simulator, to set up nodes with random keys
bpub, bpriv := newBoxKeys()
spub, spriv := newSigKeys()
c.init(bpub, bpriv, spub, spriv)
}
func (c *Core) init(bpub *boxPubKey,
bpriv *boxPrivKey,
spub *sigPubKey,
spriv *sigPrivKey) {
// TODO separate init and start functions
// Init sets up structs
// Start launches goroutines that depend on structs being set up
// This is pretty much required to avoid race conditions
util_initByteStore()
c.log = log.New(ioutil.Discard, "", 0)
c.boxPub, c.boxPriv = *bpub, *bpriv
c.sigPub, c.sigPriv = *spub, *spriv
c.sigs.init()
c.searches.init(c)
c.dht.init(c)
c.sessions.init(c)
c.peers.init(c)
c.router.init(c)
c.switchTable.init(c, c.sigPub) // TODO move before peers? before router?
c.tun.init(c)
}
func (c *Core) GetNodeID() *NodeID {
return getNodeID(&c.boxPub)
}
func (c *Core) GetTreeID() *TreeID {
return getTreeID(&c.sigPub)
}

154
src/yggdrasil/crypto.go Normal file
View file

@ -0,0 +1,154 @@
package yggdrasil
/*
This part of the package wraps crypto operations needed elsewhere
In particular, it exposes key generation for ed25519 and nacl box
It also defines NodeID and TreeID as hashes of keys, and wraps hash functions
*/
import "crypto/rand"
import "crypto/sha512"
import "golang.org/x/crypto/ed25519"
import "golang.org/x/crypto/nacl/box"
////////////////////////////////////////////////////////////////////////////////
// NodeID and TreeID
const NodeIDLen = sha512.Size
const TreeIDLen = sha512.Size
const handleLen = 8
type NodeID [NodeIDLen]byte
type TreeID [TreeIDLen]byte
type handle [handleLen]byte
func getNodeID(pub *boxPubKey) *NodeID {
h := sha512.Sum512(pub[:])
return (*NodeID)(&h)
}
func getTreeID(pub *sigPubKey) *TreeID {
h := sha512.Sum512(pub[:])
return (*TreeID)(&h)
}
func newHandle() *handle {
var h handle
_, err := rand.Read(h[:])
if err != nil { panic(err) }
return &h
}
////////////////////////////////////////////////////////////////////////////////
// Signatures
const sigPubKeyLen = ed25519.PublicKeySize
const sigPrivKeyLen = ed25519.PrivateKeySize
const sigLen = ed25519.SignatureSize
type sigPubKey [sigPubKeyLen]byte
type sigPrivKey [sigPrivKeyLen]byte
type sigBytes [sigLen]byte
func newSigKeys() (*sigPubKey, *sigPrivKey) {
var pub sigPubKey
var priv sigPrivKey
pubSlice, privSlice, err := ed25519.GenerateKey(rand.Reader)
if err != nil { panic(err) }
copy(pub[:], pubSlice)
copy(priv[:], privSlice)
return &pub, &priv
}
func sign(priv *sigPrivKey, msg []byte) *sigBytes {
var sig sigBytes
sigSlice := ed25519.Sign(priv[:], msg)
copy(sig[:], sigSlice)
return &sig
}
func verify(pub *sigPubKey, msg []byte, sig *sigBytes) bool {
// Should sig be an array instead of a slice?...
// It's fixed size, but
return ed25519.Verify(pub[:], msg, sig[:])
}
////////////////////////////////////////////////////////////////////////////////
// NaCl-like crypto "box" (curve25519+xsalsa20+poly1305)
const boxPubKeyLen = 32
const boxPrivKeyLen = 32
const boxSharedKeyLen = 32
const boxNonceLen = 24
type boxPubKey [boxPubKeyLen]byte
type boxPrivKey [boxPrivKeyLen]byte
type boxSharedKey [boxSharedKeyLen]byte
type boxNonce [boxNonceLen]byte
func newBoxKeys() (*boxPubKey, *boxPrivKey) {
pubBytes, privBytes, err := box.GenerateKey(rand.Reader)
if err != nil { panic(err) }
pub := (*boxPubKey)(pubBytes)
priv := (*boxPrivKey)(privBytes)
return pub, priv
}
func getSharedKey(myPrivKey *boxPrivKey,
othersPubKey *boxPubKey) *boxSharedKey {
var shared [boxSharedKeyLen]byte
priv := (*[boxPrivKeyLen]byte)(myPrivKey)
pub := (*[boxPubKeyLen]byte)(othersPubKey)
box.Precompute(&shared, pub, priv)
return (*boxSharedKey)(&shared)
}
func boxOpen(shared *boxSharedKey,
boxed []byte,
nonce *boxNonce) ([]byte, bool) {
out := util_getBytes()
//return append(out, boxed...), true // XXX HACK to test without encryption
s := (*[boxSharedKeyLen]byte)(shared)
n := (*[boxNonceLen]byte)(nonce)
unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s)
return unboxed, success
}
func boxSeal(shared *boxSharedKey, unboxed []byte, nonce *boxNonce) ([]byte, *boxNonce) {
if nonce == nil { nonce = newBoxNonce() }
nonce.update()
out := util_getBytes()
//return append(out, unboxed...), nonce // XXX HACK to test without encryption
s := (*[boxSharedKeyLen]byte)(shared)
n := (*[boxNonceLen]byte)(nonce)
boxed := box.SealAfterPrecomputation(out, unboxed, n, s)
return boxed, nonce
}
func newBoxNonce() *boxNonce {
var nonce boxNonce
_, err := rand.Read(nonce[:])
for ; err == nil && nonce[0] == 0xff ; _, err = rand.Read(nonce[:]){
// Make sure nonce isn't too high
// This is just to make rollover unlikely to happen
// Rollover is fine, but it may kill the session and force it to reopen
}
if err != nil { panic(err) }
return &nonce
}
func (n *boxNonce) update() {
oldNonce := *n
n[len(n)-1] += 2
for i := len(n)-2 ; i >= 0 ; i-- {
if n[i+1] < oldNonce[i+1] { n[i] += 1 }
}
}

336
src/yggdrasil/debug.go Normal file
View file

@ -0,0 +1,336 @@
package yggdrasil
// These are functions that should not exist
// They are (or were) used during development, to work around missing features
// They're also used to configure things from the outside
// It would be better to define and export a few config functions elsewhere
// Or define some remote API and call it to send/request configuration info
import _ "golang.org/x/net/ipv6" // TODO put this somewhere better
import "fmt"
import "net"
import "log"
// Core
func (c *Core) DEBUG_getSigPub() sigPubKey {
return (sigPubKey)(c.sigPub)
}
func (c *Core) DEBUG_getBoxPub() boxPubKey {
return (boxPubKey)(c.boxPub)
}
func (c *Core) DEBUG_getSend() (chan<- []byte) {
return c.tun.send
}
func (c *Core) DEBUG_getRecv() (<-chan []byte) {
return c.tun.recv
}
// Peer
func (c *Core) DEBUG_getPeers() *peers {
return &c.peers
}
func (ps *peers) DEBUG_newPeer(box boxPubKey,
sig sigPubKey) *peer {
//in <-chan []byte,
//out chan<- []byte) *peer {
return ps.newPeer(&box, &sig)//, in, out)
}
/*
func (ps *peers) DEBUG_startPeers() {
ps.mutex.RLock()
defer ps.mutex.RUnlock()
for _, p := range ps.ports {
if p == nil { continue }
go p.MainLoop()
}
}
*/
func (ps *peers) DEBUG_hasPeer(key sigPubKey) bool {
ports := ps.ports.Load().(map[switchPort]*peer)
for _, p := range ports {
if p == nil { continue }
if p.sig == key { return true }
}
return false
}
func (ps *peers) DEBUG_getPorts() map[switchPort]*peer {
ports := ps.ports.Load().(map[switchPort]*peer)
newPeers := make(map[switchPort]*peer)
for port, p := range ports{
newPeers[port] = p
}
return newPeers
}
func (p *peer) DEBUG_getSigKey() sigPubKey {
return p.sig
}
func (p *peer) DEEBUG_getPort() switchPort {
return p.port
}
// Router
func (c *Core) DEBUG_getSwitchTable() *switchTable {
return &c.switchTable
}
func (c *Core) DEBUG_getLocator() switchLocator {
return c.switchTable.getLocator()
}
func (l *switchLocator) DEBUG_getCoords() []byte {
return l.getCoords()
}
func (c *Core) DEBUG_switchLookup(dest []byte, ttl uint64) (switchPort, uint64) {
return c.switchTable.lookup(dest, ttl)
}
/*
func (t *switchTable) DEBUG_isDirty() bool {
//data := t.data.Load().(*tabledata)
t.mutex.RLock()
defer t.mutex.RUnlock()
data := t.data
return data.dirty
}
*/
func (t *switchTable) DEBUG_dumpTable() {
//data := t.data.Load().(*tabledata)
t.mutex.RLock()
defer t.mutex.RUnlock()
data := t.data
for _, peer := range data.peers {
//fmt.Println("DUMPTABLE:", t.treeID, peer.treeID, peer.port,
// peer.locator.Root, peer.coords,
// peer.reverse.Root, peer.reverse.Coords, peer.forward)
fmt.Println("DUMPTABLE:", t.key, peer.key, peer.locator.coords, peer.port/*, peer.forward*/)
}
}
func (t *switchTable) DEBUG_getReversePort(port switchPort) switchPort {
// Returns Port(0) if it cannot get the reverse peer for any reason
//data := t.data.Load().(*tabledata)
t.mutex.RLock()
defer t.mutex.RUnlock()
data := t.data
if port >= switchPort(len(data.peers)) { return switchPort(0) }
pinfo := data.peers[port]
if len(pinfo.locator.coords) < 1 { return switchPort(0) }
return pinfo.locator.coords[len(pinfo.locator.coords)-1]
}
// Wire
func DEBUG_wire_encode_coords(coords []byte) []byte {
return wire_encode_coords(coords)
}
// DHT, via core
func (c *Core) DEBUG_getDHTSize() int {
total := 0
for bidx := 0 ; bidx < c.dht.nBuckets() ; bidx++ {
b := c.dht.getBucket(bidx)
total += len(b.infos)
}
return total
}
// udpInterface
// FIXME udpInterface isn't exported
// So debug functions need to work differently...
/*
func (c *Core) DEBUG_setupLoopbackUDPInterface() {
iface := udpInterface{}
iface.init(c, "[::1]:0")
c.ifaces = append(c.ifaces[:0], &iface)
}
*/
/*
func (c *Core) DEBUG_getLoopbackAddr() net.Addr {
iface := c.ifaces[0]
return iface.sock.LocalAddr()
}
*/
/*
func (c *Core) DEBUG_addLoopbackPeer(addr *net.UDPAddr,
in (chan<- []byte),
out (<-chan []byte)) {
iface := c.ifaces[0]
iface.addPeer(addr, in, out)
}
*/
/*
func (c *Core) DEBUG_startLoopbackUDPInterface() {
iface := c.ifaces[0]
go iface.reader()
for addr, chs := range iface.peers {
udpAddr, err := net.ResolveUDPAddr("udp6", addr)
if err != nil { panic(err) }
go iface.writer(udpAddr, chs.out)
}
}
*/
////////////////////////////////////////////////////////////////////////////////
func (c *Core) DEBUG_getAddr() *address {
return address_addrForNodeID(&c.dht.nodeID)
}
func (c *Core) DEBUG_startTun() {
c.DEBUG_startTunWithMTU(1280)
}
func (c *Core) DEBUG_startTunWithMTU(mtu int) {
addr := c.DEBUG_getAddr()
straddr := fmt.Sprintf("%s/%v", net.IP(addr[:]).String(), 8*len(address_prefix))
err := c.tun.setup(straddr, mtu)
if err != nil { panic(err) }
go c.tun.read()
go c.tun.write()
}
func (c *Core) DEBUG_stopTun() {
c.tun.close()
}
////////////////////////////////////////////////////////////////////////////////
func (c *Core) DEBUG_newBoxKeys() (*boxPubKey, *boxPrivKey) {
return newBoxKeys()
}
func (c *Core) DEBUG_newSigKeys() (*sigPubKey, *sigPrivKey) {
return newSigKeys()
}
func (c *Core) DEBUG_getNodeID(pub *boxPubKey) *NodeID {
return getNodeID(pub)
}
func (c *Core) DEBUG_getTreeID(pub *sigPubKey) *TreeID {
return getTreeID(pub)
}
func (c *Core) DEBUG_addrForNodeID(nodeID *NodeID) string {
return net.IP(address_addrForNodeID(nodeID)[:]).String()
}
func (c *Core) DEBUG_init(bpub []byte,
bpriv []byte,
spub []byte,
spriv []byte) {
var boxPub boxPubKey
var boxPriv boxPrivKey
var sigPub sigPubKey
var sigPriv sigPrivKey
copy(boxPub[:], bpub)
copy(boxPriv[:], bpriv)
copy(sigPub[:], spub)
copy(sigPriv[:], spriv)
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)
}
////////////////////////////////////////////////////////////////////////////////
func (c *Core) DEBUG_setupAndStartGlobalUDPInterface(addrport string) {
iface := udpInterface{}
iface.init(c, addrport)
c.udp = &iface
}
func (c *Core) DEBUG_getGlobalUDPAddr() net.Addr {
return c.udp.sock.LocalAddr()
}
func (c *Core) DEBUG_sendUDPKeys(saddr string) {
addr := connAddr(saddr)
c.udp.sendKeys(addr)
}
////////////////////////////////////////////////////////////////////////////////
//*
func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) {
iface := tcpInterface{}
iface.init(c, addrport)
c.tcp = &iface
}
func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr {
return c.tcp.serv.Addr().(*net.TCPAddr)
}
func (c *Core) DEBUG_addTCPConn(saddr string) {
c.tcp.call(saddr)
}
//*/
/*
func (c *Core) DEBUG_startSelfPeer() {
c.Peers.mutex.RLock()
defer c.Peers.mutex.RUnlock()
p := c.Peers.ports[0]
go p.MainLoop()
}
*/
////////////////////////////////////////////////////////////////////////////////
/*
func (c *Core) DEBUG_setupAndStartGlobalKCPInterface(addrport string) {
iface := kcpInterface{}
iface.init(c, addrport)
c.kcp = &iface
}
func (c *Core) DEBUG_getGlobalKCPAddr() net.Addr {
return c.kcp.serv.Addr()
}
func (c *Core) DEBUG_addKCPConn(saddr string) {
c.kcp.call(saddr)
}
*/
////////////////////////////////////////////////////////////////////////////////
func (c *Core) DEBUG_setLogger(log *log.Logger) {
c.log = log
}
////////////////////////////////////////////////////////////////////////////////
func DEBUG_simLinkPeers(p, q *peer) {
// Sets q.out() to point to p and starts p.linkLoop()
plinkIn := make(chan []byte, 1)
qlinkIn := make(chan []byte, 1)
p.out = func(bs []byte) {
go q.handlePacket(bs, qlinkIn)
}
q.out = func(bs []byte) {
go p.handlePacket(bs, plinkIn)
}
go p.linkLoop(plinkIn)
go q.linkLoop(qlinkIn)
}

383
src/yggdrasil/dht.go Normal file
View file

@ -0,0 +1,383 @@
package yggdrasil
/*
This part has the (kademlia-like) distributed hash table
It's used to look up coords for a NodeID
Every node participates in the DHT, and the DHT stores no real keys/values
(Only the peer relationships / lookups are needed)
This version is intentionally fragile, by being recursive instead of iterative
(it's also not parallel, as a result)
This is to make sure that DHT black holes are visible if they exist
(the iterative parallel approach tends to get around them sometimes)
I haven't seen this get stuck on blackholes, but I also haven't proven it can't
Slight changes *do* make it blackhole hard, bootstrapping isn't an easy problem
*/
// TODO handle the case where we try to look ourself up
// Ends up at bucket index NodeIDLen
// That's 1 too many
import "sort"
import "time"
//import "fmt"
// Maximum size for buckets and lookups
// Exception for buckets if the next one is non-full
const dht_bucket_size = 2 // This should be at least 2
const dht_lookup_size = 2 // This should be at least 1, below 2 is impractical
const dht_bucket_number = 8*NodeIDLen // This shouldn't be changed
type dhtInfo struct {
// TODO save their nodeID so we don't need to rehash if we need it again
nodeID_hidden *NodeID
key boxPubKey
coords []byte
send time.Time // When we last sent a message
recv time.Time // When we last received a message
pings int // Decide when to drop
}
func (info *dhtInfo) getNodeID() *NodeID {
if info.nodeID_hidden == nil {
info.nodeID_hidden = getNodeID(&info.key)
}
return info.nodeID_hidden
}
type bucket struct {
infos []*dhtInfo
}
type dhtReq struct {
key boxPubKey // Key of whoever asked
coords []byte // Coords of whoever asked
dest NodeID // NodeID they're asking about
}
type dhtRes struct {
key boxPubKey // key to respond to
coords []byte // coords to respond to
dest NodeID
infos []*dhtInfo // response
}
type dht struct {
core *Core
nodeID NodeID
buckets_hidden [dht_bucket_number]bucket // Extra is for the self-bucket
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time
offset int
}
func (t *dht) init(c *Core) {
t.core = c
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1)
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
}
func (t *dht) handleReq(req *dhtReq) {
// Send them what they asked for
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
res := dhtRes{
key: t.core.boxPub,
coords: coords,
dest: req.dest,
infos: t.lookup(&req.dest),
}
t.sendRes(&res, req)
// Also (possibly) add them to our DHT
info := dhtInfo{
key: req.key,
coords: req.coords,
}
t.insertIfNew(&info) // This seems DoSable (we just trust their coords...)
//if req.dest != t.nodeID { t.ping(&info, info.getNodeID()) } // Or spam...
}
func (t *dht) handleRes(res *dhtRes) {
reqs, isIn := t.reqs[res.key]
if !isIn { return }
_, isIn = reqs[res.dest]
if !isIn { return }
rinfo := dhtInfo{
key: res.key,
coords: res.coords,
send: time.Now(), // Technically wrong but should be OK... FIXME or not
recv: time.Now(),
}
// If they're already in the table, then keep the correct send time
bidx, isOK := t.getBucketIndex(rinfo.getNodeID())
if !isOK { return }
b := t.getBucket(bidx)
for _, oldinfo := range b.infos {
if oldinfo.key == rinfo.key {rinfo.send = oldinfo.send }
}
// Insert into table
t.insert(&rinfo)
if res.dest == *rinfo.getNodeID() { return } // No infinite recursions
// ping the nodes we were told about
if len(res.infos) > dht_lookup_size {
// Ignore any "extra" lookup results
res.infos = res.infos[:dht_lookup_size]
}
for _, info := range res.infos {
bidx, isOK := t.getBucketIndex(info.getNodeID())
if !isOK { continue }
b := t.getBucket(bidx)
if b.contains(info) { continue } // wait for maintenance cycle to get them
t.ping(info, info.getNodeID())
}
}
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
// FIXME this allocates a bunch, sorts, and keeps the part it likes
// It would be better to only track the part it likes to begin with
addInfos := func (res []*dhtInfo, infos []*dhtInfo) ([]*dhtInfo) {
for _, info := range infos {
if info == nil { panic ("Should never happen!") }
if true || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
res = append(res, info)
}
}
return res
}
var res []*dhtInfo
for bidx := 0 ; bidx < t.nBuckets() ; bidx++ {
b := t.getBucket(bidx)
res = addInfos(res, b.infos)
}
doSort := func(infos []*dhtInfo) {
less := func (i, j int) bool {
return dht_firstCloserThanThird(infos[i].getNodeID(),
nodeID,
infos[j].getNodeID())
}
sort.SliceStable(infos, less)
}
doSort(res)
if len(res) > dht_lookup_size { res = res[:dht_lookup_size] }
return res
}
func (t *dht) getBucket(bidx int) *bucket {
return &t.buckets_hidden[bidx]
}
func (t *dht) nBuckets() int {
return len(t.buckets_hidden)
}
func (t *dht) insertIfNew(info *dhtInfo) {
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
// Insert a peer if and only if the bucket doesn't already contain it
nodeID := info.getNodeID()
bidx, isOK := t.getBucketIndex(nodeID)
if !isOK { return }
b := t.getBucket(bidx)
if !b.contains(info) {
// We've never heard this node before
// TODO is there a better time than "now" to set send/recv to?
// (Is there another "natural" choice that bootstraps faster?)
info.send = time.Now()
info.recv = info.send
t.insert(info)
}
}
func (t *dht) insert(info *dhtInfo) {
//fmt.Println("DEBUG: dht insert:", info.getNodeID(), info.coords)
// First update the time on this info
info.recv = time.Now()
// Get the bucket for this node
nodeID := info.getNodeID()
bidx, isOK := t.getBucketIndex(nodeID)
if !isOK { return }
b := t.getBucket(bidx)
// First drop any existing entry from the bucket
b.drop(&info.key)
// Now add to the *end* of the bucket
b.infos = append(b.infos, info)
// Check if the next bucket is non-full and return early if it is
if bidx+1 == t.nBuckets() { return }
bnext := t.getBucket(bidx+1)
if len(bnext.infos) < dht_bucket_size { return }
// Shrink from the *front* to requied size
for len(b.infos) > dht_bucket_size { b.infos = b.infos[1:] }
}
func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
for bidx := 0 ; bidx < t.nBuckets() ; bidx++ {
them := nodeID[bidx/8] & (0x80 >> byte(bidx % 8))
me := t.nodeID[bidx/8] & (0x80 >> byte(bidx % 8))
if them != me { return bidx, true }
}
return t.nBuckets(), false
}
func (b *bucket) contains(ninfo *dhtInfo) bool {
// Compares if key and coords match
for _, info := range b.infos {
if info == nil { panic("Should never happen") }
if info.key == ninfo.key {
if len(info.coords) != len(ninfo.coords) { return false }
for idx := 0 ; idx < len(info.coords) ; idx++ {
if info.coords[idx] != ninfo.coords[idx] { return false }
}
return true
}
}
return false
}
func (b *bucket) drop(key *boxPubKey) {
clean := func (infos []*dhtInfo) []*dhtInfo {
cleaned := infos[:0]
for _, info := range infos {
if info.key == *key { continue }
cleaned = append(cleaned, info)
}
return cleaned
}
b.infos = clean(b.infos)
}
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
// Send a dhtReq to the node in dhtInfo
bs := req.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: dest.coords,
toKey: dest.key,
fromKey: t.core.boxPub,
nonce: *nonce,
payload:payload,
}
packet := p.encode()
t.core.router.out(packet)
reqsToDest, isIn := t.reqs[dest.key]
if !isIn {
t.reqs[dest.key] = make(map[NodeID]time.Time)
reqsToDest, isIn = t.reqs[dest.key]
if !isIn { panic("This should never happen") }
}
reqsToDest[req.dest] = time.Now()
}
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
// Send a reply for a dhtReq
bs := res.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: req.coords,
toKey: req.key,
fromKey: t.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
t.core.router.out(packet)
}
func (b *bucket) isEmpty() bool {
return len(b.infos) == 0
}
func (b *bucket) nextToPing() *dhtInfo {
// Check the nodes in the bucket
// Return whichever one responded least recently
// Delay of 6 seconds between pinging the same node
// Gives them time to respond
// And time between traffic loss from short term congestion in the network
var toPing *dhtInfo
for _, next := range b.infos {
if time.Since(next.send) < 6*time.Second { continue }
if toPing == nil || next.recv.Before(toPing.recv) { toPing = next }
}
return toPing
}
func (t *dht) getTarget(bidx int) *NodeID {
targetID := t.nodeID
targetID[bidx/8] ^= 0x80 >> byte(bidx % 8)
return &targetID
}
func (t *dht) ping(info *dhtInfo, target *NodeID) {
if info.pings > 2 {
bidx, isOK := t.getBucketIndex(info.getNodeID())
if !isOK { panic("This should never happen") }
b := t.getBucket(bidx)
b.drop(&info.key)
return
}
if target == nil { target = &t.nodeID }
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
req := dhtReq{
key: t.core.boxPub,
coords: coords,
dest: *target,
}
info.pings++
info.send = time.Now()
t.sendReq(&req, info)
}
func (t *dht) doMaintenance() {
// First clean up reqs
for key, reqs := range t.reqs {
for target, timeout := range reqs {
if time.Since(timeout) > time.Minute { delete(reqs, target) }
}
if len(reqs) == 0 { delete(t.reqs, key) }
}
// Ping the least recently contacted node
// This is to make sure we eventually notice when someone times out
var oldest *dhtInfo
last := 0
for bidx := 0 ; bidx < t.nBuckets() ; bidx++ {
b := t.getBucket(bidx)
if !b.isEmpty() {
last = bidx
toPing := b.nextToPing()
if toPing == nil { continue } // We've recently pinged everyone in b
if oldest == nil || toPing.recv.Before(oldest.recv) {
oldest = toPing
}
}
}
if oldest != nil { t.ping(oldest, nil) } // if the DHT isn't empty
// Refresh buckets
if t.offset > last { t.offset = 0 }
target := t.getTarget(t.offset)
for _, info := range t.lookup(target) {
t.ping(info, target)
break
}
t.offset++
}
func dht_firstCloserThanThird(first *NodeID,
second *NodeID,
third *NodeID) bool {
for idx := 0 ; idx < NodeIDLen ; idx++ {
f := first[idx] ^ second[idx]
t := third[idx] ^ second[idx]
if f == t { continue }
return f < t
}
return false
}

345
src/yggdrasil/peer.go Normal file
View file

@ -0,0 +1,345 @@
package yggdrasil
// TODO cleanup, this file is kind of a mess
// FIXME? this part may be at least sligtly vulnerable to replay attacks
// The switch message part should catch / drop old tstamps
// So the damage is limited
// But you could still mess up msgAnc / msgHops and break some things there
import "time"
import "sync"
import "sync/atomic"
import "math"
//import "fmt"
type peers struct {
core *Core
mutex sync.Mutex // Synchronize writes to atomic
ports atomic.Value //map[Port]*peer, use CoW semantics
//ports map[Port]*peer
}
func (ps *peers) init(c *Core) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c
}
func (ps *peers) getPorts() map[switchPort]*peer {
return ps.ports.Load().(map[switchPort]*peer)
}
func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by tcp
// use get/update methods only! (atomic accessors as float64)
bandwidth uint64
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
box boxPubKey
sig sigPubKey
shared boxSharedKey
//in <-chan []byte
//out chan<- []byte
//in func([]byte)
out func([]byte)
core *Core
port switchPort
msgAnc *msgAnnounce
msgHops []*msgHop
myMsg *switchMessage
mySigs []sigInfo
// This is used to limit how often we perform expensive operations
// Specifically, processing switch messages, signing, and verifying sigs
// Resets at the start of each tick
throttle uint8
}
const peer_Throttle = 1
func (p *peer) getBandwidth() float64 {
bits := atomic.LoadUint64(&p.bandwidth)
return math.Float64frombits(bits)
}
func (p *peer) updateBandwidth(bytes int, duration time.Duration) {
if p == nil { return }
for ok := false ; !ok ; {
oldBits := atomic.LoadUint64(&p.bandwidth)
oldBandwidth := math.Float64frombits(oldBits)
bandwidth := oldBandwidth * 7 / 8 + float64(bytes)/duration.Seconds()
bits := math.Float64bits(bandwidth)
ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
}
}
func (ps *peers) newPeer(box *boxPubKey,
sig *sigPubKey) *peer {
//in <-chan []byte,
//out chan<- []byte) *peer {
p := peer{box: *box,
sig: *sig,
shared: *getSharedKey(&ps.core.boxPriv, box),
//in: in,
//out: out,
core: ps.core}
ps.mutex.Lock()
defer ps.mutex.Unlock()
oldPorts := ps.getPorts()
newPorts := make(map[switchPort]*peer)
for k,v := range oldPorts{ newPorts[k] = v }
for idx := switchPort(0) ; true ; idx++ {
if _, isIn := newPorts[idx]; !isIn {
p.port = switchPort(idx)
newPorts[p.port] = &p
break
}
}
ps.putPorts(newPorts)
return &p
}
func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case packet, ok := <-in:
if !ok { return }
p.handleLinkTraffic(packet)
case <-ticker.C: {
p.throttle = 0
if p.port == 0 { continue } // Don't send announces on selfInterface
// Maybe we shouldn't time out, and instead wait for a kill signal?
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
p.sendSwitchAnnounce()
}
}
}
}
func (p *peer) handlePacket(packet []byte, linkIn (chan<- []byte)) {
pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen==0 { return }
switch (pType) {
case wire_Traffic: p.handleTraffic(packet, pTypeLen)
case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen)
case wire_LinkProtocolTraffic: {
select {
case linkIn<-packet:
default:
}
}
default: /*panic(pType) ;*/ return
}
}
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
ttlBegin := pTypeLen
ttlEnd := pTypeLen+ttlLen
coords, coordLen := wire_decode_coords(packet[ttlEnd:])
coordEnd := ttlEnd+coordLen
if coordEnd == len(packet) { return } // No payload
toPort, newTTL := p.core.switchTable.lookup(coords, ttl)
if toPort == p.port { return } // FIXME? shouldn't happen, does it? would loop
to := p.core.peers.getPorts()[toPort]
if to == nil { return }
newTTLSlice := wire_encode_uint64(newTTL)
// This mutates the packet in-place if the length of the TTL changes!
shift := ttlLen - len(newTTLSlice)
copy(packet[ttlBegin+shift:], newTTLSlice)
copy(packet[shift:], packet[:pTypeLen])
packet = packet[shift:]
to.sendPacket(packet)
}
func (p *peer) sendPacket(packet []byte) {
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
p.out(packet)
}
func (p *peer) sendLinkPacket(packet []byte) {
bs, nonce := boxSeal(&p.shared, packet, nil)
linkPacket := wire_linkProtoTrafficPacket{
toKey: p.box,
fromKey: p.core.boxPub,
nonce: *nonce,
payload: bs,
}
packet = linkPacket.encode()
p.sendPacket(packet)
}
func (p *peer) handleLinkTraffic(bs []byte) {
packet := wire_linkProtoTrafficPacket{}
// TODO throttle on returns?
if !packet.decode(bs) { return }
if packet.toKey != p.core.boxPub { return }
if packet.fromKey != p.box { return }
payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce)
if !isOK { return }
pType, pTypeLen := wire_decode_uint64(payload)
if pTypeLen == 0 { return }
switch pType {
case wire_SwitchAnnounce: p.handleSwitchAnnounce(payload)
case wire_SwitchHopRequest: p.handleSwitchHopRequest(payload)
case wire_SwitchHop: p.handleSwitchHop(payload)
}
}
func (p *peer) handleSwitchAnnounce(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchAnnounce")
anc := msgAnnounce{}
//err := wire_decode_struct(packet, &anc)
//if err != nil { return }
if !anc.decode(packet) { return }
//if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil }
if p.msgAnc == nil ||
anc.root != p.msgAnc.root ||
anc.tstamp != p.msgAnc.tstamp ||
anc.seq != p.msgAnc.seq { p.msgHops = nil }
p.msgAnc = &anc
p.processSwitchMessage()
}
func (p *peer) requestHop(hop uint64) {
//p.core.log.Println("DEBUG requestHop")
req := msgHopReq{}
req.root = p.msgAnc.root
req.tstamp = p.msgAnc.tstamp
req.seq = p.msgAnc.seq
req.hop = hop
packet := req.encode()
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHopRequest(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHopRequest")
if p.throttle > peer_Throttle { return }
if p.myMsg == nil { return }
req := msgHopReq{}
if !req.decode(packet) { return }
if req.root != p.myMsg.locator.root { return }
if req.tstamp != p.myMsg.locator.tstamp { return }
if req.seq != p.myMsg.seq { return }
if uint64(len(p.myMsg.locator.coords)) <= req.hop { return }
res := msgHop{}
res.root = p.myMsg.locator.root
res.tstamp = p.myMsg.locator.tstamp
res.seq = p.myMsg.seq
res.hop = req.hop
res.port = p.myMsg.locator.coords[res.hop]
sinfo := p.getSig(res.hop)
//p.core.log.Println("DEBUG sig:", sinfo)
res.next = sinfo.next
res.sig = sinfo.sig
packet = res.encode()
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHop(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHop")
if p.throttle > peer_Throttle { return }
if p.msgAnc == nil { return }
res := msgHop{}
if !res.decode(packet) { return }
if res.root != p.msgAnc.root { return }
if res.tstamp != p.msgAnc.tstamp { return }
if res.seq != p.msgAnc.seq { return }
if res.hop != uint64(len(p.msgHops)) { return } // always process in order
loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)}
loc.root = res.root
loc.tstamp = res.tstamp
for _, hop := range p.msgHops { loc.coords = append(loc.coords, hop.port) }
loc.coords = append(loc.coords, res.port)
thisHopKey := &res.root
if res.hop != 0 { thisHopKey = &p.msgHops[res.hop-1].next }
bs := getBytesForSig(&res.next, &loc)
if p.core.sigs.check(thisHopKey, &res.sig, bs) {
p.msgHops = append(p.msgHops, &res)
p.processSwitchMessage()
} else {
p.throttle++
}
}
func (p *peer) processSwitchMessage() {
//p.core.log.Println("DEBUG: processSwitchMessage")
if p.throttle > peer_Throttle { return }
if p.msgAnc == nil { return }
if uint64(len(p.msgHops)) < p.msgAnc.len {
p.requestHop(uint64(len(p.msgHops)))
return
}
p.throttle++
if p.msgAnc.len != uint64(len(p.msgHops)) { return }
msg := switchMessage{}
coords := make([]switchPort, 0, len(p.msgHops))
sigs := make([]sigInfo, 0, len(p.msgHops))
for idx, hop := range p.msgHops {
// Consistency checks, should be redundant (already checked these...)
if hop.root != p.msgAnc.root { return }
if hop.tstamp != p.msgAnc.tstamp { return }
if hop.seq != p.msgAnc.seq { return }
if hop.hop != uint64(idx) { return }
coords = append(coords, hop.port)
sigs = append(sigs, sigInfo{next: hop.next, sig: hop.sig})
}
msg.from = p.sig
msg.locator.root = p.msgAnc.root
msg.locator.tstamp = p.msgAnc.tstamp
msg.locator.coords = coords
msg.seq = p.msgAnc.seq
//msg.RSeq = p.msgAnc.RSeq
//msg.Degree = p.msgAnc.Deg
p.core.switchTable.handleMessage(&msg, p.port, sigs)
if len(coords) == 0 { return }
// Reuse locator, set the coords to the peer's coords, to use in dht
msg.locator.coords = coords[:len(coords)-1]
// Pass a mesage to the dht informing it that this peer (still) exists
dinfo := dhtInfo{
key: p.box,
coords: msg.locator.getCoords(),
}
p.core.dht.peers<-&dinfo
}
func (p *peer) sendSwitchAnnounce() {
anc := msgAnnounce{}
anc.root = p.myMsg.locator.root
anc.tstamp = p.myMsg.locator.tstamp
anc.seq = p.myMsg.seq
anc.len = uint64(len(p.myMsg.locator.coords))
//anc.Deg = p.myMsg.Degree
//anc.RSeq = p.myMsg.RSeq
packet := anc.encode()
p.sendLinkPacket(packet)
}
func (p *peer) getSig(hop uint64) sigInfo {
//p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop)
if hop < uint64(len(p.mySigs)) { return p.mySigs[hop] }
bs := getBytesForSig(&p.sig, &p.myMsg.locator)
sig := sigInfo{}
sig.next = p.sig
sig.sig = *sign(&p.core.sigPriv, bs)
p.mySigs = append(p.mySigs, sig)
//p.core.log.Println("DEBUG sig bs:", bs)
return sig
}
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
//bs, err := wire_encode_locator(loc)
//if err != nil { panic(err) }
bs := append([]byte(nil), next[:]...)
bs = append(bs, wire_encode_locator(loc)...)
//bs := wire_encode_locator(loc)
//bs = append(next[:], bs...)
return bs
}

220
src/yggdrasil/router.go Normal file
View file

@ -0,0 +1,220 @@
package yggdrasil
// This part does most of the work to handle packets to/from yourself
// It also manages crypto and dht info
// TODO? move dht stuff into another goroutine?
// Send:
// Receive a packet from the tun
// Look up session (if none exists, trigger a search)
// Hand off to session (which encrypts, etc)
// Session will pass it back to router.out, which hands it off to the self peer
// The self peer triggers a lookup to find which peer to send to next
// And then passes it to that's peer's peer.out function
// The peer.out function sends it over the wire to the matching peer
// Recv:
// A packet comes in off the wire, and goes to a peer.handlePacket
// The peer does a lookup, sees no better peer than the self
// Hands it to the self peer.out, which passes it to router.in
// If it's dht/seach/etc. traffic, the router passes it to that part
// If it's an encapsulated IPv6 packet, the router looks up the session for it
// The packet is passed to the session, which decrypts it, router.recvPacket
// The router then runs some sanity checks before passing it to the tun
import "time"
//import "fmt"
//import "net"
type router struct {
core *Core
addr address
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht)
}
func (r *router) init(core *Core) {
r.core = core
r.addr = *address_addrForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 1) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub)//, out, in)
// TODO set in/out functions on the new peer...
p.out = func(packet []byte) { in<-packet } // FIXME in theory it blocks...
r.in = in
// TODO? make caller responsible for go-ing if it needs to not block
r.out = func(packet []byte) { p.handlePacket(packet, nil) }
// TODO attach these to the tun
// Maybe that's the core's job...
// It creates tun, creates the router, creates channels, sets them?
recv := make(chan []byte, 1)
send := make(chan []byte, 1)
r.recv = recv
r.send = send
r.core.tun.recv = recv
r.core.tun.send = send
r.reset = make(chan struct{}, 1)
go r.mainLoop()
}
func (r *router) mainLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case p := <-r.in: r.handleIn(p)
case p := <-r.send: r.sendPacket(p)
case info := <-r.core.dht.peers: r.core.dht.insert(info) //r.core.dht.insertIfNew(info)
case <-r.reset: r.core.sessions.resetInits()
case <-ticker.C: {
// Any periodic maintenance stuff goes here
r.core.dht.doMaintenance()
util_getBytes() // To slowly drain things
}
}
}
}
func (r *router) sendPacket(bs []byte) {
if len(bs) < 40 { panic("Tried to send a packet shorter than a header...") }
var sourceAddr address
var sourceSubnet subnet
copy(sourceAddr[:], bs[8:])
copy(sourceSubnet[:], bs[8:])
if !sourceAddr.isValid() && !sourceSubnet.isValid() { return }
var dest address
copy(dest[:], bs[24:])
var snet subnet
copy(snet[:], bs[24:])
if !dest.isValid() && !snet.isValid() { return }
doSearch := func (packet []byte) {
var nodeID, mask *NodeID
if dest.isValid() { nodeID, mask = dest.getNodeIDandMask() }
if snet.isValid() { nodeID, mask = snet.getNodeIDandMask() }
sinfo, isIn := r.core.searches.searches[*nodeID]
if !isIn { sinfo = r.core.searches.createSearch(nodeID, mask) }
if packet != nil { sinfo.packet = packet }
r.core.searches.sendSearch(sinfo)
}
var sinfo *sessionInfo
var isIn bool
if dest.isValid() { sinfo, isIn = r.core.sessions.getByTheirAddr(&dest) }
if snet.isValid() { sinfo, isIn = r.core.sessions.getByTheirSubnet(&snet) }
switch {
case !isIn || !sinfo.init:
// No or unintiialized session, so we need to search first
doSearch(bs)
case time.Since(sinfo.time) > 6*time.Second:
// We haven't heard from the dest in a while; they may have changed coords
// Maybe the connection is idle, or maybe one of us changed coords
// Try searching to either ping them (a little overhead) or fix the coords
doSearch(nil)
fallthrough
//default: go func() { sinfo.send<-bs }()
default: sinfo.send<-bs
}
}
func (r *router) recvPacket(bs []byte, theirAddr *address) {
// TODO pass their NodeID, check *that* instead
// Or store their address in the session?...
//fmt.Println("Recv packet")
if theirAddr == nil { panic("Should not happen ever") }
if len(bs) < 24 { return }
var source address
copy(source[:], bs[8:])
var snet subnet
copy(snet[:], bs[8:])
if !source.isValid() && !snet.isValid() { return }
//go func() { r.recv<-bs }()
r.recv<-bs
}
func (r *router) handleIn(packet []byte) {
pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen == 0 { return }
switch pType {
case wire_Traffic: r.handleTraffic(packet)
case wire_ProtocolTraffic: r.handleProto(packet)
default: /*panic("Should not happen in testing") ;*/ return
}
}
func (r *router) handleTraffic(packet []byte) {
defer util_putBytes(packet)
p := wire_trafficPacket{}
if !p.decode(packet) { return }
sinfo, isIn := r.core.sessions.getSessionForHandle(&p.handle)
if !isIn { return }
//go func () { sinfo.recv<-&p }()
sinfo.recv<-&p
}
func (r *router) handleProto(packet []byte) {
// First parse the packet
p := wire_protoTrafficPacket{}
if !p.decode(packet) { return }
// Now try to open the payload
var sharedKey *boxSharedKey
//var theirPermPub *boxPubKey
if p.toKey == r.core.boxPub {
// Try to open using our permanent key
sharedKey = r.core.sessions.getSharedKey(&r.core.boxPriv, &p.fromKey)
} else { return }
bs, isOK := boxOpen(sharedKey, p.payload, &p.nonce)
if !isOK { return }
// Now do something with the bytes in bs...
// send dht messages to dht, sessionRefresh to sessions, data to tun...
// For data, should check that key and IP match...
bsType, bsTypeLen := wire_decode_uint64(bs)
if bsTypeLen == 0 { return }
//fmt.Println("RECV bytes:", bs)
switch bsType {
case wire_SessionPing: r.handlePing(bs, &p.fromKey)
case wire_SessionPong: r.handlePong(bs, &p.fromKey)
case wire_DHTLookupRequest: r.handleDHTReq(bs, &p.fromKey)
case wire_DHTLookupResponse: r.handleDHTRes(bs, &p.fromKey)
case wire_SearchRequest: r.handleSearchReq(bs)
case wire_SearchResponse: r.handleSearchRes(bs)
default: /*panic("Should not happen in testing") ;*/ return
}
}
func (r *router) handlePing(bs []byte, fromKey *boxPubKey) {
ping := sessionPing{}
if !ping.decode(bs) { return }
ping.sendPermPub = *fromKey
r.core.sessions.handlePing(&ping)
}
func (r *router) handlePong(bs []byte, fromKey *boxPubKey) {
r.handlePing(bs, fromKey)
}
func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) {
req := dhtReq{}
if !req.decode(bs) { return }
if req.key != *fromKey { return }
r.core.dht.handleReq(&req)
}
func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) {
res := dhtRes{}
if !res.decode(bs) { return }
if res.key != *fromKey { return }
r.core.dht.handleRes(&res)
}
func (r *router) handleSearchReq(bs []byte) {
req := searchReq{}
if !req.decode(bs) { return }
r.core.searches.handleSearchReq(&req)
}
func (r *router) handleSearchRes(bs []byte) {
res := searchRes{}
if !res.decode(bs) { return }
r.core.searches.handleSearchRes(&res)
}

168
src/yggdrasil/search.go Normal file
View file

@ -0,0 +1,168 @@
package yggdrasil
// This thing manages search packets
// The basic idea is as follows:
// We may know a NodeID (with a mask) and want to connect
// We forward a searchReq packet through the dht
// The last person in the dht will respond with a searchRes
// If the responders nodeID is close enough to the requested key, it matches
// The "close enough" is handled by a bitmask, set when the request is sent
// For testing in the sim, it must match exactly
// For the real world, the mask would need to map it to the desired IPv6
// This is also where we store the temporary keys used to send a request
// Would go in sessions, but can't open one without knowing perm key
// This is largely to avoid using an iterative DHT lookup approach
// The iterative parallel lookups from kad can skip over some DHT blackholes
// This hides bugs, which I don't want to do right now
import "time"
//import "fmt"
type searchInfo struct {
dest *NodeID
mask *NodeID
time time.Time
packet []byte
}
type searches struct {
core *Core
searches map[NodeID]*searchInfo
}
func (s *searches) init(core *Core) {
s.core = core
s.searches = make(map[NodeID]*searchInfo)
}
func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo {
now := time.Now()
for dest, sinfo := range s.searches {
if now.Sub(sinfo.time) > time.Minute {
delete(s.searches, dest)
}
}
info := searchInfo{
dest: dest,
mask: mask,
time: now.Add(-time.Second),
}
s.searches[*dest] = &info
return &info
}
////////////////////////////////////////////////////////////////////////////////
type searchReq struct {
key boxPubKey // Who I am
coords []byte // Where I am
dest NodeID // Who I'm trying to connect to
}
type searchRes struct {
key boxPubKey // Who I am
coords []byte // Where I am
dest NodeID // Who I was asked about
}
func (s *searches) sendSearch(info *searchInfo) {
now := time.Now()
if now.Sub(info.time) < time.Second { return }
loc := s.core.switchTable.getLocator()
coords := loc.getCoords()
req := searchReq{
key: s.core.boxPub,
coords: coords,
dest: *info.dest,
}
info.time = time.Now()
s.handleSearchReq(&req)
}
func (s *searches) handleSearchReq(req *searchReq) {
lookup := s.core.dht.lookup(&req.dest)
sent := false
//fmt.Println("DEBUG len:", len(lookup))
for _, info := range lookup {
//fmt.Println("DEBUG lup:", info.getNodeID())
if dht_firstCloserThanThird(info.getNodeID(),
&req.dest,
&s.core.dht.nodeID) {
s.forwardSearch(req, info)
sent = true
break
}
}
if !sent { s.sendSearchRes(req) }
}
func (s *searches) forwardSearch(req *searchReq, next *dhtInfo) {
//fmt.Println("DEBUG fwd:", req.dest, next.getNodeID())
bs := req.encode()
shared := s.core.sessions.getSharedKey(&s.core.boxPriv, &next.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: next.coords,
toKey: next.key,
fromKey: s.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
s.core.router.out(packet)
}
func (s *searches) sendSearchRes(req *searchReq) {
//fmt.Println("DEBUG res:", req.dest, s.core.dht.nodeID)
loc := s.core.switchTable.getLocator()
coords := loc.getCoords()
res := searchRes{
key: s.core.boxPub,
coords: coords,
dest: req.dest,
}
bs := res.encode()
shared := s.core.sessions.getSharedKey(&s.core.boxPriv, &req.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: req.coords,
toKey: req.key,
fromKey: s.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
s.core.router.out(packet)
}
func (s *searches) handleSearchRes(res *searchRes) {
info, isIn := s.searches[res.dest]
if !isIn { return }
them := getNodeID(&res.key)
var destMasked NodeID
var themMasked NodeID
for idx := 0 ; idx < NodeIDLen ; idx++ {
destMasked[idx] = info.dest[idx] & info.mask[idx]
themMasked[idx] = them[idx] & info.mask[idx]
}
//fmt.Println("DEBUG search res1:", themMasked, destMasked)
//fmt.Println("DEBUG search res2:", *them, *info.dest, *info.mask)
if themMasked != destMasked { return }
// They match, so create a session and send a sessionRequest
sinfo, isIn := s.core.sessions.getByTheirPerm(&res.key)
if !isIn {
sinfo = s.core.sessions.createSession(&res.key)
_, isIn := s.core.sessions.getByTheirPerm(&res.key)
if !isIn { panic("This should never happen") }
}
// FIXME replay attacks could mess with coords?
sinfo.coords = res.coords
sinfo.packet = info.packet
s.core.sessions.ping(sinfo)
// Cleanup
delete(s.searches, res.dest)
}

327
src/yggdrasil/session.go Normal file
View file

@ -0,0 +1,327 @@
package yggdrasil
// This is the session manager
// It's responsible for keeping track of open sessions to other nodes
// The session information consists of crypto keys and coords
import "time"
type sessionInfo struct {
core *Core
theirAddr address
theirSubnet subnet
theirPermPub boxPubKey
theirSesPub boxPubKey
mySesPub boxPubKey
mySesPriv boxPrivKey
sharedSesKey boxSharedKey // derived from session keys
theirHandle handle
myHandle handle
theirNonce boxNonce
myNonce boxNonce
time time.Time // Time we last received a packet
coords []byte // coords of destination
packet []byte // a buffered packet, sent immediately on ping/pong
init bool // Reset if coords change
send chan []byte
recv chan *wire_trafficPacket
nonceMask uint64
tstamp int64 // tstamp from their last session ping, replay attack mitigation
}
// FIXME replay attacks (include nonce or some sequence number)
type sessionPing struct {
sendPermPub boxPubKey // Sender's permanent key
handle handle // Random number to ID session
sendSesPub boxPubKey // Session key to use
coords []byte
tstamp int64 // unix time, but the only real requirement is that it increases
isPong bool
}
// Returns true if the session was updated, false otherwise
func (s *sessionInfo) update(p *sessionPing) bool {
if !(p.tstamp > s.tstamp) { return false }
if p.sendPermPub != s.theirPermPub { return false } // Shouldn't happen
if p.sendSesPub != s.theirSesPub {
// FIXME need to protect against replay attacks
// Put a sequence number or a timestamp or something in the pings?
// Or just return false, make the session time out?
s.theirSesPub = p.sendSesPub
s.theirHandle = p.handle
s.sharedSesKey = *getSharedKey(&s.mySesPriv, &s.theirSesPub)
s.theirNonce = boxNonce{}
s.nonceMask = 0
}
s.coords = append([]byte{}, p.coords...)
s.time = time.Now()
s.tstamp = p.tstamp
s.init = true
return true
}
func (s *sessionInfo) timedout() bool {
return time.Since(s.time) > time.Minute
}
type sessions struct {
core *Core
// Maps known permanent keys to their shared key, used by DHT a lot
permShared map[boxPubKey]*boxSharedKey
// Maps (secret) handle onto session info
sinfos map[handle]*sessionInfo
// Maps mySesPub onto handle
byMySes map[boxPubKey]*handle
// Maps theirPermPub onto handle
byTheirPerm map[boxPubKey]*handle
addrToPerm map[address]*boxPubKey
subnetToPerm map[subnet]*boxPubKey
}
func (ss *sessions) init(core *Core) {
ss.core = core
ss.permShared = make(map[boxPubKey]*boxSharedKey)
ss.sinfos = make(map[handle]*sessionInfo)
ss.byMySes = make(map[boxPubKey]*handle)
ss.byTheirPerm = make(map[boxPubKey]*handle)
ss.addrToPerm = make(map[address]*boxPubKey)
ss.subnetToPerm = make(map[subnet]*boxPubKey)
}
func (ss *sessions) getSessionForHandle(handle *handle) (*sessionInfo, bool) {
sinfo, isIn := ss.sinfos[*handle]
if isIn && sinfo.timedout() {
// We have a session, but it has timed out
return nil, false
}
return sinfo, isIn
}
func (ss *sessions) getByMySes(key *boxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byMySes[*key]
if !isIn { return nil, false }
sinfo, isIn := ss.getSessionForHandle(h)
return sinfo, isIn
}
func (ss *sessions) getByTheirPerm(key *boxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byTheirPerm[*key]
if !isIn { return nil, false }
sinfo, isIn := ss.getSessionForHandle(h)
return sinfo, isIn
}
func (ss *sessions) getByTheirAddr(addr *address) (*sessionInfo, bool) {
p, isIn := ss.addrToPerm[*addr]
if !isIn { return nil, false }
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
func (ss *sessions) getByTheirSubnet(snet *subnet) (*sessionInfo, bool) {
p, isIn := ss.subnetToPerm[*snet]
if !isIn { return nil, false }
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
sinfo := sessionInfo{}
sinfo.core = ss.core
sinfo.theirPermPub = *theirPermKey
pub, priv := newBoxKeys()
sinfo.mySesPub = *pub
sinfo.mySesPriv = *priv
sinfo.myNonce = *newBoxNonce() // TODO make sure nonceIsOK tolerates this
higher := false
for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
higher = true
break
} else if ss.core.boxPub[idx] < sinfo.theirPermPub[idx] {
break
}
}
if higher {
// higher => odd nonce
sinfo.myNonce[len(sinfo.myNonce)-1] |= 0x01
} else {
// lower => even nonce
sinfo.myNonce[len(sinfo.myNonce)-1] &= 0xfe
}
sinfo.myHandle = *newHandle()
sinfo.theirAddr = *address_addrForNodeID(getNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address_subnetForNodeID(getNodeID(&sinfo.theirPermPub))
sinfo.send = make(chan []byte, 1)
sinfo.recv = make(chan *wire_trafficPacket, 1)
go sinfo.doWorker()
sinfo.time = time.Now()
// Do some cleanup
// Time thresholds almost certainly could use some adjusting
for _, s := range ss.sinfos {
if s.timedout() { s.close() }
}
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub
ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub
return &sinfo
}
func (sinfo *sessionInfo) close() {
delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byMySes, sinfo.mySesPub)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr)
delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet)
close(sinfo.send)
close(sinfo.recv)
}
func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing {
loc := ss.core.switchTable.getLocator()
coords := loc.getCoords()
ref := sessionPing{
sendPermPub: ss.core.boxPub,
handle: sinfo.myHandle,
sendSesPub: sinfo.mySesPub,
tstamp: time.Now().Unix(),
coords: coords,
}
sinfo.myNonce.update()
return ref
}
func (ss *sessions) getSharedKey(myPriv *boxPrivKey,
theirPub *boxPubKey) *boxSharedKey {
if skey, isIn := ss.permShared[*theirPub] ; isIn { return skey }
// First do some cleanup
const maxKeys = dht_bucket_number*dht_bucket_size
for key := range ss.permShared {
// Remove a random key until the store is small enough
if len(ss.permShared) < maxKeys { break }
delete(ss.permShared, key)
}
ss.permShared[*theirPub] = getSharedKey(myPriv, theirPub)
return ss.permShared[*theirPub]
}
func (ss *sessions) ping(sinfo *sessionInfo) {
ss.sendPingPong(sinfo, false)
}
func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
ping := ss.getPing(sinfo)
ping.isPong = isPong
bs := ping.encode()
shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: sinfo.coords,
toKey: sinfo.theirPermPub,
fromKey: ss.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
ss.core.router.out(packet)
}
func (ss *sessions) handlePing(ping *sessionPing) {
// Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.sendPermPub)
if !isIn || sinfo.timedout() {
if isIn { sinfo.close() }
ss.createSession(&ping.sendPermPub)
sinfo, isIn = ss.getByTheirPerm(&ping.sendPermPub)
if !isIn { panic("This should not happen") }
}
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ ; return }
if !ping.isPong{ ss.sendPingPong(sinfo, true) }
if sinfo.packet != nil {
// send
var bs []byte
bs, sinfo.packet = sinfo.packet, nil
go func() { sinfo.send<-bs }()
}
}
func (n *boxNonce) minus(m *boxNonce) int64 {
diff := int64(0)
for idx := range n {
diff *= 256
diff += int64(n[idx]) - int64(m[idx])
if diff > 64 { diff = 64 }
if diff < -64 { diff = -64 }
}
return diff
}
func (sinfo *sessionInfo) nonceIsOK(theirNonce *boxNonce) bool {
// The bitmask is to allow for some non-duplicate out-of-order packets
diff := theirNonce.minus(&sinfo.theirNonce)
if diff > 0 { return true }
return ^sinfo.nonceMask & (0x01 << uint64(-diff)) != 0
}
func (sinfo *sessionInfo) updateNonce(theirNonce *boxNonce) {
// Shift nonce mask if needed
// Set bit
diff := theirNonce.minus(&sinfo.theirNonce)
if diff > 0 {
sinfo.nonceMask <<= uint64(diff)
sinfo.nonceMask &= 0x01
} else {
sinfo.nonceMask &= 0x01 << uint64(-diff)
}
sinfo.theirNonce = *theirNonce
}
func (ss *sessions) resetInits() {
for _, sinfo := range ss.sinfos { sinfo.init = false }
}
////////////////////////////////////////////////////////////////////////////////
// This is for a per-session worker
// It handles calling the relatively expensive crypto operations
// It's also responsible for keeping nonces consistent
func (sinfo *sessionInfo) doWorker() {
for {
select {
case p, ok := <-sinfo.recv: if ok { sinfo.doRecv(p) } else { return }
case bs, ok := <-sinfo.send: if ok { sinfo.doSend(bs) } else { return }
}
}
}
func (sinfo *sessionInfo) doSend(bs []byte) {
defer util_putBytes(bs)
if !sinfo.init { return } // To prevent using empty session keys
payload, nonce := boxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce)
defer util_putBytes(payload)
p := wire_trafficPacket{
ttl: ^uint64(0),
coords: sinfo.coords,
handle: sinfo.theirHandle,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
sinfo.core.router.out(packet)
}
func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
defer util_putBytes(p.payload)
if !sinfo.nonceIsOK(&p.nonce) { return }
bs, isOK := boxOpen(&sinfo.sharedSesKey, p.payload, &p.nonce)
if !isOK { util_putBytes(bs) ; return }
sinfo.updateNonce(&p.nonce)
sinfo.time = time.Now()
sinfo.core.router.recvPacket(bs, &sinfo.theirAddr)
}

View file

@ -0,0 +1,58 @@
package yggdrasil
// This is where we record which signatures we've previously checked
// It's so we can avoid needlessly checking them again
import "sync"
import "time"
type sigManager struct {
mutex sync.RWMutex
checked map[sigBytes]knownSig
lastCleaned time.Time
}
type knownSig struct {
bs []byte
time time.Time
}
func (m *sigManager) init() {
m.checked = make(map[sigBytes]knownSig)
}
func (m *sigManager) check(key *sigPubKey, sig *sigBytes, bs []byte) bool {
if m.isChecked(sig, bs) { return true }
verified := verify(key, bs, sig)
if verified { m.putChecked(sig, bs) }
return verified
}
func (m *sigManager) isChecked(sig *sigBytes, bs []byte) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
k, isIn := m.checked[*sig]
if !isIn { return false }
if len(bs) != len(k.bs) { return false }
for idx := 0 ; idx < len(bs) ; idx++ {
if bs[idx] != k.bs[idx] { return false }
}
k.time = time.Now()
return true
}
func (m *sigManager) putChecked(newsig *sigBytes, bs []byte) {
m.mutex.Lock()
defer m.mutex.Unlock()
now := time.Now()
if time.Since(m.lastCleaned) > 60*time.Second {
// Since we have the write lock anyway, do some cleanup
for s, k := range m.checked {
if time.Since(k.time) > 60*time.Second { delete(m.checked, s) }
}
m.lastCleaned = now
}
k := knownSig{bs: bs, time: now}
m.checked[*newsig] = k
}

398
src/yggdrasil/switch.go Normal file
View file

@ -0,0 +1,398 @@
package yggdrasil
// This part constructs a spanning tree of the network
// It routes packets based on distance on the spanning tree
// In general, this is *not* equivalent to routing on the tree
// It falls back to the tree in the worst case, but it can take shortcuts too
// This is the part that makse routing reasonably efficient on scale-free graphs
// TODO document/comment everything in a lot more detail
// TODO? use a pre-computed lookup table (python version had this)
// A little annoying to do with constant changes from bandwidth estimates
import "time"
import "sync"
import "sync/atomic"
//import "fmt"
const switch_timeout = time.Minute
// You should be able to provide crypto signatures for this
// 1 signature per coord, from the *sender* to that coord
// E.g. A->B->C has sigA(A->B) and sigB(A->B->C)
type switchLocator struct {
root sigPubKey
tstamp int64
coords []switchPort
}
func firstIsBetter(first, second *sigPubKey) bool {
// Higher TreeID is better
ftid := getTreeID(first)
stid := getTreeID(second)
for idx := 0 ; idx < len(ftid) ; idx++ {
if ftid[idx] == stid[idx] { continue }
return ftid[idx] > stid[idx]
}
// Edge case, when comparing identical IDs
return false
}
func (l *switchLocator) clone() switchLocator {
// Used to create a deep copy for use in messages
// Copy required because we need to mutate coords before sending
// (By appending the port from us to the destination)
loc := *l
loc.coords = make([]switchPort, len(l.coords), len(l.coords)+1)
copy(loc.coords, l.coords)
return loc
}
func (l *switchLocator) dist(dest []byte) int {
// Returns distance (on the tree) from these coords
offset := 0
fdc := 0
for {
if fdc >= len(l.coords) { break }
coord, length := wire_decode_uint64(dest[offset:])
if length == 0 { break }
if l.coords[fdc] != switchPort(coord) { break }
fdc++
offset += length
}
dist := len(l.coords[fdc:])
for {
_, length := wire_decode_uint64(dest[offset:])
if length == 0 { break }
dist++
offset += length
}
return dist
}
func (l *switchLocator) getCoords() []byte {
bs := make([]byte, 0, len(l.coords))
for _, coord := range l.coords {
c := wire_encode_uint64(uint64(coord))
bs = append(bs, c...)
}
return bs
}
func (x *switchLocator) isAncestorOf(y *switchLocator) bool {
if x.root != y.root { return false }
if len(x.coords) > len(y.coords) { return false }
for idx := range x.coords {
if x.coords[idx] != y.coords[idx] { return false }
}
return true
}
type peerInfo struct {
key sigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree
coords []switchPort // Coords of this peer (taken from coords of the sent locator)
time time.Time // Time this node was last seen
firstSeen time.Time
port switchPort // Interface number of this peer
seq uint64 // Seq number we last saw this peer advertise
}
type switchMessage struct {
from sigPubKey // key of the sender
locator switchLocator // Locator advertised for the receiver, not the sender's loc!
seq uint64
}
type switchPort uint64
type tableElem struct {
locator switchLocator
firstSeen time.Time
}
type lookupTable struct {
self switchLocator
elems map[switchPort]tableElem
}
type switchData struct {
// All data that's mutable and used by exported Table methods
// To be read/written with atomic.Value Store/Load calls
locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map[switchPort]peerInfo
sigs []sigInfo
}
type switchTable struct {
core *Core
key sigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
parent switchPort // Port of whatever peer is our parent, or self if we're root
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
data switchData
updater atomic.Value //*sync.Once
table atomic.Value //lookupTable
}
func (t *switchTable) init(core *Core, key sigPubKey) {
now := time.Now()
t.core = core
t.key = key
locator := switchLocator{root: key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo)
t.data = switchData{locator: locator, peers: peers}
t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{elems: make(map[switchPort]tableElem)})
t.drop = make(map[sigPubKey]int64)
doTicker := func () {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
t.Tick()
}
}
go doTicker()
}
func (t *switchTable) getLocator() switchLocator {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.data.locator.clone()
}
func (t *switchTable) Tick() {
// Periodic maintenance work to keep things internally consistent
t.mutex.Lock() // Write lock
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanPeers()
t.cleanDropped()
}
func (t *switchTable) cleanRoot() {
// TODO rethink how this is done?...
// Get rid of the root if it looks like its timed out
now := time.Now()
doUpdate := false
//fmt.Println("DEBUG clean root:", now.Sub(t.time))
if now.Sub(t.time) > switch_timeout {
//fmt.Println("root timed out", t.data.locator)
dropped := t.data.peers[t.parent]
dropped.time = t.time
t.drop[t.data.locator.root] = t.data.locator.tstamp
doUpdate = true
//t.core.log.Println("DEBUG: switch root timeout", len(t.drop))
}
// Or, if we're better than our root, root ourself
if firstIsBetter(&t.key, &t.data.locator.root) {
//fmt.Println("root is worse than us", t.data.locator.Root)
doUpdate = true
//t.core.log.Println("DEBUG: switch root replace with self", t.data.locator.Root)
}
// Or, if we are the root, possibly update our timestamp
if t.data.locator.root == t.key &&
now.Sub(t.time) > switch_timeout/2 {
//fmt.Println("root is self and old, updating", t.data.locator.Root)
doUpdate = true
}
if doUpdate {
t.parent = switchPort(0)
t.time = now
if t.data.locator.root != t.key {
t.data.seq++
t.updater.Store(&sync.Once{})
select {
case t.core.router.reset<-struct{}{}:
default:
}
}
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t.data.sigs = nil
}
}
func (t *switchTable) cleanPeers() {
now := time.Now()
changed := false
for idx, info := range t.data.peers {
if info.port != switchPort(0) && now.Sub(info.time) > 6*time.Second /*switch_timeout*/ {
//fmt.Println("peer timed out", t.key, info.locator)
delete(t.data.peers, idx)
changed = true
}
}
if changed { t.updater.Store(&sync.Once{}) }
}
func (t *switchTable) cleanDropped() {
// TODO only call this after root changes, not periodically
for root, _ := range t.drop {
if !firstIsBetter(&root, &t.data.locator.root) { delete(t.drop, root) }
}
}
func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) {
t.mutex.RLock()
defer t.mutex.RUnlock()
msg := switchMessage{from: t.key, locator: t.data.locator.clone()}
msg.locator.coords = append(msg.locator.coords, port)
msg.seq = t.data.seq
return &msg, t.data.sigs
}
func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) {
t.mutex.Lock()
defer t.mutex.Unlock()
now := time.Now()
if len(msg.locator.coords) == 0 { return } // Should always have >=1 links
oldSender, isIn := t.data.peers[fromPort]
if !isIn { oldSender.firstSeen = now }
sender := peerInfo{key: msg.from,
locator: msg.locator,
coords: msg.locator.coords[:len(msg.locator.coords)-1],
time: now,
firstSeen: oldSender.firstSeen,
port: fromPort,
seq: msg.seq}
equiv := func (x *switchLocator, y *switchLocator) bool {
if x.root != y.root { return false }
if len(x.coords) != len(y.coords) { return false }
for idx := range x.coords {
if x.coords[idx] != y.coords[idx] { return false }
}
return true
}
doUpdate := false
if !equiv(&msg.locator, &oldSender.locator) {
doUpdate = true
sender.firstSeen = now
}
t.data.peers[fromPort] = sender
updateRoot := false
oldParent, isIn := t.data.peers[t.parent]
noParent := !isIn
noLoop := func () bool {
for idx := 0 ; idx < len(sigs)-1 ; idx++ {
if sigs[idx].next == t.core.sigPub { return false }
}
if msg.locator.root == t.core.sigPub { return false }
return true
}()
sTime := now.Sub(sender.firstSeen)
pTime := oldParent.time.Sub(oldParent.firstSeen) + switch_timeout
// Really want to compare sLen/sTime and pLen/pTime
// Cross multiplied to avoid divide-by-zero
cost := len(msg.locator.coords)*int(pTime.Seconds())
pCost := len(t.data.locator.coords)*int(sTime.Seconds())
dropTstamp, isIn := t.drop[msg.locator.root]
// Here be dragons
switch {
case !noLoop: // do nothing
case isIn && dropTstamp >= msg.locator.tstamp: // do nothing
case firstIsBetter(&msg.locator.root, &t.data.locator.root): updateRoot = true
case t.data.locator.root != msg.locator.root: // do nothing
case t.data.locator.tstamp > msg.locator.tstamp: // do nothing
case noParent: updateRoot = true
case cost < pCost: updateRoot = true
case sender.port == t.parent &&
(msg.locator.tstamp > t.data.locator.tstamp ||
!equiv(&msg.locator, &t.data.locator)): updateRoot = true
}
if updateRoot {
if !equiv(&msg.locator, &t.data.locator) {
doUpdate = true
t.data.seq++
select {
case t.core.router.reset<-struct{}{}:
default:
}
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
}
if t.data.locator.tstamp != msg.locator.tstamp { t.time = now }
t.data.locator = msg.locator
t.parent = sender.port
t.data.sigs = sigs
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
}
if doUpdate { t.updater.Store(&sync.Once{}) }
return
}
func (t *switchTable) updateTable() {
// WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table
// Instead of checking distance for every destination every time
// Array of structs, indexed by first coord that differs from self
// Each struct has stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest
t.mutex.RLock()
defer t.mutex.RUnlock()
newTable := lookupTable{
self: t.data.locator.clone(),
elems: make(map[switchPort]tableElem),
}
for _, pinfo := range t.data.peers {
//if !pinfo.forward { continue }
loc := pinfo.locator.clone()
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems[pinfo.port] = tableElem {
locator: loc,
//degree: pinfo.degree,
firstSeen: pinfo.firstSeen,
//forward: pinfo.forward,
}
}
t.table.Store(newTable)
}
func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
t.updater.Load().(*sync.Once).Do(t.updateTable)
table := t.table.Load().(lookupTable)
ports := t.core.peers.getPorts()
getBandwidth := func (port switchPort) float64 {
var bandwidth float64
if p, isIn := ports[port]; isIn {
bandwidth = p.getBandwidth()
}
return bandwidth
}
var best switchPort
myDist := table.self.dist(dest) //getDist(table.self.coords)
if !(uint64(myDist) < ttl) { return 0, 0 }
// score is in units of bandwidth / distance
bestScore := float64(-1)
for port, info := range table.elems {
if info.locator.root != table.self.root { continue }
dist := info.locator.dist(dest) //getDist(info.locator.coords)
if !(dist < myDist) { continue }
score := getBandwidth(port)
score /= float64(1+dist)
if score > bestScore {
best = port
bestScore = score
}
}
//t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best))
return best, uint64(myDist)
}
////////////////////////////////////////////////////////////////////////////////
//Signature stuff
type sigInfo struct {
next sigPubKey
sig sigBytes
}
////////////////////////////////////////////////////////////////////////////////

246
src/yggdrasil/tcp.go Normal file
View file

@ -0,0 +1,246 @@
package yggdrasil
// This sends packets to peers using TCP as a transport
// It's generally better tested than the UDP implementation
// Using it regularly is insane, but I find TCP easier to test/debug with it
// Updating and optimizing the UDP version is a higher priority
// TODO:
// Something needs to make sure we're getting *valid* packets
// Could be used to DoS (connect, give someone else's keys, spew garbage)
// I guess the "peer" part should watch for link packets, disconnect?
import "net"
import "time"
import "errors"
import "sync"
import "fmt"
const tcp_msgSize = 2048+65535 // TODO figure out what makes sense
type tcpInterface struct {
core *Core
serv *net.TCPListener
mutex sync.Mutex // Protecting the below
calls map[string]struct{}
}
type tcpKeys struct {
box boxPubKey
sig sigPubKey
}
func (iface *tcpInterface) init(core *Core, addr string) {
iface.core = core
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil { panic(err) }
iface.serv, err = net.ListenTCP("tcp", tcpAddr)
if err != nil { panic(err) }
iface.calls = make(map[string]struct{})
go iface.listener()
}
func (iface *tcpInterface) listener() {
defer iface.serv.Close()
iface.core.log.Println("Listening on:", iface.serv.Addr().String())
for {
sock, err := iface.serv.AcceptTCP()
if err != nil { panic(err) }
go iface.handler(sock)
}
}
func (iface *tcpInterface) call(saddr string) {
go func() {
quit := false
iface.mutex.Lock()
if _, isIn := iface.calls[saddr]; isIn {
quit = true
} else {
iface.calls[saddr] = struct{}{}
defer func() {
iface.mutex.Lock()
delete(iface.calls, saddr)
iface.mutex.Unlock()
}()
}
iface.mutex.Unlock()
if !quit {
conn, err := net.DialTimeout("tcp", saddr, 6*time.Second)
if err != nil { return }
sock := conn.(*net.TCPConn)
iface.handler(sock)
}
}()
}
func (iface *tcpInterface) handler(sock *net.TCPConn) {
defer sock.Close()
// Get our keys
keys := []byte{}
keys = append(keys, tcp_key[:]...)
keys = append(keys, iface.core.boxPub[:]...)
keys = append(keys, iface.core.sigPub[:]...)
_, err := sock.Write(keys)
if err != nil { return }
timeout := time.Now().Add(6*time.Second)
sock.SetReadDeadline(timeout)
n, err := sock.Read(keys)
if err != nil { return }
if n < len(keys) { /*panic("Partial key packet?") ;*/ return }
ks := tcpKeys{}
if !tcp_chop_keys(&ks.box, &ks.sig, &keys) { /*panic("Invalid key packet?") ;*/ return }
// Quit the parent call if this is a connection to ourself
equiv := func(k1, k2 []byte) bool {
for idx := range k1 {
if k1[idx] != k2[idx] { return false }
}
return true
}
if equiv(ks.box[:], iface.core.boxPub[:]) { return } // testing
if equiv(ks.sig[:], iface.core.sigPub[:]) { return }
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
linkIn := make(chan []byte, 1)
p := iface.core.peers.newPeer(&ks.box, &ks.sig)//, in, out)
in := func(bs []byte) {
p.handlePacket(bs, linkIn)
}
out := make(chan []byte, 1024) // TODO? what size makes sense
defer close(out)
go func() {
var stack [][]byte
put := func(msg []byte) {
stack = append(stack, msg)
for len(stack) > 1024 {
util_putBytes(stack[0])
stack = stack[1:]
}
}
send := func() {
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
buf := net.Buffers{tcp_msg[:],
wire_encode_uint64(uint64(len(msg))),
msg}
size := 0
for _, bs := range buf { size += len(bs) }
start := time.Now()
buf.WriteTo(sock)
timed := time.Since(start)
pType, _ := wire_decode_uint64(msg)
if pType == wire_LinkProtocolTraffic {
p.updateBandwidth(size, timed)
}
util_putBytes(msg)
}
for msg := range out {
put(msg)
for len(stack) > 0 {
// Keep trying to fill the stack (LIFO order) while sending
select {
case msg, ok := <-out:
if !ok { return }
put(msg)
default: send()
}
}
}
}()
p.out = func(msg []byte) {
defer func() { recover() }()
for {
select {
case out<-msg: return
default: util_putBytes(<-out)
}
}
}
sock.SetNoDelay(true)
go p.linkLoop(linkIn)
defer func() {
// Put all of our cleanup here...
p.core.peers.mutex.Lock()
oldPorts := p.core.peers.getPorts()
newPorts := make(map[switchPort]*peer)
for k,v := range oldPorts{ newPorts[k] = v }
delete(newPorts, p.port)
p.core.peers.putPorts(newPorts)
p.core.peers.mutex.Unlock()
close(linkIn)
}()
them := sock.RemoteAddr()
themNodeID := getNodeID(&ks.box)
themAddr := address_addrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them)
iface.core.log.Println("Connected:", themString)
iface.reader(sock, in) // In this goroutine, because of defers
iface.core.log.Println("Disconnected:", themString)
return
}
func (iface *tcpInterface) reader(sock *net.TCPConn, in func([]byte)) {
bs := make([]byte, 2*tcp_msgSize)
frag := bs[:0]
for {
timeout := time.Now().Add(6*time.Second)
sock.SetReadDeadline(timeout)
n, err := sock.Read(bs[len(frag):])
if err != nil || n == 0 { break }
frag = bs[:len(frag)+n]
for {
msg, ok, err := tcp_chop_msg(&frag)
if err != nil { return }
if !ok { break } // We didn't get the whole message yet
newMsg := append(util_getBytes(), msg...)
in(newMsg)
util_yield()
}
frag = append(bs[:0], frag...)
}
}
////////////////////////////////////////////////////////////////////////////////
// Magic bytes to check
var tcp_key = [...]byte{'k', 'e', 'y', 's'}
var tcp_msg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits"
func tcp_chop_keys(box *boxPubKey, sig *sigPubKey, bs *[]byte) bool {
// This one is pretty simple: we know how long the message should be
// So don't call this with a message that's too short
if len(*bs) < len(tcp_key) + len(*box) + len(*sig) { return false }
for idx := range tcp_key {
if (*bs)[idx] != tcp_key[idx] { return false }
}
(*bs) = (*bs)[len(tcp_key):]
copy(box[:], *bs)
(*bs) = (*bs)[len(box):]
copy(sig[:], *bs)
(*bs) = (*bs)[len(sig):]
return true
}
func tcp_chop_msg(bs *[]byte) ([]byte, bool, error) {
// Returns msg, ok, err
if len(*bs) < len(tcp_msg) { return nil, false, nil }
for idx := range tcp_msg {
if (*bs)[idx] != tcp_msg[idx] {
return nil, false, errors.New("Bad message!")
}
}
msgLen, msgLenLen := wire_decode_uint64((*bs)[len(tcp_msg):])
if msgLen > tcp_msgSize { return nil, false, errors.New("Oversized message!") }
msgBegin := len(tcp_msg) + msgLenLen
msgEnd := msgBegin + int(msgLen)
if msgLenLen == 0 || len(*bs) < msgEnd {
// We don't have the full message
// Need to buffer this and wait for the rest to come in
return nil, false, nil
}
msg := (*bs)[msgBegin:msgEnd]
(*bs) = (*bs)[msgEnd:]
return msg, true, nil
}

56
src/yggdrasil/tun.go Normal file
View file

@ -0,0 +1,56 @@
package yggdrasil
// This manages the tun driver to send/recv packets to/from applications
import water "github.com/songgao/water"
const IPv6_HEADER_LENGTH = 40
type tunDevice struct {
core *Core
send chan<- []byte
recv <-chan []byte
mtu int
iface *water.Interface
}
func (tun *tunDevice) init(core *Core) {
tun.core = core
}
func (tun *tunDevice) setup(addr string, mtu int) error {
iface, err := water.New(water.Config{ DeviceType: water.TUN })
if err != nil { panic(err) }
tun.iface = iface
tun.mtu = mtu //1280 // Lets default to the smallest thing allowed for now
return tun.setupAddress(addr)
}
func (tun *tunDevice) write() error {
for {
data := <-tun.recv
if _, err := tun.iface.Write(data); err != nil { return err }
util_putBytes(data)
}
}
func (tun *tunDevice) read() error {
buf := make([]byte, tun.mtu)
for {
n, err := tun.iface.Read(buf)
if err != nil { return err }
if buf[0] & 0xf0 != 0x60 ||
n != 256*int(buf[4]) + int(buf[5]) + IPv6_HEADER_LENGTH {
// Either not an IPv6 packet or not the complete packet for some reason
panic("Should not happen in testing")
continue
}
packet := append(util_getBytes(), buf[:n]...)
tun.send<-packet
}
}
func (tun *tunDevice) close() error {
return tun.iface.Close()
}

View file

@ -0,0 +1,36 @@
package yggdrasil
// The linux platform specific tun parts
// It depends on iproute2 being installed to set things on the tun device
import "fmt"
import "os/exec"
import "strings"
func (tun *tunDevice) setupAddress(addr string) error {
// Set address
cmd := exec.Command("ip", "-f", "inet6",
"addr", "add", addr,
"dev", tun.iface.Name())
tun.core.log.Printf("ip command: %v", strings.Join(cmd.Args, " "))
output, err := cmd.CombinedOutput()
if err != nil {
tun.core.log.Printf("Linux ip failed: %v.", err)
tun.core.log.Println(string(output))
return err
}
// Set MTU and bring device up
cmd = exec.Command("ip", "link", "set",
"dev", tun.iface.Name(),
"mtu", fmt.Sprintf("%d", tun.mtu),
"up")
tun.core.log.Printf("ip command: %v", strings.Join(cmd.Args, " "))
output, err = cmd.CombinedOutput()
if err != nil {
tun.core.log.Printf("Linux ip failed: %v.", err)
tun.core.log.Println(string(output))
return err
}
return nil
}

View file

@ -0,0 +1,12 @@
// +build !linux
package yggdrasil
// This is to catch unsupported platforms
// If your platform supports tun devices, you could try configuring it manually
func (tun *tunDevice) setupAddress(addr string) error {
tun.core.log.Println("Platform not supported, you must set the address of", tun.iface.Name(), "to", addr)
return nil
}

275
src/yggdrasil/udp.go Normal file
View file

@ -0,0 +1,275 @@
package yggdrasil
// This communicates with peers via UDP
// It's not as well tested or debugged as the TCP transport
// It's intended to use UDP, so debugging/optimzing this is a high priority
// TODO? use golang.org/x/net/ipv6.PacketConn's ReadBatch and WriteBatch?
// To send all chunks of a message / recv all available chunks in one syscall
// Chunks are currently murged, but outgoing messages aren't chunked
// This is just to support chunking in the future, if it's needed and debugged
// Basically, right now we might send UDP packets that are too large
import "net"
import "time"
import "sync"
import "fmt"
type udpInterface struct {
core *Core
sock *net.UDPConn // Or more general PacketConn?
mutex sync.RWMutex // each conn has an owner goroutine
conns map[connAddr]*connInfo
}
type connAddr string // TODO something more efficient, but still a valid map key
type connInfo struct {
addr connAddr
peer *peer
linkIn chan []byte
keysIn chan *udpKeys
timeout int // count of how many heartbeats have been missed
in func([]byte)
out chan []byte
countIn uint8
countOut uint8
}
type udpKeys struct {
box boxPubKey
sig sigPubKey
}
func (iface *udpInterface) init(core *Core, addr string) {
iface.core = core
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil { panic(err) }
iface.sock, err = net.ListenUDP("udp", udpAddr)
if err != nil { panic(err) }
iface.conns = make(map[connAddr]*connInfo)
go iface.reader()
}
func (iface *udpInterface) sendKeys(addr connAddr) {
udpAddr, err := net.ResolveUDPAddr("udp", string(addr))
if err != nil { panic(err) }
msg := []byte{}
msg = udp_encode(msg, 0, 0, 0, nil)
msg = append(msg, iface.core.boxPub[:]...)
msg = append(msg, iface.core.sigPub[:]...)
iface.sock.WriteToUDP(msg, udpAddr)
}
func udp_isKeys(msg []byte) bool {
keyLen := 3 + boxPubKeyLen + sigPubKeyLen
return len(msg) == keyLen && msg[0] == 0x00
}
func (iface *udpInterface) startConn(info *connInfo) {
ticker := time.NewTicker(6*time.Second)
defer ticker.Stop()
defer func () {
// Cleanup
// FIXME this still leaks a peer struct
iface.mutex.Lock()
delete(iface.conns, info.addr)
iface.mutex.Unlock()
iface.core.peers.mutex.Lock()
oldPorts := iface.core.peers.getPorts()
newPorts := make(map[switchPort]*peer)
for k,v := range oldPorts{ newPorts[k] = v }
delete(newPorts, info.peer.port)
iface.core.peers.putPorts(newPorts)
iface.core.peers.mutex.Unlock()
close(info.linkIn)
close(info.keysIn)
close(info.out)
iface.core.log.Println("Removing peer:", info.addr)
}()
for {
select {
case ks := <-info.keysIn: {
// FIXME? need signatures/sequence-numbers or something
// Spoofers could lock out a peer with fake/bad keys
if ks.box == info.peer.box && ks.sig == info.peer.sig {
info.timeout = 0
}
}
case <-ticker.C: {
if info.timeout > 10 { return }
info.timeout++
iface.sendKeys(info.addr)
}
}
}
}
func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
//defer util_putBytes(msg)
var ks udpKeys
_, _, _, bs := udp_decode(msg)
switch {
case !wire_chop_slice(ks.box[:], &bs): return
case !wire_chop_slice(ks.sig[:], &bs): return
}
if ks.box == iface.core.boxPub { return }
if ks.sig == iface.core.sigPub { return }
iface.mutex.RLock()
conn, isIn := iface.conns[addr]
iface.mutex.RUnlock() // TODO? keep the lock longer?...
if !isIn {
udpAddr, err := net.ResolveUDPAddr("udp", string(addr))
if err != nil { panic(err) }
conn = &connInfo{
addr: connAddr(addr),
peer: iface.core.peers.newPeer(&ks.box, &ks.sig),
linkIn: make(chan []byte, 1),
keysIn: make(chan *udpKeys, 1),
out: make(chan []byte, 1024),
}
/*
conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) }
conn.peer.out = func (msg []byte) {
start := time.Now()
iface.sock.WriteToUDP(msg, udpAddr)
timed := time.Since(start)
conn.peer.updateBandwidth(len(msg), timed)
util_putBytes(msg)
} // Old version, always one syscall per packet
//*/
/*
conn.peer.out = func (msg []byte) {
defer func() { recover() }()
select {
case conn.out<-msg:
default: util_putBytes(msg)
}
}
go func () {
for msg := range conn.out {
start := time.Now()
iface.sock.WriteToUDP(msg, udpAddr)
timed := time.Since(start)
conn.peer.updateBandwidth(len(msg), timed)
util_putBytes(msg)
}
}()
//*/
//*
var inChunks uint8
var inBuf []byte
conn.in = func(bs []byte) {
//defer util_putBytes(bs)
chunks, chunk, count, payload := udp_decode(bs)
//iface.core.log.Println("DEBUG:", addr, chunks, chunk, count, len(payload))
//iface.core.log.Println("DEBUG: payload:", payload)
if count != conn.countIn {
inChunks = 0
inBuf = inBuf[:0]
conn.countIn = count
}
if chunk <= chunks && chunk == inChunks + 1 {
//iface.core.log.Println("GOING:", addr, chunks, chunk, count, len(payload))
inChunks += 1
inBuf = append(inBuf, payload...)
if chunks != chunk { return }
msg := append(util_getBytes(), inBuf...)
conn.peer.handlePacket(msg, conn.linkIn)
//iface.core.log.Println("DONE:", addr, chunks, chunk, count, len(payload))
}
}
conn.peer.out = func (msg []byte) {
defer func() { recover() }()
select {
case conn.out<-msg:
default: util_putBytes(msg)
}
}
go func () {
//var chunks [][]byte
var out []byte
for msg := range conn.out {
var chunks [][]byte
bs := msg
for len(bs) > udp_chunkSize {
chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:]
}
chunks = append(chunks, bs)
//iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg))
if len(chunks) > 255 { continue }
start := time.Now()
for idx,bs := range chunks {
nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut
out = udp_encode(out[:0], nChunks, nChunk, count, bs)
//iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs))
iface.sock.WriteToUDP(out, udpAddr)
}
timed := time.Since(start)
conn.countOut += 1
conn.peer.updateBandwidth(len(msg), timed)
util_putBytes(msg)
}
}()
//*/
iface.mutex.Lock()
iface.conns[addr] = conn
iface.mutex.Unlock()
themNodeID := getNodeID(&ks.box)
themAddr := address_addrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, addr)
iface.core.log.Println("Adding peer:", themString)
go iface.startConn(conn)
go conn.peer.linkLoop(conn.linkIn)
iface.sendKeys(conn.addr)
}
func() {
defer func() { recover() }()
select {
case conn.keysIn<-&ks:
default:
}
}()
}
func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) {
iface.mutex.RLock()
if conn, isIn := iface.conns[addr]; isIn {
conn.in(msg)
}
iface.mutex.RUnlock()
}
func (iface *udpInterface) reader() {
bs := make([]byte, 2048) // This needs to be large enough for everything...
for {
//iface.core.log.Println("Starting read")
n, udpAddr, err := iface.sock.ReadFromUDP(bs)
//iface.core.log.Println("Read", n, udpAddr.String(), err)
if err != nil { panic(err) ; break }
if n > 1500 { panic(n) }
//msg := append(util_getBytes(), bs[:n]...)
msg := bs[:n]
addr := connAddr(udpAddr.String())
if udp_isKeys(msg) {
iface.handleKeys(msg, addr)
} else {
iface.handlePacket(msg, addr)
}
}
}
////////////////////////////////////////////////////////////////////////////////
const udp_chunkSize = 65535
func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
if len(bs) >= 3 {
chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:]
}
return
}
func udp_encode(out []byte, chunks, chunk, count uint8, payload []byte) []byte {
return append(append(out, chunks, chunk, count), payload...)
}

79
src/yggdrasil/util.go Normal file
View file

@ -0,0 +1,79 @@
package yggdrasil
// These are misc. utility functions that didn't really fit anywhere else
import "fmt"
import "runtime"
//import "sync"
func Util_testAddrIDMask() {
for idx := 0 ; idx < 16 ; idx++ {
var orig NodeID
orig[8] = 42
for bidx := 0 ; bidx < idx ; bidx++ {
orig[bidx/8] |= (0x80 >> uint8(bidx % 8))
}
addr := address_addrForNodeID(&orig)
nid, mask := addr.getNodeIDandMask()
for b := 0 ; b < len(mask) ; b++ {
nid[b] &= mask[b]
orig[b] &= mask[b]
}
if *nid != orig {
fmt.Println(orig)
fmt.Println(*addr)
fmt.Println(*nid)
fmt.Println(*mask)
panic(idx)
}
}
}
func util_yield() {
runtime.Gosched()
}
func util_lockthread() {
runtime.LockOSThread()
}
func util_unlockthread() {
runtime.UnlockOSThread()
}
/*
var byteStore sync.Pool = sync.Pool{
New: func () interface{} { return []byte(nil) },
}
func util_getBytes() []byte {
return byteStore.Get().([]byte)[:0]
}
func util_putBytes(bs []byte) {
byteStore.Put(bs) // FIXME? The cast to interface{} allocates...
}
*/
var byteStore chan []byte
func util_initByteStore() {
if byteStore == nil {
byteStore = make(chan []byte, 32)
}
}
func util_getBytes() []byte {
select {
case bs := <-byteStore: return bs[:0]
default: return nil
}
}
func util_putBytes(bs []byte) {
select {
case byteStore<-bs:
default:
}
}

492
src/yggdrasil/wire.go Normal file
View file

@ -0,0 +1,492 @@
package yggdrasil
// Wire formatting tools
// These are all ugly and probably not very secure
// Packet types, as an Encode_uint64 at the start of each packet
// TODO? make things still work after reordering (after things stabilize more?)
// Type safety would also be nice, `type wire_type uint64`, rewrite as needed?
const (
wire_Traffic = iota // data being routed somewhere, handle for crypto
wire_ProtocolTraffic // protocol traffic, pub keys for crypto
wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto
wire_SwitchAnnounce // TODO put inside protocol traffic header
wire_SwitchHopRequest // TODO put inside protocol traffic header
wire_SwitchHop // TODO put inside protocol traffic header
wire_SessionPing // inside protocol traffic header
wire_SessionPong // inside protocol traffic header
wire_DHTLookupRequest // inside protocol traffic header
wire_DHTLookupResponse // inside protocol traffic header
wire_SearchRequest // inside protocol traffic header
wire_SearchResponse // inside protocol traffic header
//wire_Keys // udp key packet (boxPub, sigPub)
)
// Encode uint64 using a variable length scheme
// Similar to binary.Uvarint, but big-endian
func wire_encode_uint64(elem uint64) []byte {
return wire_put_uint64(elem, nil)
}
// Occasionally useful for appending to an existing slice (if there's room)
func wire_put_uint64(elem uint64, out []byte) []byte {
bs := make([]byte, 0, 10)
bs = append(bs, byte(elem & 0x7f))
for e := elem >> 7 ; e > 0 ; e >>= 7 {
bs = append(bs, byte(e | 0x80))
}
// Now reverse bytes, because we set them in the wrong order
// TODO just put them in the right place the first time...
last := len(bs)-1
for idx := 0 ; idx < len(bs)/2 ; idx++ {
bs[idx], bs[last-idx] = bs[last-idx], bs[idx]
}
return append(out, bs...)
}
// Decode uint64 from a []byte slice
// Returns the decoded uint64 and the number of bytes used
func wire_decode_uint64(bs []byte) (uint64, int) {
length := 0
elem := uint64(0)
for _, b := range bs {
elem <<= 7
elem |= uint64(b & 0x7f)
length++
if b & 0x80 == 0 { break }
}
return elem, length
}
func wire_intToUint(i int64) uint64 {
var u uint64
if i < 0 {
u = uint64(-i) << 1
u |= 0x01 // sign bit
} else {
u = uint64(i) << 1
}
return u
}
func wire_intFromUint(u uint64) int64 {
var i int64
i = int64(u >> 1)
if u & 0x01 != 0 { i *= -1 }
return i
}
////////////////////////////////////////////////////////////////////////////////
// Takes coords, returns coords prefixed with encoded coord length
func wire_encode_coords(coords []byte) ([]byte) {
coordLen := wire_encode_uint64(uint64(len(coords)))
bs := make([]byte, 0, len(coordLen)+len(coords))
bs = append(bs, coordLen...)
bs = append(bs, coords...)
return bs
}
func wire_put_coords(coords []byte, bs []byte) ([]byte) {
bs = wire_put_uint64(uint64(len(coords)), bs)
bs = append(bs, coords...)
return bs
}
// Takes a packet that begins with coords (starting with coord length)
// Returns a slice of coords and the number of bytes read
func wire_decode_coords(packet []byte) ([]byte, int) {
coordLen, coordBegin := wire_decode_uint64(packet)
coordEnd := coordBegin+int(coordLen)
//if coordBegin == 0 { panic("No coords found") } // Testing
//if coordEnd > len(packet) { panic("Packet too short") } // Testing
if coordBegin == 0 || coordEnd > len(packet) { return nil, 0 }
return packet[coordBegin:coordEnd], coordEnd
}
////////////////////////////////////////////////////////////////////////////////
// TODO move this msg stuff somewhere else, use encode() and decode() methods
// Announces that we can send parts of a Message with a particular seq
type msgAnnounce struct {
root sigPubKey
tstamp int64
seq uint64
len uint64
//Deg uint64
//RSeq uint64
}
func (m *msgAnnounce) encode() []byte {
bs := wire_encode_uint64(wire_SwitchAnnounce)
bs = append(bs, m.root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...)
bs = append(bs, wire_encode_uint64(m.seq)...)
bs = append(bs, wire_encode_uint64(m.len)...)
//bs = append(bs, wire_encode_uint64(m.Deg)...)
//bs = append(bs, wire_encode_uint64(m.RSeq)...)
return bs
}
func (m *msgAnnounce) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SwitchAnnounce: return false
case !wire_chop_slice(m.root[:], &bs): return false
case !wire_chop_uint64(&tstamp, &bs): return false
case !wire_chop_uint64(&m.seq, &bs): return false
case !wire_chop_uint64(&m.len, &bs): return false
//case !wire_chop_uint64(&m.Deg, &bs): return false
//case !wire_chop_uint64(&m.RSeq, &bs): return false
}
m.tstamp = wire_intFromUint(tstamp)
return true
}
type msgHopReq struct {
root sigPubKey
tstamp int64
seq uint64
hop uint64
}
func (m *msgHopReq) encode() []byte {
bs := wire_encode_uint64(wire_SwitchHopRequest)
bs = append(bs, m.root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...)
bs = append(bs, wire_encode_uint64(m.seq)...)
bs = append(bs, wire_encode_uint64(m.hop)...)
return bs
}
func (m *msgHopReq) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SwitchHopRequest: return false
case !wire_chop_slice(m.root[:], &bs): return false
case !wire_chop_uint64(&tstamp, &bs): return false
case !wire_chop_uint64(&m.seq, &bs): return false
case !wire_chop_uint64(&m.hop, &bs): return false
}
m.tstamp = wire_intFromUint(tstamp)
return true
}
type msgHop struct {
root sigPubKey
tstamp int64
seq uint64
hop uint64
port switchPort
next sigPubKey
sig sigBytes
}
func (m *msgHop) encode() []byte {
bs := wire_encode_uint64(wire_SwitchHop)
bs = append(bs, m.root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...)
bs = append(bs, wire_encode_uint64(m.seq)...)
bs = append(bs, wire_encode_uint64(m.hop)...)
bs = append(bs, wire_encode_uint64(uint64(m.port))...)
bs = append(bs, m.next[:]...)
bs = append(bs, m.sig[:]...)
return bs
}
func (m *msgHop) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SwitchHop: return false
case !wire_chop_slice(m.root[:], &bs): return false
case !wire_chop_uint64(&tstamp, &bs): return false
case !wire_chop_uint64(&m.seq, &bs): return false
case !wire_chop_uint64(&m.hop, &bs): return false
case !wire_chop_uint64((*uint64)(&m.port), &bs): return false
case !wire_chop_slice(m.next[:], &bs): return false
case !wire_chop_slice(m.sig[:], &bs): return false
}
m.tstamp = wire_intFromUint(tstamp)
return true
}
// Format used to check signatures only, so no need to also support decoding
func wire_encode_locator(loc *switchLocator) []byte {
coords := wire_encode_coords(loc.getCoords())
var bs []byte
bs = append(bs, loc.root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...)
bs = append(bs, coords...)
return bs
}
func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool {
if len(*fromSlice) < len(toSlice) { return false }
copy(toSlice, *fromSlice)
*fromSlice = (*fromSlice)[len(toSlice):]
return true
}
func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool {
coords, coordLen := wire_decode_coords(*fromSlice)
if coordLen == 0 { return false }
*toCoords = append((*toCoords)[:0], coords...)
*fromSlice = (*fromSlice)[coordLen:]
return true
}
func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool {
dec, decLen := wire_decode_uint64(*fromSlice)
if decLen == 0 { return false }
*toUInt64 = dec
*fromSlice = (*fromSlice)[decLen:]
return true
}
////////////////////////////////////////////////////////////////////////////////
// Wire traffic packets
type wire_trafficPacket struct {
ttl uint64 // TODO? hide this as a wire format detail, not set by user
coords []byte
handle handle
nonce boxNonce
payload []byte
}
// This is basically MarshalBinary, but decode doesn't allow that...
func (p *wire_trafficPacket) encode() []byte {
bs := util_getBytes()
bs = wire_put_uint64(wire_Traffic, bs)
bs = wire_put_uint64(p.ttl, bs)
bs = wire_put_coords(p.coords, bs)
bs = append(bs, p.handle[:]...)
bs = append(bs, p.nonce[:]...)
bs = append(bs, p.payload...)
return bs
}
// Not just UnmarshalBinary becuase the original slice isn't always copied from
func (p *wire_trafficPacket) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_Traffic: return false
case !wire_chop_uint64(&p.ttl, &bs): return false
case !wire_chop_coords(&p.coords, &bs): return false
case !wire_chop_slice(p.handle[:], &bs): return false
case !wire_chop_slice(p.nonce[:], &bs): return false
}
p.payload = append(util_getBytes(), bs...)
return true
}
type wire_protoTrafficPacket struct {
ttl uint64 // TODO? hide this as a wire format detail, not set by user
coords []byte
toKey boxPubKey
fromKey boxPubKey
nonce boxNonce
payload []byte
}
func (p *wire_protoTrafficPacket) encode() []byte {
coords := wire_encode_coords(p.coords)
bs := wire_encode_uint64(wire_ProtocolTraffic)
bs = append(bs, wire_encode_uint64(p.ttl)...)
bs = append(bs, coords...)
bs = append(bs, p.toKey[:]...)
bs = append(bs, p.fromKey[:]...)
bs = append(bs, p.nonce[:]...)
bs = append(bs, p.payload...)
return bs
}
func(p *wire_protoTrafficPacket) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_ProtocolTraffic: return false
case !wire_chop_uint64(&p.ttl, &bs): return false
case !wire_chop_coords(&p.coords, &bs): return false
case !wire_chop_slice(p.toKey[:], &bs): return false
case !wire_chop_slice(p.fromKey[:], &bs): return false
case !wire_chop_slice(p.nonce[:], &bs): return false
}
p.payload = bs
return true
}
type wire_linkProtoTrafficPacket struct {
toKey boxPubKey
fromKey boxPubKey
nonce boxNonce
payload []byte
}
func (p *wire_linkProtoTrafficPacket) encode() []byte {
bs := wire_encode_uint64(wire_LinkProtocolTraffic)
bs = append(bs, p.toKey[:]...)
bs = append(bs, p.fromKey[:]...)
bs = append(bs, p.nonce[:]...)
bs = append(bs, p.payload...)
return bs
}
func(p *wire_linkProtoTrafficPacket) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_LinkProtocolTraffic: return false
case !wire_chop_slice(p.toKey[:], &bs): return false
case !wire_chop_slice(p.fromKey[:], &bs): return false
case !wire_chop_slice(p.nonce[:], &bs): return false
}
p.payload = bs
return true
}
////////////////////////////////////////////////////////////////////////////////
func (p *sessionPing) encode() []byte {
var pTypeVal uint64
if p.isPong {
pTypeVal = wire_SessionPong
} else {
pTypeVal = wire_SessionPing
}
bs := wire_encode_uint64(pTypeVal)
//p.sendPermPub used in top level (crypto), so skipped here
bs = append(bs, p.handle[:]...)
bs = append(bs, p.sendSesPub[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(p.tstamp))...)
coords := wire_encode_coords(p.coords)
bs = append(bs, coords...)
return bs
}
func (p *sessionPing) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SessionPing && pType != wire_SessionPong: return false
//p.sendPermPub used in top level (crypto), so skipped here
case !wire_chop_slice(p.handle[:], &bs): return false
case !wire_chop_slice(p.sendSesPub[:], &bs): return false
case !wire_chop_uint64(&tstamp, &bs): return false
case !wire_chop_coords(&p.coords, &bs): return false
}
p.tstamp = wire_intFromUint(tstamp)
if pType == wire_SessionPong { p.isPong = true }
return true
}
////////////////////////////////////////////////////////////////////////////////
func (r *dhtReq) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupRequest)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
return bs
}
func (r *dhtReq) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_DHTLookupRequest: return false
case !wire_chop_slice(r.key[:], &bs): return false
case !wire_chop_coords(&r.coords, &bs): return false
case !wire_chop_slice(r.dest[:], &bs): return false
default: return true
}
}
func (r *dhtRes) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupResponse)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
for _, info := range r.infos {
coords = wire_encode_coords(info.coords)
bs = append(bs, info.key[:]...)
bs = append(bs, coords...)
}
return bs
}
func (r *dhtRes) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_DHTLookupResponse: return false
case !wire_chop_slice(r.key[:], &bs): return false
case !wire_chop_coords(&r.coords, &bs): return false
case !wire_chop_slice(r.dest[:], &bs): return false
}
for len(bs) > 0 {
info := dhtInfo{}
switch {
case !wire_chop_slice(info.key[:], &bs): return false
case !wire_chop_coords(&info.coords, &bs): return false
}
r.infos = append(r.infos, &info)
}
return true
}
////////////////////////////////////////////////////////////////////////////////
func (r *searchReq) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_SearchRequest)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
return bs
}
func (r *searchReq) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SearchRequest: return false
case !wire_chop_slice(r.key[:], &bs): return false
case !wire_chop_coords(&r.coords, &bs): return false
case !wire_chop_slice(r.dest[:], &bs): return false
default: return true
}
}
func (r *searchRes) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_SearchResponse)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
return bs
}
func (r *searchRes) decode(bs []byte) bool {
var pType uint64
switch {
case !wire_chop_uint64(&pType, &bs): return false
case pType != wire_SearchResponse: return false
case !wire_chop_slice(r.key[:], &bs): return false
case !wire_chop_coords(&r.coords, &bs): return false
case !wire_chop_slice(r.dest[:], &bs): return false
default: return true
}
}