Skip to content

Commit

Permalink
ccc: improve parallelization of the partitioning step
Browse files Browse the repository at this point in the history
  • Loading branch information
miltondp committed Sep 11, 2023
1 parent 04e0aeb commit f9ca50f
Showing 1 changed file with 68 additions and 32 deletions.
100 changes: 68 additions & 32 deletions libs/ccc/coef/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,27 +153,44 @@ def get_parts(

def get_feature_parts(params):
"""
Given a list of parameters, it returns the partitions for each feature.
Given a list of parameters, it returns the partitions for each feature-k pair in
that list. The goal of this function is to parallelize the partitioning step
(get_parts function): each call processes one chunk of feature-k pairs.
Args:
params: tuple with four elements: 1) the indexes of the features, 2) the
data for the features, 3) the range of k values to cluster the features
into, and 4) a boolean indicating whether the feature is numerical or
not.
params: a list of tuples with three elements: 1) a tuple with the feature
index, the cluster index and the number of clusters (k), 2) the data for the
feature, and 3) a boolean indicating whether the feature is numerical or not.
Returns:
A 3d array with the partitions for each feature. The first dimension is
the feature, the second dimension is the number of clusters, and the
third dimension is the number of objects.
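A 2d array with one row per element of params (a feature-k pair) and one column
per object. Each row holds the partition of that feature for that number of
clusters; rows that are skipped (categorical features with a cluster index
greater than zero) keep their initial value of -1.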
"""
_, data, range_n_clusters, data_types = params
n_objects = params[0][1].shape[0]
parts = np.zeros((len(params), n_objects), dtype=np.int16) - 1

# iterate over the list of tuples, each of which describes one feature-k pair
for p_idx, p in enumerate(params):
# the first element is a tuple with the feature index, the cluster index and the
# number of clusters (k)
info = p[0]
# f_idx = info[0]
c_idx = info[1]
c = info[2]
range_n_clusters = np.array([c], dtype=np.uint16)

# the second element is the data for the feature
data = p[1]

# the third element is a boolean indicating whether the feature is numerical
numerical_data_type = p[2]

# if the feature is categorical, only the partition for the first cluster index
# (c_idx == 0) is computed; the remaining rows keep their initial value of -1
if not numerical_data_type and c_idx > 0:
continue

return np.array(
[
get_parts(data[i], range_n_clusters, data_types[i])
for i in range(data.shape[0])
]
)
parts[p_idx] = get_parts(data, range_n_clusters, numerical_data_type)

return parts


def cdist_parts_basic(x: NDArray, y: NDArray) -> NDArray[float]:
Expand Down Expand Up @@ -617,9 +634,9 @@ def ccc(
X = np.zeros((n_features, n_objects))
X_numerical_type = np.full((n_features,), True, dtype=bool)

for idx in range(n_features):
X[idx, :], X_numerical_type[idx] = get_feature_type_and_encode(
x.iloc[:, idx]
for f_idx in range(n_features):
X[f_idx, :], X_numerical_type[f_idx] = get_feature_type_and_encode(
x.iloc[:, f_idx]
)
else:
raise ValueError("Wrong combination of parameters x and y")
Expand Down Expand Up @@ -672,21 +689,40 @@ def ccc(
map_func = pexecutor.map

# pre-compute the internal partitions for each object in parallel
inputs = get_chunks(n_features, default_n_threads, n_chunks_threads_ratio)

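# first, create a list with all feature-k pairs, each one a tuple with the
# feature index, the cluster index and the number of clusters (k), and split it
# into chunks that will be used to parallelize the partitioning step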
inputs = get_chunks(
[
(f_idx, c_idx, c)
for f_idx in range(n_features)
for c_idx, c in enumerate(range_n_clusters)
],
default_n_threads,
n_chunks_threads_ratio,
)

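# then, for each chunk, attach to every feature-k pair the data of its feature
# and whether that feature is numerical. The result is a list of chunks (one
# list of tuples per chunk), and each chunk is one unit of work for the
# partitioning step.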
inputs = [
(
i,
X[i],
range_n_clusters,
X_numerical_type[i],
)
for i in inputs
[
(
feature_k_pair,
X[feature_k_pair[0]],
X_numerical_type[feature_k_pair[0]],
)
for feature_k_pair in chunk
]
for chunk in inputs
]

for params, ps in zip(inputs, map_func(get_feature_parts, inputs)):
idx = params[0]
parts[idx] = ps
# get the lists of feature indexes and cluster indexes (one entry per feature-k
# pair in this chunk, in the same order as the rows of ps)
f_idxs = [p[0][0] for p in params]
c_idxs = [p[0][1] for p in params]

# update the partitions for each feature-k pair
parts[f_idxs, c_idxs] = ps

# Below, there are two layers of parallelism: 1) parallel execution
# across feature pairs and 2) the cdist_parts_parallel function, which
Expand Down Expand Up @@ -727,11 +763,11 @@ def ccc(
for params, (max_ari_list, max_part_idx_list, pvalues) in zip(
inputs, map_func(compute_coef, inputs)
):
idx = params[0]
f_idx = params[0]

cm_values[idx] = max_ari_list
max_parts[idx, :] = max_part_idx_list
cm_pvalues[idx] = pvalues
cm_values[f_idx] = max_ari_list
max_parts[f_idx, :] = max_part_idx_list
cm_pvalues[f_idx] = pvalues

# return an array of values or a single scalar, depending on the input data
if cm_values.shape[0] == 1:
Expand Down

0 comments on commit f9ca50f

Please sign in to comment.