Merge pull request #17 from yggdrasil-network/develop

Branch Develop: Base to Fork
This commit is contained in:
Christer Warén 2019-03-03 17:37:38 +02:00 committed by GitHub
commit 50987142a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 494 additions and 100 deletions

View file

@ -3,7 +3,7 @@
# Check https://circleci.com/docs/2.0/language-go/ for more details
version: 2
jobs:
build:
build-linux:
docker:
- image: circleci/golang:1.11
@ -43,17 +43,97 @@ jobs:
sudo alien --to-rpm yggdrasil*.deb --scripts --keep-version && mv *.rpm /tmp/upload/;
mv *.deb /tmp/upload/
- run:
name: Build for EdgeRouter
command: |
rm -f {yggdrasil,yggdrasilctl}
git clone https://github.com/neilalexander/vyatta-yggdrasil /tmp/vyatta-yggdrasil;
cd /tmp/vyatta-yggdrasil;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-x $CIRCLE_BRANCH;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-lite $CIRCLE_BRANCH;
mv *.deb /tmp/upload;
- persist_to_workspace:
root: /tmp
paths:
- upload
build-macos:
macos:
xcode: "10.0.0"
working_directory: ~/go/src/github.com/yggdrasil-network/yggdrasil-go
steps:
- checkout
- run:
name: Create artifact upload directory and set variables
command: |
mkdir /tmp/upload
echo 'export CINAME=$(sh contrib/semver/name.sh)' >> $BASH_ENV
echo 'export CIVERSION=$(sh contrib/semver/version.sh --bare)' >> $BASH_ENV
echo 'export PATH=$PATH:/usr/local/go/bin:~/go/bin' >> $BASH_ENV
git config --global user.email "$(git log --format='%ae' HEAD -1)";
git config --global user.name "$(git log --format='%an' HEAD -1)";
echo -e "Host *\n\tStrictHostKeyChecking no\n" >> ~/.ssh/config
- run:
name: Install Go 1.11
command: |
cd /tmp
curl -LO https://dl.google.com/go/go1.11.5.darwin-amd64.pkg
sudo installer -pkg /tmp/go1.11.5.darwin-amd64.pkg -target /
- run:
name: Install Gomobile
command: |
GO111MODULE=off go get golang.org/x/mobile/cmd/gomobile
gomobile init
- run:
name: Build for macOS
command: |
rm -f {yggdrasil,yggdrasilctl}
GOOS=darwin GOARCH=amd64 ./build && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-darwin-amd64 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-darwin-amd64;
GO111MODULE=on GOOS=darwin GOARCH=amd64 ./build
mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-darwin-amd64
mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-darwin-amd64;
- run:
name: Build for macOS (.pkg format)
command: |
rm -rf {yggdrasil,yggdrasilctl}
GOOS=darwin GOARCH=amd64 ./build && PKGARCH=amd64 sh contrib/macos/create-pkg.sh && mv *.pkg /tmp/upload/
PKGARCH=amd64 sh contrib/macos/create-pkg.sh
mv *.pkg /tmp/upload/
#- run:
# name: Build framework for iOS (.framework format)
# command: |
# sudo GO111MODULE=off go get -v github.com/yggdrasil-network/yggdrasil-go/cmd/...
# sudo GO111MODULE=off go get -v github.com/yggdrasil-network/yggdrasil-go/src/...
# GO111MODULE=off ./build -i
# mv *.framework /tmp/upload
- persist_to_workspace:
root: /tmp
paths:
- upload
build-other:
docker:
- image: circleci/golang:1.11
steps:
- checkout
- run:
name: Create artifact upload directory and set variables
command: |
mkdir /tmp/upload
echo 'export CINAME=$(sh contrib/semver/name.sh)' >> $BASH_ENV
echo 'export CIVERSION=$(sh contrib/semver/version.sh --bare)' >> $BASH_ENV
git config --global user.email "$(git log --format='%ae' HEAD -1)";
git config --global user.name "$(git log --format='%an' HEAD -1)";
- run:
name: Build for OpenBSD
@ -83,16 +163,31 @@ jobs:
GOOS=windows GOARCH=amd64 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-amd64.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-amd64.exe;
GOOS=windows GOARCH=386 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-i386.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-i386.exe;
- run:
name: Build for EdgeRouter
command: |
rm -f {yggdrasil,yggdrasilctl}
git clone https://github.com/neilalexander/vyatta-yggdrasil /tmp/vyatta-yggdrasil;
cd /tmp/vyatta-yggdrasil;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-x $CIRCLE_BRANCH;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-lite $CIRCLE_BRANCH;
mv *.deb /tmp/upload;
- persist_to_workspace:
root: /tmp
paths:
- upload
upload:
machine: true
steps:
- attach_workspace:
at: /tmp
- store_artifacts:
path: /tmp/upload
destination: /
workflows:
version: 2
build-all:
jobs:
- build-linux
- build-macos
- build-other
- upload:
requires:
- build-linux
- build-macos
- build-other

