import multiprocessing
import warnings
from math import ceil
from math import floor
from pathlib import Path
from typing import List
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import Union
from typing import cast
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy.linalg import solve # type: ignore
from scipy.optimize import minimize # type: ignore
from scipy.special import gammaln # type: ignore
from scipy.special import polygamma # type: ignore
from scipy.stats import norm # type: ignore
from sklearn.linear_model import LinearRegression # type: ignore
import pydeseq2
from pydeseq2.grid_search import grid_fit_alpha
from pydeseq2.grid_search import grid_fit_beta
from pydeseq2.grid_search import grid_fit_shrink_beta
def load_example_data(
modality: Literal["raw_counts", "metadata"] = "raw_counts",
dataset: Literal["synthetic"] = "synthetic",
debug: bool = False,
debug_seed: int = 42,
) -> pd.DataFrame:
"""Load synthetic example data.
    May load either metadata or RNA-seq (count) data. For now, this function can
    only return the synthetic data provided as part of this repo, but new
    datasets might be added in the future.
Parameters
----------
modality : str
Data modality. "raw_counts" or "metadata".
dataset : str
The dataset for which to return gene expression data.
If "synthetic", will return the synthetic data that is used for CI unit tests.
(default: ``"synthetic"``).
debug : bool
If true, subsample 10 samples and 100 genes at random.
        (Note that the ``"synthetic"`` dataset already has only 10 genes for 100 samples.)
(default: ``False``).
debug_seed : int
Seed for the debug mode. (default: ``42``).
Returns
-------
pandas.DataFrame
Requested data modality.
"""
assert modality in ["raw_counts", "metadata"], (
"The modality argument must be one of the following: " "raw_counts, metadata"
)
assert dataset in [
"synthetic"
], "The dataset argument must be one of the following: synthetic."
# Load data
datasets_path = Path(pydeseq2.__file__).parent.parent / "datasets"
if dataset == "synthetic":
path_to_data = datasets_path / "synthetic"
if Path(path_to_data).is_dir():
            # Cast the Paths to strings to have coherent types w.r.t. the URL
            # case below (which does not handle Paths), else mypy throws an error.
path_to_data_counts = str(path_to_data / "test_counts.csv")
path_to_data_metadata = str(path_to_data / "test_metadata.csv")
else:
            # If the path does not exist (as is the case on ReadTheDocs),
            # load the data from GitHub
url_to_data = (
"https://raw.githubusercontent.com/owkin/"
"PyDESeq2/main/datasets/synthetic/"
)
            path_to_data_counts = url_to_data + "test_counts.csv"
            path_to_data_metadata = url_to_data + "test_metadata.csv"
if modality == "raw_counts":
df = pd.read_csv(
path_to_data_counts,
sep=",",
index_col=0,
).T
elif modality == "metadata":
df = pd.read_csv(
path_to_data_metadata,
sep=",",
index_col=0,
)
if debug:
# TODO: until we provide a larger dataset, this option is useless
# subsample 10 samples and 100 genes
df = df.sample(n=10, axis=0, random_state=debug_seed)
if modality == "raw_counts":
            # Genes are columns (samples were subsampled above): sample axis=1
            df = df.sample(n=100, axis=1, random_state=debug_seed)
return df
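# Usage sketch (illustrative, not part of the original module): load the two
# modalities of the bundled synthetic dataset. Both calls return DataFrames
# indexed by sample barcodes, so they can be aligned directly.
#
#   counts_df = load_example_data(modality="raw_counts", dataset="synthetic")
#   metadata_df = load_example_data(modality="metadata", dataset="synthetic")
#   assert (counts_df.index == metadata_df.index).all()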
def test_valid_counts(counts: Union[pd.DataFrame, np.ndarray]) -> None:
"""Test that the count matrix contains valid inputs.
More precisely, test that inputs are non-negative integers.
Parameters
----------
counts : pandas.DataFrame or ndarray
Raw counts. One column per gene, rows are indexed by sample barcodes.
"""
if isinstance(counts, pd.DataFrame):
if counts.isna().any().any():
raise ValueError("NaNs are not allowed in the count matrix.")
        if not counts.apply(
            lambda s: pd.to_numeric(s, errors="coerce").notnull().all()
        ).all():
raise ValueError("The count matrix should only contain numbers.")
else:
if np.isnan(counts).any().any():
raise ValueError("NaNs are not allowed in the count matrix.")
if not np.issubdtype(counts.dtype, np.number):
raise ValueError("The count matrix should only contain numbers.")
if (counts % 1 != 0).any().any():
raise ValueError("The count matrix should only contain integers.")
if (counts < 0).any().any():
raise ValueError("The count matrix should only contain non-negative values.")
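# Quick sanity checks (illustrative): test_valid_counts returns None on valid
# input and raises ValueError otherwise.
#
#   test_valid_counts(np.array([[0, 1], [2, 3]]))     # passes silently
#   test_valid_counts(np.array([[0.5, 1], [2, 3]]))   # raises: non-integers
#   test_valid_counts(np.array([[-1, 1], [2, 3]]))    # raises: negative values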
def build_design_matrix(
metadata: pd.DataFrame,
design_factors: Union[str, List[str]] = "condition",
ref_level: Optional[List[str]] = None,
continuous_factors: Optional[List[str]] = None,
expanded: bool = False,
intercept: bool = True,
) -> pd.DataFrame:
"""Build design_matrix matrix for DEA.
Unless specified, the reference factor is chosen alphabetically.
Parameters
----------
metadata : pandas.DataFrame
DataFrame containing metadata information.
Must be indexed by sample barcodes.
design_factors : str or list
Name of the columns of metadata to be used as design_matrix variables.
(default: ``"condition"``).
    ref_level : list or None
An optional list of two strings of the form ``["factor", "ref_level"]``
specifying the factor of interest and the desired reference level, e.g.
``["condition", "A"]``. (default: ``None``).
continuous_factors : list or None
An optional list of continuous (as opposed to categorical) factors. Any factor
not in ``continuous_factors`` will be considered categorical (default: ``None``).
expanded : bool
If true, use one column per category. Else, use n-1 columns, for each n-level
categorical factor.
(default: ``False``).
intercept : bool
If true, add an intercept (a column containing only ones). (default: ``True``).
Returns
-------
pandas.DataFrame
A DataFrame with experiment design information (to split cohorts).
Indexed by sample barcodes.
"""
if isinstance(
design_factors, str
): # if there is a single factor, convert to singleton list
design_factors = [design_factors]
for factor in design_factors:
# Check that each factor has at least 2 levels
if len(np.unique(metadata[factor])) < 2:
            raise ValueError(
                f"Factors should take at least two values, but {factor} "
                f"takes the single value '{np.unique(metadata[factor])[0]}'."
            )
# Check that level factors in the design don't contain underscores. If so, convert
# them to hyphens
warning_issued = False
for factor in design_factors:
if np.any(["_" in value for value in metadata[factor]]):
if not warning_issued:
warnings.warn(
"""Some factor levels in the design contain underscores ('_').
They will be converted to hyphens ('-').""",
UserWarning,
stacklevel=2,
)
warning_issued = True
metadata[factor] = metadata[factor].apply(lambda x: x.replace("_", "-"))
if continuous_factors is not None:
categorical_factors = [
factor for factor in design_factors if factor not in continuous_factors
]
else:
categorical_factors = design_factors
# Check that there is at least one categorical factor
if len(categorical_factors) > 0:
design_matrix = pd.get_dummies(
metadata[categorical_factors], drop_first=not expanded
)
if ref_level is not None:
if len(ref_level) != 2:
raise KeyError("The reference level should contain 2 strings.")
if ref_level[1] not in metadata[ref_level[0]].values:
raise KeyError(
f"The metadata data should contain a '{ref_level[0]}' column"
f" with a '{ref_level[1]}' level."
)
# Check that the reference level is not in the matrix (if unexpanded design)
ref_level_name = "_".join(ref_level)
if (not expanded) and ref_level_name in design_matrix.columns:
# Remove the reference level and add one
factor_cols = [
col for col in design_matrix.columns if col.startswith(ref_level[0])
]
missing_level = next(
level
for level in np.unique(metadata[ref_level[0]])
if f"{ref_level[0]}_{level}" not in design_matrix.columns
)
design_matrix[f"{ref_level[0]}_{missing_level}"] = 1 - design_matrix[
factor_cols
].sum(1)
design_matrix.drop(ref_level_name, axis="columns", inplace=True)
if not expanded:
# Add reference level as column name suffix
for factor in design_factors:
if ref_level is None or factor != ref_level[0]:
# The reference is the unique level that is no longer there
ref = next(
level
for level in np.unique(metadata[factor])
if f"{factor}_{level}" not in design_matrix.columns
)
else:
# The reference level is given as an argument
ref = ref_level[1]
design_matrix.columns = [
f"{col}_vs_{ref}" if col.startswith(factor) else col
for col in design_matrix.columns
]
else:
# There is no categorical factor in the design
design_matrix = pd.DataFrame(index=metadata.index)
if intercept:
design_matrix.insert(0, "intercept", 1)
    # Convert one-hot encodings of categorical factors to int
design_matrix = design_matrix.astype("int")
# Add continuous factors
if continuous_factors is not None:
for factor in continuous_factors:
# This factor should be numeric
design_matrix[factor] = pd.to_numeric(metadata[factor])
return design_matrix
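# Usage sketch (illustrative): a two-factor design on hypothetical metadata.
# With the default unexpanded encoding, each n-level categorical factor
# contributes n-1 columns named "<factor>_<level>_vs_<reference>".
#
#   metadata = pd.DataFrame(
#       {"condition": ["A", "A", "B", "B"], "group": ["X", "Y", "X", "Y"]},
#       index=[f"sample{i}" for i in range(1, 5)],
#   )
#   design = build_design_matrix(
#       metadata,
#       design_factors=["condition", "group"],
#       ref_level=["condition", "A"],
#   )
#   # columns: "intercept", "condition_B_vs_A", "group_Y_vs_X"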
def replace_underscores(factors: List[str]) -> List[str]:
    """Replace all underscores in a list of strings with hyphens.
To be used on design factors to avoid bugs due to the reliance on
``str.split("_")`` in parts of the code.
Parameters
----------
factors : list
A list of strings which may contain underscores.
Returns
-------
list
A list of strings in which underscores were replaced by hyphens.
"""
return [factor.replace("_", "-") for factor in factors]
def dispersion_trend(
normed_mean: Union[float, np.ndarray],
coeffs: Union["pd.Series[float]", np.ndarray],
) -> Union[float, np.ndarray]:
r"""Return dispersion trend from normalized counts.
:math:`a_1/ \mu + a_0`.
Parameters
----------
normed_mean : float or ndarray
Mean of normalized counts for a given gene or set of genes.
coeffs : ndarray or pd.Series
Fitted dispersion trend coefficients :math:`a_0` and :math:`a_1`.
Returns
-------
float or ndarray
Dispersion trend :math:`a_1/ \mu + a_0`.
"""
if isinstance(coeffs, pd.Series):
return coeffs["a0"] + coeffs["a1"] / normed_mean
else:
return coeffs[0] + coeffs[1] / normed_mean
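# Example (illustrative): with trend coefficients a0 = 0.1 and a1 = 3.0, a gene
# whose normalized mean is 100 gets a trend dispersion of 0.1 + 3/100 = 0.13.
#
#   coeffs = pd.Series({"a0": 0.1, "a1": 3.0})
#   dispersion_trend(100.0, coeffs)   # 0.13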
def nb_nll(
counts: np.ndarray, mu: np.ndarray, alpha: Union[float, np.ndarray]
) -> Union[float, np.ndarray]:
r"""Neg log-likelihood of a negative binomial of parameters ``mu`` and ``alpha``.
    Mathematically, if ``counts`` is a vector of count entries :math:`y_i`,
    then the likelihood of each entry :math:`y_i` being drawn from a negative
    binomial :math:`NB(\mu, \alpha)` is [1]
.. math::
p(y_i | \mu, \alpha) = \frac{\Gamma(y_i + \alpha^{-1})}{
\Gamma(y_i + 1)\Gamma(\alpha^{-1})
}
\left(\frac{1}{1 + \alpha \mu} \right)^{1/\alpha}
\left(\frac{\mu}{\alpha^{-1} + \mu} \right)^{y_i}
As a consequence, assuming there are :math:`n` entries,
the total negative log-likelihood for ``counts`` is
.. math::
\ell(\mu, \alpha) = \frac{n}{\alpha} \log(\alpha) +
\sum_i \left \lbrace
- \log \left( \frac{\Gamma(y_i + \alpha^{-1})}{
\Gamma(y_i + 1)\Gamma(\alpha^{-1})
} \right)
+ (\alpha^{-1} + y_i) \log (\alpha^{-1} + \mu)
- y_i \log \mu
\right \rbrace
This is implemented in this function.
Parameters
----------
counts : ndarray
Observations.
mu : ndarray
Mean of the distribution :math:`\mu`.
alpha : float or ndarray
Dispersion of the distribution :math:`\alpha`,
s.t. the variance is :math:`\mu + \alpha \mu^2`.
Returns
-------
float or ndarray
Negative log likelihood of the observations counts
following :math:`NB(\mu, \alpha)`.
Notes
-----
[1] https://en.wikipedia.org/wiki/Negative_binomial_distribution
"""
n = len(counts)
alpha_neg1 = 1 / alpha
logbinom = gammaln(counts + alpha_neg1) - gammaln(counts + 1) - gammaln(alpha_neg1)
if hasattr(alpha, "__len__") and len(alpha) > 1:
return (
alpha_neg1 * np.log(alpha)
- logbinom
+ (counts + alpha_neg1) * np.log(mu + alpha_neg1)
- (counts * np.log(mu))
).sum(0)
else:
return (
n * alpha_neg1 * np.log(alpha)
+ (
-logbinom
+ (counts + alpha_neg1) * np.log(alpha_neg1 + mu)
- counts * np.log(mu)
).sum()
)
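# Usage sketch (illustrative): evaluate the NB negative log-likelihood on
# synthetic values. A scalar alpha yields the total NLL over all entries; an
# array alpha (one per gene, with 2D counts and mu) triggers the vectorized
# branch and returns one NLL per gene.
#
#   counts = np.array([3.0, 10.0, 0.0, 7.0])
#   mu = np.full(4, 5.0)
#   nb_nll(counts, mu, alpha=0.5)   # scalar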
def dnb_nll(counts: np.ndarray, mu: np.ndarray, alpha: float) -> float:
r"""Gradient of the negative log-likelihood of a negative binomial.
Unvectorized.
Parameters
----------
counts : ndarray
Observations.
    mu : ndarray
Mean of the distribution.
alpha : float
Dispersion of the distribution,
s.t. the variance is :math:`\mu + \alpha\mu^2`.
Returns
-------
float
Derivative of negative log likelihood of NB w.r.t. :math:`\alpha`.
"""
alpha_neg1 = 1 / alpha
ll_part = (
alpha_neg1**2
* (
polygamma(0, alpha_neg1)
- polygamma(0, counts + alpha_neg1)
+ np.log(1 + mu * alpha)
+ (counts - mu) / (mu + alpha_neg1)
).sum()
)
return -ll_part
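# Sanity-check sketch (illustrative): the analytical gradient returned by
# dnb_nll should match a central finite difference of nb_nll w.r.t. alpha.
#
#   counts = np.array([3.0, 10.0, 0.0, 7.0])
#   mu = np.full(4, 5.0)
#   eps = 1e-6
#   num_grad = (nb_nll(counts, mu, 0.5 + eps) - nb_nll(counts, mu, 0.5 - eps)) / (2 * eps)
#   np.isclose(dnb_nll(counts, mu, 0.5), num_grad)   # True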
def irls_solver(
counts: np.ndarray,
size_factors: np.ndarray,
design_matrix: np.ndarray,
disp: float,
min_mu: float = 0.5,
beta_tol: float = 1e-8,
min_beta: float = -30,
max_beta: float = 30,
optimizer: Literal["BFGS", "L-BFGS-B"] = "L-BFGS-B",
maxiter: int = 250,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, bool]:
r"""Fit a NB GLM wit log-link to predict counts from the design matrix.
See equations (1-2) in the DESeq2 paper.
Parameters
----------
counts : ndarray
Raw counts for a given gene.
size_factors : ndarray
Sample-wise scaling factors (obtained from median-of-ratios).
design_matrix : ndarray
Design matrix.
disp : float
Gene-wise dispersion prior.
min_mu : float
Lower bound on estimated means, to ensure numerical stability.
(default: ``0.5``).
beta_tol : float
        Stopping criterion for IRLS:
        :math:`\vert dev - dev_{old}\vert / ( \vert dev \vert + 0.1) < \beta_{tol}`.
        (default: ``1e-8``).
min_beta : float
Lower-bound on LFC. (default: ``-30``).
max_beta : float
        Upper-bound on LFC. (default: ``30``).
optimizer : str
Optimizing method to use in case IRLS starts diverging.
Accepted values: 'BFGS' or 'L-BFGS-B'.
        NB: only 'L-BFGS-B' ensures that LFCs will
        lie in the [min_beta, max_beta] range. (default: ``'L-BFGS-B'``).
maxiter : int
Maximum number of IRLS iterations to perform before switching to L-BFGS-B.
(default: ``250``).
Returns
-------
beta: ndarray
Fitted (basemean, lfc) coefficients of negative binomial GLM.
mu: ndarray
Means estimated from size factors and beta: :math:`\mu = s_{ij} \exp(\beta^t X)`.
H: ndarray
        Diagonal of the :math:`W^{1/2} X (X^t W X)^{-1} X^t W^{1/2}` covariance matrix.
converged: bool
Whether IRLS or the optimizer converged. If not and if dimension allows it,
perform grid search.
"""
assert optimizer in ["BFGS", "L-BFGS-B"]
num_vars = design_matrix.shape[1]
X = design_matrix
# if full rank, estimate initial betas for IRLS below
if np.linalg.matrix_rank(X) == num_vars:
Q, R = np.linalg.qr(X)
y = np.log(counts / size_factors + 0.1)
beta_init = solve(R, Q.T @ y)
beta = beta_init
else: # Initialise intercept with log base mean
beta_init = np.zeros(num_vars)
beta_init[0] = np.log(counts / size_factors).mean()
beta = beta_init
dev = 1000.0
dev_ratio = 1.0
ridge_factor = np.diag(np.repeat(1e-6, num_vars))
mu = np.maximum(size_factors * np.exp(X @ beta), min_mu)
converged = True
i = 0
while dev_ratio > beta_tol:
W = mu / (1.0 + mu * disp)
z = np.log(mu / size_factors) + (counts - mu) / mu
H = (X.T * W) @ X + ridge_factor
beta_hat = solve(H, X.T @ (W * z), assume_a="pos")
i += 1
if sum(np.abs(beta_hat) > max_beta) > 0 or i >= maxiter:
# If IRLS starts diverging, use L-BFGS-B
def f(beta: np.ndarray) -> float:
# closure to minimize
mu_ = np.maximum(size_factors * np.exp(X @ beta), min_mu)
return nb_nll(counts, mu_, disp) + 0.5 * (ridge_factor @ beta**2).sum()
def df(beta: np.ndarray) -> np.ndarray:
mu_ = np.maximum(size_factors * np.exp(X @ beta), min_mu)
return (
-X.T @ counts
+ ((1 / disp + counts) * mu_ / (1 / disp + mu_)) @ X
+ ridge_factor @ beta
)
res = minimize(
f,
beta_init,
jac=df,
method=optimizer,
bounds=(
[(min_beta, max_beta)] * num_vars
if optimizer == "L-BFGS-B"
else None
),
)
beta = res.x
mu = np.maximum(size_factors * np.exp(X @ beta), min_mu)
converged = res.success
if not res.success and num_vars <= 2:
beta = grid_fit_beta(
counts,
size_factors,
X,
disp,
)
mu = np.maximum(size_factors * np.exp(X @ beta), min_mu)
break
beta = beta_hat
mu = np.maximum(size_factors * np.exp(X @ beta), min_mu)
# Compute deviation
old_dev = dev
# Replaced deviation with -2 * nll, as in the R code
dev = -2 * nb_nll(counts, mu, disp)
dev_ratio = np.abs(dev - old_dev) / (np.abs(dev) + 0.1)
    # Compute the diagonal of the hat matrix H (used for Cook's distance
    # outlier filtering). einsum computes only the diagonal of
    # X (X^T W X + ridge)^{-1} X^T, which is numerically equivalent to the more
    # expensive np.diag(X @ np.linalg.inv(X.T @ np.diag(W) @ X + ridge_factor) @ X.T)
W = mu / (1.0 + mu * disp)
H = np.einsum(
"ij,jk,ki->i", X, np.linalg.inv((X.T * W[None, :]) @ X + ridge_factor), X.T
)
W_sq = np.sqrt(W)
H = W_sq * H * W_sq
# Return an UNthresholded mu (as in the R code)
# Previous quantities are estimated with a threshold though
mu = size_factors * np.exp(X @ beta)
return beta, mu, H, converged
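# Usage sketch (illustrative): fit one gene on a hypothetical two-column design
# (intercept + one binary covariate), with unit size factors. Returned LFCs are
# in natural log scale.
#
#   design = np.array([[1.0, 0.0]] * 3 + [[1.0, 1.0]] * 3)   # 6 samples
#   counts = np.array([10.0, 12.0, 8.0, 40.0, 35.0, 50.0])
#   size_factors = np.ones(6)
#   beta, mu, H, converged = irls_solver(counts, size_factors, design, disp=0.1)
#   # beta[0]: log base mean; beta[1]: condition effect (natural log)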
def fit_alpha_mle(
counts: np.ndarray,
design_matrix: np.ndarray,
mu: np.ndarray,
alpha_hat: float,
min_disp: float,
max_disp: float,
prior_disp_var: Optional[float] = None,
cr_reg: bool = True,
prior_reg: bool = False,
optimizer: Literal["BFGS", "L-BFGS-B"] = "L-BFGS-B",
) -> Tuple[float, bool]:
"""Estimate the dispersion parameter of a negative binomial GLM.
Note: it is possible to pass counts, design_matrix and mu arguments in the form of
pandas Series, but using numpy arrays makes the code significantly faster.
Parameters
----------
counts : ndarray
Raw counts for a given gene.
design_matrix : ndarray
Design matrix.
mu : ndarray
Mean estimation for the NB model.
alpha_hat : float
Initial dispersion estimate.
min_disp : float
Lower threshold for dispersion parameters.
max_disp : float
Upper threshold for dispersion parameters.
    prior_disp_var : float or None
        Prior dispersion variance. (default: ``None``).
cr_reg : bool
Whether to use Cox-Reid regularization. (default: ``True``).
prior_reg : bool
Whether to use prior log-residual regularization. (default: ``False``).
optimizer : str
Optimizing method to use. Accepted values: 'BFGS' or 'L-BFGS-B'.
(default: ``'L-BFGS-B'``).
Returns
-------
float
Dispersion estimate.
bool
Whether L-BFGS-B converged. If not, dispersion is estimated using grid search.
"""
assert optimizer in ["BFGS", "L-BFGS-B"]
if prior_reg:
# Note: assertion is not working when using numpy
assert (
prior_disp_var is not None
), "Sigma_prior is required for prior regularization"
log_alpha_hat = np.log(alpha_hat)
def loss(log_alpha: float) -> float:
# closure to be minimized
alpha = np.exp(log_alpha)
reg = 0
if cr_reg:
W = mu / (1 + mu * alpha)
reg += 0.5 * np.linalg.slogdet((design_matrix.T * W) @ design_matrix)[1]
if prior_reg:
if prior_disp_var is None:
raise ValueError("Sigma_prior is required for prior regularization")
reg += (log_alpha - log_alpha_hat) ** 2 / (2 * prior_disp_var)
return nb_nll(counts, mu, alpha) + reg
def dloss(log_alpha: float) -> float:
# gradient closure
alpha = np.exp(log_alpha)
reg_grad = 0
if cr_reg:
W = mu / (1 + mu * alpha)
dW = -(W**2)
reg_grad += (
0.5
* (
np.linalg.inv((design_matrix.T * W) @ design_matrix)
* ((design_matrix.T * dW) @ design_matrix)
).sum()
) * alpha # since we want the gradient wrt log_alpha,
# we need to multiply by alpha
if prior_reg:
if prior_disp_var is None:
raise ValueError("Sigma_prior is required for prior regularization")
reg_grad += (log_alpha - log_alpha_hat) / prior_disp_var
# dnb_nll is the gradient wrt alpha, we need to multiply by alpha to get the
# gradient wrt log_alpha
return alpha * dnb_nll(counts, mu, alpha) + reg_grad
res = minimize(
lambda x: loss(x[0]),
x0=np.log(alpha_hat),
jac=lambda x: dloss(x[0]),
method=optimizer,
bounds=(
[(np.log(min_disp), np.log(max_disp))] if optimizer == "L-BFGS-B" else None
),
)
if res.success:
return np.exp(res.x[0]), res.success
else:
return (
np.exp(
grid_fit_alpha(counts, design_matrix, mu, alpha_hat, min_disp, max_disp)
),
res.success,
)
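# Usage sketch (illustrative): refine a hypothetical initial dispersion for one
# gene, with mu typically obtained from fit_lin_mu or irls_solver.
#
#   design = np.ones((6, 1))   # intercept-only design
#   counts = np.array([10.0, 12.0, 8.0, 40.0, 35.0, 50.0])
#   mu = np.full(6, counts.mean())
#   alpha, success = fit_alpha_mle(
#       counts, design, mu, alpha_hat=0.5, min_disp=1e-8, max_disp=10.0
#   )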
def trimmed_mean(x: np.ndarray, trim: float = 0.1, **kwargs) -> Union[float, np.ndarray]:
"""Return trimmed mean.
Compute the mean after trimming data of its smallest and largest quantiles.
Parameters
----------
x : ndarray
Data whose mean to compute.
trim : float
Fraction of data to trim at each end. (default: ``0.1``).
**kwargs
Keyword arguments, useful to pass axis.
Returns
-------
    float or ndarray
Trimmed mean.
"""
assert trim <= 0.5
if "axis" in kwargs:
axis = kwargs.get("axis")
s = np.sort(x, axis=axis)
n = x.shape[axis]
ntrim = floor(n * trim)
return np.take(s, np.arange(ntrim, n - ntrim), axis).mean(axis)
else:
n = len(x)
s = np.sort(x)
ntrim = floor(n * trim)
return s[ntrim : n - ntrim].mean()
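# Example (illustrative): with trim=0.1 and 10 points, one point is dropped at
# each end before averaging, so a single extreme outlier has no influence.
#
#   x = np.array([1.0, 2, 3, 4, 5, 6, 7, 8, 9, 1000.0])
#   trimmed_mean(x, trim=0.1)   # mean of 2..9 = 5.5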
def trimmed_cell_variance(counts: np.ndarray, cells: pd.Series) -> np.ndarray:
"""Return trimmed variance of counts according to condition.
Compute the variance after trimming data of its smallest and largest elements,
grouped by cohorts, and return the max across cohorts.
The trim factor is a function of data size.
Parameters
----------
counts : ndarray
Sample-wise gene counts.
cells : pandas.Series
Cohort affiliation of each sample.
Returns
-------
    ndarray
Gene-wise trimmed variance estimate.
"""
# how much to trim at different n
trimratio = (1 / 3, 1 / 4, 1 / 8)
# returns an index for the vector above for three sample size bins
def trimfn(x: float) -> int:
return 2 if x >= 23.5 else 1 if x >= 3.5 else 0
ns = cells.value_counts()
sqerror = np.zeros_like(counts)
for lvl in cells.unique():
cell_means = cast(
np.ndarray,
trimmed_mean(
counts[cells == lvl, :], trim=trimratio[trimfn(ns[lvl])], axis=0
),
)
sqerror[cells == lvl, :] = counts[cells == lvl, :] - cell_means[None, :]
sqerror **= 2
varEst = np.zeros((len(ns), counts.shape[1]), dtype=float)
for i, lvl in enumerate(cells.unique()):
scale = [2.04, 1.86, 1.51][trimfn(ns[lvl])]
varEst[i, :] = scale * trimmed_mean(
sqerror[cells == lvl, :], trim=trimratio[trimfn(ns[lvl])], axis=0
)
return varEst.max(axis=0)
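# Usage sketch (illustrative): per-cohort trimmed variances on hypothetical
# float counts for two conditions, reduced to the per-gene maximum.
#
#   rng = np.random.default_rng(0)
#   counts = rng.poisson(10.0, size=(8, 3)).astype(float)
#   cells = pd.Series(["A"] * 4 + ["B"] * 4)
#   trimmed_cell_variance(counts, cells)   # shape (3,): one estimate per gene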
def trimmed_variance(
x: np.ndarray, trim: float = 0.125, axis: int = 0
) -> Union[float, np.ndarray]:
"""Return trimmed variance.
Compute the variance after trimming data of its smallest and largest quantiles.
Parameters
----------
    x : ndarray
Data whose trimmed variance to compute.
trim : float
Fraction of data to trim at each end. (default: ``0.125``).
axis : int
Dimension along which to compute variance. (default: ``0``).
Returns
-------
float or ndarray
Trimmed variances.
"""
rm = trimmed_mean(x, trim=trim, axis=axis)
sqerror = (x - rm) ** 2
# scale due to trimming of large squares
return 1.51 * trimmed_mean(sqerror, trim=trim, axis=axis)
def fit_lin_mu(
counts: np.ndarray,
size_factors: np.ndarray,
design_matrix: np.ndarray,
min_mu: float = 0.5,
) -> np.ndarray:
"""Estimate mean of negative binomial model using a linear regression.
Used to initialize genewise dispersion models.
Parameters
----------
counts : ndarray
Raw counts for a given gene.
size_factors : ndarray
Sample-wise scaling factors (obtained from median-of-ratios).
design_matrix : ndarray
Design matrix.
min_mu : float
Lower threshold for fitted means, for numerical stability. (default: ``0.5``).
Returns
-------
ndarray
Estimated mean.
"""
reg = LinearRegression(fit_intercept=False)
reg.fit(design_matrix, counts / size_factors)
mu_hat = size_factors * reg.predict(design_matrix)
# Threshold mu_hat as 1/mu_hat will be used later on.
return np.maximum(mu_hat, min_mu)
def wald_test(
design_matrix: np.ndarray,
disp: float,
lfc: np.ndarray,
mu: np.ndarray,
ridge_factor: np.ndarray,
contrast: np.ndarray,
lfc_null: float,
alt_hypothesis: Optional[Literal["greaterAbs", "lessAbs", "greater", "less"]],
) -> Tuple[float, float, float]:
"""Run Wald test for differential expression.
Computes Wald statistics, standard error and p-values from
dispersion and LFC estimates.
Parameters
----------
design_matrix : ndarray
Design matrix.
disp : float
Dispersion estimate.
lfc : ndarray
Log-fold change estimate (in natural log scale).
mu : ndarray
Mean estimation for the NB model.
ridge_factor : ndarray
Regularization factors.
contrast : ndarray
Vector encoding the contrast that is being tested.
lfc_null : float
        The log fold change (in log2 scale) under the null hypothesis.
alt_hypothesis : str or None
        The alternative hypothesis for computing Wald p-values.
Returns
-------
wald_p_value : float
Estimated p-value.
wald_statistic : float
Wald statistic.
wald_se : float
Standard error of the Wald statistic.
"""
# Build covariance matrix estimator
W = mu / (1 + mu * disp)
M = (design_matrix.T * W[None, :]) @ design_matrix
H = np.linalg.inv(M + ridge_factor)
Hc = H @ contrast
# Evaluate standard error and Wald statistic
wald_se: float = np.sqrt(Hc.T @ M @ Hc)
def greater(lfc_null):
stat = contrast @ np.fmax((lfc - lfc_null) / wald_se, 0)
pval = norm.sf(stat)
return stat, pval
def less(lfc_null):
stat = contrast @ np.fmin((lfc - lfc_null) / wald_se, 0)
pval = norm.sf(np.abs(stat))
return stat, pval
def greater_abs(lfc_null):
stat = contrast @ (np.sign(lfc) * np.fmax((np.abs(lfc) - lfc_null) / wald_se, 0))
pval = 2 * norm.sf(np.abs(stat)) # Only case where the test is two-tailed
return stat, pval
def less_abs(lfc_null):
stat_above, pval_above = greater(-abs(lfc_null))
stat_below, pval_below = less(abs(lfc_null))
return min(stat_above, stat_below, key=abs), max(pval_above, pval_below)
wald_statistic: float
wald_p_value: float
if alt_hypothesis:
wald_statistic, wald_p_value = {
"greaterAbs": greater_abs(lfc_null),
"lessAbs": less_abs(lfc_null),
"greater": greater(lfc_null),
"less": less(lfc_null),
}[alt_hypothesis]
else:
wald_statistic = contrast @ (lfc - lfc_null) / wald_se
wald_p_value = 2 * norm.sf(np.abs(wald_statistic))
return wald_p_value, wald_statistic, wald_se
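# Usage sketch (illustrative): two-tailed Wald test on the second coefficient
# of a gene fitted with irls_solver above (beta and mu reused from that sketch,
# all other values hypothetical).
#
#   ridge = np.diag(np.repeat(1e-6, 2))
#   pval, stat, se = wald_test(
#       design_matrix=design,
#       disp=0.1,
#       lfc=beta,
#       mu=mu,
#       ridge_factor=ridge,
#       contrast=np.array([0.0, 1.0]),
#       lfc_null=0.0,
#       alt_hypothesis=None,
#   )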
def fit_rough_dispersions(
normed_counts: np.ndarray, design_matrix: pd.DataFrame
) -> np.ndarray:
"""Rough dispersion estimates from linear model, as per the R code.
Used as initial estimates in :meth:`DeseqDataSet.fit_genewise_dispersions()
<pydeseq2.dds.DeseqDataSet.fit_genewise_dispersions>`.
Parameters
----------
normed_counts : ndarray
Array of deseq2-normalized read counts. Rows: samples, columns: genes.
design_matrix : pandas.DataFrame
A DataFrame with experiment design information (to split cohorts).