diff --git a/apis/database/api.go b/apis/database/api.go new file mode 100644 index 0000000..fc1080f --- /dev/null +++ b/apis/database/api.go @@ -0,0 +1,348 @@ +package database + +import ( + // Stdlib + "encoding/json" + "errors" + + // RPC + "github.com/go-steem/rpc/interfaces" + "github.com/go-steem/rpc/internal/call" +) + +type API struct { + caller interfaces.Caller +} + +func NewAPI(caller interfaces.Caller) *API { + return &API{caller} +} + +/* + // Subscriptions + (set_subscribe_callback) + (set_pending_transaction_callback) + (set_block_applied_callback) + (cancel_all_subscriptions) +*/ + +/* + // Tags + (get_trending_tags) + (get_discussions_by_trending) + (get_discussions_by_created) + (get_discussions_by_active) + (get_discussions_by_cashout) + (get_discussions_by_payout) + (get_discussions_by_votes) + (get_discussions_by_children) + (get_discussions_by_hot) + (get_recommended_for) +*/ + +func (api *API) GetTrendingTagsRaw(afterTag string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_trending_tags", []interface{}{afterTag, limit}) +} + +type DiscussionQuery struct { + Tag string `json:"tag"` + Limit uint32 `json:"limit"` + // XXX: Not sure about the type here. + FilterTags []string `json:"filter_tags"` + StartAuthor string `json:"start_author,omitempty"` + StartPermlink string `json:"start_permlink,omitempty"` + ParentAuthor string `json:"parent_author,omitempty"` + ParentPermlink string `json:"parent_permlink"` +} + +func (api *API) GetDiscussionsByTrendingRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_trending", query) +} + +func (api *API) GetDiscussionsByCreatedRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_created", query) +} + +func (api *API) GetDiscussionsByActiveRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_active", query) +} + +func (api *API) GetDiscussionsByCashoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_cashout", query) +} + +func (api *API) GetDiscussionsByPayoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_payout", query) +} + +func (api *API) GetDiscussionsByVotesRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_votes", query) +} + +func (api *API) GetDiscussionsByChildrenRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_children", query) +} + +func (api *API) GetDiscussionsByHotRaw(query *DiscussionQuery) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_hot", query) +} + +func (api *API) GetRecommendedForRaw(user string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_discussions_by_votes", []interface{}{user, limit}) +} + +/* + // Blocks and transactions + (get_block_header) + (get_block) + (get_state) + (get_trending_categories) + (get_best_categories) + (get_active_categories) + (get_recent_categories) +*/ + +func (api *API) GetBlockHeaderRaw(blockNum uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_block_header", []uint32{blockNum}) +} + +func (api *API) GetBlockRaw(blockNum uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_block", []uint32{blockNum}) +} + +func (api *API) GetBlock(blockNum uint32) (*Block, error) { + var resp Block + if err := api.caller.Call("get_block", []uint32{blockNum}, &resp); err != nil { + return nil, err + } + resp.Number = blockNum + return &resp, nil +} + +func (api *API) GetStateRaw(path string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_state", []string{path}) +} + +func (api *API) GetTrendingCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_trending_categories", []interface{}{after, limit}) +} + +func (api *API) GetBestCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_best_categories", []interface{}{after, limit}) +} + +func (api *API) GetActiveCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_active_categories", []interface{}{after, limit}) +} + +func (api *API) GetRecentCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_recent_categories", []interface{}{after, limit}) +} + +/* + // Globals + (get_config) + (get_dynamic_global_properties) + (get_chain_properties) + (get_feed_history) + (get_current_median_history_price) + (get_witness_schedule) + (get_hardfork_version) + (get_next_scheduled_hardfork) +*/ + +func (api *API) GetConfigRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_config", call.EmptyParams) +} + +func (api *API) GetConfig() (*Config, error) { + var resp Config + if err := api.caller.Call("get_config", call.EmptyParams, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetDynamicGlobalPropertiesRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_dynamic_global_properties", call.EmptyParams) +} + +func (api *API) GetDynamicGlobalProperties() (*DynamicGlobalProperties, error) { + var resp DynamicGlobalProperties + if err := api.caller.Call("get_dynamic_global_properties", call.EmptyParams, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetChainPropertiesRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_chain_properties", call.EmptyParams) +} + +func (api *API) GetFeedHistoryRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_feed_history", call.EmptyParams) +} + +func (api *API) GetCurrentMedianHistoryPriceRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_current_median_history_price", call.EmptyParams) +} + +func (api *API) GetWitnessScheduleRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_witness_schedule", call.EmptyParams) +} + +func (api *API) GetHardforkVersionRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_hardfork_version", call.EmptyParams) +} + +func (api *API) GetHardforkVersion() (string, error) { + var resp string + if err := api.caller.Call("get_hardfork_version", call.EmptyParams, &resp); err != nil { + return "", err + } + return resp, nil +} + +func (api *API) GetNextScheduledHardforkRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_next_scheduled_hardfork", call.EmptyParams) +} + +/* + // Keys + (get_key_references) +*/ + +// XXX: Not sure about params. +//func (api *API) GetKeyReferencesRaw(key []string) (*json.RawMessage, error) { +// return call.Raw(api.caller, "get_key_references", [][]string{key}) +//} + +/* + // Accounts + (get_accounts) + (get_account_references) + (lookup_account_names) + (lookup_accounts) + (get_account_count) + (get_conversion_requests) + (get_account_history) +*/ + +func (api *API) GetAccountsRaw(accountNames []string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_accounts", [][]string{accountNames}) +} + +// XXX: Not sure about params. +//func (api *API) GetAccountReferenceRaw(id string) (*json.RawMessage, error) { +// return call.Raw(api.caller, "get_account_reference", []string{id}) +//} + +func (api *API) LookupAccountNamesRaw(accountNames []string) (*json.RawMessage, error) { + return call.Raw(api.caller, "lookup_account_names", [][]string{accountNames}) +} + +func (api *API) LookupAccountsRaw(lowerBoundName string, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "lookup_accounts", []interface{}{lowerBoundName, limit}) +} + +func (api *API) GetAccountCountRaw() (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_count", call.EmptyParams) +} + +func (api *API) GetConversionRequestsRaw(accountName string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_conversion_requests", []string{accountName}) +} + +func (api *API) GetAccountHistoryRaw(account string, from uint64, limit uint32) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_history", []interface{}{account, from, limit}) +} + +/* + // Market + (get_order_book) +*/ + +func (api *API) GetOrderBookRaw(limit uint32) (*json.RawMessage, error) { + if limit > 1000 { + return nil, errors.New("GetOrderBook: limit must not exceed 1000") + } + return call.Raw(api.caller, "get_order_book", []interface{}{limit}) +} + +/* + // Authority / validation + (get_transaction_hex) + (get_transaction) + (get_required_signatures) + (get_potential_signatures) + (verify_authority) + (verify_account_authority) +*/ + +/* + // Votes + (get_active_votes) + (get_account_votes) +*/ + +func (api *API) GetActiveVotesRaw(author, permlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_active_votes", []string{author, permlink}) +} + +func (api *API) GetAccountVotesRaw(voter string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_account_votes", []string{voter}) +} + +/* + // Content + (get_content) + (get_content_replies) + (get_discussions_by_author_before_date) - MISSING + (get_replies_by_last_update) +*/ + +func (api *API) GetContentRaw(author, permlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_content", []string{author, permlink}) +} + +func (api *API) GetContent(author, permlink string) (*Content, error) { + var resp Content + if err := api.caller.Call("get_content", []string{author, permlink}, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (api *API) GetContentRepliesRaw(parentAuthor, parentPermlink string) (*json.RawMessage, error) { + return call.Raw(api.caller, "get_content_replies", []string{parentAuthor, parentPermlink}) +} + +func (api *API) GetContentReplies(parentAuthor, parentPermlink string) ([]*Content, error) { + var resp []*Content + err := api.caller.Call("get_content_replies", []string{parentAuthor, parentPermlink}, &resp) + if err != nil { + return nil, err + } + return resp, nil +} + +func (api *API) GetRepliesByLastUpdateRaw( + startAuthor string, + startPermlink string, + limit uint32, +) (*json.RawMessage, error) { + + return call.Raw( + api.caller, "get_replies_by_last_update", []interface{}{startAuthor, startPermlink, limit}) +} + +/* + // Witnesses + (get_witnesses) + (get_witness_by_account) + (get_witnesses_by_vote) + (lookup_witness_accounts) + (get_witness_count) + (get_active_witnesses) + (get_miner_queue) +*/ diff --git a/constants.go b/apis/database/constants.go similarity index 98% rename from constants.go rename to apis/database/constants.go index 34652ac..2657b5a 100644 --- a/constants.go +++ b/apis/database/constants.go @@ -1,4 +1,4 @@ -package rpc +package database const ( OpTypeVote = "vote" diff --git a/data.go b/apis/database/data.go similarity index 99% rename from data.go rename to apis/database/data.go index cc6bac3..e5ee91f 100644 --- a/data.go +++ b/apis/database/data.go @@ -1,4 +1,4 @@ -package rpc +package database import ( "encoding/json" @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/go-steem/rpc/types" + "github.com/go-steem/rpc/apis/types" ) type Config struct { diff --git a/types/id.go b/apis/types/id.go similarity index 100% rename from types/id.go rename to apis/types/id.go diff --git a/types/int.go b/apis/types/int.go similarity index 100% rename from types/int.go rename to apis/types/int.go diff --git a/types/time.go b/apis/types/time.go similarity index 100% rename from types/time.go rename to apis/types/time.go diff --git a/client.go b/client.go index 3401a6a..e261cf1 100644 --- a/client.go +++ b/client.go @@ -1,380 +1,31 @@ package rpc import ( - "encoding/json" - "errors" - "net/url" + // RPC + "github.com/go-steem/rpc/apis/database" + "github.com/go-steem/rpc/interfaces" ) -var emptyParams = []string{} - +// Client can be used to access Steem remote APIs. +// +// There is a public field for every Steem API available, +// e.g. Client.Database corresponds to database_api. type Client struct { - t Transport -} - -func Dial(address string) (*Client, error) { - // Parse the address URL. - u, err := url.Parse(address) - if err != nil { - return nil, err - } - - // Look for the constructor associated with the given URL scheme. - constructor, ok := registeredTransportConstructors[u.Scheme] - if !ok { - return nil, errors.New("no transport registered for URL scheme: " + u.Scheme) - } - - // Use the constructor to get a Transport. - transport, err := constructor(address) - if err != nil { - return nil, err - } - - // Return the new Client, at last. - return &Client{transport}, nil -} - -func (client *Client) Close() error { - return client.t.Close() -} - -/* - // Subscriptions - (set_subscribe_callback) - (set_pending_transaction_callback) - (set_block_applied_callback) - (cancel_all_subscriptions) -*/ - -/* - // Tags - (get_trending_tags) - (get_discussions_by_trending) - (get_discussions_by_created) - (get_discussions_by_active) - (get_discussions_by_cashout) - (get_discussions_by_payout) - (get_discussions_by_votes) - (get_discussions_by_children) - (get_discussions_by_hot) - (get_recommended_for) -*/ - -func (client *Client) GetTrendingTagsRaw(afterTag string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_trending_tags", []interface{}{afterTag, limit}) -} - -type DiscussionQuery struct { - Tag string `json:"tag"` - Limit uint32 `json:"limit"` - // XXX: Not sure about the type here. - FilterTags []string `json:"filter_tags"` - StartAuthor string `json:"start_author,omitempty"` - StartPermlink string `json:"start_permlink,omitempty"` - ParentAuthor string `json:"parent_author,omitempty"` - ParentPermlink string `json:"parent_permlink"` -} - -func (client *Client) GetDiscussionsByTrendingRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_trending", query) -} - -func (client *Client) GetDiscussionsByCreatedRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_created", query) -} - -func (client *Client) GetDiscussionsByActiveRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_active", query) -} - -func (client *Client) GetDiscussionsByCashoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_cashout", query) -} - -func (client *Client) GetDiscussionsByPayoutRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_payout", query) -} - -func (client *Client) GetDiscussionsByVotesRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_votes", query) -} - -func (client *Client) GetDiscussionsByChildrenRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_children", query) -} - -func (client *Client) GetDiscussionsByHotRaw(query *DiscussionQuery) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_hot", query) -} - -func (client *Client) GetRecommendedForRaw(user string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_discussions_by_votes", []interface{}{user, limit}) -} - -/* - // Blocks and transactions - (get_block_header) - (get_block) - (get_state) - (get_trending_categories) - (get_best_categories) - (get_active_categories) - (get_recent_categories) -*/ - -func (client *Client) GetBlockHeaderRaw(blockNum uint32) (*json.RawMessage, error) { - return client.callRaw("get_block_header", []uint32{blockNum}) -} - -func (client *Client) GetBlockRaw(blockNum uint32) (*json.RawMessage, error) { - return client.callRaw("get_block", []uint32{blockNum}) -} - -func (client *Client) GetBlock(blockNum uint32) (*Block, error) { - var resp Block - if err := client.t.Call("get_block", []uint32{blockNum}, &resp); err != nil { - return nil, err - } - resp.Number = blockNum - return &resp, nil -} - -func (client *Client) GetStateRaw(path string) (*json.RawMessage, error) { - return client.callRaw("get_state", []string{path}) -} - -func (client *Client) GetTrendingCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_trending_categories", []interface{}{after, limit}) -} - -func (client *Client) GetBestCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_best_categories", []interface{}{after, limit}) -} - -func (client *Client) GetActiveCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_active_categories", []interface{}{after, limit}) -} - -func (client *Client) GetRecentCategoriesRaw(after string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_recent_categories", []interface{}{after, limit}) -} - -/* - // Globals - (get_config) - (get_dynamic_global_properties) - (get_chain_properties) - (get_feed_history) - (get_current_median_history_price) - (get_witness_schedule) - (get_hardfork_version) - (get_next_scheduled_hardfork) -*/ - -func (client *Client) GetConfigRaw() (*json.RawMessage, error) { - return client.callRaw("get_config", emptyParams) -} - -func (client *Client) GetConfig() (*Config, error) { - var resp Config - if err := client.t.Call("get_config", emptyParams, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetDynamicGlobalPropertiesRaw() (*json.RawMessage, error) { - return client.callRaw("get_dynamic_global_properties", emptyParams) -} - -func (client *Client) GetDynamicGlobalProperties() (*DynamicGlobalProperties, error) { - var resp DynamicGlobalProperties - if err := client.t.Call("get_dynamic_global_properties", emptyParams, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetChainPropertiesRaw() (*json.RawMessage, error) { - return client.callRaw("get_chain_properties", emptyParams) -} - -func (client *Client) GetFeedHistoryRaw() (*json.RawMessage, error) { - return client.callRaw("get_feed_history", emptyParams) -} - -func (client *Client) GetCurrentMedianHistoryPriceRaw() (*json.RawMessage, error) { - return client.callRaw("get_current_median_history_price", emptyParams) -} - -func (client *Client) GetWitnessScheduleRaw() (*json.RawMessage, error) { - return client.callRaw("get_witness_schedule", emptyParams) -} - -func (client *Client) GetHardforkVersionRaw() (*json.RawMessage, error) { - return client.callRaw("get_hardfork_version", emptyParams) -} - -func (client *Client) GetHardforkVersion() (string, error) { - var resp string - if err := client.t.Call("get_hardfork_version", emptyParams, &resp); err != nil { - return "", err - } - return resp, nil -} - -func (client *Client) GetNextScheduledHardforkRaw() (*json.RawMessage, error) { - return client.callRaw("get_next_scheduled_hardfork", emptyParams) -} - -/* - // Keys - (get_key_references) -*/ - -// XXX: Not sure about params. -//func (client *Client) GetKeyReferencesRaw(key []string) (*json.RawMessage, error) { -// return client.callRaw("get_key_references", [][]string{key}) -//} + cc interfaces.CallCloser -/* - // Accounts - (get_accounts) - (get_account_references) - (lookup_account_names) - (lookup_accounts) - (get_account_count) - (get_conversion_requests) - (get_account_history) -*/ - -func (client *Client) GetAccountsRaw(accountNames []string) (*json.RawMessage, error) { - return client.callRaw("get_accounts", [][]string{accountNames}) -} - -// XXX: Not sure about params. -//func (client *Client) GetAccountReferenceRaw(id string) (*json.RawMessage, error) { -// return client.callRaw("get_account_reference", []string{id}) -//} - -func (client *Client) LookupAccountNamesRaw(accountNames []string) (*json.RawMessage, error) { - return client.callRaw("lookup_account_names", [][]string{accountNames}) + // Database represents database_api. + Database *database.API } -func (client *Client) LookupAccountsRaw(lowerBoundName string, limit uint32) (*json.RawMessage, error) { - return client.callRaw("lookup_accounts", []interface{}{lowerBoundName, limit}) +// NewClient creates a new RPC client that use the given CallCloser internally. +func NewClient(cc interfaces.CallCloser) *Client { + client := &Client{cc: cc} + client.Database = database.NewAPI(client.cc) + return client } -func (client *Client) GetAccountCountRaw() (*json.RawMessage, error) { - return client.callRaw("get_account_count", emptyParams) -} - -func (client *Client) GetConversionRequestsRaw(accountName string) (*json.RawMessage, error) { - return client.callRaw("get_conversion_requests", []string{accountName}) -} - -func (client *Client) GetAccountHistoryRaw(account string, from uint64, limit uint32) (*json.RawMessage, error) { - return client.callRaw("get_account_history", []interface{}{account, from, limit}) -} - -/* - // Market - (get_order_book) -*/ - -func (client *Client) GetOrderBookRaw(limit uint32) (*json.RawMessage, error) { - if limit > 1000 { - return nil, errors.New("GetOrderBook: limit must not exceed 1000") - } - return client.callRaw("get_order_book", []interface{}{limit}) -} - -/* - // Authority / validation - (get_transaction_hex) - (get_transaction) - (get_required_signatures) - (get_potential_signatures) - (verify_authority) - (verify_account_authority) -*/ - -/* - // Votes - (get_active_votes) - (get_account_votes) -*/ - -func (client *Client) GetActiveVotesRaw(author, permlink string) (*json.RawMessage, error) { - return client.callRaw("get_active_votes", []string{author, permlink}) -} - -func (client *Client) GetAccountVotesRaw(voter string) (*json.RawMessage, error) { - return client.callRaw("get_account_votes", []string{voter}) -} - -/* - // Content - (get_content) - (get_content_replies) - (get_discussions_by_author_before_date) - MISSING - (get_replies_by_last_update) -*/ - -func (client *Client) GetContentRaw(author, permlink string) (*json.RawMessage, error) { - return client.callRaw("get_content", []string{author, permlink}) -} - -func (client *Client) GetContent(author, permlink string) (*Content, error) { - var resp Content - if err := client.t.Call("get_content", []string{author, permlink}, &resp); err != nil { - return nil, err - } - return &resp, nil -} - -func (client *Client) GetContentRepliesRaw(parentAuthor, parentPermlink string) (*json.RawMessage, error) { - return client.callRaw("get_content_replies", []string{parentAuthor, parentPermlink}) -} - -func (client *Client) GetContentReplies(parentAuthor, parentPermlink string) ([]*Content, error) { - var resp []*Content - err := client.t.Call("get_content_replies", []string{parentAuthor, parentPermlink}, &resp) - if err != nil { - return nil, err - } - return resp, nil -} - -func (client *Client) GetRepliesByLastUpdateRaw( - startAuthor string, - startPermlink string, - limit uint32, -) (*json.RawMessage, error) { - - return client.callRaw("get_replies_by_last_update", []interface{}{startAuthor, startPermlink, limit}) -} - -/* - // Witnesses - (get_witnesses) - (get_witness_by_account) - (get_witnesses_by_vote) - (lookup_witness_accounts) - (get_witness_count) - (get_active_witnesses) - (get_miner_queue) -*/ - -/* - * Helpers - */ - -func (client *Client) callRaw(method string, params interface{}) (*json.RawMessage, error) { - var resp json.RawMessage - if err := client.t.Call(method, params, &resp); err != nil { - return nil, err - } - return &resp, nil +// Close should be used to close the client when no longer needed. +// It simply calls Close() on the underlying CallCloser. +func (client *Client) Close() error { + return client.cc.Close() } diff --git a/examples/voting_monitor/main.go b/examples/voting_monitor/main.go index 6fdcbfb..0941f5e 100644 --- a/examples/voting_monitor/main.go +++ b/examples/voting_monitor/main.go @@ -4,40 +4,89 @@ import ( "flag" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/go-steem/rpc" + "github.com/go-steem/rpc/apis/database" + "github.com/go-steem/rpc/transports/websocket" ) func main() { if err := run(); err != nil { - log.Fatalln(err) + log.Fatalln("Error:", err) } } -func run() error { +func run() (err error) { // Process flags. flagAddress := flag.String("rpc_endpoint", "ws://localhost:8090", "steemd RPC endpoint address") + flagReconnect := flag.Bool("reconnect", false, "enable auto-reconnect mode") flag.Parse() - // Connect to the RPC endpoint. - addr := *flagAddress - log.Printf("---> Dial(\"%v\")\n", addr) - client, err := rpc.Dial(addr) + var ( + url = *flagAddress + reconnect = *flagReconnect + ) + + // Start catching signals. + var interrupted bool + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) + + // Drop the error in case it is a request being interrupted. + defer func() { + if err == websocket.ErrClosing && interrupted { + err = nil + } + }() + + // Start the connection monitor. + monitorChan := make(chan interface{}, 1) + if reconnect { + go func() { + for { + e := <-monitorChan + log.Println(e) + } + }() + } + + // Instantiate the WebSocket transport. + log.Printf("---> Dial(\"%v\")\n", url) + t, err := websocket.NewTransport(url, + websocket.SetAutoReconnectEnabled(reconnect), + websocket.SetAutoReconnectMaxDelay(30*time.Second), + websocket.SetMonitor(monitorChan)) if err != nil { return err } + + // Use the transport to get an RPC client. + client := rpc.NewClient(t) defer client.Close() + // Start processing signals. + go func() { + <-signalCh + fmt.Println() + log.Println("Signal received, exiting...") + signal.Stop(signalCh) + interrupted = true + client.Close() + }() + // Get config. log.Println("---> GetConfig()") - config, err := client.GetConfig() + config, err := client.Database.GetConfig() if err != nil { return err } // Use the last irreversible block number as the initial last block number. - props, err := client.GetDynamicGlobalProperties() + props, err := client.Database.GetDynamicGlobalProperties() if err != nil { return err } @@ -47,27 +96,27 @@ func run() error { log.Printf("---> Entering the block processing loop (last block = %v)\n", lastBlock) for { // Get current properties. - props, err := client.GetDynamicGlobalProperties() + props, err := client.Database.GetDynamicGlobalProperties() if err != nil { return err } // Process new blocks. for props.LastIrreversibleBlockNum-lastBlock > 0 { - block, err := client.GetBlock(lastBlock) + block, err := client.Database.GetBlock(lastBlock) if err != nil { return err } // Process the transactions. for _, tx := range block.Transactions { - for _, op := range tx.Operations { - switch body := op.Body.(type) { - case *rpc.VoteOperation: - fmt.Printf("@%v voted for @%v/%v\n", body.Voter, body.Author, body.Permlink) + for _, operation := range tx.Operations { + switch op := operation.Body.(type) { + case *database.VoteOperation: + fmt.Printf("@%v voted for @%v/%v\n", op.Voter, op.Author, op.Permlink) - // You can add more cases here, it depends on what - // operations you actually need to process. + // You can add more cases here, it depends on + // what operations you actually need to process. } } } diff --git a/interfaces/callcloser.go b/interfaces/callcloser.go new file mode 100644 index 0000000..978f1fa --- /dev/null +++ b/interfaces/callcloser.go @@ -0,0 +1,8 @@ +package interfaces + +import "io" + +type CallCloser interface { + Caller + io.Closer +} diff --git a/interfaces/caller.go b/interfaces/caller.go new file mode 100644 index 0000000..238c30b --- /dev/null +++ b/interfaces/caller.go @@ -0,0 +1,5 @@ +package interfaces + +type Caller interface { + Call(method string, params, response interface{}) error +} diff --git a/internal/call/utils.go b/internal/call/utils.go new file mode 100644 index 0000000..8886e44 --- /dev/null +++ b/internal/call/utils.go @@ -0,0 +1,19 @@ +package call + +import ( + // Stdlib + "encoding/json" + + // Vendor + "github.com/go-steem/rpc/interfaces" +) + +var EmptyParams = []string{} + +func Raw(caller interfaces.Caller, method string, params interface{}) (*json.RawMessage, error) { + var resp json.RawMessage + if err := caller.Call(method, params, &resp); err != nil { + return nil, err + } + return &resp, nil +} diff --git a/transport.go b/transport.go deleted file mode 100644 index 4dee564..0000000 --- a/transport.go +++ /dev/null @@ -1,44 +0,0 @@ -package rpc - -import ( - "github.com/go-steem/rpc/transports/websocket" -) - -// -// Types -// - -type Transport interface { - Call(method string, params, response interface{}) error - Close() error -} - -type TransportConstructor func(address string) (Transport, error) - -// -// Transport Registry -// - -var registeredTransportConstructors = map[string]TransportConstructor{} - -func RegisterTransport(scheme string, constructor TransportConstructor) { - registeredTransportConstructors[scheme] = constructor -} - -func AvailableTransports() []string { - schemes := make([]string, 0, len(registeredTransportConstructors)) - for scheme := range registeredTransportConstructors { - schemes = append(schemes, scheme) - } - return schemes -} - -// -// Transport Registry Init -// - -func init() { - RegisterTransport("ws", func(address string) (Transport, error) { - return websocket.Dial(address) - }) -} diff --git a/transports/websocket/errors.go b/transports/websocket/errors.go new file mode 100644 index 0000000..e111f33 --- /dev/null +++ b/transports/websocket/errors.go @@ -0,0 +1,7 @@ +package websocket + +import "errors" + +var ( + ErrClosing = errors.New("closing") +) diff --git a/transports/websocket/events.go b/transports/websocket/events.go new file mode 100644 index 0000000..a878eec --- /dev/null +++ b/transports/websocket/events.go @@ -0,0 +1,45 @@ +package websocket + +import ( + "fmt" + "time" +) + +// ConnectingEvent is emitted when a new connection is being established. +type ConnectingEvent struct { + URL string +} + +func (e *ConnectingEvent) String() string { + return fmt.Sprintf("CONNECTING [url=%v]", e.URL) +} + +// ConnectedEvent is emitted when the WebSocket connection is established. +type ConnectedEvent struct { + URL string +} + +func (e *ConnectedEvent) String() string { + return fmt.Sprintf("CONNECTED [url=%v]", e.URL) +} + +// DisconnectedEvent is emitted when the WebSocket connection is lost. +type DisconnectedEvent struct { + URL string + Err error +} + +func (e *DisconnectedEvent) String() string { + return fmt.Sprintf("DISCONNECTED [url=%v, err=%v]", e.URL, e.Err) +} + +// DialTimeoutEvent is emitted when establishing a new connection times out. +type DialTimeoutEvent struct { + URL string + Err error + Timeout time.Duration +} + +func (e *DialTimeoutEvent) String() string { + return fmt.Sprintf("DIAL_TIMEOUT [url=%v, err=%v, timeout=%v]", e.URL, e.Err, e.Timeout) +} diff --git a/transports/websocket/transport.go b/transports/websocket/transport.go index afb4abc..a5bf488 100644 --- a/transports/websocket/transport.go +++ b/transports/websocket/transport.go @@ -1,32 +1,234 @@ package websocket import ( - "github.com/go-steem/rpc-codec/jsonrpc2" + // Stdlib + "crypto/tls" + "net" + "net/url" + "time" + + // RPC + "github.com/go-steem/rpc/interfaces" + + // Vendor + "github.com/pkg/errors" "golang.org/x/net/websocket" ) +const ( + DefaultDialTimeout = 30 * time.Second + DefaultAutoReconnectMaxDelay = 5 * time.Minute +) + +// Transport implements a CallCloser accessing the Steem RPC endpoint over WebSocket. type Transport struct { - client *jsonrpc2.Client + // URL as passed into the constructor. + url *url.URL + + // Options. + dialTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + + autoReconnectEnabled bool + autoReconnectMaxDelay time.Duration + + monitorChan chan<- interface{} + + // Underlying CallCloser. + cc interfaces.CallCloser } -func Dial(address string) (*Transport, error) { - // Connect to the given WebSocket URL. - conn, err := websocket.Dial(address, "", "http://localhost") +// Option represents an option that can be passed into the transport constructor. +type Option func(*Transport) + +// SetDialTimeout can be used to set the timeout when establishing a new connection. +func SetDialTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.dialTimeout = timeout + } +} + +// SetReadTimeout sets the connection read timeout. +// The timeout is implemented using net.Conn.SetReadDeadline. +func SetReadTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.readTimeout = timeout + } +} + +// SetWriteTimeout sets the connection read timeout. +// The timeout is implemented using net.Conn.SetWriteDeadline. +func SetWriteTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.writeTimeout = timeout + } +} + +// SetReadWriteTimeout sets the connection read and write timeout. +// The timeout is implemented using net.Conn.SetDeadline. +func SetReadWriteTimeout(timeout time.Duration) Option { + return func(t *Transport) { + t.readTimeout = timeout + t.writeTimeout = timeout + } +} + +// SetAutoReconnectEnabled can be used to enable automatic reconnection to the RPC endpoint. +// Exponential backoff is used when the connection cannot be established repetitively. +// +// See SetAutoReconnectMaxDelay to set the maximum delay between the reconnection attempts. +func SetAutoReconnectEnabled(enabled bool) Option { + return func(t *Transport) { + t.autoReconnectEnabled = enabled + } +} + +// SetAutoReconnectMaxDelay can be used to set the maximum delay between the reconnection attempts. +// +// This option only takes effect when the auto-reconnect mode is enabled. +// +// The default value is 5 minutes. +func SetAutoReconnectMaxDelay(delay time.Duration) Option { + return func(t *Transport) { + t.autoReconnectMaxDelay = delay + } +} + +// SetMonitor can be used to set the monitoring channel that can be used to watch +// connection-related state changes. +// +// All channel send operations are happening synchronously, so not receiving messages +// from the channel will lead to the whole thing getting stuck completely. +// +// This option only takes effect when the auto-reconnect mode is enabled. +// +// The channel is closed when the transport is closed. +func SetMonitor(monitorChan chan<- interface{}) Option { + return func(t *Transport) { + t.monitorChan = monitorChan + } +} + +// NewTransport creates a new transport that connects to the given WebSocket URL. +func NewTransport(endpointURL string, options ...Option) (*Transport, error) { + // Parse the URL. + epURL, err := url.Parse(endpointURL) if err != nil { - return nil, err + return nil, errors.Wrap(err, "invalid endpoint URL") } - // Instantiate a JSON-RPC client. - client := jsonrpc2.NewClient(conn) + // Prepare a transport instance. + t := &Transport{ + url: epURL, + dialTimeout: DefaultDialTimeout, + autoReconnectMaxDelay: DefaultAutoReconnectMaxDelay, + } - // Return the transport. - return &Transport{client}, nil + // Apply the options. + for _, opt := range options { + opt(t) + } + + // Instantiate the underlying CallCloser based on the options. + var cc interfaces.CallCloser + if t.autoReconnectEnabled { + cc = newReconnectingTransport(t) + } else { + cc, err = newSimpleTransport(t) + } + if err != nil { + return nil, err + } + t.cc = cc + + // Return the new transport. + return t, nil } +// Call implements interfaces.CallCloser. func (t *Transport) Call(method string, params, response interface{}) error { - return t.client.Call(method, params, response) + return t.cc.Call(method, params, response) } +// Close implements interfaces.CallCloser. func (t *Transport) Close() error { - return t.client.Close() + return t.cc.Close() +} + +// dial establishes a WebSocket connection according to the transport configuration. +func (t *Transport) dial(cancel <-chan struct{}) (*websocket.Conn, error) { + // Prepare a WebSocket config. + urlString := t.url.String() + config, err := websocket.NewConfig(urlString, "http://localhost") + if err != nil { + return nil, errors.Wrap(err, "failed to create WebSocket config") + } + + // Establish the underlying TCP connection. + // We need to do this manually so that we can set up the timeout and the cancel channel. + var conn net.Conn + dialer := &net.Dialer{ + Timeout: t.dialTimeout, + Cancel: cancel, + } + switch t.url.Scheme { + case "ws": + conn, err = dialer.Dial("tcp", toHostPort(t.url)) + + case "wss": + conn, err = tls.DialWithDialer(dialer, "tcp", toHostPort(t.url), nil) + + default: + err = errors.Wrapf(websocket.ErrBadScheme, "invalid WebSocket URL scheme: %v", t.url.Scheme) + } + if err != nil { + return nil, errors.Wrap(err, "failed to establish TCP connection") + } + + // Establish the WebSocket connection. + ws, err := websocket.NewClient(config, conn) + if err != nil { + return nil, errors.Wrap(err, "failed to establish WebSocket connection") + } + return ws, nil +} + +func (t *Transport) updateDeadline(ws *websocket.Conn) error { + // Set deadline in case read timeout is the same as write timeout. + if t.readTimeout != 0 && t.writeTimeout == t.readTimeout { + if err := ws.SetDeadline(time.Now().Add(t.readTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection deadline") + } + return nil + } + + // Set read deadline. + if t.readTimeout != 0 { + if err := ws.SetReadDeadline(time.Now().Add(t.readTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection read deadline") + } + } + + // Set write deadline. + if t.writeTimeout != 0 { + if err := ws.SetWriteDeadline(time.Now().Add(t.writeTimeout)); err != nil { + return errors.Wrap(err, "failed to set connection write deadline") + } + } + return nil +} + +var portMap = map[string]string{ + "ws": "80", + "wss": "443", +} + +func toHostPort(u *url.URL) string { + if _, ok := portMap[u.Scheme]; ok { + if _, _, err := net.SplitHostPort(u.Host); err != nil { + return net.JoinHostPort(u.Host, portMap[u.Scheme]) + } + } + return u.Host } diff --git a/transports/websocket/transport_reconnect.go b/transports/websocket/transport_reconnect.go new file mode 100644 index 0000000..a0a00cf --- /dev/null +++ b/transports/websocket/transport_reconnect.go @@ -0,0 +1,218 @@ +package websocket + +import ( + // Stdlib + "io" + "net" + "time" + + // Vendor + "github.com/go-steem/rpc-codec/jsonrpc2" + "github.com/pkg/errors" + "golang.org/x/net/websocket" + "gopkg.in/tomb.v2" +) + +type callRequest struct { + method string + params interface{} + response interface{} + errCh chan<- error +} + +type reconnectingTransport struct { + parent *Transport + + ws *websocket.Conn + client *jsonrpc2.Client + requestCh chan *callRequest + + t *tomb.Tomb +} + +func newReconnectingTransport(parent *Transport) *reconnectingTransport { + cc := &reconnectingTransport{ + parent: parent, + requestCh: make(chan *callRequest), + t: &tomb.Tomb{}, + } + cc.t.Go(cc.worker) + return cc +} + +// Call implements interfaces.CallCloser. +func (t *reconnectingTransport) Call(method string, params, response interface{}) error { + errCh := make(chan error, 1) + select { + case t.requestCh <- &callRequest{method, params, response, errCh}: + return <-errCh + case <-t.t.Dying(): + return ErrClosing + } +} + +// Close implements interfaces.CallCloser. +func (t *reconnectingTransport) Close() error { + t.t.Kill(nil) + return t.t.Wait() +} + +func (t *reconnectingTransport) worker() error { + // Close the monitoring channel when returning. + if ch := t.parent.monitorChan; ch != nil { + defer func() { + close(ch) + }() + } + + // Keep processing incoming call requests until interrupted. + for { + select { + case req := <-t.requestCh: + req.errCh <- t.handleCall(req.method, req.params, req.response) + + case <-t.t.Dying(): + return nil + } + } +} + +func (t *reconnectingTransport) handleCall(method string, params, response interface{}) error { + for { + // Get an RPC client. This blocks until the client is available or Close() is called. + client, err := t.getClient() + if err != nil { + return err + } + + // Update the connection timeout if necessary. + if err := t.parent.updateDeadline(t.ws); err != nil { + return err + } + + // Perform the call. + if err := client.Call(method, params, response); err != nil { + // In case there is a network error, we retry immediately. + if err, ok := asNetworkError(err); ok { + t.dropClient(err) + continue + } + // The connection can be also closed unexpectedly. + // That counts as a network error for us as well. + if err == io.ErrUnexpectedEOF { + t.dropClient(err) + continue + } + // Otherwise we just return the error. + return err + } + + // Done. + return nil + } +} + +func (t *reconnectingTransport) getClient() (*jsonrpc2.Client, error) { + // In case the client is not set, establish a new connection. + if t.client == nil { + ws, err := t.connect() + if err != nil { + return nil, err + } + t.ws = ws + t.client = jsonrpc2.NewClient(ws) + } + + // Return the cached client. + return t.client, nil +} + +func (t *reconnectingTransport) dropClient(err error) { + if t.client != nil { + // Close and drop the client. + t.client.Close() + t.client = nil + t.ws = nil + + // Emit DISCONNECTED. + t.emitEvent(&DisconnectedEvent{ + URL: t.parent.url.String(), + Err: err, + }) + } +} + +func (t *reconnectingTransport) connect() (*websocket.Conn, error) { + // Get a new client. Keep trying to establish a new connection using exponential backoff. + timeout := 1 * time.Second + wait := func() error { + // Wait for the given period. + select { + case <-time.After(timeout): + case <-t.t.Dying(): + return ErrClosing + } + + // Update the timeout value. + timeout = 2 * timeout + if timeout > t.parent.autoReconnectMaxDelay { + timeout = t.parent.autoReconnectMaxDelay + } + return nil + } + + urlString := t.parent.url.String() + + for { + // Emit CONNECTING. + t.emitEvent(&ConnectingEvent{urlString}) + + // Try to establish a new WebSocket connection. + ws, err := t.parent.dial(t.t.Dying()) + if err != nil { + // Handle network errors. + if err, ok := asNetworkError(err); ok { + if err.Timeout() { + // Emit DIAL_TIMEOUT. + t.emitEvent(&DialTimeoutEvent{ + URL: urlString, + Err: err, + Timeout: timeout, + }) + } else { + // Emit DISCONNECTED. + t.emitEvent(&DisconnectedEvent{ + URL: urlString, + Err: err, + }) + } + + // Wait for the given period. + if err := wait(); err != nil { + return nil, err + } + // Try again. + continue + } + + // Otherwise just return the error. + return nil, err + } + + // Connection established. + // Emit CONNECTED and return a new client. + t.emitEvent(&ConnectedEvent{urlString}) + return ws, nil + } +} + +func (t *reconnectingTransport) emitEvent(event interface{}) { + if ch := t.parent.monitorChan; ch != nil { + ch <- event + } +} + +func asNetworkError(err error) (opError *net.OpError, ok bool) { + opError, ok = errors.Cause(err).(*net.OpError) + return +} diff --git a/transports/websocket/transport_simple.go b/transports/websocket/transport_simple.go new file mode 100644 index 0000000..4b207bb --- /dev/null +++ b/transports/websocket/transport_simple.go @@ -0,0 +1,52 @@ +package websocket + +import ( + // Vendor + "github.com/go-steem/rpc-codec/jsonrpc2" + "github.com/pkg/errors" + "golang.org/x/net/websocket" +) + +// simpleTransport is not trying to be particularly clever about network errors. +// In case an error occurs, it is immediately returned and the transport is closed. +type simpleTransport struct { + parent *Transport + + ws *websocket.Conn + client *jsonrpc2.Client +} + +// newSimpleTransport establishes a new WebSocket connection. +// The function blocks until the process is finished. +func newSimpleTransport(parent *Transport) (*simpleTransport, error) { + // Establish the WebSocket connection. + ws, err := parent.dial(nil) + if err != nil { + return nil, err + } + + // Instantiate a JSON-RPC client. + client := jsonrpc2.NewClient(ws) + + // Return a new simple transport. + return &simpleTransport{parent, ws, client}, nil +} + +// Call implements interfaces.CallCloser. +func (t *simpleTransport) Call(method string, params, response interface{}) error { + if err := t.parent.updateDeadline(t.ws); err != nil { + return err + } + if err := t.client.Call(method, params, response); err != nil { + return errors.Wrapf(err, "failed to call %v(%v)", method, params) + } + return nil +} + +// Close implements interfaces.CallCloser. +func (t *simpleTransport) Close() error { + if err := t.client.Close(); err != nil { + return errors.Wrap(err, "failed to close JSON-RPC client") + } + return nil +}