Skip to content

Commit 077056c

Browse files
authored
feat: GDB RW Splitting (#626)
1 parent fc51880 commit 077056c

34 files changed

Lines changed: 495 additions & 155 deletions

.github/workflows/integration_tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ jobs:
119119
with:
120120
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_DEPLOY_ROLE }}
121121
role-session-name: nodejs_int_latest_tests
122+
role-duration-seconds: 21600
122123
aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
123124
output-credentials: true
124125

common/lib/connection_plugin_chain_builder.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_pl
4545
import { BlueGreenPluginFactory } from "./plugins/bluegreen/blue_green_plugin_factory";
4646
import { GlobalDbFailoverPluginFactory } from "./plugins/gdb_failover/global_db_failover_plugin_factory";
4747
import { FullServicesContainer } from "./utils/full_services_container";
48+
import { GdbReadWriteSplittingPluginFactory } from "./plugins/read_write_splitting/gdb_read_write_splitting_plugin_factory";
4849

4950
/*
5051
Type alias used for plugin factory sorting. It holds a reference to a plugin
@@ -65,6 +66,7 @@ export class ConnectionPluginChainBuilder {
6566
["staleDns", { factory: StaleDnsPluginFactory, weight: 500 }],
6667
["bg", { factory: BlueGreenPluginFactory, weight: 550 }],
6768
["readWriteSplitting", { factory: ReadWriteSplittingPluginFactory, weight: 600 }],
69+
["gdbReadWriteSplitting", { factory: GdbReadWriteSplittingPluginFactory, weight: 610 }],
6870
["failover", { factory: FailoverPluginFactory, weight: 700 }],
6971
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
7072
["gdbFailover", { factory: GlobalDbFailoverPluginFactory, weight: 720 }],
@@ -87,6 +89,7 @@ export class ConnectionPluginChainBuilder {
8789
[StaleDnsPluginFactory, 500],
8890
[BlueGreenPluginFactory, 550],
8991
[ReadWriteSplittingPluginFactory, 600],
92+
[GdbReadWriteSplittingPluginFactory, 610],
9093
[FailoverPluginFactory, 700],
9194
[Failover2PluginFactory, 710],
9295
[GlobalDbFailoverPluginFactory, 720],

common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
413413
if (writerClient && writerClientHostInfo) {
414414
logger.debug(Messages.get("ClusterTopologyMonitor.writerPickedUpFromHostMonitors", writerClientHostInfo.toString()));
415415

416+
const oldMonitoringClient = this.monitoringClient;
416417
this.monitoringClient = writerClient;
417418
this.writerHostInfo = writerClientHostInfo;
418419
this.isVerifiedWriterConnection = true;
@@ -425,6 +426,11 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
425426
this.readerTopologiesById.clear();
426427
this.completedOneCycle.clear();
427428

429+
// Close the old monitoring client that was replaced by the new writer client.
430+
if (oldMonitoringClient && oldMonitoringClient !== this.monitoringClient) {
431+
await this.closeConnection(oldMonitoringClient);
432+
}
433+
428434
await this.delay(true);
429435
continue;
430436
} else {

common/lib/host_list_provider/rds_host_list_provider.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,21 @@ export class RdsHostListProvider implements DynamicHostListProvider {
144144

145145
if (!this.pluginService.isDialectConfirmed()) {
146146
// We need to confirm the dialect before creating a topology monitor so that it uses the correct SQL queries.
147-
// We will return the original hosts parsed from the connections string until the dialect has been confirmed.
147+
// Return the original hosts parsed from the connection string.
148148
return this.initialHostList;
149149
}
150150

151-
return await this.forceRefreshMonitor(verifyTopology, timeoutMs);
151+
const hosts = await this.forceRefreshMonitor(verifyTopology, timeoutMs);
152+
if (hosts && hosts.length > 0) {
153+
return hosts;
154+
}
155+
156+
// Check for cached topology as a fallback.
157+
const storedTopology = this.getStoredTopology();
158+
if (storedTopology && storedTopology.length > 0) {
159+
return storedTopology;
160+
}
161+
return this.initialHostList;
152162
}
153163

154164
async getHostRole(client: ClientWrapper, _dialect: DatabaseDialect): Promise<HostRole> {
@@ -236,6 +246,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
236246
}
237247

238248
async getCurrentTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
249+
this.init();
239250
return await this.topologyUtils.queryForTopology(targetClient, dialect, this.initialHost, this.clusterInstanceTemplate);
240251
}
241252

common/lib/partial_plugin_service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ export class PartialPluginService implements PluginService, HostListProviderServ
264264
}
265265

266266
isDialectConfirmed(): boolean {
267-
throw new AwsWrapperError(Messages.get("PartialPluginService.unexpectedMethodCall", "isDialectConfirmed"));
267+
return true;
268268
}
269269

270270
setInTransaction(inTransaction: boolean): void {

common/lib/pg_client_wrapper.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class PgClientWrapper implements ClientWrapper {
6060

6161
async abort(): Promise<void> {
6262
try {
63-
return await ClientUtils.queryWithTimeout(this.end(), this.properties);
63+
this.client?.connection?.stream?.destroy();
6464
} catch (error: any) {
6565
// Ignore
6666
}

common/lib/plugin_service.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,28 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
281281
return this.dialect;
282282
}
283283

284+
private static readonly DIALECT_CONFIRMED_STATUS_KEY = "DialectConfirmed";
285+
286+
private getDialectConfirmedCacheKey(): string {
287+
let clusterId = WrapperProperties.CLUSTER_ID.defaultValue;
288+
try {
289+
clusterId = this._hostListProvider?.getClusterId() ?? WrapperProperties.CLUSTER_ID.defaultValue;
290+
} catch (e) {
291+
// May fail if the host list provider does not support getClusterId. In this case use the default value.
292+
}
293+
return `${clusterId}::${PluginServiceImpl.DIALECT_CONFIRMED_STATUS_KEY}`;
294+
}
295+
284296
isDialectConfirmed(): boolean {
297+
if (this._isDialectConfirmed) {
298+
return true;
299+
}
300+
301+
const cacheItem = this.storageService.get(StatusCacheItem, this.getDialectConfirmedCacheKey());
302+
if (cacheItem && cacheItem.status === true) {
303+
this._isDialectConfirmed = true;
304+
}
305+
285306
return this._isDialectConfirmed;
286307
}
287308

@@ -634,6 +655,8 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
634655
this.dialect = await this.dbDialectProvider.getDialectForUpdate(targetClient, this.initialHost, this.props.get(WrapperProperties.HOST.name));
635656

636657
this._isDialectConfirmed = true;
658+
this.storageService.set(this.getDialectConfirmedCacheKey(), new StatusCacheItem(true));
659+
637660
if (originalDialect === this.dialect) {
638661
return;
639662
}

common/lib/plugins/bluegreen/blue_green_plugin.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ import { IamAuthenticationPlugin } from "../../authentication/iam_authentication
2727
import { BlueGreenRole } from "./blue_green_role";
2828
import { ExecuteRouting, RoutingResultHolder } from "./routing/execute_routing";
2929
import { CanReleaseResources } from "../../can_release_resources";
30+
import { FullServicesContainer } from "../../utils/full_services_container";
3031

3132
export interface BlueGreenProviderSupplier {
32-
create(pluginService: PluginService, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider;
33+
create(servicesContainer: FullServicesContainer, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider;
3334
}
3435

3536
export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
@@ -42,6 +43,7 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele
4243

4344
private static readonly CLOSED_METHOD_NAMES: Set<string> = new Set(["end", "abort"]);
4445
protected readonly pluginService: PluginService;
46+
protected readonly servicesContainer: FullServicesContainer;
4547
protected readonly properties: Map<string, any>;
4648
protected bgProviderSupplier: BlueGreenProviderSupplier;
4749
protected bgStatus: BlueGreenStatus = null;
@@ -53,18 +55,19 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele
5355
protected endTimeNano: bigint = BigInt(0);
5456
private static provider: Map<string, BlueGreenStatusProvider> = new Map();
5557

56-
constructor(pluginService: PluginService, properties: Map<string, any>, bgProviderSupplier: BlueGreenProviderSupplier = null) {
58+
constructor(servicesContainer: FullServicesContainer, properties: Map<string, any>, bgProviderSupplier: BlueGreenProviderSupplier = null) {
5759
super();
5860
if (!bgProviderSupplier) {
5961
bgProviderSupplier = {
60-
create: (pluginService: PluginService, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider => {
61-
return new BlueGreenStatusProvider(pluginService, props, bgdId);
62+
create: (servicesContainer: FullServicesContainer, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider => {
63+
return new BlueGreenStatusProvider(servicesContainer, props, bgdId);
6264
}
6365
};
6466
}
6567

6668
this.properties = properties;
67-
this.pluginService = pluginService;
69+
this.servicesContainer = servicesContainer;
70+
this.pluginService = servicesContainer.pluginService;
6871
this.bgProviderSupplier = bgProviderSupplier;
6972
this.bgdId = WrapperProperties.BGD_ID.get(this.properties).trim().toLowerCase();
7073
}
@@ -215,7 +218,7 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele
215218
private initProvider() {
216219
const provider = BlueGreenPlugin.provider.get(this.bgdId);
217220
if (!provider) {
218-
const provider = this.bgProviderSupplier.create(this.pluginService, this.properties, this.bgdId);
221+
const provider = this.bgProviderSupplier.create(this.servicesContainer, this.properties, this.bgdId);
219222
BlueGreenPlugin.provider.set(this.bgdId, provider);
220223
}
221224
}

common/lib/plugins/bluegreen/blue_green_plugin_factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export class BlueGreenPluginFactory extends ConnectionPluginFactory {
2828
if (!BlueGreenPluginFactory.blueGreenPlugin) {
2929
BlueGreenPluginFactory.blueGreenPlugin = await import("./blue_green_plugin");
3030
}
31-
return new BlueGreenPluginFactory.blueGreenPlugin.BlueGreenPlugin(servicesContainer.pluginService, props);
31+
return new BlueGreenPluginFactory.blueGreenPlugin.BlueGreenPlugin(servicesContainer, props);
3232
} catch (error: any) {
3333
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "BlueGreenPluginFactory"));
3434
}

common/lib/plugins/failover/failover_plugin.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { ClientWrapper } from "../../client_wrapper";
4343
import { getWriter, logTopology } from "../../utils/utils";
4444
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
4545
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
46+
import { FullServicesContainer } from "../../utils/full_services_container";
4647

4748
export class FailoverPlugin extends AbstractConnectionPlugin {
4849
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
@@ -79,34 +80,36 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
7980

8081
private hostListProviderService?: HostListProviderService;
8182
private readonly pluginService: PluginService;
83+
private readonly servicesContainer: FullServicesContainer;
8284
protected enableFailoverSetting: boolean = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.defaultValue;
8385

84-
constructor(pluginService: PluginService, properties: Map<string, any>, rdsHelper: RdsUtils);
86+
constructor(servicesContainer: FullServicesContainer, properties: Map<string, any>, rdsHelper: RdsUtils);
8587
constructor(
86-
pluginService: PluginService,
88+
servicesContainer: FullServicesContainer,
8789
properties: Map<string, any>,
8890
rdsHelper: RdsUtils,
8991
readerFailoverHandler: ClusterAwareReaderFailoverHandler,
9092
writerFailoverHandler: ClusterAwareWriterFailoverHandler
9193
);
9294
constructor(
93-
pluginService: PluginService,
95+
servicesContainer: FullServicesContainer,
9496
properties: Map<string, any>,
9597
rdsHelper: RdsUtils,
9698
readerFailoverHandler?: ClusterAwareReaderFailoverHandler,
9799
writerFailoverHandler?: ClusterAwareWriterFailoverHandler
98100
) {
99101
super();
100102
this._properties = properties;
101-
this.pluginService = pluginService;
103+
this.pluginService = servicesContainer.pluginService;
104+
this.servicesContainer = servicesContainer;
102105
this._rdsHelper = rdsHelper;
103106

104107
this.initSettings();
105108

106109
this._readerFailoverHandler = readerFailoverHandler
107110
? readerFailoverHandler
108111
: new ClusterAwareReaderFailoverHandler(
109-
pluginService,
112+
this.pluginService,
110113
properties,
111114
this.failoverTimeoutMsSetting,
112115
this.failoverReaderConnectTimeoutMsSetting,
@@ -115,7 +118,8 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
115118
this._writerFailoverHandler = writerFailoverHandler
116119
? writerFailoverHandler
117120
: new ClusterAwareWriterFailoverHandler(
118-
pluginService,
121+
this.pluginService,
122+
this.servicesContainer,
119123
this._readerFailoverHandler,
120124
properties,
121125
this.failoverTimeoutMsSetting,

0 commit comments

Comments
 (0)