Skip to content

Commit 6097e91

Browse files
committed
force_unlock_after_fork
1 parent e81a0fc commit 6097e91

File tree

5 files changed

+96
-52
lines changed

5 files changed

+96
-52
lines changed

Lib/test/test_multiprocessing_fork/test_processes.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,6 @@
33

44
install_tests_in_module_dict(globals(), 'fork', only_type="processes")
55

6-
import os, sys # TODO: RUSTPYTHON
7-
class WithProcessesTestCondition(WithProcessesTestCondition): # TODO: RUSTPYTHON
8-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
9-
def test_notify_all(self): super().test_notify_all() # TODO: RUSTPYTHON
10-
11-
class WithProcessesTestLock(WithProcessesTestLock): # TODO: RUSTPYTHON
12-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
13-
def test_repr_lock(self): super().test_repr_lock() # TODO: RUSTPYTHON
14-
15-
class WithProcessesTestManagerRestart(WithProcessesTestManagerRestart): # TODO: RUSTPYTHON
16-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
17-
def test_rapid_restart(self): super().test_rapid_restart() # TODO: RUSTPYTHON
18-
19-
class WithProcessesTestProcess(WithProcessesTestProcess): # TODO: RUSTPYTHON
20-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
21-
def test_args_argument(self): super().test_args_argument() # TODO: RUSTPYTHON
22-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
23-
def test_process(self): super().test_process() # TODO: RUSTPYTHON
24-
25-
class WithProcessesTestPoolWorkerLifetime(WithProcessesTestPoolWorkerLifetime): # TODO: RUSTPYTHON
26-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
27-
def test_pool_worker_lifetime(self): super().test_pool_worker_lifetime() # TODO: RUSTPYTHON
28-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
29-
def test_pool_worker_lifetime_early_close(self): super().test_pool_worker_lifetime_early_close() # TODO: RUSTPYTHON
30-
31-
class WithProcessesTestQueue(WithProcessesTestQueue): # TODO: RUSTPYTHON
32-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
33-
def test_fork(self): super().test_fork() # TODO: RUSTPYTHON
34-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout')
35-
def test_get(self): super().test_get() # TODO: RUSTPYTHON
36-
37-
class WithProcessesTestSharedMemory(WithProcessesTestSharedMemory): # TODO: RUSTPYTHON
38-
@unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError')
39-
def test_shared_memory_SharedMemoryManager_basics(self): super().test_shared_memory_SharedMemoryManager_basics() # TODO: RUSTPYTHON
406

417
if __name__ == '__main__':
428
unittest.main()

crates/vm/src/codecs.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,31 @@ impl ToPyObject for PyCodec {
150150
}
151151

