Improvments. Breaking changes

This commit is contained in:
Александр Кирюхин 2022-05-21 20:38:21 +03:00
parent d4708a3665
commit 81389df948
No known key found for this signature in database
GPG key ID: 6DF7A2910D0699E9
12 changed files with 279 additions and 162 deletions

View file

@ -1,3 +1,4 @@
# JSON-RPC 2.0 # JSON-RPC 2.0
Golang implementation of JSON-RPC 2.0 server with generics. Golang implementation of JSON-RPC 2.0 server with generics.
@ -6,58 +7,79 @@ Go 1.18+ required
## Features: ## Features:
- [x] Batch request and responses - [x] HTTP/HTTPS transport
- [x] TCP transport
- [ ] WebSocket transport - [ ] WebSocket transport
## Usage (http transport) ## Usage (http transport)
1. Create JSON-RPC/HTTP server: 1. Create JSON-RPC server:
```go ```go
import "go.neonxp.dev/jsonrpc2/http" import "go.neonxp.dev/jsonrpc2/rpc"
... ...
s := http.New() s := rpc.New()
``` ```
2. Write handler:
```go 2. Add required transport(s):
```go
import "go.neonxp.dev/jsonrpc2/transport"
...
s.AddTransport(&transport.HTTP{Bind: ":8000"})
s.AddTransport(&transport.TCP{Bind: ":3000"})
```
3. Write handler:
```go
func Multiply(ctx context.Context, args *Args) (int, error) { func Multiply(ctx context.Context, args *Args) (int, error) {
return args.A * args.B, nil return args.A * args.B, nil
} }
``` ```
Handler must have exact two arguments (context and input of any json serializable type) and exact two return values (output of any json serializable type and error) Handler must have exact two arguments (context and input of any json serializable type) and exact two return values (output of any json serializable type and error)
3. Wrap handler with `rpc.Wrap` method and register it in server: 3. Wrap handler with `rpc.Wrap` method and register it in server:
```go ```go
s.Register("multiply", rpc.Wrap(Multiply)) s.Register("multiply", rpc.H(Multiply))
``` ```
4. Use server as common http handler:
```go 4. Run RPC server:
http.ListenAndServe(":8000", s) ```go
``` s.Run(ctx)
```
## Custom transport ## Custom transport
See [http/server.go](/http/server.go) for example of transport implementation. Any transport must implement simple interface `transport.Transport`:
```go
type Transport interface {
Run(ctx context.Context, resolver Resolver) error
}
```
## Complete example ## Complete example
[Full code](/examples/http) [Full code](/example)
```go ```go
package main package main
import ( import (
"context" "context"
"net/http"
httpRPC "go.neonxp.dev/jsonrpc2/http"
"go.neonxp.dev/jsonrpc2/rpc" "go.neonxp.dev/jsonrpc2/rpc"
"go.neonxp.dev/jsonrpc2/transport"
) )
func main() { func main() {
s := httpRPC.New() s := rpc.New()
s.Register("multiply", rpc.Wrap(Multiply))
s.Register("divide", rpc.Wrap(Divide))
http.ListenAndServe(":8000", s) s.AddTransport(&transport.HTTP{Bind: ":8000"}) // HTTP transport
s.AddTransport(&transport.TCP{Bind: ":3000"}) // TCP transport
s.Register("multiply", rpc.H(Multiply))
s.Register("divide", rpc.H(Divide))
s.Run(context.Background())
} }
func Multiply(ctx context.Context, args *Args) (int, error) { func Multiply(ctx context.Context, args *Args) (int, error) {
@ -87,3 +109,5 @@ Alexander Kiryukhin <i@neonxp.dev>
## License ## License
![GPL v3](https://www.gnu.org/graphics/gplv3-with-text-136x68.png) ![GPL v3](https://www.gnu.org/graphics/gplv3-with-text-136x68.png)

View file

@ -3,19 +3,29 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"net/http" "log"
"os"
"os/signal"
httpRPC "go.neonxp.dev/jsonrpc2/http"
"go.neonxp.dev/jsonrpc2/rpc" "go.neonxp.dev/jsonrpc2/rpc"
"go.neonxp.dev/jsonrpc2/transport"
) )
func main() { func main() {
s := httpRPC.New() s := rpc.New()
s.Register("multiply", rpc.Wrap(Multiply)) s.AddTransport(&transport.HTTP{Bind: ":8000"})
s.Register("divide", rpc.Wrap(Divide)) s.AddTransport(&transport.TCP{Bind: ":3000"})
http.ListenAndServe(":8000", s) s.Register("multiply", rpc.H(Multiply))
s.Register("divide", rpc.H(Divide))
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
if err := s.Run(ctx); err != nil {
log.Fatal(err)
}
} }
func Multiply(ctx context.Context, args *Args) (int, error) { func Multiply(ctx context.Context, args *Args) (int, error) {

82
example/test.http Normal file
View file

@ -0,0 +1,82 @@
POST http://localhost:8000/
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "multiply",
"params": {
"a": 2,
"b": 3
},
"id": 1
}
date: Sat, 21 May 2022 16:53:31 GMT
content-length: 36
content-type: text/plain; charset=utf-8
connection: close
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 16:53:31 GMT
content-length: 36
content-type: text/plain; charset=utf-8
connection: close
###
POST http://localhost:8000/
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "divide",
"params": {
"a": 10,
"b": 3
},
"id": 2
}
date: Sat, 21 May 2022 16:53:51 GMT
content-length: 52
content-type: text/plain; charset=utf-8
connection: close
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 16:53:51 GMT
content-length: 52
content-type: text/plain; charset=utf-8
connection: close
### Batch request
POST http://localhost:8000/
Content-Type: application/json
{ "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 }
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}
{"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}
{"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"}
{
"jsonrpc": "2.0",
"method": "divide",
"params": {
"a": 10,
"b": 3
},
"id": "divide"
}
{"foo": "boo"}
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}
{"jsonrpc": "2.0", "method": "get_data", "id": "9"}
date: Sat, 21 May 2022 17:18:17 GMT
content-type: text/plain; charset=utf-8
connection: close
transfer-encoding: chunked
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 17:18:17 GMT
content-type: text/plain; charset=utf-8
connection: close
transfer-encoding: chunked

View file

@ -1,42 +0,0 @@
POST http://localhost:8000/
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "multiply",
"params": {
"a": 2,
"b": 3
},
"id": 1
}
###
POST http://localhost:8000/
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "divide",
"params": {
"a": 10,
"b": 3
},
"id": 2
}
###
POST http://localhost:8000/
Content-Type: application/json
[
{ "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 },
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
{"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"},
{"foo": "boo"},
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
{"jsonrpc": "2.0", "method": "get_data", "id": "9"}
]

2
go.mod
View file

@ -1,3 +1,5 @@
module go.neonxp.dev/jsonrpc2 module go.neonxp.dev/jsonrpc2
go 1.18 go 1.18
require golang.org/x/sync v0.0.0-20220513210516-0976fa681c29

2
go.sum Normal file
View file

@ -0,0 +1,2 @@
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View file

@ -1,52 +0,0 @@
//Package http provides HTTP transport for JSON-RPC 2.0 server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package http
import (
"bufio"
"net/http"
"go.neonxp.dev/jsonrpc2/rpc"
)
type Server struct {
*rpc.RpcServer
}
func New() *Server {
return &Server{RpcServer: rpc.New()}
}
func (r *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
writer.Header().Set("Content-Type", "application/json")
reader := bufio.NewReader(request.Body)
defer request.Body.Close()
firstByte, err := reader.Peek(1)
if err != nil {
r.Logger.Logf("Can't read body: %v", err)
rpc.WriteError(rpc.ErrCodeParseError, writer)
return
}
if string(firstByte) == "[" {
r.BatchRequest(request.Context(), reader, writer)
return
}
r.SingleRequest(request.Context(), reader, writer)
}

View file

@ -24,6 +24,10 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"sync" "sync"
"golang.org/x/sync/errgroup"
"go.neonxp.dev/jsonrpc2/transport"
) )
const version = "2.0" const version = "2.0"
@ -32,6 +36,7 @@ type RpcServer struct {
Logger Logger Logger Logger
IgnoreNotifications bool IgnoreNotifications bool
handlers map[string]Handler handlers map[string]Handler
transports []transport.Transport
mu sync.RWMutex mu sync.RWMutex
} }
@ -40,6 +45,7 @@ func New() *RpcServer {
Logger: nopLogger{}, Logger: nopLogger{},
IgnoreNotifications: true, IgnoreNotifications: true,
handlers: map[string]Handler{}, handlers: map[string]Handler{},
transports: []transport.Transport{},
mu: sync.RWMutex{}, mu: sync.RWMutex{},
} }
} }
@ -50,51 +56,55 @@ func (r *RpcServer) Register(method string, handler Handler) {
r.handlers[method] = handler r.handlers[method] = handler
} }
func (r *RpcServer) SingleRequest(ctx context.Context, reader io.Reader, writer io.Writer) { func (r *RpcServer) AddTransport(transport transport.Transport) {
req := new(rpcRequest) r.transports = append(r.transports, transport)
if err := json.NewDecoder(reader).Decode(req); err != nil {
r.Logger.Logf("Can't read body: %v", err)
WriteError(ErrCodeParseError, writer)
return
}
resp := r.callMethod(ctx, req)
if req.Id == nil && r.IgnoreNotifications {
// notification request
return
}
if err := json.NewEncoder(writer).Encode(resp); err != nil {
r.Logger.Logf("Can't write response: %v", err)
WriteError(ErrCodeInternalError, writer)
return
}
} }
func (r *RpcServer) BatchRequest(ctx context.Context, reader io.Reader, writer io.Writer) { func (r *RpcServer) Run(ctx context.Context) error {
var req []rpcRequest eg, ctx := errgroup.WithContext(ctx)
if err := json.NewDecoder(reader).Decode(&req); err != nil { for _, t := range r.transports {
r.Logger.Logf("Can't read body: %v", err) eg.Go(func(t transport.Transport) func() error {
WriteError(ErrCodeParseError, writer) return func() error { return t.Run(ctx, r) }
return }(t))
} }
var responses []*rpcResponse return eg.Wait()
}
func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) {
dec := json.NewDecoder(rd)
enc := json.NewEncoder(w)
mu := sync.Mutex{}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(req)) for {
for _, j := range req { req := new(rpcRequest)
go func(req rpcRequest) { if err := dec.Decode(req); err != nil {
if err == io.EOF {
break
}
r.Logger.Logf("Can't read body: %v", err)
WriteError(ErrCodeParseError, enc)
break
}
wg.Add(1)
go func(req *rpcRequest) {
defer wg.Done() defer wg.Done()
resp := r.callMethod(ctx, &req) resp := r.callMethod(ctx, req)
if req.Id == nil && r.IgnoreNotifications { if req.Id == nil {
// notification request // notification request
return return
} }
responses = append(responses, resp) mu.Lock()
}(j) defer mu.Unlock()
if err := enc.Encode(resp); err != nil {
r.Logger.Logf("Can't write response: %v", err)
WriteError(ErrCodeInternalError, enc)
}
if w, canFlush := w.(Flusher); canFlush {
w.Flush()
}
}(req)
} }
wg.Wait() wg.Wait()
if err := json.NewEncoder(writer).Encode(responses); err != nil {
r.Logger.Logf("Can't write response: %v", err)
WriteError(ErrCodeInternalError, writer)
}
} }
func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse { func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse {
@ -124,8 +134,8 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons
} }
} }
func WriteError(code int, w io.Writer) { func WriteError(code int, enc *json.Encoder) {
_ = json.NewEncoder(w).Encode(rpcResponse{ enc.Encode(rpcResponse{
Jsonrpc: version, Jsonrpc: version,
Error: NewError(code), Error: NewError(code),
}) })
@ -144,3 +154,8 @@ type rpcResponse struct {
Error error `json:"error,omitempty"` Error error `json:"error,omitempty"`
Id any `json:"id,omitempty"` Id any `json:"id,omitempty"`
} }
type Flusher interface {
// Flush sends any buffered data to the client.
Flush()
}

View file

@ -24,7 +24,7 @@ import (
"encoding/json" "encoding/json"
) )
func Wrap[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler { func H[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler {
return func(ctx context.Context, in json.RawMessage) (json.RawMessage, error) { return func(ctx context.Context, in json.RawMessage) (json.RawMessage, error) {
req := new(RQ) req := new(RQ)
if err := json.Unmarshal(in, req); err != nil { if err := json.Unmarshal(in, req); err != nil {

40
transport/http.go Normal file
View file

@ -0,0 +1,40 @@
package transport
import (
"context"
"crypto/tls"
"net"
"net/http"
)
type HTTP struct {
Bind string
TLS *tls.Config
}
func (h *HTTP) Run(ctx context.Context, resolver Resolver) error {
srv := http.Server{
Addr: h.Bind,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
resolver.Resolve(ctx, r.Body, w)
}),
BaseContext: func(l net.Listener) context.Context {
return ctx
},
TLSConfig: h.TLS,
}
go func() {
<-ctx.Done()
srv.Close()
}()
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
}

22
transport/tcp.go Normal file
View file

@ -0,0 +1,22 @@
package transport
import (
"context"
"net"
)
type TCP struct {
Bind string
}
func (t *TCP) Run(ctx context.Context, resolver Resolver) error {
ln, _ := net.Listen("tcp", t.Bind)
for {
conn, err := ln.Accept()
if err != nil {
return err
}
go resolver.Resolve(ctx, conn, conn)
}
}

14
transport/transport.go Normal file
View file

@ -0,0 +1,14 @@
package transport
import (
"context"
"io"
)
type Transport interface {
Run(ctx context.Context, resolver Resolver) error
}
type Resolver interface {
Resolve(context.Context, io.Reader, io.Writer)
}