Source code for celltypist.classifier

import os
import sys
from typing import Optional, Union
import scanpy as sc
from anndata import AnnData
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from .models import Model
from . import logger
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)
try:
    import rapids_singlecell as rsc
except ImportError:
    pass


[docs] class AnnotationResult(): """ Class that represents the result of a celltyping annotation process. Parameters ---------- labels A :class:`~pandas.DataFrame` object returned from the celltyping process, showing the predicted labels. decision_mat A :class:`~pandas.DataFrame` object returned from the celltyping process, showing the decision matrix. prob_mat A :class:`~pandas.DataFrame` object returned from the celltyping process, showing the probability matrix. adata An :class:`~anndata.AnnData` object representing the input object. Attributes ---------- predicted_labels Predicted labels including the individual prediction results and (if majority voting is done) majority voting results. decision_matrix Decision matrix with the decision score of each cell belonging to a given cell type. probability_matrix Probability matrix representing the probability each cell belongs to a given cell type (transformed from decision matrix by the sigmoid function). cell_count Number of input cells which undergo the prediction process. adata An :class:`~anndata.AnnData` object representing the input data. """ def __init__(self, labels: pd.DataFrame, decision_mat: pd.DataFrame, prob_mat: pd.DataFrame, adata: AnnData): self.predicted_labels = labels self.decision_matrix = decision_mat self.probability_matrix = prob_mat self.adata = adata self.cell_count = labels.shape[0]
[docs] def summary_frequency(self, by: str = 'predicted_labels') -> pd.DataFrame: """ Get the frequency of cells belonging to each cell type predicted by celltypist. Parameters ---------- by Column name of :attr:`~celltypist.classifier.AnnotationResult.predicted_labels` specifying the prediction type which the summary is based on. Set to `'majority_voting'` if you want to summarize for the majority voting classifier. (Default: `'predicted_labels'`) Returns ---------- :class:`~pandas.DataFrame` A :class:`~pandas.DataFrame` object with cell type frequencies. """ unique, counts = np.unique(self.predicted_labels[by], return_counts=True) df = pd.DataFrame(list(zip(unique, counts)), columns=["celltype", "counts"]) df.sort_values(['counts'], ascending=False, inplace=True) return df
[docs] def to_adata(self, insert_labels: bool = True, insert_conf: bool = True, insert_conf_by: str = 'predicted_labels', insert_decision: bool = False, insert_prob: bool = False, prefix: str = '') -> AnnData: """ Insert the predicted labels, decision or probability matrix, and (if majority voting is done) majority voting results into the AnnData object. Parameters ---------- insert_labels Whether to insert the predicted cell type labels and (if majority voting is done) majority voting-based labels into the AnnData object. (Default: `True`) insert_conf Whether to insert the confidence scores into the AnnData object. (Default: `True`) insert_conf_by Column name of :attr:`~celltypist.classifier.AnnotationResult.predicted_labels` specifying the prediction type which the confidence scores are based on. Setting to `'majority_voting'` will insert the confidence scores corresponding to the majority-voting result. (Default: `'predicted_labels'`) insert_decision Whether to insert the decision matrix into the AnnData object. (Default: `False`) insert_prob Whether to insert the probability matrix into the AnnData object. This will override the decision matrix even when `insert_decision` is set to `True`. (Default: `False`) prefix Prefix for the inserted columns in the AnnData object. Default to no prefix used. Returns ---------- :class:`~anndata.AnnData` Depending on whether majority voting is done, an :class:`~anndata.AnnData` object with the following columns (prefixed with `prefix`) added to the observation metadata: 1) **predicted_labels**, individual prediction outcome for each cell. 2) **over_clustering**, over-clustering result for the cells. 3) **majority_voting**, the cell type label assigned to each cell after the majority voting process. 4) **conf_score**, the confidence score of each cell. 5) **name of each cell type**, which represents the decision scores (or probabilities if `insert_prob` is `True`) of a given cell type across cells. """ if insert_labels: self.adata.obs[[f"{prefix}{x}" for x in self.predicted_labels.columns]] = self.predicted_labels if insert_conf: if insert_conf_by == 'predicted_labels': self.adata.obs[f"{prefix}conf_score"] = self.probability_matrix.max(axis=1).values elif insert_conf_by == 'majority_voting': if insert_conf_by not in self.predicted_labels: raise KeyError( f"🛑 Did not find the column `majority_voting` in the `AnnotationResult.predicted_labels`, perform majority voting beforehand or use `insert_conf_by = 'predicted_labels'` instead") self.adata.obs[f"{prefix}conf_score"] = [row[self.predicted_labels.majority_voting[index]] if self.predicted_labels.majority_voting[index] in row.index else row.max() for index, row in self.probability_matrix.iterrows()] else: raise KeyError( f"🛑 Unrecognized `insert_conf_by` value ('{insert_conf_by}'), should be one of `'predicted_labels'` or `'majority_voting'`") if insert_prob: self.adata.obs[[f"{prefix}{x}" for x in self.probability_matrix.columns]] = self.probability_matrix elif insert_decision: self.adata.obs[[f"{prefix}{x}" for x in self.decision_matrix.columns]] = self.decision_matrix return self.adata
[docs] def to_plots(self, folder: str, plot_probability: bool = False, format: str = 'pdf', prefix: str = '') -> None: """ Plot the celltyping and (if majority voting is done) majority-voting results. Parameters ---------- folder Path to a folder which stores the output figures. plot_probability Whether to also plot the decision score and probability distributions of each cell type across the test cells. If `True`, a number of figures will be generated (may take some time if the input data is large). (Default: `False`) format Format of output figures. Default to vector PDF files (note dots are still drawn with png backend). (Default: `'pdf'`) prefix Prefix for the output figures. Default to no prefix used. Returns ---------- None Depending on whether majority voting is done and `plot_probability`, multiple UMAP plots showing the prediction and majority voting results in the `folder`: 1) **predicted_labels**, individual prediction outcome for each cell overlaid onto the UMAP. 2) **over_clustering**, over-clustering result of the cells overlaid onto the UMAP. 3) **majority_voting**, the cell type label assigned to each cell after the majority voting process overlaid onto the UMAP. 4) **name of each cell type**, which represents the decision scores and probabilities of a given cell type distributed across cells overlaid onto the UMAP. """ if not os.path.isdir(folder): raise FileNotFoundError( f"🛑 Output folder {folder} does not exist. Please provide a valid folder") if 'X_umap' in self.adata.obsm: logger.info("👀 Detected existing UMAP coordinates, will plot the results accordingly") elif 'connectivities' in self.adata.obsp: logger.info("🧙 Generating UMAP coordinates based on the neighborhood graph") sc.tl.umap(self.adata) else: logger.info("🧙 Constructing the neighborhood graph and generating UMAP coordinates") adata = self.adata.copy() self.adata.obsm['X_pca'], self.adata.obsp['connectivities'], self.adata.obsp['distances'], self.adata.uns['neighbors'] = Classifier._construct_neighbor_graph(adata) sc.tl.umap(self.adata) logger.info("📈 Plotting the results") sc.settings.set_figure_params(figsize=[6.4, 6.4], format=format) self.adata.obs[self.predicted_labels.columns] = self.predicted_labels for column in self.predicted_labels: sc.pl.umap(self.adata, color = column, legend_loc = 'on data', show = False, legend_fontweight = 'normal', title = column.replace('_', ' ')) plt.savefig(os.path.join(folder, prefix + column + '.' + format)) plt.close() if plot_probability: for column in self.probability_matrix: self.adata.obs['decision score'] = self.decision_matrix[column] self.adata.obs['probability'] = self.probability_matrix[column] sc.pl.umap(self.adata, color = ['decision score', 'probability'], show = False) plt.savefig(os.path.join(folder, prefix + column.replace('/','_') + '.' + format)) plt.close() self.adata.obs.drop(columns=['decision score', 'probability'], inplace=True)
[docs] def to_table(self, folder: str, prefix: str = '', xlsx: bool = False) -> None: """ Write out tables of predicted labels, decision matrix, and probability matrix. Parameters ---------- folder Path to a folder which stores the output table/tables. prefix Prefix for the output table/tables. Default to no prefix used. xlsx Whether to merge output tables into a single Excel (.xlsx). (Default: `False`) Returns ---------- None Depending on `xlsx`, return table(s) of predicted labels, decision matrix and probability matrix. """ if not os.path.isdir(folder): raise FileNotFoundError( f"🛑 Output folder {folder} does not exist. Please provide a valid folder") if not xlsx: self.predicted_labels.to_csv(os.path.join(folder, f"{prefix}predicted_labels.csv")) self.decision_matrix.to_csv(os.path.join(folder, f"{prefix}decision_matrix.csv")) self.probability_matrix.to_csv(os.path.join(folder, f"{prefix}probability_matrix.csv")) else: with pd.ExcelWriter(os.path.join(folder, f"{prefix}annotation_result.xlsx")) as writer: self.predicted_labels.to_excel(writer, sheet_name="Predicted Labels") self.decision_matrix.to_excel(writer, sheet_name="Decision Matrix") self.probability_matrix.to_excel(writer, sheet_name="Probability Matrix")
def __repr__(self): base = f"CellTypist prediction result for {self.cell_count} query cells" base += f"\n predicted_labels: data frame with {self.predicted_labels.shape[1]} {'columns' if self.predicted_labels.shape[1] > 1 else 'column'} ({str(list(self.predicted_labels.columns))[1:-1]})" base += f"\n decision_matrix: data frame with {self.cell_count} query cells and {self.decision_matrix.shape[1]} cell types" base += f"\n probability_matrix: data frame with {self.cell_count} query cells and {self.probability_matrix.shape[1]} cell types" base += f"\n adata: AnnData object referred" return base
[docs] class Classifier(): """ Class that wraps the celltyping and majority voting processes. Parameters ---------- filename Path to the input count matrix (supported types are csv, txt, tsv, tab and mtx) or AnnData object (h5ad). If it's the former, a cell-by-gene format is desirable (see `transpose` for more information). Also accepts the input as an :class:`~anndata.AnnData` object already loaded in memory. Genes should be gene symbols. Non-expressed genes are preferred to be provided as well. model A :class:`~celltypist.models.Model` object that wraps the logistic Classifier and the StandardScaler, the path to the desired model file, or the model name. transpose Whether to transpose the input matrix. Set to `True` if `filename` is provided in a gene-by-cell format. (Default: `False`) gene_file Path to the file which stores each gene per line corresponding to the genes used in the provided mtx file. Ignored if `filename` is not provided in the mtx format. cell_file Path to the file which stores each cell per line corresponding to the cells used in the provided mtx file. Ignored if `filename` is not provided in the mtx format. Attributes ---------- filename Path to the input dataset. This attribute exists only when the input is a file path. adata An :class:`~anndata.AnnData` object which stores the log1p normalized expression data in `.X` or `.raw.X`. indata The expression matrix used for predictions stored in the log1p normalized format. indata_genes All the genes included in the input data. indata_names All the cells included in the input data. model A :class:`~celltypist.models.Model` object that wraps the logistic Classifier and the StandardScaler. """ def __init__(self, filename: Union[AnnData,str] = "", model: Union[Model,str] = "", transpose: bool = False, gene_file: Optional[str] = None, cell_file: Optional[str] = None): if isinstance(model, str): model = Model.load(model) self.model = model if not filename: logger.warn(f"📭 No input file provided to the classifier") return if isinstance(filename, str): self.filename = filename logger.info(f"📁 Input file is '{self.filename}'") logger.info(f"⏳ Loading data") if isinstance(filename, str) and filename.endswith(('.csv', '.txt', '.tsv', '.tab', '.mtx', '.mtx.gz')): self.adata = sc.read(self.filename) if transpose: self.adata = self.adata.transpose() if self.filename.endswith(('.mtx', '.mtx.gz')): if (gene_file is None) or (cell_file is None): raise FileNotFoundError( "🛑 Missing `gene_file` and/or `cell_file`. Please provide both arguments together with the input mtx file") genes_mtx = pd.read_csv(gene_file, header=None)[0].values cells_mtx = pd.read_csv(cell_file, header=None)[0].values if len(genes_mtx) != self.adata.n_vars: raise ValueError( f"🛑 The number of genes in {gene_file} does not match the number of genes in {self.filename}") if len(cells_mtx) != self.adata.n_obs: raise ValueError( f"🛑 The number of cells in {cell_file} does not match the number of cells in {self.filename}") self.adata.var_names = genes_mtx self.adata.obs_names = cells_mtx if not float(self.adata.X[:1000].max()).is_integer(): logger.warn(f"⚠️ Warning: the input file seems not a raw count matrix. The prediction result may not be accurate") if (self.adata.n_vars >= 100000) or (len(self.adata.var_names[0]) >= 30) or (len(self.adata.obs_names.intersection(['GAPDH', 'ACTB', 'CALM1', 'PTPRC', 'MALAT1'])) >= 1): logger.warn(f"⚠️ The input matrix is detected to be a gene-by-cell matrix, will transpose it") self.adata = self.adata.transpose() self.adata.var_names_make_unique() sc.pp.normalize_total(self.adata, target_sum=1e4) sc.pp.log1p(self.adata) self.indata = self.adata.X self.indata_genes = self.adata.var_names self.indata_names = self.adata.obs_names elif isinstance(filename, AnnData) or (isinstance(filename, str) and filename.endswith('.h5ad')): self.adata = sc.read(filename) if isinstance(filename, str) else filename self.adata.var_names_make_unique() if (self.adata.X[:1000].min() < 0) or (self.adata.X[:1000].max() > 9.22): if not self.adata.raw: raise ValueError( "🛑 Invalid expression matrix in `.X`, expect log1p normalized expression to 10000 counts per cell") elif (self.adata.raw.X[:1000].min() < 0) or (self.adata.raw.X[:1000].max() > 9.22): raise ValueError( "🛑 Invalid expression matrix in both `.X` and `.raw.X`, expect log1p normalized expression to 10000 counts per cell") else: logger.info("👀 Invalid expression matrix in `.X`, expect log1p normalized expression to 10000 counts per cell; will use `.raw.X` instead") self.indata = self.adata.raw.X self.indata_genes = self.adata.raw.var_names self.indata_names = self.adata.raw.obs_names else: self.indata = self.adata.X self.indata_genes = self.adata.var_names self.indata_names = self.adata.obs_names if np.abs(np.expm1(self.indata[0]).sum()-10000) > 1: logger.warn(f"⚠️ Warning: invalid expression matrix, expect ALL genes and log1p normalized expression to 10000 counts per cell. The prediction result may not be accurate") else: raise ValueError( "🛑 Invalid input. Supported types: .csv, .txt, .tsv, .tab, .mtx, .mtx.gz and .h5ad, or AnnData loaded in memory") logger.info(f"🔬 Input data has {self.indata.shape[0]} cells and {len(self.indata_genes)} genes")
[docs] def celltype(self, mode: str = 'best match', p_thres: float = 0.5) -> AnnotationResult: """ Run celltyping jobs to predict cell types of input data. Parameters ---------- mode The way cell prediction is performed. For each query cell, the default (`'best match'`) is to choose the cell type with the largest score/probability as the final prediction. Setting to `'prob match'` will enable a multi-label classification, which assigns 0 (i.e., unassigned), 1, or >=2 cell type labels to each query cell. (Default: `'best match'`) p_thres Probability threshold for the multi-label classification. Ignored if `mode` is `'best match'`. (Default: 0.5) Returns ---------- :class:`~celltypist.classifier.AnnotationResult` An :class:`~celltypist.classifier.AnnotationResult` object. Four important attributes within this class are: 1) :attr:`~celltypist.classifier.AnnotationResult.predicted_labels`, predicted labels from celltypist. 2) :attr:`~celltypist.classifier.AnnotationResult.decision_matrix`, decision matrix from celltypist. 3) :attr:`~celltypist.classifier.AnnotationResult.probability_matrix`, probability matrix from celltypist. 4) :attr:`~celltypist.classifier.AnnotationResult.adata`, AnnData object representation of the input data. """ logger.info(f"🔗 Matching reference genes in the model") k_x = np.isin(self.indata_genes, self.model.classifier.features) if k_x.sum() == 0: raise ValueError( f"🛑 No features overlap with the model. Please provide gene symbols") else: logger.info(f"🧬 {k_x.sum()} features used for prediction") k_x_idx = np.where(k_x)[0] #self.indata = self.indata[:, k_x_idx] self.indata_genes = self.indata_genes[k_x_idx] lr_idx = pd.DataFrame(self.model.classifier.features, columns=['features']).reset_index().set_index('features').loc[self.indata_genes, 'index'].values logger.info(f"⚖️ Scaling input data") means_ = self.model.scaler.mean_[lr_idx] if self.model.scaler.with_mean else 0 sds_ = self.model.scaler.scale_[lr_idx] self.indata = (self.indata[:, k_x_idx] - means_) / sds_ self.indata[self.indata > 10] = 10 ni, fs, cf = self.model.classifier.n_features_in_, self.model.classifier.features, self.model.classifier.coef_ self.model.classifier.n_features_in_ = lr_idx.size self.model.classifier.features = self.model.classifier.features[lr_idx] self.model.classifier.coef_ = self.model.classifier.coef_[:, lr_idx] logger.info("🖋️ Predicting labels") decision_mat, prob_mat, lab = self.model.predict_labels_and_prob(self.indata, mode = mode, p_thres = p_thres) logger.info("✅ Prediction done!") #restore model after prediction self.model.classifier.n_features_in_, self.model.classifier.features, self.model.classifier.coef_ = ni, fs, cf cells = self.indata_names return AnnotationResult(pd.DataFrame(lab, columns=['predicted_labels'], index=cells, dtype='category'), pd.DataFrame(decision_mat, columns=self.model.classifier.classes_, index=cells), pd.DataFrame(prob_mat, columns=self.model.classifier.classes_, index=cells), self.adata)
@staticmethod def _construct_neighbor_graph(adata: AnnData, use_GPU: bool = False) -> tuple: """Construct a neighborhood graph. This function is for internal use.""" fsc = rsc if use_GPU else sc # fix for adata.uns['log1p']['base'] error if 'log1p' in adata.uns.keys(): if isinstance(adata.uns['log1p'], dict) and 'base' not in adata.uns['log1p'].keys(): adata.uns['log1p']['base'] = None if 'X_pca' not in adata.obsm.keys(): if adata.X[:1000].min() < 0: adata = adata.raw.to_adata() if use_GPU: fsc.get.anndata_to_GPU(adata) if 'highly_variable' not in adata.var: sc.pp.filter_genes(adata, min_cells=5) fsc.pp.highly_variable_genes(adata, n_top_genes = min([2500, adata.n_vars])) adata = adata[:, adata.var.highly_variable] fsc.pp.scale(adata, max_value=10) fsc.pp.pca(adata, n_comps=50) fsc.pp.neighbors(adata, n_neighbors=10, n_pcs=50) return adata.obsm['X_pca'], adata.obsp['connectivities'], adata.obsp['distances'], adata.uns['neighbors']
[docs] def over_cluster(self, resolution: Optional[float] = None, use_GPU: bool = False) -> pd.Series: """ Over-clustering input data with a canonical Scanpy pipeline. A neighborhood graph will be used (or constructed if not found) for the over-clustering. Parameters ---------- resolution Resolution parameter for leiden clustering which controls the coarseness of the clustering. Default to 5, 10, 15, 20, 25 and 30 for datasets with cell numbers less than 5k, 20k, 40k, 100k, 200k and above, respectively. use_GPU Whether to use GPU for over clustering on the basis of `rapids-singlecell`. (Default: `False`) Returns ---------- :class:`~pandas.Series` A :class:`~pandas.Series` object showing the over-clustering result. """ if use_GPU and 'rapids_singlecell' not in sys.modules: logger.warn("⚠️ Warning: rapids_singlecell is not installed but required for GPU running, will switch back to CPU") use_GPU = False if 'connectivities' not in self.adata.obsp: logger.info("👀 Can not detect a neighborhood graph, will construct one before the over-clustering") adata = self.adata.copy() self.adata.obsm['X_pca'], self.adata.obsp['connectivities'], self.adata.obsp['distances'], self.adata.uns['neighbors'] = Classifier._construct_neighbor_graph(adata, use_GPU) else: logger.info("👀 Detected a neighborhood graph in the input object, will run over-clustering on the basis of it") if resolution is None: if self.adata.n_obs < 5000: resolution = 5 elif self.adata.n_obs < 20000: resolution = 10 elif self.adata.n_obs < 40000: resolution = 15 elif self.adata.n_obs < 100000: resolution = 20 elif self.adata.n_obs < 200000: resolution = 25 else: resolution = 30 logger.info(f"⛓️ Over-clustering input data with resolution set to {resolution}") if use_GPU: rsc.tl.leiden(self.adata, resolution=resolution, key_added='over_clustering') else: sc.tl.leiden(self.adata, resolution=resolution, key_added='over_clustering') return self.adata.obs.pop('over_clustering')
[docs] @staticmethod def majority_vote(predictions: AnnotationResult, over_clustering: Union[list, tuple, np.ndarray, pd.Series, pd.Index], min_prop: float = 0) -> AnnotationResult: """ Majority vote the celltypist predictions using the result from the over-clustering. Parameters ---------- predictions An :class:`~celltypist.classifier.AnnotationResult` object containing the :attr:`~celltypist.classifier.AnnotationResult.predicted_labels`. over_clustering A list, tuple, numpy array, pandas series or index containing the over-clustering information. min_prop For the dominant cell type within a subcluster, the minimum proportion of cells required to support naming of the subcluster by this cell type. (Default: 0) Returns ---------- :class:`~celltypist.classifier.AnnotationResult` An :class:`~celltypist.classifier.AnnotationResult` object. Four important attributes within this class are: 1) :attr:`~celltypist.classifier.AnnotationResult.predicted_labels`, predicted labels from celltypist. 2) :attr:`~celltypist.classifier.AnnotationResult.decision_matrix`, decision matrix from celltypist. 3) :attr:`~celltypist.classifier.AnnotationResult.probability_matrix`, probability matrix from celltypist. 4) :attr:`~celltypist.classifier.AnnotationResult.adata`, AnnData object representation of the input data. """ if isinstance(over_clustering, (list, tuple)): over_clustering = np.array(over_clustering) logger.info("🗳️ Majority voting the predictions") votes = pd.crosstab(predictions.predicted_labels['predicted_labels'], over_clustering) majority = votes.idxmax(axis=0).astype(str) freqs = (votes / votes.sum(axis=0).values).max(axis=0) majority[freqs < min_prop] = 'Heterogeneous' majority = majority[over_clustering].reset_index() majority.index = predictions.predicted_labels.index majority.columns = ['over_clustering', 'majority_voting'] majority['majority_voting'] = majority['majority_voting'].astype('category') predictions.predicted_labels = predictions.predicted_labels.join(majority) logger.info("✅ Majority voting done!") return predictions