rutina/rutina.go
Alexander Kiryukhin d45d913c9e
Flexible run policies
Options refactoring
2019-03-27 02:44:38 +03:00

148 lines
3 KiB
Go
Executable file

package rutina
import (
"context"
"log"
"os"
"os/signal"
"sync"
"sync/atomic"
)
//Rutina is routine manager
type Rutina struct {
ctx context.Context
Cancel func()
wg sync.WaitGroup
o sync.Once
err error
logger *log.Logger
counter *uint64
cancelByError bool
}
// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
return r.With(mixins...)
}
func (r *Rutina) With(mixins ...Mixin) *Rutina {
nr := *r
for _, m := range mixins {
m.apply(&nr)
}
return &nr
}
// Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
onFail := ShutdownIfFail
for _, o := range opts {
switch o {
case ShutdownIfFail:
onFail = ShutdownIfFail
case RestartIfFail:
onFail = RestartIfFail
case DoNothingIfFail:
onFail = DoNothingIfFail
}
}
onDone := DoNothingIfDone
for _, o := range opts {
switch o {
case ShutdownIfDone:
onDone = ShutdownIfDone
case RestartIfDone:
onDone = RestartIfDone
case DoNothingIfDone:
onDone = DoNothingIfDone
}
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
// Check that context is not canceled yet
if r.ctx.Err() != nil {
return
}
id := atomic.AddUint64(r.counter, 1)
if r.logger != nil {
r.logger.Printf("starting #%d", id)
}
if err := doer(r.ctx); err != nil {
if r.logger != nil {
r.logger.Printf("error at #%d : %v", id, err)
}
switch onFail {
case ShutdownIfFail:
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
// Save error only if shutdown all routines
r.o.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
// TODO maybe store errors on restart?
if r.logger != nil {
r.logger.Printf("restarting #%d", id)
}
r.Go(doer, opts...)
case DoNothingIfFail:
// TODO maybe store errors on nothing to do?
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
}
} else {
switch onDone {
case ShutdownIfDone:
if r.logger != nil {
r.logger.Printf("stopping #%d with shutdown", id)
}
r.Cancel()
case RestartIfDone:
if r.logger != nil {
r.logger.Printf("restarting #%d", id)
}
r.Go(doer, opts...)
case DoNothingIfDone:
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
}
}
}()
}
// OS signals handler
func (r *Rutina) ListenOsSignals() {
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
select {
case s := <-sig:
if r.logger != nil {
r.logger.Printf("stopping by OS signal (%v)", s)
}
if r.cancelByError {
r.Cancel()
}
case <-ctx.Done():
}
return nil
}, ShutdownIfDone)
}
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
r.wg.Wait()
return r.err
}