diff --git a/cmd/mesh/main.go b/cmd/mesh/main.go index cc230075..cad3101a 100644 --- a/cmd/mesh/main.go +++ b/cmd/mesh/main.go @@ -28,15 +28,17 @@ import ( "github.com/RiV-chain/RiV-mesh/src/core" //"github.com/RiV-chain/RiV-mesh/src/ipv6rwc" "github.com/RiV-chain/RiV-mesh/src/multicast" + "github.com/RiV-chain/RiV-mesh/src/restapi" "github.com/RiV-chain/RiV-mesh/src/tun" "github.com/RiV-chain/RiV-mesh/src/version" ) type node struct { - core *core.Core - tun *tun.TunAdapter - multicast *multicast.Multicast - admin *admin.AdminSocket + core *core.Core + tun *tun.TunAdapter + multicast *multicast.Multicast + admin *admin.AdminSocket + rest_server *restapi.RestServer } func setLogLevel(loglevel string, logger *log.Logger) { @@ -262,6 +264,21 @@ func run(args yggArgs, ctx context.Context) { } } + // Setup the REST socket. + { + if n.rest_server, err = restapi.NewRestServer(restapi.RestServerCfg{ + Core: n.core, + Log: logger, + ListenAddress: cfg.HttpAddress, + WwwRoot: cfg.WwwRoot, + ConfigFn: args.useconffile, + }); err != nil { + logger.Errorln(err) + } else { + err = n.rest_server.Serve() + } + } + // Setup the admin socket. { options := []admin.SetupOption{ @@ -275,9 +292,6 @@ func run(args yggArgs, ctx context.Context) { } } - // Start HTTP server - n.admin.StartHttpServer(args.useconffile, cfg) - // Setup the multicast module. { options := []multicast.SetupOption{} diff --git a/src/admin/admin.go b/src/admin/admin.go index bf895993..41fee792 100644 --- a/src/admin/admin.go +++ b/src/admin/admin.go @@ -5,39 +5,25 @@ import ( "errors" "fmt" "net" - "net/http" "net/url" "os" "sort" - "strconv" - "archive/zip" "strings" "time" - "gerace.dev/zipfs" - - "github.com/RiV-chain/RiV-mesh/src/config" "github.com/RiV-chain/RiV-mesh/src/core" - "github.com/RiV-chain/RiV-mesh/src/defaults" ) // TODO: Add authentication -type ServerEvent struct { - event string - data string -} - type AdminSocket struct { - core *core.Core - log core.Logger - listener net.Listener - handlers map[string]handler - done chan struct{} - serverEvents chan ServerEvent - serverEventNextId int - config struct { + core *core.Core + log core.Logger + listener net.Listener + handlers map[string]handler + done chan struct{} + config struct { listenaddr ListenAddress } } @@ -112,7 +98,6 @@ func New(c *core.Core, log core.Logger, opts ...SetupOption) (*AdminSocket, erro return res, nil }) a.done = make(chan struct{}) - a.serverEvents = make(chan ServerEvent) go a.listen() return a, a.core.SetAdmin(a) } @@ -246,209 +231,6 @@ func (a *AdminSocket) SetupAdminHandlers() { //_ = a.AddHandler("debug_remoteGetDHT", []string{"key"}, t.proto.getDHTHandler) } -// Start runs http server -func (a *AdminSocket) StartHttpServer(configFn string, nc *config.NodeConfig) { - if nc.HttpAddress != "none" && nc.HttpAddress != "" && nc.WwwRoot != "none" && nc.WwwRoot != "" { - u, err := url.Parse(nc.HttpAddress) - if err != nil { - a.log.Errorln("An error occurred parsing http address:", err) - return - } - addNoCacheHeaders := func(w http.ResponseWriter) { - w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Add("Pragma", "no-cache") - w.Header().Add("Expires", "0") - } - http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Following methods are allowed: getself, getpeers. litening"+u.Host) - }) - http.HandleFunc("/api/self", func(w http.ResponseWriter, r *http.Request) { - addNoCacheHeaders(w) - switch r.Method { - case "GET": - w.Header().Add("Content-Type", "application/json") - req := &GetSelfRequest{} - res := &GetSelfResponse{} - if err := a.getSelfHandler(req, res); err != nil { - http.Error(w, err.Error(), 503) - } - b, err := json.Marshal(res) - if err != nil { - http.Error(w, err.Error(), 503) - } - fmt.Fprint(w, string(b[:])) - default: - http.Error(w, "Method Not Allowed", 405) - } - }) - http.HandleFunc("/api/peers", func(w http.ResponseWriter, r *http.Request) { - var handleDelete = func() error { - err := a.core.RemovePeers() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return err - } - var handlePost = func() error { - var peers []string - err := json.NewDecoder(r.Body).Decode(&peers) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return err - } - - for _, peer := range peers { - if err := a.core.AddPeer(peer, ""); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return err - } - } - - if len(configFn) > 0 { - saveHeaders := r.Header["Riv-Save-Config"] - if len(saveHeaders) > 0 && saveHeaders[0] == "true" { - cfg, err := defaults.ReadConfig(configFn) - if err == nil { - cfg.Peers = peers - err := defaults.WriteConfig(configFn, cfg) - if err != nil { - a.log.Errorln("Config file read error:", err) - } - } else { - a.log.Errorln("Config file read error:", err) - } - } - } - return nil - } - - addNoCacheHeaders(w) - switch r.Method { - case "GET": - w.Header().Add("Content-Type", "application/json") - req := &GetPeersRequest{} - res := &GetPeersResponse{} - - if err := a.getPeersHandler(req, res); err != nil { - http.Error(w, err.Error(), 503) - } - b, err := json.Marshal(res.Peers) - if err != nil { - http.Error(w, err.Error(), 503) - } - fmt.Fprint(w, string(b[:])) - case "POST": - handlePost() - case "PUT": - if handleDelete() == nil { - if handlePost() == nil { - http.Error(w, "No content", http.StatusNoContent) - } - } - case "DELETE": - if handleDelete() == nil { - http.Error(w, "No content", http.StatusNoContent) - } - default: - http.Error(w, "Method Not Allowed", 405) - } - }) - http.HandleFunc("/api/ping", func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "POST": - peer_list := []string{} - - err := json.NewDecoder(r.Body).Decode(&peer_list) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - go a.ping(peer_list) - http.Error(w, "Accepted", http.StatusAccepted) - default: - http.Error(w, "Method Not Allowed", 405) - } - }) - - http.HandleFunc("/api/sse", func(w http.ResponseWriter, r *http.Request) { - addNoCacheHeaders(w) - switch r.Method { - case "GET": - w.Header().Add("Content-Type", "text/event-stream") - Loop: - for { - select { - case v := <-a.serverEvents: - fmt.Fprintln(w, "id:", a.serverEventNextId) - fmt.Fprintln(w, "event:", v.event) - fmt.Fprintln(w, "data:", v.data) - fmt.Fprintln(w) //end of event - a.serverEventNextId += 1 - default: - break Loop - } - } - default: - http.Error(w, "Method Not Allowed", 405) - } - }) - - var docFs = "" - pakReader, err := zip.OpenReader(nc.WwwRoot) - if err == nil { - defer pakReader.Close() - fs, err := zipfs.NewZipFileSystem(&pakReader.Reader, zipfs.ServeIndexForMissing()) - if err == nil { - http.Handle("/", http.FileServer(fs)) - docFs = "zipfs" - } - } - if docFs == "" { - var nocache = func(fs http.Handler) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - addNoCacheHeaders(w) - fs.ServeHTTP(w, r) - } - } - http.Handle("/", nocache(http.FileServer(http.Dir(nc.WwwRoot)))) - docFs = "local fs" - } - l, e := net.Listen("tcp4", u.Host) - if e != nil { - a.log.Errorf("Http server start error: %s\n", e) - } else { - a.log.Infof("Http server is listening on %s and is supplied from %s %s\n", nc.HttpAddress, docFs, nc.WwwRoot) - } - go func() { - a.log.Errorln(http.Serve(l, nil)) - }() - } -} - -func (a *AdminSocket) ping(peers []string) { - for _, u := range peers { - go func(u string) { - data, _ := json.Marshal(map[string]string{"peer": u, "value": strconv.FormatInt(check(u), 10)}) - a.serverEvents <- ServerEvent{event: "ping", data: string(data)} - }(u) - } -} - -func check(peer string) int64 { - u, e := url.Parse(peer) - if e != nil { - return -1 - } - t := time.Now() - _, err := net.DialTimeout("tcp", u.Host, 5*time.Second) - if err != nil { - return -1 - } - d := time.Since(t) - return d.Milliseconds() -} - // IsStarted returns true if the module has been started. func (a *AdminSocket) IsStarted() bool { select { diff --git a/src/restapi/rest_server.go b/src/restapi/rest_server.go new file mode 100644 index 00000000..bb24eb95 --- /dev/null +++ b/src/restapi/rest_server.go @@ -0,0 +1,292 @@ +package restapi + +import ( + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "sort" + "strconv" + + "archive/zip" + "time" + + "gerace.dev/zipfs" + + "github.com/RiV-chain/RiV-mesh/src/core" + "github.com/RiV-chain/RiV-mesh/src/defaults" + "github.com/RiV-chain/RiV-mesh/src/version" +) + +type ServerEvent struct { + Event string + Data string +} + +type RestServerCfg struct { + Core *core.Core + Log core.Logger + ListenAddress string + WwwRoot string + ConfigFn string +} + +type RestServer struct { + RestServerCfg + listenUrl *url.URL + serverEvents chan ServerEvent + serverEventNextId int + docFsType string +} + +func NewRestServer(cfg RestServerCfg) (*RestServer, error) { + a := &RestServer{ + RestServerCfg: cfg, + serverEvents: make(chan ServerEvent), + serverEventNextId: 0, + } + if cfg.ListenAddress == "none" || cfg.ListenAddress == "" { + return nil, errors.New("listening address isn't configured") + } + + var err error + a.listenUrl, err = url.Parse(cfg.ListenAddress) + if err != nil { + return nil, fmt.Errorf("an error occurred parsing http address: %w", err) + } + + pakReader, err := zip.OpenReader(cfg.WwwRoot) + if err == nil { + defer pakReader.Close() + fs, err := zipfs.NewZipFileSystem(&pakReader.Reader, zipfs.ServeIndexForMissing()) + if err == nil { + http.Handle("/", http.FileServer(fs)) + a.docFsType = "zipfs" + } + } + if a.docFsType == "" { + var nocache = func(fs http.Handler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + addNoCacheHeaders(w) + fs.ServeHTTP(w, r) + } + } + http.Handle("/", nocache(http.FileServer(http.Dir(cfg.WwwRoot)))) + a.docFsType = "local fs" + } + + http.HandleFunc("/api", a.apiHandler) + http.HandleFunc("/api/self", a.apiSelfHandler) + http.HandleFunc("/api/peers", a.apiPeersHandler) + http.HandleFunc("/api/ping", a.apiPingHandler) + http.HandleFunc("/api/sse", a.apiSseHandler) + + return a, nil +} + +// Start http server +func (a *RestServer) Serve() error { + l, e := net.Listen("tcp4", a.listenUrl.Host) + if e != nil { + return fmt.Errorf("http server start error: %w", e) + } else { + a.Log.Infof("Http server is listening on %s and is supplied from %s %s\n", a.ListenAddress, a.docFsType, a.WwwRoot) + } + go func() { + a.Log.Errorln(http.Serve(l, nil)) + }() + return nil +} + +func addNoCacheHeaders(w http.ResponseWriter) { + w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Add("Pragma", "no-cache") + w.Header().Add("Expires", "0") +} + +func (a *RestServer) apiHandler(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Following methods are allowed: GET /api/self, getpeers. litening") +} + +func (a *RestServer) apiSelfHandler(w http.ResponseWriter, r *http.Request) { + addNoCacheHeaders(w) + switch r.Method { + case "GET": + w.Header().Add("Content-Type", "application/json") + self := a.Core.GetSelf() + snet := a.Core.Subnet() + var result = map[string]interface{}{ + "build_name": a.Core.GetSelf(), + "build_version": version.BuildVersion(), + "key": hex.EncodeToString(self.Key[:]), + "private_key": hex.EncodeToString(self.PrivateKey[:]), + "address": a.Core.Address().String(), + "coords": snet.String(), + "subnet": self.Coords, + } + b, err := json.Marshal(result) + if err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + } + fmt.Fprint(w, string(b[:])) + default: + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + } +} + +func (a *RestServer) apiPeersHandler(w http.ResponseWriter, r *http.Request) { + var handleDelete = func() error { + err := a.Core.RemovePeers() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return err + } + var handlePost = func() error { + var peers []string + err := json.NewDecoder(r.Body).Decode(&peers) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return err + } + + for _, peer := range peers { + if err := a.Core.AddPeer(peer, ""); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return err + } + } + + if len(a.ConfigFn) > 0 { + saveHeaders := r.Header["Riv-Save-Config"] + if len(saveHeaders) > 0 && saveHeaders[0] == "true" { + cfg, err := defaults.ReadConfig(a.ConfigFn) + if err == nil { + cfg.Peers = peers + err := defaults.WriteConfig(a.ConfigFn, cfg) + if err != nil { + a.Log.Errorln("Config file read error:", err) + } + } else { + a.Log.Errorln("Config file read error:", err) + } + } + } + return nil + } + + addNoCacheHeaders(w) + switch r.Method { + case "GET": + w.Header().Add("Content-Type", "application/json") + + peers := a.Core.GetPeers() + response := make([]map[string]interface{}, 0, len(peers)) + for _, p := range peers { + addr := a.Core.AddrForKey(p.Key) + response = append(response, map[string]interface{}{ + "address": net.IP(addr[:]).String(), + "key": hex.EncodeToString(p.Key), + "port": p.Port, + "priority": uint64(p.Priority), // can't be uint8 thanks to gobind + "coords": p.Coords, + "remote": p.Remote, + "bytes_recvd": p.RXBytes, + "bytes_sent": p.TXBytes, + "uptime": p.Uptime.Seconds(), + }) + } + sort.SliceStable(response, func(i, j int) bool { + if response[i]["port"].(uint64) == response[j]["port"].(uint64) { + return response[i]["priority"].(uint64) < response[j]["priority"].(uint64) + } + return response[i]["port"].(uint64) < response[j]["port"].(uint64) + }) + b, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + fmt.Fprint(w, string(b[:])) + case "POST": + handlePost() + case "PUT": + if handleDelete() == nil { + if handlePost() == nil { + http.Error(w, "No content", http.StatusNoContent) + } + } + case "DELETE": + if handleDelete() == nil { + http.Error(w, "No content", http.StatusNoContent) + } + default: + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + } +} + +func (a *RestServer) apiPingHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + peer_list := []string{} + + err := json.NewDecoder(r.Body).Decode(&peer_list) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + go a.ping(peer_list) + http.Error(w, "Accepted", http.StatusAccepted) + default: + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + } +} + +func (a *RestServer) apiSseHandler(w http.ResponseWriter, r *http.Request) { + addNoCacheHeaders(w) + switch r.Method { + case "GET": + w.Header().Add("Content-Type", "text/event-stream") + Loop: + for { + select { + case v := <-a.serverEvents: + fmt.Fprintln(w, "id:", a.serverEventNextId) + fmt.Fprintln(w, "event:", v.Event) + fmt.Fprintln(w, "data:", v.Data) + fmt.Fprintln(w) //end of event + a.serverEventNextId += 1 + default: + break Loop + } + } + default: + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + } +} + +func (a *RestServer) ping(peers []string) { + for _, u := range peers { + go func(u string) { + data, _ := json.Marshal(map[string]string{"peer": u, "value": strconv.FormatInt(check(u), 10)}) + a.serverEvents <- ServerEvent{Event: "ping", Data: string(data)} + }(u) + } +} + +func check(peer string) int64 { + u, e := url.Parse(peer) + if e != nil { + return -1 + } + t := time.Now() + _, err := net.DialTimeout("tcp", u.Host, 5*time.Second) + if err != nil { + return -1 + } + d := time.Since(t) + return d.Milliseconds() +}