opentelemetry/context/
future_ext.rs1use crate::Context;
2use futures_core::Stream;
3use futures_sink::Sink;
4use pin_project_lite::pin_project;
5use std::pin::Pin;
6use std::task::Context as TaskContext;
7use std::task::Poll;
8impl<T: Sized> FutureExt for T {}
9
10impl<T: std::future::Future> std::future::Future for WithContext<T> {
11 type Output = T::Output;
12
13 fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
14 let this = self.project();
15 let _guard = this.otel_cx.clone().attach();
16
17 this.inner.poll(task_cx)
18 }
19}
20
21impl<T: Stream> Stream for WithContext<T> {
22 type Item = T::Item;
23
24 fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
25 let this = self.project();
26 let _guard = this.otel_cx.clone().attach();
27 T::poll_next(this.inner, task_cx)
28 }
29}
30
31pin_project! {
32 #[derive(Clone, Debug)]
34 pub struct WithContext<T> {
35 #[pin]
36 inner: T,
37 otel_cx: Context,
38 }
39}
40
41impl<I, T: Sink<I>> Sink<I> for WithContext<T>
42where
43 T: Sink<I>,
44{
45 type Error = T::Error;
46
47 fn poll_ready(
48 self: Pin<&mut Self>,
49 task_cx: &mut TaskContext<'_>,
50 ) -> Poll<Result<(), Self::Error>> {
51 let this = self.project();
52 let _guard = this.otel_cx.clone().attach();
53 T::poll_ready(this.inner, task_cx)
54 }
55
56 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
57 let this = self.project();
58 let _guard = this.otel_cx.clone().attach();
59 T::start_send(this.inner, item)
60 }
61
62 fn poll_flush(
63 self: Pin<&mut Self>,
64 task_cx: &mut TaskContext<'_>,
65 ) -> Poll<Result<(), Self::Error>> {
66 let this = self.project();
67 let _guard = this.otel_cx.clone().attach();
68 T::poll_flush(this.inner, task_cx)
69 }
70
71 fn poll_close(
72 self: Pin<&mut Self>,
73 task_cx: &mut TaskContext<'_>,
74 ) -> Poll<Result<(), Self::Error>> {
75 let this = self.project();
76 let _enter = this.otel_cx.clone().attach();
77 T::poll_close(this.inner, task_cx)
78 }
79}
80
81pub trait FutureExt: Sized {
83 fn with_context(self, otel_cx: Context) -> WithContext<Self> {
91 WithContext {
92 inner: self,
93 otel_cx,
94 }
95 }
96
97 fn with_current_context(self) -> WithContext<Self> {
105 let otel_cx = Context::current();
106 self.with_context(otel_cx)
107 }
108}