Skip to content

Commit

Permalink
class: add class get session
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Nov 11, 2024
1 parent a2c2fad commit 931e9a9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 176 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ system-test/*key.json
.DS_Store
package-lock.json
__pycache__
.vscode
195 changes: 35 additions & 160 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ import {finished, Duplex, Readable, Transform} from 'stream';
import {PreciseDate} from '@google-cloud/precise-date';
import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.';
import {
MultiplexedSession,
MultiplexedSessionInterface,
MultiplexedSessionOptions,
} from './multiplexed-session';

import { GetSession, GetSessionInterface } from './get-session';

Check failure on line 103 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·GetSession,·GetSessionInterface·` with `GetSession,·GetSessionInterface`

export interface MultiplexedSessionConstructor {
new (database: Database): MultiplexedSessionInterface;
}
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import IPolicy = google.iam.v1.IPolicy;
Expand All @@ -116,7 +121,6 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';
import { GetSession, GetSessionInterface } from './session-getter';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
Expand Down Expand Up @@ -348,9 +352,7 @@ export interface RestoreOptions {
class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
multiplexedSession_?: MultiplexedSessionInterface;
getSession_?: GetSessionInterface;
sessionFactory_: GetSessionInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand All @@ -365,9 +367,7 @@ class Database extends common.GrpcServiceObject {
name: string,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions,
multiplexedSessionOptions?:
| MultiplexedSessionOptions
| MultiplexedSessionConstructor
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor,

Check failure on line 370 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSessionOptions·|·MultiplexedSessionConstructor,·` with `⏎······|·MultiplexedSessionOptions⏎······|·MultiplexedSessionConstructor`

Check failure on line 370 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
) {
const methods = {
/**
Expand Down Expand Up @@ -464,21 +464,6 @@ class Database extends common.GrpcServiceObject {
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
this.multiplexedSession_ =
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this)
: new MultiplexedSession(this, multiplexedSessionOptions);
this.getSession_ = new GetSession(this);

const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
}
Expand All @@ -497,11 +482,19 @@ class Database extends common.GrpcServiceObject {
this._observabilityOptions = instance._observabilityOptions;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.multiplexedSession_.createSession();
//creating multiplexed session
// this.database.createSession({multiplexed: true});
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);

this.sessionFactory_ = new GetSession(this, name, poolOptions, multiplexedSessionOptions);

Check failure on line 490 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `this,·name,·poolOptions,·multiplexedSessionOptions` with `⏎······this,⏎······name,⏎······poolOptions,⏎······multiplexedSessionOptions⏎····`
this.pool_ = this.sessionFactory_.getPool();
this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
Expand Down Expand Up @@ -1013,10 +1006,6 @@ class Database extends common.GrpcServiceObject {
reqOpts.session.creatorRole =
options.databaseRole || this.databaseRole || null;

if (options.multiplexed) {
reqOpts.session.multiplexed = true;
}

const headers = this.resourceHeader_;
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
Expand Down Expand Up @@ -1158,16 +1147,18 @@ class Database extends common.GrpcServiceObject {
* @returns {Transaction}
*/
private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) {
transaction.once('end', () => {
try {
this.pool_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
});
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS!=='true') {

Check failure on line 1150 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS!==` with `·(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS·!==·`
transaction.once('end', () => {
try {
this.pool_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
});
}
}
/**
* @typedef {array} DatabaseDeleteResponse
Expand Down Expand Up @@ -3076,7 +3067,8 @@ class Database extends common.GrpcServiceObject {
...this._traceConfig,
};
return startTrace('Database.runStream', traceConfig, span => {
this.pool_.getSession((err, session) => {
this.sessionFactory_.getSession((err, session) => {
console.log("session: ", session?.formattedName_);

Check failure on line 3071 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `"session:·"` with `'session:·'`

Check warning on line 3071 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
if (err) {
setSpanError(span, err);
proxyStream.destroy(err);
Expand Down Expand Up @@ -3133,123 +3125,6 @@ class Database extends common.GrpcServiceObject {
.once('end', endListener)
.pipe(proxyStream);
});

// if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS) {
// this.multiplexedSession_?.getSession((err, session) => {
// console.log("mux session: ", session?.formattedName_);
// if (err) {
// setSpanError(span, err);
// proxyStream.destroy(err);
// span.end();
// return;
// }

// span.addEvent('Using Session', {'session.id': session?.id});

// const snapshot = session!.snapshot(options, this.queryOptions_);

// let dataReceived = false;
// let dataStream = snapshot.runStream(query);

// const endListener = () => {
// span.end();
// snapshot.end();
// };
// dataStream
// .once('data', () => (dataReceived = true))
// .once('error', err => {
// setSpanError(span, err);

// if (
// !dataReceived &&
// isSessionNotFoundError(err as grpc.ServiceError)
// ) {
// // If it is a 'Session not found' error and we have not yet received
// // any data, we can safely retry the query on a new session.
// // Register the error on the session so the pool can discard it.
// if (session) {
// session.lastError = err as grpc.ServiceError;
// }
// span.addEvent('No session available', {
// 'session.id': session?.id,
// });
// // Remove the current data stream from the end user stream.
// dataStream.unpipe(proxyStream);
// dataStream.removeListener('end', endListener);
// dataStream.end();
// snapshot.end();
// // Create a new data stream and add it to the end user stream.
// dataStream = this.runStream(query, options);
// dataStream.pipe(proxyStream);
// } else {
// proxyStream.destroy(err);
// snapshot.end();
// }
// })
// .on('stats', stats => proxyStream.emit('stats', stats))
// .on('response', response => proxyStream.emit('response', response))
// .once('end', endListener)
// .pipe(proxyStream);
// });
// } else {
// this.pool_.getSession((err, session) => {
// if (err) {
// setSpanError(span, err);
// proxyStream.destroy(err);
// span.end();
// return;
// }

// span.addEvent('Using Session', {'session.id': session?.id});

// const snapshot = session!.snapshot(options, this.queryOptions_);

// this._releaseOnEnd(session!, snapshot, span);

// let dataReceived = false;
// let dataStream = snapshot.runStream(query);

// const endListener = () => {
// span.end();
// snapshot.end();
// };
// dataStream
// .once('data', () => (dataReceived = true))
// .once('error', err => {
// setSpanError(span, err);

// if (
// !dataReceived &&
// isSessionNotFoundError(err as grpc.ServiceError)
// ) {
// // If it is a 'Session not found' error and we have not yet received
// // any data, we can safely retry the query on a new session.
// // Register the error on the session so the pool can discard it.
// if (session) {
// session.lastError = err as grpc.ServiceError;
// }
// span.addEvent('No session available', {
// 'session.id': session?.id,
// });
// // Remove the current data stream from the end user stream.
// dataStream.unpipe(proxyStream);
// dataStream.removeListener('end', endListener);
// dataStream.end();
// snapshot.end();
// // Create a new data stream and add it to the end user stream.
// dataStream = this.runStream(query, options);
// dataStream.pipe(proxyStream);
// } else {
// proxyStream.destroy(err);
// snapshot.end();
// }
// })
// .on('stats', stats => proxyStream.emit('stats', stats))
// .on('response', response => proxyStream.emit('response', response))
// .once('end', endListener)
// .pipe(proxyStream);
// });
// }

finished(proxyStream, err => {
if (err) {
Expand Down
41 changes: 28 additions & 13 deletions src/session-getter.ts → src/get-session.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { EventEmitter } from "events-intercept";
import { Database, Session, Transaction } from ".";

Check failure on line 1 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·Database,·Session,·Transaction·}·from·"."` with `Database,·Session,·Transaction}·from·'.'`

Check warning on line 1 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
import { MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions } from "./multiplexed-session";

Check failure on line 2 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSession,·MultiplexedSessionInterface,·MultiplexedSessionOptions·}·from·"./multiplexed-session"` with `⏎··MultiplexedSession,⏎··MultiplexedSessionInterface,⏎··MultiplexedSessionOptions,⏎}·from·'./multiplexed-session'`

Check warning on line 2 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
import { SessionPool, SessionPoolInterface, SessionPoolOptions } from "./session-pool";

Check failure on line 3 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·SessionPool,·SessionPoolInterface,·SessionPoolOptions·}·from·"./session-pool"` with `⏎··SessionPool,⏎··SessionPoolInterface,⏎··SessionPoolOptions,⏎}·from·'./session-pool'`
import { MultiplexedSessionConstructor, SessionPoolConstructor } from "./database";

Check failure on line 4 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSessionConstructor,·SessionPoolConstructor·}·from·"./database"` with `⏎··MultiplexedSessionConstructor,⏎··SessionPoolConstructor,⏎}·from·'./database'`
import { ServiceObjectConfig } from "@google-cloud/common";
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
Expand All @@ -20,39 +21,53 @@ export interface GetSessionCallback {

export interface GetSessionInterface {
getSession(callback: GetSessionCallback): void;
getPool(): SessionPoolInterface;
getMultiplexedSession(): MultiplexedSessionInterface | undefined;
}

export class GetSession extends EventEmitter implements GetSessionInterface {
database: Database;
export class GetSession extends common.GrpcServiceObject implements GetSessionInterface{
multiplexedSession_?: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor
) {
super();
this.database = database;
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this.database, null)
: new SessionPool(this.database, poolOptions);
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.multiplexedSession_ =
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this.database)
: new MultiplexedSession(this.database, multiplexedSessionOptions);
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(database)
: new MultiplexedSession(database, multiplexedSessionOptions);
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.multiplexedSession_.createSession();
}

getSession(callback: GetSessionCallback): void{
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS==='true') {
this.multiplexedSession_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
} else {
this.pool_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
}
}

getPool(): SessionPoolInterface {
return this.pool_;
}

getMultiplexedSession(): MultiplexedSessionInterface | undefined {
return this.multiplexedSession_;
}
}
12 changes: 9 additions & 3 deletions src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const common = require('./common-grpc/service-object');
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import snakeCase = require('lodash.snakecase');
import {Database, SessionPoolConstructor} from './database';
import {Database, MultiplexedSessionConstructor, SessionPoolConstructor} from './database';
import {Spanner, RequestConfig} from '.';
import {
RequestCallback,
Expand Down Expand Up @@ -52,6 +52,7 @@ import {google as databaseAdmin} from '../protos/protos';
import {google as spannerClient} from '../protos/protos';
import {CreateInstanceRequest} from './index';
import {ObservabilityOptions} from './instrument';
import { MultiplexedSessionOptions } from './multiplexed-session';

export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup;
export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase;
Expand Down Expand Up @@ -960,7 +961,8 @@ class Instance extends common.GrpcServiceObject {
database(
name: string,
poolOptions?: SessionPoolOptions | SessionPoolConstructor,
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor,
): Database {
if (!name) {
throw new GoogleError('A name is required to access a Database object.');
Expand All @@ -975,9 +977,13 @@ class Instance extends common.GrpcServiceObject {
optionsKey =
optionsKey + '/' + JSON.stringify(Object.entries(queryOptions!).sort());
}
if (multiplexedSessionOptions && Object.keys(multiplexedSessionOptions).length > 0) {
optionsKey =
optionsKey + '/' + JSON.stringify(Object.entries(multiplexedSessionOptions!).sort());
}
const key = name.split('/').pop() + optionsKey;
if (!this.databases_.has(key!)) {
const db = new Database(this, name, poolOptions, queryOptions);
const db = new Database(this, name, poolOptions, queryOptions, multiplexedSessionOptions);
db._observabilityOptions = this._observabilityOptions;
this.databases_.set(key!, db);
}
Expand Down

0 comments on commit 931e9a9

Please sign in to comment.