Thread-safe, shared (asynchronous) stream buffer designed to lock only on accessing and storing new data.
Streamcatcher is designed to allow seeking on otherwise one-way streams (e.g., command output) whose output needs to be accessed by many threads without constant reallocations, contention over safe read-only data, or unnecessary stalling. Only threads that read in new data ever need to lock the data structure, and they do not block reads of data that has already been stored.
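To make the idea of "seeking on a one-way stream through a shared cache" concrete, here is a minimal sketch using only the standard library. It is not the crate's implementation: `Shared` and `CachedHandle` are hypothetical names, the cache is a single `Vec<u8>` rather than a rope, and every read takes a `Mutex`, whereas Streamcatcher is described as locking only when new data is read in and stored.

```rust
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::{Arc, Mutex};

/// Hypothetical shared state: the one-way source plus every byte read so far.
struct Shared<R> {
    source: R,
    cache: Vec<u8>,
    finished: bool,
}

/// Hypothetical handle: each one keeps its own cursor into the shared cache.
struct CachedHandle<R> {
    shared: Arc<Mutex<Shared<R>>>,
    pos: usize,
}

impl<R: Read> CachedHandle<R> {
    fn new(source: R) -> Self {
        let shared = Shared { source, cache: Vec::new(), finished: false };
        Self { shared: Arc::new(Mutex::new(shared)), pos: 0 }
    }

    /// A second cursor over the same data, starting from byte 0.
    fn new_handle(&self) -> Self {
        Self { shared: Arc::clone(&self.shared), pos: 0 }
    }
}

impl<R: Read> Read for CachedHandle<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut guard = self.shared.lock().unwrap();
        let shared = &mut *guard;
        // Pull from the source only when this cursor has caught up with the cache.
        if self.pos == shared.cache.len() && !shared.finished && !buf.is_empty() {
            let mut chunk = [0u8; 4096];
            let n = shared.source.read(&mut chunk)?;
            if n == 0 {
                shared.finished = true;
            }
            shared.cache.extend_from_slice(&chunk[..n]);
        }
        // Serve the request from bytes that are already cached.
        let available = &shared.cache[self.pos..];
        let n = available.len().min(buf.len());
        buf[..n].copy_from_slice(&available[..n]);
        self.pos += n;
        Ok(n)
    }
}

impl<R: Read> Seek for CachedHandle<R> {
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let mut guard = self.shared.lock().unwrap();
        let shared = &mut *guard;
        if let SeekFrom::End(_) = from {
            // The end is only known once the source has been drained.
            if !shared.finished {
                shared.source.read_to_end(&mut shared.cache)?;
                shared.finished = true;
            }
        }
        let len = shared.cache.len() as i64;
        let target = match from {
            SeekFrom::Start(n) => n as i64,
            SeekFrom::Current(delta) => self.pos as i64 + delta,
            SeekFrom::End(delta) => len + delta,
        };
        // Simplification for this sketch: clamp into the bytes cached so far.
        self.pos = target.clamp(0, len) as usize;
        Ok(self.pos as u64)
    }
}
```

Each handle only moves its own `pos`, so rewinding one reader never disturbs another. A lock-free design for already-stored data, as described above, would need a structure whose stored bytes never move, which a growing `Vec` cannot offer.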
§Features
- Lockless access to pre-read data and finished streams.
- Transparent caching of newly read data.
- Allows seeking on read-only bytestreams.
- Piecewise allocation to reduce copying and support unknown input lengths.
- Optional acceleration of reads on stream completion by copying to a single backing store.
- (Stateful) bytestream transformations.
- Async support with the "async" feature, and runtimes via ["async-std-compat", "smol-compat", "tokio-compat"].
The main algorithm is outlined in this blog post, with rope reference tracking moved to occur only in the core.
§Examples
use streamcatcher::Catcher;
use std::io::{
    self,
    Read,
    Seek,
    SeekFrom,
};

const THREAD_COUNT: usize = 256;
const PROCESS_LEN: u64 = 10_000_000;

// A read-only process, which many threads need to (re-)use.
let process = io::repeat(0xAC)
    .take(PROCESS_LEN);

let mut catcher = Catcher::new(process);

// Many workers who need this data...
let mut handles = (0..THREAD_COUNT)
    .map(|_| {
        // Each worker gets its own handle onto the shared buffer.
        let mut handle = catcher.new_handle();
        std::thread::spawn(move || {
            let mut buf = [0u8; 4_096];
            let mut correct_bytes = 0;
            while let Ok(count) = handle.read(&mut buf[..]) {
                if count == 0 { break }
                for &byte in buf[..count].iter() {
                    if byte == 0xAC { correct_bytes += 1 }
                }
            }
            correct_bytes
        })
    })
    .collect::<Vec<_>>();

// And everything read out just fine!
let count_correct = handles.drain(..)
    .map(|h| h.join().unwrap())
    .filter(|&v| v == PROCESS_LEN)
    .count();

assert_eq!(count_correct, THREAD_COUNT);

// Moving forwards and backwards *just works*.
catcher.seek(SeekFrom::End(0)).unwrap();
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 0);

catcher.seek(SeekFrom::Current(-256)).unwrap();
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 256);
Modules§
- future
- Support types for AsyncRead/AsyncSeek compatible stream buffers. Requires the "async" feature.
Structs§
- Config
- Options controlling backing store allocation, finalisation, and so on.
- Identity
- A no-op data transform.
- TxCatcher
- A shared stream buffer, using an applied input data transform.
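As an illustration of what a no-op transform and a stateful transform might look like, the sketch below defines a hypothetical `ByteTransform` trait over `std::io::Read`. The trait, its method signature, and the `PassThrough`, `UppercaseCounter`, and `store_transformed` names are assumptions made for this example; they are not the crate's `Transform`, `Identity`, or `TxCatcher` definitions.

```rust
use std::io::{self, Read};

/// Hypothetical trait for this sketch; the crate's `Transform` may differ.
trait ByteTransform {
    /// Reads from `src`, writes transformed bytes into `buf`,
    /// and returns how many bytes were written.
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize>;
}

/// No-op transform: bytes are passed through untouched.
struct PassThrough;

impl ByteTransform for PassThrough {
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize> {
        src.read(buf)
    }
}

/// Stateful transform: upper-cases ASCII and remembers how many bytes it has seen.
struct UppercaseCounter {
    seen: u64,
}

impl ByteTransform for UppercaseCounter {
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize> {
        let n = src.read(buf)?;
        buf[..n].make_ascii_uppercase();
        self.seen += n as u64;
        Ok(n)
    }
}

/// Runs a transform until the source is exhausted and collects the bytes
/// that a buffer would store (the transformed output, not the raw input).
fn store_transformed<R: Read, T: ByteTransform>(mut src: R, tx: &mut T) -> io::Result<Vec<u8>> {
    let mut stored = Vec::new();
    let mut buf = [0u8; 1024];
    loop {
        let n = tx.transform_read(&mut src, &mut buf)?;
        if n == 0 {
            break;
        }
        stored.extend_from_slice(&buf[..n]);
    }
    Ok(stored)
}
```

Because the transform runs before storage, every reader of the stored bytes sees the same transformed output, and the transform's state (here, `seen`) advances once per input byte rather than once per reader.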
Enums§
- CatcherError
- Streamcatcher configuration errors.
- Finaliser
- Method to allocate a new contiguous backing store, if required by Config::use_backing.
- GrowthStrategy
- Growth pattern for allocating new chunks as the rope expands.
- TransformPosition
- The number of bytes output by a Transform into a TxCatcher.
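The idea of a growth pattern for rope chunks can be sketched as follows. This is an assumption-laden illustration, not the crate's code: `ChunkGrowth`, its variants, `Chunk`, and `ChunkedBuffer` are hypothetical names, and the doubling rule is just one plausible policy.

```rust
/// Hypothetical growth policy; not the crate's `GrowthStrategy` definition.
#[derive(Clone, Copy, Debug)]
enum ChunkGrowth {
    /// Every chunk has the same size.
    Constant(usize),
    /// Each chunk is twice the size of the previous one, up to `max`.
    Geometric { start: usize, max: usize },
}

impl ChunkGrowth {
    /// Size of the next chunk, given the size of the previous one (if any).
    fn next_size(self, previous: Option<usize>) -> usize {
        match self {
            ChunkGrowth::Constant(size) => size,
            ChunkGrowth::Geometric { start, max } => match previous {
                None => start.min(max),
                Some(prev) => prev.saturating_mul(2).min(max),
            },
        }
    }
}

/// One piece of the rope: `limit` is its planned size, `data` the bytes stored so far.
struct Chunk {
    limit: usize,
    data: Vec<u8>,
}

/// A rope of chunks: appending allocates new chunks instead of moving stored bytes.
struct ChunkedBuffer {
    growth: ChunkGrowth,
    chunks: Vec<Chunk>,
}

impl ChunkedBuffer {
    fn new(growth: ChunkGrowth) -> Self {
        Self { growth, chunks: Vec::new() }
    }

    fn push_bytes(&mut self, mut bytes: &[u8]) {
        while !bytes.is_empty() {
            // Allocate a new chunk when there is none yet or the last one is full.
            let full = self.chunks.last().map_or(true, |c| c.data.len() == c.limit);
            if full {
                let previous = self.chunks.last().map(|c| c.limit);
                let limit = self.growth.next_size(previous).max(1);
                self.chunks.push(Chunk { limit, data: Vec::with_capacity(limit) });
            }
            let chunk = self.chunks.last_mut().unwrap();
            let n = (chunk.limit - chunk.data.len()).min(bytes.len());
            chunk.data.extend_from_slice(&bytes[..n]);
            bytes = &bytes[n..];
        }
    }

    /// Number of bytes held in each chunk, in order.
    fn chunk_lens(&self) -> Vec<usize> {
        self.chunks.iter().map(|c| c.data.len()).collect()
    }
}
```

Constant sizing keeps per-chunk cost predictable, while geometric sizing reduces the number of allocations for long streams of unknown length; the cap bounds how much memory a single late allocation can reserve.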
Traits§
- NeedsBytes
- Common trait required by transforms, specifying how many contiguous bytes are needed for any read(...) to succeed.
- ReadSkipExt
- Utility trait to scan forward by discarding bytes.
- StateAccess
- External access to (Async)Transform state via a TxCatcher (resp. async variants).
- Stateful
- Transforms that can be queried about their internal state.
- Transform
- Allows an input bytestream to be modified before it is stored.
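Scanning forward by discarding bytes can be done on any `std::io::Read` source, even one that cannot seek. The helper below is a minimal standard-library sketch of that idea; `skip_bytes` is a hypothetical free function and is not the crate's `ReadSkipExt` API.

```rust
use std::io::{self, Read};

/// Hypothetical helper, not the crate's `ReadSkipExt`: discards up to `amount`
/// bytes from `reader` and reports how many were actually skipped.
fn skip_bytes<R: Read>(reader: &mut R, amount: u64) -> io::Result<u64> {
    // `take` limits the copy to `amount` bytes; `sink` throws them away.
    io::copy(&mut reader.by_ref().take(amount), &mut io::sink())
}
```

The returned count is smaller than `amount` when the source ends early, so a caller can tell a complete skip from a truncated one.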