# Source code for diffpy.utils.diffraction_objects

import datetime
import uuid
import warnings
from copy import deepcopy

import numpy as np

from diffpy.utils.tools import get_package_info
from diffpy.utils.transforms import d_to_q, d_to_tth, q_to_d, q_to_tth, tth_to_d, tth_to_q

# Accepted spellings of the independent-variable type, grouped by the grid
# they refer to: momentum transfer (q), scattering angle, and d-spacing.
QQUANTITIES = [
    "q",
]
ANGLEQUANTITIES = [
    "angle",
    "tth",
    "twotheta",
    "2theta",
]
DQUANTITIES = [
    "d",
    "dspace",
]
# Every accepted xtype, in angle / d / q order.
XQUANTITIES = [*ANGLEQUANTITIES, *DQUANTITIES, *QQUANTITIES]
# Unit labels known to this module.
XUNITS = [
    "degrees",
    "radians",
    "rad",
    "deg",
    "inv_angs",
    "inv_nm",
    "nm-1",
    "A-1",
]

# Raised (as ValueError) when two DiffractionObjects are combined but their
# q / tth / d columns do not match.
x_values_not_equal_emsg = " ".join(
    [
        "The two objects have different values in x arrays (my_do.all_arrays[:, [1, 2, 3]]).",
        "Please ensure the x values of the two objects are identical by re-instantiating",
        "the DiffractionObject with the correct x value inputs.",
    ]
)

# Raised (as TypeError) when the other operand of an arithmetic operation is
# neither a DiffractionObject nor a scalar.
invalid_add_type_emsg = " ".join(
    [
        "You may only add a DiffractionObject with another DiffractionObject or a scalar value.",
        "Please rerun by adding another DiffractionObject instance or a scalar value.",
        "e.g., my_do_1 + my_do_2 or my_do + 10 or 10 + my_do",
    ]
)


def _xtype_wmsg(xtype):
    """Build the message used when `xtype` is not a recognised quantity.

    Parameters
    ----------
    xtype : str
        The unrecognised xtype supplied by the caller.

    Returns
    -------
    str
        The message naming the offending value and listing every entry of
        ``XQUANTITIES`` (rendered as a tuple).
    """
    allowed_xtypes = tuple(XQUANTITIES)
    complaint = f"I don't know how to handle the xtype, '{xtype}'. "
    advice = f"Please rerun specifying an xtype from {allowed_xtypes}"
    return complaint + advice


def _setter_wmsg(attribute):
    return (
        f"Direct modification of attribute '{attribute}' is not allowed. "
        f"Please use 'input_data' to modify '{attribute}'.",
    )


