Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ipykernel 7 support #898

Merged
merged 27 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cdd0175
ipykernel 7 compatibility
minrk Oct 22, 2024
3ca0cbb
ruff check --fix
minrk Oct 22, 2024
2da7de9
test with ipykernel branch
minrk Oct 22, 2024
c30e6ce
actually install parallel branch
minrk Oct 22, 2024
76c43be
kernel.io_loop attribute is gone
minrk Oct 22, 2024
d0f9495
update signal handling for ipykernel 7
minrk Oct 22, 2024
36ee6a8
poll uses milliseconds
minrk Oct 25, 2024
31f9613
join is supposed to raise TimeoutError if timeout is reached
minrk Oct 25, 2024
8550ff5
add timeout to restart_engines
minrk Oct 25, 2024
52fa677
suppress common irrelevant debugger warning by default
minrk Oct 25, 2024
7b2f79b
temporarily suppress ruff lint that will cause a lot of churn
minrk Oct 25, 2024
2b28955
Merge remote-tracking branch 'origin/main' into kernel7
minrk Oct 25, 2024
9118015
ipengine: fix log name condition
minrk Oct 25, 2024
e19c40a
improve error handling in engine
minrk Oct 25, 2024
7df31bb
bump some dependencies
minrk Oct 25, 2024
e953def
ensure abort reply is sent
minrk Oct 25, 2024
d8c7271
make sure dask workers actually start
minrk Oct 25, 2024
bf629e1
Sync with main
minrk Oct 25, 2024
8e8289e
better debug output for batch commands
minrk Oct 25, 2024
02d0c09
maybe punt on \n\n in stream
minrk Oct 25, 2024
bbd2dd4
remove extra squeue
minrk Oct 25, 2024
2d6168a
move slurmctld logs later
minrk Oct 25, 2024
fe42715
try increasing slurm timeout
minrk Oct 25, 2024
4666db6
skip cellpx stream
minrk Oct 25, 2024
4343491
no longer need special branch for ipykernel prerelease
minrk Oct 28, 2024
b32c641
run main loop with tornado
minrk Oct 28, 2024
87ec808
hook up control thread in ipykernel 6
minrk Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ jobs:
- python: "3.9"
runs_on: macos-14
- python: "3.11"
- python: "3.12"
pre: pre

steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -122,7 +124,12 @@ jobs:

- name: Install Python dependencies
run: |
pip install --pre --upgrade ipyparallel[test]
pip install --upgrade ipyparallel[test]

- name: Install pre-release dependencies
if: ${{ matrix.pre }}
run: |
pip install --pre --upgrade ipyparallel[test] 'https://github.com/ipython/ipykernel/archive/main.tar.gz#egg=ipykernel'

- name: Install extra Python packages
if: ${{ ! startsWith(matrix.python, '3.11') }}
Expand Down Expand Up @@ -161,8 +168,8 @@ jobs:
run: |
set -x
docker ps -a
docker logs slurmctld
docker exec -i slurmctld squeue --states=all
docker exec -i slurmctld sinfo
docker logs slurmctld
docker logs c1
docker logs c2
12 changes: 1 addition & 11 deletions ipyparallel/apps/baseapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import re
import sys

import traitlets
from IPython.core.application import BaseIPythonApplication
from IPython.core.application import base_aliases as base_ip_aliases
from IPython.core.application import base_flags as base_ip_flags
Expand All @@ -21,15 +20,6 @@

from .._version import __version__

# FIXME: CUnicode is needed for cli parsing
# with traitlets 4
# bump when we require traitlets 5, which requires Python 3.7
if int(traitlets.__version__.split(".", 1)[0]) < 5:
from traitlets import CUnicode
else:
# don't need CUnicode with traitlets 5
CUnicode = Unicode

# -----------------------------------------------------------------------------
# Module errors
# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -107,7 +97,7 @@ def _work_dir_changed(self, change):
'', config=True, help="The ZMQ URL of the iplogger to aggregate logging."
)

cluster_id = CUnicode(
cluster_id = Unicode(
'',
config=True,
help="""String id to add to runtime files, to prevent name collisions when
Expand Down
43 changes: 20 additions & 23 deletions ipyparallel/cluster/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ async def join(self, timeout=None):
"""Wait for the process to exit"""
if self._wait_thread is not None:
self._wait_thread.join(timeout=timeout)
if self._wait_thread.is_alive():
raise TimeoutError(
f"Process {self.process.pid} did not exit in {timeout} seconds."
)

def _stream_file(self, path):
"""Stream one file"""
Expand Down Expand Up @@ -1886,6 +1890,17 @@ def __init__(self, work_dir='.', config=None, **kwargs):
# trigger program_changed to populate default context arguments
self._program_changed()

def _run_command(self, command, **kwargs):
    """Run ``command`` via ``check_output`` and return its decoded output.

    The shell-joined command line is logged at debug level before the call,
    and the decoded output is logged at debug level afterwards. Nothing is
    caught here, so any exception raised by ``check_output`` propagates to
    the caller.

    ``command`` is the argument list passed to ``check_output``; extra
    keyword arguments (e.g. ``env``) are forwarded to it unchanged.
    Returns the output decoded as UTF-8, with undecodable bytes replaced.
    """
    self.log.debug("Running command: %s", shlex_join(command))
    raw_output = check_output(command, stdin=None, **kwargs)
    decoded = raw_output.decode("utf8", "replace")
    # Only the program name is logged next to the output; the full command
    # line was already logged above.
    self.log.debug("Command %s output: %s", command[0], decoded)
    return decoded

def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
m = self.job_id_regexp.search(output)
Expand Down Expand Up @@ -1960,42 +1975,24 @@ def start(self, n=1):

env = os.environ.copy()
env.update(self.get_env())
output = check_output(self.args, env=env)
output = output.decode("utf8", 'replace')
self.log.debug(f"Submitted {shlex_join(self.args)}. Output: {output}")
output = self._run_command(self.args, env=env)

job_id = self.parse_job_id(output)
self.notify_start(job_id)
return job_id

def stop(self):
try:
output = check_output(
self.delete_command + [self.job_id],
stdin=None,
).decode("utf8", 'replace')
except Exception:
self.log.exception(
"Problem stopping cluster with command: %s"
% (self.delete_command + [self.job_id])
)
output = ""
command = self.delete_command + [self.job_id]
output = self._run_command(command)

self.notify_stop(
dict(job_id=self.job_id, output=output)
) # Pass the output of the kill cmd
return output

def signal(self, sig):
cmd = self.signal_command + [str(sig), self.job_id]
try:
output = check_output(
cmd,
stdin=None,
).decode("utf8", 'replace')
except Exception:
self.log.exception("Problem sending signal with: {shlex_join(cmd)}")
output = ""
command = self.signal_command + [str(sig), self.job_id]
self._run_command(command)

# same local-file implementation as LocalProcess
# should this be on the base class?
Expand Down
Loading