Skip to content

Commit

Permalink
Use streaming JSON parse for timing profile to work around string siz…
Browse files Browse the repository at this point in the history
…e limit
  • Loading branch information
bduffany committed Oct 12, 2023
1 parent 9aaf060 commit 3ff38a2
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 28 deletions.
20 changes: 20 additions & 0 deletions app/invocation/invocation.css
Original file line number Diff line number Diff line change
Expand Up @@ -1169,3 +1169,23 @@ svg.invocation-query-graph .nodes rect {
margin-left: auto;
white-space: nowrap;
}

.timing.card .progress-bar {
height: 8px;
border-radius: 4px;
max-width: 300px;
overflow: clip;
background: #bbdefb;
}

.timing.card .progress-label {
margin-bottom: 4px;
font-size: 13px;
color: #616161;
}

.timing.card .progress-bar-inner {
height: 100%;
background: #2196f3;
transform-origin: left;
}
52 changes: 46 additions & 6 deletions app/invocation/invocation_timing_card.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import React from "react";
import SetupCodeComponent from "../docs/setup_code";
import FlameChart from "../flame_chart/flame_chart";
import { Profile, parseProfile } from "../trace/trace_events";
import { Profile, readProfile } from "../trace/trace_events";
import rpcService, { FileEncoding } from "../service/rpc_service";
import InvocationModel from "./invocation_model";
import Button from "../components/button/button";
Expand Down Expand Up @@ -63,6 +63,8 @@ export default class InvocationTimingCardComponent extends React.Component<Props
eventPageSize: window.localStorage[eventPageSizeStorageKey] || 100,
};

private progressRef = React.createRef<HTMLDivElement>();

componentDidMount() {
this.fetchProfile();
}
Expand All @@ -81,6 +83,30 @@ export default class InvocationTimingCardComponent extends React.Component<Props
return Boolean(this.getProfileFile()?.uri?.startsWith("bytestream://"));
}

setProgress(bytesLoaded: number, digestSize: number, encoding: FileEncoding) {
const container = this.progressRef.current;
if (!container) return;

let approxCompressionRatio = 1;
if (encoding === "gzip") {
approxCompressionRatio = 11.3;
}

const compressedBytesLoaded = Math.min(bytesLoaded / approxCompressionRatio, digestSize);
const progressPercent = 100 * Math.min(1, compressedBytesLoaded / digestSize);

const spinner = container.querySelector(".loading") as HTMLElement;
spinner.style.display = "none";

const progressContainer = container.querySelector(".timing-profile-progress")!;
progressContainer.removeAttribute("hidden");
const progressLabel = progressContainer.querySelector(".progress-label")!;
progressLabel.innerHTML = `Loading profile (${format.bytes(compressedBytesLoaded)} / ${format.bytes(digestSize)})`;

const progressBarInner = progressContainer.querySelector(".progress-bar-inner") as HTMLElement;
progressBarInner.style.width = `${progressPercent}%`;
}

