This is unreleased documentation for Yew Next version.
For up-to-date documentation, see the latest version on docs.rs.

yew_agent/reactor/
worker.rs

1use std::collections::HashMap;
2use std::convert::Infallible;
3
4use futures::sink;
5use futures::stream::StreamExt;
6use pinned::mpsc;
7use pinned::mpsc::UnboundedSender;
8use wasm_bindgen_futures::spawn_local;
9
10use super::messages::{ReactorInput, ReactorOutput};
11use super::scope::ReactorScoped;
12use super::traits::Reactor;
13use crate::worker::{HandlerId, Worker, WorkerDestroyHandle, WorkerScope};
14
15pub(crate) enum Message {
16    ReactorExited(HandlerId),
17}
18
19pub(crate) struct ReactorWorker<R>
20where
21    R: 'static + Reactor,
22{
23    senders: HashMap<HandlerId, UnboundedSender<<R::Scope as ReactorScoped>::Input>>,
24    destruct_handle: Option<WorkerDestroyHandle<Self>>,
25}
26
27impl<R> Worker for ReactorWorker<R>
28where
29    R: 'static + Reactor,
30{
31    type Input = ReactorInput<<R::Scope as ReactorScoped>::Input>;
32    type Message = Message;
33    type Output = ReactorOutput<<R::Scope as ReactorScoped>::Output>;
34
35    fn create(_scope: &WorkerScope<Self>) -> Self {
36        Self {
37            senders: HashMap::new(),
38            destruct_handle: None,
39        }
40    }
41
42    fn update(&mut self, scope: &WorkerScope<Self>, msg: Self::Message) {
43        match msg {
44            Self::Message::ReactorExited(id) => {
45                scope.respond(id, ReactorOutput::Finish);
46                self.senders.remove(&id);
47            }
48        }
49
50        // All reactors have closed themselves, the worker can now close.
51        if self.destruct_handle.is_some() && self.senders.is_empty() {
52            self.destruct_handle = None;
53        }
54    }
55
56    fn connected(&mut self, scope: &WorkerScope<Self>, id: HandlerId) {
57        let from_bridge = {
58            let (tx, rx) = mpsc::unbounded();
59            self.senders.insert(id, tx);
60
61            rx
62        };
63
64        let to_bridge = {
65            let scope_ = scope.clone();
66            let (tx, mut rx) = mpsc::unbounded();
67            spawn_local(async move {
68                while let Some(m) = rx.next().await {
69                    scope_.respond(id, ReactorOutput::Output(m));
70                }
71            });
72
73            sink::unfold((), move |_, item: <R::Scope as ReactorScoped>::Output| {
74                let tx = tx.clone();
75
76                async move {
77                    let _ = tx.send_now(item);
78
79                    Ok::<(), Infallible>(())
80                }
81            })
82        };
83
84        let reactor_scope = ReactorScoped::new(from_bridge, to_bridge);
85
86        let reactor = R::create(reactor_scope);
87
88        scope.send_future(async move {
89            reactor.await;
90
91            Message::ReactorExited(id)
92        });
93    }
94
95    fn received(&mut self, _scope: &WorkerScope<Self>, input: Self::Input, id: HandlerId) {
96        match input {
97            Self::Input::Input(input) => {
98                if let Some(m) = self.senders.get_mut(&id) {
99                    let _result = m.send_now(input);
100                }
101            }
102        }
103    }
104
105    fn disconnected(&mut self, _scope: &WorkerScope<Self>, id: HandlerId) {
106        // We close this channel, but drop it when the reactor has exited itself.
107        if let Some(m) = self.senders.get_mut(&id) {
108            m.close_now();
109        }
110    }
111
112    fn destroy(&mut self, _scope: &WorkerScope<Self>, destruct: WorkerDestroyHandle<Self>) {
113        if !self.senders.is_empty() {
114            self.destruct_handle = Some(destruct);
115        }
116    }
117}