diff --git a/zarr/v3/abc/array.py b/zarr/v3/abc/array.py index 976aa48618..c4004c23ea 100644 --- a/zarr/v3/abc/array.py +++ b/zarr/v3/abc/array.py @@ -79,22 +79,22 @@ def info(self) -> Any: ... -class AsynchronousArray(BaseArray): +class AsyncArray(BaseArray): """This class can be implemented as a v2 or v3 array""" @classmethod @abstractmethod - async def from_json(cls, zarr_json: Any, store: ReadStore) -> AsynchronousArray: + async def from_json(cls, zarr_json: Any, store: ReadStore) -> AsyncArray: ... @classmethod @abstractmethod - async def open(cls, store: ReadStore) -> AsynchronousArray: + async def open(cls, store: ReadStore) -> AsyncArray: ... @classmethod @abstractmethod - async def create(cls, store: WriteStore, *, shape, **kwargs) -> AsynchronousArray: + async def create(cls, store: WriteStore, *, shape, **kwargs) -> AsyncArray: ... @abstractmethod @@ -106,24 +106,24 @@ async def setitem(self, selection: Selection, value: np.ndarray) -> None: ... -class SynchronousArray(BaseArray): +class SyncArray(BaseArray): """ This class can be implemented as a v2 or v3 array """ @classmethod @abstractmethod - def from_json(cls, zarr_json: Any, store: ReadStore) -> SynchronousArray: + def from_json(cls, zarr_json: Any, store: ReadStore) -> SyncArray: ... @classmethod @abstractmethod - def open(cls, store: ReadStore) -> SynchronousArray: + def open(cls, store: ReadStore) -> SyncArray: ... @classmethod @abstractmethod - def create(cls, store: WriteStore, *, shape, **kwargs) -> SynchronousArray: + def create(cls, store: WriteStore, *, shape, **kwargs) -> SyncArray: ... @abstractmethod @@ -133,8 +133,3 @@ def __getitem__(self, selection: Selection): # TODO: type as np.ndarray | scala @abstractmethod def __setitem__(self, selection: Selection, value: np.ndarray) -> None: ... - - # some day ;) - # @property - # def __array_api_version__(self) -> str: - # return "2022.12" diff --git a/zarr/v3/abc/group.py b/zarr/v3/abc/group.py index 02de819894..60cda07d0e 100644 --- a/zarr/v3/abc/group.py +++ b/zarr/v3/abc/group.py @@ -1,8 +1,10 @@ from __future__ import annotations -from abc import abstractproperty, ABC +from abc import abstractproperty, abstractmethod, ABC from collections.abc import MutableMapping -from typing import Dict, Any +from typing import Dict, Any, AsyncIterator, Union, Iterator + +from zarr.v3.abc.array import AsyncArray, SyncArray class BaseGroup(ABC): @@ -17,70 +19,188 @@ def info(self) -> Any: # TODO: type this later ... -class AsynchronousGroup(BaseGroup): - pass - # TODO: (considering the following api) - # store_path (rename to path?) - # nchildren - number of child groups + arrays - # children (async iterator) - # contains - check if child exists - # getitem - get child - # group_keys (async iterator) - # groups (async iterator) - # array_keys (async iterator) - # arrays (async iterator) - # visit - # visitkeys - # visitvalues - # tree - # create_group - # require_group - # create_groups - # require_groups - # create_dataset - # require_dataset - # create - # empty - # zeros - # ones - # full - # array - # empty_like - # zeros_like - # ones_like - # full_like - # move - - -class SynchronousGroup(BaseGroup, MutableMapping): - # TODO - think about if we want to keep the MutableMapping abstraction or - pass +class AsyncGroup(BaseGroup): + @abstractmethod + async def nchildren(self) -> int: + ... + + @abstractmethod + async def children(self) -> AsyncIterator: + ... + + @abstractmethod + async def contains(self, child: str) -> bool: + """check if child exists""" + ... + + @abstractmethod + async def getitem(self, child: str) -> Union[AsyncArray, "AsyncGroup"]: + """get child""" + ... + + @abstractmethod + async def group_keys(self) -> AsyncIterator[str]: + """iterate over child group keys""" + ... + + @abstractmethod + async def groups(self) -> AsyncIterator["AsyncGroup"]: + """iterate over child groups""" + ... + + @abstractmethod + async def array_keys(self) -> AsyncIterator[str]: + """iterate over child array keys""" + ... + + @abstractmethod + async def arrays(self) -> AsyncIterator[AsyncArray]: + """iterate over child arrays""" + ... + + @abstractmethod + async def tree(self, expand=False, level=None) -> Any: # TODO: type this later + ... + + @abstractmethod + async def create_group(self, name: str, **kwargs) -> "AsyncGroup": + ... + + @abstractmethod + async def create_array(self, name: str, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def empty(self, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def zeros(self, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def ones(self, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def full(self, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def empty_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def zeros_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def ones_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def full_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + ... + + @abstractmethod + async def move(self, source: str, dest: str) -> None: + ... + + # TODO / maybes: # store_path (rename to path?) - # __enter__ - # __exit__ - # group_keys - # groups - # array_keys - # arrays # visit # visitkeys # visitvalues - # visititems - # tree - # create_group - # require_group - # create_groups - # require_groups - # create_dataset - # require_dataset - # create - # empty - # zeros - # ones - # full - # array - # empty_like - # zeros_like - # ones_like - # full_like - # move + + +class SyncGroup(BaseGroup, MutableMapping): + @abstractproperty + def nchildren(self) -> int: + ... + + @abstractproperty + def children(self) -> Iterator: + ... + + @abstractmethod + def __contains__(self, child: str) -> bool: + """check if child exists""" + ... + + @abstractmethod + def __getitem__(self, child: str) -> Union[SyncArray, "SyncGroup"]: + """get child""" + ... + + @abstractmethod + def __setitem__(self, key: str, value: Union[SyncArray, "SyncGroup"]) -> None: + """get child""" + ... + + @abstractmethod + def group_keys(self) -> AsyncIterator[str]: + """iterate over child group keys""" + ... + + @abstractmethod + def groups(self) -> AsyncIterator["SyncGroup"]: + """iterate over child groups""" + ... + + @abstractmethod + def array_keys(self) -> AsyncIterator[str]: + """iterate over child array keys""" + ... + + @abstractmethod + def arrays(self) -> AsyncIterator[SyncArray]: + """iterate over child arrays""" + ... + + @abstractmethod + def tree(self) -> Any: + ... + + @abstractmethod + def create_group(self, name: str, **kwargs) -> "SyncGroup": + ... + + @abstractmethod + def create_array(self, name: str, **kwargs) -> SyncArray: + ... + + @abstractmethod + def empty(self, **kwargs) -> SyncArray: + ... + + @abstractmethod + def zeros(self, **kwargs) -> SyncArray: + ... + + @abstractmethod + def ones(self, **kwargs) -> SyncArray: + ... + + @abstractmethod + def full(self, **kwargs) -> SyncArray: + ... + + @abstractmethod + def empty_like(self, prototype: SyncArray, **kwargs) -> SyncArray: + ... + + @abstractmethod + def zeros_like(self, prototype: SyncArray, **kwargs) -> SyncArray: + ... + + @abstractmethod + def ones_like(self, prototype: SyncArray, **kwargs) -> SyncArray: + ... + + @abstractmethod + def full_like(self, prototype: SyncArray, **kwargs) -> SyncArray: + ... + + @abstractmethod + def move(self, source: str, dest: str) -> None: + ... diff --git a/zarr/v3/array.py b/zarr/v3/array.py index 3c0d7eba5c..2fe9b09791 100644 --- a/zarr/v3/array.py +++ b/zarr/v3/array.py @@ -1,6 +1,6 @@ # Notes on what I've changed here: # 1. Split Array into AsyncArray and Array -# 2. Inherit from abc (SynchronousArray, AsynchronousArray) +# 2. Inherit from abc (SyncArray, AsyncArray) # 3. Added .size and .attrs methods # 4. Temporarily disabled the creation of ArrayV2 # 5. Added from_json to AsyncArray @@ -17,7 +17,7 @@ import numpy as np from attr import evolve, frozen -from zarr.v3.abc.array import SynchronousArray, AsynchronousArray +from zarr.v3.abc.array import SyncArray, AsyncArray # from zarr.v3.array_v2 import ArrayV2 from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec @@ -47,7 +47,7 @@ @frozen -class AsyncArray(AsynchronousArray): +class AsyncArray(AsyncArray): metadata: ArrayMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration @@ -414,7 +414,7 @@ async def info(self): @frozen -class Array(SynchronousArray): +class Array(SyncArray): _async_array: AsyncArray @classmethod diff --git a/zarr/v3/group.py b/zarr/v3/group.py index aa43c706a5..d4a423a3c6 100644 --- a/zarr/v3/group.py +++ b/zarr/v3/group.py @@ -1,25 +1,35 @@ from __future__ import annotations +import asyncio import json -from typing import Any, Dict, Literal, Optional, Union +from typing import Any, Dict, Literal, Optional, Union, AsyncIterator, Iterator, List -from attr import asdict, evolve, field, frozen +from attr import asdict, evolve, field, frozen, validators -from zarr.v3.array import Array -from zarr.v3.common import ZARR_JSON, make_cattr -from zarr.v3.metadata import RuntimeConfiguration +from zarr.v3.abc import AsyncGroup, SyncGroup +from zarr.v3.array import AsyncArray, Array +from zarr.v3.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr +from zarr.v3.config import RuntimeConfiguration from zarr.v3.store import StoreLike, StorePath, make_store_path -from zarr.v3.sync import sync +from zarr.v3.sync import SyncMixin @frozen class GroupMetadata: attributes: Dict[str, Any] = field(factory=dict) - zarr_format: Literal[3] = 3 - node_type: Literal["group"] = "group" + zarr_format: Literal[2, 3] = field(default=3, validator=validators.in_([2, 3])) + node_type: Literal["group"] = field(default="group", init=False) - def to_bytes(self) -> bytes: - return json.dumps(asdict(self)).encode() + def to_bytes(self) -> Dict[str, bytes]: + if self.zarr_format == 3: + return {ZARR_JSON: json.dumps(asdict(self)).encode()} + elif self.zarr_format == 2: + return { + ZGROUP_JSON: self.zarr_format, + ZATTRS_JSON: json.dumps(self.attributes).encode(), + } + else: + raise ValueError(f"unexpected zarr_format: {self.zarr_format}") @classmethod def from_json(cls, zarr_json: Any) -> GroupMetadata: @@ -27,25 +37,29 @@ def from_json(cls, zarr_json: Any) -> GroupMetadata: @frozen -class Group: +class AsyncGroup(AsyncGroup): metadata: GroupMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration @classmethod - async def create_async( + async def create( cls, store: StoreLike, *, attributes: Optional[Dict[str, Any]] = None, exists_ok: bool = False, + zarr_format: Literal[2, 3] = field(default=3, validator=validators.in_([2, 3])), runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Group: + ) -> "AsyncGroup": store_path = make_store_path(store) if not exists_ok: - assert not await (store_path / ZARR_JSON).exists_async() + if zarr_format == 3: + assert not await (store_path / ZARR_JSON).exists_async() + elif zarr_format == 2: + assert not await (store_path / ZGROUP_JSON).exists_async() group = cls( - metadata=GroupMetadata(attributes=attributes or {}), + metadata=GroupMetadata(attributes=attributes or {}, zarr_format=zarr_format), store_path=store_path, runtime_configuration=runtime_configuration, ) @@ -53,45 +67,39 @@ async def create_async( return group @classmethod - def create( - cls, - store: StoreLike, - *, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Group: - return sync( - cls.create_async( - store, - attributes=attributes, - exists_ok=exists_ok, - runtime_configuration=runtime_configuration, - ), - runtime_configuration.asyncio_loop, - ) - - @classmethod - async def open_async( + async def open( cls, store: StoreLike, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Group: + zarr_format: Literal[2, 3] = 3, + ) -> "AsyncGroup": store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get_async() - assert zarr_json_bytes is not None - return cls.from_json(store_path, json.loads(zarr_json_bytes), runtime_configuration) - @classmethod - def open( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Group: - return sync( - cls.open_async(store, runtime_configuration), - runtime_configuration.asyncio_loop, - ) + # TODO: consider trying to autodiscover the zarr-format here + if zarr_format == 3: + # V3 groups are comprised of a zarr.json object + # (it is optional in the case of implicit groups) + zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + zarr_json = ( + json.loads(zarr_json_bytes) if zarr_json_bytes is not None else {"zarr_format": 3} + ) + + elif zarr_format == 2: + # V2 groups are comprised of a .zgroup and .zattrs objects + # (both are optional in the case of implicit groups) + zgroup_bytes, zattrs_bytes = await asyncio.gather( + (store_path / ZGROUP_JSON).get_async(), (store_path / ZATTRS_JSON).get_async() + ) + zgroup = ( + json.loads(json.loads(zgroup_bytes)) + if zgroup_bytes is not None + else {"zarr_format": 2} + ) + zattrs = json.loads(json.loads(zattrs_bytes)) if zattrs_bytes is not None else {} + zarr_json = {**zgroup, "attributes": zattrs} + else: + raise ValueError(f"unexpected zarr_format: {zarr_format}") + return cls.from_json(store_path, zarr_json, runtime_configuration) @classmethod def from_json( @@ -107,73 +115,275 @@ def from_json( ) return group - @classmethod - async def open_or_array( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + async def getitem( + self, + key: str, ) -> Union[Array, Group]: - store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get_async() - if zarr_json_bytes is None: - raise KeyError - zarr_json = json.loads(zarr_json_bytes) - if zarr_json["node_type"] == "group": - return cls.from_json(store_path, zarr_json, runtime_configuration) - if zarr_json["node_type"] == "array": - return Array.from_json( - store_path, zarr_json, runtime_configuration=runtime_configuration + + store_path = self.store_path / key + + if self.zarr_format == 3: + zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + if zarr_json_bytes is None: + # implicit group? + zarr_json = { + "zarr_format": self.zarr_format, + "node_type": "group", + "attributes": {}, + } + else: + zarr_json = json.loads(zarr_json_bytes) + if zarr_json["node_type"] == "group": + return type(self).from_json(store_path, zarr_json, self.runtime_configuration) + if zarr_json["node_type"] == "array": + return Array.from_json( + store_path, zarr_json, runtime_configuration=self.runtime_configuration + ) + elif self.zarr_format == 2: + # Q: how do we like optimistically fetching .zgroup, .zarray, and .zattrs? + # This guarantees that we will always make at least one extra request to the store + zgroup_bytes, zarray_bytes, zattrs_bytes = await asyncio.gather( + (store_path / ZGROUP_JSON).get_async(), + (store_path / ZARRAY_JSON).get_async(), + (store_path / ZATTRS_JSON).get_async(), ) - raise KeyError + + # unpack the zarray, if this is None then we must be opening a group + zarray = json.loads(zarray_bytes) if zarray_bytes else None + # unpack the zattrs, this can be None if no attrs were written + zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {} + + if zarray is not None: + # TODO: update this once the V2 array support is part of the primary array class + zarr_json = {**zarray, "attributes": zattrs} + return Array.from_json( + store_path, zarray, runtime_configuration=self.runtime_configuration + ) + else: + zgroup = ( + json.loads(zgroup_bytes) + if zgroup_bytes is not None + else {"zarr_format": self.zarr_format} + ) + zarr_json = {**zgroup, "attributes": zattrs} + return type(self).from_json(store_path, zarr_json, self.runtime_configuration) + else: + raise ValueError(f"unexpected zarr_format: {self.zarr_format}") async def _save_metadata(self) -> None: - await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) + to_save = self.metadata.to_bytes() + awaitables = [(self.store_path / key).set_async(value) for key, value in to_save.items()] + await asyncio.gather(*awaitables) - async def get_async(self, path: str) -> Union[Array, Group]: - return await self.__class__.open_or_array( - self.store_path / path, self.runtime_configuration - ) + @property + def attrs(self): + return self.metadata.attributes - def __getitem__(self, path: str) -> Union[Array, Group]: - return sync(self.get_async(path), self.runtime_configuration.asyncio_loop) + @property + def info(self): + return self.metadata.info - async def create_group_async(self, path: str, **kwargs) -> Group: + async def create_group(self, path: str, **kwargs) -> Group: runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await self.__class__.create_async( + return await type(self).create( self.store_path / path, runtime_configuration=runtime_configuration, **kwargs, ) - def create_group(self, path: str, **kwargs) -> Group: - return sync(self.create_group_async(path), self.runtime_configuration.asyncio_loop) - - async def create_array_async(self, path: str, **kwargs) -> Array: + async def create_array(self, path: str, **kwargs) -> Array: runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await Array.create_async( + return await AsyncArray.create( self.store_path / path, runtime_configuration=runtime_configuration, **kwargs, ) - def create_array(self, path: str, **kwargs) -> Array: - return sync( - self.create_array_async(path, **kwargs), - self.runtime_configuration.asyncio_loop, - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> Group: + async def update_attributes(self, new_attributes: Dict[str, Any]): new_metadata = evolve(self.metadata, attributes=new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) + to_save = new_metadata.to_bytes() + if new_metadata.zarr_format == 2: + # only save the .zattrs object + await (self.store_path / ZATTRS_JSON).set_async(to_save[ZATTRS_JSON]) + else: + await (self.store_path / ZARR_JSON).set_async(to_save[ZARR_JSON]) return evolve(self, metadata=new_metadata) - def update_attributes(self, new_attributes: Dict[str, Any]) -> Group: - return sync( - self.update_attributes_async(new_attributes), - self.runtime_configuration.asyncio_loop, - ) - def __repr__(self): return f"" + + async def nchildren(self) -> int: + raise NotImplementedError + + async def children(self) -> AsyncIterator[AsyncArray, "AsyncGroup"]: + raise NotImplementedError + + async def contains(self, child: str) -> bool: + raise NotImplementedError + + async def group_keys(self) -> AsyncIterator[str]: + raise NotImplementedError + + async def groups(self) -> AsyncIterator["AsyncGroup"]: + raise NotImplementedError + + async def array_keys(self) -> AsyncIterator[str]: + raise NotImplementedError + + async def arrays(self) -> AsyncIterator[AsyncArray]: + raise NotImplementedError + + async def tree(self, expand=False, level=None) -> Any: + raise NotImplementedError + + async def empty(self, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def zeros(self, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def ones(self, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def full(self, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def empty_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def zeros_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def ones_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def full_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: + raise NotImplementedError + + async def move(self, source: str, dest: str) -> None: + raise NotImplementedError + + +@frozen +class Group(SyncGroup, SyncMixin): + _async_group: AsyncGroup + + @classmethod + def create( + cls, + store: StoreLike, + *, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + return cls._sync( + AsyncGroup.create, + store, + attributes=attributes, + exists_ok=exists_ok, + runtime_configuration=runtime_configuration, + ) + + @classmethod + def open( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + obj = cls._sync(AsyncGroup.open, store, runtime_configuration) + return cls(obj) + + def __getitem__(self, path: str) -> Union[Array, Group]: + obj = self._sync(self._async_group.getitem, path) + if isinstance(obj, AsyncArray): + return Array(obj) + else: + return Group(obj) + + def __delitem__(self, key): + raise NotImplementedError + + def __iter__(self): + raise NotImplementedError + + def __len__(self): + raise NotImplementedError + + def __setitem__(self, key, value): + raise NotImplementedError + + @property + def attrs(self): + return self._async_group.attrs + + @property + def info(self): + return self._async_group.info + + def update_attributes(self, new_attributes: Dict[str, Any]): + self._sync(self._async_group.update_attributes, new_attributes) + return self + + @property + def nchildren(self) -> int: + return self._sync(self._async_group.nchildren) + + @property + def children(self) -> List[Array, "Group"]: + _children = self._sync_iter(self._async_group.children) + return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children] + + def __contains__(self, child) -> bool: + return self._sync(self._async_group.contains, child) + + def group_keys(self) -> Iterator[str]: + return self._sync_iter(self._async_group.group_keys) + + def groups(self) -> List["Group"]: + # TODO: in v2 this was a generator that return key: Group + return [Group(obj) for obj in self._sync_iter(self._async_group.groups)] + + def array_keys(self) -> List[str]: + return self._sync_iter(self._async_group.array_keys) + + def arrays(self) -> List[Array]: + return [Array(obj) for obj in self._sync_iter(self._async_group.arrays)] + + def tree(self, expand=False, level=None) -> Any: + return self._sync(self._async_group.tree, expand=expand, level=level) + + def create_group(self, name: str, **kwargs) -> "Group": + return Group(self._sync(self._async_group.create_group, name, **kwargs)) + + def create_array(self, name: str, **kwargs) -> Array: + return Array(self._sync(self._async_group.create_array, name, **kwargs)) + + def empty(self, **kwargs) -> "Array": + return Array(self._sync(self._async_group.empty, **kwargs)) + + def zeros(self, **kwargs) -> "Array": + return Array(self._sync(self._async_group.zeros, **kwargs)) + + def ones(self, **kwargs) -> "Array": + return Array(self._sync(self._async_group.ones, **kwargs)) + + def full(self, **kwargs) -> "Array": + return Array(self._sync(self._async_group.full, **kwargs)) + + def empty_like(self, prototype: AsyncArray, **kwargs) -> "Array": + return Array(self._sync(self._async_group.empty_like, prototype, **kwargs)) + + def zeros_like(self, prototype: AsyncArray, **kwargs) -> "Array": + return Array(self._sync(self._async_group.zeros_like, prototype, **kwargs)) + + def ones_like(self, prototype: AsyncArray, **kwargs) -> "Array": + return Array(self._sync(self._async_group.ones_like, prototype, **kwargs)) + + def full_like(self, prototype: AsyncArray, **kwargs) -> "Array": + return Array(self._sync(self._async_group.full_like, prototype, **kwargs)) + + def move(self, source: str, dest: str) -> None: + return self._sync(self._async_group.move, source, dest) diff --git a/zarr/v3/group_v2.py b/zarr/v3/group_v2.py deleted file mode 100644 index 3b1a369ae2..0000000000 --- a/zarr/v3/group_v2.py +++ /dev/null @@ -1,218 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union - -from attr import asdict, evolve, frozen - -from zarr.v3.array_v2 import ArrayV2 -from zarr.v3.common import ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr -from zarr.v3.metadata import RuntimeConfiguration -from zarr.v3.store import StoreLike, StorePath, make_store_path -from zarr.v3.sync import sync - -if TYPE_CHECKING: - from zarr.v3.group import Group - - -@frozen -class GroupV2Metadata: - zarr_format: Literal[2] = 2 - - def to_bytes(self) -> bytes: - return json.dumps(asdict(self)).encode() - - @classmethod - def from_json(cls, zarr_json: Any) -> GroupV2Metadata: - return make_cattr().structure(zarr_json, cls) - - -@frozen -class GroupV2: - metadata: GroupV2Metadata - store_path: StorePath - runtime_configuration: RuntimeConfiguration - attributes: Optional[Dict[str, Any]] = None - - @classmethod - async def create_async( - cls, - store: StoreLike, - *, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - store_path = make_store_path(store) - if not exists_ok: - assert not await (store_path / ZGROUP_JSON).exists_async() - group = cls( - metadata=GroupV2Metadata(), - attributes=attributes, - store_path=store_path, - runtime_configuration=runtime_configuration, - ) - await group._save_metadata() - return group - - @classmethod - def create( - cls, - store: StoreLike, - *, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - return sync( - cls.create_async( - store, - attributes=attributes, - exists_ok=exists_ok, - runtime_configuration=runtime_configuration, - ), - runtime_configuration.asyncio_loop if runtime_configuration else None, - ) - - @classmethod - async def open_async( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - store_path = make_store_path(store) - zgroup_bytes = await (store_path / ZGROUP_JSON).get_async() - assert zgroup_bytes is not None - zattrs_bytes = await (store_path / ZATTRS_JSON).get_async() - metadata = json.loads(zgroup_bytes) - attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None - - return cls.from_json( - store_path, - metadata, - runtime_configuration, - attributes, - ) - - @classmethod - def open( - cls, - store_path: StorePath, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - return sync( - cls.open_async(store_path, runtime_configuration), - runtime_configuration.asyncio_loop, - ) - - @classmethod - def from_json( - cls, - store_path: StorePath, - zarr_json: Any, - runtime_configuration: RuntimeConfiguration, - attributes: Optional[Dict[str, Any]] = None, - ) -> GroupV2: - group = cls( - metadata=GroupV2Metadata.from_json(zarr_json), - store_path=store_path, - runtime_configuration=runtime_configuration, - attributes=attributes, - ) - return group - - @staticmethod - async def open_or_array( - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Union[ArrayV2, GroupV2]: - store_path = make_store_path(store) - zgroup_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZGROUP_JSON).get_async(), - (store_path / ZATTRS_JSON).get_async(), - ) - attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None - if zgroup_bytes is not None: - return GroupV2.from_json( - store_path, json.loads(zgroup_bytes), runtime_configuration, attributes - ) - zarray_bytes = await (store_path / ZARRAY_JSON).get_async() - if zarray_bytes is not None: - return ArrayV2.from_json( - store_path, json.loads(zarray_bytes), attributes, runtime_configuration - ) - raise KeyError - - async def _save_metadata(self) -> None: - await (self.store_path / ZGROUP_JSON).set_async(self.metadata.to_bytes()) - if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set_async( - json.dumps(self.attributes).encode(), - ) - else: - await (self.store_path / ZATTRS_JSON).delete_async() - - async def get_async(self, path: str) -> Union[ArrayV2, GroupV2]: - return await self.__class__.open_or_array( - self.store_path / path, self.runtime_configuration - ) - - def __getitem__(self, path: str) -> Union[ArrayV2, GroupV2]: - return sync(self.get_async(path), self.runtime_configuration.asyncio_loop) - - async def create_group_async(self, path: str, **kwargs) -> GroupV2: - runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await self.__class__.create_async( - self.store_path / path, - runtime_configuration=runtime_configuration, - **kwargs, - ) - - def create_group(self, path: str, **kwargs) -> GroupV2: - return sync(self.create_group_async(path), self.runtime_configuration.asyncio_loop) - - async def create_array_async(self, path: str, **kwargs) -> ArrayV2: - runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await ArrayV2.create_async( - self.store_path / path, - runtime_configuration=runtime_configuration, - **kwargs, - ) - - def create_array(self, path: str, **kwargs) -> ArrayV2: - return sync( - self.create_array_async(path, **kwargs), - self.runtime_configuration.asyncio_loop, - ) - - async def convert_to_v3_async(self) -> Group: - from zarr.v3.common import ZARR_JSON - from zarr.v3.group import Group, GroupMetadata - - new_metadata = GroupMetadata(attributes=self.attributes or {}) - new_metadata_bytes = new_metadata.to_bytes() - - await (self.store_path / ZARR_JSON).set_async(new_metadata_bytes) - - return Group.from_json( - store_path=self.store_path, - zarr_json=json.loads(new_metadata_bytes), - runtime_configuration=self.runtime_configuration, - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> GroupV2: - await (self.store_path / ZATTRS_JSON).set_async(json.dumps(new_attributes).encode()) - return evolve(self, attributes=new_attributes) - - def update_attributes(self, new_attributes: Dict[str, Any]) -> GroupV2: - return sync( - self.update_attributes_async(new_attributes), - self.runtime_configuration.asyncio_loop, - ) - - def convert_to_v3(self) -> Group: - return sync(self.convert_to_v3_async(), loop=self.runtime_configuration.asyncio_loop) - - def __repr__(self): - return f"" diff --git a/zarr/v3/sync.py b/zarr/v3/sync.py index ef3a6e08c0..f2532c87b2 100644 --- a/zarr/v3/sync.py +++ b/zarr/v3/sync.py @@ -1,9 +1,12 @@ from __future__ import annotations import asyncio +import functools import threading from typing import Any, Coroutine, List, Optional +from zarr.v3.config import SyncConfiguration + # From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py iothread: List[Optional[threading.Thread]] = [None] # dedicated IO thread @@ -85,3 +88,22 @@ def _get_loop(): th.start() iothread[0] = th return loop[0] + + +class SyncMixin: + + _sync_configuration: SyncConfiguration + + def _sync(self, method, *args, **kwargs): # TODO: type this + @functools.wraps(method) + def wrap(*args, **kwargs): + return sync(method, *args, loop=self._sync_configuration.asyncio_loop, **kwargs) + + return wrap(*args, **kwargs) + + def _sync_iter(self, func, *args, **kwargs) -> List[Any]: # TODO: type this + async def iter_to_list() -> List[Any]: + # TODO: replace with generators so we don't materialize the entire iterator at once + return [item async for item in func(*args, **kwargs)] + + return self._sync(iter_to_list)