Skip to content

FIX remove link to resource_tracker._pid in child processes #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
### 3.6.0 - in development

- Fix resource_tracker teardown to accomodate with newer version of
Python (3.12.10+, 3.13.3+, 3.14+). (#450)

### 3.5.1 - 2025-03-18

Expand Down
2 changes: 1 addition & 1 deletion loky/backend/popen_loky_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _launch(self, process_obj):
reduction._mk_inheritable(tracker_fd)
self._fds += [child_r, child_w, tracker_fd]
if sys.version_info >= (3, 8) and os.name == "posix":
mp_tracker_fd = prep_data["mp_tracker_args"]["fd"]
mp_tracker_fd = prep_data["mp_tracker_fd"]
self.duplicate_for_child(mp_tracker_fd)

from .fork_exec import fork_exec
Expand Down
11 changes: 11 additions & 0 deletions loky/backend/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@
else:
os.close(r)

def __del__(self):
# ignore error due to trying to clean up child process which has already been
# shutdown on windows See https://github.com/joblib/loky/pull/450
# This is only required if __del__ is defined
if not hasattr(ResourceTracker, "__del__"):
return
try:
super().__del__()
except ChildProcessError:
pass

Check warning on line 183 in loky/backend/resource_tracker.py

View check run for this annotation

Codecov / codecov/patch

loky/backend/resource_tracker.py#L179-L183

Added lines #L179 - L183 were not covered by tests


_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
Expand Down
22 changes: 8 additions & 14 deletions loky/backend/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ def get_preparation_data(name, init_main_module=True):
from .resource_tracker import _resource_tracker

_resource_tracker.ensure_running()
d["tracker_args"] = {"pid": _resource_tracker._pid}
if sys.platform == "win32":
d["tracker_args"]["fh"] = msvcrt.get_osfhandle(_resource_tracker._fd)
d["tracker_fd"] = msvcrt.get_osfhandle(_resource_tracker._fd)
else:
d["tracker_args"]["fd"] = _resource_tracker._fd
d["tracker_fd"] = _resource_tracker._fd

if sys.version_info >= (3, 8) and os.name == "posix":
# joblib/loky#242: allow loky processes to retrieve the resource
Expand All @@ -105,10 +104,7 @@ def get_preparation_data(name, init_main_module=True):
# process is created (othewise the child won't be able to use it if it
# is created later on)
mp_resource_tracker.ensure_running()
d["mp_tracker_args"] = {
"fd": mp_resource_tracker._fd,
"pid": mp_resource_tracker._pid,
}
d["mp_tracker_fd"] = mp_resource_tracker._fd

# Figure out whether to initialise main in the subprocess as a module
# or through direct execution (or to leave it alone entirely)
Expand Down Expand Up @@ -172,23 +168,21 @@ def prepare(data, parent_sentinel=None):
if "orig_dir" in data:
process.ORIGINAL_DIR = data["orig_dir"]

if "mp_tracker_args" in data:
if "mp_tracker_fd" in data:
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker,
)

mp_resource_tracker._fd = data["mp_tracker_args"]["fd"]
mp_resource_tracker._pid = data["mp_tracker_args"]["pid"]
if "tracker_args" in data:
mp_resource_tracker._fd = data["mp_tracker_fd"]
if "tracker_fd" in data:
from .resource_tracker import _resource_tracker

_resource_tracker._pid = data["tracker_args"]["pid"]
if sys.platform == "win32":
handle = data["tracker_args"]["fh"]
handle = data["tracker_fd"]
handle = duplicate(handle, source_process=parent_sentinel)
_resource_tracker._fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
else:
_resource_tracker._fd = data["tracker_args"]["fd"]
_resource_tracker._fd = data["tracker_fd"]

if "init_main_from_name" in data:
_fixup_main_from_name(data["init_main_from_name"])
Expand Down
62 changes: 31 additions & 31 deletions tests/test_resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def _resource_unlink(name, rtype):
resource_tracker._CLEANUP_FUNCS[rtype](name)


def get_rtracker_pid():
def get_rtracker_fd():
resource_tracker.ensure_running()
return resource_tracker._resource_tracker._pid
return resource_tracker._resource_tracker._fd


class TestResourceTracker:
Expand All @@ -40,12 +40,18 @@ def test_resource_utils(self, rtype):
assert not resource_exists(name, rtype)

def test_child_retrieves_resource_tracker(self):
parent_rtracker_pid = get_rtracker_pid()
executor = ProcessPoolExecutor(max_workers=2)
child_rtracker_pid = executor.submit(get_rtracker_pid).result()

# First simple pid retrieval check (see #200)
assert child_rtracker_pid == parent_rtracker_pid
# First simple fd retrieval check (see #200)
# checking fd only work on posix for now
if sys.platform != "win32":
try:
parent_rtracker_fd = get_rtracker_fd()
executor = ProcessPoolExecutor(max_workers=2)
child_rtracker_fd = executor.submit(get_rtracker_fd).result()

assert child_rtracker_fd == parent_rtracker_fd
finally:
executor.shutdown()

# Register a resource in the parent process, and un-register it in the
# child process. If the two processes do not share the same
Expand Down Expand Up @@ -77,23 +83,20 @@ def maybe_unlink(name, rtype):
e.submit(maybe_unlink, filename, "file").result()
e.shutdown()
"""
try:
p = subprocess.run(
[sys.executable, "-E", "-c", cmd],
capture_output=True,
text=True,
)
filename = p.stdout.strip()

pattern = f"decremented refcount of file {filename}"
assert pattern in p.stderr
assert "leaked" not in p.stderr
p = subprocess.run(
[sys.executable, "-E", "-c", cmd],
capture_output=True,
text=True,
)
filename = p.stdout.strip()

pattern = f"KeyError: '{filename}'"
assert pattern not in p.stderr
pattern = f"decremented refcount of file {filename}"
assert pattern in p.stderr
assert "leaked" not in p.stderr

finally:
executor.shutdown()
pattern = f"KeyError: '{filename}'"
assert pattern not in p.stderr

# The following four tests are inspired from cpython _test_multiprocessing
@pytest.mark.parametrize("rtype", ["file", "folder", "semlock"])
Expand Down Expand Up @@ -294,15 +297,12 @@ def test_loky_process_inherit_multiprocessing_resource_tracker(self):
cmd = """if 1:
from loky import get_reusable_executor
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker
)
def mp_rtracker_getattrs():
def mp_rtracker_getfd():
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker
)
return mp_resource_tracker._fd, mp_resource_tracker._pid
return mp_resource_tracker._fd
if __name__ == '__main__':
Expand All @@ -312,9 +312,9 @@ def mp_rtracker_getattrs():
# loky forces the creation of the resource tracker at process
# creation so that loky processes can inherit its file descriptor.
fd, pid = executor.submit(mp_rtracker_getattrs).result()
assert fd == mp_resource_tracker._fd
assert pid == mp_resource_tracker._pid
parent_fd = mp_rtracker_getfd()
child_fd = executor.submit(mp_rtracker_getfd).result()
assert child_fd == parent_fd
# non-regression test for #242: unlinking in a loky process a
# shared_memory segment tracked by multiprocessing and created its
Expand All @@ -326,5 +326,5 @@ def mp_rtracker_getattrs():
p = subprocess.run(
[sys.executable, "-c", cmd], capture_output=True, text=True
)
assert not p.stdout
assert not p.stderr
assert not p.stdout, p.stdout
assert not p.stderr, p.stderr
Loading