Skip to content

Commit e117401

Browse files
authored
chore: storage service refactoring (#611)
1 parent f913149 commit e117401

13 files changed

Lines changed: 668 additions & 41 deletions

.github/workflows/integration_tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ on:
1414
- "docs/**"
1515
- "ISSUE_TEMPLATE/**"
1616
- "**/remove-old-artifacts.yml"
17+
pull_request:
18+
branches:
19+
- dev/v3
1720

1821
permissions:
1922
id-token: write # This is required for requesting the JWT

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ on:
44
push:
55
branches:
66
- main
7-
- dev/v3
87
pull_request:
98
branches:
109
- "*"
10+
- dev/v3
1111

1212
permissions:
1313
contents: read

common/lib/aws_client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ import { Messages } from "./utils/messages";
3535
import { HostListProviderService } from "./host_list_provider_service";
3636
import { SessionStateClient } from "./session_state_client";
3737
import { DriverConnectionProvider } from "./driver_connection_provider";
38+
import { StorageService } from "./utils/storage/storage_service";
39+
import { CoreServicesContainer } from "./utils/core_services_container";
3840

3941
const { EventEmitter } = pkgStream;
4042

4143
export abstract class AwsClient extends EventEmitter implements SessionStateClient {
4244
private _defaultPort: number = -1;
45+
private readonly storageService: StorageService;
4346
protected telemetryFactory: TelemetryFactory;
4447
protected pluginManager: PluginManager;
4548
protected pluginService: PluginService;
@@ -64,6 +67,8 @@ export abstract class AwsClient extends EventEmitter implements SessionStateClie
6467

6568
this.properties = new Map<string, any>(Object.entries(config));
6669

70+
this.storageService = CoreServicesContainer.getInstance().getStorageService();
71+
6772
const profileName = WrapperProperties.PROFILE_NAME.get(this.properties);
6873
if (profileName && profileName.length > 0) {
6974
this._configurationProfile = DriverConfigurationProfiles.getProfileConfiguration(profileName);

common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import { ClientWrapper } from "../../client_wrapper";
2525
import { AwsWrapperError } from "../../utils/errors";
2626
import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";
2727
import { Messages } from "../../utils/messages";
28+
import { CoreServicesContainer } from "../../utils/core_services_container";
29+
import { Topology } from "../topology";
30+
import { StorageService, StorageServiceImpl } from "../../utils/storage/storage_service";
2831

2932
export interface ClusterTopologyMonitor {
3033
forceRefresh(client: ClientWrapper, timeoutMs: number): Promise<HostInfo[]>;
@@ -45,8 +48,8 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
4548
private readonly _hostListProvider: MonitoringRdsHostListProvider;
4649
private readonly refreshRateMs: number;
4750
private readonly highRefreshRateMs: number;
51+
private readonly storageService: StorageService;
4852

49-
private topologyMap: CacheMap<string, HostInfo[]>;
5053
private writerHostInfo: HostInfo = null;
5154
private isVerifiedWriterConnection: boolean = false;
5255
private monitoringClient: ClientWrapper = null;
@@ -72,7 +75,6 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
7275

7376
constructor(
7477
clusterId: string,
75-
topologyMap: CacheMap<string, HostInfo[]>,
7678
initialHostInfo: HostInfo,
7779
props: Map<string, any>,
7880
pluginService: PluginService,
@@ -81,7 +83,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
8183
highRefreshRateMs: number
8284
) {
8385
this.clusterId = clusterId;
84-
this.topologyMap = topologyMap;
86+
this.storageService = CoreServicesContainer.getInstance().getStorageService(); // TODO: store serviceContainer instead
8587
this.initialHostInfo = initialHostInfo;
8688
this._pluginService = pluginService;
8789
this._hostListProvider = hostListProvider;
@@ -125,7 +127,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
125127
async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[] | null> {
126128
if (Date.now() < this.ignoreNewTopologyRequestsEndTimeMs) {
127129
// Previous failover has just completed, use results without triggering new update.
128-
const currentHosts = this.topologyMap.get(this.clusterId);
130+
const currentHosts = this.getStoredHosts();
129131
if (currentHosts !== null) {
130132
logger.info(Messages.get("ClusterTopologyMonitoring.ignoringNewTopologyRequest"));
131133
return currentHosts;
@@ -158,7 +160,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
158160
// Signal to any monitor that might be in delay, that topology should be updated.
159161
this.requestToUpdateTopology = true;
160162

161-
const currentHosts: HostInfo[] = this.topologyMap.get(this.clusterId);
163+
const currentHosts: HostInfo[] = this.getStoredHosts();
162164

163165
if (timeoutMs === 0) {
164166
logger.info(logTopology(currentHosts, Messages.get("ClusterTopologyMonitoring.timeoutSetToZero")));
@@ -168,7 +170,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
168170
const endTime = Date.now() + timeoutMs;
169171
let latestHosts: HostInfo[];
170172

171-
while ((latestHosts = this.topologyMap.get(this.clusterId)) === currentHosts && Date.now() < endTime) {
173+
while ((latestHosts = this.getStoredHosts()) === currentHosts && Date.now() < endTime) {
172174
await sleep(1000);
173175
}
174176

@@ -244,7 +246,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
244246
}
245247

246248
updateTopologyCache(hosts: HostInfo[]): void {
247-
this.topologyMap.put(this.clusterId, hosts, ClusterTopologyMonitorImpl.TOPOLOGY_CACHE_EXPIRATION_NANOS);
249+
this.storageService.set(this.clusterId, new Topology(hosts));
248250
this.requestToUpdateTopology = false;
249251
}
250252

@@ -296,7 +298,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
296298
this.hostMonitorsLatestTopology = [];
297299

298300
// Use any client to gather topology information.
299-
let hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
301+
let hosts: HostInfo[] = this.getStoredHosts();
300302
if (!hosts) {
301303
hosts = await this.openAnyClientAndUpdateTopology();
302304
}
@@ -392,6 +394,11 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
392394
}
393395
}
394396

397+
private getStoredHosts(): HostInfo[] | null {
398+
const topology = this.storageService.get(Topology, this.clusterId);
399+
return topology == null ? null : topology.hosts;
400+
}
401+
395402
private async delay(useHighRefreshRate: boolean) {
396403
if (Date.now() < this.highRefreshRateEndTimeMs) {
397404
useHighRefreshRate = true;
@@ -404,7 +411,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
404411
}
405412

406413
logTopology(msgPrefix: string) {
407-
const hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
414+
const hosts: HostInfo[] = this.getStoredHosts();
408415
if (hosts && hosts.length !== 0) {
409416
logger.debug(logTopology(hosts, msgPrefix));
410417
}

common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
8989
() =>
9090
new ClusterTopologyMonitorImpl(
9191
this.clusterId,
92-
MonitoringRdsHostListProvider.topologyCache,
9392
this.initialHost,
9493
this.properties,
9594
this.pluginService,

common/lib/host_list_provider/rds_host_list_provider.ts

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,15 @@ import { CacheMap } from "../utils/cache_map";
3030
import { isDialectTopologyAware, logTopology } from "../utils/utils";
3131
import { DatabaseDialect } from "../database_dialect/database_dialect";
3232
import { ClientWrapper } from "../client_wrapper";
33+
import { CoreServicesContainer } from "../utils/core_services_container";
34+
import { StorageService } from "../utils/storage/storage_service";
35+
import { Topology } from "./topology";
36+
import { ExpirationCache } from "../utils/storage/expiration_cache";
3337

3438
export class RdsHostListProvider implements DynamicHostListProvider {
3539
private readonly originalUrl: string;
3640
private readonly rdsHelper: RdsUtils;
41+
private readonly storageService: StorageService;
3742
protected readonly properties: Map<string, any>;
3843
private rdsUrlType: RdsUrlType;
3944
private initialHostList: HostInfo[];
@@ -45,9 +50,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
4550
protected readonly hostListProviderService: HostListProviderService;
4651

4752
public static readonly suggestedPrimaryClusterIdCache: CacheMap<string, string> = new CacheMap<string, string>();
48-
4953
public static readonly primaryClusterIdCache: CacheMap<string, boolean> = new CacheMap<string, boolean>();
50-
public static readonly topologyCache: CacheMap<string, HostInfo[]> = new CacheMap<string, HostInfo[]>();
5154
public clusterId: string = Date.now().toString();
5255
public isInitialized: boolean = false;
5356
public isPrimaryClusterId?: boolean;
@@ -59,6 +62,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
5962
this.connectionUrlParser = hostListProviderService.getConnectionUrlParser();
6063
this.originalUrl = originalUrl;
6164
this.properties = properties;
65+
this.storageService = CoreServicesContainer.getInstance().getStorageService(); // TODO: store the service container instead.
6266

6367
let port = WrapperProperties.PORT.get(properties);
6468
if (port == null) {
@@ -197,7 +201,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
197201
this.isPrimaryClusterId = true;
198202
}
199203

200-
const cachedHosts: HostInfo[] | null = RdsHostListProvider.topologyCache.get(this.clusterId);
204+
const cachedHosts: HostInfo[] | null = this.getStoredTopology();
201205

202206
// This clusterId is a primary one and is about to create a new entry in the cache.
203207
// When a primary entry is created it needs to be suggested for other (non-primary) entries.
@@ -211,7 +215,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
211215

212216
const hosts = await this.queryForTopology(targetClient, this.hostListProviderService.getDialect());
213217
if (hosts && hosts.length > 0) {
214-
RdsHostListProvider.topologyCache.put(this.clusterId, hosts, this.refreshRateNano);
218+
this.storageService.set(this.clusterId, new Topology(hosts));
215219
if (needToSuggest) {
216220
this.suggestPrimaryCluster(hosts);
217221
}
@@ -227,14 +231,18 @@ export class RdsHostListProvider implements DynamicHostListProvider {
227231
}
228232

229233
private getSuggestedClusterId(hostAndPort: string): ClusterSuggestedResult | null {
230-
for (const [key, hosts] of RdsHostListProvider.topologyCache.getEntries()) {
234+
const cache: ExpirationCache<string, Topology> = this.storageService.getAll(Topology) as ExpirationCache<string, Topology>;
235+
if (!cache) {
236+
return null;
237+
}
238+
for (const [key, hosts] of cache.getEntries()) {
231239
const isPrimaryCluster: boolean = RdsHostListProvider.primaryClusterIdCache.get(key, false, this.suggestedClusterIdRefreshRateNano) ?? false;
232240
if (key === hostAndPort) {
233241
return new ClusterSuggestedResult(hostAndPort, isPrimaryCluster);
234242
}
235243

236244
if (hosts) {
237-
for (const hostInfo of hosts) {
245+
for (const hostInfo of hosts.hosts) {
238246
if (hostInfo.hostAndPort === hostAndPort) {
239247
logger.debug(Messages.get("RdsHostListProvider.suggestedClusterId", key, hostAndPort));
240248
return new ClusterSuggestedResult(key, isPrimaryCluster);
@@ -255,7 +263,11 @@ export class RdsHostListProvider implements DynamicHostListProvider {
255263
primaryClusterHostUrls.add(hostInfo.url);
256264
});
257265

258-
for (const [clusterId, clusterHosts] of RdsHostListProvider.topologyCache.getEntries()) {
266+
const cache: ExpirationCache<string, Topology> = this.storageService.getAll(Topology) as ExpirationCache<string, Topology>;
267+
if (!cache) {
268+
return;
269+
}
270+
for (const [clusterId, clusterHosts] of cache.getEntries()) {
259271
const isPrimaryCluster: boolean | null = RdsHostListProvider.primaryClusterIdCache.get(
260272
clusterId,
261273
false,
@@ -266,7 +278,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
266278
continue;
267279
}
268280

269-
for (const clusterHost of clusterHosts) {
281+
for (const clusterHost of clusterHosts.hosts) {
270282
if (primaryClusterHostUrls.has(clusterHost.url)) {
271283
RdsHostListProvider.suggestedPrimaryClusterIdCache.put(clusterId, this.clusterId, this.suggestedClusterIdRefreshRateNano);
272284
break;
@@ -343,22 +355,24 @@ export class RdsHostListProvider implements DynamicHostListProvider {
343355
return host.replace("?", hostName);
344356
}
345357

346-
getCachedTopology(): HostInfo[] | null {
358+
getStoredTopology(): HostInfo[] | null {
347359
if (!this.clusterId) {
348360
return null;
349361
}
350-
return RdsHostListProvider.topologyCache.get(this.clusterId) ?? null;
362+
363+
const topology: Topology = this.storageService.get(Topology, this.clusterId);
364+
365+
return topology == null ? null : topology.hosts;
351366
}
352367

353368
static clearAll(): void {
354-
RdsHostListProvider.topologyCache.clear();
355369
RdsHostListProvider.primaryClusterIdCache.clear();
356370
RdsHostListProvider.suggestedPrimaryClusterIdCache.clear();
357371
}
358372

359373
clear(): void {
360374
if (this.clusterId) {
361-
RdsHostListProvider.topologyCache.delete(this.clusterId);
375+
CoreServicesContainer.getInstance().getStorageService().remove(Topology, this.clusterId);
362376
}
363377
}
364378

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { HostInfo } from "../host_info";
18+
19+
export class Topology {
20+
private readonly _hosts: HostInfo[];
21+
22+
constructor(hosts: HostInfo[]) {
23+
this._hosts = hosts;
24+
}
25+
26+
get hosts(): HostInfo[] {
27+
return this._hosts;
28+
}
29+
}

common/lib/types.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Type representing a constructor for any class.
19+
*/
20+
export type Constructor<T = unknown> = new (...args: unknown[]) => T;
21+
22+
/**
23+
* Function type that determines whether an item should be disposed when expired.
24+
* @param item The item to check
25+
* @returns true if the item should be disposed, false otherwise
26+
*/
27+
export type ShouldDisposeFunc<V> = (item: V) => boolean;
28+
29+
/**
30+
* Function type that defines how to dispose of an item when it is removed.
31+
* @param item The item to dispose
32+
*/
33+
export type ItemDisposalFunc<V> = (item: V) => void;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { StorageService, StorageServiceImpl } from "./storage/storage_service";
18+
19+
/**
20+
* A singleton container object used to instantiate and access core universal services. This class should be used
21+
* instead of directly instantiating core services so that only one instance of each service is instantiated.
22+
*
23+
* @see FullServicesContainer for a container that holds both connection-specific services and core universal
24+
* services.
25+
*/
26+
export class CoreServicesContainer {
27+
private static readonly INSTANCE = new CoreServicesContainer();
28+
29+
// private readonly monitorService: MonitorService; // TODO: implement monitor service
30+
private readonly storageService: StorageService;
31+
32+
private constructor() {
33+
this.storageService = new StorageServiceImpl();
34+
// this.monitorService = new MonitorServiceImpl();
35+
}
36+
37+
static getInstance(): CoreServicesContainer {
38+
return CoreServicesContainer.INSTANCE;
39+
}
40+
41+
getStorageService(): StorageService {
42+
return this.storageService;
43+
}
44+
45+
// getMonitorService(): MonitorService {
46+
// return this.monitorService;
47+
// }
48+
}

0 commit comments

Comments
 (0)