Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priority to SHFlow, replace _flowPool with flat_set #1079

Merged
merged 20 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
baaa957
Add priority to SHFlow, replace _flowPool with flat_set
sinkingsugar Nov 9, 2024
2a9278b
Prevent duplicate wire scheduling in SHMesh flow setup
sinkingsugar Nov 9, 2024
0fec670
Fix wireCleanedUp
sinkingsugar Nov 9, 2024
23b3627
Add priority support for SHWire in scheduler and tests
sinkingsugar Nov 9, 2024
64d5ed0
Refactor network test shards by removing priority tags
sinkingsugar Nov 10, 2024
9e2499a
Refactor network-ws.shs to modularize main test
sinkingsugar Nov 10, 2024
9253512
Make sure server starts first in network tests
sinkingsugar Nov 10, 2024
da59ef4
Refactor `SHFlow` initialization, enhance cleanup logging
sinkingsugar Nov 10, 2024
44219e2
Fix mesh terminate
sinkingsugar Nov 10, 2024
313d51d
Proper order for resumer restoring
sinkingsugar Nov 10, 2024
ad19259
Refactor `SHFlow` to encapsulate paused and wire state, streamline fl…
sinkingsugar Nov 11, 2024
43a4f9c
Refactor SHWire event handling and cleanup process
sinkingsugar Nov 11, 2024
96025ac
Refactor: simplify cleanup connections in Spawn and WhenDone
sinkingsugar Nov 11, 2024
a39c972
progress on removing shflow
sinkingsugar Nov 11, 2024
2eb5759
Nearly there, simplified a lot
sinkingsugar Nov 12, 2024
621f459
Enhance wire handling with child wire delegation in SwitchTo
sinkingsugar Nov 12, 2024
be66330
Final touches with SwitchTo and Step
sinkingsugar Nov 12, 2024
94a701d
Improve wire management and ID regeneration logic
sinkingsugar Nov 13, 2024
ce9b04d
Revert network test
sinkingsugar Nov 13, 2024
bcd20d5
Remove SHFlow struct from runtime and update SwitchTo
sinkingsugar Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/shards/shards.h
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ typedef void(__cdecl *SHSetWireLooped)(SHWireRef wire, SHBool looped);
typedef void(__cdecl *SHSetWireUnsafe)(SHWireRef wire, SHBool unsafe);
typedef void(__cdecl *SHSetWirePure)(SHWireRef wire, SHBool pure);
typedef void(__cdecl *SHSetWireStackSize)(SHWireRef wire, uint64_t stackSize);
typedef void(__cdecl *SHSetWirePriority)(SHWireRef wire, int priority);
typedef void(__cdecl *SHSetWireTraits)(SHWireRef wire, SHSeq traits);
typedef void(__cdecl *SHAddShard)(SHWireRef wire, ShardPtr shard);
typedef void(__cdecl *SHRemShard)(SHWireRef wire, ShardPtr shard);
Expand Down Expand Up @@ -1119,6 +1120,7 @@ typedef struct _SHCore {
SHSetWireUnsafe setWireUnsafe;
SHSetWirePure setWirePure;
SHSetWireStackSize setWireStackSize;
SHSetWirePriority setWirePriority;
SHSetWireTraits setWireTraits;
SHAddShard addShard;
SHRemShard removeShard;
Expand Down
53 changes: 39 additions & 14 deletions shards/core/foundation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@

#include <shards/shardwrapper.hpp>

#ifdef __clang__
#pragma clang attribute push(__attribute__((no_sanitize("undefined"))), apply_to = function)
#endif

#include <boost/container/stable_vector.hpp>
#include <boost/container/flat_map.hpp>

#ifdef __clang__
#pragma clang attribute pop
#endif

// Needed specially for win32/32bit
#include <boost/align/aligned_allocator.hpp>

Expand Down Expand Up @@ -256,10 +245,13 @@ typedef void(__cdecl *SHSetWireError)(const SHWire *, void *errorData, struct SH

struct SHWire : public std::enable_shared_from_this<SHWire> {
enum State { Stopped, Prepared, Starting, Iterating, IterationEnded, Failed, Ended };

// Triggered whenever the main wire of a context starts
struct OnStartEvent {
const SHWire *wire;
};

// Triggered directly on each wire
struct OnCleanupEvent {
const SHWire *wire;
};
Expand All @@ -280,6 +272,8 @@ struct SHWire : public std::enable_shared_from_this<SHWire> {
const SHWire *childWire;
};

mutable entt::dispatcher dispatcher;

// Storage of data used only during compose
struct ComposeData {
// List of output types used for this wire
Expand All @@ -306,6 +300,7 @@ struct SHWire : public std::enable_shared_from_this<SHWire> {
uint64_t debugId{0}; // used for debugging
shards::OwnedVar astObject; // optional, used for debugging
std::shared_ptr<SHWire> parent; // used in doppelganger pool, we keep track of the template wire
int priority{0}; // used in scheduler

// The wire's running coroutine
shards::Coroutine coro;
Expand Down Expand Up @@ -343,7 +338,9 @@ struct SHWire : public std::enable_shared_from_this<SHWire> {
mutable std::unordered_map<std::string_view, SHExposedTypeInfo> requirements;

SHContext *context{nullptr};
SHWire *resumer{nullptr}; // used in Resume/Start shards

SHWire *resumer{nullptr}; // used in SwitchTo shard
SHWire *childWire{nullptr}; // used in SwitchTo shard

std::weak_ptr<SHMesh> mesh;

Expand Down Expand Up @@ -459,8 +456,34 @@ struct SHWire : public std::enable_shared_from_this<SHWire> {
void addTrait(SHTrait trait);
const std::vector<shards::Trait> &getTraits() const { return traits; }

// less operator, compare by priority fall back to wire id
bool operator<(const SHWire &other) const {
if (priority != other.priority) {
return priority < other.priority;
} else {
return uniqueId < other.uniqueId;
}
}

bool paused{false};

SHWire *tickingWire() {
SHWire *wire = this;
while (wire->childWire) {
wire = wire->childWire;
}
return wire;
}

// regenerate the id, SHMesh uses flat_set which is sorted by id
// this allows us to regenerate when acquiring from pools in order to keep adding new wires at the end of the list
uint64_t regenerateId() { return uniqueId = idCounter.fetch_add(1); }

private:
SHWire(std::string_view wire_name) : name(wire_name) { SHLOG_TRACE("Creating wire: {}", name); }
SHWire(std::string_view wire_name) : name(wire_name) {
SHLOG_TRACE("Creating wire: {}", name);
regenerateId();
}

std::unordered_map<shards::OwnedVar, SHVar, std::hash<shards::OwnedVar>, std::equal_to<shards::OwnedVar>,
boost::alignment::aligned_allocator<std::pair<const shards::OwnedVar, SHVar>, 16>>
Expand All @@ -473,8 +496,10 @@ struct SHWire : public std::enable_shared_from_this<SHWire> {

std::vector<shards::Trait> traits;

private:
void destroy();

uint64_t uniqueId;
static inline std::atomic_uint64_t idCounter{0};
};

namespace shards {
Expand Down
31 changes: 16 additions & 15 deletions shards/core/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@ SHRunWireOutput runWire(SHWire *wire, SHContext *context, const SHVar &wireInput
return {wire->previousOutput, SHRunWireOutputState::Running};
}

void run(SHWire *wire, SHFlow *flow, shards::Coroutine *coro) {
void run(SHWire *wire, shards::Coroutine *coro) {
SH_CORO_RESUMED(wire);

// store stack start address here
Expand All @@ -1560,8 +1560,7 @@ void run(SHWire *wire, SHFlow *flow, shards::Coroutine *coro) {
wire->finishedError.clear();

// Create a new context and copy the sink in
SHFlow anonFlow{wire};
SHContext context(coro, wire, flow ? flow : &anonFlow);
SHContext context(coro, wire);
context.stackStart = &stackStart;

// if the wire had a context (Stepped wires in wires.cpp)
Expand Down Expand Up @@ -1602,7 +1601,7 @@ void run(SHWire *wire, SHFlow *flow, shards::Coroutine *coro) {
goto endOfWire;
}

mesh->dispatcher.trigger(SHWire::OnStartEvent{wire});
wire->dispatcher.trigger(SHWire::OnStartEvent{wire});

while (running) {
running = wire->looped;
Expand Down Expand Up @@ -1669,17 +1668,17 @@ void run(SHWire *wire, SHFlow *flow, shards::Coroutine *coro) {
wire->finishedOutput = wire->previousOutput; // cloning over! (OwnedVar)
}

// run cleanup on all the shards
// ensure stop state is set
context.stopFlow(wire->previousOutput);

// if we have a resumer we return to it
if (wire->resumer) {
SHLOG_TRACE("Wire {} ending and resuming {}", wire->name, wire->resumer->name);
context.flow->wire = wire->resumer;
wire->resumer->childWire = nullptr; // reset childWire, this will resume the wire
wire->resumer = nullptr;
}

// run cleanup on all the shards
// ensure stop state is set
context.stopFlow(wire->previousOutput);

// Set onLastResume so tick keeps processing mesh tasks on cleanup
context.onLastResume = true;
wire->cleanup(true);
Expand Down Expand Up @@ -2417,10 +2416,7 @@ void SHWire::cleanup(bool force) {

warmedUp = false;

auto mesh_ = mesh.lock();
if (mesh_) {
mesh_->dispatcher.trigger(SHWire::OnCleanupEvent{this});
}
dispatcher.trigger(SHWire::OnCleanupEvent{this});

// Run cleanup on all shards, prepare them for a new start if necessary
// Do this in reverse to allow a safer cleanup
Expand Down Expand Up @@ -2450,9 +2446,9 @@ void SHWire::cleanup(bool force) {
}
variables.clear();

// finally reset the mesh
auto mesh_ = mesh.lock();
if (mesh_) {
mesh_->wireCleanedUp(this);
mesh_->unschedule(shared_from_this());
}
mesh.reset();

Expand Down Expand Up @@ -2844,6 +2840,11 @@ SHCore *__cdecl shardsInterface(uint32_t abi_version) {
sc->looped = looped;
};

result->setWirePriority = [](SHWireRef wireref, int priority) noexcept {
auto &sc = SHWire::sharedFromRef(wireref);
sc->priority = priority;
};

result->setWireUnsafe = [](SHWireRef wireref, SHBool unsafe) noexcept {
auto &sc = SHWire::sharedFromRef(wireref);
sc->unsafe = unsafe;
Expand Down
Loading
Loading