idecnode/pkg/fetcher/fetcher.go

168 lines
3.4 KiB
Go

package fetcher
import (
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"

	"gitrepo.ru/neonxp/idecnode/pkg/config"
	"gitrepo.ru/neonxp/idecnode/pkg/idec"
)
// Fetcher pulls messages and blacklists from the remote nodes listed in the
// configuration and stores them through the local IDEC instance.
type Fetcher struct {
// idec is the local store: it is queried for already-known message IDs and
// receives downloaded messages and blacklist entries.
idec *idec.IDEC
// config supplies the list of nodes to fetch from (its Fetch field).
config *config.Config
// client performs every HTTP request issued by the fetcher.
client *http.Client
}
// New builds a Fetcher bound to the given IDEC instance and configuration.
// The fetcher owns a dedicated HTTP client whose requests time out after
// 60 seconds.
func New(i *idec.IDEC, cfg *config.Config) *Fetcher {
	const requestTimeout = 60 * time.Second

	httpClient := &http.Client{Timeout: requestTimeout}
	fetcher := &Fetcher{
		client: httpClient,
		config: cfg,
		idec:   i,
	}
	return fetcher
}
// Run performs one fetch pass over every node in the configuration.
//
// For each node it collects the IDs of messages that are listed by the node
// for the configured echoes but are not stored locally, downloads those
// messages, and then downloads the node's blacklist.
//
// Run stops at the first error and returns it; nodes after the failing one
// are not processed. If ctx is cancelled, Run returns ctx.Err() before
// starting work on the next node or echo. Requests that are already in
// flight are bounded only by the HTTP client's own timeout.
func (f *Fetcher) Run(ctx context.Context) error {
	for _, node := range f.config.Fetch {
		// Honour cancellation between nodes so a shutdown does not have to
		// wait for the whole node list to be walked.
		if err := ctx.Err(); err != nil {
			return err
		}

		log.Println("fetching", node)

		var messagesToDownloads []string
		for _, echoID := range node.Echos {
			if err := ctx.Err(); err != nil {
				return err
			}
			missed, err := f.getMissedEchoMessages(node, echoID)
			if err != nil {
				return err
			}
			messagesToDownloads = append(messagesToDownloads, missed...)
		}

		if err := f.downloadMessages(node, messagesToDownloads); err != nil {
			return err
		}
		if err := f.downloadBlacklist(node); err != nil {
			return err
		}
	}

	log.Println("finished")
	return nil
}
// downloadMessages downloads the given message IDs from node in batches of
// at most 20 IDs per request.
//
// It returns nil when there is nothing to download, and the first error
// reported by downloadMessagesChunk otherwise; later batches are not
// attempted after a failure.
func (f *Fetcher) downloadMessages(node config.Node, messagesToDownloads []string) error {
	const chunkSize = 20

	for len(messagesToDownloads) > 0 {
		limit := min(chunkSize, len(messagesToDownloads))

		// Slice bounds are half-open: [:limit] yields exactly `limit` IDs.
		// The batch and the remainder must split at the same index, or the
		// last ID of every batch is silently skipped.
		chunk := messagesToDownloads[:limit]
		messagesToDownloads = messagesToDownloads[limit:]

		if err := f.downloadMessagesChunk(node, chunk); err != nil {
			return err
		}
	}
	return nil
}
// getMissedEchoMessages returns the message IDs that node lists for echoID
// but that are not present in the local store.
//
// The node is queried through its "u/e" endpoint. The response is read as
// newline-separated text: blank lines and lines containing "." (echo names)
// are skipped, every other line is treated as a message ID.
//
// An error is returned if the local lookup fails, the request fails, the
// node answers with a status other than 200 OK, or the body cannot be read.
// On success the returned slice is non-nil, possibly empty.
func (f *Fetcher) getMissedEchoMessages(node config.Node, echoID string) ([]string, error) {
	// NOTE(review): the two zero arguments presumably mean "no offset, no
	// limit", i.e. every stored ID for the echo — confirm against idec.
	messages, _, err := f.idec.GetMessageIDsByEcho(echoID, 0, 0)
	if err != nil {
		return nil, err
	}
	messagesIndex := make(map[string]struct{}, len(messages))
	for _, msgID := range messages {
		messagesIndex[msgID] = struct{}{}
	}

	p := formatCommand(node, "u/e", echoID)
	resp, err := f.client.Get(p)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// http.Client.Get does not treat non-2xx responses as errors; without
	// this check the lines of an error page would be taken for message IDs.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("GET %s: unexpected status %s", p, resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	missed := []string{}
	for _, line := range strings.Split(string(data), "\n") {
		// Trim so that CRLF line endings do not leave a stray "\r" on IDs.
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		if strings.Contains(line, ".") {
			// echo name
			continue
		}
		if _, exist := messagesIndex[line]; !exist {
			missed = append(missed, line)
		}
	}
	return missed, nil
}
// downloadMessagesChunk requests the given message IDs from node in a single
// "u/m" call and stores every message contained in the response.
//
// The response is read as newline-separated "<id>:<base64 payload>" lines.
// Each payload is base64-decoded and handed to SaveBundleMessage together
// with its ID. Blank lines are ignored.
//
// It returns nil without contacting the node when messages is empty. An
// error is returned if the request fails, the node answers with a status
// other than 200 OK, the body cannot be read, a line has no ":" separator,
// a payload is not valid base64, or saving a message fails. Messages saved
// before the failing line remain saved.
func (f *Fetcher) downloadMessagesChunk(node config.Node, messages []string) error {
	if len(messages) == 0 {
		// Nothing to ask for; avoid a pointless request with no IDs.
		return nil
	}

	p := formatCommand(node, "u/m", messages...)
	resp, err := f.client.Get(p)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// http.Client.Get does not treat non-2xx responses as errors, so an
	// error page would otherwise be parsed as a message bundle.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("GET %s: unexpected status %s", p, resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	for _, line := range strings.Split(string(data), "\n") {
		// Trim so that a bare "\r" left by CRLF endings counts as blank.
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}

		parts := strings.Split(line, ":")
		if len(parts) < 2 {
			// Indexing parts[1] here would panic; report the bad line.
			return fmt.Errorf("GET %s: malformed bundle line %q", p, line)
		}

		rawMessage, err := base64.StdEncoding.DecodeString(parts[1])
		if err != nil {
			return err
		}
		if err := f.idec.SaveBundleMessage(parts[0], string(rawMessage)); err != nil {
			return err
		}
	}
	return nil
}
// downloadBlacklist fetches "blacklist.txt" from node and merges its entries
// into the local blacklist through MergeBlacklist.
//
// The body is read as newline-separated text; surrounding whitespace is
// trimmed and blank lines are dropped before merging.
//
// A 404 response is logged and treated as "this node has no blacklist": the
// function returns nil and merges nothing. Any other status than 200 OK is
// returned as an error, as are request, read and merge failures.
func (f *Fetcher) downloadBlacklist(node config.Node) error {
	p := formatCommand(node, "blacklist.txt")
	resp, err := f.client.Get(p)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// http.Client.Get does not treat non-2xx responses as errors; without
	// this check the text of an error page would be merged as blacklist
	// entries.
	switch resp.StatusCode {
	case http.StatusOK:
		// proceed
	case http.StatusNotFound:
		log.Println("no blacklist at", p)
		return nil
	default:
		return fmt.Errorf("GET %s: unexpected status %s", p, resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	rawLines := strings.Split(string(data), "\n")
	entries := make([]string, 0, len(rawLines))
	for _, line := range rawLines {
		// Drop the empty trailing element produced by a final newline and
		// any "\r" left by CRLF endings.
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		entries = append(entries, line)
	}
	return f.idec.MergeBlacklist(entries)
}
// formatCommand builds the request URL for a node command by joining the
// node address, the method and every argument with "/" separators, in that
// order. No escaping or slash normalisation is applied.
func formatCommand(node config.Node, method string, args ...string) string {
	parts := make([]string, 0, len(args)+2)
	parts = append(parts, node.Addr, method)
	parts = append(parts, args...)
	return strings.Join(parts, "/")
}
// min returns the smaller of the two integers x and y.
func min(x, y int) int {
	if y <= x {
		return y
	}
	return x
}