This is unreleased documentation for Yew Next version.
For up-to-date documentation, see the latest version on docs.rs.

yew_agent/reactor/
bridge.rs

1use std::fmt;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::sink::Sink;
6use futures::stream::{FusedStream, Stream};
7use pinned::mpsc;
8use pinned::mpsc::{UnboundedReceiver, UnboundedSender};
9use thiserror::Error;
10
11use super::messages::{ReactorInput, ReactorOutput};
12use super::scope::ReactorScoped;
13use super::traits::Reactor;
14use super::worker::ReactorWorker;
15use crate::worker::{WorkerBridge, WorkerSpawner};
16use crate::Codec;
17
18/// A connection manager for components interaction with oneshot workers.
19///
20/// As this type implements [Stream] + [Sink], it can be splitted with [`StreamExt::split`].
21pub struct ReactorBridge<R>
22where
23    R: Reactor + 'static,
24{
25    inner: WorkerBridge<ReactorWorker<R>>,
26    rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
27}
28
29impl<R> fmt::Debug for ReactorBridge<R>
30where
31    R: Reactor,
32{
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.write_str("ReactorBridge<_>")
35    }
36}
37
38impl<R> ReactorBridge<R>
39where
40    R: Reactor + 'static,
41{
42    #[inline(always)]
43    pub(crate) fn new(
44        inner: WorkerBridge<ReactorWorker<R>>,
45        rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
46    ) -> Self {
47        Self { inner, rx }
48    }
49
50    pub(crate) fn output_callback(
51        tx: &UnboundedSender<<R::Scope as ReactorScoped>::Output>,
52        output: ReactorOutput<<R::Scope as ReactorScoped>::Output>,
53    ) {
54        match output {
55            ReactorOutput::Output(m) => {
56                let _ = tx.send_now(m);
57            }
58            ReactorOutput::Finish => {
59                tx.close_now();
60            }
61        }
62    }
63
64    #[inline(always)]
65    pub(crate) fn register_callback<CODEC>(
66        spawner: &mut WorkerSpawner<ReactorWorker<R>, CODEC>,
67    ) -> UnboundedReceiver<<R::Scope as ReactorScoped>::Output>
68    where
69        CODEC: Codec,
70    {
71        let (tx, rx) = mpsc::unbounded();
72        spawner.callback(move |output| Self::output_callback(&tx, output));
73
74        rx
75    }
76
77    /// Forks the bridge.
78    ///
79    /// This method creates a new bridge connected to a new reactor on the same worker instance.
80    pub fn fork(&self) -> Self {
81        let (tx, rx) = mpsc::unbounded();
82        let inner = self
83            .inner
84            .fork(Some(move |output| Self::output_callback(&tx, output)));
85
86        Self { inner, rx }
87    }
88
89    /// Sends an input to the current reactor.
90    pub fn send_input(&self, msg: <R::Scope as ReactorScoped>::Input) {
91        self.inner.send(ReactorInput::Input(msg));
92    }
93}
94
95impl<R> Stream for ReactorBridge<R>
96where
97    R: Reactor + 'static,
98{
99    type Item = <R::Scope as ReactorScoped>::Output;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        Pin::new(&mut self.rx).poll_next(cx)
103    }
104
105    fn size_hint(&self) -> (usize, Option<usize>) {
106        self.rx.size_hint()
107    }
108}
109
110impl<R> FusedStream for ReactorBridge<R>
111where
112    R: Reactor + 'static,
113{
114    fn is_terminated(&self) -> bool {
115        self.rx.is_terminated()
116    }
117}
118
119/// An error type for bridge sink.
120#[derive(Error, Clone, PartialEq, Eq, Debug)]
121pub enum ReactorBridgeSinkError {
122    /// A bridge is an RAII Guard, it can only be closed by dropping the value.
123    #[error("attempting to close the bridge via the sink")]
124    AttemptClosure,
125}
126
127impl<R> Sink<<R::Scope as ReactorScoped>::Input> for ReactorBridge<R>
128where
129    R: Reactor + 'static,
130{
131    type Error = ReactorBridgeSinkError;
132
133    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134        Poll::Ready(Err(ReactorBridgeSinkError::AttemptClosure))
135    }
136
137    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        Poll::Ready(Ok(()))
139    }
140
141    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142        Poll::Ready(Ok(()))
143    }
144
145    fn start_send(
146        self: Pin<&mut Self>,
147        item: <R::Scope as ReactorScoped>::Input,
148    ) -> Result<(), Self::Error> {
149        self.send_input(item);
150
151        Ok(())
152    }
153}