hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36 /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37 type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40 type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42 /// Returns the [`SingletonBoundKind`] corresponding to this type.
43 fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81 message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82 label = "required here",
83 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111 pub(crate) location: Loc,
112 pub(crate) ir_node: RefCell<HydroNode>,
113 pub(crate) flow_state: FlowState,
114
115 _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a>,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let location = value.location().clone();
137 Singleton::new(
138 location.clone(),
139 HydroNode::UnboundSingleton {
140 inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
141 metadata: location
142 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
143 },
144 )
145 }
146}
147
148impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
149where
150 L: Location<'a>,
151{
152 type Location = Tick<L>;
153
154 fn location(&self) -> &Self::Location {
155 self.location()
156 }
157
158 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
159 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
160 location.clone(),
161 HydroNode::DeferTick {
162 input: Box::new(HydroNode::CycleSource {
163 cycle_id,
164 metadata: location.new_node_metadata(Self::collection_kind()),
165 }),
166 metadata: location
167 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
168 },
169 );
170
171 from_previous_tick.unwrap_or(initial)
172 }
173}
174
175impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
176where
177 L: Location<'a>,
178{
179 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
180 assert_eq!(
181 Location::id(&self.location),
182 expected_location,
183 "locations do not match"
184 );
185 self.location
186 .flow_state()
187 .borrow_mut()
188 .push_root(HydroRoot::CycleSink {
189 cycle_id,
190 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
191 op_metadata: HydroIrOpMetadata::new(),
192 });
193 }
194}
195
196impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
197where
198 L: Location<'a>,
199{
200 type Location = L;
201
202 fn create_source(cycle_id: CycleId, location: L) -> Self {
203 Singleton::new(
204 location.clone(),
205 HydroNode::CycleSource {
206 cycle_id,
207 metadata: location.new_node_metadata(Self::collection_kind()),
208 },
209 )
210 }
211}
212
213impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
214where
215 L: Location<'a>,
216{
217 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
218 assert_eq!(
219 Location::id(&self.location),
220 expected_location,
221 "locations do not match"
222 );
223 self.location
224 .flow_state()
225 .borrow_mut()
226 .push_root(HydroRoot::CycleSink {
227 cycle_id,
228 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
229 op_metadata: HydroIrOpMetadata::new(),
230 });
231 }
232}
233
234impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
235where
236 T: Clone,
237 L: Location<'a>,
238{
239 fn clone(&self) -> Self {
240 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
241 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
242 *self.ir_node.borrow_mut() = HydroNode::Tee {
243 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
244 metadata: self.location.new_node_metadata(Self::collection_kind()),
245 };
246 }
247
248 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
249 Singleton {
250 location: self.location.clone(),
251 flow_state: self.flow_state.clone(),
252 ir_node: HydroNode::Tee {
253 inner: SharedNode(inner.0.clone()),
254 metadata: metadata.clone(),
255 }
256 .into(),
257 _phantom: PhantomData,
258 }
259 } else {
260 unreachable!()
261 }
262 }
263}
264
265#[cfg(stageleft_runtime)]
266fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
267 me: Singleton<T, Tick<L>, B>,
268 other: Optional<O, Tick<L>, B::UnderlyingBound>,
269) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
270 let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
271 super::optional::zip_inside_tick(me_as_optional, other)
272}
273
274impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
275where
276 L: Location<'a>,
277{
278 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
279 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
280 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
281 let flow_state = location.flow_state().clone();
282 Singleton {
283 location,
284 flow_state,
285 ir_node: RefCell::new(ir_node),
286 _phantom: PhantomData,
287 }
288 }
289
290 pub(crate) fn collection_kind() -> CollectionKind {
291 CollectionKind::Singleton {
292 bound: B::bound_kind(),
293 element_type: stageleft::quote_type::<T>().into(),
294 }
295 }
296
297 /// Returns the [`Location`] where this singleton is being materialized.
298 pub fn location(&self) -> &L {
299 &self.location
300 }
301
302 /// Creates a lightweight reference handle to this singleton that can be captured
303 /// inside `q!()` closures. The handle resolves to `&T` at runtime.
304 ///
305 /// The singleton must be bounded, otherwise reading it would be non-deterministic.
306 ///
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(async {
312 /// # let mut deployment = hydro_deploy::Deployment::new();
313 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
314 /// # let process = builder.process::<()>();
315 /// # let external = builder.external::<()>();
316 /// let my_count = process
317 /// .source_iter(q!(0..5i32))
318 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
319 /// let count_ref = my_count.by_ref();
320 /// let out_port = process
321 /// .source_iter(q!(1..=3i32))
322 /// .map(q!(|x| x + *count_ref))
323 /// .send_bincode_external(&external);
324 /// # let nodes = builder
325 /// # .with_default_optimize()
326 /// # .with_process(&process, deployment.Localhost())
327 /// # .with_external(&external, deployment.Localhost())
328 /// # .deploy(&mut deployment);
329 /// # deployment.deploy().await.unwrap();
330 /// # let mut out_recv = nodes.connect(out_port).await;
331 /// # deployment.start().await.unwrap();
332 /// # let mut results = Vec::new();
333 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
334 /// # results.sort();
335 /// // fold(0..5) = 10, so results are 11, 12, 13
336 /// # assert_eq!(results, vec![11, 12, 13]);
337 /// # });
338 /// # }
339 /// ```
340 pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, '_, T, L>
341 where
342 B: IsBounded,
343 {
344 crate::singleton_ref::SingletonRef::new(&self.ir_node)
345 }
346
347 /// Weakens the consistency of this live collection to not guarantee any consistency across
348 /// cluster members (if this collection is on a cluster).
349 pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
350 where
351 L: Location<'a>,
352 {
353 if L::consistency()
354 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
355 {
356 // already no consistency
357 Singleton::new(
358 self.location.drop_consistency(),
359 self.ir_node.replace(HydroNode::Placeholder),
360 )
361 } else {
362 Singleton::new(
363 self.location.drop_consistency(),
364 HydroNode::Cast {
365 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366 metadata:
367 self.location
368 .clone()
369 .drop_consistency()
370 .new_node_metadata(
371 Singleton::<T, L::DropConsistency, B>::collection_kind(),
372 ),
373 },
374 )
375 }
376 }
377
378 /// Casts this live collection to have the consistency guarantees specified in the given
379 /// location type parameter. The developer must ensure that the strengthened consistency
380 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
381 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
382 self,
383 _proof: impl crate::properties::ConsistencyProof,
384 ) -> Singleton<T, L2, B>
385 where
386 L: Location<'a>,
387 {
388 if L::consistency() == L2::consistency() {
389 Singleton::new(
390 self.location.with_consistency_of(),
391 self.ir_node.replace(HydroNode::Placeholder),
392 )
393 } else {
394 Singleton::new(
395 self.location.with_consistency_of(),
396 HydroNode::AssertIsConsistent {
397 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
398 trusted: false,
399 metadata: self
400 .location
401 .clone()
402 .with_consistency_of::<L2>()
403 .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
404 },
405 )
406 }
407 }
408
409 /// Drops the monotonicity property of the [`Singleton`].
410 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
411 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
412 Singleton::new(
413 self.location.clone(),
414 self.ir_node.replace(HydroNode::Placeholder),
415 )
416 } else {
417 Singleton::new(
418 self.location.clone(),
419 HydroNode::Cast {
420 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
421 metadata:
422 self.location.new_node_metadata(
423 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
424 ),
425 },
426 )
427 }
428 }
429
430 /// Transforms the singleton value by applying a function `f` to it,
431 /// continuously as the input is updated.
432 ///
433 /// # Example
434 /// ```rust
435 /// # #[cfg(feature = "deploy")] {
436 /// # use hydro_lang::prelude::*;
437 /// # use futures::StreamExt;
438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
439 /// let tick = process.tick();
440 /// let singleton = tick.singleton(q!(5));
441 /// singleton.map(q!(|v| v * 2)).all_ticks()
442 /// # }, |mut stream| async move {
443 /// // 10
444 /// # assert_eq!(stream.next().await.unwrap(), 10);
445 /// # }));
446 /// # }
447 /// ```
448 pub fn map<U, F, OP, B2: SingletonBound>(
449 self,
450 f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
451 ) -> Singleton<U, L, B2>
452 where
453 F: Fn(T) -> U + 'a,
454 B: ApplyOrderPreservingSingleton<OP, B2>,
455 {
456 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
457 proof.register_proof(&f);
458 let f = f.into();
459 Singleton::new(
460 self.location.clone(),
461 HydroNode::Map {
462 f,
463 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
464 metadata: self
465 .location
466 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
467 },
468 )
469 }
470
471 /// Transforms the singleton value by applying a function `f` to it and then flattening
472 /// the result into a stream, preserving the order of elements.
473 ///
474 /// The function `f` is applied to the singleton value to produce an iterator, and all items
475 /// from that iterator are emitted in the output stream in deterministic order.
476 ///
477 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
478 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
479 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
480 ///
481 /// # Example
482 /// ```rust
483 /// # #[cfg(feature = "deploy")] {
484 /// # use hydro_lang::prelude::*;
485 /// # use futures::StreamExt;
486 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
487 /// let tick = process.tick();
488 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
489 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
490 /// # }, |mut stream| async move {
491 /// // 1, 2, 3
492 /// # for w in vec![1, 2, 3] {
493 /// # assert_eq!(stream.next().await.unwrap(), w);
494 /// # }
495 /// # }));
496 /// # }
497 /// ```
498 pub fn flat_map_ordered<U, I, F>(
499 self,
500 f: impl IntoQuotedMut<'a, F, L>,
501 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
502 where
503 B: IsBounded,
504 I: IntoIterator<Item = U>,
505 F: Fn(T) -> I + 'a,
506 {
507 self.into_stream().flat_map_ordered(f)
508 }
509
510 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
511 /// for the output type `I` to produce items in any order.
512 ///
513 /// The function `f` is applied to the singleton value to produce an iterator, and all items
514 /// from that iterator are emitted in the output stream in non-deterministic order.
515 ///
516 /// # Example
517 /// ```rust
518 /// # #[cfg(feature = "deploy")] {
519 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520 /// # use futures::StreamExt;
521 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
522 /// let tick = process.tick();
523 /// let singleton = tick.singleton(q!(
524 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
525 /// ));
526 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
527 /// # }, |mut stream| async move {
528 /// // 1, 2, 3, but in no particular order
529 /// # let mut results = Vec::new();
530 /// # for _ in 0..3 {
531 /// # results.push(stream.next().await.unwrap());
532 /// # }
533 /// # results.sort();
534 /// # assert_eq!(results, vec![1, 2, 3]);
535 /// # }));
536 /// # }
537 /// ```
538 pub fn flat_map_unordered<U, I, F>(
539 self,
540 f: impl IntoQuotedMut<'a, F, L>,
541 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
542 where
543 B: IsBounded,
544 I: IntoIterator<Item = U>,
545 F: Fn(T) -> I + 'a,
546 {
547 self.into_stream().flat_map_unordered(f)
548 }
549
550 /// Flattens the singleton value into a stream, preserving the order of elements.
551 ///
552 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
553 /// are emitted in the output stream in deterministic order.
554 ///
555 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
556 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
557 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
558 ///
559 /// # Example
560 /// ```rust
561 /// # #[cfg(feature = "deploy")] {
562 /// # use hydro_lang::prelude::*;
563 /// # use futures::StreamExt;
564 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565 /// let tick = process.tick();
566 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
567 /// singleton.flatten_ordered().all_ticks()
568 /// # }, |mut stream| async move {
569 /// // 1, 2, 3
570 /// # for w in vec![1, 2, 3] {
571 /// # assert_eq!(stream.next().await.unwrap(), w);
572 /// # }
573 /// # }));
574 /// # }
575 /// ```
576 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
577 where
578 B: IsBounded,
579 T: IntoIterator<Item = U>,
580 {
581 self.flat_map_ordered(q!(|x| x))
582 }
583
584 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
585 /// for the element type `T` to produce items in any order.
586 ///
587 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
588 /// are emitted in the output stream in non-deterministic order.
589 ///
590 /// # Example
591 /// ```rust
592 /// # #[cfg(feature = "deploy")] {
593 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
594 /// # use futures::StreamExt;
595 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
596 /// let tick = process.tick();
597 /// let singleton = tick.singleton(q!(
598 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
599 /// ));
600 /// singleton.flatten_unordered().all_ticks()
601 /// # }, |mut stream| async move {
602 /// // 1, 2, 3, but in no particular order
603 /// # let mut results = Vec::new();
604 /// # for _ in 0..3 {
605 /// # results.push(stream.next().await.unwrap());
606 /// # }
607 /// # results.sort();
608 /// # assert_eq!(results, vec![1, 2, 3]);
609 /// # }));
610 /// # }
611 /// ```
612 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
613 where
614 B: IsBounded,
615 T: IntoIterator<Item = U>,
616 {
617 self.flat_map_unordered(q!(|x| x))
618 }
619
620 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
621 ///
622 /// If the predicate returns `true`, the output optional contains the same value.
623 /// If the predicate returns `false`, the output optional is empty.
624 ///
625 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
626 /// not modify or take ownership of the value. If you need to modify the value while filtering
627 /// use [`Singleton::filter_map`] instead.
628 ///
629 /// # Example
630 /// ```rust
631 /// # #[cfg(feature = "deploy")] {
632 /// # use hydro_lang::prelude::*;
633 /// # use futures::StreamExt;
634 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
635 /// let tick = process.tick();
636 /// let singleton = tick.singleton(q!(5));
637 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
638 /// # }, |mut stream| async move {
639 /// // 5
640 /// # assert_eq!(stream.next().await.unwrap(), 5);
641 /// # }));
642 /// # }
643 /// ```
644 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
645 where
646 F: Fn(&T) -> bool + 'a,
647 {
648 let f = f.splice_fn1_borrow_ctx(&self.location).into();
649 Optional::new(
650 self.location.clone(),
651 HydroNode::Filter {
652 f,
653 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
654 metadata: self
655 .location
656 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
657 },
658 )
659 }
660
661 /// An operator that both filters and maps. It yields the value only if the supplied
662 /// closure `f` returns `Some(value)`.
663 ///
664 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
665 /// If the closure returns `None`, the output optional is empty.
666 ///
667 /// # Example
668 /// ```rust
669 /// # #[cfg(feature = "deploy")] {
670 /// # use hydro_lang::prelude::*;
671 /// # use futures::StreamExt;
672 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673 /// let tick = process.tick();
674 /// let singleton = tick.singleton(q!("42"));
675 /// singleton
676 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
677 /// .all_ticks()
678 /// # }, |mut stream| async move {
679 /// // 42
680 /// # assert_eq!(stream.next().await.unwrap(), 42);
681 /// # }));
682 /// # }
683 /// ```
684 pub fn filter_map<U, F>(
685 self,
686 f: impl IntoQuotedMut<'a, F, L>,
687 ) -> Optional<U, L, B::UnderlyingBound>
688 where
689 F: Fn(T) -> Option<U> + 'a,
690 {
691 let f = f.splice_fn1_ctx(&self.location).into();
692 Optional::new(
693 self.location.clone(),
694 HydroNode::FilterMap {
695 f,
696 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697 metadata: self
698 .location
699 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
700 },
701 )
702 }
703
704 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
705 ///
706 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
707 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
708 /// non-null. This is useful for combining several pieces of state together.
709 ///
710 /// # Example
711 /// ```rust
712 /// # #[cfg(feature = "deploy")] {
713 /// # use hydro_lang::prelude::*;
714 /// # use futures::StreamExt;
715 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
716 /// let tick = process.tick();
717 /// let numbers = process
718 /// .source_iter(q!(vec![123, 456]))
719 /// .batch(&tick, nondet!(/** test */));
720 /// let count = numbers.clone().count(); // Singleton
721 /// let max = numbers.max(); // Optional
722 /// count.zip(max).all_ticks()
723 /// # }, |mut stream| async move {
724 /// // [(2, 456)]
725 /// # for w in vec![(2, 456)] {
726 /// # assert_eq!(stream.next().await.unwrap(), w);
727 /// # }
728 /// # }));
729 /// # }
730 /// ```
731 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
732 where
733 Self: ZipResult<'a, O, Location = L>,
734 B: IsBounded,
735 {
736 check_matching_location(&self.location, &Self::other_location(&other));
737
738 if L::is_top_level()
739 && let Some(tick) = self.location.try_tick()
740 {
741 let self_location = self.location().clone();
742 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
743 let out = zip_inside_tick(
744 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
745 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
746 other_location.clone(),
747 HydroNode::Cast {
748 inner: Box::new(Self::other_ir_node(other)),
749 metadata: other_location.new_node_metadata(Optional::<
750 <Self as ZipResult<'a, O>>::OtherType,
751 Tick<L>,
752 Bounded,
753 >::collection_kind(
754 )),
755 },
756 )
757 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
758 )
759 .latest();
760
761 Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
762 } else {
763 Self::make(
764 self.location.clone(),
765 HydroNode::CrossSingleton {
766 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
767 right: Box::new(Self::other_ir_node(other)),
768 metadata: self.location.new_node_metadata(CollectionKind::Optional {
769 bound: B::BOUND_KIND,
770 element_type: stageleft::quote_type::<
771 <Self as ZipResult<'a, O>>::ElementType,
772 >()
773 .into(),
774 }),
775 },
776 )
777 }
778 }
779
780 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
781 /// boolean signal is `true`, otherwise the output is null.
782 ///
783 /// # Example
784 /// ```rust
785 /// # #[cfg(feature = "deploy")] {
786 /// # use hydro_lang::prelude::*;
787 /// # use futures::StreamExt;
788 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
789 /// let tick = process.tick();
790 /// // ticks are lazy by default, forces the second tick to run
791 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
792 ///
793 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
794 /// let batch_first_tick = process
795 /// .source_iter(q!(vec![1]))
796 /// .batch(&tick, nondet!(/** test */));
797 /// let batch_second_tick = process
798 /// .source_iter(q!(vec![1, 2, 3]))
799 /// .batch(&tick, nondet!(/** test */))
800 /// .defer_tick();
801 /// batch_first_tick.chain(batch_second_tick).count()
802 /// .filter_if(signal)
803 /// .all_ticks()
804 /// # }, |mut stream| async move {
805 /// // [1]
806 /// # for w in vec![1] {
807 /// # assert_eq!(stream.next().await.unwrap(), w);
808 /// # }
809 /// # }));
810 /// # }
811 /// ```
812 pub fn filter_if(
813 self,
814 signal: Singleton<bool, L, B>,
815 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
816 where
817 B: IsBounded,
818 {
819 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
820 }
821
822 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
823 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
824 ///
825 /// Useful for conditionally processing, such as only emitting a singleton's value outside
826 /// a tick if some other condition is satisfied.
827 ///
828 /// # Example
829 /// ```rust
830 /// # #[cfg(feature = "deploy")] {
831 /// # use hydro_lang::prelude::*;
832 /// # use futures::StreamExt;
833 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
834 /// let tick = process.tick();
835 /// // ticks are lazy by default, forces the second tick to run
836 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
837 ///
838 /// let batch_first_tick = process
839 /// .source_iter(q!(vec![1]))
840 /// .batch(&tick, nondet!(/** test */));
841 /// let batch_second_tick = process
842 /// .source_iter(q!(vec![1, 2, 3]))
843 /// .batch(&tick, nondet!(/** test */))
844 /// .defer_tick(); // appears on the second tick
845 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
846 /// batch_first_tick.chain(batch_second_tick).count()
847 /// .filter_if_some(some_on_first_tick)
848 /// .all_ticks()
849 /// # }, |mut stream| async move {
850 /// // [1]
851 /// # for w in vec![1] {
852 /// # assert_eq!(stream.next().await.unwrap(), w);
853 /// # }
854 /// # }));
855 /// # }
856 /// ```
857 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
858 pub fn filter_if_some<U>(
859 self,
860 signal: Optional<U, L, B>,
861 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
862 where
863 B: IsBounded,
864 {
865 self.filter_if(signal.is_some())
866 }
867
868 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
869 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
870 ///
871 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
872 /// the condition.
873 ///
874 /// # Example
875 /// ```rust
876 /// # #[cfg(feature = "deploy")] {
877 /// # use hydro_lang::prelude::*;
878 /// # use futures::StreamExt;
879 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880 /// let tick = process.tick();
881 /// // ticks are lazy by default, forces the second tick to run
882 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
883 ///
884 /// let batch_first_tick = process
885 /// .source_iter(q!(vec![1]))
886 /// .batch(&tick, nondet!(/** test */));
887 /// let batch_second_tick = process
888 /// .source_iter(q!(vec![1, 2, 3]))
889 /// .batch(&tick, nondet!(/** test */))
890 /// .defer_tick(); // appears on the second tick
891 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
892 /// batch_first_tick.chain(batch_second_tick).count()
893 /// .filter_if_none(some_on_first_tick)
894 /// .all_ticks()
895 /// # }, |mut stream| async move {
896 /// // [3]
897 /// # for w in vec![3] {
898 /// # assert_eq!(stream.next().await.unwrap(), w);
899 /// # }
900 /// # }));
901 /// # }
902 /// ```
903 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
904 pub fn filter_if_none<U>(
905 self,
906 other: Optional<U, L, B>,
907 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
908 where
909 B: IsBounded,
910 {
911 self.filter_if(other.is_none())
912 }
913
914 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
915 ///
916 /// # Example
917 /// ```rust
918 /// # #[cfg(feature = "deploy")] {
919 /// # use hydro_lang::prelude::*;
920 /// # use futures::StreamExt;
921 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922 /// let tick = process.tick();
923 /// let a = tick.singleton(q!(5));
924 /// let b = tick.singleton(q!(5));
925 /// a.equals(b).all_ticks()
926 /// # }, |mut stream| async move {
927 /// // [true]
928 /// # assert_eq!(stream.next().await.unwrap(), true);
929 /// # }));
930 /// # }
931 /// ```
932 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
933 where
934 T: PartialEq,
935 B: IsBounded,
936 {
937 self.zip(other).map(q!(|(a, b)| a == b))
938 }
939
940 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
941 /// greater than or equal to the provided threshold. The event will have the value of the
942 /// given threshold.
943 ///
944 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
945 /// the threshold would be non-deterministic.
946 ///
947 /// # Example
948 /// ```rust
949 /// # #[cfg(feature = "deploy")] {
950 /// # use hydro_lang::prelude::*;
951 /// # use futures::StreamExt;
952 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953 /// let a = // singleton 1 ~> 5 ~> 10
954 /// # process.singleton(q!(5));
955 /// let b = process.singleton(q!(4));
956 /// a.threshold_greater_or_equal(b)
957 /// # }, |mut stream| async move {
958 /// // [4]
959 /// # assert_eq!(stream.next().await.unwrap(), 4);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn threshold_greater_or_equal<B2: IsBounded>(
964 self,
965 threshold: Singleton<T, L, B2>,
966 ) -> Stream<T, L, B::UnderlyingBound>
967 where
968 T: Clone + PartialOrd,
969 B: IsMonotonic,
970 {
971 let threshold = threshold.make_bounded();
972 let self_location = self.location().clone();
973 match self.try_make_bounded() {
974 Ok(bounded) => {
975 let uncasted = threshold
976 .zip(bounded)
977 .into_stream()
978 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
979
980 Stream::new(
981 uncasted.location.clone(),
982 uncasted.ir_node.replace(HydroNode::Placeholder),
983 )
984 }
985 Err(me) => {
986 let uncasted = sliced! {
987 let me = use(me, nondet!(/** thresholds are deterministic */));
988 let mut remaining_threshold = use::state(|l| {
989 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
990 as_option
991 });
992
993 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
994 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
995 passed.map(q!(|(t, _)| t))
996 };
997
998 Stream::new(
999 self_location,
1000 uncasted.ir_node.replace(HydroNode::Placeholder),
1001 )
1002 }
1003 }
1004 }
1005
1006 /// An operator which allows you to "name" a `HydroNode`.
1007 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1008 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1009 {
1010 let mut node = self.ir_node.borrow_mut();
1011 let metadata = node.metadata_mut();
1012 metadata.tag = Some(name.to_owned());
1013 }
1014 self
1015 }
1016}
1017
1018impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1019 type Output = Singleton<bool, L, B::UnderlyingBound>;
1020
1021 fn not(self) -> Self::Output {
1022 self.map(q!(|b| !b))
1023 }
1024}
1025
1026impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1027where
1028 L: Location<'a>,
1029{
1030 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1031 /// the inner `Option`.
1032 ///
1033 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1034 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1035 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1036 ///
1037 /// # Example
1038 /// ```rust
1039 /// # #[cfg(feature = "deploy")] {
1040 /// # use hydro_lang::prelude::*;
1041 /// # use futures::StreamExt;
1042 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1043 /// let tick = process.tick();
1044 /// let singleton = tick.singleton(q!(Some(42)));
1045 /// singleton.into_optional().all_ticks()
1046 /// # }, |mut stream| async move {
1047 /// // 42
1048 /// # assert_eq!(stream.next().await.unwrap(), 42);
1049 /// # }));
1050 /// # }
1051 /// ```
1052 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1053 self.filter_map(q!(|v| v))
1054 }
1055}
1056
1057impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1058where
1059 L: Location<'a>,
1060{
1061 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1062 ///
1063 /// # Example
1064 /// ```rust
1065 /// # #[cfg(feature = "deploy")] {
1066 /// # use hydro_lang::prelude::*;
1067 /// # use futures::StreamExt;
1068 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1069 /// let tick = process.tick();
1070 /// // ticks are lazy by default, forces the second tick to run
1071 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1072 ///
1073 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1074 /// let b = tick.singleton(q!(true)); // true, true
1075 /// a.and(b).all_ticks()
1076 /// # }, |mut stream| async move {
1077 /// // [true, false]
1078 /// # for w in vec![true, false] {
1079 /// # assert_eq!(stream.next().await.unwrap(), w);
1080 /// # }
1081 /// # }));
1082 /// # }
1083 /// ```
1084 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1085 where
1086 B: IsBounded,
1087 {
1088 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1089 }
1090
1091 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1092 ///
1093 /// # Example
1094 /// ```rust
1095 /// # #[cfg(feature = "deploy")] {
1096 /// # use hydro_lang::prelude::*;
1097 /// # use futures::StreamExt;
1098 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1099 /// let tick = process.tick();
1100 /// // ticks are lazy by default, forces the second tick to run
1101 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1102 ///
1103 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1104 /// let b = tick.singleton(q!(false)); // false, false
1105 /// a.or(b).all_ticks()
1106 /// # }, |mut stream| async move {
1107 /// // [true, false]
1108 /// # for w in vec![true, false] {
1109 /// # assert_eq!(stream.next().await.unwrap(), w);
1110 /// # }
1111 /// # }));
1112 /// # }
1113 /// ```
1114 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1115 where
1116 B: IsBounded,
1117 {
1118 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1119 }
1120}
1121
1122impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1123where
1124 L: Location<'a>,
1125{
1126 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1127 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1128 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1129 /// all snapshots of this singleton into the atomic-associated tick will observe the
1130 /// same value each tick.
1131 ///
1132 /// # Non-Determinism
1133 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1134 /// the output singleton has a non-deterministic value since the snapshot can be at an
1135 /// arbitrary point in time.
1136 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1137 self,
1138 tick: &Tick<L2>,
1139 _nondet: NonDet,
1140 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1141 Singleton::new(
1142 tick.drop_consistency(),
1143 HydroNode::Batch {
1144 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1145 metadata: tick
1146 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1147 },
1148 )
1149 }
1150
1151 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1152 /// to the value will be asynchronously propagated.
1153 pub fn end_atomic(self) -> Singleton<T, L, B> {
1154 Singleton::new(
1155 self.location.tick.l.clone(),
1156 HydroNode::EndAtomic {
1157 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1158 metadata: self
1159 .location
1160 .tick
1161 .l
1162 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1163 },
1164 )
1165 }
1166}
1167
1168impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1169where
1170 L: Location<'a>,
1171{
1172 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1173 /// will observe the same version of the value and will be executed synchronously before any
1174 /// outputs are yielded (in [`Optional::end_atomic`]).
1175 ///
1176 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1177 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1178 /// a different version).
1179 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1180 let id = self.location.flow_state().borrow_mut().next_clock_id();
1181 let out_location = Atomic {
1182 tick: Tick {
1183 id,
1184 l: self.location.clone(),
1185 },
1186 };
1187 Singleton::new(
1188 out_location.clone(),
1189 HydroNode::BeginAtomic {
1190 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1191 metadata: out_location
1192 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1193 },
1194 )
1195 }
1196
1197 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1198 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1199 /// relevant data that contributed to the snapshot at tick `t`.
1200 ///
1201 /// # Non-Determinism
1202 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1203 /// the output singleton has a non-deterministic value since the snapshot can be at an
1204 /// arbitrary point in time.
1205 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1206 self,
1207 tick: &Tick<L2>,
1208 _nondet: NonDet,
1209 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1210 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1211 Singleton::new(
1212 tick.drop_consistency(),
1213 HydroNode::Batch {
1214 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1215 metadata: tick
1216 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1217 },
1218 )
1219 }
1220
1221 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1222 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1223 ///
1224 /// # Non-Determinism
1225 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1226 /// to non-deterministic batching and arrival of inputs, the output stream is
1227 /// non-deterministic.
1228 pub fn sample_eager(
1229 self,
1230 nondet: NonDet,
1231 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1232 sliced! {
1233 let snapshot = use(self, nondet);
1234 snapshot.into_stream()
1235 }
1236 .weaken_retries()
1237 }
1238
1239 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1240 /// value taken at various points in time. Because the input singleton may be
1241 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1242 /// represent the value of the singleton given some prefix of the streams leading up to
1243 /// it.
1244 ///
1245 /// # Non-Determinism
1246 /// The output stream is non-deterministic in which elements are sampled, since this
1247 /// is controlled by a clock.
1248 pub fn sample_every(
1249 self,
1250 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1251 nondet: NonDet,
1252 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1253 where
1254 L: TopLevel<'a>,
1255 {
1256 let samples = self.location.source_interval(interval);
1257 sliced! {
1258 let snapshot = use(self, nondet);
1259 let sample_batch = use(samples, nondet);
1260
1261 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1262 }
1263 .weaken_retries()
1264 }
1265
1266 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1267 /// implies that `B == Bounded`.
1268 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1269 where
1270 B: IsBounded,
1271 {
1272 Singleton::new(
1273 self.location.clone(),
1274 self.ir_node.replace(HydroNode::Placeholder),
1275 )
1276 }
1277
1278 #[expect(clippy::result_large_err, reason = "internal use only")]
1279 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1280 if B::UnderlyingBound::BOUNDED {
1281 Ok(Singleton::new(
1282 self.location.clone(),
1283 self.ir_node.replace(HydroNode::Placeholder),
1284 ))
1285 } else {
1286 Err(self)
1287 }
1288 }
1289
1290 /// Clones this bounded singleton into a tick, returning a singleton that has the
1291 /// same value as the outer singleton. Because the outer singleton is bounded, this
1292 /// is deterministic because there is only a single immutable version.
1293 pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1294 self,
1295 tick: &Tick<L2>,
1296 ) -> Singleton<T, Tick<L2>, Bounded>
1297 where
1298 B: IsBounded,
1299 T: Clone,
1300 {
1301 // TODO(shadaj): avoid printing simulator logs for this snapshot
1302 let inner = self.snapshot(
1303 tick,
1304 nondet!(/** bounded top-level singleton so deterministic */),
1305 );
1306 Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1307 }
1308
1309 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1310 ///
1311 /// # Example
1312 /// ```rust
1313 /// # #[cfg(feature = "deploy")] {
1314 /// # use hydro_lang::prelude::*;
1315 /// # use futures::StreamExt;
1316 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1317 /// let tick = process.tick();
1318 /// let batch_input = process
1319 /// .source_iter(q!(vec![123, 456]))
1320 /// .batch(&tick, nondet!(/** test */));
1321 /// batch_input.clone().chain(
1322 /// batch_input.count().into_stream()
1323 /// ).all_ticks()
1324 /// # }, |mut stream| async move {
1325 /// // [123, 456, 2]
1326 /// # for w in vec![123, 456, 2] {
1327 /// # assert_eq!(stream.next().await.unwrap(), w);
1328 /// # }
1329 /// # }));
1330 /// # }
1331 /// ```
1332 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1333 where
1334 B: IsBounded,
1335 {
1336 Stream::new(
1337 self.location.clone(),
1338 HydroNode::Cast {
1339 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1340 metadata: self.location.new_node_metadata(Stream::<
1341 T,
1342 Tick<L>,
1343 Bounded,
1344 TotalOrder,
1345 ExactlyOnce,
1346 >::collection_kind()),
1347 },
1348 )
1349 }
1350
1351 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1352 /// producing a singleton of the resolved output.
1353 ///
1354 /// This is useful when the singleton contains an async computation that must
1355 /// be awaited before further processing. The future is polled to completion
1356 /// before the output value is emitted.
1357 ///
1358 /// # Example
1359 /// ```rust
1360 /// # #[cfg(feature = "deploy")] {
1361 /// # use hydro_lang::prelude::*;
1362 /// # use futures::StreamExt;
1363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1364 /// let tick = process.tick();
1365 /// let singleton = tick.singleton(q!(5));
1366 /// singleton
1367 /// .map(q!(|v| async move { v * 2 }))
1368 /// .resolve_future_blocking()
1369 /// .all_ticks()
1370 /// # }, |mut stream| async move {
1371 /// // 10
1372 /// # assert_eq!(stream.next().await.unwrap(), 10);
1373 /// # }));
1374 /// # }
1375 /// ```
1376 pub fn resolve_future_blocking(
1377 self,
1378 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1379 where
1380 T: Future,
1381 B: IsBounded,
1382 {
1383 Singleton::new(
1384 self.location.clone(),
1385 HydroNode::ResolveFuturesBlocking {
1386 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1387 metadata: self
1388 .location
1389 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1390 },
1391 )
1392 }
1393}
1394
1395impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1396where
1397 L: Location<'a>,
1398{
1399 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1400 /// which will stream the value computed in _each_ tick as a separate stream element.
1401 ///
1402 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1403 /// producing one element in the output for each tick. This is useful for batched computations,
1404 /// where the results from each tick must be combined together.
1405 ///
1406 /// # Example
1407 /// ```rust
1408 /// # #[cfg(feature = "deploy")] {
1409 /// # use hydro_lang::prelude::*;
1410 /// # use futures::StreamExt;
1411 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1412 /// let tick = process.tick();
1413 /// # // ticks are lazy by default, forces the second tick to run
1414 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1415 /// # let batch_first_tick = process
1416 /// # .source_iter(q!(vec![1]))
1417 /// # .batch(&tick, nondet!(/** test */));
1418 /// # let batch_second_tick = process
1419 /// # .source_iter(q!(vec![1, 2, 3]))
1420 /// # .batch(&tick, nondet!(/** test */))
1421 /// # .defer_tick(); // appears on the second tick
1422 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1423 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1424 /// .count()
1425 /// .all_ticks()
1426 /// # }, |mut stream| async move {
1427 /// // [1, 3]
1428 /// # for w in vec![1, 3] {
1429 /// # assert_eq!(stream.next().await.unwrap(), w);
1430 /// # }
1431 /// # }));
1432 /// # }
1433 /// ```
1434 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1435 self.into_stream().all_ticks()
1436 }
1437
1438 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1439 /// which will stream the value computed in _each_ tick as a separate stream element.
1440 ///
1441 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1442 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1443 /// singleton's [`Tick`] context.
1444 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1445 self.into_stream().all_ticks_atomic()
1446 }
1447
1448 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1449 /// be asynchronously updated with the latest value of the singleton inside the tick.
1450 ///
1451 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1452 /// tick that tracks the inner value. This is useful for getting the value as of the
1453 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1454 ///
1455 /// # Example
1456 /// ```rust
1457 /// # #[cfg(feature = "deploy")] {
1458 /// # use hydro_lang::prelude::*;
1459 /// # use futures::StreamExt;
1460 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461 /// let tick = process.tick();
1462 /// # // ticks are lazy by default, forces the second tick to run
1463 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1464 /// # let batch_first_tick = process
1465 /// # .source_iter(q!(vec![1]))
1466 /// # .batch(&tick, nondet!(/** test */));
1467 /// # let batch_second_tick = process
1468 /// # .source_iter(q!(vec![1, 2, 3]))
1469 /// # .batch(&tick, nondet!(/** test */))
1470 /// # .defer_tick(); // appears on the second tick
1471 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1472 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1473 /// .count()
1474 /// .latest()
1475 /// # .sample_eager(nondet!(/** test */))
1476 /// # }, |mut stream| async move {
1477 /// // asynchronously changes from 1 ~> 3
1478 /// # for w in vec![1, 3] {
1479 /// # assert_eq!(stream.next().await.unwrap(), w);
1480 /// # }
1481 /// # }));
1482 /// # }
1483 /// ```
1484 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1485 Singleton::new(
1486 self.location.outer().clone(),
1487 HydroNode::YieldConcat {
1488 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1489 metadata: self
1490 .location
1491 .outer()
1492 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1493 },
1494 )
1495 }
1496
1497 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1498 /// be updated with the latest value of the singleton inside the tick.
1499 ///
1500 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1501 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1502 /// singleton's [`Tick`] context.
1503 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1504 let out_location = Atomic {
1505 tick: self.location.clone(),
1506 };
1507 Singleton::new(
1508 out_location.clone(),
1509 HydroNode::YieldConcat {
1510 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1511 metadata: out_location
1512 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1513 },
1514 )
1515 }
1516}
1517
1518#[doc(hidden)]
1519/// Helper trait that determines the output collection type for [`Singleton::zip`].
1520///
1521/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1522/// [`Singleton`].
1523#[sealed::sealed]
1524pub trait ZipResult<'a, Other> {
1525 /// The output collection type.
1526 type Out;
1527 /// The type of the tupled output value.
1528 type ElementType;
1529 /// The type of the other collection's value.
1530 type OtherType;
1531 /// The location where the tupled result will be materialized.
1532 type Location: Location<'a>;
1533
1534 /// The location of the second input to the `zip`.
1535 fn other_location(other: &Other) -> Self::Location;
1536 /// The IR node of the second input to the `zip`.
1537 fn other_ir_node(other: Other) -> HydroNode;
1538
1539 /// Constructs the output live collection given an IR node containing the zip result.
1540 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1541}
1542
1543#[sealed::sealed]
1544impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1545where
1546 L: Location<'a>,
1547{
1548 type Out = Singleton<(T, U), L, B>;
1549 type ElementType = (T, U);
1550 type OtherType = U;
1551 type Location = L;
1552
1553 fn other_location(other: &Singleton<U, L, B>) -> L {
1554 other.location.clone()
1555 }
1556
1557 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1558 other.ir_node.replace(HydroNode::Placeholder)
1559 }
1560
1561 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1562 Singleton::new(
1563 location.clone(),
1564 HydroNode::Cast {
1565 inner: Box::new(ir_node),
1566 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1567 },
1568 )
1569 }
1570}
1571
1572#[sealed::sealed]
1573impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1574 for Singleton<T, L, B>
1575where
1576 L: Location<'a>,
1577{
1578 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1579 type ElementType = (T, U);
1580 type OtherType = U;
1581 type Location = L;
1582
1583 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1584 other.location.clone()
1585 }
1586
1587 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1588 other.ir_node.replace(HydroNode::Placeholder)
1589 }
1590
1591 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1592 Optional::new(location, ir_node)
1593 }
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598 #[cfg(feature = "deploy")]
1599 use futures::{SinkExt, StreamExt};
1600 #[cfg(feature = "deploy")]
1601 use hydro_deploy::Deployment;
1602 #[cfg(any(feature = "deploy", feature = "sim"))]
1603 use stageleft::q;
1604
1605 #[cfg(any(feature = "deploy", feature = "sim"))]
1606 use crate::compile::builder::FlowBuilder;
1607 #[cfg(feature = "deploy")]
1608 use crate::live_collections::stream::ExactlyOnce;
1609 #[cfg(any(feature = "deploy", feature = "sim"))]
1610 use crate::location::Location;
1611 #[cfg(any(feature = "deploy", feature = "sim"))]
1612 use crate::nondet::nondet;
1613
1614 #[cfg(feature = "deploy")]
1615 #[tokio::test]
1616 async fn tick_cycle_cardinality() {
1617 let mut deployment = Deployment::new();
1618
1619 let mut flow = FlowBuilder::new();
1620 let node = flow.process::<()>();
1621 let external = flow.external::<()>();
1622
1623 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1624
1625 let node_tick = node.tick();
1626 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1627 let counts = singleton
1628 .clone()
1629 .into_stream()
1630 .count()
1631 .filter_if(
1632 input
1633 .batch(&node_tick, nondet!(/** testing */))
1634 .first()
1635 .is_some(),
1636 )
1637 .all_ticks()
1638 .send_bincode_external(&external);
1639 complete_cycle.complete_next_tick(singleton);
1640
1641 let nodes = flow
1642 .with_process(&node, deployment.Localhost())
1643 .with_external(&external, deployment.Localhost())
1644 .deploy(&mut deployment);
1645
1646 deployment.deploy().await.unwrap();
1647
1648 let mut tick_trigger = nodes.connect(input_send).await;
1649 let mut external_out = nodes.connect(counts).await;
1650
1651 deployment.start().await.unwrap();
1652
1653 tick_trigger.send(()).await.unwrap();
1654
1655 assert_eq!(external_out.next().await.unwrap(), 1);
1656
1657 tick_trigger.send(()).await.unwrap();
1658
1659 assert_eq!(external_out.next().await.unwrap(), 1);
1660 }
1661
1662 #[cfg(feature = "sim")]
1663 #[test]
1664 #[should_panic]
1665 fn sim_fold_intermediate_states() {
1666 let mut flow = FlowBuilder::new();
1667 let node = flow.process::<()>();
1668
1669 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1670 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1671
1672 let tick = node.tick();
1673 let batch = folded.snapshot(&tick, nondet!(/** test */));
1674 let out_recv = batch.all_ticks().sim_output();
1675
1676 flow.sim().exhaustive(async || {
1677 assert_eq!(out_recv.next().await.unwrap(), 10);
1678 });
1679 }
1680
1681 #[cfg(feature = "sim")]
1682 #[test]
1683 fn sim_fold_intermediate_state_count() {
1684 let mut flow = FlowBuilder::new();
1685 let node = flow.process::<()>();
1686
1687 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1688 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1689
1690 let tick = node.tick();
1691 let batch = folded.snapshot(&tick, nondet!(/** test */));
1692 let out_recv = batch.all_ticks().sim_output();
1693
1694 let instance_count = flow.sim().exhaustive(async || {
1695 let out = out_recv.collect::<Vec<_>>().await;
1696 assert_eq!(out.last(), Some(&10));
1697 });
1698
1699 assert_eq!(
1700 instance_count,
1701 16 // 2^4 possible subsets of intermediates (including initial state)
1702 )
1703 }
1704
1705 #[cfg(feature = "sim")]
1706 #[test]
1707 fn sim_fold_no_repeat_initial() {
1708 // check that we don't repeat the initial state of the fold in autonomous decisions
1709
1710 let mut flow = FlowBuilder::new();
1711 let node = flow.process::<()>();
1712
1713 let (in_port, input) = node.sim_input();
1714 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1715
1716 let tick = node.tick();
1717 let batch = folded.snapshot(&tick, nondet!(/** test */));
1718 let out_recv = batch.all_ticks().sim_output();
1719
1720 flow.sim().exhaustive(async || {
1721 assert_eq!(out_recv.next().await.unwrap(), 0);
1722
1723 in_port.send(123);
1724
1725 assert_eq!(out_recv.next().await.unwrap(), 123);
1726 });
1727 }
1728
1729 #[cfg(feature = "sim")]
1730 #[test]
1731 #[should_panic]
1732 fn sim_fold_repeats_snapshots() {
1733 // when the tick is driven by a snapshot AND something else, the snapshot can
1734 // "stutter" and repeat the same state multiple times
1735
1736 let mut flow = FlowBuilder::new();
1737 let node = flow.process::<()>();
1738
1739 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1740 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1741
1742 let tick = node.tick();
1743 let batch = source
1744 .batch(&tick, nondet!(/** test */))
1745 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1746 let out_recv = batch.all_ticks().sim_output();
1747
1748 flow.sim().exhaustive(async || {
1749 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1750 {
1751 panic!("repeated snapshot");
1752 }
1753 });
1754 }
1755
1756 #[cfg(feature = "sim")]
1757 #[test]
1758 fn sim_fold_repeats_snapshots_count() {
1759 // check the number of instances
1760 let mut flow = FlowBuilder::new();
1761 let node = flow.process::<()>();
1762
1763 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1764 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1765
1766 let tick = node.tick();
1767 let batch = source
1768 .batch(&tick, nondet!(/** test */))
1769 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1770 let out_recv = batch.all_ticks().sim_output();
1771
1772 let count = flow.sim().exhaustive(async || {
1773 let _ = out_recv.collect::<Vec<_>>().await;
1774 });
1775
1776 assert_eq!(count, 52);
1777 // don't have a combinatorial explanation for this number yet, but checked via logs
1778 }
1779
1780 #[cfg(feature = "sim")]
1781 #[test]
1782 fn sim_top_level_singleton_exhaustive() {
1783 // ensures that top-level singletons have only one snapshot
1784 let mut flow = FlowBuilder::new();
1785 let node = flow.process::<()>();
1786
1787 let singleton = node.singleton(q!(1));
1788 let tick = node.tick();
1789 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1790 let out_recv = batch.all_ticks().sim_output();
1791
1792 let count = flow.sim().exhaustive(async || {
1793 let _ = out_recv.collect::<Vec<_>>().await;
1794 });
1795
1796 assert_eq!(count, 1);
1797 }
1798
1799 #[cfg(feature = "sim")]
1800 #[test]
1801 fn sim_top_level_singleton_join_count() {
1802 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1803 // exploration
1804
1805 let mut flow = FlowBuilder::new();
1806 let node = flow.process::<()>();
1807
1808 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1809 let tick = node.tick();
1810 let batch = source_iter
1811 .batch(&tick, nondet!(/** test */))
1812 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1813 let out_recv = batch.all_ticks().sim_output();
1814
1815 let instance_count = flow.sim().exhaustive(async || {
1816 let _ = out_recv.collect::<Vec<_>>().await;
1817 });
1818
1819 assert_eq!(
1820 instance_count,
1821 16 // 2^4 ways to split up (including a possibly empty first batch)
1822 )
1823 }
1824
1825 #[cfg(feature = "sim")]
1826 #[test]
1827 fn top_level_singleton_into_stream_no_replay() {
1828 let mut flow = FlowBuilder::new();
1829 let node = flow.process::<()>();
1830
1831 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1832 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1833
1834 let out_recv = folded.into_stream().sim_output();
1835
1836 flow.sim().exhaustive(async || {
1837 out_recv.assert_yields_only([10]).await;
1838 });
1839 }
1840
1841 #[cfg(feature = "sim")]
1842 #[test]
1843 fn inside_tick_singleton_zip() {
1844 use crate::live_collections::Stream;
1845 use crate::live_collections::sliced::sliced;
1846
1847 let mut flow = FlowBuilder::new();
1848 let node = flow.process::<()>();
1849
1850 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1851 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1852
1853 let out_recv = sliced! {
1854 let v = use(folded, nondet!(/** test */));
1855 v.clone().zip(v).into_stream()
1856 }
1857 .sim_output();
1858
1859 let count = flow.sim().exhaustive(async || {
1860 let out = out_recv.collect::<Vec<_>>().await;
1861 assert_eq!(out.last(), Some(&(3, 3)));
1862 });
1863
1864 assert_eq!(count, 4);
1865 }
1866
1867 /// Reproducer for simulator hang when using cross_singleton on a top-level
1868 /// unbounded stream (not inside sliced!). The exhaustive simulator hangs
1869 /// after the first iteration.
1870 #[cfg(feature = "sim")]
1871 #[test]
1872 fn sim_cross_singleton_top_level_unbounded_hang() {
1873 let mut flow = FlowBuilder::new();
1874 let node = flow.process::<()>();
1875
1876 let (cmd_port, input) = node.sim_input::<String, _, _>();
1877
1878 let top_level_singleton = node.singleton(q!(123));
1879
1880 // cross_singleton on a top-level stream - bug trigger
1881 let crossed = input.cross_singleton(top_level_singleton);
1882
1883 // Output directly
1884 let resp_port = crossed.sim_output();
1885
1886 let count = flow.sim().exhaustive(async || {
1887 cmd_port.send("abc".to_owned());
1888
1889 let responses: Vec<_> = resp_port.collect().await;
1890 assert!(!responses.is_empty());
1891 });
1892
1893 assert_eq!(count, 1);
1894 }
1895
1896 #[cfg(feature = "sim")]
1897 #[test]
1898 fn sim_top_level_singleton_state_count() {
1899 let mut flow = FlowBuilder::new();
1900 let process = flow.process::<()>();
1901
1902 let (cmd_port, input) = process.sim_input();
1903 {
1904 // increases exhaustive inputs from 1 to 2 before we optimized `From`
1905 use super::Singleton;
1906 use crate::live_collections::boundedness::Unbounded;
1907 let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
1908 }
1909 let tick = process.tick();
1910 let batched_unbatched = input.batch(&tick, nondet!(/** */)).all_ticks();
1911 let resp_port = batched_unbatched.sim_output();
1912
1913 let count = flow.sim().exhaustive(async || {
1914 cmd_port.send(());
1915 let _responses: Vec<_> = resp_port.collect().await;
1916 });
1917
1918 assert_eq!(count, 1);
1919 }
1920}