Skip to content

Commit

Permalink
Merge pull request #569 from tablelandnetwork/staging
Browse files Browse the repository at this point in the history
Prepare Release 1.4.0
  • Loading branch information
brunocalza authored May 16, 2023
2 parents 75291ba + eb099b2 commit e08793f
Show file tree
Hide file tree
Showing 14 changed files with 361 additions and 313 deletions.
8 changes: 0 additions & 8 deletions cmd/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ type ChainConfig struct {
EthEndpoint string `default:"eth_endpoint"`
ContractAddress string `default:"contract_address"`
}
Signer struct {
PrivateKey string `default:""`
}
EventFeed struct {
ChainAPIBackoff string `default:"15s"`
MinBlockDepth int `default:"5"`
Expand All @@ -115,11 +112,6 @@ type ChainConfig struct {
BlockFailedExecutionBackoff string `default:"10s"`
DedupExecutedTxns bool `default:"false"`
}
NonceTracker struct {
CheckInterval string `default:"10s"`
StuckInterval string `default:"10m"`
MinBlockDepth int `default:"5"`
}
HashCalculationStep int64 `default:"1000"`
}

Expand Down
56 changes: 11 additions & 45 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl"
"github.com/textileio/go-tableland/pkg/logging"
"github.com/textileio/go-tableland/pkg/metrics"
nonceimpl "github.com/textileio/go-tableland/pkg/nonce/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"

Expand All @@ -47,7 +46,6 @@ import (
"github.com/textileio/go-tableland/pkg/telemetry/chainscollector"
"github.com/textileio/go-tableland/pkg/telemetry/publisher"
"github.com/textileio/go-tableland/pkg/telemetry/storage"
"github.com/textileio/go-tableland/pkg/wallet"
)

type moduleCloser func(ctx context.Context) error
Expand Down Expand Up @@ -166,48 +164,6 @@ func createChainIDStack(
tableConstraints TableConstraints,
fetchExtraBlockInfo bool,
) (chains.ChainStack, error) {
conn, err := ethclient.Dial(config.Registry.EthEndpoint)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("failed to connect to ethereum endpoint: %s", err)
}

wallet, err := wallet.NewWallet(config.Signer.PrivateKey)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("failed to create wallet: %s", err)
}
log.Info().
Int64("chain_id", int64(config.ChainID)).
Str("wallet", wallet.Address().String()).
Msg("wallet public address")

checkInterval, err := time.ParseDuration(config.NonceTracker.CheckInterval)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("parsing nonce tracker check interval duration: %s", err)
}
stuckInterval, err := time.ParseDuration(config.NonceTracker.StuckInterval)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("parsing nonce tracker stuck interval duration: %s", err)
}
ctxLocalTracker, clsLocalTracker := context.WithTimeout(context.Background(), time.Second*15)
defer clsLocalTracker()
tracker, err := nonceimpl.NewLocalTracker(
ctxLocalTracker,
wallet,
nonceimpl.NewNonceStore(db),
config.ChainID,
conn,
checkInterval,
config.NonceTracker.MinBlockDepth,
stuckInterval,
)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("failed to create new tracker: %s", err)
}

ex, err := executor.NewExecutor(config.ChainID, db, parser, tableConstraints.MaxRowCount, impl.NewACL(db))
if err != nil {
return chains.ChainStack{}, fmt.Errorf("creating txn processor: %s", err)
}
chainAPIBackoff, err := time.ParseDuration(config.EventFeed.ChainAPIBackoff)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("parsing chain api backoff duration: %s", err)
Expand All @@ -229,6 +185,11 @@ func createChainIDStack(
return chains.ChainStack{}, fmt.Errorf("creating event feed store: %s", err)
}

conn, err := ethclient.Dial(config.Registry.EthEndpoint)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("failed to connect to ethereum endpoint: %s", err)
}

ef, err := efimpl.New(
eventFeedStore,
config.ChainID,
Expand All @@ -249,6 +210,12 @@ func createChainIDStack(
eventprocessor.WithDedupExecutedTxns(config.EventProcessor.DedupExecutedTxns),
eventprocessor.WithHashCalcStep(config.HashCalculationStep),
}

ex, err := executor.NewExecutor(config.ChainID, db, parser, tableConstraints.MaxRowCount, impl.NewACL(db))
if err != nil {
return chains.ChainStack{}, fmt.Errorf("creating txn processor: %s", err)
}

ep, err := epimpl.New(parser, ex, ef, config.ChainID, epOpts...)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("creating event processor: %s", err)
Expand All @@ -263,7 +230,6 @@ func createChainIDStack(
defer log.Info().Int64("chain_id", int64(config.ChainID)).Msg("stack closed")

ep.Stop()
tracker.Close()
conn.Close()
return nil
},
Expand Down
183 changes: 183 additions & 0 deletions cmd/healthbot/balancetracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package main

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/rs/zerolog"
logger "github.com/rs/zerolog/log"
"github.com/textileio/go-tableland/pkg/client"
"github.com/textileio/go-tableland/pkg/metrics"
"github.com/textileio/go-tableland/pkg/wallet"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
)

// BalanceTracker tracks the balance of a given wallet and produces metrics.
type BalanceTracker struct {
checkInterval time.Duration
wallet *wallet.Wallet
ethClient *ethclient.Client

log zerolog.Logger

mu sync.Mutex

// metrics
mBaseLabels []attribute.KeyValue
currWeiBalance int64
ethClientUnhealthy int64
}

