import copy
from pathlib import Path
from diffpy.utils.diffraction_objects import (
ANGLEQUANTITIES,
QQUANTITIES,
XQUANTITIES,
)
from diffpy.utils.tools import (
_load_config,
check_and_build_global_config,
compute_mu_using_xraydb,
compute_mud,
get_package_info,
get_user_info,
)
# Reference values are taken from
# https://x-server.gmca.aps.anl.gov/cgi/www_dbli.exe?x0hdb=waves
# Ka1Ka2 values are calculated as: (Ka1 * 2 + Ka2) / 3
# For CuKa1Ka2: (1.54056 * 2 + 1.544398) / 3 = 1.54184
# Wavelengths keyed by anode / emission-line label (presumably in Å, the
# unit used for explicit wavelengths elsewhere in this module).
# A bare element name carries the same value as its "Ka1Ka2" entry.
WAVELENGTHS = {
    "Mo": 0.71073,
    "MoKa1": 0.70930,
    "MoKa1Ka2": 0.71073,
    "Ag": 0.56087,
    "AgKa1": 0.55941,
    "AgKa1Ka2": 0.56087,
    "Cu": 1.54184,
    "CuKa1": 1.54056,
    "CuKa1Ka2": 1.54184,
}
# Accepted source names, in the dictionary's insertion order; used in the
# error messages that list the allowed anode types.
known_sources = list(WAVELENGTHS)
# Argument names that are left out of the metadata (the code consuming this
# list is not visible in this part of the file).
# NOTE(review): the earlier comment here said that "wavelength" is excluded
# to avoid duplication (it is written explicitly by the diffpy.utils dump
# function) and that "theoretical_from_density" / "theoretical_from_packing"
# are excluded because they are only used for theoretical mu*D estimation.
# None of those three keys appears in the list below -- confirm whether the
# list or that rationale is stale.
METADATA_KEYS_TO_EXCLUDE = [
    "output_correction",
    "input",
    "input_paths",
    "force",
    "energy",
]
[docs]
def set_output_directory(args):
    """Resolve and create the directory that output files are written to.

    A truthy ``args.output_directory`` is used as given; otherwise the
    current working directory is used. The chosen path is made absolute,
    created (including missing parents) if it does not exist yet, and
    stored back on ``args`` as a ``pathlib.Path``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The same namespace, with ``output_directory`` replaced by the
        resolved path of the output directory.
    """
    requested = args.output_directory
    target = Path(requested) if requested else Path.cwd()
    target = target.resolve()
    # exist_ok=True makes this a no-op for a directory that already exists.
    target.mkdir(parents=True, exist_ok=True)
    args.output_directory = target
    return args
def _expand_user_input(args):
    """Expand file lists and wildcards in ``args.input`` in place.

    Two passes are made over ``args.input``:

    1. Every entry whose name contains ``"file_list"`` is opened as a text
       file; each non-blank line (stripped of surrounding whitespace) is
       appended to ``args.input`` and the file-list entry itself is
       removed.
    2. Every entry containing ``"*"`` (including entries that came from a
       file list) is globbed relative to the current directory; matches
       are appended and the pattern is removed. Matches whose file name
       contains ``"file_list"`` or ``"diffpyconfig.json"`` are skipped.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser. ``args.input`` must be a list of
        strings.

    Returns
    -------
    args : argparse.Namespace
        The same namespace with the modified input list.
    """
    file_list_inputs = [name for name in args.input if "file_list" in name]
    for file_list_input in file_list_inputs:
        with open(file_list_input, "r") as f:
            # Skip blank lines: an empty entry is not a usable input path
            # and would otherwise be carried along as "".
            file_inputs = [
                stripped for stripped in (line.strip() for line in f) if stripped
            ]
        args.input.extend(file_inputs)
        args.input.remove(file_list_input)

    # Collected after the file-list pass so that patterns listed inside a
    # file list are expanded too.
    wildcard_inputs = [name for name in args.input if "*" in name]
    for wildcard_input in wildcard_inputs:
        input_files = [
            str(file)
            for file in Path(".").glob(wildcard_input)
            if "file_list" not in file.name
            and "diffpyconfig.json" not in file.name
        ]
        args.input.extend(input_files)
        args.input.remove(wildcard_input)
    return args
