import sys
from itertools import product
from shutil import rmtree
import os
import Pabq.common_tools.FileLock as flck
[docs]def dummyfunc(*args, **kargs):
""" Null function used for testing :func:`~PySS.parametric.run_factorial`"""
return("")
[docs]def divisors(n):
"""
Divisors of an integer.
Return all the possible divisors for a given integer.
Parameters
----------
n: int
Returns
-------
int
Notes
-----
"""
large_divisors = []
for i in range(1, int(n**.5 + 1)):
if n % i == 0:
yield i
if i * i != n:
large_divisors.append(int(n / i))
for divisor in reversed(large_divisors):
yield divisor
[docs]def get_queued(filename):
"""
Read a batch info file and return the indexes of the queued jobs.
Parameters
----------
filename : str
Filename of the batch status file.
"""
queue = []
with flck.FileLock(filename):
with open(filename, "r") as f:
for i, line in enumerate(f):
if sys.version[0] == "2":
curr_line = line.split(";")
else:
curr_line = line.split(sep=";")
if curr_line[-1] == "QUEUED\n":
queue.append(i - 2)
return queue
[docs]def goto_next_queued(filename):
"""
Read a batch info file and return the indexes of the queued jobs.
Parameters
----------
filename : str
Filename of the batch status file.
"""
with flck.FileLock(filename):
lines = open(filename, "r").readlines()
for i, line in enumerate(lines):
if sys.version[0] == "2":
curr_line = line.split(";")
else:
curr_line = line.split(sep=";")
if curr_line[-1] == "QUEUED\n":
lines[i] = ";".join(curr_line[:-1]) + ";RUNNING\n"
with open(filename, "w") as f:
f.writelines(lines)
return i - 2
return False
[docs]def update_job_status(filename, job_nr, new_status):
"""
Change status of a job number on a batch status file.
Parameters
----------
filename : str
Filename of the batch status file.
job_nr : int
Index of the job to be updated.
new_status : str
String to be written as new status.
"""
line_nr = job_nr + 2
with flck.FileLock(filename):
lines = open(filename, "r").readlines()
if sys.version[0] == "2":
curr_line = lines[line_nr].split(";")
else:
curr_line = lines[line_nr].split(sep=";")
lines[line_nr] = ";".join(curr_line[:-1]) + ";" + new_status + "\n"
with open(filename, "w") as f:
f.writelines(lines)
[docs]def run_factorial(
prj_name,
exec_func,
func_args=None,
func_kargs=None,
p_args=None,
p_kargs=None,
mk_subdirs=None,
del_subdirs=None,
):
"""
Run a function for a full factorial table of input parameters.
Creates a full factorial matrix for a given list of parameters and executes a function for each combination. A list
of values is needed for each parameter.
Parameters
----------
prj_name : str
Name of the parametric project. Used for directory name and filenames.
exec_func : function
Function to be executed parametrically.
func_args : list, optional
A list containing the arguments to be passed to the function, both parametric and static.
func_kargs : dict, optional
A dictionaty of all the keyword arguments to be passed to the function, both parametric and static.
p_args : list of integers, optional
A list of integers, indicating the positions on the func_args list of the arguments for which the parametric
matrix is composed. On the positions indicated by p_args, the func_args must contain list items.
p_kargs : list of strings, optional
A list of the keys of all the keyword arguments that need to be executed parametrically, similarly to p_args.
mk_subdirs : bool, optional
If `True`, each combination is executed in a different, automatically created, subdirectory.
Useful when the executed job generates files on the working directory. If subdirectory with the same name
exists, the job will enter and execute in the directory ignoring pre-existing files.
Default is False (run all jobs in the current directory)
del_subdirs : bool, optional
Remove the subdirectories for the individual jobs and keep only the results summary.
This argument is used in combination with the mk_subdirs.
Default is False (do not remove).
Returns
-------
list
List item containing the results of all the executed runs
"""
# Defaults
if p_args is None:
p_args = []
if p_kargs is None:
p_kargs = []
if mk_subdirs is None:
mk_subdirs = False
if del_subdirs is None:
del_subdirs = False
# Pick up the parametric variables from the list of arguments
if func_args:
param_args = [func_args[x] for x in p_args]
else:
param_args = []
if func_kargs:
param_kargs = [func_kargs[x] for x in p_kargs]
else:
param_kargs = []
print("The parametric argument values are: \n", param_args)
print("The parametric keyword argument values are: \n", param_kargs)
# Cartesian product of parameters
all_params = param_args + param_kargs
combinations = list(product(*all_params))
# Check which of the given parametric arguments are numeric and find our how many leading zeros are needed based on
# the largest value.
isnumeric = lambda x: isinstance(x, int) or isinstance(x, float)
numeric_dimensions = [param_args.index(x) for x in param_args if isnumeric(max(x))]
print("numeric: ", numeric_dimensions)
print("parametric: ", param_args)
leading_zeros = []
for n, x in enumerate(param_args):
if n in numeric_dimensions:
leading_zeros.append(len(str(max(x))))
else:
leading_zeros.append(None)
print("leading: ", leading_zeros)
#leading_zeros = [len(str(max(x))) for x in param_args if isnumeric(max(x))]
# Write the combinations in a file
batch_status_file = prj_name + "_batch_status.csv"
if not os.path.isfile(batch_status_file):
with open(batch_status_file, "w") as f:
f.write(str(len(param_args)+len(param_kargs)) + " dimensions\n")
f.write("job_parameters;status\n")
for dimensions in combinations:
dim_string = ""
for idx, dimension in enumerate(dimensions):
if idx in numeric_dimensions:
dim_string = dim_string + str(dimension).zfill(leading_zeros[idx]) + ";"
else:
dim_string = dim_string + str(dimension) + ";"
dim_string = dim_string + "QUEUED" + "\n"
f.write(dim_string)
# Initiate (if doesn't exist) a file for the results
results_file = prj_name + "_results.csv"
if not os.path.isfile(results_file):
with open(results_file, "w") as f:
f.write(str(len(param_args)+len(param_kargs)) + " dimensions\n")
f.write("(For this database to be loaded with the PySS.fem.ParametricDB(), replace this line with the "
"titles of the columns in a semicolon separated list. TeX expressions can be used.)\n")
for dimensions in combinations:
dim_string = ""
for idx, dimension in enumerate(dimensions):
if idx in numeric_dimensions:
dim_string = dim_string + str(dimension).zfill(leading_zeros[idx]) + ";"
else:
dim_string = dim_string + str(dimension) + ";"
dim_string = dim_string + "Wait for it...!" + "\n"
f.write(dim_string)
# Initiate a list for the results
prj_results = []
while True:
curr_job_nr = goto_next_queued(batch_status_file)
if curr_job_nr or curr_job_nr is 0:
comb_case = combinations[curr_job_nr]
comb_case_str = ""
for n_param, c_param in enumerate(comb_case):
if n_param in numeric_dimensions:
comb_case_str = comb_case_str+str(c_param).zfill(leading_zeros[n_param])+" "
else:
comb_case_str = comb_case_str+str(c_param)+" "
# Construct an id string for the current job based on the combination. The string is formatted so that it
# does contain any illegal characters for filename and abaqus job name usage.
if sys.version[0] == "2":
job_id = str(comb_case_str).translate(None, ",.&*~!()[]{}|;:\'\"`<>?/\\")
else:
job_id = str(comb_case_str).translate(str.maketrans("", "", ",.&*~!()[]{}|;:\'\"`<>?/\\"))
job_id = job_id.replace(" ", "-")
job_id = job_id + prj_name
# # Check if the directory exists. If so, continue to the next job.
# if os.path.isdir("./" + job_id):
# print("Job already exists: A directory with the same name'" + job_id + "' exists in the cwd")
# continue
if mk_subdirs:
if os.path.isdir(job_id):
os.chdir(job_id)
else:
# Make a new subdirectory for the current session, if it
# doesn't already exist
# NOTE: This 'try-except' should be redundant. I already
# checked for an existing subdir in the if-else condition.
try:
os.mkdir(job_id)
except OSerror as exc:
if exc.errno == 17:
pass
else:
raise
# Change working directory
os.chdir(job_id)
# copyfile("../abq_env_" + os.environ["SLURM_JOBID"] + ".env", "./abaqus_v6.env")
# Assemble current job's input arguments
if func_args:
current_func_args = func_args
for (index, new_parameter) in zip(p_args, comb_case):
current_func_args[index] = new_parameter
else:
current_func_args = []
# Assemble current job's input keyword arguments
if func_kargs:
current_func_kargs = func_kargs
for i, x in enumerate(p_kargs):
current_func_kargs[x] = comb_case[len(p_args) + i]
else:
current_func_kargs = {}
current_func_kargs["id_string"] = job_id
# The function to be run for the full factorial parametric is
# called here
print(
"Running job nr: " + str(curr_job_nr) + " with name: " + job_id
)
# Execute the current job
try:
result = exec_func(*current_func_args, **current_func_kargs)
new_status = "COMPLETED"
except:
result = []
new_status = "FAILED"
#result = exec_func(*current_func_args, **current_func_kargs)
#new_status = "COMPLETED"
# Create an output string
result = str(result)
# Return to parent directory
if mk_subdirs is True:
os.chdir('../')
# Add the result of the current job to the return list
prj_results = prj_results + [result]
# Remove job's folder (only the output information is kept)
if mk_subdirs and del_subdirs is True:
rmtree(job_id)
# Write the result of the current job to the file
update_job_status(results_file, curr_job_nr, result)
# Update the job status on the common batch status file
update_job_status(batch_status_file, curr_job_nr, new_status)
else:
break
return prj_results
[docs]def subdir_crawler(
exec_func,
func_args=None,
func_kargs=None,
prj_name=None,):
"""
Execute function in all subdirs.
Parameters
----------
exec_func : function
Function to be executed parametrically.
func_args : list, optional
List of arguments for the function. No arguments by default.
func_kargs : dict, optional
Dictionary of keyword arguments for the function. No keyword arguments
by default.
prj_name : str, optional
Project name.
"""
if func_args is None:
func_args = ()
if func_kargs is None:
func_kargs = {}
if prj_name is None:
prj_name = ""
all_dirs = [i for i in os.listdir('.') if os.path.isdir(i)]
all_dirs.sort()
for directory in all_dirs:
os.chdir(directory)
# Execute the current job
try:
func_return = exec_func(*func_args, **func_kargs)
except:
func_return = "FAILED"
os.chdir("..")
with open('./' + prj_name + '_info.csv', 'a') as out_file:
out_file.write(directory + ";" + str(func_return) + "\n")