Skip to content

Commit 234e123

Browse files
committed
feat: gdb failover support
1 parent 81930fe commit 234e123

36 files changed

Lines changed: 1696 additions & 980 deletions

.github/workflows/integration_tests.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ on:
55
push:
66
branches:
77
- main
8-
- refactor/monitor-service
98
paths-ignore:
109
- "**/*.md"
1110
- "**/*.jpg"

common/lib/connection_plugin_chain_builder.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { CustomEndpointPluginFactory } from "./plugins/custom_endpoint/custom_en
4343
import { ConfigurationProfile } from "./profile/configuration_profile";
4444
import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory";
4545
import { BlueGreenPluginFactory } from "./plugins/bluegreen/blue_green_plugin_factory";
46+
import { GlobalDbFailoverPluginFactory } from "./plugins/gdb_failover/global_db_failover_plugin_factory";
4647
import { FullServicesContainer } from "./utils/full_services_container";
4748

4849
/*
@@ -66,6 +67,7 @@ export class ConnectionPluginChainBuilder {
6667
["readWriteSplitting", { factory: ReadWriteSplittingPluginFactory, weight: 600 }],
6768
["failover", { factory: FailoverPluginFactory, weight: 700 }],
6869
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
70+
["gdbFailover", { factory: GlobalDbFailoverPluginFactory, weight: 720 }],
6971
["efm", { factory: HostMonitoringPluginFactory, weight: 800 }],
7072
["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }],
7173
["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }],
@@ -87,6 +89,7 @@ export class ConnectionPluginChainBuilder {
8789
[ReadWriteSplittingPluginFactory, 600],
8890
[FailoverPluginFactory, 700],
8991
[Failover2PluginFactory, 710],
92+
[GlobalDbFailoverPluginFactory, 720],
9093
[HostMonitoringPluginFactory, 800],
9194
[HostMonitoring2PluginFactory, 810],
9295
[LimitlessConnectionPluginFactory, 950],

common/lib/database_dialect/database_dialect_codes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616

1717
export class DatabaseDialectCodes {
18+
static readonly GLOBAL_AURORA_MYSQL: string = "global-aurora-mysql";
1819
static readonly AURORA_MYSQL: string = "aurora-mysql";
1920
static readonly RDS_MYSQL: string = "rds-mysql";
2021
static readonly MYSQL: string = "mysql";
2122
// https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/multi-az-db-clusters-concepts.html
2223
static readonly RDS_MULTI_AZ_MYSQL: string = "rds-multi-az-mysql";
24+
static readonly GLOBAL_AURORA_PG: string = "global-aurora-pg";
2325
static readonly AURORA_PG: string = "aurora-pg";
2426
static readonly RDS_PG: string = "rds-pg";
2527
// https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/multi-az-db-clusters-concepts.html

common/lib/database_dialect/database_dialect_manager.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ export class DatabaseDialectManager implements DatabaseDialectProvider {
9595

9696
if (this.dbType === DatabaseType.MYSQL) {
9797
const type = this.rdsHelper.identifyRdsType(host);
98+
if (type == RdsUrlType.RDS_GLOBAL_WRITER_CLUSTER) {
99+
this.canUpdate = false;
100+
this.dialectCode = DatabaseDialectCodes.GLOBAL_AURORA_MYSQL;
101+
this.dialect = <DatabaseDialect>this.knownDialectsByCode.get(DatabaseDialectCodes.GLOBAL_AURORA_MYSQL);
102+
this.logCurrentDialect();
103+
return this.dialect;
104+
}
105+
98106
if (type.isRdsCluster) {
99107
this.canUpdate = true;
100108
this.dialectCode = DatabaseDialectCodes.AURORA_MYSQL;
@@ -128,6 +136,14 @@ export class DatabaseDialectManager implements DatabaseDialectProvider {
128136
return this.dialect;
129137
}
130138

139+
if (type == RdsUrlType.RDS_GLOBAL_WRITER_CLUSTER) {
140+
this.canUpdate = false;
141+
this.dialectCode = DatabaseDialectCodes.GLOBAL_AURORA_PG;
142+
this.dialect = <DatabaseDialect>this.knownDialectsByCode.get(DatabaseDialectCodes.GLOBAL_AURORA_PG);
143+
this.logCurrentDialect();
144+
return this.dialect;
145+
}
146+
131147
if (type.isRdsCluster) {
132148
this.canUpdate = true;
133149
this.dialectCode = DatabaseDialectCodes.AURORA_PG;

common/lib/host_list_provider/connection_string_host_list_provider.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,8 @@ export class ConnectionStringHostListProvider implements StaticHostListProvider
112112
getClusterId(): string {
113113
throw new AwsWrapperError("ConnectionStringHostListProvider does not support getClusterId.");
114114
}
115+
116+
forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
117+
throw new AwsWrapperError("ConnectionStringHostListProvider does not support forceMonitoringRefresh.");
118+
}
115119
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { RdsHostListProvider } from "./rds_host_list_provider";
18+
import { GlobalTopologyUtils } from "./global_topology_utils";
19+
import { FullServicesContainer } from "../utils/full_services_container";
20+
import { HostInfo } from "../host_info";
21+
import { WrapperProperties } from "../wrapper_property";
22+
import { ClusterTopologyMonitor, ClusterTopologyMonitorImpl } from "./monitoring/cluster_topology_monitor";
23+
import { GlobalAuroraTopologyMonitor } from "./monitoring/global_aurora_topology_monitor";
24+
import { MonitorInitializer } from "../utils/monitoring/monitor";
25+
import { ClientWrapper } from "../client_wrapper";
26+
import { DatabaseDialect } from "../database_dialect/database_dialect";
27+
28+
export class GlobalAuroraHostListProvider extends RdsHostListProvider {
29+
protected readonly globalTopologyUtils: GlobalTopologyUtils;
30+
protected instanceTemplatesByRegion: Map<string, HostInfo>;
31+
32+
constructor(properties: Map<string, any>, originalUrl: string, topologyUtils: GlobalTopologyUtils, servicesContainers: FullServicesContainer) {
33+
super(properties, originalUrl, topologyUtils, servicesContainers);
34+
this.globalTopologyUtils = topologyUtils;
35+
}
36+
37+
protected override initSettings(): void {
38+
super.initSettings();
39+
40+
const instanceTemplates = WrapperProperties.GLOBAL_CLUSTER_INSTANCE_HOST_PATTERNS.get(this.properties);
41+
this.instanceTemplatesByRegion = this.globalTopologyUtils.parseInstanceTemplates(
42+
instanceTemplates,
43+
(hostPattern: string) => this.validateHostPatternSetting(hostPattern),
44+
() => this.hostListProviderService.getHostInfoBuilder()
45+
);
46+
}
47+
48+
protected override async getOrCreateMonitor(): Promise<ClusterTopologyMonitor> {
49+
const initializer: MonitorInitializer = {
50+
createMonitor: (servicesContainer: FullServicesContainer): ClusterTopologyMonitor => {
51+
return new GlobalAuroraTopologyMonitor(
52+
servicesContainer,
53+
this.globalTopologyUtils,
54+
this.clusterId,
55+
this.initialHost,
56+
this.properties,
57+
this.clusterInstanceTemplate,
58+
this.refreshRateNano,
59+
this.highRefreshRateNano,
60+
this.instanceTemplatesByRegion
61+
);
62+
}
63+
};
64+
65+
return await this.servicesContainers
66+
.getMonitorService()
67+
.runIfAbsent(ClusterTopologyMonitorImpl, this.clusterId, this.servicesContainers, this.properties, initializer);
68+
}
69+
70+
override async getCurrentTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
71+
this.init();
72+
return await this.globalTopologyUtils.queryForTopologyWithRegion(targetClient, dialect, this.initialHost, this.instanceTemplatesByRegion);
73+
}
74+
}

common/lib/host_list_provider/global_topology_utils.ts

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@ import { HostInfo } from "../host_info";
2121
import { isDialectTopologyAware } from "../utils/utils";
2222
import { Messages } from "../utils/messages";
2323
import { AwsWrapperError } from "../utils/errors";
24+
import { HostInfoBuilder } from "../host_info_builder";
25+
import { logger } from "../../logutils";
2426

2527
export class GlobalTopologyUtils extends TopologyUtils {
26-
async queryForTopology(
28+
override async queryForTopology(
2729
targetClient: ClientWrapper,
2830
dialect: DatabaseDialect,
2931
initialHost: HostInfo,
3032
clusterInstanceTemplate: HostInfo
3133
): Promise<HostInfo[]> {
32-
throw new AwsWrapperError("Not implemented");
34+
// This method should not be called on this class.
35+
throw new AwsWrapperError("GlobalTopologyUtils.queryForTopology should not be called directly. Use queryForTopologyWithRegion instead.");
3336
}
3437

3538
async queryForTopologyWithRegion(
@@ -39,14 +42,61 @@ export class GlobalTopologyUtils extends TopologyUtils {
3942
instanceTemplateByRegion: Map<string, HostInfo>
4043
): Promise<HostInfo[]> {
4144
if (!isDialectTopologyAware(dialect)) {
42-
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
45+
throw new AwsWrapperError(Messages.get("RdsHostListProvider.incorrectDialect"));
4346
}
4447

4548
return await dialect
4649
.queryForTopology(targetClient)
4750
.then((res: TopologyQueryResult[]) => this.verifyWriter(this.createHostsWithTemplateMap(res, initialHost, instanceTemplateByRegion)));
4851
}
4952

53+
async getRegion(instanceId: string, targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<string | null> {
54+
if (!isDialectTopologyAware(dialect)) {
55+
throw new AwsWrapperError(Messages.get("RdsHostListProvider.incorrectDialect"));
56+
}
57+
58+
const results = await dialect.queryForTopology(targetClient);
59+
const match = results.find((row) => row.id === instanceId);
60+
return match?.awsRegion ?? null;
61+
}
62+
63+
parseInstanceTemplates(
64+
instanceTemplatesString: string | null,
65+
hostValidator: (hostPattern: string) => void,
66+
hostInfoBuilderFunc: () => HostInfoBuilder
67+
): Map<string, HostInfo> {
68+
if (!instanceTemplatesString) {
69+
throw new AwsWrapperError(Messages.get("GlobalTopologyUtils.globalClusterInstanceHostPatternsRequired"));
70+
}
71+
72+
const instanceTemplates = new Map<string, HostInfo>();
73+
const patterns = instanceTemplatesString.split(",");
74+
75+
for (const pattern of patterns) {
76+
const trimmedPattern = pattern.trim();
77+
const colonIndex = trimmedPattern.indexOf(":");
78+
if (colonIndex === -1) {
79+
throw new AwsWrapperError(Messages.get("GlobalTopologyUtils.invalidPatternFormat", trimmedPattern));
80+
}
81+
82+
const region = trimmedPattern.substring(0, colonIndex).trim();
83+
const hostPattern = trimmedPattern.substring(colonIndex + 1).trim();
84+
85+
if (!region || !hostPattern) {
86+
throw new AwsWrapperError(Messages.get("GlobalTopologyUtils.invalidPatternFormat", trimmedPattern));
87+
}
88+
89+
hostValidator(hostPattern);
90+
91+
const hostInfo = hostInfoBuilderFunc().withHost(hostPattern).build();
92+
instanceTemplates.set(region, hostInfo);
93+
}
94+
95+
logger.debug(`Detected Global Database patterns: ${JSON.stringify(Array.from(instanceTemplates.entries()))}`);
96+
97+
return instanceTemplates;
98+
}
99+
50100
private createHostsWithTemplateMap(
51101
topologyQueryResults: TopologyQueryResult[],
52102
initialHost: HostInfo,

common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
160160
this.stopMonitoring = true;
161161
this.hostMonitorsStop = true;
162162
this.requestToUpdateTopology = true;
163-
// await Promise.all(this.untrackedPromises);
164-
// await Promise.all(this.submittedHosts.values());
163+
await Promise.all(this.untrackedPromises);
164+
await Promise.all(this.submittedHosts.values());
165165

166166
const monitoringClientToClose = this.monitoringClient;
167167
const hostMonitorsWriterClientToClose = this.hostMonitorsWriterClient;
@@ -258,7 +258,7 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
258258
let client: ClientWrapper;
259259
try {
260260
client = await this.servicesContainer.getPluginService().forceConnect(this.initialHostInfo, this._monitoringProperties);
261-
} catch {
261+
} catch (connectError) {
262262
// Unable to connect to host;
263263
return null;
264264
}
@@ -374,7 +374,6 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
374374

375375
this.untrackedPromises = [];
376376
this.submittedHosts.clear();
377-
this.hostMonitors.clear();
378377

379378
return super.stop();
380379
}
@@ -417,8 +416,8 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
417416
);
418417
await minimalServiceContainer.getPluginManager().init();
419418
const hostMonitor = new HostMonitor(minimalServiceContainer, this, hostInfo, this.writerHostInfo);
420-
this.submittedHosts.set(hostInfo.host, hostMonitor.run());
421-
this.hostMonitors.set(hostInfo.host, hostMonitor);
419+
const promise = hostMonitor.run();
420+
this.submittedHosts.set(hostInfo.host, promise);
422421
}
423422
});
424423

@@ -451,7 +450,8 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
451450
hosts.forEach((hostInfo) => {
452451
if (!this.submittedHosts.get(hostInfo.host)) {
453452
const hostMonitor = new HostMonitor(this.servicesContainer, this, hostInfo, this.writerHostInfo);
454-
this.submittedHosts.set(hostInfo.host, hostMonitor.run());
453+
const promise = hostMonitor.run();
454+
this.submittedHosts.set(hostInfo.host, promise);
455455
}
456456
});
457457
}
@@ -697,7 +697,8 @@ export class HostMonitor {
697697
} else {
698698
// It might be some transient error. Let's try again.
699699
// If the error repeats, we will try again after a longer delay.
700-
await sleep(this.calculateBackoffWithJitter(this.connectionAttempts++));
700+
const backoff = this.calculateBackoffWithJitter(this.connectionAttempts++);
701+
await sleep(backoff);
701702
this.monitor.completedOneCycle.set(this.hostInfo.hostId, true);
702703
this.monitor.readerTopologiesById.delete(this.hostInfo.hostId);
703704
continue;
@@ -718,7 +719,8 @@ export class HostMonitor {
718719
if (isWriter) {
719720
try {
720721
// First connection after failover may be stale.
721-
if ((await this.monitor.pluginService.getHostRole(this.client)) !== HostRole.WRITER) {
722+
const hostRole = await this.monitor.pluginService.getHostRole(this.client);
723+
if (hostRole !== HostRole.WRITER) {
722724
isWriter = false;
723725
}
724726
} catch (error: any) {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { ClusterTopologyMonitorImpl } from "./cluster_topology_monitor";
18+
import { GlobalTopologyUtils } from "../global_topology_utils";
19+
import { FullServicesContainer } from "../../utils/full_services_container";
20+
import { HostInfo } from "../../host_info";
21+
import { ClientWrapper } from "../../client_wrapper";
22+
import { AwsWrapperError } from "../../utils/errors";
23+
import { Messages } from "../../utils/messages";
24+
25+
export class GlobalAuroraTopologyMonitor extends ClusterTopologyMonitorImpl {
26+
protected readonly instanceTemplatesByRegion: Map<string, HostInfo>;
27+
declare public readonly topologyUtils: GlobalTopologyUtils;
28+
29+
constructor(
30+
servicesContainer: FullServicesContainer,
31+
topologyUtils: GlobalTopologyUtils,
32+
clusterId: string,
33+
initialHostInfo: HostInfo,
34+
properties: Map<string, any>,
35+
instanceTemplate: HostInfo,
36+
refreshRateNano: number,
37+
highRefreshRateNano: number,
38+
instanceTemplatesByRegion: Map<string, HostInfo>
39+
) {
40+
super(servicesContainer, topologyUtils, clusterId, initialHostInfo, properties, instanceTemplate, refreshRateNano, highRefreshRateNano);
41+
42+
this.instanceTemplatesByRegion = instanceTemplatesByRegion;
43+
this.topologyUtils = topologyUtils;
44+
}
45+
46+
protected override async getInstanceTemplate(hostId: string, targetClient: ClientWrapper): Promise<HostInfo> {
47+
const dialect = this.hostListProviderService.getDialect();
48+
const region = await this.topologyUtils.getRegion(hostId, targetClient, dialect);
49+
50+
if (region) {
51+
const instanceTemplate = this.instanceTemplatesByRegion.get(region);
52+
if (!instanceTemplate) {
53+
throw new AwsWrapperError(Messages.get("GlobalAuroraTopologyMonitor.cannotFindRegionTemplate", region));
54+
}
55+
return instanceTemplate;
56+
}
57+
58+
return this.instanceTemplate;
59+
}
60+
61+
override async queryForTopology(client: ClientWrapper): Promise<HostInfo[]> {
62+
const dialect = this.hostListProviderService.getDialect();
63+
return await this.topologyUtils.queryForTopologyWithRegion(client, dialect, this.initialHostInfo, this.instanceTemplatesByRegion);
64+
}
65+
}

common/lib/host_selector.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ import { HostInfo } from "./host_info";
1818
import { HostRole } from "./host_role";
1919

2020
export interface HostSelector {
21-
getHost(hosts: HostInfo[], role: HostRole, props?: Map<string, any>): HostInfo;
21+
getHost(hosts: HostInfo[], role: HostRole | null, props?: Map<string, any>): HostInfo;
2222
}

0 commit comments

Comments
 (0)