diff --git a/checkov/common/util/stopit/__init__.py b/checkov/common/util/stopit/__init__.py index 7b053f092ab..b8c44ccf013 100644 --- a/checkov/common/util/stopit/__init__.py +++ b/checkov/common/util/stopit/__init__.py @@ -15,9 +15,11 @@ from .utils import TimeoutException from .threadstop import ThreadingTimeout, async_raise, threading_timeoutable from .signalstop import SignalTimeout, signal_timeoutable +from .processstop import ProcessTimeout, process_timeoutable __all__ = ( 'ThreadingTimeout', 'async_raise', 'threading_timeoutable', - 'SignalTimeout', 'signal_timeoutable', 'TimeoutException' + 'SignalTimeout', 'signal_timeoutable', 'TimeoutException', + 'ProcessTimeout', 'process_timeoutable' ) diff --git a/checkov/common/util/stopit/processstop.py b/checkov/common/util/stopit/processstop.py new file mode 100644 index 00000000000..5bf09553cb2 --- /dev/null +++ b/checkov/common/util/stopit/processstop.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +""" +================= +stopit.processstop +================= + +Control the timeout of blocks or callables with a context manager or a +decorator. Based on the use of multiprocessing for enforcing timeouts. +""" + +from __future__ import annotations + +import multiprocessing +from typing import Callable, Any + +from .utils import TimeoutException, BaseTimeout, base_timeoutable + + +def process_target(block: Callable, args: tuple, kwargs: dict, return_dict: dict) -> None: + """Run the block of code in a subprocess. + + :param block: The function to execute in the subprocess. + :param args: Positional arguments for the block function. + :param kwargs: Keyword arguments for the block function. + :param return_dict: Shared dictionary to store the result or error. + """ + try: + # Call the block function with provided arguments and store the result + result = block(*args, **kwargs) + return_dict['result'] = result + except Exception as e: + # Store the error in return_dict + return_dict['error'] = str(e) + + +class ProcessTimeout(BaseTimeout): + """Context manager for enforcing timeouts using multiprocessing. + + See :class:`stopit.utils.BaseTimeout` for more information + """ + def __init__(self, seconds: int, swallow_exc: bool = True) -> None: + super().__init__(seconds, swallow_exc) + self.process: multiprocessing.Process | None = None + self.manager: multiprocessing.Manager | None = None + self.return_dict: multiprocessing.Dict | None = None + self.block: Callable | None = None + self.args: tuple = () + self.kwargs: dict = {} + + def set_block(self, block: Callable, *args: Any, **kwargs: Any) -> None: + """Set the block of code to execute + """ + if not callable(block): + raise ValueError("Block function must be callable.") + self.block = block + self.args = args + self.kwargs = kwargs + + def setup_interrupt(self) -> None: + """Setting up the resource that interrupts the block + """ + if not self.block: + raise ValueError("No block function provided for execution.") + + self.manager = multiprocessing.Manager() + self.return_dict = self.manager.dict() + + # Start the subprocess + self.process = multiprocessing.Process( + target=process_target, args=(self.block, self.args, self.kwargs, self.return_dict) + ) + self.process.start() + + # Wait for the process to complete or timeout + self.process.join(self.seconds) + if self.process.is_alive(): + # If still alive after timeout, terminate and raise TimeoutException + self.process.terminate() + self.state = self.TIMED_OUT + raise TimeoutException(f"Block exceeded maximum timeout value ({self.seconds} seconds).") + + def suppress_interrupt(self) -> None: + """Removing the resource that interrupts the block + """ + if self.process and self.process.is_alive(): + self.process.terminate() # Ensure the process is terminated + if 'error' in self.return_dict: + raise Exception(f"Error during execution: {self.return_dict['error']}") + if self.manager: + self.manager.shutdown() + + def get_result(self) -> Any: + """Retrieve the result of the block execution + """ + if self.return_dict and 'result' in self.return_dict: + return self.return_dict['result'] + return None + + +class process_timeoutable(base_timeoutable): # noqa: B903 + """A function or method decorator that raises a ``TimeoutException`` for + decorated functions that exceed a certain amount of time. This uses the + ``ProcessTimeout`` context manager. + + See :class:`.utils.base_timeoutable`` for further comments. + """ + def __init__(self) -> None: + super().__init__() + self.to_ctx_mgr = ProcessTimeout diff --git a/checkov/terraform/tf_parser.py b/checkov/terraform/tf_parser.py index d95c707b755..fe5033eb4be 100644 --- a/checkov/terraform/tf_parser.py +++ b/checkov/terraform/tf_parser.py @@ -17,7 +17,7 @@ from checkov.common.util.data_structures_utils import pickle_deepcopy from checkov.common.util.deep_merge import pickle_deep_merge from checkov.common.util.env_vars_config import env_vars_config -from checkov.common.util.stopit import ThreadingTimeout, SignalTimeout +from checkov.common.util.stopit import ThreadingTimeout, SignalTimeout, ProcessTimeout from checkov.common.util.stopit.utils import BaseTimeout from checkov.common.util.type_forcers import force_list from checkov.common.variables.context import EvaluationContext @@ -742,20 +742,35 @@ def load_or_die_quietly( # if we are not running in a thread, run the hcl2.load function with a timeout, to prevent from getting stuck in parsing. def __parse_with_timeout(f: TextIO) -> dict[str, list[dict[str, Any]]]: + """Parse files with a timeout mechanism. + + Attempts to use SignalTimeout for Unix systems on the main thread, + ThreadingTimeout for Windows, and ProcessTimeout + as a fallback for non-main threads and blocking operations. + """ # setting up timeout class timeout_class: Optional[Type[BaseTimeout]] = None if platform.system() == 'Windows': timeout_class = ThreadingTimeout elif threading.current_thread() is threading.main_thread(): timeout_class = SignalTimeout + else: + timeout_class = ProcessTimeout - # if we're not running on the main thread, don't use timeout parsing_timeout = env_vars_config.HCL_PARSE_TIMEOUT_SEC or 0 - if not timeout_class or not parsing_timeout: + if not parsing_timeout: return hcl2.load(f) - with timeout_class(parsing_timeout) as to_ctx_mgr: - raw_data = hcl2.load(f) + to_ctx_mgr = timeout_class(parsing_timeout) + if isinstance(to_ctx_mgr, ProcessTimeout): + to_ctx_mgr.set_block(hcl2.loads, f.read()) + + with to_ctx_mgr: + if isinstance(to_ctx_mgr, ProcessTimeout): + raw_data = to_ctx_mgr.get_result() + else: + raw_data = hcl2.load(f) + if to_ctx_mgr.state == to_ctx_mgr.TIMED_OUT: logging.debug(f"reached timeout when parsing file {f} using hcl2") raise Exception(f"file took more than {parsing_timeout} seconds to parse")