|
11 | 11 |
|
12 | 12 | from enum import Enum |
13 | 13 | from textwrap import dedent |
| 14 | +from statistics import mean |
| 15 | +from typing import IO, AnyStr, Callable |
| 16 | + |
| 17 | +import pandas as pd |
14 | 18 | from ipykernel.ipkernel import IPythonKernel |
15 | 19 | from scorep_jupyter.userpersistence import PersHelper, scorep_script_name |
16 | 20 | from scorep_jupyter.userpersistence import magics_cleanup, create_busy_spinner |
@@ -459,6 +463,7 @@ async def scorep_execute( |
459 | 463 | allow_stdin=False, |
460 | 464 | *, |
461 | 465 | cell_id=None, |
| 466 | + is_multicell_final=False, |
462 | 467 | ): |
463 | 468 | """ |
464 | 469 | Execute given code with Score-P Python bindings instrumentation. |
@@ -563,28 +568,7 @@ async def scorep_execute( |
563 | 568 | self.pershelper.postprocess() |
564 | 569 | return reply_status_dump |
565 | 570 |
|
566 | | - # Empty cell output, required for interactive output |
567 | | - # e.g. tqdm for-loop progress bar |
568 | | - self.cell_output("\0") |
569 | | - |
570 | | - stdout_lock = threading.Lock() |
571 | | - process_busy_spinner = create_busy_spinner(stdout_lock) |
572 | | - process_busy_spinner.start("Process is running...") |
573 | | - |
574 | | - # Due to splitting into scorep-kernel and ipython extension, |
575 | | - # multicell mode is not supported for coarse-grained measurements |
576 | | - # anymore (in the extension) and we do not show the single cells in |
577 | | - # the ipython extension visualizations after executing them with scorep |
578 | | - # however, since we are using scorep anyway, the ipython extension is |
579 | | - # not useful, since we can count hardware counters anyway |
580 | | - # multicellmode_timestamps = [] |
581 | | - |
582 | | - try: |
583 | | - # multicellmode_timestamps = |
584 | | - self.read_scorep_process_pipe(proc, stdout_lock) |
585 | | - process_busy_spinner.stop("Done.") |
586 | | - except KeyboardInterrupt: |
587 | | - process_busy_spinner.stop("Kernel interrupted.") |
| 571 | + multicellmode_timestamps = self.start_reading_scorep_process_streams(proc, is_multicell_final) |
588 | 572 |
|
589 | 573 | # In disk mode, subprocess already terminated |
590 | 574 | # after dumping persistence to file |
@@ -669,67 +653,130 @@ async def scorep_execute( |
669 | 653 | self.pershelper.postprocess() |
670 | 654 | return self.standard_reply() |
671 | 655 |
|
672 | | - def read_scorep_process_pipe( |
673 | | - self, proc: subprocess.Popen[bytes], stdout_lock: threading.Lock |
| 656 | + def start_reading_scorep_process_streams( |
| 657 | + self, |
| 658 | + proc: subprocess.Popen[bytes], |
| 659 | + is_multicell_final: bool, |
674 | 660 | ) -> list: |
675 | 661 | """ |
676 | 662 | This function reads stdout and stderr of the subprocess running with |
677 | 663 | Score-P instrumentation independently. |
678 | | - It logs all stderr output, collects lines containing |
679 | | - the marker "MCM_TS" (used to identify multi-cell mode timestamps) into |
680 | | - a list, and sends the remaining |
681 | | - stdout lines to the Jupyter cell output. |
682 | 664 |
|
683 | 665 | Simultaneous access to stdout is synchronized via a lock to prevent |
684 | | - overlapping with another thread performing |
| 666 | + overlapping with stderr reading thread and thread performing |
685 | 667 | long-running process animation. |
686 | 668 |
|
687 | 669 | Args: |
688 | | - proc (subprocess.Popen[bytes]): The subprocess whose output is |
689 | | - being read. |
690 | | - stdout_lock (threading.Lock): Lock to avoid output overlapping |
| 670 | + proc (subprocess.Popen[bytes]): The subprocess whose output is being read. |
| 671 | + is_multicell_final (bool): If multicell mode is finalizing - spinner must be disabled. |
691 | 672 |
|
692 | 673 | Returns: |
693 | 674 | list: A list of decoded strings containing "MCM_TS" timestamps. |
694 | 675 | """ |
| 676 | + |
| 677 | + stdout_lock = threading.Lock() |
| 678 | + spinner_stop_event = threading.Event() |
| 679 | + process_busy_spinner = create_busy_spinner(stdout_lock, spinner_stop_event, is_multicell_final) |
| 680 | + process_busy_spinner.start("Process is running...") |
| 681 | + |
695 | 682 | multicellmode_timestamps = [] |
696 | | - sel = selectors.DefaultSelector() |
697 | 683 |
|
698 | | - sel.register(proc.stdout, selectors.EVENT_READ) |
699 | | - sel.register(proc.stderr, selectors.EVENT_READ) |
| 684 | + # Empty cell output, required for interactive output |
| 685 | + # e.g. tqdm for-loop progress bar |
| 686 | + self.cell_output("\0") |
700 | 687 |
|
701 | | - line_width = 50 |
702 | | - clear_line = "\r" + " " * line_width + "\r" |
| 688 | + try: |
| 689 | + t_stderr = threading.Thread( |
| 690 | + target=self.read_scorep_stderr, |
| 691 | + args=( |
| 692 | + proc.stderr, |
| 693 | + stdout_lock, |
| 694 | + spinner_stop_event) |
| 695 | + ) |
| 696 | + t_stderr.start() |
703 | 697 |
|
704 | | - while True: |
705 | | - # Select between stdout and stderr |
706 | | - for key, val in sel.select(): |
707 | | - line = key.fileobj.readline() |
708 | | - if not line: |
709 | | - sel.unregister(key.fileobj) |
710 | | - continue |
711 | | - |
712 | | - decoded_line = line.decode( |
713 | | - sys.getdefaultencoding(), errors="ignore" |
714 | | - ) |
| 698 | + multicellmode_timestamps = self.read_scorep_stdout(proc.stdout, stdout_lock, spinner_stop_event) |
| 699 | + |
| 700 | + t_stderr.join() |
| 701 | + process_busy_spinner.stop("Done.") |
| 702 | + |
| 703 | + except KeyboardInterrupt: |
| 704 | + process_busy_spinner.stop("Kernel interrupted.") |
| 705 | + |
| 706 | + if multicellmode_timestamps: |
| 707 | + self.log.debug(f'{multicellmode_timestamps = }') |
| 708 | + else: |
| 709 | + self.log.debug(f'"multicellmode_timestamps" is empty.') |
| 710 | + |
| 711 | + |
| 712 | + return multicellmode_timestamps |
| 713 | + |
| 714 | + def read_scorep_stream( |
| 715 | + self, |
| 716 | + stream: IO[AnyStr], |
| 717 | + lock: threading.Lock, |
| 718 | + process_line: Callable[[str], None], |
| 719 | + read_chunk_size: int = 64, |
| 720 | + ): |
| 721 | + incomplete_line = "" |
| 722 | + endline_pattern = re.compile(r"(.*?[\r\n]|.+$)") |
715 | 723 |
|
716 | | - if key.fileobj is proc.stderr: |
717 | | - with stdout_lock: |
718 | | - self.log.warning(f"{decoded_line.strip()}") |
719 | | - elif "MCM_TS" in decoded_line: |
720 | | - multicellmode_timestamps.append(decoded_line) |
721 | | - else: |
722 | | - with stdout_lock: |
723 | | - sys.stdout.write(clear_line) |
724 | | - sys.stdout.flush() |
725 | | - self.cell_output(decoded_line) |
726 | 724 |
|
727 | | - # If both stdout and stderr empty -> out of loop |
728 | | - if not sel.get_map(): |
| 725 | + |
| 726 | + while True: |
| 727 | + chunk = stream.read(read_chunk_size) |
| 728 | + if not chunk: |
729 | 729 | break |
| 730 | + chunk = chunk.decode(sys.getdefaultencoding(), errors="ignore") |
| 731 | + lines = endline_pattern.findall(chunk) |
| 732 | + if lines: |
| 733 | + lines[0] = incomplete_line + lines[0] |
| 734 | + if lines[-1][-1] not in ["\n", "\r"]: |
| 735 | + incomplete_line = lines.pop(-1) |
| 736 | + else: |
| 737 | + incomplete_line = "" |
| 738 | + for line in lines: |
| 739 | + with lock: |
| 740 | + process_line(line) |
| 741 | + |
| 742 | + def read_scorep_stdout( |
| 743 | + self, |
| 744 | + stdout: IO[AnyStr], |
| 745 | + lock: threading.Lock, |
| 746 | + spinner_stop_event: threading.Event, |
| 747 | + read_chunk_size=64, |
| 748 | + ) -> list: |
| 749 | + multicellmode_timestamps = [] |
| 750 | + line_width = 50 |
| 751 | + clear_line = "\r" + " " * line_width + "\r" |
730 | 752 |
|
| 753 | + def process_stdout_line(line: str): |
| 754 | + if "MCM_TS" in line: |
| 755 | + multicellmode_timestamps.append(line) |
| 756 | + else: |
| 757 | + if spinner_stop_event.is_set(): |
| 758 | + sys.stdout.write(clear_line) |
| 759 | + sys.stdout.flush() |
| 760 | + self.cell_output(line) |
| 761 | + |
| 762 | + self.read_scorep_stream(stdout, lock, process_stdout_line, read_chunk_size) |
731 | 763 | return multicellmode_timestamps |
732 | 764 |
|
| 765 | + def read_scorep_stderr( |
| 766 | + self, |
| 767 | + stderr: IO[AnyStr], |
| 768 | + lock: threading.Lock, |
| 769 | + spinner_stop_event: threading.Event, |
| 770 | + read_chunk_size=64, |
| 771 | + ): |
| 772 | + def process_stderr_line(line: str): |
| 773 | + if spinner_stop_event.is_set(): |
| 774 | + self.cell_output(line) |
| 775 | + self.log.error(line) |
| 776 | + |
| 777 | + self.read_scorep_stream(stderr, lock, process_stderr_line, read_chunk_size) |
| 778 | + |
| 779 | + |
733 | 780 | async def do_execute( |
734 | 781 | self, |
735 | 782 | code, |
@@ -778,6 +825,7 @@ async def do_execute( |
778 | 825 | user_expressions, |
779 | 826 | allow_stdin, |
780 | 827 | cell_id=cell_id, |
| 828 | + is_multicell_final=True, |
781 | 829 | ) |
782 | 830 | except Exception: |
783 | 831 | self.cell_output( |
|
0 commit comments