diff --git a/python/paddle/distributed/auto_parallel/api.py b/python/paddle/distributed/auto_parallel/api.py
index ef7539094f5db..d70df4a139b03 100644
--- a/python/paddle/distributed/auto_parallel/api.py
+++ b/python/paddle/distributed/auto_parallel/api.py
@@ -271,7 +271,7 @@ def dtensor_from_fn(fn, mesh, placements, *args, **kwargs):
         *args (tuple): A tuple of arguments to be passed to the ``fn`` function.
         **kwargs (dict): A dict of arguments to be passed to the ``fn`` function.

-    Retruns:
+    Returns:
         Tensor: A Tensor constructed from ``fn`` with distributed attributes.

     Examples:
         .. code-block:: python
@@ -304,7 +304,7 @@ def reshard(dist_tensor, mesh, placements):
             be Shard, Replicate and Partial.

     Returns:
-        Tensor: A Distributed Tensor reshared with distributed attributes.
+        Tensor: A Distributed Tensor resharded with distributed attributes.

     Examples:
         .. code-block:: python
@@ -465,7 +465,7 @@ def output_fn(outputs, process_mesh) -> list(paddle.Tensor)
             >>> layer = dist.shard_layer(layer, mesh, shard_fn)
             >>> print(layer)

-            >>> # This case need to be excuted in multi-card environment
+            >>> # This case needs to be executed in a multi-card environment
             >>> # export CUDA_VISIBLE_DEVICES=0,1
             >>> # python -m paddle.distributed.launch {test_case}.py
     """
@@ -642,7 +642,7 @@ def step(self):

     def state_dict(self):
         """
-        Create and shard the optimizer states e.g., acumulators and master_weights before load_state_dict.
+        Create and shard the optimizer states, e.g., accumulators and master_weights, before load_state_dict.
         If training has already started or the optimizer states are already created and sharded, do nothing.
         """
         state_dict = self._inner_opt.state_dict()
@@ -1552,7 +1552,7 @@ def unshard_dtensor(dist_tensor):
             dist_tensor
         )
         # in static mode, 'distributed tensor' and 'dense tensor' are all
-        # Varialble type, the distributed attribute is a property of the Varibale.
+        # Variable type, the distributed attribute is a property of the Variable.
         # So, it's no need to convert the distributed tensor to a dense tensor.
         # We only need to modify its distributed attribute.
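The api.py docstrings corrected above describe the dynamic-mode sharding APIs. For orientation, a minimal sketch of how `dtensor_from_fn` and `reshard` fit together, assuming the public `paddle.distributed` aliases for the functions defined in this file and a two-rank mesh (run under `python -m paddle.distributed.launch`, as the example comment above notes):

```python
import paddle
import paddle.distributed as dist

# A 1-D mesh over two ranks; placements use the same Shard/Replicate/Partial
# types named in the reshard docstring.
mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

# Build a distributed tensor directly from a constructor function.
d_tensor = dist.dtensor_from_fn(paddle.ones, mesh, [dist.Replicate()], shape=[4, 8])

# Reshard the same data from Replicate to Shard along dim 0.
sharded = dist.reshard(d_tensor, mesh, [dist.Shard(0)])
print(sharded)
```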
         empty_dist_attr = (
diff --git a/python/paddle/distributed/auto_parallel/placement_type.py b/python/paddle/distributed/auto_parallel/placement_type.py
index 744197a326f01..53ce35781152b 100644
--- a/python/paddle/distributed/auto_parallel/placement_type.py
+++ b/python/paddle/distributed/auto_parallel/placement_type.py
@@ -52,11 +52,11 @@ def to_placements(dim_map, mesh, partial_idx=[]):

 def check_placements_equal(this, that):
     assert isinstance(this, list) and isinstance(that, list)
-    small_placemets = this if len(this) < len(that) else that
+    small_placements = this if len(this) < len(that) else that
     large_placements = that if len(this) < len(that) else this
     for i in range(len(large_placements)):
-        if i < len(small_placemets):
-            if small_placemets[i] != large_placements[i]:
+        if i < len(small_placements):
+            if small_placements[i] != large_placements[i]:
                 return False
         else:
             if large_placements[i] != Replicate():
diff --git a/python/paddle/distributed/auto_parallel/process_mesh.py b/python/paddle/distributed/auto_parallel/process_mesh.py
index dd0f84aba69ab..604688dc1f9b4 100644
--- a/python/paddle/distributed/auto_parallel/process_mesh.py
+++ b/python/paddle/distributed/auto_parallel/process_mesh.py
@@ -56,7 +56,7 @@ def get_unique_id_for_process_mesh(shape, process_ids):
     return unique_id


-def retrive_unique_id_for_process_mesh(shape, process_ids):
+def retrieve_unique_id_for_process_mesh(shape, process_ids):
     key = f"shape {shape}, process_ids {process_ids}"
     global _g_unique_process_mesh_map
     assert key in _g_unique_process_mesh_map
diff --git a/python/paddle/distributed/auto_parallel/random.py b/python/paddle/distributed/auto_parallel/random.py
index 17858bb45e14b..3d971ff9f40bf 100644
--- a/python/paddle/distributed/auto_parallel/random.py
+++ b/python/paddle/distributed/auto_parallel/random.py
@@ -17,7 +17,7 @@ import paddle

 from ..utils.log_utils import get_logger
-from .process_mesh import retrive_unique_id_for_process_mesh
+from .process_mesh import retrieve_unique_id_for_process_mesh
 from .static.utils import _get_idx_in_axis

 _logger = get_logger(logging.INFO)
@@ -57,7 +57,7 @@ def parallel_manual_seed(seed, name=""):
     This function should be called only once before auto parallel compiles the computation graph (e.g. auto_parallel.engine.prepare() or fit()).

-    This seed only affects how randomness-relative **operators** (dropout, fuse op with dropout inside, etc) are execute amonge mesh, and would NOT affect other process like Parameter initialization.
+    This seed only affects how randomness-relative **operators** (dropout, fuse op with dropout inside, etc) are executed across the mesh, and would NOT affect other processes like Parameter initialization.
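For context on the `parallel_manual_seed` docstring above: it is called once, before the auto-parallel engine compiles the graph, and only governs mesh-aware RNG state for dropout-like operators. A hedged sketch, assuming the module path shown in this diff is importable as-is (the public alias may differ):

```python
import paddle
# Module path as it appears in the hunks above; the public alias may differ.
from paddle.distributed.auto_parallel.random import parallel_manual_seed

# Call once, before auto_parallel.engine.prepare() or fit(), so dropout-like
# operators draw from mesh-aware RNG states. Parameter initialization and other
# global randomness remain controlled by the ordinary global seed.
parallel_manual_seed(2024)
paddle.seed(2024)
```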

     Examples:
         # seed relative to training step
@@ -102,7 +102,7 @@ def determinate_rng(
     # FIXME
     # unique_id = process_mesh.unique_id
-    unique_id = retrive_unique_id_for_process_mesh(
+    unique_id = retrieve_unique_id_for_process_mesh(
         process_mesh.shape, process_mesh.process_ids
     )
     sharding_expr = name_ + f'mesh:{unique_id}'
diff --git a/python/paddle/distributed/auto_parallel/static/converter.py b/python/paddle/distributed/auto_parallel/static/converter.py
index a46340994e9a2..c7cd4e32d6e42 100644
--- a/python/paddle/distributed/auto_parallel/static/converter.py
+++ b/python/paddle/distributed/auto_parallel/static/converter.py
@@ -291,7 +291,7 @@ def merge_with_dist_attr(tensor_list, dist_attr):
         )
         # merge the tensor with dist_attr
         partition_tensor_list = []
-        merged_partiton = []
+        merged_partition = []
         for process in process_group:
             partition_index = Resharder.compute_partition_index(
                 process,
@@ -301,8 +301,8 @@ def merge_with_dist_attr(tensor_list, dist_attr):
                 process_group,
             )
             index = process_group.index(process)
-            if partition_index not in merged_partiton:
-                merged_partiton.append(partition_index)
+            if partition_index not in merged_partition:
+                merged_partition.append(partition_index)
                 Converter.merge(
                     partition_tensor_list,
                     tensor_list[index],
diff --git a/python/paddle/distributed/auto_parallel/static/partitioner.py b/python/paddle/distributed/auto_parallel/static/partitioner.py
index b1f6e0c589569..024c921e60ba2 100644
--- a/python/paddle/distributed/auto_parallel/static/partitioner.py
+++ b/python/paddle/distributed/auto_parallel/static/partitioner.py
@@ -161,7 +161,7 @@ def partition_startup_program(
             )
             target_block._sync_with_cpp()

-            # set distribute atrribute
+            # set distribute attribute
             new_op = target_block.ops[-1]
             assert new_op.type == new_op_desc.type()
             assert new_op.desc == new_op_desc
diff --git a/python/paddle/distributed/auto_parallel/static/reshard.py b/python/paddle/distributed/auto_parallel/static/reshard.py
index f41c50dad4df1..9155e59791596 100644
--- a/python/paddle/distributed/auto_parallel/static/reshard.py
+++ b/python/paddle/distributed/auto_parallel/static/reshard.py
@@ -1658,8 +1658,8 @@ def find_op_desc_seq(
                 # TODO(zhaoyingli): Remove the method to a pass.
                 # Current method to get all pp_ranks' relationship must rely on reshard.
                 # When reshard insert send/recv pair, the process_group has the pp relationship.
-                # But the mothod to obtain pp_ranks' relationship is only supported in 'reshard_input',
-                # casue 'reshard_output' only has current process_group view instead of global view.
+                # But the method to obtain pp_ranks' relationship is only supported in 'reshard_input',
+                # because 'reshard_output' only has current process_group view instead of global view.
                 op_role = dist_attr[-1]
                 if int(op_role) == int(OpRole.Forward):
                     self.dist_context.up_down_streams.add_pair_stream(
@@ -1695,7 +1695,7 @@ def find_op_desc_seq(
                     )
                 )

-            # In the same process group, it will use allgahther and slice op.
+            # In the same process group, it will use allgather and slice op.
             else:
                 # NOTE: It just supports even partition scene.
                 partition_index_list = []
@@ -1868,7 +1868,7 @@ def parse_op_desc(
         """
        # Parse all communicator groups for all ranks
-        # Ensure every rank has a global view of communicator groups for entire cluters.
+        # Ensure every rank has a global view of communicator groups for the entire cluster.
         # When initialize communicators for pipeline parallel, every rank could
         # conduct a correct global synchronization.
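The converter.py hunks above rename `merged_partiton` to `merged_partition`; the list is there so that partition indices already merged are skipped, since ranks holding replicated shards report the same index. A simplified, standalone illustration of that idea (plain NumPy slices stand in for `Resharder.compute_partition_index` and `Converter.merge`, which are elided here):

```python
import numpy as np

def merge_shards(shards, partition_indices, full_shape):
    """Merge per-rank shards into a full array, merging each partition once."""
    merged = np.zeros(full_shape, dtype=shards[0].dtype)
    seen = []
    for shard, index in zip(shards, partition_indices):
        if index in seen:          # replicated shard, already merged
            continue
        seen.append(index)
        slices = tuple(slice(lo, hi) for lo, hi in index)
        merged[slices] = shard
    return merged

# Two ranks each hold half of a [4, 2] tensor split along axis 0.
shards = [np.ones((2, 2)) * r for r in range(2)]
indices = [[(0, 2), (0, 2)], [(2, 4), (0, 2)]]
print(merge_shards(shards, indices, (4, 2)))
```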
         for rank_id in op_desc_seq:
@@ -2449,7 +2449,7 @@ def get_op_input_attrs(self, op, var_name):
             op_input_attrs = self._get_subblock_input_attrs(op, var_name)
             if not op_input_attrs:
                 # NOTE: [hack method]
-                # Adapt to quantization pass, which presist_vars, including inputs and outputs, all are in global_block.
+                # Adapt to quantization pass, whose persist_vars, including inputs and outputs, are all in global_block.
                 # Therefore, the while_op's inputs will contain the all persist_vars, which will be inputs or output of the quantization op in subblock.
                 op_input_attrs = self._get_subblock_output_attrs(op, var_name)
         else:
@@ -2927,7 +2927,7 @@ def _is_special_op(op):
                         dist_tensor.dist_attr,
                     )
                 else:
-                    # Ensure every rank has a global view of communicator groups for entire cluters.
+                    # Ensure every rank has a global view of communicator groups for the entire cluster.
                     # When initialize communicators for pipeline parallel, every rank could
                     # conduct a correct global synchronization.
                     new_process_group(
@@ -2971,7 +2971,7 @@ def _is_special_op(op):
                         dist_tensor.dist_attr,
                     )
                 else:
-                    # Ensure every rank has a global view of communicator groups for entire cluters.
+                    # Ensure every rank has a global view of communicator groups for the entire cluster.
                     # When initialize communicators for pipeline parallel, every rank could
                     # conduct a correct global synchronization.
                     new_process_group(
@@ -3010,7 +3010,7 @@ def reshard(self):
            self.dist_params_grads,
         )

-        # remove no need vars and ops in the startip program
+        # remove no need vars and ops in the startup program
         Remover.remove_no_need_in_startup(
             self.auto_parallel_main_prog, self.auto_parallel_startup_prog
         )
@@ -3249,10 +3249,10 @@ def _get_idx(comm_ranks, group_ranks):
                 )
             elif isinstance(op_desc, ConcatOpDesc):
                 partition_index_list = op_desc._partition_index_list
-                for idx, partion_idex in enumerate(partition_index_list):
+                for idx, partition_index in enumerate(partition_index_list):
                     self._concat_partitions_for_cost(
                         partition_tensor_list,
-                        partion_idex,
+                        partition_index,
                         dtype,
                         key,
                         local_rank_comp_cost,
diff --git a/python/paddle/distributed/auto_parallel/static/tuner/parallel_tuner.py b/python/paddle/distributed/auto_parallel/static/tuner/parallel_tuner.py
index c2c1055663ccc..4e76caa46aa6e 100644
--- a/python/paddle/distributed/auto_parallel/static/tuner/parallel_tuner.py
+++ b/python/paddle/distributed/auto_parallel/static/tuner/parallel_tuner.py
@@ -66,7 +66,7 @@ def __init__(
             self._seed,
             "mode",
             self._mode,
-            "num_machies",
+            "num_machines",
             self._num_machines,
             "num_devices_per_machine",
             self._num_devices_per_machine,
diff --git a/python/paddle/distributed/auto_parallel/static/tuner/rule_based_tuner.py b/python/paddle/distributed/auto_parallel/static/tuner/rule_based_tuner.py
index 4a88948e90f77..9b08388c649ff 100644
--- a/python/paddle/distributed/auto_parallel/static/tuner/rule_based_tuner.py
+++ b/python/paddle/distributed/auto_parallel/static/tuner/rule_based_tuner.py
@@ -67,7 +67,7 @@ def register():
         pattern = cls()
         _PATTERNS[pattern.name] = pattern
         # sort patterns according to the number of sharded tensors
-        # set its dist attr by the fisrt one when a tensor can be matched by multiple patterns.
+        # set its dist attr by the first one when a tensor can be matched by multiple patterns.
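The comment fixed above explains the re-sort that follows: when a tensor matches several patterns, the first (most-sharded) one sets its dist attr, so the registry is ordered by descending `sharded_tensors`. A tiny illustration of that ordering with invented pattern names (not the real pattern classes):

```python
# Hypothetical stand-ins for registered pattern objects.
patterns = {
    "embedding": {"sharded_tensors": 1},
    "qkv":       {"sharded_tensors": 3},
    "ffn":       {"sharded_tensors": 2},
}
# Same idiom as the _PATTERNS re-sort below: descending by sharded tensor count.
ordered = dict(sorted(patterns.items(), key=lambda x: -x[1]["sharded_tensors"]))
print(list(ordered))  # ['qkv', 'ffn', 'embedding']
```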
         _PATTERNS = dict(
             sorted(
                 _PATTERNS.items(), key=lambda x: -x[1].attrs["sharded_tensors"]
@@ -201,7 +201,7 @@ def build(self):
         # define reshape
         reshape = self.add_node(1, **{"type": "reshape2"})

-        # define reshape input egde
+        # define reshape input edge
         x_edge = self.add_edge(input.id, reshape.id, **{"input_name": "X"})

         # define reshape out
@@ -991,14 +991,14 @@ def partition_cluster(
                 device_mesh.append([1, partition[1]])
             device_meshes.append(device_mesh)
         else:
-            incerement = 1 if partition_result[-1] == [1] else 0
+            increment = 1 if partition_result[-1] == [1] else 0
             for partition in partition_result:
                 if len(partition) < 2:
                     continue
                 device_mesh = []
                 for i in range(partition[0]):
                     device_mesh.append([partition[1], m])
-                device_mesh[-1][0] += incerement
+                device_mesh[-1][0] += increment
                 device_meshes.append(device_mesh)

     return device_meshes
diff --git a/python/paddle/distributed/auto_parallel/static/utils.py b/python/paddle/distributed/auto_parallel/static/utils.py
index fdc8b2a28ed47..2fe48dcdfa5f6 100644
--- a/python/paddle/distributed/auto_parallel/static/utils.py
+++ b/python/paddle/distributed/auto_parallel/static/utils.py
@@ -319,7 +319,7 @@ def _get_idx_in_axis(processes, shape, axis, rank):
     Given a rank and the processes mesh the rank belongs to,
     compute the index of the rank in given axis.

-    Example: 27 processes managed in a 3-Dimensinal mesh with shape of [3, 3, 3].
+    Example: 27 processes managed in a 3-Dimensional mesh with shape of [3, 3, 3].
     the index of rank 22 are:
     in axis 0: 1
     in axis 1: 1
     in axis 2: 1
@@ -861,7 +861,7 @@ def merge_and_slice_parameter(dist_param_dict, pre_dist_attr, cur_dist_attr):
     """
     Merge parameters with previous dist_attr and slice parameters with current dist_attr

-    Arags:
+    Args:
         dist_param_dict(dict): parameters' value of all ranks.
         pre_dist_attr(dict): parameters' dist_attr of last training process.
         cur_dist_attr(dict): parameters' dist_attr of current training process.
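The `merge_and_slice_parameter` docstring above describes converting a checkpoint between parallel strategies: parameters saved under the previous dist_attr are first merged back to their full shape, then re-sliced for the current dist_attr. A simplified NumPy sketch of that merge-then-slice round trip (the real helper works on per-rank dicts and dims_mapping, which are elided here):

```python
import numpy as np

full = np.arange(8.0).reshape(4, 2)           # the logical, unsharded parameter

# Previous run: sharded along axis 0 over 2 ranks -> two [2, 2] slices.
prev_slices = np.split(full, 2, axis=0)

# Current run: sharded along axis 1 over 2 ranks -> merge, then re-slice.
merged = np.concatenate(prev_slices, axis=0)  # back to the full [4, 2] shape
cur_slices = np.split(merged, 2, axis=1)      # two [4, 1] slices for the new mapping
print([s.shape for s in cur_slices])          # [(4, 1), (4, 1)]
```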
@@ -962,14 +962,14 @@ def _merge_parameter_with_dist_attr(param_list, dist_attr):
     )
     # merge the parameter with dist_attr
     partition_param_list = []
-    merged_partiton = []
+    merged_partition = []
     for process in process_group:
         partition_index = Resharder.compute_partition_index(
             process, complete_shape, dims_mapping, process_shape, process_group
         )
         index = process_group.index(process)
-        if partition_index not in merged_partiton:
-            merged_partiton.append(partition_index)
+        if partition_index not in merged_partition:
+            merged_partition.append(partition_index)
             _merge_parameter(
                 partition_param_list,
                 param_list[index],
@@ -1539,10 +1539,10 @@ def get_all_distributed_main_program(

 class SerialProgramInfo:
     def __init__(
-        self, train_program, satrtup_program, loss, optimizer, cluster=None
+        self, train_program, startup_program, loss, optimizer, cluster=None
     ):
         self._train_program = train_program
-        self._startup_program = satrtup_program
+        self._startup_program = startup_program
         self._loss = loss
         self._optimizer = optimizer
         self._cluster = cluster
@@ -1700,7 +1700,7 @@ def set_dist_op_desc_original_id(dist_op_desc, op_desc, dist_context):
     elif op_original_id in dist_context._dist_ops_for_program:
         dist_op_desc.set_original_id(op_original_id)
         return
-    # Third, print error infomation if we cannot find the original id
+    # Third, print error information if we cannot find the original id
     else:
         raise AssertionError(
             "Cannot find the original id in the distributed context"
         )
@@ -1748,7 +1748,7 @@ def get_var_numel(var):
     input:
         - var: variable
     return:
-        number of elemnet in var
+        number of element in var
     """
     assert isinstance(var, Variable)
     assert -1 not in var.shape
@@ -1835,7 +1835,7 @@ def initialize_pg_in_full_mode(all_process_groups, cur_rank):
                 )
                 client_sockets[send_rank].close()
                 print(
-                    "It is able to instantiate {} as recver now.".format(
+                    "It is able to instantiate {} as receiver now.".format(
                         process_group.ranks
                     )
                 )
@@ -1982,7 +1982,7 @@ def set_data_parallel(x):


 def is_naive_data_parallel(dist_context):
-    # Navie data parallel only completes dist_attr once from the front to back.
+    # Naive data parallel only completes dist_attr once from the front to back.
     if not dist_context.data_parallel:
         return False
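For the `get_var_numel` hunk above: the helper simply multiplies out a fully static shape, which is why it asserts that no dimension is -1. A minimal equivalent in plain Python (not the Paddle helper itself):

```python
from functools import reduce
import operator

def var_numel(shape):
    """Element count of a statically shaped variable."""
    assert -1 not in shape, "dynamic dims have no fixed element count"
    return reduce(operator.mul, shape, 1)

print(var_numel([32, 1024, 768]))  # 25165824
```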