toy with pipeline setup
This commit is contained in:
parent
2cc007629d
commit
f2b7adba1f
2 changed files with 46 additions and 43 deletions
83
main.go
83
main.go
|
@ -8,28 +8,17 @@ import (
|
|||
"os"
|
||||
)
|
||||
|
||||
func read(in string, data chan<- []byte) error {
|
||||
func reader(in string) (io.Reader, error) {
|
||||
if in != "" {
|
||||
file, err := os.Open(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
s := bufio.NewScanner(file)
|
||||
for s.Scan() {
|
||||
data <- s.Bytes()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return file, nil
|
||||
}
|
||||
|
||||
s := bufio.NewScanner(os.Stdin)
|
||||
for s.Scan() {
|
||||
data <- s.Bytes()
|
||||
}
|
||||
|
||||
return nil
|
||||
return os.Stdin, nil
|
||||
}
|
||||
|
||||
func writer(out string) (io.Writer, error) {
|
||||
|
@ -49,6 +38,27 @@ func write(w io.Writer, b []byte) {
|
|||
fmt.Fprintf(w, string(b))
|
||||
}
|
||||
|
||||
type ir struct {
|
||||
r io.Reader
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func NewIr(r io.Reader, quit chan struct{}) *ir {
|
||||
return &ir{r: r, quit: quit}
|
||||
}
|
||||
|
||||
func (m *ir) Output() chan []byte {
|
||||
data := make(chan []byte)
|
||||
s := bufio.NewScanner(m.r)
|
||||
go func() {
|
||||
for s.Scan() {
|
||||
data <- s.Bytes()
|
||||
}
|
||||
close(m.quit)
|
||||
}()
|
||||
return data
|
||||
}
|
||||
|
||||
type ow struct {
|
||||
w io.Writer
|
||||
quit chan struct{}
|
||||
|
@ -58,7 +68,7 @@ func NewOw(w io.Writer, quit chan struct{}) *ow {
|
|||
return &ow{w: w, quit: quit}
|
||||
}
|
||||
|
||||
func (m *ow) Input(data <-chan []byte) {
|
||||
func (m *ow) Input(data chan []byte) {
|
||||
for {
|
||||
select {
|
||||
case <-m.quit:
|
||||
|
@ -69,20 +79,6 @@ func (m *ow) Input(data <-chan []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
func writesetup(out string, data <-chan []byte, quit chan struct{}) error {
|
||||
w, err := writer(out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("writer: %w", err)
|
||||
}
|
||||
|
||||
sink := NewOw(w, quit)
|
||||
sink.Input(data)
|
||||
|
||||
//NewParser(data, w, quit).Parse()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
var in, out string
|
||||
|
||||
|
@ -91,16 +87,23 @@ func main() {
|
|||
flag.Parse()
|
||||
|
||||
quit := make(chan struct{})
|
||||
data := make(chan []byte)
|
||||
|
||||
go func() {
|
||||
if err := read(in, data); err != nil {
|
||||
fmt.Fprintf(os.Stderr, err.Error())
|
||||
}
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
if err := writesetup(out, data, quit); err != nil {
|
||||
fmt.Fprintf(os.Stderr, err.Error())
|
||||
r, err := reader(in)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
w, err := writer(out)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
source := NewIr(r, quit)
|
||||
sink := NewOw(w, quit)
|
||||
|
||||
sink.Input(source.Output())
|
||||
|
||||
//NewParser(data, w, quit).Parse()
|
||||
}
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
package main
|
||||
|
||||
type Node interface {
|
||||
Pipeline(<-chan []byte) <-chan []byte
|
||||
Pipeline(chan []byte) chan []byte
|
||||
}
|
||||
|
||||
type Source interface {
|
||||
Input() <-chan []byte
|
||||
Output() chan []byte
|
||||
}
|
||||
|
||||
type Sink interface {
|
||||
Output(<-chan []byte)
|
||||
Input(chan []byte)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue