#!/usr/bin/env python
# @Author: Kelvin
# @Date: 2022-07-18 11:33:46
# @Last Modified by: Kelvin
# @Last Modified time: 2022-11-17 16:09:57
"""Miscellaneous single-cell functions."""
import functools
import math
import scipy.sparse
import numpy as np
import scanpy as sc
import pandas as pd
from anndata import AnnData
from pandas import DataFrame
from scanpy.pl._dotplot import DotPlot
from typing import List, Union
[docs]
def exportDEres(
adata: AnnData,
column: str = None,
filename: str = None,
remove_mito_ribo: bool = True,
key: str = "rank_genes_groups",
) -> DataFrame:
"""
Export DE results from scanpy.
Parameters
----------
adata : AnnData
AnnData object with `sc.tl.rank_genes_groups` performed.
column : Optional[str], optional
specific contrast to return.
filename : Optional[str], optional
if provided, save as file. Otherwise, return as DataFrame.
remove_mito_ribo : bool, optional
whether to filter all mito and ribo genes in the output.
key : str, optional
name in `.uns` to retrieve DE results.
Returns
-------
DataFrame
`DataFrame` of DE results.
"""
if column is None:
column = list(adata.uns[key]["scores"].dtype.fields.keys())[0]
else:
column = column
if filename is not None:
df_final = returnDEres(
adata,
column=column,
remove_mito_ribo=remove_mito_ribo,
key=key,
)
df_final.to_csv(filename, sep="\t")
else:
df = returnDEres(
adata,
column=column,
remove_mito_ribo=remove_mito_ribo,
key=key,
)
return df
[docs]
def returnDEres(
adata: AnnData,
column: str = None,
remove_mito_ribo: bool = True,
key: str = "rank_genes_groups",
) -> DataFrame:
"""Summary
Parameters
----------
adata : AnnData
AnnData object with `sc.tl.rank_genes_groups` performed.
column : Optional[str], optional
specific contrast to return.
remove_mito_ribo : bool, optional
whether to filter all mito and ribo genes in the output.
key : str, optional
name in `.uns` to retrieve DE results.
Returns
-------
DataFrame
`DataFrame` of DE results.
"""
if key is None:
key = "rank_genes_groups"
else:
key = key
if column is None:
column = list(adata.uns[key]["scores"].dtype.fields.keys())[0]
else:
column = column
reference = adata.uns["rank_genes_groups"]["params"]["reference"]
scores = DataFrame(
data=adata.uns[key]["scores"][column], index=adata.uns[key]["names"][column]
)
lfc = DataFrame(
data=adata.uns[key]["logfoldchanges"][column],
index=adata.uns[key]["names"][column],
)
pvals = DataFrame(
data=adata.uns[key]["pvals"][column], index=adata.uns[key]["names"][column]
)
padj = DataFrame(
data=adata.uns[key]["pvals_adj"][column], index=adata.uns[key]["names"][column]
)
try:
pts = DataFrame(
data=adata.uns[key]["pts"][column], index=adata.uns[key]["names"][column]
)
ptsx = DataFrame(
data=adata.uns[key]["pts_" + reference][column],
index=adata.uns[key]["names"][column],
)
except:
pass
scores = scores.loc[scores.index.dropna()]
lfc = lfc.loc[lfc.index.dropna()]
pvals = pvals.loc[pvals.index.dropna()]
padj = padj.loc[padj.index.dropna()]
try:
pts = pts.loc[pts.index.dropna()]
ptsx = ptsx.loc[ptsx.index.dropna()]
except:
pass
try:
dfs = [scores, lfc, pvals, padj, pts, ptsx]
except:
dfs = [scores, lfc, pvals, padj]
try:
df_final = functools.reduce(
lambda left, right: pd.merge(
left, right, left_index=True, right_index=True
),
dfs,
)
except:
df_final = pd.concat(dfs, axis=1)
try:
df_final.columns = [
"scores",
"logfoldchanges",
"pvals",
"pvals_adj",
"pts" + "_" + column,
"pts_" + reference,
]
except:
df_final.columns = ["scores", "logfoldchanges", "pvals", "pvals_adj"]
if remove_mito_ribo:
df_final = df_final[
~df_final.index.isin(
list(df_final.filter(regex="^RPL|^RPS|^MRPS|^MRPL|^MT-", axis=0).index)
)
]
df_final = df_final[
~df_final.index.isin(
list(df_final.filter(regex="^Rpl|^Rps|^Mrps|^Mrpl|^mt-", axis=0).index)
)
]
return df_final
[docs]
def vmax(adata: AnnData, genes: Union[List, str], pct: float) -> List:
"""
Extract the maximum expression value from list of genes in `AnnData` at the specified `pct`.
Parameters
----------
adata : AnnData
input `AnnData` object.
genes : Union[List, str]
gene(s) to query from `AnnData` object.
pct : float
percentage to cut-off and return.
Returns
-------
List
List of maximum values.
"""
if type(genes) is not list:
genes = [genes]
vm = []
for g in genes:
try:
idx = adata.raw.var.index.get_loc(g)
except:
idx = adata.var.index.get_loc(g)
vm.append(
math.ceil(np.quantile(adata.raw.X[:, idx].toarray(), pct) * 100.0) / 100.0
)
return vm
[docs]
def vmin(adata: AnnData, genes: Union[List, str], pct: float) -> List:
"""
Extract the minimum expression value from list of genes in `AnnData` at the specified `pct`.
Parameters
----------
adata : AnnData
input `AnnData` object.
genes : Union[List, str]
gene(s) to query from `AnnData` object.
pct : float
percentage to cut-off and return.
Returns
-------
List
List of minimum values.
"""
if type(genes) is not list:
genes = [genes]
vm = []
for g in genes:
try:
idx = adata.raw.var.index.get_loc(g)
except:
idx = adata.var.index.get_loc(g)
vm.append(
math.ceil(np.quantile(adata.raw.X[:, idx].toarray(), 1 - pct) * 100.0)
/ 100.0
)
return vm
[docs]
def cell_cycle_scoring(adata: AnnData, human: bool = False):
"""
Run cell cycle scoring on `AnnData` object.
Parameters
----------
adata : AnnData
input `AnnData` object.
human : bool, optional
whether the data is human or not (mouse).
"""
# cell cycle scoring
adata_cc = adata.copy()
if adata_cc.raw is not None:
adata_cc = adata_cc.raw.to_adata()
if float(np.max(adata_cc.X)).is_integer():
# raw integer counts
sc.pp.normalize_total(adata_cc, target_sum=1e4)
sc.pp.log1p(adata_cc)
sc.pp.scale(adata_cc)
elif np.min(adata_cc.X) == 0:
if "log1p" not in adata_cc.uns:
sc.pp.log1p(adata_cc)
# not scaled
sc.pp.scale(adata_cc)
else:
raise ValueError("Please provide either raw integer or normalised data.")
if not human:
s_genes = [
"Mcm5",
"Pcna",
"Tyms",
"Fen1",
"Mcm2",
"Mcm4",
"Rrm1",
"Ung",
"Gins2",
"Mcm6",
"Cdca7",
"Dtl",
"Prim1",
"Uhrf1",
"Hells",
"Rfc2",
"Rpa2",
"Nasp",
"Rad51ap1",
"Gmnn",
"Wdr76",
"Slbp",
"Ccne2",
"Msh2",
"Rad51",
"Rrm2",
"Cdc45",
"Cdc6",
"Exo1",
"Tipin",
"Dscc1",
"Blm",
"Casp8ap2",
"Usp1",
"Clspn",
"Pola1",
"Chaf1b",
"Brip1",
"E2f8",
]
g2m_genes = [
"Hmgb2",
"Cdk1",
"Nusap1",
"Ube2c",
"Birc5",
"Tpx2",
"Top2a",
"Ndc80",
"Cks2",
"Nuf2",
"Cks1b",
"Mki67",
"Tmpo",
"Cenpf",
"Tacc3",
"Smc4",
"Ccnb2",
"Ckap2l",
"Ckap2",
"Aurkb",
"Bub1",
"Kif11",
"Anp32e",
"Tubb4b",
"Gtse1",
"Kif20b",
"Hjurp",
"Cdca3",
"Cdc20",
"Ttk",
"Cdc25c",
"Kif2c",
"Rangap1",
"Ncapd2",
"Dlgap5",
"Cdca2",
"Cdca8",
"Ect2",
"Kif23",
"Hmmr",
"Aurka",
"Psrc1",
"Anln",
"Lbr",
"Ckap5",
"Cenpe",
"Ctcf",
"Nek2",
"G2e3",
"Gas2l3",
"Cbx5",
"Cenpa",
]
else:
s_genes = [
"MCM5",
"PCNA",
"TYMS",
"FEN1",
"MCM2",
"MCM4",
"RRM1",
"UNG",
"GINS2",
"MCM6",
"CDCA7",
"DTL",
"PRIM1",
"UHRF1",
"MLF1IP",
"HELLS",
"RFC2",
"RPA2",
"NASP",
"RAD51AP1",
"GMNN",
"WDR76",
"SLBP",
"CCNE2",
"UBR7",
"POLD3",
"MSH2",
"ATAD2",
"RAD51",
"RRM2",
"CDC45",
"CDC6",
"EXO1",
"TIPIN",
"DSCC1",
"BLM",
"CASP8AP2",
"USP1",
"CLSPN",
"POLA1",
"CHAF1B",
"BRIP1",
"E2F8",
]
g2m_genes = [
"HMGB2",
"CDK1",
"NUSAP1",
"UBE2C",
"BIRC5",
"TPX2",
"TOP2A",
"NDC80",
"CKS2",
"NUF2",
"CKS1B",
"MKI67",
"TMPO",
"CENPF",
"TACC3",
"FAM64A",
"SMC4",
"CCNB2",
"CKAP2L",
"CKAP2",
"AURKB",
"BUB1",
"KIF11",
"ANP32E",
"TUBB4B",
"GTSE1",
"KIF20B",
"HJURP",
"CDCA3",
"HN1",
"CDC20",
"TTK",
"CDC25C",
"KIF2C",
"RANGAP1",
"NCAPD2",
"DLGAP5",
"CDCA2",
"CDCA8",
"ECT2",
"KIF23",
"HMMR",
"AURKA",
"PSRC1",
"ANLN",
"LBR",
"CKAP5",
"CENPE",
"CTCF",
"NEK2",
"G2E3",
"GAS2L3",
"CBX5",
"CENPA",
]
sc.tl.score_genes_cell_cycle(
adata_cc, s_genes=s_genes, g2m_genes=g2m_genes, use_raw=False
)
for x in ["S_score", "G2M_score", "phase"]:
adata.obs[x] = adata_cc.obs[x]
[docs]
def combine_two_categories(adata: AnnData, A: str, B: str, sep: str = "_") -> None:
"""Combine two categories in place, respecting the order of the concatenation.
Parameters
----------
adata : AnnData
Input anndata object.
A : str
Column name for first category.
B : str
Column name for second category.
sep : str, optional
The separator to combine the names.
"""
comb_cat = A + sep + B
adata.obs[comb_cat] = [a + "_" + b for a, b in zip(adata.obs[A], adata.obs[B])]
adata.obs[A] = adata.obs[A].astype("category")
adata.obs[B] = adata.obs[B].astype("category")
a_cat = adata.obs[A].cat.categories
b_cat = adata.obs[B].cat.categories
cats = []
for a in a_cat:
for b in b_cat:
cats.append(a + "_" + b)
adata.obs[comb_cat] = adata.obs[comb_cat].astype("category")
adata.obs[comb_cat] = adata.obs[comb_cat].cat.reorder_categories(
[c for c in cats if c in adata.obs[comb_cat].cat.categories]
)
[docs]
def dotplot_2obs(
adata,
gene,
x_axis,
y_axis,
x_order=None,
y_order=None,
use_raw=True,
fill_na=True,
show_plot=True,
**kwargs
):
"""
A function that extracts the expression for a provided gene,
and subsequently prepares a dotplot where the two axes are
two categoricals from the obs of the object.
Can be controlled between .raw and .X via use_raw (defaults to raw)
By default fills nonexistent obs combinations as zeroes in the plot
(as otherwise the plot can error in weird ways sometimes).
Thanks kp9!
"""
# prepare dfs with dot sizes and colours
# start off by making a helper df with all the info
obs_tidy_master = adata.obs[[y_axis, x_axis]].set_index(y_axis)
# add gene expression
if use_raw:
if scipy.sparse.issparse(adata.raw.X):
obs_tidy_master[gene] = adata.raw[:, gene].X.todense()
else:
obs_tidy_master[gene] = np.array(adata.raw[:, gene].X)
else:
if scipy.sparse.issparse(adata.X):
obs_tidy_master[gene] = adata[:, gene].X.todense()
else:
obs_tidy_master[gene] = np.array(adata[:, gene].X)
# this is where our computed values will go
dot_size_df = pd.DataFrame(
0,
index=np.unique(obs_tidy_master.index),
columns=np.unique(obs_tidy_master[x_axis]),
)
# reorder the groups if specified
if x_order is not None:
dot_size_df = dot_size_df[x_order]
if y_order is not None:
dot_size_df = dot_size_df.loc[y_order, :]
dot_color_df = dot_size_df.copy()
# compute the sizes and colors, replicating the logic found within DotPlot()
for cat in dot_size_df.columns:
obs_tidy = obs_tidy_master.loc[obs_tidy_master[x_axis] == cat, gene]
obs_bool = obs_tidy > 0
dot_color_df[cat] = obs_tidy.groupby(level=0).mean().fillna(0)
dot_size_df[cat] = (
obs_bool.groupby(level=0).sum() / obs_bool.groupby(level=0).count()
)
# some combinations may be absent, and there will be NaNs there, and this sometimes kills the plot
# fill with zeroes if specified
if fill_na:
dot_color_df.fillna(0, inplace=True)
dot_size_df.fillna(0, inplace=True)
# in order to actually plot this, we need to make a dummy anndata object
# just how DotPlot() is wired internally
bdata = AnnData(np.zeros(dot_size_df.shape))
bdata.var_names = dot_size_df.columns
# this needs to be turned to a list or it whines about a conflict later
bdata.obs_names = list(dot_size_df.index)
# the grouping variable needs to be present in the dummy object
# its existence is checked, it actually does nothing when we insert the dot sizes
bdata.obs[y_axis] = dot_size_df.index
# actually make the dotplot, with the best colour scheme :P
dp = DotPlot(
bdata,
dot_size_df.columns,
y_axis,
dot_size_df=dot_size_df,
dot_color_df=dot_color_df,
title=gene,
**kwargs,
)
if show_plot:
dp.show()
return dp