create pipeline package

This commit is contained in:
dre 2021-07-05 01:32:00 +08:00
parent 2d78589836
commit 2ca81a3ef2
3 changed files with 79 additions and 0 deletions

37
pipe/pipe.go Normal file
View file

@ -0,0 +1,37 @@
package pipe
import (
"bytes"
"encoding/gob"
)
type StreamItem struct {
index int
payload []byte
}
func NewItem(index int, payload []byte) StreamItem {
var buf bytes.Buffer
w := StreamItem{index: index}
if err := gob.NewEncoder(&buf).Encode(payload); err != nil {
// assert no broken pipes
panic(err)
}
w.payload = buf.Bytes()
return w
}
func (w *StreamItem) Index() int {
return w.index
}
func (w *StreamItem) Payload() []byte {
var dec []byte
buf := bytes.NewReader(w.payload)
if err := gob.NewDecoder(buf).Decode(&dec); err != nil {
// assert no broken pipes
panic(err)
}
return dec
}

7
pipe/pipeline.go Normal file
View file

@ -0,0 +1,7 @@
package pipe
type Connector chan StreamItem
type Source func() chan StreamItem
type Sink func(chan StreamItem)
type Pipeline func(chan StreamItem) chan StreamItem

35
pipe/stream.go Normal file
View file

@ -0,0 +1,35 @@
package pipe
type Pipelines []Pipeline
type Stream struct {
nodes []Pipeline
}
func New() *Stream {
return &Stream{}
}
// Use appends a pipeline processor to the Stream pipeline stack.
func (s *Stream) Use(nodes ...Pipeline) {
s.nodes = append(s.nodes, nodes...)
}
func Chain(middlewares ...Pipeline) Pipelines {
return Pipelines(middlewares)
}
// chain builds a Connector composed of an inline pipeline stack and endpoint
// processor in the order they are passed.
func chain(nodes []Pipeline, src Connector) Connector {
c := nodes[0](src)
for i := 1; i < len(nodes); i++ {
c = nodes[i](c)
}
return c
}
// Handle registers a source and maps it to a sink.
func (s *Stream) Handle(src Source, dest Sink) {
dest(chain(s.nodes, src()))
}