-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Await fsm checkpoint #6765
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Await fsm checkpoint #6765
Changes from 1 commit
e9a70c7
68da611
1bf2bdf
1726b36
fd57302
ecfabb8
bc9b80a
1a1c97a
3997507
551d025
665790f
c2edc6b
4dc6120
9ac5e54
8ea2822
66dc584
6c26391
a1c1891
f41b080
5f547a5
3ed0799
9704677
965b201
9f354c2
b50f1f3
f8fc889
26ac488
b2ba6ea
2dfed2d
d8aabdd
6d54427
a1a7ec6
3620c13
fe31a3d
8eaf89f
5544761
13117d6
63c44b5
c6806af
9844d0d
a07fed6
06e9e6a
072c0cc
5e01466
b9dc39d
d5afdde
9f3c462
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Added a new entry to `.gitignore` for `actor_complex_demo.rpsnap` to improve source management.
- Introduced methods `get_stack` and `push_stack_value` in the `Frame` struct to support multi-frame checkpointing.
- Refactored checkpoint saving functions to handle both instruction pointer and stack state for improved robustness.
- Updated demo script `actor_complex_demo.py` to showcase the enhanced checkpoint and resume functionality in a complex actor model scenario.
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,6 @@ use std::fs; | |
|
|
||
| #[allow(dead_code)] | ||
| pub(crate) fn save_checkpoint(vm: &VirtualMachine, path: &str) -> PyResult<()> { | ||
| eprintln!("DEBUG: save_checkpoint called"); | ||
| let frames = vm.frames.borrow(); | ||
| if frames.is_empty() { | ||
| return Err(vm.new_runtime_error("checkpoint requires an active frame".to_owned())); | ||
|
|
@@ -22,41 +21,48 @@ pub(crate) fn save_checkpoint(vm: &VirtualMachine, path: &str) -> PyResult<()> { | |
| let frame_refs: Vec<_> = frames.iter().map(|f| f.to_owned()).collect(); | ||
| drop(frames); // Release borrow | ||
|
|
||
| eprintln!("DEBUG: Got {} frames", frame_refs.len()); | ||
|
|
||
| // Temporarily skip validation to avoid potential deadlock | ||
| // TODO: Re-enable validation after fixing the issue | ||
| // for frame in &frame_refs { | ||
| // validate_frame_for_checkpoint(vm, frame)?; | ||
| // } | ||
|
Comment on lines
+23
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain 🏁 Script executed: # First, let's examine the checkpoint.rs file to see the context around lines 23-29
head -50 crates/vm/src/vm/checkpoint.rs | cat -n — Repository: RustPython/RustPython. Length of output: 2283. 🏁 Script executed: # Search for the validate_frame_for_checkpoint function
rg -A 20 "fn validate_frame_for_checkpoint" crates/vm/src/vm/checkpoint.rs — Repository: RustPython/RustPython. Length of output: 826. 🏁 Script executed: # Look for any TODO or FIXME comments related to deadlock/validation in the file
rg -B 2 -A 2 "deadlock|TODO|FIXME" crates/vm/src/vm/checkpoint.rs — Repository: RustPython/RustPython. Length of output: 282. 🏁 Script executed: # Check what imports are in the file to see what synchronization primitives might be used
head -20 crates/vm/src/vm/checkpoint.rs | cat -n — Repository: RustPython/RustPython. Length of output: 768. 🏁 Script executed: # Search for checkpoint_stack to understand what it does
rg -B 3 -A 10 "fn checkpoint_stack" crates/ — Repository: RustPython/RustPython. Length of output: 884. 🏁 Script executed: # Look for all calls to validate_frame_for_checkpoint in the file
rg "validate_frame_for_checkpoint" crates/vm/src/vm/checkpoint.rs — Repository: RustPython/RustPython. Length of output: 306. 🏁 Script executed: # Check if validation happens anywhere else in the save_checkpoint_bytes functions
rg -B 5 -A 5 "validate_frame_for_checkpoint" crates/vm/src/vm/checkpoint.rs — Repository: RustPython/RustPython. Length of output: 1476. 🏁 Script executed: # Look at the complete save_checkpoint_bytes_from_frames function to see if validation happens there
rg -A 30 "fn save_checkpoint_bytes_from_frames" crates/vm/src/vm/checkpoint.rs — Repository: RustPython/RustPython. Length of output: 2613. Re-enable frame validation for consistency with other checkpoint variants. Validation is commented out in `save_checkpoint` (the `validate_frame_for_checkpoint` loop is disabled behind a TODO). 🤖 Prompt for AI Agents |
||
|
|
||
| eprintln!("DEBUG: Calling save_checkpoint_bytes_from_frames"); | ||
| let data = save_checkpoint_bytes_from_frames(vm, &frame_refs, None)?; | ||
| eprintln!("DEBUG: Writing {} bytes to {}", data.len(), path); | ||
| fs::write(path, &data).map_err(|err| vm.new_os_error(format!("checkpoint write failed: {err}")))?; | ||
| eprintln!("DEBUG: File written"); | ||
| Ok(()) | ||
| } | ||
|
|
||
| // Version that accepts the innermost frame's resume_lasti (already validated) | ||
| pub(crate) fn save_checkpoint_with_lasti(vm: &VirtualMachine, path: &str, innermost_resume_lasti: u32) -> PyResult<()> { | ||
| eprintln!("DEBUG: save_checkpoint_with_lasti called, resume_lasti={}", innermost_resume_lasti); | ||
| save_checkpoint_with_lasti_and_stack(vm, path, innermost_resume_lasti, Vec::new()) | ||
| } | ||
|
|
||
| // Version that accepts both resume_lasti and the innermost frame's stack | ||
| pub(crate) fn save_checkpoint_with_lasti_and_stack( | ||
| vm: &VirtualMachine, | ||
| path: &str, | ||
| innermost_resume_lasti: u32, | ||
| innermost_stack: Vec<crate::PyObjectRef> | ||
| ) -> PyResult<()> { | ||
| let frames = vm.frames.borrow(); | ||
| if frames.is_empty() { | ||
| return Err(vm.new_runtime_error("checkpoint requires an active frame".to_owned())); | ||
| } | ||
|
|
||
|
|
||
| // Get all frames in the stack | ||
| let frame_refs: Vec<_> = frames.iter().map(|f| f.to_owned()).collect(); | ||
| drop(frames); // Release borrow | ||
|
|
||
| eprintln!("DEBUG: Got {} frames", frame_refs.len()); | ||
|
|
||
| eprintln!("DEBUG: Calling save_checkpoint_bytes_from_frames with innermost_lasti"); | ||
| let data = save_checkpoint_bytes_from_frames(vm, &frame_refs, Some(innermost_resume_lasti))?; | ||
| eprintln!("DEBUG: Writing {} bytes to {}", data.len(), path); | ||
| let data = save_checkpoint_bytes_from_frames_with_stack( | ||
| vm, | ||
| &frame_refs, | ||
| Some(innermost_resume_lasti), | ||
| innermost_stack | ||
| )?; | ||
| fs::write(path, &data).map_err(|err| vm.new_os_error(format!("checkpoint write failed: {err}")))?; | ||
| eprintln!("DEBUG: File written"); | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -118,10 +124,7 @@ pub(crate) fn resume_script_from_bytes( | |
| script_path: &str, | ||
| data: &[u8], | ||
| ) -> PyResult<()> { | ||
| eprintln!("DEBUG: Loading checkpoint state..."); | ||
| let (state, objects) = snapshot::load_checkpoint_state(vm, data)?; | ||
| eprintln!("DEBUG: Loaded {} objects from checkpoint", objects.len()); | ||
| eprintln!("DEBUG: Checkpoint has {} frames", state.frames.len()); | ||
|
|
||
| if state.source_path != script_path { | ||
| return Err(vm.new_value_error(format!( | ||
|
|
@@ -136,7 +139,6 @@ pub(crate) fn resume_script_from_bytes( | |
| .cloned() | ||
| .ok_or_else(|| vm.new_runtime_error("checkpoint globals missing".to_owned()))?; | ||
| let globals_dict = PyDictRef::try_from_object(vm, globals_obj)?; | ||
| eprintln!("DEBUG: Got globals dict"); | ||
|
|
||
| if !globals_dict.contains_key("__file__", vm) { | ||
| globals_dict.set_item("__file__", vm.ctx.new_str(script_path).into(), vm)?; | ||
|
|
@@ -151,35 +153,26 @@ pub(crate) fn resume_script_from_bytes( | |
| let code_obj: crate::PyRef<PyCode> = vm.ctx.new_pyref(PyCode::new(code)); | ||
|
|
||
| // Get locals for this frame | ||
| eprintln!("DEBUG: Frame {i}: Getting locals obj from index {}", frame_state.locals); | ||
| let locals_obj = objects | ||
| .get(frame_state.locals as usize) | ||
| .cloned() | ||
| .ok_or_else(|| vm.new_runtime_error(format!("checkpoint frame {i} locals missing")))?; | ||
| eprintln!("DEBUG: Frame {i}: locals_obj class = {}", locals_obj.class().name()); | ||
|
|
||
| let locals_dict = PyDictRef::try_from_object(vm, locals_obj.clone())?; | ||
| eprintln!("DEBUG: Frame {i}: Successfully converted to PyDictRef"); | ||
|
|
||
| let varnames = &code_obj.code.varnames; | ||
| eprintln!("DEBUG: Frame {i}: varnames = {:?}", varnames.iter().map(|v| v.as_str()).collect::<Vec<_>>()); | ||
|
|
||
| // Try to iterate all keys in the dict | ||
| eprintln!("DEBUG: Frame {i}: Iterating all dict keys..."); | ||
| let dict_items: Vec<_> = locals_dict.clone().into_iter().collect(); | ||
| eprintln!("DEBUG: Frame {i}: Dict has {} items", dict_items.len()); | ||
| for (key, value) in dict_items.iter() { | ||
| if let Some(key_str) = key.downcast_ref::<crate::builtins::PyStr>() { | ||
| eprintln!("DEBUG: Frame {i}: Dict contains key '{}' = {}", key_str.as_str(), value.class().name()); | ||
| } | ||
| } | ||
|
|
||
| // Debug: check what's in locals_dict BEFORE creating the frame | ||
| for varname in varnames.iter() { | ||
| if let Some(value) = locals_dict.get_item_opt(*varname, vm)? { | ||
| eprintln!("DEBUG: Frame {i}: locals_dict[{varname}] = {} BEFORE frame creation", value.class().name()); | ||
| } else { | ||
| eprintln!("DEBUG: Frame {i}: locals_dict[{varname}] = <MISSING> BEFORE frame creation"); | ||
| } | ||
| } | ||
|
Comment on lines
+209
to
+219
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remove empty for loops that appear to be debug remnants. These loops iterate over locals but have empty bodies - they appear to be leftover debug code that should be removed. 🧹 Suggested fix:
- // Try to iterate all keys in the dict
- let dict_items: Vec<_> = locals_dict.clone().into_iter().collect();
- for (key, value) in dict_items.iter() {
- if let Some(key_str) = key.downcast_ref::<crate::builtins::PyStr>() {
- }
- }
-
- // Debug: check what's in locals_dict BEFORE creating the frame
- for varname in varnames.iter() {
- if let Some(value) = locals_dict.get_item_opt(*varname, vm)? {
- } else {
- }
- }
🤖 Prompt for AI Agents |
||
|
|
||
|
|
@@ -194,18 +187,24 @@ pub(crate) fn resume_script_from_bytes( | |
| .into_ref(&vm.ctx); | ||
|
|
||
| // Restore fastlocals from the locals dict | ||
| eprintln!("DEBUG: Frame {i}: Restoring fastlocals..."); | ||
| let mut fastlocals = frame.fastlocals.lock(); | ||
| for (idx, varname) in varnames.iter().enumerate() { | ||
| if let Some(value) = locals_dict.get_item_opt(*varname, vm)? { | ||
| eprintln!("DEBUG: Frame {i}: Restoring fastlocals[{idx}] = {varname} = {}", value.class().name()); | ||
| fastlocals[idx] = Some(value); | ||
| } else { | ||
| eprintln!("DEBUG: Frame {i}: No value for fastlocals[{idx}] = {varname}"); | ||
| } | ||
| } | ||
| drop(fastlocals); | ||
|
|
||
| // Restore the value stack | ||
| for stack_item_id in &frame_state.stack { | ||
| let stack_obj = objects | ||
| .get(*stack_item_id as usize) | ||
| .cloned() | ||
| .ok_or_else(|| vm.new_runtime_error(format!("checkpoint frame {i} stack item {} missing", stack_item_id)))?; | ||
| frame.push_stack_value(stack_obj); | ||
| } | ||
|
|
||
| if frame_state.lasti as usize >= frame.code.instructions.len() { | ||
| return Err(vm.new_value_error( | ||
| format!("checkpoint frame {i} lasti is out of range for current bytecode"), | ||
|
|
@@ -215,18 +214,75 @@ pub(crate) fn resume_script_from_bytes( | |
| frame_refs.push(frame); | ||
| } | ||
|
|
||
| // Push all frames onto the VM stack (bottom to top) | ||
| for frame in frame_refs.iter() { | ||
| vm.frames.borrow_mut().push(frame.clone()); | ||
|
|
||
| if frame_refs.len() == 1 { | ||
| // Simple case: only one frame, just run it | ||
| let result = vm.run_frame(frame_refs[0].clone()); | ||
| vm.frames.borrow_mut().clear(); | ||
| return result.map(drop); | ||
| } | ||
|
|
||
| // Multiple frames: need to execute inner frames first, then continue outer frames | ||
| // Push all outer frames to VM stack (they are waiting for inner frames to return) | ||
| for i in 0..frame_refs.len() - 1 { | ||
| vm.frames.borrow_mut().push(frame_refs[i].clone()); | ||
| } | ||
|
|
||
| // Run the top frame | ||
| let result = vm.run_frame(frame_refs.last().unwrap().clone()); | ||
| // Run the innermost frame using vm.run_frame | ||
| let innermost_frame = frame_refs.last().unwrap().clone(); | ||
| let inner_result = vm.run_frame(innermost_frame); | ||
|
|
||
| // Clean up frames | ||
| vm.frames.borrow_mut().clear(); | ||
| // If inner frame failed, clean up and return error | ||
| let inner_return_val = match inner_result { | ||
| Ok(val) => val, | ||
| Err(e) => { | ||
| vm.frames.borrow_mut().clear(); | ||
| return Err(e); | ||
| } | ||
| }; | ||
|
|
||
| // Push the inner frame's return value to the caller's (outer frame's) stack | ||
| let caller_frame = &frame_refs[frame_refs.len() - 2]; | ||
| caller_frame.push_stack_value(inner_return_val); | ||
|
|
||
| result.map(drop) | ||
| // Inner frame succeeded. Now continue executing outer frames | ||
| // The return value from inner frame should be on the caller's stack already | ||
| // We need to continue executing from the outermost frame | ||
| for i in (0..frame_refs.len() - 1).rev() { | ||
| let frame = frame_refs[i].clone(); | ||
|
|
||
| // Use frame.run() directly since frame is already on VM stack | ||
| let result = frame.run(vm); | ||
|
|
||
| match result { | ||
| Ok(crate::frame::ExecutionResult::Return(val)) => { | ||
| // Frame returned normally | ||
| // Pop this frame | ||
| vm.frames.borrow_mut().pop(); | ||
|
|
||
| // If there's an outer frame, push the return value to its stack | ||
| if i > 0 { | ||
| frame_refs[i - 1].push_stack_value(val); | ||
| } else { | ||
| // This was the outermost frame, we're done | ||
| vm.frames.borrow_mut().clear(); | ||
| return Ok(()); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| // Error occurred | ||
| vm.frames.borrow_mut().clear(); | ||
| return Err(e); | ||
| } | ||
| Ok(_other) => { | ||
| vm.frames.borrow_mut().clear(); | ||
| return Err(vm.new_runtime_error("unexpected execution result (not Return)".to_owned())); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| vm.frames.borrow_mut().clear(); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[allow(dead_code)] | ||
|
|
@@ -277,6 +333,15 @@ fn save_checkpoint_bytes_from_frames( | |
| vm: &VirtualMachine, | ||
| frames: &[FrameRef], | ||
| innermost_resume_lasti: Option<u32>, // If provided, use this for the innermost frame | ||
| ) -> PyResult<Vec<u8>> { | ||
| save_checkpoint_bytes_from_frames_with_stack(vm, frames, innermost_resume_lasti, Vec::new()) | ||
| } | ||
|
|
||
| fn save_checkpoint_bytes_from_frames_with_stack( | ||
| vm: &VirtualMachine, | ||
| frames: &[FrameRef], | ||
| innermost_resume_lasti: Option<u32>, | ||
| innermost_stack: Vec<crate::PyObjectRef>, | ||
| ) -> PyResult<Vec<u8>> { | ||
| if frames.is_empty() { | ||
| return Err(vm.new_runtime_error("no frames to checkpoint".to_owned())); | ||
|
|
@@ -285,25 +350,6 @@ fn save_checkpoint_bytes_from_frames( | |
| // Get source path from the outermost (first) frame | ||
| let source_path = frames[0].code.source_path.as_str(); | ||
|
|
||
| // Debug: Check fastlocals before serialization | ||
| for (idx, frame) in frames.iter().enumerate() { | ||
| eprintln!("DEBUG: Frame {idx} before serialize:"); | ||
| eprintln!(" code.varnames = {:?}", frame.code.code.varnames.iter().map(|v| v.as_str()).collect::<Vec<_>>()); | ||
| eprintln!(" code.flags = {:?}", frame.code.code.flags); | ||
| let fastlocals = frame.fastlocals.lock(); | ||
| for (i, value) in fastlocals.iter().enumerate() { | ||
| if i < frame.code.code.varnames.len() { | ||
| let varname = &frame.code.code.varnames[i]; | ||
| if let Some(v) = value { | ||
| eprintln!(" fastlocals[{i}] ({varname}) = {}", v.class().name()); | ||
| } else { | ||
| eprintln!(" fastlocals[{i}] ({varname}) = None"); | ||
| } | ||
| } | ||
| } | ||
| drop(fastlocals); | ||
| } | ||
|
|
||
| // Collect frame states | ||
| let mut frame_states = Vec::new(); | ||
| for (idx, frame) in frames.iter().enumerate() { | ||
|
|
@@ -321,11 +367,10 @@ fn save_checkpoint_bytes_from_frames( | |
| // For non-innermost frames, just use current lasti | ||
| frame.lasti() | ||
| }; | ||
| eprintln!("DEBUG: Frame {idx} resume_lasti = {}", resume_lasti); | ||
| frame_states.push((frame, resume_lasti)); | ||
| } | ||
|
|
||
| snapshot::dump_checkpoint_frames(vm, source_path, &frame_states) | ||
| snapshot::dump_checkpoint_frames_with_stack(vm, source_path, &frame_states, innermost_stack) | ||
| } | ||
|
|
||
| #[allow(dead_code)] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate entry and self-ignore issue.
`Cargo.lock` appears twice (lines 25 and 33) - remove one to avoid redundancy. `.gitignore` ignoring itself (line 35) is almost certainly unintended. This would cause the `.gitignore` file to be tracked initially but ignored for future modifications, leading to confusion when changes to ignore rules aren't picked up. 🔧 Suggested fix
📝 Committable suggestion
🤖 Prompt for AI Agents