版本
導(dǎo)入Scanpy, 其版本為'1.9.1'豺妓,如果你看到的源碼和下文有差異砂代,其可能是由于版本差異蹋订。
import scanpy as sc
sc.__version__
#'1.9.1'
功能
函數(shù)pp.calculate_qc_metrics
其源代碼在scanpy/preprocessing/_qc.py
其主要功能為計(jì)算一些質(zhì)控指標(biāo)。詳細(xì)指標(biāo)見下文的小標(biāo)題
代碼解析
主要代碼
以下為calculate_qc_metrics
主要邏輯代碼刻伊,為方便理解主要邏輯露戒,其中刪除了一些即將廢棄的,異常處理捶箱,日志打印智什,稀疏矩陣處理等代碼。
[圖片上傳失敗...(image-defb81-1663405843670)]
其中質(zhì)控指標(biāo)計(jì)算由另外兩個(gè)函數(shù)完成丁屎,我們?cè)谙挛牧硗庹故舅鼈兊拇a荠锭。
代碼說明
代碼前幾行是函數(shù)的參數(shù)設(shè)置:
def calculate_qc_metrics(
adata: AnnData,
*,
expr_type: str = "counts",
var_type: str = "genes",
qc_vars: Collection[str] = (),
percent_top: Optional[Collection[int]] = (50, 100, 200, 500),
layer: Optional[str] = None,
use_raw: bool = False,
inplace: bool = False,
log1p: bool = True,
) -> Optional[Tuple[pd.DataFrame, pd.DataFrame]]:
adata, expr_type, ..., log1p是函數(shù)參數(shù), 冒號(hào)后面跟的是參數(shù)類型注解晨川,表明這個(gè)參數(shù)應(yīng)該傳遞什么類型的值給函數(shù)证九。
# def _choose_mtx_rep(adata, use_raw=False, layer=None):
# is_layer = layer is not None
# if use_raw and is_layer:
# raise ValueError(
# "Cannot use expression from both layer and raw. You provided:"
# f"'use_raw={use_raw}' and 'layer={layer}'"
# )
# if is_layer:
# return adata.layers[layer]
# elif use_raw:
# return adata.raw.X
# else:
# return adata.X
X = _choose_mtx_rep(adata, use_raw, layer)
該行代碼用到參數(shù)adata
, use_raw
, layer
, 根據(jù)參數(shù)設(shè)置來選擇對(duì)應(yīng)的數(shù)據(jù)。其中注釋部分是調(diào)用函數(shù)源代碼共虑。
obs_metrics = describe_obs(
adata,
expr_type=expr_type,
var_type=var_type,
qc_vars=qc_vars,
percent_top=percent_top,
inplace=inplace,
X=X,
log1p=log1p,
)
var_metrics = describe_var(
adata,
expr_type=expr_type,
var_type=var_type,
inplace=inplace,
X=X,
log1p=log1p,
)
if not inplace:
return obs_metrics, var_metrics
在上面的代碼中分別調(diào)用這兩個(gè)函數(shù)describe_obs
, describe_var
愧怜,來進(jìn)行對(duì)基因,細(xì)胞進(jìn)行質(zhì)控指標(biāo)的計(jì)算看蚜。
最后inplace=False 則直接返回兩個(gè)質(zhì)控指標(biāo)叫搁。
describe_obs
函數(shù)源碼
def describe_obs(
adata: AnnData,
*,
expr_type: str = "counts",
var_type: str = "genes",
qc_vars: Collection[str] = (),
percent_top: Optional[Collection[int]] = (50, 100, 200, 500),
layer: Optional[str] = None,
use_raw: bool = False,
log1p: Optional[str] = True,
inplace: bool = False,
X=None,
) -> Optional[pd.DataFrame]:
# Handle whether X is passed
if X is None:
X = _choose_mtx_rep(adata, use_raw, layer)
if isspmatrix_coo(X):
X = csr_matrix(X) # COO not subscriptable
if issparse(X):
X.eliminate_zeros()
obs_metrics = pd.DataFrame(index=adata.obs_names)
if issparse(X):
obs_metrics[f"n_{var_type}_by_{expr_type}"] = X.getnnz(axis=1)
else:
obs_metrics[f"n_{var_type}_by_{expr_type}"] = np.count_nonzero(X, axis=1)
if log1p:
obs_metrics[f"log1p_n_{var_type}_by_{expr_type}"] = np.log1p(
obs_metrics[f"n_{var_type}_by_{expr_type}"]
)
obs_metrics[f"total_{expr_type}"] = np.ravel(X.sum(axis=1))
if log1p:
obs_metrics[f"log1p_total_{expr_type}"] = np.log1p(
obs_metrics[f"total_{expr_type}"]
)
if percent_top:
percent_top = sorted(percent_top)
proportions = top_segment_proportions(X, percent_top)
for i, n in enumerate(percent_top):
obs_metrics[f"pct_{expr_type}_in_top_{n}_{var_type}"] = (
proportions[:, i] * 100
)
for qc_var in qc_vars:
obs_metrics[f"total_{expr_type}_{qc_var}"] = np.ravel(
X[:, adata.var[qc_var].values].sum(axis=1)
)
if log1p:
obs_metrics[f"log1p_total_{expr_type}_{qc_var}"] = np.log1p(
obs_metrics[f"total_{expr_type}_{qc_var}"]
)
obs_metrics[f"pct_{expr_type}_{qc_var}"] = (
obs_metrics[f"total_{expr_type}_{qc_var}"]
/ obs_metrics[f"total_{expr_type}"]
* 100
)
if inplace:
adata.obs[obs_metrics.columns] = obs_metrics
else:
return obs_metrics
處理X參數(shù)
# Handle whether X is passed
if X is None:
X = _choose_mtx_rep(adata, use_raw, layer)
if isspmatrix_coo(X):
X = csr_matrix(X) # COO not subscriptable
if issparse(X):
X.eliminate_zeros()
如果沒傳遞X參數(shù), 重新在adata里根據(jù)use_raw, layer獲取數(shù)據(jù)。
生成obs指標(biāo)DataFrame
obs_metrics = pd.DataFrame(index=adata.obs_names)
該行代碼生成一個(gè)DataFrame, 其中行名 為細(xì)胞名(adata.obs_names)
n_genes_by_counts
n_genes_by_counts
為每個(gè)細(xì)胞的基因表達(dá)量>0的基因數(shù)目
if issparse(X):
obs_metrics[f"n_{var_type}_by_{expr_type}"] = X.getnnz(axis=1)
else:
obs_metrics[f"n_{var_type}_by_{expr_type}"] = np.count_nonzero(X, axis=1)
該部分代碼if else兩個(gè)分支所作用目的是一樣的渴逻,只是為了支持不同的數(shù)據(jù)類似疾党,形成了兩個(gè)分支,
該部分前面兩行為支持稀疏矩陣處理惨奕,暫且不管雪位,當(dāng)前的源碼解析主要關(guān)注numpy.ndarray類型。
我們可以從源碼中發(fā)現(xiàn)n_genes_by_counts
由f"n_{var_type}_by_{expr_type}"
生成梨撞,其中
- var_type 為可傳遞改變的參數(shù)雹洗,默認(rèn)為"genes"
- expr_type為可傳遞改變的參數(shù),默認(rèn)為"counts"
np.count_nonzero(X, axis=1)
計(jì)算了每行細(xì)胞中表達(dá)量非0的基因的數(shù)量
log1p_n_genes_by_counts
if log1p:
obs_metrics[f"log1p_n_{var_type}_by_{expr_type}"] = np.log1p(
obs_metrics[f"n_{var_type}_by_{expr_type}"]
)
如果log1p為True, 則對(duì)**n_genes_by_counts **進(jìn)行l(wèi)og1p轉(zhuǎn)換處理卧波,得到log1p_n_genes_by_counts
log1p表示 log(X+1), 為防止為0值出現(xiàn)(log(0))
total_counts
obs_metrics[f"total_{expr_type}"] = np.ravel(X.sum(axis=1))
計(jì)算每個(gè)細(xì)胞的total counts
log1p_total_counts
if log1p:
obs_metrics[f"log1p_total_{expr_type}"] = np.log1p(
obs_metrics[f"total_{expr_type}"]
)
如果log1p為True, 則對(duì)total_counts 進(jìn)行l(wèi)og1p轉(zhuǎn)換處理时肿,得到log1p_total_counts
pct_counts_in_top_{n}_genes
if percent_top:
percent_top = sorted(percent_top)
proportions = top_segment_proportions(X, percent_top)
for i, n in enumerate(percent_top):
obs_metrics[f"pct_{expr_type}_in_top_{n}_{var_type}"] = (
proportions[:, i] * 100
)
percent_top
默認(rèn)值為 (50, 100, 200, 500)
該參數(shù)用于設(shè)定尋找每個(gè)細(xì)胞中前n個(gè)基因的表達(dá)量和占總的基因中表達(dá)量和的比例。
函數(shù)top_segment_proportions
用于計(jì)算這個(gè)比例港粱。for循環(huán)將percent_top
中每個(gè)n值螃成,所計(jì)算的比例轉(zhuǎn)換成百分比,并保存在obs_metrics 這個(gè)DataFrame中查坪。
def top_segment_proportions(
mtx: Union[np.array, spmatrix], ns: Collection[int]
) -> np.ndarray:
# Pretty much just does dispatch
if not (max(ns) <= mtx.shape[1] and min(ns) > 0):
raise IndexError("Positions outside range of features.")
if issparse(mtx):
if not isspmatrix_csr(mtx):
mtx = csr_matrix(mtx)
return top_segment_proportions_sparse_csr(mtx.data, mtx.indptr, np.array(ns))
else:
return top_segment_proportions_dense(mtx, ns)
def top_segment_proportions_dense(
mtx: Union[np.array, spmatrix], ns: Collection[int]
) -> np.ndarray:
# Currently ns is considered to be 1 indexed
ns = np.sort(ns)
sums = mtx.sum(axis=1)
partitioned = np.apply_along_axis(np.partition, 1, mtx, mtx.shape[1] - ns)[:, ::-1][
:, : ns[-1]
]
values = np.zeros((mtx.shape[0], len(ns)))
acc = np.zeros(mtx.shape[0])
prev = 0
for j, n in enumerate(ns):
acc += partitioned[:, prev:n].sum(axis=1)
values[:, j] = acc
prev = n
return values / sums[:, None]
top_segment_proportions
源碼見上面寸宏, 其中根據(jù)傳入矩陣類型分別調(diào)用了兩個(gè)函數(shù)進(jìn)行處理:
-
top_segment_proportions_sparse_csr
處理sparse 矩陣 -
top_segment_proportions_dense
處理dense矩陣
我們關(guān)注下dense矩陣處理方式,理解top_segment_proportions_dense源碼偿曙,有幾個(gè)要點(diǎn):
-
np.apply_along_axis
函數(shù)作用 -
np.partition
函數(shù)作用 -
[:, ::-1]
取反操作 - 其他代碼
qc_vars 相關(guān)指標(biāo)計(jì)算
for qc_var in qc_vars:
obs_metrics[f"total_{expr_type}_{qc_var}"] = np.ravel(
X[:, adata.var[qc_var].values].sum(axis=1)
)
if log1p:
obs_metrics[f"log1p_total_{expr_type}_{qc_var}"] = np.log1p(
obs_metrics[f"total_{expr_type}_{qc_var}"]
)
obs_metrics[f"pct_{expr_type}_{qc_var}"] = (
obs_metrics[f"total_{expr_type}_{qc_var}"]
/ obs_metrics[f"total_{expr_type}"]
* 100
)
qc_vars 用于指定adata.var里的特定字段氮凝,該字段需要為布爾值,來進(jìn)行相關(guān)指標(biāo)計(jì)算望忆。例如罩阵,假設(shè)adata.var有個(gè)字段為"mt" 用于判斷基因是否為線粒體基因。就會(huì)得到三個(gè)相關(guān)指標(biāo):
- total_counts_mt 細(xì)胞中炭臭,線粒體基因表達(dá)量總和
- log1p_total_counts_mt log1p(細(xì)胞中線粒體基因表達(dá)量總和)
- pct_counts_mt 細(xì)胞中永脓,線粒體基因表達(dá)量總和 占總的基因表達(dá)和的百分比
返回
if inplace:
adata.obs[obs_metrics.columns] = obs_metrics
else:
return obs_metrics
若是inplace為真,則將計(jì)算的這些指標(biāo)添加到adata.obs鞋仍, 否則直接返回指標(biāo)數(shù)據(jù)
describe_var
函數(shù)源碼
def describe_var(
adata: AnnData,
*,
expr_type: str = "counts",
var_type: str = "genes",
layer: Optional[str] = None,
use_raw: bool = False,
inplace=False,
log1p=True,
X=None,
) -> Optional[pd.DataFrame]:
# Handle whether X is passed
if X is None:
X = _choose_mtx_rep(adata, use_raw, layer)
if isspmatrix_coo(X):
X = csr_matrix(X) # COO not subscriptable
if issparse(X):
X.eliminate_zeros()
var_metrics = pd.DataFrame(index=adata.var_names)
if issparse(X):
# Current memory bottleneck for csr matrices:
var_metrics["n_cells_by_{expr_type}"] = X.getnnz(axis=0)
var_metrics["mean_{expr_type}"] = mean_variance_axis(X, axis=0)[0]
else:
var_metrics["n_cells_by_{expr_type}"] = np.count_nonzero(X, axis=0)
var_metrics["mean_{expr_type}"] = X.mean(axis=0)
if log1p:
var_metrics["log1p_mean_{expr_type}"] = np.log1p(
var_metrics["mean_{expr_type}"]
)
var_metrics["pct_dropout_by_{expr_type}"] = (
1 - var_metrics["n_cells_by_{expr_type}"] / X.shape[0]
) * 100
var_metrics["total_{expr_type}"] = np.ravel(X.sum(axis=0))
if log1p:
var_metrics["log1p_total_{expr_type}"] = np.log1p(
var_metrics["total_{expr_type}"]
)
# Relabel
new_colnames = []
for col in var_metrics.columns:
new_colnames.append(col.format(**locals()))
var_metrics.columns = new_colnames
if inplace:
adata.var[var_metrics.columns] = var_metrics
else:
return var_metrics
處理X參數(shù)
# Handle whether X is passed
if X is None:
X = _choose_mtx_rep(adata, use_raw, layer)
if isspmatrix_coo(X):
X = csr_matrix(X) # COO not subscriptable
if issparse(X):
X.eliminate_zeros()
如果沒傳遞X參數(shù)常摧, 重新在adata里根據(jù)use_raw, layer獲取數(shù)據(jù)。
生成var指標(biāo)DataFrame
var_metrics = pd.DataFrame(index=adata.var_names)
該行代碼生成一個(gè)DataFrame, 其中行名 為基因名(adata.var_names)
n_cells_by_counts和mean_counts
if issparse(X):
# Current memory bottleneck for csr matrices:
var_metrics["n_cells_by_{expr_type}"] = X.getnnz(axis=0)
var_metrics["mean_{expr_type}"] = mean_variance_axis(X, axis=0)[0]
else:
var_metrics["n_cells_by_{expr_type}"] = np.count_nonzero(X, axis=0)
var_metrics["mean_{expr_type}"] = X.mean(axis=0)
- n_cells_by_counts 為計(jì)算 所有細(xì)胞中表達(dá)該基因的的細(xì)胞數(shù)目
- mean_counts 為所有細(xì)胞中的該基因表達(dá)量的平均值
log1p_mean_counts
if log1p:
var_metrics["log1p_mean_{expr_type}"] = np.log1p(
var_metrics["mean_{expr_type}"]
)
log1p(mean_counts)
pct_dropout_by_counts
var_metrics["pct_dropout_by_{expr_type}"] = (
1 - var_metrics["n_cells_by_{expr_type}"] / X.shape[0]
) * 100
n_cells_by_counts 為 所有細(xì)胞中表達(dá)該基因的的細(xì)胞數(shù)目威创, pct_dropout_by_counts為細(xì)胞中未表達(dá)基因占總的細(xì)胞總數(shù)的百分比
total_counts
var_metrics["total_{expr_type}"] = np.ravel(X.sum(axis=0))
所有細(xì)胞中落午,基因的表達(dá)量總和
log1p_total_counts
if log1p:
var_metrics["log1p_total_{expr_type}"] = np.log1p(
var_metrics["total_{expr_type}"]
)
log1p(所有細(xì)胞中基因的表達(dá)量總和)
收尾
# Relabel
new_colnames = []
for col in var_metrics.columns:
new_colnames.append(col.format(**locals()))
var_metrics.columns = new_colnames
if inplace:
adata.var[var_metrics.columns] = var_metrics
else:
return var_metrics
若是inplace為真,則將計(jì)算的這些指標(biāo)添加到adata.var肚豺, 否則直接返回指標(biāo)數(shù)據(jù)