- Mixin with errors channel
Changed:
- Default run policy now `ShutdownIfDone` && `ShutdownIfFail`
Fixed:
- Fixed OS signals listener
- Removed some dead code
This commit is contained in:
Alexander Kiryukhin 2019-04-03 23:55:11 +03:00
parent 1772990500
commit 05212e50c9
No known key found for this signature in database
GPG key ID: 5579837FDBF65965
5 changed files with 85 additions and 50 deletions

View file

@ -49,7 +49,9 @@ Available options of run policy:
Default policy:
`ShutdownIfFail` && `DoNothingIfDone`
`ShutdownIfFail` && `ShutdownIfDone`
(just like [errgroup](https://godoc.org/golang.org/x/sync/errgroup))
#### Example of run policies
@ -100,11 +102,11 @@ r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina!
### Logger
```go
r.With(rutina.WithStdLogger())
r = r.With(rutina.WithStdLogger())
```
or
```go
r.With(rutina.WithLogger(logger log.Logger))
r = r.With(rutina.WithLogger(logger log.Logger))
```
Sets standard or custom logger. By default there is no logger.
@ -112,11 +114,20 @@ Sets standard or custom logger. By default there is no logger.
### Custom context
```go
r.With(rutina.WithContext(ctx context.Context))
r = r.With(rutina.WithContext(ctx context.Context))
````
Propagates your own context to Rutina. By default it use own context.
### Errors channel
```go
errChan := make(chan error)
r = r.With(rutina.WithErrChan(errChan))
```
This channel will receive all errors from all routines with any `...Fail` run policy.
## Example
HTTP server with graceful shutdown [`example/http_server.go`](https://github.com/NeonXP/rutina/blob/master/example/http_server.go)

View file

@ -14,6 +14,10 @@ func main() {
// New instance with builtin context
r := rutina.New()
errsChan := make(chan error, 1)
r = r.With(rutina.WithErrChan(errsChan))
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
log.Println("Do something 1 second without errors and restart")
@ -24,18 +28,18 @@ func main() {
<-time.After(2 * time.Second)
log.Println("Do something 2 seconds without errors and do nothing")
return nil
}, rutina.ShutdownIfFail)
}, rutina.DoNothingIfDone, rutina.ShutdownIfFail)
r.Go(func(ctx context.Context) error {
<-time.After(3 * time.Second)
log.Println("Do something 3 seconds with error and restart")
return errors.New("Error!")
return errors.New("Error #1!")
}, rutina.RestartIfFail)
r.Go(func(ctx context.Context) error {
<-time.After(4 * time.Second)
log.Println("Do something 4 seconds with error and do nothing")
return errors.New("Error!")
return errors.New("Error #2!")
}, rutina.DoNothingIfFail)
r.Go(func(ctx context.Context) error {
@ -44,6 +48,18 @@ func main() {
return errors.New("Successfully shutdown at proper place")
}, rutina.ShutdownIfFail)
r.Go(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
log.Println("Shutdown chan listener")
return nil
case err := <-errsChan:
log.Printf("Error in chan: %v", err)
}
}
})
// OS signals subscriber
r.ListenOsSignals()

2
go.mod
View file

@ -1 +1,3 @@
module github.com/neonxp/rutina
go 1.12

View file

@ -39,3 +39,15 @@ func WithStdLogger() *MixinLogger {
func (o MixinLogger) apply(r *Rutina) {
r.logger = o.Logger
}
type MixinErrChan struct {
errCh chan error
}
func WithErrChan(errCh chan error) *MixinErrChan {
return &MixinErrChan{errCh: errCh}
}
func (o MixinErrChan) apply(r *Rutina) {
r.errCh = o.errCh
}

View file

@ -7,25 +7,26 @@ import (
"os/signal"
"sync"
"sync/atomic"
"syscall"
)
//Rutina is routine manager
type Rutina struct {
ctx context.Context
Cancel func()
wg sync.WaitGroup
o sync.Once
err error
logger *log.Logger
counter *uint64
cancelByError bool
ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines
wg sync.WaitGroup // WaitGroup that wait all routines to complete
o sync.Once // Flag that prevents overwrite first error that shutdowns all routines
err error // First error that shutdowns all routines
logger *log.Logger // Optional logger
counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
}
// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
return r.With(mixins...)
}
@ -50,7 +51,7 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
onFail = DoNothingIfFail
}
}
onDone := DoNothingIfDone
onDone := ShutdownIfDone
for _, o := range opts {
switch o {
case ShutdownIfDone:
@ -70,52 +71,42 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
return
}
id := atomic.AddUint64(r.counter, 1)
if r.logger != nil {
r.logger.Printf("starting #%d", id)
}
r.log("starting #%d", id)
if err := doer(r.ctx); err != nil {
if r.logger != nil {
r.logger.Printf("error at #%d : %v", id, err)
// errors history
if r.errCh != nil {
r.errCh <- err
}
// region routine failed
r.log("error at #%d : %v", id, err)
switch onFail {
case ShutdownIfFail:
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
r.log("stopping #%d", id)
// Save error only if shutdown all routines
r.o.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
// TODO maybe store errors on restart?
if r.logger != nil {
r.logger.Printf("restarting #%d", id)
}
r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfFail:
// TODO maybe store errors on nothing to do?
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
r.log("stopping #%d", id)
}
// endregion
} else {
// region routine successfully done
switch onDone {
case ShutdownIfDone:
if r.logger != nil {
r.logger.Printf("stopping #%d with shutdown", id)
}
r.log("stopping #%d with shutdown", id)
r.Cancel()
case RestartIfDone:
if r.logger != nil {
r.logger.Printf("restarting #%d", id)
}
r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfDone:
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
r.log("stopping #%d", id)
}
// endregion
}
}()
@ -124,23 +115,19 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
// OS signals handler
func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
if len(signals) == 0 {
signals = []os.Signal{os.Kill, os.Interrupt}
signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, signals...)
r.log("starting OS signals listener")
select {
case s := <-sig:
if r.logger != nil {
r.logger.Printf("stopping by OS signal (%v)", s)
}
if r.cancelByError {
r.Cancel()
}
r.log("stopping by OS signal (%v)", s)
case <-ctx.Done():
}
return nil
}, ShutdownIfDone)
}, ShutdownIfDone, ShutdownIfFail)
}
// Wait all routines and returns first error or nil if all routines completes without errors
@ -148,3 +135,10 @@ func (r *Rutina) Wait() error {
r.wg.Wait()
return r.err
}
// Log if can
func (r *Rutina) log(format string, args ...interface{}) {
if r.logger != nil {
r.logger.Printf(format, args...)
}
}