11import datetime
2+ import importlib
3+ import logging .config
24import os
35import re
4- import selectors
6+ import shutil
57import subprocess
68import sys
79import threading
810import time
9- import shutil
10- import logging .config
11-
1211from enum import Enum
1312from textwrap import dedent
13+ from typing import IO , AnyStr , Callable , List , TextIO
14+
1415from ipykernel .ipkernel import IPythonKernel
15- from scorep_jupyter .userpersistence import PersHelper , scorep_script_name
16- from scorep_jupyter .userpersistence import magics_cleanup , create_busy_spinner
17- import importlib
16+
1817from scorep_jupyter .kernel_messages import (
1918 KernelErrorCode ,
2019 KERNEL_ERROR_MESSAGES ,
20+ get_scorep_process_error_hint ,
2121)
22+ from scorep_jupyter .userpersistence import PersHelper , scorep_script_name
23+ from scorep_jupyter .userpersistence import magics_cleanup , create_busy_spinner
24+ from .logging_config import LOGGING
2225
2326# import scorep_jupyter.multinode_monitor.slurm_monitor as slurm_monitor
2427
25- from .logging_config import LOGGING
26-
2728PYTHON_EXECUTABLE = sys .executable
2829userpersistence_token = "scorep_jupyter.userpersistence"
2930jupyter_dump = "jupyter_dump.pkl"
@@ -234,7 +235,6 @@ def append_multicellmode(self, code):
234235 f"print('Executing cell { self .multicell_cellcount } ')\n "
235236 + f"print('''{ code } ''')\n "
236237 + f"print('-' * { max_line_len } )\n "
237- + "print('MCM_TS'+str(time.time()))\n "
238238 + f"{ code } \n "
239239 + "print('''\n ''')\n "
240240 )
@@ -459,6 +459,7 @@ async def scorep_execute(
459459 allow_stdin = False ,
460460 * ,
461461 cell_id = None ,
462+ is_multicell_final = False ,
462463 ):
463464 """
464465 Execute given code with Score-P Python bindings instrumentation.
@@ -563,40 +564,17 @@ async def scorep_execute(
563564 self .pershelper .postprocess ()
564565 return reply_status_dump
565566
566- # Empty cell output, required for interactive output
567- # e.g. tqdm for-loop progress bar
568- self .cell_output ("\0 " )
569-
570- stdout_lock = threading .Lock ()
571- process_busy_spinner = create_busy_spinner (stdout_lock )
572- process_busy_spinner .start ("Process is running..." )
567+ self .start_reading_scorep_process_streams (proc , is_multicell_final )
573568
574- # Due to splitting into scorep-kernel and ipython extension,
575- # multicell mode is not supported for coarse-grained measurements
576- # anymore (in the extension) and we do not show the single cells in
577- # the ipython extension visualizations after executing them with scorep
578- # however, since we are using scorep anyway, the ipython extension is
579- # not useful, since we can count hardware counters anyway
580- # multicellmode_timestamps = []
581-
582- try :
583- # multicellmode_timestamps =
584- self .read_scorep_process_pipe (proc , stdout_lock )
585- process_busy_spinner .stop ("Done." )
586- except KeyboardInterrupt :
587- process_busy_spinner .stop ("Kernel interrupted." )
569+ if proc .poll ():
570+ self .pershelper .postprocess ()
571+ self .log_error (
572+ KernelErrorCode .PERSISTENCE_LOAD_FAIL ,
573+ direction = "Score-P -> Jupyter" ,
574+ optional_hint = get_scorep_process_error_hint ()
575+ )
576+ return self .standard_reply ()
588577
589- # In disk mode, subprocess already terminated
590- # after dumping persistence to file
591- if self .pershelper .mode == "disk" :
592- if proc .returncode :
593- self .pershelper .postprocess ()
594- self .cell_output (
595- "KernelError: Cell execution failed, cell persistence "
596- "was not recorded." ,
597- "stderr" ,
598- )
599- return self .standard_reply ()
600578 # Ghost cell - load subprocess persistence back to Jupyter notebook
601579 # Run in a "silent" way to not increase cells counter
602580 reply_status_update = await super ().do_execute (
@@ -607,25 +585,16 @@ async def scorep_execute(
607585 allow_stdin = allow_stdin ,
608586 cell_id = cell_id ,
609587 )
588+
610589 if reply_status_update ["status" ] != "ok" :
611590 self .log_error (
612591 KernelErrorCode .PERSISTENCE_LOAD_FAIL ,
613592 direction = "Score-P -> Jupyter" ,
593+ optional_hint = get_scorep_process_error_hint ()
614594 )
615595 self .pershelper .postprocess ()
616596 return reply_status_update
617597
618- # In memory mode, subprocess terminates once jupyter_update is
619- # executed and pipe is closed
620- if self .pershelper .mode == "memory" :
621- if proc .poll ():
622- self .pershelper .postprocess ()
623- self .log_error (
624- KernelErrorCode .PERSISTENCE_LOAD_FAIL ,
625- direction = "Score-P -> Jupyter" ,
626- )
627- return self .standard_reply ()
628-
629598 # Determine directory to which trace files were saved by Score-P
630599 scorep_folder = ""
631600 if "SCOREP_EXPERIMENT_DIRECTORY" in os .environ :
@@ -669,66 +638,145 @@ async def scorep_execute(
669638 self .pershelper .postprocess ()
670639 return self .standard_reply ()
671640
672- def read_scorep_process_pipe (
673- self , proc : subprocess .Popen [bytes ], stdout_lock : threading .Lock
674- ) -> list :
641+ def start_reading_scorep_process_streams (
642+ self ,
643+ proc : subprocess .Popen [bytes ],
644+ is_multicell_final : bool ,
645+ ):
675646 """
676647 This function reads stdout and stderr of the subprocess running with
677648 Score-P instrumentation independently.
678- It logs all stderr output, collects lines containing
679- the marker "MCM_TS" (used to identify multi-cell mode timestamps) into
680- a list, and sends the remaining
681- stdout lines to the Jupyter cell output.
682649
683650 Simultaneous access to stdout is synchronized via a lock to prevent
684- overlapping with another thread performing
651+ overlapping with stderr reading thread and thread performing
685652 long-running process animation.
686653
687654 Args:
688655 proc (subprocess.Popen[bytes]): The subprocess whose output is
689656 being read.
690- stdout_lock (threading.Lock): Lock to avoid output overlapping
657+ is_multicell_final (bool): If multicell mode is finalizing -
658+ spinner must be disabled.
691659
692- Returns:
693- list: A list of decoded strings containing "MCM_TS" timestamps.
694660 """
695- multicellmode_timestamps = []
696- sel = selectors .DefaultSelector ()
697661
698- sel .register (proc .stdout , selectors .EVENT_READ )
699- sel .register (proc .stderr , selectors .EVENT_READ )
662+ stdout_lock = threading .Lock ()
663+ spinner_stop_event = threading .Event ()
664+ process_busy_spinner = create_busy_spinner (
665+ stdout_lock , spinner_stop_event , is_multicell_final
666+ )
700667
668+ captured_stdout : List [str ] = []
669+ captured_stderr : List [str ] = [] # Output parameter (return not possible from thread)
670+ t_stderr = threading .Thread (
671+ target = self .read_scorep_stderr ,
672+ args = (proc .stderr , stdout_lock , spinner_stop_event , captured_stderr ),
673+ )
674+
675+ # Empty cell output, required for interactive output
676+ # e.g. tqdm for-loop progress bar
677+ self .cell_output ("\0 " )
678+
679+ spinner_message = "Done."
680+
681+ try :
682+ process_busy_spinner .start ("Process is running..." )
683+ t_stderr .start ()
684+
685+ captured_stdout = self .read_scorep_stdout (
686+ proc .stdout , stdout_lock , spinner_stop_event
687+ )
688+
689+ except KeyboardInterrupt :
690+ spinner_message = "Kernel interrupted."
691+ finally :
692+ t_stderr .join ()
693+ process_busy_spinner .stop (spinner_message )
694+
695+ # Handle recorded output (in case if it is suppressed by spinner animation)
696+ self .handle_captured_output (captured_stdout , stream = "stdout" )
697+ self .handle_captured_output (captured_stderr , stream = "stderr" )
698+
699+ def read_scorep_stdout (
700+ self ,
701+ stdout : IO [AnyStr ],
702+ lock : threading .Lock ,
703+ spinner_stop_event : threading .Event ,
704+ read_chunk_size = 64 ,
705+ ) -> List [str ]:
701706 line_width = 50
702707 clear_line = "\r " + " " * line_width + "\r "
703708
704- while True :
705- # Select between stdout and stderr
706- for key , val in sel .select ():
707- line = key .fileobj .readline ()
708- if not line :
709- sel .unregister (key .fileobj )
710- continue
711-
712- decoded_line = line .decode (
713- sys .getdefaultencoding (), errors = "ignore"
714- )
709+ captured_stdout : List [str ] = []
715710
716- if key .fileobj is proc .stderr :
717- with stdout_lock :
718- self .log .warning (f"{ decoded_line .strip ()} " )
719- elif "MCM_TS" in decoded_line :
720- multicellmode_timestamps .append (decoded_line )
721- else :
722- with stdout_lock :
723- sys .stdout .write (clear_line )
724- sys .stdout .flush ()
725- self .cell_output (decoded_line )
711+ def process_stdout_line (line : str ):
712+ if spinner_stop_event .is_set ():
713+ sys .stdout .write (clear_line )
714+ sys .stdout .flush ()
715+ self .cell_output (line )
716+ else :
717+ captured_stdout .append (line )
726718
727- # If both stdout and stderr empty -> out of loop
728- if not sel .get_map ():
729- break
719+ self .read_scorep_stream (
720+ stdout , lock , process_stdout_line , read_chunk_size
721+ )
722+ return captured_stdout
723+
724+ def read_scorep_stderr (
725+ self ,
726+ stderr : IO [AnyStr ],
727+ lock : threading .Lock ,
728+ spinner_stop_event : threading .Event ,
729+ captured_stderr : List [str ],
730+ read_chunk_size = 64 ,
731+ ):
732+
733+ def process_stderr_line (line : str ):
734+ if spinner_stop_event .is_set ():
735+ self .log .error (line .strip ())
736+ self .cell_output (line , 'stderr' )
737+ else :
738+ captured_stderr .append (line )
739+
740+ self .read_scorep_stream (
741+ stderr , lock , process_stderr_line , read_chunk_size
742+ )
743+
744+ def read_scorep_stream (
745+ self ,
746+ stream : IO [AnyStr ],
747+ lock : threading .Lock ,
748+ process_line : Callable [[str ], None ],
749+ read_chunk_size : int = 64 ,
750+ ):
751+ incomplete_line = ""
752+ endline_pattern = re .compile (r"(.*?[\r\n]|.+$)" )
730753
731- return multicellmode_timestamps
754+ while True :
755+ chunk = stream .read (read_chunk_size )
756+ if not chunk :
757+ break
758+ chunk = chunk .decode (sys .getdefaultencoding (), errors = "ignore" )
759+ lines = endline_pattern .findall (chunk )
760+ if lines :
761+ lines [0 ] = incomplete_line + lines [0 ]
762+ if lines [- 1 ][- 1 ] not in ["\n " , "\r " ]:
763+ incomplete_line = lines .pop (- 1 )
764+ else :
765+ incomplete_line = ""
766+ for line in lines :
767+ with lock :
768+ process_line (line )
769+
770+ def handle_captured_output (self , output : List [str ], stream : str ):
771+ if output :
772+ text_output = "" .join (output )
773+ if stream == "stdout" :
774+ self .cell_output (text_output , stream = stream )
775+ elif stream == "stderr" :
776+ self .cell_output (text_output , stream = stream )
777+ self .log .error (text_output )
778+ else :
779+ self .log .error (f"Undefined stream type: { stream } " )
732780
733781 async def do_execute (
734782 self ,
@@ -778,6 +826,7 @@ async def do_execute(
778826 user_expressions ,
779827 allow_stdin ,
780828 cell_id = cell_id ,
829+ is_multicell_final = True ,
781830 )
782831 except Exception :
783832 self .cell_output (
@@ -879,7 +928,7 @@ def log_error(self, code: KernelErrorCode, **kwargs):
879928 )
880929 message = template .format (mode = mode , marshaller = marshaller , ** kwargs )
881930
882- self .log .error (message )
931+ self .log .error (message . strip () )
883932 self .cell_output ("KernelError: " + message , "stderr" )
884933
885934
0 commit comments