Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/npm_and_yarn/npm_and_yarn-1d3c3…
Browse files Browse the repository at this point in the history
…3138b
  • Loading branch information
lmangani authored Sep 1, 2024
2 parents bc37d63 + 5e1d223 commit 094dba3
Show file tree
Hide file tree
Showing 16 changed files with 419 additions and 209 deletions.
69 changes: 42 additions & 27 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -1110,43 +1110,58 @@ const scanClickhouse = function (settings, client, params) {
*/
const getSeries = async (matches) => {
const query = transpiler.transpileSeries(matches)
const stream = await axios.post(`${getClickhouseUrl()}`, query + ' FORMAT JSONEachRow', {
const stream = await rawRequest(query + ' FORMAT JSONEachRow', null, DATABASE_NAME(), {
responseType: 'stream'
})
const dStream = StringStream.from(stream.data).lines().map(l => {
if (!l) {
return null
}
try {
return JSON.parse(l)
} catch (err) {
logger.error({ line: l, err }, 'Error parsing line')
return null
}
}, DataStream).filter(e => e)
const res = new Transform({
transform (chunk, encoding, callback) {
callback(null, chunk)
}
})
setTimeout(async () => {
const gen = dStream.toGenerator()
res.write('{"status":"success", "data":[', 'utf-8')
let i = 0
try {
for await (const item of gen()) {
if (!item || !item.labels) {
continue
res.write('{"status":"success", "data":[', 'utf-8')
let lastString = ''
let i = 0
let lastData = 0
let open = true
stream.data.on('data', (chunk) => {
lastData = Date.now()
const strChunk = Buffer.from(chunk).toString('utf-8')
const lines = (lastString + strChunk).split('\n')
lastString = lines.pop()
lines.forEach(line => {
if (!line) {
return
}
try {
const obj = JSON.parse(line)
if (obj.labels) {
res.write((i === 0 ? '' : ',') + obj.labels)
++i
}
res.write((i === 0 ? '' : ',') + item.labels)
++i
} catch (err) {
logger.error({ line: line, err }, 'Error parsing line')
}
} catch (e) {
logger.error(e)
} finally {
res.end(']}', 'utf-8')
})
})
const close = () => {
if (lastString) {
res.write((i === 0 ? '' : ',') + lastString)
}
}, 0)
res.end(']}')
open = false
}
const maybeClose = () => {
if (open && Date.now() - lastData >= 10000) {
close()
}
if (open && Date.now() - lastData < 10000) {
setTimeout(maybeClose, 10000)
}
}
setTimeout(maybeClose, 10000)
stream.data.on('end', close)
stream.data.on('error', close)
stream.data.on('finish', close)
return res
}

Expand Down
6 changes: 5 additions & 1 deletion lib/handlers/label_values.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ async function handler (req, res) {
`type IN (${types.map(t => `${t}`).join(',')})`
].filter(w => w)
where = `WHERE ${where.join(' AND ')}`
const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON`
let limit = ''
if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
limit = `LIMIT ${process.env.ADVANCED_SERIES_REQUEST_LIMIT}`
}
const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} ${limit} FORMAT JSON`
const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME())
const resp = { status: 'success', data: allValues.data.data.map(r => r.val) }
return res.send(resp)
Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"basic-auth": "^2.0.1",
"google-protobuf": "^3.21.2",
"@grpc/grpc-js": "^1.10.6",
"@grpc/proto-loader": "^0.7.12"
"@grpc/proto-loader": "^0.7.12",
"pako": "^2.1.0"
},
"devDependencies": {
"@elastic/elasticsearch": "^8.5.0",
Expand Down
3 changes: 3 additions & 0 deletions parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ module.exports.transpileSeries = (request) => {
const _query = getQuery(req)
query.withs.idx_sel.query.sqls.push(_query.withs.idx_sel.query)
}
if (process.env.ADVANCED_SERIES_REQUEST_LIMIT) {
query.limit(process.env.ADVANCED_SERIES_REQUEST_LIMIT)
}
setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series${dist}`)
setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}${dist}`)
// logger.debug(query.toString())
Expand Down
13 changes: 13 additions & 0 deletions pyroscope/json_parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ const labelNames = async (req, payload) => {
}
}

const labelValues = async (req, payload) => {
req.type = 'json'
let body = await bufferize(payload)
body = JSON.parse(body.toString())
return {
getName: () => body.name,
getMatchers: () => body.matchers,
getStart: () => body.start,
getEnd: () => body.end
}
}

const analyzeQuery = async (req, payload) => {
req.type = 'json'
let body = await bufferize(payload)
Expand All @@ -52,6 +64,7 @@ module.exports = {
series,
getProfileStats,
labelNames,
labelValues,
settingsGet,
analyzeQuery
}
7 changes: 6 additions & 1 deletion pyroscope/pprof-bin/pkg/pprof_bin.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ export function diff_tree(id1: number, id2: number, sample_type: string): Uint8A
*/
export function export_tree(id: number, sample_type: string): Uint8Array;
/**
* @param {number} id
* @param {Uint8Array} payload
*/
export function merge_trees_pprof(id: number, payload: Uint8Array): void;
/**
* @param {number} id
* @returns {Uint8Array}
*/
export function export_trees_pprof(payload: Uint8Array): Uint8Array;
export function export_trees_pprof(id: number): Uint8Array;
/**
* @param {number} id
*/
Expand Down
20 changes: 14 additions & 6 deletions pyroscope/pprof-bin/pkg/pprof_bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,28 @@ module.exports.export_tree = function(id, sample_type) {
};

/**
* @param {number} id
* @param {Uint8Array} payload
*/
module.exports.merge_trees_pprof = function(id, payload) {
const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
const len0 = WASM_VECTOR_LEN;
wasm.merge_trees_pprof(id, ptr0, len0);
};

/**
* @param {number} id
* @returns {Uint8Array}
*/
module.exports.export_trees_pprof = function(payload) {
module.exports.export_trees_pprof = function(id) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
const len0 = WASM_VECTOR_LEN;
wasm.export_trees_pprof(retptr, ptr0, len0);
wasm.export_trees_pprof(retptr, id);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v2 = getArrayU8FromWasm0(r0, r1).slice();
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1, 1);
return v2;
return v1;
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
}
Expand Down
Binary file modified pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
Binary file not shown.
3 changes: 2 additions & 1 deletion pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ export function merge_prof(a: number, b: number, c: number, d: number, e: number
export function merge_tree(a: number, b: number, c: number, d: number, e: number): void;
export function diff_tree(a: number, b: number, c: number, d: number, e: number): void;
export function export_tree(a: number, b: number, c: number, d: number): void;
export function export_trees_pprof(a: number, b: number, c: number): void;
export function merge_trees_pprof(a: number, b: number, c: number): void;
export function export_trees_pprof(a: number, b: number): void;
export function drop_tree(a: number): void;
export function init_panic_hook(): void;
export function __wbindgen_malloc(a: number, b: number): number;
Expand Down
42 changes: 37 additions & 5 deletions pyroscope/pprof-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct Tree {
sample_types: Vec<String>,
max_self: Vec<i64>,
nodes_num: i32,
pprof: Profile,
}

impl Tree {
Expand Down Expand Up @@ -357,6 +358,7 @@ fn upsert_tree(ctx: &mut HashMap<u32, Mutex<Tree>>, id: u32, sample_types: Vec<S
sample_types,
max_self: vec![0; _len],
nodes_num: 1,
pprof: Profile::default(),
}),
);
}
Expand Down Expand Up @@ -384,7 +386,7 @@ impl TrieReader {
fn read_size(&mut self) -> usize {
let res = read_uleb128(&self.bytes[self.offs..]);
self.offs += res.1;
res.0
res.0.clone()
}

fn read_string(&mut self) -> String {
Expand Down Expand Up @@ -423,6 +425,24 @@ impl TrieReader {
}
res
}
fn read_blob(&mut self) -> &[u8] {
let size = self.read_size();
let string = &self.bytes[self.offs..self.offs + size];
self.offs += size;
string
}
fn read_blob_list(&mut self) -> Vec<&[u8]> {
let mut res = Vec::new();
while self.offs < self.bytes.len() {
let uleb = read_uleb128(&self.bytes[self.offs..]);
self.offs += uleb.1;
let _size = uleb.0;
let string = &self.bytes[self.offs..self.offs + _size];
self.offs += _size;
res.push(string);
}
res
}
/*fn end(&self) -> bool {
self.offs >= self.bytes.len()
}*/
Expand Down Expand Up @@ -917,11 +937,15 @@ pub fn export_tree(id: u32, sample_type: String) -> Vec<u8> {
}

#[wasm_bindgen]
pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {
pub fn merge_trees_pprof(id: u32, payload: &[u8]) {
let p = panic::catch_unwind(|| {
let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![]);
let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap();
let mut reader = TrieReader::new(payload);
let bin_profs = reader.read_blob_vec();
let bin_profs = reader.read_blob_list();
let mut merger = merge::ProfileMerge::new();
merger.merge(&mut tree.pprof);
for bin_prof in bin_profs {
if bin_prof.len() >= 2 && bin_prof[0] == 0x1f && bin_prof[1] == 0x8b {
let mut decompressed = Vec::new();
Expand All @@ -936,14 +960,22 @@ pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {

}
let res = merger.profile();
res.encode_to_vec()
tree.pprof = res;
});
match p {
Ok(res) => return res,
Ok(_) => {}
Err(err) => panic!("{:?}", err),
}
}

#[wasm_bindgen]
pub fn export_trees_pprof(id: u32) -> Vec<u8> {
let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![]);
let tree = ctx.get_mut(&id).unwrap().lock().unwrap();
tree.pprof.encode_to_vec()
}

#[wasm_bindgen]
pub fn drop_tree(id: u32) {
let mut ctx = CTX.lock().unwrap();
Expand Down
Loading

0 comments on commit 094dba3

Please sign in to comment.