|
10 | 10 | import time |
11 | 11 | from enum import Enum |
12 | 12 | from textwrap import dedent |
13 | | -from typing import IO, AnyStr, Callable |
| 13 | +from typing import IO, AnyStr, Callable, List, TextIO |
14 | 14 |
|
15 | 15 | from ipykernel.ipkernel import IPythonKernel |
16 | 16 |
|
@@ -572,7 +572,6 @@ async def scorep_execute( |
572 | 572 | KernelErrorCode.PERSISTENCE_LOAD_FAIL, |
573 | 573 | direction="Score-P -> Jupyter", |
574 | 574 | optional_hint = get_scorep_process_error_hint() |
575 | | - |
576 | 575 | ) |
577 | 576 | return self.standard_reply() |
578 | 577 |
|
@@ -665,62 +664,78 @@ def start_reading_scorep_process_streams( |
665 | 664 | process_busy_spinner = create_busy_spinner( |
666 | 665 | stdout_lock, spinner_stop_event, is_multicell_final |
667 | 666 | ) |
| 667 | + |
| 668 | + captured_stdout: List[str] = [] |
| 669 | + captured_stderr: List[str] = [] # Output parameter (return not possible from thread) |
668 | 670 | t_stderr = threading.Thread( |
669 | 671 | target=self.read_scorep_stderr, |
670 | | - args=(proc.stderr, stdout_lock, spinner_stop_event), |
| 672 | + args=(proc.stderr, stdout_lock, spinner_stop_event, captured_stderr), |
671 | 673 | ) |
672 | 674 |
|
673 | 675 | # Empty cell output, required for interactive output |
674 | 676 | # e.g. tqdm for-loop progress bar |
675 | 677 | self.cell_output("\0") |
676 | 678 |
|
| 679 | + spinner_message = "Done." |
| 680 | + |
677 | 681 | try: |
678 | 682 | process_busy_spinner.start("Process is running...") |
679 | 683 | t_stderr.start() |
680 | 684 |
|
681 | | - self.read_scorep_stdout( |
| 685 | + captured_stdout = self.read_scorep_stdout( |
682 | 686 | proc.stdout, stdout_lock, spinner_stop_event |
683 | 687 | ) |
684 | 688 |
|
685 | | - process_busy_spinner.stop("Done.") |
686 | | - |
687 | 689 | except KeyboardInterrupt: |
688 | | - process_busy_spinner.stop("Kernel interrupted.") |
| 690 | + spinner_message = "Kernel interrupted." |
689 | 691 | finally: |
690 | 692 | t_stderr.join() |
| 693 | + process_busy_spinner.stop(spinner_message) |
| 694 | + |
| 695 | + # Handle recorded output (in case if it is suppressed by spinner animation) |
| 696 | + self.handle_captured_output(captured_stdout, stream="stdout") |
| 697 | + self.handle_captured_output(captured_stderr, stream="stderr") |
691 | 698 |
|
692 | 699 | def read_scorep_stdout( |
693 | 700 | self, |
694 | 701 | stdout: IO[AnyStr], |
695 | 702 | lock: threading.Lock, |
696 | 703 | spinner_stop_event: threading.Event, |
697 | 704 | read_chunk_size=64, |
698 | | - ): |
| 705 | + ) -> List[str]: |
699 | 706 | line_width = 50 |
700 | 707 | clear_line = "\r" + " " * line_width + "\r" |
701 | 708 |
|
| 709 | + captured_stdout: List[str] = [] |
| 710 | + |
702 | 711 | def process_stdout_line(line: str): |
703 | 712 | if spinner_stop_event.is_set(): |
704 | 713 | sys.stdout.write(clear_line) |
705 | 714 | sys.stdout.flush() |
706 | 715 | self.cell_output(line) |
| 716 | + else: |
| 717 | + captured_stdout.append(line) |
707 | 718 |
|
708 | 719 | self.read_scorep_stream( |
709 | 720 | stdout, lock, process_stdout_line, read_chunk_size |
710 | 721 | ) |
| 722 | + return captured_stdout |
711 | 723 |
|
712 | 724 | def read_scorep_stderr( |
713 | 725 | self, |
714 | 726 | stderr: IO[AnyStr], |
715 | 727 | lock: threading.Lock, |
716 | 728 | spinner_stop_event: threading.Event, |
| 729 | + captured_stderr: List[str], |
717 | 730 | read_chunk_size=64, |
718 | 731 | ): |
| 732 | + |
719 | 733 | def process_stderr_line(line: str): |
720 | | - self.log.error(line.strip()) |
721 | 734 | if spinner_stop_event.is_set(): |
722 | | - self.cell_output(line) |
723 | | - |
| 735 | + self.log.error(line.strip()) |
| 736 | + self.cell_output(line, 'stderr') |
| 737 | + else: |
| 738 | + captured_stderr.append(line) |
724 | 739 |
|
725 | 740 | self.read_scorep_stream( |
726 | 741 | stderr, lock, process_stderr_line, read_chunk_size |
@@ -752,6 +767,17 @@ def read_scorep_stream( |
752 | 767 | with lock: |
753 | 768 | process_line(line) |
754 | 769 |
|
| 770 | + def handle_captured_output(self, output: List[str], stream: str): |
| 771 | + if output: |
| 772 | + text_output = "".join(output) |
| 773 | + if stream == "stdout": |
| 774 | + self.cell_output(text_output, stream=stream) |
| 775 | + elif stream == "stderr": |
| 776 | + self.cell_output(text_output, stream=stream) |
| 777 | + self.log.error(text_output) |
| 778 | + else: |
| 779 | + self.log.error(f"Undefined stream type: {stream}") |
| 780 | + |
755 | 781 | async def do_execute( |
756 | 782 | self, |
757 | 783 | code, |
|
0 commit comments