mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-28 14:15:06 +03:00
Switch back to using an actor to manage link state, and slighty randomize the delay between multicast announcements. This seems to fix the issue with duplicate connections (and breaks a livelock in the multicast code where both nodes keep closing the listen side of their connection, but that's kind of a hack, we need a better solution)
This commit is contained in:
parent
2eda59d9e4
commit
c1ae9ea0d4
3 changed files with 257 additions and 241 deletions
443
src/core/link.go
443
src/core/link.go
|
@ -13,7 +13,6 @@ import (
|
|||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -30,13 +29,14 @@ const (
|
|||
)
|
||||
|
||||
type links struct {
|
||||
core *Core
|
||||
tcp *linkTCP // TCP interface support
|
||||
tls *linkTLS // TLS interface support
|
||||
unix *linkUNIX // UNIX interface support
|
||||
socks *linkSOCKS // SOCKS interface support
|
||||
sync.RWMutex // Protects the below
|
||||
_links map[linkInfo]*link // *link is nil if connection in progress
|
||||
phony.Inbox
|
||||
core *Core
|
||||
tcp *linkTCP // TCP interface support
|
||||
tls *linkTLS // TLS interface support
|
||||
unix *linkUNIX // UNIX interface support
|
||||
socks *linkSOCKS // SOCKS interface support
|
||||
// _links can only be modified safely from within the links actor
|
||||
_links map[linkInfo]*link // *link is nil if connection in progress
|
||||
}
|
||||
|
||||
type linkProtocol interface {
|
||||
|
@ -52,13 +52,13 @@ type linkInfo struct {
|
|||
|
||||
// link tracks the state of a connection, either persistent or non-persistent
|
||||
type link struct {
|
||||
kick chan struct{} // Attempt to reconnect now, if backing off
|
||||
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
|
||||
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
|
||||
sync.RWMutex // Protects the below
|
||||
_conn *linkConn // Connected link, if any, nil if not connected
|
||||
_err error // Last error on the connection, if any
|
||||
_errtime time.Time // Last time an error occured
|
||||
kick chan struct{} // Attempt to reconnect now, if backing off
|
||||
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
|
||||
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
|
||||
// The remaining fields can only be modified safely from within the links actor
|
||||
_conn *linkConn // Connected link, if any, nil if not connected
|
||||
_err error // Last error on the connection, if any
|
||||
_errtime time.Time // Last time an error occured
|
||||
}
|
||||
|
||||
type linkOptions struct {
|
||||
|
@ -131,183 +131,192 @@ const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid")
|
|||
const ErrLinkUnrecognisedSchema = linkError("link schema unknown")
|
||||
|
||||
func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
|
||||
// Generate the link info and see whether we think we already
|
||||
// have an open peering to this peer.
|
||||
lu := urlForLinkInfo(*u)
|
||||
info := linkInfo{
|
||||
uri: lu.String(),
|
||||
sintf: sintf,
|
||||
}
|
||||
|
||||
// Collect together the link options, these are global options
|
||||
// that are not specific to any given protocol.
|
||||
var options linkOptions
|
||||
for _, pubkey := range u.Query()["key"] {
|
||||
sigPub, err := hex.DecodeString(pubkey)
|
||||
if err != nil {
|
||||
return ErrLinkPinnedKeyInvalid
|
||||
var retErr error
|
||||
phony.Block(l, func() {
|
||||
// Generate the link info and see whether we think we already
|
||||
// have an open peering to this peer.
|
||||
lu := urlForLinkInfo(*u)
|
||||
info := linkInfo{
|
||||
uri: lu.String(),
|
||||
sintf: sintf,
|
||||
}
|
||||
var sigPubKey keyArray
|
||||
copy(sigPubKey[:], sigPub)
|
||||
if options.pinnedEd25519Keys == nil {
|
||||
options.pinnedEd25519Keys = map[keyArray]struct{}{}
|
||||
}
|
||||
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
|
||||
}
|
||||
if p := u.Query().Get("priority"); p != "" {
|
||||
pi, err := strconv.ParseUint(p, 10, 8)
|
||||
if err != nil {
|
||||
return ErrLinkPriorityInvalid
|
||||
}
|
||||
options.priority = uint8(pi)
|
||||
}
|
||||
|
||||
// If we think we're already connected to this peer, load up
|
||||
// the existing peer state. Try to kick the peer if possible,
|
||||
// which will cause an immediate connection attempt if it is
|
||||
// backing off for some reason.
|
||||
l.Lock()
|
||||
state, ok := l._links[info]
|
||||
if ok && state != nil {
|
||||
select {
|
||||
case state.kick <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
l.Unlock()
|
||||
return ErrLinkAlreadyConfigured
|
||||
}
|
||||
|
||||
// Create the link entry. This will contain the connection
|
||||
// in progress (if any), any error details and a context that
|
||||
// lets the link be cancelled later.
|
||||
state = &link{
|
||||
linkType: linkType,
|
||||
linkProto: strings.ToUpper(u.Scheme),
|
||||
kick: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Store the state of the link so that it can be queried later.
|
||||
l._links[info] = state
|
||||
l.Unlock()
|
||||
|
||||
// Track how many consecutive connection failures we have had,
|
||||
// as we will back off exponentially rather than hammering the
|
||||
// remote node endlessly.
|
||||
var backoff int
|
||||
|
||||
// backoffNow is called when there's a connection error. It
|
||||
// will wait for the specified amount of time and then return
|
||||
// true, unless the peering context was cancelled (due to a
|
||||
// peer removal most likely), in which case it returns false.
|
||||
// The caller should check the return value to decide whether
|
||||
// or not to give up trying.
|
||||
backoffNow := func() bool {
|
||||
backoff++
|
||||
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
|
||||
select {
|
||||
case <-time.After(duration):
|
||||
return true
|
||||
case <-state.kick:
|
||||
return true
|
||||
case <-l.core.ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// The goroutine is responsible for attempting the connection
|
||||
// and then running the handler. If the connection is persistent
|
||||
// then the loop will run endlessly, using backoffs as needed.
|
||||
// Otherwise the loop will end, cleaning up the link entry.
|
||||
go func() {
|
||||
defer func() {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
delete(l._links, info)
|
||||
}()
|
||||
|
||||
// This loop will run each and every time we want to attempt
|
||||
// a connection to this peer.
|
||||
for {
|
||||
conn, err := l.connect(u, info, options)
|
||||
// Collect together the link options, these are global options
|
||||
// that are not specific to any given protocol.
|
||||
var options linkOptions
|
||||
for _, pubkey := range u.Query()["key"] {
|
||||
sigPub, err := hex.DecodeString(pubkey)
|
||||
if err != nil {
|
||||
if linkType == linkTypePersistent {
|
||||
// If the link is a persistent configured peering,
|
||||
// store information about the connection error so
|
||||
// that we can report it through the admin socket.
|
||||
state.Lock()
|
||||
state._conn = nil
|
||||
state._err = err
|
||||
state._errtime = time.Now()
|
||||
state.Unlock()
|
||||
retErr = ErrLinkPinnedKeyInvalid
|
||||
return
|
||||
}
|
||||
var sigPubKey keyArray
|
||||
copy(sigPubKey[:], sigPub)
|
||||
if options.pinnedEd25519Keys == nil {
|
||||
options.pinnedEd25519Keys = map[keyArray]struct{}{}
|
||||
}
|
||||
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
|
||||
}
|
||||
if p := u.Query().Get("priority"); p != "" {
|
||||
pi, err := strconv.ParseUint(p, 10, 8)
|
||||
if err != nil {
|
||||
retErr = ErrLinkPriorityInvalid
|
||||
return
|
||||
}
|
||||
options.priority = uint8(pi)
|
||||
}
|
||||
|
||||
// Back off for a bit. If true is returned here, we
|
||||
// can continue onto the next loop iteration to try
|
||||
// the next connection.
|
||||
// If we think we're already connected to this peer, load up
|
||||
// the existing peer state. Try to kick the peer if possible,
|
||||
// which will cause an immediate connection attempt if it is
|
||||
// backing off for some reason.
|
||||
state, ok := l._links[info]
|
||||
if ok && state != nil {
|
||||
select {
|
||||
case state.kick <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
retErr = ErrLinkAlreadyConfigured
|
||||
return
|
||||
}
|
||||
|
||||
// Create the link entry. This will contain the connection
|
||||
// in progress (if any), any error details and a context that
|
||||
// lets the link be cancelled later.
|
||||
state = &link{
|
||||
linkType: linkType,
|
||||
linkProto: strings.ToUpper(u.Scheme),
|
||||
kick: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Store the state of the link so that it can be queried later.
|
||||
l._links[info] = state
|
||||
|
||||
// Track how many consecutive connection failures we have had,
|
||||
// as we will back off exponentially rather than hammering the
|
||||
// remote node endlessly.
|
||||
var backoff int
|
||||
|
||||
// backoffNow is called when there's a connection error. It
|
||||
// will wait for the specified amount of time and then return
|
||||
// true, unless the peering context was cancelled (due to a
|
||||
// peer removal most likely), in which case it returns false.
|
||||
// The caller should check the return value to decide whether
|
||||
// or not to give up trying.
|
||||
backoffNow := func() bool {
|
||||
backoff++
|
||||
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
|
||||
select {
|
||||
case <-time.After(duration):
|
||||
return true
|
||||
case <-state.kick:
|
||||
return true
|
||||
case <-l.core.ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// The goroutine is responsible for attempting the connection
|
||||
// and then running the handler. If the connection is persistent
|
||||
// then the loop will run endlessly, using backoffs as needed.
|
||||
// Otherwise the loop will end, cleaning up the link entry.
|
||||
go func() {
|
||||
defer func() {
|
||||
phony.Block(l, func() {
|
||||
if l._links[info] == state {
|
||||
delete(l._links, info)
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
// This loop will run each and every time we want to attempt
|
||||
// a connection to this peer.
|
||||
// TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed
|
||||
for {
|
||||
conn, err := l.connect(u, info, options)
|
||||
if err != nil {
|
||||
if linkType == linkTypePersistent {
|
||||
// If the link is a persistent configured peering,
|
||||
// store information about the connection error so
|
||||
// that we can report it through the admin socket.
|
||||
phony.Block(l, func() {
|
||||
state._conn = nil
|
||||
state._err = err
|
||||
state._errtime = time.Now()
|
||||
})
|
||||
|
||||
// Back off for a bit. If true is returned here, we
|
||||
// can continue onto the next loop iteration to try
|
||||
// the next connection.
|
||||
if backoffNow() {
|
||||
continue
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Ephemeral and incoming connections don't remain
|
||||
// after a connection failure, so exit out of the
|
||||
// loop and clean up the link entry.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// The linkConn wrapper allows us to track the number of
|
||||
// bytes written to and read from this connection without
|
||||
// the help of ironwood.
|
||||
lc := &linkConn{
|
||||
Conn: conn,
|
||||
up: time.Now(),
|
||||
}
|
||||
|
||||
// Update the link state with our newly wrapped connection.
|
||||
// Clear the error state.
|
||||
var doRet bool
|
||||
phony.Block(l, func() {
|
||||
if state._conn != nil {
|
||||
// If a peering has come up in this time, abort this one.
|
||||
doRet = true
|
||||
}
|
||||
state._conn = lc
|
||||
})
|
||||
if doRet {
|
||||
return
|
||||
}
|
||||
|
||||
// Give the connection to the handler. The handler will block
|
||||
// for the lifetime of the connection.
|
||||
if err = l.handler(linkType, options, lc); err != nil && err != io.EOF {
|
||||
l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
|
||||
} else {
|
||||
backoff = 0
|
||||
}
|
||||
|
||||
// The handler has stopped running so the connection is dead,
|
||||
// try to close the underlying socket just in case and then
|
||||
// update the link state.
|
||||
_ = lc.Close()
|
||||
phony.Block(l, func() {
|
||||
state._conn = nil
|
||||
if state._err = err; state._err != nil {
|
||||
state._errtime = time.Now()
|
||||
}
|
||||
})
|
||||
|
||||
// If the link is persistently configured, back off if needed
|
||||
// and then try reconnecting. Otherwise, exit out.
|
||||
if linkType == linkTypePersistent {
|
||||
if backoffNow() {
|
||||
continue
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Ephemeral and incoming connections don't remain
|
||||
// after a connection failure, so exit out of the
|
||||
// loop and clean up the link entry.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// The linkConn wrapper allows us to track the number of
|
||||
// bytes written to and read from this connection without
|
||||
// the help of ironwood.
|
||||
lc := &linkConn{
|
||||
Conn: conn,
|
||||
up: time.Now(),
|
||||
}
|
||||
|
||||
// Update the link state with our newly wrapped connection.
|
||||
// Clear the error state.
|
||||
state.Lock()
|
||||
if state._conn != nil {
|
||||
// If a peering has come up in this time, abort this one.
|
||||
state.Unlock()
|
||||
return
|
||||
}
|
||||
state._conn = lc
|
||||
state.Unlock()
|
||||
|
||||
// Give the connection to the handler. The handler will block
|
||||
// for the lifetime of the connection.
|
||||
if err = l.handler(linkType, options, lc); err != nil && err != io.EOF {
|
||||
l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
|
||||
} else {
|
||||
backoff = 0
|
||||
}
|
||||
|
||||
// The handler has stopped running so the connection is dead,
|
||||
// try to close the underlying socket just in case and then
|
||||
// update the link state.
|
||||
_ = lc.Close()
|
||||
state.Lock()
|
||||
state._conn = nil
|
||||
if state._err = err; state._err != nil {
|
||||
state._errtime = time.Now()
|
||||
}
|
||||
state.Unlock()
|
||||
|
||||
// If the link is persistently configured, back off if needed
|
||||
// and then try reconnecting. Otherwise, exit out.
|
||||
if linkType == linkTypePersistent {
|
||||
if backoffNow() {
|
||||
continue
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}()
|
||||
})
|
||||
return retErr
|
||||
}
|
||||
|
||||
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
|
||||
|
@ -369,43 +378,45 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
|
|||
// If there's an existing link state for this link, get it.
|
||||
// If this node is already connected to us, just drop the
|
||||
// connection. This prevents duplicate peerings.
|
||||
l.Lock()
|
||||
state, ok := l._links[info]
|
||||
if !ok || state == nil {
|
||||
state = &link{
|
||||
linkType: linkTypeIncoming,
|
||||
linkProto: strings.ToUpper(u.Scheme),
|
||||
kick: make(chan struct{}),
|
||||
var lc *linkConn
|
||||
var state *link
|
||||
phony.Block(l, func() {
|
||||
var ok bool
|
||||
state, ok = l._links[info]
|
||||
if !ok || state == nil {
|
||||
state = &link{
|
||||
linkType: linkTypeIncoming,
|
||||
linkProto: strings.ToUpper(u.Scheme),
|
||||
kick: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
}
|
||||
state.Lock()
|
||||
if state._conn != nil {
|
||||
// If a connection has come up in this time, abort
|
||||
// this one.
|
||||
state.Unlock()
|
||||
l.Unlock()
|
||||
if state._conn != nil {
|
||||
// If a connection has come up in this time, abort
|
||||
// this one.
|
||||
return
|
||||
}
|
||||
|
||||
// The linkConn wrapper allows us to track the number of
|
||||
// bytes written to and read from this connection without
|
||||
// the help of ironwood.
|
||||
lc = &linkConn{
|
||||
Conn: conn,
|
||||
up: time.Now(),
|
||||
}
|
||||
|
||||
// Update the link state with our newly wrapped connection.
|
||||
// Clear the error state.
|
||||
state._conn = lc
|
||||
state._err = nil
|
||||
state._errtime = time.Time{}
|
||||
|
||||
// Store the state of the link so that it can be queried later.
|
||||
l._links[info] = state
|
||||
})
|
||||
if lc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// The linkConn wrapper allows us to track the number of
|
||||
// bytes written to and read from this connection without
|
||||
// the help of ironwood.
|
||||
lc := &linkConn{
|
||||
Conn: conn,
|
||||
up: time.Now(),
|
||||
}
|
||||
|
||||
// Update the link state with our newly wrapped connection.
|
||||
// Clear the error state.
|
||||
state._conn = lc
|
||||
state._err = nil
|
||||
state._errtime = time.Time{}
|
||||
state.Unlock()
|
||||
|
||||
// Store the state of the link so that it can be queried later.
|
||||
l._links[info] = state
|
||||
l.Unlock()
|
||||
|
||||
// Give the connection to the handler. The handler will block
|
||||
// for the lifetime of the connection.
|
||||
if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF {
|
||||
|
@ -416,9 +427,11 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
|
|||
// try to close the underlying socket just in case and then
|
||||
// drop the link state.
|
||||
_ = lc.Close()
|
||||
l.Lock()
|
||||
delete(l._links, info)
|
||||
l.Unlock()
|
||||
phony.Block(l, func() {
|
||||
if l._links[info] == state {
|
||||
delete(l._links, info)
|
||||
}
|
||||
})
|
||||
}(conn)
|
||||
}
|
||||
}()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue