Skip to content

Commit

Permalink
refactor: cluster topology monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Jan 7, 2025
1 parent 5ebcaf3 commit 871729d
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 110 deletions.
182 changes: 86 additions & 96 deletions common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,45 @@
import { HostInfo } from "../../host_info";
import { CacheMap } from "../../utils/cache_map";
import { PluginService } from "../../plugin_service";
import { HostListProviderService } from "../../host_list_provider_service";
import { HostAvailability } from "../../host_availability/host_availability";
import { logTopology, sleep } from "../../utils/utils";
import { logger } from "../../../logutils";
import { HostRole } from "../../host_role";
import { ClientWrapper } from "../../client_wrapper";
import { Messages } from "../../utils/messages";
import { TopologyAwareDatabaseDialect } from "../../topology_aware_database_dialect";
import { HostListProvider } from "../host_list_provider";
import { AwsWrapperError } from "../../utils/errors";
import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";

/**
 * Contract for a cluster topology monitor. In this file it is implemented by
 * ClusterToplogyMonitorImpl.
 */
export interface ClusterToplogyMonitor {
// Resolves to a list of hosts.
// NOTE(review): `timeoutMs` is presumably an upper bound on the wait, in milliseconds — confirm against the implementation.
forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]>;

// Sets the cluster identifier used by the monitor.
setClusterId(clusterId: string): void;

// Closes the monitor.
// NOTE(review): the implementing class declares this method as async, returning Promise<void>.
close(): void;

// Resolves to a list of hosts.
// NOTE(review): the effect of `shouldVerifyWriter` is not visible from this declaration — confirm against the implementation.
forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;
}

export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
public clusterId: string;
public topologyMap: CacheMap<string, HostInfo[]>;
private topologyCacheExpirationNanos: number = 10000; // TODO: investigate values and set in constructor.
protected initialHostInfo: HostInfo;
public properties: Map<string, any>;
public pluginService: PluginService;
protected hostListProviderService: HostListProviderService;
private hostListProvider: HostListProvider;
protected refreshRate: number = 300; // TODO: investigate issues with setting lower values.
private highRefreshRate: number = 300;
static readonly TOPOLOGY_CACHE_EXPIRATION_MS: number = 300000;

private readonly clusterId: string;
private readonly initialHostInfo: HostInfo;
private readonly _properties: Map<string, any>;
private readonly _pluginService: PluginService;
private readonly _hostListProvider: MonitoringRdsHostListProvider;
private readonly refreshRateMs: number;
private readonly highRefreshRateMs: number;

private topologyMap: CacheMap<string, HostInfo[]>;
private writerHostInfo: HostInfo = null;
private isVerifiedWriterConnection: boolean = false;
private monitoringClient: ClientWrapper = null;
private highRefreshRateEndTime: any = 0;
private highRefreshPeriodAfterPanic: number = 30000; // 30 seconds.
private ignoreTopologyRequest: number = 1000; // 10 seconds.
private highRefreshRateEndTime: number = 0;
private highRefreshPeriodAfterPanicMs: number = 30000; // 30 seconds.
private ignoreNewTopologyRequestsEndTime: number = 0;
private ignoreTopologyRequestMs: number = 10000; // 10 seconds.

// Controls for stopping the ClusterTopologyMonitor run.
private stopMonitoring: boolean = false;
private runPromise: Promise<void>;
private readonly runPromise: Promise<void>;

// Tracking of the host monitors.
private hostMonitors: Map<string, HostMonitor> = new Map();
Expand All @@ -87,20 +82,29 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
initialHostSpec: HostInfo,
props,
pluginService: PluginService,
hostListProviderService: HostListProviderService,
hostListProvider: HostListProvider,
refreshRateNano
hostListProvider: MonitoringRdsHostListProvider,
refreshRateMs: number,
highRefreshRateMs: number
) {
this.clusterId = clusterId;
this.topologyMap = topologyMap;
this.initialHostInfo = initialHostSpec;
this.pluginService = pluginService;
this.hostListProviderService = hostListProviderService;
this.hostListProvider = hostListProvider;
this.properties = props;
//this.refreshRateNano = refreshRateNano; // TODO: coordinate timeouts for bigint or number.
const runMonitor = this.run();
this.runPromise = runMonitor;
this._pluginService = pluginService;
this._hostListProvider = hostListProvider;
this._properties = props;
this.refreshRateMs = refreshRateMs;
this.highRefreshRateMs = highRefreshRateMs;
this.runPromise = this.run();
}

/** Read-only access to the host list provider that was passed to the constructor. */
get hostListProvider(): MonitoringRdsHostListProvider {
return this._hostListProvider;
}
/** Read-only access to the plugin service that was passed to the constructor. */
get pluginService(): PluginService {
return this._pluginService;
}
/** Read-only access to the properties map that was passed to the constructor. */
get properties(): Map<string, any> {
return this._properties;
}

async close(): Promise<void> {
Expand All @@ -111,10 +115,6 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
this.hostMonitors.clear();
}

setClusterId(clusterId: string): void {
this.clusterId = clusterId;
}

async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
const currentHosts = this.topologyMap.get(this.clusterId);
if (currentHosts) {
Expand All @@ -127,7 +127,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
await this.closeConnection(client);
}

return this.waitTillTopologyGetsUpdated(timeoutMs);
return await this.waitTillTopologyGetsUpdated(timeoutMs);
}

async forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]> {
Expand Down Expand Up @@ -180,7 +180,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

try {
const hosts: HostInfo[] = await this.queryForTopology(client);
const hosts: HostInfo[] = await this._hostListProvider.sqlQueryForTopology(client);
if (hosts) {
this.updateTopologyCache(hosts);
}
Expand All @@ -191,58 +191,46 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
return null;
}

private openAnyClientAndUpdateTopology() {
// TODO: implement method.
return [];
}

async queryForTopology(targetClient: ClientWrapper): Promise<HostInfo[]> {
const dialect = this.hostListProviderService.getDialect();
if (!this.isTopologyAwareDatabaseDialect(dialect)) {
throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
}
return await dialect.queryForTopology(targetClient, this.hostListProvider).then((res: any) => this.processQueryResults(res));
}

protected isTopologyAwareDatabaseDialect(arg: any): arg is TopologyAwareDatabaseDialect {
return arg;
}

private async processQueryResults(result: HostInfo[]): Promise<HostInfo[]> {
const hostMap: Map<string, HostInfo> = new Map<string, HostInfo>();

let hosts: HostInfo[] = [];
const writers: HostInfo[] = [];
result.forEach((host) => {
hostMap.set(host.host, host);
});
private async openAnyClientAndUpdateTopology() {
if (!this.monitoringClient) {
let client;
try {
client = await this._pluginService.forceConnect(this.initialHostInfo, this._properties);
if (!this.monitoringClient) {
this.monitoringClient = client;
client = await this._pluginService.forceConnect(this.initialHostInfo, this._properties);
}
} catch {
logger.debug(`Could not connect to: ${this.initialHostInfo.host}`);
return null;
}

hostMap.forEach((host) => {
if (host.role !== HostRole.WRITER) {
hosts.push(host);
if (client && !this.monitoringClient) {
this.monitoringClient = client;
logger.debug(`Opened monitoring connection to: ${this.initialHostInfo.host}`);
if (this.getWriterHostId(this.monitoringClient) !== null) {
this.isVerifiedWriterConnection = true;
this.writerHostInfo = this.initialHostInfo;
}
} else {
writers.push(host);
// Monitoring connection already set by another task, close the new connection.
this.untrackedPromises.push(this.closeConnection(client));
}
});
}

const writerCount: number = writers.length;
if (writerCount === 0) {
hosts = [];
} else if (writerCount === 1) {
hosts.push(writers[0]);
} else {
const sortedWriters: HostInfo[] = writers.sort((a, b) => {
return b.lastUpdateTime - a.lastUpdateTime;
});
const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
if (!hosts) {
const clientToClose = this.monitoringClient;
this.monitoringClient = null;
this.isVerifiedWriterConnection = false;

hosts.push(sortedWriters[0]);
this.untrackedPromises.push(this.closeConnection(clientToClose));
}

return hosts;
}

updateTopologyCache(hosts: HostInfo[]) {
this.topologyMap.put(this.clusterId, hosts, this.topologyCacheExpirationNanos);
this.topologyMap.put(this.clusterId, hosts, ClusterToplogyMonitorImpl.TOPOLOGY_CACHE_EXPIRATION_MS);
this.releaseTopologyUpdate();
this.topologyUpdated = new Promise<void>((done) => {
this.releaseTopologyUpdate = () => {
Expand All @@ -252,14 +240,14 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

/**
 * Returns the id of the given client when that client is connected to a writer.
 *
 * @param client the client to inspect; may be null or undefined.
 * @returns `client.id` when `client.hostInfo.role` is `HostRole.WRITER`; otherwise `null`,
 *   including when no client is supplied.
 */
getWriterHostId(client: ClientWrapper) {
  // Check the client before dereferencing it: the monitoring client field starts out as null.
  return client && client.hostInfo.role === HostRole.WRITER ? client.id : null;
}

/**
 * Aborts the given client through the plugin service.
 *
 * @param client the client to abort; a null or undefined client is ignored.
 */
async closeConnection(client: ClientWrapper) {
  if (!client) {
    return;
  }
  // Abort exactly once, via the backing field assigned in the constructor.
  await this._pluginService.abortTargetClient(client);
}

private isInPanicMode() {
Expand All @@ -284,7 +272,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
// Use any client to gather topology information.
let hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
if (!hosts) {
hosts = this.openAnyClientAndUpdateTopology();
hosts = await this.openAnyClientAndUpdateTopology();
}

// Set up host monitors.
Expand All @@ -307,15 +295,13 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
// Writer detected, update monitoringClient.
const client = this.monitoringClient;
this.monitoringClient = writerClient;
await this.closeConnection(client);
this.untrackedPromises.push(this.closeConnection(client));
this.isVerifiedWriterConnection = true;
this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanic;
this.ignoreNewTopologyRequestsEndTime = Date.now() - this.ignoreTopologyRequest;
this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanicMs;
this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs;

// Stop monitoring of each host, writer detected.
this.hostMonitorsStop = true;
await Promise.all(this.untrackedPromises);
this.untrackedPromises = [];
this.hostMonitors.clear();
continue;
} else {
Expand All @@ -340,25 +326,23 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
if (this.hostMonitors.size !== 0) {
// Stop host monitors.
this.hostMonitorsStop = true;
await Promise.all(this.untrackedPromises);
this.untrackedPromises = [];
this.hostMonitors.clear();
}
const hosts = this.fetchTopologyAndUpdateCache(this.monitoringClient);
const hosts = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
if (!hosts) {
// Unable to gather topology, switch to panic mode.
const client = this.monitoringClient;
this.monitoringClient = null;
this.isVerifiedWriterConnection = false;
await this.closeConnection(client);
this.untrackedPromises.push(this.closeConnection(client));
continue;
}
if (this.highRefreshRateEndTime > 0 && Date.now() > this.highRefreshRateEndTime) {
this.highRefreshRateEndTime = 0;
}
if (this.highRefreshRateEndTime == 0) {
// Log topology when not in high refresh rate.
logger.debug(logTopology(this.topologyMap.get(this.clusterId), ""));
this.logTopology("");
}
// Set an easily interruptible delay between topology refreshes.
await this.delay(false);
Expand All @@ -379,12 +363,19 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

/**
 * Waits between topology refreshes in short polling steps, so the wait ends early
 * as soon as `requestToUpdateTopology` is set.
 *
 * @param useHighRefreshRate when true the wait lasts `highRefreshRateMs`, otherwise `refreshRateMs`.
 */
private async delay(useHighRefreshRate: boolean) {
  // Single deadline computed from the millisecond rates assigned in the constructor.
  const endTime = Date.now() + (useHighRefreshRate ? this.highRefreshRateMs : this.refreshRateMs);
  while (Date.now() < endTime && !this.requestToUpdateTopology) {
    // Poll in 50 ms steps rather than sleeping for the whole interval.
    await sleep(50);
  }
  // Reset the flag so the next delay is not skipped.
  this.requestToUpdateTopology = false;
}

/**
 * Writes the cached topology for this cluster to the debug log.
 * Nothing is logged when the cache has no entry for the cluster or the entry is empty.
 *
 * @param msgPrefix passed through unchanged to the `logTopology` utility.
 */
logTopology(msgPrefix: string) {
  const cachedHosts: HostInfo[] = this.topologyMap.get(this.clusterId);
  if (!cachedHosts || cachedHosts.length === 0) {
    return;
  }
  logger.debug(logTopology(cachedHosts, msgPrefix));
}
}

export class HostMonitor {
Expand Down Expand Up @@ -433,9 +424,8 @@ export class HostMonitor {
this.monitor.hostMonitorsStop = true;

await this.monitor.fetchTopologyAndUpdateCache(client);
logger.debug(logTopology(this.monitor.topologyMap.get(this.monitor.clusterId), ""));
this.monitor.logTopology("");
}

client = null;
return;
} else if (!client) {
Expand Down Expand Up @@ -467,7 +457,7 @@ export class HostMonitor {

let hosts: HostInfo[];
try {
hosts = await this.monitor.queryForTopology(client);
hosts = await this.monitor.hostListProvider.sqlQueryForTopology(client);
} catch (error) {
return;
}
Expand Down
Loading

0 comments on commit 871729d

Please sign in to comment.