From e7cfb0965066a9c42690d2f5255b32c6062008bf Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Thu, 30 Jan 2025 09:38:50 -0300 Subject: [PATCH] Python configuration for manifest preload (and some other typing fixes) --- icechunk-python/python/icechunk/__init__.py | 6 + .../python/icechunk/_icechunk_python.pyi | 63 ++++- icechunk-python/src/config.rs | 232 +++++++++++++++++- icechunk-python/src/lib.rs | 6 +- .../tests/data/test-repo/config.yaml | 10 +- icechunk-python/tests/test_config.py | 40 ++- icechunk/src/config.rs | 17 +- 7 files changed, 346 insertions(+), 28 deletions(-) diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py index 9dc8ae95..8a1b288e 100644 --- a/icechunk-python/python/icechunk/__init__.py +++ b/icechunk-python/python/icechunk/__init__.py @@ -17,6 +17,9 @@ GcsStaticCredentials, GCSummary, IcechunkError, + ManifestConfig, + ManifestPreloadCondition, + ManifestPreloadConfig, ObjectStoreConfig, RebaseFailedData, RepositoryConfig, @@ -91,6 +94,9 @@ "GcsStaticCredentials", "IcechunkError", "IcechunkStore", + "ManifestConfig", + "ManifestPreloadCondition", + "ManifestPreloadConfig", "ObjectStoreConfig", "RebaseFailedData", "RebaseFailedError", diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index 496826c8..ded1e300 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -112,6 +112,59 @@ class CachingConfig: @staticmethod def default() -> CachingConfig: ... +class ManifestPreloadCondition: + """Configuration for conditions under which manifests will preload on session creation""" + + @staticmethod + def or_conditions( + conditions: list[ManifestPreloadCondition], + ) -> ManifestPreloadCondition: ... + @staticmethod + def and_conditions( + conditions: list[ManifestPreloadCondition], + ) -> ManifestPreloadCondition: ... + @staticmethod + def path_matches(regex: str) -> ManifestPreloadCondition: ... + @staticmethod + def name_matches(regex: str) -> ManifestPreloadCondition: ... + @staticmethod + def num_refs( + from_refs: int | None, to_refs: int | None + ) -> ManifestPreloadCondition: ... + @staticmethod + def true() -> ManifestPreloadCondition: ... + @staticmethod + def false() -> ManifestPreloadCondition: ... + +class ManifestPreloadConfig: + """Configuration for how Icechunk manifest preload on session creation""" + + def __init__( + self, + max_total_refs: int | None = None, + preload_if: ManifestPreloadCondition | None = None, + ) -> None: ... + @property + def max_total_refs(self) -> int | None: ... + @max_total_refs.setter + def max_total_refs(self, value: int | None) -> None: ... + @property + def preload_if(self) -> ManifestPreloadCondition | None: ... + @preload_if.setter + def preload_if(self, value: ManifestPreloadCondition | None) -> None: ... + +class ManifestConfig: + """Configuration for how Icechunk manifests""" + + def __init__( + self, + preload: ManifestPreloadConfig | None = None, + ) -> None: ... + @property + def preload(self) -> ManifestPreloadConfig | None: ... + @preload.setter + def preload(self, value: ManifestPreloadConfig | None) -> None: ... + class StorageConcurrencySettings: """Configuration for how Icechunk uses its Storage instance""" @@ -150,6 +203,7 @@ class RepositoryConfig: caching: CachingConfig | None = None, storage: StorageSettings | None = None, virtual_chunk_containers: dict[str, VirtualChunkContainer] | None = None, + manifest: ManifestConfig | None = None, ) -> None: ... @staticmethod def default() -> RepositoryConfig: ... @@ -174,9 +228,13 @@ class RepositoryConfig: @caching.setter def caching(self, value: CachingConfig | None) -> None: ... @property - def storage(self) -> Storage | None: ... + def storage(self) -> StorageSettings | None: ... @storage.setter - def storage(self, value: Storage | None) -> None: ... + def storage(self, value: StorageSettings | None) -> None: ... + @property + def manifest(self) -> ManifestConfig | None: ... + @manifest.setter + def manifest(self, value: ManifestConfig | None) -> None: ... @property def virtual_chunk_containers(self) -> dict[str, VirtualChunkContainer] | None: ... def get_virtual_chunk_container(self, name: str) -> VirtualChunkContainer | None: ... @@ -519,6 +577,7 @@ class Storage: *, config: dict[str, str] | None = None, ) -> Storage: ... + def default_settings(self) -> StorageSettings: ... class VersionSelection(Enum): """Enum for selecting the which version of a conflict""" diff --git a/icechunk-python/src/config.rs b/icechunk-python/src/config.rs index da156c75..dfd542db 100644 --- a/icechunk-python/src/config.rs +++ b/icechunk-python/src/config.rs @@ -13,7 +13,8 @@ use icechunk::{ config::{ AzureCredentials, AzureStaticCredentials, CachingConfig, CompressionAlgorithm, CompressionConfig, Credentials, CredentialsFetcher, GcsCredentials, - GcsStaticCredentials, S3Credentials, S3Options, S3StaticCredentials, + GcsStaticCredentials, ManifestConfig, ManifestPreloadCondition, + ManifestPreloadConfig, S3Credentials, S3Options, S3StaticCredentials, }, storage::{self, ConcurrencySettings}, virtual_chunks::VirtualChunkContainer, @@ -702,6 +703,209 @@ impl PyStorageSettings { } } +#[pyclass(name = "ManifestPreloadCondition", eq)] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PyManifestPreloadCondition { + Or(Vec), + And(Vec), + PathMatches { regex: String }, + NameMatches { regex: String }, + NumRefs { from: Option, to: Option }, + True(), + False(), +} + +#[pymethods] +impl PyManifestPreloadCondition { + #[staticmethod] + pub fn or_conditions(conditions: Vec) -> Self { + Self::Or(conditions) + } + #[staticmethod] + pub fn and_conditions(conditions: Vec) -> Self { + Self::And(conditions) + } + #[staticmethod] + pub fn path_matches(regex: String) -> Self { + Self::PathMatches { regex } + } + #[staticmethod] + pub fn name_matches(regex: String) -> Self { + Self::NameMatches { regex } + } + #[staticmethod] + #[pyo3(signature = (from, to))] + pub fn num_refs(from: Option, to: Option) -> Self { + Self::NumRefs { from, to } + } + #[staticmethod] + pub fn r#true() -> Self { + Self::True() + } + #[staticmethod] + pub fn r#false() -> Self { + Self::False() + } +} + +impl From<&PyManifestPreloadCondition> for ManifestPreloadCondition { + fn from(value: &PyManifestPreloadCondition) -> Self { + use PyManifestPreloadCondition::*; + match value { + Or(vec) => Self::Or(vec.iter().map(|c| c.into()).collect()), + And(vec) => Self::And(vec.iter().map(|c| c.into()).collect()), + PathMatches { regex } => Self::PathMatches { regex: regex.clone() }, + NameMatches { regex } => Self::NameMatches { regex: regex.clone() }, + NumRefs { from, to } => Self::NumRefs { + from: from + .map(std::ops::Bound::Included) + .unwrap_or(std::ops::Bound::Unbounded), + to: to + .map(std::ops::Bound::Excluded) + .unwrap_or(std::ops::Bound::Unbounded), + }, + True() => Self::True, + False() => Self::False, + } + } +} + +impl From for PyManifestPreloadCondition { + fn from(value: ManifestPreloadCondition) -> Self { + fn bound_from(from: std::ops::Bound) -> Option { + match from { + std::ops::Bound::Included(n) => Some(n), + std::ops::Bound::Excluded(n) => Some(n + 1), + std::ops::Bound::Unbounded => None, + } + } + + fn bound_to(to: std::ops::Bound) -> Option { + match to { + std::ops::Bound::Included(n) => Some(n + 1), + std::ops::Bound::Excluded(n) => Some(n), + std::ops::Bound::Unbounded => None, + } + } + + use ManifestPreloadCondition::*; + match value { + Or(vec) => Self::Or(vec.into_iter().map(|c| c.into()).collect()), + And(vec) => Self::And(vec.into_iter().map(|c| c.into()).collect()), + PathMatches { regex } => Self::PathMatches { regex }, + NameMatches { regex } => Self::NameMatches { regex }, + NumRefs { from, to } => { + Self::NumRefs { from: bound_from(from), to: bound_to(to) } + } + True => Self::True(), + False => Self::False(), + } + } +} + +#[pyclass(name = "ManifestPreloadConfig", eq)] +#[derive(Debug)] +pub struct PyManifestPreloadConfig { + #[pyo3(get, set)] + pub max_total_refs: Option, + #[pyo3(get, set)] + pub preload_if: Option>, +} + +#[pymethods] +impl PyManifestPreloadConfig { + #[new] + #[pyo3(signature = (max_total_refs=None, preload_if=None))] + fn new( + max_total_refs: Option, + preload_if: Option>, + ) -> Self { + Self { max_total_refs, preload_if } + } +} + +impl PartialEq for PyManifestPreloadConfig { + fn eq(&self, other: &Self) -> bool { + let x: ManifestPreloadConfig = self.into(); + let y: ManifestPreloadConfig = other.into(); + x == y + } +} + +impl From<&PyManifestPreloadConfig> for ManifestPreloadConfig { + fn from(value: &PyManifestPreloadConfig) -> Self { + Python::with_gil(|py| Self { + max_total_refs: value.max_total_refs, + preload_if: value.preload_if.as_ref().map(|c| (&*c.borrow(py)).into()), + }) + } +} + +impl From for PyManifestPreloadConfig { + fn from(value: ManifestPreloadConfig) -> Self { + #[allow(clippy::expect_used)] + Python::with_gil(|py| Self { + max_total_refs: value.max_total_refs, + preload_if: value.preload_if.map(|c| { + Py::new(py, Into::::into(c)) + .expect("Cannot create instance of ManifestPreloadCondition") + }), + }) + } +} + +#[pyclass(name = "ManifestConfig", eq)] +#[derive(Debug, Default)] +pub struct PyManifestConfig { + #[pyo3(get, set)] + pub preload: Option>, +} + +#[pymethods] +impl PyManifestConfig { + #[new] + #[pyo3(signature = (preload=None))] + fn new(preload: Option>) -> Self { + Self { preload } + } + + pub fn __repr__(&self) -> String { + // TODO: improve repr + format!( + r#"ManifestConfig(preload={pre})"#, + pre = format_option_to_string(self.preload.as_ref().map(|l| l.to_string())), + ) + } +} + +impl PartialEq for PyManifestConfig { + fn eq(&self, other: &Self) -> bool { + let x: ManifestConfig = self.into(); + let y: ManifestConfig = other.into(); + x == y + } +} + +impl From<&PyManifestConfig> for ManifestConfig { + fn from(value: &PyManifestConfig) -> Self { + Python::with_gil(|py| Self { + preload: value.preload.as_ref().map(|c| (&*c.borrow(py)).into()), + }) + } +} + +impl From for PyManifestConfig { + fn from(value: ManifestConfig) -> Self { + #[allow(clippy::expect_used)] + Python::with_gil(|py| Self { + preload: value.preload.map(|c| { + Py::new(py, Into::::into(c)) + .expect("Cannot create instance of ManifestPreloadConfig") + }), + }) + } +} + #[pyclass(name = "RepositoryConfig", eq)] #[derive(Debug)] pub struct PyRepositoryConfig { @@ -719,6 +923,8 @@ pub struct PyRepositoryConfig { pub storage: Option>, #[pyo3(get)] pub virtual_chunk_containers: Option>, + #[pyo3(get, set)] + pub manifest: Option>, } impl PartialEq for PyRepositoryConfig { @@ -741,8 +947,7 @@ impl From<&PyRepositoryConfig> for RepositoryConfig { virtual_chunk_containers: value.virtual_chunk_containers.as_ref().map(|c| { c.iter().map(|(name, cont)| (name.clone(), cont.into())).collect() }), - // FIXME: implement manifest preloading configuration in python - manifest: None, + manifest: value.manifest.as_ref().map(|c| (&*c.borrow(py)).into()), }) } } @@ -769,6 +974,11 @@ impl From for PyRepositoryConfig { virtual_chunk_containers: value .virtual_chunk_containers .map(|c| c.into_iter().map(|(name, cont)| (name, cont.into())).collect()), + + manifest: value.manifest.map(|c| { + Py::new(py, Into::::into(c)) + .expect("Cannot create instance of ManifestConfig") + }), }) } } @@ -782,7 +992,8 @@ impl PyRepositoryConfig { } #[new] - #[pyo3(signature = (inline_chunk_threshold_bytes = None, unsafe_overwrite_refs = None, get_partial_values_concurrency = None, compression = None, caching = None, storage = None, virtual_chunk_containers = None))] + #[pyo3(signature = (inline_chunk_threshold_bytes = None, unsafe_overwrite_refs = None, get_partial_values_concurrency = None, compression = None, caching = None, storage = None, virtual_chunk_containers = None, manifest = None))] + #[allow(clippy::too_many_arguments)] pub fn new( inline_chunk_threshold_bytes: Option, unsafe_overwrite_refs: Option, @@ -791,6 +1002,7 @@ impl PyRepositoryConfig { caching: Option>, storage: Option>, virtual_chunk_containers: Option>, + manifest: Option>, ) -> Self { Self { inline_chunk_threshold_bytes, @@ -800,6 +1012,7 @@ impl PyRepositoryConfig { caching, storage, virtual_chunk_containers, + manifest, } } @@ -851,15 +1064,22 @@ impl PyRepositoryConfig { .extract::(py) .expect("Cannot call __repr__") })); + let manifest: String = format_option(self.manifest.as_ref().map(|c| { + c.call_method0(py, "__repr__") + .expect("Cannot call __repr__") + .extract::(py) + .expect("Cannot call __repr__") + })); // TODO: virtual chunk containers format!( - r#"RepositoryConfig(inline_chunk_threshold_bytes={inl}, unsafe_overwrite_refs={uns}, get_partial_values_concurrency={partial}, compression={comp}, caching={caching}, storage={storage})"#, + r#"RepositoryConfig(inline_chunk_threshold_bytes={inl}, unsafe_overwrite_refs={uns}, get_partial_values_concurrency={partial}, compression={comp}, caching={caching}, storage={storage}, manifest={manifest})"#, inl = format_option_to_string(self.inline_chunk_threshold_bytes), uns = format_option(self.unsafe_overwrite_refs.map(format_bool)), partial = format_option_to_string(self.get_partial_values_concurrency), comp = comp, caching = caching, - storage = storage + storage = storage, + manifest = manifest, ) }) } diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs index 20b69610..6def166f 100644 --- a/icechunk-python/src/lib.rs +++ b/icechunk-python/src/lib.rs @@ -9,7 +9,8 @@ mod streams; use config::{ PyAzureCredentials, PyAzureStaticCredentials, PyCachingConfig, PyCompressionAlgorithm, PyCompressionConfig, PyCredentials, PyGcsCredentials, - PyGcsStaticCredentials, PyObjectStoreConfig, PyRepositoryConfig, PyS3Credentials, + PyGcsStaticCredentials, PyManifestConfig, PyManifestPreloadCondition, + PyManifestPreloadConfig, PyObjectStoreConfig, PyRepositoryConfig, PyS3Credentials, PyS3Options, PyS3StaticCredentials, PyStorage, PyStorageConcurrencySettings, PyStorageSettings, PyVirtualChunkContainer, PythonCredentialsFetcher, }; @@ -55,6 +56,9 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/icechunk-python/tests/data/test-repo/config.yaml b/icechunk-python/tests/data/test-repo/config.yaml index 4d229fc9..71b6f838 100644 --- a/icechunk-python/tests/data/test-repo/config.yaml +++ b/icechunk-python/tests/data/test-repo/config.yaml @@ -8,7 +8,7 @@ virtual_chunk_containers: s3: name: s3 url_prefix: s3:// - store: !S3Compatible + store: !s3_compatible region: us-east-1 endpoint_url: http://localhost:9000 anonymous: false @@ -16,11 +16,11 @@ virtual_chunk_containers: az: name: az url_prefix: az - store: !Azure {} + store: !azure {} tigris: name: tigris url_prefix: tigris - store: !Tigris + store: !tigris region: null endpoint_url: https://fly.storage.tigris.dev anonymous: false @@ -28,8 +28,8 @@ virtual_chunk_containers: gcs: name: gcs url_prefix: gcs - store: !Gcs {} + store: !gcs {} file: name: file url_prefix: file - store: !LocalFileSystem '' + store: !local_file_system '' diff --git a/icechunk-python/tests/test_config.py b/icechunk-python/tests/test_config.py index fce5a8b6..53a03693 100644 --- a/icechunk-python/tests/test_config.py +++ b/icechunk-python/tests/test_config.py @@ -1,4 +1,5 @@ import os +import re from collections.abc import Generator from pathlib import Path @@ -125,10 +126,11 @@ def test_virtual_chunk_containers() -> None: ) container = icechunk.VirtualChunkContainer("custom", "s3://", store_config) config.set_virtual_chunk_container(container) - assert ( - repr(config) - == r"RepositoryConfig(inline_chunk_threshold_bytes=None, unsafe_overwrite_refs=None, get_partial_values_concurrency=None, compression=None, caching=None, storage=None)" + assert re.match( + r"RepositoryConfig\(inline_chunk_threshold_bytes=None, unsafe_overwrite_refs=None, get_partial_values_concurrency=None, compression=None, caching=None, storage=None, manifest=.*\)", + repr(config), ) + assert config.virtual_chunk_containers assert len(config.virtual_chunk_containers) > 1 found_cont = [ cont @@ -166,11 +168,20 @@ def test_can_change_deep_config_values() -> None: config.compression.level = 2 config.caching = icechunk.CachingConfig(num_chunk_refs=8) config.storage = storage.default_settings() + assert config.storage.concurrency config.storage.concurrency.ideal_concurrent_request_size = 1_000_000 + config.manifest = icechunk.ManifestConfig() + config.manifest.preload = icechunk.ManifestPreloadConfig(max_total_refs=42) + config.manifest.preload.preload_if = icechunk.ManifestPreloadCondition.and_conditions( + [ + icechunk.ManifestPreloadCondition.true(), + icechunk.ManifestPreloadCondition.name_matches("foo"), + ] + ) - assert ( - repr(config) - == r"RepositoryConfig(inline_chunk_threshold_bytes=5, unsafe_overwrite_refs=True, get_partial_values_concurrency=42, compression=CompressionConfig(algorithm=None, level=2), caching=CachingConfig(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None), storage=StorageSettings(concurrency=StorageConcurrencySettings(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000)))" + assert re.match( + r"RepositoryConfig\(inline_chunk_threshold_bytes=5, unsafe_overwrite_refs=True, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)", + repr(config), ) repo = icechunk.Repository.open( storage=storage, @@ -179,7 +190,24 @@ def test_can_change_deep_config_values() -> None: repo.save_config() stored_config = icechunk.Repository.fetch_config(storage) + assert stored_config assert stored_config.inline_chunk_threshold_bytes == 5 + assert stored_config.compression assert stored_config.compression.level == 2 + assert stored_config.caching assert stored_config.caching.num_chunk_refs == 8 + assert stored_config.storage + assert stored_config.storage.concurrency assert stored_config.storage.concurrency.ideal_concurrent_request_size == 1_000_000 + assert stored_config.manifest + assert stored_config.manifest.preload + assert config.manifest.preload.max_total_refs == 42 + assert ( + config.manifest.preload.preload_if + == icechunk.ManifestPreloadCondition.and_conditions( + [ + icechunk.ManifestPreloadCondition.true(), + icechunk.ManifestPreloadCondition.name_matches("foo"), + ] + ) + ) diff --git a/icechunk/src/config.rs b/icechunk/src/config.rs index 8d03491c..df337bc6 100644 --- a/icechunk/src/config.rs +++ b/icechunk/src/config.rs @@ -24,6 +24,7 @@ pub struct S3Options { } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] pub enum ObjectStoreConfig { InMemory, LocalFileSystem(PathBuf), @@ -35,6 +36,7 @@ pub enum ObjectStoreConfig { } #[derive(Debug, PartialEq, Eq, Default, Serialize, Deserialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] pub enum CompressionAlgorithm { #[default] Zstd, @@ -105,6 +107,7 @@ impl CachingConfig { } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub enum ManifestPreloadCondition { Or(Vec), And(Vec), @@ -369,19 +372,17 @@ pub trait CredentialsFetcher: fmt::Debug + Sync + Send { #[derive(Clone, Debug, Deserialize, Serialize, Default)] #[serde(tag = "type")] +#[serde(rename_all = "snake_case")] pub enum S3Credentials { #[default] - #[serde(rename = "from_env")] FromEnv, - #[serde(rename = "none")] Anonymous, - #[serde(rename = "static")] Static(S3StaticCredentials), - #[serde(rename = "refreshable")] Refreshable(Arc), } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] pub enum GcsStaticCredentials { ServiceAccount(PathBuf), ServiceAccountKey(String), @@ -389,14 +390,14 @@ pub enum GcsStaticCredentials { } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] pub enum GcsCredentials { - #[serde(rename = "from_env")] FromEnv, - #[serde(rename = "static")] Static(GcsStaticCredentials), } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] pub enum AzureStaticCredentials { AccessKey(String), SASToken(String), @@ -404,15 +405,15 @@ pub enum AzureStaticCredentials { } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] pub enum AzureCredentials { - #[serde(rename = "from_env")] FromEnv, - #[serde(rename = "static")] Static(AzureStaticCredentials), } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "type")] +#[serde(rename_all = "snake_case")] pub enum Credentials { S3(S3Credentials), Gcs(GcsCredentials),