diff --git a/CMake/Findphoton.cmake b/CMake/Findphoton.cmake index 471ce598..90b7fdf2 100644 --- a/CMake/Findphoton.cmake +++ b/CMake/Findphoton.cmake @@ -4,7 +4,7 @@ set(FETCHCONTENT_QUIET false) FetchContent_Declare( photon GIT_REPOSITORY https://github.com/alibaba/PhotonLibOS.git - GIT_TAG v0.6.10 + GIT_TAG v0.6.11 ) if(BUILD_TESTING) diff --git a/README.md b/README.md index 398fc069..a85b3130 100644 --- a/README.md +++ b/README.md @@ -199,7 +199,7 @@ Default configure file `overlaybd.json` is installed to `/etc/overlaybd/`. | enableAudit | Enable audit or not. | | enableThread | Enable overlaybd device run in seprate thread or not. Note `cacheType` should be `ocf`. `false` is default. | | auditPath | The path for audit file, `/var/log/overlaybd-audit.log` is the default value. | -| registryFsVersion | registry client version, 'v1' libcurl based, 'v2' is photon http based. 'v1' is the default value. | +| registryFsVersion | registry client version, 'v1' libcurl based, 'v2' is photon http based. 'v2' is the default value. | | prefetchConfig.concurrency | Prefetch concurrency for reloading trace, `16` is default | > NOTE: `download` is the config for background downloading. After an overlaybd device is lauched, a background task will be running to fetch the whole blobs into local directories. After downloading, I/O requests are directed to local files. Unlike other options, download config is reloaded when a device launching. diff --git a/src/config.h b/src/config.h index 127b006f..2d0f9db0 100644 --- a/src/config.h +++ b/src/config.h @@ -140,7 +140,7 @@ struct GlobalConfig : public ConfigUtils::Config { APPCFG_PARA(p2pConfig, P2PConfig); APPCFG_PARA(exporterConfig, ExporterConfig); APPCFG_PARA(auditPath, std::string, "/var/log/overlaybd-audit.log"); - APPCFG_PARA(registryFsVersion, std::string, "v1"); + APPCFG_PARA(registryFsVersion, std::string, "v2"); APPCFG_PARA(cacheConfig, CacheConfig); APPCFG_PARA(gzipCacheConfig, GzipCacheConfig); APPCFG_PARA(logConfig, LogConfig); diff --git a/src/overlaybd/registryfs/registryfs.h b/src/overlaybd/registryfs/registryfs.h index b3ad76b9..3414ff07 100644 --- a/src/overlaybd/registryfs/registryfs.h +++ b/src/overlaybd/registryfs/registryfs.h @@ -35,5 +35,10 @@ photon::fs::IFileSystem *new_registryfs_v1(PasswordCB callback, photon::fs::IFileSystem *new_registryfs_v2(PasswordCB callback, const char *caFile = nullptr, uint64_t timeout = -1); -} +photon::fs::IFile* new_registry_uploader(photon::fs::IFile *lfile, + std::string &upload_url, + std::string &username, std::string &password, + uint64_t timeout, + ssize_t upload_bs = -1); +} diff --git a/src/overlaybd/registryfs/registryfs_v2.cpp b/src/overlaybd/registryfs/registryfs_v2.cpp index 5b1dd590..5e783714 100644 --- a/src/overlaybd/registryfs/registryfs_v2.cpp +++ b/src/overlaybd/registryfs/registryfs_v2.cpp @@ -23,7 +23,7 @@ #include #include #include - +#include #include #include #include @@ -39,6 +39,8 @@ #include #include #include +#include +#include using namespace photon::fs; using namespace photon::net::http; @@ -52,9 +54,14 @@ static const uint64_t kMinimalMetaLife = 300L * 1000 * 1000; // actual_url lives using HTTP_OP = photon::net::http::Client::OperationOnStack<64 * 1024 - 1>; -static std::unordered_map str_to_kvmap(const estring &src) { +static std::unordered_map str_to_kvmap(estring &src) { + size_t pos = 0; + while ((pos = src.find("\",", pos)) != std::string::npos) { + src.replace(pos, 2, "\";"); + pos += 2; + } std::unordered_map ret; - for (const auto &token : src.split(',')) { + for (const auto &token : src.split(';')) { auto pos = token.find_first_of('='); auto key = token.substr(0, pos); auto val = token.substr(pos + 1).trim('\"'); @@ -127,9 +134,9 @@ class RegistryFSImpl_v2 : public RegistryFS { estring *actual_url = (estring*)&url; if (actual_info->mode == UrlMode::Redirect) actual_url = &actual_info->info; - //use p2p proxy + // use p2p proxy estring accelerate_url; - if(m_accelerate.size() > 0) { + if (m_accelerate.size() > 0) { accelerate_url = estring().appends(m_accelerate, "/", *actual_url); actual_url = &accelerate_url; LOG_DEBUG("p2p_url: `", *actual_url); @@ -187,8 +194,7 @@ class RegistryFSImpl_v2 : public RegistryFS { if (!scope.empty()) { token = m_scope_token.acquire(scope, [&]() -> estring * { estring *token = new estring(); - auto ret = m_callback(url.data()); - if (!authenticate(authurl, ret.first, ret.second, token, tmo.timeout())) { + if (get_token(url, authurl, *token, tmo.timeout()) < 0) { code = 401; delete token; return nullptr; @@ -236,6 +242,29 @@ class RegistryFSImpl_v2 : public RegistryFS { return 0; } + photon::net::http::Client* get_client() { + return m_client; + } + + void refresh_client() { + delete m_client; + m_client = new_http_client(); + } + + int refresh_token(const estring &url, estring &token) { + estring authurl, scope; + Timeout tmo(m_timeout); + if (get_scope_auth(url, &authurl, &scope, tmo.timeout(), true) < 0) + return -1; + if (!scope.empty()) { + get_token(url, authurl, token, tmo.timeout()); + if (token.empty()) { + LOG_ERROR_RETURN(0, -1, "Failed to get token"); + } + } + return 0; + } + protected: PasswordCB m_callback; estring m_accelerate; @@ -246,13 +275,18 @@ class RegistryFSImpl_v2 : public RegistryFS { ObjectCache m_scope_token; ObjectCache m_url_info; - int get_scope_auth(const estring &url, estring *authurl, estring *scope, uint64_t timeout) { + int get_scope_auth(const estring &url, estring *authurl, estring *scope, uint64_t timeout, + bool push = false) { Timeout tmo(timeout); - - HTTP_OP op(m_client, Verb::GET, url); + auto verb = push ? Verb::POST : Verb::GET; + HTTP_OP op(m_client, verb, url); op.follow = 0; op.retry = 0; - op.req.headers.range(0, 0); + if (!push) + op.req.headers.range(0, 0); + else + op.req.headers.insert("Content-Type", "application/octet-stream"); + op.timeout = tmo.timeout(); op.call(); if (op.status_code == -1) @@ -298,6 +332,15 @@ class RegistryFSImpl_v2 : public RegistryFS { return 0; } + int get_token(const estring &url, const estring &authurl, estring &token, uint64_t timeout) { + auto ret = m_callback(url.data()); + if (!authenticate(authurl, ret.first, ret.second, &token, timeout)) { + token = ""; + return -1; + } + return 0; + } + bool authenticate(const estring &authurl, std::string &username, std::string &password, estring *token, uint64_t timeout) { Timeout tmo(timeout); @@ -313,18 +356,18 @@ class RegistryFSImpl_v2 : public RegistryFS { op.timeout = tmo.timeout(); op.call(); if (op.status_code != 200) { - LOG_ERROR_RETURN(EPERM, false, "invalid key"); + LOG_ERROR_RETURN(EPERM, false, "invalid key, code=", op.status_code); } estring body; - body.resize(16 *1024); - auto len = op.resp.read((void*)body.data(), 16 *1024); + body.resize(16 * 1024); + auto len = op.resp.read((void*)body.data(), 16 * 1024); body.resize(len); if (op.status_code == 200 && parse_token(body, token) == 0) return true; LOG_ERROR_RETURN(EPERM, false, "auth failed, response code=` ", op.status_code, VALUE(authurl)); } -}; // namespace FileSystem +}; class RegistryFileImpl_v2 : public photon::fs::VirtualReadOnlyFile { public: @@ -340,7 +383,7 @@ class RegistryFileImpl_v2 : public photon::fs::VirtualReadOnlyFile { return m_fs; } - ssize_t preadv(const struct iovec *iov, int iovcnt, off_t offset) { + ssize_t preadv(const struct iovec *iov, int iovcnt, off_t offset) override { if (m_filesize == 0) { struct stat stat; auto stret = fstat(&stat); @@ -432,3 +475,301 @@ IFileSystem *new_registryfs_v2(PasswordCB callback, const char *caFile, uint64_t LOG_ERROR_RETURN(EINVAL, nullptr, "password callback not set"); return new RegistryFSImpl_v2(callback, caFile ? caFile : "", timeout); } + +class RegistryUploader : public VirtualFile { +public: + photon::semaphore m_sem, m_init_sem; + SHA256_CTX m_sha256_ctx = {0}; + std::string m_sha256sum; + std::thread m_upload_th; + IFile *m_local_file; + estring m_origin_upload_url, m_upload_url; + ssize_t m_upload_chunk_size = 128 * 1024 * 1024; + void *m_upload_buf; + off_t m_upload_pos = 0, m_write_pos = 0; + bool m_finished = false, m_failed = false; + RegistryFSImpl_v2 *m_upload_fs; + uint64_t m_http_client_ts = 0; + std::string m_username, m_password; + uint64_t m_timeout = -1; + estring m_token; + + RegistryUploader(IFile *lfile, std::string &upload_url, std::string &username, + std::string &password, uint64_t timeout = -1, ssize_t upload_bs = -1) + : m_local_file(lfile), m_origin_upload_url(upload_url), m_username(username), m_password(password), + m_timeout(timeout) { + if (upload_bs != -1) + m_upload_chunk_size = upload_bs; + SHA256_Init(&m_sha256_ctx); + } + + int init() { + LOG_INFO("init registry upload ", VALUE(m_username)); + m_upload_th = std::thread(&RegistryUploader::upload_thread, this); + m_init_sem.wait(1); + if (m_failed) { + m_upload_th.join(); + return -1; + } + return 0; + } + + ~RegistryUploader() { + } + + int fsync() override { + if (m_failed) { + m_upload_th.join(); + return -1; + } + // calc sha256 result + unsigned char sha[32]; + SHA256_Final(sha, &m_sha256_ctx); + char res[SHA256_DIGEST_LENGTH * 2]; + for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) + sprintf(res + (i * 2), "%02x", sha[i]); + m_sha256sum = "sha256:" + std::string(res, SHA256_DIGEST_LENGTH * 2); + LOG_INFO(VALUE(m_sha256sum)); + + m_finished = true; + m_sem.signal(1); + m_upload_th.join(); + if (m_failed) { + return -1; + } + return 0; + } + + UNIMPLEMENTED(int fdatasync() override); + UNIMPLEMENTED(int close() override); + UNIMPLEMENTED(int fchmod(mode_t mode) override); + UNIMPLEMENTED(int fchown(uid_t owner, gid_t group) override); + UNIMPLEMENTED(int ftruncate(off_t length) override); + virtual IFileSystem *filesystem() { + return nullptr; + } + virtual int fstat(struct stat *buf) override { + return m_local_file->fstat(buf); + } + + ssize_t preadv(const struct iovec *iov, int iovcnt, off_t offset) override { + LOG_ERRNO_RETURN(EINVAL, -1, "not readable"); + } + + ssize_t write(const void *buf, size_t count) override { + if (m_failed) { + LOG_ERROR_RETURN(EINVAL, -1, "already failed"); + } + auto rc = m_local_file->write(buf, count); + if (rc < 0) { + LOG_ERRNO_RETURN(0, -1, "failed to write local file", VALUE(rc)); + } + if (rc > 0 && SHA256_Update(&m_sha256_ctx, buf, rc) < 0) { + LOG_ERRNO_RETURN(0, -1, "sha256 calculate error"); + } + m_write_pos += rc; + m_sem.signal(1); + return rc; + } + + ssize_t pwrite(const void *buf, size_t count, off_t offset) override { + LOG_ERROR_RETURN(EINVAL, -1, "pwrite is not supported"); + } + + std::pair load_auth(const char *remote_path) { + return std::make_pair(m_username, m_password); + } + + // non-empty digest means complete request + off_t upload_chunk(off_t offset, size_t count, std::string_view digest) { + LOG_INFO("upload chunk ", VALUE(offset), VALUE(count), VALUE(digest)); + Timeout tmo(m_timeout); + auto verb = Verb::PATCH; + estring url = m_upload_url; + if (!digest.empty()) { + verb = Verb::PUT; + estring delimiter = "?"; + if (m_upload_url.find("?") != std::string::npos) { + delimiter = "&"; + } + url = estring().appends(m_upload_url, delimiter, "digest=", digest); + } + int retry = 3; + LOG_INFO(VALUE(url)); + again: + if (photon::now - m_http_client_ts >= 5ULL * 60 * 1000 * 1000) { + LOG_INFO("http client expire, refresh"); + m_upload_fs->refresh_client(); + m_http_client_ts = photon::now; + } + HTTP_OP op(m_upload_fs->get_client(), verb, url); + op.follow = 0; + op.retry = 0; + op.req.headers.content_length(count); + + auto writer = [&](Request *req) -> ssize_t { + auto start = offset; + ssize_t ret = 0; + while (start < (off_t)(offset + count)) { + ssize_t cnt = 1024 * 1024; + if ((off_t)(start + cnt) > (off_t)(offset + count)) + cnt = offset + count - start; + auto rc = m_local_file->pread(m_upload_buf, cnt, start); + if (rc != cnt) { + LOG_ERRNO_RETURN(0, -1, "failed to read file", VALUE(rc), VALUE(cnt)); + } + rc = req->write(m_upload_buf, cnt); + if (rc != cnt) { + LOG_ERRNO_RETURN(0, -1, "failed to upload", VALUE(rc), VALUE(cnt)); + } + start += cnt; + ret += cnt; + } + return ret; + }; + + if (digest.empty()) { + op.req.headers.insert("Content-Type", "application/octet-stream"); + op.req.headers.insert_format("Content-Range", "%lu-%lu", offset, offset + count - 1); + op.body_writer = writer; + } + op.req.headers.insert(kAuthHeaderKey, "Bearer "); + op.req.headers.value_append(m_token); + op.timeout = tmo.timeout(); + op.call(); + + if (op.status_code == 401 || op.status_code == 403) { + LOG_WARN("Token invalid, try refresh"); + if (retry--) { + if (m_upload_fs->refresh_token(m_upload_url, m_token) < 0) { + LOG_ERRNO_RETURN(0, -1, "failed update token"); + } + goto again; + } + } + + if (op.status_code / 100 == 2) { + if (count > 0) { + auto rg = op.resp.headers.range(); + if (rg.second == -1) { + LOG_ERRNO_RETURN(0, -1, "failed to upload, range=(`-`)", rg.first, rg.second); + } + auto new_upload_pos = rg.second + 1; + m_upload_url = op.resp.headers["Location"]; + return new_upload_pos; + } else { + LOG_INFO(op.resp.headers["Docker-Content-Digest"]); + } + return 0; + } + LOG_ERRNO_RETURN(0, -1, "failed to upload, code=", op.status_code); + } + + int upload_thread() { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + DEFER(photon::fini()); + m_upload_fs = new RegistryFSImpl_v2({this, &RegistryUploader::load_auth}, "", m_timeout); + DEFER({ delete m_upload_fs; }); + m_http_client_ts = photon::now; + ::posix_memalign(&m_upload_buf, 4096, 1024 * 1024); + DEFER(free(m_upload_buf)); + int retry = 3; + again: + m_upload_pos = 0; + if (init_upload() < 0) { + if (retry--) { + goto again; + } + m_failed = true; + m_init_sem.signal(1); + LOG_ERRNO_RETURN(0, -1, "failed to init upload"); + } + + m_init_sem.signal(1); + while (!m_finished && !m_failed) { + m_sem.wait(1); + while (m_write_pos > m_upload_pos + m_upload_chunk_size) { + m_upload_pos = upload_chunk(m_upload_pos, m_upload_chunk_size, ""); + if (m_upload_pos < 0) { + if (retry--) { + LOG_ERROR("failed to upload chunk, retry"); + m_sem.signal(1); + goto again; + } + + m_failed = true; + goto fail; + } + } + } + while (m_write_pos > m_upload_pos && !m_failed) { + auto size = m_write_pos - m_upload_pos; + if (size > m_upload_chunk_size) + size = m_upload_chunk_size; + m_upload_pos = upload_chunk(m_upload_pos, size, ""); + if (m_upload_pos < 0) { + if (retry--) { + LOG_ERROR("failed to upload chunk, retry"); + goto again; + } + m_failed = true; + goto fail; + } + } + + // send complete + m_upload_pos = upload_chunk(m_upload_pos, 0, m_sha256sum); + if (m_upload_pos < 0) { + if (retry--) { + LOG_ERROR("failed to send complete request, retry"); + goto again; + } + m_failed = true; + LOG_ERROR("failed to send complete request"); + goto fail; + } + LOG_INFO("file uploaded"); + return 0; + + fail: + LOG_ERROR("file upload failed"); + return -1; + } + + int init_upload() { + m_upload_url = m_origin_upload_url; + if (m_upload_fs->refresh_token(m_upload_url, m_token) < 0) { + return -1; + } + + Timeout tmo(m_timeout); + HTTP_OP op(m_upload_fs->get_client(), Verb::POST, m_upload_url); + op.req.headers.insert("Content-Type", "application/octet-stream"); + op.req.headers.insert(kAuthHeaderKey, "Bearer "); + op.req.headers.value_append(m_token); + op.follow = 0; + op.retry = 0; + op.timeout = tmo.timeout(); + op.call(); + if (op.status_code == 401 || op.status_code == 403) { + LOG_ERROR_RETURN(0, -1, "Token invalid"); + } + if (op.status_code / 100 == 2) { + auto location = op.resp.headers["Location"]; + m_upload_url = std::string(location); + LOG_INFO(VALUE(m_upload_url)); + return 0; + } + LOG_ERROR_RETURN(0, -1, "failed to get upload url, code=`", op.status_code); + } +}; + +IFile *new_registry_uploader(IFile *lfile, std::string &upload_url, std::string &username, + std::string &password, uint64_t timeout, ssize_t upload_bs) { + auto ret = new RegistryUploader(lfile, upload_url, username, password, timeout, upload_bs); + if (ret->init() < 0) { + delete ret; + return nullptr; + } + return ret; +}