Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def arbitrary_address(family):
if family == 'AF_INET':
return ('localhost', 0)
elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
return tempfile.mktemp(prefix='sock-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)), dir="")
Expand Down
253 changes: 176 additions & 77 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
# this resource tracker process, "killall python" would probably leave unlinked
# resources.

import base64
import os
import signal
import sys
import threading
import warnings
from collections import deque

import json

from . import spawn
from . import util
Expand Down Expand Up @@ -66,6 +70,14 @@ def __init__(self):
self._fd = None
self._pid = None
self._exitcode = None
self._reentrant_messages = deque()

# True to use colon-separated lines, rather than JSON lines,
# for internal communication. (Mainly for testing).
# Filenames not supported by the simple format will always be sent
# using JSON.
# The reader should understand all formats.
self._use_simple_format = True

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
Expand Down Expand Up @@ -102,7 +114,7 @@ def _stop_locked(
# This shouldn't happen (it might when called by a finalizer)
# so we check for it anyway.
if self._lock._recursion_count() > 1:
return self._reentrant_call_error()
raise self._reentrant_call_error()
if self._fd is None:
# not running
return
Expand All @@ -113,7 +125,12 @@ def _stop_locked(
close(self._fd)
self._fd = None

_, status = waitpid(self._pid, 0)
try:
_, status = waitpid(self._pid, 0)
except ChildProcessError:
self._pid = None
self._exitcode = None
return

self._pid = None

Expand All @@ -132,76 +149,119 @@ def ensure_running(self):

This can be run from any process. Usually a child process will use
the resource created by its parent.'''
return self._ensure_running_and_write()

def _teardown_dead_process(self):
os.close(self._fd)

# Clean-up to avoid dangling processes.
try:
# _pid can be None if this process is a child from another
# python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None
self._exitcode = None

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')

def _launch(self):
fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
r, w = os.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [
exe,
*util._args_from_interpreter_flags(),
'-c',
f'from multiprocessing.resource_tracker import main;main({r})',
]
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
prev_sigmask = None
try:
if _HAVE_SIGMASK:
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
if prev_sigmask is not None:
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
except:
os.close(w)
raise
else:
self._fd = w
self._pid = pid
finally:
os.close(r)

def _make_probe_message(self):
"""Return a probe message."""
if self._use_simple_format:
return b'PROBE:0:noop\n'
return (
json.dumps(
{"cmd": "PROBE", "rtype": "noop"},
ensure_ascii=True,
separators=(",", ":"),
)
+ "\n"
).encode("ascii")

def _ensure_running_and_write(self, msg=None):
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
return self._reentrant_call_error()
if msg is None:
raise self._reentrant_call_error()
return self._reentrant_messages.append(msg)

if self._fd is not None:
# resource tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
# => dead, launch it again
os.close(self._fd)

# Clean-up to avoid dangling processes.
if msg is None:
to_send = self._make_probe_message()
else:
to_send = msg
try:
# _pid can be None if this process is a child from another
# python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None
self._exitcode = None
self._write(to_send)
except OSError:
self._teardown_dead_process()
self._launch()

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
msg = None # message was sent in probe
else:
self._launch()

fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
r, w = os.pipe()
while True:
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
prev_sigmask = None
try:
if _HAVE_SIGMASK:
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
if prev_sigmask is not None:
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
except:
os.close(w)
raise
else:
self._fd = w
self._pid = pid
finally:
os.close(r)
reentrant_msg = self._reentrant_messages.popleft()
except IndexError:
break
self._write(reentrant_msg)
if msg is not None:
self._write(msg)

def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
os.write(self._fd, b'PROBE:0:noop\n')
os.write(self._fd, self._make_probe_message())
except OSError:
return False
else:
Expand All @@ -215,27 +275,42 @@ def unregister(self, name, rtype):
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)

def _send(self, cmd, name, rtype):
try:
self.ensure_running()
except ReentrantCallError:
# The code below might or might not work, depending on whether
# the resource tracker was already running and still alive.
# Better warn the user.
# (XXX is warnings.warn itself reentrant-safe? :-)
warnings.warn(
f"ResourceTracker called reentrantly for resource cleanup, "
f"which is unsupported. "
f"The {rtype} object {name!r} might leak.")
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
def _write(self, msg):
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
nbytes, len(msg))
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"

def _send(self, cmd, name, rtype):
if self._use_simple_format and '\n' not in name:
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
self._ensure_running_and_write(msg)
return

# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
# POSIX shm_open() and sem_open() require the name, including its leading slash,
# to be at most NAME_MAX bytes (255 on Linux)
# With json.dump(..., ensure_ascii=True) every non-ASCII byte becomes a 6-char
# escape like \uDC80.
# As we want the overall message to be kept atomic and therefore smaller than 512,
# we encode encode the raw name bytes with URL-safe Base64 - so a 255 long name
# will not exceed 340 bytes.
b = name.encode('utf-8', 'surrogateescape')
if len(b) > 255:
raise ValueError('shared memory name too long (max 255 bytes)')
b64 = base64.urlsafe_b64encode(b).decode('ascii')

payload = {"cmd": cmd, "rtype": rtype, "base64_name": b64}
msg = (json.dumps(payload, ensure_ascii=True, separators=(",", ":")) + "\n").encode("ascii")

# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
assert msg.startswith(b'{')

self._ensure_running_and_write(msg)

_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
Expand All @@ -244,6 +319,30 @@ def _send(self, cmd, name, rtype):
getfd = _resource_tracker.getfd


def _decode_message(line):
if line.startswith(b'{'):
try:
obj = json.loads(line.decode('ascii'))
except Exception as e:
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e

cmd = obj["cmd"]
rtype = obj["rtype"]
b64 = obj.get("base64_name", "")

if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
raise ValueError("malformed resource_tracker fields: %r" % (obj,))

try:
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
except ValueError as e:
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
else:
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
name, rtype = rest.rsplit(':', maxsplit=1)
return cmd, rtype, name


def main(fd):
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
Expand All @@ -266,7 +365,7 @@ def main(fd):
with open(fd, 'rb') as f:
for line in f:
try:
cmd, name, rtype = line.strip().decode('ascii').split(':')
cmd, rtype, name = _decode_message(line)
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
Expand Down
Loading
Loading