Commit: Merge branch 'main' into ci/expand-ci-matrix

jhamman authored Oct 24, 2024
2 parents f4f74fc + f4af51c commit 1e30a28
Showing 14 changed files with 485 additions and 109 deletions.
8 changes: 6 additions & 2 deletions src/zarr/api/asynchronous.py
@@ -396,12 +396,16 @@ async def save_array(

    mode = kwargs.pop("mode", None)
    store_path = await make_store_path(store, path=path, mode=mode, storage_options=storage_options)
+   if np.isscalar(arr):
+       arr = np.array(arr)
+   shape = arr.shape
+   chunks = getattr(arr, "chunks", None)  # for array-likes with chunks attribute
    new = await AsyncArray.create(
        store_path,
        zarr_format=zarr_format,
-       shape=arr.shape,
+       shape=shape,
        dtype=arr.dtype,
-       chunks=arr.shape,
+       chunks=chunks,
        **kwargs,
    )
    await new.setitem(slice(None), arr)
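
For context, a quick sketch of what this hunk enables (editorial example, not part of the commit; save_array and load are zarr's public helpers): a bare scalar is now wrapped into a 0-d array instead of failing on the missing shape attribute, and array-likes that expose a chunks attribute keep their chunk layout.

import numpy as np
import zarr

# A scalar used to fail because it has no .shape; it is now wrapped
# via np.array() into a 0-d array before the store array is created.
zarr.save_array("example.zarr", np.float64(42.0))
print(zarr.load("example.zarr"))  # 42.0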
115 changes: 48 additions & 67 deletions src/zarr/codecs/_v2.py
@@ -5,20 +5,21 @@
 from typing import TYPE_CHECKING
 
 import numcodecs
-from numcodecs.compat import ensure_bytes, ensure_ndarray
+from numcodecs.compat import ensure_ndarray_like
 
-from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
-from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
+from zarr.abc.codec import ArrayBytesCodec
 from zarr.registry import get_ndbuffer_class
 
 if TYPE_CHECKING:
     import numcodecs.abc
 
     from zarr.core.array_spec import ArraySpec
+    from zarr.core.buffer import Buffer, NDBuffer
 
 
 @dataclass(frozen=True)
-class V2Compressor(ArrayBytesCodec):
+class V2Codec(ArrayBytesCodec):
+    filters: tuple[numcodecs.abc.Codec, ...] | None
     compressor: numcodecs.abc.Codec | None
 
     is_fixed_size = False
@@ -28,81 +29,61 @@ async def _decode_single(
         chunk_bytes: Buffer,
         chunk_spec: ArraySpec,
     ) -> NDBuffer:
-        if self.compressor is not None:
-            chunk_numpy_array = ensure_ndarray(
-                await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like())
-            )
+        cdata = chunk_bytes.as_array_like()
+        # decompress
+        if self.compressor:
+            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
         else:
-            chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like())
+            chunk = cdata
 
+        # apply filters
+        if self.filters:
+            for f in reversed(self.filters):
+                chunk = await asyncio.to_thread(f.decode, chunk)
+
+        # view as numpy array with correct dtype
+        chunk = ensure_ndarray_like(chunk)
+        # special case object dtype, because incorrect handling can lead to
+        # segfaults and other bad things happening
+        if chunk_spec.dtype != object:
+            chunk = chunk.view(chunk_spec.dtype)
+        elif chunk.dtype != object:
+            # If we end up here, someone must have hacked around with the filters.
+            # We cannot deal with object arrays unless there is an object
+            # codec in the filter chain, i.e., a filter that converts from object
+            # array to something else during encoding, and converts back to object
+            # array during decoding.
+            raise RuntimeError("cannot read object array without object codec")
 
-        # ensure correct dtype
-        if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject:
-            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)
+        # ensure correct chunk shape
+        chunk = chunk.reshape(-1, order="A")
+        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)
 
-        return get_ndbuffer_class().from_numpy_array(chunk_numpy_array)
+        return get_ndbuffer_class().from_ndarray_like(chunk)
 
     async def _encode_single(
         self,
         chunk_array: NDBuffer,
-        _chunk_spec: ArraySpec,
-    ) -> Buffer | None:
-        chunk_numpy_array = chunk_array.as_numpy_array()
-        if self.compressor is not None:
-            if (
-                not chunk_numpy_array.flags.c_contiguous
-                and not chunk_numpy_array.flags.f_contiguous
-            ):
-                chunk_numpy_array = chunk_numpy_array.copy(order="A")
-            encoded_chunk_bytes = ensure_bytes(
-                await asyncio.to_thread(self.compressor.encode, chunk_numpy_array)
-            )
-        else:
-            encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)
-
-        return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes)
-
-    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
-        raise NotImplementedError
-
-
-@dataclass(frozen=True)
-class V2Filters(ArrayArrayCodec):
-    filters: tuple[numcodecs.abc.Codec, ...] | None
-
-    is_fixed_size = False
-
-    async def _decode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer:
-        chunk_ndarray = chunk_array.as_ndarray_like()
-        # apply filters in reverse order
-        if self.filters is not None:
-            for filter in self.filters[::-1]:
-                chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray)
-
-        # ensure correct chunk shape
-        if chunk_ndarray.shape != chunk_spec.shape:
-            chunk_ndarray = chunk_ndarray.reshape(
-                chunk_spec.shape,
-                order=chunk_spec.order,
-            )
-
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
-
-    async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer | None:
-        chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)
-
-        if self.filters is not None:
-            for filter in self.filters:
-                chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray)
-
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        chunk_spec: ArraySpec,
+    ) -> Buffer | None:
+        chunk = chunk_array.as_ndarray_like()
+
+        # apply filters
+        if self.filters:
+            for f in self.filters:
+                chunk = await asyncio.to_thread(f.encode, chunk)
+
+        # check object encoding
+        if ensure_ndarray_like(chunk).dtype == object:
+            raise RuntimeError("cannot write object array without object codec")
+
+        # compress
+        if self.compressor:
+            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
+        else:
+            cdata = chunk
+
+        return chunk_spec.prototype.buffer.from_bytes(cdata)
 
     def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
         raise NotImplementedError
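
The combined codec makes the v2 ordering explicit: encoding applies the filters in order and then the compressor; decoding decompresses first and undoes the filters in reverse. Below is a standalone sketch of that ordering using real numcodecs codecs; the two helper functions are illustrative stand-ins for _encode_single/_decode_single, not zarr API.

import numpy as np
from numcodecs import Delta, Zstd

filters = [Delta(dtype="i4")]
compressor = Zstd(level=3)

def encode(chunk: np.ndarray) -> bytes:
    # filters first, in declaration order...
    for f in filters:
        chunk = f.encode(chunk)
    # ...then the compressor, mirroring V2Codec._encode_single
    return compressor.encode(chunk)

def decode(cdata: bytes, dtype: str, shape: tuple[int, ...]) -> np.ndarray:
    # decompress, then filters in reverse, mirroring _decode_single
    chunk = compressor.decode(cdata)
    for f in reversed(filters):
        chunk = f.decode(chunk)
    return np.asarray(chunk).view(dtype).reshape(shape)

a = np.arange(20, dtype="i4")
assert np.array_equal(decode(encode(a), "i4", a.shape), a)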
137 changes: 115 additions & 22 deletions src/zarr/core/array.py
@@ -2,7 +2,7 @@
 
 import json
 from asyncio import gather
-from dataclasses import dataclass, field, replace
+from dataclasses import dataclass, field
 from itertools import starmap
 from logging import getLogger
 from typing import TYPE_CHECKING, Any, Generic, Literal, cast, overload
@@ -13,7 +13,7 @@
 from zarr._compat import _deprecate_positional_args
 from zarr.abc.store import Store, set_or_delete
 from zarr.codecs import _get_default_array_bytes_codec
-from zarr.codecs._v2 import V2Compressor, V2Filters
+from zarr.codecs._v2 import V2Codec
 from zarr.core.attributes import Attributes
 from zarr.core.buffer import (
     BufferPrototype,
@@ -118,9 +118,8 @@ def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline:
     if isinstance(metadata, ArrayV3Metadata):
         return get_pipeline_class().from_codecs(metadata.codecs)
     elif isinstance(metadata, ArrayV2Metadata):
-        return get_pipeline_class().from_codecs(
-            [V2Filters(metadata.filters), V2Compressor(metadata.compressor)]
-        )
+        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
+        return get_pipeline_class().from_codecs([v2_codec])
     else:
         raise TypeError
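
A v2 array's filters and compressor now travel together as one codec. A minimal construction sketch (editorial; V2Codec and its keyword arguments come from this commit, the numcodecs codecs are just examples):

from numcodecs import Blosc, Delta
from zarr.codecs._v2 import V2Codec

# Previously this required a V2Filters plus a separate V2Compressor.
codec = V2Codec(filters=(Delta(dtype="i4"),), compressor=Blosc(cname="zstd"))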

@@ -1104,15 +1103,15 @@ async def setitem(
         )
         return await self._set_selection(indexer, value, prototype=prototype)
 
-    async def resize(self, new_shape: ChunkCoords, delete_outside_chunks: bool = True) -> Self:
+    async def resize(self, new_shape: ShapeLike, delete_outside_chunks: bool = True) -> None:
         new_shape = parse_shapelike(new_shape)
         assert len(new_shape) == len(self.metadata.shape)
         new_metadata = self.metadata.update_shape(new_shape)
 
-        # Remove all chunks outside of the new shape
-        old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape))
-        new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape))
-
         if delete_outside_chunks:
+            # Remove all chunks outside of the new shape
+            old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape))
+            new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape))
 
             async def _delete_key(key: str) -> None:
                 await (self.store_path / key).delete()
@@ -1128,7 +1127,63 @@ async def _delete_key(key: str) -> None:
 
         # Write new metadata
         await self._save_metadata(new_metadata)
-        return replace(self, metadata=new_metadata)
+
+        # Update metadata (in place)
+        object.__setattr__(self, "metadata", new_metadata)
+
+    async def append(self, data: npt.ArrayLike, axis: int = 0) -> ChunkCoords:
+        """Append `data` to `axis`.
+
+        Parameters
+        ----------
+        data : array-like
+            Data to be appended.
+        axis : int
+            Axis along which to append.
+
+        Returns
+        -------
+        new_shape : tuple
+
+        Notes
+        -----
+        The size of all dimensions other than `axis` must match between this
+        array and `data`.
+        """
+        # ensure data is array-like
+        if not hasattr(data, "shape"):
+            data = np.asanyarray(data)
+
+        self_shape_preserved = tuple(s for i, s in enumerate(self.shape) if i != axis)
+        data_shape_preserved = tuple(s for i, s in enumerate(data.shape) if i != axis)
+        if self_shape_preserved != data_shape_preserved:
+            raise ValueError(
+                f"shape of data to append is not compatible with the array. "
+                f"The shape of the data is ({data_shape_preserved}) "
+                f"and the shape of the array is ({self_shape_preserved}). "
+                "All dimensions must match except for the dimension being "
+                "appended."
+            )
+        # remember old shape
+        old_shape = self.shape
+
+        # determine new shape
+        new_shape = tuple(
+            self.shape[i] if i != axis else self.shape[i] + data.shape[i]
+            for i in range(len(self.shape))
+        )
+
+        # resize
+        await self.resize(new_shape)
+
+        # store data
+        append_selection = tuple(
+            slice(None) if i != axis else slice(old_shape[i], new_shape[i])
+            for i in range(len(self.shape))
+        )
+        await self.setitem(append_selection, data)
+
+        return new_shape
 
     async def update_attributes(self, new_attributes: dict[str, JSON]) -> Self:
         # metadata.attributes is "frozen" so we simply clear and update the dict
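
A hedged usage sketch of the new async append (editorial example; create and setitem are part of zarr's async API, and with no store argument an in-memory store is used):

import asyncio
import numpy as np
import zarr.api.asynchronous as zasync

async def main() -> None:
    arr = await zasync.create(shape=(4, 4), chunks=(4, 4), dtype="i4")
    await arr.setitem(slice(None), np.zeros((4, 4), dtype="i4"))
    # append grows axis 0 in place and returns the new shape
    new_shape = await arr.append(np.ones((2, 4), dtype="i4"))
    assert new_shape == (6, 4)

asyncio.run(main())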
@@ -1147,7 +1202,8 @@ async def info(self) -> None:
         raise NotImplementedError
 
 
-@dataclass(frozen=True)
+# TODO: Array can be a frozen data class again once property setters (e.g. shape) are removed
+@dataclass(frozen=False)
 class Array:
     """Instantiate an array from an initialized store."""

@@ -1297,6 +1353,11 @@ def shape(self) -> ChunkCoords:
         """
         return self._async_array.shape
 
+    @shape.setter
+    def shape(self, value: ChunkCoords) -> None:
+        """Sets the shape of the array by calling resize."""
+        self.resize(value)
+
     @property
     def chunks(self) -> ChunkCoords:
         """Returns a tuple of integers describing the length of each dimension of a chunk of the array.
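
A minimal sketch of the new setter (editorial example): assigning to .shape now resizes the array in place, equivalent to calling resize.

import zarr

z = zarr.zeros(shape=(10000, 1000), chunks=(1000, 100), dtype="i4")
z.shape = (20000, 1000)   # same as z.resize((20000, 1000))
assert z.shape == (20000, 1000)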
@@ -2754,18 +2815,18 @@ def blocks(self) -> BlockIndex:
         :func:`set_block_selection` for documentation and examples."""
         return BlockIndex(self)
 
-    def resize(self, new_shape: ChunkCoords) -> Array:
+    def resize(self, new_shape: ShapeLike) -> None:
         """
         Change the shape of the array by growing or shrinking one or more
         dimensions.
 
-        This method does not modify the original Array object. Instead, it returns a new Array
-        with the specified shape.
-
         Parameters
         ----------
         new_shape : tuple
             New shape of the array.
 
         Notes
         -----
         When resizing an array, the data are not rearranged in any way.
         If one or more dimensions are shrunk, any chunks falling outside the
         new array shape will be deleted from the underlying store.
         However, it is noteworthy that the chunks partially falling inside the new array
@@ -2778,7 +2839,6 @@ def resize(self, new_shape: ChunkCoords) -> Array:
         >>> import zarr
         >>> z = zarr.zeros(shape=(10000, 10000),
         >>>     chunk_shape=(1000, 1000),
-        >>>     store=StorePath(MemoryStore(mode="w")),
         >>>     dtype="i4",)
         >>> z.shape
         (10000, 10000)
@@ -2791,10 +2851,43 @@
         >>> z2.shape
         (50, 50)
         """
-        resized = sync(self._async_array.resize(new_shape))
-        # TODO: remove this cast when type inference improves
-        _resized = cast(AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata], resized)
-        return type(self)(_resized)
+        sync(self._async_array.resize(new_shape))

+    def append(self, data: npt.ArrayLike, axis: int = 0) -> ChunkCoords:
+        """Append `data` to `axis`.
+
+        Parameters
+        ----------
+        data : array-like
+            Data to be appended.
+        axis : int
+            Axis along which to append.
+
+        Returns
+        -------
+        new_shape : tuple
+
+        Notes
+        -----
+        The size of all dimensions other than `axis` must match between this
+        array and `data`.
+
+        Examples
+        --------
+        >>> import numpy as np
+        >>> import zarr
+        >>> a = np.arange(10000000, dtype='i4').reshape(10000, 1000)
+        >>> z = zarr.array(a, chunks=(1000, 100))
+        >>> z.shape
+        (10000, 1000)
+        >>> z.append(a)
+        (20000, 1000)
+        >>> z.append(np.vstack([a, a]), axis=1)
+        (20000, 2000)
+        >>> z.shape
+        (20000, 2000)
+        """
+        return sync(self._async_array.append(data, axis=axis))
 
     def update_attributes(self, new_attributes: dict[str, JSON]) -> Array:
         # TODO: remove this cast when type inference improves