1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39pub struct ClosureExpr {
45 pub expr: DebugExpr,
46 pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
47}
48
49impl Clone for ClosureExpr {
50 fn clone(&self) -> Self {
51 Self {
52 expr: self.expr.clone(),
53 singleton_refs: self
54 .singleton_refs
55 .iter()
56 .map(|(ident, node)| {
57 let cloned_node = match node {
58 HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
59 inner: SharedNode(inner.0.clone()),
60 metadata: metadata.clone(),
61 },
62 _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
63 };
64 (ident.clone(), cloned_node)
65 })
66 .collect(),
67 }
68 }
69}
70
71impl Hash for ClosureExpr {
72 fn hash<H: Hasher>(&self, state: &mut H) {
73 self.expr.hash(state);
74 }
78}
79
80impl serde::Serialize for ClosureExpr {
81 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
82 use serde::ser::SerializeStruct;
83 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
84 s.serialize_field("expr", &self.expr)?;
85 s.serialize_field(
86 "singleton_refs",
87 &SerializableSingletonRefs(&self.singleton_refs),
88 )?;
89 s.end()
90 }
91}
92
93struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
94
95impl serde::Serialize for SerializableSingletonRefs<'_> {
96 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
97 use serde::ser::SerializeSeq;
98 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
99 for (ident, node) in self.0 {
100 seq.serialize_element(&(ident.to_string(), node))?;
101 }
102 seq.end()
103 }
104}
105
106impl Debug for ClosureExpr {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 Debug::fmt(&self.expr, f)
109 }
110}
111
112impl Display for ClosureExpr {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 Display::fmt(&self.expr, f)
115 }
116}
117
118impl From<syn::Expr> for ClosureExpr {
119 fn from(expr: syn::Expr) -> Self {
120 Self {
121 expr: DebugExpr(Box::new(expr)),
122 singleton_refs: Vec::new(),
123 }
124 }
125}
126
127impl From<DebugExpr> for ClosureExpr {
128 fn from(expr: DebugExpr) -> Self {
129 Self {
130 expr,
131 singleton_refs: Vec::new(),
132 }
133 }
134}
135
136impl ClosureExpr {
137 pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
138 Self {
139 expr,
140 singleton_refs,
141 }
142 }
143
144 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
145 Self {
146 expr: self.expr.clone(),
147 singleton_refs: self
148 .singleton_refs
149 .iter()
150 .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
151 .collect(),
152 }
153 }
154
155 pub fn transform_children(
156 &mut self,
157 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
158 seen_tees: &mut SeenSharedNodes,
159 ) {
160 for (_ident, ref_node) in self.singleton_refs.iter_mut() {
161 transform(ref_node, seen_tees);
162 }
163 }
164
165 #[cfg(feature = "build")]
168 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
169 if self.singleton_refs.is_empty() {
170 self.expr.0.to_token_stream()
171 } else {
172 let ref_idents = (0..self.singleton_refs.len())
173 .map(|_| ident_stack.pop().unwrap())
174 .collect::<Vec<_>>()
175 .into_iter()
176 .rev()
177 .collect::<Vec<_>>();
178 let local_idents = self
179 .singleton_refs
180 .iter()
181 .map(|(local_ident, _)| local_ident);
182 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
183 let expr = &self.expr.0;
184 quote! {
185 {
186 #(
187 let #local_idents = #hash #ref_idents;
188 )*
189 #expr
190 }
191 }
192 }
193 }
194}
195
196#[derive(Clone, Hash)]
200pub struct DebugExpr(pub Box<syn::Expr>);
201
202impl serde::Serialize for DebugExpr {
203 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
204 serializer.serialize_str(&self.to_string())
205 }
206}
207
208impl From<syn::Expr> for DebugExpr {
209 fn from(expr: syn::Expr) -> Self {
210 Self(Box::new(expr))
211 }
212}
213
214impl Deref for DebugExpr {
215 type Target = syn::Expr;
216
217 fn deref(&self) -> &Self::Target {
218 &self.0
219 }
220}
221
222impl ToTokens for DebugExpr {
223 fn to_tokens(&self, tokens: &mut TokenStream) {
224 self.0.to_tokens(tokens);
225 }
226}
227
228impl Debug for DebugExpr {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 write!(f, "{}", self.0.to_token_stream())
231 }
232}
233
234impl Display for DebugExpr {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 let original = self.0.as_ref().clone();
237 let simplified = simplify_q_macro(original);
238
239 write!(f, "q!({})", quote::quote!(#simplified))
242 }
243}
244
245fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
247 let mut simplifier = QMacroSimplifier::new();
250 simplifier.visit_expr_mut(&mut expr);
251
252 if let Some(simplified) = simplifier.simplified_result {
254 simplified
255 } else {
256 expr
257 }
258}
259
260#[derive(Default)]
262pub struct QMacroSimplifier {
263 pub simplified_result: Option<syn::Expr>,
264}
265
266impl QMacroSimplifier {
267 pub fn new() -> Self {
268 Self::default()
269 }
270}
271
272impl VisitMut for QMacroSimplifier {
273 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
274 if self.simplified_result.is_some() {
276 return;
277 }
278
279 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
280 && self.is_stageleft_runtime_support_call(&path_expr.path)
282 && let Some(closure) = self.extract_closure_from_args(&call.args)
284 {
285 self.simplified_result = Some(closure);
286 return;
287 }
288
289 syn::visit_mut::visit_expr_mut(self, expr);
292 }
293}
294
295impl QMacroSimplifier {
296 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
297 if let Some(last_segment) = path.segments.last() {
299 let fn_name = last_segment.ident.to_string();
300 fn_name.contains("_type_hint")
302 && path.segments.len() > 2
303 && path.segments[0].ident == "stageleft"
304 && path.segments[1].ident == "runtime_support"
305 } else {
306 false
307 }
308 }
309
310 fn extract_closure_from_args(
311 &self,
312 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
313 ) -> Option<syn::Expr> {
314 for arg in args {
316 if let syn::Expr::Closure(_) = arg {
317 return Some(arg.clone());
318 }
319 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
321 return Some(closure_expr);
322 }
323 }
324 None
325 }
326
327 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
328 let mut visitor = ClosureFinder {
329 found_closure: None,
330 prefer_inner_blocks: true,
331 };
332 visitor.visit_expr(expr);
333 visitor.found_closure
334 }
335}
336
337struct ClosureFinder {
339 found_closure: Option<syn::Expr>,
340 prefer_inner_blocks: bool,
341}
342
343impl<'ast> Visit<'ast> for ClosureFinder {
344 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
345 if self.found_closure.is_some() {
347 return;
348 }
349
350 match expr {
351 syn::Expr::Closure(_) => {
352 self.found_closure = Some(expr.clone());
353 }
354 syn::Expr::Block(block) if self.prefer_inner_blocks => {
355 for stmt in &block.block.stmts {
357 if let syn::Stmt::Expr(stmt_expr, _) = stmt
358 && let syn::Expr::Block(_) = stmt_expr
359 {
360 let mut inner_visitor = ClosureFinder {
362 found_closure: None,
363 prefer_inner_blocks: false, };
365 inner_visitor.visit_expr(stmt_expr);
366 if inner_visitor.found_closure.is_some() {
367 self.found_closure = Some(stmt_expr.clone());
369 return;
370 }
371 }
372 }
373
374 visit::visit_expr(self, expr);
376
377 if self.found_closure.is_some() {
380 }
382 }
383 _ => {
384 visit::visit_expr(self, expr);
386 }
387 }
388 }
389}
390
391#[derive(Clone, PartialEq, Eq, Hash)]
395pub struct DebugType(pub Box<syn::Type>);
396
397impl From<syn::Type> for DebugType {
398 fn from(t: syn::Type) -> Self {
399 Self(Box::new(t))
400 }
401}
402
403impl Deref for DebugType {
404 type Target = syn::Type;
405
406 fn deref(&self) -> &Self::Target {
407 &self.0
408 }
409}
410
411impl ToTokens for DebugType {
412 fn to_tokens(&self, tokens: &mut TokenStream) {
413 self.0.to_tokens(tokens);
414 }
415}
416
417impl Debug for DebugType {
418 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419 write!(f, "{}", self.0.to_token_stream())
420 }
421}
422
423impl serde::Serialize for DebugType {
424 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
426 }
427}
428
429fn serialize_backtrace_as_span<S: serde::Serializer>(
430 backtrace: &Backtrace,
431 serializer: S,
432) -> Result<S::Ok, S::Error> {
433 match backtrace.format_span() {
434 Some(span) => serializer.serialize_some(&span),
435 None => serializer.serialize_none(),
436 }
437}
438
439fn serialize_ident<S: serde::Serializer>(
440 ident: &syn::Ident,
441 serializer: S,
442) -> Result<S::Ok, S::Error> {
443 serializer.serialize_str(&ident.to_string())
444}
445
446pub enum DebugInstantiate {
447 Building,
448 Finalized(Box<DebugInstantiateFinalized>),
449}
450
451impl serde::Serialize for DebugInstantiate {
452 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
453 match self {
454 DebugInstantiate::Building => {
455 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
456 }
457 DebugInstantiate::Finalized(_) => {
458 panic!(
459 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
460 )
461 }
462 }
463 }
464}
465
466#[cfg_attr(
467 not(feature = "build"),
468 expect(
469 dead_code,
470 reason = "sink, source unused without `feature = \"build\"`."
471 )
472)]
473pub struct DebugInstantiateFinalized {
474 sink: syn::Expr,
475 source: syn::Expr,
476 connect_fn: Option<Box<dyn FnOnce()>>,
477}
478
479impl From<DebugInstantiateFinalized> for DebugInstantiate {
480 fn from(f: DebugInstantiateFinalized) -> Self {
481 Self::Finalized(Box::new(f))
482 }
483}
484
485impl Debug for DebugInstantiate {
486 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487 write!(f, "<network instantiate>")
488 }
489}
490
491impl Hash for DebugInstantiate {
492 fn hash<H: Hasher>(&self, _state: &mut H) {
493 }
495}
496
497impl Clone for DebugInstantiate {
498 fn clone(&self) -> Self {
499 match self {
500 DebugInstantiate::Building => DebugInstantiate::Building,
501 DebugInstantiate::Finalized(_) => {
502 panic!("DebugInstantiate::Finalized should not be cloned")
503 }
504 }
505 }
506}
507
508#[derive(Debug, Hash, Clone, serde::Serialize)]
517pub enum ClusterMembersState {
518 Uninit,
520 Stream(DebugExpr),
523 Tee(LocationId, LocationId),
527}
528
529#[derive(Debug, Hash, Clone, serde::Serialize)]
531pub enum HydroSource {
532 Stream(DebugExpr),
533 ExternalNetwork(),
534 Iter(DebugExpr),
535 Spin(),
536 ClusterMembers(LocationId, ClusterMembersState),
537 Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
538 EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
539}
540
/// Backend hooks used to emit DFIR for individual IR operations.
///
/// Throughout, `in_ident`/`input_ident` name the DFIR identifier an operation reads from and
/// `out_ident` names the identifier it defines.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend capability flag.
    /// NOTE(review): exact meaning is not visible here; the per-location `SecondaryMap`
    /// implementation in this file returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder that code for `location` should be added to.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code for a batch operation from `in_location` to `out_location`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );

    /// Emits the code for yielding a collection out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code that begins an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code that ends an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code for an observation of non-determinism, converting a collection of
    /// `in_kind` into one of `out_kind`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code merging `first_ident` and `second_ident` into `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network link: the sending side on `from` and the receiving side
    /// on `to`, with optional serialize / deserialize steps.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source on `on` that reads from `source_expr`, with an optional deserialize step.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sink on `on` that writes `input_ident` into `sink_expr`, with an optional
    /// serialize step.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a hook for `in_ident` and returns an identifier for it; backends
    /// without hook support return `None`.
    /// NOTE(review): what the returned identifier denotes is not visible here — confirm.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the code for a consistency assertion on `in_ident`.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
