Skip to content

Commit dfaee41

Browse files
authored
Asyncable functions (#258)
1 parent 868b16a commit dfaee41

6 files changed

Lines changed: 58 additions & 70 deletions

File tree

main/como/create_context_specific_model.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
Solver,
3333
_BoundaryReactions,
3434
)
35-
from como.utils import set_up_logging, split_gene_expression_data
35+
from como.utils import asyncable, set_up_logging, split_gene_expression_data
3636

3737

3838
def _reaction_indices_to_ids(
@@ -1014,3 +1014,6 @@ def create_context_specific_model( # noqa: C901
10141014
f"metabolites={len(context_model.metabolites)}"
10151015
)
10161016
return context_model
1017+
1018+
1019+
# Awaitable counterpart of `create_context_specific_model`: `asyncable` runs the synchronous function in a worker thread.
async_create_context_specific_model = asyncable(create_context_specific_model)

main/como/merge_xomics.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
_SourceWeights,
2323
)
2424
from como.project import Config
25-
from como.utils import get_missing_gene_data, read_file, return_placeholder_data, set_up_logging
25+
from como.utils import asyncable, get_missing_gene_data, read_file, return_placeholder_data, set_up_logging
2626

2727

2828
class _MergedHeaderNames:
@@ -616,3 +616,6 @@ def merge_xomics( # noqa: C901
616616
output_final_model_scores_filepath=output_final_model_scores_filepath,
617617
output_figure_dirpath=output_figure_dirpath,
618618
)
619+
620+
621+
# Awaitable counterpart of `merge_xomics`: `asyncable` runs the synchronous function in a worker thread.
async_merge_xomics = asyncable(merge_xomics)

main/como/proteomics_gen.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import itertools
44
import sys
55
from pathlib import Path
6-
from typing import TextIO, cast
6+
from typing import TextIO
77

88
import numpy as np
99
import pandas as pd
@@ -13,7 +13,7 @@
1313
from como.data_types import LogLevel
1414
from como.project import Config
1515
from como.proteomics_preprocessing import protein_transform_main
16-
from como.utils import return_placeholder_data, set_up_logging
16+
from como.utils import asyncable, return_placeholder_data, set_up_logging
1717

1818

1919
# Load Proteomics
@@ -43,7 +43,7 @@ def process_proteomics_data(path: Path) -> pd.DataFrame:
4343

4444

