From 81389df9484c28dfcec1cf7592b8d0f8b7e4e8e1 Mon Sep 17 00:00:00 2001 From: Alexander Kiryukhin Date: Sat, 21 May 2022 20:38:21 +0300 Subject: [PATCH] Improvments. Breaking changes --- README.md | 72 ++++++++++++++++-------- {examples/http => example}/main.go | 22 ++++++-- example/test.http | 82 +++++++++++++++++++++++++++ examples/http/test.http | 42 -------------- go.mod | 2 + go.sum | 2 + http/server.go | 52 ----------------- rpc/server.go | 89 +++++++++++++++++------------- rpc/wrapper.go | 2 +- transport/http.go | 40 ++++++++++++++ transport/tcp.go | 22 ++++++++ transport/transport.go | 14 +++++ 12 files changed, 279 insertions(+), 162 deletions(-) rename {examples/http => example}/main.go (57%) create mode 100644 example/test.http delete mode 100644 examples/http/test.http create mode 100644 go.sum delete mode 100644 http/server.go create mode 100644 transport/http.go create mode 100644 transport/tcp.go create mode 100644 transport/transport.go diff --git a/README.md b/README.md index 5938fdf..708675e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + # JSON-RPC 2.0 Golang implementation of JSON-RPC 2.0 server with generics. @@ -6,58 +7,79 @@ Go 1.18+ required ## Features: -- [x] Batch request and responses +- [x] HTTP/HTTPS transport +- [x] TCP transport - [ ] WebSocket transport ## Usage (http transport) -1. Create JSON-RPC/HTTP server: - ```go - import "go.neonxp.dev/jsonrpc2/http" +1. Create JSON-RPC server: +```go + import "go.neonxp.dev/jsonrpc2/rpc" ... - s := http.New() - ``` -2. Write handler: - ```go + s := rpc.New() +``` + +2. Add required transport(s): +```go + import "go.neonxp.dev/jsonrpc2/transport" + ... + s.AddTransport(&transport.HTTP{Bind: ":8000"}) + s.AddTransport(&transport.TCP{Bind: ":3000"}) +``` + +3. Write handler: +```go func Multiply(ctx context.Context, args *Args) (int, error) { return args.A * args.B, nil } - ``` +``` + Handler must have exact two arguments (context and input of any json serializable type) and exact two return values (output of any json serializable type and error) 3. Wrap handler with `rpc.Wrap` method and register it in server: - ```go - s.Register("multiply", rpc.Wrap(Multiply)) - ``` -4. Use server as common http handler: - ```go - http.ListenAndServe(":8000", s) - ``` +```go + s.Register("multiply", rpc.H(Multiply)) +``` + +4. Run RPC server: +```go + s.Run(ctx) +``` ## Custom transport -See [http/server.go](/http/server.go) for example of transport implementation. +Any transport must implement simple interface `transport.Transport`: + +```go +type Transport interface { + Run(ctx context.Context, resolver Resolver) error +} +``` ## Complete example -[Full code](/examples/http) +[Full code](/example) ```go package main import ( "context" - "net/http" - httpRPC "go.neonxp.dev/jsonrpc2/http" "go.neonxp.dev/jsonrpc2/rpc" + "go.neonxp.dev/jsonrpc2/transport" ) func main() { - s := httpRPC.New() - s.Register("multiply", rpc.Wrap(Multiply)) - s.Register("divide", rpc.Wrap(Divide)) + s := rpc.New() - http.ListenAndServe(":8000", s) + s.AddTransport(&transport.HTTP{Bind: ":8000"}) // HTTP transport + s.AddTransport(&transport.TCP{Bind: ":3000"}) // TCP transport + + s.Register("multiply", rpc.H(Multiply)) + s.Register("divide", rpc.H(Divide)) + + s.Run(context.Background()) } func Multiply(ctx context.Context, args *Args) (int, error) { @@ -87,3 +109,5 @@ Alexander Kiryukhin ## License ![GPL v3](https://www.gnu.org/graphics/gplv3-with-text-136x68.png) + + diff --git a/examples/http/main.go b/example/main.go similarity index 57% rename from examples/http/main.go rename to example/main.go index 5c24867..9f25e61 100644 --- a/examples/http/main.go +++ b/example/main.go @@ -3,19 +3,29 @@ package main import ( "context" "errors" - "net/http" + "log" + "os" + "os/signal" - httpRPC "go.neonxp.dev/jsonrpc2/http" "go.neonxp.dev/jsonrpc2/rpc" + "go.neonxp.dev/jsonrpc2/transport" ) func main() { - s := httpRPC.New() + s := rpc.New() - s.Register("multiply", rpc.Wrap(Multiply)) - s.Register("divide", rpc.Wrap(Divide)) + s.AddTransport(&transport.HTTP{Bind: ":8000"}) + s.AddTransport(&transport.TCP{Bind: ":3000"}) - http.ListenAndServe(":8000", s) + s.Register("multiply", rpc.H(Multiply)) + s.Register("divide", rpc.H(Divide)) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + + if err := s.Run(ctx); err != nil { + log.Fatal(err) + } } func Multiply(ctx context.Context, args *Args) (int, error) { diff --git a/example/test.http b/example/test.http new file mode 100644 index 0000000..7ed134a --- /dev/null +++ b/example/test.http @@ -0,0 +1,82 @@ +POST http://localhost:8000/ +Content-Type: application/json +{ + "jsonrpc": "2.0", + "method": "multiply", + "params": { + "a": 2, + "b": 3 + }, + "id": 1 +} + + +date: Sat, 21 May 2022 16:53:31 GMT +content-length: 36 +content-type: text/plain; charset=utf-8 +connection: close + +HTTP/1.1 200 - OK +date: Sat, 21 May 2022 16:53:31 GMT +content-length: 36 +content-type: text/plain; charset=utf-8 +connection: close + + +### +POST http://localhost:8000/ +Content-Type: application/json +{ + "jsonrpc": "2.0", + "method": "divide", + "params": { + "a": 10, + "b": 3 + }, + "id": 2 +} + + +date: Sat, 21 May 2022 16:53:51 GMT +content-length: 52 +content-type: text/plain; charset=utf-8 +connection: close + +HTTP/1.1 200 - OK +date: Sat, 21 May 2022 16:53:51 GMT +content-length: 52 +content-type: text/plain; charset=utf-8 +connection: close + + +### Batch request +POST http://localhost:8000/ +Content-Type: application/json + { "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 } + {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"} + {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]} + {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"} + { + "jsonrpc": "2.0", + "method": "divide", + "params": { + "a": 10, + "b": 3 + }, + "id": "divide" + } + {"foo": "boo"} + {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"} + {"jsonrpc": "2.0", "method": "get_data", "id": "9"} + + +date: Sat, 21 May 2022 17:18:17 GMT +content-type: text/plain; charset=utf-8 +connection: close +transfer-encoding: chunked + +HTTP/1.1 200 - OK +date: Sat, 21 May 2022 17:18:17 GMT +content-type: text/plain; charset=utf-8 +connection: close +transfer-encoding: chunked \ No newline at end of file diff --git a/examples/http/test.http b/examples/http/test.http deleted file mode 100644 index d4e68b3..0000000 --- a/examples/http/test.http +++ /dev/null @@ -1,42 +0,0 @@ -POST http://localhost:8000/ -Content-Type: application/json - -{ - "jsonrpc": "2.0", - "method": "multiply", - "params": { - "a": 2, - "b": 3 - }, - "id": 1 -} - -### - -POST http://localhost:8000/ -Content-Type: application/json - -{ - "jsonrpc": "2.0", - "method": "divide", - "params": { - "a": 10, - "b": 3 - }, - "id": 2 -} - -### - -POST http://localhost:8000/ -Content-Type: application/json - -[ - { "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 }, - {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, - {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}, - {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"}, - {"foo": "boo"}, - {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}, - {"jsonrpc": "2.0", "method": "get_data", "id": "9"} -] \ No newline at end of file diff --git a/go.mod b/go.mod index c419096..f5af9b1 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module go.neonxp.dev/jsonrpc2 go 1.18 + +require golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7569778 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/http/server.go b/http/server.go deleted file mode 100644 index 9fae0a0..0000000 --- a/http/server.go +++ /dev/null @@ -1,52 +0,0 @@ -//Package http provides HTTP transport for JSON-RPC 2.0 server -// -//Copyright (C) 2022 Alexander Kiryukhin -// -//This file is part of go.neonxp.dev/jsonrpc2 project. -// -//This program is free software: you can redistribute it and/or modify -//it under the terms of the GNU General Public License as published by -//the Free Software Foundation, either version 3 of the License, or -//(at your option) any later version. -// -//This program is distributed in the hope that it will be useful, -//but WITHOUT ANY WARRANTY; without even the implied warranty of -//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//GNU General Public License for more details. -// -//You should have received a copy of the GNU General Public License -//along with this program. If not, see . - -package http - -import ( - "bufio" - "net/http" - - "go.neonxp.dev/jsonrpc2/rpc" -) - -type Server struct { - *rpc.RpcServer -} - -func New() *Server { - return &Server{RpcServer: rpc.New()} -} - -func (r *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - writer.Header().Set("Content-Type", "application/json") - reader := bufio.NewReader(request.Body) - defer request.Body.Close() - firstByte, err := reader.Peek(1) - if err != nil { - r.Logger.Logf("Can't read body: %v", err) - rpc.WriteError(rpc.ErrCodeParseError, writer) - return - } - if string(firstByte) == "[" { - r.BatchRequest(request.Context(), reader, writer) - return - } - r.SingleRequest(request.Context(), reader, writer) -} diff --git a/rpc/server.go b/rpc/server.go index 4fa004d..1bb15d5 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -24,6 +24,10 @@ import ( "encoding/json" "io" "sync" + + "golang.org/x/sync/errgroup" + + "go.neonxp.dev/jsonrpc2/transport" ) const version = "2.0" @@ -32,6 +36,7 @@ type RpcServer struct { Logger Logger IgnoreNotifications bool handlers map[string]Handler + transports []transport.Transport mu sync.RWMutex } @@ -40,6 +45,7 @@ func New() *RpcServer { Logger: nopLogger{}, IgnoreNotifications: true, handlers: map[string]Handler{}, + transports: []transport.Transport{}, mu: sync.RWMutex{}, } } @@ -50,51 +56,55 @@ func (r *RpcServer) Register(method string, handler Handler) { r.handlers[method] = handler } -func (r *RpcServer) SingleRequest(ctx context.Context, reader io.Reader, writer io.Writer) { - req := new(rpcRequest) - if err := json.NewDecoder(reader).Decode(req); err != nil { - r.Logger.Logf("Can't read body: %v", err) - WriteError(ErrCodeParseError, writer) - return - } - resp := r.callMethod(ctx, req) - if req.Id == nil && r.IgnoreNotifications { - // notification request - return - } - if err := json.NewEncoder(writer).Encode(resp); err != nil { - r.Logger.Logf("Can't write response: %v", err) - WriteError(ErrCodeInternalError, writer) - return - } +func (r *RpcServer) AddTransport(transport transport.Transport) { + r.transports = append(r.transports, transport) } -func (r *RpcServer) BatchRequest(ctx context.Context, reader io.Reader, writer io.Writer) { - var req []rpcRequest - if err := json.NewDecoder(reader).Decode(&req); err != nil { - r.Logger.Logf("Can't read body: %v", err) - WriteError(ErrCodeParseError, writer) - return +func (r *RpcServer) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + for _, t := range r.transports { + eg.Go(func(t transport.Transport) func() error { + return func() error { return t.Run(ctx, r) } + }(t)) } - var responses []*rpcResponse + return eg.Wait() +} + +func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { + dec := json.NewDecoder(rd) + enc := json.NewEncoder(w) + mu := sync.Mutex{} wg := sync.WaitGroup{} - wg.Add(len(req)) - for _, j := range req { - go func(req rpcRequest) { + for { + req := new(rpcRequest) + if err := dec.Decode(req); err != nil { + if err == io.EOF { + break + } + r.Logger.Logf("Can't read body: %v", err) + WriteError(ErrCodeParseError, enc) + break + } + wg.Add(1) + go func(req *rpcRequest) { defer wg.Done() - resp := r.callMethod(ctx, &req) - if req.Id == nil && r.IgnoreNotifications { + resp := r.callMethod(ctx, req) + if req.Id == nil { // notification request return } - responses = append(responses, resp) - }(j) + mu.Lock() + defer mu.Unlock() + if err := enc.Encode(resp); err != nil { + r.Logger.Logf("Can't write response: %v", err) + WriteError(ErrCodeInternalError, enc) + } + if w, canFlush := w.(Flusher); canFlush { + w.Flush() + } + }(req) } wg.Wait() - if err := json.NewEncoder(writer).Encode(responses); err != nil { - r.Logger.Logf("Can't write response: %v", err) - WriteError(ErrCodeInternalError, writer) - } } func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse { @@ -124,8 +134,8 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons } } -func WriteError(code int, w io.Writer) { - _ = json.NewEncoder(w).Encode(rpcResponse{ +func WriteError(code int, enc *json.Encoder) { + enc.Encode(rpcResponse{ Jsonrpc: version, Error: NewError(code), }) @@ -144,3 +154,8 @@ type rpcResponse struct { Error error `json:"error,omitempty"` Id any `json:"id,omitempty"` } + +type Flusher interface { + // Flush sends any buffered data to the client. + Flush() +} diff --git a/rpc/wrapper.go b/rpc/wrapper.go index bfcd381..9b4de28 100644 --- a/rpc/wrapper.go +++ b/rpc/wrapper.go @@ -24,7 +24,7 @@ import ( "encoding/json" ) -func Wrap[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler { +func H[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler { return func(ctx context.Context, in json.RawMessage) (json.RawMessage, error) { req := new(RQ) if err := json.Unmarshal(in, req); err != nil { diff --git a/transport/http.go b/transport/http.go new file mode 100644 index 0000000..663bb31 --- /dev/null +++ b/transport/http.go @@ -0,0 +1,40 @@ +package transport + +import ( + "context" + "crypto/tls" + "net" + "net/http" +) + +type HTTP struct { + Bind string + TLS *tls.Config +} + +func (h *HTTP) Run(ctx context.Context, resolver Resolver) error { + srv := http.Server{ + Addr: h.Bind, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + resolver.Resolve(ctx, r.Body, w) + }), + BaseContext: func(l net.Listener) context.Context { + return ctx + }, + TLSConfig: h.TLS, + } + go func() { + <-ctx.Done() + srv.Close() + }() + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + return err + } + return nil +} diff --git a/transport/tcp.go b/transport/tcp.go new file mode 100644 index 0000000..2ab946a --- /dev/null +++ b/transport/tcp.go @@ -0,0 +1,22 @@ +package transport + +import ( + "context" + "net" +) + +type TCP struct { + Bind string +} + +func (t *TCP) Run(ctx context.Context, resolver Resolver) error { + ln, _ := net.Listen("tcp", t.Bind) + + for { + conn, err := ln.Accept() + if err != nil { + return err + } + go resolver.Resolve(ctx, conn, conn) + } +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 0000000..2a54295 --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,14 @@ +package transport + +import ( + "context" + "io" +) + +type Transport interface { + Run(ctx context.Context, resolver Resolver) error +} + +type Resolver interface { + Resolve(context.Context, io.Reader, io.Writer) +}