150 lines
3 KiB
Go
150 lines
3 KiB
Go
|
package fetcher
|
||
|
|
||
|
import (
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"

	"gitrepo.ru/neonxp/idecnode/pkg/config"
	"gitrepo.ru/neonxp/idecnode/pkg/idec"
)
|
||
|
|
||
|
type Fetcher struct {
|
||
|
idec *idec.IDEC
|
||
|
config *config.Config
|
||
|
client *http.Client
|
||
|
}
|
||
|
|
||
|
func New(i *idec.IDEC, cfg *config.Config) *Fetcher {
|
||
|
return &Fetcher{
|
||
|
idec: i,
|
||
|
config: cfg,
|
||
|
client: &http.Client{
|
||
|
Timeout: 60 * time.Second,
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (f *Fetcher) Run(ctx context.Context) error {
|
||
|
for _, node := range f.config.Fetch {
|
||
|
messagesToDownloads := []string{}
|
||
|
log.Println("fetching", node)
|
||
|
for _, echoID := range node.Echos {
|
||
|
missed, err := f.getMissedEchoMessages(node, echoID)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
messagesToDownloads = append(messagesToDownloads, missed...)
|
||
|
}
|
||
|
if err := f.downloadMessages(node, messagesToDownloads); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
log.Println("finished")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (f *Fetcher) downloadMessages(node config.Node, messagesToDownloads []string) error {
|
||
|
var slice []string
|
||
|
for {
|
||
|
limit := min(20, len(messagesToDownloads))
|
||
|
if limit == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
slice, messagesToDownloads = messagesToDownloads[:limit-1], messagesToDownloads[limit:]
|
||
|
if err := f.downloadMessagesChunk(node, slice); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (f *Fetcher) getMissedEchoMessages(node config.Node, echoID string) ([]string, error) {
|
||
|
missed := []string{}
|
||
|
messages, err := f.idec.GetMessagesByEcho(echoID, 0, 0)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
messagesIndex := map[string]struct{}{}
|
||
|
for _, msgID := range messages {
|
||
|
messagesIndex[msgID] = struct{}{}
|
||
|
}
|
||
|
|
||
|
p := formatCommand(node, "u/e", echoID)
|
||
|
resp, err := f.client.Get(p)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
data, err := io.ReadAll(resp.Body)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
lines := strings.Split(string(data), "\n")
|
||
|
for _, line := range lines {
|
||
|
if strings.Contains(line, ".") {
|
||
|
// echo name
|
||
|
continue
|
||
|
}
|
||
|
if line == "" {
|
||
|
continue
|
||
|
}
|
||
|
if _, exist := messagesIndex[line]; !exist {
|
||
|
missed = append(missed, line)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return missed, nil
|
||
|
}
|
||
|
|
||
|
func (f *Fetcher) downloadMessagesChunk(node config.Node, messages []string) error {
|
||
|
p := formatCommand(node, "u/m", messages...)
|
||
|
|
||
|
resp, err := f.client.Get(p)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
data, err := io.ReadAll(resp.Body)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
lines := strings.Split(string(data), "\n")
|
||
|
|
||
|
for _, line := range lines {
|
||
|
if line == "" {
|
||
|
continue
|
||
|
}
|
||
|
p := strings.Split(line, ":")
|
||
|
|
||
|
rawMessage, err := base64.StdEncoding.DecodeString(p[1])
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if err := f.idec.SaveBundleMessage(p[0], string(rawMessage)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func formatCommand(node config.Node, method string, args ...string) string {
|
||
|
segments := []string{node.Addr, method}
|
||
|
segments = append(segments, args...)
|
||
|
p := strings.Join(segments, "/")
|
||
|
|
||
|
return p
|
||
|
}
|
||
|
|
||
|
// min returns the smaller of x and y.
func min(x, y int) int {
	smaller := y
	if x < y {
		smaller = x
	}
	return smaller
}
|