Skip to content

Commit

Permalink
apply code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Jan 14, 2025
1 parent f86dc16 commit d81f1af
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { HostRole } from "../../host_role";
import { ClientWrapper } from "../../client_wrapper";
import { AwsWrapperError } from "../../utils/errors";
import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";
import { Messages } from "../../utils/messages";

export interface ClusterTopologyMonitor {
forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]>;
Expand Down Expand Up @@ -54,20 +55,19 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
private ignoreNewTopologyRequestsEndTime: number = 0;
private ignoreTopologyRequestMs: number = 10000; // 10 seconds.

// Controls for stopping the ClusterTopologyMonitor run.
private stopMonitoring: boolean = false;

// Tracking of the host monitors.
private hostMonitors: Map<string, HostMonitor> = new Map();
public hostMonitorsWriterClient = null;
public hostMonitorsWriterInfo: HostInfo = null;
public hostMonitorsReaderClient = null;
public hostMonitorsLatestTopology: HostInfo[] = null;
// Controls for stopping all the host monitors run.

// Controls for stopping asynchronous monitoring tasks.
private stopMonitoring: boolean = false;
public hostMonitorsStop: boolean = false;
private untrackedPromises: Promise<void>[] = [];

// Signals to other methods that asynchronous operations have completed/should be completed.
// Signals to other methods that asynchronous tasks have completed/should be completed.
private requestToUpdateTopology: boolean = false;

constructor(
Expand Down Expand Up @@ -111,6 +111,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
async close(): Promise<void> {
this.stopMonitoring = true;
this.hostMonitorsStop = true;
this.requestToUpdateTopology = true;
await Promise.all(this.untrackedPromises);
this.untrackedPromises = [];
this.hostMonitors.clear();
Expand Down Expand Up @@ -140,12 +141,12 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
return await this.waitTillTopologyGetsUpdated(timeoutMs);
}

// Otherwise use provided unverified connection to update topology
// Otherwise use provided unverified connection to update topology.
return await this.fetchTopologyAndUpdateCache(client);
}

async waitTillTopologyGetsUpdated(timeoutMs: number): Promise<HostInfo[]> {
// Signal to any monitor, that might be in delay, that topology should be updated.
// Signal to any monitor that might be in delay, that topology should be updated.
this.requestToUpdateTopology = true;

const currentHosts: HostInfo[] = this.topologyMap.get(this.clusterId);
Expand All @@ -158,12 +159,12 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}

if (Date.now() >= endTime) {
throw new AwsWrapperError(`ClusterTopologyMonitor topology update timed out in ${timeoutMs} ms.`);
throw new AwsWrapperError(Messages.get("ClusterTopologyMonitor.timeoutError", timeoutMs.toString()));
}
return latestHosts;
}

async fetchTopologyAndUpdateCache(client: any): Promise<HostInfo[]> {
async fetchTopologyAndUpdateCache(client: ClientWrapper): Promise<HostInfo[]> {
if (!client) {
return null;
}
Expand All @@ -175,7 +176,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}
return hosts;
} catch (error: any) {
logger.error(`Error fetching topology: ${error?.message}.`);
logger.debug(Messages.get("ClusterTopologyMonitor.errorFetchingTopology", error?.message));
}
return null;
}
Expand All @@ -186,13 +187,13 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
try {
client = await this._pluginService.forceConnect(this.initialHostInfo, this._monitoringProperties);
} catch {
logger.debug(`Could not connect to: ${this.initialHostInfo.host}`);
logger.debug(Messages.get("ClusterTopologyMonitor.unableToConnect", this.initialHostInfo.hostId));
return null;
}

if (client && !this.monitoringClient) {
this.monitoringClient = client;
logger.debug(`Opened monitoring connection to: ${this.initialHostInfo.host}`);
logger.debug(Messages.get("ClusterTopologyMonitor.openedMonitoringConnection", this.initialHostInfo.hostId));
if (this.getWriterHostId(this.monitoringClient) !== null) {
this.isVerifiedWriterConnection = true;
this.writerHostInfo = this.initialHostInfo;
Expand Down Expand Up @@ -227,7 +228,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
try {
await client.end();
} catch (error) {
// ignore
// Ignore.
}
}

Expand All @@ -236,14 +237,14 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}

async run() {
logger.debug(`Start cluster monitoring thread: ${this.initialHostInfo.host}`);
logger.debug(Messages.get("ClusterTopologyMonitor.startMonitoring"));
try {
while (!this.stopMonitoring) {
if (this.isInPanicMode()) {
// Panic Mode: high refresh rate in effect.

if (this.hostMonitors.size === 0) {
// Initialize host threads.
// Initialize host tasks.
this.hostMonitorsStop = false;
this.hostMonitorsWriterClient = null;
this.hostMonitorsReaderClient = null;
Expand Down Expand Up @@ -323,7 +324,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}
if (this.highRefreshRateEndTime == 0) {
// Log topology when not in high refresh rate.
this.logTopology("");
this.logTopology(`[clusterTopologyMonitor `);
}
// Set an easily interruptible delay between topology refreshes.
await this.delay(false);
Expand All @@ -333,13 +334,13 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}
}
} catch (error) {
logger.error(`Exception during monitoring: ${error}.`);
logger.debug(Messages.get("ClusterTopologyMonitor.errorDuringMonitoring", error?.message));
} finally {
this.stopMonitoring = true;
const client = this.monitoringClient;
this.monitoringClient = null;
await this.closeConnection(client);
logger.debug(`Stop monitoring ClusterTopologyMonitor: ${this.initialHostInfo.getHostAndPort()}.`);
logger.debug(Messages.get("ClusterTopologyMonitor.endMonitoring"));
}
}

