
Commit 1e56026

Author: OutlyingWest
Added logging, subprocess output handling with BusySpinner and test improvements
1. Added and documented a logging system.
2. Implemented animation for long-running processes in Score-P mode (both for data transfer to the subprocess and for displaying progress animation within the subprocess itself).
3. Added an example of a long-running task, large array processing with Score-P, in `examples/ExampleBasic.ipynb`.
4. Changed the way subprocess output streams are read: moved this logic into a separate function, `read_scorep_process_pipe()`.
5. Brought tests into a working state:
   - Created a new function to clean up garbage from standard output.
   - Added a context manager `with self.subTest()` to better identify which cell failed during execution.
   - Temporarily updated expected test outputs to account for a new line introduced for proper animation handling (the `clear_line` string in `read_scorep_process_pipe()`).
1 parent 8bcc5fd commit 1e56026
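The commit relies on a `create_busy_spinner` helper from `jumper.userpersistence`, which is not part of the diff below. As a rough orientation only, a minimal sketch of how such a lock-aware spinner could work (class name, signatures, and behavior here are assumptions, not the project's actual implementation):

```python
import itertools
import sys
import threading
import time
from typing import Optional


class BusySpinner:
    """Hypothetical stand-in for the spinner object returned by
    jumper.userpersistence.create_busy_spinner (not shown in this diff)."""

    def __init__(self, lock: threading.Lock, interval: float = 0.1):
        self.lock = lock                      # shared with the pipe-reading code
        self.interval = interval
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, message: str = "") -> None:
        def spin() -> None:
            for frame in itertools.cycle("|/-\\"):
                if self._stop_event.is_set():
                    break
                with self.lock:               # do not interleave with cell output
                    sys.stdout.write(f"\r{message} {frame}")
                    sys.stdout.flush()
                time.sleep(self.interval)

        self._thread = threading.Thread(target=spin, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
        with self.lock:
            sys.stdout.write("\r" + " " * 50 + "\r")  # wipe the spinner line
            sys.stdout.flush()


def create_busy_spinner(lock: threading.Lock) -> BusySpinner:
    return BusySpinner(lock)
```

The shared lock is the key design point: the same lock is passed to the output reader so that spinner frames and subprocess output never interleave on stdout.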

9 files changed

Lines changed: 387 additions & 58 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
@@ -152,6 +152,7 @@ MODE=[disk,memory]
 ```
 
 When using persistence in `disk` mode, user can also define directory to which serializer output will be saved with `SCOREP_KERNEL_PERSISTENCE_DIR` environment variable.
+To see the detailed report for marshalling steps - `MARSHALLING_DETAILED_REPORT=1` environment variable can be set.
 
 `%%execute_with_scorep`
 
@@ -235,6 +236,11 @@ Similar yields for cloudpickle. Use the `%%marshalling_settings` magic command t
 
 When dealing with big data structures, there might be a big runtime overhead at the beginning and the end of a Score-P cell. This is due to additional data saving and loading processes for persistency in the background. However this does not affect the actual user code and the Score-P measurements.
 
+## Logging Configuration
+To adjust logging and obtain more detailed output about the behavior of the JUmPER kernel, refer to the `src/logging_config.py` file.
+
+This file contains configuration options for controlling the verbosity, format, and destination of log messages. You can customize it to suit your debugging or monitoring needs.
+
 # Future Work
 
 The kernel is still under development. The following is on the agenda:
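The new README section points to `src/logging_config.py`, whose `LOGGING` dict is consumed in `kernel.py` via `logging.config.dictConfig(LOGGING)` and `logging.getLogger('kernel')`. As a hedged illustration only, a dict of the shape such a file may export (formatter, handler names, and levels here are assumptions; the file in the repository is authoritative):

```python
import logging
import logging.config

# Hypothetical sketch of a dictConfig-style LOGGING dict; the actual
# src/logging_config.py in the repository may differ.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s %(name)s %(levelname)s: %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "default",
        },
    },
    "loggers": {
        # kernel.py requests logging.getLogger('kernel')
        "kernel": {
            "handlers": ["console"],
            "level": "INFO",  # switch to "DEBUG" for more verbose kernel output
            "propagate": False,
        },
    },
}

logging.config.dictConfig(LOGGING)
log = logging.getLogger("kernel")
log.info("kernel logger configured")
```

Raising the `kernel` logger's level to `"DEBUG"` would surface the `self.log.debug(...)` calls added throughout `scorep_execute` in this commit.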

examples/ExampleBasic.ipynb

Lines changed: 51 additions & 1 deletion
@@ -466,7 +466,57 @@
   {
    "cell_type": "markdown",
    "metadata": {},
-   "source": []
+   "source": [
+    "### Large array processing with Score-P\n",
+    "This example illustrates the steps involved in the marshalling process when a cell instrumented with Score-P is executed with a large data payload as input.\n"
+   ]
+  },
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": [
+    "import time\n",
+    "import numpy as np\n",
+    "\n",
+    "def generate_array_with_size(size_mb, dtype=np.float32):\n",
+    "    size_bytes = size_mb * 1024 * 1024\n",
+    "    element_size = np.dtype(dtype).itemsize\n",
+    "    num_elements = size_bytes // element_size\n",
+    "    array = np.zeros(num_elements, dtype=dtype)\n",
+    "    return array\n",
+    "\n",
+    "big_array = generate_array_with_size(size_mb=1000)"
+   ]
+  },
+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "Enable marshalling detailed report for each step."
+  },
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": "%env MARSHALLING_DETAILED_REPORT=1"
+  },
+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "Run cell with Score-P"
+  },
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": [
+    "%%execute_with_scorep\n",
+    "big_array\n",
+    "time.sleep(4)"
+   ]
  }
 ],
 "metadata": {
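The notebook's `generate_array_with_size` helper sizes the payload by integer arithmetic: bytes requested divided by the dtype's item size. A fence-free check of that arithmetic (the helper name below is local to this sketch; `float32` has a 4-byte itemsize):

```python
def num_elements_for(size_mb: int, itemsize: int = 4) -> int:
    """Mirror the size arithmetic of generate_array_with_size():
    size_mb * 1024 * 1024 bytes divided by the per-element byte size."""
    return (size_mb * 1024 * 1024) // itemsize


# The notebook cell allocates a 1000 MB float32 array:
print(num_elements_for(1000))  # 262144000 elements, i.e. ~1 GB of zeros
```

This is why the example makes a good stress test for marshalling: roughly a gigabyte of data must cross the kernel/subprocess boundary before and after the Score-P run.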

src/jumper/kernel.py

Lines changed: 111 additions & 50 deletions
@@ -2,10 +2,14 @@
 import json
 import os
 import re
+import selectors
 import subprocess
 import sys
+import threading
 import time
 import shutil
+import logging
+import logging.config
 
 from enum import Enum
 from textwrap import dedent
@@ -14,13 +18,15 @@
 from ipykernel.ipkernel import IPythonKernel
 from itables import show
 from jumper.userpersistence import PersHelper, scorep_script_name
-from jumper.userpersistence import magics_cleanup
+from jumper.userpersistence import magics_cleanup, create_busy_spinner
 import importlib
 from jumper.perfdatahandler import PerformanceDataHandler
 import jumper.visualization as perfvis
 
 # import jumper.multinode_monitor.slurm_monitor as slurm_monitor
 
+from jumper.logging_config import LOGGING
+
 PYTHON_EXECUTABLE = sys.executable
 READ_CHUNK_SIZE = 8
 userpersistence_token = "jumper.userpersistence"
@@ -103,6 +109,9 @@ def __init__(self, **kwargs):
         except ModuleNotFoundError:
             self.scorep_python_available_ = False
 
+        logging.config.dictConfig(LOGGING)
+        self.log = logging.getLogger('kernel')
+
     def cell_output(self, string, stream="stdout"):
         """
         Display string as cell output.
@@ -683,16 +692,21 @@ async def scorep_execute(
         """
         Execute given code with Score-P Python bindings instrumentation.
         """
+        self.log.info("Executing Score-P instrumented code...")
+        self.pershelper.set_dump_report_level()
         # Set up files/pipes for persistence communication
         if not self.pershelper.preprocess():
             self.pershelper.postprocess()
+            error_message = "Failed to set up persistence communication files/pipes."
+            self.log.error(error_message)
             self.cell_output(
-                "KernelError: Failed to set up the persistence communication "
-                "files/pipes.",
+                f"KernelError: {error_message} ",
                 "stderr",
             )
             return self.standard_reply()
 
+        self.log.debug("Persistence communication set up successfully.")
+
         # Prepare code for the Score-P instrumented execution as subprocess
         # Transmit user persistence and updated sys.path from Jupyter
         # notebook to subprocess After running the code, transmit subprocess
@@ -701,11 +715,14 @@ async def scorep_execute(
             os.open(scorep_script_name, os.O_WRONLY | os.O_CREAT), "w"
         ) as file:
             file.write(self.pershelper.subprocess_wrapper(code))
+        self.log.debug(f"Code written to temporary script: {scorep_script_name}")
+
         # For disk mode use implicit synchronization between kernel and
         # subprocess: await jupyter_dump, subprocess.wait(),
         # await jupyter_update Ghost cell - dump current Jupyter session for
         # subprocess Run in a "silent" way to not increase cells counter
         if self.pershelper.mode == "disk":
+            self.log.debug("Executing Jupyter dump for disk mode.")
             reply_status_dump = await super().do_execute(
                 self.pershelper.jupyter_dump(),
                 silent,
@@ -716,18 +733,24 @@ async def scorep_execute(
             )
 
             if reply_status_dump["status"] != "ok":
+                error_message = "Failed to pickle notebook's persistence."
+                self.log.error(error_message)
                 self.ghost_cell_error(
                     reply_status_dump,
-                    "KernelError: Failed to pickle notebook's persistence.",
+                    f"KernelError: {error_message}",
                 )
                 return reply_status_dump
 
         # Launch subprocess with Jupyter notebook environment
+        self.log.debug("Preparing subprocess execution.")
+
         cmd = (
             [PYTHON_EXECUTABLE, "-m", "scorep"]
             + self.scorep_binding_args
             + [scorep_script_name]
         )
+        self.log.debug(f"Subprocess command: {' '.join(cmd)}")
+
         scorep_env = {
             key: os.environ[key]
             for key in os.environ
@@ -749,13 +772,15 @@ async def scorep_execute(
         minute = dt.strftime("%M")
 
         proc = subprocess.Popen(
-            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env
+            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=proc_env
         )
+        self.log.debug(f"Subprocess started with PID {proc.pid}")
 
         self.perfdata_handler.start_perfmonitor(proc.pid)
         # For memory mode jupyter_dump and jupyter_update must be awaited
         # concurrently to the running subprocess
         if self.pershelper.mode == "memory":
+            self.log.debug("Executing Jupyter dump for memory mode.")
             reply_status_dump = await super().do_execute(
                 self.pershelper.jupyter_dump(),
                 silent,
@@ -765,44 +790,25 @@ async def scorep_execute(
                 cell_id=cell_id,
             )
             if reply_status_dump["status"] != "ok":
+                error_message = "Failed to pickle notebook's persistence."
+                self.log.error(error_message)
                 self.ghost_cell_error(
                     reply_status_dump,
-                    "KernelError: Failed to pickle notebook's persistence.",
+                    f"KernelError: {error_message}",
                 )
                 return reply_status_dump
 
-        # Redirect process stderr to stdout and observe the latter
-        # Observing two stream with two threads causes interference in
-        # cell_output in Jupyter notebook
-        # stdout is read in chunks, which are split into lines using
-        # \r or \n as delimiter
-        # Last element in the list might be "incomplete line",
-        # not ending with \n or \r, it is saved
-        # and merged with the first line in the next chunk
-        incomplete_line = ""
-        endline_pattern = re.compile(r"(.*?[\r\n]|.+$)")
         # Empty cell output, required for interactive output
         # e.g. tqdm for-loop progress bar
         self.cell_output("\0")
 
-        multicellmode_timestamps = []
-        while True:
-            chunk = b"" + proc.stdout.read(READ_CHUNK_SIZE)
-            if chunk == b"":
-                break
-            chunk = chunk.decode(sys.getdefaultencoding(), errors="ignore")
-            lines = endline_pattern.findall(chunk)
-            if len(lines) > 0:
-                lines[0] = incomplete_line + lines[0]
-                if lines[-1][-1] not in ["\n", "\r"]:
-                    incomplete_line = lines.pop(-1)
-                else:
-                    incomplete_line = ""
-                for line in lines:
-                    if "MCM_TS" in line:
-                        multicellmode_timestamps.append(line)
-                        continue
-                    self.cell_output(line)
+        stdout_lock = threading.Lock()
+        process_busy_spinner = create_busy_spinner(stdout_lock)
+        process_busy_spinner.start('Process is running...')
+
+        multicellmode_timestamps = self.read_scorep_process_pipe(proc, stdout_lock)
+
+        process_busy_spinner.stop()
 
         # for multiple nodes, we have to add more lists here, one list per node
         # this is required to be in line with the performance data aggregation
@@ -857,21 +863,24 @@ async def scorep_execute(
             self.perfdata_handler.end_perfmonitor()
         )
 
-        # In disk mode, subprocess already terminated
-        # after dumping persistence to file
-        if self.pershelper.mode == "disk":
-            if proc.returncode:
-                self.pershelper.postprocess()
-                self.cell_output(
-                    "KernelError: Cell execution failed, cell persistence "
-                    "was not recorded.",
-                    "stderr",
-                )
-                return self.standard_reply()
+        # Check if the score-p process is running.
+        # This prevents jupyter_update() from getting stuck while reading non-existent temporary files
+        # if something goes wrong during process execution.
+        if proc.poll():
+            self.pershelper.postprocess()
+            error_message = "Cell execution failed, cell persistence was not recorded."
+            self.log.error(error_message)
+            self.cell_output(
+                f"KernelError: {error_message}",
+                "stderr",
+            )
+            return self.standard_reply()
 
         # os_environ_.clear()
         # sys_path_.clear()
 
+        # In disk mode, subprocess already terminated
+        # after dumping persistence to file
         # Ghost cell - load subprocess persistence back to Jupyter notebook
         # Run in a "silent" way to not increase cells counter
         reply_status_update = await super().do_execute(
@@ -883,28 +892,31 @@ async def scorep_execute(
             cell_id=cell_id,
         )
         if reply_status_update["status"] != "ok":
+            error_message = "Failed to load cell's persistence to the notebook."
+            self.log.error(error_message)
             self.ghost_cell_error(
                 reply_status_update,
-                "KernelError: Failed to load cell's persistence to the "
-                "notebook.",
+                f"KernelError: {error_message}"
             )
             return reply_status_update
 
         # In memory mode, subprocess terminates once jupyter_update is
         # executed and pipe is closed
         if self.pershelper.mode == "memory":
-            if proc.returncode:
+            if proc.poll():
                 self.pershelper.postprocess()
+                error_message = "Cell execution failed, cell persistence was not recorded."
+                self.log.error(error_message)
                 self.cell_output(
-                    "KernelError: Cell execution failed, cell persistence "
-                    "was not recorded.",
+                    f"KernelError: {error_message}",
                     "stderr",
                 )
                 return self.standard_reply()
 
         # Determine directory to which trace files were saved by Score-P
         scorep_folder = ""
         if "SCOREP_EXPERIMENT_DIRECTORY" in os.environ:
+            self.log.warning(f'{os.environ["SCOREP_EXPERIMENT_DIRECTORY"]=}')
             scorep_folder = os.environ["SCOREP_EXPERIMENT_DIRECTORY"]
             self.cell_output(
                 f"Instrumentation results can be found in {scorep_folder}"
@@ -942,13 +954,62 @@ async def scorep_execute(
         )
 
         self.pershelper.postprocess()
+
         if performance_data_nodes:
             self.report_perfdata(performance_data_nodes, duration)
             self.perfdata_handler.append_code(
                 datetime.datetime.now(), code, time_indices
             )
         return self.standard_reply()
 
+
+    def read_scorep_process_pipe(self, proc: subprocess.Popen[bytes], stdout_lock: threading.Lock) -> list:
+        """
+        Reads and processes the output of a subprocess running with Score-P instrumentation.
+        Args:
+            proc (subprocess.Popen[bytes]): The subprocess whose output is being read.
+            stdout_lock (threading.Lock): Lock to avoid output overlapping
+
+        Returns:
+            list: A list of decoded strings containing "MCM_TS" timestamps.
+        """
+        multicellmode_timestamps = []
+        sel = selectors.DefaultSelector()
+
+        sel.register(proc.stdout, selectors.EVENT_READ)
+        sel.register(proc.stderr, selectors.EVENT_READ)
+
+        line_width = 50
+        clear_line = "\r" + " " * line_width + "\r"
+
+        while True:
+            # Select between stdout and stderr
+            for key, val in sel.select():
+                line = key.fileobj.readline()
+                if not line:
+                    sel.unregister(key.fileobj)
+                    continue
+
+                decoded_line = line.decode(sys.getdefaultencoding(), errors='ignore')
+
+                if key.fileobj is proc.stderr:
+                    with stdout_lock:
+                        self.log.warning(f'{decoded_line.strip()}')
+                elif 'MCM_TS' in decoded_line:
+                    multicellmode_timestamps.append(decoded_line)
+                else:
+                    with stdout_lock:
+                        sys.stdout.write(clear_line)
+                        sys.stdout.flush()
+                        self.cell_output(decoded_line)
+
+            # If both stdout and stderr empty -> out of loop
+            if not sel.get_map():
+                break
+
+        return multicellmode_timestamps
+
+
     async def do_execute(
         self,
         code,
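The new `read_scorep_process_pipe()` multiplexes the subprocess's stdout and stderr with the standard `selectors` module instead of a second reader thread. The same pattern can be sketched standalone as follows (the child command is illustrative; registering pipe objects with `DefaultSelector` requires a POSIX platform):

```python
import selectors
import subprocess
import sys

# Spawn a child that writes one line to each pipe.
proc = subprocess.Popen(
    [sys.executable, "-c",
     "import sys; print('out line'); print('err line', file=sys.stderr)"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)
sel.register(proc.stderr, selectors.EVENT_READ)

stdout_lines, stderr_lines = [], []
while sel.get_map():                      # loop until both pipes reach EOF
    for key, _ in sel.select():
        line = key.fileobj.readline()
        if not line:                      # EOF: stop watching this pipe
            sel.unregister(key.fileobj)
            continue
        decoded = line.decode(sys.getdefaultencoding(), errors="ignore")
        if key.fileobj is proc.stderr:
            stderr_lines.append(decoded)
        else:
            stdout_lines.append(decoded)

proc.wait()
print(stdout_lines, stderr_lines)
```

Unregistering each pipe at EOF empties the selector's map, which is exactly the loop-exit condition the kernel's implementation uses.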

0 commit comments
