Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42    /// Returns the [`SingletonBoundKind`] corresponding to this type.
43    fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47    type UnderlyingBound = Unbounded;
48
49    type StreamToMonotone = Monotonic;
50
51    fn bound_kind() -> SingletonBoundKind {
52        SingletonBoundKind::Unbounded
53    }
54}
55
56impl SingletonBound for Bounded {
57    type UnderlyingBound = Bounded;
58
59    type StreamToMonotone = Bounded;
60
61    fn bound_kind() -> SingletonBoundKind {
62        SingletonBoundKind::Bounded
63    }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70    type UnderlyingBound = Unbounded;
71
72    type StreamToMonotone = Monotonic;
73
74    fn bound_kind() -> SingletonBoundKind {
75        SingletonBoundKind::Monotonic
76    }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82    label = "required here",
83    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111    pub(crate) location: Loc,
112    pub(crate) ir_node: RefCell<HydroNode>,
113    pub(crate) flow_state: FlowState,
114
115    _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119    fn drop(&mut self) {
120        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123                input: Box::new(ir_node),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126        }
127    }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132    T: Clone,
133    L: Location<'a>,
134{
135    fn from(value: Singleton<T, L, Bounded>) -> Self {
136        let location = value.location().clone();
137        Singleton::new(
138            location.clone(),
139            HydroNode::UnboundSingleton {
140                inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
141                metadata: location
142                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
143            },
144        )
145    }
146}
147
148impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
149where
150    L: Location<'a>,
151{
152    type Location = Tick<L>;
153
154    fn location(&self) -> &Self::Location {
155        self.location()
156    }
157
158    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
159        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
160            location.clone(),
161            HydroNode::DeferTick {
162                input: Box::new(HydroNode::CycleSource {
163                    cycle_id,
164                    metadata: location.new_node_metadata(Self::collection_kind()),
165                }),
166                metadata: location
167                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
168            },
169        );
170
171        from_previous_tick.unwrap_or(initial)
172    }
173}
174
175impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
176where
177    L: Location<'a>,
178{
179    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
180        assert_eq!(
181            Location::id(&self.location),
182            expected_location,
183            "locations do not match"
184        );
185        self.location
186            .flow_state()
187            .borrow_mut()
188            .push_root(HydroRoot::CycleSink {
189                cycle_id,
190                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
191                op_metadata: HydroIrOpMetadata::new(),
192            });
193    }
194}
195
196impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
197where
198    L: Location<'a>,
199{
200    type Location = L;
201
202    fn create_source(cycle_id: CycleId, location: L) -> Self {
203        Singleton::new(
204            location.clone(),
205            HydroNode::CycleSource {
206                cycle_id,
207                metadata: location.new_node_metadata(Self::collection_kind()),
208            },
209        )
210    }
211}
212
213impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
214where
215    L: Location<'a>,
216{
217    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
218        assert_eq!(
219            Location::id(&self.location),
220            expected_location,
221            "locations do not match"
222        );
223        self.location
224            .flow_state()
225            .borrow_mut()
226            .push_root(HydroRoot::CycleSink {
227                cycle_id,
228                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
229                op_metadata: HydroIrOpMetadata::new(),
230            });
231    }
232}
233
234impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
235where
236    T: Clone,
237    L: Location<'a>,
238{
239    fn clone(&self) -> Self {
240        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
241            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
242            *self.ir_node.borrow_mut() = HydroNode::Tee {
243                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
244                metadata: self.location.new_node_metadata(Self::collection_kind()),
245            };
246        }
247
248        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
249            Singleton {
250                location: self.location.clone(),
251                flow_state: self.flow_state.clone(),
252                ir_node: HydroNode::Tee {
253                    inner: SharedNode(inner.0.clone()),
254                    metadata: metadata.clone(),
255                }
256                .into(),
257                _phantom: PhantomData,
258            }
259        } else {
260            unreachable!()
261        }
262    }
263}
264
265#[cfg(stageleft_runtime)]
266fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
267    me: Singleton<T, Tick<L>, B>,
268    other: Optional<O, Tick<L>, B::UnderlyingBound>,
269) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
270    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
271    super::optional::zip_inside_tick(me_as_optional, other)
272}
273
274impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
275where
276    L: Location<'a>,
277{
278    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
279        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
280        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
281        let flow_state = location.flow_state().clone();
282        Singleton {
283            location,
284            flow_state,
285            ir_node: RefCell::new(ir_node),
286            _phantom: PhantomData,
287        }
288    }
289
290    pub(crate) fn collection_kind() -> CollectionKind {
291        CollectionKind::Singleton {
292            bound: B::bound_kind(),
293            element_type: stageleft::quote_type::<T>().into(),
294        }
295    }
296
297    /// Returns the [`Location`] where this singleton is being materialized.
298    pub fn location(&self) -> &L {
299        &self.location
300    }
301
302    /// Creates a lightweight reference handle to this singleton that can be captured
303    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
304    ///
305    /// The singleton must be bounded, otherwise reading it would be non-deterministic.
306    ///
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(async {
312    /// # let mut deployment = hydro_deploy::Deployment::new();
313    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
314    /// # let process = builder.process::<()>();
315    /// # let external = builder.external::<()>();
316    /// let my_count = process
317    ///     .source_iter(q!(0..5i32))
318    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
319    /// let count_ref = my_count.by_ref();
320    /// let out_port = process
321    ///     .source_iter(q!(1..=3i32))
322    ///     .map(q!(|x| x + *count_ref))
323    ///     .send_bincode_external(&external);
324    /// # let nodes = builder
325    /// #     .with_default_optimize()
326    /// #     .with_process(&process, deployment.Localhost())
327    /// #     .with_external(&external, deployment.Localhost())
328    /// #     .deploy(&mut deployment);
329    /// # deployment.deploy().await.unwrap();
330    /// # let mut out_recv = nodes.connect(out_port).await;
331    /// # deployment.start().await.unwrap();
332    /// # let mut results = Vec::new();
333    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
334    /// # results.sort();
335    /// // fold(0..5) = 10, so results are 11, 12, 13
336    /// # assert_eq!(results, vec![11, 12, 13]);
337    /// # });
338    /// # }
339    /// ```
340    pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, '_, T, L>
341    where
342        B: IsBounded,
343    {
344        crate::singleton_ref::SingletonRef::new(&self.ir_node)
345    }
346
347    /// Weakens the consistency of this live collection to not guarantee any consistency across
348    /// cluster members (if this collection is on a cluster).
349    pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
350    where
351        L: Location<'a>,
352    {
353        if L::consistency()
354            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
355        {
356            // already no consistency
357            Singleton::new(
358                self.location.drop_consistency(),
359                self.ir_node.replace(HydroNode::Placeholder),
360            )
361        } else {
362            Singleton::new(
363                self.location.drop_consistency(),
364                HydroNode::Cast {
365                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366                    metadata:
367                        self.location
368                            .clone()
369                            .drop_consistency()
370                            .new_node_metadata(
371                                Singleton::<T, L::DropConsistency, B>::collection_kind(),
372                            ),
373                },
374            )
375        }
376    }
377
378    /// Casts this live collection to have the consistency guarantees specified in the given
379    /// location type parameter. The developer must ensure that the strengthened consistency
380    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
381    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
382        self,
383        _proof: impl crate::properties::ConsistencyProof,
384    ) -> Singleton<T, L2, B>
385    where
386        L: Location<'a>,
387    {
388        if L::consistency() == L2::consistency() {
389            Singleton::new(
390                self.location.with_consistency_of(),
391                self.ir_node.replace(HydroNode::Placeholder),
392            )
393        } else {
394            Singleton::new(
395                self.location.with_consistency_of(),
396                HydroNode::AssertIsConsistent {
397                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
398                    trusted: false,
399                    metadata: self
400                        .location
401                        .clone()
402                        .with_consistency_of::<L2>()
403                        .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
404                },
405            )
406        }
407    }
408
409    /// Drops the monotonicity property of the [`Singleton`].
410    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
411        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
412            Singleton::new(
413                self.location.clone(),
414                self.ir_node.replace(HydroNode::Placeholder),
415            )
416        } else {
417            Singleton::new(
418                self.location.clone(),
419                HydroNode::Cast {
420                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
421                    metadata:
422                        self.location.new_node_metadata(
423                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
424                        ),
425                },
426            )
427        }
428    }
429
430    /// Transforms the singleton value by applying a function `f` to it,
431    /// continuously as the input is updated.
432    ///
433    /// # Example
434    /// ```rust
435    /// # #[cfg(feature = "deploy")] {
436    /// # use hydro_lang::prelude::*;
437    /// # use futures::StreamExt;
438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
439    /// let tick = process.tick();
440    /// let singleton = tick.singleton(q!(5));
441    /// singleton.map(q!(|v| v * 2)).all_ticks()
442    /// # }, |mut stream| async move {
443    /// // 10
444    /// # assert_eq!(stream.next().await.unwrap(), 10);
445    /// # }));
446    /// # }
447    /// ```
448    pub fn map<U, F, OP, B2: SingletonBound>(
449        self,
450        f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
451    ) -> Singleton<U, L, B2>
452    where
453        F: Fn(T) -> U + 'a,
454        B: ApplyOrderPreservingSingleton<OP, B2>,
455    {
456        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
457        proof.register_proof(&f);
458        let f = f.into();
459        Singleton::new(
460            self.location.clone(),
461            HydroNode::Map {
462                f,
463                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
464                metadata: self
465                    .location
466                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
467            },
468        )
469    }
470
471    /// Transforms the singleton value by applying a function `f` to it and then flattening
472    /// the result into a stream, preserving the order of elements.
473    ///
474    /// The function `f` is applied to the singleton value to produce an iterator, and all items
475    /// from that iterator are emitted in the output stream in deterministic order.
476    ///
477    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
478    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
479    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
480    ///
481    /// # Example
482    /// ```rust
483    /// # #[cfg(feature = "deploy")] {
484    /// # use hydro_lang::prelude::*;
485    /// # use futures::StreamExt;
486    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
487    /// let tick = process.tick();
488    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
489    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
490    /// # }, |mut stream| async move {
491    /// // 1, 2, 3
492    /// # for w in vec![1, 2, 3] {
493    /// #     assert_eq!(stream.next().await.unwrap(), w);
494    /// # }
495    /// # }));
496    /// # }
497    /// ```
498    pub fn flat_map_ordered<U, I, F>(
499        self,
500        f: impl IntoQuotedMut<'a, F, L>,
501    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
502    where
503        B: IsBounded,
504        I: IntoIterator<Item = U>,
505        F: Fn(T) -> I + 'a,
506    {
507        self.into_stream().flat_map_ordered(f)
508    }
509
510    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
511    /// for the output type `I` to produce items in any order.
512    ///
513    /// The function `f` is applied to the singleton value to produce an iterator, and all items
514    /// from that iterator are emitted in the output stream in non-deterministic order.
515    ///
516    /// # Example
517    /// ```rust
518    /// # #[cfg(feature = "deploy")] {
519    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520    /// # use futures::StreamExt;
521    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
522    /// let tick = process.tick();
523    /// let singleton = tick.singleton(q!(
524    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
525    /// ));
526    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for _ in 0..3 {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flat_map_unordered<U, I, F>(
539        self,
540        f: impl IntoQuotedMut<'a, F, L>,
541    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
542    where
543        B: IsBounded,
544        I: IntoIterator<Item = U>,
545        F: Fn(T) -> I + 'a,
546    {
547        self.into_stream().flat_map_unordered(f)
548    }
549
550    /// Flattens the singleton value into a stream, preserving the order of elements.
551    ///
552    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
553    /// are emitted in the output stream in deterministic order.
554    ///
555    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
556    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
557    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
558    ///
559    /// # Example
560    /// ```rust
561    /// # #[cfg(feature = "deploy")] {
562    /// # use hydro_lang::prelude::*;
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565    /// let tick = process.tick();
566    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
567    /// singleton.flatten_ordered().all_ticks()
568    /// # }, |mut stream| async move {
569    /// // 1, 2, 3
570    /// # for w in vec![1, 2, 3] {
571    /// #     assert_eq!(stream.next().await.unwrap(), w);
572    /// # }
573    /// # }));
574    /// # }
575    /// ```
576    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
577    where
578        B: IsBounded,
579        T: IntoIterator<Item = U>,
580    {
581        self.flat_map_ordered(q!(|x| x))
582    }
583
584    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
585    /// for the element type `T` to produce items in any order.
586    ///
587    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
588    /// are emitted in the output stream in non-deterministic order.
589    ///
590    /// # Example
591    /// ```rust
592    /// # #[cfg(feature = "deploy")] {
593    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
594    /// # use futures::StreamExt;
595    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
596    /// let tick = process.tick();
597    /// let singleton = tick.singleton(q!(
598    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
599    /// ));
600    /// singleton.flatten_unordered().all_ticks()
601    /// # }, |mut stream| async move {
602    /// // 1, 2, 3, but in no particular order
603    /// # let mut results = Vec::new();
604    /// # for _ in 0..3 {
605    /// #     results.push(stream.next().await.unwrap());
606    /// # }
607    /// # results.sort();
608    /// # assert_eq!(results, vec![1, 2, 3]);
609    /// # }));
610    /// # }
611    /// ```
612    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
613    where
614        B: IsBounded,
615        T: IntoIterator<Item = U>,
616    {
617        self.flat_map_unordered(q!(|x| x))
618    }
619
620    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
621    ///
622    /// If the predicate returns `true`, the output optional contains the same value.
623    /// If the predicate returns `false`, the output optional is empty.
624    ///
625    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
626    /// not modify or take ownership of the value. If you need to modify the value while filtering
627    /// use [`Singleton::filter_map`] instead.
628    ///
629    /// # Example
630    /// ```rust
631    /// # #[cfg(feature = "deploy")] {
632    /// # use hydro_lang::prelude::*;
633    /// # use futures::StreamExt;
634    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
635    /// let tick = process.tick();
636    /// let singleton = tick.singleton(q!(5));
637    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
638    /// # }, |mut stream| async move {
639    /// // 5
640    /// # assert_eq!(stream.next().await.unwrap(), 5);
641    /// # }));
642    /// # }
643    /// ```
644    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
645    where
646        F: Fn(&T) -> bool + 'a,
647    {
648        let f = f.splice_fn1_borrow_ctx(&self.location).into();
649        Optional::new(
650            self.location.clone(),
651            HydroNode::Filter {
652                f,
653                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
654                metadata: self
655                    .location
656                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
657            },
658        )
659    }
660
661    /// An operator that both filters and maps. It yields the value only if the supplied
662    /// closure `f` returns `Some(value)`.
663    ///
664    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
665    /// If the closure returns `None`, the output optional is empty.
666    ///
667    /// # Example
668    /// ```rust
669    /// # #[cfg(feature = "deploy")] {
670    /// # use hydro_lang::prelude::*;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673    /// let tick = process.tick();
674    /// let singleton = tick.singleton(q!("42"));
675    /// singleton
676    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
677    ///     .all_ticks()
678    /// # }, |mut stream| async move {
679    /// // 42
680    /// # assert_eq!(stream.next().await.unwrap(), 42);
681    /// # }));
682    /// # }
683    /// ```
684    pub fn filter_map<U, F>(
685        self,
686        f: impl IntoQuotedMut<'a, F, L>,
687    ) -> Optional<U, L, B::UnderlyingBound>
688    where
689        F: Fn(T) -> Option<U> + 'a,
690    {
691        let f = f.splice_fn1_ctx(&self.location).into();
692        Optional::new(
693            self.location.clone(),
694            HydroNode::FilterMap {
695                f,
696                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697                metadata: self
698                    .location
699                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
700            },
701        )
702    }
703
704    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
705    ///
706    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
707    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
708    /// non-null. This is useful for combining several pieces of state together.
709    ///
710    /// # Example
711    /// ```rust
712    /// # #[cfg(feature = "deploy")] {
713    /// # use hydro_lang::prelude::*;
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
716    /// let tick = process.tick();
717    /// let numbers = process
718    ///   .source_iter(q!(vec![123, 456]))
719    ///   .batch(&tick, nondet!(/** test */));
720    /// let count = numbers.clone().count(); // Singleton
721    /// let max = numbers.max(); // Optional
722    /// count.zip(max).all_ticks()
723    /// # }, |mut stream| async move {
724    /// // [(2, 456)]
725    /// # for w in vec![(2, 456)] {
726    /// #     assert_eq!(stream.next().await.unwrap(), w);
727    /// # }
728    /// # }));
729    /// # }
730    /// ```
731    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
732    where
733        Self: ZipResult<'a, O, Location = L>,
734        B: IsBounded,
735    {
736        check_matching_location(&self.location, &Self::other_location(&other));
737
738        if L::is_top_level()
739            && let Some(tick) = self.location.try_tick()
740        {
741            let self_location = self.location().clone();
742            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
743            let out = zip_inside_tick(
744                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
745                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
746                    other_location.clone(),
747                    HydroNode::Cast {
748                        inner: Box::new(Self::other_ir_node(other)),
749                        metadata: other_location.new_node_metadata(Optional::<
750                            <Self as ZipResult<'a, O>>::OtherType,
751                            Tick<L>,
752                            Bounded,
753                        >::collection_kind(
754                        )),
755                    },
756                )
757                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
758            )
759            .latest();
760
761            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
762        } else {
763            Self::make(
764                self.location.clone(),
765                HydroNode::CrossSingleton {
766                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
767                    right: Box::new(Self::other_ir_node(other)),
768                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
769                        bound: B::BOUND_KIND,
770                        element_type: stageleft::quote_type::<
771                            <Self as ZipResult<'a, O>>::ElementType,
772                        >()
773                        .into(),
774                    }),
775                },
776            )
777        }
778    }
779
780    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
781    /// boolean signal is `true`, otherwise the output is null.
782    ///
783    /// # Example
784    /// ```rust
785    /// # #[cfg(feature = "deploy")] {
786    /// # use hydro_lang::prelude::*;
787    /// # use futures::StreamExt;
788    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
789    /// let tick = process.tick();
790    /// // ticks are lazy by default, forces the second tick to run
791    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
792    ///
793    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
794    /// let batch_first_tick = process
795    ///   .source_iter(q!(vec![1]))
796    ///   .batch(&tick, nondet!(/** test */));
797    /// let batch_second_tick = process
798    ///   .source_iter(q!(vec![1, 2, 3]))
799    ///   .batch(&tick, nondet!(/** test */))
800    ///   .defer_tick();
801    /// batch_first_tick.chain(batch_second_tick).count()
802    ///   .filter_if(signal)
803    ///   .all_ticks()
804    /// # }, |mut stream| async move {
805    /// // [1]
806    /// # for w in vec![1] {
807    /// #     assert_eq!(stream.next().await.unwrap(), w);
808    /// # }
809    /// # }));
810    /// # }
811    /// ```
812    pub fn filter_if(
813        self,
814        signal: Singleton<bool, L, B>,
815    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
816    where
817        B: IsBounded,
818    {
819        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
820    }
821
822    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
823    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
824    ///
825    /// Useful for conditionally processing, such as only emitting a singleton's value outside
826    /// a tick if some other condition is satisfied.
827    ///
828    /// # Example
829    /// ```rust
830    /// # #[cfg(feature = "deploy")] {
831    /// # use hydro_lang::prelude::*;
832    /// # use futures::StreamExt;
833    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
834    /// let tick = process.tick();
835    /// // ticks are lazy by default, forces the second tick to run
836    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
837    ///
838    /// let batch_first_tick = process
839    ///   .source_iter(q!(vec![1]))
840    ///   .batch(&tick, nondet!(/** test */));
841    /// let batch_second_tick = process
842    ///   .source_iter(q!(vec![1, 2, 3]))
843    ///   .batch(&tick, nondet!(/** test */))
844    ///   .defer_tick(); // appears on the second tick
845    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
846    /// batch_first_tick.chain(batch_second_tick).count()
847    ///   .filter_if_some(some_on_first_tick)
848    ///   .all_ticks()
849    /// # }, |mut stream| async move {
850    /// // [1]
851    /// # for w in vec![1] {
852    /// #     assert_eq!(stream.next().await.unwrap(), w);
853    /// # }
854    /// # }));
855    /// # }
856    /// ```
857    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
858    pub fn filter_if_some<U>(
859        self,
860        signal: Optional<U, L, B>,
861    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
862    where
863        B: IsBounded,
864    {
865        self.filter_if(signal.is_some())
866    }
867
868    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
869    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
870    ///
871    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
872    /// the condition.
873    ///
874    /// # Example
875    /// ```rust
876    /// # #[cfg(feature = "deploy")] {
877    /// # use hydro_lang::prelude::*;
878    /// # use futures::StreamExt;
879    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880    /// let tick = process.tick();
881    /// // ticks are lazy by default, forces the second tick to run
882    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
883    ///
884    /// let batch_first_tick = process
885    ///   .source_iter(q!(vec![1]))
886    ///   .batch(&tick, nondet!(/** test */));
887    /// let batch_second_tick = process
888    ///   .source_iter(q!(vec![1, 2, 3]))
889    ///   .batch(&tick, nondet!(/** test */))
890    ///   .defer_tick(); // appears on the second tick
891    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
892    /// batch_first_tick.chain(batch_second_tick).count()
893    ///   .filter_if_none(some_on_first_tick)
894    ///   .all_ticks()
895    /// # }, |mut stream| async move {
896    /// // [3]
897    /// # for w in vec![3] {
898    /// #     assert_eq!(stream.next().await.unwrap(), w);
899    /// # }
900    /// # }));
901    /// # }
902    /// ```
903    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
904    pub fn filter_if_none<U>(
905        self,
906        other: Optional<U, L, B>,
907    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
908    where
909        B: IsBounded,
910    {
911        self.filter_if(other.is_none())
912    }
913
914    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
915    ///
916    /// # Example
917    /// ```rust
918    /// # #[cfg(feature = "deploy")] {
919    /// # use hydro_lang::prelude::*;
920    /// # use futures::StreamExt;
921    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922    /// let tick = process.tick();
923    /// let a = tick.singleton(q!(5));
924    /// let b = tick.singleton(q!(5));
925    /// a.equals(b).all_ticks()
926    /// # }, |mut stream| async move {
927    /// // [true]
928    /// # assert_eq!(stream.next().await.unwrap(), true);
929    /// # }));
930    /// # }
931    /// ```
932    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
933    where
934        T: PartialEq,
935        B: IsBounded,
936    {
937        self.zip(other).map(q!(|(a, b)| a == b))
938    }
939
940    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
941    /// greater than or equal to the provided threshold. The event will have the value of the
942    /// given threshold.
943    ///
944    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
945    /// the threshold would be non-deterministic.
946    ///
947    /// # Example
948    /// ```rust
949    /// # #[cfg(feature = "deploy")] {
950    /// # use hydro_lang::prelude::*;
951    /// # use futures::StreamExt;
952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953    /// let a = // singleton 1 ~> 5 ~> 10
954    /// # process.singleton(q!(5));
955    /// let b = process.singleton(q!(4));
956    /// a.threshold_greater_or_equal(b)
957    /// # }, |mut stream| async move {
958    /// // [4]
959    /// # assert_eq!(stream.next().await.unwrap(), 4);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn threshold_greater_or_equal<B2: IsBounded>(
964        self,
965        threshold: Singleton<T, L, B2>,
966    ) -> Stream<T, L, B::UnderlyingBound>
967    where
968        T: Clone + PartialOrd,
969        B: IsMonotonic,
970    {
971        let threshold = threshold.make_bounded();
972        let self_location = self.location().clone();
973        match self.try_make_bounded() {
974            Ok(bounded) => {
975                let uncasted = threshold
976                    .zip(bounded)
977                    .into_stream()
978                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
979
980                Stream::new(
981                    uncasted.location.clone(),
982                    uncasted.ir_node.replace(HydroNode::Placeholder),
983                )
984            }
985            Err(me) => {
986                let uncasted = sliced! {
987                    let me = use(me, nondet!(/** thresholds are deterministic */));
988                    let mut remaining_threshold = use::state(|l| {
989                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
990                        as_option
991                    });
992
993                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
994                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
995                    passed.map(q!(|(t, _)| t))
996                };
997
998                Stream::new(
999                    self_location,
1000                    uncasted.ir_node.replace(HydroNode::Placeholder),
1001                )
1002            }
1003        }
1004    }
1005
1006    /// An operator which allows you to "name" a `HydroNode`.
1007    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1008    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1009        {
1010            let mut node = self.ir_node.borrow_mut();
1011            let metadata = node.metadata_mut();
1012            metadata.tag = Some(name.to_owned());
1013        }
1014        self
1015    }
1016}
1017
1018impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1019    type Output = Singleton<bool, L, B::UnderlyingBound>;
1020
1021    fn not(self) -> Self::Output {
1022        self.map(q!(|b| !b))
1023    }
1024}
1025
1026impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1027where
1028    L: Location<'a>,
1029{
1030    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1031    /// the inner `Option`.
1032    ///
1033    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1034    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1035    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1036    ///
1037    /// # Example
1038    /// ```rust
1039    /// # #[cfg(feature = "deploy")] {
1040    /// # use hydro_lang::prelude::*;
1041    /// # use futures::StreamExt;
1042    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1043    /// let tick = process.tick();
1044    /// let singleton = tick.singleton(q!(Some(42)));
1045    /// singleton.into_optional().all_ticks()
1046    /// # }, |mut stream| async move {
1047    /// // 42
1048    /// # assert_eq!(stream.next().await.unwrap(), 42);
1049    /// # }));
1050    /// # }
1051    /// ```
1052    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1053        self.filter_map(q!(|v| v))
1054    }
1055}
1056
1057impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1058where
1059    L: Location<'a>,
1060{
1061    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1062    ///
1063    /// # Example
1064    /// ```rust
1065    /// # #[cfg(feature = "deploy")] {
1066    /// # use hydro_lang::prelude::*;
1067    /// # use futures::StreamExt;
1068    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1069    /// let tick = process.tick();
1070    /// // ticks are lazy by default, forces the second tick to run
1071    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1072    ///
1073    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1074    /// let b = tick.singleton(q!(true)); // true, true
1075    /// a.and(b).all_ticks()
1076    /// # }, |mut stream| async move {
1077    /// // [true, false]
1078    /// # for w in vec![true, false] {
1079    /// #     assert_eq!(stream.next().await.unwrap(), w);
1080    /// # }
1081    /// # }));
1082    /// # }
1083    /// ```
1084    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1085    where
1086        B: IsBounded,
1087    {
1088        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1089    }
1090
1091    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1092    ///
1093    /// # Example
1094    /// ```rust
1095    /// # #[cfg(feature = "deploy")] {
1096    /// # use hydro_lang::prelude::*;
1097    /// # use futures::StreamExt;
1098    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1099    /// let tick = process.tick();
1100    /// // ticks are lazy by default, forces the second tick to run
1101    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1102    ///
1103    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1104    /// let b = tick.singleton(q!(false)); // false, false
1105    /// a.or(b).all_ticks()
1106    /// # }, |mut stream| async move {
1107    /// // [true, false]
1108    /// # for w in vec![true, false] {
1109    /// #     assert_eq!(stream.next().await.unwrap(), w);
1110    /// # }
1111    /// # }));
1112    /// # }
1113    /// ```
1114    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1115    where
1116        B: IsBounded,
1117    {
1118        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1119    }
1120}
1121
1122impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1123where
1124    L: Location<'a>,
1125{
1126    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1127    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1128    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1129    /// all snapshots of this singleton into the atomic-associated tick will observe the
1130    /// same value each tick.
1131    ///
1132    /// # Non-Determinism
1133    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1134    /// the output singleton has a non-deterministic value since the snapshot can be at an
1135    /// arbitrary point in time.
1136    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1137        self,
1138        tick: &Tick<L2>,
1139        _nondet: NonDet,
1140    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1141        Singleton::new(
1142            tick.drop_consistency(),
1143            HydroNode::Batch {
1144                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1145                metadata: tick
1146                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1147            },
1148        )
1149    }
1150
1151    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1152    /// to the value will be asynchronously propagated.
1153    pub fn end_atomic(self) -> Singleton<T, L, B> {
1154        Singleton::new(
1155            self.location.tick.l.clone(),
1156            HydroNode::EndAtomic {
1157                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1158                metadata: self
1159                    .location
1160                    .tick
1161                    .l
1162                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1163            },
1164        )
1165    }
1166}
1167
1168impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1169where
1170    L: Location<'a>,
1171{
1172    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1173    /// will observe the same version of the value and will be executed synchronously before any
1174    /// outputs are yielded (in [`Optional::end_atomic`]).
1175    ///
1176    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1177    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1178    /// a different version).
1179    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1180        let id = self.location.flow_state().borrow_mut().next_clock_id();
1181        let out_location = Atomic {
1182            tick: Tick {
1183                id,
1184                l: self.location.clone(),
1185            },
1186        };
1187        Singleton::new(
1188            out_location.clone(),
1189            HydroNode::BeginAtomic {
1190                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1191                metadata: out_location
1192                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1193            },
1194        )
1195    }
1196
1197    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1198    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1199    /// relevant data that contributed to the snapshot at tick `t`.
1200    ///
1201    /// # Non-Determinism
1202    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1203    /// the output singleton has a non-deterministic value since the snapshot can be at an
1204    /// arbitrary point in time.
1205    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1206        self,
1207        tick: &Tick<L2>,
1208        _nondet: NonDet,
1209    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1210        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1211        Singleton::new(
1212            tick.drop_consistency(),
1213            HydroNode::Batch {
1214                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1215                metadata: tick
1216                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1217            },
1218        )
1219    }
1220
1221    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1222    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1223    ///
1224    /// # Non-Determinism
1225    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1226    /// to non-deterministic batching and arrival of inputs, the output stream is
1227    /// non-deterministic.
1228    pub fn sample_eager(
1229        self,
1230        nondet: NonDet,
1231    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1232        sliced! {
1233            let snapshot = use(self, nondet);
1234            snapshot.into_stream()
1235        }
1236        .weaken_retries()
1237    }
1238
1239    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1240    /// value taken at various points in time. Because the input singleton may be
1241    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1242    /// represent the value of the singleton given some prefix of the streams leading up to
1243    /// it.
1244    ///
1245    /// # Non-Determinism
1246    /// The output stream is non-deterministic in which elements are sampled, since this
1247    /// is controlled by a clock.
1248    pub fn sample_every(
1249        self,
1250        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1251        nondet: NonDet,
1252    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1253    where
1254        L: TopLevel<'a>,
1255    {
1256        let samples = self.location.source_interval(interval);
1257        sliced! {
1258            let snapshot = use(self, nondet);
1259            let sample_batch = use(samples, nondet);
1260
1261            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1262        }
1263        .weaken_retries()
1264    }
1265
1266    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1267    /// implies that `B == Bounded`.
1268    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1269    where
1270        B: IsBounded,
1271    {
1272        Singleton::new(
1273            self.location.clone(),
1274            self.ir_node.replace(HydroNode::Placeholder),
1275        )
1276    }
1277
1278    #[expect(clippy::result_large_err, reason = "internal use only")]
1279    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1280        if B::UnderlyingBound::BOUNDED {
1281            Ok(Singleton::new(
1282                self.location.clone(),
1283                self.ir_node.replace(HydroNode::Placeholder),
1284            ))
1285        } else {
1286            Err(self)
1287        }
1288    }
1289
1290    /// Clones this bounded singleton into a tick, returning a singleton that has the
1291    /// same value as the outer singleton. Because the outer singleton is bounded, this
1292    /// is deterministic because there is only a single immutable version.
1293    pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1294        self,
1295        tick: &Tick<L2>,
1296    ) -> Singleton<T, Tick<L2>, Bounded>
1297    where
1298        B: IsBounded,
1299        T: Clone,
1300    {
1301        // TODO(shadaj): avoid printing simulator logs for this snapshot
1302        let inner = self.snapshot(
1303            tick,
1304            nondet!(/** bounded top-level singleton so deterministic */),
1305        );
1306        Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1307    }
1308
1309    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1310    ///
1311    /// # Example
1312    /// ```rust
1313    /// # #[cfg(feature = "deploy")] {
1314    /// # use hydro_lang::prelude::*;
1315    /// # use futures::StreamExt;
1316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1317    /// let tick = process.tick();
1318    /// let batch_input = process
1319    ///   .source_iter(q!(vec![123, 456]))
1320    ///   .batch(&tick, nondet!(/** test */));
1321    /// batch_input.clone().chain(
1322    ///   batch_input.count().into_stream()
1323    /// ).all_ticks()
1324    /// # }, |mut stream| async move {
1325    /// // [123, 456, 2]
1326    /// # for w in vec![123, 456, 2] {
1327    /// #     assert_eq!(stream.next().await.unwrap(), w);
1328    /// # }
1329    /// # }));
1330    /// # }
1331    /// ```
1332    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1333    where
1334        B: IsBounded,
1335    {
1336        Stream::new(
1337            self.location.clone(),
1338            HydroNode::Cast {
1339                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1340                metadata: self.location.new_node_metadata(Stream::<
1341                    T,
1342                    Tick<L>,
1343                    Bounded,
1344                    TotalOrder,
1345                    ExactlyOnce,
1346                >::collection_kind()),
1347            },
1348        )
1349    }
1350
1351    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1352    /// producing a singleton of the resolved output.
1353    ///
1354    /// This is useful when the singleton contains an async computation that must
1355    /// be awaited before further processing. The future is polled to completion
1356    /// before the output value is emitted.
1357    ///
1358    /// # Example
1359    /// ```rust
1360    /// # #[cfg(feature = "deploy")] {
1361    /// # use hydro_lang::prelude::*;
1362    /// # use futures::StreamExt;
1363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1364    /// let tick = process.tick();
1365    /// let singleton = tick.singleton(q!(5));
1366    /// singleton
1367    ///     .map(q!(|v| async move { v * 2 }))
1368    ///     .resolve_future_blocking()
1369    ///     .all_ticks()
1370    /// # }, |mut stream| async move {
1371    /// // 10
1372    /// # assert_eq!(stream.next().await.unwrap(), 10);
1373    /// # }));
1374    /// # }
1375    /// ```
1376    pub fn resolve_future_blocking(
1377        self,
1378    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1379    where
1380        T: Future,
1381        B: IsBounded,
1382    {
1383        Singleton::new(
1384            self.location.clone(),
1385            HydroNode::ResolveFuturesBlocking {
1386                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1387                metadata: self
1388                    .location
1389                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1390            },
1391        )
1392    }
1393}
1394
1395impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1396where
1397    L: Location<'a>,
1398{
1399    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1400    /// which will stream the value computed in _each_ tick as a separate stream element.
1401    ///
1402    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1403    /// producing one element in the output for each tick. This is useful for batched computations,
1404    /// where the results from each tick must be combined together.
1405    ///
1406    /// # Example
1407    /// ```rust
1408    /// # #[cfg(feature = "deploy")] {
1409    /// # use hydro_lang::prelude::*;
1410    /// # use futures::StreamExt;
1411    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1412    /// let tick = process.tick();
1413    /// # // ticks are lazy by default, forces the second tick to run
1414    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1415    /// # let batch_first_tick = process
1416    /// #   .source_iter(q!(vec![1]))
1417    /// #   .batch(&tick, nondet!(/** test */));
1418    /// # let batch_second_tick = process
1419    /// #   .source_iter(q!(vec![1, 2, 3]))
1420    /// #   .batch(&tick, nondet!(/** test */))
1421    /// #   .defer_tick(); // appears on the second tick
1422    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1423    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1424    ///     .count()
1425    ///     .all_ticks()
1426    /// # }, |mut stream| async move {
1427    /// // [1, 3]
1428    /// # for w in vec![1, 3] {
1429    /// #     assert_eq!(stream.next().await.unwrap(), w);
1430    /// # }
1431    /// # }));
1432    /// # }
1433    /// ```
1434    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1435        self.into_stream().all_ticks()
1436    }
1437
1438    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1439    /// which will stream the value computed in _each_ tick as a separate stream element.
1440    ///
1441    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1442    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1443    /// singleton's [`Tick`] context.
1444    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1445        self.into_stream().all_ticks_atomic()
1446    }
1447
1448    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1449    /// be asynchronously updated with the latest value of the singleton inside the tick.
1450    ///
1451    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1452    /// tick that tracks the inner value. This is useful for getting the value as of the
1453    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1454    ///
1455    /// # Example
1456    /// ```rust
1457    /// # #[cfg(feature = "deploy")] {
1458    /// # use hydro_lang::prelude::*;
1459    /// # use futures::StreamExt;
1460    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461    /// let tick = process.tick();
1462    /// # // ticks are lazy by default, forces the second tick to run
1463    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1464    /// # let batch_first_tick = process
1465    /// #   .source_iter(q!(vec![1]))
1466    /// #   .batch(&tick, nondet!(/** test */));
1467    /// # let batch_second_tick = process
1468    /// #   .source_iter(q!(vec![1, 2, 3]))
1469    /// #   .batch(&tick, nondet!(/** test */))
1470    /// #   .defer_tick(); // appears on the second tick
1471    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1472    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1473    ///     .count()
1474    ///     .latest()
1475    /// # .sample_eager(nondet!(/** test */))
1476    /// # }, |mut stream| async move {
1477    /// // asynchronously changes from 1 ~> 3
1478    /// # for w in vec![1, 3] {
1479    /// #     assert_eq!(stream.next().await.unwrap(), w);
1480    /// # }
1481    /// # }));
1482    /// # }
1483    /// ```
1484    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1485        Singleton::new(
1486            self.location.outer().clone(),
1487            HydroNode::YieldConcat {
1488                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1489                metadata: self
1490                    .location
1491                    .outer()
1492                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1493            },
1494        )
1495    }
1496
1497    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1498    /// be updated with the latest value of the singleton inside the tick.
1499    ///
1500    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1501    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1502    /// singleton's [`Tick`] context.
1503    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1504        let out_location = Atomic {
1505            tick: self.location.clone(),
1506        };
1507        Singleton::new(
1508            out_location.clone(),
1509            HydroNode::YieldConcat {
1510                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1511                metadata: out_location
1512                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1513            },
1514        )
1515    }
1516}
1517
1518#[doc(hidden)]
1519/// Helper trait that determines the output collection type for [`Singleton::zip`].
1520///
1521/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1522/// [`Singleton`].
1523#[sealed::sealed]
1524pub trait ZipResult<'a, Other> {
1525    /// The output collection type.
1526    type Out;
1527    /// The type of the tupled output value.
1528    type ElementType;
1529    /// The type of the other collection's value.
1530    type OtherType;
1531    /// The location where the tupled result will be materialized.
1532    type Location: Location<'a>;
1533
1534    /// The location of the second input to the `zip`.
1535    fn other_location(other: &Other) -> Self::Location;
1536    /// The IR node of the second input to the `zip`.
1537    fn other_ir_node(other: Other) -> HydroNode;
1538
1539    /// Constructs the output live collection given an IR node containing the zip result.
1540    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1541}
1542
1543#[sealed::sealed]
1544impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1545where
1546    L: Location<'a>,
1547{
1548    type Out = Singleton<(T, U), L, B>;
1549    type ElementType = (T, U);
1550    type OtherType = U;
1551    type Location = L;
1552
1553    fn other_location(other: &Singleton<U, L, B>) -> L {
1554        other.location.clone()
1555    }
1556
1557    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1558        other.ir_node.replace(HydroNode::Placeholder)
1559    }
1560
1561    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1562        Singleton::new(
1563            location.clone(),
1564            HydroNode::Cast {
1565                inner: Box::new(ir_node),
1566                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1567            },
1568        )
1569    }
1570}
1571
1572#[sealed::sealed]
1573impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1574    for Singleton<T, L, B>
1575where
1576    L: Location<'a>,
1577{
1578    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1579    type ElementType = (T, U);
1580    type OtherType = U;
1581    type Location = L;
1582
1583    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1584        other.location.clone()
1585    }
1586
1587    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1588        other.ir_node.replace(HydroNode::Placeholder)
1589    }
1590
1591    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1592        Optional::new(location, ir_node)
1593    }
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598    #[cfg(feature = "deploy")]
1599    use futures::{SinkExt, StreamExt};
1600    #[cfg(feature = "deploy")]
1601    use hydro_deploy::Deployment;
1602    #[cfg(any(feature = "deploy", feature = "sim"))]
1603    use stageleft::q;
1604
1605    #[cfg(any(feature = "deploy", feature = "sim"))]
1606    use crate::compile::builder::FlowBuilder;
1607    #[cfg(feature = "deploy")]
1608    use crate::live_collections::stream::ExactlyOnce;
1609    #[cfg(any(feature = "deploy", feature = "sim"))]
1610    use crate::location::Location;
1611    #[cfg(any(feature = "deploy", feature = "sim"))]
1612    use crate::nondet::nondet;
1613
1614    #[cfg(feature = "deploy")]
1615    #[tokio::test]
1616    async fn tick_cycle_cardinality() {
1617        let mut deployment = Deployment::new();
1618
1619        let mut flow = FlowBuilder::new();
1620        let node = flow.process::<()>();
1621        let external = flow.external::<()>();
1622
1623        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1624
1625        let node_tick = node.tick();
1626        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1627        let counts = singleton
1628            .clone()
1629            .into_stream()
1630            .count()
1631            .filter_if(
1632                input
1633                    .batch(&node_tick, nondet!(/** testing */))
1634                    .first()
1635                    .is_some(),
1636            )
1637            .all_ticks()
1638            .send_bincode_external(&external);
1639        complete_cycle.complete_next_tick(singleton);
1640
1641        let nodes = flow
1642            .with_process(&node, deployment.Localhost())
1643            .with_external(&external, deployment.Localhost())
1644            .deploy(&mut deployment);
1645
1646        deployment.deploy().await.unwrap();
1647
1648        let mut tick_trigger = nodes.connect(input_send).await;
1649        let mut external_out = nodes.connect(counts).await;
1650
1651        deployment.start().await.unwrap();
1652
1653        tick_trigger.send(()).await.unwrap();
1654
1655        assert_eq!(external_out.next().await.unwrap(), 1);
1656
1657        tick_trigger.send(()).await.unwrap();
1658
1659        assert_eq!(external_out.next().await.unwrap(), 1);
1660    }
1661
1662    #[cfg(feature = "sim")]
1663    #[test]
1664    #[should_panic]
1665    fn sim_fold_intermediate_states() {
1666        let mut flow = FlowBuilder::new();
1667        let node = flow.process::<()>();
1668
1669        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1670        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1671
1672        let tick = node.tick();
1673        let batch = folded.snapshot(&tick, nondet!(/** test */));
1674        let out_recv = batch.all_ticks().sim_output();
1675
1676        flow.sim().exhaustive(async || {
1677            assert_eq!(out_recv.next().await.unwrap(), 10);
1678        });
1679    }
1680
1681    #[cfg(feature = "sim")]
1682    #[test]
1683    fn sim_fold_intermediate_state_count() {
1684        let mut flow = FlowBuilder::new();
1685        let node = flow.process::<()>();
1686
1687        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1688        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1689
1690        let tick = node.tick();
1691        let batch = folded.snapshot(&tick, nondet!(/** test */));
1692        let out_recv = batch.all_ticks().sim_output();
1693
1694        let instance_count = flow.sim().exhaustive(async || {
1695            let out = out_recv.collect::<Vec<_>>().await;
1696            assert_eq!(out.last(), Some(&10));
1697        });
1698
1699        assert_eq!(
1700            instance_count,
1701            16 // 2^4 possible subsets of intermediates (including initial state)
1702        )
1703    }
1704
1705    #[cfg(feature = "sim")]
1706    #[test]
1707    fn sim_fold_no_repeat_initial() {
1708        // check that we don't repeat the initial state of the fold in autonomous decisions
1709
1710        let mut flow = FlowBuilder::new();
1711        let node = flow.process::<()>();
1712
1713        let (in_port, input) = node.sim_input();
1714        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1715
1716        let tick = node.tick();
1717        let batch = folded.snapshot(&tick, nondet!(/** test */));
1718        let out_recv = batch.all_ticks().sim_output();
1719
1720        flow.sim().exhaustive(async || {
1721            assert_eq!(out_recv.next().await.unwrap(), 0);
1722
1723            in_port.send(123);
1724
1725            assert_eq!(out_recv.next().await.unwrap(), 123);
1726        });
1727    }
1728
1729    #[cfg(feature = "sim")]
1730    #[test]
1731    #[should_panic]
1732    fn sim_fold_repeats_snapshots() {
1733        // when the tick is driven by a snapshot AND something else, the snapshot can
1734        // "stutter" and repeat the same state multiple times
1735
1736        let mut flow = FlowBuilder::new();
1737        let node = flow.process::<()>();
1738
1739        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1740        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1741
1742        let tick = node.tick();
1743        let batch = source
1744            .batch(&tick, nondet!(/** test */))
1745            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1746        let out_recv = batch.all_ticks().sim_output();
1747
1748        flow.sim().exhaustive(async || {
1749            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1750            {
1751                panic!("repeated snapshot");
1752            }
1753        });
1754    }
1755
1756    #[cfg(feature = "sim")]
1757    #[test]
1758    fn sim_fold_repeats_snapshots_count() {
1759        // check the number of instances
1760        let mut flow = FlowBuilder::new();
1761        let node = flow.process::<()>();
1762
1763        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1764        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1765
1766        let tick = node.tick();
1767        let batch = source
1768            .batch(&tick, nondet!(/** test */))
1769            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1770        let out_recv = batch.all_ticks().sim_output();
1771
1772        let count = flow.sim().exhaustive(async || {
1773            let _ = out_recv.collect::<Vec<_>>().await;
1774        });
1775
1776        assert_eq!(count, 52);
1777        // don't have a combinatorial explanation for this number yet, but checked via logs
1778    }
1779
1780    #[cfg(feature = "sim")]
1781    #[test]
1782    fn sim_top_level_singleton_exhaustive() {
1783        // ensures that top-level singletons have only one snapshot
1784        let mut flow = FlowBuilder::new();
1785        let node = flow.process::<()>();
1786
1787        let singleton = node.singleton(q!(1));
1788        let tick = node.tick();
1789        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1790        let out_recv = batch.all_ticks().sim_output();
1791
1792        let count = flow.sim().exhaustive(async || {
1793            let _ = out_recv.collect::<Vec<_>>().await;
1794        });
1795
1796        assert_eq!(count, 1);
1797    }
1798
1799    #[cfg(feature = "sim")]
1800    #[test]
1801    fn sim_top_level_singleton_join_count() {
1802        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1803        // exploration
1804
1805        let mut flow = FlowBuilder::new();
1806        let node = flow.process::<()>();
1807
1808        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1809        let tick = node.tick();
1810        let batch = source_iter
1811            .batch(&tick, nondet!(/** test */))
1812            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1813        let out_recv = batch.all_ticks().sim_output();
1814
1815        let instance_count = flow.sim().exhaustive(async || {
1816            let _ = out_recv.collect::<Vec<_>>().await;
1817        });
1818
1819        assert_eq!(
1820            instance_count,
1821            16 // 2^4 ways to split up (including a possibly empty first batch)
1822        )
1823    }
1824
1825    #[cfg(feature = "sim")]
1826    #[test]
1827    fn top_level_singleton_into_stream_no_replay() {
1828        let mut flow = FlowBuilder::new();
1829        let node = flow.process::<()>();
1830
1831        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1832        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1833
1834        let out_recv = folded.into_stream().sim_output();
1835
1836        flow.sim().exhaustive(async || {
1837            out_recv.assert_yields_only([10]).await;
1838        });
1839    }
1840
1841    #[cfg(feature = "sim")]
1842    #[test]
1843    fn inside_tick_singleton_zip() {
1844        use crate::live_collections::Stream;
1845        use crate::live_collections::sliced::sliced;
1846
1847        let mut flow = FlowBuilder::new();
1848        let node = flow.process::<()>();
1849
1850        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1851        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1852
1853        let out_recv = sliced! {
1854            let v = use(folded, nondet!(/** test */));
1855            v.clone().zip(v).into_stream()
1856        }
1857        .sim_output();
1858
1859        let count = flow.sim().exhaustive(async || {
1860            let out = out_recv.collect::<Vec<_>>().await;
1861            assert_eq!(out.last(), Some(&(3, 3)));
1862        });
1863
1864        assert_eq!(count, 4);
1865    }
1866
1867    /// Reproducer for simulator hang when using cross_singleton on a top-level
1868    /// unbounded stream (not inside sliced!). The exhaustive simulator hangs
1869    /// after the first iteration.
1870    #[cfg(feature = "sim")]
1871    #[test]
1872    fn sim_cross_singleton_top_level_unbounded_hang() {
1873        let mut flow = FlowBuilder::new();
1874        let node = flow.process::<()>();
1875
1876        let (cmd_port, input) = node.sim_input::<String, _, _>();
1877
1878        let top_level_singleton = node.singleton(q!(123));
1879
1880        // cross_singleton on a top-level stream - bug trigger
1881        let crossed = input.cross_singleton(top_level_singleton);
1882
1883        // Output directly
1884        let resp_port = crossed.sim_output();
1885
1886        let count = flow.sim().exhaustive(async || {
1887            cmd_port.send("abc".to_owned());
1888
1889            let responses: Vec<_> = resp_port.collect().await;
1890            assert!(!responses.is_empty());
1891        });
1892
1893        assert_eq!(count, 1);
1894    }
1895
1896    #[cfg(feature = "sim")]
1897    #[test]
1898    fn sim_top_level_singleton_state_count() {
1899        let mut flow = FlowBuilder::new();
1900        let process = flow.process::<()>();
1901
1902        let (cmd_port, input) = process.sim_input();
1903        {
1904            // increases exhaustive inputs from 1 to 2 before we optimized `From`
1905            use super::Singleton;
1906            use crate::live_collections::boundedness::Unbounded;
1907            let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
1908        }
1909        let tick = process.tick();
1910        let batched_unbatched = input.batch(&tick, nondet!(/** */)).all_ticks();
1911        let resp_port = batched_unbatched.sim_output();
1912
1913        let count = flow.sim().exhaustive(async || {
1914            cmd_port.send(());
1915            let _responses: Vec<_> = resp_port.collect().await;
1916        });
1917
1918        assert_eq!(count, 1);
1919    }
1920}