yew_agent/reactor/
hooks.rs1use std::any::type_name;
2use std::fmt;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use futures::sink::SinkExt;
7use futures::stream::{SplitSink, StreamExt};
8use wasm_bindgen::UnwrapThrowExt;
9use yew::platform::pinned::RwLock;
10use yew::platform::spawn_local;
11use yew::prelude::*;
12
13use super::provider::ReactorProviderState;
14use super::{Reactor, ReactorBridge, ReactorScoped};
15use crate::utils::{BridgeIdState, OutputsAction, OutputsState};
16
17type ReactorTx<R> =
18 Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
19
20pub enum ReactorEvent<R>
22where
23 R: Reactor,
24{
25 Output(<R::Scope as ReactorScoped>::Output),
27 Finished,
29}
30
31impl<R> fmt::Debug for ReactorEvent<R>
32where
33 R: Reactor,
34 <R::Scope as ReactorScoped>::Output: fmt::Debug,
35{
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 match self {
38 Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(),
39 Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(),
40 }
41 }
42}
43
44pub struct UseReactorBridgeHandle<R>
46where
47 R: 'static + Reactor,
48{
49 tx: ReactorTx<R>,
50 ctr: UseReducerDispatcher<BridgeIdState>,
51}
52
53impl<R> fmt::Debug for UseReactorBridgeHandle<R>
54where
55 R: 'static + Reactor,
56 <R::Scope as ReactorScoped>::Input: fmt::Debug,
57{
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct(type_name::<Self>())
60 .field("inner", &self.tx)
61 .finish()
62 }
63}
64
65impl<R> Clone for UseReactorBridgeHandle<R>
66where
67 R: 'static + Reactor,
68{
69 fn clone(&self) -> Self {
70 Self {
71 tx: self.tx.clone(),
72 ctr: self.ctr.clone(),
73 }
74 }
75}
76
77impl<R> UseReactorBridgeHandle<R>
78where
79 R: 'static + Reactor,
80{
81 pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
83 let tx = self.tx.clone();
84 spawn_local(async move {
85 let mut tx = tx.write().await;
86 let _ = tx.send(msg).await;
87 });
88 }
89
90 pub fn reset(&self) {
94 self.ctr.dispatch(());
95 }
96}
97
98impl<R> PartialEq for UseReactorBridgeHandle<R>
99where
100 R: 'static + Reactor,
101{
102 fn eq(&self, rhs: &Self) -> bool {
103 self.ctr == rhs.ctr
104 }
105}
106
107#[hook]
116pub fn use_reactor_bridge<R, F>(on_output: F) -> UseReactorBridgeHandle<R>
117where
118 R: 'static + Reactor,
119 F: Fn(ReactorEvent<R>) + 'static,
120{
121 let ctr = use_reducer(BridgeIdState::default);
122
123 let worker_state = use_context::<ReactorProviderState<R>>()
124 .expect_throw("cannot find a provider for current agent.");
125
126 let on_output = Rc::new(on_output);
127
128 let on_output_ref = {
129 let on_output = on_output.clone();
130 use_mut_ref(move || on_output)
131 };
132
133 {
135 let mut on_output_ref = on_output_ref.borrow_mut();
136 *on_output_ref = on_output;
137 }
138
139 let tx = use_memo((worker_state, ctr.inner), |(state, _ctr)| {
140 let bridge = state.create_bridge();
141
142 let (tx, mut rx) = bridge.split();
143
144 spawn_local(async move {
145 while let Some(m) = rx.next().await {
146 let on_output = on_output_ref.borrow().clone();
147 on_output(ReactorEvent::<R>::Output(m));
148 }
149
150 let on_output = on_output_ref.borrow().clone();
151 on_output(ReactorEvent::<R>::Finished);
152 });
153
154 RwLock::new(tx)
155 });
156
157 UseReactorBridgeHandle {
158 tx: tx.clone(),
159 ctr: ctr.dispatcher(),
160 }
161}
162
163pub struct UseReactorSubscriptionHandle<R>
165where
166 R: 'static + Reactor,
167{
168 bridge: UseReactorBridgeHandle<R>,
169 outputs: Vec<Rc<<R::Scope as ReactorScoped>::Output>>,
170 finished: bool,
171 ctr: usize,
172}
173
174impl<R> UseReactorSubscriptionHandle<R>
175where
176 R: 'static + Reactor,
177{
178 pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
180 self.bridge.send(msg);
181 }
182
183 pub fn finished(&self) -> bool {
185 self.finished
186 }
187
188 pub fn reset(&self) {
193 self.bridge.reset();
194 }
195}
196
197impl<R> Clone for UseReactorSubscriptionHandle<R>
198where
199 R: 'static + Reactor,
200{
201 fn clone(&self) -> Self {
202 Self {
203 bridge: self.bridge.clone(),
204 outputs: self.outputs.clone(),
205 ctr: self.ctr,
206 finished: self.finished,
207 }
208 }
209}
210
211impl<R> fmt::Debug for UseReactorSubscriptionHandle<R>
212where
213 R: 'static + Reactor,
214 <R::Scope as ReactorScoped>::Input: fmt::Debug,
215 <R::Scope as ReactorScoped>::Output: fmt::Debug,
216{
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 f.debug_struct(type_name::<Self>())
219 .field("bridge", &self.bridge)
220 .field("outputs", &self.outputs)
221 .finish()
222 }
223}
224
225impl<R> Deref for UseReactorSubscriptionHandle<R>
226where
227 R: 'static + Reactor,
228{
229 type Target = [Rc<<R::Scope as ReactorScoped>::Output>];
230
231 fn deref(&self) -> &Self::Target {
232 &self.outputs
233 }
234}
235
236impl<R> PartialEq for UseReactorSubscriptionHandle<R>
237where
238 R: 'static + Reactor,
239{
240 fn eq(&self, rhs: &Self) -> bool {
241 self.bridge == rhs.bridge && self.ctr == rhs.ctr
242 }
243}
244
245#[hook]
249pub fn use_reactor_subscription<R>() -> UseReactorSubscriptionHandle<R>
250where
251 R: 'static + Reactor,
252{
253 let outputs = use_reducer(OutputsState::<<R::Scope as ReactorScoped>::Output>::default);
254
255 let bridge = {
256 let outputs = outputs.clone();
257 use_reactor_bridge::<R, _>(move |output| {
258 outputs.dispatch(match output {
259 ReactorEvent::Output(m) => OutputsAction::Push(m.into()),
260 ReactorEvent::Finished => OutputsAction::Close,
261 })
262 })
263 };
264
265 {
266 let outputs = outputs.clone();
267 use_effect_with(bridge.clone(), move |_| {
268 outputs.dispatch(OutputsAction::Reset);
269
270 || {}
271 });
272 }
273
274 UseReactorSubscriptionHandle {
275 bridge,
276 outputs: outputs.inner.clone(),
277 ctr: outputs.ctr,
278 finished: outputs.closed,
279 }
280}