Skip to content

Commit e85ef18

Browse files
committed
chore: event publisher
1 parent 446836f commit e85ef18

16 files changed

Lines changed: 575 additions & 18 deletions

.prettierrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"semi": true,
33
"trailingComma": "none",
44
"printWidth": 150,
5-
"endOfLine": "lf"
5+
"endOfLine": "auto"
66
}

common/lib/aws_client.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { StorageService } from "./utils/storage/storage_service";
3737
import { MonitorService } from "./utils/monitoring/monitor_service";
3838
import { CoreServicesContainer } from "./utils/core_services_container";
3939
import { FullServicesContainer } from "./utils/full_services_container";
40+
import { EventPublisher } from "./utils/events/event";
4041

4142
const { EventEmitter } = pkgStream;
4243

@@ -45,6 +46,7 @@ export abstract class AwsClient extends EventEmitter implements SessionStateClie
4546
private readonly fullServiceContainer: FullServicesContainer;
4647
private readonly storageService: StorageService;
4748
private readonly monitorService: MonitorService;
49+
private readonly eventPublisher: EventPublisher;
4850
protected telemetryFactory: TelemetryFactory;
4951
protected pluginManager: PluginManager;
5052
protected pluginService: PluginService;
@@ -108,11 +110,13 @@ export abstract class AwsClient extends EventEmitter implements SessionStateClie
108110
const coreServicesContainer: CoreServicesContainer = CoreServicesContainer.getInstance();
109111
this.storageService = coreServicesContainer.getStorageService();
110112
this.monitorService = coreServicesContainer.getMonitorService();
113+
this.eventPublisher = coreServicesContainer.getEventPublisher();
111114
this.telemetryFactory = new DefaultTelemetryFactory(this.properties);
112115

