This commit is contained in:
Alexander Kiryukhin 2021-03-18 17:58:00 +03:00
parent cb67839e3c
commit 14fcf184ae
No known key found for this signature in database
GPG key ID: 6DF7A2910D0699E9
5 changed files with 125 additions and 88 deletions

View file

@ -7,7 +7,7 @@ Package Rutina (russian "рутина" - ordinary boring everyday work) is routi
It seems like https://godoc.org/golang.org/x/sync/errgroup with some different: It seems like https://godoc.org/golang.org/x/sync/errgroup with some different:
1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`). 1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`).
2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled 2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled
3) already has optional signal handler `ListenOsSignals()` 3) already has optional signal handler `ListenOsSignals()`
## When it need? ## When it need?
@ -21,18 +21,17 @@ Usually, when your program consists of several routines (i.e.: http server, metr
With default options: With default options:
```go ```go
r := rutina.New(nil) r := rutina.New()
``` ```
or with custom options: or with custom options:
```go ```go
r := rutina.New( r := rutina.New(
rutina.Opt. ParentContext(ctx context.Context), // Pass parent context to Rutina (otherwise it uses own new context)
SetParentContext(ctx context.Context). // Pass parent context to Rutina (otherwise it uses own new context) ListenOsSignals(listenOsSignals ...os.Signal), // Auto listen OS signals and close context on Kill, Term signal
SetListenOsSignals(listenOsSignals bool). // Auto listen OS signals and close context on Kill, Term signal Logger(l logger), // Pass logger for debug, i.e. `log.Printf`
SetLogger(l logger). // Pass logger for debug, i.e. `log.Printf` Errors(errCh chan error), // Set errors channel for errors from routines in Restart/DoNothing errors policy
SetErrors(errCh chan error) // Set errors channel for errors from routines in Restart/DoNothing errors policy
) )
``` ```
@ -41,17 +40,21 @@ r := rutina.New(
```go ```go
r.Go(func (ctx context.Context) error { r.Go(func (ctx context.Context) error {
...do something... ...do something...
}, *runOptions) })
``` ```
#### Run Options #### Run Options
```go ```go
RunOpt. r.Go(
SetOnDone(policy Policy). // Run policy if returns no error func (ctx context.Context) error {
SetOnError(policy Policy). // Run policy if returns error ...do something...
SetTimeout(timeout time.Duration). // Timeout to routine (after it context will be closed) },
SetMaxCount(maxCount int) // Max tries on Restart policy SetOnDone(policy Policy), // Run policy if returns no error (default: Shutdown)
SetOnError(policy Policy), // Run policy if returns error (default: Shutdown)
SetTimeout(timeout time.Duration), // Timeout to routine (after it context will be closed)
SetMaxCount(maxCount int), // Max tries on Restart policy
)
``` ```
#### Run policies #### Run policies
@ -64,23 +67,23 @@ RunOpt.
```go ```go
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
// If this routine produce no error - it just completes, other routines not affected // If this routine produce no error - all other routines will shutdown (because context cancels)
// If it returns error - all other routines will shutdown (because context cancels) // If it returns error - all other routines will shutdown (because context cancels)
}, nil) },)
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
// If this routine produce no error - it restarts // If this routine produce no error - it restarts
// If it returns error - all other routines will shutdown (because context cancels) // If it returns error - all other routines will shutdown (because context cancels)
}, rutina.RunOpt.SetOnDone(rutina.Restart)) }, SetOnDone(rutina.Restart))
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
// If this routine produce no error - all other routines will shutdown (because context cancels) // If this routine produce no error - all other routines will shutdown (because context cancels)
// If it returns error - it will be restarted // If it returns error - it will be restarted (maximum 10 times)
}, rutina.RunOpt.SetOnDone(rutina.Shutdown).SetOnError(rutina.Restart)) }, SetOnError(rutina.Restart), SetMaxCount(10))
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
// If this routine stopped by any case - all other routines will shutdown (because context cancels) // If this routine stopped by any case other routines will work as before.
}, rutina.RunOpt.SetOnDone(rutina.Shutdown)) }, SetOnDone(rutina.DoNothing))
r.ListenOsSignals() // Shutdown all routines by OS signal r.ListenOsSignals() // Shutdown all routines by OS signal
``` ```
@ -104,7 +107,7 @@ r.Kill(id) // Closes individual context for #id routine that must shutdown it
### List of routines ### List of routines
```go ```go
list := r.Processes() list := r.Processes()
``` ```
Returns ids of working routines Returns ids of working routines

