From a7e4601a69657e33c200c22d9cacc98541664c7b Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 23 Oct 2024 15:56:33 -0700 Subject: [PATCH 1/6] use multiStatements to apply DML --- go/logic/applier.go | 37 ++++++++++++++++++++++--------------- go/mysql/connection.go | 1 + 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 9d59d9ec4..a07faf20f 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1225,27 +1225,34 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if _, err := tx.Exec(sessionQuery); err != nil { return rollback(err) } + multiArgs := []interface{}{} + var multiQueryBuilder strings.Builder for _, dmlEvent := range dmlEvents { for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { if buildResult.err != nil { - return rollback(buildResult.err) + return buildResult.err } - result, err := tx.Exec(buildResult.query, buildResult.args...) - if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) - return rollback(err) - } - - rowsAffected, err := result.RowsAffected() - if err != nil { - log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) - rowsAffected = 1 - } - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). - // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - totalDelta += buildResult.rowsDelta * rowsAffected + multiArgs = append(multiArgs, buildResult.args...) + multiQueryBuilder.WriteString(buildResult.query) + multiQueryBuilder.WriteString(";\n") } } + // TODO: get rows affected from each query in multi statement + log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) + _, err = tx.Exec(multiQueryBuilder.String(), multiArgs...) + if err != nil { + err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) + return rollback(err) + } + // rowsAffected, err := result.RowsAffected() + // if err != nil { + // log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) + // rowsAffected = 1 + // } + // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). + // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event + // totalDelta += buildResult.rowsDelta * rowsAffected + if err := tx.Commit(); err != nil { return err } diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 33bde2b62..1766ee917 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -132,6 +132,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { connectionParams := []string{ "autocommit=true", "interpolateParams=true", + "multiStatements=true", fmt.Sprintf("charset=%s", this.Charset), fmt.Sprintf("tls=%s", tlsOption), fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation), From 90d6148dc4f163eaf566159fb5be05a8c29e8b31 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 23 Oct 2024 16:08:57 -0700 Subject: [PATCH 2/6] fix test --- go/mysql/connection_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/mysql/connection_test.go b/go/mysql/connection_test.go index 7859c9354..bcd8b3147 100644 --- a/go/mysql/connection_test.go +++ b/go/mysql/connection_test.go @@ -86,7 +86,7 @@ func TestGetDBUri(t *testing.T) { c.Charset = "utf8mb4,utf8,latin1" uri := c.GetDBUri("test") - require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) + require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) } func TestGetDBUriWithTLSSetup(t *testing.T) { @@ -100,5 +100,5 @@ func TestGetDBUriWithTLSSetup(t *testing.T) { c.Charset = "utf8mb4_general_ci,utf8_general_ci,latin1" uri := c.GetDBUri("test") - require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) + require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) } From 2a3318c8b6fb2dd74aa1b979bb28407a80037d18 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 23 Oct 2024 18:55:55 -0700 Subject: [PATCH 3/6] conn.Raw not working --- go/logic/applier.go | 66 ++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index a07faf20f..5b5dc7654 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -14,10 +14,13 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" - "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" - "github.com/openark/golib/log" + "context" + "database/sql/driver" + + "github.com/github/gh-ost/go/mysql" + drivermysql "github.com/go-sql-driver/mysql" "github.com/openark/golib/sqlutils" ) @@ -1207,13 +1210,19 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 + ctx := context.TODO() err := func() error { - tx, err := this.db.Begin() + conn, err := this.db.Conn(ctx) if err != nil { return err } + defer conn.Close() + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } rollback := func(err error) error { tx.Rollback() return err @@ -1225,34 +1234,49 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if _, err := tx.Exec(sessionQuery); err != nil { return rollback(err) } - multiArgs := []interface{}{} + rowDeltas := make([]int64, 0, len(dmlEvents)) + multiArgs := []driver.NamedValue{} var multiQueryBuilder strings.Builder for _, dmlEvent := range dmlEvents { for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { if buildResult.err != nil { - return buildResult.err + return rollback(buildResult.err) } - multiArgs = append(multiArgs, buildResult.args...) + for _, arg := range buildResult.args { + multiArgs = append(multiArgs, driver.NamedValue{Value: driver.Value(arg)}) + } + rowDeltas = append(rowDeltas, buildResult.rowsDelta) multiQueryBuilder.WriteString(buildResult.query) multiQueryBuilder.WriteString(";\n") } } - // TODO: get rows affected from each query in multi statement - log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) - _, err = tx.Exec(multiQueryBuilder.String(), multiArgs...) - if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) - return rollback(err) - } - // rowsAffected, err := result.RowsAffected() - // if err != nil { - // log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) - // rowsAffected = 1 - // } - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). - // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - // totalDelta += buildResult.rowsDelta * rowsAffected + //this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs) + execErr := conn.Raw(func(driverConn any) error { + ex, ok := driverConn.(driver.ExecerContext) + if !ok { + return fmt.Errorf("could not cast driverConn to ExecerContext") + } + res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) + if err != nil { + err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) + this.migrationContext.Log.Errorf("Error exec: %+v", err) + return err + } + mysqlRes, ok := res.(drivermysql.Result) + if !ok { + return fmt.Errorf("Could not cast %+v to mysql.Result", res) + } + // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). + // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event + for i, rowsAffected := range mysqlRes.AllRowsAffected() { + totalDelta += rowDeltas[i] * rowsAffected + } + return nil + }) + if execErr != nil { + return rollback(execErr) + } if err := tx.Commit(); err != nil { return err } From 1bd2b0be49e8d4db022d62ce7470e24ed6efdc83 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 24 Oct 2024 07:34:47 +0000 Subject: [PATCH 4/6] Fix named value building. --- go/logic/applier.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 5b5dc7654..4e0c58dd0 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1234,46 +1234,68 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if _, err := tx.Exec(sessionQuery); err != nil { return rollback(err) } - rowDeltas := make([]int64, 0, len(dmlEvents)) - multiArgs := []driver.NamedValue{} - var multiQueryBuilder strings.Builder + + buildResults := make([]*dmlBuildResult, 0, len(dmlEvents)) for _, dmlEvent := range dmlEvents { for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { if buildResult.err != nil { return rollback(buildResult.err) } - for _, arg := range buildResult.args { - multiArgs = append(multiArgs, driver.NamedValue{Value: driver.Value(arg)}) - } - rowDeltas = append(rowDeltas, buildResult.rowsDelta) - multiQueryBuilder.WriteString(buildResult.query) - multiQueryBuilder.WriteString(";\n") + + buildResults = append(buildResults, buildResult) } } - //this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs) execErr := conn.Raw(func(driverConn any) error { ex, ok := driverConn.(driver.ExecerContext) if !ok { return fmt.Errorf("could not cast driverConn to ExecerContext") } + + nvc, ok := driverConn.(driver.NamedValueChecker) + if !ok { + return fmt.Errorf("could not cast driverConn to NamedValueChecker") + } + + var multiArgs []driver.NamedValue + multiQueryBuilder := strings.Builder{} + var rowDeltas []int64 + + for _, buildResult := range buildResults { + for _, arg := range buildResult.args { + nv := driver.NamedValue{Value: driver.Value(arg)} + nvc.CheckNamedValue(&nv) + multiArgs = append(multiArgs, nv) + } + + multiQueryBuilder.WriteString(buildResult.query) + multiQueryBuilder.WriteString(";\n") + + rowDeltas = append(rowDeltas, buildResult.rowsDelta) + } + + // this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs) res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) if err != nil { err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) this.migrationContext.Log.Errorf("Error exec: %+v", err) return err } + mysqlRes, ok := res.(drivermysql.Result) if !ok { return fmt.Errorf("Could not cast %+v to mysql.Result", res) } + // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event for i, rowsAffected := range mysqlRes.AllRowsAffected() { totalDelta += rowDeltas[i] * rowsAffected } + return nil }) + if execErr != nil { return rollback(execErr) } From c1b600085c7bd1a627e97d56e40046778ae6c793 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Thu, 24 Oct 2024 12:03:26 -0700 Subject: [PATCH 5/6] add arthurscreiber's review suggestions --- go/logic/applier.go | 35 ++++++++++------------------------- go/mysql/connection.go | 1 - go/mysql/connection_test.go | 4 ++-- 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 4e0c58dd0..ecb9cc992 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -80,7 +80,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { func (this *Applier) InitDBConnections() (err error) { applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) - if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { + uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri) + if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, uriWithMulti); err != nil { return err } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) @@ -1210,7 +1211,7 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 - ctx := context.TODO() + ctx := context.Background() err := func() error { conn, err := this.db.Conn(ctx) @@ -1236,31 +1237,23 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } buildResults := make([]*dmlBuildResult, 0, len(dmlEvents)) + nArgs := 0 for _, dmlEvent := range dmlEvents { for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { if buildResult.err != nil { return rollback(buildResult.err) } - + nArgs += len(buildResult.args) buildResults = append(buildResults, buildResult) } } execErr := conn.Raw(func(driverConn any) error { - ex, ok := driverConn.(driver.ExecerContext) - if !ok { - return fmt.Errorf("could not cast driverConn to ExecerContext") - } - - nvc, ok := driverConn.(driver.NamedValueChecker) - if !ok { - return fmt.Errorf("could not cast driverConn to NamedValueChecker") - } + ex := driverConn.(driver.ExecerContext) + nvc := driverConn.(driver.NamedValueChecker) - var multiArgs []driver.NamedValue + multiArgs := make([]driver.NamedValue, 0, nArgs) multiQueryBuilder := strings.Builder{} - var rowDeltas []int64 - for _, buildResult := range buildResults { for _, arg := range buildResult.args { nv := driver.NamedValue{Value: driver.Value(arg)} @@ -1270,29 +1263,21 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) multiQueryBuilder.WriteString(buildResult.query) multiQueryBuilder.WriteString(";\n") - - rowDeltas = append(rowDeltas, buildResult.rowsDelta) } - // this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs) res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) if err != nil { err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) - this.migrationContext.Log.Errorf("Error exec: %+v", err) return err } - mysqlRes, ok := res.(drivermysql.Result) - if !ok { - return fmt.Errorf("Could not cast %+v to mysql.Result", res) - } + mysqlRes := res.(drivermysql.Result) // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event for i, rowsAffected := range mysqlRes.AllRowsAffected() { - totalDelta += rowDeltas[i] * rowsAffected + totalDelta += buildResults[i].rowsDelta * rowsAffected } - return nil }) diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 1766ee917..33bde2b62 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -132,7 +132,6 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { connectionParams := []string{ "autocommit=true", "interpolateParams=true", - "multiStatements=true", fmt.Sprintf("charset=%s", this.Charset), fmt.Sprintf("tls=%s", tlsOption), fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation), diff --git a/go/mysql/connection_test.go b/go/mysql/connection_test.go index bcd8b3147..7859c9354 100644 --- a/go/mysql/connection_test.go +++ b/go/mysql/connection_test.go @@ -86,7 +86,7 @@ func TestGetDBUri(t *testing.T) { c.Charset = "utf8mb4,utf8,latin1" uri := c.GetDBUri("test") - require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) + require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) } func TestGetDBUriWithTLSSetup(t *testing.T) { @@ -100,5 +100,5 @@ func TestGetDBUriWithTLSSetup(t *testing.T) { c.Charset = "utf8mb4_general_ci,utf8_general_ci,latin1" uri := c.GetDBUri("test") - require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) + require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri) } From 2e62f2a83905eeea92fd892fe187f2f617f77df8 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Thu, 24 Oct 2024 14:07:20 -0700 Subject: [PATCH 6/6] set session outside of transaction --- go/logic/applier.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index ecb9cc992..1be696909 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1220,6 +1220,12 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } defer conn.Close() + sessionQuery := "SET /* gh-ost */ SESSION time_zone = '+00:00'" + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) + if _, err := conn.ExecContext(ctx, sessionQuery); err != nil { + return err + } + tx, err := conn.BeginTx(ctx, nil) if err != nil { return err @@ -1229,13 +1235,6 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return err } - sessionQuery := "SET /* gh-ost */ SESSION time_zone = '+00:00'" - sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) - - if _, err := tx.Exec(sessionQuery); err != nil { - return rollback(err) - } - buildResults := make([]*dmlBuildResult, 0, len(dmlEvents)) nArgs := 0 for _, dmlEvent := range dmlEvents { @@ -1248,6 +1247,9 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } } + // We batch together the DML queries into multi-statements to minimize network trips. + // We have to use the raw driver connection to access the rows affected + // for each statement in the multi-statement. execErr := conn.Raw(func(driverConn any) error { ex := driverConn.(driver.ExecerContext) nvc := driverConn.(driver.NamedValueChecker)