Skip to content

Commit

Permalink
Merge pull request #527 from tablelandnetwork/bcalza/blocknum2
Browse files Browse the repository at this point in the history
changes the behavior of block_num(<chain_id>)
  • Loading branch information
brunocalza authored Apr 12, 2023
2 parents ff9a6ee + 6af6e07 commit 8fe0c8a
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 35 deletions.
12 changes: 9 additions & 3 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
nonceimpl "github.com/textileio/go-tableland/pkg/nonce/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"
"github.com/textileio/go-tableland/pkg/readstatementresolver"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system"

Expand Down Expand Up @@ -81,10 +81,13 @@ func main() {
log.Fatal().Err(err).Msg("creating parser")
}

sm := sharedmemory.NewSharedMemory()

// Chain stacks.
chainStacks, closeChainStacks, err := createChainStacks(
databaseURL,
parser,
sm,
config.Chains,
config.TableConstraints,
config.Analytics.FetchExtraBlockInfo)
Expand All @@ -98,7 +101,7 @@ func main() {
}

for _, stack := range chainStacks {
stack.Store.SetReadResolver(readstatementresolver.New(eps))
stack.Store.SetReadResolver(parsing.NewReadStatementResolver(sm))
}

// HTTP API server.
Expand Down Expand Up @@ -156,6 +159,7 @@ func createChainIDStack(
dbURI string,
executorsDB *sql.DB,
parser parsing.SQLValidator,
sm *sharedmemory.SharedMemory,
tableConstraints TableConstraints,
fetchExtraBlockInfo bool,
) (chains.ChainStack, error) {
Expand Down Expand Up @@ -230,7 +234,7 @@ func createChainIDStack(
eventfeed.WithEventPersistence(config.EventFeed.PersistEvents),
eventfeed.WithFetchExtraBlockInformation(fetchExtraBlockInfo),
}
ef, err := efimpl.New(systemStore, config.ChainID, conn, scAddress, efOpts...)
ef, err := efimpl.New(systemStore, config.ChainID, conn, scAddress, sm, efOpts...)
if err != nil {
return chains.ChainStack{}, fmt.Errorf("creating event feed: %s", err)
}
Expand Down Expand Up @@ -393,6 +397,7 @@ func createParser(queryConstraints QueryConstraints) (parsing.SQLValidator, erro
func createChainStacks(
databaseURL string,
parser parsing.SQLValidator,
sm *sharedmemory.SharedMemory,
chainsConfig []ChainConfig,
tableConstraintsConfig TableConstraints,
fetchExtraBlockInfo bool,
Expand All @@ -419,6 +424,7 @@ func createChainStacks(
databaseURL,
executorsDB,
parser,
sm,
tableConstraintsConfig,
fetchExtraBlockInfo)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/tableland/impl/tableland_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system"

Expand Down Expand Up @@ -773,6 +774,7 @@ func (b *tablelandSetupBuilder) build(t *testing.T) *tablelandSetup {
1337,
backend,
addr,
sharedmemory.NewSharedMemory(),
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)
Expand Down
19 changes: 19 additions & 0 deletions pkg/client/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func TestHealth(t *testing.T) {
require.True(t, healthy)
}

func TestBlockNum(t *testing.T) {
// Our initial simulated blockchain setup already produces two blocks.
calls := setup(t)

// We create a table and do two inserts, that will increase our block number to 5.
tableName := requireCreate(t, calls)
requireReceipt(t, calls, requireWrite(t, calls, tableName), WaitFor(time.Second*10))
requireReceipt(t, calls, requireWrite(t, calls, tableName), WaitFor(time.Second*10))

type result struct {
BlockNumber int64 `json:"bn"`
}

res := []result{}
calls.query(fmt.Sprintf("select block_num(1337) as bn from %s", tableName), &res)
require.Len(t, res, 2) // it should be 2 because we inserted two rows
require.Equal(t, int64(5), res[0].BlockNumber) // the block number should be 5
}

func requireCreate(t *testing.T, calls clientCalls) string {
_, tableName := calls.create("(bar text)", WithPrefix("foo"), WithReceiptTimeout(time.Second*10))
require.Equal(t, "foo_1337_1", tableName)
Expand Down
8 changes: 8 additions & 0 deletions pkg/eventprocessor/eventfeed/impl/eventfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
logger "github.com/rs/zerolog/log"
"github.com/textileio/go-tableland/internal/tableland"
"github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore"
tbleth "github.com/textileio/go-tableland/pkg/tables/impl/ethereum"
"github.com/textileio/go-tableland/pkg/telemetry"
Expand All @@ -43,6 +44,9 @@ type EventFeed struct {
config *eventfeed.Config
maxBlocksFetchSize int

// Shared memory
sm *sharedmemory.SharedMemory

// Metrics
mBaseLabels []attribute.KeyValue
mEventTypeCounter instrument.Int64Counter
Expand All @@ -55,6 +59,7 @@ func New(
chainID tableland.ChainID,
ethClient eventfeed.ChainClient,
scAddress common.Address,
sm *sharedmemory.SharedMemory,
opts ...eventfeed.Option,
) (*EventFeed, error) {
config := eventfeed.DefaultConfig()
Expand All @@ -72,6 +77,7 @@ func New(
Int64("chain_id", int64(chainID)).
Logger()
ef := &EventFeed{
sm: sm,
log: log,
systemStore: systemStore,
chainID: chainID,
Expand Down Expand Up @@ -147,6 +153,8 @@ func (ef *EventFeed) Start(
break
}

ef.sm.SetLastSeenBlockNumber(ef.chainID, toHeight)

if toHeight-fromHeight+1 > int64(ef.maxBlocksFetchSize) {
toHeight = fromHeight + int64(ef.maxBlocksFetchSize) - 1
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/eventprocessor/eventfeed/impl/eventfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/textileio/go-tableland/internal/tableland"
"github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system"
"github.com/textileio/go-tableland/pkg/tables/impl/ethereum"
"github.com/textileio/go-tableland/pkg/tables/impl/testutil"
Expand All @@ -37,6 +38,7 @@ func TestRunSQLEvents(t *testing.T) {
1337,
backend,
addr,
sharedmemory.NewSharedMemory(),
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)
Expand Down Expand Up @@ -118,6 +120,7 @@ func TestAllEvents(t *testing.T) {
1337,
backend,
addr,
sharedmemory.NewSharedMemory(),
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0),
eventfeed.WithEventPersistence(true),
Expand Down Expand Up @@ -303,6 +306,7 @@ func TestInfura(t *testing.T) {
1337,
conn,
rinkebyContractAddr,
sharedmemory.NewSharedMemory(),
eventfeed.WithNewHeadPollFreq(time.Second),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)
Expand Down Expand Up @@ -360,6 +364,7 @@ func TestDuplicateEvents(t *testing.T) {
1337,
backend,
address,
sharedmemory.NewSharedMemory(),
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0),
eventfeed.WithEventPersistence(true),
Expand Down
2 changes: 2 additions & 0 deletions pkg/eventprocessor/impl/eventprocessor_replayhistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system"
"github.com/textileio/go-tableland/tests"
)
Expand Down Expand Up @@ -131,6 +132,7 @@ func spinValidatorStackForChainID(
chainID,
eventBasedBackend,
scAddress,
sharedmemory.NewSharedMemory(),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)

Expand Down
8 changes: 6 additions & 2 deletions pkg/eventprocessor/impl/eventprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
"github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed"
efimpl "github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed/impl"
executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"
rsresolver "github.com/textileio/go-tableland/pkg/readstatementresolver"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system"
"github.com/textileio/go-tableland/pkg/tables"
"github.com/textileio/go-tableland/pkg/tables/impl/testutil"
Expand Down Expand Up @@ -322,13 +323,16 @@ func setup(t *testing.T) (
ex, err := executor.NewExecutor(chainID, db, parser, 0, &aclMock{})
require.NoError(t, err)

sm := sharedmemory.NewSharedMemory()

systemStore, err := system.New(dbURI, tableland.ChainID(chainID))
require.NoError(t, err)
ef, err := efimpl.New(
systemStore,
chainID,
backend,
addr,
sm,
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)
Expand Down Expand Up @@ -376,7 +380,7 @@ func setup(t *testing.T) (
dbURI, 1337)
require.NoError(t, err)

store.SetReadResolver(rsresolver.New(map[tableland.ChainID]eventprocessor.EventProcessor{chainID: ep}))
store.SetReadResolver(parsing.NewReadStatementResolver(sm))

tableReader := func(readQuery string) []int64 {
rq, err := parser.ValidateReadQuery(readQuery)
Expand Down
21 changes: 21 additions & 0 deletions pkg/parsing/readstatementresolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package parsing

import (
"github.com/textileio/go-tableland/internal/tableland"
"github.com/textileio/go-tableland/pkg/sharedmemory"
)

// ReadStatementResolver implements the interface for custom functions resolution of read statements.
type ReadStatementResolver struct {
sm *sharedmemory.SharedMemory
}

// NewReadStatementResolver creates a new ReadStatementResolver.
func NewReadStatementResolver(sm *sharedmemory.SharedMemory) *ReadStatementResolver {
return &ReadStatementResolver{sm: sm}
}

// GetBlockNumber returns the block number for a given chain id.
func (rqr *ReadStatementResolver) GetBlockNumber(chainID int64) (int64, bool) {
return rqr.sm.GetLastSeenBlockNumber(tableland.ChainID(chainID))
}
30 changes: 0 additions & 30 deletions pkg/readstatementresolver/readstatementresolver.go

This file was deleted.

38 changes: 38 additions & 0 deletions pkg/sharedmemory/sharedmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package sharedmemory

import (
"sync"

"github.com/textileio/go-tableland/internal/tableland"
)

// SharedMemory is a in-memory thread-safe data structure to exchange data between the validator and gateway.
type SharedMemory struct {
mu sync.RWMutex
lastSeenBlockNumber map[tableland.ChainID]int64
}

// NewSharedMemory creates new SharedMemory object.
func NewSharedMemory() *SharedMemory {
return &SharedMemory{
lastSeenBlockNumber: make(map[tableland.ChainID]int64),
}
}

// SetLastSeenBlockNumber sets the last seen block number of a specific chain.
func (sm *SharedMemory) SetLastSeenBlockNumber(chainID tableland.ChainID, blockNumber int64) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.lastSeenBlockNumber[chainID] = blockNumber
}

// GetLastSeenBlockNumber get the last seen block number of a specific chain.
func (sm *SharedMemory) GetLastSeenBlockNumber(chainID tableland.ChainID) (int64, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
blockNumber, ok := sm.lastSeenBlockNumber[chainID]
if !ok {
return 0, false
}
return blockNumber, true
}
6 changes: 6 additions & 0 deletions tests/fullstack/fullstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl"
"github.com/textileio/go-tableland/pkg/parsing"
parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl"
"github.com/textileio/go-tableland/pkg/sharedmemory"
"github.com/textileio/go-tableland/pkg/sqlstore"
sqlstoreimplsystem "github.com/textileio/go-tableland/pkg/sqlstore/impl/system"
"github.com/textileio/go-tableland/pkg/tables"
Expand Down Expand Up @@ -95,13 +96,17 @@ func CreateFullStack(t *testing.T, deps Deps) FullStack {

ex, err := executor.NewExecutor(1337, db, parser, 0, acl)
require.NoError(t, err)

sm := sharedmemory.NewSharedMemory()

// Spin up dependencies needed for the EventProcessor.
// i.e: Executor, Parser, and EventFeed (connected to the EVM chain)
ef, err := efimpl.New(
systemStore,
ChainID,
backend,
addr,
sm,
eventfeed.WithNewHeadPollFreq(time.Millisecond),
eventfeed.WithMinBlockDepth(0))
require.NoError(t, err)
Expand All @@ -125,6 +130,7 @@ func CreateFullStack(t *testing.T, deps Deps) FullStack {

stores := make(map[tableland.ChainID]sqlstore.SystemStore, len(chainStacks))
for chainID, stack := range chainStacks {
stack.Store.SetReadResolver(parsing.NewReadStatementResolver(sm))
stores[chainID] = stack.Store
}

Expand Down

0 comments on commit 8fe0c8a

Please sign in to comment.