Middlewares and options

This commit is contained in:
Александр Кирюхин 2022-05-22 16:37:48 +03:00
parent c74596c6a6
commit 4a81eff217
No known key found for this signature in database
GPG key ID: 6DF7A2910D0699E9
10 changed files with 253 additions and 91 deletions

View file

@ -13,19 +13,29 @@ Go 1.18+ required
## Usage (http transport) ## Usage (http transport)
1. Create JSON-RPC server: 1. Create JSON-RPC server with options:
```go ```go
import "go.neonxp.dev/jsonrpc2/rpc" import "go.neonxp.dev/jsonrpc2/rpc"
... ...
s := rpc.New() s := rpc.New(
rpc.WithTransport(&transport.HTTP{
Bind: ":8000", // Port to bind
CORSOrigin: "*", // CORS origin
TLS: &tls.Config{}, // Optional TLS config (default nil)
Parallel: true, // Allow parallel run batch methods (default false)
}),
//Other options like transports/middlewares...
)
``` ```
2. Add required transport(s): 2. Add required transport(s):
```go ```go
import "go.neonxp.dev/jsonrpc2/transport" import "go.neonxp.dev/jsonrpc2/transport"
... ...
s.AddTransport(&transport.HTTP{Bind: ":8000"}) s.Use(
s.AddTransport(&transport.TCP{Bind: ":3000"}) rpc.WithTransport(&transport.TCP{Bind: ":3000"}),
//...
)
``` ```
3. Write handler: 3. Write handler:
@ -72,10 +82,16 @@ import (
) )
func main() { func main() {
s := rpc.New() s := rpc.New(
rpc.WithLogger(rpc.StdLogger), // Optional logger
rpc.WithTransport(&transport.HTTP{Bind: ":8000"}), // HTTP transport
)
s.AddTransport(&transport.HTTP{Bind: ":8000"}) // HTTP transport // Set options after constructor
s.AddTransport(&transport.TCP{Bind: ":3000"}) // TCP transport s.Use(
rpc.WithTransport(&transport.TCP{Bind: ":3000"}), // TCP transport
rpc.WithMiddleware(rpc.LoggerMiddleware(rpc.StdLogger)), // Logger middleware
)
s.Register("multiply", rpc.H(Multiply)) s.Register("multiply", rpc.H(Multiply))
s.Register("divide", rpc.H(Divide)) s.Register("divide", rpc.H(Divide))

View file

@ -12,10 +12,15 @@ import (
) )
func main() { func main() {
s := rpc.New() s := rpc.New(
rpc.WithLogger(rpc.StdLogger),
s.AddTransport(&transport.HTTP{Bind: ":8000"}) rpc.WithTransport(&transport.HTTP{Bind: ":8000", CORSOrigin: "*"}),
s.AddTransport(&transport.TCP{Bind: ":3000"}) )
// Set options after constructor
s.Use(
rpc.WithTransport(&transport.TCP{Bind: ":3000"}),
rpc.WithMiddleware(rpc.LoggerMiddleware(rpc.StdLogger)),
)
s.Register("multiply", rpc.H(Multiply)) s.Register("multiply", rpc.H(Multiply))
s.Register("divide", rpc.H(Divide)) s.Register("divide", rpc.H(Divide))

View file

@ -11,18 +11,6 @@ Content-Type: application/json
} }
date: Sat, 21 May 2022 16:53:31 GMT
content-length: 36
content-type: text/plain; charset=utf-8
connection: close
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 16:53:31 GMT
content-length: 36
content-type: text/plain; charset=utf-8
connection: close
### ###
POST http://localhost:8000/ POST http://localhost:8000/
Content-Type: application/json Content-Type: application/json
@ -37,18 +25,6 @@ Content-Type: application/json
} }
date: Sat, 21 May 2022 16:53:51 GMT
content-length: 52
content-type: text/plain; charset=utf-8
connection: close
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 16:53:51 GMT
content-length: 52
content-type: text/plain; charset=utf-8
connection: close
### Batch request ### Batch request
POST http://localhost:8000/ POST http://localhost:8000/
Content-Type: application/json Content-Type: application/json
@ -68,15 +44,3 @@ Content-Type: application/json
{"foo": "boo"} {"foo": "boo"}
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"} {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}
{"jsonrpc": "2.0", "method": "get_data", "id": "9"} {"jsonrpc": "2.0", "method": "get_data", "id": "9"}
date: Sat, 21 May 2022 17:18:17 GMT
content-type: text/plain; charset=utf-8
connection: close
transfer-encoding: chunked
HTTP/1.1 200 - OK
date: Sat, 21 May 2022 17:18:17 GMT
content-type: text/plain; charset=utf-8
connection: close
transfer-encoding: chunked

