hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change. A further variant, [`MonotonicValue`], indicates
/// that entries are never removed and that each value only increases monotonically.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The bound of the keyed singleton if the value for each key is immutable.
    ///
    /// The underlying bound is preserved, the value bound becomes [`Bounded`], and erasing
    /// monotonicity on the result is a no-op.
    type WithBoundedValue: KeyedSingletonBound<
            UnderlyingBound = Self::UnderlyingBound,
            ValueBound = Bounded,
            EraseMonotonic = Self::WithBoundedValue,
        >;

    /// The bound of the [`KeyedSingleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The bound of the keyed singleton if the value for each key is no longer monotonic.
    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
61
// Fully asynchronous keyed singleton: both the set of entries and each value may change.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Freezing the per-key values still allows new keys to arrive.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // There is no monotonicity to erase.
    type EraseMonotonic = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
73
// Fully fixed keyed singleton: every derived bound is `Bounded` itself.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    type KeyedStreamToMonotone = Bounded;
    type EraseMonotonic = Bounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
85
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change, but new entries may appear asynchronously.
pub struct BoundedValue;

// The underlying stream of entries is unbounded, while each individual value is immutable.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    // Values are already immutable, so every derived bound maps back to `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = BoundedValue;
    type EraseMonotonic = BoundedValue;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
101
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// it will never be removed, and the corresponding value will only increase monotonically.
pub struct MonotonicValue;

// Both the entry set and the values are unbounded; only the direction of change is constrained.
impl KeyedSingletonBound for MonotonicValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // Dropping the monotonicity guarantee leaves a plain `Unbounded` keyed singleton.
    type EraseMonotonic = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::MonotonicValue
    }
}
117
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and values that only increase
///       monotonically)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    /// The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this collection. Operators that consume the collection swap it
    /// out for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow state; the `Drop` impl uses it to register a still-unconsumed node
    /// as a root.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143 fn drop(&mut self) {
