diff --git a/README.md b/README.md index e27aa4d..7b01839 100755 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Package Rutina (russian "рутина" - ordinary boring everyday work) is routi It seems like https://godoc.org/golang.org/x/sync/errgroup with some different: 1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`). -2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled +2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled 3) already has optional signal handler `ListenOsSignals()` ## When it need? @@ -21,18 +21,17 @@ Usually, when your program consists of several routines (i.e.: http server, metr With default options: ```go -r := rutina.New(nil) +r := rutina.New() ``` or with custom options: ```go r := rutina.New( - rutina.Opt. - SetParentContext(ctx context.Context). // Pass parent context to Rutina (otherwise it uses own new context) - SetListenOsSignals(listenOsSignals bool). // Auto listen OS signals and close context on Kill, Term signal - SetLogger(l logger). // Pass logger for debug, i.e. `log.Printf` - SetErrors(errCh chan error) // Set errors channel for errors from routines in Restart/DoNothing errors policy + ParentContext(ctx context.Context), // Pass parent context to Rutina (otherwise it uses own new context) + ListenOsSignals(listenOsSignals ...os.Signal), // Auto listen OS signals and close context on Kill, Term signal + Logger(l logger), // Pass logger for debug, i.e. `log.Printf` + Errors(errCh chan error), // Set errors channel for errors from routines in Restart/DoNothing errors policy ) ``` @@ -41,17 +40,21 @@ r := rutina.New( ```go r.Go(func (ctx context.Context) error { ...do something... -}, *runOptions) +}) ``` #### Run Options ```go -RunOpt. - SetOnDone(policy Policy). // Run policy if returns no error - SetOnError(policy Policy). // Run policy if returns error - SetTimeout(timeout time.Duration). // Timeout to routine (after it context will be closed) - SetMaxCount(maxCount int) // Max tries on Restart policy +r.Go( + func (ctx context.Context) error { + ...do something... + }, + SetOnDone(policy Policy), // Run policy if returns no error (default: Shutdown) + SetOnError(policy Policy), // Run policy if returns error (default: Shutdown) + SetTimeout(timeout time.Duration), // Timeout to routine (after it context will be closed) + SetMaxCount(maxCount int), // Max tries on Restart policy +) ``` #### Run policies @@ -64,23 +67,23 @@ RunOpt. ```go r.Go(func(ctx context.Context) error { - // If this routine produce no error - it just completes, other routines not affected + // If this routine produce no error - all other routines will shutdown (because context cancels) // If it returns error - all other routines will shutdown (because context cancels) -}, nil) +},) r.Go(func(ctx context.Context) error { // If this routine produce no error - it restarts // If it returns error - all other routines will shutdown (because context cancels) -}, rutina.RunOpt.SetOnDone(rutina.Restart)) +}, SetOnDone(rutina.Restart)) r.Go(func(ctx context.Context) error { // If this routine produce no error - all other routines will shutdown (because context cancels) - // If it returns error - it will be restarted -}, rutina.RunOpt.SetOnDone(rutina.Shutdown).SetOnError(rutina.Restart)) + // If it returns error - it will be restarted (maximum 10 times) +}, SetOnError(rutina.Restart), SetMaxCount(10)) r.Go(func(ctx context.Context) error { - // If this routine stopped by any case - all other routines will shutdown (because context cancels) -}, rutina.RunOpt.SetOnDone(rutina.Shutdown)) + // If this routine stopped by any case other routines will work as before. +}, SetOnDone(rutina.DoNothing)) r.ListenOsSignals() // Shutdown all routines by OS signal ``` @@ -104,7 +107,7 @@ r.Kill(id) // Closes individual context for #id routine that must shutdown it ### List of routines ```go -list := r.Processes() +list := r.Processes() ``` Returns ids of working routines diff --git a/example/http_server.go b/example/http_server.go index 6bb7157..8b723b9 100755 --- a/example/http_server.go +++ b/example/http_server.go @@ -7,13 +7,14 @@ import ( "io" "log" "net/http" + "os" - "github.com/neonxp/rutina" + "github.com/neonxp/rutina/v3" ) func main() { // New instance with builtin context - r := rutina.New(rutina.Opt.SetListenOsSignals(true)) + r := rutina.New(rutina.ListenOsSignals(os.Interrupt, os.Kill)) srv := &http.Server{Addr: ":8080"} http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -27,14 +28,14 @@ func main() { } log.Println("Server stopped") return nil - }, rutina.RunOpt.SetOnDone(rutina.Shutdown)) + }) // Gracefully stopping server when context canceled r.Go(func(ctx context.Context) error { <-ctx.Done() log.Println("Stopping server...") return srv.Shutdown(ctx) - }, nil) + }) if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/example/policies.go b/example/policies.go index 8bb4643..87081cb 100644 --- a/example/policies.go +++ b/example/policies.go @@ -6,26 +6,27 @@ import ( "context" "errors" "log" + "os" "time" - "github.com/neonxp/rutina" + "github.com/neonxp/rutina/v3" ) func main() { // New instance with builtin context - r := rutina.New(rutina.Opt.SetLogger(log.Printf).SetListenOsSignals(true)) + r := rutina.New(rutina.Logger(log.Printf), rutina.ListenOsSignals(os.Interrupt, os.Kill)) r.Go(func(ctx context.Context) error { <-time.After(1 * time.Second) log.Println("Do something 1 second without errors and restart") return nil - }, nil) + }) r.Go(func(ctx context.Context) error { <-time.After(2 * time.Second) log.Println("Do something 2 seconds without errors and do nothing") return nil - }, nil) + }) r.Go(func(ctx context.Context) error { select { @@ -34,7 +35,7 @@ func main() { case <-ctx.Done(): return nil } - }, rutina.RunOpt.SetOnError(rutina.Restart).SetMaxCount(10)) + }, rutina.OnError(rutina.Restart), rutina.MaxCount(10)) r.Go(func(ctx context.Context) error { select { @@ -43,7 +44,7 @@ func main() { case <-ctx.Done(): return nil } - }, rutina.RunOpt.SetOnError(rutina.Restart).SetTimeout(10*time.Second)) + }, rutina.OnError(rutina.Restart), rutina.SetTimeout(10*time.Second)) if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/options.go b/options.go index 70a9965..9aed7df 100644 --- a/options.go +++ b/options.go @@ -2,41 +2,62 @@ package rutina import ( "context" + "os" "time" ) type Options struct { ParentContext context.Context - ListenOsSignals bool + ListenOsSignals []os.Signal Logger func(format string, v ...interface{}) Errors chan error } -func (o *Options) SetParentContext(ctx context.Context) *Options { - o.ParentContext = ctx - return o +func ParentContext(ctx context.Context) Options { + return Options{ + ParentContext: ctx, + } } -func (o *Options) SetListenOsSignals(listenOsSignals bool) *Options { - o.ListenOsSignals = listenOsSignals - return o +func ListenOsSignals(signals ...os.Signal) Options { + return Options{ + ListenOsSignals: signals, + } } -func (o *Options) SetLogger(l logger) *Options { - o.Logger = l - return o +func Logger(l logger) Options { + return Options{ + Logger: l, + } } -func (o *Options) SetErrors(errCh chan error) *Options { - o.Errors = errCh - return o +func Errors(errCh chan error) Options { + return Options{ + Errors: errCh, + } } -var Opt = &Options{ - ParentContext: context.Background(), - ListenOsSignals: false, - Logger: nil, - Errors: nil, +func composeOptions(opts []Options) Options { + res := Options{ + ParentContext: context.Background(), + Logger: nopLogger, + ListenOsSignals: []os.Signal{}, + } + for _, o := range opts { + if o.ParentContext != nil { + res.ParentContext = o.ParentContext + } + if o.Errors != nil { + res.Errors = o.Errors + } + if o.ListenOsSignals != nil { + res.ListenOsSignals = o.ListenOsSignals + } + if o.Logger != nil { + res.Logger = o.Logger + } + } + return res } type Policy int @@ -54,29 +75,48 @@ type RunOptions struct { MaxCount *int } -func (rp *RunOptions) SetOnDone(policy Policy) *RunOptions { - rp.OnDone = policy - return rp +func OnDone(policy Policy) RunOptions { + return RunOptions{ + OnDone: policy, + } } -func (rp *RunOptions) SetOnError(policy Policy) *RunOptions { - rp.OnError = policy - return rp +func OnError(policy Policy) RunOptions { + return RunOptions{ + OnError: policy, + } } -func (rp *RunOptions) SetTimeout(timeout time.Duration) *RunOptions { - rp.Timeout = &timeout - return rp +func Timeout(timeout time.Duration) RunOptions { + return RunOptions{ + Timeout: &timeout, + } } -func (rp *RunOptions) SetMaxCount(maxCount int) *RunOptions { - rp.MaxCount = &maxCount - return rp +func MaxCount(maxCount int) RunOptions { + return RunOptions{ + MaxCount: &maxCount, + } } -var RunOpt = &RunOptions{ - OnDone: DoNothing, - OnError: Shutdown, - Timeout: nil, - MaxCount: nil, +func composeRunOptions(opts []RunOptions) RunOptions { + res := RunOptions{ + OnDone: Shutdown, + OnError: Shutdown, + } + for _, o := range opts { + if o.OnDone != res.OnDone { + res.OnDone = o.OnDone + } + if o.OnError != res.OnError { + res.OnError = o.OnError + } + if o.MaxCount != nil { + res.MaxCount = o.MaxCount + } + if o.Timeout != nil { + res.Timeout = o.Timeout + } + } + return res } diff --git a/rutina.go b/rutina.go index cd5748e..3cf140c 100755 --- a/rutina.go +++ b/rutina.go @@ -22,7 +22,7 @@ type logger func(format string, v ...interface{}) var nopLogger = func(format string, v ...interface{}) {} -//Rutina is routine manager +// Rutina is routine manager type Rutina struct { ctx context.Context // State of application (started/stopped) Cancel func() // Cancel func that stops all routines @@ -39,19 +39,13 @@ type Rutina struct { } // New instance with builtin context -func New(opts *Options) *Rutina { +func New(opts ...Options) *Rutina { if opts == nil { - opts = Opt + opts = []Options{} } - ctx, cancel := context.WithCancel(opts.ParentContext) + options := composeOptions(opts) + ctx, cancel := context.WithCancel(options.ParentContext) var counter uint64 - if opts.Logger == nil { - opts.Logger = nopLogger - } - var signals []os.Signal - if opts.ListenOsSignals { - signals = []os.Signal{os.Kill, os.Interrupt} - } return &Rutina{ ctx: ctx, Cancel: cancel, @@ -59,20 +53,18 @@ func New(opts *Options) *Rutina { onceErr: sync.Once{}, onceWait: sync.Once{}, err: nil, - logger: opts.Logger, + logger: options.Logger, counter: &counter, - errCh: opts.Errors, - autoListenSignals: signals, + errCh: options.Errors, + autoListenSignals: options.ListenOsSignals, processes: map[uint64]*process{}, mu: sync.Mutex{}, } } // Go routine -func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 { - if opts == nil { - opts = RunOpt - } +func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...RunOptions) uint64 { + options := composeRunOptions(opts) // Check that context is not canceled yet if r.ctx.Err() != nil { return 0 @@ -83,11 +75,11 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint process := process{ id: id, doer: doer, - onDone: opts.OnDone, - onError: opts.OnError, - restartLimit: opts.MaxCount, + onDone: options.OnDone, + onError: options.OnError, + restartLimit: options.MaxCount, restartCount: 0, - timeout: opts.Timeout, + timeout: options.Timeout, } r.processes[id] = &process r.mu.Unlock()