In modern multi-threaded and asynchronous programming, a CancellationToken is a common tool for cancelling long-running tasks or operations promptly and safely. Handled carelessly, however, it can leave dangling pointers behind, which cause program crashes or unpredictable behavior. This article examines a dangling pointer problem triggered by the use of tokio_util::sync::CancellationToken, based on a real case in a Rust codebase.
CancellationToken: A must-read dangling pointer disaster case for programmers
Case Reconstruction and Analysis
While developing a multi-threaded asynchronous program, we ran into a dangling pointer problem caused by our use of CancellationToken. The core of the problem: one thread holds a pointer into an object owned by a running task, and the task is then cancelled through a CancellationToken. If the cancellation destroys that object early while another thread is still accessing it, that thread is left holding a dangling pointer. The following program reproduces the problem:
use std::io::{Error, ErrorKind, Result};
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;

mod memory;

pub(crate) async fn asyncify<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    match tokio::task::spawn_blocking(f).await {
        Ok(res) => res,
        Err(e) => Err(Error::new(
            ErrorKind::Other,
            format!("background task failed: {:?}", e),
        )),
    }
}

fn check_err_size(e: libc::ssize_t) -> Result<usize> {
    if e == -1_isize {
        Err(Error::last_os_error())
    } else {
        Ok(e as usize)
    }
}

pub fn libc_pread(raw_fd: usize, pos: u64, len: usize, ptr: u64) -> Result<usize> {
    check_err_size(unsafe {
        libc::pread(
            raw_fd as std::os::fd::RawFd,
            ptr as *mut _,
            len as _,
            pos as libc::off_t,
        )
    })
}

async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
    let len = data.len();
    let ptr = data.as_ptr() as u64;
    let fd = file.as_ref().as_raw_fd() as usize;
    let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
    Ok(len)
    // libc_pread(fd, pos, len, ptr)
}

async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
    let mut buf: Vec<u8> = vec![0_u8; 2 * 1024 * 1024 * 1024];
    println!("----- * before read");
    let len = read_file(file, 0, &mut buf).await.unwrap();
    println!("----- * after read");
    len
}

async fn test_read() {
    let cancel: CancellationToken = CancellationToken::new();
    let file_path = PathBuf::from("./a_big_big_file");
    let file = Arc::new(std::fs::File::open(file_path).unwrap());
    println!("----- begin test ------------");
    let can_tok = cancel.clone();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = can_tok.cancelled() => {
                    println!("----- cancelled break loop");
                    break;
                }
                res = read_data_from_file(file.clone()) => {
                    println!("----- read data len: {}", res);
                }
            }
        }
    });
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    cancel.cancel();
    println!(" ----- cancel.cancel()");
    tokio::time::sleep(tokio::time::Duration::from_millis(5 * 1000)).await;
    println!("----- test over");
}

// #[cfg(unix)]
// #[global_allocator]
// static A: memory::DebugMemoryAlloc = memory::DebugMemoryAlloc;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_stack_size(4 * 1024 * 1024)
        .build()
        .unwrap();
    rt.block_on(async move {
        test_read().await;
    });
    std::thread::sleep(std::time::Duration::from_secs(10000));
}
The example program above starts an asynchronous task that reads a file, waits 100 ms, and then cancels the task. Note that the file is read through tokio::task::spawn_blocking. To reproduce the problem, prepare a file larger than 2 GB so that the read cannot finish within 100 ms.
Running the program produces the following behavior in the function read_data_from_file():
async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
    let mut buf: Vec<u8> = vec![0_u8; 2 * 1024 * 1024 * 1024];
    println!("----- * before read"); // this line is printed
    let len = read_file(file, 0, &mut buf).await.unwrap();
    println!("----- * after read"); // this line is never printed
    len
}
This shows that cancelling the asynchronous task does not wait for read_file() to finish. Instead, execution of read_data_from_file() is abandoned at the pending .await. As a result, the local variable buf is freed early, while the file read still running on the blocking thread keeps writing into that memory. That is the dangling pointer. The allocation and deallocation tracker in the appendix at the end of the article confirms this: the buffer is freed immediately after the asynchronous task is cancelled.
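The early release can be reproduced without tokio. The following is a minimal sketch using only the standard library; TrackedBuf, read_like_task and poll_once_then_drop are hypothetical names introduced for illustration, and std::future::pending stands in for the spawn_blocking await that has not finished yet. Polling the future once and then dropping it mimics what tokio::select! does to the losing branch.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for the article's `buf`: records when it is dropped.
struct TrackedBuf {
    freed: Arc<AtomicBool>,
    _data: Vec<u8>,
}
impl Drop for TrackedBuf {
    fn drop(&mut self) {
        self.freed.store(true, Ordering::SeqCst);
    }
}

// Mirrors the shape of read_data_from_file(): a local buffer, then an await.
async fn read_like_task(freed: Arc<AtomicBool>, reached_end: Arc<AtomicBool>) {
    let _buf = TrackedBuf { freed, _data: vec![0u8; 1024] };
    // Assumption: this never-ready future plays the role of the unfinished read.
    std::future::pending::<()>().await;
    reached_end.store(true, Ordering::SeqCst); // the "after read" line
}

// Polls the future once, then drops it while it is still pending.
// Returns (buffer_freed, reached_end).
fn poll_once_then_drop() -> (bool, bool) {
    let freed = Arc::new(AtomicBool::new(false));
    let reached_end = Arc::new(AtomicBool::new(false));
    let mut fut = Box::pin(read_like_task(freed.clone(), reached_end.clone()));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    assert!(!freed.load(Ordering::SeqCst)); // buffer is alive while suspended

    drop(fut); // what cancellation amounts to: the future's locals are dropped
    (freed.load(Ordering::SeqCst), reached_end.load(Ordering::SeqCst))
}
```

Calling poll_once_then_drop() reports that the buffer was freed and that the code after the await never ran. In the real program the difference is that a blocking thread still holds the raw address of that buffer.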
Next, try reading the file with a direct blocking call instead of tokio::task::spawn_blocking. Change the read code as follows and observe what happens:
async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
    let len = data.len();
    let ptr = data.as_ptr() as u64;
    let fd = file.as_ref().as_raw_fd() as usize;
    // let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
    // Ok(len)
    libc_pread(fd, pos, len, ptr)
}
Cancelled this way, the asynchronous task waits for read_file() to finish, so no dangling pointer arises. With a blocking call there is no suspension point inside the read, so the task can only be cancelled after the call has returned.
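The reason is visible when a future is polled by hand. In the sketch below, blocking_style_read and first_poll_result are hypothetical names, and a simple loop that fills the buffer stands in for libc_pread. Because the async body contains no .await, the very first poll runs it to completion, leaving no moment at which the future could be dropped halfway through the read.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Stand-in for the blocking variant of read_file(): synchronous work only.
async fn blocking_style_read(data: &mut [u8]) -> usize {
    // Assumption: this loop plays the role of the direct libc_pread call.
    for byte in data.iter_mut() {
        *byte = 1;
    }
    data.len()
}

// Polls the future exactly once and returns what that poll produced.
fn first_poll_result() -> Poll<usize> {
    let mut buf = vec![0u8; 16];
    let mut fut = Box::pin(blocking_style_read(&mut buf));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let result = fut.as_mut().poll(&mut cx);
    result
}
```

The first poll already yields Poll::Ready, so the borrowed buffer is never released while the read is in progress. The cost is that the worker thread is blocked for the whole duration of the read.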
Return to the scene
In CnosDB's long-term stability testing environment, we routinely use core dump files to troubleshoot failures. The core dumps showed a different crash location every time, often inside various third-party libraries and sometimes in code where a fault seemed very unlikely. We suspected memory corruption from a stray write, but could not pin down the cause, so we also investigated in the following ways:
- Analyze recent code changes and test suspicious commits
- Check for suspicious third-party libraries
- Try different compilation and packaging environments
- Run the tests under Valgrind
- Test individual program modules in isolation
- Add debugging and test code
- Run the tests under ASan (AddressSanitizer)
Finally, ASan reported a use-after-free: memory was still being accessed after it had been released.
We then used addr2line to resolve the call stack and locate the misused memory: the buffer passed as the data parameter of our pread() function had been freed but was still in use.
The tools had revealed the use-after-free, but for a while it was unclear how the memory came to be freed. In the end, suspicion fell on CancellationToken, and testing confirmed it. In the code concerned, series_iter_closer is a variable of type CancellationToken, and iter.next() eventually calls the pread() method mentioned above.
This completes the analysis of the failure caused by the dangling pointer.
Solution
Our short-term fix is to make the pread function call os::pread(fd, pos, len, ptr) directly, as a blocking call. In the long term, we will change pread to allocate its buffer internally instead of accepting an externally supplied one, which avoids the dangling pointer problem.
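The long-term direction could look roughly like the sketch below, under several assumptions: spawn_pread_owned is a hypothetical name, a plain std::thread stands in for tokio::task::spawn_blocking so that the sketch needs only the standard library, and FileExt::read_at (a positional read on Unix) replaces the raw libc::pread call. The buffer is allocated inside the closure and returned by value, so no address owned by the caller ever crosses the thread boundary.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // provides File::read_at on Unix
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical owned-buffer read: the worker allocates, fills and returns
// the buffer itself instead of writing through a caller-supplied pointer.
fn spawn_pread_owned(
    file: Arc<File>,
    pos: u64,
    len: usize,
) -> JoinHandle<io::Result<Vec<u8>>> {
    thread::spawn(move || {
        // The buffer lives inside the closure, so dropping the caller's
        // handle cannot free memory the read is still writing to.
        let mut buf = vec![0u8; len];
        let n = file.read_at(&mut buf, pos)?;
        buf.truncate(n); // keep only the bytes actually read
        Ok(buf)
    })
}
```

If the caller gives up and drops the JoinHandle, the thread simply finishes and frees its own buffer. With tokio, the same closure could be handed to tokio::task::spawn_blocking; the design point is ownership of the buffer, not the choice of thread pool.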
Appendix
Custom memory allocator for tracking deallocations
extern crate core;

use core::alloc::{GlobalAlloc, Layout};
use libc::{c_int, c_void};
use tikv_jemalloc_sys as ffi;

#[cfg(all(any(
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mipsel",
    target_arch = "powerpc"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 8;

#[cfg(all(any(
    target_arch = "x86",
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "powerpc64",
    target_arch = "powerpc64le",
    target_arch = "mips64",
    target_arch = "riscv64",
    target_arch = "s390x",
    target_arch = "sparc64"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 16;

fn layout_to_flags(align: usize, size: usize) -> c_int {
    if align <= ALIGNOF_MAX_ALIGN_T && align <= size {
        0
    } else {
        ffi::MALLOCX_ALIGN(align)
    }
}

// Assumes a condition that always must hold.
macro_rules! assume {
    ($e:expr) => {
        debug_assert!($e);
        if !($e) {
            core::hint::unreachable_unchecked();
        }
    };
}

#[derive(Copy, Clone, Default, Debug)]
pub struct DebugMemoryAlloc;

unsafe impl GlobalAlloc for DebugMemoryAlloc {
    #[inline]
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        assume!(layout.size() != 0);
        let flags = layout_to_flags(layout.align(), layout.size());
        let ptr = if flags == 0 {
            ffi::malloc(layout.size())
        } else {
            ffi::mallocx(layout.size(), flags)
        };
        ptr as *mut u8
    }

    #[inline]
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        assume!(layout.size() != 0);
        let flags = layout_to_flags(layout.align(), layout.size());
        let ptr = if flags == 0 {
            ffi::calloc(1, layout.size())
        } else {
            ffi::mallocx(layout.size(), flags | ffi::MALLOCX_ZERO)
        };
        ptr as *mut u8
    }

    #[inline]
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) };
        let new_ptr = unsafe { self.alloc(new_layout) };
        unsafe {
            let size = std::cmp::min(layout.size(), new_size);
            std::ptr::copy_nonoverlapping(ptr, new_ptr, size);
            self.dealloc(ptr, layout);
        }
        new_ptr
    }

    #[inline]
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        assume!(!ptr.is_null());
        assume!(layout.size() != 0);
        let flags = layout_to_flags(layout.align(), layout.size());
        ffi::sdallocx(ptr as *mut c_void, layout.size(), flags);
        if layout.size() >= 2 * 1024 * 1024 * 1024 {
            panic!("-------- free big memory: {}", layout.size());
        }
    }
}
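The allocator above reports a large deallocation by panicking inside dealloc, which is effective for a one-off debugging session, although the GlobalAlloc documentation warns against unwinding from allocator methods. A gentler variant, sketched here under assumptions, wraps the standard System allocator and only counts large frees. CountingAlloc, BIG and big_frees are hypothetical names, and the 1 MiB threshold is arbitrary.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical threshold: frees of at least 1 MiB count as "big".
const BIG: usize = 1 << 20;

// Number of big deallocations observed so far.
static BIG_FREES: AtomicUsize = AtomicUsize::new(0);

// Thin wrapper around the system allocator that counts big frees.
pub struct CountingAlloc;

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        unsafe { System.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        if layout.size() >= BIG {
            // Record the event instead of panicking inside the allocator.
            BIG_FREES.fetch_add(1, Ordering::Relaxed);
        }
        unsafe { System.dealloc(ptr, layout) }
    }
}

// Reads the counter; compare the value before and after cancelling a task.
pub fn big_frees() -> usize {
    BIG_FREES.load(Ordering::Relaxed)
}

// To install it process-wide, one would add:
// #[global_allocator]
// static A: CountingAlloc = CountingAlloc;
```

Reading big_frees() just before cancel.cancel() and again shortly afterwards would show whether the large buffer was released at the moment of cancellation, without aborting the process.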