Skip to content

Commit 8bc63f0

Browse files
authored
Fix abort/retry interaction (#1655)
* Add failing test for retry + abort issue.
* Fix retry after abort issue.
* Skip retries for warning errors: warning errors indicate data consistency issues that won't resolve on retry, so attempting to retry them is futile and causes unnecessary delays. This change detects warning errors early and aborts immediately instead of retrying.
* Fix test expectation.
1 parent 8f274f7 commit 8bc63f0

File tree

3 files changed

+234
-1
lines changed

3 files changed

+234
-1
lines changed

go/logic/migrator.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,21 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
160160
// sleep after previous iteration
161161
RetrySleepFn(1 * time.Second)
162162
}
163+
// Check for abort/context cancellation before each retry
164+
if abortErr := this.checkAbort(); abortErr != nil {
165+
return abortErr
166+
}
163167
err = operation()
164168
if err == nil {
165169
return nil
166170
}
171+
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
172+
if strings.Contains(err.Error(), "warnings detected") {
173+
if len(notFatalHint) == 0 {
174+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
175+
}
176+
return err
177+
}
167178
// there's an error. Let's try again.
168179
}
169180
if len(notFatalHint) == 0 {
@@ -190,10 +201,21 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
190201
if i != 0 {
191202
RetrySleepFn(time.Duration(interval) * time.Second)
192203
}
204+
// Check for abort/context cancellation before each retry
205+
if abortErr := this.checkAbort(); abortErr != nil {
206+
return abortErr
207+
}
193208
err = operation()
194209
if err == nil {
195210
return nil
196211
}
212+
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
213+
if strings.Contains(err.Error(), "warnings detected") {
214+
if len(notFatalHint) == 0 {
215+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
216+
}
217+
return err
218+
}
197219
}
198220
if len(notFatalHint) == 0 {
199221
// Use helper to prevent deadlock if listenOnPanicAbort already exited

go/logic/migrator_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,118 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
714714
assert.Equal(t, tries, 100)
715715
}
716716

717+
func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) {
718+
oldRetrySleepFn := RetrySleepFn
719+
defer func() { RetrySleepFn = oldRetrySleepFn }()
720+
721+
migrationContext := base.NewMigrationContext()
722+
migrationContext.SetDefaultNumRetries(100)
723+
migrator := NewMigrator(migrationContext, "1.2.3")
724+
725+
RetrySleepFn = func(duration time.Duration) {
726+
// No sleep needed for this test
727+
}
728+
729+
var tries = 0
730+
retryable := func() error {
731+
tries++
732+
if tries == 5 {
733+
// Cancel context on 5th try
734+
migrationContext.CancelContext()
735+
}
736+
return errors.New("Simulated error")
737+
}
738+
739+
result := migrator.retryOperation(retryable, false)
740+
assert.Error(t, result)
741+
// Should abort after 6 tries: 5 failures + 1 checkAbort detection
742+
assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
743+
// Verify we got context cancellation error
744+
assert.Contains(t, result.Error(), "context canceled")
745+
}
746+
747+
func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) {
748+
oldRetrySleepFn := RetrySleepFn
749+
defer func() { RetrySleepFn = oldRetrySleepFn }()
750+
751+
migrationContext := base.NewMigrationContext()
752+
migrationContext.SetDefaultNumRetries(100)
753+
migrationContext.SetExponentialBackoffMaxInterval(42)
754+
migrator := NewMigrator(migrationContext, "1.2.3")
755+
756+
RetrySleepFn = func(duration time.Duration) {
757+
// No sleep needed for this test
758+
}
759+
760+
var tries = 0
761+
retryable := func() error {
762+
tries++
763+
if tries == 5 {
764+
// Cancel context on 5th try
765+
migrationContext.CancelContext()
766+
}
767+
return errors.New("Simulated error")
768+
}
769+
770+
result := migrator.retryOperationWithExponentialBackoff(retryable, false)
771+
assert.Error(t, result)
772+
// Should abort after 6 tries: 5 failures + 1 checkAbort detection
773+
assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
774+
// Verify we got context cancellation error
775+
assert.Contains(t, result.Error(), "context canceled")
776+
}
777+
778+
func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) {
779+
oldRetrySleepFn := RetrySleepFn
780+
defer func() { RetrySleepFn = oldRetrySleepFn }()
781+
782+
migrationContext := base.NewMigrationContext()
783+
migrationContext.SetDefaultNumRetries(100)
784+
migrator := NewMigrator(migrationContext, "1.2.3")
785+
786+
RetrySleepFn = func(duration time.Duration) {
787+
t.Fatal("Should not sleep/retry for warning errors")
788+
}
789+
790+
var tries = 0
791+
retryable := func() error {
792+
tries++
793+
return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
794+
}
795+
796+
result := migrator.retryOperation(retryable, false)
797+
assert.Error(t, result)
798+
// Should only try once - no retries for warnings
799+
assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
800+
assert.Contains(t, result.Error(), "warnings detected")
801+
}
802+
803+
func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) {
804+
oldRetrySleepFn := RetrySleepFn
805+
defer func() { RetrySleepFn = oldRetrySleepFn }()
806+
807+
migrationContext := base.NewMigrationContext()
808+
migrationContext.SetDefaultNumRetries(100)
809+
migrationContext.SetExponentialBackoffMaxInterval(42)
810+
migrator := NewMigrator(migrationContext, "1.2.3")
811+
812+
RetrySleepFn = func(duration time.Duration) {
813+
t.Fatal("Should not sleep/retry for warning errors")
814+
}
815+
816+
var tries = 0
817+
retryable := func() error {
818+
tries++
819+
return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
820+
}
821+
822+
result := migrator.retryOperationWithExponentialBackoff(retryable, false)
823+
assert.Error(t, result)
824+
// Should only try once - no retries for warnings
825+
assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
826+
assert.Contains(t, result.Error(), "warnings detected")
827+
}
828+
717829
func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
718830
ctx := context.Background()
719831

