Skip to content

Commit

Permalink
Try snappy compression
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Jan 8, 2025
1 parent bfe2960 commit c94c085
Show file tree
Hide file tree
Showing 16 changed files with 333 additions and 158 deletions.
329 changes: 227 additions & 102 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/scripts/esbuild.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const defaultConfig = {
keepNames: false,
sourcemap: false,
logLevel: 'error',
external: [],
external: ['snappy'],
define: {},
};

Expand Down
2 changes: 1 addition & 1 deletion dev/tool/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --sourcemap=external --define:MODEL_VERSION --define:GIT_REVISION",
"bundle": "node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --external=snappy --sourcemap=external --define:MODEL_VERSION --define:GIT_REVISION",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/tool",
"docker:tbuild": "docker build -t hardcoreeng/tool . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
Expand Down
6 changes: 4 additions & 2 deletions plugins/client-resources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
"typescript": "^5.3.3",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
"@types/jest": "^29.5.5",
"@types/snappyjs": "^0.7.1"
},
"dependencies": {
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/core": "^0.6.32",
"@hcengineering/client": "^0.6.18",
"@hcengineering/rpc": "^0.6.5"
"@hcengineering/rpc": "^0.6.5",
"snappyjs": "^0.7.0"
},
"repository": "https://github.com/hcengineering/platform",
"publishConfig": {
Expand Down
32 changes: 25 additions & 7 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import platform, {
broadcastEvent,
getMetadata
} from '@hcengineering/platform'
import { uncompress } from 'snappyjs'

import { HelloRequest, HelloResponse, RPCHandler, ReqId, type Response } from '@hcengineering/rpc'

Expand Down Expand Up @@ -88,6 +89,7 @@ class RequestPromise {
class Connection implements ClientConnection {
private websocket: ClientSocket | null = null
binaryMode = false
compressionMode = false
private readonly requests = new Map<ReqId, RequestPromise>()
private lastId = 0
private interval: number | undefined
Expand Down Expand Up @@ -286,9 +288,8 @@ class Connection implements ClientConnection {
}
if (resp.result === 'hello') {
const helloResp = resp as HelloResponse
if (helloResp.binary) {
this.binaryMode = true
}
this.binaryMode = helloResp.binary
this.compressionMode = helloResp.useCompression ?? false

// We need to clear dial timer, since we recieve hello response.
clearTimeout(this.dialTimer)
Expand Down Expand Up @@ -436,6 +437,7 @@ class Connection implements ClientConnection {

private openConnection (ctx: MeasureContext, socketId: number): void {
this.binaryMode = false
this.helloRecieved = false
// Use defined factory or browser default one.
const clientSocketFactory =
this.opt?.socketFactory ??
Expand Down Expand Up @@ -500,11 +502,28 @@ class Connection implements ClientConnection {
}
if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => {
if (this.compressionMode && this.helloRecieved) {
try {
data = uncompress(data)
} catch (err: any) {
// Ignore
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
})
} else {
const resp = this.rpcHandler.readResponse<any>(event.data, this.binaryMode)
let data = event.data
if (this.compressionMode && this.helloRecieved) {
try {
data = uncompress(data)
} catch (err: any) {
// Ignore
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
}
}
Expand All @@ -522,15 +541,14 @@ class Connection implements ClientConnection {
return
}
const useBinary = this.opt?.useBinaryProtocol ?? getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression =
this.compressionMode =
this.opt?.useProtocolCompression ?? getMetadata(client.metadata.UseProtocolCompression) ?? false
this.helloRecieved = false
const helloRequest: HelloRequest = {
method: 'hello',
params: [],
id: -1,
binary: useBinary,
compression: useCompression
compression: this.compressionMode
}
ctx.withSync('send-hello', {}, () => this.websocket?.send(this.rpcHandler.serialize(helloRequest, false)))
}
Expand Down
2 changes: 1 addition & 1 deletion pods/fulltext/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"get-model": "node ../../common/scripts/esbuild.js --entry=src/get-model.ts --keep-names=true --bundle=true --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION && node ./bundle/bundle.js > ./bundle/model.json",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --keep-names=true --bundle=true --external=*.node --sourcemap=external",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --keep-names=true --bundle=true --external=*.node --external=snappy --sourcemap=external",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/fulltext",
"docker:tbuild": "docker build -t hardcoreeng/fulltext . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
"docker:abuild": "docker build -t hardcoreeng/fulltext . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
Expand Down
9 changes: 6 additions & 3 deletions pods/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"get-model": "node ../../common/scripts/esbuild.js --entry=src/get-model.ts -keep-names=true --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --bundle=true && node ./bundle/bundle.js > ./bundle/model.json",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --bundle=true --sourcemap=external --external=*.node --external=*.node --external=bufferutil --external=snappy --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --external=utf-8-validate --external=msgpackr-extract",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --bundle=true --sourcemap=external --external=*.node --external=bufferutil --external=snappy --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --external=utf-8-validate --external=msgpackr-extract",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor",
"docker:tbuild": "docker build -t hardcoreeng/transactor . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",
"docker:abuild": "docker build -t hardcoreeng/transactor . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",
Expand Down Expand Up @@ -51,7 +51,8 @@
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5",
"@hcengineering/model-all": "^0.6.0"
"@hcengineering/model-all": "^0.6.0",
"snappyjs": "^0.7.0"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",
Expand All @@ -77,6 +78,8 @@
"bufferutil": "^4.0.8",
"msgpackr": "^1.11.2",
"msgpackr-extract": "^3.0.3",
"@hcengineering/postgres": "^0.6.0"
"@hcengineering/postgres": "^0.6.0",
"snappy": "^7.2.2",
"@hcengineering/rpc": "^0.6.5"
}
}
23 changes: 23 additions & 0 deletions pods/server/src/__tests__/comptession.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { readFile } from 'fs/promises'
import { compress } from 'snappy'
import { RPCHandler } from '@hcengineering/rpc'

describe('compression-tests', () => {
it('check-snappy', async () => {
const modelJSON = (await readFile('./bundle/model.json')).toString()

const txes = JSON.parse(modelJSON)

const compressed = await compress(modelJSON)
console.log(modelJSON.length, compressed.length, compressed.length / modelJSON.length)
expect(compressed.length).toBeLessThan(modelJSON.length)

const rpc = new RPCHandler()

const jsonData = rpc.serialize(txes, true)

const compressed2 = await compress(jsonData)
console.log(jsonData.length, compressed2.length, compressed2.length / jsonData.length)
expect(compressed2.length).toBeLessThan(jsonData.length)
})
})
1 change: 0 additions & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ export type ServerFactory = (
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
enableCompression: boolean,
accountsUrl: string,
externalStorage: StorageAdapter
) => () => Promise<void>
Expand Down
1 change: 1 addition & 0 deletions server/rpc/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface HelloResponse extends Response<any> {
serverVersion: string
lastTx?: string
lastHash?: string // Last model hash
useCompression?: boolean
}

function replacer (key: string, value: any): any {
Expand Down
17 changes: 10 additions & 7 deletions server/server/src/sessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class TSessionManager implements SessionManager {
stop: () => Promise<string | undefined>
}
| undefined,
readonly accountsUrl: string
readonly accountsUrl: string,
readonly enableCompression: boolean
) {
this.checkInterval = setInterval(() => {
this.handleTick()
Expand Down Expand Up @@ -1080,7 +1081,7 @@ class TSessionManager implements SessionManager {
): Promise<void> {
const hello = request as HelloRequest
service.binaryMode = hello.binary ?? false
service.useCompression = hello.compression ?? false
service.useCompression = this.enableCompression ? hello.compression ?? false : false

if (LOGGING_ENABLED) {
ctx.info('hello happen', {
Expand All @@ -1106,7 +1107,8 @@ class TSessionManager implements SessionManager {
reconnect,
serverVersion: this.serverVersion,
lastTx: pipeline.context.lastTx,
lastHash: pipeline.context.lastHash
lastHash: pipeline.context.lastHash,
useCompression: service.useCompression
}
ws.send(requestCtx, helloResponse, false, false)
}
Expand All @@ -1123,9 +1125,10 @@ export function createSessionManager (
stop: () => Promise<string | undefined>
}
| undefined,
accountsUrl: string
accountsUrl: string,
enableCompression: boolean
): SessionManager {
return new TSessionManager(ctx, sessionFactory, timeouts, brandingMap ?? null, profiling, accountsUrl)
return new TSessionManager(ctx, sessionFactory, timeouts, brandingMap ?? null, profiling, accountsUrl, enableCompression)
}

/**
Expand Down Expand Up @@ -1157,7 +1160,8 @@ export function startSessionManager (
reconnectTimeout: 500
},
opt.profiling,
opt.accountsUrl
opt.accountsUrl,
opt.enableCompression ?? false
)
return {
shutdown: opt.serverFactory(
Expand All @@ -1168,7 +1172,6 @@ export function startSessionManager (
ctx,
opt.pipelineFactory,
opt.port,
opt.enableCompression ?? false,
opt.accountsUrl,
opt.externalStorage
),
Expand Down
3 changes: 2 additions & 1 deletion server/ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"express": "^4.21.2",
"utf-8-validate": "^6.0.4",
"ws": "^8.18.0",
"body-parser": "^1.20.2"
"body-parser": "^1.20.2",
"snappy": "^7.2.2"
}
}
43 changes: 16 additions & 27 deletions server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ import os from 'os'
import { WebSocketServer, type RawData, type WebSocket } from 'ws'

import 'bufferutil'
import { compress } from 'snappy'
import 'utf-8-validate'

let profiling = false
const rpcHandler = new RPCHandler()
/**
Expand All @@ -60,14 +62,12 @@ export function startHttpServer (
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
enableCompression: boolean,
accountsUrl: string,
externalStorage: StorageAdapter
): () => Promise<void> {
if (LOGGING_ENABLED) {
ctx.info('starting server on', {
port,
enableCompression,
accountsUrl,
parallel: os.availableParallelism()
})
Expand Down Expand Up @@ -324,27 +324,7 @@ export function startHttpServer (
const httpServer = http.createServer(app)
const wss = new WebSocketServer({
noServer: true,
perMessageDeflate: enableCompression
? {
zlibDeflateOptions: {
// See zlib defaults.
chunkSize: 32 * 1024,
memLevel: 1,
level: 1
},
zlibInflateOptions: {
chunkSize: 32 * 1024,
level: 1,
memLevel: 1
},
serverNoContextTakeover: true,
clientNoContextTakeover: true,
// Below options specified as default values.
concurrencyLimit: Math.max(10, os.availableParallelism()), // Limits zlib concurrency for perf.
threshold: 1024 // Size (in bytes) below which messages
// should not be compressed if context takeover is disabled.
}
: false,
perMessageDeflate: false,
skipUTF8Validation: true,
maxPayload: 250 * 1024 * 1024,
clientTracking: false // We do not need to track clients inside clients.
Expand Down Expand Up @@ -566,23 +546,32 @@ function createWebsocketClientSocket (
}
ws.send(pongConst)
},
send: (ctx: MeasureContext, msg, binary, compression) => {
send: (ctx: MeasureContext, msg, binary, _compression) => {
const smsg = rpcHandler.serialize(msg, binary)

ctx.measure('send-data', smsg.length)
const st = Date.now()
if (ws.readyState !== ws.OPEN || cs.isClosed) {
return
}
ws.send(smsg, { binary: true, compress: compression }, (err) => {

const handleErr = (err?: Error): void => {
ctx.measure('msg-send-delta', Date.now() - st)
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {
ctx.error('send error', { err })
Analytics.handleError(err)
}
}
ctx.measure('msg-send-delta', Date.now() - st)
})
}

if (_compression) {
void compress(smsg).then((msg: any) => {
ws.send(msg, { binary: true }, handleErr)
})
} else {
ws.send(smsg, { binary: true }, handleErr)
}
}
}
return cs
Expand Down
6 changes: 4 additions & 2 deletions workers/transactor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
"ts-jest": "^29.1.1",
"typescript": "^5.3.3",
"wrangler": "^3.97.0",
"esbuild": "^0.24.2"
"esbuild": "^0.24.2",
"@types/snappyjs": "^0.7.1"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",
Expand All @@ -54,6 +55,7 @@
"@hcengineering/server-pipeline": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/storage": "^0.6.0",
"itty-router": "^5.0.18"
"itty-router": "^5.0.18",
"snappyjs": "^0.7.0"
}
}
Loading

0 comments on commit c94c085

Please sign in to comment.