From 9290441361557fbc7e348eae6c34ec243f9650df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kupka?= Date: Thu, 7 Jul 2016 15:22:48 +0200 Subject: [PATCH] Release go-steem/rpc v0.2.0 This release includes multiple breaking changes. * The way the RPC client is created is different now. You don't pass a URL into the constructor, but rather a transport implementation that is then used by the client internally. * Since there are multiple APIs exposed via the RPC endpoint, the database api, that is the only API supported so far, is not accessible on the client object itself any more, you need to use client.Database, i.e client.Database.GetConfig() instead of client.GetConfig(). * The WebSocket transport now supports auto-reconnect mode. When enabled, the transport will try to reconnect automatically when the connection is not there yet or it is lost. * It is also possible to set WebSocket connection read and write timeouts. --- apis/database/api.go | 348 ++++++++++++++++++ constants.go => apis/database/constants.go | 2 +- data.go => apis/database/data.go | 4 +- {types => apis/types}/id.go | 0 {types => apis/types}/int.go | 0 {types => apis/types}/time.go | 0 client.go | 387 +------------------- examples/voting_monitor/main.go | 81 +++- interfaces/callcloser.go | 8 + interfaces/caller.go | 5 + internal/call/utils.go | 19 + transport.go | 44 --- transports/websocket/errors.go | 7 + transports/websocket/events.go | 45 +++ transports/websocket/transport.go | 226 +++++++++++- transports/websocket/transport_reconnect.go | 218 +++++++++++ transports/websocket/transport_simple.go | 52 +++ 17 files changed, 1003 insertions(+), 443 deletions(-) create mode 100644 apis/database/api.go rename constants.go => apis/database/constants.go (98%) rename data.go => apis/database/data.go (99%) rename {types => apis/types}/id.go (100%) rename {types => apis/types}/int.go (100%) rename {types => apis/types}/time.go (100%) create mode 100644 interfaces/callcloser.go create mode 100644 interfaces/caller.go create mode 100644 internal/call/utils.go delete mode 100644 transport.go create mode 100644 transports/websocket/errors.go create mode 100644 transports/websocket/events.go create mode 100644 transports/websocket/transport_reconnect.go create mode 100644 transports/websocket/transport_simple.go diff --git a/apis/database/api.go b/apis/database/api.go new file mode 100644 index 0000000..fc1080f --- /dev/null +++ b/apis/database/api.go @@ -0,0 +1,348 @@ +package database + +import ( + // Stdlib + "encoding/json" + "errors" + + // RPC + "github.com/go-steem/rpc/interfaces" + "github.com/go-steem/rpc/internal/call" +) + +type API struct { + caller interfaces.Caller +} + +func NewAPI(caller interfaces.Caller) *API { + return &API{caller} +} + +/* + // Subscriptions + (set_subscribe_callback) + (set_pending_transaction_callback) + (set_block_applied_callback) + (cancel_all_subscriptions) +*/ + +/* + // Tags + (get_trending_tags) + (get_discussions_by_trending) + (get_discussions_by_created) + (get_discussions_by_active) + (get_discussions_by_cashout) + (get_discussions_by_payout) + (get_discussions_by_votes) + (get_discussions_by_children) + (get_discussions_by_hot) + (get_recommended_for) +*/ + +func (api *API) GetTrendingTagsRaw(afterTag string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_trending_tags", []interface{}{afterTag, limit}) +} + +type DiscussionQuery struct { + Tag string `json:"tag"` + Limit uint32 `json:"limit"` + // XXX: Not sure about the type here. + FilterTags []string `json:"filter_tags"` + StartAuthor string `json:"start_author,omitempty"` + StartPermlink string `json:"start_permlink,omitempty"` + ParentAuthor string `json:"parent_author,omitempty"` + ParentPermlink string `json:"parent_permlink"` +} + +func (api *API) GetDiscussionsByTrendingRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_trending", query) +} + +func (api *API) GetDiscussionsByCreatedRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_created", query) +} + +func (api *API) GetDiscussionsByActiveRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_active", query) +} + +func (api *API) GetDiscussionsByCashoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_cashout", query) +} + +func (api *API) GetDiscussionsByPayoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_payout", query) +} + +func (api *API) GetDiscussionsByVotesRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_votes", query) +} + +func (api *API) GetDiscussionsByChildrenRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_children", query) +} + +func (api *API) GetDiscussionsByHotRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_hot", query) +} + +func (api *API) GetRecommendedForRaw(user string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_votes", []interface{}{user, limit}) +} + +/* + // Blocks and transactions + (get_block_header) + (get_block) + (get_state) + (get_trending_categories) + (get_best_categories) + (get_active_categories) + (get_recent_categories) +*/ + +func (api *API) GetBlockHeaderRaw(blockNum uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_block_header", []uint32{blockNum}) +} + +func (api *API) GetBlockRaw(blockNum uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_block", []uint32{blockNum}) +} + +func (api *API) GetBlock(blockNum uint32) (*Block, error) { + var resp Block + if err := api.caller.Call("get_block", []uint32{blockNum}, &resp); err != nil { + return nil, err + } + resp.Number = blockNum + return &resp, nil +} + +func (api *API) GetStateRaw(path string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_state", []string{path}) +} + +func (api *API) GetTrendingCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_trending_categories", []interface{}{after, limit}) +} + +func (api *API) GetBestCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_best_categories", []interface{}{after, limit}) +} + +func (api *API) GetActiveCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_active_categories", []interface{}{after, limit}) +} + +func (api *API) GetRecentCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_recent_categories", []interface{}{after, limit}) +} + +/* + // Globals + (get_config) + (get_dynamic_global_properties) + (get_chain_properties) + (get_feed_history) + (get_current_median_history_price) + (get_witness_schedule) + (get_hardfork_version) + (get_next_scheduled_hardfork) +*/ + +func (api *API) GetConfigRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_config", call.EmptyParams) +} + +func (api *API) GetConfig() (*Config, error) { + var resp Config + if err := api.caller.Call("get_config", call.EmptyParams, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetDynamicGlobalPropertiesRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_dynamic_global_properties", call.EmptyParams) +} + +func (api *API) GetDynamicGlobalProperties() (*DynamicGlobalProperties, error) { + var resp DynamicGlobalProperties + if err := api.caller.Call("get_dynamic_global_properties", call.EmptyParams, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetChainPropertiesRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_chain_properties", call.EmptyParams) +} + +func (api *API) GetFeedHistoryRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_feed_history", call.EmptyParams) +} + +func (api *API) GetCurrentMedianHistoryPriceRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_current_median_history_price", call.EmptyParams) +} + +func (api *API) GetWitnessScheduleRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_witness_schedule", call.EmptyParams) +} + +func (api *API) GetHardforkVersionRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_hardfork_version", call.EmptyParams) +} + +func (api *API) GetHardforkVersion() (string, error) { + var resp string + if err := api.caller.Call("get_hardfork_version", call.EmptyParams, &resp); err != nil { + return "", err + } + return resp, nil +} + +func (api *API) GetNextScheduledHardforkRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_next_scheduled_hardfork", call.EmptyParams) +} + +/* + // Keys + (get_key_references) +*/ + +// XXX: Not sure about params. +//func (api *API) GetKeyReferencesRaw(key []string) (*json.RawMessage, error) { +// return call.Raw(api.caller, "get_key_references", [][]string{key}) +//} + +/* + // Accounts + (get_accounts) + (get_account_references) + (lookup_account_names) + (lookup_accounts) + (get_account_count) + (get_conversion_requests) + (get_account_history) +*/ + +func (api *API) GetAccountsRaw(accountNames []string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_accounts", [][]string{accountNames}) +} + +// XXX: Not sure about params. +//func (api *API) GetAccountReferenceRaw(id string) (*json.RawMessage, error) { +// return call.Raw(api.caller, "get_account_reference", []string{id}) +//} + +func (api *API) LookupAccountNamesRaw(accountNames []string) (*json.RawMessage, error) { + return call.Raw(api.caller, "lookup_account_names", [][]string{accountNames}) +} + +func (api *API) LookupAccountsRaw(lowerBoundName string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "lookup_accounts", []interface{}{lowerBoundName, limit}) +} + +func (api *API) GetAccountCountRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_count", call.EmptyParams) +} + +func (api *API) GetConversionRequestsRaw(accountName string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_conversion_requests", []string{accountName}) +} + +func (api *API) GetAccountHistoryRaw(account string, from uint64, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_history", []interface{}{account, from, limit}) +} + +/* + // Market + (get_order_book) +*/ + +func (api *API) GetOrderBookRaw(limit uint32) (*json.RawMessage, error) { + if limit > 1000 { + return nil, errors.New("GetOrderBook: limit must not exceed 1000") + } + return call.Raw(api.caller, "get_order_book", []interface{}{limit}) +} + +/* + // Authority / validation + (get_transaction_hex) + (get_transaction) + (get_required_signatures) + (get_potential_signatures) + (verify_authority) + (verify_account_authority) +*/ + +/* + // Votes + (get_active_votes) + (get_account_votes) +*/ + +func (api *API) GetActiveVotesRaw(author, permlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_active_votes", []string{author, permlink}) +} + +func (api *API) GetAccountVotesRaw(voter string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_votes", []string{voter}) +} + +/* + // Content + (get_content) + (get_content_replies) + (get_discussions_by_author_before_date) - MISSING + (get_replies_by_last_update) +*/ + +func (api *API) GetContentRaw(author, permlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_content", []string{author, permlink}) +} + +func (api *API) GetContent(author, permlink string) (*Content, error) { + var resp Content + if err := api.caller.Call("get_content", []string{author, permlink}, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetContentRepliesRaw(parentAuthor, parentPermlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_content_replies", []string{parentAuthor, parentPermlink}) +} + +func (api *API) GetContentReplies(parentAuthor, parentPermlink string) ([]*Content, error) { + var resp []*Content + err := api.caller.Call("get_content_replies", []string{parentAuthor, parentPermlink}, &resp) + if err != nil { + return nil, err + } + return resp, nil +} + +func (api *API) GetRepliesByLastUpdateRaw( + startAuthor string, + startPermlink string, + limit uint32, +) (*json.RawMessage, error) { + + return call.Raw( + api.caller, "get_replies_by_last_update", []interface{}{startAuthor, startPermlink, limit}) +} + +/* + // Witnesses + (get_witnesses) + (get_witness_by_account) + (get_witnesses_by_vote) + (lookup_witness_accounts) + (get_witness_count) + (get_active_witnesses) + (get_miner_queue) +*/ diff --git a/constants.go b/apis/database/constants.go similarity index 98% rename from constants.go rename to apis/database/constants.go index 34652ac..2657b5a 100644 --- a/constants.go +++ b/apis/database/constants.go @@ -1,4 +1,4 @@ -package rpc +package database const ( OpTypeVote = "vote" diff --git a/data.go b/apis/database/data.go similarity index 99% rename from data.go rename to apis/database/data.go index cc6bac3..e5ee91f 100644 --- a/data.go +++ b/apis/database/data.go @@ -1,4 +1,4 @@ -package rpc +package database import ( "encoding/json" @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/go-steem/rpc/types" + "github.com/go-steem/rpc/apis/types" ) type Config struct { diff --git a/types/id.go b/apis/types/id.go similarity index 100% rename from types/id.go rename to apis/types/id.go diff --git a/types/int.go b/apis/types/int.go similarity index 100% rename from types/int.go rename to apis/types/int.go diff --git a/types/time.go b/apis/types/time.go similarity index 100% rename from types/time.go rename to apis/types/time.go diff --git a/client.go b/client.go index 3401a6a..e261cf1 100644 --- a/client.go +++ b/client.go @@ -1,380 +1,31 @@ package rpc import ( - "encoding/json" - "errors" - "net/url" + // RPC + "github.com/go-steem/rpc/apis/database" + "github.com/go-steem/rpc/interfaces" ) -var emptyParams = []string{} - +// Client can be used to access Steem remote APIs. +// +// There is a public field for every Steem API available, +// e.g. Client.Database corresponds to database_api. type Client struct { - t Transport -} - -func Dial(address string) (*Client, error) { - // Parse the address URL. - u, err := url.Parse(address) - if err != nil { - return nil, err - } - - // Look for the constructor associated with the given URL scheme. - constructor, ok := registeredTransportConstructors[u.Scheme] - if !ok { - return nil, errors.New("no transport registered for URL scheme: " + u.Scheme) - } - - // Use the constructor to get a Transport. - transport, err := constructor(address) - if err != nil { - return nil, err - } - - // Return the new Client, at last. - return &Client{transport}, nil -} - -func (client *Client) Close() error { - return client.t.Close() -} - -/* - // Subscriptions - (set_subscribe_callback) - (set_pending_transaction_callback) - (set_block_applied_callback) - (cancel_all_subscriptions) -*/ - -/* - // Tags - (get_trending_tags) - (get_discussions_by_trending) - (get_discussions_by_created) - (get_discussions_by_active) - (get_discussions_by_cashout) - (get_discussions_by_payout) - (get_discussions_by_votes) - (get_discussions_by_children) - (get_discussions_by_hot) - (get_recommended_for) -*/ - -func (client *Client) GetTrendingTagsRaw(afterTag string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_trending_tags", []interface{}{afterTag, limit}) -} - -type DiscussionQuery struct { - Tag string `json:"tag"` - Limit uint32 `json:"limit"` - // XXX: Not sure about the type here. - FilterTags []string `json:"filter_tags"` - StartAuthor string `json:"start_author,omitempty"` - StartPermlink string `json:"start_permlink,omitempty"` - ParentAuthor string `json:"parent_author,omitempty"` - ParentPermlink string `json:"parent_permlink"` -} - -func (client *Client) GetDiscussionsByTrendingRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_trending", query) -} - -func (client *Client) GetDiscussionsByCreatedRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_created", query) -} - -func (client *Client) GetDiscussionsByActiveRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_active", query) -} - -func (client *Client) GetDiscussionsByCashoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_cashout", query) -} - -func (client *Client) GetDiscussionsByPayoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_payout", query) -} - -func (client *Client) GetDiscussionsByVotesRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_votes", query) -} - -func (client *Client) GetDiscussionsByChildrenRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_children", query) -} - -func (client *Client) GetDiscussionsByHotRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_hot", query) -} - -func (client *Client) GetRecommendedForRaw(user string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_votes", []interface{}{user, limit}) -} - -/* - // Blocks and transactions - (get_block_header) - (get_block) - (get_state) - (get_trending_categories) - (get_best_categories) - (get_active_categories) - (get_recent_categories) -*/ - -func (client *Client) GetBlockHeaderRaw(blockNum uint32) (*json.RawMessage, error) { - return client.callRaw("get_block_header", []uint32{blockNum}) -} - -func (client *Client) GetBlockRaw(blockNum uint32) (*json.RawMessage, error) { - return client.callRaw("get_block", []uint32{blockNum}) -} - -func (client *Client) GetBlock(blockNum uint32) (*Block, error) { - var resp Block - if err := client.t.Call("get_block", []uint32{blockNum}, &resp); err != nil { - return nil, err - } - resp.Number = blockNum - return &resp, nil -} - -func (client *Client) GetStateRaw(path string) (*json.RawMessage, error) { - return client.callRaw("get_state", []string{path}) -} - -func (client *Client) GetTrendingCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_trending_categories", []interface{}{after, limit}) -} - -func (client *Client) GetBestCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_best_categories", []interface{}{after, limit}) -} - -func (client *Client) GetActiveCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_active_categories", []interface{}{after, limit}) -} - -func (client *Client) GetRecentCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_recent_categories", []interface{}{after, limit}) -} - -/* - // Globals - (get_config) - (get_dynamic_global_properties) - (get_chain_properties) - (get_feed_history) - (get_current_median_history_price) - (get_witness_schedule) - (get_hardfork_version) - (get_next_scheduled_hardfork) -*/ - -func (client *Client) GetConfigRaw() (*json.RawMessage, error) { - return client.callRaw("get_config", emptyParams) -} - -func (client *Client) GetConfig() (*Config, error) { - var resp Config - if err := client.t.Call("get_config", emptyParams, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetDynamicGlobalPropertiesRaw() (*json.RawMessage, error) { - return client.callRaw("get_dynamic_global_properties", emptyParams) -} - -func (client *Client) GetDynamicGlobalProperties() (*DynamicGlobalProperties, error) { - var resp DynamicGlobalProperties - if err := client.t.Call("get_dynamic_global_properties", emptyParams, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetChainPropertiesRaw() (*json.RawMessage, error) { - return client.callRaw("get_chain_properties", emptyParams) -} - -func (client *Client) GetFeedHistoryRaw() (*json.RawMessage, error) { - return client.callRaw("get_feed_history", emptyParams) -} - -func (client *Client) GetCurrentMedianHistoryPriceRaw() (*json.RawMessage, error) { - return client.callRaw("get_current_median_history_price", emptyParams) -} - -func (client *Client) GetWitnessScheduleRaw() (*json.RawMessage, error) { - return client.callRaw("get_witness_schedule", emptyParams) -} - -func (client *Client) GetHardforkVersionRaw() (*json.RawMessage, error) { - return client.callRaw("get_hardfork_version", emptyParams) -} - -func (client *Client) GetHardforkVersion() (string, error) { - var resp string - if err := client.t.Call("get_hardfork_version", emptyParams, &resp); err != nil { - return "", err - } - return resp, nil -} - -func (client *Client) GetNextScheduledHardforkRaw() (*json.RawMessage, error) { - return client.callRaw("get_next_scheduled_hardfork", emptyParams) -} - -/* - // Keys - (get_key_references) -*/ - -// XXX: Not sure about params. -//func (client *Client) GetKeyReferencesRaw(key []string) (*json.RawMessage, error) { -// return client.callRaw("get_key_references", [][]string{key}) -//} + cc interfaces.CallCloser -/* - // Accounts - (get_accounts) - (get_account_references) - (lookup_account_names) - (lookup_accounts) - (get_account_count) - (get_conversion_requests) - (get_account_history) -*/ - -func (client *Client) GetAccountsRaw(accountNames []string) (*json.RawMessage, error) { - return client.callRaw("get_accounts", [][]string{accountNames}) -} - -// XXX: Not sure about params. -//func (client *Client) GetAccountReferenceRaw(id string) (*json.RawMessage, error) { -// return client.callRaw("get_account_reference", []string{id}) -//} - -func (client *Client) LookupAccountNamesRaw(accountNames []string) (*json.RawMessage, error) { - return client.callRaw("lookup_account_names", [][]string{accountNames}) + // Database represents database_api. + Database *database.API } -func (client *Client) LookupAccountsRaw(lowerBoundName string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("lookup_accounts", []interface{}{lowerBoundName, limit}) +// NewClient creates a new RPC client that use the given CallCloser internally. +func NewClient(cc interfaces.CallCloser) *Client { + client := &Client{cc: cc} + client.Database = database.NewAPI(client.cc) + return client } -func (client *Client) GetAccountCountRaw() (*json.RawMessage, error) { - return client.callRaw("get_account_count", emptyParams) -} - -func (client *Client) GetConversionRequestsRaw(accountName string) (*json.RawMessage, error) { - return client.callRaw("get_conversion_requests", []string{accountName}) -} - -func (client *Client) GetAccountHistoryRaw(account string, from uint64, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_account_history", []interface{}{account, from, limit}) -} - -/* - // Market - (get_order_book) -*/ - -func (client *Client) GetOrderBookRaw(limit uint32) (*json.RawMessage, error) { - if limit > 1000 { - return nil, errors.New("GetOrderBook: limit must not exceed 1000") - } - return client.callRaw("get_order_book", []interface{}{limit}) -} - -/* - // Authority / validation - (get_transaction_hex) - (get_transaction) - (get_required_signatures) - (get_potential_signatures) - (verify_authority) - (verify_account_authority) -*/ - -/* - // Votes - (get_active_votes) - (get_account_votes) -*/ - -func (client *Client) GetActiveVotesRaw(author, permlink string) (*json.RawMessage, error) { - return client.callRaw("get_active_votes", []string{author, permlink}) -} - -func (client *Client) GetAccountVotesRaw(voter string) (*json.RawMessage, error) { - return client.callRaw("get_account_votes", []string{voter}) -} - -/* - // Content - (get_content) - (get_content_replies) - (get_discussions_by_author_before_date) - MISSING - (get_replies_by_last_update) -*/ - -func (client *Client) GetContentRaw(author, permlink string) (*json.RawMessage, error) { - return client.callRaw("get_content", []string{author, permlink}) -} - -func (client *Client) GetContent(author, permlink string) (*Content, error) { - var resp Content - if err := client.t.Call("get_content", []string{author, permlink}, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetContentRepliesRaw(parentAuthor, parentPermlink string) (*json.RawMessage, error) { - return client.callRaw("get_content_replies", []string{parentAuthor, parentPermlink}) -} - -func (client *Client) GetContentReplies(parentAuthor, parentPermlink string) ([]*Content, error) { - var resp []*Content - err := client.t.Call("get_content_replies", []string{parentAuthor, parentPermlink}, &resp) - if err != nil { - return nil, err - } - return resp, nil -} - -func (client *Client) GetRepliesByLastUpdateRaw( - startAuthor string, - startPermlink string, - limit uint32, -) (*json.RawMessage, error) { - - return client.callRaw("get_replies_by_last_update", []interface{}{startAuthor, startPermlink, limit}) -} - -/* - // Witnesses - (get_witnesses) - (get_witness_by_account) - (get_witnesses_by_vote) - (lookup_witness_accounts) - (get_witness_count) - (get_active_witnesses) - (get_miner_queue) -*/ - -/* - * Helpers - */ - -func (client *Client) callRaw(method string, params interface{}) (*json.RawMessage, error) { - var resp json.RawMessage - if err := client.t.Call(method, params, &resp); err != nil { - return nil, err - } - return &resp, nil +// Close should be used to close the client when no longer needed. +// It simply calls Close() on the underlying CallCloser. +func (client *Client) Close() error { + return client.cc.Close() } diff --git a/examples/voting_monitor/main.go b/examples/voting_monitor/main.go index 6fdcbfb..0941f5e 100644 --- a/examples/voting_monitor/main.go +++ b/examples/voting_monitor/main.go @@ -4,40 +4,89 @@ import ( "flag" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/go-steem/rpc" + "github.com/go-steem/rpc/apis/database" + "github.com/go-steem/rpc/transports/websocket" ) func main() { if err := run(); err != nil { - log.Fatalln(err) + log.Fatalln("Error:", err) } } -func run() error { +func run() (err error) { // Process flags. flagAddress := flag.String("rpc_endpoint", "ws://localhost:8090", "steemd RPC endpoint address") + flagReconnect := flag.Bool("reconnect", false, "enable auto-reconnect mode") flag.Parse() - // Connect to the RPC endpoint. - addr := *flagAddress - log.Printf("---> Dial(\"%v\")\n", addr) - client, err := rpc.Dial(addr) + var ( + url = *flagAddress + reconnect = *flagReconnect + ) + + // Start catching signals. + var interrupted bool + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) + + // Drop the error in case it is a request being interrupted. + defer func() { + if err == websocket.ErrClosing && interrupted { + err = nil + } + }() + + // Start the connection monitor. + monitorChan := make(chan interface{}, 1) + if reconnect { + go func() { + for { + e := <-monitorChan + log.Println(e) + } + }() + } + + // Instantiate the WebSocket transport. + log.Printf("---> Dial(\"%v\")\n", url) + t, err := websocket.NewTransport(url, + websocket.SetAutoReconnectEnabled(reconnect), + websocket.SetAutoReconnectMaxDelay(30*time.Second), + websocket.SetMonitor(monitorChan)) if err != nil { return err } + + // Use the transport to get an RPC client. + client := rpc.NewClient(t) defer client.Close() + // Start processing signals. + go func() { + <-signalCh + fmt.Println() + log.Println("Signal received, exiting...") + signal.Stop(signalCh) + interrupted = true + client.Close() + }() + // Get config. log.Println("---> GetConfig()") - config, err := client.GetConfig() + config, err := client.Database.GetConfig() if err != nil { return err } // Use the last irreversible block number as the initial last block number. - props, err := client.GetDynamicGlobalProperties() + props, err := client.Database.GetDynamicGlobalProperties() if err != nil { return err } @@ -47,27 +96,27 @@ func run() error { log.Printf("---> Entering the block processing loop (last block = %v)\n", lastBlock) for { // Get current properties. - props, err := client.GetDynamicGlobalProperties() + props, err := client.Database.GetDynamicGlobalProperties() if err != nil { return err } // Process new blocks. for props.LastIrreversibleBlockNum-lastBlock > 0 { - block, err := client.GetBlock(lastBlock) + block, err := client.Database.GetBlock(lastBlock) if err != nil { return err } // Process the transactions. for _, tx := range block.Transactions { - for _, op := range tx.Operations { - switch body := op.Body.(type) { - case *rpc.VoteOperation: - fmt.Printf("@%v voted for @%v/%v\n", body.Voter, body.Author, body.Permlink) + for _, operation := range tx.Operations { + switch op := operation.Body.(type) { + case *database.VoteOperation: + fmt.Printf("@%v voted for @%v/%v\n", op.Voter, op.Author, op.Permlink) - // You can add more cases here, it depends on what - // operations you actually need to process. + // You can add more cases here, it depends on + // what operations you actually need to process. } } } diff --git a/interfaces/callcloser.go b/interfaces/callcloser.go new file mode 100644 index 0000000..978f1fa --- /dev/null +++ b/interfaces/callcloser.go @@ -0,0 +1,8 @@ +package interfaces + +import "io" + +type CallCloser interface { + Caller + io.Closer +} diff --git a/interfaces/caller.go b/interfaces/caller.go new file mode 100644 index 0000000..238c30b --- /dev/null +++ b/interfaces/caller.go @@ -0,0 +1,5 @@ +package interfaces + +type Caller interface { + Call(method string, params, response interface{}) error +} diff --git a/internal/call/utils.go b/internal/call/utils.go new file mode 100644 index 0000000..8886e44 --- /dev/null +++ b/internal/call/utils.go @@ -0,0 +1,19 @@ +package call + +import ( + // Stdlib + "encoding/json" + + // Vendor + "github.com/go-steem/rpc/interfaces" +) + +var EmptyParams = []string{} + +func Raw(caller interfaces.Caller, method string, params interface{}) (*json.RawMessage, error) { + var resp json.RawMessage + if err := caller.Call(method, params, &resp); err != nil { + return nil, err + } + return &resp, nil +} diff --git a/transport.go b/transport.go deleted file mode 100644 index 4dee564..0000000 --- a/transport.go +++ /dev/null @@ -1,44 +0,0 @@ -package rpc - -import ( - "github.com/go-steem/rpc/transports/websocket" -) - -// -// Types -// - -type Transport interface { - Call(method string, params, response interface{}) error - Close() error -} - -type TransportConstructor func(address string) (Transport, error) - -// -// Transport Registry -// - -var registeredTransportConstructors = map[string]TransportConstructor{} - -func RegisterTransport(scheme string, constructor TransportConstructor) { - registeredTransportConstructors[scheme] = constructor -} - -func AvailableTransports() []string { - schemes := make([]string, 0, len(registeredTransportConstructors)) - for scheme := range registeredTransportConstructors { - schemes = append(schemes, scheme) - } - return schemes -} - -// -// Transport Registry Init -// - -func init() { - RegisterTransport("ws", func(address string) (Transport, error) { - return websocket.Dial(address) - }) -} diff --git a/transports/websocket/errors.go b/transports/websocket/errors.go new file mode 100644 index 0000000..e111f33 --- /dev/null +++ b/transports/websocket/errors.go @@ -0,0 +1,7 @@ +package websocket + +import "errors" + +var ( + ErrClosing = errors.New("closing") +) diff --git a/transports/websocket/events.go b/transports/websocket/events.go new file mode 100644 index 0000000..a878eec --- /dev/null +++ b/transports/websocket/events.go @@ -0,0 +1,45 @@ +package websocket + +import ( + "fmt" + "time" +) + +// ConnectingEvent is emitted when a new connection is being established. +type ConnectingEvent struct { + URL string +} + +func (e *ConnectingEvent) String() string { + return fmt.Sprintf("CONNECTING [url=%v]", e.URL) +} + +// ConnectedEvent is emitted when the WebSocket connection is established. +type ConnectedEvent struct { + URL string +} + +func (e *ConnectedEvent) String() string { + return fmt.Sprintf("CONNECTED [url=%v]", e.URL) +} + +// DisconnectedEvent is emitted when the WebSocket connection is lost. +type DisconnectedEvent struct { + URL string + Err error +} + +func (e *DisconnectedEvent) String() string { + return fmt.Sprintf("DISCONNECTED [url=%v, err=%v]", e.URL, e.Err) +} + +// DialTimeoutEvent is emitted when establishing a new connection times out. +type DialTimeoutEvent struct { + URL string + Err error + Timeout time.Duration +} + +func (e *DialTimeoutEvent) String() string { + return fmt.Sprintf("DIAL_TIMEOUT [url=%v, err=%v, timeout=%v]", e.URL, e.Err, e.Timeout) +} diff --git a/transports/websocket/transport.go b/transports/websocket/transport.go index afb4abc..a5bf488 100644 --- a/transports/websocket/transport.go +++ b/transports/websocket/transport.go @@ -1,32 +1,234 @@ package websocket import ( - "github.com/go-steem/rpc-codec/jsonrpc2" + // Stdlib + "crypto/tls" + "net" + "net/url" + "time" + + // RPC + "github.com/go-steem/rpc/interfaces" + + // Vendor + "github.com/pkg/errors" "golang.org/x/net/websocket" ) +const ( + DefaultDialTimeout = 30 * time.Second + DefaultAutoReconnectMaxDelay = 5 * time.Minute +) + +// Transport implements a CallCloser accessing the Steem RPC endpoint over WebSocket. type Transport struct { - client *jsonrpc2.Client + // URL as passed into the constructor. + url *url.URL + + // Options. + dialTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + + autoReconnectEnabled bool + autoReconnectMaxDelay time.Duration + + monitorChan chan<- interface{} + + // Underlying CallCloser. + cc interfaces.CallCloser } -func Dial(address string) (*Transport, error) { - // Connect to the given WebSocket URL. - conn, err := websocket.Dial(address, "", "http://localhost") +// Option represents an option that can be passed into the transport constructor. +type Option func(*Transport) + +// SetDialTimeout can be used to set the timeout when establishing a new connection. +func SetDialTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.dialTimeout = timeout + } +} + +// SetReadTimeout sets the connection read timeout. +// The timeout is implemented using net.Conn.SetReadDeadline. +func SetReadTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.readTimeout = timeout + } +} + +// SetWriteTimeout sets the connection read timeout. +// The timeout is implemented using net.Conn.SetWriteDeadline. +func SetWriteTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.writeTimeout = timeout + } +} + +// SetReadWriteTimeout sets the connection read and write timeout. +// The timeout is implemented using net.Conn.SetDeadline. +func SetReadWriteTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.readTimeout = timeout + t.writeTimeout = timeout + } +} + +// SetAutoReconnectEnabled can be used to enable automatic reconnection to the RPC endpoint. +// Exponential backoff is used when the connection cannot be established repetitively. +// +// See SetAutoReconnectMaxDelay to set the maximum delay between the reconnection attempts. +func SetAutoReconnectEnabled(enabled bool) Option { + return func(t *Transport) { + t.autoReconnectEnabled = enabled + } +} + +// SetAutoReconnectMaxDelay can be used to set the maximum delay between the reconnection attempts. +// +// This option only takes effect when the auto-reconnect mode is enabled. +// +// The default value is 5 minutes. +func SetAutoReconnectMaxDelay(delay time.Duration) Option { + return func(t *Transport) { + t.autoReconnectMaxDelay = delay + } +} + +// SetMonitor can be used to set the monitoring channel that can be used to watch +// connection-related state changes. +// +// All channel send operations are happening synchronously, so not receiving messages +// from the channel will lead to the whole thing getting stuck completely. +// +// This option only takes effect when the auto-reconnect mode is enabled. +// +// The channel is closed when the transport is closed. +func SetMonitor(monitorChan chan<- interface{}) Option { + return func(t *Transport) { + t.monitorChan = monitorChan + } +} + +// NewTransport creates a new transport that connects to the given WebSocket URL. +func NewTransport(endpointURL string, options ...Option) (*Transport, error) { + // Parse the URL. + epURL, err := url.Parse(endpointURL) if err != nil { - return nil, err + return nil, errors.Wrap(err, "invalid endpoint URL") } - // Instantiate a JSON-RPC client. - client := jsonrpc2.NewClient(conn) + // Prepare a transport instance. + t := &Transport{ + url: epURL, + dialTimeout: DefaultDialTimeout, + autoReconnectMaxDelay: DefaultAutoReconnectMaxDelay, + } - // Return the transport. - return &Transport{client}, nil + // Apply the options. + for _, opt := range options { + opt(t) + } + + // Instantiate the underlying CallCloser based on the options. + var cc interfaces.CallCloser + if t.autoReconnectEnabled { + cc = newReconnectingTransport(t) + } else { + cc, err = newSimpleTransport(t) + } + if err != nil { + return nil, err + } + t.cc = cc + + // Return the new transport. + return t, nil } +// Call implements interfaces.CallCloser. func (t *Transport) Call(method string, params, response interface{}) error { - return t.client.Call(method, params, response) + return t.cc.Call(method, params, response) } +// Close implements interfaces.CallCloser. func (t *Transport) Close() error { - return t.client.Close() + return t.cc.Close() +} + +// dial establishes a WebSocket connection according to the transport configuration. +func (t *Transport) dial(cancel <-chan struct{}) (*websocket.Conn, error) { + // Prepare a WebSocket config. + urlString := t.url.String() + config, err := websocket.NewConfig(urlString, "http://localhost") + if err != nil { + return nil, errors.Wrap(err, "failed to create WebSocket config") + } + + // Establish the underlying TCP connection. + // We need to do this manually so that we can set up the timeout and the cancel channel. + var conn net.Conn + dialer := &net.Dialer{ + Timeout: t.dialTimeout, + Cancel: cancel, + } + switch t.url.Scheme { + case "ws": + conn, err = dialer.Dial("tcp", toHostPort(t.url)) + + case "wss": + conn, err = tls.DialWithDialer(dialer, "tcp", toHostPort(t.url), nil) + + default: + err = errors.Wrapf(websocket.ErrBadScheme, "invalid WebSocket URL scheme: %v", t.url.Scheme) + } + if err != nil { + return nil, errors.Wrap(err, "failed to establish TCP connection") + } + + // Establish the WebSocket connection. + ws, err := websocket.NewClient(config, conn) + if err != nil { + return nil, errors.Wrap(err, "failed to establish WebSocket connection") + } + return ws, nil +} + +func (t *Transport) updateDeadline(ws *websocket.Conn) error { + // Set deadline in case read timeout is the same as write timeout. + if t.readTimeout != 0 && t.writeTimeout == t.readTimeout { + if err := ws.SetDeadline(time.Now().Add(t.readTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection deadline") + } + return nil + } + + // Set read deadline. + if t.readTimeout != 0 { + if err := ws.SetReadDeadline(time.Now().Add(t.readTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection read deadline") + } + } + + // Set write deadline. + if t.writeTimeout != 0 { + if err := ws.SetWriteDeadline(time.Now().Add(t.writeTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection write deadline") + } + } + return nil +} + +var portMap = map[string]string{ + "ws": "80", + "wss": "443", +} + +func toHostPort(u *url.URL) string { + if _, ok := portMap[u.Scheme]; ok { + if _, _, err := net.SplitHostPort(u.Host); err != nil { + return net.JoinHostPort(u.Host, portMap[u.Scheme]) + } + } + return u.Host } diff --git a/transports/websocket/transport_reconnect.go b/transports/websocket/transport_reconnect.go new file mode 100644 index 0000000..a0a00cf --- /dev/null +++ b/transports/websocket/transport_reconnect.go @@ -0,0 +1,218 @@ +package websocket + +import ( + // Stdlib + "io" + "net" + "time" + + // Vendor + "github.com/go-steem/rpc-codec/jsonrpc2" + "github.com/pkg/errors" + "golang.org/x/net/websocket" + "gopkg.in/tomb.v2" +) + +type callRequest struct { + method string + params interface{} + response interface{} + errCh chan<- error +} + +type reconnectingTransport struct { + parent *Transport + + ws *websocket.Conn + client *jsonrpc2.Client + requestCh chan *callRequest + + t *tomb.Tomb +} + +func newReconnectingTransport(parent *Transport) *reconnectingTransport { + cc := &reconnectingTransport{ + parent: parent, + requestCh: make(chan *callRequest), + t: &tomb.Tomb{}, + } + cc.t.Go(cc.worker) + return cc +} + +// Call implements interfaces.CallCloser. +func (t *reconnectingTransport) Call(method string, params, response interface{}) error { + errCh := make(chan error, 1) + select { + case t.requestCh <- &callRequest{method, params, response, errCh}: + return <-errCh + case <-t.t.Dying(): + return ErrClosing + } +} + +// Close implements interfaces.CallCloser. +func (t *reconnectingTransport) Close() error { + t.t.Kill(nil) + return t.t.Wait() +} + +func (t *reconnectingTransport) worker() error { + // Close the monitoring channel when returning. + if ch := t.parent.monitorChan; ch != nil { + defer func() { + close(ch) + }() + } + + // Keep processing incoming call requests until interrupted. + for { + select { + case req := <-t.requestCh: + req.errCh <- t.handleCall(req.method, req.params, req.response) + + case <-t.t.Dying(): + return nil + } + } +} + +func (t *reconnectingTransport) handleCall(method string, params, response interface{}) error { + for { + // Get an RPC client. This blocks until the client is available or Close() is called. + client, err := t.getClient() + if err != nil { + return err + } + + // Update the connection timeout if necessary. + if err := t.parent.updateDeadline(t.ws); err != nil { + return err + } + + // Perform the call. + if err := client.Call(method, params, response); err != nil { + // In case there is a network error, we retry immediately. + if err, ok := asNetworkError(err); ok { + t.dropClient(err) + continue + } + // The connection can be also closed unexpectedly. + // That counts as a network error for us as well. + if err == io.ErrUnexpectedEOF { + t.dropClient(err) + continue + } + // Otherwise we just return the error. + return err + } + + // Done. + return nil + } +} + +func (t *reconnectingTransport) getClient() (*jsonrpc2.Client, error) { + // In case the client is not set, establish a new connection. + if t.client == nil { + ws, err := t.connect() + if err != nil { + return nil, err + } + t.ws = ws + t.client = jsonrpc2.NewClient(ws) + } + + // Return the cached client. + return t.client, nil +} + +func (t *reconnectingTransport) dropClient(err error) { + if t.client != nil { + // Close and drop the client. + t.client.Close() + t.client = nil + t.ws = nil + + // Emit DISCONNECTED. + t.emitEvent(&DisconnectedEvent{ + URL: t.parent.url.String(), + Err: err, + }) + } +} + +func (t *reconnectingTransport) connect() (*websocket.Conn, error) { + // Get a new client. Keep trying to establish a new connection using exponential backoff. + timeout := 1 * time.Second + wait := func() error { + // Wait for the given period. + select { + case <-time.After(timeout): + case <-t.t.Dying(): + return ErrClosing + } + + // Update the timeout value. + timeout = 2 * timeout + if timeout > t.parent.autoReconnectMaxDelay { + timeout = t.parent.autoReconnectMaxDelay + } + return nil + } + + urlString := t.parent.url.String() + + for { + // Emit CONNECTING. + t.emitEvent(&ConnectingEvent{urlString}) + + // Try to establish a new WebSocket connection. + ws, err := t.parent.dial(t.t.Dying()) + if err != nil { + // Handle network errors. + if err, ok := asNetworkError(err); ok { + if err.Timeout() { + // Emit DIAL_TIMEOUT. + t.emitEvent(&DialTimeoutEvent{ + URL: urlString, + Err: err, + Timeout: timeout, + }) + } else { + // Emit DISCONNECTED. + t.emitEvent(&DisconnectedEvent{ + URL: urlString, + Err: err, + }) + } + + // Wait for the given period. + if err := wait(); err != nil { + return nil, err + } + // Try again. + continue + } + + // Otherwise just return the error. + return nil, err + } + + // Connection established. + // Emit CONNECTED and return a new client. + t.emitEvent(&ConnectedEvent{urlString}) + return ws, nil + } +} + +func (t *reconnectingTransport) emitEvent(event interface{}) { + if ch := t.parent.monitorChan; ch != nil { + ch <- event + } +} + +func asNetworkError(err error) (opError *net.OpError, ok bool) { + opError, ok = errors.Cause(err).(*net.OpError) + return +} diff --git a/transports/websocket/transport_simple.go b/transports/websocket/transport_simple.go new file mode 100644 index 0000000..4b207bb --- /dev/null +++ b/transports/websocket/transport_simple.go @@ -0,0 +1,52 @@ +package websocket + +import ( + // Vendor + "github.com/go-steem/rpc-codec/jsonrpc2" + "github.com/pkg/errors" + "golang.org/x/net/websocket" +) + +// simpleTransport is not trying to be particularly clever about network errors. +// In case an error occurs, it is immediately returned and the transport is closed. +type simpleTransport struct { + parent *Transport + + ws *websocket.Conn + client *jsonrpc2.Client +} + +// newSimpleTransport establishes a new WebSocket connection. +// The function blocks until the process is finished. +func newSimpleTransport(parent *Transport) (*simpleTransport, error) { + // Establish the WebSocket connection. + ws, err := parent.dial(nil) + if err != nil { + return nil, err + } + + // Instantiate a JSON-RPC client. + client := jsonrpc2.NewClient(ws) + + // Return a new simple transport. + return &simpleTransport{parent, ws, client}, nil +} + +// Call implements interfaces.CallCloser. +func (t *simpleTransport) Call(method string, params, response interface{}) error { + if err := t.parent.updateDeadline(t.ws); err != nil { + return err + } + if err := t.client.Call(method, params, response); err != nil { + return errors.Wrapf(err, "failed to call %v(%v)", method, params) + } + return nil +} + +// Close implements interfaces.CallCloser. +func (t *simpleTransport) Close() error { + if err := t.client.Close(); err != nil { + return errors.Wrap(err, "failed to close JSON-RPC client") + } + return nil +}