@@ -1210,3 +1322,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) {
12101322
t.Fatal("Expected checkAbort to return error when context is cancelled")
12111323
}
12121324
}
1325+
1326+
func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() {
1327+
ctx := context.Background()
1328+
1329+
// Create table with email column (no unique constraint initially)
1330+
_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName()))
1331+
suite.Require().NoError(err)
1332+
1333+
// Insert initial rows with unique email values - passes pre-flight validation
1334+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
1335+
suite.Require().NoError(err)
1336+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName()))
1337+
suite.Require().NoError(err)
1338+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName()))
1339+
suite.Require().NoError(err)
1340+
1341+
// Verify we have 3 rows
1342+
var count int
1343+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1344+
suite.Require().NoError(err)
1345+
suite.Require().Equal(3, count)
1346+
1347+
// Create postpone flag file
1348+
tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test")
1349+
suite.Require().NoError(err)
1350+
defer os.RemoveAll(tmpDir)
1351+
postponeFlagFile := filepath.Join(tmpDir, "postpone.flag")
1352+
err = os.WriteFile(postponeFlagFile, []byte{}, 0644)
1353+
suite.Require().NoError(err)
1354+
1355+
// Start migration in goroutine
1356+
done := make(chan error, 1)
1357+
go func() {
1358+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
1359+
if err != nil {
1360+
done <- err
1361+
return
1362+
}
1363+
1364+
migrationContext := newTestMigrationContext()
1365+
migrationContext.ApplierConnectionConfig = connectionConfig
1366+
migrationContext.InspectorConnectionConfig = connectionConfig
1367+
migrationContext.SetConnectionConfig("innodb")
1368+
migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)"
1369+
migrationContext.HeartbeatIntervalMilliseconds = 100
1370+
migrationContext.PostponeCutOverFlagFile = postponeFlagFile
1371+
migrationContext.PanicOnWarnings = true
1372+
1373+
// High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted
1374+
migrationContext.SetDefaultNumRetries(30)
1375+
migrationContext.CutOverExponentialBackoff = true
1376+
migrationContext.SetExponentialBackoffMaxInterval(128)
1377+
1378+
migrator := NewMigrator(migrationContext, "0.0.0")
1379+
1380+
//nolint:contextcheck
1381+
done <- migrator.Migrate()
1382+
}()
1383+
1384+
// Wait for migration to reach postponed state
1385+
// TODO replace this with an actual check for postponed state
1386+
time.Sleep(3 * time.Second)
1387+
1388+
// Now insert a duplicate email value while migration is postponed
1389+
// This simulates data arriving during migration that would violate the unique constraint
1390+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
1391+
suite.Require().NoError(err)
1392+
1393+
// Verify we now have 4 rows (including the duplicate)
1394+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1395+
suite.Require().NoError(err)
1396+
suite.Require().Equal(4, count)
1397+
1398+
// Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate
1399+
err = os.Remove(postponeFlagFile)
1400+
suite.Require().NoError(err)
1401+
1402+
// Wait for Migrate() to return - with timeout to detect if it hangs
1403+
select {
1404+
case migrateErr := <-done:
1405+
// Success - Migrate() returned
1406+
// It should return an error due to the duplicate
1407+
suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation")
1408+
suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry")
1409+
case <-time.After(5 * time.Minute):
1410+
suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop")
1411+
}
1412+
1413+
// Verify all 4 rows are still in the original table (no silent data loss)
1414+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1415+
suite.Require().NoError(err)
1416+
suite.Require().Equal(4, count, "Original table should still have all 4 rows")
1417+
1418+
// Verify both user1@example.com entries still exist
1419+
var duplicateCount int
1420+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount)
1421+
suite.Require().NoError(err)
1422+
suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries")
1423+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ERROR warnings detected in statement 1 of 1
1+
ERROR warnings detected in statement

0 commit comments

Comments (0)