fetchProfile() {
if (!this.isTimingEnabled()) {
this.setState({ loading: false });
Expand All @@ -98,15 +124,19 @@ export default class InvocationTimingCardComponent extends React.Component<Props
storedEncoding = "gzip";
}

const digestSize = Number(profileFile.uri.split("/").pop());

this.setState({ loading: true });
// Note: we use responseType "text" instead of "json" since the profile is
// not always valid JSON (the trailing "]}" may be missing).
rpcService
.fetchBytestreamFile(profileFile.uri, this.props.model.getInvocationId(), "text", {
.fetchBytestreamFile(profileFile.uri, this.props.model.getInvocationId(), "stream", {
// Set the stored encoding header to prevent the server from double-gzipping.
headers: { "X-Stored-Encoding-Hint": storedEncoding },
})
.then((contents) => this.updateProfile(parseProfile(contents)))
.then((body) => {
if (body === null) throw new Error("response body is null");
return readProfile(body, (n) => this.setProgress(n, digestSize, storedEncoding));
})
.then((profile) => this.updateProfile(profile))
.catch((e) => errorService.handleError(e))
.finally(() => this.setState({ loading: false }));
}
Expand Down Expand Up @@ -207,7 +237,17 @@ export default class InvocationTimingCardComponent extends React.Component<Props

renderEmptyState() {
if (this.state.loading) {
return <div className="loading" />;
return (
<div ref={this.progressRef}>
<div className="loading" />
<div className="timing-profile-progress" hidden>
<div className="progress-label" />
<div className="progress-bar">
<div className="progress-bar-inner" />
</div>
</div>
</div>
);
}

if (!this.props.model.buildToolLogs) {
Expand Down
3 changes: 3 additions & 0 deletions app/trace/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ ts_library(
ts_library(
name = "trace_events",
srcs = ["trace_events.ts"],
deps = [
"@npm//tslib",
],
)

ts_jasmine_node_test(
Expand Down
79 changes: 63 additions & 16 deletions app/trace/trace_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,73 @@ const TIME_SERIES_EVENT_NAMES_AND_ARG_KEYS: Map<string, string> = new Map([
["Network Down usage (total)", "system network down (Mbps)"],
]);

export function parseProfile(data: string): Profile {
// Note, the trace profile format specifies that the "]" at the end of the
// list is optional:
// https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#heading=h.f2f0yd51wi15
// Bazel uses the JSON Object Format, which means the trailing "]}" is
// optional.
if (trailingNonWhitespaceCharacters(data, 2) !== "]}") {
data += "]}";
export async function readProfile(
body: ReadableStream<Uint8Array>,
progress: (numBytesLoaded: number) => void
): Promise<Profile> {
const reader = body.getReader();
const decoder = new TextDecoder("utf-8");
let n = 0;
let buffer = "";
let profile: Profile | null = null;

while (true) {
const { value, done } = await reader.read();
if (done) break;
// `stream: true` allows us to handle UTF-8 sequences that cross chunk
// boundaries (should be relatively rare).
const text = decoder.decode(value, { stream: true });
buffer += text;
n += value.byteLength;
progress(n);
// Keep accumulating into the buffer until we see the "traceEvents" array.
// Each entry in this array is newline-delimited (a special property of
// Google's trace event JSON format).
if (!profile) {
const beginMarker = '"traceEvents":[\n';
const index = buffer.indexOf(beginMarker);
if (index < 0) continue;

const before = buffer.substring(0, index + beginMarker.length);
const after = buffer.substring(before.length);

const outerJSON = before + "]}";
profile = JSON.parse(outerJSON) as Profile;
buffer = after;
}
if (profile) {
buffer = consumeEvents(buffer, profile);
}
}
// Consume last event, which isn't guaranteed to end with ",\n"
if (buffer) {
const outerJSONClosingSequence = "\n ]\n}";
if (buffer.endsWith(outerJSONClosingSequence)) {
buffer = buffer.substring(0, buffer.length - outerJSONClosingSequence.length);
}
if (buffer) {
const event = JSON.parse(buffer);
profile?.traceEvents.push(event);
buffer = "";
}
}
return JSON.parse(data) as Profile;
}

function trailingNonWhitespaceCharacters(text: string, numTrailingChars: number) {
let out = "";
for (let i = text.length - 1; i >= 0; i--) {
if (text[i].trim() !== "") out = text[i] + out;
if (!profile) {
throw new Error("failed to parse timing profile JSON");
}
return profile;
}

if (out.length >= numTrailingChars) break;
function consumeEvents(buffer: string, profile: Profile): string {
// Each event entry looks like " { ... },\n"
const parts = buffer.split(",\n");
const completeEvents = parts.slice(0, parts.length - 1);
for (const rawEvent of completeEvents) {
profile!.traceEvents.push(JSON.parse(rawEvent));
}
return out;
// If there's a partial event at the end, that partial event becomes the new
// buffer value.
return parts[parts.length - 1] || "";
}

function eventComparator(a: TraceEvent, b: TraceEvent) {
Expand Down
62 changes: 56 additions & 6 deletions app/trace/trace_events_test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { parseProfile } from "./trace_events";
import { readProfile } from "./trace_events";

// NOTE: in the following profile data, whitespace is significant (unlike regular JSON):
// - The `"traceEvents":[` list opening has to end with a newline
// - Each entry in the traceEvents list has to end with ",\n", except for the last one
// - The traceEvents list is closed with the exact sequence "\n ]\n}"

const INCOMPLETE_PROFILE = `
{"otherData":{"build_id":"799cc58b-8695-4e70-a772-7a15472aa55b","output_base":"/output-base","date":"Wed Oct 26 20:24:01 UTC 2022"},"traceEvents":[
Expand All @@ -10,14 +15,59 @@ const INCOMPLETE_PROFILE = `
{"cat":"build phase marker","name":"Initialize command","ph":"i","ts":0,"pid":1,"tid":29},
{"cat":"general information","name":"BazelStartupOptionsModule.beforeCommand","ph":"X","ts":200868,"dur":206,"pid":1,"tid":29}`.trim();

const COMPLETE_PROFILE = INCOMPLETE_PROFILE + "]}";
const COMPLETE_PROFILE = INCOMPLETE_PROFILE + "\n ]\n}";

function readableStreamFromString(value: string): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
const reader: ReadableStreamDefaultReader<Uint8Array> = {
read() {
if (!value) {
return Promise.resolve({ done: true });
}
// Read a small-ish, random length (between 1 and 10 bytes)
const length = Math.min(value.length, 1 + Math.floor(Math.random() * 10));
const before = value.substring(0, length);
const after = value.substring(length);
value = after;
return Promise.resolve({ value: encoder.encode(before), done: false });
},
releaseLock() {
throw new Error("releaseLock(): not implemented");
},
cancel() {
throw new Error("cancel(): not implemented");
},
get closed() {
return Promise.reject("closed: not implemented");
},
};
const stream = {
getReader(): ReadableStreamDefaultReader<Uint8Array> {
return reader;
},
} as ReadableStream<Uint8Array>;

return stream;
}

describe("parseProfile", () => {
it("should parse a complete profile", () => {
parseProfile(COMPLETE_PROFILE);
it("should parse a complete profile", async () => {
const stream = readableStreamFromString(COMPLETE_PROFILE);
let numBytesRead = 0;
const profile = await readProfile(stream, (n) => {
numBytesRead = n;
});
expect(profile.traceEvents.length).toBe(7);
expect(numBytesRead).toBe(COMPLETE_PROFILE.length);
});

it("should parse an incomplete profile", () => {
parseProfile(INCOMPLETE_PROFILE);
it("should parse an incomplete profile", async () => {
const stream = readableStreamFromString(INCOMPLETE_PROFILE);
let numBytesRead = 0;
const profile = await readProfile(stream, (n) => {
numBytesRead = n;
});
expect(profile.traceEvents.length).toBe(7);
expect(numBytesRead).toBe(INCOMPLETE_PROFILE.length);
});
});

0 comments on commit 3ff38a2

Please sign in to comment.