From 32af50e3b988e4db95ce1a08df3fcbf8c7188366 Mon Sep 17 00:00:00 2001 From: dre Date: Mon, 5 Jul 2021 01:32:17 +0800 Subject: [PATCH] adapt to pipeline package --- main.go | 74 +++++++++++++++++++++-------------------------------- pipeline.go | 13 ---------- preproc.go | 10 +++++--- proc.go | 39 ++++++++++++++++++---------- 4 files changed, 60 insertions(+), 76 deletions(-) delete mode 100644 pipeline.go diff --git a/main.go b/main.go index 12f560d..5d0fb91 100644 --- a/main.go +++ b/main.go @@ -2,14 +2,15 @@ package main import ( "bufio" - "bytes" - "encoding/gob" "flag" "fmt" "io" "os" + + "github.com/n0x1m/md2gmi/pipe" ) +/* type WorkItem struct { index int payload []byte @@ -39,6 +40,7 @@ func (w *WorkItem) Payload() []byte { } return tmp } +*/ func reader(in string) (io.Reader, error) { if in != "" { @@ -70,39 +72,27 @@ func write(w io.Writer, b []byte) { fmt.Fprint(w, string(b)) } -type ir struct { - r io.Reader +func source(r io.Reader) pipe.Source { + return func() chan pipe.StreamItem { + data := make(chan pipe.StreamItem) + s := bufio.NewScanner(r) + go func() { + i := 0 + for s.Scan() { + data <- pipe.NewItem(i, s.Bytes()) + i += 1 + } + close(data) + }() + return data + } } -func InputStream(r io.Reader) *ir { - return &ir{r: r} -} - -func (m *ir) Output() chan WorkItem { - data := make(chan WorkItem) - s := bufio.NewScanner(m.r) - go func() { - i := 0 - for s.Scan() { - data <- New(i, s.Bytes()) - i += 1 +func sink(w io.Writer) pipe.Sink { + return func(dest chan pipe.StreamItem) { + for b := range dest { + write(w, b.Payload()) } - close(data) - }() - return data -} - -type ow struct { - w io.Writer -} - -func OutputStream(w io.Writer) *ow { - return &ow{w: w} -} - -func (m *ow) Input(data chan WorkItem) { - for b := range data { - write(m.w, b.Payload()) } } @@ -125,20 +115,14 @@ func main() { os.Exit(1) } - source := InputStream(r) - sink := OutputStream(w) preproc := NewPreproc() //sink.Input(preproc.Process(source.Output())) - sink.Input( - FormatLinks( - FormatHeadings( - RemoveComments( - RemoveFrontMatter( - preproc.Process(source.Output()), - ), - ), - ), - ), - ) + s := pipe.New() + s.Use(preproc.Process) + s.Use(RemoveFrontMatter) + s.Use(RemoveComments) + s.Use(FormatHeadings) + s.Use(FormatLinks) + s.Handle(source(r), sink(w)) } diff --git a/pipeline.go b/pipeline.go deleted file mode 100644 index 2350acb..0000000 --- a/pipeline.go +++ /dev/null @@ -1,13 +0,0 @@ -package main - -type Node interface { - Pipeline(chan []byte) chan []byte -} - -type Source interface { - Output() chan []byte -} - -type Sink interface { - Input(chan []byte) -} diff --git a/preproc.go b/preproc.go index b42bc87..847b2de 100644 --- a/preproc.go +++ b/preproc.go @@ -3,6 +3,8 @@ package main import ( "bytes" "regexp" + + "github.com/n0x1m/md2gmi/pipe" ) // state function @@ -13,7 +15,7 @@ type fsm struct { state stateFn i int - out chan WorkItem + out chan pipe.StreamItem // combining multiple input lines blockBuffer []byte @@ -26,8 +28,8 @@ func NewPreproc() *fsm { return &fsm{} } -func (m *fsm) Process(in chan WorkItem) chan WorkItem { - m.out = make(chan WorkItem) +func (m *fsm) Process(in chan pipe.StreamItem) chan pipe.StreamItem { + m.out = make(chan pipe.StreamItem) go func() { for m.state = normal; m.state != nil; { b, ok := <-in @@ -49,7 +51,7 @@ func (m *fsm) Process(in chan WorkItem) chan WorkItem { func (m *fsm) sync() { if len(m.sendBuffer) > 0 { m.sendBuffer = append(m.sendBuffer, '\n') - m.out <- New(m.i, m.sendBuffer) + m.out <- pipe.NewItem(m.i, m.sendBuffer) m.sendBuffer = m.sendBuffer[:0] m.i += 1 } diff --git a/proc.go b/proc.go index 636f643..20902d6 100644 --- a/proc.go +++ b/proc.go @@ -4,13 +4,24 @@ import ( "bytes" "fmt" "regexp" + + "github.com/n0x1m/md2gmi/pipe" ) -func FormatLinks(in chan WorkItem) chan WorkItem { - out := make(chan WorkItem) +func FormatLinks(in chan pipe.StreamItem) chan pipe.StreamItem { + out := make(chan pipe.StreamItem) go func() { + fenceOn := false for b := range in { - out <- New(b.Index(), formatLinks(b.Payload())) + data := b.Payload() + if isFence(data) { + fenceOn = !fenceOn + } + if fenceOn { + out <- pipe.NewItem(b.Index(), b.Payload()) + } else { + out <- pipe.NewItem(b.Index(), formatLinks(b.Payload())) + } } close(out) }() @@ -37,8 +48,8 @@ func formatLinks(data []byte) []byte { return data } -func RemoveComments(in chan WorkItem) chan WorkItem { - out := make(chan WorkItem) +func RemoveComments(in chan pipe.StreamItem) chan pipe.StreamItem { + out := make(chan pipe.StreamItem) go func() { re := regexp.MustCompile(``) for b := range in { @@ -46,16 +57,16 @@ func RemoveComments(in chan WorkItem) chan WorkItem { for _, match := range re.FindAllSubmatch(data, -1) { data = bytes.Replace(data, match[0], []byte(""), 1) } - out <- New(b.Index(), append(bytes.TrimSpace(data), '\n')) - //out <- New(b.Index(), data) + out <- pipe.NewItem(b.Index(), append(bytes.TrimSpace(data), '\n')) + //out <- pipe.NewItem(b.Index(), data) } close(out) }() return out } -func RemoveFrontMatter(in chan WorkItem) chan WorkItem { - out := make(chan WorkItem) +func RemoveFrontMatter(in chan pipe.StreamItem) chan pipe.StreamItem { + out := make(chan pipe.StreamItem) go func() { re := regexp.MustCompile(`---.*---`) for b := range in { @@ -63,16 +74,16 @@ func RemoveFrontMatter(in chan WorkItem) chan WorkItem { for _, match := range re.FindAllSubmatch(data, -1) { data = bytes.Replace(data, match[0], []byte(""), 1) } - out <- New(b.Index(), append(bytes.TrimSpace(data), '\n')) - //out <- New(b.Index(), data) + out <- pipe.NewItem(b.Index(), append(bytes.TrimSpace(data), '\n')) + //out <- pipe.NewItem(b.Index(), data) } close(out) }() return out } -func FormatHeadings(in chan WorkItem) chan WorkItem { - out := make(chan WorkItem) +func FormatHeadings(in chan pipe.StreamItem) chan pipe.StreamItem { + out := make(chan pipe.StreamItem) go func() { re := regexp.MustCompile(`^[#]{4,}`) re2 := regexp.MustCompile(`^(#+)[^# ]`) @@ -89,7 +100,7 @@ func FormatHeadings(in chan WorkItem) chan WorkItem { data = append(data, '\n') } // writeback - out <- New(b.Index(), data) + out <- pipe.NewItem(b.Index(), data) } close(out)