Break out multicast into a separate package

This commit is contained in:
Neil Alexander 2019-03-28 16:13:14 +00:00
parent 03bc7bbcd6
commit 7ea4e9575e
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 103 additions and 71 deletions

View file

@ -36,14 +36,14 @@ type tcp struct {
link *link
reconfigure chan chan error
mutex sync.Mutex // Protecting the below
listeners map[string]*tcpListener
listeners map[string]*TcpListener
calls map[string]struct{}
conns map[linkInfo](chan struct{})
}
type tcpListener struct {
listener net.Listener
stop chan bool
type TcpListener struct {
Listener net.Listener
Stop chan bool
}
// Wrapper function to set additional options for specific connection types.
@ -64,7 +64,7 @@ func (t *tcp) getAddr() *net.TCPAddr {
t.mutex.Lock()
defer t.mutex.Unlock()
for _, l := range t.listeners {
return l.listener.Addr().(*net.TCPAddr)
return l.Listener.Addr().(*net.TCPAddr)
}
return nil
}
@ -76,7 +76,7 @@ func (t *tcp) init(l *link) error {
t.mutex.Lock()
t.calls = make(map[string]struct{})
t.conns = make(map[linkInfo](chan struct{}))
t.listeners = make(map[string]*tcpListener)
t.listeners = make(map[string]*TcpListener)
t.mutex.Unlock()
go func() {
@ -103,7 +103,7 @@ func (t *tcp) init(l *link) error {
t.mutex.Lock()
if listener, ok := t.listeners[d[6:]]; ok {
t.mutex.Unlock()
listener.stop <- true
listener.Stop <- true
} else {
t.mutex.Unlock()
}
@ -129,7 +129,7 @@ func (t *tcp) init(l *link) error {
return nil
}
func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
func (t *tcp) listen(listenaddr string) (*TcpListener, error) {
var err error
ctx := context.Background()
@ -138,9 +138,9 @@ func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
}
listener, err := lc.Listen(ctx, "tcp", listenaddr)
if err == nil {
l := tcpListener{
listener: listener,
stop: make(chan bool),
l := TcpListener{
Listener: listener,
Stop: make(chan bool),
}
go t.listener(&l, listenaddr)
return &l, nil
@ -150,7 +150,7 @@ func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
}
// Runs the listener, which spawns off goroutines for incoming connections.
func (t *tcp) listener(l *tcpListener, listenaddr string) {
func (t *tcp) listener(l *TcpListener, listenaddr string) {
if l == nil {
return
}
@ -158,7 +158,7 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) {
t.mutex.Lock()
if _, isIn := t.listeners[listenaddr]; isIn {
t.mutex.Unlock()
l.listener.Close()
l.Listener.Close()
return
} else {
t.listeners[listenaddr] = l
@ -167,20 +167,20 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) {
// And here we go!
accepted := make(chan bool)
defer func() {
t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String())
l.listener.Close()
t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String())
l.Listener.Close()
t.mutex.Lock()
delete(t.listeners, listenaddr)
t.mutex.Unlock()
}()
t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String())
t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String())
for {
var sock net.Conn
var err error
// Listen in a separate goroutine, as that way it does not block us from
// receiving "stop" events
go func() {
sock, err = l.listener.Accept()
sock, err = l.Listener.Accept()
accepted <- true
}()
// Wait for either an accepted connection, or a message telling us to stop
@ -192,7 +192,7 @@ func (t *tcp) listener(l *tcpListener, listenaddr string) {
return
}
go t.handler(sock, true, nil)
case <-l.stop:
case <-l.Stop:
return
}
}