class DiffractionObject:
    """Class for storing and manipulating diffraction data.

    DiffractionObject stores data produced from X-ray, neutron,
    and electron scattering experiments. The object can transform
    between different scattering quantities such as q (scattering vector),
    2θ (two-theta angle), and d (interplanar spacing), and perform various
    operations like scaling, addition, subtraction, and comparison for equality
    between diffraction objects.

    Attributes
    ----------
    scat_quantity : str
        The type of scattering experiment (e.g., "x-ray", "neutron"). Default is an empty string "".
    wavelength : float
        The wavelength of the incoming beam, specified in angstroms (Å). Default is None.
    name : str
        The name or label for the scattering data. Default is an empty string "".
    metadata : dict
        The additional metadata associated with the diffraction object.
    qmin : float
        The minimum q value.
    qmax : float
        The maximum q value.
    tthmin : float
        The minimum two-theta value.
    tthmax : float
        The maximum two-theta value.
    dmin : float
        The minimum d-spacing value.
    dmax : float
        The maximum d-spacing value.
    """

    # Scalar operand types accepted by the arithmetic operators. NumPy scalar
    # types are listed explicitly because np.integer values are not `int`.
    _SCALAR_TYPES = (int, float, np.integer, np.floating)

    def __init__(
        self,
        xarray,
        yarray,
        xtype,
        wavelength=None,
        scat_quantity="",
        name="",
        metadata=None,
    ):
        """Initialize a DiffractionObject instance.

        Parameters
        ----------
        xarray : ndarray
            The independent variable array containing "q", "tth", or "d" values.
        yarray : ndarray
            The dependent variable array corresponding to intensity values.
        xtype : str
            The type of the independent variable in `xarray`. Must be one of the entries in XQUANTITIES.
        wavelength : float, optional, default is None.
            The wavelength of the incoming beam, specified in angstroms (Å)
        scat_quantity : str, optional, default is an empty string "".
            The type of scattering experiment (e.g., "x-ray", "neutron").
        name : str, optional, default is an empty string "".
            The name or label for the scattering data.
        metadata : dict, optional, default is None
            The additional metadata associated with the diffraction object.
            When omitted, a new empty dictionary is created for this instance.

        Raises
        ------
        ValueError
            Raised when `xtype` is not in XQUANTITIES or when `xarray` and
            `yarray` have different lengths.

        Examples
        --------
        Create a DiffractionObject for X-ray scattering data:

        >>> import numpy as np
        >>> from diffpy.utils.diffraction_objects import DiffractionObject
        ...
        >>> x = np.array([0.12, 0.24, 0.31, 0.4])  # independent variable (e.g., q)
        >>> y = np.array([10, 20, 40, 60])  # intensity values
        >>> metadata = {
        ...     "sample": "rock salt from the beach",
        ...     "composition": "NaCl",
        ...     "temperature": "300 K,",
        ...     "experimenters": "Phill, Sally"
        ... }
        >>> do = DiffractionObject(
        ...     xarray=x,
        ...     yarray=y,
        ...     xtype="q",
        ...     wavelength=1.54,
        ...     scat_quantity="x-ray",
        ...     name="beach_rock_salt_1",
        ...     metadata=metadata
        ... )
        >>> print(do.metadata)
        """
        self._uuid = uuid.uuid4()
        self._input_data(xarray, yarray, xtype, wavelength, scat_quantity, name, metadata)

    def _input_data(self, xarray, yarray, xtype, wavelength, scat_quantity, name, metadata):
        """Validate the inputs and populate every attribute of the instance."""
        if xtype not in XQUANTITIES:
            raise ValueError(_xtype_wmsg(xtype))
        if len(xarray) != len(yarray):
            raise ValueError(
                "'xarray' and 'yarray' are different lengths. They must "
                "correspond to each other and have the same length. "
                "Please re-initialize 'DiffractionObject'"
                "with valid 'xarray' and 'yarray's"
            )
        self.scat_quantity = scat_quantity
        # wavelength must be stored before _set_arrays, which reads it.
        self.wavelength = wavelength
        # A fresh dict per instance: dump() mutates the metadata, so a shared
        # default would leak entries between unrelated objects.
        self.metadata = {} if metadata is None else metadata
        self.name = name
        self._input_xtype = xtype
        self._set_arrays(xarray, yarray, xtype)
        self._set_min_max_xarray()

    def __eq__(self, other):
        """Compare two DiffractionObjects by public attributes and data arrays.

        Public (non-underscore) instance attributes must match; floats are
        compared with a relative tolerance of 1e-5. The stored arrays must
        have the same shape and be close with the same tolerance. The uuid
        and the input xtype are not part of the comparison.

        Returns
        -------
        bool or NotImplemented
            NotImplemented when `other` is not a DiffractionObject.
        """
        if not isinstance(other, DiffractionObject):
            return NotImplemented
        self_attributes = [key for key in self.__dict__ if not key.startswith("_")]
        other_attributes = [key for key in other.__dict__ if not key.startswith("_")]
        if sorted(self_attributes) != sorted(other_attributes):
            return False
        for key in self_attributes:
            value = getattr(self, key)
            other_value = getattr(other, key)
            if isinstance(value, float):
                # `value` cannot be None in this branch, so only the other
                # side needs the None guard before the tolerance comparison.
                if other_value is None or not np.isclose(value, other_value, rtol=1e-5):
                    return False
            elif isinstance(value, list) and all(isinstance(i, np.ndarray) for i in value):
                if not all(np.allclose(i, j, rtol=1e-5) for i, j in zip(value, other_value)):
                    return False
            elif value != other_value:
                return False
        # The data live in the underscore-prefixed `_all_arrays`, which the
        # attribute loop above skips, so compare them explicitly.
        if self._all_arrays.shape != other._all_arrays.shape:
            return False
        return bool(np.allclose(self._all_arrays, other._all_arrays, rtol=1e-5, equal_nan=True))

    def _combine_yarray(self, other, operation, reflected=False):
        """Return a deep copy of self whose yarray is `operation(self_y, other_y)`.

        Parameters
        ----------
        other : DiffractionObject, int, or float
            The other operand; validated by `_check_operation_compatibility`.
        operation : callable
            The binary function applied element-wise (e.g. ``np.add``).
        reflected : bool, optional, default is False
            When True the operands are swapped, i.e. `operation(other_y, self_y)`,
            which is what the reflected operators (``10 - my_do``) require.
        """
        self._check_operation_compatibility(other)
        result_do = deepcopy(self)
        other_y = other.all_arrays[:, 0] if isinstance(other, DiffractionObject) else other
        self_y = self._all_arrays[:, 0]
        if reflected:
            result_do._all_arrays[:, 0] = operation(other_y, self_y)
        else:
            result_do._all_arrays[:, 0] = operation(self_y, other_y)
        return result_do

    def __add__(self, other):
        """Add a scalar value or another DiffractionObject to the yarray of the DiffractionObject.

        Parameters
        ----------
        other : DiffractionObject, int, or float
            The item to be added. If `other` is a scalar value, this value will be added to each element of the
            yarray of this DiffractionObject instance. If `other` is another DiffractionObject, the yarrays of the
            two DiffractionObjects will be combined element-wise. The result is a new DiffractionObject instance,
            representing the addition and using the xarray from the left-hand side DiffractionObject.

        Returns
        -------
        DiffractionObject
            The new DiffractionObject instance with modified yarray values. This instance is a deep copy of the
            original with the additions applied.

        Raises
        ------
        ValueError
            Raised when the xarrays of two DiffractionObject instances are not equal.
        TypeError
            Raised when `other` is not an instance of DiffractionObject, int, or float.

        Examples
        --------
        Add a scalar value to the yarray of a DiffractionObject instance:

        >>> new_do = my_do + 10.1
        >>> new_do = 10.1 + my_do

        Combine the yarrays of two DiffractionObject instances:

        >>> new_do = my_do_1 + my_do_2
        """
        return self._combine_yarray(other, np.add)

    # Addition is commutative, so the reflected form can reuse __add__.
    __radd__ = __add__

    def __sub__(self, other):
        """Subtract scalar value or another DiffractionObject to the yarray of the DiffractionObject.

        This method behaves similarly to the `__add__` method, but performs subtraction instead of addition.
        For details on parameters, returns, and exceptions, refer to the documentation for `__add__`.

        Examples
        --------
        Subtract a scalar value from the yarray of a DiffractionObject instance:

        >>> new_do = my_do - 10.1

        Subtract the yarrays of two DiffractionObject instances:

        >>> new_do = my_do_1 - my_do_2
        """
        return self._combine_yarray(other, np.subtract)

    def __rsub__(self, other):
        """Return `other - yarray` as a new DiffractionObject.

        Subtraction is not commutative, so the operands are swapped rather
        than reusing `__sub__`.

        Examples
        --------
        >>> new_do = 10.1 - my_do
        """
        return self._combine_yarray(other, np.subtract, reflected=True)

    def __mul__(self, other):
        """Multiply a scalar value or another DiffractionObject with the yarray of this DiffractionObject.

        This method behaves similarly to the `__add__` method, but performs multiplication instead of addition.
        For details on parameters, returns, and exceptions, refer to the documentation for `__add__`.

        Examples
        --------
        Multiply a scalar value with the yarray of a DiffractionObject instance:

        >>> new_do = my_do * 3.5

        Multiply the yarrays of two DiffractionObject instances:

        >>> new_do = my_do_1 * my_do_2
        """
        return self._combine_yarray(other, np.multiply)

    # Multiplication is commutative, so the reflected form can reuse __mul__.
    __rmul__ = __mul__

    def __truediv__(self, other):
        """Divide the yarray of this DiffractionObject by a scalar value or another DiffractionObject.

        This method behaves similarly to the `__add__` method, but performs division instead of addition.
        For details on parameters, returns, and exceptions, refer to the documentation for `__add__`.

        Examples
        --------
        Divide the yarray of a DiffractionObject instance by a scalar value:

        >>> new_do = my_do / 2.0

        Divide the yarrays of two DiffractionObject instances:

        >>> new_do = my_do_1 / my_do_2
        """
        return self._combine_yarray(other, np.divide)

    def __rtruediv__(self, other):
        """Return `other / yarray` as a new DiffractionObject.

        Division is not commutative, so the operands are swapped rather
        than reusing `__truediv__`.

        Examples
        --------
        >>> new_do = 2.0 / my_do
        """
        return self._combine_yarray(other, np.divide, reflected=True)

    def _check_operation_compatibility(self, other):
        """Raise unless `other` can be combined with this object's yarray.

        Raises
        ------
        TypeError
            Raised when `other` is neither a DiffractionObject nor a scalar.
        ValueError
            Raised when `other` is a DiffractionObject whose arrays differ in
            shape or whose q, tth, d columns are not close to this object's.
        """
        if isinstance(other, DiffractionObject):
            if self.all_arrays.shape != other.all_arrays.shape:
                raise ValueError(x_values_not_equal_emsg)
            if not np.allclose(self.all_arrays[:, [1, 2, 3]], other.all_arrays[:, [1, 2, 3]]):
                raise ValueError(x_values_not_equal_emsg)
        elif not isinstance(other, self._SCALAR_TYPES):
            raise TypeError(invalid_add_type_emsg)

    @property
    def all_arrays(self):
        """The 2D array containing `xarray` and `yarray` values.

        Returns
        -------
        ndarray
            The shape (len(data), 4) 2D array with columns containing the `yarray` (intensity)
            and the `xarray` values in q, tth, and d.

        Examples
        --------
        To access specific arrays individually, use these slices:

        >>> my_do.all_arrays[:, 0]  # yarray
        >>> my_do.all_arrays[:, 1]  # xarray in q
        >>> my_do.all_arrays[:, 2]  # xarray in tth
        >>> my_do.all_arrays[:, 3]  # xarray in d
        """
        return self._all_arrays

    @all_arrays.setter
    def all_arrays(self, _):
        raise AttributeError(_setter_wmsg("all_arrays"))

    @property
    def input_xtype(self):
        """The type of the independent variable in `xarray`.

        Returns
        -------
        input_xtype : str
            The type of `xarray`, which must be one of the entries in XQUANTITIES.
        """
        return self._input_xtype

    @input_xtype.setter
    def input_xtype(self, _):
        raise AttributeError(_setter_wmsg("input_xtype"))

    @property
    def uuid(self):
        """The unique identifier for the DiffractionObject instance.

        Returns
        -------
        uuid : UUID
            The unique identifier of the DiffractionObject instance.
        """
        return self._uuid

    @uuid.setter
    def uuid(self, _):
        raise AttributeError(_setter_wmsg("uuid"))

    def get_array_index(self, xtype, xvalue):
        """Return the index of the closest value in the array associated with the specified xtype and the value
        provided.

        Parameters
        ----------
        xtype : str
            The type of the independent variable in `xarray`. Must be one of the entries in XQUANTITIES.
            If None, the xtype the object was created with is used.
        xvalue : float
            The value of the xtype to find the closest index for.

        Returns
        -------
        index : int
            The index of the closest value in the array associated with the specified xtype and the value provided.

        Raises
        ------
        ValueError
            Raised when `xtype` is not recognised or the selected array is empty.
        """
        # Honour the requested grid; fall back to the input xtype only when
        # the caller does not name one.
        if xtype is None:
            xtype = self._input_xtype
        xarray = self.on_xtype(xtype)[0]
        if len(xarray) == 0:
            raise ValueError(f"The '{xtype}' array is empty. Please ensure it is initialized.")
        index = (np.abs(xarray - xvalue)).argmin()
        return index

    def _set_arrays(self, xarray, yarray, xtype):
        """Fill `_all_arrays` with y in column 0 and x on the q, tth, d grids in columns 1-3."""
        self._all_arrays = np.empty(shape=(len(xarray), 4))
        self._all_arrays[:, 0] = yarray
        if xtype.lower() in QQUANTITIES:
            self._all_arrays[:, 1] = xarray
            self._all_arrays[:, 2] = q_to_tth(xarray, self.wavelength)
            self._all_arrays[:, 3] = q_to_d(xarray)
        elif xtype.lower() in ANGLEQUANTITIES:
            self._all_arrays[:, 2] = xarray
            self._all_arrays[:, 1] = tth_to_q(xarray, self.wavelength)
            self._all_arrays[:, 3] = tth_to_d(xarray, self.wavelength)
        elif xtype.lower() in DQUANTITIES:
            self._all_arrays[:, 3] = xarray
            self._all_arrays[:, 1] = d_to_q(xarray)
            self._all_arrays[:, 2] = d_to_tth(xarray, self.wavelength)

    def _set_min_max_xarray(self):
        """Record the NaN-ignoring minimum and maximum of each x grid.

        The `initial` values (inf for minima, 0.0 for maxima) are what an
        empty column yields.
        """
        self.qmin = np.nanmin(self._all_arrays[:, 1], initial=np.inf)
        self.qmax = np.nanmax(self._all_arrays[:, 1], initial=0.0)
        self.tthmin = np.nanmin(self._all_arrays[:, 2], initial=np.inf)
        self.tthmax = np.nanmax(self._all_arrays[:, 2], initial=0.0)
        self.dmin = np.nanmin(self._all_arrays[:, 3], initial=np.inf)
        self.dmax = np.nanmax(self._all_arrays[:, 3], initial=0.0)

    def _get_original_array(self):
        """Return `(data, label)` for the grid the object was created on.

        `label` is "q", "tth", or "d". Returns None implicitly when the
        stored input xtype matches none of the known groups.
        """
        if self._input_xtype in QQUANTITIES:
            return self.on_q(), "q"
        elif self._input_xtype in ANGLEQUANTITIES:
            return self.on_tth(), "tth"
        elif self._input_xtype in DQUANTITIES:
            return self.on_d(), "d"

    def on_q(self):
        """Return the two 1D numpy arrays containing q and y data.

        Returns
        -------
        [q-array, y-array] : list of ndarray
            The two-element list holding the q and y columns. Both are
            slices of `all_arrays`, not copies.
        """
        return [self.all_arrays[:, 1], self.all_arrays[:, 0]]

    def on_tth(self):
        """Return the two 1D numpy arrays containing tth and y data.

        Returns
        -------
        [tth-array, y-array] : list of ndarray
            The two-element list holding the tth and y columns. Both are
            slices of `all_arrays`, not copies.
        """
        return [self.all_arrays[:, 2], self.all_arrays[:, 0]]

    def on_d(self):
        """Return the two 1D numpy arrays containing d and y data.

        Returns
        -------
        [d-array, y-array] : list of ndarray
            The two-element list holding the d and y columns. Both are
            slices of `all_arrays`, not copies.
        """
        return [self.all_arrays[:, 3], self.all_arrays[:, 0]]

    def scale_to(self, target_diff_object, q=None, tth=None, d=None, offset=None):
        """Return a new diffraction object which is the current object but rescaled in y to the target.

        By default, if `q`, `tth`, or `d` are not provided, scaling is based on the max intensity from each object.
        Otherwise, y-value in the target at the closest specified x-value will be used as the factor to scale to.
        The entire array is scaled by this factor so that one object places on top of the other at that point.
        If multiple values of `q`, `tth`, or `d` are provided, an error will be raised.

        Parameters
        ----------
        target_diff_object: DiffractionObject
            The diffraction object you want to scale the current one onto.
        q, tth, d : float, optional, default is None
            The value of the x-array where you want the curves to line up vertically.
            Specify a value on one of the allowed grids, q, tth, or d), e.g., q=10.
        offset : float, optional, default is None
            The offset to add to the scaled y-values.

        Returns
        -------
        scaled_do : DiffractionObject
            The rescaled DiffractionObject as a new object.

        Raises
        ------
        ValueError
            Raised when more than one of `q`, `tth`, and `d` is given.
        """
        count = sum([q is not None, tth is not None, d is not None])
        if count > 1:
            raise ValueError(
                "You must specify none or exactly one of 'q', 'tth', or 'd'. "
                "Please provide either none or one value."
            )
        if offset is None:
            offset = 0
        scaled_do = self.copy()
        if count == 0:
            # NaN-ignoring maxima: the builtin max() gives order-dependent
            # results on arrays that contain NaN.
            y_target_max = np.nanmax(target_diff_object.on_q()[1])
            y_self_max = np.nanmax(self.on_q()[1])
            scaled_do._all_arrays[:, 0] = scaled_do._all_arrays[:, 0] * y_target_max / y_self_max + offset
            return scaled_do

        xtype = "q" if q is not None else "tth" if tth is not None else "d"
        data = self.on_xtype(xtype)
        target = target_diff_object.on_xtype(xtype)

        xvalue = q if xtype == "q" else tth if xtype == "tth" else d

        # Each object is sampled at its own grid point closest to xvalue.
        xindex_data = (np.abs(data[0] - xvalue)).argmin()
        xindex_target = (np.abs(target[0] - xvalue)).argmin()
        scaled_do._all_arrays[:, 0] = data[1] * target[1][xindex_target] / data[1][xindex_data] + offset
        return scaled_do

    def on_xtype(self, xtype):
        """Return the two 1D numpy arrays containing x and y data.

        Parameters
        ----------
        xtype : str
            The type of quantity for the independent variable chosen from XQUANTITIES
            (matched case-insensitively).

        Raises
        ------
        ValueError
            Raised when the specified xtype is not among XQUANTITIES

        Returns
        -------
        [xarray, yarray] : list of ndarray
            The two-element list holding the x and y data for the specified xtype.
        """
        if xtype.lower() in ANGLEQUANTITIES:
            return self.on_tth()
        elif xtype.lower() in QQUANTITIES:
            return self.on_q()
        elif xtype.lower() in DQUANTITIES:
            return self.on_d()
        else:
            raise ValueError(_xtype_wmsg(xtype))

    def dump(self, filepath, xtype=None):
        """Dump the xarray and yarray of the diffraction object to a two-column file, with the associated
        information included in the header.

        The instance's `metadata` dictionary is updated in place with the
        result of `get_package_info` and a "creation_time" entry before
        writing.

        Parameters
        ----------
        filepath : str
            The filepath where the diffraction object will be dumped
        xtype : str, optional, default is q
            The type of quantity for the independent variable chosen from XQUANTITIES

        Raises
        ------
        ValueError
            Raised when `xtype` is not among XQUANTITIES. Nothing is written
            in that case.

        Examples
        --------
        To save a diffraction object to a file named "diffraction_data.chi" in the current directory
        with the independent variable 'q':

        >>> file = "diffraction_data.chi"
        >>> do.dump(file, xtype="q")

        To save the diffraction data to a file in a subfolder `output`:

        >>> file = "./output/diffraction_data.chi"
        >>> do.dump(file, xtype="q")

        To save the diffraction data with a different independent variable, such as 'tth':

        >>> file = "diffraction_data_tth.chi"
        >>> do.dump(file, xtype="tth")
        """
        if xtype is None:
            xtype = "q"
        # Resolve the data before opening the file so an invalid xtype fails
        # with a clear ValueError instead of leaving a header-only file.
        xarray, yarray = self.on_xtype(xtype)
        data_to_save = np.column_stack((xarray, yarray))
        self.metadata.update(get_package_info("diffpy.utils", metadata=self.metadata))
        self.metadata["creation_time"] = datetime.datetime.now()

        with open(filepath, "w") as f:
            f.write(
                f"[DiffractionObject]\nname = {self.name}\nwavelength = {self.wavelength}\n"
                f"scat_quantity = {self.scat_quantity}\n"
            )
            for key, value in self.metadata.items():
                f.write(f"{key} = {value}\n")
            f.write("\n#### start data\n")
            np.savetxt(f, data_to_save, delimiter=" ")

    def copy(self):
        """Create a deep copy of the DiffractionObject instance.

        Returns
        -------
        DiffractionObject
            The new instance of DiffractionObject, which is a deep copy of the current instance.
        """
        return deepcopy(self)