import time
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from collections import defaultdict
from joblib import Parallel, delayed
import skmisc.loess as sl
from typing import List
from anndata import AnnData
import logging
logger = logging.getLogger("sccloud")
def estimate_feature_statistics(data: AnnData, consider_batch: bool) -> None:
""" Estimate feature (gene) statistics per channel, such as mean, var etc.
"""
assert issparse(data.X)
if consider_batch:
start = time.time()
if "Channels" not in data.uns:
data.uns["Channels"] = data.obs["Channel"].unique()
if "Group" not in data.obs:
data.obs["Group"] = "one_group"
if "Groups" not in data.uns:
data.uns["Groups"] = data.obs["Group"].unique()
channels = data.uns["Channels"]
groups = data.uns["Groups"]
ncells = np.zeros(channels.size)
means = np.zeros((data.shape[1], channels.size))
partial_sum = np.zeros((data.shape[1], channels.size))
group_dict = defaultdict(list)
for i, channel in enumerate(channels):
idx = np.isin(data.obs["Channel"], channel)
mat = data.X[idx].astype(np.float64)
ncells[i] = mat.shape[0]
if ncells[i] == 0:
continue
if ncells[i] == 1:
means[:, i] = mat.toarray()[0]
else:
means[:, i] = mat.mean(axis=0).A1
m2 = mat.power(2).sum(axis=0).A1
partial_sum[:, i] = m2 - ncells[i] * (means[:, i] ** 2)
group = data.obs["Group"][idx.nonzero()[0][0]]
group_dict[group].append(i)
partial_sum[partial_sum < 1e-6] = 0.0
overall_means = np.dot(means, ncells) / data.shape[0]
batch_adjusted_vars = np.zeros(data.shape[1])
c2gid = np.zeros(channels.size, dtype=int)
gncells = np.zeros(groups.size)
gmeans = np.zeros((data.shape[1], groups.size))
gstds = np.zeros((data.shape[1], groups.size))
for i, group in enumerate(groups):
gchannels = group_dict[group]
c2gid[gchannels] = i
gncells[i] = ncells[gchannels].sum()
gmeans[:, i] = np.dot(means[:, gchannels], ncells[gchannels]) / gncells[i]
gstds[:, i] = (
partial_sum[:, gchannels].sum(axis=1) / gncells[i]
) ** 0.5 # calculate std
if groups.size > 1:
batch_adjusted_vars += gncells[i] * (
(gmeans[:, i] - overall_means) ** 2
)
data.varm["means"] = means
data.varm["partial_sum"] = partial_sum
data.uns["ncells"] = ncells
data.varm["gmeans"] = gmeans
data.varm["gstds"] = gstds
data.uns["gncells"] = gncells
data.uns["c2gid"] = c2gid
data.var["mean"] = overall_means
data.var["var"] = (batch_adjusted_vars + partial_sum.sum(axis=1)) / (
data.shape[0] - 1.0
)
end = time.time()
logger.info(
"Estimation on feature statistics per channel is finished. Time spent = {:.2f}s.".format(
end - start
)
)
else:
mean = data.X.mean(axis=0).A1
m2 = data.X.power(2).sum(axis=0).A1
var = (m2 - data.X.shape[0] * (mean ** 2)) / (data.X.shape[0] - 1)
data.var["mean"] = mean
data.var["var"] = var
def select_hvf_scCloud(
data: AnnData, consider_batch: bool, n_top: int = 2000, span: float = 0.02
) -> None:
""" Select highly variable features using the sccloud method
"""
if "robust" not in data.var:
raise ValueError("Please run `qc_metrics` to identify robust genes")
estimate_feature_statistics(data, consider_batch)
robust_idx = data.var["robust"].values
hvf_index = np.zeros(robust_idx.sum(), dtype=bool)
mean = data.var.loc[robust_idx, "mean"]
var = data.var.loc[robust_idx, "var"]
lobj = sl.loess(mean, var, span=span, degree=2)
lobj.fit()
rank1 = np.zeros(hvf_index.size, dtype=int)
rank2 = np.zeros(hvf_index.size, dtype=int)
delta = var - lobj.outputs.fitted_values
fc = var / lobj.outputs.fitted_values
rank1[np.argsort(delta)[::-1]] = range(hvf_index.size)
rank2[np.argsort(fc)[::-1]] = range(hvf_index.size)
hvf_rank = rank1 + rank2
hvf_index[np.argsort(hvf_rank)[:n_top]] = True
data.var["hvf_loess"] = 0.0
data.var.loc[robust_idx, "hvf_loess"] = lobj.outputs.fitted_values
data.var["hvf_rank"] = -1
data.var.loc[robust_idx, "hvf_rank"] = hvf_rank
data.var["highly_variable_features"] = False
data.var.loc[robust_idx, "highly_variable_features"] = hvf_index
def select_hvf_seurat_single(
X: "csr_matrix",
n_top: int,
min_disp: float,
max_disp: float,
min_mean: float,
max_mean: float,
) -> List[int]:
""" HVF selection for one channel using Seurat method
"""
X = X.copy().expm1()
mean = X.mean(axis=0).A1
m2 = X.power(2).sum(axis=0).A1
var = (m2 - X.shape[0] * (mean ** 2)) / (X.shape[0] - 1)
dispersion = np.full(X.shape[1], np.nan)
idx_valid = (mean > 0.0) & (var > 0.0)
dispersion[idx_valid] = var[idx_valid] / mean[idx_valid]
mean = np.log1p(mean)
dispersion = np.log(dispersion)
df = pd.DataFrame({"log_dispersion": dispersion, "bin": pd.cut(mean, bins=20)})
log_disp_groups = df.groupby("bin")["log_dispersion"]
log_disp_mean = log_disp_groups.mean()
log_disp_std = log_disp_groups.std(ddof=1)
log_disp_zscore = (
df["log_dispersion"].values - log_disp_mean.loc[df["bin"]].values
) / log_disp_std.loc[df["bin"]].values
log_disp_zscore[np.isnan(log_disp_zscore)] = 0.0
hvf_rank = np.full(X.shape[1], -1, dtype=int)
ords = np.argsort(log_disp_zscore)[::-1]
if n_top is None:
hvf_rank[ords] = range(X.shape[1])
idx = np.logical_and.reduce(
(
mean > min_mean,
mean < max_mean,
log_disp_zscore > min_disp,
log_disp_zscore < max_disp,
)
)
hvf_rank[~idx] = -1
else:
hvf_rank[ords[:n_top]] = range(n_top)
return hvf_rank
def select_hvf_seurat_multi(
X: "csr_matrix",
channels: List[str],
cell2channel: List[str],
n_top: int,
n_jobs: int,
min_disp: float,
max_disp: float,
min_mean: float,
max_mean: float,
) -> List[int]:
Xs = []
for channel in channels:
Xs.append(X[np.isin(cell2channel, channel)])
from joblib import effective_n_jobs
n_jobs = effective_n_jobs(n_jobs)
res_arr = np.array(
Parallel(n_jobs=n_jobs)(
delayed(select_hvf_seurat_single)(
Xs[i], n_top, min_disp, max_disp, min_mean, max_mean
)
for i in range(channels.size)
)
)
selected = res_arr >= 0
shared = selected.sum(axis=0)
cands = (shared > 0).nonzero()[0]
import numpy.ma as ma
median_rank = ma.median(ma.masked_array(res_arr, mask=~selected), axis=0).data
cands = sorted(cands, key=lambda x: median_rank[x])
cands = sorted(cands, key=lambda x: shared[x], reverse=True)
hvf_rank = np.full(X.shape[1], -1, dtype=int)
hvf_rank[cands[:n_top]] = range(n_top)
return hvf_rank
def select_hvf_seurat(
data: AnnData,
consider_batch: bool,
n_top: int,
min_disp: float,
max_disp: float,
min_mean: float,
max_mean: float,
n_jobs: int,
) -> None:
""" Select highly variable features using Seurat method.
"""
robust_idx = data.var["robust"].values
X = data.X[:, robust_idx]
hvf_rank = (
select_hvf_seurat_multi(
X,
data.uns["Channels"],
data.obs["Channel"],
n_top,
n_jobs=n_jobs,
min_disp=min_disp,
max_disp=max_disp,
min_mean=min_mean,
max_mean=max_mean,
)
if consider_batch
else select_hvf_seurat_single(
X,
n_top=n_top,
min_disp=min_disp,
max_disp=max_disp,
min_mean=min_mean,
max_mean=max_mean,
)
)
hvf_index = hvf_rank >= 0
data.var["hvf_rank"] = -1
data.var.loc[robust_idx, "hvf_rank"] = hvf_rank
data.var["highly_variable_features"] = False
data.var.loc[robust_idx, "highly_variable_features"] = hvf_index
[docs]def highly_variable_features(
data: AnnData,
consider_batch: bool,
flavor: str = "sccloud",
n_top: int = 2000,
span: float = 0.02,
min_disp: float = 0.5,
max_disp: float = np.inf,
min_mean: float = 0.0125,
max_mean: float = 7,
n_jobs: int = -1,
) -> None:
""" Highly variable features (HVF) selection. The input data should be logarithmized.
Parameters
----------
data: ``anndata.AnnData``
Annotated data matrix with rows for cells and columns for genes.
consider_batch: ``bool``.
Whether consider batch effects or not.
flavor: ``str``, optional, default: ``"sccloud"``
The HVF selection method to use. Available choices are ``"sccloud"`` or ``"Seurat"``.
n_top: ``int``, optional, default: ``2000``
Number of genes to be selected as HVF. if ``None``, no gene will be selected.
span: ``float``, optional, default: ``0.02``
Only applicable when ``flavor`` is ``"sccloud"``. The smoothing factor used by *scikit-learn loess* model in sccloud HVF selection method.
min_disp: ``float``, optional, default: ``0.5``
Minimum normalized dispersion.
max_disp: ``float``, optional, default: ``np.inf``
Maximum normalized dispersion. Set it to ``np.inf`` for infinity bound.
min_mean: ``float``, optional, default: ``0.0125``
Minimum mean.
max_mean: ``float``, optional, default: ``7``
Maximum mean.
n_jobs: ``int``, optional, default: ``-1``
Number of threads to be used during calculation. If ``-1``, all available threads will be used.
Returns
-------
Examples
--------
>>> scc.highly_variable_features(adata, consider_batch = False)
"""
start = time.time()
if "Channels" not in data.uns:
if "Channel" not in data.obs:
data.obs["Channel"] = ""
data.uns["Channels"] = data.obs["Channel"].unique()
if data.uns["Channels"].size == 1 and consider_batch:
consider_batch = False
logger.warning(
"Warning: only contains one channel, no need to consider batch for selecting highly variable features."
)
if flavor == "sccloud":
select_hvf_scCloud(data, consider_batch, n_top=n_top, span=span)
else:
assert flavor == "Seurat"
select_hvf_seurat(
data,
consider_batch,
n_top=n_top,
min_disp=min_disp,
max_disp=max_disp,
min_mean=min_mean,
max_mean=max_mean,
n_jobs=n_jobs,
)
end = time.time()
logger.info(
"{tot} highly variable features have been selected. Time spent = {time:.2f}s.".format(
tot=data.var["highly_variable_features"].sum(), time=end - start
)
)