Expand Down Expand Up @@ -378,7 +379,7 @@ export class HostMonitor {
let client = null;
let updateTopology: boolean = false;
const startTime: number = Date.now();
logger.debug(`Host monitor ${this.hostInfo.hostId} started.`);
logger.debug(Messages.get("HostMonitor.startMonitoring", this.hostInfo.hostId));
try {
while (!this.monitor.hostMonitorsStop) {
if (!client) {
Expand All @@ -405,22 +406,22 @@ export class HostMonitor {
} else {
this.monitor.hostMonitorsWriterClient = client;
this.monitor.hostMonitorsWriterInfo = this.hostInfo;
logger.debug(`Detected writer: ${writerId}`);
logger.debug(Messages.get("HostMonitor.detectedWriter", writerId));
this.monitor.hostMonitorsStop = true;

await this.monitor.fetchTopologyAndUpdateCache(client);
this.monitor.logTopology("");
this.monitor.logTopology(`[hostMonitor ${this.hostInfo.hostId}] `);
}
client = null;
return;
} else if (!client) {
if (!this.monitor.hostMonitorsWriterClient) {
if (updateTopology) {
await this.readerThreadFetchTopology(client, this.writerHostInfo);
await this.readerTaskFetchTopology(client, this.writerHostInfo);
} else if (!this.monitor.hostMonitorsReaderClient) {
this.monitor.hostMonitorsReaderClient = client;
updateTopology = true;
await this.readerThreadFetchTopology(client, this.writerHostInfo);
await this.readerTaskFetchTopology(client, this.writerHostInfo);
}
}
}
Expand All @@ -431,11 +432,11 @@ export class HostMonitor {
// Close the monitor.
} finally {
await this.monitor.closeConnection(client);
logger.debug(`Host monitor ${this.hostInfo.hostId} completed in ${Date.now() - startTime}.`);
logger.debug(Messages.get("HostMonitor.endMonitoring", this.hostInfo.hostId, (Date.now() - startTime).toString()));
}
}

private async readerThreadFetchTopology(client: any, writerHostInfo: HostInfo) {
private async readerTaskFetchTopology(client: any, writerHostInfo: HostInfo) {
if (!client) {
return;
}
Expand All @@ -451,17 +452,17 @@ export class HostMonitor {

if (this.writerChanged) {
this.monitor.updateTopologyCache(hosts);
logger.debug(logTopology(hosts, ""));
logger.debug(logTopology(hosts, `[hostMonitor ${this.hostInfo.hostId}] `));
return;
}

const latestWriterHostInfo: HostInfo = hosts.find((x) => x.role === HostRole.WRITER);
if (latestWriterHostInfo && writerHostInfo && latestWriterHostInfo.getHostAndPort() !== writerHostInfo.getHostAndPort()) {
this.writerChanged = true;

logger.debug(`Writer host has changed from ${writerHostInfo.getHostAndPort()} to ${latestWriterHostInfo.getHostAndPort()}.`);
logger.debug(Messages.get("HostMonitor.writerHostChanged", writerHostInfo.getHostAndPort(), latestWriterHostInfo.getHostAndPort()));
this.monitor.updateTopologyCache(hosts);
logger.debug(logTopology(hosts, ""));
logger.debug(logTopology(hosts, `[hostMonitor ${this.hostInfo.hostId}] `));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
try {
await monitor.close();
} catch {
// ignore
// Ignore.
}
}
);
Expand All @@ -50,11 +50,12 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
this.pluginService = pluginService;
}

static async clearAll(): Promise<void> {
static async releaseResources(): Promise<void> {
super.clearAll();
for (const [key, monitor] of this.monitors.entries) {
await monitor.item.close();
}
this.monitors.map.clear();
}

async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
Expand Down
6 changes: 2 additions & 4 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ import { ClientWrapper } from "./client_wrapper";
import { logger } from "../logutils";
import { Messages } from "./utils/messages";
import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes";
import { getWriter, logTopology } from "./utils/utils";
import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConfigurationProfile } from "./profile/configuration_profile";
import { SessionState } from "./session_state";
import { MonitoringRdsHostListProvider } from "./host_list_provider/monitoring/monitoring_host_list_provider";

export class PluginService implements ErrorHandler, HostListProviderService {
Expand Down Expand Up @@ -178,7 +176,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

async initiateTopologyUpdate(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
const hostListProvider: HostListProvider = this.getHostListProvider();
const hostListProvider = this.getHostListProvider();
if (!(hostListProvider instanceof MonitoringRdsHostListProvider)) {
throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider);
}
Expand Down
Loading

0 comments on commit d81f1af

Please sign in to comment.