From 4a81eff217c40c459c9a9ed4f318b4dd9bc5ee8a Mon Sep 17 00:00:00 2001 From: Alexander Kiryukhin Date: Sun, 22 May 2022 16:37:48 +0300 Subject: [PATCH] Middlewares and options --- README.md | 30 +++++++++++---- example/main.go | 13 +++++-- example/test.http | 36 ------------------ rpc/meta.go | 3 -- rpc/middleware.go | 43 +++++++++++++++++++++ rpc/options.go | 42 +++++++++++++++++++++ rpc/server.go | 86 +++++++++++++++++++++++++----------------- transport/http.go | 37 ++++++++++++++++-- transport/tcp.go | 33 ++++++++++++++-- transport/transport.go | 21 ++++++++++- 10 files changed, 253 insertions(+), 91 deletions(-) delete mode 100644 rpc/meta.go create mode 100644 rpc/middleware.go create mode 100644 rpc/options.go diff --git a/README.md b/README.md index 55e8cf9..8819880 100644 --- a/README.md +++ b/README.md @@ -13,19 +13,29 @@ Go 1.18+ required ## Usage (http transport) -1. Create JSON-RPC server: +1. Create JSON-RPC server with options: ```go import "go.neonxp.dev/jsonrpc2/rpc" ... - s := rpc.New() + s := rpc.New( + rpc.WithTransport(&transport.HTTP{ + Bind: ":8000", // Port to bind + CORSOrigin: "*", // CORS origin + TLS: &tls.Config{}, // Optional TLS config (default nil) + Parallel: true, // Allow parallel run batch methods (default false) + }), + //Other options like transports/middlewares... + ) ``` 2. Add required transport(s): ```go import "go.neonxp.dev/jsonrpc2/transport" ... - s.AddTransport(&transport.HTTP{Bind: ":8000"}) - s.AddTransport(&transport.TCP{Bind: ":3000"}) + s.Use( + rpc.WithTransport(&transport.TCP{Bind: ":3000"}), + //... + ) ``` 3. Write handler: @@ -72,10 +82,16 @@ import ( ) func main() { - s := rpc.New() + s := rpc.New( + rpc.WithLogger(rpc.StdLogger), // Optional logger + rpc.WithTransport(&transport.HTTP{Bind: ":8000"}), // HTTP transport + ) - s.AddTransport(&transport.HTTP{Bind: ":8000"}) // HTTP transport - s.AddTransport(&transport.TCP{Bind: ":3000"}) // TCP transport + // Set options after constructor + s.Use( + rpc.WithTransport(&transport.TCP{Bind: ":3000"}), // TCP transport + rpc.WithMiddleware(rpc.LoggerMiddleware(rpc.StdLogger)), // Logger middleware + ) s.Register("multiply", rpc.H(Multiply)) s.Register("divide", rpc.H(Divide)) diff --git a/example/main.go b/example/main.go index 9f25e61..d5d034e 100644 --- a/example/main.go +++ b/example/main.go @@ -12,10 +12,15 @@ import ( ) func main() { - s := rpc.New() - - s.AddTransport(&transport.HTTP{Bind: ":8000"}) - s.AddTransport(&transport.TCP{Bind: ":3000"}) + s := rpc.New( + rpc.WithLogger(rpc.StdLogger), + rpc.WithTransport(&transport.HTTP{Bind: ":8000", CORSOrigin: "*"}), + ) + // Set options after constructor + s.Use( + rpc.WithTransport(&transport.TCP{Bind: ":3000"}), + rpc.WithMiddleware(rpc.LoggerMiddleware(rpc.StdLogger)), + ) s.Register("multiply", rpc.H(Multiply)) s.Register("divide", rpc.H(Divide)) diff --git a/example/test.http b/example/test.http index 7ed134a..e126917 100644 --- a/example/test.http +++ b/example/test.http @@ -11,18 +11,6 @@ Content-Type: application/json } -date: Sat, 21 May 2022 16:53:31 GMT -content-length: 36 -content-type: text/plain; charset=utf-8 -connection: close - -HTTP/1.1 200 - OK -date: Sat, 21 May 2022 16:53:31 GMT -content-length: 36 -content-type: text/plain; charset=utf-8 -connection: close - - ### POST http://localhost:8000/ Content-Type: application/json @@ -37,18 +25,6 @@ Content-Type: application/json } -date: Sat, 21 May 2022 16:53:51 GMT -content-length: 52 -content-type: text/plain; charset=utf-8 -connection: close - -HTTP/1.1 200 - OK -date: Sat, 21 May 2022 16:53:51 GMT -content-length: 52 -content-type: text/plain; charset=utf-8 -connection: close - - ### Batch request POST http://localhost:8000/ Content-Type: application/json @@ -68,15 +44,3 @@ Content-Type: application/json {"foo": "boo"} {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"} {"jsonrpc": "2.0", "method": "get_data", "id": "9"} - - -date: Sat, 21 May 2022 17:18:17 GMT -content-type: text/plain; charset=utf-8 -connection: close -transfer-encoding: chunked - -HTTP/1.1 200 - OK -date: Sat, 21 May 2022 17:18:17 GMT -content-type: text/plain; charset=utf-8 -connection: close -transfer-encoding: chunked \ No newline at end of file diff --git a/rpc/meta.go b/rpc/meta.go deleted file mode 100644 index 77de69b..0000000 --- a/rpc/meta.go +++ /dev/null @@ -1,3 +0,0 @@ -package rpc - -type Meta map[string]interface{} diff --git a/rpc/middleware.go b/rpc/middleware.go new file mode 100644 index 0000000..cd99823 --- /dev/null +++ b/rpc/middleware.go @@ -0,0 +1,43 @@ +//Package rpc provides abstract rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + +package rpc + +import ( + "context" + "strings" + "time" +) + +type Middleware func(handler RpcHandler) RpcHandler + +type RpcHandler func(ctx context.Context, req *RpcRequest) *RpcResponse + +func LoggerMiddleware(logger Logger) Middleware { + return func(handler RpcHandler) RpcHandler { + return func(ctx context.Context, req *RpcRequest) *RpcResponse { + t1 := time.Now().UnixMicro() + resp := handler(ctx, req) + t2 := time.Now().UnixMicro() + args := strings.ReplaceAll(string(req.Params), "\n", "") + logger.Logf("rpc call=%s, args=%s, take=%dμs", req.Method, args, (t2 - t1)) + return resp + } + } +} diff --git a/rpc/options.go b/rpc/options.go new file mode 100644 index 0000000..825dbca --- /dev/null +++ b/rpc/options.go @@ -0,0 +1,42 @@ +//Package rpc provides abstract rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + +package rpc + +import "go.neonxp.dev/jsonrpc2/transport" + +type Option func(s *RpcServer) + +func WithTransport(transport transport.Transport) Option { + return func(s *RpcServer) { + s.transports = append(s.transports, transport) + } +} + +func WithMiddleware(mw Middleware) Option { + return func(s *RpcServer) { + s.middlewares = append(s.middlewares, mw) + } +} + +func WithLogger(l Logger) Option { + return func(s *RpcServer) { + s.logger = l + } +} diff --git a/rpc/server.go b/rpc/server.go index 9c5e847..b2f6158 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -33,20 +33,29 @@ import ( const version = "2.0" type RpcServer struct { - Logger Logger - IgnoreNotifications bool - handlers map[string]Handler - transports []transport.Transport - mu sync.RWMutex + logger Logger + handlers map[string]Handler + middlewares []Middleware + transports []transport.Transport + mu sync.RWMutex } -func New() *RpcServer { - return &RpcServer{ - Logger: nopLogger{}, - IgnoreNotifications: true, - handlers: map[string]Handler{}, - transports: []transport.Transport{}, - mu: sync.RWMutex{}, +func New(opts ...Option) *RpcServer { + s := &RpcServer{ + logger: nopLogger{}, + handlers: map[string]Handler{}, + transports: []transport.Transport{}, + mu: sync.RWMutex{}, + } + for _, opt := range opts { + opt(s) + } + return s +} + +func (r *RpcServer) Use(opts ...Option) { + for _, opt := range opts { + opt(r) } } @@ -56,10 +65,6 @@ func (r *RpcServer) Register(method string, handler Handler) { r.handlers[method] = handler } -func (r *RpcServer) AddTransport(transport transport.Transport) { - r.transports = append(r.transports, transport) -} - func (r *RpcServer) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) for _, t := range r.transports { @@ -70,25 +75,27 @@ func (r *RpcServer) Run(ctx context.Context) error { return eg.Wait() } -func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { +func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer, parallel bool) { dec := json.NewDecoder(rd) enc := json.NewEncoder(w) mu := sync.Mutex{} wg := sync.WaitGroup{} for { - req := new(rpcRequest) + req := new(RpcRequest) if err := dec.Decode(req); err != nil { if err == io.EOF { break } - r.Logger.Logf("Can't read body: %v", err) + r.logger.Logf("Can't read body: %v", err) WriteError(ErrCodeParseError, enc) break } - wg.Add(1) - go func(req *rpcRequest) { - defer wg.Done() - resp := r.callMethod(ctx, req) + exec := func() { + h := r.callMethod + for _, m := range r.middlewares { + h = m(h) + } + resp := h(ctx, req) if req.Id == nil { // notification request return @@ -96,23 +103,34 @@ func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { mu.Lock() defer mu.Unlock() if err := enc.Encode(resp); err != nil { - r.Logger.Logf("Can't write response: %v", err) + r.logger.Logf("Can't write response: %v", err) WriteError(ErrCodeInternalError, enc) } if w, canFlush := w.(Flusher); canFlush { w.Flush() } - }(req) + } + if parallel { + wg.Add(1) + go func(req *RpcRequest) { + defer wg.Done() + exec() + }(req) + } else { + exec() + } + } + if parallel { + wg.Wait() } - wg.Wait() } -func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse { +func (r *RpcServer) callMethod(ctx context.Context, req *RpcRequest) *RpcResponse { r.mu.RLock() h, ok := r.handlers[req.Method] r.mu.RUnlock() if !ok { - return &rpcResponse{ + return &RpcResponse{ Jsonrpc: version, Error: ErrorFromCode(ErrCodeMethodNotFound), Id: req.Id, @@ -120,14 +138,14 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons } resp, err := h(ctx, req.Params) if err != nil { - r.Logger.Logf("User error %v", err) - return &rpcResponse{ + r.logger.Logf("User error %v", err) + return &RpcResponse{ Jsonrpc: version, Error: err, Id: req.Id, } } - return &rpcResponse{ + return &RpcResponse{ Jsonrpc: version, Result: resp, Id: req.Id, @@ -135,20 +153,20 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons } func WriteError(code int, enc *json.Encoder) { - enc.Encode(rpcResponse{ + enc.Encode(RpcResponse{ Jsonrpc: version, Error: ErrorFromCode(code), }) } -type rpcRequest struct { +type RpcRequest struct { Jsonrpc string `json:"jsonrpc"` Method string `json:"method"` Params json.RawMessage `json:"params"` Id any `json:"id"` } -type rpcResponse struct { +type RpcResponse struct { Jsonrpc string `json:"jsonrpc"` Result json.RawMessage `json:"result,omitempty"` Error error `json:"error,omitempty"` diff --git a/transport/http.go b/transport/http.go index a795b10..bb2321b 100644 --- a/transport/http.go +++ b/transport/http.go @@ -1,3 +1,22 @@ +//Package transport provides transports for rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + package transport import ( @@ -8,21 +27,33 @@ import ( ) type HTTP struct { - Bind string - TLS *tls.Config + Bind string + TLS *tls.Config + CORSOrigin string + Parallel bool } func (h *HTTP) Run(ctx context.Context, resolver Resolver) error { srv := http.Server{ Addr: h.Bind, Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodOptions && h.CORSOrigin != "" { + w.Header().Set("Access-Control-Allow-Origin", h.CORSOrigin) + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(http.StatusOK) + return + } if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } + if h.CORSOrigin != "" { + w.Header().Set("Access-Control-Allow-Origin", h.CORSOrigin) + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + } w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - resolver.Resolve(ctx, r.Body, w) + resolver.Resolve(ctx, r.Body, w, h.Parallel) }), BaseContext: func(l net.Listener) context.Context { return ctx diff --git a/transport/tcp.go b/transport/tcp.go index 2ab946a..e8bbabe 100644 --- a/transport/tcp.go +++ b/transport/tcp.go @@ -1,3 +1,22 @@ +//Package transport provides transports for rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + package transport import ( @@ -6,17 +25,25 @@ import ( ) type TCP struct { - Bind string + Bind string + Parallel bool } func (t *TCP) Run(ctx context.Context, resolver Resolver) error { - ln, _ := net.Listen("tcp", t.Bind) + ln, err := net.Listen("tcp", t.Bind) + if err != nil { + return err + } + go func() { + <-ctx.Done() + ln.Close() + }() for { conn, err := ln.Accept() if err != nil { return err } - go resolver.Resolve(ctx, conn, conn) + go resolver.Resolve(ctx, conn, conn, t.Parallel) } } diff --git a/transport/transport.go b/transport/transport.go index 2a54295..ff9c1ba 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -1,3 +1,22 @@ +//Package transport provides transports for rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + package transport import ( @@ -10,5 +29,5 @@ type Transport interface { } type Resolver interface { - Resolve(context.Context, io.Reader, io.Writer) + Resolve(ctx context.Context, reader io.Reader, writer io.Writer, isParallel bool) }