View file

@ -25,6 +25,27 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- in case of vulnerabilities.
-->
## [0.3.3] - 2018-02-18
### Added
- Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported)
- Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available
- Connection contexts used for TCP connections which allow more exotic socket options to be set, e.g.
- Reusing the multicast socket to allow multiple running Yggdrasil instances without having to disable multicast
- Allowing supported Macs to peer with other nearby Macs that aren't even on the same Wi-Fi network using AWDL
- Flexible logging support, which allows for logging at different levels of verbosity
### Changed
- Switch changes to improve parent selection
- Node configuration is now stored centrally, rather than having fragments/copies distributed at startup time
- Significant refactoring in various areas, including for link types (TCP, AWDL etc), generic streams and adapters
- macOS builds through CircleCI are now 64-bit only
### Fixed
- Simplified `systemd` service now in `contrib`
### Removed
- `ReadTimeout` option is now deprecated
## [0.3.2] - 2018-12-26
### Added
- The admin socket is now multithreaded, greatly improving performance of the crawler and allowing concurrent lookups to take place

View file

@ -0,0 +1,71 @@
#!/bin/sh
CONFFILE="/etc/yggdrasil.conf"
genconf() {
/usr/bin/yggdrasil -genconf > "$1"
return $?
}
probetun() {
modprobe tun
return $?
}
start() {
if [ ! -f "$CONFFILE" ]; then
printf 'Generating configuration file: '
if genconf "$CONFFILE"; then
echo "OK"
else
echo "FAIL"
return 1
fi
fi
if [ ! -e /dev/net/tun ]; then
printf 'Inserting TUN module: '
if probetun; then
echo "OK"
else
echo "FAIL"
return 1
fi
fi
printf 'Starting yggdrasil: '
if start-stop-daemon -S -b -x /usr/bin/yggdrasil \
-- --useconf < "$CONFFILE"; then
echo "OK"
else
echo "FAIL"
fi
}
stop() {
printf "Stopping yggdrasil: "
if start-stop-daemon -K -q -x /usr/bin/yggdrasil; then
echo "OK"
else
echo "FAIL"
fi
}
restart() {
stop
start
}
reload() {
restart
}
case "$1" in
start|stop|restart|reload)
"$1";;
*)
echo "Usage: $0 {start|stop|restart}"
exit 1
esac
exit 0

View file

@ -14,6 +14,7 @@ ExecStartPre=/bin/sh -ec "if ! test -s /etc/yggdrasil.conf; \
echo 'WARNING: A new /etc/yggdrasil.conf file has been generated.'; \
fi"
ExecStart=/usr/bin/yggdrasil -useconffile /etc/yggdrasil.conf
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
[Install]

33
misc/run-twolink-test Executable file
View file

