"""
This module contains an implementation of the Automated Swirl Detection
Algorithm (ASDA).
"""
import warnings
from itertools import product
import numpy as np
from skimage import measure
from sunkit_image.utils import calculate_gamma, points_in_poly, reform2d, remove_duplicate
__all__ = ["Asda", "Lamb_Oseen"]
[docs]
class Asda:
"""
Version 2.0 of Automated Swirl Detection Algorithm (ASDA).
References
----------
* `ASDA Source Code <https://github.com/PyDL/ASDA>`__.
"""
def __init__(self, vx, vy, r=3, factor=1):
"""
Parameters
----------
vx : `numpy.ndarray`
Velocity field in the x direction.
vy : `numpy.ndarray`
Velocity field in the y direction.
r : `int`, optional
Maximum distance of neighbor points from target point.
Default value is 3.
factor : `int`, optional
Magnify the original data to find sub-grid vortex center and boundary.
Default value is 1.
References
----------
* Laurent Graftieaux, Marc Michard and Nathalie Grosjean.
Combining PIV, POD and vortex identification algorithms for the
study of unsteady turbulent swirling flows.
Meas. Sci. Technol. 12, 1422, 2001.
(https://doi.org/10.1088/0957-0233/12/9/307)
* Jiajia Liu, Chris Nelson, Robert Erdelyi.
Automated Swirl Detection Algorithm (ASDA) and Its Application to
Simulation and Observational Data.
Astrophys. J., 872, 22, 2019.
(https://doi.org/10.3847/1538-4357/aabd34)
"""
if vx.shape != vy.shape:
raise ValueError("Shape of velocity field's vx and vy do not match")
if not isinstance(r, int):
raise ValueError("Keyword 'r' must be an integer")
if not isinstance(factor, int):
raise ValueError("Keyword 'factor' must be an integer")
self.vx = vx
self.vy = vy
self.dshape = np.shape(vx)
self.r = r
self.factor = factor
[docs]
def gen_vel(self, i, j):
"""
Given a point ``[i, j]``, generate a velocity field which contains a
region with a size of ``(2r+1) x (2r+1)`` centered at ``[i, j]`` from
the original velocity field ``self.vx`` and ``self.vy``.
Parameters
----------
i : `int`
first dimension of the pixel position of a target point.
j : `int`
second dimension of the pixel position of a target point.
Returns
-------
`numpy.ndarray`
The first dimension is a velocity field which contains a
region with a size of ``(2r+1) x (2r+1)`` centered at ``[i, j]`` from
the original velocity field ``self.vx`` and ``self.vy``.
the second dimension is similar as the first dimension, but
with the mean velocity field subtracted from the original
velocity field.
"""
vel = np.array(
[
[self.vx[i + im, j + jm], self.vy[i + im, j + jm]]
for im in np.arange(-self.r, self.r + 1)
for jm in np.arange(-self.r, self.r + 1)
],
)
return np.array([vel, vel - vel.mean(axis=0)])
[docs]
def gamma_values(self):
"""
Calculate ``gamma1`` and ``gamma2`` values of velocity field vx and vy.
Returns
-------
`tuple`
A tuple in form of ``(gamma1, gamma2)``, where ``gamma1`` is useful in
finding vortex centers and ``gamma2`` is useful in finding vortex
edges.
"""
# This part of the code was written in (x, y) order
# but numpy is in (y, x) order so we need to transpose it
self.vx = self.vx.T
self.vy = self.vy.T
if self.factor > 1:
self.vx = reform2d(self.vx, self.factor)
self.vy = reform2d(self.vy, self.factor)
self.gamma = np.array([np.zeros_like(self.vx), np.zeros_like(self.vy)]).T
# pm vectors, see equation (8) in Graftieaux et al. 2001 or Equation (1) in Liu et al. 2019
pm = np.array(
[[i, j] for i in np.arange(-self.r, self.r + 1) for j in np.arange(-self.r, self.r + 1)],
dtype=float,
)
# Mode of vector pm
pnorm = np.linalg.norm(pm, axis=1)
# Number of points in the concerned region
N = (2 * self.r + 1) ** 2
index = np.array(
[
[i, j]
for i in np.arange(self.r, self.dshape[0] - self.r)
for j in np.arange(self.r, self.dshape[1] - self.r)
],
)
index = index.T
vel = self.gen_vel(index[1], index[0])
for d, (i, j) in enumerate(
product(np.arange(self.r, self.dshape[0] - self.r, 1), np.arange(self.r, self.dshape[1] - self.r, 1)),
):
self.gamma[i, j, 0], self.gamma[i, j, 1] = calculate_gamma(pm, vel[..., d], pnorm, N)
# Transpose back vx & vy
self.vx = self.vx.T
self.vy = self.vy.T
return self.gamma
[docs]
def center_edge(self, rmin=4, gamma_min=0.89):
"""
Find all swirls from ``gamma1``, and ``gamma2``.
Parameters
----------
rmin : `int`, optional
Minimum radius of swirls, all swirls with radius less than ``rmin`` will be rejected.
Defaults to 4.
gamma_min : `float`, optional
Minimum value of ``gamma1``, all potential swirls with
peak ``gamma1`` values less than ``gamma_min`` will be rejected.
Returns
-------
`dict`
The keys and their meanings of the dictionary are:
``center`` : Center locations of vortices, in the form of ``[x, y]``.
``edge`` : Edge locations of vortices, in the form of ``[x, y]``.
``points`` : All points within vortices, in the form of ``[x, y]``.
``peak`` : Maximum/minimum gamma1 values in vortices.
``radius`` : Equivalent radius of vortices.
All results are in pixel coordinates.
"""
self.edge_prop = {"center": (), "edge": (), "points": (), "peak": (), "radius": ()}
cs = np.array(measure.find_contours(self.gamma[..., 1].T, -2 / np.pi), dtype=object)
cs_pos = np.array(measure.find_contours(self.gamma[..., 1].T, 2 / np.pi), dtype=object)
if len(cs) == 0:
cs = cs_pos
elif len(cs_pos) != 0:
cs = np.append(cs, cs_pos, 0)
for i in range(np.shape(cs)[0]):
v = np.rint(cs[i].astype(np.float32))
v = remove_duplicate(v)
# Find all points in the contour
ps = points_in_poly(v)
# gamma1 value of all points in the contour
dust = []
for p in ps:
dust.append(self.gamma[..., 0][int(p[1]), int(p[0])])
# Determine swirl properties
if len(dust) > 1:
# Effective radius
re = np.sqrt(np.array(ps).shape[0] / np.pi) / self.factor
# Only consider swirls with re >= rmin and maximum gamma1 value greater than gamma_min
if np.max(np.fabs(dust)) >= gamma_min and re >= rmin:
# Extract the index, only first dimension
idx = np.where(np.fabs(dust) == np.max(np.fabs(dust)))[0][0]
self.edge_prop["center"] += (np.array(ps[idx]) / self.factor,)
self.edge_prop["edge"] += (np.array(v) / self.factor,)
self.edge_prop["points"] += (np.array(ps) / self.factor,)
self.edge_prop["peak"] += (dust[idx],)
self.edge_prop["radius"] += (re,)
return self.edge_prop
[docs]
def vortex_property(self, image=None):
"""
Calculate expanding, rotational speed, equivalent radius and average
intensity of given swirls.
Parameters
----------
image : `numpy.ndarray`
Has to have the same shape as ``self.vx`` observational image,
which will be used to calculate the average observational values of all swirls.
Returns
-------
`tuple`
The returned tuple has four components, which are:
``ve`` : expanding speed, in the same unit as ``self.vx`` or ``self.vy``.
``vr`` : rotational speed, in the same unit as ``self.vx`` or ``self.vy``.
``vc`` : velocity of the center, in the form of ``[vx, vy]``.
``ia`` : average of the observational values within the vortices if the parameter image is given.
"""
ve, vr, vc, ia = (), (), (), ()
for i in range(len(self.edge_prop["center"])):
# Centre and edge of i-th swirl
cen = self.edge_prop["center"][i]
edg = self.edge_prop["edge"][i]
# Points of i-th swirl
pnt = np.array(self.edge_prop["points"][i], dtype=int)
# Calculate velocity of the center
vc += (
[
self.vx[int(round(cen[1])), int(round(cen[0]))],
self.vy[int(round(cen[1])), int(round(cen[0]))],
],
)
# Calculate average the observational values
if image is None:
ia += (None,)
else:
value = 0
for pos in pnt:
value += image[pos[1], pos[0]]
ia += (value / pnt.shape[0],)
ve0, vr0 = [], []
for j in range(edg.shape[0]):
# Edge position
idx = [edg[j][0], edg[j][1]]
# Eadial vector from swirl center to a point at its edge
pm = [idx[0] - cen[0], idx[1] - cen[1]]
# Tangential vector
tn = [cen[1] - idx[1], idx[0] - cen[0]]
# Velocity vector
v = [self.vx[int(idx[1]), int(idx[0])], self.vy[int(idx[1]), int(idx[0])]]
ve0.append(np.dot(v, pm) / np.linalg.norm(pm))
vr0.append(np.dot(v, tn) / np.linalg.norm(tn))
ve += (np.nanmean(ve0),)
vr += (np.nanmean(vr0),)
return ve, vr, vc, ia
[docs]
class Lamb_Oseen(Asda):
"""
Creates an Lamb Oseen vortex.
References
----------
* https://en.wikipedia.org/wiki/Lamb%E2%80%93Oseen_vortex
"""
def __init__(self, vmax=2.0, rmax=5, ratio_vradial=0, gamma=None, rcore=None, r=3, factor=1):
"""
Parameters
----------
vmax : `float`, optional
Rotating speed of the vortex, negative value for clockwise vortex.
Defaults to 2.0.
rmax : `float`, optional
Radius of of the vortex.
Defaults to 5.
ratio_vradial : `float`, optional
Ratio between expanding/shrinking speed and rotating speed.
Defaults to 0.
gamma : `float`, optional
A replacement for ``vmax`` and only used if both ``gamma`` and ``rcore``are not `None`.
Defaults to `None`.
rcore : `float`, optional
A replacement for ``rmax`` and only used if both ``gamma`` and ``rcore``are not `None`.
Defaults to `None`.
r : `int`, optional
Maximum distance of neighbor points from target point.
Default value is 3.
factor : `int`, optional
Magnify the original data to find sub-grid vortex center and boundary.
Default value is 1.
"""
if not isinstance(r, int):
raise ValueError("Keyword 'r' must be an integer")
if not isinstance(factor, int):
raise ValueError("Keyword 'factor' must be an integer")
# Alpha of Lamb Oseen vortices
self.alpha = 1.256430
self.ratio_vradial = ratio_vradial
if gamma is None or rcore is None:
if gamma != rcore:
warnings.warn("One of the input parameters is missing, setting both to 'None'", stacklevel=3)
gamma, rcore = None, None
# Radius of the position where v_theta reaches vmax
self.rmax = rmax
# Maximum value of v_theta
self.vmax = vmax
# Core radius
self.rcore = self.rmax / np.sqrt(self.alpha)
self.gamma = 2 * np.pi * self.vmax * self.rmax * (1 + 1 / (2 * self.alpha))
else:
# Radius
self.rmax = self.rcore * np.sqrt(self.alpha)
# Rotating speed
self.vmax = self.gamma / (2 * np.pi * self.rmax * (1 + 1 / (2 * self.alpha)))
# Core radius
self.rcore = rcore
self.gamma = gamma
# Calculating core speed
self.vcore = (1 - np.exp(-1.0)) * self.gamma / (2 * np.pi * self.rcore)
self.r = r
self.factor = factor
[docs]
def get_grid(self, x_range, y_range):
"""
Returns a meshgrid of the coordinates of the vortex.
Parameters
----------
x_range : `list`
Range of the x coordinates of the meshgrid.
y_range : `list`
Range of the y coordinates of the meshgrid.
Return
------
`tuple`
Contains the meshgrids generated.
"""
self.xx, self.yy = np.meshgrid(np.arange(x_range[0], x_range[1]), np.arange(y_range[0], y_range[1]))
self.dshape = np.shape(self.xx)
return self.xx, self.yy
[docs]
def get_vtheta(self, r=0):
"""
Calculate rotation speed at radius of ``r``.
Parameters
----------
r : `float`, optional
Radius which defaults to 0.
Return
------
`float`
Rotating speed at radius of ``r``.
"""
r = r + 1e-10
return self.gamma * (1.0 - np.exp(0 - np.square(r) / np.square(self.rcore))) / (2 * np.pi * r)
[docs]
def get_vradial(self, r=0):
"""
Calculate radial (expanding or shrinking) speed at radius of ``r``.
Parameters
----------
r : `float`, optional
Radius which defaults to 0.
Return
------
`float`
Radial speed at the radius of ``r``.
"""
r = r + 1e-10
return self.get_vtheta(r) * self.ratio_vradial
[docs]
def get_vxvy(self, x_range, y_range, x=None, y=None):
"""
Calculates the velocity field in a meshgrid generated with ``x_range``
and ``y_range``.
Parameters
----------
x_range : `list`
Range of the x coordinates of the meshgrid.
y_range : `list`
range of the y coordinates of the meshgrid.
x, y : `numpy.meshgrid`, optional
If both are given, ``x_range`` and ``y_range`` will be ignored.
Defaults to None``.
Return
------
`tuple`
The generated velocity field ``(vx, vy)``.
"""
if len(x_range) != 2:
self.x_range = [0 - self.rmax, self.rmax]
if len(y_range) != 2:
self.y_range = [0 - self.rmax, self.rmax]
if x is None or y is None:
# Check if one of the input parameters is None but the other one is not None
if x != y:
warnings.warn("One of the input parameters is missing, setting both to 'None'", stacklevel=3)
x, y = None, None
# Creating mesh grid
x, y = self.get_grid(x_range=x_range, y_range=y_range)
# Calculate radius
r = np.sqrt(np.square(x) + np.square(y)) + 1e-10
# Calculate velocity vector
vector = [
0 - self.get_vtheta(r) * y + self.get_vradial(r) * x,
self.get_vtheta(r) * x + self.get_vradial(r) * y,
]
self.vx = vector[0] / r
self.vy = vector[1] / r
return self.vx, self.vy