View file

@ -7,13 +7,14 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"os"
"github.com/neonxp/rutina" "github.com/neonxp/rutina/v3"
) )
func main() { func main() {
// New instance with builtin context // New instance with builtin context
r := rutina.New(rutina.Opt.SetListenOsSignals(true)) r := rutina.New(rutina.ListenOsSignals(os.Interrupt, os.Kill))
srv := &http.Server{Addr: ":8080"} srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@ -27,14 +28,14 @@ func main() {
} }
log.Println("Server stopped") log.Println("Server stopped")
return nil return nil
}, rutina.RunOpt.SetOnDone(rutina.Shutdown)) })
// Gracefully stopping server when context canceled // Gracefully stopping server when context canceled
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
<-ctx.Done() <-ctx.Done()
log.Println("Stopping server...") log.Println("Stopping server...")
return srv.Shutdown(ctx) return srv.Shutdown(ctx)
}, nil) })
if err := r.Wait(); err != nil { if err := r.Wait(); err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -6,26 +6,27 @@ import (
"context" "context"
"errors" "errors"
"log" "log"
"os"
"time" "time"
"github.com/neonxp/rutina" "github.com/neonxp/rutina/v3"
) )
func main() { func main() {
// New instance with builtin context // New instance with builtin context
r := rutina.New(rutina.Opt.SetLogger(log.Printf).SetListenOsSignals(true)) r := rutina.New(rutina.Logger(log.Printf), rutina.ListenOsSignals(os.Interrupt, os.Kill))
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second) <-time.After(1 * time.Second)
log.Println("Do something 1 second without errors and restart") log.Println("Do something 1 second without errors and restart")
return nil return nil
}, nil) })
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
<-time.After(2 * time.Second) <-time.After(2 * time.Second)
log.Println("Do something 2 seconds without errors and do nothing") log.Println("Do something 2 seconds without errors and do nothing")
return nil return nil
}, nil) })
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
select { select {
@ -34,7 +35,7 @@ func main() {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
} }
}, rutina.RunOpt.SetOnError(rutina.Restart).SetMaxCount(10)) }, rutina.OnError(rutina.Restart), rutina.MaxCount(10))
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
select { select {
@ -43,7 +44,7 @@ func main() {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
} }
}, rutina.RunOpt.SetOnError(rutina.Restart).SetTimeout(10*time.Second)) }, rutina.OnError(rutina.Restart), rutina.SetTimeout(10*time.Second))
if err := r.Wait(); err != nil { if err := r.Wait(); err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -2,41 +2,62 @@ package rutina
import ( import (
"context" "context"
"os"
"time" "time"
) )
type Options struct { type Options struct {
ParentContext context.Context ParentContext context.Context
ListenOsSignals bool ListenOsSignals []os.Signal
Logger func(format string, v ...interface{}) Logger func(format string, v ...interface{})
Errors chan error Errors chan error
} }
func (o *Options) SetParentContext(ctx context.Context) *Options { func ParentContext(ctx context.Context) Options {
o.ParentContext = ctx return Options{
return o ParentContext: ctx,
}
} }
func (o *Options) SetListenOsSignals(listenOsSignals bool) *Options { func ListenOsSignals(signals ...os.Signal) Options {
o.ListenOsSignals = listenOsSignals return Options{
return o ListenOsSignals: signals,
}
} }
func (o *Options) SetLogger(l logger) *Options { func Logger(l logger) Options {
o.Logger = l return Options{
return o Logger: l,
}
} }
func (o *Options) SetErrors(errCh chan error) *Options { func Errors(errCh chan error) Options {
o.Errors = errCh return Options{
return o Errors: errCh,
}
} }
var Opt = &Options{ func composeOptions(opts []Options) Options {
ParentContext: context.Background(), res := Options{
ListenOsSignals: false, ParentContext: context.Background(),
Logger: nil, Logger: nopLogger,
Errors: nil, ListenOsSignals: []os.Signal{},
}
for _, o := range opts {
if o.ParentContext != nil {
res.ParentContext = o.ParentContext
}
if o.Errors != nil {
res.Errors = o.Errors
}
if o.ListenOsSignals != nil {
res.ListenOsSignals = o.ListenOsSignals
}
if o.Logger != nil {
res.Logger = o.Logger
}
}
return res
} }
type Policy int type Policy int
@ -54,29 +75,48 @@ type RunOptions struct {
MaxCount *int MaxCount *int
} }
func (rp *RunOptions) SetOnDone(policy Policy) *RunOptions { func OnDone(policy Policy) RunOptions {
rp.OnDone = policy return RunOptions{
return rp OnDone: policy,
}
} }
func (rp *RunOptions) SetOnError(policy Policy) *RunOptions { func OnError(policy Policy) RunOptions {
rp.OnError = policy return RunOptions{
return rp OnError: policy,
}
} }
func (rp *RunOptions) SetTimeout(timeout time.Duration) *RunOptions { func Timeout(timeout time.Duration) RunOptions {
rp.Timeout = &timeout return RunOptions{
return rp Timeout: &timeout,
}
} }
func (rp *RunOptions) SetMaxCount(maxCount int) *RunOptions { func MaxCount(maxCount int) RunOptions {
rp.MaxCount = &maxCount return RunOptions{
return rp MaxCount: &maxCount,
}
} }
var RunOpt = &RunOptions{ func composeRunOptions(opts []RunOptions) RunOptions {
OnDone: DoNothing, res := RunOptions{
OnError: Shutdown, OnDone: Shutdown,
Timeout: nil, OnError: Shutdown,
MaxCount: nil, }
for _, o := range opts {
if o.OnDone != res.OnDone {
res.OnDone = o.OnDone
}
if o.OnError != res.OnError {
res.OnError = o.OnError
}
if o.MaxCount != nil {
res.MaxCount = o.MaxCount
}
if o.Timeout != nil {
res.Timeout = o.Timeout
}
}
return res
} }