@ -0,0 +1,33 @@
#!/bin/bash
# Connects nodes in two namespaces by two links with different bandwidth (10mbit and 100mbit)
ip netns add node1
ip netns add node2
ip link add veth11 type veth peer name veth21
ip link set veth11 netns node1 up
ip link set veth21 netns node2 up
ip link add veth12 type veth peer name veth22
ip link set veth12 netns node1 up
ip link set veth22 netns node2 up
ip netns exec node1 tc qdisc add dev veth11 root tbf rate 10mbit burst 8192 latency 1ms
ip netns exec node2 tc qdisc add dev veth21 root tbf rate 10mbit burst 8192 latency 1ms
ip netns exec node1 tc qdisc add dev veth12 root tbf rate 100mbit burst 8192 latency 1ms
ip netns exec node2 tc qdisc add dev veth22 root tbf rate 100mbit burst 8192 latency 1ms
echo '{AdminListen: "unix://node1.sock"}' | ip netns exec node1 env PPROFLISTEN=localhost:6060 ./yggdrasil -logging "info,warn,error,debug" -useconf &> node1.log &
echo '{AdminListen: "unix://node2.sock"}' | ip netns exec node2 env PPROFLISTEN=localhost:6060 ./yggdrasil -logging "info,warn,error,debug" -useconf &> node2.log &
echo "Started, to continue you should (possibly w/ sudo):"
echo "kill" $(jobs -p)
wait
ip netns delete node1
ip netns delete node2
ip link delete veth11
ip link delete veth12

View file

@ -6,13 +6,15 @@ import "os"
import "strings"
import "strconv"
import "time"
import "log"
import "runtime"
import "runtime/pprof"
import "flag"
import "github.com/gologme/log"
import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
import . "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
////////////////////////////////////////////////////////////////////////////////

View file

@ -56,3 +56,23 @@ func TimerStop(t *time.Timer) bool {
}
return true
}
// Run a blocking function with a timeout.
// Returns true if the function returns.
// Returns false if the timer fires.
// The blocked function remains blocked--the caller is responsible for somehow killing it.
func FuncTimeout(f func(), timeout time.Duration) bool {
success := make(chan struct{})
go func() {
defer close(success)
f()
}()
timer := time.NewTimer(timeout)
defer TimerStop(timer)
select {
case <-success:
return true
case <-timer.C:
return false
}
}

View file

