From 95a34c165a56da4b52df0670d726e211f197cbb1 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:09:34 -0700 Subject: [PATCH 01/11] refactor: Clarify database/schema/table results --- results.go | 18 +++++++++++++----- verify.go | 16 ++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/results.go b/results.go index a232cd5c..597def8c 100644 --- a/results.go +++ b/results.go @@ -38,16 +38,24 @@ func NewResults(targetNames []string, testModes []string) *Results { } } -// SingleResult represents the verification result from a single target, with the schema: -// SingleResult[schema][table][mode] = test output. -type SingleResult map[string]map[string]map[string]string +// DatabaseResult represents the verification result from a single target database: +// DatabaseResult[schema][table][mode] = test output. +type DatabaseResult map[string]SchemaResult + +// SchemaResult represents the verification result from a single schema: +// SchemaResult[table][mode] = test output. +type SchemaResult map[string]TableResult + +// TableResult represents the verification result from a single table: +// TableResult[mode] = test output. +type TableResult map[string]string // AddResult adds a SingleResult from a test on a specific target to the Results object. -func (r *Results) AddResult(targetName string, schemaTableHashes SingleResult) { +func (r *Results) AddResult(targetName string, databaseHashes DatabaseResult) { r.mutex.Lock() defer r.mutex.Unlock() - for schema, tables := range schemaTableHashes { + for schema, tables := range databaseHashes { if _, ok := r.content[schema]; !ok { r.content[schema] = make(map[string]map[string]map[string][]string) } diff --git a/verify.go b/verify.go index 84d32eb1..1c3db92f 100644 --- a/verify.go +++ b/verify.go @@ -104,8 +104,8 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p close(done) } -func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (SingleResult, error) { - schemaTableHashes := make(SingleResult) +func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (DatabaseResult, error) { + schemaTableHashes := make(DatabaseResult) rows, err := conn.Query(ctx, buildGetTablesQuery(c.IncludeSchemas, c.ExcludeSchemas, c.IncludeTables, c.ExcludeTables)) if err != nil { @@ -119,10 +119,10 @@ func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (Sing } if _, ok := schemaTableHashes[schema.String]; !ok { - schemaTableHashes[schema.String] = make(map[string]map[string]string) + schemaTableHashes[schema.String] = make(SchemaResult) } - schemaTableHashes[schema.String][table.String] = make(map[string]string) + schemaTableHashes[schema.String][table.String] = make(TableResult) for _, testMode := range c.TestModes { schemaTableHashes[schema.String][table.String][testMode] = defaultErrorOutput @@ -152,8 +152,8 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaTableHashes SingleResult) SingleResult { - for schemaName, tables := range schemaTableHashes { +func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { + for schemaName, tables := range databaseHashes { for tableName := range tables { tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -236,13 +236,13 @@ func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry continue } - schemaTableHashes[schemaName][tableName][testMode] = testOutput + databaseHashes[schemaName][tableName][testMode] = testOutput testLogger.Infof("Hash computed: %s", testOutput) } } } - return schemaTableHashes + return databaseHashes } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 89efdb738103eaac3c8315318e106109cb47ec1a Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:15:48 -0700 Subject: [PATCH 02/11] refactor: Separate database, schema verification --- verify.go | 141 +++++++++++++++++++++++++++++------------------------- 1 file changed, 75 insertions(+), 66 deletions(-) diff --git a/verify.go b/verify.go index 1c3db92f..b42935ef 100644 --- a/verify.go +++ b/verify.go @@ -97,7 +97,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - schemaTableHashes = c.runTestQueriesOnTarget(ctx, logger, conn, schemaTableHashes) + schemaTableHashes = c.runTestQueriesOnDatabase(ctx, logger, conn, schemaTableHashes) finalResults.AddResult(targetName, schemaTableHashes) logger.Info("Table hashes computed") @@ -152,97 +152,106 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { - for schemaName, tables := range databaseHashes { - for tableName := range tables { - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) - tableLogger.Info("Computing hash") +func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { + databaseResults := make(DatabaseResult) - rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) - if err != nil { - tableLogger.WithError(err).Error("Failed to query column names, data types") - - continue - } + for schemaName, schemaHashes := range databaseHashes { + schemaResult := c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, schemaHashes) + databaseResults[schemaName] = schemaResult + } - allTableColumns := make(map[string]column) + return databaseResults +} - for rows.Next() { - var columnName, dataType, constraintName, constraintType pgtype.Text +func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { + for tableName := range schemaHashes { + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) + tableLogger.Info("Computing hash") - err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) - if err != nil { - tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) + if err != nil { + tableLogger.WithError(err).Error("Failed to query column names, data types") - continue - } + continue + } - existing, ok := allTableColumns[columnName.String] - if ok { - existing.constraints = append(existing.constraints, constraintType.String) - allTableColumns[columnName.String] = existing - } else { - allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} - } - } + allTableColumns := make(map[string]column) - var tableColumns []column + for rows.Next() { + var columnName, dataType, constraintName, constraintType pgtype.Text - var primaryKeyColumnNames []string + err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) + if err != nil { + tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") - for _, col := range allTableColumns { - if col.IsPrimaryKey() { - primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) - } + continue + } - if c.validColumnTarget(col.name) { - tableColumns = append(tableColumns, col) - } + existing, ok := allTableColumns[columnName.String] + if ok { + existing.constraints = append(existing.constraints, constraintType.String) + allTableColumns[columnName.String] = existing + } else { + allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} } + } - if len(primaryKeyColumnNames) == 0 { - tableLogger.Error("No primary keys found") + var tableColumns []column - continue + var primaryKeyColumnNames []string + + for _, col := range allTableColumns { + if col.IsPrimaryKey() { + primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) } - tableLogger.WithFields(logrus.Fields{ - "primary_keys": primaryKeyColumnNames, - "columns": tableColumns, - }).Info("Determined columns to hash") + if c.validColumnTarget(col.name) { + tableColumns = append(tableColumns, col) + } + } - for _, testMode := range c.TestModes { - testLogger := tableLogger.WithField("test", testMode) + if len(primaryKeyColumnNames) == 0 { + tableLogger.Error("No primary keys found") - var query string + continue + } - switch testMode { - case TestModeFull: - query = buildFullHashQuery(c, schemaName, tableName, tableColumns) - case TestModeBookend: - query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) - case TestModeSparse: - query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) - case TestModeRowCount: - query = buildRowCountQuery(schemaName, tableName) - } + tableLogger.WithFields(logrus.Fields{ + "primary_keys": primaryKeyColumnNames, + "columns": tableColumns, + }).Info("Determined columns to hash") - testLogger.Debugf("Generated query: %s", query) + for _, testMode := range c.TestModes { + testLogger := tableLogger.WithField("test", testMode) + + var query string + + switch testMode { + case TestModeFull: + query = buildFullHashQuery(c, schemaName, tableName, tableColumns) + case TestModeBookend: + query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) + case TestModeSparse: + query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) + case TestModeRowCount: + query = buildRowCountQuery(schemaName, tableName) + } - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") + testLogger.Debugf("Generated query: %s", query) - continue - } + testOutput, err := runTestOnTable(ctx, conn, query) + if err != nil { + testLogger.WithError(err).Error("Failed to compute hash") - databaseHashes[schemaName][tableName][testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + continue } + + schemaHashes[tableName][testMode] = testOutput + testLogger.Infof("Hash computed: %s", testOutput) } } - return databaseHashes + return schemaHashes } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 81258f908f2c9b6be95c9879412a51052a6b19ad Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:21:23 -0700 Subject: [PATCH 03/11] refactor: Separate schema, table verification --- verify.go | 139 +++++++++++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 63 deletions(-) diff --git a/verify.go b/verify.go index b42935ef..187652d2 100644 --- a/verify.go +++ b/verify.go @@ -164,94 +164,107 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { + schemaResults := make(SchemaResult) + for tableName := range schemaHashes { - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) - tableLogger.Info("Computing hash") + tableResult := c.runTestQueriesOnTable(ctx, logger, conn, schemaName, tableName) + if tableResult != nil { + schemaResults[tableName] = tableResult + } + } - rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) - if err != nil { - tableLogger.WithError(err).Error("Failed to query column names, data types") + return schemaResults +} - continue - } +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { + tableResults := make(TableResult) + + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) + tableLogger.Info("Computing hash") - allTableColumns := make(map[string]column) + rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) + if err != nil { + tableLogger.WithError(err).Error("Failed to query column names, data types") - for rows.Next() { - var columnName, dataType, constraintName, constraintType pgtype.Text + return nil + } - err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) - if err != nil { - tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + allTableColumns := make(map[string]column) - continue - } + for rows.Next() { + var columnName, dataType, constraintName, constraintType pgtype.Text - existing, ok := allTableColumns[columnName.String] - if ok { - existing.constraints = append(existing.constraints, constraintType.String) - allTableColumns[columnName.String] = existing - } else { - allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} - } + err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) + if err != nil { + tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + + continue } - var tableColumns []column + existing, ok := allTableColumns[columnName.String] + if ok { + existing.constraints = append(existing.constraints, constraintType.String) + allTableColumns[columnName.String] = existing + } else { + allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} + } + } - var primaryKeyColumnNames []string + var tableColumns []column - for _, col := range allTableColumns { - if col.IsPrimaryKey() { - primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) - } + var primaryKeyColumnNames []string - if c.validColumnTarget(col.name) { - tableColumns = append(tableColumns, col) - } + for _, col := range allTableColumns { + if col.IsPrimaryKey() { + primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) } - if len(primaryKeyColumnNames) == 0 { - tableLogger.Error("No primary keys found") - - continue + if c.validColumnTarget(col.name) { + tableColumns = append(tableColumns, col) } + } - tableLogger.WithFields(logrus.Fields{ - "primary_keys": primaryKeyColumnNames, - "columns": tableColumns, - }).Info("Determined columns to hash") + if len(primaryKeyColumnNames) == 0 { + tableLogger.Error("No primary keys found") - for _, testMode := range c.TestModes { - testLogger := tableLogger.WithField("test", testMode) - - var query string - - switch testMode { - case TestModeFull: - query = buildFullHashQuery(c, schemaName, tableName, tableColumns) - case TestModeBookend: - query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) - case TestModeSparse: - query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) - case TestModeRowCount: - query = buildRowCountQuery(schemaName, tableName) - } + return nil + } - testLogger.Debugf("Generated query: %s", query) + tableLogger.WithFields(logrus.Fields{ + "primary_keys": primaryKeyColumnNames, + "columns": tableColumns, + }).Info("Determined columns to hash") + + for _, testMode := range c.TestModes { + testLogger := tableLogger.WithField("test", testMode) + + var query string + + switch testMode { + case TestModeFull: + query = buildFullHashQuery(c, schemaName, tableName, tableColumns) + case TestModeBookend: + query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) + case TestModeSparse: + query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) + case TestModeRowCount: + query = buildRowCountQuery(schemaName, tableName) + } - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") + testLogger.Debugf("Generated query: %s", query) - continue - } + testOutput, err := runTestOnTable(ctx, conn, query) + if err != nil { + testLogger.WithError(err).Error("Failed to compute hash") - schemaHashes[tableName][testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + continue } + + tableResults[testMode] = testOutput + testLogger.Infof("Hash computed: %s", testOutput) } - return schemaHashes + return tableResults } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 5cb57bbc26dfb4a649f257c81b99b255bcbda378 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:30:28 -0700 Subject: [PATCH 04/11] refactor: Add results per-database --- verify.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/verify.go b/verify.go index 187652d2..7843f985 100644 --- a/verify.go +++ b/verify.go @@ -97,9 +97,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - schemaTableHashes = c.runTestQueriesOnDatabase(ctx, logger, conn, schemaTableHashes) - - finalResults.AddResult(targetName, schemaTableHashes) + c.runTestQueriesOnDatabase(ctx, logger, conn, targetName, schemaTableHashes, finalResults) logger.Info("Table hashes computed") close(done) } @@ -152,7 +150,7 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { +func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { databaseResults := make(DatabaseResult) for schemaName, schemaHashes := range databaseHashes { @@ -160,7 +158,7 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent databaseResults[schemaName] = schemaResult } - return databaseResults + finalResults.AddResult(targetName, databaseResults) } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { From 5627e6bb2538b18f56fb19c82a20c90c5c4ea807 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:32:41 -0700 Subject: [PATCH 05/11] refactor: Add results per-schema --- verify.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/verify.go b/verify.go index 7843f985..711ce069 100644 --- a/verify.go +++ b/verify.go @@ -151,17 +151,12 @@ func (c Config) validColumnTarget(columnName string) bool { } func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { - databaseResults := make(DatabaseResult) - for schemaName, schemaHashes := range databaseHashes { - schemaResult := c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, schemaHashes) - databaseResults[schemaName] = schemaResult + c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, targetName, schemaHashes, finalResults) } - - finalResults.AddResult(targetName, databaseResults) } -func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { +func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { schemaResults := make(SchemaResult) for tableName := range schemaHashes { @@ -171,7 +166,9 @@ func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry } } - return schemaResults + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = schemaResults + finalResults.AddResult(targetName, databaseResults) } func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { From b72d24e5293a370abcfd9491ea6d18658675e7ea Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:35:29 -0700 Subject: [PATCH 06/11] refactor: Add results per-table --- verify.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/verify.go b/verify.go index 711ce069..ba094606 100644 --- a/verify.go +++ b/verify.go @@ -157,21 +157,12 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { - schemaResults := make(SchemaResult) - for tableName := range schemaHashes { - tableResult := c.runTestQueriesOnTable(ctx, logger, conn, schemaName, tableName) - if tableResult != nil { - schemaResults[tableName] = tableResult - } + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) } - - databaseResults := make(DatabaseResult) - databaseResults[schemaName] = schemaResults - finalResults.AddResult(targetName, databaseResults) } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { tableResults := make(TableResult) tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) @@ -181,7 +172,7 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, if err != nil { tableLogger.WithError(err).Error("Failed to query column names, data types") - return nil + return } allTableColumns := make(map[string]column) @@ -222,7 +213,7 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, if len(primaryKeyColumnNames) == 0 { tableLogger.Error("No primary keys found") - return nil + return } tableLogger.WithFields(logrus.Fields{ @@ -259,7 +250,10 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Infof("Hash computed: %s", testOutput) } - return tableResults + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = make(SchemaResult) + databaseResults[schemaName][tableName] = tableResults + finalResults.AddResult(targetName, databaseResults) } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 49a971d9bab0e147f4ede1a47f57574f1d133feb Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:42:42 -0700 Subject: [PATCH 07/11] refactor: Add results per-test --- verify.go | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/verify.go b/verify.go index ba094606..29596caa 100644 --- a/verify.go +++ b/verify.go @@ -163,8 +163,6 @@ func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry } func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { - tableResults := make(TableResult) - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -239,35 +237,34 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") - - continue - } - - tableResults[testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults) } - - databaseResults := make(DatabaseResult) - databaseResults[schemaName] = make(SchemaResult) - databaseResults[schemaName][tableName] = tableResults - finalResults.AddResult(targetName, databaseResults) } -func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results) { row := conn.QueryRow(ctx, query) + var testOutputString string + var testOutput pgtype.Text if err := row.Scan(&testOutput); err != nil { switch err { case pgx.ErrNoRows: - return "no rows", nil + testOutputString = "no rows" default: - return "", errors.Wrap(err, "failed to scan test output") + logger.WithError(err).Error("failed to scan test output") + + return } + } else { + testOutputString = testOutput.String } - return testOutput.String, nil + logger.Infof("Hash computed: %s", testOutputString) + + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = make(SchemaResult) + databaseResults[schemaName][tableName] = make(TableResult) + databaseResults[schemaName][tableName][testMode] = testOutputString + finalResults.AddResult(targetName, databaseResults) } From e88922255ff27447b6eb9ca23f1db66ab5d7ecb4 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:44:24 -0700 Subject: [PATCH 08/11] refactor: Condense per database/schema/table logic --- verify.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/verify.go b/verify.go index 29596caa..1e4e155c 100644 --- a/verify.go +++ b/verify.go @@ -97,7 +97,12 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - c.runTestQueriesOnDatabase(ctx, logger, conn, targetName, schemaTableHashes, finalResults) + for schemaName, schemaHashes := range schemaTableHashes { + for tableName := range schemaHashes { + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) + } + } + logger.Info("Table hashes computed") close(done) } @@ -150,18 +155,6 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { - for schemaName, schemaHashes := range databaseHashes { - c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, targetName, schemaHashes, finalResults) - } -} - -func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { - for tableName := range schemaHashes { - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) - } -} - func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") From 3a4062925cad249bf7069a70088dd480bde0b559 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:49:04 -0700 Subject: [PATCH 09/11] fix: Cleaner parallel processing with sync.WaitGroup --- verify.go | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/verify.go b/verify.go index 1e4e155c..ec7af6b5 100644 --- a/verify.go +++ b/verify.go @@ -2,6 +2,7 @@ package pgverify import ( "context" + "sync" "github.com/jackc/pgx/pgtype" "github.com/jackc/pgx/v4" @@ -63,18 +64,17 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results finalResults = NewResults(targetNames, c.TestModes) // Then query each target database in parallel to generate table hashes. - var doneChannels []chan struct{} + wg := &sync.WaitGroup{} for i, conn := range conns { - done := make(chan struct{}) - go c.runTestsOnTarget(ctx, targetNames[i], conn, finalResults, done) - doneChannels = append(doneChannels, done) - } + wg.Add(1) - for _, done := range doneChannels { - <-done + go c.runTestsOnTarget(ctx, targetNames[i], conn, finalResults, wg) } + // Wait for queries to complete + wg.Wait() + // Compare final results reportErrors := finalResults.CheckForErrors() if len(reportErrors) > 0 { @@ -86,25 +86,27 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results return finalResults, nil } -func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, done chan struct{}) { +func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + logger := c.Logger.WithField("target", targetName) schemaTableHashes, err := c.fetchTargetTableNames(ctx, conn) if err != nil { logger.WithError(err).Error("failed to fetch target tables") - close(done) return } for schemaName, schemaHashes := range schemaTableHashes { for tableName := range schemaHashes { - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) + wg.Add(1) + + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) } } logger.Info("Table hashes computed") - close(done) } func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (DatabaseResult, error) { @@ -155,7 +157,9 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -230,11 +234,14 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) - runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults) + wg.Add(1) + runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) } } -func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + row := conn.QueryRow(ctx, query) var testOutputString string From bbe231899153d476c43cdaa7d3b1379ac00dd2b5 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:50:17 -0700 Subject: [PATCH 10/11] feat: Execute all queries in parallel --- verify.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/verify.go b/verify.go index ec7af6b5..fd87107b 100644 --- a/verify.go +++ b/verify.go @@ -102,7 +102,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p for tableName := range schemaHashes { wg.Add(1) - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) + go c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) } } @@ -235,7 +235,8 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) wg.Add(1) - runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) + + go runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) } } From fd37710fade2dc510720df175fedf7def2fd55f4 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 15:38:30 -0700 Subject: [PATCH 11/11] fix: Don't re-use db connections --- verify.go | 50 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/verify.go b/verify.go index fd87107b..22e43813 100644 --- a/verify.go +++ b/verify.go @@ -31,7 +31,7 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results // First check that we can connect to every specified target database. targetNames := make([]string, len(targets)) - conns := make(map[int]*pgx.Conn) + connConfs := make(map[int]*pgx.ConnConfig) for i, target := range targets { pgxLoggerFields := logrus.Fields{ @@ -53,12 +53,7 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results target.LogLevel = pgx.LogLevelError - conn, err := pgx.ConnectConfig(ctx, target) - if err != nil { - return finalResults, err - } - defer conn.Close(ctx) - conns[i] = conn + connConfs[i] = target } finalResults = NewResults(targetNames, c.TestModes) @@ -66,10 +61,10 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results // Then query each target database in parallel to generate table hashes. wg := &sync.WaitGroup{} - for i, conn := range conns { + for i, connConf := range connConfs { wg.Add(1) - go c.runTestsOnTarget(ctx, targetNames[i], conn, finalResults, wg) + go c.runTestsOnTarget(ctx, targetNames[i], connConf, finalResults, wg) } // Wait for queries to complete @@ -86,11 +81,20 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results return finalResults, nil } -func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, wg *sync.WaitGroup) { +func (c Config) runTestsOnTarget(ctx context.Context, targetName string, connConf *pgx.ConnConfig, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() logger := c.Logger.WithField("target", targetName) + conn, err := pgx.ConnectConfig(ctx, connConf) + if err != nil { + logger.WithError(err).Error("failed to connect to target") + + return + } + + defer conn.Close(ctx) + schemaTableHashes, err := c.fetchTargetTableNames(ctx, conn) if err != nil { logger.WithError(err).Error("failed to fetch target tables") @@ -102,7 +106,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p for tableName := range schemaHashes { wg.Add(1) - go c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) + go c.runTestQueriesOnTable(ctx, logger, connConf, targetName, schemaName, tableName, finalResults, wg) } } @@ -157,12 +161,21 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, connConf *pgx.ConnConfig, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") + conn, err := pgx.ConnectConfig(ctx, connConf) + if err != nil { + logger.WithError(err).Error("failed to connect to target") + + return + } + + defer conn.Close(ctx) + rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) if err != nil { tableLogger.WithError(err).Error("Failed to query column names, data types") @@ -236,13 +249,22 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, wg.Add(1) - go runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) + go runTestOnTable(ctx, testLogger, connConf, targetName, schemaName, tableName, testMode, query, finalResults, wg) } } -func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, connConf *pgx.ConnConfig, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() + conn, err := pgx.ConnectConfig(ctx, connConf) + if err != nil { + logger.WithError(err).Error("failed to connect to target") + + return + } + + defer conn.Close(ctx) + row := conn.QueryRow(ctx, query) var testOutputString string