View file

@ -22,7 +22,7 @@ type logger func(format string, v ...interface{})
var nopLogger = func(format string, v ...interface{}) {} var nopLogger = func(format string, v ...interface{}) {}
//Rutina is routine manager // Rutina is routine manager
type Rutina struct { type Rutina struct {
ctx context.Context // State of application (started/stopped) ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines Cancel func() // Cancel func that stops all routines
@ -39,19 +39,13 @@ type Rutina struct {
} }
// New instance with builtin context // New instance with builtin context
func New(opts *Options) *Rutina { func New(opts ...Options) *Rutina {
if opts == nil { if opts == nil {
opts = Opt opts = []Options{}
} }
ctx, cancel := context.WithCancel(opts.ParentContext) options := composeOptions(opts)
ctx, cancel := context.WithCancel(options.ParentContext)
var counter uint64 var counter uint64
if opts.Logger == nil {
opts.Logger = nopLogger
}
var signals []os.Signal
if opts.ListenOsSignals {
signals = []os.Signal{os.Kill, os.Interrupt}
}
return &Rutina{ return &Rutina{
ctx: ctx, ctx: ctx,
Cancel: cancel, Cancel: cancel,
@ -59,20 +53,18 @@ func New(opts *Options) *Rutina {
onceErr: sync.Once{}, onceErr: sync.Once{},
onceWait: sync.Once{}, onceWait: sync.Once{},
err: nil, err: nil,
logger: opts.Logger, logger: options.Logger,
counter: &counter, counter: &counter,
errCh: opts.Errors, errCh: options.Errors,
autoListenSignals: signals, autoListenSignals: options.ListenOsSignals,
processes: map[uint64]*process{}, processes: map[uint64]*process{},
mu: sync.Mutex{}, mu: sync.Mutex{},
} }
} }
// Go routine // Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 { func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...RunOptions) uint64 {
if opts == nil { options := composeRunOptions(opts)
opts = RunOpt
}
// Check that context is not canceled yet // Check that context is not canceled yet
if r.ctx.Err() != nil { if r.ctx.Err() != nil {
return 0 return 0
@ -83,11 +75,11 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint
process := process{ process := process{
id: id, id: id,
doer: doer, doer: doer,
onDone: opts.OnDone, onDone: options.OnDone,
onError: opts.OnError, onError: options.OnError,
restartLimit: opts.MaxCount, restartLimit: options.MaxCount,
restartCount: 0, restartCount: 0,
timeout: opts.Timeout, timeout: options.Timeout,
} }
r.processes[id] = &process r.processes[id] = &process
r.mu.Unlock() r.mu.Unlock()