Skip to content

Commit 58e445b

Browse files
authored
chore: topology utils refactoring (#615)
1 parent 6612a60 commit 58e445b

17 files changed

Lines changed: 1057 additions & 286 deletions

common/lib/database_dialect/topology_aware_database_dialect.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,21 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { HostInfo } from "../host_info";
18-
import { HostListProvider } from "../host_list_provider/host_list_provider";
1917
import { HostRole } from "../host_role";
2018
import { ClientWrapper } from "../client_wrapper";
19+
import { TopologyQueryResult } from "../host_list_provider/topology_utils";
2120

2221
export interface TopologyAwareDatabaseDialect {
23-
queryForTopology(client: ClientWrapper, hostListProvider: HostListProvider): Promise<HostInfo[]>;
22+
queryForTopology(client: ClientWrapper): Promise<TopologyQueryResult[]>;
2423

2524
identifyConnection(targetClient: ClientWrapper): Promise<string>;
2625

2726
getHostRole(client: ClientWrapper): Promise<HostRole>;
2827

2928
// Returns the host id of the targetClient if it is connected to a writer, null otherwise.
3029
getWriterId(targetClient: ClientWrapper): Promise<string | null>;
30+
31+
getInstanceId(targetClient: ClientWrapper): Promise<[string, string]>;
3132
}
3233

3334
export interface GlobalAuroraTopologyDialect extends TopologyAwareDatabaseDialect {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { TopologyQueryResult, TopologyUtils } from "./topology_utils";
18+
import { ClientWrapper } from "../client_wrapper";
19+
import { DatabaseDialect } from "../database_dialect/database_dialect";
20+
import { HostInfo } from "../host_info";
21+
import { isDialectTopologyAware } from "../utils/utils";
22+
import { Messages } from "../utils/messages";
23+
import { AwsWrapperError } from "../utils/errors";
24+
25+
export class GlobalTopologyUtils extends TopologyUtils {
26+
async queryForTopology(
27+
targetClient: ClientWrapper,
28+
dialect: DatabaseDialect,
29+
initialHost: HostInfo,
30+
clusterInstanceTemplate: HostInfo
31+
): Promise<HostInfo[]> {
32+
throw new AwsWrapperError("Not implemented");
33+
}
34+
35+
async queryForTopologyWithRegion(
36+
targetClient: ClientWrapper,
37+
dialect: DatabaseDialect,
38+
initialHost: HostInfo,
39+
instanceTemplateByRegion: Map<string, HostInfo>
40+
): Promise<HostInfo[]> {
41+
if (!isDialectTopologyAware(dialect)) {
42+
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
43+
}
44+
45+
return await dialect
46+
.queryForTopology(targetClient)
47+
.then((res: TopologyQueryResult[]) => this.verifyWriter(this.createHostsWithTemplateMap(res, initialHost, instanceTemplateByRegion)));
48+
}
49+
50+
private createHostsWithTemplateMap(
51+
topologyQueryResults: TopologyQueryResult[],
52+
initialHost: HostInfo,
53+
instanceTemplateByRegion: Map<string, HostInfo>
54+
): HostInfo[] {
55+
const hostsMap = new Map<string, HostInfo>();
56+
topologyQueryResults.forEach((row) => {
57+
if (!row.awsRegion) {
58+
throw new AwsWrapperError(Messages.get("GlobalTopologyUtils.missingRegion", row.host));
59+
}
60+
const clusterInstanceTemplate = instanceTemplateByRegion.get(row.awsRegion);
61+
62+
if (!clusterInstanceTemplate) {
63+
throw new AwsWrapperError(Messages.get("GlobalTopologyUtils.missingTemplateForRegion", row.awsRegion, row.host));
64+
}
65+
66+
const lastUpdateTime = row.lastUpdateTime ?? Date.now();
67+
68+
const host = this.createHost(
69+
row.id,
70+
row.host,
71+
row.isWriter,
72+
row.weight,
73+
lastUpdateTime,
74+
initialHost,
75+
clusterInstanceTemplate,
76+
row.endpoint,
77+
row.port
78+
);
79+
80+
const existing = hostsMap.get(host.host);
81+
if (!existing || existing.lastUpdateTime < host.lastUpdateTime) {
82+
hostsMap.set(host.host, host);
83+
}
84+
});
85+
86+
return Array.from(hostsMap.values());
87+
}
88+
}

common/lib/host_list_provider/host_list_provider.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ export interface HostListProvider {
4040

4141
getHostRole(client: ClientWrapper, dialect: DatabaseDialect): Promise<HostRole>;
4242

43-
identifyConnection(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo | null>;
44-
45-
createHost(host: string, isWriter: boolean, weight: number, lastUpdateTime: number, port?: number): HostInfo;
43+
identifyConnection(targetClient: ClientWrapper): Promise<HostInfo | null>;
4644

4745
getHostProviderType(): string;
4846

common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616

1717
import { HostInfo } from "../../host_info";
18-
import { CacheMap } from "../../utils/cache_map";
1918
import { PluginService } from "../../plugin_service";
2019
import { HostAvailability } from "../../host_availability/host_availability";
2120
import { logTopology, sleep } from "../../utils/utils";
@@ -27,7 +26,9 @@ import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";
2726
import { Messages } from "../../utils/messages";
2827
import { CoreServicesContainer } from "../../utils/core_services_container";
2928
import { Topology } from "../topology";
30-
import { StorageService, StorageServiceImpl } from "../../utils/storage/storage_service";
29+
import { StorageService } from "../../utils/storage/storage_service";
30+
import { TopologyUtils } from "../topology_utils";
31+
import { RdsUtils } from "../../utils/rds_utils";
3132

3233
export interface ClusterTopologyMonitor {
3334
forceRefresh(client: ClientWrapper, timeoutMs: number): Promise<HostInfo[]>;
@@ -49,6 +50,9 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
4950
private readonly refreshRateMs: number;
5051
private readonly highRefreshRateMs: number;
5152
private readonly storageService: StorageService;
53+
private readonly topologyUtils: TopologyUtils;
54+
private readonly rdsUtils: RdsUtils = new RdsUtils();
55+
private readonly instanceTemplate: HostInfo;
5256

5357
private writerHostInfo: HostInfo = null;
5458
private isVerifiedWriterConnection: boolean = false;
@@ -74,17 +78,21 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
7478
private requestToUpdateTopology: boolean = false;
7579

7680
constructor(
81+
topologyUtils: TopologyUtils,
7782
clusterId: string,
7883
initialHostInfo: HostInfo,
7984
props: Map<string, any>,
85+
instanceTemplate: HostInfo,
8086
pluginService: PluginService,
8187
hostListProvider: MonitoringRdsHostListProvider,
8288
refreshRateMs: number,
8389
highRefreshRateMs: number
8490
) {
91+
this.topologyUtils = topologyUtils;
8592
this.clusterId = clusterId;
8693
this.storageService = CoreServicesContainer.getInstance().getStorageService(); // TODO: store serviceContainer instead
8794
this.initialHostInfo = initialHostInfo;
95+
this.instanceTemplate = instanceTemplate;
8896
this._pluginService = pluginService;
8997
this._hostListProvider = hostListProvider;
9098
this.refreshRateMs = refreshRateMs;
@@ -212,12 +220,19 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
212220
this.monitoringClient = client;
213221
logger.debug(Messages.get("ClusterTopologyMonitor.openedMonitoringConnection", this.initialHostInfo.host));
214222
try {
215-
const writerId = await this.getWriterHostIdIfConnected(this.monitoringClient, this.initialHostInfo.hostId);
216-
if (writerId) {
223+
if (await this.topologyUtils.isWriterInstance(this.monitoringClient)) {
217224
this.isVerifiedWriterConnection = true;
218-
this.writerHostInfo = this.initialHostInfo;
219-
logger.info(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.initialHostInfo.host));
220-
writerVerifiedByThisTask = true;
225+
226+
if (this.rdsUtils.isRdsInstance(this.initialHostInfo.host)) {
227+
this.writerHostInfo = this.initialHostInfo;
228+
logger.info(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.writerHostInfo.host));
229+
writerVerifiedByThisTask = true;
230+
} else {
231+
const pair: [string, string] = await this.topologyUtils.getInstanceId(this.monitoringClient);
232+
const instanceTemplate: HostInfo = await this.getInstanceTemplate(pair[1], this.monitoringClient);
233+
this.writerHostInfo = this.topologyUtils.createHost(pair[0], pair[1], true, 0, Date.now(), this.initialHostInfo, instanceTemplate);
234+
logger.debug(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.writerHostInfo.host));
235+
}
221236
}
222237
} catch (error) {
223238
// Do nothing.
@@ -245,6 +260,10 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
245260
return hosts;
246261
}
247262

263+
protected getInstanceTemplate(hostId: string, targetClient: ClientWrapper): Promise<HostInfo> {
264+
return Promise.resolve(this.instanceTemplate);
265+
}
266+
248267
updateTopologyCache(hosts: HostInfo[]): void {
249268
this.storageService.set(this.clusterId, new Topology(hosts));
250269
this.requestToUpdateTopology = false;
@@ -513,7 +532,7 @@ export class HostMonitor {
513532
let hosts: HostInfo[];
514533
try {
515534
hosts = await this.monitor.hostListProvider.sqlQueryForTopology(client);
516-
if (hosts === null || hosts.length === 0) {
535+
if (hosts === null) {
517536
return;
518537
}
519538
this.monitor.hostMonitorsLatestTopology = hosts;

common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { BlockingHostListProvider } from "../host_list_provider";
2727
import { logger } from "../../../logutils";
2828
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";
2929
import { isDialectTopologyAware } from "../../utils/utils";
30+
import { TopologyUtils } from "../topology_utils";
3031

3132
export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider {
3233
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute.
@@ -48,8 +49,14 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
4849

4950
private readonly pluginService: PluginService;
5051

51-
constructor(properties: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService, pluginService: PluginService) {
52-
super(properties, originalUrl, hostListProviderService);
52+
constructor(
53+
properties: Map<string, any>,
54+
originalUrl: string,
55+
topologyUtils: TopologyUtils,
56+
hostListProviderService: HostListProviderService,
57+
pluginService: PluginService
58+
) {
59+
super(properties, originalUrl, topologyUtils, hostListProviderService);
5360
this.pluginService = pluginService;
5461
}
5562

@@ -58,7 +65,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
5865
await MonitoringRdsHostListProvider.monitors.clear();
5966
}
6067

61-
async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
68+
async getCurrentTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
6269
const monitor: ClusterTopologyMonitor = this.initMonitor();
6370

6471
try {
@@ -70,11 +77,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
7077
}
7178

7279
async sqlQueryForTopology(targetClient: ClientWrapper): Promise<HostInfo[]> {
73-
const dialect: DatabaseDialect = this.hostListProviderService.getDialect();
74-
if (!isDialectTopologyAware(dialect)) {
75-
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
76-
}
77-
return await dialect.queryForTopology(targetClient, this).then((res: any) => this.processQueryResults(res));
80+
return await this.topologyUtils.queryForTopology(targetClient, this.pluginService.getDialect(), this.initialHost, this.clusterInstanceTemplate);
7881
}
7982

8083
async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
@@ -88,9 +91,11 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
8891
this.clusterId,
8992
() =>
9093
new ClusterTopologyMonitorImpl(
94+
this.topologyUtils,
9195
this.clusterId,
9296
this.initialHost,
9397
this.properties,
98+
this.clusterInstanceTemplate,
9499
this.pluginService,
95100
this,
96101
WrapperProperties.CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties),

0 commit comments

Comments
 (0)