[docs]
def normalize_wavelength(args):
    """Turn ``args.wavelength`` into a float where possible.

    The value is handled according to what it is:

    - ``None``: nothing is changed.
    - anything ``float()`` accepts: replaced by that float.
    - any other value: its string form is stripped and matched
      case-insensitively against the keys of ``WAVELENGTHS``; on a match
      it is replaced by the tabulated wavelength.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The same namespace with ``args.wavelength`` normalized.

    Raises
    ------
    ValueError
        If the value is neither numeric nor a known source name.
    """
    raw = args.wavelength
    if raw is None:
        return args

    try:
        numeric = float(raw)
    except (TypeError, ValueError):
        # Not a number: fall through to the source-name lookup below.
        numeric = None
    if numeric is not None:
        args.wavelength = numeric
        return args

    wanted = str(raw).strip().lower()
    for source, value in WAVELENGTHS.items():
        if source.lower() == wanted:
            args.wavelength = value
            return args

    raise ValueError(
        f"Anode type '{args.wavelength}' not recognized. "
        f"Please rerun specifying an anode type from {*known_sources, }."
    )
[docs]
def load_wavelength_from_config_file(args):
    """Fill in ``args.wavelength`` from a ``diffpyconfig.json`` file.

    Sources are consulted in this order of priority:

    1. the value already on ``args`` (e.g. from the command line),
    2. ``diffpyconfig.json`` in the current working directory,
    3. ``diffpyconfig.json`` in the home directory.

    A config file only counts if it loads as a dict that has a
    ``"wavelength"`` key. Whatever value is selected is passed through
    ``normalize_wavelength``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated arguments with the normalized wavelength.

    Raises
    ------
    ValueError
        If no wavelength is set on ``args`` and neither config file
        provides one.
    """
    if args.wavelength is not None:
        return normalize_wavelength(args)

    config_name = "diffpyconfig.json"
    home_config = _load_config(Path.home() / config_name)
    cwd_config = _load_config(Path.cwd() / config_name)

    # The local (cwd) config takes precedence over the global (home) one.
    for candidate in (cwd_config, home_config):
        if isinstance(candidate, dict) and "wavelength" in candidate:
            args.wavelength = candidate.get("wavelength")
            return normalize_wavelength(args)

    raise ValueError(
        "\nThe wavelength was not specified and no "
        "configuration file 'diffpyconfig.json' containing "
        "the wavelength or X-ray source was found in either the "
        "local or home directories. Either specify the wavelength "
        "or source using the -w/--wavelength option or "
        "create a configuration file.\n\n"
        "You can add the wavelength or anode type to a "
        "configuration file on this computer. Once created, it "
        "will be automatically used for subsequent diffpy data "
        "by default, and you will only need to do this once.\n\n"
        "For detailed instructions on creating the configuration "
        "file, please refer to:\n"
        "https://www.diffpy.org/diffpy.labpdfproc/examples/"
        "toolsexample.html"
    )
[docs]
def set_wavelength(args):
    """Normalize and validate ``args.wavelength``.

    ``args.wavelength`` may be:

    - ``None``
    - a number (explicit wavelength in Å)
    - a string (X-ray source name, which must match a key in
      ``WAVELENGTHS``)

    After normalization, a missing wavelength is accepted only when
    ``args.xtype`` is an angle quantity; a numeric wavelength must be a
    finite, strictly positive number.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        Updated arguments with ``args.wavelength`` as a float (or
        ``None`` when it is not required).

    Raises
    ------
    ValueError
        If wavelength is required but missing,
        if a string wavelength is not a known source,
        or if a numeric wavelength is not a positive finite number.
    """
    args = normalize_wavelength(args)
    wavelength = args.wavelength

    if wavelength is None:
        # Compare case-insensitively, as set_xtype does: this function may
        # run before the xtype has been canonicalized.
        if args.xtype.lower() not in ANGLEQUANTITIES:
            raise ValueError(
                f"Please provide a wavelength or anode type "
                f"because the independent variable axis is not on two-theta. "
                f"Allowed anode types are {*known_sources, }."
            )
        return args

    # A chained comparison is False for NaN, so this rejects NaN and
    # infinity in addition to zero and negative values.
    if not 0 < wavelength < float("inf"):
        raise ValueError(
            f"Wavelength = {args.wavelength} is not valid. "
            "Please rerun specifying a known anode type "
            "or a positive wavelength."
        )
    return args
