Initial commit
This commit is contained in:
parent
381df5413c
commit
c582001e89
6 changed files with 259 additions and 1 deletions
2
.gitignore
vendored
Normal file → Executable file
2
.gitignore
vendored
Normal file → Executable file
|
@ -10,3 +10,5 @@
|
|||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
.idea
|
0
LICENSE
Normal file → Executable file
0
LICENSE
Normal file → Executable file
57
README.md
Normal file → Executable file
57
README.md
Normal file → Executable file
|
@ -1 +1,58 @@
|
|||
# rutina
|
||||
|
||||
Package Rutina (russian "рутина" - ordinary boring everyday work) works like https://godoc.org/golang.org/x/sync/errgroup with small differences:
|
||||
|
||||
1) propagates context to routines
|
||||
2) cancels context when any routine ends with any result (not only when error result)
|
||||
|
||||
## When it need?
|
||||
|
||||
Usually, when yout program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber).
|
||||
|
||||
## Example
|
||||
|
||||
HTTP server with graceful shutdown (`example/http_server.go`):
|
||||
|
||||
```
|
||||
// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
|
||||
r := rutina.New()
|
||||
|
||||
srv := &http.Server{Addr: ":8080"}
|
||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
io.WriteString(w, "hello world\n")
|
||||
})
|
||||
|
||||
// Starting http server and listen connections
|
||||
r.Go(func(ctx context.Context) error {
|
||||
if err := srv.ListenAndServe(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("Server stopped")
|
||||
return nil
|
||||
})
|
||||
|
||||
// Gracefully stoping server when context canceled
|
||||
r.Go(func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
log.Println("Stopping server...")
|
||||
return srv.Shutdown(ctx)
|
||||
})
|
||||
|
||||
// OS signals subscriber
|
||||
r.Go(func(ctx context.Context) error {
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
|
||||
select {
|
||||
case <-sig:
|
||||
log.Println("TERM or INT signal received")
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := r.Wait(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.Println("All routines successfully stopped")
|
||||
```
|
56
example/http_server.go
Normal file
56
example/http_server.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/neonxp/rutina"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
|
||||
r, _ := rutina.New()
|
||||
|
||||
srv := &http.Server{Addr: ":8080"}
|
||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
io.WriteString(w, "hello world\n")
|
||||
})
|
||||
|
||||
// Starting http server and listen connections
|
||||
r.Go(func(ctx context.Context) error {
|
||||
if err := srv.ListenAndServe(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("Server stopped")
|
||||
return nil
|
||||
})
|
||||
|
||||
// Gracefully stoping server when context canceled
|
||||
r.Go(func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
log.Println("Stopping server...")
|
||||
return srv.Shutdown(ctx)
|
||||
})
|
||||
|
||||
// OS signals subscriber
|
||||
r.Go(func(ctx context.Context) error {
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
|
||||
select {
|
||||
case <-sig:
|
||||
log.Println("TERM or INT signal received")
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := r.Wait(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println("All routines successfully stopped")
|
||||
}
|
54
rutina.go
Executable file
54
rutina.go
Executable file
|
@ -0,0 +1,54 @@
|
|||
package rutina
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
//Rutina is routine manager
|
||||
type Rutina struct {
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
wg sync.WaitGroup
|
||||
o sync.Once
|
||||
err error
|
||||
}
|
||||
|
||||
// New instance with builtin context
|
||||
func New() (*Rutina, context.Context) {
|
||||
return WithContext(context.Background())
|
||||
}
|
||||
|
||||
// WithContext is constructor that takes context from outside
|
||||
func WithContext(ctx context.Context) (*Rutina, context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
return &Rutina{ctx: ctx, cancel: cancel}, ctx
|
||||
}
|
||||
|
||||
// Go routine
|
||||
func (r *Rutina) Go(doer func(ctx context.Context) error) {
|
||||
r.wg.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
r.wg.Done()
|
||||
if r.cancel != nil {
|
||||
r.cancel()
|
||||
}
|
||||
}()
|
||||
if err := doer(r.ctx); err != nil {
|
||||
r.o.Do(func() {
|
||||
r.err = err
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait all routines and returns first error or nil if all routines completes without errors
|
||||
func (r *Rutina) Wait() error {
|
||||
r.wg.Wait()
|
||||
if r.cancel != nil {
|
||||
r.cancel()
|
||||
}
|
||||
return r.err
|
||||
}
|
89
rutina_test.go
Executable file
89
rutina_test.go
Executable file
|
@ -0,0 +1,89 @@
|
|||
package rutina
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func TestSuccess(t *testing.T) {
|
||||
r, _ := New()
|
||||
counter := 0
|
||||
f := func(name string, ttl time.Duration) error {
|
||||
counter++
|
||||
<-time.After(ttl)
|
||||
counter--
|
||||
t.Log(name)
|
||||
return nil
|
||||
}
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("one", 1*time.Second)
|
||||
})
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("two", 2*time.Second)
|
||||
})
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("three", 3*time.Second)
|
||||
})
|
||||
if err := r.Wait(); err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if counter == 0 {
|
||||
t.Log("All routines done")
|
||||
} else {
|
||||
t.Error("Not all routines stopped")
|
||||
}
|
||||
}
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
r, _ := New()
|
||||
f := func(name string, ttl time.Duration) error {
|
||||
<-time.After(ttl)
|
||||
t.Log(name)
|
||||
return errors.New("error from " + name)
|
||||
}
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("one", 1*time.Second)
|
||||
})
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("two", 2*time.Second)
|
||||
})
|
||||
r.Go(func(ctx context.Context) error {
|
||||
return f("three", 3*time.Second)
|
||||
})
|
||||
if err := r.Wait(); err != nil {
|
||||
if err.Error() != "error from one" {
|
||||
t.Error("Must be error from first routine")
|
||||
}
|
||||
t.Log(err)
|
||||
}
|
||||
t.Log("All routines done")
|
||||
}
|
||||
|
||||
func TestContext(t *testing.T) {
|
||||
r, _ := New()
|
||||
cc := false
|
||||
r.Go(func(ctx context.Context) error {
|
||||
<-time.After(1 * time.Second)
|
||||
return nil
|
||||
})
|
||||
r.Go(func(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cc = true
|
||||
return nil
|
||||
case <-time.After(3 * time.Second):
|
||||
return errors.New("Timeout")
|
||||
}
|
||||
})
|
||||
if err := r.Wait(); err != nil {
|
||||
t.Error("Unexpected error", err)
|
||||
}
|
||||
if cc {
|
||||
t.Log("Second routine succesfuly complete by context done")
|
||||
} else {
|
||||
t.Error("Routine not completed by context")
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue