Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
46    type WithBoundedValue: KeyedSingletonBound<
47            UnderlyingBound = Self::UnderlyingBound,
48            ValueBound = Bounded,
49            EraseMonotonic = Self::WithBoundedValue,
50        >;
51
52    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
53    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
54
55    /// The type of the keyed singleton if the value for each key is no longer monotonic.
56    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
57
58    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
59    fn bound_kind() -> KeyedSingletonBoundKind;
60}
61
62impl KeyedSingletonBound for Unbounded {
63    type UnderlyingBound = Unbounded;
64    type ValueBound = Unbounded;
65    type WithBoundedValue = BoundedValue;
66    type KeyedStreamToMonotone = MonotonicValue;
67    type EraseMonotonic = Unbounded;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Unbounded
71    }
72}
73
74impl KeyedSingletonBound for Bounded {
75    type UnderlyingBound = Bounded;
76    type ValueBound = Bounded;
77    type WithBoundedValue = Bounded;
78    type KeyedStreamToMonotone = Bounded;
79    type EraseMonotonic = Bounded;
80
81    fn bound_kind() -> KeyedSingletonBoundKind {
82        KeyedSingletonBoundKind::Bounded
83    }
84}
85
86/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
87/// its value is bounded and will never change, but new entries may appear asynchronously
88pub struct BoundedValue;
89
90impl KeyedSingletonBound for BoundedValue {
91    type UnderlyingBound = Unbounded;
92    type ValueBound = Bounded;
93    type WithBoundedValue = BoundedValue;
94    type KeyedStreamToMonotone = BoundedValue;
95    type EraseMonotonic = BoundedValue;
96
97    fn bound_kind() -> KeyedSingletonBoundKind {
98        KeyedSingletonBoundKind::BoundedValue
99    }
100}
101
102/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
103/// it will never be removed, and the corresponding value will only increase monotonically.
104pub struct MonotonicValue;
105
106impl KeyedSingletonBound for MonotonicValue {
107    type UnderlyingBound = Unbounded;
108    type ValueBound = Unbounded;
109    type WithBoundedValue = BoundedValue;
110    type KeyedStreamToMonotone = MonotonicValue;
111    type EraseMonotonic = Unbounded;
112
113    fn bound_kind() -> KeyedSingletonBoundKind {
114        KeyedSingletonBoundKind::MonotonicValue
115    }
116}
117
118/// Mapping from keys of type `K` to values of type `V`.
119///
120/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
121/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
122/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
123/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
124/// keys cannot be removed and the value for each key is immutable.
125///
126/// Type Parameters:
127/// - `K`: the type of the key for each entry
128/// - `V`: the type of the value for each entry
129/// - `Loc`: the [`Location`] where the keyed singleton is materialized
130/// - `Bound`: tracks whether the entries are:
131///     - [`Bounded`] (local and finite)
132///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
133///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
134pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
135    pub(crate) location: Loc,
136    pub(crate) ir_node: RefCell<HydroNode>,
137    pub(crate) flow_state: FlowState,
138
139    _phantom: PhantomData<(K, V, Loc, Bound)>,
140}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143    fn drop(&mut self) {
144        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147                input: Box::new(ir_node),
148                op_metadata: HydroIrOpMetadata::new(),
149            });
150        }
151    }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155    for KeyedSingleton<K, V, Loc, Bound>
156{
157    fn clone(&self) -> Self {
158        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160            *self.ir_node.borrow_mut() = HydroNode::Tee {
161                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162                metadata: self.location.new_node_metadata(Self::collection_kind()),
163            };
164        }
165
166        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167            KeyedSingleton {
168                location: self.location.clone(),
169                flow_state: self.flow_state.clone(),
170                ir_node: HydroNode::Tee {
171                    inner: SharedNode(inner.0.clone()),
172                    metadata: metadata.clone(),
173                }
174                .into(),
175                _phantom: PhantomData,
176            }
177        } else {
178            unreachable!()
179        }
180    }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184    for KeyedSingleton<K, V, L, B>
185where
186    L: Location<'a>,
187{
188    type Location = L;
189
190    fn create_source(cycle_id: CycleId, location: L) -> Self {
191        KeyedSingleton {
192            flow_state: location.flow_state().clone(),
193            location: location.clone(),
194            ir_node: RefCell::new(HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            }),
198            _phantom: PhantomData,
199        }
200    }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205    L: Location<'a>,
206{
207    type Location = Tick<L>;
208
209    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210        KeyedSingleton::new(
211            location.clone(),
212            HydroNode::CycleSource {
213                cycle_id,
214                metadata: location.new_node_metadata(Self::collection_kind()),
215            },
216        )
217    }
218}
219
220impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        KeyedSingleton::defer_tick(self)
226    }
227}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230    for KeyedSingleton<K, V, L, B>
231where
232    L: Location<'a>,
233{
234    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235        assert_eq!(
236            Location::id(&self.location),
237            expected_location,
238            "locations do not match"
239        );
240        self.location
241            .flow_state()
242            .borrow_mut()
243            .push_root(HydroRoot::CycleSink {
244                cycle_id,
245                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246                op_metadata: HydroIrOpMetadata::new(),
247            });
248    }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253    L: Location<'a>,
254{
255    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256        assert_eq!(
257            Location::id(&self.location),
258            expected_location,
259            "locations do not match"
260        );
261        self.location
262            .flow_state()
263            .borrow_mut()
264            .push_root(HydroRoot::CycleSink {
265                cycle_id,
266                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267                op_metadata: HydroIrOpMetadata::new(),
268            });
269    }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277        let flow_state = location.flow_state().clone();
278        KeyedSingleton {
279            location,
280            flow_state,
281            ir_node: RefCell::new(ir_node),
282            _phantom: PhantomData,
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed singleton is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Weakens the consistency of this live collection to not guarantee any consistency across
292    /// cluster members (if this collection is on a cluster).
293    pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
294    where
295        L: Location<'a>,
296    {
297        if L::consistency()
298            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299        {
300            // already no consistency
301            KeyedSingleton::new(
302                self.location.drop_consistency(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else {
306            KeyedSingleton::new(
307                self.location.drop_consistency(),
308                HydroNode::Cast {
309                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310                    metadata: self
311                        .location
312                        .drop_consistency()
313                        .new_node_metadata(
314                            KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
315                        ),
316                },
317            )
318        }
319    }
320
321    /// Casts this live collection to have the consistency guarantees specified in the given
322    /// location type parameter. The developer must ensure that the strengthened consistency
323    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
325        self,
326        _proof: impl crate::properties::ConsistencyProof,
327    ) -> KeyedSingleton<K, V, L2, B>
328    where
329        L: Location<'a>,
330    {
331        if L::consistency() == L2::consistency() {
332            // already consistent
333            KeyedSingleton::new(
334                self.location.with_consistency_of(),
335                self.ir_node.replace(HydroNode::Placeholder),
336            )
337        } else {
338            KeyedSingleton::new(
339                self.location.with_consistency_of(),
340                HydroNode::AssertIsConsistent {
341                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
342                    trusted: false,
343                    metadata: self
344                        .location
345                        .clone()
346                        .with_consistency_of::<L2>()
347                        .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
348                },
349            )
350        }
351    }
352}
353
354#[cfg(stageleft_runtime)]
355fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
356    me: KeyedSingleton<K, V, L, Bounded>,
357) -> Singleton<usize, L, Bounded> {
358    me.entries().count()
359}
360
361#[cfg(stageleft_runtime)]
362fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
363    me: KeyedSingleton<K, V, L, Bounded>,
364) -> Singleton<HashMap<K, V>, L, Bounded>
365where
366    K: Eq + Hash,
367{
368    me.entries()
369        .assume_ordering_trusted(nondet!(
370            /// There is only one element associated with each key. The closure technically
371            /// isn't commutative in the case where both passed entries have the same key
372            /// but different values.
373            ///
374            /// In the future, we may want to have an `assume!(...)` statement in the UDF that
375            /// the key is never already present in the map.
376        ))
377        .fold(
378            q!(|| HashMap::new()),
379            q!(|map, (k, v)| {
380                map.insert(k, v);
381            }),
382        )
383}
384
385impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
386    pub(crate) fn collection_kind() -> CollectionKind {
387        CollectionKind::KeyedSingleton {
388            bound: B::bound_kind(),
389            key_type: stageleft::quote_type::<K>().into(),
390            value_type: stageleft::quote_type::<V>().into(),
391        }
392    }
393
394    /// Transforms each value by invoking `f` on each element, with keys staying the same
395    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
396    ///
397    /// If you do not want to modify the stream and instead only want to view
398    /// each item use [`KeyedSingleton::inspect`] instead.
399    ///
400    /// # Example
401    /// ```rust
402    /// # #[cfg(feature = "deploy")] {
403    /// # use hydro_lang::prelude::*;
404    /// # use futures::StreamExt;
405    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
406    /// let keyed_singleton = // { 1: 2, 2: 4 }
407    /// # process
408    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
409    /// #     .into_keyed()
410    /// #     .first();
411    /// keyed_singleton.map(q!(|v| v + 1))
412    /// #   .entries()
413    /// # }, |mut stream| async move {
414    /// // { 1: 3, 2: 5 }
415    /// # let mut results = Vec::new();
416    /// # for _ in 0..2 {
417    /// #     results.push(stream.next().await.unwrap());
418    /// # }
419    /// # results.sort();
420    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
421    /// # }));
422    /// # }
423    /// ```
424    pub fn map<U, F>(
425        self,
426        f: impl IntoQuotedMut<'a, F, L> + Copy,
427    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
428    where
429        F: Fn(V) -> U + 'a,
430    {
431        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
432        let map_f = q!({
433            let orig = f;
434            move |(k, v)| (k, orig(v))
435        })
436        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
437        .into();
438
439        KeyedSingleton::new(
440            self.location.clone(),
441            HydroNode::Map {
442                f: map_f,
443                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
444                metadata: self.location.new_node_metadata(KeyedSingleton::<
445                    K,
446                    U,
447                    L,
448                    B::EraseMonotonic,
449                >::collection_kind()),
450            },
451        )
452    }
453
454    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
455    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
456    ///
457    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
458    /// the new value `U`. The key remains unchanged in the output.
459    ///
460    /// # Example
461    /// ```rust
462    /// # #[cfg(feature = "deploy")] {
463    /// # use hydro_lang::prelude::*;
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466    /// let keyed_singleton = // { 1: 2, 2: 4 }
467    /// # process
468    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
469    /// #     .into_keyed()
470    /// #     .first();
471    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
472    /// #   .entries()
473    /// # }, |mut stream| async move {
474    /// // { 1: 3, 2: 6 }
475    /// # let mut results = Vec::new();
476    /// # for _ in 0..2 {
477    /// #     results.push(stream.next().await.unwrap());
478    /// # }
479    /// # results.sort();
480    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
481    /// # }));
482    /// # }
483    /// ```
484    pub fn map_with_key<U, F>(
485        self,
486        f: impl IntoQuotedMut<'a, F, L> + Copy,
487    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
488    where
489        F: Fn((K, V)) -> U + 'a,
490        K: Clone,
491    {
492        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
493        let map_f = q!({
494            let orig = f;
495            move |(k, v)| {
496                let out = orig((Clone::clone(&k), v));
497                (k, out)
498            }
499        })
500        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
501        .into();
502
503        KeyedSingleton::new(
504            self.location.clone(),
505            HydroNode::Map {
506                f: map_f,
507                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
508                metadata: self.location.new_node_metadata(KeyedSingleton::<
509                    K,
510                    U,
511                    L,
512                    B::EraseMonotonic,
513                >::collection_kind()),
514            },
515        )
516    }
517
518    /// Gets the number of keys in the keyed singleton.
519    ///
520    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
521    /// since keys may be added / removed over time. When the set of keys changes, the count will
522    /// be asynchronously updated.
523    ///
524    /// # Example
525    /// ```rust
526    /// # #[cfg(feature = "deploy")] {
527    /// # use hydro_lang::prelude::*;
528    /// # use futures::StreamExt;
529    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530    /// # let tick = process.tick();
531    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
532    /// # process
533    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
534    /// #     .into_keyed()
535    /// #     .batch(&tick, nondet!(/** test */))
536    /// #     .first();
537    /// keyed_singleton.key_count()
538    /// # .all_ticks()
539    /// # }, |mut stream| async move {
540    /// // 3
541    /// # assert_eq!(stream.next().await.unwrap(), 3);
542    /// # }));
543    /// # }
544    /// ```
545    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
546        if B::ValueBound::BOUNDED {
547            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
548                location: self.location.clone(),
549                flow_state: self.flow_state.clone(),
550                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
551                _phantom: PhantomData,
552            };
553
554            me.entries().count().ignore_monotonic()
555        } else if L::is_top_level()
556            && let Some(tick) = self.location.try_tick()
557            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
558        {
559            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
560                self.location.clone(),
561                self.ir_node.replace(HydroNode::Placeholder),
562            );
563
564            let out =
565                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
566                    .latest();
567            Singleton::new(
568                self.location.clone(),
569                out.ir_node.replace(HydroNode::Placeholder),
570            )
571        } else {
572            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
573        }
574    }
575
576    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
577    ///
578    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
579    /// asynchronously as well.
580    ///
581    /// # Example
582    /// ```rust
583    /// # #[cfg(feature = "deploy")] {
584    /// # use hydro_lang::prelude::*;
585    /// # use futures::StreamExt;
586    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
587    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
588    /// # process
589    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
590    /// #     .into_keyed()
591    /// #     .batch(&process.tick(), nondet!(/** test */))
592    /// #     .first();
593    /// keyed_singleton.into_singleton()
594    /// # .all_ticks()
595    /// # }, |mut stream| async move {
596    /// // { 1: "a", 2: "b", 3: "c" }
597    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
598    /// # }));
599    /// # }
600    /// ```
601    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
602    where
603        K: Eq + Hash,
604    {
605        if B::ValueBound::BOUNDED {
606            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
607                location: self.location.clone(),
608                flow_state: self.flow_state.clone(),
609                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
610                _phantom: PhantomData,
611            };
612
613            me.entries()
614                .assume_ordering_trusted(nondet!(
615                    /// There is only one element associated with each key. The closure technically
616                    /// isn't commutative in the case where both passed entries have the same key
617                    /// but different values.
618                    ///
619                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
620                    /// the key is never already present in the map.
621                ))
622                .fold(
623                    q!(|| HashMap::new()),
624                    q!(|map, (k, v)| {
625                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
626                        map.insert(k, v);
627                    }),
628                )
629        } else if L::is_top_level()
630            && let Some(tick) = self.location.try_tick()
631            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
632        {
633            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
634                self.location.clone(),
635                self.ir_node.replace(HydroNode::Placeholder),
636            );
637
638            let out = into_singleton_inside_tick(
639                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640            )
641            .latest();
642            Singleton::new(
643                self.location.clone(),
644                out.ir_node.replace(HydroNode::Placeholder),
645            )
646        } else {
647            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
648        }
649    }
650
651    /// An operator which allows you to "name" a `HydroNode`.
652    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
653    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
654        {
655            let mut node = self.ir_node.borrow_mut();
656            let metadata = node.metadata_mut();
657            metadata.tag = Some(name.to_owned());
658        }
659        self
660    }
661
662    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
663    /// implies that `B == Bounded`.
664    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
665    where
666        B: IsBounded,
667    {
668        KeyedSingleton::new(
669            self.location.clone(),
670            self.ir_node.replace(HydroNode::Placeholder),
671        )
672    }
673
674    /// Gets the value associated with a specific key from the keyed singleton.
675    /// Returns `None` if the key is `None` or there is no associated value.
676    ///
677    /// # Example
678    /// ```rust
679    /// # #[cfg(feature = "deploy")] {
680    /// # use hydro_lang::prelude::*;
681    /// # use futures::StreamExt;
682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
683    /// let tick = process.tick();
684    /// let keyed_data = process
685    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
686    ///     .into_keyed()
687    ///     .batch(&tick, nondet!(/** test */))
688    ///     .first();
689    /// let key = tick.singleton(q!(1));
690    /// keyed_data.get(key).all_ticks()
691    /// # }, |mut stream| async move {
692    /// // 2
693    /// # assert_eq!(stream.next().await.unwrap(), 2);
694    /// # }));
695    /// # }
696    /// ```
697    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
698    where
699        B: IsBounded,
700        K: Hash + Eq + Clone,
701        V: Clone,
702    {
703        self.make_bounded()
704            .into_keyed_stream()
705            .get(key)
706            .cast_at_most_one_element()
707    }
708
709    /// Emit a keyed stream containing keys shared between the keyed singleton and the
710    /// keyed stream, where each value in the output keyed stream is a tuple of
711    /// (the keyed singleton's value, the keyed stream's value).
712    ///
713    /// # Example
714    /// ```rust
715    /// # #[cfg(feature = "deploy")] {
716    /// # use hydro_lang::prelude::*;
717    /// # use futures::StreamExt;
718    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719    /// let tick = process.tick();
720    /// let keyed_data = process
721    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
722    ///     .into_keyed()
723    ///     .batch(&tick, nondet!(/** test */))
724    ///     .first();
725    /// let other_data = process
726    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
727    ///     .into_keyed()
728    ///     .batch(&tick, nondet!(/** test */));
729    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
730    /// # }, |mut stream| async move {
731    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
732    /// # let mut results = vec![];
733    /// # for _ in 0..3 {
734    /// #     results.push(stream.next().await.unwrap());
735    /// # }
736    /// # results.sort();
737    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
738    /// # }));
739    /// # }
740    /// ```
741    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
742        self,
743        other: KeyedStream<K, V2, L, B2, O2, R2>,
744    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
745    where
746        B: IsBounded,
747        K: Eq + Hash + Clone,
748        V: Clone,
749        V2: Clone,
750    {
751        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
752        // always produces deterministic order per key (nested loop join), this could just use
753        // `join_keyed_stream` without constructing IRs manually
754        KeyedStream::new(
755            self.location.clone(),
756            HydroNode::Join {
757                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
758                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
759                metadata: self
760                    .location
761                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
762            },
763        )
764    }
765
766    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
767    /// where each value in the output keyed singleton is a tuple of
768    /// (self.value, other.value).
769    ///
770    /// # Example
771    /// ```rust
772    /// # #[cfg(feature = "deploy")] {
773    /// # use hydro_lang::prelude::*;
774    /// # use futures::StreamExt;
775    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776    /// # let tick = process.tick();
777    /// let requests = // { 1: 10, 2: 20, 3: 30 }
778    /// # process
779    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
780    /// #     .into_keyed()
781    /// #     .batch(&tick, nondet!(/** test */))
782    /// #     .first();
783    /// let other = // { 1: 100, 2: 200, 4: 400 }
784    /// # process
785    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
786    /// #     .into_keyed()
787    /// #     .batch(&tick, nondet!(/** test */))
788    /// #     .first();
789    /// requests.join_keyed_singleton(other)
790    /// # .entries().all_ticks()
791    /// # }, |mut stream| async move {
792    /// // { 1: (10, 100), 2: (20, 200) }
793    /// # let mut results = vec![];
794    /// # for _ in 0..2 {
795    /// #     results.push(stream.next().await.unwrap());
796    /// # }
797    /// # results.sort();
798    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
799    /// # }));
800    /// # }
801    /// ```
802    pub fn join_keyed_singleton<V2: Clone>(
803        self,
804        other: KeyedSingleton<K, V2, L, Bounded>,
805    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
806    where
807        B: IsBounded,
808        K: Eq + Hash + Clone,
809        V: Clone,
810    {
811        let result_stream = self
812            .make_bounded()
813            .entries()
814            .join(other.entries())
815            .into_keyed();
816
817        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
818        result_stream.cast_at_most_one_entry_per_key()
819    }
820
821    /// For each value in `self`, find the matching key in `lookup`.
822    /// The output is a keyed singleton with the key from `self`, and a value
823    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
824    /// If the key is not present in `lookup`, the option will be [`None`].
825    ///
826    /// # Example
827    /// ```rust
828    /// # #[cfg(feature = "deploy")] {
829    /// # use hydro_lang::prelude::*;
830    /// # use futures::StreamExt;
831    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
832    /// # let tick = process.tick();
833    /// let requests = // { 1: 10, 2: 20 }
834    /// # process
835    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
836    /// #     .into_keyed()
837    /// #     .batch(&tick, nondet!(/** test */))
838    /// #     .first();
839    /// let other_data = // { 10: 100, 11: 110 }
840    /// # process
841    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
842    /// #     .into_keyed()
843    /// #     .batch(&tick, nondet!(/** test */))
844    /// #     .first();
845    /// requests.lookup_keyed_singleton(other_data)
846    /// # .entries().all_ticks()
847    /// # }, |mut stream| async move {
848    /// // { 1: (10, Some(100)), 2: (20, None) }
849    /// # let mut results = vec![];
850    /// # for _ in 0..2 {
851    /// #     results.push(stream.next().await.unwrap());
852    /// # }
853    /// # results.sort();
854    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
855    /// # }));
856    /// # }
857    /// ```
858    pub fn lookup_keyed_singleton<V2>(
859        self,
860        lookup: KeyedSingleton<V, V2, L, Bounded>,
861    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
862    where
863        B: IsBounded,
864        K: Eq + Hash + Clone,
865        V: Eq + Hash + Clone,
866        V2: Clone,
867    {
868        let result_stream = self
869            .make_bounded()
870            .into_keyed_stream()
871            .lookup_keyed_stream(lookup.into_keyed_stream());
872
873        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
874        result_stream.cast_at_most_one_entry_per_key()
875    }
876
877    /// For each value in `self`, find the matching key in `lookup`.
878    /// The output is a keyed stream with the key from `self`, and a value
879    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
880    /// If the key is not present in `lookup`, the option will be [`None`].
881    ///
882    /// # Example
883    /// ```rust
884    /// # #[cfg(feature = "deploy")] {
885    /// # use hydro_lang::prelude::*;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888    /// # let tick = process.tick();
889    /// let requests = // { 1: 10, 2: 20 }
890    /// # process
891    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
892    /// #     .into_keyed()
893    /// #     .batch(&tick, nondet!(/** test */))
894    /// #     .first();
895    /// let other_data = // { 10: 100, 10: 110 }
896    /// # process
897    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
898    /// #     .into_keyed()
899    /// #     .batch(&tick, nondet!(/** test */));
900    /// requests.lookup_keyed_stream(other_data)
901    /// # .entries().all_ticks()
902    /// # }, |mut stream| async move {
903    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
904    /// # let mut results = vec![];
905    /// # for _ in 0..3 {
906    /// #     results.push(stream.next().await.unwrap());
907    /// # }
908    /// # results.sort();
909    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
910    /// # }));
911    /// # }
912    /// ```
913    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
914        self,
915        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
916    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
917    where
918        B: IsBounded,
919        K: Eq + Hash + Clone,
920        V: Eq + Hash + Clone,
921        V2: Clone,
922    {
923        self.make_bounded()
924            .entries()
925            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
926            .into_keyed()
927            .lookup_keyed_stream(lookup)
928    }
929}
930
931impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
932    KeyedSingleton<K, V, L, B>
933{
934    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
935    ///
936    /// The value for each key must be bounded, otherwise the resulting stream elements would be
937    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
938    /// into the output.
939    ///
940    /// # Example
941    /// ```rust
942    /// # #[cfg(feature = "deploy")] {
943    /// # use hydro_lang::prelude::*;
944    /// # use futures::StreamExt;
945    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
946    /// let keyed_singleton = // { 1: 2, 2: 4 }
947    /// # process
948    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
949    /// #     .into_keyed()
950    /// #     .first();
951    /// keyed_singleton.entries()
952    /// # }, |mut stream| async move {
953    /// // (1, 2), (2, 4) in any order
954    /// # let mut results = Vec::new();
955    /// # for _ in 0..2 {
956    /// #     results.push(stream.next().await.unwrap());
957    /// # }
958    /// # results.sort();
959    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
964        self.into_keyed_stream().entries()
965    }
966
967    /// Flattens the keyed singleton into an unordered stream of just the values.
968    ///
969    /// The value for each key must be bounded, otherwise the resulting stream elements would be
970    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
971    /// into the output.
972    ///
973    /// # Example
974    /// ```rust
975    /// # #[cfg(feature = "deploy")] {
976    /// # use hydro_lang::prelude::*;
977    /// # use futures::StreamExt;
978    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
979    /// let keyed_singleton = // { 1: 2, 2: 4 }
980    /// # process
981    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
982    /// #     .into_keyed()
983    /// #     .first();
984    /// keyed_singleton.values()
985    /// # }, |mut stream| async move {
986    /// // 2, 4 in any order
987    /// # let mut results = Vec::new();
988    /// # for _ in 0..2 {
989    /// #     results.push(stream.next().await.unwrap());
990    /// # }
991    /// # results.sort();
992    /// # assert_eq!(results, vec![2, 4]);
993    /// # }));
994    /// # }
995    /// ```
996    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
997        let map_f = q!(|(_, v)| v)
998            .splice_fn1_ctx::<(K, V), V>(&self.location)
999            .into();
1000
1001        Stream::new(
1002            self.location.clone(),
1003            HydroNode::Map {
1004                f: map_f,
1005                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1006                metadata: self.location.new_node_metadata(Stream::<
1007                    V,
1008                    L,
1009                    B::UnderlyingBound,
1010                    NoOrder,
1011                    ExactlyOnce,
1012                >::collection_kind()),
1013            },
1014        )
1015    }
1016
1017    /// Flattens the keyed singleton into an unordered stream of just the keys.
1018    ///
1019    /// The value for each key must be bounded, otherwise the removal of keys would result in
1020    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1021    /// into the output.
1022    ///
1023    /// # Example
1024    /// ```rust
1025    /// # #[cfg(feature = "deploy")] {
1026    /// # use hydro_lang::prelude::*;
1027    /// # use futures::StreamExt;
1028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1029    /// let keyed_singleton = // { 1: 2, 2: 4 }
1030    /// # process
1031    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1032    /// #     .into_keyed()
1033    /// #     .first();
1034    /// keyed_singleton.keys()
1035    /// # }, |mut stream| async move {
1036    /// // 1, 2 in any order
1037    /// # let mut results = Vec::new();
1038    /// # for _ in 0..2 {
1039    /// #     results.push(stream.next().await.unwrap());
1040    /// # }
1041    /// # results.sort();
1042    /// # assert_eq!(results, vec![1, 2]);
1043    /// # }));
1044    /// # }
1045    /// ```
1046    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1047        self.entries().map(q!(|(k, _)| k))
1048    }
1049
1050    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1051    /// entries whose keys are not in the provided stream.
1052    ///
1053    /// # Example
1054    /// ```rust
1055    /// # #[cfg(feature = "deploy")] {
1056    /// # use hydro_lang::prelude::*;
1057    /// # use futures::StreamExt;
1058    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1059    /// let tick = process.tick();
1060    /// let keyed_singleton = // { 1: 2, 2: 4 }
1061    /// # process
1062    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1063    /// #     .into_keyed()
1064    /// #     .first()
1065    /// #     .batch(&tick, nondet!(/** test */));
1066    /// let keys_to_remove = process
1067    ///     .source_iter(q!(vec![1]))
1068    ///     .batch(&tick, nondet!(/** test */));
1069    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1070    /// #   .entries().all_ticks()
1071    /// # }, |mut stream| async move {
1072    /// // { 2: 4 }
1073    /// # for w in vec![(2, 4)] {
1074    /// #     assert_eq!(stream.next().await.unwrap(), w);
1075    /// # }
1076    /// # }));
1077    /// # }
1078    /// ```
1079    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1080        self,
1081        other: Stream<K, L, Bounded, O2, R2>,
1082    ) -> Self
1083    where
1084        K: Hash + Eq,
1085    {
1086        check_matching_location(&self.location, &other.location);
1087
1088        KeyedSingleton::new(
1089            self.location.clone(),
1090            HydroNode::AntiJoin {
1091                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1092                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1093                metadata: self.location.new_node_metadata(Self::collection_kind()),
1094            },
1095        )
1096    }
1097
1098    /// An operator which allows you to "inspect" each value of a keyed singleton without
1099    /// modifying it. The closure `f` is called on a reference to each value. This is
1100    /// mainly useful for debugging, and should not be used to generate side-effects.
1101    ///
1102    /// # Example
1103    /// ```rust
1104    /// # #[cfg(feature = "deploy")] {
1105    /// # use hydro_lang::prelude::*;
1106    /// # use futures::StreamExt;
1107    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108    /// let keyed_singleton = // { 1: 2, 2: 4 }
1109    /// # process
1110    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1111    /// #     .into_keyed()
1112    /// #     .first();
1113    /// keyed_singleton
1114    ///     .inspect(q!(|v| println!("{}", v)))
1115    /// #   .entries()
1116    /// # }, |mut stream| async move {
1117    /// // { 1: 2, 2: 4 }
1118    /// # for w in vec![(1, 2), (2, 4)] {
1119    /// #     assert_eq!(stream.next().await.unwrap(), w);
1120    /// # }
1121    /// # }));
1122    /// # }
1123    /// ```
1124    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1125    where
1126        F: Fn(&V) + 'a,
1127    {
1128        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1129        let inspect_f = q!({
1130            let orig = f;
1131            move |t: &(_, _)| orig(&t.1)
1132        })
1133        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1134        .into();
1135
1136        KeyedSingleton::new(
1137            self.location.clone(),
1138            HydroNode::Inspect {
1139                f: inspect_f,
1140                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1141                metadata: self.location.new_node_metadata(Self::collection_kind()),
1142            },
1143        )
1144    }
1145
1146    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1147    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1148    /// mainly useful for debugging, and should not be used to generate side-effects.
1149    ///
1150    /// # Example
1151    /// ```rust
1152    /// # #[cfg(feature = "deploy")] {
1153    /// # use hydro_lang::prelude::*;
1154    /// # use futures::StreamExt;
1155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156    /// let keyed_singleton = // { 1: 2, 2: 4 }
1157    /// # process
1158    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1159    /// #     .into_keyed()
1160    /// #     .first();
1161    /// keyed_singleton
1162    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1163    /// #   .entries()
1164    /// # }, |mut stream| async move {
1165    /// // { 1: 2, 2: 4 }
1166    /// # for w in vec![(1, 2), (2, 4)] {
1167    /// #     assert_eq!(stream.next().await.unwrap(), w);
1168    /// # }
1169    /// # }));
1170    /// # }
1171    /// ```
1172    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1173    where
1174        F: Fn(&(K, V)) + 'a,
1175    {
1176        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1177
1178        KeyedSingleton::new(
1179            self.location.clone(),
1180            HydroNode::Inspect {
1181                f: inspect_f,
1182                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1183                metadata: self.location.new_node_metadata(Self::collection_kind()),
1184            },
1185        )
1186    }
1187
1188    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1189    ///
1190    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1191    /// asynchronously updated if a new key is added that is higher than the previous max key.
1192    ///
1193    /// # Example
1194    /// ```rust
1195    /// # #[cfg(feature = "deploy")] {
1196    /// # use hydro_lang::prelude::*;
1197    /// # use futures::StreamExt;
1198    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1199    /// let tick = process.tick();
1200    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1201    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1202    /// #     .into_keyed()
1203    /// #     .first();
1204    /// keyed_singleton.get_max_key()
1205    /// # .sample_eager(nondet!(/** test */))
1206    /// # }, |mut stream| async move {
1207    /// // (2, 456)
1208    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1209    /// # }));
1210    /// # }
1211    /// ```
1212    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1213    where
1214        K: Ord,
1215    {
1216        self.entries()
1217            .assume_ordering_trusted(nondet!(
1218                /// There is only one element associated with each key, and the keys are totallly
1219                /// ordered so we will produce a deterministic value. The closure technically
1220                /// isn't commutative in the case where both passed entries have the same key
1221                /// but different values.
1222                ///
1223                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1224                /// the two inputs do not have the same key.
1225            ))
1226            .reduce(q!(
1227                move |curr, new| {
1228                    if new.0 > curr.0 {
1229                        *curr = new;
1230                    }
1231                },
1232                idempotent = manual_proof!(/** repeated elements are ignored */)
1233            ))
1234    }
1235
1236    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1237    /// element, the value.
1238    ///
1239    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1240    ///
1241    /// # Example
1242    /// ```rust
1243    /// # #[cfg(feature = "deploy")] {
1244    /// # use hydro_lang::prelude::*;
1245    /// # use futures::StreamExt;
1246    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1247    /// let keyed_singleton = // { 1: 2, 2: 4 }
1248    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1249    /// #     .into_keyed()
1250    /// #     .first();
1251    /// keyed_singleton
1252    ///     .clone()
1253    ///     .into_keyed_stream()
1254    ///     .merge_unordered(
1255    ///         keyed_singleton.into_keyed_stream()
1256    ///     )
1257    /// #   .entries()
1258    /// # }, |mut stream| async move {
1259    /// /// // { 1: [2, 2], 2: [4, 4] }
1260    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1261    /// #     assert_eq!(stream.next().await.unwrap(), w);
1262    /// # }
1263    /// # }));
1264    /// # }
1265    /// ```
1266    pub fn into_keyed_stream(
1267        self,
1268    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1269        KeyedStream::new(
1270            self.location.clone(),
1271            HydroNode::Cast {
1272                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1273                metadata: self.location.new_node_metadata(KeyedStream::<
1274                    K,
1275                    V,
1276                    L,
1277                    B::UnderlyingBound,
1278                    TotalOrder,
1279                    ExactlyOnce,
1280                >::collection_kind()),
1281            },
1282        )
1283    }
1284}
1285
1286impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1287where
1288    L: Location<'a>,
1289{
1290    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1291    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1292    ///
1293    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1294    /// processed before an acknowledgement is emitted.
1295    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1296        let id = self.location.flow_state().borrow_mut().next_clock_id();
1297        let out_location = Atomic {
1298            tick: Tick {
1299                id,
1300                l: self.location.clone(),
1301            },
1302        };
1303        KeyedSingleton::new(
1304            out_location.clone(),
1305            HydroNode::BeginAtomic {
1306                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1307                metadata: out_location
1308                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1309            },
1310        )
1311    }
1312}
1313
1314impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1315where
1316    L: Location<'a>,
1317{
1318    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1319    /// See [`KeyedSingleton::atomic`] for more details.
1320    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1321        KeyedSingleton::new(
1322            self.location.tick.l.clone(),
1323            HydroNode::EndAtomic {
1324                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1325                metadata: self
1326                    .location
1327                    .tick
1328                    .l
1329                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1330            },
1331        )
1332    }
1333}
1334
1335impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1336    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1337    /// tick `T` always has the entries of `self` at tick `T - 1`.
1338    ///
1339    /// At tick `0`, the output has no entries, since there is no previous tick.
1340    ///
1341    /// This operator enables stateful iterative processing with ticks, by sending data from one
1342    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1343    ///
1344    /// # Example
1345    /// ```rust
1346    /// # #[cfg(feature = "deploy")] {
1347    /// # use hydro_lang::prelude::*;
1348    /// # use futures::StreamExt;
1349    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1350    /// let tick = process.tick();
1351    /// # // ticks are lazy by default, forces the second tick to run
1352    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1353    /// # let batch_first_tick = process
1354    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1355    /// #   .batch(&tick, nondet!(/** test */))
1356    /// #   .into_keyed();
1357    /// # let batch_second_tick = process
1358    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1359    /// #   .batch(&tick, nondet!(/** test */))
1360    /// #   .into_keyed()
1361    /// #   .defer_tick(); // appears on the second tick
1362    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1363    /// # batch_first_tick.chain(batch_second_tick).first();
1364    /// input_batch.clone().filter_key_not_in(
1365    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1366    /// )
1367    /// # .entries().all_ticks()
1368    /// # }, |mut stream| async move {
1369    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1370    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1371    /// #     assert_eq!(stream.next().await.unwrap(), w);
1372    /// # }
1373    /// # }));
1374    /// # }
1375    /// ```
1376    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1377        KeyedSingleton::new(
1378            self.location.clone(),
1379            HydroNode::DeferTick {
1380                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1381                metadata: self
1382                    .location
1383                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1384            },
1385        )
1386    }
1387}
1388
1389impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1390where
1391    L: Location<'a>,
1392{
1393    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1394    /// point in time.
1395    ///
1396    /// # Non-Determinism
1397    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1398    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1399    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1400        self,
1401        tick: &Tick<L2>,
1402        _nondet: NonDet,
1403    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1404        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1405        KeyedSingleton::new(
1406            tick.drop_consistency(),
1407            HydroNode::Batch {
1408                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1409                metadata: tick
1410                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1411            },
1412        )
1413    }
1414}
1415
1416impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1417where
1418    L: Location<'a>,
1419{
1420    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1421    /// state of the keyed singleton being atomically processed.
1422    ///
1423    /// # Non-Determinism
1424    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1425    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1426    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1427        self,
1428        tick: &Tick<L2>,
1429        _nondet: NonDet,
1430    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1431        KeyedSingleton::new(
1432            tick.drop_consistency(),
1433            HydroNode::Batch {
1434                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435                metadata: tick
1436                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1437            },
1438        )
1439    }
1440}
1441
1442impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1443where
1444    L: Location<'a>,
1445{
1446    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1447    ///
1448    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1449    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1450    /// is filtered out.
1451    ///
1452    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1453    /// not modify or take ownership of the values. If you need to modify the values while filtering
1454    /// use [`KeyedSingleton::filter_map`] instead.
1455    ///
1456    /// # Example
1457    /// ```rust
1458    /// # #[cfg(feature = "deploy")] {
1459    /// # use hydro_lang::prelude::*;
1460    /// # use futures::StreamExt;
1461    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1462    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1463    /// # process
1464    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1465    /// #     .into_keyed()
1466    /// #     .first();
1467    /// keyed_singleton.filter(q!(|&v| v > 1))
1468    /// #   .entries()
1469    /// # }, |mut stream| async move {
1470    /// // { 1: 2, 2: 4 }
1471    /// # let mut results = Vec::new();
1472    /// # for _ in 0..2 {
1473    /// #     results.push(stream.next().await.unwrap());
1474    /// # }
1475    /// # results.sort();
1476    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1477    /// # }));
1478    /// # }
1479    /// ```
1480    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1481    where
1482        F: Fn(&V) -> bool + 'a,
1483    {
1484        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1485        let filter_f = q!({
1486            let orig = f;
1487            move |t: &(_, _)| orig(&t.1)
1488        })
1489        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1490        .into();
1491
1492        KeyedSingleton::new(
1493            self.location.clone(),
1494            HydroNode::Filter {
1495                f: filter_f,
1496                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1497                metadata: self
1498                    .location
1499                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1500            },
1501        )
1502    }
1503
1504    /// An operator that both filters and maps values. It yields only the key-value pairs where
1505    /// the supplied closure `f` returns `Some(value)`.
1506    ///
1507    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1508    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1509    /// If it returns `None`, the key-value pair is filtered out.
1510    ///
1511    /// # Example
1512    /// ```rust
1513    /// # #[cfg(feature = "deploy")] {
1514    /// # use hydro_lang::prelude::*;
1515    /// # use futures::StreamExt;
1516    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1517    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1518    /// # process
1519    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1520    /// #     .into_keyed()
1521    /// #     .first();
1522    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1523    /// #   .entries()
1524    /// # }, |mut stream| async move {
1525    /// // { 1: 42, 3: 100 }
1526    /// # let mut results = Vec::new();
1527    /// # for _ in 0..2 {
1528    /// #     results.push(stream.next().await.unwrap());
1529    /// # }
1530    /// # results.sort();
1531    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1532    /// # }));
1533    /// # }
1534    /// ```
1535    pub fn filter_map<F, U>(
1536        self,
1537        f: impl IntoQuotedMut<'a, F, L> + Copy,
1538    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1539    where
1540        F: Fn(V) -> Option<U> + 'a,
1541    {
1542        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1543        let filter_map_f = q!({
1544            let orig = f;
1545            move |(k, v)| orig(v).map(|o| (k, o))
1546        })
1547        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1548        .into();
1549
1550        KeyedSingleton::new(
1551            self.location.clone(),
1552            HydroNode::FilterMap {
1553                f: filter_map_f,
1554                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1555                metadata: self.location.new_node_metadata(KeyedSingleton::<
1556                    K,
1557                    U,
1558                    L,
1559                    B::EraseMonotonic,
1560                >::collection_kind()),
1561            },
1562        )
1563    }
1564
1565    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1566    /// arrived since the previous batch was released.
1567    ///
1568    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1569    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1570    ///
1571    /// # Non-Determinism
1572    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1573    /// has a non-deterministic set of key-value pairs.
1574    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1575        self,
1576        tick: &Tick<L2>,
1577        _nondet: NonDet,
1578    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1579        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1580        KeyedSingleton::new(
1581            tick.drop_consistency(),
1582            HydroNode::Batch {
1583                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1584                metadata: tick
1585                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1586            },
1587        )
1588    }
1589}
1590
1591impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1592where
1593    L: Location<'a>,
1594{
1595    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1596    /// atomically processed.
1597    ///
1598    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1599    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1600    ///
1601    /// # Non-Determinism
1602    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1603    /// has a non-deterministic set of key-value pairs.
1604    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1605        self,
1606        tick: &Tick<L2>,
1607        nondet: NonDet,
1608    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1609        let _ = nondet;
1610        KeyedSingleton::new(
1611            tick.drop_consistency(),
1612            HydroNode::Batch {
1613                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1614                metadata: tick
1615                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1616            },
1617        )
1618    }
1619}
1620
1621#[cfg(test)]
1622mod tests {
1623    #[cfg(feature = "deploy")]
1624    use futures::{SinkExt, StreamExt};
1625    #[cfg(feature = "deploy")]
1626    use hydro_deploy::Deployment;
1627    #[cfg(any(feature = "deploy", feature = "sim"))]
1628    use stageleft::q;
1629
1630    #[cfg(any(feature = "deploy", feature = "sim"))]
1631    use crate::compile::builder::FlowBuilder;
1632    #[cfg(any(feature = "deploy", feature = "sim"))]
1633    use crate::location::Location;
1634    #[cfg(any(feature = "deploy", feature = "sim"))]
1635    use crate::nondet::nondet;
1636
1637    #[cfg(feature = "deploy")]
1638    #[tokio::test]
1639    async fn key_count_bounded_value() {
1640        let mut deployment = Deployment::new();
1641
1642        let mut flow = FlowBuilder::new();
1643        let node = flow.process::<()>();
1644        let external = flow.external::<()>();
1645
1646        let (input_port, input) = node.source_external_bincode(&external);
1647        let out = input
1648            .into_keyed()
1649            .first()
1650            .key_count()
1651            .sample_eager(nondet!(/** test */))
1652            .send_bincode_external(&external);
1653
1654        let nodes = flow
1655            .with_process(&node, deployment.Localhost())
1656            .with_external(&external, deployment.Localhost())
1657            .deploy(&mut deployment);
1658
1659        deployment.deploy().await.unwrap();
1660
1661        let mut external_in = nodes.connect(input_port).await;
1662        let mut external_out = nodes.connect(out).await;
1663
1664        deployment.start().await.unwrap();
1665
1666        assert_eq!(external_out.next().await.unwrap(), 0);
1667
1668        external_in.send((1, 1)).await.unwrap();
1669        assert_eq!(external_out.next().await.unwrap(), 1);
1670
1671        external_in.send((2, 2)).await.unwrap();
1672        assert_eq!(external_out.next().await.unwrap(), 2);
1673    }
1674
1675    #[cfg(feature = "deploy")]
1676    #[tokio::test]
1677    async fn key_count_unbounded_value() {
1678        let mut deployment = Deployment::new();
1679
1680        let mut flow = FlowBuilder::new();
1681        let node = flow.process::<()>();
1682        let external = flow.external::<()>();
1683
1684        let (input_port, input) = node.source_external_bincode(&external);
1685        let out = input
1686            .into_keyed()
1687            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1688            .key_count()
1689            .sample_eager(nondet!(/** test */))
1690            .send_bincode_external(&external);
1691
1692        let nodes = flow
1693            .with_process(&node, deployment.Localhost())
1694            .with_external(&external, deployment.Localhost())
1695            .deploy(&mut deployment);
1696
1697        deployment.deploy().await.unwrap();
1698
1699        let mut external_in = nodes.connect(input_port).await;
1700        let mut external_out = nodes.connect(out).await;
1701
1702        deployment.start().await.unwrap();
1703
1704        assert_eq!(external_out.next().await.unwrap(), 0);
1705
1706        external_in.send((1, 1)).await.unwrap();
1707        assert_eq!(external_out.next().await.unwrap(), 1);
1708
1709        external_in.send((1, 2)).await.unwrap();
1710        assert_eq!(external_out.next().await.unwrap(), 1);
1711
1712        external_in.send((2, 2)).await.unwrap();
1713        assert_eq!(external_out.next().await.unwrap(), 2);
1714
1715        external_in.send((1, 1)).await.unwrap();
1716        assert_eq!(external_out.next().await.unwrap(), 2);
1717
1718        external_in.send((3, 1)).await.unwrap();
1719        assert_eq!(external_out.next().await.unwrap(), 3);
1720    }
1721
1722    #[cfg(feature = "deploy")]
1723    #[tokio::test]
1724    async fn into_singleton_bounded_value() {
1725        let mut deployment = Deployment::new();
1726
1727        let mut flow = FlowBuilder::new();
1728        let node = flow.process::<()>();
1729        let external = flow.external::<()>();
1730
1731        let (input_port, input) = node.source_external_bincode(&external);
1732        let out = input
1733            .into_keyed()
1734            .first()
1735            .into_singleton()
1736            .sample_eager(nondet!(/** test */))
1737            .send_bincode_external(&external);
1738
1739        let nodes = flow
1740            .with_process(&node, deployment.Localhost())
1741            .with_external(&external, deployment.Localhost())
1742            .deploy(&mut deployment);
1743
1744        deployment.deploy().await.unwrap();
1745
1746        let mut external_in = nodes.connect(input_port).await;
1747        let mut external_out = nodes.connect(out).await;
1748
1749        deployment.start().await.unwrap();
1750
1751        assert_eq!(
1752            external_out.next().await.unwrap(),
1753            std::collections::HashMap::new()
1754        );
1755
1756        external_in.send((1, 1)).await.unwrap();
1757        assert_eq!(
1758            external_out.next().await.unwrap(),
1759            vec![(1, 1)].into_iter().collect()
1760        );
1761
1762        external_in.send((2, 2)).await.unwrap();
1763        assert_eq!(
1764            external_out.next().await.unwrap(),
1765            vec![(1, 1), (2, 2)].into_iter().collect()
1766        );
1767    }
1768
1769    #[cfg(feature = "deploy")]
1770    #[tokio::test]
1771    async fn into_singleton_unbounded_value() {
1772        let mut deployment = Deployment::new();
1773
1774        let mut flow = FlowBuilder::new();
1775        let node = flow.process::<()>();
1776        let external = flow.external::<()>();
1777
1778        let (input_port, input) = node.source_external_bincode(&external);
1779        let out = input
1780            .into_keyed()
1781            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1782            .into_singleton()
1783            .sample_eager(nondet!(/** test */))
1784            .send_bincode_external(&external);
1785
1786        let nodes = flow
1787            .with_process(&node, deployment.Localhost())
1788            .with_external(&external, deployment.Localhost())
1789            .deploy(&mut deployment);
1790
1791        deployment.deploy().await.unwrap();
1792
1793        let mut external_in = nodes.connect(input_port).await;
1794        let mut external_out = nodes.connect(out).await;
1795
1796        deployment.start().await.unwrap();
1797
1798        assert_eq!(
1799            external_out.next().await.unwrap(),
1800            std::collections::HashMap::new()
1801        );
1802
1803        external_in.send((1, 1)).await.unwrap();
1804        assert_eq!(
1805            external_out.next().await.unwrap(),
1806            vec![(1, 1)].into_iter().collect()
1807        );
1808
1809        external_in.send((1, 2)).await.unwrap();
1810        assert_eq!(
1811            external_out.next().await.unwrap(),
1812            vec![(1, 2)].into_iter().collect()
1813        );
1814
1815        external_in.send((2, 2)).await.unwrap();
1816        assert_eq!(
1817            external_out.next().await.unwrap(),
1818            vec![(1, 2), (2, 1)].into_iter().collect()
1819        );
1820
1821        external_in.send((1, 1)).await.unwrap();
1822        assert_eq!(
1823            external_out.next().await.unwrap(),
1824            vec![(1, 3), (2, 1)].into_iter().collect()
1825        );
1826
1827        external_in.send((3, 1)).await.unwrap();
1828        assert_eq!(
1829            external_out.next().await.unwrap(),
1830            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1831        );
1832    }
1833
1834    #[cfg(feature = "sim")]
1835    #[test]
1836    fn sim_unbounded_singleton_snapshot() {
1837        let mut flow = FlowBuilder::new();
1838        let node = flow.process::<()>();
1839
1840        let (input_port, input) = node.sim_input();
1841        let output = input
1842            .into_keyed()
1843            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1844            .snapshot(&node.tick(), nondet!(/** test */))
1845            .entries()
1846            .all_ticks()
1847            .sim_output();
1848
1849        let count = flow.sim().exhaustive(async || {
1850            input_port.send((1, 123));
1851            input_port.send((1, 456));
1852            input_port.send((2, 123));
1853
1854            let all = output.collect_sorted::<Vec<_>>().await;
1855            assert_eq!(all.last().unwrap(), &(2, 1));
1856        });
1857
1858        assert_eq!(count, 8);
1859    }
1860
1861    #[cfg(feature = "deploy")]
1862    #[tokio::test]
1863    async fn join_keyed_stream() {
1864        let mut deployment = Deployment::new();
1865
1866        let mut flow = FlowBuilder::new();
1867        let node = flow.process::<()>();
1868        let external = flow.external::<()>();
1869
1870        let tick = node.tick();
1871        let keyed_data = node
1872            .source_iter(q!(vec![(1, 10), (2, 20)]))
1873            .into_keyed()
1874            .batch(&tick, nondet!(/** test */))
1875            .first();
1876        let requests = node
1877            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1878            .into_keyed()
1879            .batch(&tick, nondet!(/** test */));
1880
1881        let out = keyed_data
1882            .join_keyed_stream(requests)
1883            .entries()
1884            .all_ticks()
1885            .send_bincode_external(&external);
1886
1887        let nodes = flow
1888            .with_process(&node, deployment.Localhost())
1889            .with_external(&external, deployment.Localhost())
1890            .deploy(&mut deployment);
1891
1892        deployment.deploy().await.unwrap();
1893
1894        let mut external_out = nodes.connect(out).await;
1895
1896        deployment.start().await.unwrap();
1897
1898        let mut results = vec![];
1899        for _ in 0..2 {
1900            results.push(external_out.next().await.unwrap());
1901        }
1902        results.sort();
1903
1904        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1905    }
1906}