Spatially chunked and sharding compatible annotation writer #522

Open
wants to merge 49 commits into base: master

Conversation

fcollman
Contributor

I suspect this PR is not viable, but it adds chunking support and sharding support via cloudvolume. If there is an alternative way to implement the shard writing we could use that instead. It doesn't address truly parallel writing or downsampling yet, but this does work better than putting things all in one spatial bucket.

@jbms
Collaborator

jbms commented Jan 30, 2024

Thanks. For writing the sharded format we can use tensorstore:

https://google.github.io/tensorstore/kvstore/neuroglancer_uint64_sharded/index.html

It would be nice to calculate the sharding spec automatically. Here is a pseudocode version of how the sharding spec is chosen by the Google-internal pipeline. total_count is the total number of keys, total_bytes is the approximate total size in bytes, and hashed is set to true for the id and relationship indices and false for the spatial indices.

function choose_output_spec(int total_count, int total_bytes, bool hashed) {
  if (total_count == 1) {
    // Use non-sharded format
  } else {
    auto* options = output_spec.mutable_sharding_options();
    options->set_hash(
        hashed ? NeuroglancerUint64ShardedOptions::MURMURHASH3_X86_128
               : NeuroglancerUint64ShardedOptions::IDENTITY_HASH);

    // 1000 entries per minishard results in an (uncompressed) minishard size of
    // 24000.  This provides a reasonable tradeoff between overhead in
    // retrieving unnecessary entries and the overhead of additional requests to
    // retrieve additional neighboring entries that are required.  Note that to
    // ensure entries in the same minishard are nearby in the id space, we have
    // to set `preshift_bits` to a non-zero value.
    const size_t minishard_target_count = 1000;
    size_t total_minishard_bits = 0;
    while ((total_count >> total_minishard_bits) > minishard_target_count) {
      ++total_minishard_bits;
    }
    const size_t shard_target_size = 500000000;
    size_t shard_bits = 0;
    while ((total_bytes >> shard_bits) > shard_target_size) {
      ++shard_bits;
    }
    // Pick `preshift_bits` so for dense ids there is locality in the minishard
    // assignment.  We limit this by `minishard_target_count`, however, to
    // ensure we don't end up with an excessively large minishard.
    size_t preshift_bits = 0;
    while (minishard_target_count >> preshift_bits) {
      ++preshift_bits;
    }
    options->set_preshift_bits(preshift_bits);
    options->set_shard_bits(shard_bits);
    options->set_minishard_bits(total_minishard_bits -
                                std::min(total_minishard_bits, shard_bits));
    options->mutable_minishard_index_compression()
        ->mutable_gzip_compression()
        ->set_level(9);
    options->mutable_data_compression()->mutable_gzip_compression()->set_level(
        6);
  }
  return output_spec;
}
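
In Python, the same selection logic might look roughly like the sketch below (illustrative only; it emits the neuroglancer_uint64_sharded_v1 JSON sharding-spec fields rather than the proto above, and the gzip compression levels have no equivalent in that JSON spec, so only the encodings are kept):

def choose_sharding_spec(total_count: int, total_bytes: int, hashed: bool):
    # Returning None means "use the non-sharded format".
    if total_count == 1:
        return None

    # ~1000 entries per minishard keeps the minishard index small while
    # avoiding too many extra requests for neighboring entries.
    minishard_target_count = 1000
    total_minishard_bits = 0
    while (total_count >> total_minishard_bits) > minishard_target_count:
        total_minishard_bits += 1

    # Target roughly 500 MB per shard.
    shard_target_size = 500_000_000
    shard_bits = 0
    while (total_bytes >> shard_bits) > shard_target_size:
        shard_bits += 1

    # Preshift so that dense ids retain locality within a minishard.
    preshift_bits = 0
    while minishard_target_count >> preshift_bits:
        preshift_bits += 1

    return {
        "@type": "neuroglancer_uint64_sharded_v1",
        "hash": "murmurhash3_x86_128" if hashed else "identity",
        "preshift_bits": preshift_bits,
        "shard_bits": shard_bits,
        "minishard_bits": total_minishard_bits - min(total_minishard_bits, shard_bits),
        "minishard_index_encoding": "gzip",
        "data_encoding": "gzip",
    }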

@fcollman
Contributor Author

fcollman commented Jan 31, 2024

I switched to tensorstore for writing and translated your automated sharding parameter function to Python. Remaining to do would be to:

  1. Add the sharding to the other elements of the format (spatial index and relationships)
  2. Add downsampling
  3. Generalize the file path to allow for file://, gs://, s3:// etc...
  4. Make the lower bound dynamic based on the data rather than hard-coded and pre-determined.

One question I have is whether tensorstore might eventually support a spatial query of annotations via the array-style interface, but return the binary-encoded objects as JSON rather than as byte arrays.

Perhaps there is already a workflow that enables this now.

@fcollman
Contributor Author

fcollman commented Feb 1, 2024

I'm having trouble getting the spatial bins to work with sharding. They were working fine without sharding, and I thought I understood that the key was the compressed morton code; I tried two different implementations that gave the same answer, but neuroglancer doesn't load points written with this code.

I'm not sure if the issue is with the morton code or some other misunderstanding of how these things are supposed to be encoded.

As an example, the chunk index [9, 6, 5] within a grid of size [13, 8, 9] gets the morton code index 917. This is translated to a uint64 representation of b'\x95\x03\x00\x00\x00\x00\x00\x00'.

@jbms do you have any insight, or can you check that my encoding is not somehow mangling the key?

@fcollman
Contributor Author

fcollman commented Feb 1, 2024

I've checked in my WIP code here, with some print statements etc. that should be removed once it's working.

@jbms
Collaborator

jbms commented Feb 2, 2024

I think your computation of the morton code is correct; the problem is that tensorstore expects the keys to be encoded as big endian rather than little endian (that way the lexicographic ordering of the keys corresponds to the numerical order).
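
For what it's worth, a minimal sketch of the compressed morton code plus the big-endian key encoding, reproducing the [9, 6, 5] example above (a sketch only, not necessarily the PR's implementation):

import numpy as np

def compressed_morton_code(position, grid_shape) -> int:
    # Interleave bits starting from the least significant bit, skipping any
    # dimension that has already used all the bits its grid extent needs.
    bits = [int(np.ceil(np.log2(s))) if s > 1 else 0 for s in grid_shape]
    code = 0
    out_bit = 0
    for bit in range(max(bits)):
        for dim, nbits in enumerate(bits):
            if bit < nbits:
                code |= ((int(position[dim]) >> bit) & 1) << out_bit
                out_bit += 1
    return code

code = compressed_morton_code([9, 6, 5], [13, 8, 9])  # -> 917
key = code.to_bytes(8, "big")  # big-endian key, as tensorstore expects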

@fcollman
Contributor Author

fcollman commented Feb 2, 2024

Ah! Thanks for that tip. This is fixed now and works pretty well. I've updated the description and successfully used this to write 3.7 million annotations with several properties and a relationship on my laptop: about 7 minutes to add all the annotation objects to the writer, and 1 minute to write the data (350 MB into 15 files).

Future improvements that could still be done:

  1. Spatial downsampling layers, including dynamic sizing of chunks.
  2. Generalize writing to file paths or to other locations via file://, gs://, etc..
  3. Demonstrate a proof-of-concept multi-process map/reduce implementation that streams the processing of annotations rather than batch-processing them, to break out of memory limitations [likely beyond the scope of what one would want to do in this project]

I think this implementation is already better than the original and shows everyone the patterns to follow, so the PR could be included now as is.

A related question I have is where it would make sense to have a precomputed annotation reading class that implements:

  1. reading annotations from this format in bulk
  2. reading annotations based on relationship indexing
  3. reading annotations based on spatial cutouts

This is similar in functionality to tensorstore's interface, but it's not going to return n-d blocks of data, so I'm not sure it belongs there.
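
Purely as a hypothetical sketch of the surface such a class could expose (none of these names exist in the codebase):

class PrecomputedAnnotationReader:
    """Hypothetical reader for a precomputed annotation layer."""

    def __init__(self, source: str):
        # e.g. a file://, gs://, or s3:// URL pointing at the layer
        self.source = source

    def get_by_id(self, annotation_id: int):
        """Read a single annotation via the annotation id index."""
        raise NotImplementedError

    def get_related(self, relationship: str, segment_id: int):
        """Read all annotations tied to a segment via a relationship index."""
        raise NotImplementedError

    def get_within(self, lower_bound, upper_bound):
        """Read annotations from the spatial index for a spatial cutout."""
        raise NotImplementedError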

@jbms
Collaborator

jbms commented Feb 2, 2024

Regarding a library for reading the annotations, I agree that makes sense to have. While the spatial indexing has some similarities to tensorstore indexing, ultimately there are a lot of differences such that I don't see it fitting in with tensorstore.

It appears that the slow speed is unrelated to your changes, but as a separate matter it would be interesting to see if this can be sped up, likely with a batch interface for adding an entire table of annotations at once. I expect it should be possible, at least with an optimized C/C++ implementation, to write the 3.7 million annotations in less than a second.

@fcollman changed the title from "initial sharding compatible version using cloud-volume" to "Spatially chunked and sharding compatible annotation writer" on Feb 3, 2024
        self.properties.sort(key=lambda p: -_PROPERTY_DTYPES[p.type][1])
        self.annotations = []
        self.rank = coordinate_space.rank
        self.dtype = _get_dtype_for_geometry(
            annotation_type, coordinate_space.rank
        ) + _get_dtype_for_properties(self.properties)

        # if chunk_size is an integer, then make it a sequence
        if isinstance(experimental_chunk_size, numbers.Integral):
Collaborator

Should be numbers.Real instead.

Contributor Author

fixed

                shape=(self.rank,), fill_value=experimental_chunk_size, dtype=np.int32
            )
        else:
            chunk_size = cast(Sequence[int], experimental_chunk_size)
Collaborator

Sequence[float]

Contributor Author

fixed

        # if chunk_size is an integer, then make it a sequence
        if isinstance(experimental_chunk_size, numbers.Integral):
            self.chunk_size = np.full(
                shape=(self.rank,), fill_value=experimental_chunk_size, dtype=np.int32
Collaborator

dtype should be float64

Contributor Author

fixed
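
Taken together, the three suggestions above (numbers.Real, Sequence[float], and float64) would make the reviewed fragment look roughly like this (a sketch of the suggested change inside the writer's __init__, where numbers, np, cast, and Sequence are assumed to already be imported; not necessarily the PR's final code):

# if chunk_size is a scalar, broadcast it to a per-dimension array
if isinstance(experimental_chunk_size, numbers.Real):
    self.chunk_size = np.full(
        shape=(self.rank,), fill_value=experimental_chunk_size, dtype=np.float64
    )
else:
    self.chunk_size = np.asarray(
        cast(Sequence[float], experimental_chunk_size), dtype=np.float64
    )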

@@ -197,20 +407,34 @@ def _add_obj(self, coords: Sequence[float], id: Optional[int], **kwargs):
            id=id, encoded=encoded.tobytes(), relationships=related_ids
        )

        for i in range(int(n_spatial_coords)):
            chunk_index = self.get_chunk_index(
Collaborator

For annotation types other than a point, we need to add all of the intersecting chunks. For example, for a line we need all of the chunks that intersect the line, not just the endpoints, as otherwise when viewing some portion of the middle of the line it might not be displayed in Neuroglancer.

There are probably various approaches that could be taken to compute the set of intersecting chunks, but the approach I took was to compute the bounding box in grid space, and then for each chunk within the bounding box do the exact line or ellipsoid intersection test. For axis_aligned_bounding_box no exact test is needed since the bounding box is exact.
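
In Python, the grid bounding-box part of that approach could be sketched as follows (illustrative only; grid_origin and chunk_size are per-dimension arrays, and the exact per-cell line/ellipsoid tests would be layered on top, as in the C++ snippets further below):

import itertools

import numpy as np

def candidate_chunk_indices(point_a, point_b, grid_origin, chunk_size):
    # Yield the index of every chunk whose extent intersects the axis-aligned
    # box spanned by the two (unsorted) endpoints.
    a = np.asarray(point_a, dtype=float)
    b = np.asarray(point_b, dtype=float)
    lower = np.minimum(a, b)
    upper = np.maximum(a, b)
    start = np.floor((lower - grid_origin) / chunk_size).astype(int)
    end = np.ceil((upper - grid_origin) / chunk_size).astype(int)
    end = np.maximum(end, start + 1)  # always cover at least one chunk per dimension
    for idx in itertools.product(*(range(s, e) for s, e in zip(start, end))):
        yield np.array(idx)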

Contributor Author

How is this handled during downsampling? Each annotation can appear multiple times per downsampling level, but should appear in all chunks it overlaps within that level. So does one randomly choose annotations amongst the entire level to show at that level, or does one choose randomly amongst the annotations that intersect with the chunk? If the latter, how do you resolve that with the requirement that it be listed for all chunks it intersects? Does one first do the sampling by chunks, but then take that filtered list and do the spatial overlap test to see if it gets listed in any spatial chunk, therefore sometimes ending up with more annotations in a spatial chunk than the maximum that you were aiming for?

Collaborator

This is a description of how the spatial index is computed:

// Writes the multi-level spatial index.  Returns the spatial index metadata.
//
// This involves the following steps:
//
// 1. To initialize the `remaining_annotations` table, key each input annotation
//    with the all-zero vector as its grid cell.
//
// 2. For each spatial index level `level_i`, starting at the coarsest level
//    (`level_i=0`):
//
//    2.1. For each dimension with variable chunk size: if `level_i == 0`, the
//         grid cell size is equal to the full range of the dimension; if
//         `level_i != 0`, the grid cell size is equal to either 1 or 1/2 times
//         the cell size for `level_i - 1`, whichever leads to a more isotropic
//         chunk (the dimension with the largest cell size is always halved).
//
//    2.2. Compute the `keyed_by_cell` table containing for each annotation in
//         `remaining_annotations` keyed by its `parent_cell` in level
//         `level_i - 1` (or the root all-zero vector grid cell if
//         `level_i == 0`) a copy of the annotation keyed by each child grid
//         cell in `level_i` it intersects.
//
//    2.3. Compute `max_per_cell`, the maximum number of annotations per cell.
//
//    2.4. Compute the `output_by_cell` table containing a random
//         `min(1, max_annotations_per_spatial_cell / double(max_per_cell))`
//         fraction of `keyed_by_cell`.  Also compute the updated value of
//         `remaining_annotations`, equal to the elements of `keyed_by_cell` not
//         included in `output_by_cell`.
//
//    2.5. Write the contents of `output_by_cell` as the `level_i` spatial
//         index.

The sampling is done based on the annotation id, using a hash, which ensures that an annotation is picked in all cells if it is picked in one cell.

We can't exactly limit the number of annotations per cell, but the count follows a binomial distribution and, assuming the hash is reasonable, won't be too high.

We have to pick a single sampling probability for all cells to ensure that the relative density of different regions is preserved.

Annotations that span a large number of cells are problematic, since then adding a spatial index level does not necessarily reduce the number of annotations per cell. Hopefully you don't need that case to work. A different type of spatial index would probably be needed in that case.
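
A minimal sketch of that id-hash based sampling decision (md5 here is just a stand-in; the hash used by the Google-internal pipeline is not specified above):

import hashlib

def keep_at_this_level(annotation_id: int, sampling_probability: float) -> bool:
    # The decision depends only on the id, so an annotation picked for one cell
    # at a given level is picked for every cell it intersects at that level.
    digest = hashlib.md5(annotation_id.to_bytes(8, "little")).digest()
    u = int.from_bytes(digest[:8], "little") / 2**64
    return u < sampling_probability

# Per level: sampling_probability = min(1.0, max_annotations_per_spatial_cell / max_per_cell)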

Contributor Author

I've now added a general-purpose n-d index (rtree) to the code to keep track of where annotations are, so they can be written into the appropriate chunks.

I've added lower_bound and upper_bound parameters to the _add_obj and _add_two_point_obj functions, with custom values set depending on the annotation type.

This should set the code up better for doing downsampling, as this general spatial index can be used across scales to query for annotations that fall within different spatial boxes.
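
For reference, a minimal sketch of that kind of rtree-backed index, assuming the rtree package (the names here are illustrative, not the PR's actual API):

import numpy as np
from rtree import index

def build_annotation_index(lower_bounds: np.ndarray, upper_bounds: np.ndarray):
    # lower_bounds / upper_bounds: (N, rank) arrays of per-annotation bounds.
    props = index.Property()
    props.dimension = lower_bounds.shape[1]
    idx = index.Index(properties=props, interleaved=True)
    for row, (lo, hi) in enumerate(zip(lower_bounds, upper_bounds)):
        # With interleaved=True, bounds are (min_d0, ..., min_dk, max_d0, ..., max_dk).
        idx.insert(row, tuple(lo) + tuple(hi))
    return idx

# Annotations whose bounds intersect a given chunk's box:
# rows = list(idx.intersection(tuple(chunk_lower) + tuple(chunk_upper)))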

@jbms
Collaborator

jbms commented Feb 13, 2024

For your reference, here is the code that I've used to compute grid cell/annotation intersections:

template <bool Floor = true>
void GetGridCell(tensorstore::span<int64_t> result,
                 tensorstore::span<const float> point,
                 tensorstore::span<const float> grid_origin,
                 tensorstore::span<const float> cell_shape) {
  const size_t rank = result.size();
  DCHECK_EQ(rank, point.size());
  DCHECK_EQ(rank, grid_origin.size());
  DCHECK_EQ(rank, cell_shape.size());
  for (size_t i = 0; i < rank; ++i) {
    float cell = (point[i] - grid_origin[i]) / cell_shape[i];
    cell = Floor ? std::floor(cell) : std::ceil(cell);
    result[i] = static_cast<int64_t>(cell);
  }
}

tensorstore::Box<> GetGridBox(
    tensorstore::span<const float> point_a,
    tensorstore::span<const float> point_b,
    tensorstore::span<const float> grid_origin,
    tensorstore::span<const float> cell_shape,
    tensorstore::span<const int64_t> parent_cell,
    tensorstore::span<const int64_t> relative_cell_shape) {
  const size_t rank = point_a.size();
  DCHECK_EQ(rank, point_b.size());
  DCHECK_EQ(rank, grid_origin.size());
  DCHECK_EQ(rank, cell_shape.size());
  DCHECK_EQ(rank, parent_cell.size());
  DCHECK_EQ(rank, relative_cell_shape.size());
  tensorstore::Box<> result(rank);
  std::vector<float> temp(rank);
  for (size_t i = 0; i < rank; ++i) {
    const float a = point_a[i], b = point_b[i];
    const float lower = std::min(a, b);
    const float upper = std::max(a, b);
    const float origin = grid_origin[i];
    const float cell_size = cell_shape[i];
    int64_t start_cell =
        static_cast<int64_t>(std::floor((lower - origin) / cell_size));
    int64_t end_cell =
        static_cast<int64_t>(std::ceil((upper - origin) / cell_size));
    start_cell = std::max(start_cell, parent_cell[i] * relative_cell_shape[i]);
    end_cell = std::max(
        start_cell,
        std::min(end_cell, (parent_cell[i] + 1) * relative_cell_shape[i]));
    result.origin()[i] = start_cell;
    result.shape()[i] = end_cell - start_cell;
  }
  return result;
}
bool LineIntersectsCell(tensorstore::span<const float> point_a,
                        tensorstore::span<const float> point_b,
                        tensorstore::span<const float> grid_origin,
                        tensorstore::span<const float> cell_shape,
                        tensorstore::span<const int64_t> cell) {
  const size_t rank = point_a.size();
  DCHECK_EQ(rank, point_b.size());
  DCHECK_EQ(rank, grid_origin.size());
  DCHECK_EQ(rank, cell_shape.size());
  DCHECK_EQ(rank, cell.size());
  float min_t = 0, max_t = 1;
  for (size_t i = 0; i < rank; ++i) {
    const float a = point_a[i], b = point_b[i];
    const float line_lower = std::min(a, b);
    const float line_upper = std::max(a, b);
    const float box_lower = grid_origin[i] + cell_shape[i] * cell[i];
    const float box_upper = box_lower + cell_shape[i];
    if (box_lower > line_lower) {
      float t = (box_lower - line_lower) / (line_upper - line_lower);
      if (!std::isfinite(t) || t > 1) return false;
      min_t = std::max(min_t, t);
    }
    if (box_upper < line_upper) {
      float t = (box_upper - line_lower) / (line_upper - line_lower);
      if (!std::isfinite(t) || t < 0) return false;
      max_t = std::min(max_t, t);
    }
  }
  return max_t >= min_t;
}

enum class EllipsoidCellOverlap {
  // Ellipsoid does not intersect cell
  kDisjoint,
  // Ellipsoid intersects cell but does not fully contain it.
  kIntersectsCell,
  // Ellipsoid fully contains cell.
  kContainsCell,
};

EllipsoidCellOverlap GetEllipsoidCellOverlap(
    tensorstore::span<const float> center, tensorstore::span<const float> radii,
    tensorstore::span<const float> grid_origin,
    tensorstore::span<const float> cell_shape,
    tensorstore::span<const int64_t> cell) {
  const size_t rank = center.size();
  DCHECK_EQ(rank, radii.size());
  DCHECK_EQ(rank, grid_origin.size());
  DCHECK_EQ(rank, cell_shape.size());
  DCHECK_EQ(rank, cell.size());
  float min_sum = 0;
  float max_sum = 0;
  for (size_t i = 0; i < rank; ++i) {
    const float cell_size = cell_shape[i];
    const float cell_start = cell[i] * cell_size + grid_origin[i];
    const float cell_end = cell_start + cell_size;
    const float center_pos = center[i];
    const float start_dist = std::abs(cell_start - center_pos);
    const float end_dist = std::abs(cell_end - center_pos);
    const float min_distance =
        (center_pos >= cell_start && center_pos <= cell_end)
            ? 0
            : std::min(start_dist, end_dist);
    const float max_distance = std::max(start_dist, end_dist);
    const float r = radii[i];
    min_sum += min_distance * min_distance / (r * r);
    max_sum += max_distance * max_distance / (r * r);
  }
  if (min_sum <= 1) {
    return (max_sum <= 1) ? EllipsoidCellOverlap::kContainsCell
                          : EllipsoidCellOverlap::kIntersectsCell;
  }
  return EllipsoidCellOverlap::kDisjoint;
}
}  // namespace

void ForEachOverlappingChildGridCell(
    AnnotationType annotation_type, tensorstore::span<const float> geometry,
    tensorstore::span<const float> grid_origin,
    tensorstore::span<const float> cell_shape,
    tensorstore::span<const int64_t> parent_cell,
    tensorstore::span<const int64_t> relative_cell_shape,
    absl::FunctionRef<void(const CellPosition& cell)> callback) {
  const size_t rank = grid_origin.size();
  DCHECK_EQ(rank, cell_shape.size());
  DCHECK_EQ(rank, parent_cell.size());
  DCHECK_EQ(rank, relative_cell_shape.size());
  switch (annotation_type) {
    case ExportPrecomputedAnnotationsRequest::POINT: {
      DCHECK_EQ(rank, geometry.size());
      CellPosition cell(rank);
      GetGridCell(cell, geometry, grid_origin, cell_shape);
      callback(cell);
      break;
    }
    case ExportPrecomputedAnnotationsRequest::LINE: {
      DCHECK_EQ(rank * 2, geometry.size());
      tensorstore::IterateOverIndexRange(
          GetGridBox(geometry.subspan(0, rank), geometry.subspan(rank, rank),
                     grid_origin, cell_shape, parent_cell, relative_cell_shape),
          [&](auto cell) {
            CellPosition vec(cell.begin(), cell.end());
            if (!LineIntersectsCell(geometry.subspan(0, rank),
                                    geometry.subspan(rank, rank), grid_origin,
                                    cell_shape, vec)) {
              return;
            }
            callback(vec);
          });
      break;
    }
    case ExportPrecomputedAnnotationsRequest::AXIS_ALIGNED_BOUNDING_BOX: {
      DCHECK_EQ(rank * 2, geometry.size());
      tensorstore::IterateOverIndexRange(
          GetGridBox(geometry.subspan(0, rank), geometry.subspan(rank, rank),
                     grid_origin, cell_shape, parent_cell, relative_cell_shape),
          [&](auto cell) {
            CellPosition vec(cell.begin(), cell.end());
            callback(vec);
          });
      break;
    }
    case ExportPrecomputedAnnotationsRequest::ELLIPSOID: {
      DCHECK_EQ(rank * 2, geometry.size());
      std::vector<float> temp(rank * 2);
      for (size_t i = 0; i < rank; ++i) {
        const float center = geometry[i];
        const float radius = geometry[rank + i];
        temp[i] = center - radius;
        temp[rank + i] = center + radius;
      }
      tensorstore::IterateOverIndexRange(
          GetGridBox(tensorstore::span(temp).subspan(0, rank),
                     tensorstore::span(temp).subspan(rank, rank), grid_origin,
                     cell_shape, parent_cell, relative_cell_shape),
          [&](auto cell) {
            CellPosition vec(cell.begin(), cell.end());
            switch (GetEllipsoidCellOverlap(geometry.subspan(0, rank),
                                            geometry.subspan(rank, rank),
                                            grid_origin, cell_shape, vec)) {
              case EllipsoidCellOverlap::kDisjoint:
                break;
              case EllipsoidCellOverlap::kIntersectsCell:
              case EllipsoidCellOverlap::kContainsCell:
                callback(vec);
                break;
            }
          });
      break;
    }
    case ExportPrecomputedAnnotationsRequest::INVALID_ANNOTATION_TYPE:
      break;
  }
}

@@ -193,7 +193,9 @@ def get_encoded_subvolume(self, data_format, start, end, scale_key):
             or np.prod(downsample_factor) > self.max_downsampling
         ):
             raise ValueError("Invalid downsampling factor.")
-        downsampled_shape = np.cast[np.int64](np.ceil(self.shape / downsample_factor))
+        downsampled_shape = np.asarray(
+            np.ceil(self.shape / downsample_factor, dtype=np.int64)
mikejhuang commented Feb 14, 2024

I'm running into a snag here.

TypeError: No loop matching the specified signature and casting was found for ufunc ceil

I moved the dtype to the asarray call and it seems to have worked:
downsampled_shape = np.asarray(np.ceil(self.shape / downsample_factor), dtype=np.int64)

Contributor Author

Thanks, fixed!

@fcollman
Contributor Author

I have restructured the PR now with a more general spatial index.

I also discovered issues with my compressed_morton_code representation and replaced it with a function lifted from cloud-volume (BSD license), which does the dynamic bit encoding properly.

I think I've also fixed some issues from the previous iteration with the lower and upper bound calculations for ellipsoid and axis-aligned boxes, where it was assumed that the coordinates of the first and second points were sorted so that the first point held the smallest value in every dimension and the second held the largest, but I'm not sure that's generally true.
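
Concretely, the per-type bounds can be computed without any sortedness assumption, roughly like this (a sketch, not the PR's exact helpers):

import numpy as np

def bounds_for_two_point(point_a, point_b):
    # Line or axis-aligned bounding box: per-dimension min/max of the endpoints.
    a, b = np.asarray(point_a, dtype=float), np.asarray(point_b, dtype=float)
    return np.minimum(a, b), np.maximum(a, b)

def bounds_for_ellipsoid(center, radii):
    # Ellipsoid: center +/- radii (radii taken as non-negative).
    c = np.asarray(center, dtype=float)
    r = np.abs(np.asarray(radii, dtype=float))
    return c - r, c + r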

@jbms
Collaborator

jbms commented Feb 20, 2024

> I have restructured the PR now with a more general spatial index.

Depending on rtree could be fine in principle, but I don't think it is advantageous for this use case (even if you are creating a multi-level spatial index): without the rtree, we iterate over each annotation, determine the lower/upper bounds, iterate over each chunk within the bounds, and check the exact intersection (for ellipsoid and line).

When using rtree, we instead iterate over each chunk, find all potentially intersecting annotations, and then check exact intersections (for ellipsoid and line).

Except for the exact intersection checks, which the rtree doesn't help with anyway, and which will be limited to a constant factor when doing a multi-level index, the amount of work is linear in the size of the resultant index when not using the rtree. (When building a multi-level index, we have to do this work for every level of the spatial index, and therefore the total cost is O(n log n), where n is the size of the index.)

When using the rtree, we still end up iterating over the same number of elements, just in a different order, plus there is the (probably insignificant) rtree lookup cost for each chunk.