669
/// Backend that keeps one [`FlatGraphBuilder`] per root location.
///
/// Most operations are emitted as a plain re-binding (`out = in`); fold hooks are not supported.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, creating a default one on
    /// first use. Panics with "location was removed" if the entry cannot be obtained.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Bounded singleton-like inputs (singleton, optional, keyed singleton) are persisted with
    /// `persist::<'static>()` and must come from a top-level location; everything else is a
    /// plain re-binding.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Plain re-binding on the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain re-binding on the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain re-binding on the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain re-binding on `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a `union()` fed by the first input on port 0 and the second on port 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Sender (`from`): `input [-> map(serialize)] -> dest_sink(sink)`, tagged `send{tag_id}`.
    /// Receiver (`to`): `out = source_stream(source) [-> map(deserialize)]`, tagged
    /// `recv{tag_id}`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// `out = source_stream(source_expr) [-> map(deserialize)]`, tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// `input [-> map(serialize)] -> dest_sink(sink_expr)`, tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }

    /// This backend emits no fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Plain re-binding on `location`; no check is emitted by this backend.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }
}
953
/// Either a DFIR backend to emit code into, or a pair of callbacks: `L` receives each
/// [`HydroRoot`] and `N` each [`HydroNode`], both together with a mutable [`StmtId`].
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut StmtId),
    N: FnMut(&mut HydroNode, &mut StmtId),
{
    /// Emit code through the given backend.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback and the node callback instead.
    Callback(L, N),
}
963
964#[derive(Debug, Hash, serde::Serialize)]
968pub enum HydroRoot {
969 ForEach {
970 f: DebugExpr,
971 input: Box<HydroNode>,
972 op_metadata: HydroIrOpMetadata,
973 },
974 SendExternal {
975 to_external_key: LocationKey,
976 to_port_id: ExternalPortId,
977 to_many: bool,
978 unpaired: bool,
979 serialize_fn: Option<DebugExpr>,
980 instantiate_fn: DebugInstantiate,
981 input: Box<HydroNode>,
982 op_metadata: HydroIrOpMetadata,
983 },
984 DestSink {
985 sink: DebugExpr,
986 input: Box<HydroNode>,
987 op_metadata: HydroIrOpMetadata,
988 },
989 CycleSink {
990 cycle_id: CycleId,
991 input: Box<HydroNode>,
992 op_metadata: HydroIrOpMetadata,
993 },
994 EmbeddedOutput {
995 #[serde(serialize_with = "serialize_ident")]
996 ident: syn::Ident,
997 input: Box<HydroNode>,
998 op_metadata: HydroIrOpMetadata,
999 },
1000 Null {
1001 input: Box<HydroNode>,
1002 op_metadata: HydroIrOpMetadata,
1003 },
1004}
1005
1006impl HydroRoot {
1007 #[cfg(feature = "build")]
1008 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1009 pub fn compile_network<'a, D>(
1010 &mut self,
1011 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1012 seen_tees: &mut SeenSharedNodes,
1013 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1014 processes: &SparseSecondaryMap<LocationKey, D::Process>,
1015 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1016 externals: &SparseSecondaryMap<LocationKey, D::External>,
1017 env: &mut D::InstantiateEnv,
1018 ) where
1019 D: Deploy<'a>,
1020 {
1021 let refcell_extra_stmts = RefCell::new(extra_stmts);
1022 let refcell_env = RefCell::new(env);
1023 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1024 self.transform_bottom_up(
1025 &mut |l| {
1026 if let HydroRoot::SendExternal {
1027 input,
1028 to_external_key,
1029 to_port_id,
1030 to_many,
1031 unpaired,
1032 instantiate_fn,
1033 ..
1034 } = l
1035 {
1036 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1037 DebugInstantiate::Building => {
1038 let to_node = externals
1039 .get(*to_external_key)
1040 .unwrap_or_else(|| {
1041 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1042 })
1043 .clone();
1044
1045 match input.metadata().location_id.root() {
1046 &LocationId::Process(process_key) => {
1047 if *to_many {
1048 (
1049 (
1050 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1051 parse_quote!(DUMMY),
1052 ),
1053 Box::new(|| {}) as Box<dyn FnOnce()>,
1054 )
1055 } else {
1056 let from_node = processes
1057 .get(process_key)
1058 .unwrap_or_else(|| {
1059 panic!("A process used in the graph was not instantiated: {}", process_key)
1060 })
1061 .clone();
1062
1063 let sink_port = from_node.next_port();
1064 let source_port = to_node.next_port();
1065
1066 if *unpaired {
1067 use stageleft::quote_type;
1068 use tokio_util::codec::LengthDelimitedCodec;
1069
1070 to_node.register(*to_port_id, source_port.clone());
1071
1072 let _ = D::e2o_source(
1073 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1074 &to_node, &source_port,
1075 &from_node, &sink_port,
1076 "e_type::<LengthDelimitedCodec>(),
1077 format!("{}_{}", *to_external_key, *to_port_id)
1078 );
1079 }
1080
1081 (
1082 (
1083 D::o2e_sink(
1084 &from_node,
1085 &sink_port,
1086 &to_node,
1087 &source_port,
1088 format!("{}_{}", *to_external_key, *to_port_id)
1089 ),
1090 parse_quote!(DUMMY),
1091 ),
1092 if *unpaired {
1093 D::e2o_connect(
1094 &to_node,
1095 &source_port,
1096 &from_node,
1097 &sink_port,
1098 *to_many,
1099 NetworkHint::Auto,
1100 )
1101 } else {
1102 Box::new(|| {}) as Box<dyn FnOnce()>
1103 },
1104 )
1105 }
1106 }
1107 LocationId::Cluster(cluster_key) => {
1108 let from_node = clusters
1109 .get(*cluster_key)
1110 .unwrap_or_else(|| {
1111 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1112 })
1113 .clone();
1114
1115 let sink_port = from_node.next_port();
1116 let source_port = to_node.next_port();
1117
1118 if *unpaired {
1119 to_node.register(*to_port_id, source_port.clone());
1120 }
1121
1122 (
1123 (
1124 D::m2e_sink(
1125 &from_node,
1126 &sink_port,
1127 &to_node,
1128 &source_port,
1129 format!("{}_{}", *to_external_key, *to_port_id)
1130 ),
1131 parse_quote!(DUMMY),
1132 ),
1133 Box::new(|| {}) as Box<dyn FnOnce()>,
1134 )
1135 }
1136 _ => panic!()
1137 }
1138 },
1139
1140 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1141 };
1142
1143 *instantiate_fn = DebugInstantiateFinalized {
1144 sink: sink_expr,
1145 source: source_expr,
1146 connect_fn: Some(connect_fn),
1147 }
1148 .into();
1149 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1150 let element_type = match &input.metadata().collection_kind {
1151 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1152 _ => panic!("Embedded output must have Stream collection kind"),
1153 };
1154 let location_key = match input.metadata().location_id.root() {
1155 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1156 _ => panic!("Embedded output must be on a process or cluster"),
1157 };
1158 D::register_embedded_output(
1159 &mut refcell_env.borrow_mut(),
1160 location_key,
1161 ident,
1162 &element_type,
1163 );
1164 }
1165 },
1166 &mut |n| {
1167 if let HydroNode::Network {
1168 name,
1169 networking_info,
1170 input,
1171 instantiate_fn,
1172 metadata,
1173 ..
1174 } = n
1175 {
1176 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1177 DebugInstantiate::Building => instantiate_network::<D>(
1178 &mut refcell_env.borrow_mut(),
1179 input.metadata().location_id.root(),
1180 metadata.location_id.root(),
1181 processes,
1182 clusters,
1183 name.as_deref(),
1184 networking_info,
1185 ),
1186
1187 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1188 };
1189
1190 *instantiate_fn = DebugInstantiateFinalized {
1191 sink: sink_expr,
1192 source: source_expr,
1193 connect_fn: Some(connect_fn),
1194 }
1195 .into();
1196 } else if let HydroNode::ExternalInput {
1197 from_external_key,
1198 from_port_id,
1199 from_many,
1200 codec_type,
1201 port_hint,
1202 instantiate_fn,
1203 metadata,
1204 ..
1205 } = n
1206 {
1207 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1208 DebugInstantiate::Building => {
1209 let from_node = externals
1210 .get(*from_external_key)
1211 .unwrap_or_else(|| {
1212 panic!(
1213 "A external used in the graph was not instantiated: {}",
1214 from_external_key,
1215 )
1216 })
1217 .clone();
1218
1219 match metadata.location_id.root() {
1220 &LocationId::Process(process_key) => {
1221 let to_node = processes
1222 .get(process_key)
1223 .unwrap_or_else(|| {
1224 panic!("A process used in the graph was not instantiated: {}", process_key)
1225 })
1226 .clone();
1227
1228 let sink_port = from_node.next_port();
1229 let source_port = to_node.next_port();
1230
1231 from_node.register(*from_port_id, sink_port.clone());
1232
1233 (
1234 (
1235 parse_quote!(DUMMY),
1236 if *from_many {
1237 D::e2o_many_source(
1238 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1239 &to_node, &source_port,
1240 codec_type.0.as_ref(),
1241 format!("{}_{}", *from_external_key, *from_port_id)
1242 )
1243 } else {
1244 D::e2o_source(
1245 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1246 &from_node, &sink_port,
1247 &to_node, &source_port,
1248 codec_type.0.as_ref(),
1249 format!("{}_{}", *from_external_key, *from_port_id)
1250 )
1251 },
1252 ),
1253 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1254 )
1255 }
1256 LocationId::Cluster(cluster_key) => {
1257 let to_node = clusters
1258 .get(*cluster_key)
1259 .unwrap_or_else(|| {
1260 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1261 })
1262 .clone();
1263
1264 let sink_port = from_node.next_port();
1265 let source_port = to_node.next_port();
1266
1267 from_node.register(*from_port_id, sink_port.clone());
1268
1269 (
1270 (
1271 parse_quote!(DUMMY),
1272 D::e2m_source(
1273 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1274 &from_node, &sink_port,
1275 &to_node, &source_port,
1276 codec_type.0.as_ref(),
1277 format!("{}_{}", *from_external_key, *from_port_id)
1278 ),
1279 ),
1280 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1281 )
1282 }
1283 _ => panic!()
1284 }
1285 },
1286
1287 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1288 };
1289
1290 *instantiate_fn = DebugInstantiateFinalized {
1291 sink: sink_expr,
1292 source: source_expr,
1293 connect_fn: Some(connect_fn),
1294 }
1295 .into();
1296 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1297 let element_type = match &metadata.collection_kind {
1298 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1299 _ => panic!("Embedded source must have Stream collection kind"),
1300 };
1301 let location_key = match metadata.location_id.root() {
1302 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1303 _ => panic!("Embedded source must be on a process or cluster"),
1304 };
1305 D::register_embedded_stream_input(
1306 &mut refcell_env.borrow_mut(),
1307 location_key,
1308 ident,
1309 &element_type,
1310 );
1311 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1312 let element_type = match &metadata.collection_kind {
1313 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1314 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1315 };
1316 let location_key = match metadata.location_id.root() {
1317 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1318 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1319 };
1320 D::register_embedded_singleton_input(
1321 &mut refcell_env.borrow_mut(),
1322 location_key,
1323 ident,
1324 &element_type,
1325 );
1326 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1327 match state {
1328 ClusterMembersState::Uninit => {
1329 let at_location = metadata.location_id.root().clone();
1330 let key = (at_location.clone(), location_id.key());
1331 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1332 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1334 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1335 &(),
1336 );
1337 *state = ClusterMembersState::Stream(expr.into());
1338 } else {
1339 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1341 }
1342 }
1343 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1344 panic!("cluster members already finalized");
1345 }
1346 }
1347 }
1348 },
1349 seen_tees,
1350 false,
1351 );
1352 }
1353
1354 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1355 self.transform_bottom_up(
1356 &mut |l| {
1357 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1358 match instantiate_fn {
1359 DebugInstantiate::Building => panic!("network not built"),
1360
1361 DebugInstantiate::Finalized(finalized) => {
1362 (finalized.connect_fn.take().unwrap())();
1363 }
1364 }
1365 }
1366 },
1367 &mut |n| {
1368 if let HydroNode::Network { instantiate_fn, .. }
1369 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1370 {
1371 match instantiate_fn {
1372 DebugInstantiate::Building => panic!("network not built"),
1373
1374 DebugInstantiate::Finalized(finalized) => {
1375 (finalized.connect_fn.take().unwrap())();
1376 }
1377 }
1378 }
1379 },
1380 seen_tees,
1381 false,
1382 );
1383 }
1384
1385 pub fn transform_bottom_up(
1386 &mut self,
1387 transform_root: &mut impl FnMut(&mut HydroRoot),
1388 transform_node: &mut impl FnMut(&mut HydroNode),
1389 seen_tees: &mut SeenSharedNodes,
1390 check_well_formed: bool,
1391 ) {
1392 self.transform_children(
1393 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1394 seen_tees,
1395 );
1396
1397 transform_root(self);
1398 }
1399
1400 pub fn transform_children(
1401 &mut self,
1402 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1403 seen_tees: &mut SeenSharedNodes,
1404 ) {
1405 match self {
1406 HydroRoot::ForEach { input, .. }
1407 | HydroRoot::SendExternal { input, .. }
1408 | HydroRoot::DestSink { input, .. }
1409 | HydroRoot::CycleSink { input, .. }
1410 | HydroRoot::EmbeddedOutput { input, .. }
1411 | HydroRoot::Null { input, .. } => {
1412 transform(input, seen_tees);
1413 }
1414 }
1415 }
1416
1417 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1418 match self {
1419 HydroRoot::ForEach {
1420 f,
1421 input,
1422 op_metadata,
1423 } => HydroRoot::ForEach {
1424 f: f.clone(),
1425 input: Box::new(input.deep_clone(seen_tees)),
1426 op_metadata: op_metadata.clone(),
1427 },
1428 HydroRoot::SendExternal {
1429 to_external_key,
1430 to_port_id,
1431 to_many,
1432 unpaired,
1433 serialize_fn,
1434 instantiate_fn,
1435 input,
1436 op_metadata,
1437 } => HydroRoot::SendExternal {
1438 to_external_key: *to_external_key,
1439 to_port_id: *to_port_id,
1440 to_many: *to_many,
1441 unpaired: *unpaired,
1442 serialize_fn: serialize_fn.clone(),
1443 instantiate_fn: instantiate_fn.clone(),
1444 input: Box::new(input.deep_clone(seen_tees)),
1445 op_metadata: op_metadata.clone(),
1446 },
1447 HydroRoot::DestSink {
1448 sink,
1449 input,
1450 op_metadata,
1451 } => HydroRoot::DestSink {
1452 sink: sink.clone(),
1453 input: Box::new(input.deep_clone(seen_tees)),
1454 op_metadata: op_metadata.clone(),
1455 },
1456 HydroRoot::CycleSink {
1457 cycle_id,
1458 input,
1459 op_metadata,
1460 } => HydroRoot::CycleSink {
1461 cycle_id: *cycle_id,
1462 input: Box::new(input.deep_clone(seen_tees)),
1463 op_metadata: op_metadata.clone(),
1464 },
1465 HydroRoot::EmbeddedOutput {
1466 ident,
1467 input,
1468 op_metadata,
1469 } => HydroRoot::EmbeddedOutput {
1470 ident: ident.clone(),
1471 input: Box::new(input.deep_clone(seen_tees)),
1472 op_metadata: op_metadata.clone(),
1473 },
1474 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1475 input: Box::new(input.deep_clone(seen_tees)),
1476 op_metadata: op_metadata.clone(),
1477 },
1478 }
1479 }
1480
/// Emits this root and its inputs into `graph_builders`.
///
/// Thin wrapper around `emit_core` that selects the
/// `BuildersOrCallback::Builders` mode; all other arguments are forwarded
/// unchanged.
#[cfg(feature = "build")]
pub fn emit(
    &mut self,
    graph_builders: &mut dyn DfirBuilder,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut StmtId,
    fold_hooked_idents: &mut HashSet<String>,
) {
    // No callbacks are supplied in builder mode; plain function pointer types
    // only serve to fix the generic parameters.
    type RootCallback = fn(&mut HydroRoot, &mut StmtId);
    type NodeCallback = fn(&mut HydroNode, &mut StmtId);

    let mut target = BuildersOrCallback::<RootCallback, NodeCallback>::Builders(graph_builders);
    self.emit_core(
        &mut target,
        seen_tees,
        built_tees,
        next_stmt_id,
        fold_hooked_idents,
    );
}
1501
/// Emits this root, either into DFIR builders or through user callbacks.
///
/// The input subtree is always emitted first via `HydroNode::emit_core`,
/// which yields the identifier the root's statement reads from. Then:
///
/// * in `BuildersOrCallback::Builders` mode a DFIR statement for the root is
///   added to the builder selected by the input's location;
/// * in `BuildersOrCallback::Callback` mode the root callback is invoked with
///   this root and the current statement id.
///
/// `next_stmt_id` is advanced once per root, except for
/// `HydroRoot::CycleSink`, which neither invokes the callback nor consumes a
/// statement id.
#[cfg(feature = "build")]
pub fn emit_core(
    &mut self,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut StmtId),
        impl FnMut(&mut HydroNode, &mut StmtId),
    >,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut StmtId,
    fold_hooked_idents: &mut HashSet<String>,
) {
    match self {
        HydroRoot::ForEach { f, input, .. } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Callback(on_root, _) => on_root(self, next_stmt_id),
                BuildersOrCallback::Builders(dfir) => {
                    let stmt_label = next_stmt_id.to_string();
                    dfir.get_dfir_mut(&input.metadata().location_id).add_dfir(
                        parse_quote! {
                            #upstream -> for_each(#f);
                        },
                        None,
                        Some(&stmt_label),
                    );
                }
            }

            let _ = next_stmt_id.get_and_increment();
        }

        HydroRoot::SendExternal {
            serialize_fn,
            instantiate_fn,
            input,
            ..
        } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Callback(on_root, _) => on_root(self, next_stmt_id),
                BuildersOrCallback::Builders(dfir) => {
                    // Only the sink side is needed here; a placeholder is used
                    // while the network has not been instantiated yet.
                    let sink_expr = match instantiate_fn {
                        DebugInstantiate::Building => syn::parse_quote!(DUMMY_SINK),
                        DebugInstantiate::Finalized(finalized) => finalized.sink.clone(),
                    };

                    dfir.create_external_output(
                        &input.metadata().location_id,
                        sink_expr,
                        &upstream,
                        serialize_fn.as_ref(),
                        *next_stmt_id,
                    );
                }
            }

            let _ = next_stmt_id.get_and_increment();
        }

        HydroRoot::DestSink { sink, input, .. } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Callback(on_root, _) => on_root(self, next_stmt_id),
                BuildersOrCallback::Builders(dfir) => {
                    let stmt_label = next_stmt_id.to_string();
                    dfir.get_dfir_mut(&input.metadata().location_id).add_dfir(
                        parse_quote! {
                            #upstream -> dest_sink(#sink);
                        },
                        None,
                        Some(&stmt_label),
                    );
                }
            }

            let _ = next_stmt_id.get_and_increment();
        }

        HydroRoot::CycleSink {
            cycle_id, input, ..
        } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            // Cycle sinks get no callback and no statement id.
            if let BuildersOrCallback::Builders(dfir) = builders_or_callback {
                // Keyed collections flow as `(key, value)` pairs.
                let elem_type: syn::Type = match &input.metadata().collection_kind {
                    CollectionKind::KeyedSingleton {
                        key_type,
                        value_type,
                        ..
                    }
                    | CollectionKind::KeyedStream {
                        key_type,
                        value_type,
                        ..
                    } => {
                        parse_quote!((#key_type, #value_type))
                    }
                    CollectionKind::Stream { element_type, .. }
                    | CollectionKind::Singleton { element_type, .. }
                    | CollectionKind::Optional { element_type, .. } => {
                        parse_quote!(#element_type)
                    }
                };

                let cycle_ident = cycle_id.as_ident();
                dfir.get_dfir_mut(&input.metadata().location_id).add_dfir(
                    parse_quote! {
                        #cycle_ident = #upstream -> identity::<#elem_type>();
                    },
                    None,
                    None,
                );
            }
        }

        HydroRoot::EmbeddedOutput { ident, input, .. } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Callback(on_root, _) => on_root(self, next_stmt_id),
                BuildersOrCallback::Builders(dfir) => {
                    let stmt_label = next_stmt_id.to_string();
                    dfir.get_dfir_mut(&input.metadata().location_id).add_dfir(
                        parse_quote! {
                            #upstream -> for_each(&mut #ident);
                        },
                        None,
                        Some(&stmt_label),
                    );
                }
            }

            let _ = next_stmt_id.get_and_increment();
        }

        HydroRoot::Null { input, .. } => {
            let upstream = input.emit_core(
                builders_or_callback,
                seen_tees,
                built_tees,
                next_stmt_id,
                fold_hooked_idents,
            );

            match builders_or_callback {
                BuildersOrCallback::Callback(on_root, _) => on_root(self, next_stmt_id),
                BuildersOrCallback::Builders(dfir) => {
                    let stmt_label = next_stmt_id.to_string();
                    dfir.get_dfir_mut(&input.metadata().location_id).add_dfir(
                        parse_quote! {
                            #upstream -> for_each(|_| {});
                        },
                        None,
                        Some(&stmt_label),
                    );
                }
            }

            let _ = next_stmt_id.get_and_increment();
        }
    }
}
1724
1725 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1726 match self {
1727 HydroRoot::ForEach { op_metadata, .. }
1728 | HydroRoot::SendExternal { op_metadata, .. }
1729 | HydroRoot::DestSink { op_metadata, .. }
1730 | HydroRoot::CycleSink { op_metadata, .. }
1731 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1732 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1733 }
1734 }
1735
1736 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1737 match self {
1738 HydroRoot::ForEach { op_metadata, .. }
1739 | HydroRoot::SendExternal { op_metadata, .. }
1740 | HydroRoot::DestSink { op_metadata, .. }
1741 | HydroRoot::CycleSink { op_metadata, .. }
1742 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1743 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1744 }
1745 }
1746
1747 pub fn input(&self) -> &HydroNode {
1748 match self {
1749 HydroRoot::ForEach { input, .. }
1750 | HydroRoot::SendExternal { input, .. }
1751 | HydroRoot::DestSink { input, .. }
1752 | HydroRoot::CycleSink { input, .. }
1753 | HydroRoot::EmbeddedOutput { input, .. }
1754 | HydroRoot::Null { input, .. } => input,
1755 }
1756 }
1757
1758 pub fn input_metadata(&self) -> &HydroIrMetadata {
1759 self.input().metadata()
1760 }
1761
1762 pub fn print_root(&self) -> String {
1763 match self {
1764 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1765 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1766 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1767 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1768 HydroRoot::EmbeddedOutput { ident, .. } => {
1769 format!("EmbeddedOutput({})", ident)
1770 }
1771 HydroRoot::Null { .. } => "Null".to_owned(),
1772 }
1773 }
1774
1775 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1776 match self {
1777 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1778 transform(f);
1779 }
1780 HydroRoot::SendExternal { .. }
1781 | HydroRoot::CycleSink { .. }
1782 | HydroRoot::EmbeddedOutput { .. }
1783 | HydroRoot::Null { .. } => {}
1784 }
1785 }
1786}
1787
/// Returns the clock id of `loc` if it is a `LocationId::Tick`, looking
/// through any number of enclosing `LocationId::Atomic` wrappers; `None` for
/// every other location.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1796
/// Rewrites every tick clock id inside `loc` to its union-find representative.
///
/// Descends through `Tick` and `Atomic` layers; `Process` and `Cluster`
/// locations end the descent unchanged. `uf` may be updated by the lookups
/// performed through `uf_find`.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
        LocationId::Atomic(inner) => inner,
        LocationId::Process(_) | LocationId::Cluster(_) => return,
    };
    remap_location(nested, uf);
}
1810
/// Union-find lookup: returns the representative of `x`.
///
/// An id that has no entry in `parent`, or that maps to itself, is its own
/// representative. On the way back from the recursion every visited id is
/// re-pointed directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    match parent.get(&x).copied() {
        Some(up) if up != x => {
            let root = uf_find(parent, up);
            parent.insert(x, root);
            root
        }
        // Missing entry or self-loop: `x` is a root.
        _ => x,
    }
}
1821
/// Union-find merge: makes the representative of `b` the parent of the
/// representative of `a`. Does nothing when both already share one.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1830
/// Merges tick clock ids that are connected through certain operators and
/// rewrites every node's location to use the merged id.
///
/// Pass 1 walks the IR and, for `Batch`, `YieldConcat`, `Chain`, `ChainFirst`
/// and `MergeOrdered` nodes, unions the tick of each input with the tick of
/// the node itself (only when both sides resolve to a tick via `tick_of`).
/// Pass 2 walks the IR again and remaps each node's `location_id` through the
/// resulting union-find structure.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the ticks of an input location and its consumer's location when
    /// both are tick locations.
    fn link(uf: &mut HashMap<ClockId, ClockId>, upstream: &LocationId, downstream: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(upstream), tick_of(downstream)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect equivalences.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: apply the representatives to every node location.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            remap_location(&mut node.metadata_mut().location_id, &mut uf);
        },
        false,
    );
}
1894
/// Emits every root in `ir` into a fresh set of per-location graph builders
/// and returns them.
///
/// The shared-node caches, the statement id counter and the set of hooked
/// fold identifiers are created here and shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = StmtId::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1913
/// Walks `ir` in emission order without building any graph, handing roots to
/// `transform_root` and nodes to `transform_node` together with the current
/// statement id.
///
/// Runs `HydroRoot::emit_core` in `BuildersOrCallback::Callback` mode with
/// freshly created traversal state.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = StmtId::default();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1935
1936pub fn transform_bottom_up(
1937 ir: &mut [HydroRoot],
1938 transform_root: &mut impl FnMut(&mut HydroRoot),
1939 transform_node: &mut impl FnMut(&mut HydroNode),
1940 check_well_formed: bool,
1941) {
1942 let mut seen_tees = HashMap::new();
1943 ir.iter_mut().for_each(|leaf| {
1944 leaf.transform_bottom_up(
1945 transform_root,
1946 transform_node,
1947 &mut seen_tees,
1948 check_well_formed,
1949 );
1950 });
1951}
1952
1953pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1954 let mut seen_tees = HashMap::new();
1955 ir.iter()
1956 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1957 .collect()
1958}
1959
1960type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1961thread_local! {
1962 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1963 static SERIALIZED_SHARED: PrintedTees
1967 = const { RefCell::new(None) };
1968}
1969
1970pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1971 PRINTED_TEES.with(|printed_tees| {
1972 let mut printed_tees_mut = printed_tees.borrow_mut();
1973 *printed_tees_mut = Some((0, HashMap::new()));
1974 drop(printed_tees_mut);
1975
1976 let ret = f();
1977
1978 let mut printed_tees_mut = printed_tees.borrow_mut();
1979 *printed_tees_mut = None;
1980
1981 ret
1982 })
1983}
1984
1985pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1990 let _guard = SerializedSharedGuard::enter();
1991 f()
1992}
1993
1994struct SerializedSharedGuard {
1997 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1998}
1999
2000impl SerializedSharedGuard {
2001 fn enter() -> Self {
2002 let previous = SERIALIZED_SHARED.with(|cell| {
2003 let mut guard = cell.borrow_mut();
2004 guard.replace((0, HashMap::new()))
2005 });
2006 Self { previous }
2007 }
2008}
2009
2010impl Drop for SerializedSharedGuard {
2011 fn drop(&mut self) {
2012 SERIALIZED_SHARED.with(|cell| {
2013 *cell.borrow_mut() = self.previous.take();
2014 });
2015 }
2016}
2017
2018pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2019
2020impl serde::Serialize for SharedNode {
2021 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2032 SERIALIZED_SHARED.with(|cell| {
2033 let mut guard = cell.borrow_mut();
2034 let state = guard.as_mut().ok_or_else(|| {
2036 serde::ser::Error::custom(
2037 "SharedNode serialization requires an active serialize_dedup_shared scope",
2038 )
2039 })?;
2040 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2041
2042 if let Some(&id) = state.1.get(&ptr) {
2043 drop(guard);
2044 use serde::ser::SerializeMap;
2045 let mut map = serializer.serialize_map(Some(1))?;
2046 map.serialize_entry("$shared_ref", &id)?;
2047 map.end()
2048 } else {
2049 let id = state.0;
2050 state.0 += 1;
2051 state.1.insert(ptr, id);
2052 drop(guard);
2053
2054 use serde::ser::SerializeMap;
2055 let mut map = serializer.serialize_map(Some(2))?;
2056 map.serialize_entry("$shared", &id)?;
2057 map.serialize_entry("node", &*self.0.borrow())?;
2058 map.end()
2059 }
2060 })
2061 }
2062}
2063
2064impl SharedNode {
2065 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2066 Rc::as_ptr(&self.0)
2067 }
2068}
2069
2070impl Debug for SharedNode {
2071 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2072 PRINTED_TEES.with(|printed_tees| {
2073 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2074 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2075
2076 if let Some(printed_tees_mut) = printed_tees_mut {
2077 if let Some(existing) = printed_tees_mut
2078 .1
2079 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2080 {
2081 write!(f, "<shared {}>", existing)
2082 } else {
2083 let next_id = printed_tees_mut.0;
2084 printed_tees_mut.0 += 1;
2085 printed_tees_mut
2086 .1
2087 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2088 drop(printed_tees_mut_borrow);
2089 write!(f, "<shared {}>: ", next_id)?;
2090 Debug::fmt(&self.0.borrow(), f)
2091 }
2092 } else {
2093 drop(printed_tees_mut_borrow);
2094 write!(f, "<shared>: ")?;
2095 Debug::fmt(&self.0.borrow(), f)
2096 }
2097 })
2098 }
2099}
2100
2101impl Hash for SharedNode {
2102 fn hash<H: Hasher>(&self, state: &mut H) {
2103 self.0.borrow_mut().hash(state);
2104 }
2105}
2106
2107#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2108pub enum BoundKind {
2109 Unbounded,
2110 Bounded,
2111}
2112
2113#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2114pub enum StreamOrder {
2115 NoOrder,
2116 TotalOrder,
2117}
2118
2119#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2120pub enum StreamRetry {
2121 AtLeastOnce,
2122 ExactlyOnce,
2123}
2124
2125#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2126pub enum KeyedSingletonBoundKind {
2127 Unbounded,
2128 MonotonicValue,
2129 BoundedValue,
2130 Bounded,
2131}
2132
2133#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2134pub enum SingletonBoundKind {
2135 Unbounded,
2136 Monotonic,
2137 Bounded,
2138}
2139
2140#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2141pub enum CollectionKind {
2142 Stream {
2143 bound: BoundKind,
2144 order: StreamOrder,
2145 retry: StreamRetry,
2146 element_type: DebugType,
2147 },
2148 Singleton {
2149 bound: SingletonBoundKind,
2150 element_type: DebugType,
2151 },
2152 Optional {
2153 bound: BoundKind,
2154 element_type: DebugType,
2155 },
2156 KeyedStream {
2157 bound: BoundKind,
2158 value_order: StreamOrder,
2159 value_retry: StreamRetry,
2160 key_type: DebugType,
2161 value_type: DebugType,
2162 },
2163 KeyedSingleton {
2164 bound: KeyedSingletonBoundKind,
2165 key_type: DebugType,
2166 value_type: DebugType,
2167 },
2168}
2169
2170impl CollectionKind {
2171 pub fn is_bounded(&self) -> bool {
2172 matches!(
2173 self,
2174 CollectionKind::Stream {
2175 bound: BoundKind::Bounded,
2176 ..
2177 } | CollectionKind::Singleton {
2178 bound: SingletonBoundKind::Bounded,
2179 ..
2180 } | CollectionKind::Optional {
2181 bound: BoundKind::Bounded,
2182 ..
2183 } | CollectionKind::KeyedStream {
2184 bound: BoundKind::Bounded,
2185 ..
2186 } | CollectionKind::KeyedSingleton {
2187 bound: KeyedSingletonBoundKind::Bounded,
2188 ..
2189 }
2190 )
2191 }
2192}
2193
2194#[derive(Clone, serde::Serialize)]
2195pub struct HydroIrMetadata {
2196 pub location_id: LocationId,
2197 pub collection_kind: CollectionKind,
2198 pub consistency: Option<ClusterConsistency>,
2199 pub cardinality: Option<usize>,
2200 pub tag: Option<String>,
2201 pub op: HydroIrOpMetadata,
2202}
2203
2204impl Hash for HydroIrMetadata {
2206 fn hash<H: Hasher>(&self, _: &mut H) {}
2207}
2208
2209impl PartialEq for HydroIrMetadata {
2210 fn eq(&self, _: &Self) -> bool {
2211 true
2212 }
2213}
2214
2215impl Eq for HydroIrMetadata {}
2216
2217impl Debug for HydroIrMetadata {
2218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2219 f.debug_struct("HydroIrMetadata")
2220 .field("location_id", &self.location_id)
2221 .field("collection_kind", &self.collection_kind)
2222 .finish()
2223 }
2224}
2225
2226#[derive(Clone, serde::Serialize)]
2229pub struct HydroIrOpMetadata {
2230 #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2231 pub backtrace: Backtrace,
2232 pub cpu_usage: Option<f64>,
2233 pub network_recv_cpu_usage: Option<f64>,
2234 pub id: Option<usize>,
2235}
2236
2237impl HydroIrOpMetadata {
2238 #[expect(
2239 clippy::new_without_default,
2240 reason = "explicit calls to new ensure correct backtrace bounds"
2241 )]
2242 pub fn new() -> HydroIrOpMetadata {
2243 Self::new_with_skip(1)
2244 }
2245
2246 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2247 HydroIrOpMetadata {
2248 backtrace: Backtrace::get_backtrace(2 + skip_count),
2249 cpu_usage: None,
2250 network_recv_cpu_usage: None,
2251 id: None,
2252 }
2253 }
2254}
2255
2256impl Debug for HydroIrOpMetadata {
2257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2258 f.debug_struct("HydroIrOpMetadata").finish()
2259 }
2260}
2261
2262impl Hash for HydroIrOpMetadata {
2263 fn hash<H: Hasher>(&self, _: &mut H) {}
2264}
2265
2266#[derive(Debug, Hash, serde::Serialize)]
2269pub enum HydroNode {
2270 Placeholder,
2271
2272 Cast {
2280 inner: Box<HydroNode>,
2281 metadata: HydroIrMetadata,
2282 },
2283
2284 ObserveNonDet {
2290 inner: Box<HydroNode>,
2291 trusted: bool, metadata: HydroIrMetadata,
2293 },
2294
2295 Source {
2296 source: HydroSource,
2297 metadata: HydroIrMetadata,
2298 },
2299
2300 SingletonSource {
2301 value: DebugExpr,
2302 first_tick_only: bool,
2303 metadata: HydroIrMetadata,
2304 },
2305
2306 CycleSource {
2307 cycle_id: CycleId,
2308 metadata: HydroIrMetadata,
2309 },
2310
2311 Tee {
2312 inner: SharedNode,
2313 metadata: HydroIrMetadata,
2314 },
2315
2316 Singleton {
2324 inner: SharedNode,
2325 metadata: HydroIrMetadata,
2326 },
2327
2328 Partition {
2329 inner: SharedNode,
2330 f: ClosureExpr,
2331 is_true: bool,
2332 metadata: HydroIrMetadata,
2333 },
2334
2335 BeginAtomic {
2336 inner: Box<HydroNode>,
2337 metadata: HydroIrMetadata,
2338 },
2339
2340 EndAtomic {
2341 inner: Box<HydroNode>,
2342 metadata: HydroIrMetadata,
2343 },
2344
2345 Batch {
2346 inner: Box<HydroNode>,
2347 metadata: HydroIrMetadata,
2348 },
2349
2350 YieldConcat {
2351 inner: Box<HydroNode>,
2352 metadata: HydroIrMetadata,
2353 },
2354
2355 Chain {
2356 first: Box<HydroNode>,
2357 second: Box<HydroNode>,
2358 metadata: HydroIrMetadata,
2359 },
2360
2361 MergeOrdered {
2362 first: Box<HydroNode>,
2363 second: Box<HydroNode>,
2364 metadata: HydroIrMetadata,
2365 },
2366
2367 ChainFirst {
2368 first: Box<HydroNode>,
2369 second: Box<HydroNode>,
2370 metadata: HydroIrMetadata,
2371 },
2372
2373 CrossProduct {
2374 left: Box<HydroNode>,
2375 right: Box<HydroNode>,
2376 metadata: HydroIrMetadata,
2377 },
2378
2379 CrossSingleton {
2380 left: Box<HydroNode>,
2381 right: Box<HydroNode>,
2382 metadata: HydroIrMetadata,
2383 },
2384
2385 Join {
2386 left: Box<HydroNode>,
2387 right: Box<HydroNode>,
2388 metadata: HydroIrMetadata,
2389 },
2390
2391 JoinHalf {
2395 left: Box<HydroNode>,
2396 right: Box<HydroNode>,
2397 metadata: HydroIrMetadata,
2398 },
2399
2400 Difference {
2401 pos: Box<HydroNode>,
2402 neg: Box<HydroNode>,
2403 metadata: HydroIrMetadata,
2404 },
2405
2406 AntiJoin {
2407 pos: Box<HydroNode>,
2408 neg: Box<HydroNode>,
2409 metadata: HydroIrMetadata,
2410 },
2411
2412 ResolveFutures {
2413 input: Box<HydroNode>,
2414 metadata: HydroIrMetadata,
2415 },
2416 ResolveFuturesBlocking {
2417 input: Box<HydroNode>,
2418 metadata: HydroIrMetadata,
2419 },
2420 ResolveFuturesOrdered {
2421 input: Box<HydroNode>,
2422 metadata: HydroIrMetadata,
2423 },
2424
2425 Map {
2426 f: ClosureExpr,
2427 input: Box<HydroNode>,
2428 metadata: HydroIrMetadata,
2429 },
2430 FlatMap {
2431 f: ClosureExpr,
2432 input: Box<HydroNode>,
2433 metadata: HydroIrMetadata,
2434 },
2435 FlatMapStreamBlocking {
2436 f: ClosureExpr,
2437 input: Box<HydroNode>,
2438 metadata: HydroIrMetadata,
2439 },
2440 Filter {
2441 f: ClosureExpr,
2442 input: Box<HydroNode>,
2443 metadata: HydroIrMetadata,
2444 },
2445 FilterMap {
2446 f: ClosureExpr,
2447 input: Box<HydroNode>,
2448 metadata: HydroIrMetadata,
2449 },
2450
2451 DeferTick {
2452 input: Box<HydroNode>,
2453 metadata: HydroIrMetadata,
2454 },
2455 Enumerate {
2456 input: Box<HydroNode>,
2457 metadata: HydroIrMetadata,
2458 },
2459 Inspect {
2460 f: ClosureExpr,
2461 input: Box<HydroNode>,
2462 metadata: HydroIrMetadata,
2463 },
2464
2465 Unique {
2466 input: Box<HydroNode>,
2467 metadata: HydroIrMetadata,
2468 },
2469
2470 Sort {
2471 input: Box<HydroNode>,
2472 metadata: HydroIrMetadata,
2473 },
2474 Fold {
2475 init: ClosureExpr,
2476 acc: ClosureExpr,
2477 input: Box<HydroNode>,
2478 metadata: HydroIrMetadata,
2479 },
2480
2481 Scan {
2482 init: ClosureExpr,
2483 acc: ClosureExpr,
2484 input: Box<HydroNode>,
2485 metadata: HydroIrMetadata,
2486 },
2487 ScanAsyncBlocking {
2488 init: ClosureExpr,
2489 acc: ClosureExpr,
2490 input: Box<HydroNode>,
2491 metadata: HydroIrMetadata,
2492 },
2493 FoldKeyed {
2494 init: ClosureExpr,
2495 acc: ClosureExpr,
2496 input: Box<HydroNode>,
2497 metadata: HydroIrMetadata,
2498 },
2499
2500 Reduce {
2501 f: ClosureExpr,
2502 input: Box<HydroNode>,
2503 metadata: HydroIrMetadata,
2504 },
2505 ReduceKeyed {
2506 f: ClosureExpr,
2507 input: Box<HydroNode>,
2508 metadata: HydroIrMetadata,
2509 },
2510 ReduceKeyedWatermark {
2511 f: ClosureExpr,
2512 input: Box<HydroNode>,
2513 watermark: Box<HydroNode>,
2514 metadata: HydroIrMetadata,
2515 },
2516
2517 Network {
2518 name: Option<String>,
2519 networking_info: crate::networking::NetworkingInfo,
2520 serialize_fn: Option<DebugExpr>,
2521 instantiate_fn: DebugInstantiate,
2522 deserialize_fn: Option<DebugExpr>,
2523 input: Box<HydroNode>,
2524 metadata: HydroIrMetadata,
2525 },
2526
2527 ExternalInput {
2528 from_external_key: LocationKey,
2529 from_port_id: ExternalPortId,
2530 from_many: bool,
2531 codec_type: DebugType,
2532 #[serde(skip)]
2533 port_hint: NetworkHint,
2534 instantiate_fn: DebugInstantiate,
2535 deserialize_fn: Option<DebugExpr>,
2536 metadata: HydroIrMetadata,
2537 },
2538
2539 Counter {
2540 tag: String,
2541 duration: DebugExpr,
2542 prefix: String,
2543 input: Box<HydroNode>,
2544 metadata: HydroIrMetadata,
2545 },
2546
2547 AssertIsConsistent {
2548 inner: Box<HydroNode>,
2549 trusted: bool,
2550 metadata: HydroIrMetadata,
2551 },
2552
2553 UnboundSingleton {
2554 inner: Box<HydroNode>,
2555 metadata: HydroIrMetadata,
2556 },
2557}
2558
2559pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2560pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2561
2562impl HydroNode {
2563 pub fn transform_bottom_up(
2564 &mut self,
2565 transform: &mut impl FnMut(&mut HydroNode),
2566 seen_tees: &mut SeenSharedNodes,
2567 check_well_formed: bool,
2568 ) {
2569 self.transform_children(
2570 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2571 seen_tees,
2572 );
2573
2574 transform(self);
2575
2576 let self_location = self.metadata().location_id.root();
2577
2578 if check_well_formed {
2579 match &*self {
2580 HydroNode::Network { .. } => {}
2581 _ => {
2582 self.input_metadata().iter().for_each(|i| {
2583 if i.location_id.root() != self_location {
2584 panic!(
2585 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2586 i,
2587 i.location_id.root(),
2588 self,
2589 self_location
2590 )
2591 }
2592 });
2593 }
2594 }
2595 }
2596 }
2597
2598 #[inline(always)]
2599 pub fn transform_children(
2600 &mut self,
2601 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2602 seen_tees: &mut SeenSharedNodes,
2603 ) {
2604 match self {
2605 HydroNode::Placeholder => {
2606 panic!();
2607 }
2608
2609 HydroNode::Source { .. }
2610 | HydroNode::SingletonSource { .. }
2611 | HydroNode::CycleSource { .. }
2612 | HydroNode::ExternalInput { .. } => {}
2613
2614 HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2615 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2616 *inner = SharedNode(transformed.clone());
2617 } else {
2618 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2619 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2620 let mut orig = inner.0.replace(HydroNode::Placeholder);
2621 transform(&mut orig, seen_tees);
2622 *transformed_cell.borrow_mut() = orig;
2623 *inner = SharedNode(transformed_cell);
2624 }
2625 }
2626
2627 HydroNode::Partition { inner, f, .. } => {
2628 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2629 *inner = SharedNode(transformed.clone());
2630 } else {
2631 f.transform_children(&mut transform, seen_tees);
2632 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2633 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2634 let mut orig = inner.0.replace(HydroNode::Placeholder);
2635 transform(&mut orig, seen_tees);
2636 *transformed_cell.borrow_mut() = orig;
2637 *inner = SharedNode(transformed_cell);
2638 }
2639 }
2640
2641 HydroNode::Cast { inner, .. }
2642 | HydroNode::ObserveNonDet { inner, .. }
2643 | HydroNode::BeginAtomic { inner, .. }
2644 | HydroNode::EndAtomic { inner, .. }
2645 | HydroNode::Batch { inner, .. }
2646 | HydroNode::YieldConcat { inner, .. }
2647 | HydroNode::UnboundSingleton { inner, .. }
2648 | HydroNode::AssertIsConsistent { inner, .. } => {
2649 transform(inner.as_mut(), seen_tees);
2650 }
2651
2652 HydroNode::Chain { first, second, .. } => {
2653 transform(first.as_mut(), seen_tees);
2654 transform(second.as_mut(), seen_tees);
2655 }
2656
2657 HydroNode::MergeOrdered { first, second, .. } => {
2658 transform(first.as_mut(), seen_tees);
2659 transform(second.as_mut(), seen_tees);
2660 }
2661
2662 HydroNode::ChainFirst { first, second, .. } => {
2663 transform(first.as_mut(), seen_tees);
2664 transform(second.as_mut(), seen_tees);
2665 }
2666
2667 HydroNode::CrossSingleton { left, right, .. }
2668 | HydroNode::CrossProduct { left, right, .. }
2669 | HydroNode::Join { left, right, .. }
2670 | HydroNode::JoinHalf { left, right, .. } => {
2671 transform(left.as_mut(), seen_tees);
2672 transform(right.as_mut(), seen_tees);
2673 }
2674
2675 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2676 transform(pos.as_mut(), seen_tees);
2677 transform(neg.as_mut(), seen_tees);
2678 }
2679
2680 HydroNode::Map { f, input, .. } => {
2681 f.transform_children(&mut transform, seen_tees);
2682 transform(input.as_mut(), seen_tees);
2683 }
2684 HydroNode::FlatMap { f, input, .. }
2685 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2686 | HydroNode::Filter { f, input, .. }
2687 | HydroNode::FilterMap { f, input, .. }
2688 | HydroNode::Inspect { f, input, .. }
2689 | HydroNode::Reduce { f, input, .. }
2690 | HydroNode::ReduceKeyed { f, input, .. } => {
2691 f.transform_children(&mut transform, seen_tees);
2692 transform(input.as_mut(), seen_tees);
2693 }
2694 HydroNode::ReduceKeyedWatermark {
2695 f,
2696 input,
2697 watermark,
2698 ..
2699 } => {
2700 f.transform_children(&mut transform, seen_tees);
2701 transform(input.as_mut(), seen_tees);
2702 transform(watermark.as_mut(), seen_tees);
2703 }
2704 HydroNode::Fold {
2705 init, acc, input, ..
2706 }
2707 | HydroNode::Scan {
2708 init, acc, input, ..
2709 }
2710 | HydroNode::ScanAsyncBlocking {
2711 init, acc, input, ..
2712 }
2713 | HydroNode::FoldKeyed {
2714 init, acc, input, ..
2715 } => {
2716 init.transform_children(&mut transform, seen_tees);
2717 acc.transform_children(&mut transform, seen_tees);
2718 transform(input.as_mut(), seen_tees);
2719 }
2720 HydroNode::ResolveFutures { input, .. }
2721 | HydroNode::ResolveFuturesBlocking { input, .. }
2722 | HydroNode::ResolveFuturesOrdered { input, .. }
2723 | HydroNode::Sort { input, .. }
2724 | HydroNode::DeferTick { input, .. }
2725 | HydroNode::Enumerate { input, .. }
2726 | HydroNode::Unique { input, .. }
2727 | HydroNode::Network { input, .. }
2728 | HydroNode::Counter { input, .. } => {
2729 transform(input.as_mut(), seen_tees);
2730 }
2731 }
2732 }
2733
2734 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2735 match self {
2736 HydroNode::Placeholder => HydroNode::Placeholder,
2737 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2738 inner: Box::new(inner.deep_clone(seen_tees)),
2739 metadata: metadata.clone(),
2740 },
2741 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2742 inner: Box::new(inner.deep_clone(seen_tees)),
2743 metadata: metadata.clone(),
2744 },
2745 HydroNode::ObserveNonDet {
2746 inner,
2747 trusted,
2748 metadata,
2749 } => HydroNode::ObserveNonDet {
2750 inner: Box::new(inner.deep_clone(seen_tees)),
2751 trusted: *trusted,
2752 metadata: metadata.clone(),
2753 },
2754 HydroNode::AssertIsConsistent {
2755 inner,
2756 trusted,
2757 metadata,
2758 } => HydroNode::AssertIsConsistent {
2759 inner: Box::new(inner.deep_clone(seen_tees)),
2760 trusted: *trusted,
2761 metadata: metadata.clone(),
2762 },
2763 HydroNode::Source { source, metadata } => HydroNode::Source {
2764 source: source.clone(),
2765 metadata: metadata.clone(),
2766 },
2767 HydroNode::SingletonSource {
2768 value,
2769 first_tick_only,
2770 metadata,
2771 } => HydroNode::SingletonSource {
2772 value: value.clone(),
2773 first_tick_only: *first_tick_only,
2774 metadata: metadata.clone(),
2775 },
2776 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2777 cycle_id: *cycle_id,
2778 metadata: metadata.clone(),
2779 },
2780 HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2781 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2782 SharedNode(transformed.clone())
2783 } else {
2784 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2785 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2786 let cloned = inner.0.borrow().deep_clone(seen_tees);
2787 *new_rc.borrow_mut() = cloned;
2788 SharedNode(new_rc)
2789 };
2790 if matches!(self, HydroNode::Singleton { .. }) {
2791 HydroNode::Singleton {
2792 inner: cloned_inner,
2793 metadata: metadata.clone(),
2794 }
2795 } else {
2796 HydroNode::Tee {
2797 inner: cloned_inner,
2798 metadata: metadata.clone(),
2799 }
2800 }
2801 }
2802 HydroNode::Partition {
2803 inner,
2804 f,
2805 is_true,
2806 metadata,
2807 } => {
2808 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2809 HydroNode::Partition {
2810 inner: SharedNode(transformed.clone()),
2811 f: f.deep_clone(seen_tees),
2812 is_true: *is_true,
2813 metadata: metadata.clone(),
2814 }
2815 } else {
2816 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2817 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2818 let cloned = inner.0.borrow().deep_clone(seen_tees);
2819 *new_rc.borrow_mut() = cloned;
2820 HydroNode::Partition {
2821 inner: SharedNode(new_rc),
2822 f: f.deep_clone(seen_tees),
2823 is_true: *is_true,
2824 metadata: metadata.clone(),
2825 }
2826 }
2827 }
2828 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2829 inner: Box::new(inner.deep_clone(seen_tees)),
2830 metadata: metadata.clone(),
2831 },
2832 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2833 inner: Box::new(inner.deep_clone(seen_tees)),
2834 metadata: metadata.clone(),
2835 },
2836 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2837 inner: Box::new(inner.deep_clone(seen_tees)),
2838 metadata: metadata.clone(),
2839 },
2840 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2841 inner: Box::new(inner.deep_clone(seen_tees)),
2842 metadata: metadata.clone(),
2843 },
2844 HydroNode::Chain {
2845 first,
2846 second,
2847 metadata,
2848 } => HydroNode::Chain {
2849 first: Box::new(first.deep_clone(seen_tees)),
2850 second: Box::new(second.deep_clone(seen_tees)),
2851 metadata: metadata.clone(),
2852 },
2853 HydroNode::MergeOrdered {
2854 first,
2855 second,
2856 metadata,
2857 } => HydroNode::MergeOrdered {
2858 first: Box::new(first.deep_clone(seen_tees)),
2859 second: Box::new(second.deep_clone(seen_tees)),
2860 metadata: metadata.clone(),
2861 },
2862 HydroNode::ChainFirst {
2863 first,
2864 second,
2865 metadata,
2866 } => HydroNode::ChainFirst {
2867 first: Box::new(first.deep_clone(seen_tees)),
2868 second: Box::new(second.deep_clone(seen_tees)),
2869 metadata: metadata.clone(),
2870 },
2871 HydroNode::CrossProduct {
2872 left,
2873 right,
2874 metadata,
2875 } => HydroNode::CrossProduct {
2876 left: Box::new(left.deep_clone(seen_tees)),
2877 right: Box::new(right.deep_clone(seen_tees)),
2878 metadata: metadata.clone(),
2879 },
2880 HydroNode::CrossSingleton {
2881 left,
2882 right,
2883 metadata,
2884 } => HydroNode::CrossSingleton {
2885 left: Box::new(left.deep_clone(seen_tees)),
2886 right: Box::new(right.deep_clone(seen_tees)),
2887 metadata: metadata.clone(),
2888 },
2889 HydroNode::Join {
2890 left,
2891 right,
2892 metadata,
2893 } => HydroNode::Join {
2894 left: Box::new(left.deep_clone(seen_tees)),
2895 right: Box::new(right.deep_clone(seen_tees)),
2896 metadata: metadata.clone(),
2897 },
2898 HydroNode::JoinHalf {
2899 left,
2900 right,
2901 metadata,
2902 } => HydroNode::JoinHalf {
2903 left: Box::new(left.deep_clone(seen_tees)),
2904 right: Box::new(right.deep_clone(seen_tees)),
2905 metadata: metadata.clone(),
2906 },
2907 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2908 pos: Box::new(pos.deep_clone(seen_tees)),
2909 neg: Box::new(neg.deep_clone(seen_tees)),
2910 metadata: metadata.clone(),
2911 },
2912 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2913 pos: Box::new(pos.deep_clone(seen_tees)),
2914 neg: Box::new(neg.deep_clone(seen_tees)),
2915 metadata: metadata.clone(),
2916 },
2917 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2918 input: Box::new(input.deep_clone(seen_tees)),
2919 metadata: metadata.clone(),
2920 },
2921 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2922 HydroNode::ResolveFuturesBlocking {
2923 input: Box::new(input.deep_clone(seen_tees)),
2924 metadata: metadata.clone(),
2925 }
2926 }
2927 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2928 HydroNode::ResolveFuturesOrdered {
2929 input: Box::new(input.deep_clone(seen_tees)),
2930 metadata: metadata.clone(),
2931 }
2932 }
2933 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2934 f: f.deep_clone(seen_tees),
2935 input: Box::new(input.deep_clone(seen_tees)),
2936 metadata: metadata.clone(),
2937 },
2938 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2939 f: f.deep_clone(seen_tees),
2940 input: Box::new(input.deep_clone(seen_tees)),
2941 metadata: metadata.clone(),
2942 },
2943 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2944 HydroNode::FlatMapStreamBlocking {
2945 f: f.deep_clone(seen_tees),
2946 input: Box::new(input.deep_clone(seen_tees)),
2947 metadata: metadata.clone(),
2948 }
2949 }
2950 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2951 f: f.deep_clone(seen_tees),
2952 input: Box::new(input.deep_clone(seen_tees)),
2953 metadata: metadata.clone(),
2954 },
2955 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2956 f: f.deep_clone(seen_tees),
2957 input: Box::new(input.deep_clone(seen_tees)),
2958 metadata: metadata.clone(),
2959 },
2960 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2961 input: Box::new(input.deep_clone(seen_tees)),
2962 metadata: metadata.clone(),
2963 },
2964 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2965 input: Box::new(input.deep_clone(seen_tees)),
2966 metadata: metadata.clone(),
2967 },
2968 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2969 f: f.deep_clone(seen_tees),
2970 input: Box::new(input.deep_clone(seen_tees)),
2971 metadata: metadata.clone(),
2972 },
2973 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2974 input: Box::new(input.deep_clone(seen_tees)),
2975 metadata: metadata.clone(),
2976 },
2977 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2978 input: Box::new(input.deep_clone(seen_tees)),
2979 metadata: metadata.clone(),
2980 },
2981 HydroNode::Fold {
2982 init,
2983 acc,
2984 input,
2985 metadata,
2986 } => HydroNode::Fold {
2987 init: init.deep_clone(seen_tees),
2988 acc: acc.deep_clone(seen_tees),
2989 input: Box::new(input.deep_clone(seen_tees)),
2990 metadata: metadata.clone(),
2991 },
2992 HydroNode::Scan {
2993 init,
2994 acc,
2995 input,
2996 metadata,
2997 } => HydroNode::Scan {
2998 init: init.deep_clone(seen_tees),
2999 acc: acc.deep_clone(seen_tees),
3000 input: Box::new(input.deep_clone(seen_tees)),
3001 metadata: metadata.clone(),
3002 },
3003 HydroNode::ScanAsyncBlocking {
3004 init,
3005 acc,
3006 input,
3007 metadata,
3008 } => HydroNode::ScanAsyncBlocking {
3009 init: init.deep_clone(seen_tees),
3010 acc: acc.deep_clone(seen_tees),
3011 input: Box::new(input.deep_clone(seen_tees)),
3012 metadata: metadata.clone(),
3013 },
3014 HydroNode::FoldKeyed {
3015 init,
3016 acc,
3017 input,
3018 metadata,
3019 } => HydroNode::FoldKeyed {
3020 init: init.deep_clone(seen_tees),
3021 acc: acc.deep_clone(seen_tees),
3022 input: Box::new(input.deep_clone(seen_tees)),
3023 metadata: metadata.clone(),
3024 },
3025 HydroNode::ReduceKeyedWatermark {
3026 f,
3027 input,
3028 watermark,
3029 metadata,
3030 } => HydroNode::ReduceKeyedWatermark {
3031 f: f.deep_clone(seen_tees),
3032 input: Box::new(input.deep_clone(seen_tees)),
3033 watermark: Box::new(watermark.deep_clone(seen_tees)),
3034 metadata: metadata.clone(),
3035 },
3036 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3037 f: f.deep_clone(seen_tees),
3038 input: Box::new(input.deep_clone(seen_tees)),
3039 metadata: metadata.clone(),
3040 },
3041 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3042 f: f.deep_clone(seen_tees),
3043 input: Box::new(input.deep_clone(seen_tees)),
3044 metadata: metadata.clone(),
3045 },
3046 HydroNode::Network {
3047 name,
3048 networking_info,
3049 serialize_fn,
3050 instantiate_fn,
3051 deserialize_fn,
3052 input,
3053 metadata,
3054 } => HydroNode::Network {
3055 name: name.clone(),
3056 networking_info: networking_info.clone(),
3057 serialize_fn: serialize_fn.clone(),
3058 instantiate_fn: instantiate_fn.clone(),
3059 deserialize_fn: deserialize_fn.clone(),
3060 input: Box::new(input.deep_clone(seen_tees)),
3061 metadata: metadata.clone(),
3062 },
3063 HydroNode::ExternalInput {
3064 from_external_key,
3065 from_port_id,
3066 from_many,
3067 codec_type,
3068 port_hint,
3069 instantiate_fn,
3070 deserialize_fn,
3071 metadata,
3072 } => HydroNode::ExternalInput {
3073 from_external_key: *from_external_key,
3074 from_port_id: *from_port_id,
3075 from_many: *from_many,
3076 codec_type: codec_type.clone(),
3077 port_hint: *port_hint,
3078 instantiate_fn: instantiate_fn.clone(),
3079 deserialize_fn: deserialize_fn.clone(),
3080 metadata: metadata.clone(),
3081 },
3082 HydroNode::Counter {
3083 tag,
3084 duration,
3085 prefix,
3086 input,
3087 metadata,
3088 } => HydroNode::Counter {
3089 tag: tag.clone(),
3090 duration: duration.clone(),
3091 prefix: prefix.clone(),
3092 input: Box::new(input.deep_clone(seen_tees)),
3093 metadata: metadata.clone(),
3094 },
3095 }
3096 }
3097
3098 #[cfg(feature = "build")]
3099 pub fn emit_core(
3100 &mut self,
3101 builders_or_callback: &mut BuildersOrCallback<
3102 impl FnMut(&mut HydroRoot, &mut StmtId),
3103 impl FnMut(&mut HydroNode, &mut StmtId),
3104 >,
3105 seen_tees: &mut SeenSharedNodes,
3106 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3107 next_stmt_id: &mut StmtId,
3108 fold_hooked_idents: &mut HashSet<String>,
3109 ) -> syn::Ident {
3110 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3111
3112 self.transform_bottom_up(
3113 &mut |node: &mut HydroNode| {
3114 let out_location = node.metadata().location_id.clone();
3115 match node {
3116 HydroNode::Placeholder => {
3117 panic!()
3118 }
3119
3120 HydroNode::Cast { .. } => {
3121 match builders_or_callback {
3124 BuildersOrCallback::Builders(_) => {}
3125 BuildersOrCallback::Callback(_, node_callback) => {
3126 node_callback(node, next_stmt_id);
3127 }
3128 }
3129
3130 let _ = next_stmt_id.get_and_increment();
3131 }
3133
3134 HydroNode::UnboundSingleton { .. } => {
3135 let inner_ident = ident_stack.pop().unwrap();
3136
3137 let out_ident =
3138 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3139
3140 match builders_or_callback {
3141 BuildersOrCallback::Builders(graph_builders) => {
3142 if graph_builders.singleton_intermediates() {
3143 let builder = graph_builders.get_dfir_mut(&out_location);
3144 builder.add_dfir(
3145 parse_quote! {
3146 #out_ident = #inner_ident;
3147 },
3148 None,
3149 None,
3150 );
3151 } else {
3152 let builder = graph_builders.get_dfir_mut(&out_location);
3153 builder.add_dfir(
3154 parse_quote! {
3155 #out_ident = #inner_ident -> persist::<'static>();
3156 },
3157 None,
3158 None,
3159 );
3160 }
3161 }
3162 BuildersOrCallback::Callback(_, node_callback) => {
3163 node_callback(node, next_stmt_id);
3164 }
3165 }
3166
3167 let _ = next_stmt_id.get_and_increment();
3168
3169 ident_stack.push(out_ident);
3170 }
3171
3172 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3173 let inner_ident = ident_stack.pop().unwrap();
3174
3175 let out_ident =
3176 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3177
3178 match builders_or_callback {
3179 BuildersOrCallback::Builders(graph_builders) => {
3180 graph_builders.assert_is_consistent(
3181 *trusted,
3182 &inner.metadata().location_id,
3183 inner_ident,
3184 &out_ident,
3185 );
3186 }
3187 BuildersOrCallback::Callback(_, node_callback) => {
3188 node_callback(node, next_stmt_id);
3189 }
3190 }
3191
3192 let _ = next_stmt_id.get_and_increment();
3193
3194 ident_stack.push(out_ident);
3195 }
3196
3197 HydroNode::ObserveNonDet {
3198 inner,
3199 trusted,
3200 metadata,
3201 ..
3202 } => {
3203 let inner_ident = ident_stack.pop().unwrap();
3204
3205 let observe_ident =
3206 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3207
3208 match builders_or_callback {
3209 BuildersOrCallback::Builders(graph_builders) => {
3210 graph_builders.observe_nondet(
3211 *trusted,
3212 &inner.metadata().location_id,
3213 inner_ident,
3214 &inner.metadata().collection_kind,
3215 &observe_ident,
3216 &metadata.collection_kind,
3217 &metadata.op,
3218 );
3219 }
3220 BuildersOrCallback::Callback(_, node_callback) => {
3221 node_callback(node, next_stmt_id);
3222 }
3223 }
3224
3225 let _ = next_stmt_id.get_and_increment();
3226
3227 ident_stack.push(observe_ident);
3228 }
3229
3230 HydroNode::Batch {
3231 inner, metadata, ..
3232 } => {
3233 let inner_ident = ident_stack.pop().unwrap();
3234
3235 let batch_ident =
3236 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3237
3238 match builders_or_callback {
3239 BuildersOrCallback::Builders(graph_builders) => {
3240 graph_builders.batch(
3241 inner_ident,
3242 &inner.metadata().location_id,
3243 &inner.metadata().collection_kind,
3244 &batch_ident,
3245 &out_location,
3246 &metadata.op,
3247 fold_hooked_idents,
3248 );
3249 }
3250 BuildersOrCallback::Callback(_, node_callback) => {
3251 node_callback(node, next_stmt_id);
3252 }
3253 }
3254
3255 let _ = next_stmt_id.get_and_increment();
3256
3257 ident_stack.push(batch_ident);
3258 }
3259
3260 HydroNode::YieldConcat { inner, .. } => {
3261 let inner_ident = ident_stack.pop().unwrap();
3262
3263 let yield_ident =
3264 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3265
3266 match builders_or_callback {
3267 BuildersOrCallback::Builders(graph_builders) => {
3268 graph_builders.yield_from_tick(
3269 inner_ident,
3270 &inner.metadata().location_id,
3271 &inner.metadata().collection_kind,
3272 &yield_ident,
3273 &out_location,
3274 );
3275 }
3276 BuildersOrCallback::Callback(_, node_callback) => {
3277 node_callback(node, next_stmt_id);
3278 }
3279 }
3280
3281 let _ = next_stmt_id.get_and_increment();
3282
3283 ident_stack.push(yield_ident);
3284 }
3285
3286 HydroNode::BeginAtomic { inner, metadata } => {
3287 let inner_ident = ident_stack.pop().unwrap();
3288
3289 let begin_ident =
3290 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3291
3292 match builders_or_callback {
3293 BuildersOrCallback::Builders(graph_builders) => {
3294 graph_builders.begin_atomic(
3295 inner_ident,
3296 &inner.metadata().location_id,
3297 &inner.metadata().collection_kind,
3298 &begin_ident,
3299 &out_location,
3300 &metadata.op,
3301 );
3302 }
3303 BuildersOrCallback::Callback(_, node_callback) => {
3304 node_callback(node, next_stmt_id);
3305 }
3306 }
3307
3308 let _ = next_stmt_id.get_and_increment();
3309
3310 ident_stack.push(begin_ident);
3311 }
3312
3313 HydroNode::EndAtomic { inner, .. } => {
3314 let inner_ident = ident_stack.pop().unwrap();
3315
3316 let end_ident =
3317 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3318
3319 match builders_or_callback {
3320 BuildersOrCallback::Builders(graph_builders) => {
3321 graph_builders.end_atomic(
3322 inner_ident,
3323 &inner.metadata().location_id,
3324 &inner.metadata().collection_kind,
3325 &end_ident,
3326 );
3327 }
3328 BuildersOrCallback::Callback(_, node_callback) => {
3329 node_callback(node, next_stmt_id);
3330 }
3331 }
3332
3333 let _ = next_stmt_id.get_and_increment();
3334
3335 ident_stack.push(end_ident);
3336 }
3337
3338 HydroNode::Source {
3339 source, metadata, ..
3340 } => {
3341 if let HydroSource::ExternalNetwork() = source {
3342 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3343 } else {
3344 let source_ident =
3345 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3346
3347 let source_stmt = match source {
3348 HydroSource::Stream(expr) => {
3349 debug_assert!(metadata.location_id.is_top_level());
3350 parse_quote! {
3351 #source_ident = source_stream(#expr);
3352 }
3353 }
3354
3355 HydroSource::ExternalNetwork() => {
3356 unreachable!()
3357 }
3358
3359 HydroSource::Iter(expr) => {
3360 if metadata.location_id.is_top_level() {
3361 parse_quote! {
3362 #source_ident = source_iter(#expr);
3363 }
3364 } else {
3365 parse_quote! {
3367 #source_ident = source_iter(#expr) -> persist::<'static>();
3368 }
3369 }
3370 }
3371
3372 HydroSource::Spin() => {
3373 debug_assert!(metadata.location_id.is_top_level());
3374 parse_quote! {
3375 #source_ident = spin();
3376 }
3377 }
3378
3379 HydroSource::ClusterMembers(target_loc, state) => {
3380 debug_assert!(metadata.location_id.is_top_level());
3381
3382 let members_tee_ident = syn::Ident::new(
3383 &format!(
3384 "__cluster_members_tee_{}_{}",
3385 metadata.location_id.root().key(),
3386 target_loc.key(),
3387 ),
3388 Span::call_site(),
3389 );
3390
3391 match state {
3392 ClusterMembersState::Stream(d) => {
3393 parse_quote! {
3394 #members_tee_ident = source_stream(#d) -> tee();
3395 #source_ident = #members_tee_ident;
3396 }
3397 },
3398 ClusterMembersState::Uninit => syn::parse_quote! {
3399 #source_ident = source_stream(DUMMY);
3400 },
3401 ClusterMembersState::Tee(..) => parse_quote! {
3402 #source_ident = #members_tee_ident;
3403 },
3404 }
3405 }
3406
3407 HydroSource::Embedded(ident) => {
3408 parse_quote! {
3409 #source_ident = source_stream(#ident);
3410 }
3411 }
3412
3413 HydroSource::EmbeddedSingleton(ident) => {
3414 parse_quote! {
3415 #source_ident = source_iter([#ident]);
3416 }
3417 }
3418 };
3419
3420 match builders_or_callback {
3421 BuildersOrCallback::Builders(graph_builders) => {
3422 let builder = graph_builders.get_dfir_mut(&out_location);
3423 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3424 }
3425 BuildersOrCallback::Callback(_, node_callback) => {
3426 node_callback(node, next_stmt_id);
3427 }
3428 }
3429
3430 let _ = next_stmt_id.get_and_increment();
3431
3432 ident_stack.push(source_ident);
3433 }
3434 }
3435
3436 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3437 let source_ident =
3438 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3439
3440 match builders_or_callback {
3441 BuildersOrCallback::Builders(graph_builders) => {
3442 let builder = graph_builders.get_dfir_mut(&out_location);
3443
3444 if *first_tick_only {
3445 assert!(
3446 !metadata.location_id.is_top_level(),
3447 "first_tick_only SingletonSource must be inside a tick"
3448 );
3449 }
3450
3451 if *first_tick_only
3452 || (metadata.location_id.is_top_level()
3453 && metadata.collection_kind.is_bounded())
3454 {
3455 builder.add_dfir(
3456 parse_quote! {
3457 #source_ident = source_iter([#value]);
3458 },
3459 None,
3460 Some(&next_stmt_id.to_string()),
3461 );
3462 } else {
3463 builder.add_dfir(
3464 parse_quote! {
3465 #source_ident = source_iter([#value]) -> persist::<'static>();
3466 },
3467 None,
3468 Some(&next_stmt_id.to_string()),
3469 );
3470 }
3471 }
3472 BuildersOrCallback::Callback(_, node_callback) => {
3473 node_callback(node, next_stmt_id);
3474 }
3475 }
3476
3477 let _ = next_stmt_id.get_and_increment();
3478
3479 ident_stack.push(source_ident);
3480 }
3481
3482 HydroNode::CycleSource { cycle_id, .. } => {
3483 let ident = cycle_id.as_ident();
3484
3485 match builders_or_callback {
3486 BuildersOrCallback::Builders(_) => {}
3487 BuildersOrCallback::Callback(_, node_callback) => {
3488 node_callback(node, next_stmt_id);
3489 }
3490 }
3491
3492 let _ = next_stmt_id.get_and_increment();
3494
3495 ident_stack.push(ident);
3496 }
3497
3498 HydroNode::Tee { inner, .. } => {
3499 let ret_ident = if let Some(built_idents) =
3500 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3501 {
3502 match builders_or_callback {
3503 BuildersOrCallback::Builders(_) => {}
3504 BuildersOrCallback::Callback(_, node_callback) => {
3505 node_callback(node, next_stmt_id);
3506 }
3507 }
3508
3509 built_idents[0].clone()
3510 } else {
3511 let inner_ident = ident_stack.pop().unwrap();
3514
3515 let tee_ident =
3516 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3517
3518 built_tees.insert(
3519 inner.0.as_ref() as *const RefCell<HydroNode>,
3520 vec![tee_ident.clone()],
3521 );
3522
3523 match builders_or_callback {
3524 BuildersOrCallback::Builders(graph_builders) => {
3525 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3537 fold_hooked_idents.insert(tee_ident.to_string());
3538 }
3539 let builder = graph_builders.get_dfir_mut(&out_location);
3540 builder.add_dfir(
3541 parse_quote! {
3542 #tee_ident = #inner_ident -> tee();
3543 },
3544 None,
3545 Some(&next_stmt_id.to_string()),
3546 );
3547 }
3548 BuildersOrCallback::Callback(_, node_callback) => {
3549 node_callback(node, next_stmt_id);
3550 }
3551 }
3552
3553 tee_ident
3554 };
3555
3556 let _ = next_stmt_id.get_and_increment();
3560 ident_stack.push(ret_ident);
3561 }
3562
3563 HydroNode::Singleton { inner, .. } => {
3564 let ret_ident = if let Some(built_idents) =
3565 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3566 {
3567 built_idents[0].clone()
3568 } else {
3569 let inner_ident = ident_stack.pop().unwrap();
3570
3571 let singleton_ident =
3572 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3573
3574 built_tees.insert(
3575 inner.0.as_ref() as *const RefCell<HydroNode>,
3576 vec![singleton_ident.clone()],
3577 );
3578
3579 match builders_or_callback {
3580 BuildersOrCallback::Builders(graph_builders) => {
3581 let builder = graph_builders.get_dfir_mut(&out_location);
3582 builder.add_dfir(
3583 parse_quote! {
3584 #singleton_ident = #inner_ident -> singleton();
3585 },
3586 None,
3587 Some(&next_stmt_id.to_string()),
3588 );
3589 }
3590 BuildersOrCallback::Callback(_, node_callback) => {
3591 node_callback(node, next_stmt_id);
3592 }
3593 }
3594
3595 singleton_ident
3596 };
3597
3598 let _ = next_stmt_id.get_and_increment();
3601 ident_stack.push(ret_ident);
3602 }
3603
3604 HydroNode::Partition {
3605 inner, f, is_true, ..
3606 } => {
3607 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3609 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3610 match builders_or_callback {
3611 BuildersOrCallback::Builders(_) => {}
3612 BuildersOrCallback::Callback(_, node_callback) => {
3613 node_callback(node, next_stmt_id);
3614 }
3615 }
3616
3617 let idx = if is_true { 0 } else { 1 };
3618 built_idents[idx].clone()
3619 } else {
3620 let inner_ident = ident_stack.pop().unwrap();
3623 let f_tokens = f.emit_tokens(&mut ident_stack);
3624
3625 let partition_ident = syn::Ident::new(
3626 &format!("stream_{}_partition", *next_stmt_id),
3627 Span::call_site(),
3628 );
3629 let true_ident = syn::Ident::new(
3630 &format!("stream_{}_true", *next_stmt_id),
3631 Span::call_site(),
3632 );
3633 let false_ident = syn::Ident::new(
3634 &format!("stream_{}_false", *next_stmt_id),
3635 Span::call_site(),
3636 );
3637
3638 built_tees.insert(
3639 ptr,
3640 vec![true_ident.clone(), false_ident.clone()],
3641 );
3642
3643 match builders_or_callback {
3644 BuildersOrCallback::Builders(graph_builders) => {
3645 let builder = graph_builders.get_dfir_mut(&out_location);
3646 builder.add_dfir(
3647 parse_quote! {
3648 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3649 #true_ident = #partition_ident[0];
3650 #false_ident = #partition_ident[1];
3651 },
3652 None,
3653 Some(&next_stmt_id.to_string()),
3654 );
3655 }
3656 BuildersOrCallback::Callback(_, node_callback) => {
3657 node_callback(node, next_stmt_id);
3658 }
3659 }
3660
3661 if is_true { true_ident } else { false_ident }
3662 };
3663
3664 let _ = next_stmt_id.get_and_increment();
3665 ident_stack.push(ret_ident);
3666 }
3667
3668 HydroNode::Chain { .. } => {
3669 let second_ident = ident_stack.pop().unwrap();
3671 let first_ident = ident_stack.pop().unwrap();
3672
3673 let chain_ident =
3674 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3675
3676 match builders_or_callback {
3677 BuildersOrCallback::Builders(graph_builders) => {
3678 let builder = graph_builders.get_dfir_mut(&out_location);
3679 builder.add_dfir(
3680 parse_quote! {
3681 #chain_ident = chain();
3682 #first_ident -> [0]#chain_ident;
3683 #second_ident -> [1]#chain_ident;
3684 },
3685 None,
3686 Some(&next_stmt_id.to_string()),
3687 );
3688 }
3689 BuildersOrCallback::Callback(_, node_callback) => {
3690 node_callback(node, next_stmt_id);
3691 }
3692 }
3693
3694 let _ = next_stmt_id.get_and_increment();
3695
3696 ident_stack.push(chain_ident);
3697 }
3698
3699 HydroNode::MergeOrdered { first, metadata, .. } => {
3700 let second_ident = ident_stack.pop().unwrap();
3701 let first_ident = ident_stack.pop().unwrap();
3702
3703 let merge_ident =
3704 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3705
3706 match builders_or_callback {
3707 BuildersOrCallback::Builders(graph_builders) => {
3708 graph_builders.merge_ordered(
3709 &first.metadata().location_id,
3710 first_ident,
3711 second_ident,
3712 &merge_ident,
3713 &first.metadata().collection_kind,
3714 &metadata.op,
3715 Some(&next_stmt_id.to_string()),
3716 );
3717 }
3718 BuildersOrCallback::Callback(_, node_callback) => {
3719 node_callback(node, next_stmt_id);
3720 }
3721 }
3722
3723 let _ = next_stmt_id.get_and_increment();
3724
3725 ident_stack.push(merge_ident);
3726 }
3727
3728 HydroNode::ChainFirst { .. } => {
3729 let second_ident = ident_stack.pop().unwrap();
3730 let first_ident = ident_stack.pop().unwrap();
3731
3732 let chain_ident =
3733 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3734
3735 match builders_or_callback {
3736 BuildersOrCallback::Builders(graph_builders) => {
3737 let builder = graph_builders.get_dfir_mut(&out_location);
3738 builder.add_dfir(
3739 parse_quote! {
3740 #chain_ident = chain_first_n(1);
3741 #first_ident -> [0]#chain_ident;
3742 #second_ident -> [1]#chain_ident;
3743 },
3744 None,
3745 Some(&next_stmt_id.to_string()),
3746 );
3747 }
3748 BuildersOrCallback::Callback(_, node_callback) => {
3749 node_callback(node, next_stmt_id);
3750 }
3751 }
3752
3753 let _ = next_stmt_id.get_and_increment();
3754
3755 ident_stack.push(chain_ident);
3756 }
3757
3758 HydroNode::CrossSingleton { right, .. } => {
3759 let right_ident = ident_stack.pop().unwrap();
3760 let left_ident = ident_stack.pop().unwrap();
3761
3762 let cross_ident =
3763 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3764
3765 match builders_or_callback {
3766 BuildersOrCallback::Builders(graph_builders) => {
3767 let builder = graph_builders.get_dfir_mut(&out_location);
3768
3769 if right.metadata().location_id.is_top_level()
3770 && right.metadata().collection_kind.is_bounded()
3771 {
3772 builder.add_dfir(
3773 parse_quote! {
3774 #cross_ident = cross_singleton::<'static>();
3775 #left_ident -> [input]#cross_ident;
3776 #right_ident -> [single]#cross_ident;
3777 },
3778 None,
3779 Some(&next_stmt_id.to_string()),
3780 );
3781 } else {
3782 builder.add_dfir(
3783 parse_quote! {
3784 #cross_ident = cross_singleton();
3785 #left_ident -> [input]#cross_ident;
3786 #right_ident -> [single]#cross_ident;
3787 },
3788 None,
3789 Some(&next_stmt_id.to_string()),
3790 );
3791 }
3792 }
3793 BuildersOrCallback::Callback(_, node_callback) => {
3794 node_callback(node, next_stmt_id);
3795 }
3796 }
3797
3798 let _ = next_stmt_id.get_and_increment();
3799
3800 ident_stack.push(cross_ident);
3801 }
3802
3803 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3804 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3805 parse_quote!(cross_join_multiset)
3806 } else {
3807 parse_quote!(join_multiset)
3808 };
3809
3810 let (HydroNode::CrossProduct { left, right, .. }
3811 | HydroNode::Join { left, right, .. }) = node
3812 else {
3813 unreachable!()
3814 };
3815
3816 let is_top_level = left.metadata().location_id.is_top_level()
3817 && right.metadata().location_id.is_top_level();
3818 let left_lifetime = if left.metadata().location_id.is_top_level() {
3819 quote!('static)
3820 } else {
3821 quote!('tick)
3822 };
3823
3824 let right_lifetime = if right.metadata().location_id.is_top_level() {
3825 quote!('static)
3826 } else {
3827 quote!('tick)
3828 };
3829
3830 let right_ident = ident_stack.pop().unwrap();
3831 let left_ident = ident_stack.pop().unwrap();
3832
3833 let stream_ident =
3834 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3835
3836 match builders_or_callback {
3837 BuildersOrCallback::Builders(graph_builders) => {
3838 let builder = graph_builders.get_dfir_mut(&out_location);
3839 builder.add_dfir(
3840 if is_top_level {
3841 parse_quote! {
3844 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3845 #left_ident -> [0]#stream_ident;
3846 #right_ident -> [1]#stream_ident;
3847 }
3848 } else {
3849 parse_quote! {
3850 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3851 #left_ident -> [0]#stream_ident;
3852 #right_ident -> [1]#stream_ident;
3853 }
3854 }
3855 ,
3856 None,
3857 Some(&next_stmt_id.to_string()),
3858 );
3859 }
3860 BuildersOrCallback::Callback(_, node_callback) => {
3861 node_callback(node, next_stmt_id);
3862 }
3863 }
3864
3865 let _ = next_stmt_id.get_and_increment();
3866
3867 ident_stack.push(stream_ident);
3868 }
3869
3870 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3871 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3872 parse_quote!(difference)
3873 } else {
3874 parse_quote!(anti_join)
3875 };
3876
3877 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3878 node
3879 else {
3880 unreachable!()
3881 };
3882
3883 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3884 quote!('static)
3885 } else {
3886 quote!('tick)
3887 };
3888
3889 let neg_ident = ident_stack.pop().unwrap();
3890 let pos_ident = ident_stack.pop().unwrap();
3891
3892 let stream_ident =
3893 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3894
3895 match builders_or_callback {
3896 BuildersOrCallback::Builders(graph_builders) => {
3897 let builder = graph_builders.get_dfir_mut(&out_location);
3898 builder.add_dfir(
3899 parse_quote! {
3900 #stream_ident = #operator::<'tick, #neg_lifetime>();
3901 #pos_ident -> [pos]#stream_ident;
3902 #neg_ident -> [neg]#stream_ident;
3903 },
3904 None,
3905 Some(&next_stmt_id.to_string()),
3906 );
3907 }
3908 BuildersOrCallback::Callback(_, node_callback) => {
3909 node_callback(node, next_stmt_id);
3910 }
3911 }
3912
3913 let _ = next_stmt_id.get_and_increment();
3914
3915 ident_stack.push(stream_ident);
3916 }
3917
3918 HydroNode::JoinHalf { .. } => {
3919 let HydroNode::JoinHalf { right, .. } = node else {
3920 unreachable!()
3921 };
3922
3923 assert!(
3924 right.metadata().collection_kind.is_bounded(),
3925 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3926 right.metadata().collection_kind
3927 );
3928
3929 let build_lifetime = if right.metadata().location_id.is_top_level() {
3930 quote!('static)
3931 } else {
3932 quote!('tick)
3933 };
3934
3935 let build_ident = ident_stack.pop().unwrap();
3936 let probe_ident = ident_stack.pop().unwrap();
3937
3938 let stream_ident =
3939 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3940
3941 match builders_or_callback {
3942 BuildersOrCallback::Builders(graph_builders) => {
3943 let builder = graph_builders.get_dfir_mut(&out_location);
3944 builder.add_dfir(
3945 parse_quote! {
3946 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3947 #probe_ident -> [probe]#stream_ident;
3948 #build_ident -> [build]#stream_ident;
3949 },
3950 None,
3951 Some(&next_stmt_id.to_string()),
3952 );
3953 }
3954 BuildersOrCallback::Callback(_, node_callback) => {
3955 node_callback(node, next_stmt_id);
3956 }
3957 }
3958
3959 let _ = next_stmt_id.get_and_increment();
3960
3961 ident_stack.push(stream_ident);
3962 }
3963
3964 HydroNode::ResolveFutures { .. } => {
3965 let input_ident = ident_stack.pop().unwrap();
3966
3967 let futures_ident =
3968 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3969
3970 match builders_or_callback {
3971 BuildersOrCallback::Builders(graph_builders) => {
3972 let builder = graph_builders.get_dfir_mut(&out_location);
3973 builder.add_dfir(
3974 parse_quote! {
3975 #futures_ident = #input_ident -> resolve_futures();
3976 },
3977 None,
3978 Some(&next_stmt_id.to_string()),
3979 );
3980 }
3981 BuildersOrCallback::Callback(_, node_callback) => {
3982 node_callback(node, next_stmt_id);
3983 }
3984 }
3985
3986 let _ = next_stmt_id.get_and_increment();
3987
3988 ident_stack.push(futures_ident);
3989 }
3990
3991 HydroNode::ResolveFuturesBlocking { .. } => {
3992 let input_ident = ident_stack.pop().unwrap();
3993
3994 let futures_ident =
3995 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3996
3997 match builders_or_callback {
3998 BuildersOrCallback::Builders(graph_builders) => {
3999 let builder = graph_builders.get_dfir_mut(&out_location);
4000 builder.add_dfir(
4001 parse_quote! {
4002 #futures_ident = #input_ident -> resolve_futures_blocking();
4003 },
4004 None,
4005 Some(&next_stmt_id.to_string()),
4006 );
4007 }
4008 BuildersOrCallback::Callback(_, node_callback) => {
4009 node_callback(node, next_stmt_id);
4010 }
4011 }
4012
4013 let _ = next_stmt_id.get_and_increment();
4014
4015 ident_stack.push(futures_ident);
4016 }
4017
4018 HydroNode::ResolveFuturesOrdered { .. } => {
4019 let input_ident = ident_stack.pop().unwrap();
4020
4021 let futures_ident =
4022 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4023
4024 match builders_or_callback {
4025 BuildersOrCallback::Builders(graph_builders) => {
4026 let builder = graph_builders.get_dfir_mut(&out_location);
4027 builder.add_dfir(
4028 parse_quote! {
4029 #futures_ident = #input_ident -> resolve_futures_ordered();
4030 },
4031 None,
4032 Some(&next_stmt_id.to_string()),
4033 );
4034 }
4035 BuildersOrCallback::Callback(_, node_callback) => {
4036 node_callback(node, next_stmt_id);
4037 }
4038 }
4039
4040 let _ = next_stmt_id.get_and_increment();
4041
4042 ident_stack.push(futures_ident);
4043 }
4044
4045 HydroNode::Map { f, .. } => {
4046 let input_ident = ident_stack.pop().unwrap();
4048 let f_tokens = f.emit_tokens(&mut ident_stack);
4049
4050 let map_ident =
4051 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4052
4053 match builders_or_callback {
4054 BuildersOrCallback::Builders(graph_builders) => {
4055 let builder = graph_builders.get_dfir_mut(&out_location);
4056 builder.add_dfir(
4057 parse_quote! {
4058 #map_ident = #input_ident -> map(#f_tokens);
4059 },
4060 None,
4061 Some(&next_stmt_id.to_string()),
4062 );
4063 }
4064 BuildersOrCallback::Callback(_, node_callback) => {
4065 node_callback(node, next_stmt_id);
4066 }
4067 }
4068
4069 let _ = next_stmt_id.get_and_increment();
4070
4071 ident_stack.push(map_ident);
4072 }
4073
4074 HydroNode::FlatMap { f, .. } => {
4075 let input_ident = ident_stack.pop().unwrap();
4076 let f_tokens = f.emit_tokens(&mut ident_stack);
4077
4078 let flat_map_ident =
4079 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4080
4081 match builders_or_callback {
4082 BuildersOrCallback::Builders(graph_builders) => {
4083 let builder = graph_builders.get_dfir_mut(&out_location);
4084 builder.add_dfir(
4085 parse_quote! {
4086 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4087 },
4088 None,
4089 Some(&next_stmt_id.to_string()),
4090 );
4091 }
4092 BuildersOrCallback::Callback(_, node_callback) => {
4093 node_callback(node, next_stmt_id);
4094 }
4095 }
4096
4097 let _ = next_stmt_id.get_and_increment();
4098
4099 ident_stack.push(flat_map_ident);
4100 }
4101
4102 HydroNode::FlatMapStreamBlocking { f, .. } => {
4103 let input_ident = ident_stack.pop().unwrap();
4104 let f_tokens = f.emit_tokens(&mut ident_stack);
4105
4106 let flat_map_stream_blocking_ident =
4107 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4108
4109 match builders_or_callback {
4110 BuildersOrCallback::Builders(graph_builders) => {
4111 let builder = graph_builders.get_dfir_mut(&out_location);
4112 builder.add_dfir(
4113 parse_quote! {
4114 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4115 },
4116 None,
4117 Some(&next_stmt_id.to_string()),
4118 );
4119 }
4120 BuildersOrCallback::Callback(_, node_callback) => {
4121 node_callback(node, next_stmt_id);
4122 }
4123 }
4124
4125 let _ = next_stmt_id.get_and_increment();
4126
4127 ident_stack.push(flat_map_stream_blocking_ident);
4128 }
4129
4130 HydroNode::Filter { f, .. } => {
4131 let input_ident = ident_stack.pop().unwrap();
4132 let f_tokens = f.emit_tokens(&mut ident_stack);
4133
4134 let filter_ident =
4135 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4136
4137 match builders_or_callback {
4138 BuildersOrCallback::Builders(graph_builders) => {
4139 let builder = graph_builders.get_dfir_mut(&out_location);
4140 builder.add_dfir(
4141 parse_quote! {
4142 #filter_ident = #input_ident -> filter(#f_tokens);
4143 },
4144 None,
4145 Some(&next_stmt_id.to_string()),
4146 );
4147 }
4148 BuildersOrCallback::Callback(_, node_callback) => {
4149 node_callback(node, next_stmt_id);
4150 }
4151 }
4152
4153 let _ = next_stmt_id.get_and_increment();
4154
4155 ident_stack.push(filter_ident);
4156 }
4157
4158 HydroNode::FilterMap { f, .. } => {
4159 let input_ident = ident_stack.pop().unwrap();
4160 let f_tokens = f.emit_tokens(&mut ident_stack);
4161
4162 let filter_map_ident =
4163 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4164
4165 match builders_or_callback {
4166 BuildersOrCallback::Builders(graph_builders) => {
4167 let builder = graph_builders.get_dfir_mut(&out_location);
4168 builder.add_dfir(
4169 parse_quote! {
4170 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4171 },
4172 None,
4173 Some(&next_stmt_id.to_string()),
4174 );
4175 }
4176 BuildersOrCallback::Callback(_, node_callback) => {
4177 node_callback(node, next_stmt_id);
4178 }
4179 }
4180
4181 let _ = next_stmt_id.get_and_increment();
4182
4183 ident_stack.push(filter_map_ident);
4184 }
4185
4186 HydroNode::Sort { .. } => {
4187 let input_ident = ident_stack.pop().unwrap();
4188
4189 let sort_ident =
4190 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4191
4192 match builders_or_callback {
4193 BuildersOrCallback::Builders(graph_builders) => {
4194 let builder = graph_builders.get_dfir_mut(&out_location);
4195 builder.add_dfir(
4196 parse_quote! {
4197 #sort_ident = #input_ident -> sort();
4198 },
4199 None,
4200 Some(&next_stmt_id.to_string()),
4201 );
4202 }
4203 BuildersOrCallback::Callback(_, node_callback) => {
4204 node_callback(node, next_stmt_id);
4205 }
4206 }
4207
4208 let _ = next_stmt_id.get_and_increment();
4209
4210 ident_stack.push(sort_ident);
4211 }
4212
4213 HydroNode::DeferTick { .. } => {
4214 let input_ident = ident_stack.pop().unwrap();
4215
4216 let defer_tick_ident =
4217 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4218
4219 match builders_or_callback {
4220 BuildersOrCallback::Builders(graph_builders) => {
4221 let builder = graph_builders.get_dfir_mut(&out_location);
4222 builder.add_dfir(
4223 parse_quote! {
4224 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4225 },
4226 None,
4227 Some(&next_stmt_id.to_string()),
4228 );
4229 }
4230 BuildersOrCallback::Callback(_, node_callback) => {
4231 node_callback(node, next_stmt_id);
4232 }
4233 }
4234
4235 let _ = next_stmt_id.get_and_increment();
4236
4237 ident_stack.push(defer_tick_ident);
4238 }
4239
4240 HydroNode::Enumerate { input, .. } => {
4241 let input_ident = ident_stack.pop().unwrap();
4242
4243 let enumerate_ident =
4244 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4245
4246 match builders_or_callback {
4247 BuildersOrCallback::Builders(graph_builders) => {
4248 let builder = graph_builders.get_dfir_mut(&out_location);
4249 let lifetime = if input.metadata().location_id.is_top_level() {
4250 quote!('static)
4251 } else {
4252 quote!('tick)
4253 };
4254 builder.add_dfir(
4255 parse_quote! {
4256 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4257 },
4258 None,
4259 Some(&next_stmt_id.to_string()),
4260 );
4261 }
4262 BuildersOrCallback::Callback(_, node_callback) => {
4263 node_callback(node, next_stmt_id);
4264 }
4265 }
4266
4267 let _ = next_stmt_id.get_and_increment();
4268
4269 ident_stack.push(enumerate_ident);
4270 }
4271
4272 HydroNode::Inspect { f, .. } => {
4273 let input_ident = ident_stack.pop().unwrap();
4274 let f_tokens = f.emit_tokens(&mut ident_stack);
4275
4276 let inspect_ident =
4277 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4278
4279 match builders_or_callback {
4280 BuildersOrCallback::Builders(graph_builders) => {
4281 let builder = graph_builders.get_dfir_mut(&out_location);
4282 builder.add_dfir(
4283 parse_quote! {
4284 #inspect_ident = #input_ident -> inspect(#f_tokens);
4285 },
4286 None,
4287 Some(&next_stmt_id.to_string()),
4288 );
4289 }
4290 BuildersOrCallback::Callback(_, node_callback) => {
4291 node_callback(node, next_stmt_id);
4292 }
4293 }
4294
4295 let _ = next_stmt_id.get_and_increment();
4296
4297 ident_stack.push(inspect_ident);
4298 }
4299
4300 HydroNode::Unique { input, .. } => {
4301 let input_ident = ident_stack.pop().unwrap();
4302
4303 let unique_ident =
4304 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4305
4306 match builders_or_callback {
4307 BuildersOrCallback::Builders(graph_builders) => {
4308 let builder = graph_builders.get_dfir_mut(&out_location);
4309 let lifetime = if input.metadata().location_id.is_top_level() {
4310 quote!('static)
4311 } else {
4312 quote!('tick)
4313 };
4314
4315 builder.add_dfir(
4316 parse_quote! {
4317 #unique_ident = #input_ident -> unique::<#lifetime>();
4318 },
4319 None,
4320 Some(&next_stmt_id.to_string()),
4321 );
4322 }
4323 BuildersOrCallback::Callback(_, node_callback) => {
4324 node_callback(node, next_stmt_id);
4325 }
4326 }
4327
4328 let _ = next_stmt_id.get_and_increment();
4329
4330 ident_stack.push(unique_ident);
4331 }
4332
4333 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4334 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4335 if input.metadata().location_id.is_top_level()
4336 && input.metadata().collection_kind.is_bounded()
4337 {
4338 parse_quote!(fold_no_replay)
4339 } else {
4340 parse_quote!(fold)
4341 }
4342 } else if matches!(node, HydroNode::Scan { .. }) {
4343 parse_quote!(scan)
4344 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4345 parse_quote!(scan_async_blocking)
4346 } else if let HydroNode::FoldKeyed { input, .. } = node {
4347 if input.metadata().location_id.is_top_level()
4348 && input.metadata().collection_kind.is_bounded()
4349 {
4350 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4351 } else {
4352 parse_quote!(fold_keyed)
4353 }
4354 } else {
4355 unreachable!()
4356 };
4357
4358 let (HydroNode::Fold { input, .. }
4359 | HydroNode::FoldKeyed { input, .. }
4360 | HydroNode::Scan { input, .. }
4361 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4362 else {
4363 unreachable!()
4364 };
4365
4366 let lifetime = if input.metadata().location_id.is_top_level() {
4367 quote!('static)
4368 } else {
4369 quote!('tick)
4370 };
4371
4372 let input_ident = ident_stack.pop().unwrap();
4373
4374 let (HydroNode::Fold { init, acc, .. }
4375 | HydroNode::FoldKeyed { init, acc, .. }
4376 | HydroNode::Scan { init, acc, .. }
4377 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4378 else {
4379 unreachable!()
4380 };
4381
4382 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4383 let init_tokens = init.emit_tokens(&mut ident_stack);
4384
4385 let fold_ident =
4386 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4387
4388 match builders_or_callback {
4389 BuildersOrCallback::Builders(graph_builders) => {
4390 if matches!(node, HydroNode::Fold { .. })
4391 && node.metadata().location_id.is_top_level()
4392 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4393 && graph_builders.singleton_intermediates()
4394 && !node.metadata().collection_kind.is_bounded()
4395 {
4396 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4397 let hooked_input_ident = graph_builders.emit_fold_hook(
4398 &input.metadata().location_id,
4399 &input_ident,
4400 &input.metadata().collection_kind,
4401 &node.metadata().op,
4402 );
4403
4404 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4405 let acc: syn::Expr = parse_quote!({
4406 let mut __inner = #acc_tokens;
4407 move |__state, __batch: Vec<_>| {
4408 if __batch.is_empty() {
4409 return None;
4410 }
4411 for __value in __batch {
4412 __inner(__state, __value);
4413 }
4414 Some(__state.clone())
4415 }
4416 });
4417 (hooked, acc)
4418 } else {
4419 let acc: syn::Expr = parse_quote!({
4420 let mut __inner = #acc_tokens;
4421 move |__state, __value| {
4422 __inner(__state, __value);
4423 Some(__state.clone())
4424 }
4425 });
4426 (&input_ident, acc)
4427 };
4428
4429 let builder = graph_builders.get_dfir_mut(&out_location);
4430 builder.add_dfir(
4431 parse_quote! {
4432 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4433 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4434 #fold_ident = chain();
4435 },
4436 None,
4437 Some(&next_stmt_id.to_string()),
4438 );
4439
4440 if hooked_input_ident.is_some() {
4441 fold_hooked_idents.insert(fold_ident.to_string());
4442 }
4443 } else if matches!(node, HydroNode::FoldKeyed { .. })
4444 && node.metadata().location_id.is_top_level()
4445 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4446 && graph_builders.singleton_intermediates()
4447 && !node.metadata().collection_kind.is_bounded()
4448 {
4449 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4450 let hooked_input_ident = graph_builders.emit_fold_hook(
4451 &input.metadata().location_id,
4452 &input_ident,
4453 &input.metadata().collection_kind,
4454 &node.metadata().op,
4455 );
4456 let builder = graph_builders.get_dfir_mut(&out_location);
4457
4458 let wrapped_acc: syn::Expr = parse_quote!({
4459 let mut __init = #init_tokens;
4460 let mut __inner = #acc_tokens;
4461 move |__state, __kv: (_, _)| {
4462 let __state = __state
4464 .entry(::std::clone::Clone::clone(&__kv.0))
4465 .or_insert_with(|| (__init)());
4466 __inner(__state, __kv.1);
4467 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4468 }
4469 });
4470
4471 if let Some(hooked_input_ident) = hooked_input_ident {
4472 builder.add_dfir(
4473 parse_quote! {
4474 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4475 },
4476 None,
4477 Some(&next_stmt_id.to_string()),
4478 );
4479
4480 fold_hooked_idents.insert(fold_ident.to_string());
4481 } else {
4482 builder.add_dfir(
4483 parse_quote! {
4484 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4485 },
4486 None,
4487 Some(&next_stmt_id.to_string()),
4488 );
4489 }
4490 } else if (matches!(node, HydroNode::Fold { .. })
4491 || matches!(node, HydroNode::FoldKeyed { .. }))
4492 && !node.metadata().location_id.is_top_level()
4493 && graph_builders.singleton_intermediates()
4494 {
4495 let input_ref = match &*node {
4496 HydroNode::Fold { input, .. } => input,
4497 HydroNode::FoldKeyed { input, .. } => input,
4498 _ => unreachable!(),
4499 };
4500 let hooked_input_ident = graph_builders.emit_fold_hook(
4501 &input_ref.metadata().location_id,
4502 &input_ident,
4503 &input_ref.metadata().collection_kind,
4504 &node.metadata().op,
4505 );
4506
4507 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4508 let builder = graph_builders.get_dfir_mut(&out_location);
4509 builder.add_dfir(
4510 parse_quote! {
4511 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4512 },
4513 None,
4514 Some(&next_stmt_id.to_string()),
4515 );
4516 } else {
4517 let builder = graph_builders.get_dfir_mut(&out_location);
4518 builder.add_dfir(
4519 parse_quote! {
4520 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4521 },
4522 None,
4523 Some(&next_stmt_id.to_string()),
4524 );
4525 }
4526 }
4527 BuildersOrCallback::Callback(_, node_callback) => {
4528 node_callback(node, next_stmt_id);
4529 }
4530 }
4531
4532 let _ = next_stmt_id.get_and_increment();
4533
4534 ident_stack.push(fold_ident);
4535 }
4536
4537 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4538 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4539 if input.metadata().location_id.is_top_level()
4540 && input.metadata().collection_kind.is_bounded()
4541 {
4542 parse_quote!(reduce_no_replay)
4543 } else {
4544 parse_quote!(reduce)
4545 }
4546 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4547 if input.metadata().location_id.is_top_level()
4548 && input.metadata().collection_kind.is_bounded()
4549 {
4550 todo!(
4551 "Calling keyed reduce on a top-level bounded collection is not supported"
4552 )
4553 } else {
4554 parse_quote!(reduce_keyed)
4555 }
4556 } else {
4557 unreachable!()
4558 };
4559
4560 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4561 else {
4562 unreachable!()
4563 };
4564
4565 let lifetime = if input.metadata().location_id.is_top_level() {
4566 quote!('static)
4567 } else {
4568 quote!('tick)
4569 };
4570
4571 let input_ident = ident_stack.pop().unwrap();
4572
4573 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4574 else {
4575 unreachable!()
4576 };
4577
4578 let f_tokens = f.emit_tokens(&mut ident_stack);
4579
4580 let reduce_ident =
4581 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4582
4583 match builders_or_callback {
4584 BuildersOrCallback::Builders(graph_builders) => {
4585 if matches!(node, HydroNode::Reduce { .. })
4586 && node.metadata().location_id.is_top_level()
4587 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4588 && graph_builders.singleton_intermediates()
4589 && !node.metadata().collection_kind.is_bounded()
4590 {
4591 todo!(
4592 "Reduce with optional intermediates is not yet supported in simulator"
4593 );
4594 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4595 && node.metadata().location_id.is_top_level()
4596 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4597 && graph_builders.singleton_intermediates()
4598 && !node.metadata().collection_kind.is_bounded()
4599 {
4600 todo!(
4601 "Reduce keyed with optional intermediates is not yet supported in simulator"
4602 );
4603 } else {
4604 let builder = graph_builders.get_dfir_mut(&out_location);
4605 builder.add_dfir(
4606 parse_quote! {
4607 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4608 },
4609 None,
4610 Some(&next_stmt_id.to_string()),
4611 );
4612 }
4613 }
4614 BuildersOrCallback::Callback(_, node_callback) => {
4615 node_callback(node, next_stmt_id);
4616 }
4617 }
4618
4619 let _ = next_stmt_id.get_and_increment();
4620
4621 ident_stack.push(reduce_ident);
4622 }
4623
4624 HydroNode::ReduceKeyedWatermark {
4625 f,
4626 input,
4627 metadata,
4628 ..
4629 } => {
4630 let lifetime = if input.metadata().location_id.is_top_level() {
4631 quote!('static)
4632 } else {
4633 quote!('tick)
4634 };
4635
4636 let watermark_ident = ident_stack.pop().unwrap();
4638 let input_ident = ident_stack.pop().unwrap();
4639 let f_tokens = f.emit_tokens(&mut ident_stack);
4640
4641 let chain_ident = syn::Ident::new(
4642 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4643 Span::call_site(),
4644 );
4645
4646 let fold_ident =
4647 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4648
4649 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4650 && input.metadata().collection_kind.is_bounded()
4651 {
4652 parse_quote!(fold_no_replay)
4653 } else {
4654 parse_quote!(fold)
4655 };
4656
4657 match builders_or_callback {
4658 BuildersOrCallback::Builders(graph_builders) => {
4659 if metadata.location_id.is_top_level()
4660 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4661 && graph_builders.singleton_intermediates()
4662 && !metadata.collection_kind.is_bounded()
4663 {
4664 todo!(
4665 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4666 )
4667 } else {
4668 let builder = graph_builders.get_dfir_mut(&out_location);
4669 builder.add_dfir(
4670 parse_quote! {
4671 #chain_ident = chain();
4672 #input_ident
4673 -> map(|x| (Some(x), None))
4674 -> [0]#chain_ident;
4675 #watermark_ident
4676 -> map(|watermark| (None, Some(watermark)))
4677 -> [1]#chain_ident;
4678
4679 #fold_ident = #chain_ident
4680 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4681 let __reduce_keyed_fn = #f_tokens;
4682 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4683 if let Some((k, v)) = opt_payload {
4684 if let Some(curr_watermark) = *opt_curr_watermark {
4685 if k < curr_watermark {
4686 return;
4687 }
4688 }
4689 match map.entry(k) {
4690 ::std::collections::hash_map::Entry::Vacant(e) => {
4691 e.insert(v);
4692 }
4693 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4694 __reduce_keyed_fn(e.get_mut(), v);
4695 }
4696 }
4697 } else {
4698 let watermark = opt_watermark.unwrap();
4699 if let Some(curr_watermark) = *opt_curr_watermark {
4700 if watermark <= curr_watermark {
4701 return;
4702 }
4703 }
4704 map.retain(|k, _| *k >= watermark);
4705 *opt_curr_watermark = Some(watermark);
4706 }
4707 }
4708 })
4709 -> flat_map(|(map, _curr_watermark)| map);
4710 },
4711 None,
4712 Some(&next_stmt_id.to_string()),
4713 );
4714 }
4715 }
4716 BuildersOrCallback::Callback(_, node_callback) => {
4717 node_callback(node, next_stmt_id);
4718 }
4719 }
4720
4721 let _ = next_stmt_id.get_and_increment();
4722
4723 ident_stack.push(fold_ident);
4724 }
4725
4726 HydroNode::Network {
4727 networking_info,
4728 serialize_fn: serialize_pipeline,
4729 instantiate_fn,
4730 deserialize_fn: deserialize_pipeline,
4731 input,
4732 ..
4733 } => {
4734 let input_ident = ident_stack.pop().unwrap();
4735
4736 let receiver_stream_ident =
4737 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4738
4739 match builders_or_callback {
4740 BuildersOrCallback::Builders(graph_builders) => {
4741 let (sink_expr, source_expr) = match instantiate_fn {
4742 DebugInstantiate::Building => (
4743 syn::parse_quote!(DUMMY_SINK),
4744 syn::parse_quote!(DUMMY_SOURCE),
4745 ),
4746
4747 DebugInstantiate::Finalized(finalized) => {
4748 (finalized.sink.clone(), finalized.source.clone())
4749 }
4750 };
4751
4752 graph_builders.create_network(
4753 &input.metadata().location_id,
4754 &out_location,
4755 input_ident,
4756 &receiver_stream_ident,
4757 serialize_pipeline.as_ref(),
4758 sink_expr,
4759 source_expr,
4760 deserialize_pipeline.as_ref(),
4761 *next_stmt_id,
4762 networking_info,
4763 );
4764 }
4765 BuildersOrCallback::Callback(_, node_callback) => {
4766 node_callback(node, next_stmt_id);
4767 }
4768 }
4769
4770 let _ = next_stmt_id.get_and_increment();
4771
4772 ident_stack.push(receiver_stream_ident);
4773 }
4774
4775 HydroNode::ExternalInput {
4776 instantiate_fn,
4777 deserialize_fn: deserialize_pipeline,
4778 ..
4779 } => {
4780 let receiver_stream_ident =
4781 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4782
4783 match builders_or_callback {
4784 BuildersOrCallback::Builders(graph_builders) => {
4785 let (_, source_expr) = match instantiate_fn {
4786 DebugInstantiate::Building => (
4787 syn::parse_quote!(DUMMY_SINK),
4788 syn::parse_quote!(DUMMY_SOURCE),
4789 ),
4790
4791 DebugInstantiate::Finalized(finalized) => {
4792 (finalized.sink.clone(), finalized.source.clone())
4793 }
4794 };
4795
4796 graph_builders.create_external_source(
4797 &out_location,
4798 source_expr,
4799 &receiver_stream_ident,
4800 deserialize_pipeline.as_ref(),
4801 *next_stmt_id,
4802 );
4803 }
4804 BuildersOrCallback::Callback(_, node_callback) => {
4805 node_callback(node, next_stmt_id);
4806 }
4807 }
4808
4809 let _ = next_stmt_id.get_and_increment();
4810
4811 ident_stack.push(receiver_stream_ident);
4812 }
4813
4814 HydroNode::Counter {
4815 tag,
4816 duration,
4817 prefix,
4818 ..
4819 } => {
4820 let input_ident = ident_stack.pop().unwrap();
4821
4822 let counter_ident =
4823 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4824
4825 match builders_or_callback {
4826 BuildersOrCallback::Builders(graph_builders) => {
4827 let arg = format!("{}({})", prefix, tag);
4828 let builder = graph_builders.get_dfir_mut(&out_location);
4829 builder.add_dfir(
4830 parse_quote! {
4831 #counter_ident = #input_ident -> _counter(#arg, #duration);
4832 },
4833 None,
4834 Some(&next_stmt_id.to_string()),
4835 );
4836 }
4837 BuildersOrCallback::Callback(_, node_callback) => {
4838 node_callback(node, next_stmt_id);
4839 }
4840 }
4841
4842 let _ = next_stmt_id.get_and_increment();
4843
4844 ident_stack.push(counter_ident);
4845 }
4846 }
4847 },
4848 seen_tees,
4849 false,
4850 );
4851
4852 let ret = ident_stack
4853 .pop()
4854 .expect("ident_stack should have exactly one element after traversal");
4855 assert!(
4856 ident_stack.is_empty(),
4857 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4858 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4859 ident_stack.len()
4860 );
4861 ret
4862 }
4863
4864 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4865 match self {
4866 HydroNode::Placeholder => {
4867 panic!()
4868 }
4869 HydroNode::Cast { .. }
4870 | HydroNode::ObserveNonDet { .. }
4871 | HydroNode::UnboundSingleton { .. }
4872 | HydroNode::AssertIsConsistent { .. } => {}
4873 HydroNode::Source { source, .. } => match source {
4874 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4875 HydroSource::ExternalNetwork()
4876 | HydroSource::Spin()
4877 | HydroSource::ClusterMembers(_, _)
4878 | HydroSource::Embedded(_)
4879 | HydroSource::EmbeddedSingleton(_) => {} },
4881 HydroNode::SingletonSource { value, .. } => {
4882 transform(value);
4883 }
4884 HydroNode::CycleSource { .. }
4885 | HydroNode::Tee { .. }
4886 | HydroNode::Singleton { .. }
4887 | HydroNode::YieldConcat { .. }
4888 | HydroNode::BeginAtomic { .. }
4889 | HydroNode::EndAtomic { .. }
4890 | HydroNode::Batch { .. }
4891 | HydroNode::Chain { .. }
4892 | HydroNode::MergeOrdered { .. }
4893 | HydroNode::ChainFirst { .. }
4894 | HydroNode::CrossProduct { .. }
4895 | HydroNode::CrossSingleton { .. }
4896 | HydroNode::ResolveFutures { .. }
4897 | HydroNode::ResolveFuturesBlocking { .. }
4898 | HydroNode::ResolveFuturesOrdered { .. }
4899 | HydroNode::Join { .. }
4900 | HydroNode::JoinHalf { .. }
4901 | HydroNode::Difference { .. }
4902 | HydroNode::AntiJoin { .. }
4903 | HydroNode::DeferTick { .. }
4904 | HydroNode::Enumerate { .. }
4905 | HydroNode::Unique { .. }
4906 | HydroNode::Sort { .. } => {}
4907 HydroNode::Map { f, .. }
4908 | HydroNode::FlatMap { f, .. }
4909 | HydroNode::FlatMapStreamBlocking { f, .. }
4910 | HydroNode::Filter { f, .. }
4911 | HydroNode::FilterMap { f, .. }
4912 | HydroNode::Inspect { f, .. }
4913 | HydroNode::Partition { f, .. }
4914 | HydroNode::Reduce { f, .. }
4915 | HydroNode::ReduceKeyed { f, .. }
4916 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4917 transform(&mut f.expr);
4918 }
4919 HydroNode::Fold { init, acc, .. }
4920 | HydroNode::Scan { init, acc, .. }
4921 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4922 | HydroNode::FoldKeyed { init, acc, .. } => {
4923 transform(&mut init.expr);
4924 transform(&mut acc.expr);
4925 }
4926 HydroNode::Network {
4927 serialize_fn,
4928 deserialize_fn,
4929 ..
4930 } => {
4931 if let Some(serialize_fn) = serialize_fn {
4932 transform(serialize_fn);
4933 }
4934 if let Some(deserialize_fn) = deserialize_fn {
4935 transform(deserialize_fn);
4936 }
4937 }
4938 HydroNode::ExternalInput { deserialize_fn, .. } => {
4939 if let Some(deserialize_fn) = deserialize_fn {
4940 transform(deserialize_fn);
4941 }
4942 }
4943 HydroNode::Counter { duration, .. } => {
4944 transform(duration);
4945 }
4946 }
4947 }
4948
4949 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4950 &self.metadata().op
4951 }
4952
4953 pub fn metadata(&self) -> &HydroIrMetadata {
4954 match self {
4955 HydroNode::Placeholder => {
4956 panic!()
4957 }
4958 HydroNode::Cast { metadata, .. }
4959 | HydroNode::ObserveNonDet { metadata, .. }
4960 | HydroNode::AssertIsConsistent { metadata, .. }
4961 | HydroNode::UnboundSingleton { metadata, .. }
4962 | HydroNode::Source { metadata, .. }
4963 | HydroNode::SingletonSource { metadata, .. }
4964 | HydroNode::CycleSource { metadata, .. }
4965 | HydroNode::Tee { metadata, .. }
4966 | HydroNode::Singleton { metadata, .. }
4967 | HydroNode::Partition { metadata, .. }
4968 | HydroNode::YieldConcat { metadata, .. }
4969 | HydroNode::BeginAtomic { metadata, .. }
4970 | HydroNode::EndAtomic { metadata, .. }
4971 | HydroNode::Batch { metadata, .. }
4972 | HydroNode::Chain { metadata, .. }
4973 | HydroNode::MergeOrdered { metadata, .. }
4974 | HydroNode::ChainFirst { metadata, .. }
4975 | HydroNode::CrossProduct { metadata, .. }
4976 | HydroNode::CrossSingleton { metadata, .. }
4977 | HydroNode::Join { metadata, .. }
4978 | HydroNode::JoinHalf { metadata, .. }
4979 | HydroNode::Difference { metadata, .. }
4980 | HydroNode::AntiJoin { metadata, .. }
4981 | HydroNode::ResolveFutures { metadata, .. }
4982 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4983 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4984 | HydroNode::Map { metadata, .. }
4985 | HydroNode::FlatMap { metadata, .. }
4986 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4987 | HydroNode::Filter { metadata, .. }
4988 | HydroNode::FilterMap { metadata, .. }
4989 | HydroNode::DeferTick { metadata, .. }
4990 | HydroNode::Enumerate { metadata, .. }
4991 | HydroNode::Inspect { metadata, .. }
4992 | HydroNode::Unique { metadata, .. }
4993 | HydroNode::Sort { metadata, .. }
4994 | HydroNode::Scan { metadata, .. }
4995 | HydroNode::ScanAsyncBlocking { metadata, .. }
4996 | HydroNode::Fold { metadata, .. }
4997 | HydroNode::FoldKeyed { metadata, .. }
4998 | HydroNode::Reduce { metadata, .. }
4999 | HydroNode::ReduceKeyed { metadata, .. }
5000 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5001 | HydroNode::ExternalInput { metadata, .. }
5002 | HydroNode::Network { metadata, .. }
5003 | HydroNode::Counter { metadata, .. } => metadata,
5004 }
5005 }
5006
5007 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5008 &mut self.metadata_mut().op
5009 }
5010
5011 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5012 match self {
5013 HydroNode::Placeholder => {
5014 panic!()
5015 }
5016 HydroNode::Cast { metadata, .. }
5017 | HydroNode::ObserveNonDet { metadata, .. }
5018 | HydroNode::AssertIsConsistent { metadata, .. }
5019 | HydroNode::UnboundSingleton { metadata, .. }
5020 | HydroNode::Source { metadata, .. }
5021 | HydroNode::SingletonSource { metadata, .. }
5022 | HydroNode::CycleSource { metadata, .. }
5023 | HydroNode::Tee { metadata, .. }
5024 | HydroNode::Singleton { metadata, .. }
5025 | HydroNode::Partition { metadata, .. }
5026 | HydroNode::YieldConcat { metadata, .. }
5027 | HydroNode::BeginAtomic { metadata, .. }
5028 | HydroNode::EndAtomic { metadata, .. }
5029 | HydroNode::Batch { metadata, .. }
5030 | HydroNode::Chain { metadata, .. }
5031 | HydroNode::MergeOrdered { metadata, .. }
5032 | HydroNode::ChainFirst { metadata, .. }
5033 | HydroNode::CrossProduct { metadata, .. }
5034 | HydroNode::CrossSingleton { metadata, .. }
5035 | HydroNode::Join { metadata, .. }
5036 | HydroNode::JoinHalf { metadata, .. }
5037 | HydroNode::Difference { metadata, .. }
5038 | HydroNode::AntiJoin { metadata, .. }
5039 | HydroNode::ResolveFutures { metadata, .. }
5040 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5041 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5042 | HydroNode::Map { metadata, .. }
5043 | HydroNode::FlatMap { metadata, .. }
5044 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5045 | HydroNode::Filter { metadata, .. }
5046 | HydroNode::FilterMap { metadata, .. }
5047 | HydroNode::DeferTick { metadata, .. }
5048 | HydroNode::Enumerate { metadata, .. }
5049 | HydroNode::Inspect { metadata, .. }
5050 | HydroNode::Unique { metadata, .. }
5051 | HydroNode::Sort { metadata, .. }
5052 | HydroNode::Scan { metadata, .. }
5053 | HydroNode::ScanAsyncBlocking { metadata, .. }
5054 | HydroNode::Fold { metadata, .. }
5055 | HydroNode::FoldKeyed { metadata, .. }
5056 | HydroNode::Reduce { metadata, .. }
5057 | HydroNode::ReduceKeyed { metadata, .. }
5058 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5059 | HydroNode::ExternalInput { metadata, .. }
5060 | HydroNode::Network { metadata, .. }
5061 | HydroNode::Counter { metadata, .. } => metadata,
5062 }
5063 }
5064
5065 pub fn input(&self) -> Vec<&HydroNode> {
5066 match self {
5067 HydroNode::Placeholder => {
5068 panic!()
5069 }
5070 HydroNode::Source { .. }
5071 | HydroNode::SingletonSource { .. }
5072 | HydroNode::ExternalInput { .. }
5073 | HydroNode::CycleSource { .. }
5074 | HydroNode::Tee { .. }
5075 | HydroNode::Singleton { .. }
5076 | HydroNode::Partition { .. } => {
5077 vec![]
5079 }
5080 HydroNode::Cast { inner, .. }
5081 | HydroNode::ObserveNonDet { inner, .. }
5082 | HydroNode::YieldConcat { inner, .. }
5083 | HydroNode::BeginAtomic { inner, .. }
5084 | HydroNode::EndAtomic { inner, .. }
5085 | HydroNode::Batch { inner, .. }
5086 | HydroNode::UnboundSingleton { inner, .. }
5087 | HydroNode::AssertIsConsistent { inner, .. } => {
5088 vec![inner]
5089 }
5090 HydroNode::Chain { first, second, .. } => {
5091 vec![first, second]
5092 }
5093 HydroNode::MergeOrdered { first, second, .. } => {
5094 vec![first, second]
5095 }
5096 HydroNode::ChainFirst { first, second, .. } => {
5097 vec![first, second]
5098 }
5099 HydroNode::CrossProduct { left, right, .. }
5100 | HydroNode::CrossSingleton { left, right, .. }
5101 | HydroNode::Join { left, right, .. }
5102 | HydroNode::JoinHalf { left, right, .. } => {
5103 vec![left, right]
5104 }
5105 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5106 vec![pos, neg]
5107 }
5108 HydroNode::Map { input, .. }
5109 | HydroNode::FlatMap { input, .. }
5110 | HydroNode::FlatMapStreamBlocking { input, .. }
5111 | HydroNode::Filter { input, .. }
5112 | HydroNode::FilterMap { input, .. }
5113 | HydroNode::Sort { input, .. }
5114 | HydroNode::DeferTick { input, .. }
5115 | HydroNode::Enumerate { input, .. }
5116 | HydroNode::Inspect { input, .. }
5117 | HydroNode::Unique { input, .. }
5118 | HydroNode::Network { input, .. }
5119 | HydroNode::Counter { input, .. }
5120 | HydroNode::ResolveFutures { input, .. }
5121 | HydroNode::ResolveFuturesBlocking { input, .. }
5122 | HydroNode::ResolveFuturesOrdered { input, .. }
5123 | HydroNode::Fold { input, .. }
5124 | HydroNode::FoldKeyed { input, .. }
5125 | HydroNode::Reduce { input, .. }
5126 | HydroNode::ReduceKeyed { input, .. }
5127 | HydroNode::Scan { input, .. }
5128 | HydroNode::ScanAsyncBlocking { input, .. } => {
5129 vec![input]
5130 }
5131 HydroNode::ReduceKeyedWatermark {
5132 input, watermark, ..
5133 } => {
5134 vec![input, watermark]
5135 }
5136 }
5137 }
5138
5139 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5140 self.input()
5141 .iter()
5142 .map(|input_node| input_node.metadata())
5143 .collect()
5144 }
5145
5146 pub fn is_shared_with_others(&self) -> bool {
5150 match self {
5151 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5152 Rc::strong_count(&inner.0) > 1
5153 }
5154 HydroNode::Singleton { .. } => false,
5157 _ => false,
5158 }
5159 }
5160
5161 pub fn print_root(&self) -> String {
5162 match self {
5163 HydroNode::Placeholder => {
5164 panic!()
5165 }
5166 HydroNode::Cast { .. } => "Cast()".to_owned(),
5167 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5168 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5169 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5170 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5171 HydroNode::SingletonSource {
5172 value,
5173 first_tick_only,
5174 ..
5175 } => format!(
5176 "SingletonSource({:?}, first_tick_only={})",
5177 value, first_tick_only
5178 ),
5179 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5180 HydroNode::Tee { inner, .. } => {
5181 format!("Tee({})", inner.0.borrow().print_root())
5182 }
5183 HydroNode::Singleton { inner, .. } => {
5184 format!("Singleton({})", inner.0.borrow().print_root())
5185 }
5186 HydroNode::Partition { f, is_true, .. } => {
5187 format!("Partition({:?}, is_true={})", f, is_true)
5188 }
5189 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5190 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5191 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5192 HydroNode::Batch { .. } => "Batch()".to_owned(),
5193 HydroNode::Chain { first, second, .. } => {
5194 format!("Chain({}, {})", first.print_root(), second.print_root())
5195 }
5196 HydroNode::MergeOrdered { first, second, .. } => {
5197 format!(
5198 "MergeOrdered({}, {})",
5199 first.print_root(),
5200 second.print_root()
5201 )
5202 }
5203 HydroNode::ChainFirst { first, second, .. } => {
5204 format!(
5205 "ChainFirst({}, {})",
5206 first.print_root(),
5207 second.print_root()
5208 )
5209 }
5210 HydroNode::CrossProduct { left, right, .. } => {
5211 format!(
5212 "CrossProduct({}, {})",
5213 left.print_root(),
5214 right.print_root()
5215 )
5216 }
5217 HydroNode::CrossSingleton { left, right, .. } => {
5218 format!(
5219 "CrossSingleton({}, {})",
5220 left.print_root(),
5221 right.print_root()
5222 )
5223 }
5224 HydroNode::Join { left, right, .. } => {
5225 format!("Join({}, {})", left.print_root(), right.print_root())
5226 }
5227 HydroNode::JoinHalf { left, right, .. } => {
5228 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5229 }
5230 HydroNode::Difference { pos, neg, .. } => {
5231 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5232 }
5233 HydroNode::AntiJoin { pos, neg, .. } => {
5234 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5235 }
5236 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5237 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5238 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5239 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5240 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5241 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5242 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5243 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5244 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5245 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5246 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5247 HydroNode::Unique { .. } => "Unique()".to_owned(),
5248 HydroNode::Sort { .. } => "Sort()".to_owned(),
5249 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5250 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5251 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5252 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5253 }
5254 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5255 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5256 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5257 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5258 HydroNode::Network { .. } => "Network()".to_owned(),
5259 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5260 HydroNode::Counter { tag, duration, .. } => {
5261 format!("Counter({:?}, {:?})", tag, duration)
5262 }
5263 }
5264 }
5265}
5266
/// Sets up a network channel between two instantiated locations.
///
/// The deployed node for `from_location` and `to_location` is looked up in `processes` or
/// `clusters` (depending on the location kind), a fresh port is reserved on each via
/// `next_port`, and the matching pair of `Deploy` hooks is invoked:
///
/// | from      | to        | hooks                                |
/// |-----------|-----------|--------------------------------------|
/// | process   | process   | `o2o_sink_source` / `o2o_connect`    |
/// | process   | cluster   | `o2m_sink_source` / `o2m_connect`    |
/// | cluster   | process   | `m2o_sink_source` / `m2o_connect`    |
/// | cluster   | cluster   | `m2m_sink_source` / `m2m_connect`    |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the `*_sink_source`
/// hook and the closure produced by the `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from `processes` / `clusters`, or if
/// either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (a clone of) the deployed process for `key`, panicking if it was never
    // instantiated.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Same as above, for clusters.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    // In every arm the sender is resolved before the receiver, and the sender's port is
    // reserved before the receiver's.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only processes and clusters can be network endpoints.
        (LocationId::Tick(_, _), _) => panic!(
            "cannot instantiate a network from a tick location; expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "cannot instantiate a network to a tick location; expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "cannot instantiate a network from an atomic location; expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "cannot instantiate a network to an atomic location; expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
5408
5409#[cfg(test)]
5410mod serde_test;
5411
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that changes to it are noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 264);
    }

    /// Pins the in-memory size of `HydroRoot` so that changes to it are noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// A plain expression is expected to come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain.clone());
        assert_eq!(result, plain);
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` when the quoted code is a block that ends in a
    /// closure instead of starting with one.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}