mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-28 22:25:07 +03:00
WIP moving IP-specific checks from tuntap to core
This commit is contained in:
parent
bb66851c2b
commit
1147ee1934
9 changed files with 339 additions and 220 deletions
|
@ -46,7 +46,7 @@ type Session struct {
|
|||
|
||||
func (c *Core) GetSelf() Self {
|
||||
var self Self
|
||||
s := c.PacketConn.PacketConn.Debug.GetSelf()
|
||||
s := c.pc.PacketConn.Debug.GetSelf()
|
||||
self.Key = s.Key
|
||||
self.Root = s.Root
|
||||
self.Coords = s.Coords
|
||||
|
@ -55,7 +55,7 @@ func (c *Core) GetSelf() Self {
|
|||
|
||||
func (c *Core) GetPeers() []Peer {
|
||||
var peers []Peer
|
||||
ps := c.PacketConn.PacketConn.Debug.GetPeers()
|
||||
ps := c.pc.PacketConn.Debug.GetPeers()
|
||||
for _, p := range ps {
|
||||
var info Peer
|
||||
info.Key = p.Key
|
||||
|
@ -69,7 +69,7 @@ func (c *Core) GetPeers() []Peer {
|
|||
|
||||
func (c *Core) GetDHT() []DHTEntry {
|
||||
var dhts []DHTEntry
|
||||
ds := c.PacketConn.PacketConn.Debug.GetDHT()
|
||||
ds := c.pc.PacketConn.Debug.GetDHT()
|
||||
for _, d := range ds {
|
||||
var info DHTEntry
|
||||
info.Key = d.Key
|
||||
|
@ -82,7 +82,7 @@ func (c *Core) GetDHT() []DHTEntry {
|
|||
|
||||
func (c *Core) GetPaths() []PathEntry {
|
||||
var paths []PathEntry
|
||||
ps := c.PacketConn.PacketConn.Debug.GetPaths()
|
||||
ps := c.pc.PacketConn.Debug.GetPaths()
|
||||
for _, p := range ps {
|
||||
var info PathEntry
|
||||
info.Key = p.Key
|
||||
|
@ -94,7 +94,7 @@ func (c *Core) GetPaths() []PathEntry {
|
|||
|
||||
func (c *Core) GetSessions() []Session {
|
||||
var sessions []Session
|
||||
ss := c.PacketConn.Debug.GetSessions()
|
||||
ss := c.pc.Debug.GetSessions()
|
||||
for _, s := range ss {
|
||||
var info Session
|
||||
info.Key = s.Key
|
||||
|
|
|
@ -25,11 +25,13 @@ type Core struct {
|
|||
// We're going to keep our own copy of the provided config - that way we can
|
||||
// guarantee that it will be covered by the mutex
|
||||
phony.Inbox
|
||||
*iw.PacketConn
|
||||
pc *iw.PacketConn
|
||||
config *config.NodeConfig // Config
|
||||
secret ed25519.PrivateKey
|
||||
public ed25519.PublicKey
|
||||
links links
|
||||
proto protoHandler
|
||||
store keyStore
|
||||
log *log.Logger
|
||||
addPeerTimer *time.Timer
|
||||
ctx context.Context
|
||||
|
@ -59,8 +61,10 @@ func (c *Core) _init() error {
|
|||
c.public = c.secret.Public().(ed25519.PublicKey)
|
||||
// TODO check public against current.PublicKey, error if they don't match
|
||||
|
||||
c.PacketConn, err = iw.NewPacketConn(c.secret)
|
||||
c.pc, err = iw.NewPacketConn(c.secret)
|
||||
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
|
||||
c.store.init(c)
|
||||
c.proto.init(c)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -160,7 +164,7 @@ func (c *Core) Stop() {
|
|||
func (c *Core) _stop() {
|
||||
c.log.Infoln("Stopping...")
|
||||
c.ctxCancel()
|
||||
c.PacketConn.Close()
|
||||
c.pc.Close()
|
||||
if c.addPeerTimer != nil {
|
||||
c.addPeerTimer.Stop()
|
||||
c.addPeerTimer = nil
|
||||
|
@ -173,3 +177,20 @@ func (c *Core) _stop() {
|
|||
*/
|
||||
c.log.Infoln("Stopped")
|
||||
}
|
||||
|
||||
// Implement io.ReadWriteCloser
|
||||
|
||||
func (c *Core) Read(p []byte) (n int, err error) {
|
||||
n, err = c.store.readPC(p)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Core) Write(p []byte) (n int, err error) {
|
||||
n, err = c.store.writePC(p)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Core) Close() error {
|
||||
c.Stop()
|
||||
return nil
|
||||
}
|
||||
|
|
295
src/core/keystore.go
Normal file
295
src/core/keystore.go
Normal file
|
@ -0,0 +1,295 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
iwt "github.com/Arceliar/ironwood/types"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
)
|
||||
|
||||
const keyStoreTimeout = 2 * time.Minute
|
||||
|
||||
type keyArray [ed25519.PublicKeySize]byte
|
||||
|
||||
type keyStore struct {
|
||||
core *Core
|
||||
address address.Address
|
||||
subnet address.Subnet
|
||||
mutex sync.Mutex
|
||||
keyToInfo map[keyArray]*keyInfo
|
||||
addrToInfo map[address.Address]*keyInfo
|
||||
addrBuffer map[address.Address]*buffer
|
||||
subnetToInfo map[address.Subnet]*keyInfo
|
||||
subnetBuffer map[address.Subnet]*buffer
|
||||
buf []byte // scratch space to prefix with typeSessionTraffic before sending
|
||||
}
|
||||
|
||||
type keyInfo struct {
|
||||
key keyArray
|
||||
address address.Address
|
||||
subnet address.Subnet
|
||||
timeout *time.Timer // From calling a time.AfterFunc to do cleanup
|
||||
}
|
||||
|
||||
type buffer struct {
|
||||
packets [][]byte
|
||||
timeout *time.Timer
|
||||
}
|
||||
|
||||
func (k *keyStore) init(core *Core) {
|
||||
k.core = core
|
||||
k.address = *address.AddrForKey(k.core.public)
|
||||
k.subnet = *address.SubnetForKey(k.core.public)
|
||||
k.core.pc.SetOutOfBandHandler(k.oobHandler)
|
||||
k.keyToInfo = make(map[keyArray]*keyInfo)
|
||||
k.addrToInfo = make(map[address.Address]*keyInfo)
|
||||
k.addrBuffer = make(map[address.Address]*buffer)
|
||||
k.subnetToInfo = make(map[address.Subnet]*keyInfo)
|
||||
k.subnetBuffer = make(map[address.Subnet]*buffer)
|
||||
}
|
||||
|
||||
func (k *keyStore) sendToAddress(addr address.Address, bs []byte) {
|
||||
k.mutex.Lock()
|
||||
if info := k.addrToInfo[addr]; info != nil {
|
||||
k.resetTimeout(info)
|
||||
k.mutex.Unlock()
|
||||
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
|
||||
} else {
|
||||
var buf *buffer
|
||||
if buf = k.addrBuffer[addr]; buf == nil {
|
||||
buf = new(buffer)
|
||||
k.addrBuffer[addr] = buf
|
||||
}
|
||||
msg := append([]byte(nil), bs...)
|
||||
buf.packets = append(buf.packets, msg)
|
||||
if buf.timeout != nil {
|
||||
buf.timeout.Stop()
|
||||
}
|
||||
buf.timeout = time.AfterFunc(keyStoreTimeout, func() {
|
||||
k.mutex.Lock()
|
||||
defer k.mutex.Unlock()
|
||||
if nbuf := k.addrBuffer[addr]; nbuf == buf {
|
||||
delete(k.addrBuffer, addr)
|
||||
}
|
||||
})
|
||||
k.mutex.Unlock()
|
||||
k.sendKeyLookup(addr.GetKey())
|
||||
}
|
||||
}
|
||||
|
||||
func (k *keyStore) sendToSubnet(subnet address.Subnet, bs []byte) {
|
||||
k.mutex.Lock()
|
||||
if info := k.subnetToInfo[subnet]; info != nil {
|
||||
k.resetTimeout(info)
|
||||
k.mutex.Unlock()
|
||||
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
|
||||
} else {
|
||||
var buf *buffer
|
||||
if buf = k.subnetBuffer[subnet]; buf == nil {
|
||||
buf = new(buffer)
|
||||
k.subnetBuffer[subnet] = buf
|
||||
}
|
||||
msg := append([]byte(nil), bs...)
|
||||
buf.packets = append(buf.packets, msg)
|
||||
if buf.timeout != nil {
|
||||
buf.timeout.Stop()
|
||||
}
|
||||
buf.timeout = time.AfterFunc(keyStoreTimeout, func() {
|
||||
k.mutex.Lock()
|
||||
defer k.mutex.Unlock()
|
||||
if nbuf := k.subnetBuffer[subnet]; nbuf == buf {
|
||||
delete(k.subnetBuffer, subnet)
|
||||
}
|
||||
})
|
||||
k.mutex.Unlock()
|
||||
k.sendKeyLookup(subnet.GetKey())
|
||||
}
|
||||
}
|
||||
|
||||
func (k *keyStore) update(key ed25519.PublicKey) *keyInfo {
|
||||
k.mutex.Lock()
|
||||
var kArray keyArray
|
||||
copy(kArray[:], key)
|
||||
var info *keyInfo
|
||||
if info = k.keyToInfo[kArray]; info == nil {
|
||||
info = new(keyInfo)
|
||||
info.key = kArray
|
||||
info.address = *address.AddrForKey(ed25519.PublicKey(info.key[:]))
|
||||
info.subnet = *address.SubnetForKey(ed25519.PublicKey(info.key[:]))
|
||||
k.keyToInfo[info.key] = info
|
||||
k.addrToInfo[info.address] = info
|
||||
k.subnetToInfo[info.subnet] = info
|
||||
k.resetTimeout(info)
|
||||
k.mutex.Unlock()
|
||||
if buf := k.addrBuffer[info.address]; buf != nil {
|
||||
for _, bs := range buf.packets {
|
||||
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
|
||||
}
|
||||
delete(k.addrBuffer, info.address)
|
||||
}
|
||||
if buf := k.subnetBuffer[info.subnet]; buf != nil {
|
||||
for _, bs := range buf.packets {
|
||||
_, _ = k.core.pc.WriteTo(bs, iwt.Addr(info.key[:]))
|
||||
}
|
||||
delete(k.subnetBuffer, info.subnet)
|
||||
}
|
||||
} else {
|
||||
k.resetTimeout(info)
|
||||
k.mutex.Unlock()
|
||||
}
|
||||
return info
|
||||
}
|
||||
|
||||
func (k *keyStore) resetTimeout(info *keyInfo) {
|
||||
if info.timeout != nil {
|
||||
info.timeout.Stop()
|
||||
}
|
||||
info.timeout = time.AfterFunc(keyStoreTimeout, func() {
|
||||
k.mutex.Lock()
|
||||
defer k.mutex.Unlock()
|
||||
if nfo := k.keyToInfo[info.key]; nfo == info {
|
||||
delete(k.keyToInfo, info.key)
|
||||
}
|
||||
if nfo := k.addrToInfo[info.address]; nfo == info {
|
||||
delete(k.addrToInfo, info.address)
|
||||
}
|
||||
if nfo := k.subnetToInfo[info.subnet]; nfo == info {
|
||||
delete(k.subnetToInfo, info.subnet)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (k *keyStore) oobHandler(fromKey, toKey ed25519.PublicKey, data []byte) {
|
||||
if len(data) != 1+ed25519.SignatureSize {
|
||||
return
|
||||
}
|
||||
sig := data[1:]
|
||||
switch data[0] {
|
||||
case typeKeyLookup:
|
||||
snet := *address.SubnetForKey(toKey)
|
||||
if snet == k.subnet && ed25519.Verify(fromKey, toKey[:], sig) {
|
||||
// This is looking for at least our subnet (possibly our address)
|
||||
// Send a response
|
||||
k.sendKeyResponse(fromKey)
|
||||
}
|
||||
case typeKeyResponse:
|
||||
// TODO keep a list of something to match against...
|
||||
// Ignore the response if it doesn't match anything of interest...
|
||||
if ed25519.Verify(fromKey, toKey[:], sig) {
|
||||
k.update(fromKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *keyStore) sendKeyLookup(partial ed25519.PublicKey) {
|
||||
sig := ed25519.Sign(k.core.secret, partial[:])
|
||||
bs := append([]byte{typeKeyLookup}, sig...)
|
||||
_ = k.core.pc.SendOutOfBand(partial, bs)
|
||||
}
|
||||
|
||||
func (k *keyStore) sendKeyResponse(dest ed25519.PublicKey) {
|
||||
sig := ed25519.Sign(k.core.secret, dest[:])
|
||||
bs := append([]byte{typeKeyResponse}, sig...)
|
||||
_ = k.core.pc.SendOutOfBand(dest, bs)
|
||||
}
|
||||
|
||||
func (k *keyStore) maxSessionMTU() uint64 {
|
||||
const sessionTypeOverhead = 1
|
||||
return k.core.pc.MTU() - sessionTypeOverhead
|
||||
}
|
||||
|
||||
func (k *keyStore) readPC(p []byte) (int, error) {
|
||||
for {
|
||||
bs := p
|
||||
n, from, err := k.core.pc.ReadFrom(bs)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
if n == 0 {
|
||||
continue
|
||||
}
|
||||
switch bs[0] {
|
||||
case typeSessionTraffic:
|
||||
// This is what we want to handle here
|
||||
case typeSessionProto:
|
||||
var key keyArray
|
||||
copy(key[:], from.(iwt.Addr))
|
||||
data := append([]byte(nil), bs[1:n]...)
|
||||
k.core.proto.handleProto(nil, key, data)
|
||||
continue
|
||||
default:
|
||||
continue
|
||||
}
|
||||
bs = bs[1:n]
|
||||
if len(bs) == 0 {
|
||||
continue
|
||||
}
|
||||
if bs[0]&0xf0 != 0x60 {
|
||||
continue // not IPv6
|
||||
}
|
||||
if len(bs) < 40 {
|
||||
continue
|
||||
}
|
||||
/* TODO ICMP packet too big
|
||||
if len(bs) > int(tun.MTU()) {
|
||||
ptb := &icmp.PacketTooBig{
|
||||
MTU: int(tun.mtu),
|
||||
Data: bs[:40],
|
||||
}
|
||||
if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
|
||||
_, _ = tun.core.WriteTo(packet, from)
|
||||
}
|
||||
continue
|
||||
}
|
||||
*/
|
||||
var srcAddr, dstAddr address.Address
|
||||
var srcSubnet, dstSubnet address.Subnet
|
||||
copy(srcAddr[:], bs[8:])
|
||||
copy(dstAddr[:], bs[24:])
|
||||
copy(srcSubnet[:], bs[8:])
|
||||
copy(dstSubnet[:], bs[24:])
|
||||
if dstAddr != k.address && dstSubnet != k.subnet {
|
||||
continue // bad local address/subnet
|
||||
}
|
||||
info := k.update(ed25519.PublicKey(from.(iwt.Addr)))
|
||||
if srcAddr != info.address && srcSubnet != info.subnet {
|
||||
continue // bad remote address/subnet
|
||||
}
|
||||
n = copy(p, bs)
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (k *keyStore) writePC(bs []byte) (int, error) {
|
||||
if bs[0]&0xf0 != 0x60 {
|
||||
return 0, errors.New("not an IPv6 packet") // not IPv6
|
||||
}
|
||||
if len(bs) < 40 {
|
||||
strErr := fmt.Sprint("undersized IPv6 packet, length:", len(bs))
|
||||
return 0, errors.New(strErr)
|
||||
}
|
||||
var srcAddr, dstAddr address.Address
|
||||
var srcSubnet, dstSubnet address.Subnet
|
||||
copy(srcAddr[:], bs[8:])
|
||||
copy(dstAddr[:], bs[24:])
|
||||
copy(srcSubnet[:], bs[8:])
|
||||
copy(dstSubnet[:], bs[24:])
|
||||
if srcAddr != k.address && srcSubnet != k.subnet {
|
||||
return 0, errors.New("wrong source address")
|
||||
}
|
||||
k.buf = append(k.buf[:0], typeSessionTraffic)
|
||||
k.buf = append(k.buf, bs...)
|
||||
if dstAddr.IsValid() {
|
||||
k.sendToAddress(dstAddr, k.buf)
|
||||
} else if dstSubnet.IsValid() {
|
||||
k.sendToSubnet(dstSubnet, k.buf)
|
||||
} else {
|
||||
return 0, errors.New("invalid destination address")
|
||||
}
|
||||
return len(bs), nil
|
||||
}
|
|
@ -20,8 +20,6 @@ import (
|
|||
//"github.com/Arceliar/phony" // TODO? use instead of mutexes
|
||||
)
|
||||
|
||||
type keyArray [ed25519.PublicKeySize]byte
|
||||
|
||||
type links struct {
|
||||
core *Core
|
||||
mutex sync.RWMutex // protects links below
|
||||
|
@ -231,7 +229,7 @@ func (intf *link) handler() (chan struct{}, error) {
|
|||
intf.links.core.log.Infof("Connected %s: %s, source %s",
|
||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||
// Run the handler
|
||||
err = intf.links.core.PacketConn.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
|
||||
err = intf.links.core.pc.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
|
||||
// TODO don't report an error if it's just a 'use of closed network connection'
|
||||
if err != nil {
|
||||
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
||||
|
|
189
src/core/nodeinfo.go
Normal file
189
src/core/nodeinfo.go
Normal file
|
@ -0,0 +1,189 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
iwt "github.com/Arceliar/ironwood/types"
|
||||
"github.com/Arceliar/phony"
|
||||
|
||||
//"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/version"
|
||||
)
|
||||
|
||||
// NodeInfoPayload represents a RequestNodeInfo response, in bytes.
|
||||
type NodeInfoPayload []byte
|
||||
|
||||
type nodeinfo struct {
|
||||
phony.Inbox
|
||||
proto *protoHandler
|
||||
myNodeInfo NodeInfoPayload
|
||||
callbacks map[keyArray]nodeinfoCallback
|
||||
}
|
||||
|
||||
type nodeinfoCallback struct {
|
||||
call func(nodeinfo NodeInfoPayload)
|
||||
created time.Time
|
||||
}
|
||||
|
||||
// Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep
|
||||
// the cache/callback maps clean of stale entries
|
||||
func (m *nodeinfo) init(proto *protoHandler) {
|
||||
m.Act(nil, func() {
|
||||
m._init(proto)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _init(proto *protoHandler) {
|
||||
m.proto = proto
|
||||
m.callbacks = make(map[keyArray]nodeinfoCallback)
|
||||
m._cleanup()
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _cleanup() {
|
||||
for boxPubKey, callback := range m.callbacks {
|
||||
if time.Since(callback.created) > time.Minute {
|
||||
delete(m.callbacks, boxPubKey)
|
||||
}
|
||||
}
|
||||
time.AfterFunc(time.Second*30, func() {
|
||||
m.Act(nil, m._cleanup)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _addCallback(sender keyArray, call func(nodeinfo NodeInfoPayload)) {
|
||||
m.callbacks[sender] = nodeinfoCallback{
|
||||
created: time.Now(),
|
||||
call: call,
|
||||
}
|
||||
}
|
||||
|
||||
// Handles the callback, if there is one
|
||||
func (m *nodeinfo) _callback(sender keyArray, nodeinfo NodeInfoPayload) {
|
||||
if callback, ok := m.callbacks[sender]; ok {
|
||||
callback.call(nodeinfo)
|
||||
delete(m.callbacks, sender)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _getNodeInfo() NodeInfoPayload {
|
||||
return m.myNodeInfo
|
||||
}
|
||||
|
||||
// Set the current node's nodeinfo
|
||||
func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) (err error) {
|
||||
phony.Block(m, func() {
|
||||
err = m._setNodeInfo(given, privacy)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _setNodeInfo(given interface{}, privacy bool) error {
|
||||
defaults := map[string]interface{}{
|
||||
"buildname": version.BuildName(),
|
||||
"buildversion": version.BuildVersion(),
|
||||
"buildplatform": runtime.GOOS,
|
||||
"buildarch": runtime.GOARCH,
|
||||
}
|
||||
newnodeinfo := make(map[string]interface{})
|
||||
if !privacy {
|
||||
for k, v := range defaults {
|
||||
newnodeinfo[k] = v
|
||||
}
|
||||
}
|
||||
if nodeinfomap, ok := given.(map[string]interface{}); ok {
|
||||
for key, value := range nodeinfomap {
|
||||
if _, ok := defaults[key]; ok {
|
||||
if strvalue, strok := value.(string); strok && strings.EqualFold(strvalue, "null") || value == nil {
|
||||
delete(newnodeinfo, key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
newnodeinfo[key] = value
|
||||
}
|
||||
}
|
||||
newjson, err := json.Marshal(newnodeinfo)
|
||||
if err == nil {
|
||||
if len(newjson) > 16384 {
|
||||
return errors.New("NodeInfo exceeds max length of 16384 bytes")
|
||||
}
|
||||
m.myNodeInfo = newjson
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *nodeinfo) sendReq(from phony.Actor, key keyArray, callback func(nodeinfo NodeInfoPayload)) {
|
||||
m.Act(from, func() {
|
||||
m._sendReq(key, callback)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _sendReq(key keyArray, callback func(nodeinfo NodeInfoPayload)) {
|
||||
if callback != nil {
|
||||
m._addCallback(key, callback)
|
||||
}
|
||||
_, _ = m.proto.core.pc.WriteTo([]byte{typeSessionProto, typeProtoNodeInfoRequest}, iwt.Addr(key[:]))
|
||||
}
|
||||
|
||||
func (m *nodeinfo) handleReq(from phony.Actor, key keyArray) {
|
||||
m.Act(from, func() {
|
||||
m._sendRes(key)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *nodeinfo) handleRes(from phony.Actor, key keyArray, info NodeInfoPayload) {
|
||||
m.Act(from, func() {
|
||||
m._callback(key, info)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *nodeinfo) _sendRes(key keyArray) {
|
||||
bs := append([]byte{typeSessionProto, typeProtoNodeInfoResponse}, m._getNodeInfo()...)
|
||||
_, _ = m.proto.core.pc.WriteTo(bs, iwt.Addr(key[:]))
|
||||
}
|
||||
|
||||
// Admin socket stuff
|
||||
|
||||
type GetNodeInfoRequest struct {
|
||||
Key string `json:"key"`
|
||||
}
|
||||
type GetNodeInfoResponse map[string]interface{}
|
||||
|
||||
func (m *nodeinfo) nodeInfoAdminHandler(in json.RawMessage) (interface{}, error) {
|
||||
var req GetNodeInfoRequest
|
||||
if err := json.Unmarshal(in, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var key keyArray
|
||||
var kbs []byte
|
||||
var err error
|
||||
if kbs, err = hex.DecodeString(req.Key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
copy(key[:], kbs)
|
||||
ch := make(chan []byte, 1)
|
||||
m.sendReq(nil, key, func(info NodeInfoPayload) {
|
||||
ch <- info
|
||||
})
|
||||
timer := time.NewTimer(6 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil, errors.New("timeout")
|
||||
case info := <-ch:
|
||||
var msg json.RawMessage
|
||||
if err := msg.UnmarshalJSON(info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip := net.IP(address.AddrForKey(kbs)[:])
|
||||
res := GetNodeInfoResponse{ip.String(): msg}
|
||||
return res, nil
|
||||
}
|
||||
}
|
349
src/core/proto.go
Normal file
349
src/core/proto.go
Normal file
|
@ -0,0 +1,349 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
iwt "github.com/Arceliar/ironwood/types"
|
||||
"github.com/Arceliar/phony"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
)
|
||||
|
||||
const (
|
||||
typeDebugDummy = iota
|
||||
typeDebugGetSelfRequest
|
||||
typeDebugGetSelfResponse
|
||||
typeDebugGetPeersRequest
|
||||
typeDebugGetPeersResponse
|
||||
typeDebugGetDHTRequest
|
||||
typeDebugGetDHTResponse
|
||||
)
|
||||
|
||||
type reqInfo struct {
|
||||
callback func([]byte)
|
||||
timer *time.Timer // time.AfterFunc cleanup
|
||||
}
|
||||
|
||||
type protoHandler struct {
|
||||
phony.Inbox
|
||||
core *Core
|
||||
nodeinfo nodeinfo
|
||||
sreqs map[keyArray]*reqInfo
|
||||
preqs map[keyArray]*reqInfo
|
||||
dreqs map[keyArray]*reqInfo
|
||||
}
|
||||
|
||||
func (p *protoHandler) init(core *Core) {
|
||||
p.core = core
|
||||
p.nodeinfo.init(p)
|
||||
p.sreqs = make(map[keyArray]*reqInfo)
|
||||
p.preqs = make(map[keyArray]*reqInfo)
|
||||
p.dreqs = make(map[keyArray]*reqInfo)
|
||||
}
|
||||
|
||||
func (p *protoHandler) handleProto(from phony.Actor, key keyArray, bs []byte) {
|
||||
if len(bs) == 0 {
|
||||
return
|
||||
}
|
||||
switch bs[0] {
|
||||
case typeProtoDummy:
|
||||
case typeProtoNodeInfoRequest:
|
||||
p.nodeinfo.handleReq(p, key)
|
||||
case typeProtoNodeInfoResponse:
|
||||
p.nodeinfo.handleRes(p, key, bs[1:])
|
||||
case typeProtoDebug:
|
||||
p._handleDebug(key, bs[1:])
|
||||
}
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleDebug(key keyArray, bs []byte) {
|
||||
if len(bs) == 0 {
|
||||
return
|
||||
}
|
||||
switch bs[0] {
|
||||
case typeDebugDummy:
|
||||
case typeDebugGetSelfRequest:
|
||||
p._handleGetSelfRequest(key)
|
||||
case typeDebugGetSelfResponse:
|
||||
p._handleGetSelfResponse(key, bs[1:])
|
||||
case typeDebugGetPeersRequest:
|
||||
p._handleGetPeersRequest(key)
|
||||
case typeDebugGetPeersResponse:
|
||||
p._handleGetPeersResponse(key, bs[1:])
|
||||
case typeDebugGetDHTRequest:
|
||||
p._handleGetDHTRequest(key)
|
||||
case typeDebugGetDHTResponse:
|
||||
p._handleGetDHTResponse(key, bs[1:])
|
||||
}
|
||||
}
|
||||
|
||||
func (p *protoHandler) sendGetSelfRequest(key keyArray, callback func([]byte)) {
|
||||
p.Act(nil, func() {
|
||||
if info := p.sreqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
delete(p.sreqs, key)
|
||||
}
|
||||
info := new(reqInfo)
|
||||
info.callback = callback
|
||||
info.timer = time.AfterFunc(time.Minute, func() {
|
||||
p.Act(nil, func() {
|
||||
if p.sreqs[key] == info {
|
||||
delete(p.sreqs, key)
|
||||
}
|
||||
})
|
||||
})
|
||||
p.sreqs[key] = info
|
||||
p._sendDebug(key, typeDebugGetSelfRequest, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetSelfRequest(key keyArray) {
|
||||
self := p.core.GetSelf()
|
||||
res := map[string]string{
|
||||
"key": hex.EncodeToString(self.Key[:]),
|
||||
"coords": fmt.Sprintf("%v", self.Coords),
|
||||
}
|
||||
bs, err := json.Marshal(res) // FIXME this puts keys in base64, not hex
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
p._sendDebug(key, typeDebugGetSelfResponse, bs)
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetSelfResponse(key keyArray, bs []byte) {
|
||||
if info := p.sreqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
info.callback(bs)
|
||||
delete(p.sreqs, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *protoHandler) sendGetPeersRequest(key keyArray, callback func([]byte)) {
|
||||
p.Act(nil, func() {
|
||||
if info := p.preqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
delete(p.preqs, key)
|
||||
}
|
||||
info := new(reqInfo)
|
||||
info.callback = callback
|
||||
info.timer = time.AfterFunc(time.Minute, func() {
|
||||
p.Act(nil, func() {
|
||||
if p.preqs[key] == info {
|
||||
delete(p.preqs, key)
|
||||
}
|
||||
})
|
||||
})
|
||||
p.preqs[key] = info
|
||||
p._sendDebug(key, typeDebugGetPeersRequest, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetPeersRequest(key keyArray) {
|
||||
peers := p.core.GetPeers()
|
||||
var bs []byte
|
||||
for _, pinfo := range peers {
|
||||
tmp := append(bs, pinfo.Key[:]...)
|
||||
const responseOverhead = 2 // 1 debug type, 1 getpeers type
|
||||
if uint64(len(tmp))+responseOverhead > p.core.store.maxSessionMTU() {
|
||||
break
|
||||
}
|
||||
bs = tmp
|
||||
}
|
||||
p._sendDebug(key, typeDebugGetPeersResponse, bs)
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetPeersResponse(key keyArray, bs []byte) {
|
||||
if info := p.preqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
info.callback(bs)
|
||||
delete(p.preqs, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *protoHandler) sendGetDHTRequest(key keyArray, callback func([]byte)) {
|
||||
p.Act(nil, func() {
|
||||
if info := p.dreqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
delete(p.dreqs, key)
|
||||
}
|
||||
info := new(reqInfo)
|
||||
info.callback = callback
|
||||
info.timer = time.AfterFunc(time.Minute, func() {
|
||||
p.Act(nil, func() {
|
||||
if p.dreqs[key] == info {
|
||||
delete(p.dreqs, key)
|
||||
}
|
||||
})
|
||||
})
|
||||
p.dreqs[key] = info
|
||||
p._sendDebug(key, typeDebugGetDHTRequest, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetDHTRequest(key keyArray) {
|
||||
dinfos := p.core.GetDHT()
|
||||
var bs []byte
|
||||
for _, dinfo := range dinfos {
|
||||
tmp := append(bs, dinfo.Key[:]...)
|
||||
const responseOverhead = 2 // 1 debug type, 1 getdht type
|
||||
if uint64(len(tmp))+responseOverhead > p.core.store.maxSessionMTU() {
|
||||
break
|
||||
}
|
||||
bs = tmp
|
||||
}
|
||||
p._sendDebug(key, typeDebugGetDHTResponse, bs)
|
||||
}
|
||||
|
||||
func (p *protoHandler) _handleGetDHTResponse(key keyArray, bs []byte) {
|
||||
if info := p.dreqs[key]; info != nil {
|
||||
info.timer.Stop()
|
||||
info.callback(bs)
|
||||
delete(p.dreqs, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *protoHandler) _sendDebug(key keyArray, dType uint8, data []byte) {
|
||||
bs := append([]byte{typeSessionProto, typeProtoDebug, dType}, data...)
|
||||
_, _ = p.core.pc.WriteTo(bs, iwt.Addr(key[:]))
|
||||
}
|
||||
|
||||
// Admin socket stuff
|
||||
|
||||
type DebugGetSelfRequest struct {
|
||||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
type DebugGetSelfResponse map[string]interface{}
|
||||
|
||||
func (p *protoHandler) getSelfHandler(in json.RawMessage) (interface{}, error) {
|
||||
var req DebugGetSelfRequest
|
||||
if err := json.Unmarshal(in, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var key keyArray
|
||||
var kbs []byte
|
||||
var err error
|
||||
if kbs, err = hex.DecodeString(req.Key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
copy(key[:], kbs)
|
||||
ch := make(chan []byte, 1)
|
||||
p.sendGetSelfRequest(key, func(info []byte) {
|
||||
ch <- info
|
||||
})
|
||||
timer := time.NewTimer(6 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil, errors.New("timeout")
|
||||
case info := <-ch:
|
||||
var msg json.RawMessage
|
||||
if err := msg.UnmarshalJSON(info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip := net.IP(address.AddrForKey(kbs)[:])
|
||||
res := DebugGetSelfResponse{ip.String(): msg}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
type DebugGetPeersRequest struct {
|
||||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
type DebugGetPeersResponse map[string]interface{}
|
||||
|
||||
func (p *protoHandler) getPeersHandler(in json.RawMessage) (interface{}, error) {
|
||||
var req DebugGetPeersRequest
|
||||
if err := json.Unmarshal(in, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var key keyArray
|
||||
var kbs []byte
|
||||
var err error
|
||||
if kbs, err = hex.DecodeString(req.Key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
copy(key[:], kbs)
|
||||
ch := make(chan []byte, 1)
|
||||
p.sendGetPeersRequest(key, func(info []byte) {
|
||||
ch <- info
|
||||
})
|
||||
timer := time.NewTimer(6 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil, errors.New("timeout")
|
||||
case info := <-ch:
|
||||
ks := make(map[string][]string)
|
||||
bs := info
|
||||
for len(bs) >= len(key) {
|
||||
ks["keys"] = append(ks["keys"], hex.EncodeToString(bs[:len(key)]))
|
||||
bs = bs[len(key):]
|
||||
}
|
||||
js, err := json.Marshal(ks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var msg json.RawMessage
|
||||
if err := msg.UnmarshalJSON(js); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip := net.IP(address.AddrForKey(kbs)[:])
|
||||
res := DebugGetPeersResponse{ip.String(): msg}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
type DebugGetDHTRequest struct {
|
||||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
type DebugGetDHTResponse map[string]interface{}
|
||||
|
||||
func (p *protoHandler) getDHTHandler(in json.RawMessage) (interface{}, error) {
|
||||
var req DebugGetDHTRequest
|
||||
if err := json.Unmarshal(in, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var key keyArray
|
||||
var kbs []byte
|
||||
var err error
|
||||
if kbs, err = hex.DecodeString(req.Key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
copy(key[:], kbs)
|
||||
ch := make(chan []byte, 1)
|
||||
p.sendGetDHTRequest(key, func(info []byte) {
|
||||
ch <- info
|
||||
})
|
||||
timer := time.NewTimer(6 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil, errors.New("timeout")
|
||||
case info := <-ch:
|
||||
ks := make(map[string][]string)
|
||||
bs := info
|
||||
for len(bs) >= len(key) {
|
||||
ks["keys"] = append(ks["keys"], hex.EncodeToString(bs[:len(key)]))
|
||||
bs = bs[len(key):]
|
||||
}
|
||||
js, err := json.Marshal(ks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var msg json.RawMessage
|
||||
if err := msg.UnmarshalJSON(js); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip := net.IP(address.AddrForKey(kbs)[:])
|
||||
res := DebugGetDHTResponse{ip.String(): msg}
|
||||
return res, nil
|
||||
}
|
||||
}
|
23
src/core/types.go
Normal file
23
src/core/types.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package core
|
||||
|
||||
// Out-of-band packet types
|
||||
const (
|
||||
typeKeyDummy = iota // nolint:deadcode,varcheck
|
||||
typeKeyLookup
|
||||
typeKeyResponse
|
||||
)
|
||||
|
||||
// In-band packet types
|
||||
const (
|
||||
typeSessionDummy = iota // nolint:deadcode,varcheck
|
||||
typeSessionTraffic
|
||||
typeSessionProto
|
||||
)
|
||||
|
||||
// Protocol packet types
|
||||
const (
|
||||
typeProtoDummy = iota
|
||||
typeProtoNodeInfoRequest
|
||||
typeProtoNodeInfoResponse
|
||||
typeProtoDebug = 255
|
||||
)
|
Loading…
Add table
Add a link
Reference in a new issue