Skip to content

FIX remove link to resource_tracker._pid in child processes #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion loky/backend/popen_loky_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _launch(self, process_obj):
reduction._mk_inheritable(tracker_fd)
self._fds += [child_r, child_w, tracker_fd]
if sys.version_info >= (3, 8) and os.name == "posix":
mp_tracker_fd = prep_data["mp_tracker_args"]["fd"]
mp_tracker_fd = prep_data["mp_tracker_fd"]
self.duplicate_for_child(mp_tracker_fd)

from .fork_exec import fork_exec
Expand Down
22 changes: 8 additions & 14 deletions loky/backend/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@
from .resource_tracker import _resource_tracker

_resource_tracker.ensure_running()
d["tracker_args"] = {"pid": _resource_tracker._pid}
if sys.platform == "win32":
d["tracker_args"]["fh"] = msvcrt.get_osfhandle(_resource_tracker._fd)
d["tracker_fd"] = msvcrt.get_osfhandle(_resource_tracker._fd)

Check warning on line 86 in loky/backend/spawn.py

View check run for this annotation

Codecov / codecov/patch

loky/backend/spawn.py#L86

Added line #L86 was not covered by tests
else:
d["tracker_args"]["fd"] = _resource_tracker._fd
d["tracker_fd"] = _resource_tracker._fd

if sys.version_info >= (3, 8) and os.name == "posix":
# joblib/loky#242: allow loky processes to retrieve the resource
Expand All @@ -105,10 +104,7 @@
# process is created (othewise the child won't be able to use it if it
# is created later on)
mp_resource_tracker.ensure_running()
d["mp_tracker_args"] = {
"fd": mp_resource_tracker._fd,
"pid": mp_resource_tracker._pid,
}
d["mp_tracker_fd"] = mp_resource_tracker._fd

# Figure out whether to initialise main in the subprocess as a module
# or through direct execution (or to leave it alone entirely)
Expand Down Expand Up @@ -172,23 +168,21 @@
if "orig_dir" in data:
process.ORIGINAL_DIR = data["orig_dir"]

if "mp_tracker_args" in data:
if "mp_tracker_fd" in data:
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker,
)

mp_resource_tracker._fd = data["mp_tracker_args"]["fd"]
mp_resource_tracker._pid = data["mp_tracker_args"]["pid"]
if "tracker_args" in data:
mp_resource_tracker._fd = data["mp_tracker_fd"]
if "tracker_fd" in data:
from .resource_tracker import _resource_tracker

_resource_tracker._pid = data["tracker_args"]["pid"]
if sys.platform == "win32":
handle = data["tracker_args"]["fh"]
handle = data["tracker_fd"]

Check warning on line 181 in loky/backend/spawn.py

View check run for this annotation

Codecov / codecov/patch

loky/backend/spawn.py#L181

Added line #L181 was not covered by tests
handle = duplicate(handle, source_process=parent_sentinel)
_resource_tracker._fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
else:
_resource_tracker._fd = data["tracker_args"]["fd"]
_resource_tracker._fd = data["tracker_fd"]

if "init_main_from_name" in data:
_fixup_main_from_name(data["init_main_from_name"])
Expand Down
62 changes: 31 additions & 31 deletions tests/test_resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def _resource_unlink(name, rtype):
resource_tracker._CLEANUP_FUNCS[rtype](name)


def get_rtracker_pid():
def get_rtracker_fd():
resource_tracker.ensure_running()
return resource_tracker._resource_tracker._pid
return resource_tracker._resource_tracker._fd


class TestResourceTracker:
Expand All @@ -40,12 +40,18 @@ def test_resource_utils(self, rtype):
assert not resource_exists(name, rtype)

def test_child_retrieves_resource_tracker(self):
parent_rtracker_pid = get_rtracker_pid()
executor = ProcessPoolExecutor(max_workers=2)
child_rtracker_pid = executor.submit(get_rtracker_pid).result()

# First simple pid retrieval check (see #200)
assert child_rtracker_pid == parent_rtracker_pid
# First simple fd retrieval check (see #200)
# checking fd only work on posix for now
if sys.platform != "win32":
try:
parent_rtracker_fd = get_rtracker_fd()
executor = ProcessPoolExecutor(max_workers=2)
child_rtracker_fd = executor.submit(get_rtracker_fd).result()

assert child_rtracker_fd == parent_rtracker_fd
finally:
executor.shutdown()

# Register a resource in the parent process, and un-register it in the
# child process. If the two processes do not share the same
Expand Down Expand Up @@ -77,23 +83,20 @@ def maybe_unlink(name, rtype):
e.submit(maybe_unlink, filename, "file").result()
e.shutdown()
"""
try:
p = subprocess.run(
[sys.executable, "-E", "-c", cmd],
capture_output=True,
text=True,
)
filename = p.stdout.strip()

pattern = f"decremented refcount of file {filename}"
assert pattern in p.stderr
assert "leaked" not in p.stderr
p = subprocess.run(
[sys.executable, "-E", "-c", cmd],
capture_output=True,
text=True,
)
filename = p.stdout.strip()

pattern = f"KeyError: '{filename}'"
assert pattern not in p.stderr
pattern = f"decremented refcount of file {filename}"
assert pattern in p.stderr
assert "leaked" not in p.stderr

finally:
executor.shutdown()
pattern = f"KeyError: '{filename}'"
assert pattern not in p.stderr

# The following four tests are inspired from cpython _test_multiprocessing
@pytest.mark.parametrize("rtype", ["file", "folder", "semlock"])
Expand Down Expand Up @@ -294,15 +297,12 @@ def test_loky_process_inherit_multiprocessing_resource_tracker(self):
cmd = """if 1:
from loky import get_reusable_executor
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker
)

def mp_rtracker_getattrs():
def mp_rtracker_getfd():
from multiprocessing.resource_tracker import (
_resource_tracker as mp_resource_tracker
)
return mp_resource_tracker._fd, mp_resource_tracker._pid
return mp_resource_tracker._fd


if __name__ == '__main__':
Expand All @@ -312,9 +312,9 @@ def mp_rtracker_getattrs():

# loky forces the creation of the resource tracker at process
# creation so that loky processes can inherit its file descriptor.
fd, pid = executor.submit(mp_rtracker_getattrs).result()
assert fd == mp_resource_tracker._fd
assert pid == mp_resource_tracker._pid
parent_fd = mp_rtracker_getfd()
child_fd = executor.submit(mp_rtracker_getfd).result()
assert child_fd == parent_fd

# non-regression test for #242: unlinking in a loky process a
# shared_memory segment tracked by multiprocessing and created its
Expand All @@ -326,5 +326,5 @@ def mp_rtracker_getattrs():
p = subprocess.run(
[sys.executable, "-c", cmd], capture_output=True, text=True
)
assert not p.stdout
assert not p.stderr
assert not p.stdout, p.stdout
assert not p.stderr, p.stderr
Loading