// NewBalanceTracker returns a *BalanceTracker.
func NewBalanceTracker(
config ChainConfig,
wallet *wallet.Wallet,
checkInterval time.Duration,
) (*BalanceTracker, error) {
log := logger.With().
Str("component", "healthbot").
Int("chain_id", config.ChainID).
Logger()

client, err := getEthClient(config)
if err != nil {
return nil, fmt.Errorf("initializing eth client: %s", err)
}

cp := &BalanceTracker{
log: log,
checkInterval: checkInterval,
ethClient: client,
wallet: wallet,
}
if err := cp.initMetrics(config.ChainID, wallet.Address()); err != nil {
return nil, fmt.Errorf("initializing metrics: %s", err)
}

return cp, nil
}

// Run runs the probe until the provided ctx is canceled.
func (t *BalanceTracker) Run(ctx context.Context) {
t.log.Info().Msg("starting balance tracker...")

if err := t.checkBalance(ctx); err != nil {
t.log.Error().Err(err).Msg("check balance failed")
}

checkInterval := t.checkInterval
for {
select {
case <-ctx.Done():
t.log.Info().Msg("closing gracefully...")
return
case <-time.After(checkInterval):
if err := t.checkBalance(ctx); err != nil {
t.log.Error().Err(err).Msg("check balance failed")
checkInterval = time.Minute
} else {
checkInterval = t.checkInterval
}
}
}
}

func (t *BalanceTracker) checkBalance(ctx context.Context) error {
ctx, cls := context.WithTimeout(ctx, time.Second*15)
defer cls()
weiBalance, err := t.ethClient.BalanceAt(ctx, t.wallet.Address(), nil)
if err != nil {
t.mu.Lock()
t.ethClientUnhealthy++
t.mu.Unlock()
return fmt.Errorf("get balance: %s", err)
}

t.log.Info().
Str("balance", weiBalance.String()).
Str("address", t.wallet.Address().Hex()).
Msg("check balance")

s := weiBalance.String()
var gWeiBalance int64
if len(s) > 9 {
gWeiBalance, err = strconv.ParseInt(s[:len(s)-9], 10, 64)
if err != nil {
return fmt.Errorf("converting wei to gwei: %s", err)
}
}

t.mu.Lock()
t.currWeiBalance = gWeiBalance
t.ethClientUnhealthy = 0
t.mu.Unlock()

return nil
}

func (t *BalanceTracker) initMetrics(chainID int, addr common.Address) error {
meter := global.MeterProvider().Meter("tableland")
t.mBaseLabels = append([]attribute.KeyValue{
attribute.Int("chain_id", chainID),
attribute.String("wallet_address", addr.String()),
}, metrics.BaseAttrs...)

mBalance, err := meter.Int64ObservableGauge("tableland.wallettracker.balance.wei")
if err != nil {
return fmt.Errorf("creating balance metric: %s", err)
}

mEthClientUnhealthy, err := meter.Int64ObservableGauge("tableland.wallettracker.eth.client.unhealthy")
if err != nil {
return fmt.Errorf("creating eth client unhealthy metric: %s", err)
}

if _, err = meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
t.mu.Lock()
defer t.mu.Unlock()
o.ObserveInt64(mBalance, t.currWeiBalance, t.mBaseLabels...)
o.ObserveInt64(mEthClientUnhealthy, t.ethClientUnhealthy, t.mBaseLabels...)

return nil
}, []instrument.Asynchronous{
mBalance,
mEthClientUnhealthy,
}...); err != nil {
return fmt.Errorf("registering async metric callback: %s", err)
}

return nil
}

func getEthClient(config ChainConfig) (*ethclient.Client, error) {
var url, key string
var ok bool
if config.ChainID == 3141 {
url, ok = client.AnkrURLs[client.ChainID(config.ChainID)]
key = config.AnkrAPIKey
} else {
url, ok = client.AlchemyURLs[client.ChainID(config.ChainID)]
key = config.AlchemyAPIKey
}

if !ok {
return nil, errors.New("chain provider not supported")
}

conn, err := ethclient.Dial(fmt.Sprintf(url, key))
if err != nil {
return nil, fmt.Errorf("dial: %s", err)
}

return conn, nil
}
16 changes: 15 additions & 1 deletion cmd/healthbot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,25 @@ func main() {
log.Fatal().Err(err).Msg("initializing counter-probe")
}

wg.Add(1)
balanceTracker, err := NewBalanceTracker(
chainCfg,
wallet,
15*time.Second,
)
if err != nil {
log.Fatal().Err(err).Msg("initializing balance tracker")
}

wg.Add(2)
go func() {
defer wg.Done()
cp.Run(ctx)
}()

go func() {
defer wg.Done()
balanceTracker.Run(ctx)
}()
}
wg.Wait()
log.Info().Msg("daemon closed")
Expand Down
6 changes: 2 additions & 4 deletions docker/deployed/mainnet/api/.env_validator.example
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
VALIDATOR_ALCHEMY_ARBITRUM_MAINNET_API_KEY=
VALIDATOR_ARBITRUM_MAINNET_SIGNER_PRIVATE_KEY=
VALIDATOR_ALCHEMY_ETHEREUM_MAINNET_API_KEY=
VALIDATOR_ETHEREUM_MAINNET_SIGNER_PRIVATE_KEY=
VALIDATOR_ALCHEMY_POLYGON_MAINNET_API_KEY=
VALIDATOR_POLYGON_MAINNET_SIGNER_PRIVATE_KEY=
VALIDATOR_ALCHEMY_OPTIMISM_MAINNET_API_KEY=
VALIDATOR_OPTIMISM_MAINNET_SIGNER_PRIVATE_KEY=
VALIDATOR_ANKR_FILECOIN_MAINNET_API_KEY=
VALIDATOR_QUICKNODE_ARBITRUM_NOVA_MAINNET_API_KEY=
METRICS_HUB_API_KEY=
Loading

0 comments on commit e08793f

Please sign in to comment.