Skip to content

Commit

Permalink
remove CloudManager
Browse files Browse the repository at this point in the history
  • Loading branch information
true-real-michael committed Nov 12, 2023
1 parent 1110bd9 commit b651377
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 128 deletions.
49 changes: 30 additions & 19 deletions octreelib/grid/grid.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import random
from typing import List, Dict, Callable, Optional
from typing import List, Dict, Callable, Optional, Tuple

import k3d
import numpy as np
Expand All @@ -10,8 +10,8 @@
GridVisualizationType,
VisualizationConfig,
)
from octreelib.internal.point import PointCloud, Point, CloudManager
from octreelib.internal.voxel import Voxel
from octreelib.internal.point import PointCloud, Point
from octreelib.internal.voxel import Voxel, VoxelBase
from octreelib.octree_manager import OctreeManager

__all__ = ["Grid", "GridConfig"]
Expand All @@ -34,11 +34,11 @@ class Grid(GridBase):
def __init__(self, grid_config: GridConfig):
super().__init__(grid_config)

# {pose -> list of voxel coordinates}
self.__pose_voxel_coordinates: Dict[int, List[Point]] = {}
# {pose -> list of voxels}
self.__pose_voxel_coordinates: Dict[int, List[VoxelBase]] = {}

# {voxel coordinates hash -> octree}
self.__octrees: Dict[int, grid_config.octree_type] = {}
# {voxel -> octree manager}
self.__octrees: Dict[VoxelBase, grid_config.octree_type] = {}

def insert_points(self, pose_number: int, points: PointCloud):
"""
Expand All @@ -52,24 +52,35 @@ def insert_points(self, pose_number: int, points: PointCloud):
# register pose
self.__pose_voxel_coordinates[pose_number] = []

distributed_points = CloudManager.distribute_grid(
points, self._grid_config.grid_voxel_edge_length, self._grid_config.corner
)
# distribute points to voxels
voxel_indices = (
(
(points - self._grid_config.corner)
// self._grid_config.grid_voxel_edge_length
)
* self._grid_config.grid_voxel_edge_length
).astype(int)
distributed_points = {}
unique_indices = np.unique(voxel_indices, axis=0)
for unique_id in unique_indices:
mask = np.where((voxel_indices == unique_id).all(axis=1))
distributed_points[tuple(unique_id)] = points[mask]

# insert points to octrees
for voxel_coordinates, voxel_points in distributed_points.items():
voxel_coordinates_hash = tuple(voxel_coordinates)

# create octree in the voxel if it does not exist yet
if voxel_coordinates_hash not in self.__octrees:
self.__octrees[voxel_coordinates_hash] = self._grid_config.octree_type(
target_voxel = VoxelBase(
np.array(voxel_coordinates),
self._grid_config.grid_voxel_edge_length,
)
if target_voxel not in self.__octrees:
self.__octrees[target_voxel] = self._grid_config.octree_type(
self._grid_config.octree_config,
np.array(voxel_coordinates),
self._grid_config.grid_voxel_edge_length,
)

self.__pose_voxel_coordinates[pose_number].append(voxel_coordinates)
self.__octrees[voxel_coordinates_hash].insert_points(
pose_number, voxel_points
)
self.__pose_voxel_coordinates[pose_number].append(target_voxel)
self.__octrees[target_voxel].insert_points(pose_number, voxel_points)

def map_leaf_points(self, function: Callable[[PointCloud], PointCloud]):
"""
Expand Down
85 changes: 2 additions & 83 deletions octreelib/internal/point.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from __future__ import annotations

import itertools
from typing import Annotated, Literal, List, Tuple
from typing import Annotated, Literal

import numpy as np
import numpy.typing as npt


__all__ = ["Point", "PointCloud", "CloudManager"]
__all__ = ["Point", "PointCloud"]