152152
impl CodecsRegistry {
153+
/// Force-unlock the inner RwLock after fork in the child process.
/// 
/// Probes `self.inner` with `try_write` / `try_read` and, if the lock still
/// looks held, releases it with `force_unlock_read` / `force_unlock_write`.
/// Returns with nothing held by this call: every probe guard is a temporary
/// that is dropped immediately.
154+
///
155+
/// # Safety
156+
/// Must only be called after fork() in the child process when no other
157+
/// threads exist. The calling thread must NOT hold this lock.
/// If the caller did hold it, the failed probe below would be misread as a
/// stuck lock and the caller's own guard would be released underneath it.
158+
#[cfg(all(unix, feature = "host_env"))]
159+
pub(crate) unsafe fn force_unlock_after_fork(&self) {
160+
// Fast path: if an exclusive lock can be taken, nobody holds the lock.
// The guard is not bound to a name, so it is released right away.
if self.inner.try_write().is_some() {
161+
return;
162+
}
163+
// The write probe failed, so something is recorded as holding the lock.
// A successful read probe is taken to mean the holders are readers;
// a failed one is taken to mean the holder is a writer.
// NOTE(review): this assumes try_read only fails because of an active
// writer — confirm against the RwLock implementation behind `inner`.
let is_shared = self.inner.try_read().is_some();
164+
if is_shared {
165+
// Drop one recorded reader per iteration until the write probe succeeds.
// NOTE(review): the loop has no iteration bound; it ends only when
// try_write succeeds. If try_write can keep failing for a reason other
// than remaining readers, this would spin — TODO confirm.
loop {
166+
// SAFETY: Lock is shared-locked by dead thread(s).
167+
unsafe { self.inner.force_unlock_read() };
168+
if self.inner.try_write().is_some() {
169+
return;
170+
}
171+
}
172+
} else {
173+
// SAFETY: Lock is exclusively locked by a dead thread.
174+
unsafe { self.inner.force_unlock_write() };
175+
}
176+
}
177+
153178
pub(crate) fn new(ctx: &Context) -> Self {
154179
::rustpython_vm::common::static_cell! {
155180
static METHODS: Box<[PyMethodDef]>;

crates/vm/src/intern.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,32 @@ impl Clone for StringPool {
3131
}
3232

3333
impl StringPool {
34+
/// Force-unlock the inner RwLock after fork in the child process.
/// 
/// Probes `self.inner` with `try_write` / `try_read` and, if the lock still
/// looks held, releases it with `force_unlock_read` / `force_unlock_write`.
/// Returns with nothing held by this call: every probe guard is a temporary
/// that is dropped immediately.
35+
///
36+
/// # Safety
37+
/// Must only be called after fork() in the child process when no other
38+
/// threads exist. The calling thread must NOT hold this lock.
/// If the caller did hold it, the failed probe below would be misread as a
/// stuck lock and the caller's own guard would be released underneath it.
39+
#[cfg(all(unix, feature = "host_env"))]
40+
pub(crate) unsafe fn force_unlock_after_fork(&self) {
41+
// Fast path: if an exclusive lock can be taken, nobody holds the lock.
// The guard is not bound to a name, so it is released right away.
if self.inner.try_write().is_some() {
42+
return;
43+
}
44+
// Lock is stuck from a thread that no longer exists.
// A successful read probe is taken to mean the holders are readers;
// a failed one is taken to mean the holder is a writer.
// NOTE(review): this assumes try_read only fails because of an active
// writer — confirm against the RwLock implementation behind `inner`.
45+
let is_shared = self.inner.try_read().is_some();
46+
if is_shared {
47+
// Drop one recorded reader per iteration until the write probe succeeds.
// NOTE(review): the loop has no iteration bound; it ends only when
// try_write succeeds. If try_write can keep failing for a reason other
// than remaining readers, this would spin — TODO confirm.
loop {
48+
// SAFETY: Lock is shared-locked by dead thread(s).
49+
unsafe { self.inner.force_unlock_read() };
50+
if self.inner.try_write().is_some() {
51+
return;
52+
}
53+
}
54+
} else {
55+
// SAFETY: Lock is exclusively locked by a dead thread.
56+
unsafe { self.inner.force_unlock_write() };
57+
}
58+
}
59+
3460
#[inline]
3561
pub unsafe fn intern<S: InternableString>(
3662
&self,

crates/vm/src/stdlib/posix.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,22 @@ pub mod module {
707707
#[cfg(feature = "threading")]
708708
crate::object::reset_weakref_locks_after_fork();
709709

710+
// Force-unlock all global VM locks that may have been held by
711+
// threads that no longer exist in the child process after fork.
712+
// SAFETY: After fork, only the forking thread survives. Any lock
713+
// held by another thread is permanently stuck. The forking thread
714+
// does not hold these locks during fork() (a high-level Python op).
715+
unsafe {
716+
vm.ctx.string_pool.force_unlock_after_fork();
717+
vm.state.codec_registry.force_unlock_after_fork();
718+
force_unlock_mutex_after_fork(&vm.state.atexit_funcs);
719+
force_unlock_mutex_after_fork(&vm.state.before_forkers);
720+
force_unlock_mutex_after_fork(&vm.state.after_forkers_child);
721+
force_unlock_mutex_after_fork(&vm.state.after_forkers_parent);
722+
force_unlock_mutex_after_fork(&vm.state.global_trace_func);
723+
force_unlock_mutex_after_fork(&vm.state.global_profile_func);
724+
}
725+
710726
// Mark all other threads as done before running Python callbacks
711727
#[cfg(feature = "threading")]
712728
crate::stdlib::thread::after_fork_child(vm);
@@ -716,18 +732,24 @@ pub mod module {
716732
vm.signal_handlers
717733
.get_or_init(crate::signal::new_signal_handlers);
718734

719-
let after_forkers_child = match vm.state.after_forkers_child.try_lock() {
720-
Some(guard) => guard.clone(),
721-
None => {
722-
// SAFETY: After fork in child process, only the current thread
723-
// exists. The lock holder no longer exists.
724-
unsafe { vm.state.after_forkers_child.force_unlock() };
725-
vm.state.after_forkers_child.lock().clone()
726-
}
727-
};
735+
let after_forkers_child: Vec<PyObjectRef> =
736+
vm.state.after_forkers_child.lock().clone();
728737
run_at_forkers(after_forkers_child, false, vm);
729738
}
730739

740+
/// Force-unlock a PyMutex if held by a dead thread after fork.
/// 
/// Probes `mutex` with `try_lock`; when the probe fails, the mutex is
/// released with `force_unlock`. When the probe succeeds, the guard is a
/// temporary and is dropped at once, so the mutex is left unlocked either way.
741+
///
742+
/// # Safety
743+
/// Must only be called after fork() in the child process.
/// The calling thread must not hold `mutex`: any failed `try_lock` is
/// treated as "owner no longer exists", and the mutex is unlocked regardless
/// of who actually holds it.
744+
unsafe fn force_unlock_mutex_after_fork<T>(
745+
mutex: &crate::common::lock::PyMutex<T>,
746+
) {
747+
// `is_none()` means the lock could not be acquired, i.e. it is marked held.
if mutex.try_lock().is_none() {
748+
// SAFETY: Lock is held by a dead thread after fork.
749+
unsafe { mutex.force_unlock() };
750+
}
751+
}
752+
731753
fn py_os_after_fork_parent(vm: &VirtualMachine) {
732754
let after_forkers_parent: Vec<PyObjectRef> = vm.state.after_forkers_parent.lock().clone();
733755
run_at_forkers(after_forkers_parent, false, vm);

crates/vm/src/stdlib/thread.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -914,11 +914,13 @@ pub(crate) mod _thread {
914914
// Reinitialize frame slot for current thread
915915
crate::vm::thread::reinit_frame_slot_after_fork(vm);
916916

917-
// Clean up thread handles if we can acquire the lock.
918-
// Use try_lock because the mutex might have been held during fork.
919-
// If we can't acquire it, just skip - the child process will work
920-
// correctly with new handles it creates.
921-
if let Some(mut handles) = vm.state.thread_handles.try_lock() {
917+
// Clean up thread handles. Force-unlock if held by a dead thread.
918+
// SAFETY: After fork, only the current thread exists.
919+
if vm.state.thread_handles.try_lock().is_none() {
920+
unsafe { vm.state.thread_handles.force_unlock() };
921+
}
922+
{
923+
let mut handles = vm.state.thread_handles.lock();
922924
// Clean up dead weak refs and mark non-current threads as done
923925
handles.retain(|(inner_weak, done_event_weak): &HandleEntry| {
924926
let Some(inner) = inner_weak.upgrade() else {
@@ -957,10 +959,13 @@ pub(crate) mod _thread {
957959
});
958960
}
959961

960-
// Clean up shutdown_handles as well.
961-
// This is critical to prevent _shutdown() from waiting on threads
962-
// that don't exist in the child process after fork.
963-
if let Some(mut handles) = vm.state.shutdown_handles.try_lock() {
962+
// Clean up shutdown_handles. Force-unlock if held by a dead thread.
963+
// SAFETY: After fork, only the current thread exists.
964+
if vm.state.shutdown_handles.try_lock().is_none() {
965+
unsafe { vm.state.shutdown_handles.force_unlock() };
966+
}
967+
{
968+
let mut handles = vm.state.shutdown_handles.lock();
964969
// Mark all non-current threads as done in shutdown_handles
965970
handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| {
966971
let Some(inner) = inner_weak.upgrade() else {

0 commit comments

Comments
 (0)