"""
This module contains functions for calculating the cross-correlation and time
lag between intensity cubes.
Useful for understanding time variability in EUV light curves.
"""
from typing import Optional
import astropy.units as u
import numpy as np
DASK_INSTALLED = False
try:
import dask.array # do this here so that Dask is not a hard requirement
DASK_INSTALLED = True
except ImportError:
pass
__all__ = [
"cross_correlation",
"get_lags",
"max_cross_correlation",
"time_lag",
]
[docs]
@u.quantity_input
def get_lags(time: u.s):
"""
Convert an array of evenly spaced times to an array of time lags evenly
spaced between ``-max(time)`` and ``max(time)``.
"""
delta_t = np.diff(time)
if not np.allclose(delta_t, delta_t[0]):
raise ValueError("Times must be evenly sampled")
delta_t = delta_t.cumsum(axis=0)
return np.hstack([-delta_t[::-1], np.array([0]), delta_t])
[docs]
@u.quantity_input
def cross_correlation(signal_a, signal_b, lags: u.s):
r"""
Compute cross-correlation between two signals, as a function of lag.
By the convolution theorem the cross-correlation between two signals
can be computed as,
.. math::
\mathcal{C}_{AB}(\tau) &= \mathcal{I}_A(t)\star\mathcal{I}_B(t) \\
&= \mathcal{I}_A(-t)\ast\mathcal{I}_B(t) \\
&= \mathscr{F}^{-1}\{\mathscr{F}\{\mathcal{I}_A(-t)\}\mathscr{F}\{\mathcal{I}_B(t)\}\}
where each signal has been centered and scaled by its mean and standard
deviation,
.. math::
\mathcal{I}_c(t)=\frac{I_c(t)-\bar{I}_c}{\sigma_{c}}
Additionally, :math:`\mathcal{C}_{AB}` is normalized by the length of
the time series.
Parameters
-----------
signal_a : array-like
The first dimension should correspond to the time dimension
and must have length ``(len(lags) + 1)/2``.
signal_b : array-like
Must have the same dimensions as ``signal_a``.
lags : `~astropy.units.Quantity`
Evenly spaced time lags corresponding to the time dimension of
``signal_a`` and ``signal_b`` running from ``-max(time)`` to
``max(time)``. This is easily constructed using :func:`get_lags`
Returns
-------
array-like
Cross-correlation as a function of ``lags``. The first dimension will be
the same as that of ``lags`` and the subsequent dimensions will be
consistent with dimensions of ``signal_a`` and ``signal_b``.
See Also
---------
get_lags
time_lag
max_cross_correlation
References
-----------
* https://en.wikipedia.org/wiki/Convolution_theorem
* Viall, N.M. and Klimchuk, J.A.
Evidence for Widespread Cooling in an Active Region Observed with the SDO Atmospheric Imaging Assembly
ApJ, 753, 35, 2012
(https://doi.org/10.1088/0004-637X/753/1/35)
* Appendix C in Barnes, W.T., Bradshaw, S.J., Viall, N.M.
Understanding Heating in Active Region Cores through Machine Learning. I. Numerical Modeling and Predicted Observables
ApJ, 880, 56, 2019
(https://doi.org/10.3847/1538-4357/ab290c)
"""
# NOTE: it is assumed that the arrays have already been appropriately
# interpolated and chunked (if using Dask)
delta_lags = np.diff(lags)
if not u.allclose(delta_lags, delta_lags[0]):
raise ValueError("Lags must be evenly sampled")
n_time = (lags.shape[0] + 1) // 2
if signal_a.shape != signal_b.shape:
raise ValueError("Signals must have same shape.")
if signal_a.shape[0] != n_time:
raise ValueError("First dimension of signal must be equal in length to time array.")
# Reverse the first timeseries
signal_a = signal_a[::-1]
# Normalize by mean and standard deviation
fill_value = signal_a.max()
std_a = signal_a.std(axis=0)
# Avoid dividing by zero by replacing with some non-zero dummy value. Note that
# what this value is does not matter as it will be multiplied by zero anyway
# since std_dev == 0 any place that signal - signal_mean == 0. We use the max
# of the signal as the fill_value in order to support Quantities.
std_a = np.where(std_a == 0, fill_value, std_a)
v_a = (signal_a - signal_a.mean(axis=0)[np.newaxis]) / std_a[np.newaxis]
std_b = signal_b.std(axis=0)
std_b = np.where(std_b == 0, fill_value, std_b)
v_b = (signal_b - signal_b.mean(axis=0)[np.newaxis]) / std_b[np.newaxis]
# Cross-correlation is inverse of product of FFTS (by convolution theorem)
fft_a = np.fft.rfft(v_a, axis=0, n=lags.shape[0])
fft_b = np.fft.rfft(v_b, axis=0, n=lags.shape[0])
cc = np.fft.irfft(fft_a * fft_b, axis=0, n=lags.shape[0])
# Normalize by the length of the timeseries
return cc / signal_a.shape[0]
def _get_bounds_indices(lags, bounds):
# The start and stop indices are computed in this way
# because Dask does not like "fancy" multidimensional indexing
start = 0
stop = lags.shape[0] + 1
if bounds is not None:
(indices,) = np.where(np.logical_and(lags >= bounds[0], lags <= bounds[1]))
start = indices[0]
stop = indices[-1] + 1
return start, stop
def _dask_check(lags, indices):
# In order for the time lag to be returned as a Dask array, the lags array,
# which is, in general, a Quantity, must also be a Dask array.
# This function is needed for two reasons:
# 1. astropy.units.Quantity do not play nice with each other and their behavior
# seems to vary from one numpy version to the next. To avoid this ill-defined
# behavior, we will do all of our Dask-ing on a Dask array created from the
# bare numpy array and re-attach the units at the end.
# 2. Dask arrays do not like "fancy" multidimensional indexing. Therefore, we must
# flatten the indices first and then reshape the time lag array in order to
# preserve the laziness of the array evaluation.
if DASK_INSTALLED and isinstance(indices, dask.array.Array):
lags_lazy = dask.array.from_array(lags.value, chunks=lags.shape)
lags_unit = lags.unit
return lags_lazy[indices.flatten()].reshape(indices.shape) * lags_unit
return lags[indices]
[docs]
@u.quantity_input
def time_lag(signal_a, signal_b, time: u.s, lag_bounds: Optional[u.Quantity[u.s]] = None, **kwargs):
r"""
Compute the time lag that maximizes the cross-correlation between
``signal_a`` and ``signal_b``.
For a pair of signals :math:`A,B`, e.g. time series from two EUV channels
on AIA, the time lag is the lag which maximizes the cross-correlation,
.. math::
\tau_{AB} = \mathop{\mathrm{arg\,max}}_{\tau}\mathcal{C}_{AB},
where :math:`\mathcal{C}_{AB}` is the cross-correlation as a function of
lag (computed in :func:`cross_correlation`). Qualitatively, this can be
thought of as how much `signal_a` needs to be shifted in time to best
"match" `signal_b`. Note that the sign of :math:`\tau_{AB}`` is determined
by the ordering of the two signals such that,
.. math::
\tau_{AB} = -\tau_{BA}.
Parameters
----------
signal_a : array-like
The first dimension must be the same length as ``time``.
signal_b : array-like
Must have the same dimensions as ``signal_a``.
time : `~astropy.units.Quantity`
Time array corresponding to the intensity time series
``signal_a`` and ``signal_b``.
lag_bounds : `~astropy.units.Quantity`, optional
Minimum and maximum lag to consider when finding the time
lag that maximizes the cross-correlation. This is useful
for minimizing boundary effects.
Other Parameters
----------------
array_check_hook : function
Function to apply to the resulting time lag result. This should take in the
`lags` array and the indices that specify the location of the maximum of the
cross-correlation and return an array that has used those indices to select
the `lags` which maximize the cross-correlation. As an example, if `lags`
and `indices` are both `~numpy.ndarray` objects, this would just return
`lags[indices]`. It is probably only necessary to specify this if you
are working with arrays that are something other than a `~numpy.ndarray`
or `~dask.array.Array` object.
Returns
-------
array-like
Lag which maximizes the cross-correlation. The dimensions will be
consistent with those of ``signal_a`` and ``signal_b``, i.e. if the
input arrays are of dimension ``(K,M,N)``, the resulting array
will have dimensions ``(M,N)``. Similarly, if the input signals
are one-dimensional time series ``(K,)``, the result will have
dimension ``(1,)``.
References
----------
* Viall, N.M. and Klimchuk, J.A.
Evidence for Widespread Cooling in an Active Region Observed with the SDO Atmospheric Imaging Assembly
ApJ, 753, 35, 2012
(https://doi.org/10.1088/0004-637X/753/1/35)
"""
array_check = kwargs.get("array_check_hook", _dask_check)
lags = get_lags(time)
cc = cross_correlation(signal_a, signal_b, lags)
start, stop = _get_bounds_indices(lags, lag_bounds)
i_max_cc = cc[start:stop].argmax(axis=0)
return array_check(lags[start:stop], i_max_cc)
[docs]
@u.quantity_input
def max_cross_correlation(signal_a, signal_b, time: u.s, lag_bounds: Optional[u.Quantity[u.s]] = None):
"""
Compute the maximum value of the cross-correlation between ``signal_a`` and
``signal_b``.
This is the maximum value of the cross-correlation as a function of
lag (computed in :func:`cross_correlation`). This will always be between
-1 (perfectly anti-correlated) and +1 (perfectly correlated) though
in practice is nearly always between 0 and +1.
Parameters
----------
signal_a : array-like
The first dimension must be the same length as ``time``.
signal_b : array-like
Must have the same dimensions as ``signal_a``.
time : array-like
Time array corresponding to the intensity time series
``signal_a`` and ``signal_b``.
lag_bounds : `tuple`, optional
Minimum and maximum lag to consider when finding the time
lag that maximizes the cross-correlation. This is useful
for minimizing boundary effects.
Returns
-------
array-like
Maximum value of the cross-correlation. The dimensions will be
consistent with those of ``signal_a`` and ``signal_b``, i.e. if the
input arrays are of dimension ``(K,M,N)``, the resulting array
will have dimensions ``(M,N)``. Similarly, if the input signals
are one-dimensional time series ``(K,)``, the result will have
dimension ``(1,)``.
References
----------
* Viall, N.M. and Klimchuk, J.A.
Evidence for Widespread Cooling in an Active Region Observed with the SDO Atmospheric Imaging Assembly
ApJ, 753, 35, 2012
(https://doi.org/10.1088/0004-637X/753/1/35)
"""
lags = get_lags(time)
cc = cross_correlation(signal_a, signal_b, lags)
start, stop = _get_bounds_indices(lags, lag_bounds)
return cc[start:stop].max(axis=0)