diff --git a/transports/websocket/errors.go b/transports/websocket/errors.go index e111f33..09f4e09 100644 --- a/transports/websocket/errors.go +++ b/transports/websocket/errors.go @@ -2,6 +2,4 @@ package websocket import "errors" -var ( - ErrClosing = errors.New("closing") -) +var ErrClosing = errors.New("closing") diff --git a/transports/websocket/events.go b/transports/websocket/events.go index a878eec..d376eaf 100644 --- a/transports/websocket/events.go +++ b/transports/websocket/events.go @@ -2,7 +2,6 @@ package websocket import ( "fmt" - "time" ) // ConnectingEvent is emitted when a new connection is being established. @@ -32,14 +31,3 @@ type DisconnectedEvent struct { func (e *DisconnectedEvent) String() string { return fmt.Sprintf("DISCONNECTED [url=%v, err=%v]", e.URL, e.Err) } - -// DialTimeoutEvent is emitted when establishing a new connection times out. -type DialTimeoutEvent struct { - URL string - Err error - Timeout time.Duration -} - -func (e *DialTimeoutEvent) String() string { - return fmt.Sprintf("DIAL_TIMEOUT [url=%v, err=%v, timeout=%v]", e.URL, e.Err, e.Timeout) -} diff --git a/transports/websocket/object_stream.go b/transports/websocket/object_stream.go new file mode 100644 index 0000000..ba16a13 --- /dev/null +++ b/transports/websocket/object_stream.go @@ -0,0 +1,36 @@ +package websocket + +import ( + "time" + + "github.com/gorilla/websocket" + jsonrpc2websocket "github.com/sourcegraph/jsonrpc2/websocket" +) + +// ObjectStream implements jsonrpc2.ObjectStream that uses a WebSocket. +// It extends jsonrpc2/websocket.ObjectStream with read/write timeouts. +type ObjectStream struct { + conn *websocket.Conn + stream jsonrpc2websocket.ObjectStream + + writeTimeout time.Duration + readTimeout time.Duration +} + +func NewObjectStream(conn *websocket.Conn, writeTimeout, readTimeout time.Duration) *ObjectStream { + return &ObjectStream{conn, jsonrpc2websocket.NewObjectStream(conn), writeTimeout, readTimeout} +} + +func (stream *ObjectStream) WriteObject(v interface{}) error { + stream.conn.SetWriteDeadline(time.Now().Add(stream.writeTimeout)) + return stream.stream.WriteObject(v) +} + +func (stream *ObjectStream) ReadObject(v interface{}) error { + stream.conn.SetReadDeadline(time.Now().Add(stream.readTimeout)) + return stream.stream.ReadObject(v) +} + +func (stream *ObjectStream) Close() error { + return stream.stream.Close() +} diff --git a/transports/websocket/transport.go b/transports/websocket/transport.go index a5bf488..c4e00ed 100644 --- a/transports/websocket/transport.go +++ b/transports/websocket/transport.go @@ -1,51 +1,69 @@ package websocket import ( + "io" // Stdlib - "crypto/tls" + "context" "net" - "net/url" "time" - // RPC - "github.com/go-steem/rpc/interfaces" - // Vendor + "github.com/gorilla/websocket" "github.com/pkg/errors" - "golang.org/x/net/websocket" + "github.com/sourcegraph/jsonrpc2" + tomb "gopkg.in/tomb.v2" ) const ( - DefaultDialTimeout = 30 * time.Second - DefaultAutoReconnectMaxDelay = 5 * time.Minute + DefaultHandshakeTimeout = 30 * time.Second + DefaultWriteTimeout = 10 * time.Second + DefaultReadTimeout = 20 * time.Second + DefaultAutoReconnectMaxDelay = 1 * time.Minute + + InitialAutoReconnectDelay = 1 * time.Second + AutoReconnectBackoffCoefficient = 1.5 ) +var netDialer net.Dialer + // Transport implements a CallCloser accessing the Steem RPC endpoint over WebSocket. type Transport struct { - // URL as passed into the constructor. - url *url.URL + // URLs as passed into the constructor. + urls []string + nextURLIndex int + currentURL string // Options. - dialTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration + handshakeTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration autoReconnectEnabled bool autoReconnectMaxDelay time.Duration monitorChan chan<- interface{} - // Underlying CallCloser. - cc interfaces.CallCloser + // The underlying JSON-RPC connection. + connCh chan chan *jsonrpc2.Conn + errCh chan error + + t *tomb.Tomb } // Option represents an option that can be passed into the transport constructor. type Option func(*Transport) // SetDialTimeout can be used to set the timeout when establishing a new connection. +// +// This function is deprecated, please use SetHandshakeTimeout. func SetDialTimeout(timeout time.Duration) Option { + return SetHandshakeTimeout(timeout) +} + +// SetHandshakeTimeout can be used to set the timeout for WebSocket handshake. +func SetHandshakeTimeout(timeout time.Duration) Option { return func(t *Transport) { - t.dialTimeout = timeout + t.handshakeTimeout = timeout } } @@ -110,19 +128,22 @@ func SetMonitor(monitorChan chan<- interface{}) Option { } } -// NewTransport creates a new transport that connects to the given WebSocket URL. -func NewTransport(endpointURL string, options ...Option) (*Transport, error) { - // Parse the URL. - epURL, err := url.Parse(endpointURL) - if err != nil { - return nil, errors.Wrap(err, "invalid endpoint URL") - } - +// NewTransport creates a new transport that connects to the given WebSocket URLs. +// +// It is possible to specify multiple WebSocket endpoint URLs. +// In case the transport is configured to reconnect automatically, +// the URL to connect to is rotated on every connect attempt using round-robin. +func NewTransport(urls []string, options ...Option) (*Transport, error) { // Prepare a transport instance. t := &Transport{ - url: epURL, - dialTimeout: DefaultDialTimeout, + urls: urls, + handshakeTimeout: DefaultHandshakeTimeout, + readTimeout: DefaultReadTimeout, + writeTimeout: DefaultWriteTimeout, autoReconnectMaxDelay: DefaultAutoReconnectMaxDelay, + connCh: make(chan chan *jsonrpc2.Conn), + errCh: make(chan error), + t: &tomb.Tomb{}, } // Apply the options. @@ -130,105 +151,156 @@ func NewTransport(endpointURL string, options ...Option) (*Transport, error) { opt(t) } - // Instantiate the underlying CallCloser based on the options. - var cc interfaces.CallCloser - if t.autoReconnectEnabled { - cc = newReconnectingTransport(t) - } else { - cc, err = newSimpleTransport(t) - } - if err != nil { - return nil, err - } - t.cc = cc + t.t.Go(t.dialer) // Return the new transport. return t, nil } // Call implements interfaces.CallCloser. -func (t *Transport) Call(method string, params, response interface{}) error { - return t.cc.Call(method, params, response) -} +func (t *Transport) Call(method string, params, result interface{}) error { + // Limit the request context with the tomb context. + ctx := t.t.Context(nil) + +Loop: + for { + // Request a connection. + connCh := make(chan *jsonrpc2.Conn, 1) + select { + case t.connCh <- connCh: + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context closed") + } -// Close implements interfaces.CallCloser. -func (t *Transport) Close() error { - return t.cc.Close() -} + // Receive the connection. + conn := <-connCh -// dial establishes a WebSocket connection according to the transport configuration. -func (t *Transport) dial(cancel <-chan struct{}) (*websocket.Conn, error) { - // Prepare a WebSocket config. - urlString := t.url.String() - config, err := websocket.NewConfig(urlString, "http://localhost") - if err != nil { - return nil, errors.Wrap(err, "failed to create WebSocket config") - } + // Perform the call. + err := conn.Call(ctx, method, params, result) + if err == nil { + return nil + } - // Establish the underlying TCP connection. - // We need to do this manually so that we can set up the timeout and the cancel channel. - var conn net.Conn - dialer := &net.Dialer{ - Timeout: t.dialTimeout, - Cancel: cancel, - } - switch t.url.Scheme { - case "ws": - conn, err = dialer.Dial("tcp", toHostPort(t.url)) + // In case this is a context error, return immediately. + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "context closed") + } - case "wss": - conn, err = tls.DialWithDialer(dialer, "tcp", toHostPort(t.url), nil) + // In case auto-reconnect is disabled, fail immediately. + if !t.autoReconnectEnabled { + return errors.Wrap(err, "call failed") + } - default: - err = errors.Wrapf(websocket.ErrBadScheme, "invalid WebSocket URL scheme: %v", t.url.Scheme) - } - if err != nil { - return nil, errors.Wrap(err, "failed to establish TCP connection") - } + // In case this is a connection error, request a new connection. + err = errors.Cause(err) + if _, ok := err.(*websocket.CloseError); ok || err == io.ErrUnexpectedEOF { + select { + case t.errCh <- errors.Wrap(err, "WebSocket closed"): + continue Loop + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context closed") + } + } - // Establish the WebSocket connection. - ws, err := websocket.NewClient(config, conn) - if err != nil { - return nil, errors.Wrap(err, "failed to establish WebSocket connection") + // Some other error occurred, return it immediately. + return errors.Wrap(err, "call failed") } - return ws, nil } -func (t *Transport) updateDeadline(ws *websocket.Conn) error { - // Set deadline in case read timeout is the same as write timeout. - if t.readTimeout != 0 && t.writeTimeout == t.readTimeout { - if err := ws.SetDeadline(time.Now().Add(t.readTimeout)); err != nil { - return errors.Wrap(err, "failed to set connection deadline") - } - return nil - } +func (t *Transport) dialer() error { + ctx := t.t.Context(nil) - // Set read deadline. - if t.readTimeout != 0 { - if err := ws.SetReadDeadline(time.Now().Add(t.readTimeout)); err != nil { - return errors.Wrap(err, "failed to set connection read deadline") + var conn *jsonrpc2.Conn + defer func() { + if conn != nil { + conn.Close() + err := errors.Wrap(ctx.Err(), "context closed") + t.emit(&DisconnectedEvent{t.currentURL, err}) + } + }() + + connect := func() { + delay := InitialAutoReconnectDelay + + for { + var err error + conn, err = t.dial(ctx) + if err == nil { + break + } + + select { + case <-time.After(delay): + delay = time.Duration(float64(delay) * AutoReconnectBackoffCoefficient) + if delay > t.autoReconnectMaxDelay { + delay = t.autoReconnectMaxDelay + } + + case <-ctx.Done(): + return + } } } - // Set write deadline. - if t.writeTimeout != 0 { - if err := ws.SetWriteDeadline(time.Now().Add(t.writeTimeout)); err != nil { - return errors.Wrap(err, "failed to set connection write deadline") + // Establish the initial connection. + connect() + + for { + select { + case connCh := <-t.connCh: + connCh <- conn + + case err := <-t.errCh: + conn.Close() + t.emit(&DisconnectedEvent{t.currentURL, err}) + connect() + + case <-ctx.Done(): + return nil } } - return nil } -var portMap = map[string]string{ - "ws": "80", - "wss": "443", +func (t *Transport) dial(ctx context.Context) (*jsonrpc2.Conn, error) { + // Set up a dialer. + dialer := websocket.Dialer{ + NetDial: func(network, addr string) (net.Conn, error) { + return netDialer.DialContext(ctx, network, addr) + }, + HandshakeTimeout: t.handshakeTimeout, + } + + // Get the next URL to try. + u := t.urls[t.nextURLIndex] + t.nextURLIndex = (t.nextURLIndex + 1) % len(t.urls) + t.currentURL = u + + // Connect the WebSocket. + t.emit(&ConnectingEvent{u}) + ws, _, err := dialer.Dial(u, nil) + if err != nil { + err = errors.Wrapf(err, "failed to dial %v", u) + t.emit(&DisconnectedEvent{u, err}) + return nil, err + } + t.emit(&ConnectedEvent{u}) + + // Wrap the WebSocket with JSON-RPC2. + stream := NewObjectStream(ws, t.writeTimeout, t.readTimeout) + return jsonrpc2.NewConn(ctx, stream, nil), nil } -func toHostPort(u *url.URL) string { - if _, ok := portMap[u.Scheme]; ok { - if _, _, err := net.SplitHostPort(u.Host); err != nil { - return net.JoinHostPort(u.Host, portMap[u.Scheme]) +func (t *Transport) emit(v interface{}) { + if t.monitorChan != nil { + select { + case t.monitorChan <- v: + default: } } - return u.Host +} + +// Close implements interfaces.CallCloser. +func (t *Transport) Close() error { + t.t.Kill(nil) + return t.t.Wait() } diff --git a/transports/websocket/transport_reconnect.go b/transports/websocket/transport_reconnect.go deleted file mode 100644 index a0a00cf..0000000 --- a/transports/websocket/transport_reconnect.go +++ /dev/null @@ -1,218 +0,0 @@ -package websocket - -import ( - // Stdlib - "io" - "net" - "time" - - // Vendor - "github.com/go-steem/rpc-codec/jsonrpc2" - "github.com/pkg/errors" - "golang.org/x/net/websocket" - "gopkg.in/tomb.v2" -) - -type callRequest struct { - method string - params interface{} - response interface{} - errCh chan<- error -} - -type reconnectingTransport struct { - parent *Transport - - ws *websocket.Conn - client *jsonrpc2.Client - requestCh chan *callRequest - - t *tomb.Tomb -} - -func newReconnectingTransport(parent *Transport) *reconnectingTransport { - cc := &reconnectingTransport{ - parent: parent, - requestCh: make(chan *callRequest), - t: &tomb.Tomb{}, - } - cc.t.Go(cc.worker) - return cc -} - -// Call implements interfaces.CallCloser. -func (t *reconnectingTransport) Call(method string, params, response interface{}) error { - errCh := make(chan error, 1) - select { - case t.requestCh <- &callRequest{method, params, response, errCh}: - return <-errCh - case <-t.t.Dying(): - return ErrClosing - } -} - -// Close implements interfaces.CallCloser. -func (t *reconnectingTransport) Close() error { - t.t.Kill(nil) - return t.t.Wait() -} - -func (t *reconnectingTransport) worker() error { - // Close the monitoring channel when returning. - if ch := t.parent.monitorChan; ch != nil { - defer func() { - close(ch) - }() - } - - // Keep processing incoming call requests until interrupted. - for { - select { - case req := <-t.requestCh: - req.errCh <- t.handleCall(req.method, req.params, req.response) - - case <-t.t.Dying(): - return nil - } - } -} - -func (t *reconnectingTransport) handleCall(method string, params, response interface{}) error { - for { - // Get an RPC client. This blocks until the client is available or Close() is called. - client, err := t.getClient() - if err != nil { - return err - } - - // Update the connection timeout if necessary. - if err := t.parent.updateDeadline(t.ws); err != nil { - return err - } - - // Perform the call. - if err := client.Call(method, params, response); err != nil { - // In case there is a network error, we retry immediately. - if err, ok := asNetworkError(err); ok { - t.dropClient(err) - continue - } - // The connection can be also closed unexpectedly. - // That counts as a network error for us as well. - if err == io.ErrUnexpectedEOF { - t.dropClient(err) - continue - } - // Otherwise we just return the error. - return err - } - - // Done. - return nil - } -} - -func (t *reconnectingTransport) getClient() (*jsonrpc2.Client, error) { - // In case the client is not set, establish a new connection. - if t.client == nil { - ws, err := t.connect() - if err != nil { - return nil, err - } - t.ws = ws - t.client = jsonrpc2.NewClient(ws) - } - - // Return the cached client. - return t.client, nil -} - -func (t *reconnectingTransport) dropClient(err error) { - if t.client != nil { - // Close and drop the client. - t.client.Close() - t.client = nil - t.ws = nil - - // Emit DISCONNECTED. - t.emitEvent(&DisconnectedEvent{ - URL: t.parent.url.String(), - Err: err, - }) - } -} - -func (t *reconnectingTransport) connect() (*websocket.Conn, error) { - // Get a new client. Keep trying to establish a new connection using exponential backoff. - timeout := 1 * time.Second - wait := func() error { - // Wait for the given period. - select { - case <-time.After(timeout): - case <-t.t.Dying(): - return ErrClosing - } - - // Update the timeout value. - timeout = 2 * timeout - if timeout > t.parent.autoReconnectMaxDelay { - timeout = t.parent.autoReconnectMaxDelay - } - return nil - } - - urlString := t.parent.url.String() - - for { - // Emit CONNECTING. - t.emitEvent(&ConnectingEvent{urlString}) - - // Try to establish a new WebSocket connection. - ws, err := t.parent.dial(t.t.Dying()) - if err != nil { - // Handle network errors. - if err, ok := asNetworkError(err); ok { - if err.Timeout() { - // Emit DIAL_TIMEOUT. - t.emitEvent(&DialTimeoutEvent{ - URL: urlString, - Err: err, - Timeout: timeout, - }) - } else { - // Emit DISCONNECTED. - t.emitEvent(&DisconnectedEvent{ - URL: urlString, - Err: err, - }) - } - - // Wait for the given period. - if err := wait(); err != nil { - return nil, err - } - // Try again. - continue - } - - // Otherwise just return the error. - return nil, err - } - - // Connection established. - // Emit CONNECTED and return a new client. - t.emitEvent(&ConnectedEvent{urlString}) - return ws, nil - } -} - -func (t *reconnectingTransport) emitEvent(event interface{}) { - if ch := t.parent.monitorChan; ch != nil { - ch <- event - } -} - -func asNetworkError(err error) (opError *net.OpError, ok bool) { - opError, ok = errors.Cause(err).(*net.OpError) - return -} diff --git a/transports/websocket/transport_simple.go b/transports/websocket/transport_simple.go deleted file mode 100644 index 4b207bb..0000000 --- a/transports/websocket/transport_simple.go +++ /dev/null @@ -1,52 +0,0 @@ -package websocket - -import ( - // Vendor - "github.com/go-steem/rpc-codec/jsonrpc2" - "github.com/pkg/errors" - "golang.org/x/net/websocket" -) - -// simpleTransport is not trying to be particularly clever about network errors. -// In case an error occurs, it is immediately returned and the transport is closed. -type simpleTransport struct { - parent *Transport - - ws *websocket.Conn - client *jsonrpc2.Client -} - -// newSimpleTransport establishes a new WebSocket connection. -// The function blocks until the process is finished. -func newSimpleTransport(parent *Transport) (*simpleTransport, error) { - // Establish the WebSocket connection. - ws, err := parent.dial(nil) - if err != nil { - return nil, err - } - - // Instantiate a JSON-RPC client. - client := jsonrpc2.NewClient(ws) - - // Return a new simple transport. - return &simpleTransport{parent, ws, client}, nil -} - -// Call implements interfaces.CallCloser. -func (t *simpleTransport) Call(method string, params, response interface{}) error { - if err := t.parent.updateDeadline(t.ws); err != nil { - return err - } - if err := t.client.Call(method, params, response); err != nil { - return errors.Wrapf(err, "failed to call %v(%v)", method, params) - } - return nil -} - -// Close implements interfaces.CallCloser. -func (t *simpleTransport) Close() error { - if err := t.client.Close(); err != nil { - return errors.Wrap(err, "failed to close JSON-RPC client") - } - return nil -}