@ -125,11 +125,15 @@ func (c *Core) addPeerLoop() {
// UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.log.Infoln("Reloading configuration...")
c.configMutex.Lock()
c.configOld = c.config
c.config = *config
c.configMutex.Unlock()
errors := 0
components := []chan chan error{
c.admin.reconfigure,
c.searches.reconfigure,
@ -148,9 +152,16 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
response := make(chan error)
component <- response
if err := <-response; err != nil {
c.log.Println(err)
c.log.Errorln(err)
errors++
}
}
if errors > 0 {
c.log.Warnln(errors, "modules reported errors during configuration reload")
} else {
c.log.Infoln("Configuration reloaded successfully")
}
}
// GetBuildName gets the current build name. This is usually injected if built

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
@ -91,11 +92,16 @@ func (intf *linkInterface) handler() error {
meta.link = *myLinkPub
metaBytes := meta.encode()
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
err := intf.msgIO._sendMetaBytes(metaBytes)
var err error
if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) {
return errors.New("timeout on metadata send")
}
if err != nil {
return err
}
metaBytes, err = intf.msgIO._recvMetaBytes()
if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) {
return errors.New("timeout on metadata recv")
}
if err != nil {
return err
}
@ -109,8 +115,8 @@ func (intf *linkInterface) handler() error {
return errors.New("failed to connect: wrong version")
}
// Check if we're authorized to connect to this key / IP
if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Warnf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
intf.msgIO.close()
return nil
@ -162,8 +168,6 @@ func (intf *linkInterface) handler() error {
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
intf.link.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
defer intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start the link loop
go intf.peer.linkLoop()
// Start the writer
@ -216,6 +220,8 @@ func (intf *linkInterface) handler() error {
case signalReady <- struct{}{}:
default:
}
//intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
}
}()
@ -223,30 +229,39 @@ func (intf *linkInterface) handler() error {
// Used to enable/disable activity in the switch
signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
defer close(signalAlive)
ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved
go func() {
var isAlive bool
var isReady bool
var sendTimerRunning bool
var recvTimerRunning bool
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
sendTime := time.Second
sendTimer := time.NewTimer(sendTime)
defer util.TimerStop(sendTimer)
recvTimer := time.NewTimer(recvTime)
defer util.TimerStop(recvTimer)
closeTimer := time.NewTimer(closeTime)
defer util.TimerStop(closeTimer)
for {
//intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local,
// isAlive, isReady, sendTimerRunning, recvTimerRunning)
select {
case gotMsg, ok := <-signalAlive:
if !ok {
return
}
if !isAlive {
util.TimerStop(closeTimer)
closeTimer.Reset(closeTime)
util.TimerStop(recvTimer)
recvTimerRunning = false
isAlive = true
if !isReady {
// (Re-)enable in the switch
isReady = true
intf.link.core.switchTable.idleIn <- intf.peer.port
}
isReady = true
}
if gotMsg && !sendTimerRunning {
// We got a message
@ -255,6 +270,10 @@ func (intf *linkInterface) handler() error {
sendTimer.Reset(sendTime)
sendTimerRunning = true
}
if !gotMsg {
intf.link.core.log.Debugf("Received ack from %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
case sentMsg, ok := <-signalSent:
// Stop any running ack timer
if !ok {
@ -273,12 +292,13 @@ func (intf *linkInterface) handler() error {
if !ok {
return
}
if !isAlive || !isReady {
if !isAlive {
// Disable in the switch
isReady = false
} else {
// Keep enabled in the switch
intf.link.core.switchTable.idleIn <- intf.peer.port
isReady = true
}
case <-sendTimer.C:
// We haven't sent anything, so signal a send of a 0 packet to let them know we're alive
@ -289,6 +309,14 @@ func (intf *linkInterface) handler() error {
case <-recvTimer.C:
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
isAlive = false
case <-closeTimer.C:
// We haven't received anything in a really long time, so things have died at the switch level and then some...
// Just close the connection at this point...
select {
case ret <- errors.New("timeout"):
default:
}
intf.msgIO.close()
}
}
}()
@ -299,7 +327,13 @@ func (intf *linkInterface) handler() error {
intf.peer.handlePacket(msg)
}
if err != nil {
return err
if err != io.EOF {
select {
case ret <- err:
default:
}
}
break
}
select {
case signalAlive <- len(msg) > 0:
@ -307,5 +341,15 @@ func (intf *linkInterface) handler() error {
}
}
////////////////////////////////////////////////////////////////////////////////
return nil
// Remember to set `err` to something useful before returning
select {
case err = <-ret:
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
default:
err = nil
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
return err
}

View file

@ -50,7 +50,7 @@ func (m *multicast) start() error {
}
listenString := fmt.Sprintf("[::]:%v", addr.Port)
lc := net.ListenConfig{
Control: multicastReuse,
Control: m.multicastReuse,
}
conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
if err != nil {
@ -61,6 +61,7 @@ func (m *multicast) start() error {
// Windows can't set this flag, so we need to handle it in other ways
}
m.multicastWake()
go m.listen()
go m.announce()
}

View file

@ -2,10 +2,32 @@
package yggdrasil
/*
#cgo CFLAGS: -x objective-c
#cgo LDFLAGS: -framework Foundation
#import <Foundation/Foundation.h>
void WakeUpAWDL() {
NSNetServiceBrowser *serviceBrowser;
serviceBrowser = [[NSNetServiceBrowser alloc] init];
serviceBrowser.includesPeerToPeer = YES;
[serviceBrowser searchForServicesOfType:@"_yggdrasil._tcp" inDomain:@""];
}
*/
import "C"
import "syscall"
import "golang.org/x/sys/unix"
func multicastReuse(network string, address string, c syscall.RawConn) error {
func (m *multicast) multicastWake() {
for _, intf := range m.interfaces() {
if intf.Name == "awdl0" {
m.core.log.Infoln("Multicast discovery is waking up AWDL")
C.WakeUpAWDL()
}
}
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error
var reuseport error
var recvanyif error

View file

@ -4,6 +4,10 @@ package yggdrasil
import "syscall"
func multicastReuse(network string, address string, c syscall.RawConn) error {
func (m *multicast) multicastWake() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
return nil
}

View file

@ -5,7 +5,11 @@ package yggdrasil
import "syscall"
import "golang.org/x/sys/unix"
func multicastReuse(network string, address string, c syscall.RawConn) error {
func (m *multicast) multicastWake() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error
var reuseport error

View file

@ -5,7 +5,11 @@ package yggdrasil
import "syscall"
import "golang.org/x/sys/windows"
func multicastReuse(network string, address string, c syscall.RawConn) error {
func (m *multicast) multicastWake() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error
var reuseaddr error

View file

@ -66,19 +66,36 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this...
in := make(chan []byte, 1) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
p.out = func(packet []byte) {
// This is to make very sure it never blocks
select {
case in <- packet:
return
default:
util.PutBytes(packet)
}
}
p.out = func(packet []byte) { in <- packet }
r.in = in
r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block
out := make(chan []byte, 32)
go func() {
for packet := range out {
p.handlePacket(packet)
}
}()
out2 := make(chan []byte, 32)
go func() {
// This worker makes sure r.out never blocks
// It will buffer traffic long enough for the switch worker to take it
// If (somehow) you can send faster than the switch can receive, then this would use unbounded memory
// But crypto slows sends enough that the switch should always be able to take the packets...
var buf [][]byte
for {
buf = append(buf, <-out2)
for len(buf) > 0 {
select {
case bs := <-out2:
buf = append(buf, bs)
case out <- buf[0]:
buf = buf[1:]
}
}
}
}()
r.out = func(packet []byte) { out2 <- packet }
r.toRecv = make(chan router_recvPacket, 32)
recv := make(chan []byte, 32)
send := make(chan []byte, 32)
@ -305,7 +322,6 @@ func (r *router) sendPacket(bs []byte) {
// Don't continue - drop the packet
return
}
sinfo.send <- bs
}
}

View file

@ -525,17 +525,36 @@ func (ss *sessions) resetInits() {
// It handles calling the relatively expensive crypto operations.
// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK.
func (sinfo *sessionInfo) doWorker() {
send := make(chan []byte, 32)
defer close(send)
go func() {
for bs := range send {
sinfo.doSend(bs)
}
}()
recv := make(chan *wire_trafficPacket, 32)
defer close(recv)
go func() {
for p := range recv {
sinfo.doRecv(p)
}
}()
for {
select {
case p, ok := <-sinfo.recv:
if ok {
sinfo.doRecv(p)
select {
case recv <- p:
default:
// We need something to not block, and it's best to drop it before we decrypt
util.PutBytes(p.Payload)
}
} else {
return
}
case bs, ok := <-sinfo.send:
if ok {
sinfo.doSend(bs)
send <- bs
} else {
return
}
@ -625,8 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
select {
case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}:
default: // avoid deadlocks, maybe do this somewhere else?...
}
sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}
}

View file

@ -176,6 +176,7 @@ type switchTable struct {
admin chan func() // Pass a lambda for the admin socket to query stuff
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
queueTotalMaxSize uint64 // Maximum combined size of queues
toRouter chan []byte // Packets to be sent to the router
}
// Minimum allowed total size of switch queues.
@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) {
t.idleIn = make(chan switchPort, 1024)
t.admin = make(chan func())
t.queueTotalMaxSize = SwitchQueueTotalMinSize
t.toRouter = make(chan []byte, 1)
}
// Safely gets a copy of this node's locator.
@ -215,7 +217,6 @@ func (t *switchTable) doMaintenance() {
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanDropped()
t.cleanPeers()
}
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
@ -272,28 +273,6 @@ func (t *switchTable) forgetPeer(port switchPort) {
}
}
// Clean all unresponsive peers from the table, needed in case a peer stops updating.
// Needed in case a non-parent peer keeps the connection open but stops sending updates.
// Also reclaims space from deleted peers by copying the map.
func (t *switchTable) cleanPeers() {
now := time.Now()
for port, peer := range t.data.peers {
if now.Sub(peer.time) > switch_timeout+switch_throttle {
// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
delete(t.data.peers, port)
go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
}
}
if _, isIn := t.data.peers[t.parent]; !isIn {
// The root timestamp would probably time out before this happens, but better safe than sorry.
// We removed the current parent, so find a new one.
t.parent = 0
for _, peer := range t.data.peers {
t.unlockedHandleMsg(&peer.msg, peer.port, true)
}
}
}
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup.
@ -570,23 +549,23 @@ func (t *switchTable) start() error {
return nil
}
// Check if a packet should go to the self node
// This means there's no node closer to the destination than us
// This is mainly used to identify packets addressed to us, or that hit a blackhole
func (t *switchTable) selfIsClosest(dest []byte) bool {
// Return a map of ports onto distance, keeping only ports closer to the destination than this node
// If the map is empty (or nil), then no peer is closer
func (t *switchTable) getCloser(dest []byte) map[switchPort]int {
table := t.getTable()
myDist := table.self.dist(dest)
if myDist == 0 {
// Skip the iteration step if it's impossible to be closer
return true
return nil
}
closer := make(map[switchPort]int, len(table.elems))
for _, info := range table.elems {
dist := info.locator.dist(dest)
if dist < myDist {
return false
closer[info.port] = dist
}
}
return true
return closer
}
// Returns true if the peer is closer to the destination than ourself
@ -639,26 +618,48 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
coords := switch_getPacketCoords(packet)
ports := t.core.peers.getPorts()
if t.selfIsClosest(coords) {
closer := t.getCloser(coords)
if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer?
ports[0].sendPacket(packet)
t.toRouter <- packet
return true
}
table := t.getTable()
myDist := table.self.dist(coords)
var best *peer
bestDist := myDist
for port := range idle {
if to := ports[port]; to != nil {
if info, isIn := table.elems[to.port]; isIn {
dist := info.locator.dist(coords)
if !(dist < bestDist) {
continue
var bestDist int
var bestCoordLen int
ports := t.core.peers.getPorts()
for port, dist := range closer {
to := ports[port]
_, isIdle := idle[port]
coordLen := len(table.elems[port].locator.coords)
var update bool
switch {
case to == nil:
//nothing
case !isIdle:
//nothing
case best == nil:
update = true
case dist < bestDist:
update = true
case dist > bestDist:
//nothing
case coordLen < bestCoordLen:
update = true
/*
case coordLen > bestCoordLen:
//nothing
case port < best.port:
update = true
*/
default:
//nothing
}
if update {
best = to
bestDist = dist
}
bestCoordLen = coordLen
}
}
if best != nil {
@ -697,7 +698,7 @@ func (b *switch_buffers) cleanup(t *switchTable) {
// Remove queues for which we have no next hop
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
if t.selfIsClosest(coords) {
if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets {
util.PutBytes(packet.bytes)
}
@ -776,10 +777,38 @@ func (t *switchTable) handleIdle(port switchPort) bool {
// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
sendingToRouter := make(chan []byte, 1)
go func() {
// Keep sending packets to the router
self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter {
self.sendPacket(bs)
}
}()
go func() {
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
var buf [][]byte
for {
buf = append(buf, <-t.toRouter)
for len(buf) > 0 {
select {
case bs := <-t.toRouter:
buf = append(buf, bs)
for len(buf) > 32 {
util.PutBytes(buf[0])
buf = buf[1:]
}
case sendingToRouter <- buf[0]:
buf = buf[1:]
}
}
}
}()
t.queues.switchTable = t
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
idle := make(map[switchPort]struct{}) // this is to deduplicate things
for {
//t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
select {
case bytes := <-t.packetIn:
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)