import numdifftools
import numpy as np
from diffpy.snmf.containers import ComponentSignal
from diffpy.snmf.factorizers import lsqnonneg
from diffpy.snmf.optimizers import get_weights
[docs]
def initialize_components(number_of_components, number_of_signals, grid_vector):
"""Initializes ComponentSignals for each of the components in the decomposition.
Parameters
----------
number_of_components: int
The number of component signals in the NMF decomposition
number_of_signals: int
grid_vector: 1d array
The grid of the user provided signals.
Returns
-------
tuple of ComponentSignal objects
The tuple containing `number_of_components` of initialized ComponentSignal objects.
"""
if number_of_components <= 0:
raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1.")
components = list()
for component in range(number_of_components):
component = ComponentSignal(grid_vector, number_of_signals, component)
components.append(component)
return tuple(components)
[docs]
def lift_data(data_input, lift=1):
"""Lifts values of data_input.
Adds 'lift' * the minimum value in data_input to data_input element-wise.
Parameters
----------
data_input: 2d array like
The matrix containing a series of signals to be decomposed. Has dimensions N x M where N is the length
of each signal and M is the number of signals.
lift: float
The factor representing how much to lift 'data_input'.
Returns
-------
2d array like
The matrix that contains data_input - (min(data_input) * lift).
"""
data_input = np.asarray(data_input)
return data_input + np.abs(np.min(data_input) * lift)
[docs]
def construct_stretching_matrix(components, number_of_components, number_of_signals):
"""Constructs the stretching factor matrix.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the component signals in ComponentSignal objects.
number_of_signals: int
The number of signals in the data provided by the user.
Returns
-------
2d array
The matrix containing the stretching factors for the component signals for each of the signals in the
raw data. Has dimensions `component_signal` x `number_of_signals`
"""
if (len(components)) == 0:
raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1.")
number_of_components = len(components)
if number_of_signals <= 0:
raise ValueError(f"Number of signals = {number_of_signals}. Number_of_signals must be >= 1.")
stretching_factor_matrix = np.zeros((number_of_components, number_of_signals))
for i, component in enumerate(components):
stretching_factor_matrix[i] = component.stretching_factors
return stretching_factor_matrix
[docs]
def construct_component_matrix(components):
"""Constructs the component matrix.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the component signals in ComponentSignal objects.
Returns
-------
2d array
The matrix containing the component signal values. Has dimensions `signal_length` x `number_of_components`.
"""
signal_length = len(components[0].iq)
number_of_components = len(components)
if signal_length == 0:
raise ValueError(f"Signal length = {signal_length}. Signal length must be >= 1")
if number_of_components == 0:
raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1")
component_matrix = np.zeros((number_of_components, signal_length))
for i, component in enumerate(components):
component_matrix[i] = component.iq
return component_matrix
[docs]
def construct_weight_matrix(components):
"""Constructs the weights matrix.
Constructs a ΔΆ x M matrix where K is the number of components and M is the
number of signals. Each element is the stretching factor for a specific
weights for a specific signal from the data input.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the component signals.
Returns
-------
2d array like
The 2d array containing the weightings for each component for each signal.
"""
number_of_components = len(components)
number_of_signals = len(components[0].weights)
if number_of_components == 0:
raise ValueError(f"Number of components = {number_of_components}. Number of components must be >= 1")
if number_of_signals == 0:
raise ValueError(f"Number of signals = {number_of_signals}. Number_of_signals must be >= 1.")
weights_matrix = np.zeros((number_of_components, number_of_signals))
for i, component in enumerate(components):
weights_matrix[i] = component.weights
return weights_matrix
[docs]
def update_weights(components, data_input, method=None):
"""Updates the weights matrix.
Updates the weights matrix and the weights vector for each ComponentSignal object.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the component signals.
method: str
The string specifying which method should be used to find a new weight matrix: non-negative least squares
or a quadratic program.
data_input: 2d array
The 2d array containing the user-provided signals.
Returns
-------
2d array
The 2d array containing the weight factors for each component for each signal from `data_input`.
Has dimensions K x M where K is the number of components and M is the number of signals in `data_input.`
"""
data_input = np.asarray(data_input)
weight_matrix = construct_weight_matrix(components)
number_of_signals = len(components[0].weights)
number_of_components = len(components)
signal_length = len(components[0].grid)
for signal in range(number_of_signals):
stretched_components = np.zeros((signal_length, number_of_components))
for i, component in enumerate(components):
stretched_components[:, i] = component.apply_stretch(signal)[0]
if method == "align":
weights = lsqnonneg(stretched_components, data_input[:, signal])
else:
weights = get_weights(
stretched_components.T @ stretched_components,
-stretched_components.T @ data_input[:, signal],
0,
1,
)
weight_matrix[:, signal] = weights
return weight_matrix
[docs]
def reconstruct_signal(components, signal_idx):
"""Reconstructs a specific signal from its weighted and stretched components.
Calculates the linear combination of stretched components where each term is the stretched component multiplied
by its weight factor.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the ComponentSignal objects
signal_idx: int
The index of the specific signal in the input data to be reconstructed
Returns
-------
1d array like
The reconstruction of a signal from calculated weights, stretching factors, and iq values.
"""
signal_length = len(components[0].grid)
reconstruction = np.zeros(signal_length)
for component in components:
stretched = component.apply_stretch(signal_idx)[0]
stretched_and_weighted = component.apply_weight(signal_idx, stretched)
reconstruction += stretched_and_weighted
return reconstruction
[docs]
def initialize_arrays(number_of_components, number_of_moments, signal_length):
"""Generates the initial guesses for the weight, stretching, and component matrices.
Calculates the initial guesses for the component matrix, stretching factor matrix, and weight matrix. The
initial guess for the component matrix is a random (signal_length) x (number_of_components) matrix where
each element is between 0 and 1. The initial stretching factor matrix is a random
(number_of_components) x (number_of_moments) matrix where each element is number slightly perturbed from 1.
The initial weight matrix guess is a random (number_of_components) x (number_of_moments) matrix where
each element is between 0 and 1.
Parameters
----------
number_of_components: int
The number of component signals to obtain from the stretched nmf decomposition.
number_of_moments: int
The number of signals in the user provided dataset where each signal is at a different moment.
signal_length: int
The length of each signal in the user provided dataset.
Returns
-------
tuple of 2d arrays of floats
The tuple containing three elements: the initial component matrix guess, the initial stretching factor matrix
guess, and the initial weight factor matrix guess in that order.
"""
component_matrix_guess = np.random.rand(signal_length, number_of_components)
weight_matrix_guess = np.random.rand(number_of_components, number_of_moments)
stretching_matrix_guess = (
np.ones(number_of_components, number_of_moments)
+ np.random.randn(number_of_components, number_of_moments) * 1e-3
)
return component_matrix_guess, weight_matrix_guess, stretching_matrix_guess
[docs]
def objective_function(
residual_matrix, stretching_factor_matrix, smoothness, smoothness_term, component_matrix, sparsity
):
"""Defines the objective function of the algorithm and returns its value.
Calculates the value of '(||residual_matrix||_F) ** 2 + smoothness * (||smoothness_term *
stretching_factor_matrix.T||)**2 + sparsity * sum(component_matrix ** .5)' and returns its value.
Parameters
----------
residual_matrix: 2d array like
The matrix where each column is the difference between an experimental PDF/XRD pattern and a calculated
PDF/XRD pattern at each grid point. Has dimensions R x M where R is the length of each pattern and M is
the amount of patterns.
stretching_factor_matrix: 2d array like
The matrix containing the stretching factors of the calculated component signal. Has dimensions K x M where
K is the amount of components and M is the number of experimental PDF/XRD patterns.
smoothness: float
The coefficient of the smoothness term which determines the intensity of the smoothness term and its
behavior. It is not very sensitive and is usually adjusted by multiplying it by ten.
smoothness_term: 2d array like
The regularization term that ensures that smooth changes in the component stretching signals are favored.
Has dimensions (M-2) x M where M is the amount of experimentally obtained PDF/XRD patterns, the moment
amount.
component_matrix: 2d array like
The matrix containing the calculated component signals of the experimental PDF/XRD patterns. Has dimensions
R x K where R is the signal length and K is the number of component signals.
sparsity: float
The parameter determining the intensity of the sparsity regularization term which enables the algorithm to
exploit the sparse nature of XRD data. It is usually adjusted by doubling.
Returns
-------
float
The value of the objective function.
"""
residual_matrix = np.asarray(residual_matrix)
stretching_factor_matrix = np.asarray(stretching_factor_matrix)
component_matrix = np.asarray(component_matrix)
return (
0.5 * np.linalg.norm(residual_matrix, "fro") ** 2
+ 0.5 * smoothness * np.linalg.norm(smoothness_term @ stretching_factor_matrix.T, "fro") ** 2
+ sparsity * np.sum(np.sqrt(component_matrix))
)
[docs]
def get_stretched_component(stretching_factor, component, signal_length):
"""Applies a stretching factor to a component signal.
Computes a stretched signal and reinterpolates it onto the original grid of points. Uses a normalized grid
of evenly spaced integers counting from 0 to signal_length (exclusive) to approximate values in between grid
nodes. Once this grid is stretched, values at grid nodes past the unstretched signal's domain are set to zero.
Returns the approximate values of x(r/a) from x(r) where x is a component signal.
Parameters
----------
stretching_factor: float
The stretching factor of a component signal at a particular moment.
component: 1d array like
The calculated component signal without stretching or weighting. Has length N, the length of the signal.
signal_length: int
The length of the component signal.
Returns
-------
tuple of 1d array of floats
The calculated component signal with stretching factors applied. Has length N, the length of the unstretched
component signal. Also returns the gradient and hessian of the stretching transformation.
"""
component = np.asarray(component)
normalized_grid = np.arange(signal_length)
def stretched_component_func(stretching_factor):
return np.interp(normalized_grid / stretching_factor, normalized_grid, component, left=0, right=0)
derivative_func = numdifftools.Derivative(stretched_component_func)
second_derivative_func = numdifftools.Derivative(derivative_func)
stretched_component = stretched_component_func(stretching_factor)
stretched_component_gra = derivative_func(stretching_factor)
stretched_component_hess = second_derivative_func(stretching_factor)
return (
np.asarray(stretched_component),
np.asarray(stretched_component_gra),
np.asarray(stretched_component_hess),
)
[docs]
def update_weights_matrix(
component_amount,
signal_length,
stretching_factor_matrix,
component_matrix,
data_input,
moment_amount,
weights_matrix,
method,
):
"""Update the weight factors matrix.
Parameters
----------
component_amount: int
The number of component signals the user would like to determine from the experimental data.
signal_length: int
The length of the experimental signal patterns
stretching_factor_matrix: 2d array like
The matrx containing the stretching factors of the calculated component signals. Has dimensions K x M
where K is the number of component signals and M is the number of XRD/PDF patterns.
component_matrix: 2d array lik
The matrix containing the unstretched calculated component signals. Has dimensions N x K where N is the
length of the signals and K is the number of component signals.
data_input: 2d array like
The experimental series of PDF/XRD patterns. Has dimensions N x M where N is the length of the PDF/XRD
signals and M is the number of PDF/XRD patterns.
moment_amount: int
The number of PDF/XRD patterns from the experimental data.
weights_matrix: 2d array like
The matrix containing the weights of the stretched component signals. Has dimensions K x M where K is
the number of component signals and M is the number of XRD/PDF patterns.
method: str
The string specifying the method for obtaining individual weights.
Returns
-------
2d array like
The matrix containing the new weight factors of the stretched component signals.
"""
stretching_factor_matrix = np.asarray(stretching_factor_matrix)
component_matrix = np.asarray(component_matrix)
data_input = np.asarray(data_input)
weights_matrix = np.asarray(weights_matrix)
weight = np.zeros(component_amount)
for i in range(moment_amount):
stretched_components = np.zeros((signal_length, component_amount))
for n in range(component_amount):
stretched_components[:, n] = get_stretched_component(
stretching_factor_matrix[n, i], component_matrix[:, n], signal_length
)[0]
if method == "align":
weight = lsqnonneg(stretched_components[0:signal_length, :], data_input[0:signal_length, i])
else:
weight = get_weights(
stretched_components[0:signal_length, :].T @ stretched_components[0:signal_length, :],
-1 * stretched_components[0:signal_length, :].T @ data_input[0:signal_length, i],
0,
1,
)
weights_matrix[:, i] = weight
return weights_matrix
[docs]
def get_residual_matrix(
component_matrix, weights_matrix, stretching_matrix, data_input, moment_amount, component_amount, signal_length
):
"""Obtains the residual matrix between the experimental data and calculated data.
Calculates the difference between the experimental data and the reconstructed experimental data created from
the calculated components, weights, and stretching factors. For each experimental pattern, the stretched and
weighted components making up that pattern are subtracted.
Parameters
----------
component_matrix: 2d array like
The matrix containing the calculated component signals. Has dimensions N x K where N is the length of the
signal and K is the number of calculated component signals.
weights_matrix: 2d array like
The matrix containing the calculated weights of the stretched component signals. Has dimensions K x M where
K is the number of components and M is the number of moments or experimental PDF/XRD patterns.
stretching_matrix: 2d array like
The matrix containing the calculated stretching factors of the calculated component signals. Has dimensions
K x M where K is the number of components and M is the number of moments or experimental PDF/XRD patterns.
data_input: 2d array like
The matrix containing the experimental PDF/XRD data. Has dimensions N x M where N is the length of the
signals and M is the number of signal patterns.
moment_amount: int
The number of patterns in the experimental data. Represents the number of moments in time in the data series
component_amount: int
The number of component signals the user would like to obtain from the experimental data.
signal_length: int
The length of the signals in the experimental data.
Returns
-------
2d array like
The matrix containing the residual between the experimental data and reconstructed data from calculated
values. Has dimensions N x M where N is the signal length and M is the number of moments. Each column
contains the difference between an experimental signal and a reconstruction of that signal from the
calculated weights, components, and stretching factors.
"""
component_matrix = np.asarray(component_matrix)
weights_matrix = np.asarray(weights_matrix)
stretching_matrix = np.asarray(stretching_matrix)
data_input = np.asarray(data_input)
residual_matrx = -1 * data_input
for m in range(moment_amount):
residual = residual_matrx[:, m]
for k in range(component_amount):
residual = (
residual
+ weights_matrix[k, m]
* get_stretched_component(stretching_matrix[k, m], component_matrix[:, k], signal_length)[0]
)
residual_matrx[:, m] = residual
return residual_matrx
[docs]
def reconstruct_data(components):
"""Reconstructs the `input_data` matrix.
Reconstructs the `input_data` matrix from calculated component signals, weights, and stretching factors.
Parameters
----------
components: tuple of ComponentSignal objects
The tuple containing the component signals.
Returns
-------
2d array
The 2d array containing the reconstruction of input_data.
"""
signal_length = len(components[0].iq)
number_of_signals = len(components[0].weights)
data_reconstruction = np.zeros((signal_length, number_of_signals))
for signal in range(number_of_signals):
data_reconstruction[:, signal] = reconstruct_signal(components, signal)
return data_reconstruction