"""
Point and PointCloud are intended to be used in the methods
Expand All @@ -18,83 +17,3 @@

Point = Annotated[npt.NDArray[np.float_], Literal[3]]
PointCloud = Annotated[npt.NDArray[np.float_], Literal["N", 3]]
HashablePoint = Tuple[float, float, float]


class CloudManager:
"""
This class implements methods for point cloud manipulation.
"""
def __init__(self):
raise TypeError("This class is not intended to be instantiated")

@classmethod
def hash_point(cls, point: Point):
"""
Hash point.
:param point: Point to hash.
:return: Hash of the point.
"""
return hash((point[0], point[1], point[2]))

@classmethod
def empty(cls):
"""
:return: Empty point cloud.
"""
return np.empty((0, 3), dtype=float)

@classmethod
def add(cls, points_a: Point, points_b: Point):
"""
Add two point clouds.
:param points_a: First point cloud.
:param points_b: Second point cloud.
:return: Combined point cloud.
"""
return np.vstack([points_a, points_b])

@classmethod
def distribute_grid(
cls, points: PointCloud, voxel_size: float, grid_start: Point
):
"""
Distribute points into voxels.
:param points: Points to distribute.
:param voxel_size: Edge length of the voxel.
:param grid_start: Start of the grid.
:return: Dictionary of voxel coordinates and points in the voxel.
"""
voxel_indices = (((points - grid_start) // voxel_size) * voxel_size).astype(int)
voxel_dict = {}
unique_indices = np.unique(voxel_indices, axis=0)

for unique_id in unique_indices:
mask = np.where((voxel_indices == unique_id).all(axis=1))
voxel_dict[tuple(unique_id)] = points[mask]

return voxel_dict

@classmethod
def distribute(
cls, points: PointCloud, corner_min: Point, edge_length: float
) -> List[PointCloud]:
"""
Distribute points into 8 octants.
:param points: Points to distribute.
:param corner_min: Corner of the octree node.
:param edge_length: Edge length of the octree node.
:return: List of point clouds in the octants.
"""
clouds = []

for offset in itertools.product([0, edge_length / 2], repeat=3):
child_corner_min = corner_min + np.array(offset)
child_corner_max = child_corner_min + edge_length / 2
mask = np.all(
(points >= child_corner_min) & (points < child_corner_max), axis=1
)
child_points = points[mask]
clouds.append(child_points)

return clouds
36 changes: 12 additions & 24 deletions octreelib/internal/voxel.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from __future__ import annotations

import itertools
from typing import Optional

import numpy as np

from octreelib.internal.interfaces import WithID
from octreelib.internal.point import (
Point,
PointCloud,
CloudManager,
)
from octreelib.internal.point import Point, PointCloud

__all__ = ["Voxel", "VoxelBase"]

Expand All @@ -30,28 +28,18 @@ def __init__(
self._corner_min = corner_min
self._edge_length = edge_length

voxel_position_hash = hash(
(
CloudManager.hash_point(corner_min),
CloudManager.hash_point(corner_min + edge_length),
)
)
if self not in self._static_voxel_id_map:
self._static_voxel_id_map[self] = len(self._static_voxel_id_map)

if voxel_position_hash not in self._static_voxel_id_map:
self._static_voxel_id_map[voxel_position_hash] = len(
self._static_voxel_id_map
)
WithID.__init__(self, self._static_voxel_id_map[self])

WithID.__init__(self, self._static_voxel_id_map[voxel_position_hash])
def __hash__(self):
return hash((tuple(self._corner_min), self._edge_length))

def is_point_geometrically_inside(self, point: Point) -> bool:
"""
This method checks if the point is inside the voxel geometrically.
:param point: Point to check.
:return: True if point is inside the bounding box of a voxel, False if outside.
"""
return bool((point >= self.corner_min).all()) and bool(
(point <= self.corner_max).all()
def __eq__(self, other: VoxelBase):
return (
all(self.corner_min == other.corner_min)
and self.edge_length == other.edge_length
)

@property
Expand Down
14 changes: 12 additions & 2 deletions octreelib/octree/octree.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import numpy as np

from octreelib.internal import PointCloud, T, Voxel, CloudManager
from octreelib.internal import PointCloud, T, Voxel
from octreelib.octree.octree_base import OctreeBase, OctreeNodeBase, OctreeConfigBase

__all__ = ["OctreeNode", "Octree", "OctreeConfig"]
Expand Down Expand Up @@ -79,7 +79,17 @@ def insert_points(self, points: PointCloud):
:param points: Points to insert.
"""
if self._has_children:
clouds = CloudManager.distribute(points, self.corner_min, self.edge_length)
clouds = []

for offset in itertools.product([0, self.edge_length / 2], repeat=3):
child_corner_min = self.corner_min + np.array(offset)
child_corner_max = child_corner_min + self.edge_length / 2
mask = np.all(
(points >= child_corner_min) & (points < child_corner_max), axis=1
)
child_points = points[mask]
clouds.append(child_points)

for child, cloud in zip(self._children, clouds):
child.insert_points(cloud)
else:
Expand Down

0 comments on commit b651377

Please sign in to comment.