This is unreleased documentation for Yew Next version.
For up-to-date documentation, see the latest version on docs.rs.

yew_agent/oneshot/
bridge.rs

1use futures::stream::StreamExt;
2use pinned::mpsc;
3use pinned::mpsc::UnboundedReceiver;
4
5use super::traits::Oneshot;
6use super::worker::OneshotWorker;
7use crate::codec::Codec;
8use crate::worker::{WorkerBridge, WorkerSpawner};
9
10/// A connection manager for components interaction with oneshot workers.
11#[derive(Debug)]
12pub struct OneshotBridge<N>
13where
14    N: Oneshot + 'static,
15{
16    inner: WorkerBridge<OneshotWorker<N>>,
17    rx: UnboundedReceiver<N::Output>,
18}
19
20impl<N> OneshotBridge<N>
21where
22    N: Oneshot + 'static,
23{
24    #[inline(always)]
25    pub(crate) fn new(
26        inner: WorkerBridge<OneshotWorker<N>>,
27        rx: UnboundedReceiver<N::Output>,
28    ) -> Self {
29        Self { inner, rx }
30    }
31
32    #[inline(always)]
33    pub(crate) fn register_callback<CODEC>(
34        spawner: &mut WorkerSpawner<OneshotWorker<N>, CODEC>,
35    ) -> UnboundedReceiver<N::Output>
36    where
37        CODEC: Codec,
38    {
39        let (tx, rx) = mpsc::unbounded();
40        spawner.callback(move |output| {
41            let _ = tx.send_now(output);
42        });
43
44        rx
45    }
46
47    /// Forks the bridge.
48    ///
49    /// This method creates a new bridge that can be used to execute tasks on the same worker
50    /// instance.
51    pub fn fork(&self) -> Self {
52        let (tx, rx) = mpsc::unbounded();
53        let inner = self.inner.fork(Some(move |output| {
54            let _ = tx.send_now(output);
55        }));
56
57        Self { inner, rx }
58    }
59
60    /// Run the the current oneshot worker once in the current worker instance.
61    pub async fn run(&mut self, input: N::Input) -> N::Output {
62        // &mut self guarantees that the bridge will be
63        // exclusively borrowed during the time the oneshot worker is running.
64        self.inner.send(input);
65
66        // For each bridge, there can only be 1 active task running on the worker instance.
67        // The next output will be the output for the input that we just sent.
68        self.rx
69            .next()
70            .await
71            .expect("failed to receive result from worker")
72    }
73}