diff --git a/libs/ccc/coef/impl.py b/libs/ccc/coef/impl.py index f0d08411..5118ca5d 100644 --- a/libs/ccc/coef/impl.py +++ b/libs/ccc/coef/impl.py @@ -153,27 +153,44 @@ def get_parts( def get_feature_parts(params): """ - Given a list of parameters, it returns the partitions for each feature. + Given a list of parameters, it returns the partitions for each feature. The goal + of this function is to parallelize the partitioning step (get_parts function). Args: - params: tuple with four elements: 1) the indexes of the features, 2) the - data for the features, 3) the range of k values to cluster the features - into, and 4) a boolean indicating whether the feature is numerical or - not. + params: a list of tuples with three elements: 1) a tuple with the feature + index, the cluster index and the number of clusters (k), 2) the data for the + feature, and 3) a boolean indicating whether the feature is numerical or not. Returns: - A 3d array with the partitions for each feature. The first dimension is - the feature, the second dimension is the number of clusters, and the - third dimension is the number of objects. + A 2d array with the partitions (rows) for the selected features and number of + clusters. """ - _, data, range_n_clusters, data_types = params + n_objects = params[0][1].shape[0] + parts = np.zeros((len(params), n_objects), dtype=np.int16) - 1 + + # iterate over a list of tuples that indicate a feature-k pair + for p_idx, p in enumerate(params): + # the first element is a tuple with the feature index, the cluster index and the + # number of clusters (k) + info = p[0] + # f_idx = info[0] + c_idx = info[1] + c = info[2] + range_n_clusters = np.array([c], dtype=np.uint16) + + # the second element is the data for the feature + data = p[1] + + # the third element is a boolean indicating whether the feature is numerical + numerical_data_type = p[2] + + # if the feature is categorical, then only the first partition is filled + if not numerical_data_type and c_idx > 0: + continue - return np.array( - [ - get_parts(data[i], range_n_clusters, data_types[i]) - for i in range(data.shape[0]) - ] - ) + parts[p_idx] = get_parts(data, range_n_clusters, numerical_data_type) + + return parts def cdist_parts_basic(x: NDArray, y: NDArray) -> NDArray[float]: @@ -617,9 +634,9 @@ def ccc( X = np.zeros((n_features, n_objects)) X_numerical_type = np.full((n_features,), True, dtype=bool) - for idx in range(n_features): - X[idx, :], X_numerical_type[idx] = get_feature_type_and_encode( - x.iloc[:, idx] + for f_idx in range(n_features): + X[f_idx, :], X_numerical_type[f_idx] = get_feature_type_and_encode( + x.iloc[:, f_idx] ) else: raise ValueError("Wrong combination of parameters x and y") @@ -672,21 +689,40 @@ def ccc( map_func = pexecutor.map # pre-compute the internal partitions for each object in parallel - inputs = get_chunks(n_features, default_n_threads, n_chunks_threads_ratio) + # first, create a list with features-k pairs that will be used to parallelize + # the partitioning step + inputs = get_chunks( + [ + (f_idx, c_idx, c) + for f_idx in range(n_features) + for c_idx, c in enumerate(range_n_clusters) + ], + default_n_threads, + n_chunks_threads_ratio, + ) + + # then, flatten the list of features-k pairs into a list that is divided into + # chunks that will be used to parallelize the partitioning step. inputs = [ - ( - i, - X[i], - range_n_clusters, - X_numerical_type[i], - ) - for i in inputs + [ + ( + feature_k_pair, + X[feature_k_pair[0]], + X_numerical_type[feature_k_pair[0]], + ) + for feature_k_pair in chunk + ] + for chunk in inputs ] for params, ps in zip(inputs, map_func(get_feature_parts, inputs)): - idx = params[0] - parts[idx] = ps + # get the set of feature indexes and cluster indexes + f_idxs = [p[0][0] for p in params] + c_idxs = [p[0][1] for p in params] + + # update the partitions for each feature-k pair + parts[f_idxs, c_idxs] = ps # Below, there are two layers of parallelism: 1) parallel execution # across feature pairs and 2) the cdist_parts_parallel function, which @@ -727,11 +763,11 @@ def ccc( for params, (max_ari_list, max_part_idx_list, pvalues) in zip( inputs, map_func(compute_coef, inputs) ): - idx = params[0] + f_idx = params[0] - cm_values[idx] = max_ari_list - max_parts[idx, :] = max_part_idx_list - cm_pvalues[idx] = pvalues + cm_values[f_idx] = max_ari_list + max_parts[f_idx, :] = max_part_idx_list + cm_pvalues[f_idx] = pvalues # return an array of values or a single scalar, depending on the input data if cm_values.shape[0] == 1: