yew_agent/reactor/
worker.rs1use std::collections::HashMap;
2use std::convert::Infallible;
3
4use futures::sink;
5use futures::stream::StreamExt;
6use pinned::mpsc;
7use pinned::mpsc::UnboundedSender;
8use wasm_bindgen_futures::spawn_local;
9
10use super::messages::{ReactorInput, ReactorOutput};
11use super::scope::ReactorScoped;
12use super::traits::Reactor;
13use crate::worker::{HandlerId, Worker, WorkerDestroyHandle, WorkerScope};
14
15pub(crate) enum Message {
16 ReactorExited(HandlerId),
17}
18
19pub(crate) struct ReactorWorker<R>
20where
21 R: 'static + Reactor,
22{
23 senders: HashMap<HandlerId, UnboundedSender<<R::Scope as ReactorScoped>::Input>>,
24 destruct_handle: Option<WorkerDestroyHandle<Self>>,
25}
26
27impl<R> Worker for ReactorWorker<R>
28where
29 R: 'static + Reactor,
30{
31 type Input = ReactorInput<<R::Scope as ReactorScoped>::Input>;
32 type Message = Message;
33 type Output = ReactorOutput<<R::Scope as ReactorScoped>::Output>;
34
35 fn create(_scope: &WorkerScope<Self>) -> Self {
36 Self {
37 senders: HashMap::new(),
38 destruct_handle: None,
39 }
40 }
41
42 fn update(&mut self, scope: &WorkerScope<Self>, msg: Self::Message) {
43 match msg {
44 Self::Message::ReactorExited(id) => {
45 scope.respond(id, ReactorOutput::Finish);
46 self.senders.remove(&id);
47 }
48 }
49
50 if self.destruct_handle.is_some() && self.senders.is_empty() {
52 self.destruct_handle = None;
53 }
54 }
55
56 fn connected(&mut self, scope: &WorkerScope<Self>, id: HandlerId) {
57 let from_bridge = {
58 let (tx, rx) = mpsc::unbounded();
59 self.senders.insert(id, tx);
60
61 rx
62 };
63
64 let to_bridge = {
65 let scope_ = scope.clone();
66 let (tx, mut rx) = mpsc::unbounded();
67 spawn_local(async move {
68 while let Some(m) = rx.next().await {
69 scope_.respond(id, ReactorOutput::Output(m));
70 }
71 });
72
73 sink::unfold((), move |_, item: <R::Scope as ReactorScoped>::Output| {
74 let tx = tx.clone();
75
76 async move {
77 let _ = tx.send_now(item);
78
79 Ok::<(), Infallible>(())
80 }
81 })
82 };
83
84 let reactor_scope = ReactorScoped::new(from_bridge, to_bridge);
85
86 let reactor = R::create(reactor_scope);
87
88 scope.send_future(async move {
89 reactor.await;
90
91 Message::ReactorExited(id)
92 });
93 }
94
95 fn received(&mut self, _scope: &WorkerScope<Self>, input: Self::Input, id: HandlerId) {
96 match input {
97 Self::Input::Input(input) => {
98 if let Some(m) = self.senders.get_mut(&id) {
99 let _result = m.send_now(input);
100 }
101 }
102 }
103 }
104
105 fn disconnected(&mut self, _scope: &WorkerScope<Self>, id: HandlerId) {
106 if let Some(m) = self.senders.get_mut(&id) {
108 m.close_now();
109 }
110 }
111
112 fn destroy(&mut self, _scope: &WorkerScope<Self>, destruct: WorkerDestroyHandle<Self>) {
113 if !self.senders.is_empty() {
114 self.destruct_handle = Some(destruct);
115 }
116 }
117}