4545
# read map to convert to entrez
46-
async def load_gene_symbol_map(gene_symbols: list[str], entrez_map: Path | None = None):
46+
def load_gene_symbol_map(gene_symbols: list[str], entrez_map: Path | None = None):
4747
"""Load a mapping from gene symbols to Entrez IDs.
4848
4949
Args:
@@ -188,8 +188,9 @@ def load_empty_dict():
188188
)
189189
return load_empty_dict()
190190

191+
191192
# Synchronous entry point; `async_proteomics_gen` (defined at the end of this module) is the awaitable wrapper.
192-
async def proteomics_gen(
193+
def proteomics_gen(
193194
context_name: str,
194195
config_filepath: Path,
195196
matrix_filepath: Path,
@@ -230,9 +231,9 @@ async def proteomics_gen(
230231
for group in groups:
231232
indices = np.where([g == group for g in config_df["group"]])
232233
sample_columns = [*np.take(config_df["sample_name"].to_numpy(), indices).ravel().tolist(), "gene_symbol"]
233-
matrix = cast(pd.DataFrame, matrix.loc[:, sample_columns])
234-
235-
symbols_to_gene_ids = await load_gene_symbol_map(
234+
matrix = matrix.loc[:, sample_columns]
235+
236+
symbols_to_gene_ids = load_gene_symbol_map(
236237
gene_symbols=matrix["gene_symbol"].tolist(),
237238
entrez_map=input_entrez_map,
238239
)
@@ -264,3 +265,6 @@ async def proteomics_gen(
264265
hi_group_ratio=high_confidence_batch_ratio,
265266
group_names=groups,
266267
)
268+
269+
270+
# Awaitable counterpart of `proteomics_gen`: `asyncable` runs the synchronous function in a worker thread.
async_proteomics_gen = asyncable(proteomics_gen)

main/como/rnaseq_gen.py

Lines changed: 8 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from como.migrations import gene_info_migrations
2626
from como.pipelines.identifier import contains_identical_gene_types, determine_gene_type
2727
from como.project import Config
28-
from como.utils import read_file, set_up_logging
28+
from como.utils import asyncable, read_file, set_up_logging
2929

3030

3131
class _FilteringOptions(NamedTuple):
@@ -185,6 +185,9 @@ def _build_matrix_results(
185185
elif isinstance(matrix, sc.AnnData):
186186
if not isinstance(matrix.var, pd.DataFrame):
187187
raise TypeError(f"Expected matrix.var object to be 'pd.DataFrame', got '{type(matrix.var)}'")
188+
189+
if matrix.raw is not None:
190+
matrix = matrix.raw.to_adata()
188191

189192
gene_info = gene_info.sort_values(["entrez_gene_id", "size"], ascending=[True, True]).drop_duplicates(
190193
subset=["entrez_gene_id"], keep="first"
@@ -538,8 +541,6 @@ def umi_filter(
538541
adata: sc.AnnData = metric.count_matrix.copy()
539542

540543
if perform_normalization:
541-
if adata.raw is not None:
542-
adata.X = adata.raw.X.copy()
543544
sc.pp.filter_cells(adata, min_genes=10)
544545
sc.pp.filter_genes(adata, min_cells=1)
545546
sc.pp.normalize_total(adata, target_sum=target_sum)
@@ -549,8 +550,8 @@ def umi_filter(
549550

550551
adata_x = adata.X
551552
n_cells, n_genes = adata.shape
552-
553-
min_samples: float = round(min_sample_expression * n_cells)
553+
554+
min_samples = round(min_sample_expression * n_cells)
554555
min_func = k_over_a(min_samples, cut_off)
555556
min_genes_mask = np.zeros(n_genes, dtype=bool)
556557
for j in range(n_genes):
@@ -709,7 +710,7 @@ def _process(
709710

710711
merged_zscores = merged_zscores.reindex(columns=sorted(merged_zscores.columns))
711712
merged_zscores = merged_zscores.groupby(by=merged_zscores.index.name).mean()
712-
merged_zscores.to_csv(output_zscore_normalization_filepath, index=True)
713+
merged_zscores.to_csv(output_zscore_normalization_filepath.with_suffix(".csv"), index=True)
713714
elif isinstance(rnaseq_matrix, sc.AnnData):
714715
merged_zscores = ad.concat([m.z_score_matrix for m in metrics.values()], axis="obs")
715716
merged_zscores.var.index.name = "entrez_gene_id"
@@ -905,55 +906,4 @@ def rnaseq_gen( # noqa: C901
905906
)
906907

907908

908-
if __name__ == "__main__":
909-
import matplotlib.pyplot as plt
910-
911-
data = pd.read_csv("/Users/joshl/Downloads/fpkm_example_data/CD8.genes.results.txt", sep="\t")
912-
data["gene_id"] = data["gene_id"].str.partition(".")[0]
913-
counts = (
914-
data[["gene_id", "expected_count"]]
915-
.copy()
916-
.set_index("gene_id")
917-
.sort_index()
918-
.rename(columns={"expected_count": "actual"})
919-
)
920-
eff_len = (
921-
data[["gene_id", "effective_length"]]
922-
.copy()
923-
.set_index("gene_id")
924-
.sort_index()
925-
.rename(columns={"effective_length": "actual"})
926-
)
927-
expected_fpkm = (
928-
data[["gene_id", "FPKM"]].copy().set_index("gene_id").sort_index().rename(columns={"FPKM": "expected"})
929-
)
930-
931-
metrics = {
932-
"S1": _StudyMetrics(
933-
study="S1",
934-
num_samples=1,
935-
count_matrix=counts,
936-
eff_length=eff_len,
937-
sample_names=[""],
938-
layout=[LayoutMethod.paired_end],
939-
entrez_gene_ids=np.ndarray([0]),
940-
gene_sizes=np.ndarray([0]),
941-
)
942-
}
943-
calculated_fpkm = _calculate_fpkm(metrics)["S1"].normalization_matrix
944-
calculated_fpkm = calculated_fpkm.round(2)
945-
946-
joined = calculated_fpkm.join(expected_fpkm, how="inner")
947-
joined["actual"] = joined["actual"].replace([np.nan, np.inf], 0)
948-
949-
zfpkm_df, _ = zFPKM(joined, remove_na=True)
950-
zfpkm_df = zfpkm_df.replace(-np.inf, np.nan)
951-
952-
fig, axes = cast(tuple[plt.Figure, list[plt.Axes]], plt.subplots(nrows=2, ncols=1))
953-
axes[0].hist(zfpkm_df["actual"].to_numpy())
954-
axes[0].set_title("Expected zFPKM")
955-
956-
axes[1].hist(zfpkm_df["expected"].to_numpy())
957-
axes[1].set_title("Actual zFPKM")
958-
fig.tight_layout()
959-
fig.show()
909+
# Awaitable counterpart of `rnaseq_gen`: `asyncable` runs the synchronous function in a worker thread.
async_rnaseq_gen = asyncable(rnaseq_gen)

main/como/rnaseq_preprocess.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from como.data_types import LogLevel, RNAType
1717
from como.pipelines.identifier import build_gene_info, get_remaining_identifiers
18-
from como.utils import read_file, set_up_logging
18+
from como.utils import asyncable, read_file, set_up_logging
1919

2020

2121
@dataclass
@@ -739,3 +739,6 @@ def rnaseq_preprocess( # noqa: C901
739739
cache=cache,
740740
create_gene_info_only=create_gene_info_only,
741741
)
742+
743+
744+
# Awaitable counterpart of `rnaseq_preprocess`: `asyncable` runs the synchronous function in a worker thread.
async_rnaseq_preprocess = asyncable(rnaseq_preprocess)

main/como/utils.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import contextlib
5+
import functools
46
import io
57
import sys
6-
from collections.abc import Iterator, Sequence
8+
from collections.abc import Awaitable, Callable, Iterator, Sequence
79
from pathlib import Path
8-
from typing import Any, Literal, NoReturn, TextIO, TypeVar, overload
10+
from typing import Any, Literal, ParamSpec, TextIO, TypeVar, overload
911

1012
import numpy.typing as npt
1113
import pandas as pd
@@ -15,8 +17,11 @@
1517
from como.data_types import LOG_FORMAT, Algorithm, LogLevel
1618
from como.pipelines.identifier import get_remaining_identifiers
1719

20+
P = ParamSpec("P")
1821
T = TypeVar("T")
22+
1923
__all__ = [
24+
"asyncable",
2025
"get_missing_gene_data",
2126
"num_columns",
2227
"num_rows",
@@ -309,3 +314,23 @@ def set_up_logging(
309314
with contextlib.suppress(ValueError):
310315
logger.remove(0)
311316
logger.add(sink=location, level=level.value, format=formatting)
317+
318+
319+
def asyncable(func: Callable[P, T]) -> Callable[P, Awaitable[T]]:
320+
"""Converts a synchronous function to asynchronous.
321+
322+
This wrapper functions by running the synchronous function in a separate thread using asyncio's run_in_executor
323+
This allows the synchronous function to be called in an asynchronous context without blocking the event loop.
324+
325+
:param func: The synchronous function to convert.
326+
:return: An asynchronous version of the input function that runs in a separate thread.
327+
"""
328+
329+
@functools.wraps(func)
330+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
331+
loop = asyncio.get_running_loop()
332+
call = functools.partial(func, *args, **kwargs)
333+
return await loop.run_in_executor(None, call)
334+
# return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
335+
336+
return wrapper

0 commit comments

Comments
 (0)