113116
this.fullServiceContainer = ServiceUtils.instance.createStandardServiceContainer(
114117
this.storageService,
115118
this.monitorService,
119+
this.eventPublisher,
116120
this,
117121
this.properties,
118122
dbType,

common/lib/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,9 @@ export type ShouldDisposeFunc<V> = (item: V) => boolean;
3131
* @param item The item to dispose
3232
*/
3333
export type ItemDisposalFunc<V> = (item: V) => void;
34+
35+
/**
36+
* Type representing an event class constructor.
37+
*/
38+
export type EventClass<T extends import("./utils/events/event").Event = import("./utils/events/event").Event> =
39+
new (...args: any[]) => T;

common/lib/utils/core_services_container.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import { StorageService, StorageServiceImpl } from "./storage/storage_service";
1818
import { MonitorService, MonitorServiceImpl } from "./monitoring/monitor_service";
19+
import { EventPublisher } from "./events/event";
20+
import { BatchingEventPublisher } from "./events/batching_event_publisher";
1921

2022
/**
2123
* A singleton container object used to instantiate and access core universal services. This class should be used
@@ -27,13 +29,14 @@ import { MonitorService, MonitorServiceImpl } from "./monitoring/monitor_service
2729
export class CoreServicesContainer {
2830
private static readonly INSTANCE = new CoreServicesContainer();
2931

30-
// TODO: implement monitor service
3132
private readonly monitorService: MonitorService;
3233
private readonly storageService: StorageService;
34+
private readonly eventPublisher: EventPublisher;
3335

3436
private constructor() {
35-
this.storageService = new StorageServiceImpl();
36-
this.monitorService = new MonitorServiceImpl();
37+
this.eventPublisher = new BatchingEventPublisher();
38+
this.storageService = new StorageServiceImpl(this.eventPublisher);
39+
this.monitorService = new MonitorServiceImpl(this.eventPublisher);
3740
}
3841

3942
static getInstance(): CoreServicesContainer {
@@ -48,6 +51,10 @@ export class CoreServicesContainer {
4851
return this.monitorService;
4952
}
5053

54+
getEventPublisher(): EventPublisher {
55+
return this.eventPublisher;
56+
}
57+
5158
static releaseResources(): void {
5259
CoreServicesContainer.INSTANCE.storageService.releaseResources();
5360
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { Event, EventPublisher, EventSubscriber } from "./event";
18+
import { EventClass } from "../../types";
19+
20+
const DEFAULT_MESSAGE_INTERVAL_MS = 30_000; // 30 seconds
21+
22+
/**
23+
* An event publisher that periodically publishes a batch of all unique events
24+
* encountered during the latest time interval.
25+
*/
26+
export class BatchingEventPublisher implements EventPublisher {
27+
protected readonly subscribersMap = new Map<EventClass, Set<EventSubscriber>>();
28+
protected readonly pendingEvents = new Set<Event>();
29+
protected publishingInterval?: ReturnType<typeof setInterval>;
30+
31+
constructor(messageIntervalMs: number = DEFAULT_MESSAGE_INTERVAL_MS) {
32+
this.initPublishingInterval(messageIntervalMs);
33+
}
34+
35+
protected initPublishingInterval(messageIntervalMs: number): void {
36+
this.publishingInterval = setInterval(() => this.sendMessages(), messageIntervalMs);
37+
// Allow the process to exit even if the interval is still running
38+
this.publishingInterval.unref();
39+
}
40+
41+
protected sendMessages(): void {
42+
for (const event of this.pendingEvents) {
43+
this.pendingEvents.delete(event);
44+
this.deliverEvent(event);
45+
}
46+
}
47+
48+
protected deliverEvent(event: Event): void {
49+
const subscribers = this.subscribersMap.get(event.constructor as EventClass);
50+
if (!subscribers) {
51+
return;
52+
}
53+
54+
for (const subscriber of subscribers) {
55+
subscriber.processEvent(event);
56+
}
57+
}
58+
59+
subscribe(subscriber: EventSubscriber, eventClasses: Set<EventClass>): void {
60+
for (const eventClass of eventClasses) {
61+
let subscribers = this.subscribersMap.get(eventClass);
62+
if (!subscribers) {
63+
subscribers = new Set();
64+
this.subscribersMap.set(eventClass, subscribers);
65+
}
66+
subscribers.add(subscriber);
67+
}
68+
}
69+
70+
unsubscribe(subscriber: EventSubscriber, eventClasses: Set<EventClass>): void {
71+
for (const eventClass of eventClasses) {
72+
const subscribers = this.subscribersMap.get(eventClass);
73+
if (subscribers) {
74+
subscribers.delete(subscriber);
75+
if (subscribers.size === 0) {
76+
this.subscribersMap.delete(eventClass);
77+
}
78+
}
79+
}
80+
}
81+
82+
publish(event: Event): void {
83+
if (event.isImmediateDelivery) {
84+
this.deliverEvent(event);
85+
} else {
86+
this.pendingEvents.add(event);
87+
}
88+
}
89+
90+
releaseResources(): void {
91+
if (this.publishingInterval) {
92+
clearInterval(this.publishingInterval);
93+
this.publishingInterval = undefined;
94+
}
95+
}
96+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { Event } from "./event";
18+
19+
/**
20+
* A class defining a data access event. The class specifies the class of the data
21+
* that was accessed and the key for the data.
22+
*
23+
* Used by StorageService to notify MonitorService when data is accessed,
24+
* allowing monitors to extend their expiration time.
25+
*/
26+
export class DataAccessEvent implements Event {
27+
readonly isImmediateDelivery = false;
28+
readonly dataClass: new (...args: any[]) => any;
29+
readonly key: unknown;
30+
31+
constructor(dataClass: new (...args: any[]) => any, key: unknown) {
32+
this.dataClass = dataClass;
33+
this.key = key;
34+
}
35+
}

common/lib/utils/events/event.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { EventClass } from "../../types";
18+
19+
/**
20+
* An interface for events that need to be communicated between different components.
21+
*/
22+
export interface Event {
23+
readonly isImmediateDelivery: boolean;
24+
}
25+
26+
/**
27+
* An event subscriber. Subscribers can subscribe to a publisher's events.
28+
*/
29+
export interface EventSubscriber {
30+
/**
31+
* Processes an event. This method will only be called on this subscriber
32+
* if it has subscribed to the event class.
33+
* @param event the event to process.
34+
*/
35+
processEvent(event: Event): void;
36+
}
37+
38+
/**
39+
* An event publisher that publishes events to subscribers.
40+
* Subscribers can specify which types of events they would like to receive.
41+
*/
42+
export interface EventPublisher {
43+
/**
44+
* Registers the given subscriber for the given event classes.
45+
* @param subscriber the subscriber to be notified when the given event classes occur.
46+
* @param eventClasses the classes of events that the subscriber should be notified of.
47+
*/
48+
subscribe(subscriber: EventSubscriber, eventClasses: Set<EventClass>): void;
49+
50+
/**
51+
* Unsubscribes the given subscriber from the given event classes.
52+
* @param subscriber the subscriber to unsubscribe from the given event classes.
53+
* @param eventClasses the classes of events that the subscriber wants to unsubscribe from.
54+
*/
55+
unsubscribe(subscriber: EventSubscriber, eventClasses: Set<EventClass>): void;
56+
57+
/**
58+
* Publishes an event. All subscribers to the given event class will be notified of the event.
59+
* @param event the event to publish.
60+
*/
61+
publish(event: Event): void;
62+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { Event } from "./event";
18+
19+
/**
20+
* Event indicating that a monitor should be reset with new endpoints.
21+
* Used by ClusterTopologyMonitorImpl to reset monitoring when cluster topology changes.
22+
*/
23+
export class MonitorResetEvent implements Event {
24+
readonly isImmediateDelivery = true;
25+
readonly clusterId: string;
26+
readonly endpoints: Set<string>;
27+
28+
constructor(clusterId: string, endpoints: Set<string>) {
29+
this.clusterId = clusterId;
30+
this.endpoints = endpoints;
31+
}
32+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { Event } from "./event";
18+
import { Monitor } from "../monitoring/monitor";
19+
20+
/**
21+
* Event indicating that a monitor should be stopped.
22+
* Used by MonitorService to stop and remove monitors.
23+
*/
24+
export class MonitorStopEvent implements Event {
25+
readonly isImmediateDelivery = true;
26+
readonly monitorClass: new (...args: any[]) => Monitor;
27+
readonly key: unknown;
28+
29+
constructor(monitorClass: new (...args: any[]) => Monitor, key: unknown) {
30+
this.monitorClass = monitorClass;
31+
this.key = key;
32+
}
33+
}

0 commit comments

Comments
 (0)