adapt to pipeline package

This commit is contained in:
dre 2021-07-05 01:32:17 +08:00
parent 2ca81a3ef2
commit 32af50e3b9
4 changed files with 60 additions and 76 deletions

74
main.go
View file

@ -2,14 +2,15 @@ package main
import ( import (
"bufio" "bufio"
"bytes"
"encoding/gob"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"os" "os"
"github.com/n0x1m/md2gmi/pipe"
) )
/*
type WorkItem struct { type WorkItem struct {
index int index int
payload []byte payload []byte
@ -39,6 +40,7 @@ func (w *WorkItem) Payload() []byte {
} }
return tmp return tmp
} }
*/
func reader(in string) (io.Reader, error) { func reader(in string) (io.Reader, error) {
if in != "" { if in != "" {
@ -70,39 +72,27 @@ func write(w io.Writer, b []byte) {
fmt.Fprint(w, string(b)) fmt.Fprint(w, string(b))
} }
type ir struct { func source(r io.Reader) pipe.Source {
r io.Reader return func() chan pipe.StreamItem {
data := make(chan pipe.StreamItem)
s := bufio.NewScanner(r)
go func() {
i := 0
for s.Scan() {
data <- pipe.NewItem(i, s.Bytes())
i += 1
}
close(data)
}()
return data
}
} }
func InputStream(r io.Reader) *ir { func sink(w io.Writer) pipe.Sink {
return &ir{r: r} return func(dest chan pipe.StreamItem) {
} for b := range dest {
write(w, b.Payload())
func (m *ir) Output() chan WorkItem {
data := make(chan WorkItem)
s := bufio.NewScanner(m.r)
go func() {
i := 0
for s.Scan() {
data <- New(i, s.Bytes())
i += 1
} }
close(data)
}()
return data
}
type ow struct {
w io.Writer
}
func OutputStream(w io.Writer) *ow {
return &ow{w: w}
}
func (m *ow) Input(data chan WorkItem) {
for b := range data {
write(m.w, b.Payload())
} }
} }
@ -125,20 +115,14 @@ func main() {
os.Exit(1) os.Exit(1)
} }
source := InputStream(r)
sink := OutputStream(w)
preproc := NewPreproc() preproc := NewPreproc()
//sink.Input(preproc.Process(source.Output())) //sink.Input(preproc.Process(source.Output()))
sink.Input( s := pipe.New()
FormatLinks( s.Use(preproc.Process)
FormatHeadings( s.Use(RemoveFrontMatter)
RemoveComments( s.Use(RemoveComments)
RemoveFrontMatter( s.Use(FormatHeadings)
preproc.Process(source.Output()), s.Use(FormatLinks)
), s.Handle(source(r), sink(w))
),
),
),
)
} }

View file

@ -1,13 +0,0 @@
package main
type Node interface {
Pipeline(chan []byte) chan []byte
}
type Source interface {
Output() chan []byte
}
type Sink interface {
Input(chan []byte)
}

View file

