yew_agent/reactor/
bridge.rs1use std::fmt;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::sink::Sink;
6use futures::stream::{FusedStream, Stream};
7use pinned::mpsc;
8use pinned::mpsc::{UnboundedReceiver, UnboundedSender};
9use thiserror::Error;
10
11use super::messages::{ReactorInput, ReactorOutput};
12use super::scope::ReactorScoped;
13use super::traits::Reactor;
14use super::worker::ReactorWorker;
15use crate::worker::{WorkerBridge, WorkerSpawner};
16use crate::Codec;
17
18pub struct ReactorBridge<R>
22where
23 R: Reactor + 'static,
24{
25 inner: WorkerBridge<ReactorWorker<R>>,
26 rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
27}
28
29impl<R> fmt::Debug for ReactorBridge<R>
30where
31 R: Reactor,
32{
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 f.write_str("ReactorBridge<_>")
35 }
36}
37
38impl<R> ReactorBridge<R>
39where
40 R: Reactor + 'static,
41{
42 #[inline(always)]
43 pub(crate) fn new(
44 inner: WorkerBridge<ReactorWorker<R>>,
45 rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
46 ) -> Self {
47 Self { inner, rx }
48 }
49
50 pub(crate) fn output_callback(
51 tx: &UnboundedSender<<R::Scope as ReactorScoped>::Output>,
52 output: ReactorOutput<<R::Scope as ReactorScoped>::Output>,
53 ) {
54 match output {
55 ReactorOutput::Output(m) => {
56 let _ = tx.send_now(m);
57 }
58 ReactorOutput::Finish => {
59 tx.close_now();
60 }
61 }
62 }
63
64 #[inline(always)]
65 pub(crate) fn register_callback<CODEC>(
66 spawner: &mut WorkerSpawner<ReactorWorker<R>, CODEC>,
67 ) -> UnboundedReceiver<<R::Scope as ReactorScoped>::Output>
68 where
69 CODEC: Codec,
70 {
71 let (tx, rx) = mpsc::unbounded();
72 spawner.callback(move |output| Self::output_callback(&tx, output));
73
74 rx
75 }
76
77 pub fn fork(&self) -> Self {
81 let (tx, rx) = mpsc::unbounded();
82 let inner = self
83 .inner
84 .fork(Some(move |output| Self::output_callback(&tx, output)));
85
86 Self { inner, rx }
87 }
88
89 pub fn send_input(&self, msg: <R::Scope as ReactorScoped>::Input) {
91 self.inner.send(ReactorInput::Input(msg));
92 }
93}
94
95impl<R> Stream for ReactorBridge<R>
96where
97 R: Reactor + 'static,
98{
99 type Item = <R::Scope as ReactorScoped>::Output;
100
101 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 Pin::new(&mut self.rx).poll_next(cx)
103 }
104
105 fn size_hint(&self) -> (usize, Option<usize>) {
106 self.rx.size_hint()
107 }
108}
109
110impl<R> FusedStream for ReactorBridge<R>
111where
112 R: Reactor + 'static,
113{
114 fn is_terminated(&self) -> bool {
115 self.rx.is_terminated()
116 }
117}
118
119#[derive(Error, Clone, PartialEq, Eq, Debug)]
121pub enum ReactorBridgeSinkError {
122 #[error("attempting to close the bridge via the sink")]
124 AttemptClosure,
125}
126
127impl<R> Sink<<R::Scope as ReactorScoped>::Input> for ReactorBridge<R>
128where
129 R: Reactor + 'static,
130{
131 type Error = ReactorBridgeSinkError;
132
133 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134 Poll::Ready(Err(ReactorBridgeSinkError::AttemptClosure))
135 }
136
137 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 Poll::Ready(Ok(()))
139 }
140
141 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142 Poll::Ready(Ok(()))
143 }
144
145 fn start_send(
146 self: Pin<&mut Self>,
147 item: <R::Scope as ReactorScoped>::Input,
148 ) -> Result<(), Self::Error> {
149 self.send_input(item);
150
151 Ok(())
152 }
153}