View file

@ -1,3 +0,0 @@
package rpc
type Meta map[string]interface{}

43
rpc/middleware.go Normal file
View file

@ -0,0 +1,43 @@
//Package rpc provides abstract rpc server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package rpc
import (
"context"
"strings"
"time"
)
type Middleware func(handler RpcHandler) RpcHandler
type RpcHandler func(ctx context.Context, req *RpcRequest) *RpcResponse
func LoggerMiddleware(logger Logger) Middleware {
return func(handler RpcHandler) RpcHandler {
return func(ctx context.Context, req *RpcRequest) *RpcResponse {
t1 := time.Now().UnixMicro()
resp := handler(ctx, req)
t2 := time.Now().UnixMicro()
args := strings.ReplaceAll(string(req.Params), "\n", "")
logger.Logf("rpc call=%s, args=%s, take=%dμs", req.Method, args, (t2 - t1))
return resp
}
}
}

42
rpc/options.go Normal file
View file

@ -0,0 +1,42 @@
//Package rpc provides abstract rpc server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package rpc
import "go.neonxp.dev/jsonrpc2/transport"
type Option func(s *RpcServer)
func WithTransport(transport transport.Transport) Option {
return func(s *RpcServer) {
s.transports = append(s.transports, transport)
}
}
func WithMiddleware(mw Middleware) Option {
return func(s *RpcServer) {
s.middlewares = append(s.middlewares, mw)
}
}
func WithLogger(l Logger) Option {
return func(s *RpcServer) {
s.logger = l
}
}

View file

