rutina/rutina.go

165 lines
4.2 KiB
Go
Raw Normal View History

2018-12-05 00:48:33 +03:00
package rutina
import (
"context"
2019-01-17 07:54:39 +03:00
"log"
2019-01-11 17:22:06 +03:00
"os"
"os/signal"
2018-12-05 00:48:33 +03:00
"sync"
2019-01-17 07:54:39 +03:00
"sync/atomic"
"syscall"
2018-12-05 00:48:33 +03:00
)
//Rutina is routine manager
type Rutina struct {
ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines
wg sync.WaitGroup // WaitGroup that wait all routines to complete
onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
onceWait sync.Once // Flag that prevents wait already waited rutina
err error // First error that shutdowns all routines
logger *log.Logger // Optional logger
counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
lifecycleListener LifecycleListener // Optional listener for events
2018-12-05 00:48:33 +03:00
}
// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
2019-01-17 07:54:39 +03:00
ctx, cancel := context.WithCancel(context.Background())
2019-04-04 12:02:49 +03:00
var counter uint64
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
return r.With(mixins...)
2018-12-05 00:48:33 +03:00
}
// With applies mixins
func (r *Rutina) With(mixins ...Mixin) *Rutina {
for _, m := range mixins {
m.apply(r)
2019-01-17 07:54:39 +03:00
}
return r
2018-12-05 00:48:33 +03:00
}
// Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
// Check that context is not canceled yet
if r.ctx.Err() != nil {
return
}
onFail := ShutdownIfFail
for _, o := range opts {
switch o {
case ShutdownIfFail:
onFail = ShutdownIfFail
case RestartIfFail:
onFail = RestartIfFail
case DoNothingIfFail:
onFail = DoNothingIfFail
}
}
onDone := ShutdownIfDone
for _, o := range opts {
switch o {
case ShutdownIfDone:
onDone = ShutdownIfDone
case RestartIfDone:
onDone = RestartIfDone
case DoNothingIfDone:
onDone = DoNothingIfDone
}
}
2018-12-05 00:48:33 +03:00
r.wg.Add(1)
go func() {
defer r.wg.Done()
2019-01-17 07:54:39 +03:00
id := atomic.AddUint64(r.counter, 1)
r.lifecycleEvent(EventRoutineStart, int(id))
2018-12-05 00:48:33 +03:00
if err := doer(r.ctx); err != nil {
r.lifecycleEvent(EventRoutineFail, int(id))
r.lifecycleEvent(EventRoutineStop, int(id))
// errors history
if r.errCh != nil {
r.errCh <- err
2019-01-17 07:54:39 +03:00
}
// region routine failed
switch onFail {
case ShutdownIfFail:
// Save error only if shutdown all routines
r.onceErr.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
r.Go(doer, opts...)
}
// endregion
} else {
r.lifecycleEvent(EventRoutineComplete, int(id))
r.lifecycleEvent(EventRoutineStop, int(id))
// region routine successfully done
switch onDone {
case ShutdownIfDone:
2019-01-17 07:54:39 +03:00
r.Cancel()
case RestartIfDone:
r.Go(doer, opts...)
2019-01-17 07:54:39 +03:00
}
// endregion
2018-12-05 00:48:33 +03:00
}
}()
}
// Errors returns chan for all errors, event if DoNothingIfFail or RestartIfFail set.
// By default it nil. Use MixinErrChan to turn it on
func (r *Rutina) Errors() <-chan error {
return r.errCh
}
// ListenOsSignals is simple OS signals handler. By default listen syscall.SIGINT and syscall.SIGTERM
2019-04-01 11:38:03 +03:00
func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
if len(signals) == 0 {
signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
2019-04-01 11:38:03 +03:00
}
go func() {
2019-01-11 17:22:06 +03:00
sig := make(chan os.Signal, 1)
2019-04-01 11:38:03 +03:00
signal.Notify(sig, signals...)
r.log("starting OS signals listener")
2019-01-11 17:22:06 +03:00
select {
2019-01-17 07:54:39 +03:00
case s := <-sig:
r.log("stopping by OS signal (%v)", s)
r.Cancel()
case <-r.ctx.Done():
2019-01-11 17:22:06 +03:00
}
}()
2019-01-11 17:22:06 +03:00
}
2018-12-05 00:48:33 +03:00
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
r.onceWait.Do(func() {
r.wg.Wait()
r.lifecycleEvent(EventAppStop, 0)
if r.err == nil {
r.lifecycleEvent(EventAppComplete, 0)
} else {
r.lifecycleEvent(EventAppFail, 0)
}
if r.errCh != nil {
close(r.errCh)
}
})
2018-12-05 00:48:33 +03:00
return r.err
}
func (r *Rutina) lifecycleEvent(ev Event, rid int) {
r.log("Event = %s Routine ID = %d", ev.String(), rid)
if r.lifecycleListener != nil {
r.lifecycleListener(ev, rid)
}
}
// Log if can
func (r *Rutina) log(format string, args ...interface{}) {
if r.logger != nil {
r.logger.Printf(format, args...)
}
}