144 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147 input: Box::new(ir_node),
148 op_metadata: HydroIrOpMetadata::new(),
149 });
150 }
151 }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155 for KeyedSingleton<K, V, Loc, Bound>
156{
157 fn clone(&self) -> Self {
158 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160 *self.ir_node.borrow_mut() = HydroNode::Tee {
161 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162 metadata: self.location.new_node_metadata(Self::collection_kind()),
163 };
164 }
165
166 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167 KeyedSingleton {
168 location: self.location.clone(),
169 flow_state: self.flow_state.clone(),
170 ir_node: HydroNode::Tee {
171 inner: SharedNode(inner.0.clone()),
172 metadata: metadata.clone(),
173 }
174 .into(),
175 _phantom: PhantomData,
176 }
177 } else {
178 unreachable!()
179 }
180 }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184 for KeyedSingleton<K, V, L, B>
185where
186 L: Location<'a>,
187{
188 type Location = L;
189
190 fn create_source(cycle_id: CycleId, location: L) -> Self {
191 KeyedSingleton {
192 flow_state: location.flow_state().clone(),
193 location: location.clone(),
194 ir_node: RefCell::new(HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 }),
198 _phantom: PhantomData,
199 }
200 }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205 L: Location<'a>,
206{
207 type Location = Tick<L>;
208
209 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210 KeyedSingleton::new(
211 location.clone(),
212 HydroNode::CycleSource {
213 cycle_id,
214 metadata: location.new_node_metadata(Self::collection_kind()),
215 },
216 )
217 }
218}
219
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Implements [`DeferTick`] by forwarding to `KeyedSingleton::defer_tick`.
    // NOTE(review): presumably this path resolves to an inherent `defer_tick` method defined
    // outside this excerpt rather than back to this trait method — confirm.
    fn defer_tick(self) -> Self {
        KeyedSingleton::defer_tick(self)
    }
}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230 for KeyedSingleton<K, V, L, B>
231where
232 L: Location<'a>,
233{
234 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235 assert_eq!(
236 Location::id(&self.location),
237 expected_location,
238 "locations do not match"
239 );
240 self.location
241 .flow_state()
242 .borrow_mut()
243 .push_root(HydroRoot::CycleSink {
244 cycle_id,
245 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248 }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253 L: Location<'a>,
254{
255 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256 assert_eq!(
257 Location::id(&self.location),
258 expected_location,
259 "locations do not match"
260 );
261 self.location
262 .flow_state()
263 .borrow_mut()
264 .push_root(HydroRoot::CycleSink {
265 cycle_id,
266 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267 op_metadata: HydroIrOpMetadata::new(),
268 });
269 }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277 let flow_state = location.flow_state().clone();
278 KeyedSingleton {
279 location,
280 flow_state,
281 ir_node: RefCell::new(ir_node),
282 _phantom: PhantomData,
283 }
284 }
285
    /// Returns the [`Location`] where this keyed singleton is being materialized.
    ///
    /// The location is borrowed; the keyed singleton itself is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
290
291 /// Weakens the consistency of this live collection to not guarantee any consistency across
292 /// cluster members (if this collection is on a cluster).
293 pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
294 where
295 L: Location<'a>,
296 {
297 if L::consistency()
298 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299 {
300 // already no consistency
301 KeyedSingleton::new(
302 self.location.drop_consistency(),
303 self.ir_node.replace(HydroNode::Placeholder),
304 )
305 } else {
306 KeyedSingleton::new(
307 self.location.drop_consistency(),
308 HydroNode::Cast {
309 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310 metadata: self
311 .location
312 .drop_consistency()
313 .new_node_metadata(
314 KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
315 ),
316 },
317 )
318 }
319 }
320
321 /// Casts this live collection to have the consistency guarantees specified in the given
322 /// location type parameter. The developer must ensure that the strengthened consistency
323 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
325 self,
326 _proof: impl crate::properties::ConsistencyProof,
327 ) -> KeyedSingleton<K, V, L2, B>
328 where
329 L: Location<'a>,
330 {
331 if L::consistency() == L2::consistency() {
332 // already consistent
333 KeyedSingleton::new(
334 self.location.with_consistency_of(),
335 self.ir_node.replace(HydroNode::Placeholder),
336 )
337 } else {
338 KeyedSingleton::new(
339 self.location.with_consistency_of(),
340 HydroNode::AssertIsConsistent {
341 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
342 trusted: false,
343 metadata: self
344 .location
345 .clone()
346 .with_consistency_of::<L2>()
347 .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
348 },
349 )
350 }
351 }
352}
353
/// Counts the entries of a bounded keyed singleton by counting its flattened entry stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
360
/// Collects a bounded keyed singleton into a single `HashMap` by folding over its entries.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
384
385impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
386 pub(crate) fn collection_kind() -> CollectionKind {
387 CollectionKind::KeyedSingleton {
388 bound: B::bound_kind(),
389 key_type: stageleft::quote_type::<K>().into(),
390 value_type: stageleft::quote_type::<V>().into(),
391 }
392 }
393
394 /// Transforms each value by invoking `f` on each element, with keys staying the same
395 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
396 ///
397 /// If you do not want to modify the stream and instead only want to view
398 /// each item use [`KeyedSingleton::inspect`] instead.
399 ///
400 /// # Example
401 /// ```rust
402 /// # #[cfg(feature = "deploy")] {
403 /// # use hydro_lang::prelude::*;
404 /// # use futures::StreamExt;
405 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
406 /// let keyed_singleton = // { 1: 2, 2: 4 }
407 /// # process
408 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
409 /// # .into_keyed()
410 /// # .first();
411 /// keyed_singleton.map(q!(|v| v + 1))
412 /// # .entries()
413 /// # }, |mut stream| async move {
414 /// // { 1: 3, 2: 5 }
415 /// # let mut results = Vec::new();
416 /// # for _ in 0..2 {
417 /// # results.push(stream.next().await.unwrap());
418 /// # }
419 /// # results.sort();
420 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
421 /// # }));
422 /// # }
423 /// ```
424 pub fn map<U, F>(
425 self,
426 f: impl IntoQuotedMut<'a, F, L> + Copy,
427 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
428 where
429 F: Fn(V) -> U + 'a,
430 {
431 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
432 let map_f = q!({
433 let orig = f;
434 move |(k, v)| (k, orig(v))
435 })
436 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
437 .into();
438
439 KeyedSingleton::new(
440 self.location.clone(),
441 HydroNode::Map {
442 f: map_f,
443 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
444 metadata: self.location.new_node_metadata(KeyedSingleton::<
445 K,
446 U,
447 L,
448 B::EraseMonotonic,
449 >::collection_kind()),
450 },
451 )
452 }
453
454 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
455 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
456 ///
457 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
458 /// the new value `U`. The key remains unchanged in the output.
459 ///
460 /// # Example
461 /// ```rust
462 /// # #[cfg(feature = "deploy")] {
463 /// # use hydro_lang::prelude::*;
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466 /// let keyed_singleton = // { 1: 2, 2: 4 }
467 /// # process
468 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
469 /// # .into_keyed()
470 /// # .first();
471 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
472 /// # .entries()
473 /// # }, |mut stream| async move {
474 /// // { 1: 3, 2: 6 }
475 /// # let mut results = Vec::new();
476 /// # for _ in 0..2 {
477 /// # results.push(stream.next().await.unwrap());
478 /// # }
479 /// # results.sort();
480 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
481 /// # }));
482 /// # }
483 /// ```
484 pub fn map_with_key<U, F>(
485 self,
486 f: impl IntoQuotedMut<'a, F, L> + Copy,
487 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
488 where
489 F: Fn((K, V)) -> U + 'a,
490 K: Clone,
491 {
492 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
493 let map_f = q!({
494 let orig = f;
495 move |(k, v)| {
496 let out = orig((Clone::clone(&k), v));
497 (k, out)
498 }
499 })
500 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
501 .into();
502
503 KeyedSingleton::new(
504 self.location.clone(),
505 HydroNode::Map {
506 f: map_f,
507 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
508 metadata: self.location.new_node_metadata(KeyedSingleton::<
509 K,
510 U,
511 L,
512 B::EraseMonotonic,
513 >::collection_kind()),
514 },
515 )
516 }
517
518 /// Gets the number of keys in the keyed singleton.
519 ///
520 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
521 /// since keys may be added / removed over time. When the set of keys changes, the count will
522 /// be asynchronously updated.
523 ///
524 /// # Example
525 /// ```rust
526 /// # #[cfg(feature = "deploy")] {
527 /// # use hydro_lang::prelude::*;
528 /// # use futures::StreamExt;
529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530 /// # let tick = process.tick();
531 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
532 /// # process
533 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
534 /// # .into_keyed()
535 /// # .batch(&tick, nondet!(/** test */))
536 /// # .first();
537 /// keyed_singleton.key_count()
538 /// # .all_ticks()
539 /// # }, |mut stream| async move {
540 /// // 3
541 /// # assert_eq!(stream.next().await.unwrap(), 3);
542 /// # }));
543 /// # }
544 /// ```
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        if B::ValueBound::BOUNDED {
            // Values are immutable, so the entries can be flattened into a stream and counted.
            // The handle is rebuilt with the `B::WithBoundedValue` bound so that `entries()`
            // (which requires `ValueBound = Bounded`) is callable.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries().count().ignore_monotonic()
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            // Values may change: snapshot into a tick, count inside the tick, and bring the
            // latest count back out to this location.
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            let out =
                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
                    .latest();
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // NOTE(review): `BoundedValue` has `ValueBound = Bounded` and therefore always takes
            // the first branch, whereas `MonotonicValue` (unbounded values, bound kind not
            // `Unbounded`) always ends up here — confirm the message and the `MonotonicValue`
            // behavior are intended.
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
575
576 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
577 ///
578 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
579 /// asynchronously as well.
580 ///
581 /// # Example
582 /// ```rust
583 /// # #[cfg(feature = "deploy")] {
584 /// # use hydro_lang::prelude::*;
585 /// # use futures::StreamExt;
586 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
587 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
588 /// # process
589 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
590 /// # .into_keyed()
591 /// # .batch(&process.tick(), nondet!(/** test */))
592 /// # .first();
593 /// keyed_singleton.into_singleton()
594 /// # .all_ticks()
595 /// # }, |mut stream| async move {
596 /// // { 1: "a", 2: "b", 3: "c" }
597 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
598 /// # }));
599 /// # }
600 /// ```
    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
    where
        K: Eq + Hash,
    {
        if B::ValueBound::BOUNDED {
            // Values are immutable, so the entries can be flattened into a stream and folded
            // into a map. The handle is rebuilt with the `B::WithBoundedValue` bound so that
            // `entries()` (which requires `ValueBound = Bounded`) is callable.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries()
                .assume_ordering_trusted(nondet!(
                    /// There is only one element associated with each key. The closure technically
                    /// isn't commutative in the case where both passed entries have the same key
                    /// but different values.
                    ///
                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                    /// the key is never already present in the map.
                ))
                .fold(
                    q!(|| HashMap::new()),
                    q!(|map, (k, v)| {
                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
                        map.insert(k, v);
                    }),
                )
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            // Values may change: snapshot into a tick, build the map inside the tick, and bring
            // the latest map back out to this location.
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            let out = into_singleton_inside_tick(
                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // NOTE(review): `BoundedValue` has `ValueBound = Bounded` and therefore always takes
            // the first branch, whereas `MonotonicValue` (unbounded values, bound kind not
            // `Unbounded`) always ends up here — confirm the message and the `MonotonicValue`
            // behavior are intended.
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
650
651 /// An operator which allows you to "name" a `HydroNode`.
652 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
653 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
654 {
655 let mut node = self.ir_node.borrow_mut();
656 let metadata = node.metadata_mut();
657 metadata.tag = Some(name.to_owned());
658 }
659 self
660 }
661
662 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
663 /// implies that `B == Bounded`.
664 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
665 where
666 B: IsBounded,
667 {
668 KeyedSingleton::new(
669 self.location.clone(),
670 self.ir_node.replace(HydroNode::Placeholder),
671 )
672 }
673
674 /// Gets the value associated with a specific key from the keyed singleton.
675 /// Returns `None` if the key is `None` or there is no associated value.
676 ///
677 /// # Example
678 /// ```rust
679 /// # #[cfg(feature = "deploy")] {
680 /// # use hydro_lang::prelude::*;
681 /// # use futures::StreamExt;
682 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
683 /// let tick = process.tick();
684 /// let keyed_data = process
685 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
686 /// .into_keyed()
687 /// .batch(&tick, nondet!(/** test */))
688 /// .first();
689 /// let key = tick.singleton(q!(1));
690 /// keyed_data.get(key).all_ticks()
691 /// # }, |mut stream| async move {
692 /// // 2
693 /// # assert_eq!(stream.next().await.unwrap(), 2);
694 /// # }));
695 /// # }
696 /// ```
697 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
698 where
699 B: IsBounded,
700 K: Hash + Eq + Clone,
701 V: Clone,
702 {
703 self.make_bounded()
704 .into_keyed_stream()
705 .get(key)
706 .cast_at_most_one_element()
707 }
708
709 /// Emit a keyed stream containing keys shared between the keyed singleton and the
710 /// keyed stream, where each value in the output keyed stream is a tuple of
711 /// (the keyed singleton's value, the keyed stream's value).
712 ///
713 /// # Example
714 /// ```rust
715 /// # #[cfg(feature = "deploy")] {
716 /// # use hydro_lang::prelude::*;
717 /// # use futures::StreamExt;
718 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719 /// let tick = process.tick();
720 /// let keyed_data = process
721 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
722 /// .into_keyed()
723 /// .batch(&tick, nondet!(/** test */))
724 /// .first();
725 /// let other_data = process
726 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
727 /// .into_keyed()
728 /// .batch(&tick, nondet!(/** test */));
729 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
730 /// # }, |mut stream| async move {
731 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
732 /// # let mut results = vec![];
733 /// # for _ in 0..3 {
734 /// # results.push(stream.next().await.unwrap());
735 /// # }
736 /// # results.sort();
737 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
738 /// # }));
739 /// # }
740 /// ```
741 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
742 self,
743 other: KeyedStream<K, V2, L, B2, O2, R2>,
744 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
745 where
746 B: IsBounded,
747 K: Eq + Hash + Clone,
748 V: Clone,
749 V2: Clone,
750 {
751 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
752 // always produces deterministic order per key (nested loop join), this could just use
753 // `join_keyed_stream` without constructing IRs manually
754 KeyedStream::new(
755 self.location.clone(),
756 HydroNode::Join {
757 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
758 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
759 metadata: self
760 .location
761 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
762 },
763 )
764 }
765
766 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
767 /// where each value in the output keyed singleton is a tuple of
768 /// (self.value, other.value).
769 ///
770 /// # Example
771 /// ```rust
772 /// # #[cfg(feature = "deploy")] {
773 /// # use hydro_lang::prelude::*;
774 /// # use futures::StreamExt;
775 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776 /// # let tick = process.tick();
777 /// let requests = // { 1: 10, 2: 20, 3: 30 }
778 /// # process
779 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
780 /// # .into_keyed()
781 /// # .batch(&tick, nondet!(/** test */))
782 /// # .first();
783 /// let other = // { 1: 100, 2: 200, 4: 400 }
784 /// # process
785 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
786 /// # .into_keyed()
787 /// # .batch(&tick, nondet!(/** test */))
788 /// # .first();
789 /// requests.join_keyed_singleton(other)
790 /// # .entries().all_ticks()
791 /// # }, |mut stream| async move {
792 /// // { 1: (10, 100), 2: (20, 200) }
793 /// # let mut results = vec![];
794 /// # for _ in 0..2 {
795 /// # results.push(stream.next().await.unwrap());
796 /// # }
797 /// # results.sort();
798 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
799 /// # }));
800 /// # }
801 /// ```
802 pub fn join_keyed_singleton<V2: Clone>(
803 self,
804 other: KeyedSingleton<K, V2, L, Bounded>,
805 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
806 where
807 B: IsBounded,
808 K: Eq + Hash + Clone,
809 V: Clone,
810 {
811 let result_stream = self
812 .make_bounded()
813 .entries()
814 .join(other.entries())
815 .into_keyed();
816
817 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
818 result_stream.cast_at_most_one_entry_per_key()
819 }
820
821 /// For each value in `self`, find the matching key in `lookup`.
822 /// The output is a keyed singleton with the key from `self`, and a value
823 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
824 /// If the key is not present in `lookup`, the option will be [`None`].
825 ///
826 /// # Example
827 /// ```rust
828 /// # #[cfg(feature = "deploy")] {
829 /// # use hydro_lang::prelude::*;
830 /// # use futures::StreamExt;
831 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
832 /// # let tick = process.tick();
833 /// let requests = // { 1: 10, 2: 20 }
834 /// # process
835 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
836 /// # .into_keyed()
837 /// # .batch(&tick, nondet!(/** test */))
838 /// # .first();
839 /// let other_data = // { 10: 100, 11: 110 }
840 /// # process
841 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
842 /// # .into_keyed()
843 /// # .batch(&tick, nondet!(/** test */))
844 /// # .first();
845 /// requests.lookup_keyed_singleton(other_data)
846 /// # .entries().all_ticks()
847 /// # }, |mut stream| async move {
848 /// // { 1: (10, Some(100)), 2: (20, None) }
849 /// # let mut results = vec![];
850 /// # for _ in 0..2 {
851 /// # results.push(stream.next().await.unwrap());
852 /// # }
853 /// # results.sort();
854 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
855 /// # }));
856 /// # }
857 /// ```
858 pub fn lookup_keyed_singleton<V2>(
859 self,
860 lookup: KeyedSingleton<V, V2, L, Bounded>,
861 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
862 where
863 B: IsBounded,
864 K: Eq + Hash + Clone,
865 V: Eq + Hash + Clone,
866 V2: Clone,
867 {
868 let result_stream = self
869 .make_bounded()
870 .into_keyed_stream()
871 .lookup_keyed_stream(lookup.into_keyed_stream());
872
873 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
874 result_stream.cast_at_most_one_entry_per_key()
875 }
876
877 /// For each value in `self`, find the matching key in `lookup`.
878 /// The output is a keyed stream with the key from `self`, and a value
879 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
880 /// If the key is not present in `lookup`, the option will be [`None`].
881 ///
882 /// # Example
883 /// ```rust
884 /// # #[cfg(feature = "deploy")] {
885 /// # use hydro_lang::prelude::*;
886 /// # use futures::StreamExt;
887 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888 /// # let tick = process.tick();
889 /// let requests = // { 1: 10, 2: 20 }
890 /// # process
891 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
892 /// # .into_keyed()
893 /// # .batch(&tick, nondet!(/** test */))
894 /// # .first();
    /// let other_data = // { 10: [100, 110] }
896 /// # process
897 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
898 /// # .into_keyed()
899 /// # .batch(&tick, nondet!(/** test */));
900 /// requests.lookup_keyed_stream(other_data)
901 /// # .entries().all_ticks()
902 /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
904 /// # let mut results = vec![];
905 /// # for _ in 0..3 {
906 /// # results.push(stream.next().await.unwrap());
907 /// # }
908 /// # results.sort();
909 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
910 /// # }));
911 /// # }
912 /// ```
913 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
914 self,
915 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
916 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
917 where
918 B: IsBounded,
919 K: Eq + Hash + Clone,
920 V: Eq + Hash + Clone,
921 V2: Clone,
922 {
923 self.make_bounded()
924 .entries()
925 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
926 .into_keyed()
927 .lookup_keyed_stream(lookup)
928 }
929}
930
931impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
932 KeyedSingleton<K, V, L, B>
933{
934 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
935 ///
936 /// The value for each key must be bounded, otherwise the resulting stream elements would be
937 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
938 /// into the output.
939 ///
940 /// # Example
941 /// ```rust
942 /// # #[cfg(feature = "deploy")] {
943 /// # use hydro_lang::prelude::*;
944 /// # use futures::StreamExt;
945 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
946 /// let keyed_singleton = // { 1: 2, 2: 4 }
947 /// # process
948 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
949 /// # .into_keyed()
950 /// # .first();
951 /// keyed_singleton.entries()
952 /// # }, |mut stream| async move {
953 /// // (1, 2), (2, 4) in any order
954 /// # let mut results = Vec::new();
955 /// # for _ in 0..2 {
956 /// # results.push(stream.next().await.unwrap());
957 /// # }
958 /// # results.sort();
959 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
964 self.into_keyed_stream().entries()
965 }
966
967 /// Flattens the keyed singleton into an unordered stream of just the values.
968 ///
969 /// The value for each key must be bounded, otherwise the resulting stream elements would be
970 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
971 /// into the output.
972 ///
973 /// # Example
974 /// ```rust
975 /// # #[cfg(feature = "deploy")] {
976 /// # use hydro_lang::prelude::*;
977 /// # use futures::StreamExt;
978 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
979 /// let keyed_singleton = // { 1: 2, 2: 4 }
980 /// # process
981 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
982 /// # .into_keyed()
983 /// # .first();
984 /// keyed_singleton.values()
985 /// # }, |mut stream| async move {
986 /// // 2, 4 in any order
987 /// # let mut results = Vec::new();
988 /// # for _ in 0..2 {
989 /// # results.push(stream.next().await.unwrap());
990 /// # }
991 /// # results.sort();
992 /// # assert_eq!(results, vec![2, 4]);
993 /// # }));
994 /// # }
995 /// ```
996 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
997 let map_f = q!(|(_, v)| v)
998 .splice_fn1_ctx::<(K, V), V>(&self.location)
999 .into();
1000
1001 Stream::new(
1002 self.location.clone(),
1003 HydroNode::Map {
1004 f: map_f,
1005 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1006 metadata: self.location.new_node_metadata(Stream::<
1007 V,
1008 L,
1009 B::UnderlyingBound,
1010 NoOrder,
1011 ExactlyOnce,
1012 >::collection_kind()),
1013 },
1014 )
1015 }
1016
1017 /// Flattens the keyed singleton into an unordered stream of just the keys.
1018 ///
1019 /// The value for each key must be bounded, otherwise the removal of keys would result in
1020 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1021 /// into the output.
1022 ///
1023 /// # Example
1024 /// ```rust
1025 /// # #[cfg(feature = "deploy")] {
1026 /// # use hydro_lang::prelude::*;
1027 /// # use futures::StreamExt;
1028 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1029 /// let keyed_singleton = // { 1: 2, 2: 4 }
1030 /// # process
1031 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1032 /// # .into_keyed()
1033 /// # .first();
1034 /// keyed_singleton.keys()
1035 /// # }, |mut stream| async move {
1036 /// // 1, 2 in any order
1037 /// # let mut results = Vec::new();
1038 /// # for _ in 0..2 {
1039 /// # results.push(stream.next().await.unwrap());
1040 /// # }
1041 /// # results.sort();
1042 /// # assert_eq!(results, vec![1, 2]);
1043 /// # }));
1044 /// # }
1045 /// ```
1046 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1047 self.entries().map(q!(|(k, _)| k))
1048 }
1049
1050 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1051 /// entries whose keys are not in the provided stream.
1052 ///
1053 /// # Example
1054 /// ```rust
1055 /// # #[cfg(feature = "deploy")] {
1056 /// # use hydro_lang::prelude::*;
1057 /// # use futures::StreamExt;
1058 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1059 /// let tick = process.tick();
1060 /// let keyed_singleton = // { 1: 2, 2: 4 }
1061 /// # process
1062 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1063 /// # .into_keyed()
1064 /// # .first()
1065 /// # .batch(&tick, nondet!(/** test */));
1066 /// let keys_to_remove = process
1067 /// .source_iter(q!(vec![1]))
1068 /// .batch(&tick, nondet!(/** test */));
1069 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1070 /// # .entries().all_ticks()
1071 /// # }, |mut stream| async move {
1072 /// // { 2: 4 }
1073 /// # for w in vec![(2, 4)] {
1074 /// # assert_eq!(stream.next().await.unwrap(), w);
1075 /// # }
1076 /// # }));
1077 /// # }
1078 /// ```
1079 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1080 self,
1081 other: Stream<K, L, Bounded, O2, R2>,
1082 ) -> Self
1083 where
1084 K: Hash + Eq,
1085 {
1086 check_matching_location(&self.location, &other.location);
1087
1088 KeyedSingleton::new(
1089 self.location.clone(),
1090 HydroNode::AntiJoin {
1091 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1092 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1093 metadata: self.location.new_node_metadata(Self::collection_kind()),
1094 },
1095 )
1096 }
1097
1098 /// An operator which allows you to "inspect" each value of a keyed singleton without
1099 /// modifying it. The closure `f` is called on a reference to each value. This is
1100 /// mainly useful for debugging, and should not be used to generate side-effects.
1101 ///
1102 /// # Example
1103 /// ```rust
1104 /// # #[cfg(feature = "deploy")] {
1105 /// # use hydro_lang::prelude::*;
1106 /// # use futures::StreamExt;
1107 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108 /// let keyed_singleton = // { 1: 2, 2: 4 }
1109 /// # process
1110 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1111 /// # .into_keyed()
1112 /// # .first();
1113 /// keyed_singleton
1114 /// .inspect(q!(|v| println!("{}", v)))
1115 /// # .entries()
1116 /// # }, |mut stream| async move {
1117 /// // { 1: 2, 2: 4 }
1118 /// # for w in vec![(1, 2), (2, 4)] {
1119 /// # assert_eq!(stream.next().await.unwrap(), w);
1120 /// # }
1121 /// # }));
1122 /// # }
1123 /// ```
1124 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1125 where
1126 F: Fn(&V) + 'a,
1127 {
1128 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1129 let inspect_f = q!({
1130 let orig = f;
1131 move |t: &(_, _)| orig(&t.1)
1132 })
1133 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1134 .into();
1135
1136 KeyedSingleton::new(
1137 self.location.clone(),
1138 HydroNode::Inspect {
1139 f: inspect_f,
1140 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1141 metadata: self.location.new_node_metadata(Self::collection_kind()),
1142 },
1143 )
1144 }
1145
1146 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1147 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1148 /// mainly useful for debugging, and should not be used to generate side-effects.
1149 ///
1150 /// # Example
1151 /// ```rust
1152 /// # #[cfg(feature = "deploy")] {
1153 /// # use hydro_lang::prelude::*;
1154 /// # use futures::StreamExt;
1155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156 /// let keyed_singleton = // { 1: 2, 2: 4 }
1157 /// # process
1158 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1159 /// # .into_keyed()
1160 /// # .first();
1161 /// keyed_singleton
1162 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1163 /// # .entries()
1164 /// # }, |mut stream| async move {
1165 /// // { 1: 2, 2: 4 }
1166 /// # for w in vec![(1, 2), (2, 4)] {
1167 /// # assert_eq!(stream.next().await.unwrap(), w);
1168 /// # }
1169 /// # }));
1170 /// # }
1171 /// ```
1172 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1173 where
1174 F: Fn(&(K, V)) + 'a,
1175 {
1176 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1177
1178 KeyedSingleton::new(
1179 self.location.clone(),
1180 HydroNode::Inspect {
1181 f: inspect_f,
1182 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1183 metadata: self.location.new_node_metadata(Self::collection_kind()),
1184 },
1185 )
1186 }
1187
1188 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1189 ///
1190 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1191 /// asynchronously updated if a new key is added that is higher than the previous max key.
1192 ///
1193 /// # Example
1194 /// ```rust
1195 /// # #[cfg(feature = "deploy")] {
1196 /// # use hydro_lang::prelude::*;
1197 /// # use futures::StreamExt;
1198 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1199 /// let tick = process.tick();
1200 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1201 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1202 /// # .into_keyed()
1203 /// # .first();
1204 /// keyed_singleton.get_max_key()
1205 /// # .sample_eager(nondet!(/** test */))
1206 /// # }, |mut stream| async move {
1207 /// // (2, 456)
1208 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1209 /// # }));
1210 /// # }
1211 /// ```
1212 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1213 where
1214 K: Ord,
1215 {
1216 self.entries()
1217 .assume_ordering_trusted(nondet!(
1218 /// There is only one element associated with each key, and the keys are totallly
1219 /// ordered so we will produce a deterministic value. The closure technically
1220 /// isn't commutative in the case where both passed entries have the same key
1221 /// but different values.
1222 ///
1223 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1224 /// the two inputs do not have the same key.
1225 ))
1226 .reduce(q!(
1227 move |curr, new| {
1228 if new.0 > curr.0 {
1229 *curr = new;
1230 }
1231 },
1232 idempotent = manual_proof!(/** repeated elements are ignored */)
1233 ))
1234 }
1235
1236 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1237 /// element, the value.
1238 ///
1239 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1240 ///
1241 /// # Example
1242 /// ```rust
1243 /// # #[cfg(feature = "deploy")] {
1244 /// # use hydro_lang::prelude::*;
1245 /// # use futures::StreamExt;
1246 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1247 /// let keyed_singleton = // { 1: 2, 2: 4 }
1248 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1249 /// # .into_keyed()
1250 /// # .first();
1251 /// keyed_singleton
1252 /// .clone()
1253 /// .into_keyed_stream()
1254 /// .merge_unordered(
1255 /// keyed_singleton.into_keyed_stream()
1256 /// )
1257 /// # .entries()
1258 /// # }, |mut stream| async move {
1259 /// /// // { 1: [2, 2], 2: [4, 4] }
1260 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1261 /// # assert_eq!(stream.next().await.unwrap(), w);
1262 /// # }
1263 /// # }));
1264 /// # }
1265 /// ```
1266 pub fn into_keyed_stream(
1267 self,
1268 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1269 KeyedStream::new(
1270 self.location.clone(),
1271 HydroNode::Cast {
1272 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1273 metadata: self.location.new_node_metadata(KeyedStream::<
1274 K,
1275 V,
1276 L,
1277 B::UnderlyingBound,
1278 TotalOrder,
1279 ExactlyOnce,
1280 >::collection_kind()),
1281 },
1282 )
1283 }
1284}
1285
1286impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1287where
1288 L: Location<'a>,
1289{
1290 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1291 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1292 ///
1293 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1294 /// processed before an acknowledgement is emitted.
1295 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1296 let id = self.location.flow_state().borrow_mut().next_clock_id();
1297 let out_location = Atomic {
1298 tick: Tick {
1299 id,
1300 l: self.location.clone(),
1301 },
1302 };
1303 KeyedSingleton::new(
1304 out_location.clone(),
1305 HydroNode::BeginAtomic {
1306 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1307 metadata: out_location
1308 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1309 },
1310 )
1311 }
1312}
1313
1314impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1315where
1316 L: Location<'a>,
1317{
1318 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1319 /// See [`KeyedSingleton::atomic`] for more details.
1320 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1321 KeyedSingleton::new(
1322 self.location.tick.l.clone(),
1323 HydroNode::EndAtomic {
1324 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1325 metadata: self
1326 .location
1327 .tick
1328 .l
1329 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1330 },
1331 )
1332 }
1333}
1334
1335impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1336 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1337 /// tick `T` always has the entries of `self` at tick `T - 1`.
1338 ///
1339 /// At tick `0`, the output has no entries, since there is no previous tick.
1340 ///
1341 /// This operator enables stateful iterative processing with ticks, by sending data from one
1342 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1343 ///
1344 /// # Example
1345 /// ```rust
1346 /// # #[cfg(feature = "deploy")] {
1347 /// # use hydro_lang::prelude::*;
1348 /// # use futures::StreamExt;
1349 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1350 /// let tick = process.tick();
1351 /// # // ticks are lazy by default, forces the second tick to run
1352 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1353 /// # let batch_first_tick = process
1354 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1355 /// # .batch(&tick, nondet!(/** test */))
1356 /// # .into_keyed();
1357 /// # let batch_second_tick = process
1358 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1359 /// # .batch(&tick, nondet!(/** test */))
1360 /// # .into_keyed()
1361 /// # .defer_tick(); // appears on the second tick
1362 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1363 /// # batch_first_tick.chain(batch_second_tick).first();
1364 /// input_batch.clone().filter_key_not_in(
1365 /// input_batch.defer_tick().keys() // keys present in the previous tick
1366 /// )
1367 /// # .entries().all_ticks()
1368 /// # }, |mut stream| async move {
1369 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1370 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1371 /// # assert_eq!(stream.next().await.unwrap(), w);
1372 /// # }
1373 /// # }));
1374 /// # }
1375 /// ```
1376 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1377 KeyedSingleton::new(
1378 self.location.clone(),
1379 HydroNode::DeferTick {
1380 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1381 metadata: self
1382 .location
1383 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1384 },
1385 )
1386 }
1387}
1388
1389impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1390where
1391 L: Location<'a>,
1392{
1393 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1394 /// point in time.
1395 ///
1396 /// # Non-Determinism
1397 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1398 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1399 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1400 self,
1401 tick: &Tick<L2>,
1402 _nondet: NonDet,
1403 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1404 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1405 KeyedSingleton::new(
1406 tick.drop_consistency(),
1407 HydroNode::Batch {
1408 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1409 metadata: tick
1410 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1411 },
1412 )
1413 }
1414}
1415
1416impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1417where
1418 L: Location<'a>,
1419{
1420 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1421 /// state of the keyed singleton being atomically processed.
1422 ///
1423 /// # Non-Determinism
1424 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1425 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1426 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1427 self,
1428 tick: &Tick<L2>,
1429 _nondet: NonDet,
1430 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1431 KeyedSingleton::new(
1432 tick.drop_consistency(),
1433 HydroNode::Batch {
1434 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435 metadata: tick
1436 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1437 },
1438 )
1439 }
1440}
1441
1442impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1443where
1444 L: Location<'a>,
1445{
1446 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1447 ///
1448 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1449 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1450 /// is filtered out.
1451 ///
1452 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1453 /// not modify or take ownership of the values. If you need to modify the values while filtering
1454 /// use [`KeyedSingleton::filter_map`] instead.
1455 ///
1456 /// # Example
1457 /// ```rust
1458 /// # #[cfg(feature = "deploy")] {
1459 /// # use hydro_lang::prelude::*;
1460 /// # use futures::StreamExt;
1461 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1462 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1463 /// # process
1464 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1465 /// # .into_keyed()
1466 /// # .first();
1467 /// keyed_singleton.filter(q!(|&v| v > 1))
1468 /// # .entries()
1469 /// # }, |mut stream| async move {
1470 /// // { 1: 2, 2: 4 }
1471 /// # let mut results = Vec::new();
1472 /// # for _ in 0..2 {
1473 /// # results.push(stream.next().await.unwrap());
1474 /// # }
1475 /// # results.sort();
1476 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1477 /// # }));
1478 /// # }
1479 /// ```
1480 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1481 where
1482 F: Fn(&V) -> bool + 'a,
1483 {
1484 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1485 let filter_f = q!({
1486 let orig = f;
1487 move |t: &(_, _)| orig(&t.1)
1488 })
1489 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1490 .into();
1491
1492 KeyedSingleton::new(
1493 self.location.clone(),
1494 HydroNode::Filter {
1495 f: filter_f,
1496 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1497 metadata: self
1498 .location
1499 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1500 },
1501 )
1502 }
1503
1504 /// An operator that both filters and maps values. It yields only the key-value pairs where
1505 /// the supplied closure `f` returns `Some(value)`.
1506 ///
1507 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1508 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1509 /// If it returns `None`, the key-value pair is filtered out.
1510 ///
1511 /// # Example
1512 /// ```rust
1513 /// # #[cfg(feature = "deploy")] {
1514 /// # use hydro_lang::prelude::*;
1515 /// # use futures::StreamExt;
1516 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1517 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1518 /// # process
1519 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1520 /// # .into_keyed()
1521 /// # .first();
1522 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1523 /// # .entries()
1524 /// # }, |mut stream| async move {
1525 /// // { 1: 42, 3: 100 }
1526 /// # let mut results = Vec::new();
1527 /// # for _ in 0..2 {
1528 /// # results.push(stream.next().await.unwrap());
1529 /// # }
1530 /// # results.sort();
1531 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1532 /// # }));
1533 /// # }
1534 /// ```
1535 pub fn filter_map<F, U>(
1536 self,
1537 f: impl IntoQuotedMut<'a, F, L> + Copy,
1538 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1539 where
1540 F: Fn(V) -> Option<U> + 'a,
1541 {
1542 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1543 let filter_map_f = q!({
1544 let orig = f;
1545 move |(k, v)| orig(v).map(|o| (k, o))
1546 })
1547 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1548 .into();
1549
1550 KeyedSingleton::new(
1551 self.location.clone(),
1552 HydroNode::FilterMap {
1553 f: filter_map_f,
1554 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1555 metadata: self.location.new_node_metadata(KeyedSingleton::<
1556 K,
1557 U,
1558 L,
1559 B::EraseMonotonic,
1560 >::collection_kind()),
1561 },
1562 )
1563 }
1564
1565 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1566 /// arrived since the previous batch was released.
1567 ///
1568 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1569 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1570 ///
1571 /// # Non-Determinism
1572 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1573 /// has a non-deterministic set of key-value pairs.
1574 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1575 self,
1576 tick: &Tick<L2>,
1577 _nondet: NonDet,
1578 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1579 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1580 KeyedSingleton::new(
1581 tick.drop_consistency(),
1582 HydroNode::Batch {
1583 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1584 metadata: tick
1585 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1586 },
1587 )
1588 }
1589}
1590
1591impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1592where
1593 L: Location<'a>,
1594{
1595 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1596 /// atomically processed.
1597 ///
1598 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1599 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1600 ///
1601 /// # Non-Determinism
1602 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1603 /// has a non-deterministic set of key-value pairs.
1604 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1605 self,
1606 tick: &Tick<L2>,
1607 nondet: NonDet,
1608 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1609 let _ = nondet;
1610 KeyedSingleton::new(
1611 tick.drop_consistency(),
1612 HydroNode::Batch {
1613 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1614 metadata: tick
1615 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1616 },
1617 )
1618 }
1619}
1620
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // `key_count` on a `first()` keyed singleton: the sampled count starts at 0 and follows the
    // number of distinct keys sent so far.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // (entry sent, count expected afterwards)
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }

    // `key_count` on a per-key `fold`: repeated keys update the folded value but leave the
    // expected key count unchanged.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // (entry sent, count expected afterwards)
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_count) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }

    // `into_singleton` on a `first()` keyed singleton: the sampled map starts empty and gains
    // one entry per new key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // (entry sent, map contents expected afterwards)
        let steps = [((1, 1), vec![(1, 1)]), ((2, 2), vec![(1, 1), (2, 2)])];
        for (entry, expected_entries) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(
                external_out.next().await.unwrap(),
                expected_entries.into_iter().collect()
            );
        }
    }

    // `into_singleton` on a per-key `fold` that counts elements: the sampled map holds the
    // running count for every key seen so far.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // (entry sent, per-key counts expected afterwards)
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((1, 2), vec![(1, 2)]),
            ((2, 2), vec![(1, 2), (2, 1)]),
            ((1, 1), vec![(1, 3), (2, 1)]),
            ((3, 1), vec![(1, 3), (2, 1), (3, 1)]),
        ];
        for (entry, expected_counts) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(
                external_out.next().await.unwrap(),
                expected_counts.into_iter().collect()
            );
        }
    }

    // Simulated run of `snapshot` over a per-key counting `fold`; whatever snapshots are taken,
    // the greatest emitted entry must be `(2, 1)`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let explored = flow.sim().exhaustive(async || {
            for entry in [(1, 123), (1, 456), (2, 123)] {
                input_port.send(entry);
            }

            let sorted = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(sorted.last().unwrap(), &(2, 1));
        });

        // Value returned by `exhaustive`, pinned at 8 (presumably the number of explored
        // executions; confirm against the sim API).
        assert_eq!(explored, 8);
    }

    // `join_keyed_stream` keeps only the requests whose key has an entry in the keyed
    // singleton; key 3 has none and is dropped.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .join_keyed_stream(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Exactly two joined entries are read; they are sorted before comparing.
        let first = external_out.next().await.unwrap();
        let second = external_out.next().await.unwrap();
        let mut results = vec![first, second];
        results.sort();

        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
    }
}