Skip to content

Commit

Permalink
Updates to only use docker --privileged when required and check cpu…
Browse files Browse the repository at this point in the history
…set (intel#150)

* Update numactl usage

* Update error handling

* Check to make sure it's in cpuset

* Update forming cpu list

* Style fixes and add unit tests

* Test updates

* Updates to figure out which cores are on which node

* Update to print debug message

* Update to organize the cpuset list by node:

* Check args to see if docker should run with privileged

* Add unit tests

* Update numa_cores_per_instance for 'socket' setting

* update for cores per instance 'socket'

* Add doc update

* Remove unused import

* Style fixes

* update to cpuset list

* make str

* limit length

* update cores_per_node

* Move num_physical_cores calculation

* update num inter/intra threads

* Fix intra threads

* Updates to platform util to explain the core lists

* Update unit tests

* Unit test updates

* Add tests with limited cpusets

* Remove debug print

* Add additional error handling and info

* Update base benchmark util

* Fix conditionals

* Update messages

* Update for numa_cores_per_instance 'socket' when sockets have different number of cores

* Fix conditionals for checking numa cores per instance socket

* Add another unit test to check the case when the --socket-id specified does not have any cores in the cpuset

* Removing conditional in the validate function since it's being done in the init function

* Add stderr=PIPE so that the terminal doesn't show an error when PlatformUtils is used before numactl is installed
  • Loading branch information
dmsuehir authored Oct 21, 2021
1 parent aca4879 commit 1f7f7df
Show file tree
Hide file tree
Showing 38 changed files with 860 additions and 216 deletions.
51 changes: 35 additions & 16 deletions benchmarks/common/base_benchmark_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,21 +315,34 @@ def _validate_args(self):
if args.mpi:
raise ValueError("--mpi_num_processes cannot be used together with --numa-cores-per-instance.")

if args.numa_cores_per_instance == "socket":
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket

if args.socket_id != -1:
if int(args.numa_cores_per_instance) > self._platform_util.num_cores_per_socket:
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
"number of cores per socket {} when a single socket (--socket-id {}) "
"is being used.".format(args.numa_cores_per_instance,
self._platform_util.num_cores_per_socket,
args.socket_id))
else:
if int(args.numa_cores_per_instance) > system_num_cores:
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
"number of system cores ({}).".format(args.numa_cores_per_instance,
system_num_cores))
if args.numa_cores_per_instance != "socket":
if args.socket_id != -1:
if int(args.numa_cores_per_instance) > self._platform_util.num_cores_per_socket:
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
"number of cores per socket {} when a single socket (--socket-id {}) "
"is being used.".format(args.numa_cores_per_instance,
self._platform_util.num_cores_per_socket,
args.socket_id))
else:
if int(args.numa_cores_per_instance) > system_num_cores:
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
"number of system cores ({}).".format(args.numa_cores_per_instance,
system_num_cores))

# If socket id is specified and we have a cpuset, make sure that there are some cores in the specified socket.
# If cores are limited, then print out a note about that.
if args.socket_id != -1 and self._platform_util.cpuset_cpus:
cpuset_len_for_socket = 0

if args.socket_id in self._platform_util.cpuset_cpus.keys():
cpuset_len_for_socket = len(self._platform_util.cpuset_cpus[args.socket_id])

if cpuset_len_for_socket == 0:
sys.exit("ERROR: There are no socket id {} cores in the cpuset.".format(args.socket_id))
elif cpuset_len_for_socket < self._platform_util.num_cores_per_socket:
print("Note: Socket id {} is specified, but the cpuset has limited this socket to {} cores. "
"This is less than the number of cores per socket on the system ({})".
format(args.socket_id, cpuset_len_for_socket, self._platform_util.num_cores_per_socket))

def initialize_model(self, args, unknown_args):
"""Create model initializer for the specified model"""
Expand All @@ -340,7 +353,13 @@ def initialize_model(self, args, unknown_args):
os.path.dirname(os.path.realpath(__file__)))

if args.numa_cores_per_instance == "socket":
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket
if self._platform_util.cpuset_cpus:
if args.socket_id != -1:
args.numa_cores_per_instance = len(self._platform_util.cpuset_cpus[args.socket_id])
else:
args.numa_cores_per_instance = "socket"
else:
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket

