Many changes

This commit is contained in:
Alexander Kiryukhin 2019-01-17 07:54:39 +03:00
parent d64ca3bd06
commit c5776ba6a3
No known key found for this signature in database
GPG key ID: 5579837FDBF65965
7 changed files with 165 additions and 46 deletions

View file

@ -1,13 +1,67 @@
# rutina # rutina
Package Rutina (russian "рутина" - ordinary boring everyday work) works like https://godoc.org/golang.org/x/sync/errgroup with small differences: Package Rutina (russian "рутина" - ordinary boring everyday work) is routine orchestrator for your application.
1) propagates context to routines It seems like https://godoc.org/golang.org/x/sync/errgroup with some different:
2) cancels context when any routine ends with any result (not only when error result)
1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`).
2) by default cancels context when any routine ends with any result (not only when error result). Can be configured by option `OptionCancelByError`.
3) already has optional signal handler `ListenOsSignals()`
## When it need? ## When it need?
Usually, when yout program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber). Usually, when your program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber).
## Usage
### New instance
`r := rutina.New()`
or with options (see below):
`r := rutina.New(...Option)` or `r.WithOptions(...Option)`
### Start new routine
```
r.Go(func (ctx context.Context) error {
...do something...
})
```
### Wait routines to complete
```
err := r.Wait()
```
Here err = first error in any routine
## Options
### Usage options
`r := rutina.New(option1, option2, ...)`
or
```
r := rutina.New()
r = r.WithOptions(option1, option2, ...) // Returns new instance of Rutina!
```
### Logger
`rutina.WithLogger(logger log.Logger) Option` or `rutina.WithStdLogger() Option`
### Custom context
`rutina.WithContext(ctx context.Context) Option`
### Cancel only by errors
`rutina.WithCancelByError() Option`
If this option set, rutina doesnt cancel context if routine completed without error.
## Example ## Example
@ -15,7 +69,7 @@ HTTP server with graceful shutdown (`example/http_server.go`):
``` ```
// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) // New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
r, _ := rutina.New() r, _ := rutina.New(rutina.WithStdLogger())
srv := &http.Server{Addr: ":8080"} srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@ -39,7 +93,7 @@ r.Go(func(ctx context.Context) error {
}) })
// OS signals listener // OS signals listener
r.ListenTermSignals() r.ListenOsSignals()
if err := r.Wait(); err != nil { if err := r.Wait(); err != nil {
log.Fatal(err) log.Fatal(err)

21
example/http_server.go Normal file → Executable file
View file

@ -4,19 +4,15 @@ package main
import ( import (
"context" "context"
"github.com/neonxp/rutina"
"io" "io"
"log" "log"
"net/http" "net/http"
"os"
"os/signal"
"syscall"
"github.com/neonxp/rutina"
) )
func main() { func main() {
// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) // New instance with builtin context. Alternative: r, ctx := rutina.OptionContext(ctx)
r, _ := rutina.New() r, _ := rutina.New(rutina.WithStdLogger())
srv := &http.Server{Addr: ":8080"} srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@ -40,16 +36,7 @@ func main() {
}) })
// OS signals subscriber // OS signals subscriber
r.Go(func(ctx context.Context) error { r.ListenOsSignals()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
select {
case <-sig:
log.Println("TERM or INT signal received")
case <-ctx.Done():
}
return nil
})
if err := r.Wait(); err != nil { if err := r.Wait(); err != nil {
log.Fatal(err) log.Fatal(err)

0
go.mod Normal file → Executable file
View file

0
go.sum Normal file → Executable file
View file

51
options.go Executable file
View file

@ -0,0 +1,51 @@
package rutina
import (
"context"
"log"
"os"
)
type Option interface {
apply(*Rutina)
}
type OptionContext struct {
Context context.Context
}
func WithContext(context context.Context) *OptionContext {
return &OptionContext{Context: context}
}
func (o OptionContext) apply(r *Rutina) {
ctx, cancel := context.WithCancel(o.Context)
r.ctx = ctx
r.Cancel = cancel
}
type OptionLogger struct {
Logger *log.Logger
}
func WithLogger(logger *log.Logger) *OptionLogger {
return &OptionLogger{Logger: logger}
}
func WithStdLogger() *OptionLogger {
return &OptionLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)}
}
func (o OptionLogger) apply(r *Rutina) {
r.logger = o.Logger
}
type OptionCancelByError struct{}
func WithCancelByError() *OptionCancelByError {
return &OptionCancelByError{}
}
func (OptionCancelByError) apply(r *Rutina) {
r.cancelByError = true
}

View file

@ -2,58 +2,85 @@ package rutina
import ( import (
"context" "context"
"log"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
"syscall" "sync/atomic"
) )
//Rutina is routine manager //Rutina is routine manager
type Rutina struct { type Rutina struct {
ctx context.Context ctx context.Context
cancel func() Cancel func()
wg sync.WaitGroup wg sync.WaitGroup
o sync.Once o sync.Once
err error err error
logger *log.Logger
counter *uint64
cancelByError bool
} }
// New instance with builtin context // New instance with builtin context
func New() (*Rutina, context.Context) { func New(opts ...Option) (*Rutina, context.Context) {
return WithContext(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
return r.WithOptions(opts...), ctx
} }
// WithContext is constructor that takes context from outside func (r *Rutina) WithOptions(opts ...Option) *Rutina {
func WithContext(ctx context.Context) (*Rutina, context.Context) { nr := *r
ctx, cancel := context.WithCancel(ctx) for _, o := range opts {
o.apply(&nr)
return &Rutina{ctx: ctx, cancel: cancel}, ctx }
return &nr
} }
// Go routine // Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error) { func (r *Rutina) Go(doer func(ctx context.Context) error) {
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
id := atomic.AddUint64(r.counter, 1)
defer func() { defer func() {
if r.logger != nil {
r.logger.Printf("stopping #%d", id)
}
r.wg.Done() r.wg.Done()
if r.cancel != nil { if !r.cancelByError {
r.cancel() r.Cancel()
} }
}() }()
if r.logger != nil {
r.logger.Printf("starting #%d", id)
}
if err := doer(r.ctx); err != nil { if err := doer(r.ctx); err != nil {
if r.logger != nil {
r.logger.Printf("error at #%d : %v", id, err)
}
r.o.Do(func() { r.o.Do(func() {
r.err = err r.err = err
}) })
if r.cancelByError {
r.Cancel()
}
} }
}() }()
} }
// OS signals handler // OS signals handler
func (r *Rutina) ListenTermSignals() { func (r *Rutina) ListenOsSignals() {
r.Go(func(ctx context.Context) error { r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1) sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) signal.Notify(sig, os.Interrupt, os.Kill)
select { select {
case <-sig: case s := <-sig:
if r.logger != nil {
r.logger.Printf("stopping by OS signal (%v)", s)
}
if r.cancelByError {
r.Cancel()
}
case <-ctx.Done(): case <-ctx.Done():
} }
return nil return nil
@ -63,8 +90,5 @@ func (r *Rutina) ListenTermSignals() {
// Wait all routines and returns first error or nil if all routines completes without errors // Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error { func (r *Rutina) Wait() error {
r.wg.Wait() r.wg.Wait()
if r.cancel != nil {
r.cancel()
}
return r.err return r.err
} }

View file

@ -8,7 +8,10 @@ import (
) )
func TestSuccess(t *testing.T) { func TestSuccess(t *testing.T) {
r, _ := New() r, _ := New(
WithStdLogger(),
WithContext(context.Background()),
)
counter := 0 counter := 0
f := func(name string, ttl time.Duration) error { f := func(name string, ttl time.Duration) error {
counter++ counter++
@ -37,7 +40,7 @@ func TestSuccess(t *testing.T) {
} }
func TestError(t *testing.T) { func TestError(t *testing.T) {
r, _ := New() r, _ := New(WithCancelByError())
f := func(name string, ttl time.Duration) error { f := func(name string, ttl time.Duration) error {
<-time.After(ttl) <-time.After(ttl)
t.Log(name) t.Log(name)