import numpy as np
import pandas as pd
import scanpy as sc
from anndata import AnnData
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDClassifier
from sklearn import __version__ as skv
from typing import Optional, Union
from .models import Model
from . import logger
from scipy.sparse import spmatrix
from datetime import datetime
import sys
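#Optional GPU support: cuml is imported if available; its absence is only checked when `use_GPU = True` is requested at training time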
try:
from cuml import LogisticRegression as cuLogisticRegression
except ImportError:
pass
def _to_vector(_vector_or_file):
"""
For internal use. Turn a file into an array.
"""
if isinstance(_vector_or_file, str):
try:
return pd.read_csv(_vector_or_file, header=None)[0].values
except Exception as e:
            raise Exception(
                    f"🛑 Failed to read the file {_vector_or_file}. {e}")
else:
return _vector_or_file
def _to_array(_array_like) -> np.ndarray:
"""
For internal use. Turn an array-like object into an array.
"""
if isinstance(_array_like, pd.DataFrame):
return _array_like.values
elif isinstance(_array_like, spmatrix):
return _array_like.toarray()
elif isinstance(_array_like, np.matrix):
return np.array(_array_like)
elif isinstance(_array_like, np.ndarray):
return _array_like
else:
raise TypeError(
f"🛑 Please provide a valid array-like object as input")
def _prepare_data(X, labels, genes, transpose) -> tuple:
"""
For internal use. Prepare data for celltypist training.
"""
if (X is None) or (labels is None):
raise Exception(
"🛑 Missing training data and/or training labels. Please provide both arguments")
if isinstance(X, AnnData) or (isinstance(X, str) and X.endswith('.h5ad')):
adata = sc.read(X) if isinstance(X, str) else X
adata.var_names_make_unique()
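        #Heuristic: negative values in the first 1000 cells indicate scaled data, so fall back to the raw counts stored in .raw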
if adata.X[:1000].min() < 0:
logger.info("👀 Detected scaled expression in the input data, will try the .raw attribute")
try:
indata = adata.raw.X
genes = adata.raw.var_names
except Exception as e:
raise Exception(
f"🛑 Fail to use the .raw attribute in the input object. {e}")
else:
indata = adata.X
genes = adata.var_names
if isinstance(labels, str) and (labels in adata.obs):
labels = adata.obs[labels]
else:
labels = _to_vector(labels)
elif isinstance(X, str) and X.endswith(('.csv', '.txt', '.tsv', '.tab', '.mtx', '.mtx.gz')):
adata = sc.read(X)
if transpose:
adata = adata.transpose()
if X.endswith(('.mtx', '.mtx.gz')):
if genes is None:
raise Exception(
"🛑 Missing `genes`. Please provide this argument together with the input mtx file")
genes = _to_vector(genes)
if len(genes) != adata.n_vars:
raise ValueError(
f"🛑 The number of genes provided does not match the number of genes in {X}")
adata.var_names = np.array(genes)
adata.var_names_make_unique()
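        #Heuristic: a non-integer maximum in the first 1000 cells suggests the input is not raw counts; the matrix is then normalized to 10000 counts per cell and log1p-transformed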
if not float(adata.X[:1000].max()).is_integer():
logger.warn(f"⚠️ Warning: the input file seems not a raw count matrix. The trained model may be biased")
sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)
indata = adata.X
genes = adata.var_names
labels = _to_vector(labels)
elif isinstance(X, str):
raise ValueError(
"🛑 Invalid input. Supported types: .csv, .txt, .tsv, .tab, .mtx, .mtx.gz and .h5ad")
else:
logger.info("👀 The input training data is processed as an array-like object")
indata = X
if transpose:
indata = indata.transpose()
if isinstance(indata, pd.DataFrame):
genes = indata.columns
else:
if genes is None:
raise Exception(
"🛑 Missing `genes`. Please provide this argument together with the input training data")
genes = _to_vector(genes)
labels = _to_vector(labels)
return indata, labels, genes
def _LRClassifier(indata, labels, C, solver, max_iter, n_jobs, **kwargs) -> LogisticRegression:
"""
    For internal use. Get the logistic regression classifier.
"""
no_cells = len(labels)
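    #Default solver heuristic: 'sag' typically converges faster than 'lbfgs' on datasets with many samples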
if solver is None:
        solver = 'sag' if no_cells > 50000 else 'lbfgs'
elif solver not in ('liblinear', 'lbfgs', 'newton-cg', 'sag', 'saga'):
raise ValueError(
f"🛑 Invalid `solver`, should be one of `'liblinear'`, `'lbfgs'`, `'newton-cg'`, `'sag'`, and `'saga'`")
logger.info(f"🏋️ Training data using logistic regression")
if (no_cells > 100000) and (indata.shape[1] > 10000):
logger.warn(f"⚠️ Warning: it may take a long time to train this dataset with {no_cells} cells and {indata.shape[1]} genes, try to downsample cells and/or restrict genes to a subset (e.g., hvgs)")
classifier = LogisticRegression(C = C, solver = solver, max_iter = max_iter, multi_class = 'ovr', n_jobs = n_jobs, **kwargs)
classifier.fit(indata, labels)
return classifier
def _cuLRClassifier(indata, labels, C, solver, max_iter, **kwargs) -> LogisticRegression:
"""
    For internal use. Get the cuml-based logistic regression classifier.
"""
solver = 'qn' if solver is None else solver
if solver != 'qn':
        raise ValueError(
                "🛑 Invalid `solver`, should be `'qn'` to run on GPU")
le = LabelEncoder()
labels_ = le.fit_transform(labels)
logger.info(f"🏋️ Training data using logistic regression on GPU")
no_cells = len(labels)
if (no_cells > 100000) and (indata.shape[1] > 10000):
logger.warn(f"⚠️ Warning: it may take a long time to train this dataset with {no_cells} cells and {indata.shape[1]} genes, try to downsample cells and/or restrict genes to a subset (e.g., hvgs)")
classifier_ = cuLogisticRegression(C = C, max_iter = max_iter, solver = solver, **kwargs)
classifier_.fit(indata, labels_)
classifier = LogisticRegression(multi_class = 'ovr')
for attr in ['C', 'class_weight', 'fit_intercept', 'l1_ratio', 'max_iter', 'penalty', 'tol', 'solver', 'coef_', 'intercept_', 'verbose']:
setattr(classifier, attr, getattr(classifier_, attr))
classifier.classes_ = le.inverse_transform(classifier_.classes_)
return classifier
def _SGDClassifier(indata, labels,
alpha, max_iter, n_jobs,
mini_batch, batch_number, batch_size, epochs, balance_cell_type, **kwargs) -> SGDClassifier:
"""
For internal use. Get the SGDClassifier.
"""
    #The log loss was renamed from 'log' to 'log_loss' in scikit-learn 1.1; parse the version as an integer tuple rather than a float prefix
    loss_mode = 'log_loss' if tuple(map(int, skv.split('.')[:2])) >= (1, 1) else 'log'
classifier = SGDClassifier(loss = loss_mode, alpha = alpha, max_iter = max_iter, n_jobs = n_jobs, **kwargs)
if not mini_batch:
logger.info(f"🏋️ Training data using SGD logistic regression")
if (len(labels) > 100000) and (indata.shape[1] > 10000):
logger.warn(f"⚠️ Warning: it may take a long time to train this dataset with {len(labels)} cells and {indata.shape[1]} genes, try to downsample cells and/or restrict genes to a subset (e.g., hvgs)")
classifier.fit(indata, labels)
else:
logger.info(f"🏋️ Training data using mini-batch SGD logistic regression")
no_cells = len(labels)
if no_cells < 10000:
logger.warn(f"⚠️ Warning: the number of cells ({no_cells}) is not big enough to conduct a proper mini-batch training. You may consider using traditional SGD classifier (mini_batch = False)")
if no_cells <= batch_size:
raise ValueError(
f"🛑 Number of cells ({no_cells}) is fewer than the batch size ({batch_size}). Decrease `batch_size`, or use SGD directly (mini_batch = False)")
        no_cells_sample = min(batch_number*batch_size, no_cells)
starts = np.arange(0, no_cells_sample, batch_size)
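        #Optionally compute per-cell sampling weights inversely proportional to cell type frequency, so each cell type is drawn roughly evenly (weights sum to 1 across all cells)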
if balance_cell_type:
celltype_freq = np.unique(labels, return_counts = True)
len_celltype = len(celltype_freq[0])
mapping = pd.Series(1 / (celltype_freq[1]*len_celltype), index = celltype_freq[0])
p = mapping[labels].values
for epoch in range(1, (epochs+1)):
logger.info(f"⏳ Epochs: [{epoch}/{epochs}]")
if not balance_cell_type:
sampled_cell_index = np.random.choice(no_cells, no_cells_sample, replace = False)
else:
sampled_cell_index = np.random.choice(no_cells, no_cells_sample, replace = False, p = p)
            for start in starts:
                batch_index = sampled_cell_index[start:start + batch_size]
                classifier.partial_fit(indata[batch_index], labels[batch_index], classes = np.unique(labels))
return classifier
def train(X = None,
labels: Optional[Union[str, list, tuple, np.ndarray, pd.Series, pd.Index]] = None,
genes: Optional[Union[str, list, tuple, np.ndarray, pd.Series, pd.Index]] = None,
transpose_input: bool = False,
with_mean: bool = True,
check_expression: bool = True,
#LR param
C: float = 1.0, solver: Optional[str] = None, max_iter: Optional[int] = None, n_jobs: Optional[int] = None,
#SGD param
use_SGD: bool = False, alpha: float = 0.0001,
#GPU param
use_GPU: bool = False,
#mini-batch
mini_batch: bool = False, batch_number: int = 100, batch_size: int = 1000, epochs: int = 10, balance_cell_type: bool = False,
#feature selection
feature_selection: bool = False, top_genes: int = 300,
#description
date: str = '', details: str = '', url: str = '', source: str = '', version: str = '',
#other param
**kwargs
) -> Model:
"""
    Train a celltypist model using a logistic classifier, fitted either with a traditional (global) solver or by stochastic gradient descent (SGD) learning, optionally in mini-batches.
Parameters
----------
X
Path to the input count matrix (supported types are csv, txt, tsv, tab and mtx) or AnnData (h5ad).
Also accepts the input as an :class:`~anndata.AnnData` object, or any array-like objects already loaded in memory.
See `check_expression` for detailed format requirements.
A cell-by-gene format is desirable (see `transpose_input` for more information).
labels
        Path to the file containing one cell type label per line corresponding to the cells in `X`.
Also accepts any list-like objects already loaded in memory (such as an array).
If `X` is specified as an AnnData, this argument can also be set as a column name from cell metadata.
genes
Path to the file containing one gene per line corresponding to the genes in `X`.
Also accepts any list-like objects already loaded in memory (such as an array).
Note `genes` will be extracted from `X` where possible (e.g., `X` is an AnnData or data frame).
transpose_input
Whether to transpose the input matrix. Set to `True` if `X` is provided in a gene-by-cell format.
(Default: `False`)
with_mean
Whether to subtract the mean values during data scaling. Setting to `False` can lower the memory usage when the input is a sparse matrix but may slightly reduce the model performance.
(Default: `True`)
check_expression
Check whether the expression matrix in the input data is supplied as required.
        Except when a path to a raw count table file is specified, all other inputs for `X` should be log1p normalized expression to 10000 counts per cell.
Set to `False` if you want to train the data regardless of the expression formats.
(Default: `True`)
C
        Inverse of L2 regularization strength for the traditional logistic classifier. A smaller value may improve model generalization at the cost of decreased accuracy.
This argument is ignored if SGD learning is enabled (`use_SGD = True`).
(Default: 1.0)
solver
Algorithm to use in the optimization problem for traditional logistic classifier.
The default behavior is to choose the solver according to the size of the input data.
This argument is ignored if SGD learning is enabled (`use_SGD = True`).
max_iter
Maximum number of iterations before reaching the minimum of the cost function.
        If the cost function fails to converge after a long time, consider decreasing `max_iter` to cut training short.
This argument is for both traditional and SGD logistic classifiers, and will be ignored if mini-batch SGD training is conducted (`use_SGD = True` and `mini_batch = True`).
        Defaults to 200, 500, and 1000 for large (>500k cells), medium (50k-500k cells), and small (<50k cells) datasets, respectively.
n_jobs
        Number of CPUs used. Defaults to one CPU. `-1` means all CPUs are used.
This argument is for both traditional and SGD logistic classifiers.
use_SGD
Whether to implement SGD learning for the logistic classifier.
(Default: `False`)
alpha
        L2 regularization strength for the SGD logistic classifier. A larger value may improve model generalization at the cost of decreased accuracy.
This argument is ignored if SGD learning is disabled (`use_SGD = False`).
(Default: 0.0001)
use_GPU
Whether to use GPU for logistic classifier.
This argument is ignored if SGD learning is enabled (`use_SGD = True`).
(Default: `False`)
mini_batch
Whether to implement mini-batch training for the SGD logistic classifier.
Setting to `True` may improve the training efficiency for large datasets (for example, >100k cells).
This argument is ignored if SGD learning is disabled (`use_SGD = False`).
(Default: `False`)
batch_number
The number of batches used for training in each epoch. Each batch contains `batch_size` cells.
        For datasets that cannot fill `batch_number` batches, all available batches will be used.
This argument is relevant only if mini-batch SGD training is conducted (`use_SGD = True` and `mini_batch = True`).
(Default: 100)
batch_size
The number of cells within each batch.
This argument is relevant only if mini-batch SGD training is conducted (`use_SGD = True` and `mini_batch = True`).
(Default: 1000)
epochs
The number of epochs for the mini-batch training procedure.
The default values of `batch_number`, `batch_size`, and `epochs` together allow observing ~10^6 training cells.
This argument is relevant only if mini-batch SGD training is conducted (`use_SGD = True` and `mini_batch = True`).
(Default: 10)
balance_cell_type
Whether to balance the cell type frequencies in mini-batches during each epoch.
Setting to `True` will sample rare cell types with a higher probability, ensuring close-to-even cell type distributions in mini-batches.
This argument is relevant only if mini-batch SGD training is conducted (`use_SGD = True` and `mini_batch = True`).
(Default: `False`)
feature_selection
Whether to perform two-pass data training where the first round is used for selecting important features/genes using SGD learning.
If `True`, the training time will be longer.
(Default: `False`)
top_genes
The number of top genes selected from each class/cell-type based on their absolute regression coefficients.
The final feature set is combined across all classes (i.e., union).
(Default: 300)
date
        Free text of the date of the model. Defaults to the time when the training is completed.
details
Free text of the description of the model.
url
Free text of the (possible) download url of the model.
source
Free text of the source (publication, database, etc.) of the model.
version
Free text of the version of the model.
**kwargs
Other keyword arguments passed to :class:`~sklearn.linear_model.LogisticRegression` (`use_SGD = False` and `use_GPU = False`), :class:`cuml.LogisticRegression` (`use_SGD = False` and `use_GPU = True`), or :class:`~sklearn.linear_model.SGDClassifier` (`use_SGD = True`).
Returns
----------
:class:`~celltypist.models.Model`
An instance of the :class:`~celltypist.models.Model` trained by celltypist.
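    Examples
    ----------
    A minimal usage sketch (the file names and the `cell_type` metadata column are hypothetical placeholders):
    >>> import scanpy as sc
    >>> import celltypist
    >>> adata = sc.read('input.h5ad')
    >>> #`adata.X` should be log1p normalized expression to 10000 counts per cell
    >>> model = celltypist.train(adata, labels = 'cell_type', n_jobs = 4, feature_selection = True)
    >>> model.write('model.pkl')
    For large datasets, mini-batch SGD learning can reduce training time:
    >>> model = celltypist.train(adata, labels = 'cell_type', use_SGD = True, mini_batch = True, balance_cell_type = True)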
"""
#Test GPU
    if not use_SGD and use_GPU and 'cuml' not in sys.modules:
        raise ImportError(
                "🛑 cuml is not installed. To run logistic regression on GPU, please install cuml first")
#prepare
logger.info("🍳 Preparing data before training")
indata, labels, genes = _prepare_data(X, labels, genes, transpose_input)
if isinstance(indata, pd.DataFrame):
indata = indata.values
elif with_mean and isinstance(indata, spmatrix):
indata = indata.toarray()
labels = np.array(labels)
genes = np.array(genes)
#check
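    #Only the first cell is spot-checked: expm1 of log1p normalized data should sum to ~10000 counts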
if check_expression and (np.abs(np.expm1(indata[0]).sum()-10000) > 1):
raise ValueError(
"🛑 Invalid expression matrix, expect log1p normalized expression to 10000 counts per cell")
if len(labels) != indata.shape[0]:
raise ValueError(
f"🛑 Length of training labels ({len(labels)}) does not match the number of input cells ({indata.shape[0]})")
if len(genes) != indata.shape[1]:
raise ValueError(
f"🛑 The number of genes ({len(genes)}) provided does not match the number of genes in the training data ({indata.shape[1]})")
#filter
flag = indata.sum(axis = 0) == 0
if isinstance(flag, np.matrix):
flag = flag.A1
if flag.sum() > 0:
logger.info(f"✂️ {flag.sum()} non-expressed genes are filtered out")
#indata = indata[:, ~flag]
genes = genes[~flag]
#report data stats
logger.info(f"🔬 Input data has {indata.shape[0]} cells and {(~flag).sum()} genes")
#scaler
logger.info(f"⚖️ Scaling input data")
scaler = StandardScaler(with_mean = with_mean)
indata = scaler.fit_transform(indata[:, ~flag] if flag.sum() > 0 else indata)
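    #Clip standardized values at 10 to limit the influence of extreme outliers (cf. the common `max_value = 10` convention in scanpy's `sc.pp.scale`)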
indata[indata > 10] = 10
#sklearn (Cython) does not support very large sparse matrices for the time being
if isinstance(indata, spmatrix) and ((indata.indices.dtype == 'int64') or (indata.indptr.dtype == 'int64')):
indata = indata.toarray()
#max_iter
if max_iter is None:
if indata.shape[0] < 50000:
max_iter = 1000
elif indata.shape[0] < 500000:
max_iter = 500
else:
max_iter = 200
#classifier
if use_SGD or feature_selection:
classifier = _SGDClassifier(indata = indata, labels = labels, alpha = alpha, max_iter = max_iter, n_jobs = n_jobs, mini_batch = mini_batch, batch_number = batch_number, batch_size = batch_size, epochs = epochs, balance_cell_type = balance_cell_type, **kwargs)
elif use_GPU:
classifier = _cuLRClassifier(indata = indata, labels = labels, C = C, solver = solver, max_iter = max_iter, **kwargs)
else:
classifier = _LRClassifier(indata = indata, labels = labels, C = C, solver = solver, max_iter = max_iter, n_jobs = n_jobs, **kwargs)
#feature selection -> new classifier and scaler
if feature_selection:
logger.info(f"🔎 Selecting features")
if len(genes) <= top_genes:
raise ValueError(
f"🛑 The number of genes ({len(genes)}) is fewer than the `top_genes` ({top_genes}). Unable to perform feature selection")
gene_index = np.argpartition(np.abs(classifier.coef_), -top_genes, axis = 1)[:, -top_genes:]
gene_index = np.unique(gene_index)
logger.info(f"🧬 {len(gene_index)} features are selected")
genes = genes[gene_index]
#indata = indata[:, gene_index]
logger.info(f"🏋️ Starting the second round of training")
if use_SGD:
classifier = _SGDClassifier(indata = indata[:, gene_index], labels = labels, alpha = alpha, max_iter = max_iter, n_jobs = n_jobs, mini_batch = mini_batch, batch_number = batch_number, batch_size = batch_size, epochs = epochs, balance_cell_type = balance_cell_type, **kwargs)
elif use_GPU:
classifier = _cuLRClassifier(indata = indata[:, gene_index], labels = labels, C = C, solver = solver, max_iter = max_iter, **kwargs)
else:
classifier = _LRClassifier(indata = indata[:, gene_index], labels = labels, C = C, solver = solver, max_iter = max_iter, n_jobs = n_jobs, **kwargs)
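        #Subset the fitted scaler to the selected features so prediction-time scaling matches the reduced gene set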
scaler.mean_ = scaler.mean_[gene_index]
scaler.var_ = scaler.var_[gene_index]
scaler.scale_ = scaler.scale_[gene_index]
scaler.n_features_in_ = len(gene_index)
#model finalization
classifier.features = genes
classifier.n_features_in_ = len(genes)
if not date:
date = str(datetime.now())
description = {'date': date, 'details': details, 'url': url, 'source': source, 'version': version, 'number_celltypes': len(classifier.classes_)}
logger.info(f"✅ Model training done!")
return Model(classifier, scaler, description)