Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 20, 2023
1 parent 87a7de7 commit a22550c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
15 changes: 8 additions & 7 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/lock.h"
#include "util/path_util.h"
#include "util/thrift_rpc_helper.h"
#include "vec/exec/format/wal/wal_reader.h"
Expand All @@ -41,6 +42,7 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) {
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
_all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
_cv = std::make_shared<doris::ConditionVariable>();
}

WalManager::~WalManager() {
Expand Down Expand Up @@ -199,14 +201,15 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
}
LOG(INFO) << "create wal " << wal_path;
wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes);
wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes, _cv);
RETURN_IF_ERROR(wal_writer->init());
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
_wal_id_to_writer_map.emplace(wal_id, wal_writer);
}
return Status::OK();
}

Status WalManager::scan_wals(const std::string& wal_path) {
size_t count = 0;
bool exists = true;
Expand Down Expand Up @@ -336,13 +339,11 @@ Status WalManager::delete_wal(int64_t wal_id) {
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) {
_all_wal_disk_bytes->store(
_all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
std::memory_order_relaxed),
std::memory_order_relaxed);
_wal_id_to_writer_map[wal_id]->cv.notify_one();
_all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
std::memory_order_relaxed);
_cv->notify_one();
std::string wal_path = _wal_path_map[wal_id];
LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is"
LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is "
<< _wal_id_to_writer_map[wal_id]->disk_bytes()
<< " ,after deleting it, all wals disk usage is "
<< _all_wal_disk_bytes->load(std::memory_order_relaxed);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "olap/wal_writer.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/lock.h"
#include "util/thread.h"

namespace doris {
Expand Down Expand Up @@ -85,5 +86,6 @@ class WalManager {
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
std::atomic<bool> _stop;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::shared_ptr<doris::ConditionVariable> _cv;
};
} // namespace doris
34 changes: 24 additions & 10 deletions be/src/olap/wal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@
#include "olap/wal_writer.h"

#include <atomic>
#include <memory>

#include "common/config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "olap/storage_engine.h"
#include "util/crc32c.h"
#include "util/lock.h"

namespace doris {

const char* k_wal_magic = "WAL1";
const uint32_t k_wal_magic_length = 4;

WalWriter::WalWriter(const std::string& file_name,
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes)
: _file_name(file_name), _disk_bytes(0), _all_wal_disk_bytes(all_wal_disk_bytes) {}
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes,
const std::shared_ptr<doris::ConditionVariable>& cv)
: cv(cv),
_file_name(file_name),
_disk_bytes(0),
_all_wal_disk_bytes(all_wal_disk_bytes),
_is_first_append_blocks(false) {}

WalWriter::~WalWriter() {}

Expand All @@ -52,9 +59,19 @@ Status WalWriter::finalize() {

Status WalWriter::append_blocks(const PBlockArray& blocks) {
{
std::unique_lock l(_mutex);
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > config::wal_max_disk_size) {
cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
if (_is_first_append_blocks) {
_is_first_append_blocks = false;
std::unique_lock l(_mutex);
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
config::wal_max_disk_size) {
LOG(INFO) << "First time to append blocks to wal file " << _file_name
<< ". Currently, all wal disk space usage is "
<< _all_wal_disk_bytes->load(std::memory_order_relaxed)
<< ", larger than the maximum limit " << config::wal_max_disk_size
<< ", so we need to wait. When any other load finished, that wal will be "
"removed, the space used by that wal will be free.";
cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
}
}
}
size_t total_size = 0;
Expand Down Expand Up @@ -82,11 +99,8 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
"failed to write block to wal expected= " + std::to_string(total_size) +
",actually=" + std::to_string(offset));
}
_disk_bytes.store(_disk_bytes.fetch_add(total_size, std::memory_order_relaxed),
std::memory_order_relaxed);
_all_wal_disk_bytes->store(
_all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed),
std::memory_order_relaxed);
_disk_bytes.fetch_add(total_size, std::memory_order_relaxed);
_all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed);
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/wal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ extern const uint32_t k_wal_magic_length;
class WalWriter {
public:
explicit WalWriter(const std::string& file_name,
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes);
const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes,
const std::shared_ptr<doris::ConditionVariable>& cv);
~WalWriter();

Status init();
Expand All @@ -49,7 +50,7 @@ class WalWriter {
public:
static const int64_t LENGTH_SIZE = 8;
static const int64_t CHECKSUM_SIZE = 4;
doris::ConditionVariable cv;
std::shared_ptr<doris::ConditionVariable> cv;
static const int64_t VERSION_SIZE = 4;

private:
Expand All @@ -59,6 +60,7 @@ class WalWriter {
std::atomic_size_t _disk_bytes;
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
doris::Mutex _mutex;
bool _is_first_append_blocks;
};

} // namespace doris

0 comments on commit a22550c

Please sign in to comment.