[docs]
def set_xtype(args):
    """Validate ``args.xtype`` and reduce it to its canonical name.

    The comparison is case-insensitive. Any member of ``QQUANTITIES``
    becomes ``"q"``, any member of ``ANGLEQUANTITIES`` becomes ``"tth"``,
    and every other allowed value becomes ``"d"``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated arguments with the xtype as one of q, tth, or d.

    Raises
    ------
    ValueError
        If the xtype is not one of the entries of ``XQUANTITIES``.
    """
    lowered = args.xtype.lower()
    if lowered not in XQUANTITIES:
        raise ValueError(
            f"Unknown xtype: {args.xtype}. "
            f"Allowed xtypes are {*XQUANTITIES, }."
        )

    if lowered in QQUANTITIES:
        canonical = "q"
    elif lowered in ANGLEQUANTITIES:
        canonical = "tth"
    else:
        canonical = "d"
    args.xtype = canonical
    return args
def _set_mud_from_zscan(args):
    """Estimate mu*D experimentally from a z-scan file.

    ``args.z_scan_file`` is resolved to an absolute path. When it points
    at an existing file, the resolved path (as a string) is stored back
    on ``args`` and ``args.mud`` is set to the result of ``compute_mud``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The same namespace with ``z_scan_file`` and ``mud`` updated.

    Raises
    ------
    FileNotFoundError
        If ``args.z_scan_file`` is not an existing file.
    """
    zscan_path = Path(args.z_scan_file).resolve()
    if zscan_path.is_file():
        args.z_scan_file = str(zscan_path)
        args.mud = compute_mud(zscan_path)
        return args
    # Report the path as the user typed it, not the resolved form.
    raise FileNotFoundError(
        f"Cannot find {args.z_scan_file}. "
        f"Please specify a valid file path."
    )
def _parse_theoretical_input(input_str):
    """Parse a ``'composition,energy,value'`` string for mu*D estimation.

    The string is split on commas and each field is stripped of
    surrounding whitespace. The second and third fields are converted
    with ``float``.

    Parameters
    ----------
    input_str : str
        Three comma-separated fields: sample composition, energy, and
        sample mass density or packing fraction.

    Returns
    -------
    tuple
        ``(sample_composition, energy, mass_density_or_packing_fraction)``
        as ``(str, float, float)``.

    Raises
    ------
    ValueError
        If there are not exactly three fields, or if the energy or the
        third field cannot be converted to a float.
    """
    fields = [field.strip() for field in input_str.split(",")]
    if len(fields) != 3:
        raise ValueError(
            f"Invalid mu*D input '{input_str}'. "
            "Expected format is 'sample composition, energy, "
            "sample mass density or packing fraction' "
            "(e.g., 'ZrO2,17.45,0.5')."
        )
    composition, energy_text, amount_text = fields
    # Convert the energy first so a bad energy is reported before a bad
    # third field.
    energy = float(energy_text)
    amount = float(amount_text)
    return composition, energy, amount
def _set_theoretical_mud_from_density(args):
    """Estimate mu*D theoretically from composition and mass density.

    The wavelength on ``args`` is normalized first and, if it is still
    unset, looked up in the configuration files. The energy is then
    derived from the wavelength, stored as ``args.energy``, and used with
    ``args.sample_composition`` and ``args.sample_mass_density`` to
    compute mu, which is multiplied by ``args.diameter`` to give
    ``args.mud``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The same namespace with ``wavelength``, ``energy`` and ``mud``
        set.

    Raises
    ------
    ValueError
        If no usable wavelength can be determined, or if the wavelength
        is not a positive number.
    """
    args = normalize_wavelength(args)
    if args.wavelength is None:
        args = load_wavelength_from_config_file(args)

    wavelength = args.wavelength
    # Validate before dividing: a zero, negative, NaN or still-missing
    # wavelength would otherwise surface as ZeroDivisionError/TypeError or
    # as a meaningless energy. "not x > 0" is also True for NaN.
    if wavelength is None or not wavelength > 0:
        raise ValueError(
            f"Wavelength = {wavelength} is not valid. "
            "Please rerun specifying a known anode type "
            "or a positive wavelength."
        )

    # 12.398 is approximately h*c in keV*Å, so this yields the energy in
    # keV assuming the wavelength is in Å.
    args.energy = 12.398 / wavelength
    mu = compute_mu_using_xraydb(
        args.sample_composition,
        args.energy,
        sample_mass_density=args.sample_mass_density,
    )
    args.mud = mu * args.diameter
    return args
