Skip to content

Commit

Permalink
Move streaming reader out of Blob into a dedicated BufReader class (
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiltd authored Jan 20, 2025
1 parent a0d8515 commit 39114a4
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 192 deletions.
195 changes: 36 additions & 159 deletions builtins/web/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,19 @@
#include "file.h"
#include "builtin.h"
#include "encode.h"
#include "extension-api.h"
#include "rust-encoding.h"
#include "streams/buf-reader.h"
#include "streams/native-stream-source.h"

#include "js/UniquePtr.h"
#include "js/ArrayBuffer.h"
#include "js/Conversions.h"
#include "js/experimental/TypedData.h"
#include "js/HashTable.h"
#include "js/Stream.h"
#include "js/TypeDecls.h"
#include "js/Value.h"

namespace {

static api::Engine *ENGINE;

JSObject *new_array_buffer_from_span(JSContext *cx, std::span<const uint8_t> span) {
auto buf = mozilla::MakeUnique<uint8_t[]>(span.size());
if (!buf) {
JS_ReportOutOfMemory(cx);
return nullptr;
}

std::copy(span.begin(), span.end(), buf.get());

auto array_buffer = JS::NewArrayBufferWithContents(
cx, span.size(), buf.get(), JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory);
if (!array_buffer) {
JS_ReportOutOfMemory(cx);
return nullptr;
}

// `array_buffer` now owns `buf`
std::ignore = (buf.release());
return array_buffer;
}

template <typename T> bool validate_type(T *chars, size_t strlen) {
for (size_t i = 0; i < strlen; i++) {
T c = chars[i];
Expand Down Expand Up @@ -153,6 +128,8 @@ namespace blob {

using js::Vector;
using file::File;
using streams::BufReader;
using streams::NativeStreamSource;

#define DEFINE_BLOB_METHOD(name) \
bool Blob::name(JSContext *cx, unsigned argc, JS::Value *vp) { \
Expand Down Expand Up @@ -190,94 +167,45 @@ const JSPropertySpec Blob::properties[] = {
JS_PS_END,
};

class StreamTask final : public api::AsyncTask {
Heap<JSObject *> source_;

static constexpr size_t CHUNK_SIZE = 8192;

public:
explicit StreamTask(const HandleObject source, api::Engine *engine) : source_(source) {
handle_ = IMMEDIATE_TASK_HANDLE;
}

[[nodiscard]] bool run(api::Engine *engine) override {
JSContext *cx = engine->cx();
RootedObject owner(cx, streams::NativeStreamSource::owner(source_));
RootedObject stream(cx, streams::NativeStreamSource::stream(source_));
RootedValue ret(cx);

auto readers = Blob::readers(owner);
auto rdr = readers->lookup(source_);

if (!rdr) {
return false;
}

auto chunk = rdr->value().read(CHUNK_SIZE);
auto chunk_size = chunk.size();

if (chunk.empty()) {
if (!JS::ReadableStreamClose(cx, stream)) {
return false;
}

readers->remove(source_);
return cancel(engine);
}

RootedObject array_buffer(cx, new_array_buffer_from_span(cx, chunk));
if (!array_buffer) {
return false;
}

RootedObject bytes_buffer(cx, JS_NewUint8ArrayWithBuffer(cx, array_buffer, 0, chunk_size));
if (!bytes_buffer) {
return false;
}

RootedValue enqueue_val(cx);
enqueue_val.setObject(*bytes_buffer);
if (!JS::ReadableStreamEnqueue(cx, stream, enqueue_val)) {
return false;
}
JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self) {
auto src = Blob::blob(self);
auto size = src->length();

return cancel(engine);
auto buf = mozilla::MakeUnique<uint8_t[]>(size);
if (!buf) {
JS_ReportOutOfMemory(cx);
return nullptr;
}

[[nodiscard]] bool cancel(api::Engine *engine) override {
handle_ = INVALID_POLLABLE_HANDLE;
return true;
auto array_buffer = JS::NewArrayBufferWithContents(
cx, size, src->begin(), JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory);
if (!array_buffer) {
JS_ReportOutOfMemory(cx);
return nullptr;
}

void trace(JSTracer *trc) override { TraceEdge(trc, &source_, "Source for Blob StreamTask"); }
};

JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self) {
size_t total_size = blob_size(self);
size_t bytes_read = 0;

return Blob::data_to_owned_array_buffer(cx, self, 0, total_size, &bytes_read);
// `array_buffer` now owns `buf`
std::ignore = (buf.release());
return array_buffer;
}

JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self, size_t offset,
size_t size, size_t *bytes_read) {
auto blob = Blob::blob(self);
auto blob_size = blob->length();
*bytes_read = 0;
bool Blob::read_blob_slice(JSContext *cx, HandleObject self, std::span<uint8_t> buf,
size_t start, size_t *read, bool *done) {
auto src = Blob::blob(self);

MOZ_ASSERT(offset <= blob_size);
if (start >= src->length()) {
*read = 0;
*done = true;
return true;
}

size_t available_bytes = blob_size - offset;
size_t read_size = std::min(size, available_bytes);
size_t available = src->length() - start;
size_t to_read = std::min(buf.size(), available);

auto span = std::span<uint8_t>(blob->begin() + offset, read_size);
auto array_buffer = new_array_buffer_from_span(cx, span);
if (!array_buffer) {
return nullptr;
}
std::copy_n(src->begin() + start, to_read, buf.data());
*read = to_read;

*bytes_read = read_size;
return array_buffer;
return true;
}

DEFINE_BLOB_METHOD(arrayBuffer)
Expand Down Expand Up @@ -382,28 +310,15 @@ bool Blob::slice(JSContext *cx, HandleObject self, const CallArgs &args, Mutable
}

bool Blob::stream(JSContext *cx, HandleObject self, MutableHandleValue rval) {
auto native_stream = streams::NativeStreamSource::create(cx, self, JS::UndefinedHandleValue,
stream_pull, stream_cancel);

JS::RootedObject source(cx, native_stream);
if (!source) {
RootedObject reader(cx, BufReader::create(cx, self, read_blob_slice));
if (!reader) {
return false;
}

auto readers = Blob::readers(self);
auto blob = Blob::blob(self);
auto span = std::span<uint8_t>(blob->begin(), blob->length());
RootedObject native_stream(cx, BufReader::stream(reader));
RootedObject default_stream(cx, NativeStreamSource::stream(native_stream));

if (!readers->put(source, BlobReader(span))) {
return false;
}

JS::RootedObject stream(cx, streams::NativeStreamSource::stream(native_stream));
if (!stream) {
return false;
}

rval.setObject(*stream);
rval.setObject(*default_stream);
return true;
}

Expand Down Expand Up @@ -476,21 +391,6 @@ bool Blob::type_get(JSContext *cx, unsigned argc, JS::Value *vp) {
return true;
}

bool Blob::stream_cancel(JSContext *cx, JS::CallArgs args, JS::HandleObject stream,
JS::HandleObject owner, JS::HandleValue reason) {
args.rval().setUndefined();
return true;
}

bool Blob::stream_pull(JSContext *cx, JS::CallArgs args, JS::HandleObject source,
JS::HandleObject owner, JS::HandleObject controller) {

ENGINE->queue_async_task(new StreamTask(source, ENGINE));

args.rval().setUndefined();
return true;
}

Blob::ByteBuffer *Blob::blob(JSObject *self) {
MOZ_ASSERT(is_instance(self));
auto blob = static_cast<ByteBuffer *>(
Expand All @@ -509,15 +409,6 @@ JSString *Blob::type(JSObject *self) {
return JS::GetReservedSlot(self, static_cast<size_t>(Blob::Slots::Type)).toString();
}

Blob::ReadersMap *Blob::readers(JSObject *self) {
MOZ_ASSERT(is_instance(self));
auto readers = static_cast<ReadersMap *>(
JS::GetReservedSlot(self, static_cast<size_t>(Blob::Slots::Readers)).toPrivate());

MOZ_ASSERT(readers);
return readers;
}

Blob::LineEndings Blob::line_endings(JSObject *self) {
MOZ_ASSERT(is_instance(self));
return static_cast<LineEndings>(
Expand Down Expand Up @@ -687,15 +578,13 @@ JSObject *Blob::create(JSContext *cx, UniqueChars data, size_t data_len, HandleS
SetReservedSlot(self, static_cast<uint32_t>(Slots::Data), JS::PrivateValue(blob));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Type), JS::StringValue(type));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Endings), JS::Int32Value(LineEndings::Transparent));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Readers), JS::PrivateValue(new ReadersMap));
return self;
}

bool Blob::init(JSContext *cx, HandleObject self, HandleValue blobParts, HandleValue opts) {
SetReservedSlot(self, static_cast<uint32_t>(Slots::Type), JS_GetEmptyStringValue(cx));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Endings), JS::Int32Value(LineEndings::Transparent));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Data), JS::PrivateValue(new ByteBuffer));
SetReservedSlot(self, static_cast<uint32_t>(Slots::Readers), JS::PrivateValue(new ReadersMap));

// Walk the blob parts and append them to the blob's buffer.
if (blobParts.isNull()) {
Expand Down Expand Up @@ -742,21 +631,9 @@ void Blob::finalize(JS::GCContext *gcx, JSObject *self) {
if (blob) {
free(blob);
}

auto readers = Blob::readers(self);
if (readers) {
free(readers);
}
}

void Blob::trace(JSTracer *trc, JSObject *self) {
MOZ_ASSERT(is_instance(self));
auto readers = Blob::readers(self);
readers->trace(trc);
}

bool install(api::Engine *engine) {
ENGINE = engine;
return Blob::init_class(engine->cx(), engine->global());
}

Expand Down
36 changes: 3 additions & 33 deletions builtins/web/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,14 @@
#include "builtin.h"
#include "extension-api.h"
#include "js/AllocPolicy.h"
#include "js/GCHashTable.h"
#include "js/TypeDecls.h"
#include "js/Vector.h"

namespace builtins {
namespace web {
namespace blob {

class BlobReader {
std::span<const uint8_t> data_;
std::size_t position_;

public:
explicit BlobReader(std::span<const uint8_t> data) : data_(data), position_(0) {}

std::size_t remaining() const { return data_.size() - position_; }

std::span<const uint8_t> read(std::size_t size) {
size = std::min(size, remaining());
auto result = data_.subspan(position_, size);

position_ += size;
return result;
}

void trace(JSTracer *trc) {}
};

class Blob : public TraceableBuiltinImpl<Blob> {
class Blob : public FinalizableBuiltinImpl<Blob> {
static bool arrayBuffer(JSContext *cx, unsigned argc, JS::Value *vp);
static bool bytes(JSContext *cx, unsigned argc, JS::Value *vp);
static bool slice(JSContext *cx, unsigned argc, JS::Value *vp);
Expand All @@ -54,17 +33,14 @@ class Blob : public TraceableBuiltinImpl<Blob> {
enum Slots { Data, Type, Endings, Readers, Count };
enum LineEndings { Transparent, Native };

using HeapObj = Heap<JSObject *>;
using ByteBuffer = js::Vector<uint8_t, 0, js::SystemAllocPolicy>;
using ReadersMap = JS::GCHashMap<HeapObj, BlobReader, js::StableCellHasher<HeapObj>, js::SystemAllocPolicy>;

static bool arrayBuffer(JSContext *cx, HandleObject self, MutableHandleValue rval);
static bool bytes(JSContext *cx, HandleObject self, MutableHandleValue rval);
static bool stream(JSContext *cx, HandleObject self, MutableHandleValue rval);
static bool text(JSContext *cx, HandleObject self, MutableHandleValue rval);
static bool slice(JSContext *cx, HandleObject self, const CallArgs &args, MutableHandleValue rval);

static ReadersMap *readers(JSObject *self);
static ByteBuffer *blob(JSObject *self);
static size_t blob_size(JSObject *self);
static JSString *type(JSObject *self);
Expand All @@ -77,21 +53,15 @@ class Blob : public TraceableBuiltinImpl<Blob> {
static bool init_options(JSContext *cx, HandleObject self, HandleValue opts);
static bool init(JSContext *cx, HandleObject self, HandleValue blobParts, HandleValue opts);

static bool stream_cancel(JSContext *cx, JS::CallArgs args, JS::HandleObject stream,
JS::HandleObject owner, JS::HandleValue reason);
static bool stream_pull(JSContext *cx, JS::CallArgs args, JS::HandleObject source,
JS::HandleObject body_owner, JS::HandleObject controller);

static JSObject *data_to_owned_array_buffer(JSContext *cx, HandleObject self);
static JSObject *data_to_owned_array_buffer(JSContext *cx, HandleObject self, size_t offset,
size_t size, size_t *bytes_read);
static bool read_blob_slice(JSContext *cx, HandleObject self, std::span<uint8_t>,
size_t start, size_t *read, bool *done);

static JSObject *create(JSContext *cx, UniqueChars data, size_t data_len, HandleString type);

static bool init_class(JSContext *cx, HandleObject global);
static bool constructor(JSContext *cx, unsigned argc, Value *vp);
static void finalize(JS::GCContext *gcx, JSObject *self);
static void trace(JSTracer *trc, JSObject *self);
};

bool install(api::Engine *engine);
Expand Down
Loading

0 comments on commit 39114a4

Please sign in to comment.