Source code for cytoflow.utility.algorithms

#!/usr/bin/env python3.4
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2021
#
# This program is free software: you can redistribute it and/or modify
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

'''
cytoflow.utility.algorithms
---------------------------

Useful algorithms.
'''

import numpy as np
from scipy import stats

[docs]def ci(data, func, which=95, boots=1000):
"""
Determine the confidence interval of a function applied to a data set by
bootstrapping.

Parameters
----------
data : pandas.DataFrame
The data to resample.

func : callable
A function that is called on a resampled data

which : int
The percentile to use for the confidence interval

boots : int (default = 1000):
How many times to bootstrap

Returns
-------
(float, float)
The confidence interval.

"""
boots = bootstrap(data, func = func, n_boot = boots)
p = 50 - which / 2, 50 + which / 2
return tuple(percentiles(boots, p))

[docs]def percentiles(a, pcts, axis=None):
"""
Like scoreatpercentile but can take and return array of percentiles.

Parameters
----------
a : array
data

pcts : sequence of percentile values
percentile or percentiles to find score at

axis : int or None
if not None, computes scores over this axis

Returns
-------
scores: array
array of scores at requested percentiles
first dimension is length of object passed to pcts

"""
scores = []
try:
n = len(pcts)
except TypeError:
pcts = [pcts]
n = 0
for p in pcts:
if axis is None:
score = stats.scoreatpercentile(a.ravel(), p)
else:
score = np.apply_along_axis(stats.scoreatpercentile, axis, a, p)
scores.append(score)
scores = np.asarray(scores)
if not n:
scores = scores.squeeze()
return scores

[docs]def bootstrap(*args, **kwargs):
"""
Resample one or more arrays with replacement and store aggregate values.
Positional arguments are a sequence of arrays to bootstrap along the first
axis and pass to a summary function.

Parameters
----------
n_boot : int, default 10000
Number of iterations

axis : int, default None
Will pass axis to func as a keyword argument.

units : array, default None
Array of sampling unit IDs. When used the bootstrap resamples units
and then observations within units instead of individual
datapoints.

smooth : bool, default False
If True, performs a smoothed bootstrap (draws samples from a kernel
destiny estimate); only works for one-dimensional inputs and cannot
be used units is present.

func : callable, default np.mean
Function to call on the args that are passed in.

random_seed : int | None, default None
Seed for the random number generator; useful if you want
reproducible resamples.

Returns
-------
array
array of bootstrapped statistic values

"""
# Ensure list of arrays are same length
if len(np.unique(list(map(len, args)))) > 1:
raise ValueError("All input arrays must have the same length")
n = len(args[0])

# Default keyword arguments
n_boot = kwargs.get("n_boot", 10000)
func = kwargs.get("func", np.mean)
axis = kwargs.get("axis", None)
units = kwargs.get("units", None)
smooth = kwargs.get("smooth", False)
random_seed = kwargs.get("random_seed", None)
if axis is None:
func_kwargs = dict()
else:
func_kwargs = dict(axis=axis)

# Initialize the resampler
rs = np.random.RandomState(random_seed)

# Coerce to arrays
args = list(map(np.asarray, args))
if units is not None:
units = np.asarray(units)

# Do the bootstrap
if smooth:
return _smooth_bootstrap(args, n_boot, func, func_kwargs)

if units is not None:
return _structured_bootstrap(args, n_boot, units, func,
func_kwargs, rs)

boot_dist = []
for i in range(int(n_boot)):
resampler = rs.randint(0, n, n)
sample = [a.take(resampler, axis=0) for a in args]
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)

def _structured_bootstrap(args, n_boot, units, func, func_kwargs, rs):
unique_units = np.unique(units)
n_units = len(unique_units)

args = [[a[units == unit] for unit in unique_units] for a in args]

boot_dist = []
for i in range(int(n_boot)):
resampler = rs.randint(0, n_units, n_units)
sample = [np.take(a, resampler, axis=0) for a in args]
lengths = list(map(len, sample[0]))
resampler = [rs.randint(0, n, n) for n in lengths]
sample = [[c.take(r, axis=0) for c, r in zip(a, resampler)]
for a in sample]
sample = list(map(np.concatenate, sample))
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)

def _smooth_bootstrap(args, n_boot, func, func_kwargs):
"""Bootstrap by resampling from a kernel density estimate."""
n = len(args[0])
boot_dist = []
kde = [stats.gaussian_kde(np.transpose(a)) for a in args]
for i in range(int(n_boot)):
sample = [a.resample(n).T for a in kde]
boot_dist.append(func(*sample, **func_kwargs))
return np.array(boot_dist)