def _set_theoretical_mud_from_packing(args):
    """Estimate mu*D theoretically from composition, energy and packing
    fraction.

    ``args.theoretical_from_packing`` is parsed into its three fields,
    which are stored on ``args`` as ``sample_composition``, ``energy``
    and ``packing_fraction``. mu is computed from them and multiplied by
    ``args.diameter`` to give ``args.mud``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The same namespace with the parsed fields and ``mud`` set.
    """
    parsed = _parse_theoretical_input(args.theoretical_from_packing)
    args.sample_composition, args.energy, args.packing_fraction = parsed
    mu = compute_mu_using_xraydb(
        args.sample_composition,
        args.energy,
        packing_fraction=args.packing_fraction,
    )
    args.mud = mu * args.diameter
    return args
[docs]
def set_mud(args):
    """Compute and set mu*D according to ``args.command``.

    The command selects the method:

    - ``"mud"``: the value was entered manually; nothing is computed.
    - ``"zscan"``: estimate mu*D from a z-scan file.
    - ``"sample"``: estimate mu*D theoretically from the sample mass
      density.

    Any other command leaves ``args`` untouched.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated arguments with mu*D.
    """
    command = args.command
    if command == "zscan":
        args = _set_mud_from_zscan(args)
    elif command == "sample":
        args = _set_theoretical_mud_from_density(args)
    # "mud" and unrecognized commands need no computation.
    return args
def _load_key_value_pair(s):
    """Split a ``key=value`` string into its key and value.

    Only the first ``=`` separates the key from the value; any further
    ``=`` characters remain part of the value. The key is stripped of
    surrounding whitespace, the value is returned exactly as given.

    Parameters
    ----------
    s : str
        The string to split, in the form ``key=value``.

    Returns
    -------
    tuple of str
        ``(key, value)``.

    Raises
    ------
    ValueError
        If ``s`` contains no ``=``, so that no value can be determined.
    """
    key, separator, value = s.partition("=")
    if not separator:
        # Without this check there is no value to return at all.
        raise ValueError(
            f"Invalid key-value pair '{s}'. "
            "Please provide it in the format key=value."
        )
    return key.strip(), value
[docs]
def load_user_info(args):
    """Load user info (name, email, ORCID) into args.

    When the username or the email is missing,
    ``check_and_build_global_config`` from diffpy.utils is called first.
    ``get_user_info`` is then always called with the values currently on
    ``args``, and its result is written back to ``args``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated argparse Namespace
        with username, email, and orcid inserted.
    """
    identity_incomplete = args.username is None or args.email is None
    if identity_incomplete:
        check_and_build_global_config()

    user_config = get_user_info(
        owner_name=args.username,
        owner_email=args.email,
        owner_orcid=args.orcid,
    )
    # Map each args attribute to the key it is stored under in the config.
    attribute_to_key = (
        ("username", "owner_name"),
        ("email", "owner_email"),
        ("orcid", "owner_orcid"),
    )
    for attribute, key in attribute_to_key:
        setattr(args, attribute, user_config.get(key))
    return args
[docs]
def load_package_info(args):
    """Attach the diffpy.labpdfproc package info to args.

    The info is obtained from ``get_package_info`` in diffpy.utils; the
    ``"package_info"`` entry of its result is stored as
    ``args.package_info``.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated argparse Namespace
        with the ``package_info`` attribute set.
    """
    package_metadata = get_package_info("diffpy.labpdfproc")
    args.package_info = package_metadata["package_info"]
    return args
[docs]
def preprocessing_args(args):
    """Run every preprocessing step on the parsed arguments.

    The steps are applied in this order, each receiving the result of the
    previous one: load the wavelength (from args or config files), set
    mu*D, set the input lists, set the output directory, validate the
    wavelength, set the xtype, load user metadata, load user info, and
    load package info.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the parser.

    Returns
    -------
    args : argparse.Namespace
        The updated argparse Namespace with arguments preprocessed.
    """
    # Order matters: mu*D estimation needs the wavelength to be loaded first.
    pipeline = (
        load_wavelength_from_config_file,
        set_mud,
        set_input_lists,
        set_output_directory,
        set_wavelength,
        set_xtype,
        load_user_metadata,
        load_user_info,
        load_package_info,
    )
    for step in pipeline:
        args = step(args)
    return args
# Update load_metadata to use 'input_directory' consistently: