// futures_util/compat/compat01as03.rs

1use futures_01::executor::{
2    spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01,
3    UnsafeNotify as UnsafeNotify01,
4};
5use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
6#[cfg(feature = "sink")]
7use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
8use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03};
9#[cfg(feature = "sink")]
10use futures_sink::Sink as Sink03;
11use std::boxed::Box;
12use std::pin::Pin;
13use std::task::Context;
14
15#[cfg(feature = "io-compat")]
16#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
17#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
18pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
19
20/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
21/// object to a futures 0.3-compatible version,
22#[derive(Debug)]
23#[must_use = "futures do nothing unless you `.await` or poll them"]
24pub struct Compat01As03<T> {
25    pub(crate) inner: Spawn01<T>,
26}
27
28impl<T> Unpin for Compat01As03<T> {}
29
30impl<T> Compat01As03<T> {
31    /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
32    /// object in a futures 0.3-compatible wrapper.
33    pub fn new(object: T) -> Self {
34        Self { inner: spawn01(object) }
35    }
36
37    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
38        let notify = &WakerToHandle(cx.waker());
39        self.inner.poll_fn_notify(notify, 0, f)
40    }
41
42    /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
43    pub fn get_ref(&self) -> &T {
44        self.inner.get_ref()
45    }
46
47    /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
48    /// within.
49    pub fn get_mut(&mut self) -> &mut T {
50        self.inner.get_mut()
51    }
52
53    /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
54    /// AsyncWrite object.
55    pub fn into_inner(self) -> T {
56        self.inner.into_inner()
57    }
58}
59
60/// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
61pub trait Future01CompatExt: Future01 {
62    /// Converts a futures 0.1
63    /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
64    /// into a futures 0.3
65    /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
66    ///
67    /// ```
68    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
69    /// # futures::executor::block_on(async {
70    /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
71    /// # // feature issues
72    /// use futures_util::compat::Future01CompatExt;
73    ///
74    /// let future = futures_01::future::ok::<u32, ()>(1);
75    /// assert_eq!(future.compat().await, Ok(1));
76    /// # });
77    /// ```
78    fn compat(self) -> Compat01As03<Self>
79    where
80        Self: Sized,
81    {
82        Compat01As03::new(self)
83    }
84}
85impl<Fut: Future01> Future01CompatExt for Fut {}
86
87/// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
88pub trait Stream01CompatExt: Stream01 {
89    /// Converts a futures 0.1
90    /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
91    /// into a futures 0.3
92    /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
93    ///
94    /// ```
95    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
96    /// # futures::executor::block_on(async {
97    /// use futures::stream::StreamExt;
98    /// use futures_util::compat::Stream01CompatExt;
99    ///
100    /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
101    /// let mut stream = stream.compat();
102    /// assert_eq!(stream.next().await, Some(Ok(1)));
103    /// assert_eq!(stream.next().await, None);
104    /// # });
105    /// ```
106    fn compat(self) -> Compat01As03<Self>
107    where
108        Self: Sized,
109    {
110        Compat01As03::new(self)
111    }
112}
113impl<St: Stream01> Stream01CompatExt for St {}
114
/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub trait Sink01CompatExt: Sink01 {
    /// Converts a futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// into a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// ```
    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        Compat01As03Sink::<Self, Self::SinkItem>::new(self)
    }
}

// Blanket implementation: every futures 0.1 sink gets `.sink_compat()`.
#[cfg(feature = "sink")]
impl<Si> Sink01CompatExt for Si where Si: Sink01 {}
148
149fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
150    match x? {
151        Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
152        Async01::NotReady => task03::Poll::Pending,
153    }
154}
155
156impl<Fut: Future01> Future03 for Compat01As03<Fut> {
157    type Output = Result<Fut::Item, Fut::Error>;
158
159    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
160        poll_01_to_03(self.in_notify(cx, Future01::poll))
161    }
162}
163
164impl<St: Stream01> Stream03 for Compat01As03<St> {
165    type Item = Result<St::Item, St::Error>;
166
167    fn poll_next(
168        mut self: Pin<&mut Self>,
169        cx: &mut Context<'_>,
170    ) -> task03::Poll<Option<Self::Item>> {
171        match self.in_notify(cx, Stream01::poll)? {
172            Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
173            Async01::Ready(None) => task03::Poll::Ready(None),
174            Async01::NotReady => task03::Poll::Pending,
175        }
176    }
177}
178
/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct Compat01As03Sink<S, SinkItem> {
    // The wrapped 0.1 sink, stored inside the futures 0.1 `Spawn` wrapper.
    pub(crate) inner: Spawn01<S>,
    // Item stored by `start_send` that has not yet been accepted by the 0.1 sink.
    pub(crate) buffer: Option<SinkItem>,
    // Set by `poll_close` once it has called the 0.1 sink's `close` and that call was
    // not yet ready, so later `poll_close` calls go straight to `close`.
    pub(crate) close_started: bool,
}

// The wrapper is declared `Unpin` for every `S` and `SinkItem`.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
192
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
    ///
    /// The wrapper starts with no buffered item and with closing not yet started.
    pub fn new(inner: S) -> Self {
        let spawned = spawn01(inner);
        Self { inner: spawned, buffer: None, close_started: false }
    }

    /// Runs `f` on the wrapped sink through `Spawn01::poll_fn_notify`, passing a notify
    /// handle source built from the waker of `cx` and the notification id `0`.
    ///
    /// Returns whatever `f` returns.
    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
        let handle = WakerToHandle(cx.waker());
        self.inner.poll_fn_notify(&handle, 0, f)
    }

    /// Get a reference to 0.1 Sink object contained within.
    pub fn get_ref(&self) -> &S {
        let spawned = &self.inner;
        spawned.get_ref()
    }

    /// Get a mutable reference to 0.1 Sink contained within.
    pub fn get_mut(&mut self) -> &mut S {
        let spawned = &mut self.inner;
        spawned.get_mut()
    }

    /// Consume this wrapper to return the underlying 0.1 Sink.
    ///
    /// Any item still held in the wrapper's buffer is dropped.
    pub fn into_inner(self) -> S {
        let Self { inner, .. } = self;
        inner.into_inner()
    }
}
220
// When the wrapped object is also a 0.1 stream, the sink wrapper forwards it as a 0.3 stream.
#[cfg(feature = "sink")]
impl<S: Stream01, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem> {
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped object as a 0.1 stream with the waker of `cx` installed as its
    /// notifier.
    ///
    /// An error from the 0.1 stream is surfaced as `Ready(Some(Err(_)))`; end of stream as
    /// `Ready(None)`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        let polled = self.in_notify(cx, |stream| Stream01::poll(stream));
        match polled {
            Ok(Async01::Ready(Some(item))) => task03::Poll::Ready(Some(Ok(item))),
            Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            Err(err) => task03::Poll::Ready(Some(Err(err))),
        }
    }
}
239
// Adapts the 0.1 sink protocol (`start_send` may reject an item, `poll_complete`, `close`)
// to the 0.3 protocol by parking at most one item in `self.buffer`.
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the buffer; it is handed to the 0.1 sink by a later
    /// `poll_ready`, `poll_flush` or `poll_close`. Always returns `Ok(())`.
    ///
    /// Debug builds assert that the buffer was empty.
    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        let this = &mut *self;
        this.buffer = Some(item);
        Ok(())
    }

    /// Ready when the buffer is empty, or once the buffered item has been accepted by the
    /// 0.1 sink. A rejected item is put back in the buffer and `Pending` is returned.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending = match self.buffer.take() {
            Some(item) => item,
            None => return task03::Poll::Ready(Ok(())),
        };

        match self.in_notify(cx, |sink| sink.start_send(pending)) {
            Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
            Ok(AsyncSink01::NotReady(rejected)) => {
                self.buffer = Some(rejected);
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Hands any buffered item to the 0.1 sink and then calls its `poll_complete`.
    ///
    /// If the item is rejected it goes back in the buffer and `poll_complete` is not
    /// called during this poll.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let buffered = self.buffer.take();

        // The closure reports the 0.1 readiness together with the item (if any) that
        // must be returned to the buffer.
        let outcome = self.in_notify(cx, |sink| {
            if let Some(item) = buffered {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(item)? {
                    return Ok((Async01::NotReady, Some(rejected)));
                }
            }
            let flushed = sink.poll_complete()?;
            Ok((flushed, None))
        });

        match outcome {
            Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover)) => {
                self.buffer = leftover;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Drives the 0.1 sink to completion and then closes it.
    ///
    /// Until `close` has been called once, each poll first hands over any buffered item
    /// and calls `poll_complete`; only when both succeed is `close` called. Once `close`
    /// has been called and was not ready, later polls call `close` directly.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let buffered = self.buffer.take();
        let already_closing = self.close_started;

        // The closure reports: 0.1 readiness, item to put back in the buffer, and whether
        // `close` has been called.
        let outcome = self.in_notify(cx, |sink| {
            if already_closing {
                return Ok((<S as Sink01>::close(sink)?, None, true));
            }

            if let Some(item) = buffered {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(item)? {
                    return Ok((Async01::NotReady, Some(rejected), false));
                }
            }

            match sink.poll_complete()? {
                Async01::NotReady => Ok((Async01::NotReady, None, false)),
                Async01::Ready(_) => Ok((<S as Sink01>::close(sink)?, None, true)),
            }
        });

        match outcome {
            Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover, closing)) => {
                self.buffer = leftover;
                self.close_started = closing;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }
}
322
323struct NotifyWaker(task03::Waker);
324
325#[allow(missing_debug_implementations)] // false positive: this is private type
326#[derive(Clone)]
327struct WakerToHandle<'a>(&'a task03::Waker);
328
329impl From<WakerToHandle<'_>> for NotifyHandle01 {
330    fn from(handle: WakerToHandle<'_>) -> Self {
331        let ptr = Box::new(NotifyWaker(handle.0.clone()));
332
333        unsafe { Self::new(Box::into_raw(ptr)) }
334    }
335}
336
337impl Notify01 for NotifyWaker {
338    fn notify(&self, _: usize) {
339        self.0.wake_by_ref();
340    }
341}
342
343unsafe impl UnsafeNotify01 for NotifyWaker {
344    unsafe fn clone_raw(&self) -> NotifyHandle01 {
345        WakerToHandle(&self.0).into()
346    }
347
348    unsafe fn drop_raw(&self) {
349        let ptr: *const dyn UnsafeNotify01 = self;
350        drop(unsafe { Box::from_raw(ptr as *mut dyn UnsafeNotify01) });
351    }
352}
353
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use std::io::Error;
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncRead01CompatExt: AsyncRead01 {
        /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
        /// [`AsyncRead`](futures_io::AsyncRead).
        ///
        /// ```
        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncReadExt;
        /// use futures_util::compat::AsyncRead01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
        /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
        ///
        /// let mut output = Vec::with_capacity(12);
        /// reader.read_to_end(&mut output).await.unwrap();
        /// assert_eq!(output, input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::<Self>::new(self)
        }
    }

    // Blanket implementation: every tokio-io reader gets `.compat()`.
    impl<R> AsyncRead01CompatExt for R where R: AsyncRead01 {}

    /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncWrite01CompatExt: AsyncWrite01 {
        /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
        /// [`AsyncWrite`](futures_io::AsyncWrite).
        ///
        /// ```
        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncWriteExt;
        /// use futures_util::compat::AsyncWrite01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
        ///
        /// let mut writer = (&mut cursor).compat();
        /// writer.write_all(input).await.unwrap();
        ///
        /// assert_eq!(cursor.into_inner(), input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::<Self>::new(self)
        }
    }

    // Blanket implementation: every tokio-io writer gets `.compat()`.
    impl<W> AsyncWrite01CompatExt for W where W: AsyncWrite01 {}

    // Reads are forwarded to the wrapped tokio-io reader's `poll_read`.
    impl<R> AsyncRead03 for Compat01As03<R>
    where
        R: AsyncRead01,
    {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |reader| reader.poll_read(buf));
            poll_01_to_03(polled)
        }
    }

    // Writes, flushes and closes are forwarded to the wrapped tokio-io writer's
    // `poll_write`, `poll_flush` and `shutdown` respectively.
    impl<W> AsyncWrite03 for Compat01As03<W>
    where
        W: AsyncWrite01,
    {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |writer| writer.poll_write(buf));
            poll_01_to_03(polled)
        }

        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, |writer| AsyncWrite01::poll_flush(writer));
            poll_01_to_03(polled)
        }

        fn poll_close(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, |writer| AsyncWrite01::shutdown(writer));
            poll_01_to_03(polled)
        }
    }
}
OSZAR »