# find the path to the model_init.py file
filename = "{}.py".format(self.MODEL_INITIALIZER)
Expand Down
89 changes: 62 additions & 27 deletions benchmarks/common/base_model_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,38 @@ def run_numactl_multi_instance(self, cmd, replace_unique_output_dir=None):
swap out that path for a path with the instance number in the folder name
so that each instance uses a unique output folder.
"""
# Get the cores list and group them according to the number of cores per instance
cores_per_instance = int(self.args.numa_cores_per_instance)
cpu_cores_list = self.platform_util.cpu_core_list

if self.args.socket_id != -1:
# If it's specified to just use a single socket, then only use the cores from that socket
if len(cpu_cores_list) > self.args.socket_id:
cpu_cores_list = cpu_cores_list[self.args.socket_id]

if self.args.numa_cores_per_instance != "socket":
# Get the cores list and group them according to the number of cores per instance
cores_per_instance = int(self.args.numa_cores_per_instance)
cpu_cores_list = self.platform_util.cpu_core_list

if self.args.socket_id != -1:
# If it's specified to just use a single socket, then only use the cores from that socket
if len(cpu_cores_list) > self.args.socket_id:
cpu_cores_list = cpu_cores_list[self.args.socket_id]
else:
raise ValueError("Error while trying to get the core list for socket {0}. "
"The core list does not have cores for socket {0}.\n "
"Core list: {1}\n".format(self.args.socket_id, str(cpu_cores_list)))
else:
raise ValueError("Error while trying to get the core list for socket {0}. "
"The core list does not have cores for socket {0}.\n "
"Core list: {1}\n".format(self.args.socket_id, str(cpu_cores_list)))
else:
# Using cores from all sockets
combined_core_list = []
for socket_cores in cpu_cores_list:
combined_core_list += socket_cores
cpu_cores_list = combined_core_list
# Using cores from all sockets
combined_core_list = []
for socket_cores in cpu_cores_list:
combined_core_list += socket_cores
cpu_cores_list = combined_core_list

instance_cores_list = self.group_cores(cpu_cores_list, cores_per_instance)
instance_cores_list = self.group_cores(cpu_cores_list, cores_per_instance)
else:
instance_cores_list = []
cores_per_instance = "socket"
# Cores should be grouped based on the cores for each socket
if self.args.socket_id != -1:
# Only using cores from one socket
instance_cores_list[0] = self.platform_util.cpu_core_list[self.args.socket_id]
else:
# Get the cores for each socket
instance_cores_list = self.platform_util.cpu_core_list

# Setup the log file name with the model name, precision, mode, batch size (if there is one),
# number of cores per instance. An extra {} is intentionally left in the log_filename_format
Expand All @@ -188,11 +200,14 @@ def run_numactl_multi_instance(self, cmd, replace_unique_output_dir=None):
# Loop through each instance and add that instance's command to a string
multi_instance_command = ""
for instance_num, core_list in enumerate(instance_cores_list):
if len(core_list) < int(cores_per_instance):
if cores_per_instance != "socket" and len(core_list) < int(cores_per_instance):
print("NOTE: Skipping remainder of {} cores for instance {}"
.format(len(core_list), instance_num))
continue

if len(core_list) == 0:
continue

prefix = ("OMP_NUM_THREADS={0} "
"numactl --localalloc --physcpubind={1}").format(
len(core_list), ",".join(core_list))
Expand Down Expand Up @@ -340,31 +355,51 @@ def set_num_inter_intra_threads(self, num_inter_threads=None, num_intra_threads=

if self.args.numa_cores_per_instance:
# Set default num inter/intra threads if the user didn't provide specific values
if self.args.numa_cores_per_instance == "socket":
if self.args.socket_id != -1:
inter_threads = len(self.platform_util.cpu_core_list[self.args.socket_id])
else:
# since we can only have one value for inter threads and the number of cores
# per socket can vary, if the cpuset is limited, get the lowest core count
# per socket and use that as the num inter threads
inter_threads = min([len(i) for i in self.platform_util.cpu_core_list if len(i) > 0])
else:
inter_threads = self.args.numa_cores_per_instance

if not self.args.num_inter_threads:
self.args.num_inter_threads = 1
if not self.args.num_intra_threads:
self.args.num_intra_threads = self.args.numa_cores_per_instance
self.args.num_intra_threads = inter_threads
if not self.args.data_num_inter_threads:
self.args.data_num_inter_threads = 1
if not self.args.data_num_intra_threads:
self.args.data_num_intra_threads = self.args.numa_cores_per_instance
self.args.data_num_intra_threads = inter_threads
elif self.args.socket_id != -1:
if not self.args.num_inter_threads:
self.args.num_inter_threads = 1
if not self.args.num_intra_threads:
self.args.num_intra_threads = \
self.platform_util.num_cores_per_socket \
if self.args.num_cores == -1 else self.args.num_cores
if self.args.num_cores != -1:
self.args.num_intra_threads = self.args.num_cores
elif self.platform_util.cpuset_cpus and \
self.args.socket_id in self.platform_util.cpuset_cpus.keys():
self.args.num_intra_threads = len(self.platform_util.cpuset_cpus[self.args.socket_id])
else:
self.args.num_intra_threads = self.platform_util.num_cores_per_socket
else:
if not self.args.num_inter_threads:
self.args.num_inter_threads = self.platform_util.num_cpu_sockets
if os.environ["MPI_NUM_PROCESSES"] != "None":
self.args.num_inter_threads = 1
if not self.args.num_intra_threads:
if self.args.num_cores == -1:
self.args.num_intra_threads = \
int(self.platform_util.num_cores_per_socket *
self.platform_util.num_cpu_sockets)
if self.platform_util.cpuset_cpus and len(self.platform_util.cpuset_cpus.keys()) > 0:
# Total up the number of cores in the cpuset
self.args.num_intra_threads = sum([len(self.platform_util.cpuset_cpus[socket_id])
for socket_id in self.platform_util.cpuset_cpus.keys()])
else:
self.args.num_intra_threads = \
int(self.platform_util.num_cores_per_socket *
self.platform_util.num_cpu_sockets)
if os.environ["MPI_NUM_PROCESSES"] != "None":
self.args.num_intra_threads = self.platform_util.num_cores_per_socket - 2
else:
Expand Down
110 changes: 105 additions & 5 deletions benchmarks/common/platform_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
CORES_PER_SOCKET_STR_ = "Core(s) per socket"
THREADS_PER_CORE_STR_ = "Thread(s) per core"
LOGICAL_CPUS_STR_ = "CPU(s)"
NUMA_NODE_CPU_RANGE_STR_ = "NUMA node{} CPU(s):"
ONLINE_CPUS_LIST = "On-line CPU(s) list:"


class CPUInfo():
Expand Down Expand Up @@ -192,8 +194,16 @@ def __init__(self, args):
self.num_threads_per_core = 0
self.num_logical_cpus = 0
self.num_numa_nodes = 0

# Core list generated by numactl -H in the case where --numa-cores-per-instance is
# being used. It then gets pruned based on the cpuset_cpus, in case docker is
# limiting the cores that the container has access to
self.cpu_core_list = []

# Dictionary generated from the cpuset.cpus file (in linux_init) for the case where
# docker is limiting the number of cores that the container has access to
self.cpuset_cpus = None

os_type = system_platform.system()
if "Windows" == os_type:
self.windows_init()
Expand All @@ -204,6 +214,45 @@ def __init__(self, args):
else:
raise ValueError("Unable to determine Operating system type.")

def _get_list_from_string_ranges(self, str_ranges):
"""
Converts a string of numbered ranges (comma separated numbers or ranges) to an
integer list. Duplicates should be removed and the integer list should be
ordered.
For example an input of "3-6,10,0-5" should return [0, 1, 2, 3, 4, 5, 6, 10]
"""
result_list = []

for section in str_ranges.split(","):
if "-" in section:
# Section is a range, so get the start and end values
start, end = section.split("-")
section_list = range(int(start), int(end) + 1)
result_list += section_list
else:
# This section is just a single number, not a range
result_list.append(int(section))

# Remove duplicates
result_list = list(set(result_list))

return result_list

def _get_cpuset(self):
"""
Try to get the cpuset.cpus info, since lscpu does not know if docker has limited
the cpuset accessible to the container
"""
cpuset = ""
cpuset_cpus_file = "/sys/fs/cgroup/cpuset/cpuset.cpus"
if os.path.exists(cpuset_cpus_file):
with open(cpuset_cpus_file, "r") as f:
cpuset = f.read()

if self.args.verbose:
print("cpuset.cpus: {}".format(cpuset))
return cpuset

def linux_init(self):
lscpu_cmd = "lscpu"
try:
Expand All @@ -219,6 +268,9 @@ def linux_init(self):
print("Problem getting CPU info: {}".format(e))
sys.exit(1)

core_list_per_node = {}
online_cpus_list = ""

# parse it
for line in cpu_info:
# NUMA_NODES_STR_ = "NUMA node(s)"
Expand All @@ -236,28 +288,76 @@ def linux_init(self):
# LOGICAL_CPUS_STR_ = "CPU(s)"
elif line.find(LOGICAL_CPUS_STR_) == 0:
self.num_logical_cpus = int(line.split(":")[1].strip())
# ONLINE_CPUS_LIST = "On-line CPU(s) list"
elif line.find(ONLINE_CPUS_LIST) == 0:
online_cpus_list = line.split(":")[1].strip()
else:
# Get the ranges of cores per node from NUMA node* CPU(s)
for node in range(0, self.num_numa_nodes):
if line.find(NUMA_NODE_CPU_RANGE_STR_.format(str(node))) == 0:
range_for_node = line.split(":")[1].strip()
range_list_for_node = self._get_list_from_string_ranges(range_for_node)
core_list_per_node[node] = range_list_for_node

# Try to get the cpuset.cpus info, since lscpu does not know if the cpuset is limited
cpuset = self._get_cpuset()
if cpuset:
# If the cpuset is the same as the online_cpus_list, then we are using the whole
# machine, so let's avoid unnecessary complexity and don't bother with the cpuset_cpu list
if (online_cpus_list != "" and online_cpus_list != cpuset) or online_cpus_list == "":
self.cpuset_cpus = self._get_list_from_string_ranges(cpuset)

# Uses numactl get the core number for each numa node and adds the cores for each
# node to the cpu_cores_list array
if self.num_numa_nodes > 0:
# node to the cpu_cores_list array. Only do this if the command is trying to use
# numa_cores_per_instance we can't count on numactl being installed otherwise and
# this list is only used for the numactl multi-instance runs.
num_physical_cores = self.num_cpu_sockets * self.num_cores_per_socket
cores_per_node = int(num_physical_cores / self.num_numa_nodes)
if self.num_numa_nodes > 0 and self.args.numa_cores_per_instance is not None:
try:
# Get the list of cores
num_physical_cores = self.num_cpu_sockets * self.num_cores_per_socket
cores_per_node = int(num_physical_cores / self.num_numa_nodes)
cpu_array_command = \
"numactl -H | grep 'node [0-9]* cpus:' |" \
"sed 's/.*node [0-9]* cpus: *//' | head -{0} |cut -f1-{1} -d' '".format(
self.num_numa_nodes, int(cores_per_node))
cpu_array = subprocess.Popen(
cpu_array_command, shell=True, stdout=subprocess.PIPE).stdout.readlines()
cpu_array_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()

for node_cpus in cpu_array:
node_cpus = str(node_cpus).lstrip("b'").replace("\\n'", " ")
self.cpu_core_list.append([x for x in node_cpus.split(" ") if x != ''])

# If we have the cpuset list, cross check that list with our core list and
# remove cores that are not part of the cpuset list
if self.cpuset_cpus is not None:
for socket, core_list in enumerate(self.cpu_core_list):
self.cpu_core_list[socket] = [x for x in core_list if int(x) in self.cpuset_cpus]

if (self.args.verbose):
print("Core list: {}".format(self.cpu_core_list), flush=True)

except Exception as e:
print("Warning: An error occured when getting the list of cores using '{}':\n {}".
format(cpu_array_command, e))

if self.cpuset_cpus is not None:
# Reformat the cpuset_cpus list so that it's split up by node
for node in core_list_per_node.keys():
core_list_per_node[node] = [x for x in core_list_per_node[node] if x in self.cpuset_cpus]
self.cpuset_cpus = core_list_per_node

# Remove cores that aren't part of the cpu_core_list
for socket in self.cpuset_cpus.keys():
if len(self.cpuset_cpus[socket]) > cores_per_node:
del self.cpuset_cpus[socket][cores_per_node:]

# Remove keys with empty lists (sockets where there are no cores enabled in the cpuset)
self.cpuset_cpus = {k: v for k, v in self.cpuset_cpus.items() if v}

# Update the number of sockets based on the cpuset
if len(self.cpuset_cpus.keys()) > 0:
self.num_cpu_sockets = len(self.cpuset_cpus.keys())

def windows_init(self):
NUM_SOCKETS_STR_ = "DeviceID"
CORES_PER_SOCKET_STR_ = "NumberOfCores"
Expand Down
Loading

0 comments on commit 1f7f7df

Please sign in to comment.