1use std::any::type_name;
4use std::fmt;
5use std::rc::Rc;
6
7use futures::stream::SplitSink;
8use futures::{SinkExt, StreamExt};
9use wasm_bindgen::UnwrapThrowExt;
10use yew::html::Scope;
11use yew::platform::pinned::RwLock;
12use yew::platform::spawn_local;
13use yew::prelude::*;
14
15use crate::oneshot::{Oneshot, OneshotProviderState};
16use crate::reactor::{Reactor, ReactorBridge, ReactorEvent, ReactorProviderState, ReactorScoped};
17use crate::worker::{Worker, WorkerBridge, WorkerProviderState};
18
19#[derive(Debug)]
21pub struct WorkerBridgeHandle<W>
22where
23 W: Worker,
24{
25 inner: WorkerBridge<W>,
26}
27
28impl<W> WorkerBridgeHandle<W>
29where
30 W: Worker,
31{
32 pub fn send(&self, input: W::Input) {
34 self.inner.send(input)
35 }
36}
37
/// Shared, lockable sending half of a split `ReactorBridge<R>`.
///
/// The sink accepts values of the reactor scope's `Input` type. It is kept
/// behind `Rc<RwLock<..>>` so that a clone of the pointer can be moved into a
/// spawned task, which then takes the write lock for the duration of a send.
type ReactorTx<R> =
    Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
40
41pub struct ReactorBridgeHandle<R>
43where
44 R: Reactor + 'static,
45{
46 tx: ReactorTx<R>,
47}
48
49impl<R> fmt::Debug for ReactorBridgeHandle<R>
50where
51 R: Reactor + 'static,
52{
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
55 }
56}
57
58impl<R> ReactorBridgeHandle<R>
59where
60 R: Reactor + 'static,
61{
62 pub fn send(&self, input: <R::Scope as ReactorScoped>::Input) {
64 let tx = self.tx.clone();
65 spawn_local(async move {
66 let mut tx = tx.write().await;
67 let _ = tx.send(input).await;
68 });
69 }
70}
71
72pub trait AgentScopeExt {
76 fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
78 where
79 W: Worker + 'static;
80
81 fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
83 where
84 R: Reactor + 'static,
85 <R::Scope as ReactorScoped>::Output: 'static;
86
87 fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
89 where
90 T: Oneshot + 'static;
91}
92
93impl<COMP> AgentScopeExt for Scope<COMP>
94where
95 COMP: Component,
96{
97 fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
98 where
99 W: Worker + 'static,
100 {
101 let inner = self
102 .context::<Rc<WorkerProviderState<W>>>((|_| {}).into())
103 .expect_throw("failed to bridge to agent.")
104 .0
105 .create_bridge(callback);
106
107 WorkerBridgeHandle { inner }
108 }
109
110 fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
111 where
112 R: Reactor + 'static,
113 <R::Scope as ReactorScoped>::Output: 'static,
114 {
115 let (tx, mut rx) = self
116 .context::<ReactorProviderState<R>>((|_| {}).into())
117 .expect_throw("failed to bridge to agent.")
118 .0
119 .create_bridge()
120 .split();
121
122 spawn_local(async move {
123 while let Some(m) = rx.next().await {
124 callback.emit(ReactorEvent::<R>::Output(m));
125 }
126
127 callback.emit(ReactorEvent::<R>::Finished);
128 });
129
130 let tx = Rc::new(RwLock::new(tx));
131
132 ReactorBridgeHandle { tx }
133 }
134
135 fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
136 where
137 T: Oneshot + 'static,
138 {
139 let (inner, _) = self
140 .context::<OneshotProviderState<T>>((|_| {}).into())
141 .expect_throw("failed to bridge to agent.");
142
143 spawn_local(async move { callback.emit(inner.create_bridge().run(input).await) });
144 }
145}