diff --git a/cmd/api/main.go b/cmd/api/main.go index 1d4c8ff6..6a4cd049 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -35,7 +35,7 @@ import ( nonceimpl "github.com/textileio/go-tableland/pkg/nonce/impl" "github.com/textileio/go-tableland/pkg/parsing" parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl" - "github.com/textileio/go-tableland/pkg/readstatementresolver" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore" "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" @@ -81,10 +81,13 @@ func main() { log.Fatal().Err(err).Msg("creating parser") } + sm := sharedmemory.NewSharedMemory() + // Chain stacks. chainStacks, closeChainStacks, err := createChainStacks( databaseURL, parser, + sm, config.Chains, config.TableConstraints, config.Analytics.FetchExtraBlockInfo) @@ -98,7 +101,7 @@ func main() { } for _, stack := range chainStacks { - stack.Store.SetReadResolver(readstatementresolver.New(eps)) + stack.Store.SetReadResolver(parsing.NewReadStatementResolver(sm)) } // HTTP API server. @@ -156,6 +159,7 @@ func createChainIDStack( dbURI string, executorsDB *sql.DB, parser parsing.SQLValidator, + sm *sharedmemory.SharedMemory, tableConstraints TableConstraints, fetchExtraBlockInfo bool, ) (chains.ChainStack, error) { @@ -230,7 +234,7 @@ func createChainIDStack( eventfeed.WithEventPersistence(config.EventFeed.PersistEvents), eventfeed.WithFetchExtraBlockInformation(fetchExtraBlockInfo), } - ef, err := efimpl.New(systemStore, config.ChainID, conn, scAddress, efOpts...) + ef, err := efimpl.New(systemStore, config.ChainID, conn, scAddress, sm, efOpts...) if err != nil { return chains.ChainStack{}, fmt.Errorf("creating event feed: %s", err) } @@ -393,6 +397,7 @@ func createParser(queryConstraints QueryConstraints) (parsing.SQLValidator, erro func createChainStacks( databaseURL string, parser parsing.SQLValidator, + sm *sharedmemory.SharedMemory, chainsConfig []ChainConfig, tableConstraintsConfig TableConstraints, fetchExtraBlockInfo bool, @@ -419,6 +424,7 @@ func createChainStacks( databaseURL, executorsDB, parser, + sm, tableConstraintsConfig, fetchExtraBlockInfo) if err != nil { diff --git a/internal/tableland/impl/tableland_test.go b/internal/tableland/impl/tableland_test.go index 4c07d9ad..47994476 100644 --- a/internal/tableland/impl/tableland_test.go +++ b/internal/tableland/impl/tableland_test.go @@ -29,6 +29,7 @@ import ( executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl" "github.com/textileio/go-tableland/pkg/parsing" parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore" "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" @@ -773,6 +774,7 @@ func (b *tablelandSetupBuilder) build(t *testing.T) *tablelandSetup { 1337, backend, addr, + sharedmemory.NewSharedMemory(), eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) diff --git a/pkg/client/v1/client_test.go b/pkg/client/v1/client_test.go index 673ed3b2..4f2a6a0f 100644 --- a/pkg/client/v1/client_test.go +++ b/pkg/client/v1/client_test.go @@ -146,6 +146,25 @@ func TestHealth(t *testing.T) { require.True(t, healthy) } +func TestBlockNum(t *testing.T) { + // Our initial simulated blockchain setup already produces two blocks. + calls := setup(t) + + // We create a table and do two inserts, that will increase our block number to 5. + tableName := requireCreate(t, calls) + requireReceipt(t, calls, requireWrite(t, calls, tableName), WaitFor(time.Second*10)) + requireReceipt(t, calls, requireWrite(t, calls, tableName), WaitFor(time.Second*10)) + + type result struct { + BlockNumber int64 `json:"bn"` + } + + res := []result{} + calls.query(fmt.Sprintf("select block_num(1337) as bn from %s", tableName), &res) + require.Len(t, res, 2) // it should be 2 because we inserted two rows + require.Equal(t, int64(5), res[0].BlockNumber) // the block number should be 5 +} + func requireCreate(t *testing.T, calls clientCalls) string { _, tableName := calls.create("(bar text)", WithPrefix("foo"), WithReceiptTimeout(time.Second*10)) require.Equal(t, "foo_1337_1", tableName) diff --git a/pkg/eventprocessor/eventfeed/impl/eventfeed.go b/pkg/eventprocessor/eventfeed/impl/eventfeed.go index 332d7598..1d26d11e 100644 --- a/pkg/eventprocessor/eventfeed/impl/eventfeed.go +++ b/pkg/eventprocessor/eventfeed/impl/eventfeed.go @@ -20,6 +20,7 @@ import ( logger "github.com/rs/zerolog/log" "github.com/textileio/go-tableland/internal/tableland" "github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore" tbleth "github.com/textileio/go-tableland/pkg/tables/impl/ethereum" "github.com/textileio/go-tableland/pkg/telemetry" @@ -43,6 +44,9 @@ type EventFeed struct { config *eventfeed.Config maxBlocksFetchSize int + // Shared memory + sm *sharedmemory.SharedMemory + // Metrics mBaseLabels []attribute.KeyValue mEventTypeCounter instrument.Int64Counter @@ -55,6 +59,7 @@ func New( chainID tableland.ChainID, ethClient eventfeed.ChainClient, scAddress common.Address, + sm *sharedmemory.SharedMemory, opts ...eventfeed.Option, ) (*EventFeed, error) { config := eventfeed.DefaultConfig() @@ -72,6 +77,7 @@ func New( Int64("chain_id", int64(chainID)). Logger() ef := &EventFeed{ + sm: sm, log: log, systemStore: systemStore, chainID: chainID, @@ -147,6 +153,8 @@ func (ef *EventFeed) Start( break } + ef.sm.SetLastSeenBlockNumber(ef.chainID, toHeight) + if toHeight-fromHeight+1 > int64(ef.maxBlocksFetchSize) { toHeight = fromHeight + int64(ef.maxBlocksFetchSize) - 1 } diff --git a/pkg/eventprocessor/eventfeed/impl/eventfeed_test.go b/pkg/eventprocessor/eventfeed/impl/eventfeed_test.go index 0216b1ce..a78eaf71 100644 --- a/pkg/eventprocessor/eventfeed/impl/eventfeed_test.go +++ b/pkg/eventprocessor/eventfeed/impl/eventfeed_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/textileio/go-tableland/internal/tableland" "github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" "github.com/textileio/go-tableland/pkg/tables/impl/ethereum" "github.com/textileio/go-tableland/pkg/tables/impl/testutil" @@ -37,6 +38,7 @@ func TestRunSQLEvents(t *testing.T) { 1337, backend, addr, + sharedmemory.NewSharedMemory(), eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) @@ -118,6 +120,7 @@ func TestAllEvents(t *testing.T) { 1337, backend, addr, + sharedmemory.NewSharedMemory(), eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0), eventfeed.WithEventPersistence(true), @@ -303,6 +306,7 @@ func TestInfura(t *testing.T) { 1337, conn, rinkebyContractAddr, + sharedmemory.NewSharedMemory(), eventfeed.WithNewHeadPollFreq(time.Second), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) @@ -360,6 +364,7 @@ func TestDuplicateEvents(t *testing.T) { 1337, backend, address, + sharedmemory.NewSharedMemory(), eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0), eventfeed.WithEventPersistence(true), diff --git a/pkg/eventprocessor/impl/eventprocessor_replayhistory_test.go b/pkg/eventprocessor/impl/eventprocessor_replayhistory_test.go index da5ab1a9..d980749a 100644 --- a/pkg/eventprocessor/impl/eventprocessor_replayhistory_test.go +++ b/pkg/eventprocessor/impl/eventprocessor_replayhistory_test.go @@ -23,6 +23,7 @@ import ( executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl" "github.com/textileio/go-tableland/pkg/parsing" parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" "github.com/textileio/go-tableland/tests" ) @@ -131,6 +132,7 @@ func spinValidatorStackForChainID( chainID, eventBasedBackend, scAddress, + sharedmemory.NewSharedMemory(), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) diff --git a/pkg/eventprocessor/impl/eventprocessor_test.go b/pkg/eventprocessor/impl/eventprocessor_test.go index d4d3b5c3..42704e61 100644 --- a/pkg/eventprocessor/impl/eventprocessor_test.go +++ b/pkg/eventprocessor/impl/eventprocessor_test.go @@ -15,8 +15,9 @@ import ( "github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed" efimpl "github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed/impl" executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl" + "github.com/textileio/go-tableland/pkg/parsing" parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl" - rsresolver "github.com/textileio/go-tableland/pkg/readstatementresolver" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" "github.com/textileio/go-tableland/pkg/tables" "github.com/textileio/go-tableland/pkg/tables/impl/testutil" @@ -322,6 +323,8 @@ func setup(t *testing.T) ( ex, err := executor.NewExecutor(chainID, db, parser, 0, &aclMock{}) require.NoError(t, err) + sm := sharedmemory.NewSharedMemory() + systemStore, err := system.New(dbURI, tableland.ChainID(chainID)) require.NoError(t, err) ef, err := efimpl.New( @@ -329,6 +332,7 @@ func setup(t *testing.T) ( chainID, backend, addr, + sm, eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) @@ -376,7 +380,7 @@ func setup(t *testing.T) ( dbURI, 1337) require.NoError(t, err) - store.SetReadResolver(rsresolver.New(map[tableland.ChainID]eventprocessor.EventProcessor{chainID: ep})) + store.SetReadResolver(parsing.NewReadStatementResolver(sm)) tableReader := func(readQuery string) []int64 { rq, err := parser.ValidateReadQuery(readQuery) diff --git a/pkg/parsing/readstatementresolver.go b/pkg/parsing/readstatementresolver.go new file mode 100644 index 00000000..8cf885b7 --- /dev/null +++ b/pkg/parsing/readstatementresolver.go @@ -0,0 +1,21 @@ +package parsing + +import ( + "github.com/textileio/go-tableland/internal/tableland" + "github.com/textileio/go-tableland/pkg/sharedmemory" +) + +// ReadStatementResolver implements the interface for custom functions resolution of read statements. +type ReadStatementResolver struct { + sm *sharedmemory.SharedMemory +} + +// NewReadStatementResolver creates a new ReadStatementResolver. +func NewReadStatementResolver(sm *sharedmemory.SharedMemory) *ReadStatementResolver { + return &ReadStatementResolver{sm: sm} +} + +// GetBlockNumber returns the block number for a given chain id. +func (rqr *ReadStatementResolver) GetBlockNumber(chainID int64) (int64, bool) { + return rqr.sm.GetLastSeenBlockNumber(tableland.ChainID(chainID)) +} diff --git a/pkg/readstatementresolver/readstatementresolver.go b/pkg/readstatementresolver/readstatementresolver.go deleted file mode 100644 index 95cc59ab..00000000 --- a/pkg/readstatementresolver/readstatementresolver.go +++ /dev/null @@ -1,30 +0,0 @@ -package readstatementresolver - -import ( - "github.com/textileio/go-tableland/internal/tableland" - "github.com/textileio/go-tableland/pkg/eventprocessor" -) - -// ReadStatementResolver implements the interface for custom functions resolution of read statements. -type ReadStatementResolver struct { - leb map[tableland.ChainID]func() int64 -} - -// New creates a new ReadStatementResolver. -func New(chainStacks map[tableland.ChainID]eventprocessor.EventProcessor) *ReadStatementResolver { - leb := make(map[tableland.ChainID]func() int64, len(chainStacks)) - for chainID, ep := range chainStacks { - leb[chainID] = ep.GetLastExecutedBlockNumber - } - return &ReadStatementResolver{leb: leb} -} - -// GetBlockNumber returns the block number for a given chain id. -func (rqr *ReadStatementResolver) GetBlockNumber(chainID int64) (int64, bool) { - r, ok := rqr.leb[tableland.ChainID(chainID)] - if !ok { - return 0, false - } - - return r(), true -} diff --git a/pkg/sharedmemory/sharedmemory.go b/pkg/sharedmemory/sharedmemory.go new file mode 100644 index 00000000..0dcb27d3 --- /dev/null +++ b/pkg/sharedmemory/sharedmemory.go @@ -0,0 +1,38 @@ +package sharedmemory + +import ( + "sync" + + "github.com/textileio/go-tableland/internal/tableland" +) + +// SharedMemory is a in-memory thread-safe data structure to exchange data between the validator and gateway. +type SharedMemory struct { + mu sync.RWMutex + lastSeenBlockNumber map[tableland.ChainID]int64 +} + +// NewSharedMemory creates new SharedMemory object. +func NewSharedMemory() *SharedMemory { + return &SharedMemory{ + lastSeenBlockNumber: make(map[tableland.ChainID]int64), + } +} + +// SetLastSeenBlockNumber sets the last seen block number of a specific chain. +func (sm *SharedMemory) SetLastSeenBlockNumber(chainID tableland.ChainID, blockNumber int64) { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.lastSeenBlockNumber[chainID] = blockNumber +} + +// GetLastSeenBlockNumber get the last seen block number of a specific chain. +func (sm *SharedMemory) GetLastSeenBlockNumber(chainID tableland.ChainID) (int64, bool) { + sm.mu.RLock() + defer sm.mu.RUnlock() + blockNumber, ok := sm.lastSeenBlockNumber[chainID] + if !ok { + return 0, false + } + return blockNumber, true +} diff --git a/tests/fullstack/fullstack.go b/tests/fullstack/fullstack.go index dd826b28..0f7a633f 100644 --- a/tests/fullstack/fullstack.go +++ b/tests/fullstack/fullstack.go @@ -24,6 +24,7 @@ import ( executor "github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor/impl" "github.com/textileio/go-tableland/pkg/parsing" parserimpl "github.com/textileio/go-tableland/pkg/parsing/impl" + "github.com/textileio/go-tableland/pkg/sharedmemory" "github.com/textileio/go-tableland/pkg/sqlstore" sqlstoreimplsystem "github.com/textileio/go-tableland/pkg/sqlstore/impl/system" "github.com/textileio/go-tableland/pkg/tables" @@ -95,6 +96,9 @@ func CreateFullStack(t *testing.T, deps Deps) FullStack { ex, err := executor.NewExecutor(1337, db, parser, 0, acl) require.NoError(t, err) + + sm := sharedmemory.NewSharedMemory() + // Spin up dependencies needed for the EventProcessor. // i.e: Executor, Parser, and EventFeed (connected to the EVM chain) ef, err := efimpl.New( @@ -102,6 +106,7 @@ func CreateFullStack(t *testing.T, deps Deps) FullStack { ChainID, backend, addr, + sm, eventfeed.WithNewHeadPollFreq(time.Millisecond), eventfeed.WithMinBlockDepth(0)) require.NoError(t, err) @@ -125,6 +130,7 @@ func CreateFullStack(t *testing.T, deps Deps) FullStack { stores := make(map[tableland.ChainID]sqlstore.SystemStore, len(chainStacks)) for chainID, stack := range chainStacks { + stack.Store.SetReadResolver(parsing.NewReadStatementResolver(sm)) stores[chainID] = stack.Store }