Skip to content

Commit

Permalink
Browse files (browse the repository at this point in the history)
…into api/private-core
  • Loading branch information
jhamman committed Jan 8, 2025
2 parents 7215121 + 0c1aad5 commit e49f0ae
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 42 deletions.
4 changes: 2 additions & 2 deletions docs/user-guide/arrays.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ prints additional diagnostics, e.g.::
Serializer : BytesCodec(endian=<Endian.little: 'little'>)
Compressors : (BloscCodec(typesize=4, cname=<BloscCname.zstd: 'zstd'>, clevel=3, shuffle=<BloscShuffle.bitshuffle: 'bitshuffle'>, blocksize=0),)
No. bytes : 400000000 (381.5M)
No. bytes stored : 9696302
No. bytes stored : 9696520
Storage ratio : 41.3
Chunks Initialized : 100

Expand Down Expand Up @@ -611,7 +611,7 @@ Sharded arrays can be created by providing the ``shards`` parameter to :func:`za
Serializer : BytesCodec(endian=<Endian.little: 'little'>)
Compressors : (ZstdCodec(level=0, checksum=False),)
No. bytes : 100000000 (95.4M)
No. bytes stored : 3981060
No. bytes stored : 3981552
Storage ratio : 25.1
Shards Initialized : 100

Expand Down
4 changes: 2 additions & 2 deletions docs/user-guide/groups.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ property. E.g.::
Serializer : BytesCodec(endian=<Endian.little: 'little'>)
Compressors : (ZstdCodec(level=0, checksum=False),)
No. bytes : 8000000 (7.6M)
No. bytes stored : 1432
Storage ratio : 5586.6
No. bytes stored : 1614
Storage ratio : 4956.6
Chunks Initialized : 0
>>> baz.info
Type : Array
Expand Down
4 changes: 2 additions & 2 deletions docs/user-guide/performance.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ ratios, depending on the correlation structure within the data. E.g.::
Serializer : BytesCodec(endian=<Endian.little: 'little'>)
Compressors : (ZstdCodec(level=0, checksum=False),)
No. bytes : 400000000 (381.5M)
No. bytes stored : 342588717
No. bytes stored : 342588911
Storage ratio : 1.2
Chunks Initialized : 100
>>> with zarr.config.set({'array.order': 'F'}):
Expand All @@ -150,7 +150,7 @@ ratios, depending on the correlation structure within the data. E.g.::
Serializer : BytesCodec(endian=<Endian.little: 'little'>)
Compressors : (ZstdCodec(level=0, checksum=False),)
No. bytes : 400000000 (381.5M)
No. bytes stored : 342588717
No. bytes stored : 342588911
Storage ratio : 1.2
Chunks Initialized : 100

Expand Down
2 changes: 2 additions & 0 deletions docs/user-guide/v3_migration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,5 @@ of Zarr-Python, please open (or comment on) a
* Object dtypes (:issue:`2617`)
* Ragged arrays (:issue:`2618`)
* Groups and Arrays do not implement ``__enter__`` and ``__exit__`` protocols (:issue:`2619`)
* Big Endian dtypes (:issue:`2324`)
* Default filters for object dtypes for Zarr format 2 arrays (:issue:`2627`)
6 changes: 2 additions & 4 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@
from zarr.core.metadata import ArrayMetadataDict, ArrayV2Metadata, ArrayV3Metadata
from zarr.core.metadata.v2 import _default_compressor, _default_filters
from zarr.errors import NodeTypeValidationError
from zarr.storage import (
StoreLike,
make_store_path,
)
from zarr.storage._common import make_store_path

if TYPE_CHECKING:
from collections.abc import Iterable

from zarr.abc.codec import Codec
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
from zarr.storage import StoreLike

# TODO: this type could use some more thought
ArrayLike = AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | Array | npt.NDArray[Any]
Expand Down
5 changes: 3 additions & 2 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
_parse_bytes_bytes_codec,
get_pipeline_class,
)
from zarr.storage import StoreLike, make_store_path
from zarr.storage._common import StorePath, ensure_no_existing_node
from zarr.storage import StoreLike
from zarr.storage._common import StorePath, ensure_no_existing_node, make_store_path

if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
Expand All @@ -122,6 +122,7 @@
from zarr.abc.codec import CodecPipeline
from zarr.codecs.sharding import ShardingCodecIndexLocation
from zarr.core.group import AsyncGroup
from zarr.storage import StoreLike


# Array and AsyncArray are defined in the base ``zarr`` namespace
Expand Down
4 changes: 2 additions & 2 deletions src/zarr/core/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
from zarr.core.metadata.v3 import V3JsonEncoder
from zarr.core.sync import SyncMixin, sync
from zarr.errors import MetadataValidationError
from zarr.storage import StoreLike, StorePath, make_store_path
from zarr.storage._common import ensure_no_existing_node
from zarr.storage import StoreLike, StorePath
from zarr.storage._common import ensure_no_existing_node, make_store_path

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Generator, Iterable, Iterator
Expand Down
28 changes: 25 additions & 3 deletions src/zarr/core/metadata/v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from zarr.core.buffer.core import default_buffer_prototype

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Self

from zarr.core.buffer import Buffer, BufferPrototype
Expand Down Expand Up @@ -143,9 +144,30 @@ def parse_storage_transformers(data: object) -> tuple[dict[str, JSON], ...]:


class V3JsonEncoder(json.JSONEncoder):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.indent = kwargs.pop("indent", config.get("json_indent"))
super().__init__(*args, **kwargs)
def __init__(
self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
sort_keys: bool = False,
indent: int | None = None,
separators: tuple[str, str] | None = None,
default: Callable[[object], object] | None = None,
) -> None:
if indent is None:
indent = config.get("json_indent")
super().__init__(
skipkeys=skipkeys,
ensure_ascii=ensure_ascii,
check_circular=check_circular,
allow_nan=allow_nan,
sort_keys=sort_keys,
indent=indent,
separators=separators,
default=default,
)

def default(self, o: object) -> Any:
if isinstance(o, np.dtype):
Expand Down
8 changes: 5 additions & 3 deletions src/zarr/core/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ def _get_executor() -> ThreadPoolExecutor:
global _executor
if not _executor:
max_workers = config.get("threading.max_workers", None)
print(max_workers)
# if max_workers is not None and max_workers > 0:
# raise ValueError(max_workers)
logger.debug("Creating Zarr ThreadPoolExecutor with max_workers=%s", max_workers)
_executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="zarr_pool")
_get_loop().set_default_executor(_executor)
return _executor
Expand Down Expand Up @@ -118,6 +116,9 @@ def sync(
# NB: if the loop is not running *yet*, it is OK to submit work
# and we will wait for it
loop = _get_loop()
if _executor is None and config.get("threading.max_workers", None) is not None:
# trigger executor creation and attach to loop
_ = _get_executor()
if not isinstance(loop, asyncio.AbstractEventLoop):
raise TypeError(f"loop cannot be of type {type(loop)}")
if loop.is_closed():
Expand Down Expand Up @@ -153,6 +154,7 @@ def _get_loop() -> asyncio.AbstractEventLoop:
# repeat the check just in case the loop got filled between the
# previous two calls from another thread
if loop[0] is None:
logger.debug("Creating Zarr event loop")
new_loop = asyncio.new_event_loop()
loop[0] = new_loop
iothread[0] = threading.Thread(target=new_loop.run_forever, name="zarr_io")
Expand Down
3 changes: 1 addition & 2 deletions src/zarr/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from types import ModuleType
from typing import Any

from zarr.storage._common import StoreLike, StorePath, make_store_path
from zarr.storage._common import StoreLike, StorePath
from zarr.storage._fsspec import FsspecStore
from zarr.storage._local import LocalStore
from zarr.storage._logging import LoggingStore
Expand All @@ -21,7 +21,6 @@
"StorePath",
"WrapperStore",
"ZipStore",
"make_store_path",
]


Expand Down
25 changes: 12 additions & 13 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,27 +399,27 @@ async def test_chunks_initialized() -> None:
def test_nbytes_stored() -> None:
arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4", codecs=[BytesCodec()])
result = arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
assert result == 502 # the size of the metadata document. This is a fragile test.
arr[:50] = 1
result = arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
assert result == 702 # the size with 5 chunks filled.
arr[50:] = 2
result = arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.
assert result == 902 # the size with all chunks filled.


async def test_nbytes_stored_async() -> None:
arr = await zarr.api.asynchronous.create(
shape=(100,), chunks=(10,), dtype="i4", codecs=[BytesCodec()]
)
result = await arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
assert result == 502 # the size of the metadata document. This is a fragile test.
await arr.setitem(slice(50), 1)
result = await arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
assert result == 702 # the size with 5 chunks filled.
await arr.setitem(slice(50, 100), 2)
result = await arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.
assert result == 902 # the size with all chunks filled.


def test_default_fill_values() -> None:
Expand Down Expand Up @@ -537,19 +537,19 @@ def test_info_complete(self, chunks: tuple[int, int], shards: tuple[int, int] |
_serializer=BytesCodec(),
_count_bytes=512,
_count_chunks_initialized=0,
_count_bytes_stored=373 if shards is None else 578, # the metadata?
_count_bytes_stored=521 if shards is None else 982, # the metadata?
)
assert result == expected

arr[:4, :4] = 10
result = arr.info_complete()
if shards is None:
expected = dataclasses.replace(
expected, _count_chunks_initialized=4, _count_bytes_stored=501
expected, _count_chunks_initialized=4, _count_bytes_stored=649
)
else:
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=774
expected, _count_chunks_initialized=1, _count_bytes_stored=1178
)
assert result == expected

Expand Down Expand Up @@ -624,21 +624,20 @@ async def test_info_complete_async(
_serializer=BytesCodec(),
_count_bytes=512,
_count_chunks_initialized=0,
_count_bytes_stored=373 if shards is None else 578, # the metadata?
_count_bytes_stored=521 if shards is None else 982, # the metadata?
)
assert result == expected

await arr.setitem((slice(4), slice(4)), 10)
result = await arr.info_complete()
if shards is None:
expected = dataclasses.replace(
expected, _count_chunks_initialized=4, _count_bytes_stored=501
expected, _count_chunks_initialized=4, _count_bytes_stored=553
)
else:
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=774
expected, _count_chunks_initialized=1, _count_bytes_stored=1178
)
assert result == expected


@pytest.mark.parametrize("store", ["memory"], indirect=True)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from zarr.core.group import ConsolidatedMetadata, GroupMetadata
from zarr.core.sync import sync
from zarr.errors import ContainsArrayError, ContainsGroupError
from zarr.storage import LocalStore, MemoryStore, StorePath, ZipStore, make_store_path
from zarr.storage import LocalStore, MemoryStore, StorePath, ZipStore
from zarr.storage._common import make_store_path
from zarr.testing.store import LatencyStore

from .conftest import parse_store
Expand Down
11 changes: 10 additions & 1 deletion tests/test_metadata/test_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from zarr.codecs.bytes import BytesCodec
from zarr.core.buffer import default_buffer_prototype
from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding
from zarr.core.group import parse_node_type
from zarr.core.config import config
from zarr.core.group import GroupMetadata, parse_node_type
from zarr.core.metadata.v3 import (
ArrayV3Metadata,
DataType,
Expand Down Expand Up @@ -304,6 +305,14 @@ def test_metadata_to_dict(
assert observed == expected


@pytest.mark.parametrize("indent", [2, 4, None])
def test_json_indent(indent: int):
with config.set({"json_indent": indent}):
m = GroupMetadata()
d = m.to_buffer_dict(default_buffer_prototype())["zarr.json"].to_bytes()
assert d == json.dumps(json.loads(d), indent=indent).encode()


# @pytest.mark.parametrize("fill_value", [-1, 0, 1, 2932897])
# @pytest.mark.parametrize("precision", ["ns", "D"])
# async def test_datetime_metadata(fill_value: int, precision: str) -> None:
Expand Down
3 changes: 2 additions & 1 deletion tests/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from _pytest.compat import LEGACY_PATH

from zarr.core.common import AccessModeLiteral
from zarr.storage import FsspecStore, LocalStore, MemoryStore, StoreLike, StorePath, make_store_path
from zarr.storage import FsspecStore, LocalStore, MemoryStore, StoreLike, StorePath
from zarr.storage._common import make_store_path
from zarr.storage._utils import normalize_path


Expand Down
18 changes: 14 additions & 4 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
_get_lock,
_get_loop,
cleanup_resources,
loop,
sync,
)
from zarr.storage import MemoryStore
Expand Down Expand Up @@ -148,11 +149,20 @@ def test_open_positional_args_deprecate():


@pytest.mark.parametrize("workers", [None, 1, 2])
def test_get_executor(clean_state, workers) -> None:
def test_threadpool_executor(clean_state, workers: int | None) -> None:
with zarr.config.set({"threading.max_workers": workers}):
e = _get_executor()
if workers is not None and workers != 0:
assert e._max_workers == workers
_ = zarr.zeros(shape=(1,)) # trigger executor creation
assert loop != [None] # confirm loop was created
if workers is None:
# confirm no executor was created if no workers were specified
# (this is the default behavior)
assert loop[0]._default_executor is None
else:
# confirm executor was created and attached to loop as the default executor
# note: python doesn't have a direct way to get the default executor so we
# use the private attribute
assert _get_executor() is loop[0]._default_executor
assert _get_executor()._max_workers == workers


def test_cleanup_resources_idempotent() -> None:
Expand Down

0 comments on commit e49f0ae

Please sign in to comment.