# Source code for cytoflow.utility.algorithms

```#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflow.utility.algorithms
---------------------------

Useful algorithms.

`ci` -- determine a confidence interval by boostrapping.

`percentiles` -- find percentiles in an array.

`bootstrap` -- resample (with replacement) and store aggregate values.
"""

import numpy as np
from scipy import stats

[docs]def ci(data, func, which=95, boots=1000):
"""
Determine the confidence interval of a function applied to a data set by
bootstrapping.

Parameters
----------
data : pandas.DataFrame
The data to resample.

func : callable
A function that is called on a resampled ``data``

which : int
The percentile to use for the confidence interval

boots : int (default = 1000):
How many times to bootstrap

Returns
-------
(float, float)
The confidence interval.

"""
boots = bootstrap(data, func = func, n_boot = boots)
p = 50 - which / 2, 50 + which / 2
return tuple(percentiles(boots, p))

[docs]def percentiles(a, pcts, axis=None):
"""
Like `scipy.stats.scoreatpercentile` but can take and return array of percentiles.

Parameters
----------
a : array
data

pcts : sequence of percentile values
percentile or percentiles to find score at

axis : int or None
if not None, computes scores over this axis

Returns
-------
scores: array
array of scores at requested percentiles
first dimension is length of object passed to ``pcts``

"""
scores = []
try:
n = len(pcts)
except TypeError:
pcts = [pcts]
n = 0
for p in pcts:
if axis is None:
score = stats.scoreatpercentile(a.ravel(), p)
else:
score = np.apply_along_axis(stats.scoreatpercentile, axis, a, p)
scores.append(score)
scores = np.asarray(scores)
if not n:
scores = scores.squeeze()
return scores

[docs]def bootstrap(*args, **kwargs):
"""
Resample one or more arrays with replacement and store aggregate values.
Positional arguments are a sequence of arrays to bootstrap along the first
axis and pass to a summary function.

Parameters
----------
n_boot : int, default 10000
Number of iterations

axis : int, default None
Will pass axis to ``func`` as a keyword argument.

units : array, default None
Array of sampling unit IDs. When used the bootstrap resamples units
and then observations within units instead of individual
datapoints.

smooth : bool, default False
If True, performs a smoothed bootstrap (draws samples from a kernel
destiny estimate); only works for one-dimensional inputs and cannot
be used `units` is present.

func : callable, default np.mean
Function to call on the args that are passed in.

random_seed : int | None, default None
Seed for the random number generator; useful if you want
reproducible resamples.

Returns
-------
array
array of bootstrapped statistic values

"""
# Ensure list of arrays are same length
if len(np.unique(list(map(len, args)))) > 1:
raise ValueError("All input arrays must have the same length")
n = len(args[0])

# Default keyword arguments
n_boot = kwargs.get("n_boot", 10000)
func = kwargs.get("func", np.mean)
axis = kwargs.get("axis", None)
units = kwargs.get("units", None)
smooth = kwargs.get("smooth", False)
random_seed = kwargs.get("random_seed", None)
if axis is None:
func_kwargs = dict()
else:
func_kwargs = dict(axis=axis)

# Initialize the resampler
rs = np.random.RandomState(random_seed)

# Coerce to arrays
args = list(map(np.asarray, args))
if units is not None:
units = np.asarray(units)

# Do the bootstrap
if smooth:
return _smooth_bootstrap(args, n_boot, func, func_kwargs)

if units is not None:
return _structured_bootstrap(args, n_boot, units, func,
func_kwargs, rs)

boot_dist = []
for _ in range(int(n_boot)):
resampler = rs.randint(0, n, n)
sample = [a.take(resampler, axis=0) for a in args]
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)

def _structured_bootstrap(args, n_boot, units, func, func_kwargs, rs):
"""Resample units instead of datapoints."""
unique_units = np.unique(units)
n_units = len(unique_units)

args = [[a[units == unit] for unit in unique_units] for a in args]

boot_dist = []
for _ in range(int(n_boot)):
resampler = rs.randint(0, n_units, n_units)
sample = [np.take(a, resampler, axis=0) for a in args]
lengths = list(map(len, sample[0]))
resampler = [rs.randint(0, n, n) for n in lengths]
sample = [[c.take(r, axis=0) for c, r in zip(a, resampler)]
for a in sample]
sample = list(map(np.concatenate, sample))
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)

def _smooth_bootstrap(args, n_boot, func, func_kwargs):
"""Bootstrap by resampling from a kernel density estimate."""
n = len(args[0])
boot_dist = []
kde = [stats.gaussian_kde(np.transpose(a)) for a in args]
for _ in range(int(n_boot)):
sample = [a.resample(n).T for a in kde]
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)

# from https://stackoverflow.com/questions/36399381/whats-the-fastest-way-of-checking-if-a-point-is-inside-a-polygon-in-python
from numba import jit, njit
import numba

[docs]@jit(nopython=True)
def is_inside_sm(polygon, point):
length = len(polygon)-1
dy2 = point[1] - polygon[0][1]
intersections = 0
ii = 0
jj = 1

while ii<length:
dy  = dy2
dy2 = point[1] - polygon[jj][1]

# consider only lines which are not completely above/bellow/right from the point
if dy*dy2 <= 0.0 and (point[0] >= polygon[ii][0] or point[0] >= polygon[jj][0]):

# non-horizontal line
if dy<0 or dy2<0:
F = dy*(polygon[jj][0] - polygon[ii][0])/(dy-dy2) + polygon[ii][0]

if point[0] > F: # if line is left from the point - the ray moving towards left, will intersect it
intersections += 1
elif point[0] == F: # point on line
return 2

# point on upper peak (dy2=dx2=0) or horizontal line (dy=dy2=0 and dx*dx2<=0)
elif dy2==0 and (point[0]==polygon[jj][0] or (dy==0 and (point[0]-polygon[ii][0])*(point[0]-polygon[jj][0])<=0)):
return 2

ii = jj
jj += 1

#print 'intersections =', intersections
return intersections & 1

[docs]@njit(parallel=True)
def polygon_contains(points, polygon):
ln = len(points)
D = np.empty(ln, dtype=numba.boolean)
for i in numba.prange(ln):
D[i] = is_inside_sm(polygon,points[i])
return D
```