@ -3,6 +3,8 @@ package main
import ( import (
"bytes" "bytes"
"regexp" "regexp"
"github.com/n0x1m/md2gmi/pipe"
) )
// state function // state function
@ -13,7 +15,7 @@ type fsm struct {
state stateFn state stateFn
i int i int
out chan WorkItem out chan pipe.StreamItem
// combining multiple input lines // combining multiple input lines
blockBuffer []byte blockBuffer []byte
@ -26,8 +28,8 @@ func NewPreproc() *fsm {
return &fsm{} return &fsm{}
} }
func (m *fsm) Process(in chan WorkItem) chan WorkItem { func (m *fsm) Process(in chan pipe.StreamItem) chan pipe.StreamItem {
m.out = make(chan WorkItem) m.out = make(chan pipe.StreamItem)
go func() { go func() {
for m.state = normal; m.state != nil; { for m.state = normal; m.state != nil; {
b, ok := <-in b, ok := <-in
@ -49,7 +51,7 @@ func (m *fsm) Process(in chan WorkItem) chan WorkItem {
func (m *fsm) sync() { func (m *fsm) sync() {
if len(m.sendBuffer) > 0 { if len(m.sendBuffer) > 0 {
m.sendBuffer = append(m.sendBuffer, '\n') m.sendBuffer = append(m.sendBuffer, '\n')
m.out <- New(m.i, m.sendBuffer) m.out <- pipe.NewItem(m.i, m.sendBuffer)
m.sendBuffer = m.sendBuffer[:0] m.sendBuffer = m.sendBuffer[:0]
m.i += 1 m.i += 1
} }

39
proc.go
View file

@ -4,13 +4,24 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"regexp" "regexp"
"github.com/n0x1m/md2gmi/pipe"
) )
func FormatLinks(in chan WorkItem) chan WorkItem { func FormatLinks(in chan pipe.StreamItem) chan pipe.StreamItem {
out := make(chan WorkItem) out := make(chan pipe.StreamItem)
go func() { go func() {
fenceOn := false
for b := range in { for b := range in {
out <- New(b.Index(), formatLinks(b.Payload())) data := b.Payload()
if isFence(data) {
fenceOn = !fenceOn
}
if fenceOn {
out <- pipe.NewItem(b.Index(), b.Payload())
} else {
out <- pipe.NewItem(b.Index(), formatLinks(b.Payload()))
}
} }
close(out) close(out)
}() }()
@ -37,8 +48,8 @@ func formatLinks(data []byte) []byte {
return data return data
} }
func RemoveComments(in chan WorkItem) chan WorkItem { func RemoveComments(in chan pipe.StreamItem) chan pipe.StreamItem {
out := make(chan WorkItem) out := make(chan pipe.StreamItem)
go func() { go func() {
re := regexp.MustCompile(`<!--.*-->`) re := regexp.MustCompile(`<!--.*-->`)
for b := range in { for b := range in {
@ -46,16 +57,16 @@ func RemoveComments(in chan WorkItem) chan WorkItem {
for _, match := range re.FindAllSubmatch(data, -1) { for _, match := range re.FindAllSubmatch(data, -1) {
data = bytes.Replace(data, match[0], []byte(""), 1) data = bytes.Replace(data, match[0], []byte(""), 1)
} }
out <- New(b.Index(), append(bytes.TrimSpace(data), '\n')) out <- pipe.NewItem(b.Index(), append(bytes.TrimSpace(data), '\n'))
//out <- New(b.Index(), data) //out <- pipe.NewItem(b.Index(), data)
} }
close(out) close(out)
}() }()
return out return out
} }
func RemoveFrontMatter(in chan WorkItem) chan WorkItem { func RemoveFrontMatter(in chan pipe.StreamItem) chan pipe.StreamItem {
out := make(chan WorkItem) out := make(chan pipe.StreamItem)
go func() { go func() {
re := regexp.MustCompile(`---.*---`) re := regexp.MustCompile(`---.*---`)
for b := range in { for b := range in {
@ -63,16 +74,16 @@ func RemoveFrontMatter(in chan WorkItem) chan WorkItem {
for _, match := range re.FindAllSubmatch(data, -1) { for _, match := range re.FindAllSubmatch(data, -1) {
data = bytes.Replace(data, match[0], []byte(""), 1) data = bytes.Replace(data, match[0], []byte(""), 1)
} }
out <- New(b.Index(), append(bytes.TrimSpace(data), '\n')) out <- pipe.NewItem(b.Index(), append(bytes.TrimSpace(data), '\n'))
//out <- New(b.Index(), data) //out <- pipe.NewItem(b.Index(), data)
} }
close(out) close(out)
}() }()
return out return out
} }
func FormatHeadings(in chan WorkItem) chan WorkItem { func FormatHeadings(in chan pipe.StreamItem) chan pipe.StreamItem {
out := make(chan WorkItem) out := make(chan pipe.StreamItem)
go func() { go func() {
re := regexp.MustCompile(`^[#]{4,}`) re := regexp.MustCompile(`^[#]{4,}`)
re2 := regexp.MustCompile(`^(#+)[^# ]`) re2 := regexp.MustCompile(`^(#+)[^# ]`)
@ -89,7 +100,7 @@ func FormatHeadings(in chan WorkItem) chan WorkItem {
data = append(data, '\n') data = append(data, '\n')
} }
// writeback // writeback
out <- New(b.Index(), data) out <- pipe.NewItem(b.Index(), data)
} }
close(out) close(out)