Skip to content

Commit

Permalink
Split Executor header and source files into corresponding class files
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyRyabinin committed Dec 12, 2023
1 parent 0141323 commit 5fe1708
Show file tree
Hide file tree
Showing 16 changed files with 223 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"Bodyless", "HTTPGET", "ratelimiter", "Ratelimiter", "STDMETHODCALLTYPE", "CANTSAVE", "OLECHAR", "DISPID",
"UNKNOWNNAME", "DISPPARAMS", "XMLHTTP", "comptr", "Metadataservice", "Streamfn", "HWAVEOUT", "matdesc",
"Presigner", "xindex", "errortype", "waveout", "WAVEOUTCAPSA", "ALLOWSYNC", "WAVEHDR", "MMSYSERR",
"WAVEFORMATEX", "Unprepare", "DDISABLE_IMDSV1",
"WAVEFORMATEX", "Unprepare", "DDISABLE_IMDSV1", "threadpool",
// AWS general
"Arns", "AMZN", "amzn", "Paulo", "Ningxia", "ISOB", "isob", "AWSXML", "IMDSV",
// AWS Signature
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#pragma once

#include <aws/core/utils/threading/Executor.h>

#include <aws/core/utils/memory/stl/AWSMap.h>

#include <atomic>
#include <functional>
#include <mutex>
#include <thread>

namespace Aws
{
namespace Utils
{
namespace Threading
{
/**
* Default Executor implementation. Simply spawns a thread and detaches it.
*/
class AWS_CORE_API DefaultExecutor : public Executor
{
public:
DefaultExecutor() : m_state(State::Free) {}
~DefaultExecutor();

void WaitUntilStopped() override;
protected:
enum class State
{
Free, Locked, Shutdown
};
bool SubmitToThread(std::function<void()>&&) override;
void Detach(std::thread::id id);
std::atomic<State> m_state;
Aws::UnorderedMap<std::thread::id, std::thread> m_threads;
};
} // namespace Threading
} // namespace Utils
} // namespace Aws
91 changes: 8 additions & 83 deletions src/aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@
*/

#pragma once
#if !defined(AWS_EXECUTOR_H)
#define AWS_EXECUTOR_H

#include <aws/core/Core_EXPORTS.h>
#include <aws/core/utils/memory/stl/AWSQueue.h>
#include <aws/core/utils/memory/stl/AWSVector.h>
#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/threading/Semaphore.h>

#include <functional>
#include <future>
#include <mutex>
#include <atomic>

namespace Aws
{
namespace Utils
{
namespace Threading
{
class ThreadTask;

/**
* Interface for implementing an Executor, to implement a custom thread execution strategy, inherit from this class
* and override SubmitToThread().
Expand Down Expand Up @@ -59,80 +53,11 @@ namespace Aws
*/
virtual bool SubmitToThread(std::function<void()>&&) = 0;
};


/**
* Default Executor implementation. Simply spawns a thread and detaches it.
*/
class AWS_CORE_API DefaultExecutor : public Executor
{
public:
DefaultExecutor() : m_state(State::Free) {}
~DefaultExecutor();

void WaitUntilStopped() override;
protected:
enum class State
{
Free, Locked, Shutdown
};
bool SubmitToThread(std::function<void()>&&) override;
void Detach(std::thread::id id);
std::atomic<State> m_state;
Aws::UnorderedMap<std::thread::id, std::thread> m_threads;
};

enum class OverflowPolicy
{
QUEUE_TASKS_EVENLY_ACROSS_THREADS,
REJECT_IMMEDIATELY
};

/**
* Thread Pool Executor implementation.
*/
class AWS_CORE_API PooledThreadExecutor : public Executor
{
public:
PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS);
~PooledThreadExecutor();

/**
* Rule of 5 stuff.
* Don't copy or move
*/
PooledThreadExecutor(const PooledThreadExecutor&) = delete;
PooledThreadExecutor& operator =(const PooledThreadExecutor&) = delete;
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;

/**
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
*/
void WaitUntilStopped() override;

protected:
bool SubmitToThread(std::function<void()>&&) override;

private:
Aws::Queue<std::function<void()>*> m_tasks;
mutable std::mutex m_queueLock;
Aws::Utils::Threading::Semaphore m_sync;
Aws::Vector<ThreadTask*> m_threadTaskHandles;
size_t m_poolSize = 0;
OverflowPolicy m_overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS;
bool m_stopped{false};

/**
* Once you call this, you are responsible for freeing the memory pointed to by task.
*/
std::function<void()>* PopTask();
bool HasTasks() const;

friend class ThreadTask;
};


} // namespace Threading
} // namespace Utils
} // namespace Aws

