diff --git a/changelog/59959.fixed.md b/changelog/59959.fixed.md
new file mode 100644
index 000000000000..4e98e5f7e5de
--- /dev/null
+++ b/changelog/59959.fixed.md
@@ -0,0 +1 @@
+Fixed requisites of parallel states on other parallel states being evaluated synchronously (blocking state execution for other parallel states)
diff --git a/salt/modules/state.py b/salt/modules/state.py
index cf4a592fd2f7..33819f4a6b73 100644
--- a/salt/modules/state.py
+++ b/salt/modules/state.py
@@ -1926,7 +1926,7 @@ def sls_id(id_, mods, test=None, queue=None, state_events=None, **kwargs):
         ret = {}
         for chunk in chunks:
             if chunk.get("__id__", "") == id_:
-                ret.update(st_.state.call_chunk(chunk, {}, chunks))
+                ret.update(st_.state.call_chunk(chunk, {}, chunks)[0])
         _set_retcode(ret, highstate=highstate)

     # Work around Windows multiprocessing bug, set __opts__['test'] back to
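Editor's note (illustration only, not part of the patch): the hunk above adapts
`sls_id` to `call_chunk`'s new return type. As the `salt/state.py` changes below
show, `call_chunk` now returns a `(running, pending)` tuple instead of the bare
`running` dict, so callers that only need the result dict take element `[0]`. A
minimal standalone sketch of the new contract, using a stub in place of the real
method:

    def call_chunk_stub(chunk, running, chunks):
        # Stand-in for State.call_chunk with the patched signature:
        # it returns (running, pending) instead of just running.
        return {"tag": {"result": True}}, False

    ret = {}
    ret.update(call_chunk_stub({}, {}, [])[0])      # result dict only
    running, pending = call_chunk_stub({}, {}, [])  # or unpack both values
    assert ret == running and pending is False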
""" reqs = {} + pending = False for req_type, chunk in self.dependency_dag.get_dependencies(low): reqs.setdefault(req_type, []).append(chunk) fun_stats = set() @@ -2618,16 +2644,21 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): filtered_run_dict[tag] = run_dict_chunk run_dict = filtered_run_dict - while True: - if self.reconcile_procs(run_dict): - break - time.sleep(0.01) + if low.get("parallel"): + pending = not self.reconcile_procs(run_dict) + else: + while True: + if self.reconcile_procs(run_dict): + break + time.sleep(0.01) for chunk in chunks: tag = _gen_tag(chunk) if tag not in run_dict: req_stats.add("unmet") continue + if pending: + continue # A state can include a "skip_req" key in the return dict # with a True value to skip triggering onchanges, watch, or # other requisites which would result in a only running on a @@ -2684,6 +2715,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): if "unmet" in fun_stats: status = "unmet" + elif pending: + status = "pending" elif "fail" in fun_stats: status = "fail" elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}): @@ -2765,7 +2798,7 @@ def call_chunk( running: dict[str, dict], chunks: Sequence[LowChunk], depth: int = 0, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: """ Execute the chunk if the requisites did not fail """ @@ -2781,8 +2814,13 @@ def call_chunk( status, reqs = self._check_requisites(low, running) if status == "unmet": - if self._call_unmet_requisites(low, running, chunks, tag, depth): - return running + running_failhard, pending = self._call_unmet_requisites( + low, running, chunks, tag, depth + ) + if running_failhard or pending: + return running, pending + elif status == "pending": + return running, True elif status == "met": if low.get("__prereq__"): self.pre[tag] = self.call(low, chunks, running) @@ -2910,7 +2948,7 @@ def call_chunk( for key in ("__sls__", "__id__", "name"): running[sub_tag][key] = low.get(key) - return running + return running, False def _assign_not_run_result_dict( self, @@ -2944,16 +2982,19 @@ def _call_unmet_requisites( chunks: Sequence[LowChunk], tag: str, depth: int, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: + pending = False for _, chunk in self.dependency_dag.get_dependencies(low): # Check to see if the chunk has been run, only run it if # it has not been run already ctag = _gen_tag(chunk) if ctag not in running: - running = self.call_chunk(chunk, running, chunks) + running, pending = self.call_chunk(chunk, running, chunks) + if pending: + return running, pending if self.check_failhard(chunk, running): running["__FAILHARD__"] = True - return running + return running, pending if low.get("__prereq__"): status, _ = self._check_requisites(low, running) self.pre[tag] = self.call(low, chunks, running) @@ -2975,11 +3016,11 @@ def _call_unmet_requisites( for key in ("__sls__", "__id__", "name"): running[tag][key] = low.get(key) else: - running = self.call_chunk(low, running, chunks, depth) + running, pending = self.call_chunk(low, running, chunks, depth) if self.check_failhard(low, running): running["__FAILHARD__"] = True - return running - return {} + return running, pending + return {}, pending def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict: """ diff --git a/tests/pytests/functional/modules/state/requisites/test_require.py b/tests/pytests/functional/modules/state/requisites/test_require.py index ce2a3fd0109d..861b9f4d4db1 100644 --- 
diff --git a/tests/pytests/functional/modules/state/requisites/test_require.py b/tests/pytests/functional/modules/state/requisites/test_require.py
index ce2a3fd0109d..861b9f4d4db1 100644
--- a/tests/pytests/functional/modules/state/requisites/test_require.py
+++ b/tests/pytests/functional/modules/state/requisites/test_require.py
@@ -1,3 +1,4 @@
+import datetime
 import time

 import pytest
@@ -537,6 +538,104 @@ def test_parallel_state_with_requires(state, state_tree):
         assert "__parallel__" in ret[_id]


+@pytest.mark.skip_on_windows
+def test_parallel_state_with_requires_on_parallel(state, state_tree):
+    """
+    Parallel states requiring other parallel states should not block
+    state execution while waiting on their requisites.
+
+    Issue #59959
+    """
+    sls_contents = """
+    service_a:
+      cmd.run:
+        - name: sleep 2
+        - parallel: True
+
+    service_b1:
+      cmd.run:
+        - name: sleep 5
+        - parallel: True
+        - require:
+          - service_a
+
+    service_b2:
+      cmd.run:
+        - name: 'true'
+        - parallel: True
+        - require:
+          - service_b1
+
+    service_c:
+      cmd.run:
+        - name: 'true'
+        - parallel: True
+        - require:
+          - service_a
+    """
+
+    with pytest.helpers.temp_file("requisite_parallel.sls", sls_contents, state_tree):
+        ret = state.sls(
+            "requisite_parallel",
+            __pub_jid="1",  # Because these run in parallel we need a fake JID
+        )
+        start_b1 = datetime.datetime.combine(
+            datetime.date.today(),
+            datetime.time.fromisoformat(
+                ret["cmd_|-service_b1_|-sleep 5_|-run"]["start_time"]
+            ),
+        )
+        start_c = datetime.datetime.combine(
+            datetime.date.today(),
+            datetime.time.fromisoformat(
+                ret["cmd_|-service_c_|-true_|-run"]["start_time"]
+            ),
+        )
+        start_diff = start_c - start_b1
+        # Expected order:
+        # a > (b1, c) > b2
+        # When b2 blocks while waiting for b1, c has to wait for b1 as well.
+        # c should start at approximately the same time as b1, though.
+        assert start_diff < datetime.timedelta(seconds=5)  # b1 sleeps for 5 seconds
+        for state_ret in ret.raw.values():
+            assert "__parallel__" in state_ret
+
+
+@pytest.mark.skip_on_windows
+def test_regular_state_requires_parallel(state, state_tree, tmp_path):
+    """
+    Regular states requiring parallel states should block until all
+    requisites are executed.
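Editor's note (illustration only, not part of the patch): the timing assertion
in `test_parallel_state_with_requires_on_parallel` above hinges on Salt
reporting each state's `start_time` as an ISO time-of-day string, which the
test combines with today's date to get comparable datetimes. A standalone
sketch with made-up time strings:

    import datetime

    # "start_time" is a time-of-day string such as "14:03:05.123456".
    start_b1 = datetime.datetime.combine(
        datetime.date.today(), datetime.time.fromisoformat("14:03:05.123456")
    )
    start_c = datetime.datetime.combine(
        datetime.date.today(), datetime.time.fromisoformat("14:03:05.500000")
    )
    # If c had been forced to wait for b1's 5-second sleep, the gap would
    # exceed 5 seconds; starting in parallel keeps it far below that.
    assert start_c - start_b1 < datetime.timedelta(seconds=5)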
+ """ + tmpfile = tmp_path / "foo" + sls_contents = f""" + service_a: + cmd.run: + - name: sleep 3 + - parallel: True + + service_b: + cmd.run: + - name: 'touch {tmpfile}' + - parallel: True + - require: + - service_a + + service_c: + cmd.run: + - name: 'test -f {tmpfile}' + - require: + - service_b + """ + + with pytest.helpers.temp_file("requisite_parallel_2.sls", sls_contents, state_tree): + ret = state.sls( + "requisite_parallel_2", + __pub_jid="1", # Because these run in parallel we need a fake JID) + ) + assert ret[f"cmd_|-service_c_|-test -f {tmpfile}_|-run"]["result"] is True + + def test_issue_59922_conflict_in_name_and_id_for_require_in(state, state_tree): """ Make sure that state_type is always honored while compiling down require_in to diff --git a/tests/pytests/unit/modules/state/test_state.py b/tests/pytests/unit/modules/state/test_state.py index c8fbafb6c1bb..8fb78d71b8c5 100644 --- a/tests/pytests/unit/modules/state/test_state.py +++ b/tests/pytests/unit/modules/state/test_state.py @@ -107,7 +107,7 @@ def call_chunk(data, data1, data2): """ Mock call_chunk method """ - return {"": "ABC"} + return {"": "ABC"}, False @staticmethod def call_chunks(data): diff --git a/tests/pytests/unit/state/test_state_compiler.py b/tests/pytests/unit/state/test_state_compiler.py index c2a474f1892b..122a12aaa769 100644 --- a/tests/pytests/unit/state/test_state_compiler.py +++ b/tests/pytests/unit/state/test_state_compiler.py @@ -853,7 +853,7 @@ def test_call_chunk_sub_state_run(minion_opts): with patch("salt.state.State.call", return_value=mock_call_return): minion_opts["disabled_requisites"] = ["require"] state_obj = salt.state.State(minion_opts) - ret = state_obj.call_chunk(low_data, {}, []) + ret, _ = state_obj.call_chunk(low_data, {}, []) sub_state = ret.get(expected_sub_state_tag) assert sub_state assert sub_state["__run_num__"] == 1