@ -33,20 +33,29 @@ import (
const version = "2.0" const version = "2.0"
type RpcServer struct { type RpcServer struct {
Logger Logger logger Logger
IgnoreNotifications bool handlers map[string]Handler
handlers map[string]Handler middlewares []Middleware
transports []transport.Transport transports []transport.Transport
mu sync.RWMutex mu sync.RWMutex
} }
func New() *RpcServer { func New(opts ...Option) *RpcServer {
return &RpcServer{ s := &RpcServer{
Logger: nopLogger{}, logger: nopLogger{},
IgnoreNotifications: true, handlers: map[string]Handler{},
handlers: map[string]Handler{}, transports: []transport.Transport{},
transports: []transport.Transport{}, mu: sync.RWMutex{},
mu: sync.RWMutex{}, }
for _, opt := range opts {
opt(s)
}
return s
}
func (r *RpcServer) Use(opts ...Option) {
for _, opt := range opts {
opt(r)
} }
} }
@ -56,10 +65,6 @@ func (r *RpcServer) Register(method string, handler Handler) {
r.handlers[method] = handler r.handlers[method] = handler
} }
func (r *RpcServer) AddTransport(transport transport.Transport) {
r.transports = append(r.transports, transport)
}
func (r *RpcServer) Run(ctx context.Context) error { func (r *RpcServer) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
for _, t := range r.transports { for _, t := range r.transports {
@ -70,25 +75,27 @@ func (r *RpcServer) Run(ctx context.Context) error {
return eg.Wait() return eg.Wait()
} }
func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer, parallel bool) {
dec := json.NewDecoder(rd) dec := json.NewDecoder(rd)
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
mu := sync.Mutex{} mu := sync.Mutex{}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for { for {
req := new(rpcRequest) req := new(RpcRequest)
if err := dec.Decode(req); err != nil { if err := dec.Decode(req); err != nil {
if err == io.EOF { if err == io.EOF {
break break
} }
r.Logger.Logf("Can't read body: %v", err) r.logger.Logf("Can't read body: %v", err)
WriteError(ErrCodeParseError, enc) WriteError(ErrCodeParseError, enc)
break break
} }
wg.Add(1) exec := func() {
go func(req *rpcRequest) { h := r.callMethod
defer wg.Done() for _, m := range r.middlewares {
resp := r.callMethod(ctx, req) h = m(h)
}
resp := h(ctx, req)
if req.Id == nil { if req.Id == nil {
// notification request // notification request
return return
@ -96,23 +103,34 @@ func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if err := enc.Encode(resp); err != nil { if err := enc.Encode(resp); err != nil {
r.Logger.Logf("Can't write response: %v", err) r.logger.Logf("Can't write response: %v", err)
WriteError(ErrCodeInternalError, enc) WriteError(ErrCodeInternalError, enc)
} }
if w, canFlush := w.(Flusher); canFlush { if w, canFlush := w.(Flusher); canFlush {
w.Flush() w.Flush()
} }
}(req) }
if parallel {
wg.Add(1)
go func(req *RpcRequest) {
defer wg.Done()
exec()
}(req)
} else {
exec()
}
}
if parallel {
wg.Wait()
} }
wg.Wait()
} }
func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse { func (r *RpcServer) callMethod(ctx context.Context, req *RpcRequest) *RpcResponse {
r.mu.RLock() r.mu.RLock()
h, ok := r.handlers[req.Method] h, ok := r.handlers[req.Method]
r.mu.RUnlock() r.mu.RUnlock()
if !ok { if !ok {
return &rpcResponse{ return &RpcResponse{
Jsonrpc: version, Jsonrpc: version,
Error: ErrorFromCode(ErrCodeMethodNotFound), Error: ErrorFromCode(ErrCodeMethodNotFound),
Id: req.Id, Id: req.Id,
@ -120,14 +138,14 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons
} }
resp, err := h(ctx, req.Params) resp, err := h(ctx, req.Params)
if err != nil { if err != nil {
r.Logger.Logf("User error %v", err) r.logger.Logf("User error %v", err)
return &rpcResponse{ return &RpcResponse{
Jsonrpc: version, Jsonrpc: version,
Error: err, Error: err,
Id: req.Id, Id: req.Id,
} }
} }
return &rpcResponse{ return &RpcResponse{
Jsonrpc: version, Jsonrpc: version,
Result: resp, Result: resp,
Id: req.Id, Id: req.Id,
@ -135,20 +153,20 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons
} }
func WriteError(code int, enc *json.Encoder) { func WriteError(code int, enc *json.Encoder) {
enc.Encode(rpcResponse{ enc.Encode(RpcResponse{
Jsonrpc: version, Jsonrpc: version,
Error: ErrorFromCode(code), Error: ErrorFromCode(code),
}) })
} }
type rpcRequest struct { type RpcRequest struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"` Method string `json:"method"`
Params json.RawMessage `json:"params"` Params json.RawMessage `json:"params"`
Id any `json:"id"` Id any `json:"id"`
} }
type rpcResponse struct { type RpcResponse struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
Result json.RawMessage `json:"result,omitempty"` Result json.RawMessage `json:"result,omitempty"`
Error error `json:"error,omitempty"` Error error `json:"error,omitempty"`

View file

@ -1,3 +1,22 @@
//Package transport provides transports for rpc server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package transport package transport
import ( import (
@ -8,21 +27,33 @@ import (
) )
type HTTP struct { type HTTP struct {
Bind string Bind string
TLS *tls.Config TLS *tls.Config
CORSOrigin string
Parallel bool
} }
func (h *HTTP) Run(ctx context.Context, resolver Resolver) error { func (h *HTTP) Run(ctx context.Context, resolver Resolver) error {
srv := http.Server{ srv := http.Server{
Addr: h.Bind, Addr: h.Bind,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions && h.CORSOrigin != "" {
w.Header().Set("Access-Control-Allow-Origin", h.CORSOrigin)
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
return return
} }
if h.CORSOrigin != "" {
w.Header().Set("Access-Control-Allow-Origin", h.CORSOrigin)
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
}
w.Header().Add("Content-Type", "application/json") w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
resolver.Resolve(ctx, r.Body, w) resolver.Resolve(ctx, r.Body, w, h.Parallel)
}), }),
BaseContext: func(l net.Listener) context.Context { BaseContext: func(l net.Listener) context.Context {
return ctx return ctx

View file

@ -1,3 +1,22 @@
//Package transport provides transports for rpc server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package transport package transport
import ( import (
@ -6,17 +25,25 @@ import (
) )
type TCP struct { type TCP struct {
Bind string Bind string
Parallel bool
} }
func (t *TCP) Run(ctx context.Context, resolver Resolver) error { func (t *TCP) Run(ctx context.Context, resolver Resolver) error {
ln, _ := net.Listen("tcp", t.Bind) ln, err := net.Listen("tcp", t.Bind)
if err != nil {
return err
}
go func() {
<-ctx.Done()
ln.Close()
}()
for { for {
conn, err := ln.Accept() conn, err := ln.Accept()
if err != nil { if err != nil {
return err return err
} }
go resolver.Resolve(ctx, conn, conn) go resolver.Resolve(ctx, conn, conn, t.Parallel)
} }
} }

View file

@ -1,3 +1,22 @@
//Package transport provides transports for rpc server
//
//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
//
//This file is part of go.neonxp.dev/jsonrpc2 project.
//
//This program is free software: you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation, either version 3 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program. If not, see <https://www.gnu.org/licenses/>.
package transport package transport
import ( import (
@ -10,5 +29,5 @@ type Transport interface {
} }
type Resolver interface { type Resolver interface {
Resolve(context.Context, io.Reader, io.Writer) Resolve(ctx context.Context, reader io.Reader, writer io.Writer, isParallel bool)
} }