// TODO: remove on a next minor API bump from 1.11.x
#endif // !defined(AWS_EXECUTOR_H)
#include <aws/core/utils/threading/DefaultExecutor.h>
#include <aws/core/utils/threading/PooledThreadExecutor.h>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#pragma once

#include <aws/core/utils/threading/Executor.h>

#include <aws/core/utils/memory/stl/AWSQueue.h>
#include <aws/core/utils/memory/stl/AWSVector.h>
#include <aws/core/utils/threading/Semaphore.h>
#include <functional>
#include <mutex>
#include <atomic>

namespace Aws
{
namespace Utils
{
namespace Threading
{
class ThreadTask;

enum class OverflowPolicy
{
QUEUE_TASKS_EVENLY_ACROSS_THREADS,
REJECT_IMMEDIATELY
};

/**
* Thread Pool Executor implementation.
*/
class AWS_CORE_API PooledThreadExecutor : public Executor
{
public:
PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS);
~PooledThreadExecutor();

/**
* Rule of 5 stuff.
* Don't copy or move
*/
PooledThreadExecutor(const PooledThreadExecutor&) = delete;
PooledThreadExecutor& operator =(const PooledThreadExecutor&) = delete;
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;

/**
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
*/
void WaitUntilStopped() override;

protected:
bool SubmitToThread(std::function<void()>&&) override;

private:
Aws::Queue<std::function<void()>*> m_tasks;
mutable std::mutex m_queueLock;
Aws::Utils::Threading::Semaphore m_sync;
Aws::Vector<ThreadTask*> m_threadTaskHandles;
size_t m_poolSize = 0;
OverflowPolicy m_overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS;
bool m_stopped{false};

/**
* Once you call this, you are responsible for freeing the memory pointed to by task.
*/
std::function<void()>* PopTask();
bool HasTasks() const;

friend class ThreadTask;
};
} // namespace Threading
} // namespace Utils
} // namespace Aws
82 changes: 82 additions & 0 deletions src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/core/utils/threading/DefaultExecutor.h>
#include <aws/core/utils/threading/ThreadTask.h>

#include <cassert>

using namespace Aws::Utils::Threading;

bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
{
// Generalized lambda capture is C++14, using std::bind as a workaround to force moving fx (instead of copying)
std::function<void()> main = std::bind(
[this](std::function<void()>& storedFx)
{
storedFx();
Detach(std::this_thread::get_id());
},
std::move(fx)
);

State expected;
do
{
expected = State::Free;
if(m_state.compare_exchange_strong(expected, State::Locked))
{
std::thread t(std::move(main));
const auto id = t.get_id(); // copy the id before we std::move the thread
m_threads.emplace(id, std::move(t));
m_state = State::Free;
return true;
}
}
while(expected != State::Shutdown);
return false;
}

void DefaultExecutor::Detach(std::thread::id id)
{
State expected;
do
{
expected = State::Free;
if(m_state.compare_exchange_strong(expected, State::Locked))
{
auto it = m_threads.find(id);
assert(it != m_threads.end());
it->second.detach();
m_threads.erase(it);
m_state = State::Free;
return;
}
}
while(expected != State::Shutdown);
}

void DefaultExecutor::WaitUntilStopped()
{
auto expected = State::Free;
while(!m_state.compare_exchange_strong(expected, State::Shutdown))
{
//spin while currently detaching threads finish
assert(expected == State::Locked);
expected = State::Free;
}
}

DefaultExecutor::~DefaultExecutor()
{
WaitUntilStopped();

auto it = m_threads.begin();
while(!m_threads.empty())
{
it->second.join();
it = m_threads.erase(it);
}
}
Loading

0 comments on commit 5fe1708

Please sign in to comment.