Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
#[cfg(any(target_os = "linux", target_os = "android"))]
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::fs::File;
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::{
io::{Read, Write},
fs::File,
io::{PipeReader, PipeWriter, Read, Write},
os::fd::AsFd,
sync::OnceLock,
};
Expand All @@ -27,19 +26,17 @@ const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;
/// used for resolving the limitation for splice: one of a input or output should be pipe
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn pipe<const SIZE_REQUIRED: bool>(
s: usize,
) -> std::io::Result<(std::io::PipeReader, std::io::PipeWriter)> {
let (read, write) = std::io::pipe()?;
pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(PipeReader, PipeWriter)> {
let pair = std::io::pipe()?;
// guard unnecessary syscall
if s > KERNEL_DEFAULT_PIPE_SIZE {
let r = fcntl_setpipe_size(&read, s);
let r = fcntl_setpipe_size(&pair.0, s);
if SIZE_REQUIRED {
r?;
}
}

Ok((read, write))
Ok(pair)
}

/// Less noisy wrapper around [`rustix::pipe::splice`].
Expand Down Expand Up @@ -87,11 +84,7 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
/// fails if one of in/output should be pipe
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd,
{
pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
// improve throughput
// todo: avoid fcntl overhead for small input, but don't fcntl inside of the loop
// no need to increase pipe size of input fd since
Expand All @@ -118,8 +111,7 @@ where
R: Read + AsFd,
S: AsFd,
{
static PIPE_CACHE: OnceLock<Option<(std::io::PipeReader, std::io::PipeWriter)>> =
OnceLock::new();
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
.as_ref()
Expand Down Expand Up @@ -163,8 +155,7 @@ pub fn send_n_bytes(
mut target: impl Write + AsFd,
n: u64,
) -> std::io::Result<u64> {
static PIPE_CACHE: OnceLock<Option<(std::io::PipeReader, std::io::PipeWriter)>> =
OnceLock::new();
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
let mut n = n;
let mut bytes_written: u64 = 0;
Expand All @@ -187,7 +178,7 @@ pub fn send_n_bytes(
loop {
match splice(&input, &target, n as usize) {
Ok(0) => break might_fuse(&input),
Ok(s @ 1..) => {
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
}
Expand All @@ -202,7 +193,7 @@ pub fn send_n_bytes(
loop {
match splice(&input, &broker_w, n as usize) {
Ok(0) => break might_fuse(&input),
Ok(s @ 1..) => {
Ok(s) => {
if splice_exact(&broker_r, &target, s).is_ok() {
n -= s as u64;
bytes_written += s as u64;
Expand Down
Loading