Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression bundled with any singleton references it captures.
///
/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
/// alongside the closure's expression. This allows per-closure tracking of singleton
/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
pub struct ClosureExpr {
    /// The closure's expression.
    pub expr: DebugExpr,
    /// Captured singleton references, each paired with the local identifier the closure
    /// uses for it. Every node is expected to be a `HydroNode::Singleton`; the `Clone`
    /// impl panics otherwise.
    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
}
48
49impl Clone for ClosureExpr {
50    fn clone(&self) -> Self {
51        Self {
52            expr: self.expr.clone(),
53            singleton_refs: self
54                .singleton_refs
55                .iter()
56                .map(|(ident, node)| {
57                    let cloned_node = match node {
58                        HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
59                            inner: SharedNode(inner.0.clone()),
60                            metadata: metadata.clone(),
61                        },
62                        _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
63                    };
64                    (ident.clone(), cloned_node)
65                })
66                .collect(),
67        }
68    }
69}
70
71impl Hash for ClosureExpr {
72    fn hash<H: Hasher>(&self, state: &mut H) {
73        self.expr.hash(state);
74        // singleton_refs are structural children (like HydroIrMetadata), not
75        // identity-defining. Two closures with the same expr but different
76        // captured refs are the same closure text — the refs only affect codegen.
77    }
78}
79
80impl serde::Serialize for ClosureExpr {
81    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
82        use serde::ser::SerializeStruct;
83        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
84        s.serialize_field("expr", &self.expr)?;
85        s.serialize_field(
86            "singleton_refs",
87            &SerializableSingletonRefs(&self.singleton_refs),
88        )?;
89        s.end()
90    }
91}
92
93struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
94
95impl serde::Serialize for SerializableSingletonRefs<'_> {
96    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
97        use serde::ser::SerializeSeq;
98        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
99        for (ident, node) in self.0 {
100            seq.serialize_element(&(ident.to_string(), node))?;
101        }
102        seq.end()
103    }
104}
105
106impl Debug for ClosureExpr {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        Debug::fmt(&self.expr, f)
109    }
110}
111
112impl Display for ClosureExpr {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        Display::fmt(&self.expr, f)
115    }
116}
117
118impl From<syn::Expr> for ClosureExpr {
119    fn from(expr: syn::Expr) -> Self {
120        Self {
121            expr: DebugExpr(Box::new(expr)),
122            singleton_refs: Vec::new(),
123        }
124    }
125}
126
127impl From<DebugExpr> for ClosureExpr {
128    fn from(expr: DebugExpr) -> Self {
129        Self {
130            expr,
131            singleton_refs: Vec::new(),
132        }
133    }
134}
135
136impl ClosureExpr {
137    pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
138        Self {
139            expr,
140            singleton_refs,
141        }
142    }
143
144    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
145        Self {
146            expr: self.expr.clone(),
147            singleton_refs: self
148                .singleton_refs
149                .iter()
150                .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
151                .collect(),
152        }
153    }
154
155    pub fn transform_children(
156        &mut self,
157        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
158        seen_tees: &mut SeenSharedNodes,
159    ) {
160        for (_ident, ref_node) in self.singleton_refs.iter_mut() {
161            transform(ref_node, seen_tees);
162        }
163    }
164
165    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
166    /// replacing local singleton ref idents with `#dfir_ident` references.
167    #[cfg(feature = "build")]
168    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
169        if self.singleton_refs.is_empty() {
170            self.expr.0.to_token_stream()
171        } else {
172            let ref_idents = (0..self.singleton_refs.len())
173                .map(|_| ident_stack.pop().unwrap())
174                .collect::<Vec<_>>()
175                .into_iter()
176                .rev()
177                .collect::<Vec<_>>();
178            let local_idents = self
179                .singleton_refs
180                .iter()
181                .map(|(local_ident, _)| local_ident);
182            let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
183            let expr = &self.expr.0;
184            quote! {
185                {
186                    #(
187                        let #local_idents = #hash #ref_idents;
188                    )*
189                    #expr
190                }
191            }
192        }
193    }
194}
195
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
201
202impl serde::Serialize for DebugExpr {
203    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
204        serializer.serialize_str(&self.to_string())
205    }
206}
207
208impl From<syn::Expr> for DebugExpr {
209    fn from(expr: syn::Expr) -> Self {
210        Self(Box::new(expr))
211    }
212}
213
214impl Deref for DebugExpr {
215    type Target = syn::Expr;
216
217    fn deref(&self) -> &Self::Target {
218        &self.0
219    }
220}
221
222impl ToTokens for DebugExpr {
223    fn to_tokens(&self, tokens: &mut TokenStream) {
224        self.0.to_tokens(tokens);
225    }
226}
227
228impl Debug for DebugExpr {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        write!(f, "{}", self.0.to_token_stream())
231    }
232}
233
234impl Display for DebugExpr {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        let original = self.0.as_ref().clone();
237        let simplified = simplify_q_macro(original);
238
239        // For now, just use quote formatting without trying to parse as a statement
240        // This avoids the syn::parse_quote! issues entirely
241        write!(f, "q!({})", quote::quote!(#simplified))
242    }
243}
244
245/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
246fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
247    // Try to parse the token string as a syn::Expr
248    // Use a visitor to simplify q! macro expansions
249    let mut simplifier = QMacroSimplifier::new();
250    simplifier.visit_expr_mut(&mut expr);
251
252    // If we found and simplified a q! macro, return the simplified version
253    if let Some(simplified) = simplifier.simplified_result {
254        simplified
255    } else {
256        expr
257    }
258}
259
/// AST visitor that simplifies q! macro expansions
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The closure extracted from the first matching stageleft runtime-support call
    /// encountered, or `None` if no match has been found (yet). Once set, the visitor
    /// stops descending.
    pub simplified_result: Option<syn::Expr>,
}
265
266impl QMacroSimplifier {
267    pub fn new() -> Self {
268        Self::default()
269    }
270}
271
272impl VisitMut for QMacroSimplifier {
273    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
274        // Check if we already found a result to avoid further processing
275        if self.simplified_result.is_some() {
276            return;
277        }
278
279        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
280            // Look for calls to stageleft::runtime_support::fn*
281            && self.is_stageleft_runtime_support_call(&path_expr.path)
282            // Try to extract the closure from the arguments
283            && let Some(closure) = self.extract_closure_from_args(&call.args)
284        {
285            self.simplified_result = Some(closure);
286            return;
287        }
288
289        // Continue visiting child expressions using the default implementation
290        // Use the default visitor to avoid infinite recursion
291        syn::visit_mut::visit_expr_mut(self, expr);
292    }
293}
294
295impl QMacroSimplifier {
296    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
297        // Check if this is a call to stageleft::runtime_support::fn*
298        if let Some(last_segment) = path.segments.last() {
299            let fn_name = last_segment.ident.to_string();
300            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
301            fn_name.contains("_type_hint")
302                && path.segments.len() > 2
303                && path.segments[0].ident == "stageleft"
304                && path.segments[1].ident == "runtime_support"
305        } else {
306            false
307        }
308    }
309
310    fn extract_closure_from_args(
311        &self,
312        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
313    ) -> Option<syn::Expr> {
314        // Look through the arguments for a closure expression
315        for arg in args {
316            if let syn::Expr::Closure(_) = arg {
317                return Some(arg.clone());
318            }
319            // Also check for closures nested in other expressions (like blocks)
320            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
321                return Some(closure_expr);
322            }
323        }
324        None
325    }
326
327    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
328        let mut visitor = ClosureFinder {
329            found_closure: None,
330            prefer_inner_blocks: true,
331        };
332        visitor.visit_expr(expr);
333        visitor.found_closure
334    }
335}
336
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// The first expression recorded by the search: either a closure, or (when
    /// `prefer_inner_blocks` is set) a nested block that contains one.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for directly nested block
    /// statements containing a closure, and such a block is recorded whole.
    prefer_inner_blocks: bool,
}
342
343impl<'ast> Visit<'ast> for ClosureFinder {
344    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
345        // If we already found a closure, don't continue searching
346        if self.found_closure.is_some() {
347            return;
348        }
349
350        match expr {
351            syn::Expr::Closure(_) => {
352                self.found_closure = Some(expr.clone());
353            }
354            syn::Expr::Block(block) if self.prefer_inner_blocks => {
355                // Special handling for blocks - look for inner blocks that contain closures
356                for stmt in &block.block.stmts {
357                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
358                        && let syn::Expr::Block(_) = stmt_expr
359                    {
360                        // Check if this nested block contains a closure
361                        let mut inner_visitor = ClosureFinder {
362                            found_closure: None,
363                            prefer_inner_blocks: false, // Avoid infinite recursion
364                        };
365                        inner_visitor.visit_expr(stmt_expr);
366                        if inner_visitor.found_closure.is_some() {
367                            // Found a closure in an inner block, return that block
368                            self.found_closure = Some(stmt_expr.clone());
369                            return;
370                        }
371                    }
372                }
373
374                // If no inner block with closure found, continue with normal visitation
375                visit::visit_expr(self, expr);
376
377                // If we found a closure, just return the closure itself, not the whole block
378                // unless we're in the special case where we want the containing block
379                if self.found_closure.is_some() {
380                    // The closure was found during visitation, no need to wrap in block
381                }
382            }
383            _ => {
384                // Use default visitor behavior for all other expressions
385                visit::visit_expr(self, expr);
386            }
387        }
388    }
389}
390
/// Debug displays the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
///
/// Dereferences to the wrapped `syn::Type` and serializes as the token string.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
396
397impl From<syn::Type> for DebugType {
398    fn from(t: syn::Type) -> Self {
399        Self(Box::new(t))
400    }
401}
402
403impl Deref for DebugType {
404    type Target = syn::Type;
405
406    fn deref(&self) -> &Self::Target {
407        &self.0
408    }
409}
410
411impl ToTokens for DebugType {
412    fn to_tokens(&self, tokens: &mut TokenStream) {
413        self.0.to_tokens(tokens);
414    }
415}
416
417impl Debug for DebugType {
418    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419        write!(f, "{}", self.0.to_token_stream())
420    }
421}
422
423impl serde::Serialize for DebugType {
424    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
426    }
427}
428
429fn serialize_backtrace_as_span<S: serde::Serializer>(
430    backtrace: &Backtrace,
431    serializer: S,
432) -> Result<S::Ok, S::Error> {
433    match backtrace.format_span() {
434        Some(span) => serializer.serialize_some(&span),
435        None => serializer.serialize_none(),
436    }
437}
438
439fn serialize_ident<S: serde::Serializer>(
440    ident: &syn::Ident,
441    serializer: S,
442) -> Result<S::Ok, S::Error> {
443    serializer.serialize_str(&ident.to_string())
444}
445
/// Instantiation state carried by IR nodes; its `Debug` output is the fixed
/// placeholder `<network instantiate>` and it contributes nothing to hashes.
pub enum DebugInstantiate {
    /// Not finalized yet. This is the only variant that can be cloned or serialized.
    Building,
    /// Finalized state (boxed). Cloning or serializing this variant panics.
    Finalized(Box<DebugInstantiateFinalized>),
}
450
451impl serde::Serialize for DebugInstantiate {
452    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
453        match self {
454            DebugInstantiate::Building => {
455                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
456            }
457            DebugInstantiate::Finalized(_) => {
458                panic!(
459                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
460                )
461            }
462        }
463    }
464}
465
/// Payload of [`DebugInstantiate::Finalized`].
///
/// The fields are private; per the lint expectation below, `sink` and `source` are
/// only read when `feature = "build"` is enabled.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    // NOTE(review): presumably the sink-side expression of the instantiated
    // connection — confirm against where this struct is built.
    sink: syn::Expr,
    // NOTE(review): presumably the matching source-side expression — confirm.
    source: syn::Expr,
    // Optional one-shot callback; being `FnOnce`, it can be invoked at most once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
478
479impl From<DebugInstantiateFinalized> for DebugInstantiate {
480    fn from(f: DebugInstantiateFinalized) -> Self {
481        Self::Finalized(Box::new(f))
482    }
483}
484
485impl Debug for DebugInstantiate {
486    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487        write!(f, "<network instantiate>")
488    }
489}
490
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so every value (either variant) hashes
    /// identically and this field never affects the hash of a containing node.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Do nothing
    }
}
496
497impl Clone for DebugInstantiate {
498    fn clone(&self) -> Self {
499        match self {
500            DebugInstantiate::Building => DebugInstantiate::Building,
501            DebugInstantiate::Finalized(_) => {
502                panic!("DebugInstantiate::Finalized should not be cloned")
503            }
504        }
505    }
506}
507
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
528
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression. NOTE(review): presumably an expression
    /// evaluating to a stream — confirm.
    Stream(DebugExpr),
    /// Source fed from an external network; carries no payload here.
    ExternalNetwork(),
    /// Source described by an expression. NOTE(review): presumably an expression
    /// evaluating to an iterator — confirm.
    Iter(DebugExpr),
    /// Payload-free source variant. NOTE(review): semantics are not visible in this
    /// part of the file.
    Spin(),
    /// Cluster membership source: a location plus its [`ClusterMembersState`].
    ClusterMembers(LocationId, ClusterMembersState),
    /// Source named by an identifier, serialized as the identifier's string form.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Singleton counterpart of `Embedded`; also serialized as the identifier string.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
540
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// Throughout, `in_ident` names the upstream DFIR identifier and `out_ident` the
/// identifier the implementation must define for downstream use.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` (at `in_location`, of kind `in_kind`) to
    /// `out_ident` (at `out_location`) for a batch operation.
    // NOTE(review): the meaning of `fold_hooked_idents` is not visible here; it
    // presumably relates to `emit_fold_hook` — confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the code connecting `in_ident` to `out_ident` when a collection is
    /// yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an atomic
    /// section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an atomic
    /// section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at a point where
    /// non-determinism is observed.
    // NOTE(review): the effect of `trusted` is implementation-defined — confirm
    // against the simulator implementation.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code merging `first_ident` and `second_ident` into `out_ident` at
    /// `location`, optionally tagging the emitted operator with `operator_tag`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel: the sending side at `from`, which
    /// consumes `input_ident` and writes to `sink`, and the receiving side at `to`,
    /// which reads from `source` and defines `out_ident`. `serialize` and
    /// `deserialize` are optional mapping expressions for the respective side.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source at location `on` that reads from `source_expr` and defines
    /// `out_ident`, optionally mapping items through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sink at location `on` that consumes `input_ident` and writes to
    /// `sink_expr`, optionally mapping items through `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
    /// Returns the new input ident to use for the fold if a hook was emitted.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Inserts necessary code to validate a manual assertion that at this point the
    /// input live collection is consistent. In production, this is a no-op, but in simulation
    /// this will (not yet implemented) inject assertions that validate consistency.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
669
/// Appends the pass-through statement `#out_ident = #in_ident;` to `builder`, passing
/// `None` for both optional `add_dfir` arguments.
#[cfg(feature = "build")]
fn add_passthrough_dfir(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// `DfirBuilder` backed by one `FlatGraphBuilder` per root location key.
///
/// Most hooks simply alias the input ident to the output ident; only `batch`
/// (for bounded singleton-like inputs), `merge_ordered`, and the network/external
/// methods emit real operators.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or default-creates) the builder stored under the root key of `location`.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map's `entry` lookup yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        self.entry(location.root().key())
            .expect("location was removed")
            .or_default()
    }

    /// Bounded `Singleton`/`Optional`/`KeyedSingleton` inputs are routed through
    /// `persist::<'static>()` (asserting that `in_location` is top-level); every
    /// other input is aliased straight to `out_ident`.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if needs_persist {
            assert!(in_location.is_top_level());
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                },
                None,
                None,
            );
        } else {
            add_passthrough_dfir(builder, &in_ident, out_ident);
        }
    }

    /// Aliases `in_ident` to `out_ident` in the builder for the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_dfir(builder, &in_ident, out_ident);
    }

    /// Aliases `in_ident` to `out_ident` in the builder for the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_dfir(builder, &in_ident, out_ident);
    }

    /// Aliases `in_ident` to `out_ident` in the builder for the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_passthrough_dfir(builder, &in_ident, out_ident);
    }

    /// Aliases `in_ident` to `out_ident`; `trusted` and the kinds are ignored.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        add_passthrough_dfir(builder, &in_ident, out_ident);
    }

    /// Emits a `union()` with `first_ident` on input 0 and `second_ident` on input 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let builder = self.get_dfir_mut(location);
        builder.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits `dest_sink(#sink)` at `from` (behind `map(#serialize)` when given) and
    /// `source_stream(#source)` at `to` (followed by `map(#deserialize)` when given).
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        sender_builder.add_dfir(
            match serialize {
                Some(serialize_pipeline) => parse_quote! {
                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink);
                },
            },
            None,
            Some(&send_tag),
        );

        let receiver_builder = self.get_dfir_mut(to);
        receiver_builder.add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source);
                },
            },
            None,
            Some(&recv_tag),
        );
    }

    /// Emits `source_stream(#source_expr)` at `on`, followed by `map(#deserialize)`
    /// when given, tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        receiver_builder.add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
            },
            None,
            Some(&recv_tag),
        );
    }

    /// Emits `dest_sink(#sink_expr)` at `on`, behind `map(#serialize)` when given,
    /// tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        sender_builder.add_dfir(
            match serialize {
                Some(serialize_fn) => parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
            },
            None,
            Some(&send_tag),
        );
    }

    /// Never emits a hook for this builder.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Aliases `in_ident` to `out_ident`; no validation is emitted.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(location);
        add_passthrough_dfir(builder, &in_ident, out_ident);
    }
}
953
/// Selects what an emit traversal does when it reaches a root or a node:
/// either generate DFIR into a builder, or hand control to caller-supplied
/// callbacks.
///
/// `L` is the callback type for [`HydroRoot`]s and `N` the callback type for
/// [`HydroNode`]s; both also receive the current [`StmtId`].
#[cfg(feature = "build")]
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroRoot, &mut StmtId),
    N: FnMut(&mut HydroNode, &mut StmtId),
> {
    /// Generate DFIR through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (first) or node callback (second) instead of
    /// generating DFIR.
    Callback(L, N),
}
963
964/// A root in a Hydro graph, which is a pipeline that doesn't emit
965/// any downstream values. Traversals over the dataflow graph and
966/// generating DFIR IR start from roots.
///
/// Every variant owns exactly one upstream `input` node together with an
/// `op_metadata` record.
967#[derive(Debug, Hash, serde::Serialize)]
968pub enum HydroRoot {
       /// Feeds every element of `input` to the closure `f`
       /// (emitted as `input -> for_each(f)`).
969    ForEach {
970        f: DebugExpr,
971        input: Box<HydroNode>,
972        op_metadata: HydroIrOpMetadata,
973    },
       /// Sends `input` to a port on an external location.
       ///
       /// `instantiate_fn` starts out as `Building` and is replaced by
       /// `compile_network` with the finalized sink/source expressions and a
       /// deferred connect function, which `connect_network` later runs.
974    SendExternal {
           /// Key of the external location that receives the data.
975        to_external_key: LocationKey,
           /// Port on that external; registered with the external when `unpaired` is set.
976        to_port_id: ExternalPortId,
           /// When sending from a process, selects the "many" sink instead of a
           /// single paired connection.
977        to_many: bool,
           /// When set, `compile_network` registers `to_port_id` on the external
           /// itself. NOTE(review): presumably means no matching external input
           /// exists to do so; confirm with the construction site.
978        unpaired: bool,
           /// Optional serializer applied via `map(..)` before the sink.
979        serialize_fn: Option<DebugExpr>,
980        instantiate_fn: DebugInstantiate,
981        input: Box<HydroNode>,
982        op_metadata: HydroIrOpMetadata,
983    },
       /// Pipes `input` into the sink expression `sink`
       /// (emitted as `input -> dest_sink(sink)`).
984    DestSink {
985        sink: DebugExpr,
986        input: Box<HydroNode>,
987        op_metadata: HydroIrOpMetadata,
988    },
       /// Closes a cycle: binds the ident derived from `cycle_id` to `input`
       /// through an `identity` operator.
989    CycleSink {
990        cycle_id: CycleId,
991        input: Box<HydroNode>,
992        op_metadata: HydroIrOpMetadata,
993    },
       /// Exposes `input` as an embedded output named `ident`.
       ///
       /// `compile_network` registers it with the deploy backend and requires
       /// `input` to be a stream located on a process or cluster.
994    EmbeddedOutput {
995        #[serde(serialize_with = "serialize_ident")]
996        ident: syn::Ident,
997        input: Box<HydroNode>,
998        op_metadata: HydroIrOpMetadata,
999    },
        /// A root with no payload besides its `input`.
        /// NOTE(review): presumably consumes and discards `input`; its emit
        /// logic is outside this excerpt, so confirm there.
1000    Null {
1001        input: Box<HydroNode>,
1002        op_metadata: HydroIrOpMetadata,
1003    },
1004}
1005
1006impl HydroRoot {
/// Resolves every networking placeholder reachable from this root.
///
/// The graph is walked bottom-up (nodes first, then the root itself):
///
/// * [`HydroRoot::SendExternal`], `HydroNode::Network` and
///   `HydroNode::ExternalInput` elements that are still
///   `DebugInstantiate::Building` get ports allocated on the involved
///   deployment nodes and have their `instantiate_fn` replaced by a finalized
///   value holding the sink expression, the source expression and a deferred
///   connect function (run later by [`HydroRoot::connect_network`]). The side
///   of the pair that is not meaningful for an element is filled with the
///   placeholder expression `DUMMY`.
/// * [`HydroRoot::EmbeddedOutput`] roots and embedded (stream / singleton)
///   sources are registered with the deploy backend `D` through `env`.
/// * Uninitialized cluster-membership sources become a `Stream` the first
///   time a given `(at_location, target)` pair is seen (tracked in
///   `seen_cluster_members`) and a `Tee` on every later occurrence.
///
/// `extra_stmts` collects additional per-location statements requested by
/// the backend while creating external sources. `processes`, `clusters` and
/// `externals` map location keys to their instantiated deployment nodes.
///
/// # Panics
///
/// Panics if a referenced process, cluster or external was not
/// instantiated, if `extra_stmts` no longer has a slot for a location, if an
/// element was already finalized, if an embedded input/output has the wrong
/// collection kind, or if an element sits at a location that is neither a
/// process nor a cluster.
#[cfg(feature = "build")]
#[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
pub fn compile_network<'a, D>(
    &mut self,
    extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
    seen_tees: &mut SeenSharedNodes,
    seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    externals: &SparseSecondaryMap<LocationKey, D::External>,
    env: &mut D::InstantiateEnv,
) where
    D: Deploy<'a>,
{
    // `extra_stmts` and `env` are mutated from both traversal closures, so
    // they are shared through `RefCell`s instead of unique captures.
    let refcell_extra_stmts = RefCell::new(extra_stmts);
    let refcell_env = RefCell::new(env);
    let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
    self.transform_bottom_up(
        &mut |l| {
            if let HydroRoot::SendExternal {
                input,
                to_external_key,
                to_port_id,
                to_many,
                unpaired,
                instantiate_fn,
                ..
            } = l
            {
                let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => {
                        let to_node = externals
                            .get(*to_external_key)
                            .unwrap_or_else(|| {
                                panic!(
                                    "A external used in the graph was not instantiated: {}",
                                    to_external_key
                                )
                            })
                            .clone();

                        match input.metadata().location_id.root() {
                            &LocationId::Process(process_key) => {
                                if *to_many {
                                    // The "many" sink needs no ports and no connect step.
                                    (
                                        (
                                            D::e2o_many_sink(format!(
                                                "{}_{}",
                                                *to_external_key, *to_port_id
                                            )),
                                            parse_quote!(DUMMY),
                                        ),
                                        Box::new(|| {}) as Box<dyn FnOnce()>,
                                    )
                                } else {
                                    let from_node = processes
                                        .get(process_key)
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "A process used in the graph was not instantiated: {}",
                                                process_key
                                            )
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    if *unpaired {
                                        use stageleft::quote_type;
                                        use tokio_util::codec::LengthDelimitedCodec;

                                        to_node.register(*to_port_id, source_port.clone());

                                        // The returned source expression is not needed here;
                                        // the call is made for its effects on the backend.
                                        let _ = D::e2o_source(
                                            refcell_extra_stmts
                                                .borrow_mut()
                                                .entry(process_key)
                                                .expect("location was removed")
                                                .or_default(),
                                            &to_node,
                                            &source_port,
                                            &from_node,
                                            &sink_port,
                                            &quote_type::<LengthDelimitedCodec>(),
                                            format!("{}_{}", *to_external_key, *to_port_id),
                                        );
                                    }

                                    (
                                        (
                                            D::o2e_sink(
                                                &from_node,
                                                &sink_port,
                                                &to_node,
                                                &source_port,
                                                format!("{}_{}", *to_external_key, *to_port_id),
                                            ),
                                            parse_quote!(DUMMY),
                                        ),
                                        if *unpaired {
                                            D::e2o_connect(
                                                &to_node,
                                                &source_port,
                                                &from_node,
                                                &sink_port,
                                                *to_many,
                                                NetworkHint::Auto,
                                            )
                                        } else {
                                            Box::new(|| {}) as Box<dyn FnOnce()>
                                        },
                                    )
                                }
                            }
                            LocationId::Cluster(cluster_key) => {
                                let from_node = clusters
                                    .get(*cluster_key)
                                    .unwrap_or_else(|| {
                                        panic!(
                                            "A cluster used in the graph was not instantiated: {}",
                                            cluster_key
                                        )
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                if *unpaired {
                                    to_node.register(*to_port_id, source_port.clone());
                                }

                                (
                                    (
                                        D::m2e_sink(
                                            &from_node,
                                            &sink_port,
                                            &to_node,
                                            &source_port,
                                            format!("{}_{}", *to_external_key, *to_port_id),
                                        ),
                                        parse_quote!(DUMMY),
                                    ),
                                    Box::new(|| {}) as Box<dyn FnOnce()>,
                                )
                            }
                            _ => panic!("External output must be sent from a process or cluster"),
                        }
                    }

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
                let element_type = match &input.metadata().collection_kind {
                    CollectionKind::Stream { element_type, .. } => {
                        element_type.0.as_ref().clone()
                    }
                    _ => panic!("Embedded output must have Stream collection kind"),
                };
                let location_key = match input.metadata().location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("Embedded output must be on a process or cluster"),
                };
                D::register_embedded_output(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            }
        },
        &mut |n| {
            if let HydroNode::Network {
                name,
                networking_info,
                input,
                instantiate_fn,
                metadata,
                ..
            } = n
            {
                let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => instantiate_network::<D>(
                        &mut refcell_env.borrow_mut(),
                        input.metadata().location_id.root(),
                        metadata.location_id.root(),
                        processes,
                        clusters,
                        name.as_deref(),
                        networking_info,
                    ),

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            } else if let HydroNode::ExternalInput {
                from_external_key,
                from_port_id,
                from_many,
                codec_type,
                port_hint,
                instantiate_fn,
                metadata,
                ..
            } = n
            {
                let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => {
                        let from_node = externals
                            .get(*from_external_key)
                            .unwrap_or_else(|| {
                                panic!(
                                    "A external used in the graph was not instantiated: {}",
                                    from_external_key,
                                )
                            })
                            .clone();

                        match metadata.location_id.root() {
                            &LocationId::Process(process_key) => {
                                let to_node = processes
                                    .get(process_key)
                                    .unwrap_or_else(|| {
                                        panic!(
                                            "A process used in the graph was not instantiated: {}",
                                            process_key
                                        )
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                from_node.register(*from_port_id, sink_port.clone());

                                (
                                    (
                                        parse_quote!(DUMMY),
                                        if *from_many {
                                            D::e2o_many_source(
                                                refcell_extra_stmts
                                                    .borrow_mut()
                                                    .entry(process_key)
                                                    .expect("location was removed")
                                                    .or_default(),
                                                &to_node,
                                                &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id),
                                            )
                                        } else {
                                            D::e2o_source(
                                                refcell_extra_stmts
                                                    .borrow_mut()
                                                    .entry(process_key)
                                                    .expect("location was removed")
                                                    .or_default(),
                                                &from_node,
                                                &sink_port,
                                                &to_node,
                                                &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id),
                                            )
                                        },
                                    ),
                                    D::e2o_connect(
                                        &from_node,
                                        &sink_port,
                                        &to_node,
                                        &source_port,
                                        *from_many,
                                        *port_hint,
                                    ),
                                )
                            }
                            LocationId::Cluster(cluster_key) => {
                                let to_node = clusters
                                    .get(*cluster_key)
                                    .unwrap_or_else(|| {
                                        panic!(
                                            "A cluster used in the graph was not instantiated: {}",
                                            cluster_key
                                        )
                                    })
                                    .clone();

                                let sink_port = from_node.next_port();
                                let source_port = to_node.next_port();

                                from_node.register(*from_port_id, sink_port.clone());

                                (
                                    (
                                        parse_quote!(DUMMY),
                                        D::e2m_source(
                                            refcell_extra_stmts
                                                .borrow_mut()
                                                .entry(*cluster_key)
                                                .expect("location was removed")
                                                .or_default(),
                                            &from_node,
                                            &sink_port,
                                            &to_node,
                                            &source_port,
                                            codec_type.0.as_ref(),
                                            format!("{}_{}", *from_external_key, *from_port_id),
                                        ),
                                    ),
                                    D::e2m_connect(
                                        &from_node,
                                        &sink_port,
                                        &to_node,
                                        &source_port,
                                        *port_hint,
                                    ),
                                )
                            }
                            _ => panic!("External input must be received on a process or cluster"),
                        }
                    }

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            } else if let HydroNode::Source {
                source: HydroSource::Embedded(ident),
                metadata,
            } = n
            {
                let element_type = match &metadata.collection_kind {
                    CollectionKind::Stream { element_type, .. } => {
                        element_type.0.as_ref().clone()
                    }
                    _ => panic!("Embedded source must have Stream collection kind"),
                };
                let location_key = match metadata.location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("Embedded source must be on a process or cluster"),
                };
                D::register_embedded_stream_input(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            } else if let HydroNode::Source {
                source: HydroSource::EmbeddedSingleton(ident),
                metadata,
            } = n
            {
                let element_type = match &metadata.collection_kind {
                    CollectionKind::Singleton { element_type, .. } => {
                        element_type.0.as_ref().clone()
                    }
                    _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
                };
                let location_key = match metadata.location_id.root() {
                    LocationId::Process(key) | LocationId::Cluster(key) => *key,
                    _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
                };
                D::register_embedded_singleton_input(
                    &mut refcell_env.borrow_mut(),
                    location_key,
                    ident,
                    &element_type,
                );
            } else if let HydroNode::Source {
                source: HydroSource::ClusterMembers(location_id, state),
                metadata,
            } = n
            {
                match state {
                    ClusterMembersState::Uninit => {
                        let at_location = metadata.location_id.root().clone();
                        let key = (at_location.clone(), location_id.key());
                        if refcell_seen_cluster_members.borrow_mut().insert(key) {
                            // First occurrence: call cluster_membership_stream and mark as Stream.
                            let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
                                D::cluster_membership_stream(
                                    &mut refcell_env.borrow_mut(),
                                    &at_location,
                                    location_id,
                                ),
                                &(),
                            );
                            *state = ClusterMembersState::Stream(expr.into());
                        } else {
                            // Already instantiated for this (at, target) pair: just tee.
                            *state = ClusterMembersState::Tee(at_location, location_id.clone());
                        }
                    }
                    ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
                        panic!("cluster members already finalized");
                    }
                }
            }
        },
        seen_tees,
        false,
    );
}
1353
1354    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1355        self.transform_bottom_up(
1356            &mut |l| {
1357                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1358                    match instantiate_fn {
1359                        DebugInstantiate::Building => panic!("network not built"),
1360
1361                        DebugInstantiate::Finalized(finalized) => {
1362                            (finalized.connect_fn.take().unwrap())();
1363                        }
1364                    }
1365                }
1366            },
1367            &mut |n| {
1368                if let HydroNode::Network { instantiate_fn, .. }
1369                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1370                {
1371                    match instantiate_fn {
1372                        DebugInstantiate::Building => panic!("network not built"),
1373
1374                        DebugInstantiate::Finalized(finalized) => {
1375                            (finalized.connect_fn.take().unwrap())();
1376                        }
1377                    }
1378                }
1379            },
1380            seen_tees,
1381            false,
1382        );
1383    }
1384
1385    pub fn transform_bottom_up(
1386        &mut self,
1387        transform_root: &mut impl FnMut(&mut HydroRoot),
1388        transform_node: &mut impl FnMut(&mut HydroNode),
1389        seen_tees: &mut SeenSharedNodes,
1390        check_well_formed: bool,
1391    ) {
1392        self.transform_children(
1393            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1394            seen_tees,
1395        );
1396
1397        transform_root(self);
1398    }
1399
1400    pub fn transform_children(
1401        &mut self,
1402        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1403        seen_tees: &mut SeenSharedNodes,
1404    ) {
1405        match self {
1406            HydroRoot::ForEach { input, .. }
1407            | HydroRoot::SendExternal { input, .. }
1408            | HydroRoot::DestSink { input, .. }
1409            | HydroRoot::CycleSink { input, .. }
1410            | HydroRoot::EmbeddedOutput { input, .. }
1411            | HydroRoot::Null { input, .. } => {
1412                transform(input, seen_tees);
1413            }
1414        }
1415    }
1416
1417    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1418        match self {
1419            HydroRoot::ForEach {
1420                f,
1421                input,
1422                op_metadata,
1423            } => HydroRoot::ForEach {
1424                f: f.clone(),
1425                input: Box::new(input.deep_clone(seen_tees)),
1426                op_metadata: op_metadata.clone(),
1427            },
1428            HydroRoot::SendExternal {
1429                to_external_key,
1430                to_port_id,
1431                to_many,
1432                unpaired,
1433                serialize_fn,
1434                instantiate_fn,
1435                input,
1436                op_metadata,
1437            } => HydroRoot::SendExternal {
1438                to_external_key: *to_external_key,
1439                to_port_id: *to_port_id,
1440                to_many: *to_many,
1441                unpaired: *unpaired,
1442                serialize_fn: serialize_fn.clone(),
1443                instantiate_fn: instantiate_fn.clone(),
1444                input: Box::new(input.deep_clone(seen_tees)),
1445                op_metadata: op_metadata.clone(),
1446            },
1447            HydroRoot::DestSink {
1448                sink,
1449                input,
1450                op_metadata,
1451            } => HydroRoot::DestSink {
1452                sink: sink.clone(),
1453                input: Box::new(input.deep_clone(seen_tees)),
1454                op_metadata: op_metadata.clone(),
1455            },
1456            HydroRoot::CycleSink {
1457                cycle_id,
1458                input,
1459                op_metadata,
1460            } => HydroRoot::CycleSink {
1461                cycle_id: *cycle_id,
1462                input: Box::new(input.deep_clone(seen_tees)),
1463                op_metadata: op_metadata.clone(),
1464            },
1465            HydroRoot::EmbeddedOutput {
1466                ident,
1467                input,
1468                op_metadata,
1469            } => HydroRoot::EmbeddedOutput {
1470                ident: ident.clone(),
1471                input: Box::new(input.deep_clone(seen_tees)),
1472                op_metadata: op_metadata.clone(),
1473            },
1474            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1475                input: Box::new(input.deep_clone(seen_tees)),
1476                op_metadata: op_metadata.clone(),
1477            },
1478        }
1479    }
1480
/// Generates DFIR for this root (and everything upstream of it) into
/// `graph_builders`.
///
/// Thin wrapper over [`HydroRoot::emit_core`] that always selects the
/// `Builders` mode; all other arguments are forwarded untouched.
#[cfg(feature = "build")]
pub fn emit(
    &mut self,
    graph_builders: &mut dyn DfirBuilder,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut StmtId,
    fold_hooked_idents: &mut HashSet<String>,
) {
    // No callbacks are used in `Builders` mode; plain fn-pointer types just
    // pin down the otherwise unconstrained type parameters.
    type RootCallback = fn(&mut HydroRoot, &mut StmtId);
    type NodeCallback = fn(&mut HydroNode, &mut StmtId);

    let mut emit_target =
        BuildersOrCallback::<RootCallback, NodeCallback>::Builders(graph_builders);

    self.emit_core(
        &mut emit_target,
        seen_tees,
        built_tees,
        next_stmt_id,
        fold_hooked_idents,
    );
}
1501
1502    #[cfg(feature = "build")]
1503    pub fn emit_core(
1504        &mut self,
1505        builders_or_callback: &mut BuildersOrCallback<
1506            impl FnMut(&mut HydroRoot, &mut StmtId),
1507            impl FnMut(&mut HydroNode, &mut StmtId),
1508        >,
1509        seen_tees: &mut SeenSharedNodes,
1510        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1511        next_stmt_id: &mut StmtId,
1512        fold_hooked_idents: &mut HashSet<String>,
1513    ) {
1514        match self {
1515            HydroRoot::ForEach { f, input, .. } => {
1516                let input_ident = input.emit_core(
1517                    builders_or_callback,
1518                    seen_tees,
1519                    built_tees,
1520                    next_stmt_id,
1521                    fold_hooked_idents,
1522                );
1523
1524                match builders_or_callback {
1525                    BuildersOrCallback::Builders(graph_builders) => {
1526                        graph_builders
1527                            .get_dfir_mut(&input.metadata().location_id)
1528                            .add_dfir(
1529                                parse_quote! {
1530                                    #input_ident -> for_each(#f);
1531                                },
1532                                None,
1533                                Some(&next_stmt_id.to_string()),
1534                            );
1535                    }
1536                    BuildersOrCallback::Callback(leaf_callback, _) => {
1537                        leaf_callback(self, next_stmt_id);
1538                    }
1539                }
1540
1541                let _ = next_stmt_id.get_and_increment();
1542            }
1543
1544            HydroRoot::SendExternal {
1545                serialize_fn,
1546                instantiate_fn,
1547                input,
1548                ..
1549            } => {
1550                let input_ident = input.emit_core(
1551                    builders_or_callback,
1552                    seen_tees,
1553                    built_tees,
1554                    next_stmt_id,
1555                    fold_hooked_idents,
1556                );
1557
1558                match builders_or_callback {
1559                    BuildersOrCallback::Builders(graph_builders) => {
1560                        let (sink_expr, _) = match instantiate_fn {
1561                            DebugInstantiate::Building => (
1562                                syn::parse_quote!(DUMMY_SINK),
1563                                syn::parse_quote!(DUMMY_SOURCE),
1564                            ),
1565
1566                            DebugInstantiate::Finalized(finalized) => {
1567                                (finalized.sink.clone(), finalized.source.clone())
1568                            }
1569                        };
1570
1571                        graph_builders.create_external_output(
1572                            &input.metadata().location_id,
1573                            sink_expr,
1574                            &input_ident,
1575                            serialize_fn.as_ref(),
1576                            *next_stmt_id,
1577                        );
1578                    }
1579                    BuildersOrCallback::Callback(leaf_callback, _) => {
1580                        leaf_callback(self, next_stmt_id);
1581                    }
1582                }
1583
1584                let _ = next_stmt_id.get_and_increment();
1585            }
1586
1587            HydroRoot::DestSink { sink, input, .. } => {
1588                let input_ident = input.emit_core(
1589                    builders_or_callback,
1590                    seen_tees,
1591                    built_tees,
1592                    next_stmt_id,
1593                    fold_hooked_idents,
1594                );
1595
1596                match builders_or_callback {
1597                    BuildersOrCallback::Builders(graph_builders) => {
1598                        graph_builders
1599                            .get_dfir_mut(&input.metadata().location_id)
1600                            .add_dfir(
1601                                parse_quote! {
1602                                    #input_ident -> dest_sink(#sink);
1603                                },
1604                                None,
1605                                Some(&next_stmt_id.to_string()),
1606                            );
1607                    }
1608                    BuildersOrCallback::Callback(leaf_callback, _) => {
1609                        leaf_callback(self, next_stmt_id);
1610                    }
1611                }
1612
1613                let _ = next_stmt_id.get_and_increment();
1614            }
1615
1616            HydroRoot::CycleSink {
1617                cycle_id, input, ..
1618            } => {
1619                let input_ident = input.emit_core(
1620                    builders_or_callback,
1621                    seen_tees,
1622                    built_tees,
1623                    next_stmt_id,
1624                    fold_hooked_idents,
1625                );
1626
1627                match builders_or_callback {
1628                    BuildersOrCallback::Builders(graph_builders) => {
1629                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1630                            CollectionKind::KeyedSingleton {
1631                                key_type,
1632                                value_type,
1633                                ..
1634                            }
1635                            | CollectionKind::KeyedStream {
1636                                key_type,
1637                                value_type,
1638                                ..
1639                            } => {
1640                                parse_quote!((#key_type, #value_type))
1641                            }
1642                            CollectionKind::Stream { element_type, .. }
1643                            | CollectionKind::Singleton { element_type, .. }
1644                            | CollectionKind::Optional { element_type, .. } => {
1645                                parse_quote!(#element_type)
1646                            }
1647                        };
1648
1649                        let cycle_id_ident = cycle_id.as_ident();
1650                        graph_builders
1651                            .get_dfir_mut(&input.metadata().location_id)
1652                            .add_dfir(
1653                                parse_quote! {
1654                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1655                                },
1656                                None,
1657                                None,
1658                            );
1659                    }
1660                    // No ID, no callback
1661                    BuildersOrCallback::Callback(_, _) => {}
1662                }
1663            }
1664
1665            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1666                let input_ident = input.emit_core(
1667                    builders_or_callback,
1668                    seen_tees,
1669                    built_tees,
1670                    next_stmt_id,
1671                    fold_hooked_idents,
1672                );
1673
1674                match builders_or_callback {
1675                    BuildersOrCallback::Builders(graph_builders) => {
1676                        graph_builders
1677                            .get_dfir_mut(&input.metadata().location_id)
1678                            .add_dfir(
1679                                parse_quote! {
1680                                    #input_ident -> for_each(&mut #ident);
1681                                },
1682                                None,
1683                                Some(&next_stmt_id.to_string()),
1684                            );
1685                    }
1686                    BuildersOrCallback::Callback(leaf_callback, _) => {
1687                        leaf_callback(self, next_stmt_id);
1688                    }
1689                }
1690
1691                let _ = next_stmt_id.get_and_increment();
1692            }
1693
1694            HydroRoot::Null { input, .. } => {
1695                let input_ident = input.emit_core(
1696                    builders_or_callback,
1697                    seen_tees,
1698                    built_tees,
1699                    next_stmt_id,
1700                    fold_hooked_idents,
1701                );
1702
1703                match builders_or_callback {
1704                    BuildersOrCallback::Builders(graph_builders) => {
1705                        graph_builders
1706                            .get_dfir_mut(&input.metadata().location_id)
1707                            .add_dfir(
1708                                parse_quote! {
1709                                    #input_ident -> for_each(|_| {});
1710                                },
1711                                None,
1712                                Some(&next_stmt_id.to_string()),
1713                            );
1714                    }
1715                    BuildersOrCallback::Callback(leaf_callback, _) => {
1716                        leaf_callback(self, next_stmt_id);
1717                    }
1718                }
1719
1720                let _ = next_stmt_id.get_and_increment();
1721            }
1722        }
1723    }
1724
1725    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1726        match self {
1727            HydroRoot::ForEach { op_metadata, .. }
1728            | HydroRoot::SendExternal { op_metadata, .. }
1729            | HydroRoot::DestSink { op_metadata, .. }
1730            | HydroRoot::CycleSink { op_metadata, .. }
1731            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1732            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1733        }
1734    }
1735
1736    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1737        match self {
1738            HydroRoot::ForEach { op_metadata, .. }
1739            | HydroRoot::SendExternal { op_metadata, .. }
1740            | HydroRoot::DestSink { op_metadata, .. }
1741            | HydroRoot::CycleSink { op_metadata, .. }
1742            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1743            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1744        }
1745    }
1746
1747    pub fn input(&self) -> &HydroNode {
1748        match self {
1749            HydroRoot::ForEach { input, .. }
1750            | HydroRoot::SendExternal { input, .. }
1751            | HydroRoot::DestSink { input, .. }
1752            | HydroRoot::CycleSink { input, .. }
1753            | HydroRoot::EmbeddedOutput { input, .. }
1754            | HydroRoot::Null { input, .. } => input,
1755        }
1756    }
1757
1758    pub fn input_metadata(&self) -> &HydroIrMetadata {
1759        self.input().metadata()
1760    }
1761
1762    pub fn print_root(&self) -> String {
1763        match self {
1764            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1765            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1766            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1767            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1768            HydroRoot::EmbeddedOutput { ident, .. } => {
1769                format!("EmbeddedOutput({})", ident)
1770            }
1771            HydroRoot::Null { .. } => "Null".to_owned(),
1772        }
1773    }
1774
1775    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1776        match self {
1777            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1778                transform(f);
1779            }
1780            HydroRoot::SendExternal { .. }
1781            | HydroRoot::CycleSink { .. }
1782            | HydroRoot::EmbeddedOutput { .. }
1783            | HydroRoot::Null { .. } => {}
1784        }
1785    }
1786}
1787
/// Returns the clock id of the tick that `loc` belongs to, looking through
/// any number of `Atomic` wrappers. Yields `None` when a non-tick,
/// non-atomic location is reached.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel one atomic layer and keep looking.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1796
/// Rewrites every tick id inside `loc` (at any nesting depth) to the
/// representative chosen by the union-find `uf`.
///
/// `Process` and `Cluster` locations contain no tick id and are left alone.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    // Pick the nested location to descend into, remapping this level first.
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(nested, uf);
}
1810
/// Union-find lookup: returns the representative of `x`'s class.
///
/// Ids absent from `parent` are their own representative. Every id visited
/// on the way up (except the representative itself) is re-pointed directly
/// at the representative, so later lookups are shorter.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Phase 1: follow parent links until an id with no (or a self) parent.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Phase 2: path compression. `insert` hands back the old parent, which
    // is the next id on the path that still needs re-pointing.
    let mut cursor = x;
    while cursor != root {
        let Some(up) = parent.insert(cursor, root) else {
            break;
        };
        cursor = up;
    }

    root
}
1821
/// Union-find merge: joins the classes of `a` and `b` by pointing the
/// representative of `a` at the representative of `b`. Does nothing when
/// both ids already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1830
/// Unifies tick ids that are connected through the IR and rewrites all node
/// locations to use one representative id per group.
///
/// Pass 1 walks the IR and, for each `Batch` / `YieldConcat` node, merges the
/// tick of its input with the tick of its output; for each `Chain`,
/// `ChainFirst` and `MergeOrdered` node it merges the tick of each of the two
/// inputs with the tick of the output. A pair is only merged when both sides
/// actually resolve to a tick (see `tick_of`).
///
/// Pass 2 walks the IR again and remaps the `location_id` in every node's
/// metadata through the resulting union-find.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Merges the tick classes of two locations; no-op unless both are ticked.
    let mut link = |from: &LocationId, to: &LocationId| {
        if let (Some(a), Some(b)) = (tick_of(from), tick_of(to)) {
            uf_union(&mut uf, a, b);
        }
    };

    // Pass 1: collect unifications.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&first.metadata().location_id, &metadata.location_id);
                link(&second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite all LocationIds.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1894
/// Emits DFIR for every root in `ir` and returns the graph builders the
/// roots populated, keyed by location.
///
/// The tee bookkeeping maps, the statement-id counter and the set of
/// fold-hooked identifiers are created once here and shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = StmtId::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    graph_builders
}
1913
/// Walks `ir` through `emit_core` in callback mode instead of building DFIR.
///
/// `transform_root` and `transform_node` are wrapped in
/// `BuildersOrCallback::Callback` and handed to `emit_core` together with a
/// statement-id counter that starts at `StmtId::default()`. The bookkeeping
/// state is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = StmtId::default();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callbacks,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1935
1936pub fn transform_bottom_up(
1937    ir: &mut [HydroRoot],
1938    transform_root: &mut impl FnMut(&mut HydroRoot),
1939    transform_node: &mut impl FnMut(&mut HydroNode),
1940    check_well_formed: bool,
1941) {
1942    let mut seen_tees = HashMap::new();
1943    ir.iter_mut().for_each(|leaf| {
1944        leaf.transform_bottom_up(
1945            transform_root,
1946            transform_node,
1947            &mut seen_tees,
1948            check_well_formed,
1949        );
1950    });
1951}
1952
1953pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1954    let mut seen_tees = HashMap::new();
1955    ir.iter()
1956        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1957        .collect()
1958}
1959
/// Per-thread deduplication state for shared nodes: `None` while no scope is
/// active, otherwise `(next id to hand out, node pointer → id already assigned)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Tracks shared nodes already printed by `Debug for SharedNode`.
    /// Installed and cleared by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
    /// emits the full subtree only once and uses a `{"$shared_ref": N}`
    /// back-reference on subsequent encounters, cutting the recursion.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1969
1970pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1971    PRINTED_TEES.with(|printed_tees| {
1972        let mut printed_tees_mut = printed_tees.borrow_mut();
1973        *printed_tees_mut = Some((0, HashMap::new()));
1974        drop(printed_tees_mut);
1975
1976        let ret = f();
1977
1978        let mut printed_tees_mut = printed_tees.borrow_mut();
1979        *printed_tees_mut = None;
1980
1981        ret
1982    })
1983}
1984
1985/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1986/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1987/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1988/// back-reference.  The tracking state is restored when `f` returns or panics.
1989pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1990    let _guard = SerializedSharedGuard::enter();
1991    f()
1992}
1993
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
/// making `serialize_dedup_shared` re-entrant and panic-safe.
struct SerializedSharedGuard {
    /// The state that was in `SERIALIZED_SHARED` when the guard was created;
    /// written back on drop. `None` means no scope was active before.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1999
2000impl SerializedSharedGuard {
2001    fn enter() -> Self {
2002        let previous = SERIALIZED_SHARED.with(|cell| {
2003            let mut guard = cell.borrow_mut();
2004            guard.replace((0, HashMap::new()))
2005        });
2006        Self { previous }
2007    }
2008}
2009
2010impl Drop for SerializedSharedGuard {
2011    fn drop(&mut self) {
2012        SERIALIZED_SHARED.with(|cell| {
2013            *cell.borrow_mut() = self.previous.take();
2014        });
2015    }
2016}
2017
/// A reference-counted, interior-mutable handle to a `HydroNode`.
///
/// Used as the `inner` of `HydroNode::Tee`, `HydroNode::Singleton` and
/// `HydroNode::Partition`. Its `Debug` and `Serialize` impls identify a node
/// by the address of the shared allocation so that it is only expanded once
/// per deduplication scope.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2019
2020impl serde::Serialize for SharedNode {
2021    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2022    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2023    /// same subtree every time and, if the graph ever contains a cycle, loop
2024    /// forever.
2025    ///
2026    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2027    /// integer id.  The first time we see a pointer we assign it the next id and
2028    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2029    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2030    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2031    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2032        SERIALIZED_SHARED.with(|cell| {
2033            let mut guard = cell.borrow_mut();
2034            // (next_id, pointer → assigned_id)
2035            let state = guard.as_mut().ok_or_else(|| {
2036                serde::ser::Error::custom(
2037                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2038                )
2039            })?;
2040            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2041
2042            if let Some(&id) = state.1.get(&ptr) {
2043                drop(guard);
2044                use serde::ser::SerializeMap;
2045                let mut map = serializer.serialize_map(Some(1))?;
2046                map.serialize_entry("$shared_ref", &id)?;
2047                map.end()
2048            } else {
2049                let id = state.0;
2050                state.0 += 1;
2051                state.1.insert(ptr, id);
2052                drop(guard);
2053
2054                use serde::ser::SerializeMap;
2055                let mut map = serializer.serialize_map(Some(2))?;
2056                map.serialize_entry("$shared", &id)?;
2057                map.serialize_entry("node", &*self.0.borrow())?;
2058                map.end()
2059            }
2060        })
2061    }
2062}
2063
2064impl SharedNode {
2065    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2066        Rc::as_ptr(&self.0)
2067    }
2068}
2069
2070impl Debug for SharedNode {
2071    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2072        PRINTED_TEES.with(|printed_tees| {
2073            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2074            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2075
2076            if let Some(printed_tees_mut) = printed_tees_mut {
2077                if let Some(existing) = printed_tees_mut
2078                    .1
2079                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2080                {
2081                    write!(f, "<shared {}>", existing)
2082                } else {
2083                    let next_id = printed_tees_mut.0;
2084                    printed_tees_mut.0 += 1;
2085                    printed_tees_mut
2086                        .1
2087                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2088                    drop(printed_tees_mut_borrow);
2089                    write!(f, "<shared {}>: ", next_id)?;
2090                    Debug::fmt(&self.0.borrow(), f)
2091                }
2092            } else {
2093                drop(printed_tees_mut_borrow);
2094                write!(f, "<shared>: ")?;
2095                Debug::fmt(&self.0.borrow(), f)
2096            }
2097        })
2098    }
2099}
2100
2101impl Hash for SharedNode {
2102    fn hash<H: Hasher>(&self, state: &mut H) {
2103        self.0.borrow_mut().hash(state);
2104    }
2105}
2106
/// Boundedness marker stored in the `bound` field of
/// `CollectionKind::Stream`, `CollectionKind::Optional` and
/// `CollectionKind::KeyedStream`.
///
/// `CollectionKind::is_bounded` reports `true` for these kinds only when the
/// marker is `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2112
/// Ordering marker stored as `order` in `CollectionKind::Stream` and as
/// `value_order` in `CollectionKind::KeyedStream`.
// NOTE(review): presumably `TotalOrder` means elements arrive in a
// deterministic order and `NoOrder` that they may be reordered — confirm.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2118
/// Delivery marker stored as `retry` in `CollectionKind::Stream` and as
/// `value_retry` in `CollectionKind::KeyedStream`.
// NOTE(review): presumably `AtLeastOnce` permits duplicated elements while
// `ExactlyOnce` does not — confirm.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2124
/// Boundedness marker stored in the `bound` field of
/// `CollectionKind::KeyedSingleton`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; the
/// `MonotonicValue` and `BoundedValue` variants do not count as bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2132
/// Boundedness marker stored in the `bound` field of
/// `CollectionKind::Singleton`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; the
/// `Monotonic` variant does not count as bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2139
/// Describes what kind of collection a node outputs, together with its
/// guarantees and element type(s). Stored in `HydroIrMetadata::collection_kind`.
///
/// The keyed kinds carry separate key and value types; the `CycleSink`
/// codegen treats their elements as `(key_type, value_type)` tuples.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` items.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional value of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A stream of values grouped by key.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A singleton value per key.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2169
2170impl CollectionKind {
2171    pub fn is_bounded(&self) -> bool {
2172        matches!(
2173            self,
2174            CollectionKind::Stream {
2175                bound: BoundKind::Bounded,
2176                ..
2177            } | CollectionKind::Singleton {
2178                bound: SingletonBoundKind::Bounded,
2179                ..
2180            } | CollectionKind::Optional {
2181                bound: BoundKind::Bounded,
2182                ..
2183            } | CollectionKind::KeyedStream {
2184                bound: BoundKind::Bounded,
2185                ..
2186            } | CollectionKind::KeyedSingleton {
2187                bound: KeyedSingletonBoundKind::Bounded,
2188                ..
2189            }
2190        )
2191    }
2192}
2193
/// Metadata describing the output of an IR node.
///
/// Note that the `Hash`, `PartialEq` and `Debug` impls for this type are
/// hand-written: hashing and equality ignore every field, and `Debug` prints
/// only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node's output lives.
    pub location_id: LocationId,
    /// The kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}
2203
// HydroIrMetadata shouldn't be used to hash or compare
impl Hash for HydroIrMetadata {
    /// Intentionally feeds nothing into the hasher, so metadata never
    /// influences the hash of the value that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2208
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: any two metadata values compare equal, so
    /// metadata never influences equality of the value that contains it.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
2214
// Sound because `eq` is constantly `true`, which is trivially reflexive.
impl Eq for HydroIrMetadata {}
2216
2217impl Debug for HydroIrMetadata {
2218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2219        f.debug_struct("HydroIrMetadata")
2220            .field("location_id", &self.location_id)
2221            .field("collection_kind", &self.collection_kind)
2222            .finish()
2223    }
2224}
2225
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created. Serialized under the
    /// key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // The next three fields all start out as `None` in `new_with_skip`.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
2236
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and all
    /// optional fields unset.
    ///
    /// Delegates to `new_with_skip(1)`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates operator metadata whose backtrace is obtained from
    /// `Backtrace::get_backtrace(2 + skip_count)`.
    // NOTE(review): presumably the argument is the number of innermost stack
    // frames to drop, with the constant 2 covering this function and
    // `get_backtrace` itself — confirm against `Backtrace::get_backtrace`.
    // Because the count depends on call depth, do not inline or wrap these
    // calls without adjusting it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2255
2256impl Debug for HydroIrOpMetadata {
2257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2258        f.debug_struct("HydroIrOpMetadata").finish()
2259    }
2260}
2261
impl Hash for HydroIrOpMetadata {
    /// Intentionally feeds nothing into the hasher, so operator metadata
    /// never influences the hash of the value that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2265
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `metadata: HydroIrMetadata`
/// field. Upstream nodes are held either by `Box<HydroNode>` (exclusively
/// owned) or by `SharedNode` (shared between several consumers).
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// A node without content. `transform_children` panics when it
    /// encounters this variant.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A singleton materialization point. Wraps a SharedNode so that:
    /// - The pipe output delivers the single item to one consumer
    /// - `#var` references can borrow the value from the singleton slot
    ///
    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
    ///
    /// Uses the same `built_tees` dedup pattern as `Tee`.
    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // The fold/scan family carries two closures: `init` and `acc`.
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Unlike `ReduceKeyed`, this variant has a second upstream node, `watermark`.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // `HydroNode::transform_bottom_up` exempts this variant from its
    // "inputs share my root location" well-formedness check.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        // Left out of the serialized form.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2558
/// Bookkeeping for traversals over shared IR nodes: maps the address of an original
/// shared cell to the cell that stands in for it (the transformed cell in
/// `HydroNode::transform_children`, the copied cell in `HydroNode::deep_clone`).
2559pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
// NOTE(review): presumably records the location resolved for each shared node so it is
// only computed once; confirm against the callers, which are not visible here.
2560pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2561
2562impl HydroNode {
2563    pub fn transform_bottom_up(
2564        &mut self,
2565        transform: &mut impl FnMut(&mut HydroNode),
2566        seen_tees: &mut SeenSharedNodes,
2567        check_well_formed: bool,
2568    ) {
2569        self.transform_children(
2570            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2571            seen_tees,
2572        );
2573
2574        transform(self);
2575
2576        let self_location = self.metadata().location_id.root();
2577
2578        if check_well_formed {
2579            match &*self {
2580                HydroNode::Network { .. } => {}
2581                _ => {
2582                    self.input_metadata().iter().for_each(|i| {
2583                        if i.location_id.root() != self_location {
2584                            panic!(
2585                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2586                                i,
2587                                i.location_id.root(),
2588                                self,
2589                                self_location
2590                            )
2591                        }
2592                    });
2593                }
2594            }
2595        }
2596    }
2597
2598    #[inline(always)]
2599    pub fn transform_children(
2600        &mut self,
2601        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2602        seen_tees: &mut SeenSharedNodes,
2603    ) {
2604        match self {
2605            HydroNode::Placeholder => {
2606                panic!();
2607            }
2608
2609            HydroNode::Source { .. }
2610            | HydroNode::SingletonSource { .. }
2611            | HydroNode::CycleSource { .. }
2612            | HydroNode::ExternalInput { .. } => {}
2613
2614            HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2615                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2616                    *inner = SharedNode(transformed.clone());
2617                } else {
2618                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2619                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2620                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2621                    transform(&mut orig, seen_tees);
2622                    *transformed_cell.borrow_mut() = orig;
2623                    *inner = SharedNode(transformed_cell);
2624                }
2625            }
2626
2627            HydroNode::Partition { inner, f, .. } => {
2628                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2629                    *inner = SharedNode(transformed.clone());
2630                } else {
2631                    f.transform_children(&mut transform, seen_tees);
2632                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2633                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2634                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2635                    transform(&mut orig, seen_tees);
2636                    *transformed_cell.borrow_mut() = orig;
2637                    *inner = SharedNode(transformed_cell);
2638                }
2639            }
2640
2641            HydroNode::Cast { inner, .. }
2642            | HydroNode::ObserveNonDet { inner, .. }
2643            | HydroNode::BeginAtomic { inner, .. }
2644            | HydroNode::EndAtomic { inner, .. }
2645            | HydroNode::Batch { inner, .. }
2646            | HydroNode::YieldConcat { inner, .. }
2647            | HydroNode::UnboundSingleton { inner, .. }
2648            | HydroNode::AssertIsConsistent { inner, .. } => {
2649                transform(inner.as_mut(), seen_tees);
2650            }
2651
2652            HydroNode::Chain { first, second, .. } => {
2653                transform(first.as_mut(), seen_tees);
2654                transform(second.as_mut(), seen_tees);
2655            }
2656
2657            HydroNode::MergeOrdered { first, second, .. } => {
2658                transform(first.as_mut(), seen_tees);
2659                transform(second.as_mut(), seen_tees);
2660            }
2661
2662            HydroNode::ChainFirst { first, second, .. } => {
2663                transform(first.as_mut(), seen_tees);
2664                transform(second.as_mut(), seen_tees);
2665            }
2666
2667            HydroNode::CrossSingleton { left, right, .. }
2668            | HydroNode::CrossProduct { left, right, .. }
2669            | HydroNode::Join { left, right, .. }
2670            | HydroNode::JoinHalf { left, right, .. } => {
2671                transform(left.as_mut(), seen_tees);
2672                transform(right.as_mut(), seen_tees);
2673            }
2674
2675            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2676                transform(pos.as_mut(), seen_tees);
2677                transform(neg.as_mut(), seen_tees);
2678            }
2679
2680            HydroNode::Map { f, input, .. } => {
2681                f.transform_children(&mut transform, seen_tees);
2682                transform(input.as_mut(), seen_tees);
2683            }
2684            HydroNode::FlatMap { f, input, .. }
2685            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2686            | HydroNode::Filter { f, input, .. }
2687            | HydroNode::FilterMap { f, input, .. }
2688            | HydroNode::Inspect { f, input, .. }
2689            | HydroNode::Reduce { f, input, .. }
2690            | HydroNode::ReduceKeyed { f, input, .. } => {
2691                f.transform_children(&mut transform, seen_tees);
2692                transform(input.as_mut(), seen_tees);
2693            }
2694            HydroNode::ReduceKeyedWatermark {
2695                f,
2696                input,
2697                watermark,
2698                ..
2699            } => {
2700                f.transform_children(&mut transform, seen_tees);
2701                transform(input.as_mut(), seen_tees);
2702                transform(watermark.as_mut(), seen_tees);
2703            }
2704            HydroNode::Fold {
2705                init, acc, input, ..
2706            }
2707            | HydroNode::Scan {
2708                init, acc, input, ..
2709            }
2710            | HydroNode::ScanAsyncBlocking {
2711                init, acc, input, ..
2712            }
2713            | HydroNode::FoldKeyed {
2714                init, acc, input, ..
2715            } => {
2716                init.transform_children(&mut transform, seen_tees);
2717                acc.transform_children(&mut transform, seen_tees);
2718                transform(input.as_mut(), seen_tees);
2719            }
2720            HydroNode::ResolveFutures { input, .. }
2721            | HydroNode::ResolveFuturesBlocking { input, .. }
2722            | HydroNode::ResolveFuturesOrdered { input, .. }
2723            | HydroNode::Sort { input, .. }
2724            | HydroNode::DeferTick { input, .. }
2725            | HydroNode::Enumerate { input, .. }
2726            | HydroNode::Unique { input, .. }
2727            | HydroNode::Network { input, .. }
2728            | HydroNode::Counter { input, .. } => {
2729                transform(input.as_mut(), seen_tees);
2730            }
2731        }
2732    }
2733
2734    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2735        match self {
2736            HydroNode::Placeholder => HydroNode::Placeholder,
2737            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2738                inner: Box::new(inner.deep_clone(seen_tees)),
2739                metadata: metadata.clone(),
2740            },
2741            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2742                inner: Box::new(inner.deep_clone(seen_tees)),
2743                metadata: metadata.clone(),
2744            },
2745            HydroNode::ObserveNonDet {
2746                inner,
2747                trusted,
2748                metadata,
2749            } => HydroNode::ObserveNonDet {
2750                inner: Box::new(inner.deep_clone(seen_tees)),
2751                trusted: *trusted,
2752                metadata: metadata.clone(),
2753            },
2754            HydroNode::AssertIsConsistent {
2755                inner,
2756                trusted,
2757                metadata,
2758            } => HydroNode::AssertIsConsistent {
2759                inner: Box::new(inner.deep_clone(seen_tees)),
2760                trusted: *trusted,
2761                metadata: metadata.clone(),
2762            },
2763            HydroNode::Source { source, metadata } => HydroNode::Source {
2764                source: source.clone(),
2765                metadata: metadata.clone(),
2766            },
2767            HydroNode::SingletonSource {
2768                value,
2769                first_tick_only,
2770                metadata,
2771            } => HydroNode::SingletonSource {
2772                value: value.clone(),
2773                first_tick_only: *first_tick_only,
2774                metadata: metadata.clone(),
2775            },
2776            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2777                cycle_id: *cycle_id,
2778                metadata: metadata.clone(),
2779            },
2780            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2781                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2782                    SharedNode(transformed.clone())
2783                } else {
2784                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2785                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2786                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2787                    *new_rc.borrow_mut() = cloned;
2788                    SharedNode(new_rc)
2789                };
2790                if matches!(self, HydroNode::Singleton { .. }) {
2791                    HydroNode::Singleton {
2792                        inner: cloned_inner,
2793                        metadata: metadata.clone(),
2794                    }
2795                } else {
2796                    HydroNode::Tee {
2797                        inner: cloned_inner,
2798                        metadata: metadata.clone(),
2799                    }
2800                }
2801            }
2802            HydroNode::Partition {
2803                inner,
2804                f,
2805                is_true,
2806                metadata,
2807            } => {
2808                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2809                    HydroNode::Partition {
2810                        inner: SharedNode(transformed.clone()),
2811                        f: f.deep_clone(seen_tees),
2812                        is_true: *is_true,
2813                        metadata: metadata.clone(),
2814                    }
2815                } else {
2816                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2817                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2818                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2819                    *new_rc.borrow_mut() = cloned;
2820                    HydroNode::Partition {
2821                        inner: SharedNode(new_rc),
2822                        f: f.deep_clone(seen_tees),
2823                        is_true: *is_true,
2824                        metadata: metadata.clone(),
2825                    }
2826                }
2827            }
2828            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2829                inner: Box::new(inner.deep_clone(seen_tees)),
2830                metadata: metadata.clone(),
2831            },
2832            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2833                inner: Box::new(inner.deep_clone(seen_tees)),
2834                metadata: metadata.clone(),
2835            },
2836            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2837                inner: Box::new(inner.deep_clone(seen_tees)),
2838                metadata: metadata.clone(),
2839            },
2840            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2841                inner: Box::new(inner.deep_clone(seen_tees)),
2842                metadata: metadata.clone(),
2843            },
2844            HydroNode::Chain {
2845                first,
2846                second,
2847                metadata,
2848            } => HydroNode::Chain {
2849                first: Box::new(first.deep_clone(seen_tees)),
2850                second: Box::new(second.deep_clone(seen_tees)),
2851                metadata: metadata.clone(),
2852            },
2853            HydroNode::MergeOrdered {
2854                first,
2855                second,
2856                metadata,
2857            } => HydroNode::MergeOrdered {
2858                first: Box::new(first.deep_clone(seen_tees)),
2859                second: Box::new(second.deep_clone(seen_tees)),
2860                metadata: metadata.clone(),
2861            },
2862            HydroNode::ChainFirst {
2863                first,
2864                second,
2865                metadata,
2866            } => HydroNode::ChainFirst {
2867                first: Box::new(first.deep_clone(seen_tees)),
2868                second: Box::new(second.deep_clone(seen_tees)),
2869                metadata: metadata.clone(),
2870            },
2871            HydroNode::CrossProduct {
2872                left,
2873                right,
2874                metadata,
2875            } => HydroNode::CrossProduct {
2876                left: Box::new(left.deep_clone(seen_tees)),
2877                right: Box::new(right.deep_clone(seen_tees)),
2878                metadata: metadata.clone(),
2879            },
2880            HydroNode::CrossSingleton {
2881                left,
2882                right,
2883                metadata,
2884            } => HydroNode::CrossSingleton {
2885                left: Box::new(left.deep_clone(seen_tees)),
2886                right: Box::new(right.deep_clone(seen_tees)),
2887                metadata: metadata.clone(),
2888            },
2889            HydroNode::Join {
2890                left,
2891                right,
2892                metadata,
2893            } => HydroNode::Join {
2894                left: Box::new(left.deep_clone(seen_tees)),
2895                right: Box::new(right.deep_clone(seen_tees)),
2896                metadata: metadata.clone(),
2897            },
2898            HydroNode::JoinHalf {
2899                left,
2900                right,
2901                metadata,
2902            } => HydroNode::JoinHalf {
2903                left: Box::new(left.deep_clone(seen_tees)),
2904                right: Box::new(right.deep_clone(seen_tees)),
2905                metadata: metadata.clone(),
2906            },
2907            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2908                pos: Box::new(pos.deep_clone(seen_tees)),
2909                neg: Box::new(neg.deep_clone(seen_tees)),
2910                metadata: metadata.clone(),
2911            },
2912            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2913                pos: Box::new(pos.deep_clone(seen_tees)),
2914                neg: Box::new(neg.deep_clone(seen_tees)),
2915                metadata: metadata.clone(),
2916            },
2917            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2918                input: Box::new(input.deep_clone(seen_tees)),
2919                metadata: metadata.clone(),
2920            },
2921            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2922                HydroNode::ResolveFuturesBlocking {
2923                    input: Box::new(input.deep_clone(seen_tees)),
2924                    metadata: metadata.clone(),
2925                }
2926            }
2927            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2928                HydroNode::ResolveFuturesOrdered {
2929                    input: Box::new(input.deep_clone(seen_tees)),
2930                    metadata: metadata.clone(),
2931                }
2932            }
2933            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2934                f: f.deep_clone(seen_tees),
2935                input: Box::new(input.deep_clone(seen_tees)),
2936                metadata: metadata.clone(),
2937            },
2938            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2939                f: f.deep_clone(seen_tees),
2940                input: Box::new(input.deep_clone(seen_tees)),
2941                metadata: metadata.clone(),
2942            },
2943            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2944                HydroNode::FlatMapStreamBlocking {
2945                    f: f.deep_clone(seen_tees),
2946                    input: Box::new(input.deep_clone(seen_tees)),
2947                    metadata: metadata.clone(),
2948                }
2949            }
2950            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2951                f: f.deep_clone(seen_tees),
2952                input: Box::new(input.deep_clone(seen_tees)),
2953                metadata: metadata.clone(),
2954            },
2955            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2956                f: f.deep_clone(seen_tees),
2957                input: Box::new(input.deep_clone(seen_tees)),
2958                metadata: metadata.clone(),
2959            },
2960            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2961                input: Box::new(input.deep_clone(seen_tees)),
2962                metadata: metadata.clone(),
2963            },
2964            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2965                input: Box::new(input.deep_clone(seen_tees)),
2966                metadata: metadata.clone(),
2967            },
2968            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2969                f: f.deep_clone(seen_tees),
2970                input: Box::new(input.deep_clone(seen_tees)),
2971                metadata: metadata.clone(),
2972            },
2973            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2974                input: Box::new(input.deep_clone(seen_tees)),
2975                metadata: metadata.clone(),
2976            },
2977            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2978                input: Box::new(input.deep_clone(seen_tees)),
2979                metadata: metadata.clone(),
2980            },
2981            HydroNode::Fold {
2982                init,
2983                acc,
2984                input,
2985                metadata,
2986            } => HydroNode::Fold {
2987                init: init.deep_clone(seen_tees),
2988                acc: acc.deep_clone(seen_tees),
2989                input: Box::new(input.deep_clone(seen_tees)),
2990                metadata: metadata.clone(),
2991            },
2992            HydroNode::Scan {
2993                init,
2994                acc,
2995                input,
2996                metadata,
2997            } => HydroNode::Scan {
2998                init: init.deep_clone(seen_tees),
2999                acc: acc.deep_clone(seen_tees),
3000                input: Box::new(input.deep_clone(seen_tees)),
3001                metadata: metadata.clone(),
3002            },
3003            HydroNode::ScanAsyncBlocking {
3004                init,
3005                acc,
3006                input,
3007                metadata,
3008            } => HydroNode::ScanAsyncBlocking {
3009                init: init.deep_clone(seen_tees),
3010                acc: acc.deep_clone(seen_tees),
3011                input: Box::new(input.deep_clone(seen_tees)),
3012                metadata: metadata.clone(),
3013            },
3014            HydroNode::FoldKeyed {
3015                init,
3016                acc,
3017                input,
3018                metadata,
3019            } => HydroNode::FoldKeyed {
3020                init: init.deep_clone(seen_tees),
3021                acc: acc.deep_clone(seen_tees),
3022                input: Box::new(input.deep_clone(seen_tees)),
3023                metadata: metadata.clone(),
3024            },
3025            HydroNode::ReduceKeyedWatermark {
3026                f,
3027                input,
3028                watermark,
3029                metadata,
3030            } => HydroNode::ReduceKeyedWatermark {
3031                f: f.deep_clone(seen_tees),
3032                input: Box::new(input.deep_clone(seen_tees)),
3033                watermark: Box::new(watermark.deep_clone(seen_tees)),
3034                metadata: metadata.clone(),
3035            },
3036            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3037                f: f.deep_clone(seen_tees),
3038                input: Box::new(input.deep_clone(seen_tees)),
3039                metadata: metadata.clone(),
3040            },
3041            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3042                f: f.deep_clone(seen_tees),
3043                input: Box::new(input.deep_clone(seen_tees)),
3044                metadata: metadata.clone(),
3045            },
3046            HydroNode::Network {
3047                name,
3048                networking_info,
3049                serialize_fn,
3050                instantiate_fn,
3051                deserialize_fn,
3052                input,
3053                metadata,
3054            } => HydroNode::Network {
3055                name: name.clone(),
3056                networking_info: networking_info.clone(),
3057                serialize_fn: serialize_fn.clone(),
3058                instantiate_fn: instantiate_fn.clone(),
3059                deserialize_fn: deserialize_fn.clone(),
3060                input: Box::new(input.deep_clone(seen_tees)),
3061                metadata: metadata.clone(),
3062            },
3063            HydroNode::ExternalInput {
3064                from_external_key,
3065                from_port_id,
3066                from_many,
3067                codec_type,
3068                port_hint,
3069                instantiate_fn,
3070                deserialize_fn,
3071                metadata,
3072            } => HydroNode::ExternalInput {
3073                from_external_key: *from_external_key,
3074                from_port_id: *from_port_id,
3075                from_many: *from_many,
3076                codec_type: codec_type.clone(),
3077                port_hint: *port_hint,
3078                instantiate_fn: instantiate_fn.clone(),
3079                deserialize_fn: deserialize_fn.clone(),
3080                metadata: metadata.clone(),
3081            },
3082            HydroNode::Counter {
3083                tag,
3084                duration,
3085                prefix,
3086                input,
3087                metadata,
3088            } => HydroNode::Counter {
3089                tag: tag.clone(),
3090                duration: duration.clone(),
3091                prefix: prefix.clone(),
3092                input: Box::new(input.deep_clone(seen_tees)),
3093                metadata: metadata.clone(),
3094            },
3095        }
3096    }
3097
3098    #[cfg(feature = "build")]
3099    pub fn emit_core(
3100        &mut self,
3101        builders_or_callback: &mut BuildersOrCallback<
3102            impl FnMut(&mut HydroRoot, &mut StmtId),
3103            impl FnMut(&mut HydroNode, &mut StmtId),
3104        >,
3105        seen_tees: &mut SeenSharedNodes,
3106        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3107        next_stmt_id: &mut StmtId,
3108        fold_hooked_idents: &mut HashSet<String>,
3109    ) -> syn::Ident {
3110        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3111
3112        self.transform_bottom_up(
3113            &mut |node: &mut HydroNode| {
3114                let out_location = node.metadata().location_id.clone();
3115                match node {
3116                    HydroNode::Placeholder => {
3117                        panic!()
3118                    }
3119
3120                    HydroNode::Cast { .. } => {
3121                        // Cast passes through the input ident unchanged
3122                        // The input ident is already on the stack from processing the child
3123                        match builders_or_callback {
3124                            BuildersOrCallback::Builders(_) => {}
3125                            BuildersOrCallback::Callback(_, node_callback) => {
3126                                node_callback(node, next_stmt_id);
3127                            }
3128                        }
3129
3130                        let _ = next_stmt_id.get_and_increment();
3131                        // input_ident stays on stack as output
3132                    }
3133
3134                    HydroNode::UnboundSingleton { .. } => {
3135                        let inner_ident = ident_stack.pop().unwrap();
3136
3137                        let out_ident =
3138                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3139
3140                        match builders_or_callback {
3141                            BuildersOrCallback::Builders(graph_builders) => {
3142                                if graph_builders.singleton_intermediates() {
3143                                    let builder = graph_builders.get_dfir_mut(&out_location);
3144                                    builder.add_dfir(
3145                                        parse_quote! {
3146                                            #out_ident = #inner_ident;
3147                                        },
3148                                        None,
3149                                        None,
3150                                    );
3151                                } else {
3152                                    let builder = graph_builders.get_dfir_mut(&out_location);
3153                                    builder.add_dfir(
3154                                        parse_quote! {
3155                                            #out_ident = #inner_ident -> persist::<'static>();
3156                                        },
3157                                        None,
3158                                        None,
3159                                    );
3160                                }
3161                            }
3162                            BuildersOrCallback::Callback(_, node_callback) => {
3163                                node_callback(node, next_stmt_id);
3164                            }
3165                        }
3166
3167                        let _ = next_stmt_id.get_and_increment();
3168
3169                        ident_stack.push(out_ident);
3170                    }
3171
3172                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3173                        let inner_ident = ident_stack.pop().unwrap();
3174
3175                        let out_ident =
3176                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3177
3178                        match builders_or_callback {
3179                            BuildersOrCallback::Builders(graph_builders) => {
3180                                graph_builders.assert_is_consistent(
3181                                    *trusted,
3182                                    &inner.metadata().location_id,
3183                                    inner_ident,
3184                                    &out_ident,
3185                                );
3186                            }
3187                            BuildersOrCallback::Callback(_, node_callback) => {
3188                                node_callback(node, next_stmt_id);
3189                            }
3190                        }
3191
3192                        let _ = next_stmt_id.get_and_increment();
3193
3194                        ident_stack.push(out_ident);
3195                    }
3196
3197                    HydroNode::ObserveNonDet {
3198                        inner,
3199                        trusted,
3200                        metadata,
3201                        ..
3202                    } => {
3203                        let inner_ident = ident_stack.pop().unwrap();
3204
3205                        let observe_ident =
3206                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3207
3208                        match builders_or_callback {
3209                            BuildersOrCallback::Builders(graph_builders) => {
3210                                graph_builders.observe_nondet(
3211                                    *trusted,
3212                                    &inner.metadata().location_id,
3213                                    inner_ident,
3214                                    &inner.metadata().collection_kind,
3215                                    &observe_ident,
3216                                    &metadata.collection_kind,
3217                                    &metadata.op,
3218                                );
3219                            }
3220                            BuildersOrCallback::Callback(_, node_callback) => {
3221                                node_callback(node, next_stmt_id);
3222                            }
3223                        }
3224
3225                        let _ = next_stmt_id.get_and_increment();
3226
3227                        ident_stack.push(observe_ident);
3228                    }
3229
3230                    HydroNode::Batch {
3231                        inner, metadata, ..
3232                    } => {
3233                        let inner_ident = ident_stack.pop().unwrap();
3234
3235                        let batch_ident =
3236                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3237
3238                        match builders_or_callback {
3239                            BuildersOrCallback::Builders(graph_builders) => {
3240                                graph_builders.batch(
3241                                    inner_ident,
3242                                    &inner.metadata().location_id,
3243                                    &inner.metadata().collection_kind,
3244                                    &batch_ident,
3245                                    &out_location,
3246                                    &metadata.op,
3247                                    fold_hooked_idents,
3248                                );
3249                            }
3250                            BuildersOrCallback::Callback(_, node_callback) => {
3251                                node_callback(node, next_stmt_id);
3252                            }
3253                        }
3254
3255                        let _ = next_stmt_id.get_and_increment();
3256
3257                        ident_stack.push(batch_ident);
3258                    }
3259
3260                    HydroNode::YieldConcat { inner, .. } => {
3261                        let inner_ident = ident_stack.pop().unwrap();
3262
3263                        let yield_ident =
3264                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3265
3266                        match builders_or_callback {
3267                            BuildersOrCallback::Builders(graph_builders) => {
3268                                graph_builders.yield_from_tick(
3269                                    inner_ident,
3270                                    &inner.metadata().location_id,
3271                                    &inner.metadata().collection_kind,
3272                                    &yield_ident,
3273                                    &out_location,
3274                                );
3275                            }
3276                            BuildersOrCallback::Callback(_, node_callback) => {
3277                                node_callback(node, next_stmt_id);
3278                            }
3279                        }
3280
3281                        let _ = next_stmt_id.get_and_increment();
3282
3283                        ident_stack.push(yield_ident);
3284                    }
3285
3286                    HydroNode::BeginAtomic { inner, metadata } => {
3287                        let inner_ident = ident_stack.pop().unwrap();
3288
3289                        let begin_ident =
3290                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3291
3292                        match builders_or_callback {
3293                            BuildersOrCallback::Builders(graph_builders) => {
3294                                graph_builders.begin_atomic(
3295                                    inner_ident,
3296                                    &inner.metadata().location_id,
3297                                    &inner.metadata().collection_kind,
3298                                    &begin_ident,
3299                                    &out_location,
3300                                    &metadata.op,
3301                                );
3302                            }
3303                            BuildersOrCallback::Callback(_, node_callback) => {
3304                                node_callback(node, next_stmt_id);
3305                            }
3306                        }
3307
3308                        let _ = next_stmt_id.get_and_increment();
3309
3310                        ident_stack.push(begin_ident);
3311                    }
3312
3313                    HydroNode::EndAtomic { inner, .. } => {
3314                        let inner_ident = ident_stack.pop().unwrap();
3315
3316                        let end_ident =
3317                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3318
3319                        match builders_or_callback {
3320                            BuildersOrCallback::Builders(graph_builders) => {
3321                                graph_builders.end_atomic(
3322                                    inner_ident,
3323                                    &inner.metadata().location_id,
3324                                    &inner.metadata().collection_kind,
3325                                    &end_ident,
3326                                );
3327                            }
3328                            BuildersOrCallback::Callback(_, node_callback) => {
3329                                node_callback(node, next_stmt_id);
3330                            }
3331                        }
3332
3333                        let _ = next_stmt_id.get_and_increment();
3334
3335                        ident_stack.push(end_ident);
3336                    }
3337
3338                    HydroNode::Source {
3339                        source, metadata, ..
3340                    } => {
3341                        if let HydroSource::ExternalNetwork() = source {
3342                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3343                        } else {
3344                            let source_ident =
3345                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3346
3347                            let source_stmt = match source {
3348                                HydroSource::Stream(expr) => {
3349                                    debug_assert!(metadata.location_id.is_top_level());
3350                                    parse_quote! {
3351                                        #source_ident = source_stream(#expr);
3352                                    }
3353                                }
3354
3355                                HydroSource::ExternalNetwork() => {
3356                                    unreachable!()
3357                                }
3358
3359                                HydroSource::Iter(expr) => {
3360                                    if metadata.location_id.is_top_level() {
3361                                        parse_quote! {
3362                                            #source_ident = source_iter(#expr);
3363                                        }
3364                                    } else {
3365                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
3366                                        parse_quote! {
3367                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3368                                        }
3369                                    }
3370                                }
3371
3372                                HydroSource::Spin() => {
3373                                    debug_assert!(metadata.location_id.is_top_level());
3374                                    parse_quote! {
3375                                        #source_ident = spin();
3376                                    }
3377                                }
3378
3379                                HydroSource::ClusterMembers(target_loc, state) => {
3380                                    debug_assert!(metadata.location_id.is_top_level());
3381
3382                                    let members_tee_ident = syn::Ident::new(
3383                                        &format!(
3384                                            "__cluster_members_tee_{}_{}",
3385                                            metadata.location_id.root().key(),
3386                                            target_loc.key(),
3387                                        ),
3388                                        Span::call_site(),
3389                                    );
3390
3391                                    match state {
3392                                        ClusterMembersState::Stream(d) => {
3393                                            parse_quote! {
3394                                                #members_tee_ident = source_stream(#d) -> tee();
3395                                                #source_ident = #members_tee_ident;
3396                                            }
3397                                        },
3398                                        ClusterMembersState::Uninit => syn::parse_quote! {
3399                                            #source_ident = source_stream(DUMMY);
3400                                        },
3401                                        ClusterMembersState::Tee(..) => parse_quote! {
3402                                            #source_ident = #members_tee_ident;
3403                                        },
3404                                    }
3405                                }
3406
3407                                HydroSource::Embedded(ident) => {
3408                                    parse_quote! {
3409                                        #source_ident = source_stream(#ident);
3410                                    }
3411                                }
3412
3413                                HydroSource::EmbeddedSingleton(ident) => {
3414                                    parse_quote! {
3415                                        #source_ident = source_iter([#ident]);
3416                                    }
3417                                }
3418                            };
3419
3420                            match builders_or_callback {
3421                                BuildersOrCallback::Builders(graph_builders) => {
3422                                    let builder = graph_builders.get_dfir_mut(&out_location);
3423                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3424                                }
3425                                BuildersOrCallback::Callback(_, node_callback) => {
3426                                    node_callback(node, next_stmt_id);
3427                                }
3428                            }
3429
3430                            let _ = next_stmt_id.get_and_increment();
3431
3432                            ident_stack.push(source_ident);
3433                        }
3434                    }
3435
3436                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3437                        let source_ident =
3438                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3439
3440                        match builders_or_callback {
3441                            BuildersOrCallback::Builders(graph_builders) => {
3442                                let builder = graph_builders.get_dfir_mut(&out_location);
3443
3444                                if *first_tick_only {
3445                                    assert!(
3446                                        !metadata.location_id.is_top_level(),
3447                                        "first_tick_only SingletonSource must be inside a tick"
3448                                    );
3449                                }
3450
3451                                if *first_tick_only
3452                                    || (metadata.location_id.is_top_level()
3453                                        && metadata.collection_kind.is_bounded())
3454                                {
3455                                    builder.add_dfir(
3456                                        parse_quote! {
3457                                            #source_ident = source_iter([#value]);
3458                                        },
3459                                        None,
3460                                        Some(&next_stmt_id.to_string()),
3461                                    );
3462                                } else {
3463                                    builder.add_dfir(
3464                                        parse_quote! {
3465                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3466                                        },
3467                                        None,
3468                                        Some(&next_stmt_id.to_string()),
3469                                    );
3470                                }
3471                            }
3472                            BuildersOrCallback::Callback(_, node_callback) => {
3473                                node_callback(node, next_stmt_id);
3474                            }
3475                        }
3476
3477                        let _ = next_stmt_id.get_and_increment();
3478
3479                        ident_stack.push(source_ident);
3480                    }
3481
3482                    HydroNode::CycleSource { cycle_id, .. } => {
3483                        let ident = cycle_id.as_ident();
3484
3485                        match builders_or_callback {
3486                            BuildersOrCallback::Builders(_) => {}
3487                            BuildersOrCallback::Callback(_, node_callback) => {
3488                                node_callback(node, next_stmt_id);
3489                            }
3490                        }
3491
3492                        // consume a stmt id even though we did not emit anything so that we can instrument this
3493                        let _ = next_stmt_id.get_and_increment();
3494
3495                        ident_stack.push(ident);
3496                    }
3497
3498                    HydroNode::Tee { inner, .. } => {
3499                        let ret_ident = if let Some(built_idents) =
3500                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3501                        {
3502                            match builders_or_callback {
3503                                BuildersOrCallback::Builders(_) => {}
3504                                BuildersOrCallback::Callback(_, node_callback) => {
3505                                    node_callback(node, next_stmt_id);
3506                                }
3507                            }
3508
3509                            built_idents[0].clone()
3510                        } else {
3511                            // The inner node was already processed by transform_bottom_up,
3512                            // so its ident is on the stack
3513                            let inner_ident = ident_stack.pop().unwrap();
3514
3515                            let tee_ident =
3516                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3517
3518                            built_tees.insert(
3519                                inner.0.as_ref() as *const RefCell<HydroNode>,
3520                                vec![tee_ident.clone()],
3521                            );
3522
3523                            match builders_or_callback {
3524                                BuildersOrCallback::Builders(graph_builders) => {
3525                                    // NOTE: With `forward_ref`, the fold codegen may not have
3526                                    // run yet when we reach this tee, so `fold_hooked_idents`
3527                                    // might not contain the inner ident. In that case we won't
3528                                    // propagate the "hooked" status to the tee and the
3529                                    // downstream singleton batch will use the normal
3530                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3531                                    // This is not a soundness issue: the fallback hook still
3532                                    // produces correct behavior, just with a redundant decision
3533                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3534                                    // fix ordering so forward_ref folds are always processed
3535                                    // before their downstream tees.
3536                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3537                                        fold_hooked_idents.insert(tee_ident.to_string());
3538                                    }
3539                                    let builder = graph_builders.get_dfir_mut(&out_location);
3540                                    builder.add_dfir(
3541                                        parse_quote! {
3542                                            #tee_ident = #inner_ident -> tee();
3543                                        },
3544                                        None,
3545                                        Some(&next_stmt_id.to_string()),
3546                                    );
3547                                }
3548                                BuildersOrCallback::Callback(_, node_callback) => {
3549                                    node_callback(node, next_stmt_id);
3550                                }
3551                            }
3552
3553                            tee_ident
3554                        };
3555
3556                        // we consume a stmt id regardless of if we emit the tee() operator,
3557                        // so that during rewrites we touch all recipients of the tee()
3558
3559                        let _ = next_stmt_id.get_and_increment();
3560                        ident_stack.push(ret_ident);
3561                    }
3562
3563                    HydroNode::Singleton { inner, .. } => {
3564                        let ret_ident = if let Some(built_idents) =
3565                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3566                        {
3567                            built_idents[0].clone()
3568                        } else {
3569                            let inner_ident = ident_stack.pop().unwrap();
3570
3571                            let singleton_ident =
3572                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3573
3574                            built_tees.insert(
3575                                inner.0.as_ref() as *const RefCell<HydroNode>,
3576                                vec![singleton_ident.clone()],
3577                            );
3578
3579                            match builders_or_callback {
3580                                BuildersOrCallback::Builders(graph_builders) => {
3581                                    let builder = graph_builders.get_dfir_mut(&out_location);
3582                                    builder.add_dfir(
3583                                        parse_quote! {
3584                                            #singleton_ident = #inner_ident -> singleton();
3585                                        },
3586                                        None,
3587                                        Some(&next_stmt_id.to_string()),
3588                                    );
3589                                }
3590                                BuildersOrCallback::Callback(_, node_callback) => {
3591                                    node_callback(node, next_stmt_id);
3592                                }
3593                            }
3594
3595                            singleton_ident
3596                        };
3597
3598                        // we consume a stmt id regardless of if we emit the singleton() operator,
3599                        // so that during rewrites we touch all recipients of the singleton()
3600                        let _ = next_stmt_id.get_and_increment();
3601                        ident_stack.push(ret_ident);
3602                    }
3603
3604                    HydroNode::Partition {
3605                        inner, f, is_true, ..
3606                    } => {
3607                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3608                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3609                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3610                            match builders_or_callback {
3611                                BuildersOrCallback::Builders(_) => {}
3612                                BuildersOrCallback::Callback(_, node_callback) => {
3613                                    node_callback(node, next_stmt_id);
3614                                }
3615                            }
3616
3617                            let idx = if is_true { 0 } else { 1 };
3618                            built_idents[idx].clone()
3619                        } else {
3620                            // The inner node was already processed by transform_bottom_up,
3621                            // so its ident is on the stack
3622                            let inner_ident = ident_stack.pop().unwrap();
3623                            let f_tokens = f.emit_tokens(&mut ident_stack);
3624
3625                            let partition_ident = syn::Ident::new(
3626                                &format!("stream_{}_partition", *next_stmt_id),
3627                                Span::call_site(),
3628                            );
3629                            let true_ident = syn::Ident::new(
3630                                &format!("stream_{}_true", *next_stmt_id),
3631                                Span::call_site(),
3632                            );
3633                            let false_ident = syn::Ident::new(
3634                                &format!("stream_{}_false", *next_stmt_id),
3635                                Span::call_site(),
3636                            );
3637
3638                            built_tees.insert(
3639                                ptr,
3640                                vec![true_ident.clone(), false_ident.clone()],
3641                            );
3642
3643                            match builders_or_callback {
3644                                BuildersOrCallback::Builders(graph_builders) => {
3645                                    let builder = graph_builders.get_dfir_mut(&out_location);
3646                                    builder.add_dfir(
3647                                        parse_quote! {
3648                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3649                                            #true_ident = #partition_ident[0];
3650                                            #false_ident = #partition_ident[1];
3651                                        },
3652                                        None,
3653                                        Some(&next_stmt_id.to_string()),
3654                                    );
3655                                }
3656                                BuildersOrCallback::Callback(_, node_callback) => {
3657                                    node_callback(node, next_stmt_id);
3658                                }
3659                            }
3660
3661                            if is_true { true_ident } else { false_ident }
3662                        };
3663
3664                        let _ = next_stmt_id.get_and_increment();
3665                        ident_stack.push(ret_ident);
3666                    }
3667
3668                    HydroNode::Chain { .. } => {
3669                        // Children are processed left-to-right, so second is on top
3670                        let second_ident = ident_stack.pop().unwrap();
3671                        let first_ident = ident_stack.pop().unwrap();
3672
3673                        let chain_ident =
3674                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3675
3676                        match builders_or_callback {
3677                            BuildersOrCallback::Builders(graph_builders) => {
3678                                let builder = graph_builders.get_dfir_mut(&out_location);
3679                                builder.add_dfir(
3680                                    parse_quote! {
3681                                        #chain_ident = chain();
3682                                        #first_ident -> [0]#chain_ident;
3683                                        #second_ident -> [1]#chain_ident;
3684                                    },
3685                                    None,
3686                                    Some(&next_stmt_id.to_string()),
3687                                );
3688                            }
3689                            BuildersOrCallback::Callback(_, node_callback) => {
3690                                node_callback(node, next_stmt_id);
3691                            }
3692                        }
3693
3694                        let _ = next_stmt_id.get_and_increment();
3695
3696                        ident_stack.push(chain_ident);
3697                    }
3698
3699                    HydroNode::MergeOrdered { first, metadata, .. } => {
3700                        let second_ident = ident_stack.pop().unwrap();
3701                        let first_ident = ident_stack.pop().unwrap();
3702
3703                        let merge_ident =
3704                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3705
3706                        match builders_or_callback {
3707                            BuildersOrCallback::Builders(graph_builders) => {
3708                                graph_builders.merge_ordered(
3709                                    &first.metadata().location_id,
3710                                    first_ident,
3711                                    second_ident,
3712                                    &merge_ident,
3713                                    &first.metadata().collection_kind,
3714                                    &metadata.op,
3715                                    Some(&next_stmt_id.to_string()),
3716                                );
3717                            }
3718                            BuildersOrCallback::Callback(_, node_callback) => {
3719                                node_callback(node, next_stmt_id);
3720                            }
3721                        }
3722
3723                        let _ = next_stmt_id.get_and_increment();
3724
3725                        ident_stack.push(merge_ident);
3726                    }
3727
3728                    HydroNode::ChainFirst { .. } => {
3729                        let second_ident = ident_stack.pop().unwrap();
3730                        let first_ident = ident_stack.pop().unwrap();
3731
3732                        let chain_ident =
3733                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3734
3735                        match builders_or_callback {
3736                            BuildersOrCallback::Builders(graph_builders) => {
3737                                let builder = graph_builders.get_dfir_mut(&out_location);
3738                                builder.add_dfir(
3739                                    parse_quote! {
3740                                        #chain_ident = chain_first_n(1);
3741                                        #first_ident -> [0]#chain_ident;
3742                                        #second_ident -> [1]#chain_ident;
3743                                    },
3744                                    None,
3745                                    Some(&next_stmt_id.to_string()),
3746                                );
3747                            }
3748                            BuildersOrCallback::Callback(_, node_callback) => {
3749                                node_callback(node, next_stmt_id);
3750                            }
3751                        }
3752
3753                        let _ = next_stmt_id.get_and_increment();
3754
3755                        ident_stack.push(chain_ident);
3756                    }
3757
3758                    HydroNode::CrossSingleton { right, .. } => {
3759                        let right_ident = ident_stack.pop().unwrap();
3760                        let left_ident = ident_stack.pop().unwrap();
3761
3762                        let cross_ident =
3763                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3764
3765                        match builders_or_callback {
3766                            BuildersOrCallback::Builders(graph_builders) => {
3767                                let builder = graph_builders.get_dfir_mut(&out_location);
3768
3769                                if right.metadata().location_id.is_top_level()
3770                                    && right.metadata().collection_kind.is_bounded()
3771                                {
3772                                    builder.add_dfir(
3773                                        parse_quote! {
3774                                            #cross_ident = cross_singleton::<'static>();
3775                                            #left_ident -> [input]#cross_ident;
3776                                            #right_ident -> [single]#cross_ident;
3777                                        },
3778                                        None,
3779                                        Some(&next_stmt_id.to_string()),
3780                                    );
3781                                } else {
3782                                    builder.add_dfir(
3783                                        parse_quote! {
3784                                            #cross_ident = cross_singleton();
3785                                            #left_ident -> [input]#cross_ident;
3786                                            #right_ident -> [single]#cross_ident;
3787                                        },
3788                                        None,
3789                                        Some(&next_stmt_id.to_string()),
3790                                    );
3791                                }
3792                            }
3793                            BuildersOrCallback::Callback(_, node_callback) => {
3794                                node_callback(node, next_stmt_id);
3795                            }
3796                        }
3797
3798                        let _ = next_stmt_id.get_and_increment();
3799
3800                        ident_stack.push(cross_ident);
3801                    }
3802
3803                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3804                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3805                            parse_quote!(cross_join_multiset)
3806                        } else {
3807                            parse_quote!(join_multiset)
3808                        };
3809
3810                        let (HydroNode::CrossProduct { left, right, .. }
3811                        | HydroNode::Join { left, right, .. }) = node
3812                        else {
3813                            unreachable!()
3814                        };
3815
3816                        let is_top_level = left.metadata().location_id.is_top_level()
3817                            && right.metadata().location_id.is_top_level();
3818                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3819                            quote!('static)
3820                        } else {
3821                            quote!('tick)
3822                        };
3823
3824                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3825                            quote!('static)
3826                        } else {
3827                            quote!('tick)
3828                        };
3829
3830                        let right_ident = ident_stack.pop().unwrap();
3831                        let left_ident = ident_stack.pop().unwrap();
3832
3833                        let stream_ident =
3834                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3835
3836                        match builders_or_callback {
3837                            BuildersOrCallback::Builders(graph_builders) => {
3838                                let builder = graph_builders.get_dfir_mut(&out_location);
3839                                builder.add_dfir(
3840                                    if is_top_level {
3841                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3842                                        // a multiset_delta() to negate the replay behavior
3843                                        parse_quote! {
3844                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3845                                            #left_ident -> [0]#stream_ident;
3846                                            #right_ident -> [1]#stream_ident;
3847                                        }
3848                                    } else {
3849                                        parse_quote! {
3850                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3851                                            #left_ident -> [0]#stream_ident;
3852                                            #right_ident -> [1]#stream_ident;
3853                                        }
3854                                    }
3855                                    ,
3856                                    None,
3857                                    Some(&next_stmt_id.to_string()),
3858                                );
3859                            }
3860                            BuildersOrCallback::Callback(_, node_callback) => {
3861                                node_callback(node, next_stmt_id);
3862                            }
3863                        }
3864
3865                        let _ = next_stmt_id.get_and_increment();
3866
3867                        ident_stack.push(stream_ident);
3868                    }
3869
3870                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
                            // Emits one two-input DFIR operator for a `Difference` or `AntiJoin` node.
                            // Both variants share this arm; only the emitted operator name differs.
3871                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3872                            parse_quote!(difference)
3873                        } else {
3874                            parse_quote!(anti_join)
3875                        };
3876
                            // The arm pattern already limits `node` to these two variants, so the
                            // `else` branch below cannot be taken.
3877                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3878                            node
3879                        else {
3880                            unreachable!()
3881                        };
3882
                            // Lifetime argument for the negative side: `'static` when that input's
                            // location is top-level, `'tick` otherwise.
3883                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3884                            quote!('static)
3885                        } else {
3886                            quote!('tick)
3887                        };
3888
                            // The negative input's ident sits on top of the stack, so it is popped
                            // before the positive one.
3889                        let neg_ident = ident_stack.pop().unwrap();
3890                        let pos_ident = ident_stack.pop().unwrap();
3891
                            // Output ident is named after the current statement id.
3892                        let stream_ident =
3893                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3894
3895                        match builders_or_callback {
3896                            BuildersOrCallback::Builders(graph_builders) => {
                                    // The positive side's lifetime is always `'tick`; only the
                                    // negative side's lifetime varies.
3897                                let builder = graph_builders.get_dfir_mut(&out_location);
3898                                builder.add_dfir(
3899                                    parse_quote! {
3900                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3901                                        #pos_ident -> [pos]#stream_ident;
3902                                        #neg_ident -> [neg]#stream_ident;
3903                                    },
3904                                    None,
3905                                    Some(&next_stmt_id.to_string()),
3906                                );
3907                            }
3908                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
3909                                node_callback(node, next_stmt_id);
3910                            }
3911                        }
3912
                            // Advance the statement counter; the returned value is not needed here.
3913                        let _ = next_stmt_id.get_and_increment();
3914
                            // Leave this node's output ident on the stack in place of its inputs.
3915                        ident_stack.push(stream_ident);
3916                    }
3917
3918                    HydroNode::JoinHalf { .. } => {
                            // Emits a `join_multiset_half` DFIR operator with `probe` and `build` inputs.
3919                        let HydroNode::JoinHalf { right, .. } = node else {
3920                            unreachable!()
3921                        };
3922
                            // Panics if the right input's collection kind is not bounded;
                            // the message includes the offending kind.
3923                        assert!(
3924                            right.metadata().collection_kind.is_bounded(),
3925                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3926                            right.metadata().collection_kind
3927                        );
3928
                            // Lifetime argument for the build side: `'static` when the right input's
                            // location is top-level, `'tick` otherwise.
3929                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3930                            quote!('static)
3931                        } else {
3932                            quote!('tick)
3933                        };
3934
                            // The build ident is on top of the stack, so it is popped before
                            // the probe ident.
3935                        let build_ident = ident_stack.pop().unwrap();
3936                        let probe_ident = ident_stack.pop().unwrap();
3937
                            // Output ident is named after the current statement id.
3938                        let stream_ident =
3939                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3940
3941                        match builders_or_callback {
3942                            BuildersOrCallback::Builders(graph_builders) => {
                                    // NOTE(review): the build lifetime is passed as the first generic
                                    // argument and `'tick` as the second — confirm this matches the
                                    // argument order `join_multiset_half` expects.
3943                                let builder = graph_builders.get_dfir_mut(&out_location);
3944                                builder.add_dfir(
3945                                    parse_quote! {
3946                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3947                                        #probe_ident -> [probe]#stream_ident;
3948                                        #build_ident -> [build]#stream_ident;
3949                                    },
3950                                    None,
3951                                    Some(&next_stmt_id.to_string()),
3952                                );
3953                            }
3954                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
3955                                node_callback(node, next_stmt_id);
3956                            }
3957                        }
3958
                            // Advance the statement counter; the returned value is not needed here.
3959                        let _ = next_stmt_id.get_and_increment();
3960
                            // Leave this node's output ident on the stack in place of its inputs.
3961                        ident_stack.push(stream_ident);
3962                    }
3963
3964                    HydroNode::ResolveFutures { .. } => {
                            // Pipes the single input through the DFIR `resolve_futures()` operator.
                            // The input's ident is taken from the top of the stack.
3965                        let input_ident = ident_stack.pop().unwrap();
3966
                            // Output ident is named after the current statement id.
3967                        let futures_ident =
3968                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3969
3970                        match builders_or_callback {
3971                            BuildersOrCallback::Builders(graph_builders) => {
3972                                let builder = graph_builders.get_dfir_mut(&out_location);
3973                                builder.add_dfir(
3974                                    parse_quote! {
3975                                        #futures_ident = #input_ident -> resolve_futures();
3976                                    },
3977                                    None,
3978                                    Some(&next_stmt_id.to_string()),
3979                                );
3980                            }
3981                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
3982                                node_callback(node, next_stmt_id);
3983                            }
3984                        }
3985
                            // Advance the statement counter; the returned value is not needed here.
3986                        let _ = next_stmt_id.get_and_increment();
3987
                            // Leave this node's output ident on the stack in place of its input.
3988                        ident_stack.push(futures_ident);
3989                    }
3990
3991                    HydroNode::ResolveFuturesBlocking { .. } => {
                            // Pipes the single input through the DFIR `resolve_futures_blocking()` operator.
                            // The input's ident is taken from the top of the stack.
3992                        let input_ident = ident_stack.pop().unwrap();
3993
                            // Output ident is named after the current statement id.
3994                        let futures_ident =
3995                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3996
3997                        match builders_or_callback {
3998                            BuildersOrCallback::Builders(graph_builders) => {
3999                                let builder = graph_builders.get_dfir_mut(&out_location);
4000                                builder.add_dfir(
4001                                    parse_quote! {
4002                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4003                                    },
4004                                    None,
4005                                    Some(&next_stmt_id.to_string()),
4006                                );
4007                            }
4008                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4009                                node_callback(node, next_stmt_id);
4010                            }
4011                        }
4012
                            // Advance the statement counter; the returned value is not needed here.
4013                        let _ = next_stmt_id.get_and_increment();
4014
                            // Leave this node's output ident on the stack in place of its input.
4015                        ident_stack.push(futures_ident);
4016                    }
4017
4018                    HydroNode::ResolveFuturesOrdered { .. } => {
                            // Pipes the single input through the DFIR `resolve_futures_ordered()` operator.
                            // The input's ident is taken from the top of the stack.
4019                        let input_ident = ident_stack.pop().unwrap();
4020
                            // Output ident is named after the current statement id.
4021                        let futures_ident =
4022                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4023
4024                        match builders_or_callback {
4025                            BuildersOrCallback::Builders(graph_builders) => {
4026                                let builder = graph_builders.get_dfir_mut(&out_location);
4027                                builder.add_dfir(
4028                                    parse_quote! {
4029                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4030                                    },
4031                                    None,
4032                                    Some(&next_stmt_id.to_string()),
4033                                );
4034                            }
4035                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4036                                node_callback(node, next_stmt_id);
4037                            }
4038                        }
4039
                            // Advance the statement counter; the returned value is not needed here.
4040                        let _ = next_stmt_id.get_and_increment();
4041
                            // Leave this node's output ident on the stack in place of its input.
4042                        ident_stack.push(futures_ident);
4043                    }
4044
4045                    HydroNode::Map { f, .. } => {
                            // Emits a DFIR `map(..)` stage applying the closure `f` to the input.
4046                        // Pop input ident (pushed last by transform_children).
4047                        let input_ident = ident_stack.pop().unwrap();
                            // `emit_tokens` gets mutable access to the ident stack and runs only after
                            // the input ident has been popped.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4048                        let f_tokens = f.emit_tokens(&mut ident_stack);
4049
                            // Output ident is named after the current statement id.
4050                        let map_ident =
4051                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4052
4053                        match builders_or_callback {
4054                            BuildersOrCallback::Builders(graph_builders) => {
4055                                let builder = graph_builders.get_dfir_mut(&out_location);
4056                                builder.add_dfir(
4057                                    parse_quote! {
4058                                        #map_ident = #input_ident -> map(#f_tokens);
4059                                    },
4060                                    None,
4061                                    Some(&next_stmt_id.to_string()),
4062                                );
4063                            }
4064                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4065                                node_callback(node, next_stmt_id);
4066                            }
4067                        }
4068
                            // Advance the statement counter; the returned value is not needed here.
4069                        let _ = next_stmt_id.get_and_increment();
4070
                            // Leave this node's output ident on the stack in place of its input.
4071                        ident_stack.push(map_ident);
4072                    }
4073
4074                    HydroNode::FlatMap { f, .. } => {
                            // Emits a DFIR `flat_map(..)` stage applying the closure `f` to the input.
                            // The input ident is popped first; `emit_tokens` then gets mutable access
                            // to the remaining stack.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4075                        let input_ident = ident_stack.pop().unwrap();
4076                        let f_tokens = f.emit_tokens(&mut ident_stack);
4077
                            // Output ident is named after the current statement id.
4078                        let flat_map_ident =
4079                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4080
4081                        match builders_or_callback {
4082                            BuildersOrCallback::Builders(graph_builders) => {
4083                                let builder = graph_builders.get_dfir_mut(&out_location);
4084                                builder.add_dfir(
4085                                    parse_quote! {
4086                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4087                                    },
4088                                    None,
4089                                    Some(&next_stmt_id.to_string()),
4090                                );
4091                            }
4092                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4093                                node_callback(node, next_stmt_id);
4094                            }
4095                        }
4096
                            // Advance the statement counter; the returned value is not needed here.
4097                        let _ = next_stmt_id.get_and_increment();
4098
                            // Leave this node's output ident on the stack in place of its input.
4099                        ident_stack.push(flat_map_ident);
4100                    }
4101
4102                    HydroNode::FlatMapStreamBlocking { f, .. } => {
                            // Emits a DFIR `flat_map_stream_blocking(..)` stage applying the closure `f`.
                            // The input ident is popped first; `emit_tokens` then gets mutable access
                            // to the remaining stack.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4103                        let input_ident = ident_stack.pop().unwrap();
4104                        let f_tokens = f.emit_tokens(&mut ident_stack);
4105
                            // Output ident is named after the current statement id.
4106                        let flat_map_stream_blocking_ident =
4107                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4108
4109                        match builders_or_callback {
4110                            BuildersOrCallback::Builders(graph_builders) => {
4111                                let builder = graph_builders.get_dfir_mut(&out_location);
4112                                builder.add_dfir(
4113                                    parse_quote! {
4114                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4115                                    },
4116                                    None,
4117                                    Some(&next_stmt_id.to_string()),
4118                                );
4119                            }
4120                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4121                                node_callback(node, next_stmt_id);
4122                            }
4123                        }
4124
                            // Advance the statement counter; the returned value is not needed here.
4125                        let _ = next_stmt_id.get_and_increment();
4126
                            // Leave this node's output ident on the stack in place of its input.
4127                        ident_stack.push(flat_map_stream_blocking_ident);
4128                    }
4129
4130                    HydroNode::Filter { f, .. } => {
                            // Emits a DFIR `filter(..)` stage using the closure `f` as the predicate.
                            // The input ident is popped first; `emit_tokens` then gets mutable access
                            // to the remaining stack.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4131                        let input_ident = ident_stack.pop().unwrap();
4132                        let f_tokens = f.emit_tokens(&mut ident_stack);
4133
                            // Output ident is named after the current statement id.
4134                        let filter_ident =
4135                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4136
4137                        match builders_or_callback {
4138                            BuildersOrCallback::Builders(graph_builders) => {
4139                                let builder = graph_builders.get_dfir_mut(&out_location);
4140                                builder.add_dfir(
4141                                    parse_quote! {
4142                                        #filter_ident = #input_ident -> filter(#f_tokens);
4143                                    },
4144                                    None,
4145                                    Some(&next_stmt_id.to_string()),
4146                                );
4147                            }
4148                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4149                                node_callback(node, next_stmt_id);
4150                            }
4151                        }
4152
                            // Advance the statement counter; the returned value is not needed here.
4153                        let _ = next_stmt_id.get_and_increment();
4154
                            // Leave this node's output ident on the stack in place of its input.
4155                        ident_stack.push(filter_ident);
4156                    }
4157
4158                    HydroNode::FilterMap { f, .. } => {
                            // Emits a DFIR `filter_map(..)` stage applying the closure `f` to the input.
                            // The input ident is popped first; `emit_tokens` then gets mutable access
                            // to the remaining stack.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4159                        let input_ident = ident_stack.pop().unwrap();
4160                        let f_tokens = f.emit_tokens(&mut ident_stack);
4161
                            // Output ident is named after the current statement id.
4162                        let filter_map_ident =
4163                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4164
4165                        match builders_or_callback {
4166                            BuildersOrCallback::Builders(graph_builders) => {
4167                                let builder = graph_builders.get_dfir_mut(&out_location);
4168                                builder.add_dfir(
4169                                    parse_quote! {
4170                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4171                                    },
4172                                    None,
4173                                    Some(&next_stmt_id.to_string()),
4174                                );
4175                            }
4176                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4177                                node_callback(node, next_stmt_id);
4178                            }
4179                        }
4180
                            // Advance the statement counter; the returned value is not needed here.
4181                        let _ = next_stmt_id.get_and_increment();
4182
                            // Leave this node's output ident on the stack in place of its input.
4183                        ident_stack.push(filter_map_ident);
4184                    }
4185
4186                    HydroNode::Sort { .. } => {
                            // Pipes the single input through the DFIR `sort()` operator.
                            // The input's ident is taken from the top of the stack.
4187                        let input_ident = ident_stack.pop().unwrap();
4188
                            // Output ident is named after the current statement id.
4189                        let sort_ident =
4190                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4191
4192                        match builders_or_callback {
4193                            BuildersOrCallback::Builders(graph_builders) => {
4194                                let builder = graph_builders.get_dfir_mut(&out_location);
4195                                builder.add_dfir(
4196                                    parse_quote! {
4197                                        #sort_ident = #input_ident -> sort();
4198                                    },
4199                                    None,
4200                                    Some(&next_stmt_id.to_string()),
4201                                );
4202                            }
4203                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4204                                node_callback(node, next_stmt_id);
4205                            }
4206                        }
4207
                            // Advance the statement counter; the returned value is not needed here.
4208                        let _ = next_stmt_id.get_and_increment();
4209
                            // Leave this node's output ident on the stack in place of its input.
4210                        ident_stack.push(sort_ident);
4211                    }
4212
4213                    HydroNode::DeferTick { .. } => {
                            // Pipes the single input through a DFIR operator; note that the operator
                            // emitted for `DeferTick` is `defer_tick_lazy()`, not `defer_tick()`.
                            // The input's ident is taken from the top of the stack.
4214                        let input_ident = ident_stack.pop().unwrap();
4215
                            // Output ident is named after the current statement id.
4216                        let defer_tick_ident =
4217                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4218
4219                        match builders_or_callback {
4220                            BuildersOrCallback::Builders(graph_builders) => {
4221                                let builder = graph_builders.get_dfir_mut(&out_location);
4222                                builder.add_dfir(
4223                                    parse_quote! {
4224                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4225                                    },
4226                                    None,
4227                                    Some(&next_stmt_id.to_string()),
4228                                );
4229                            }
4230                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4231                                node_callback(node, next_stmt_id);
4232                            }
4233                        }
4234
                            // Advance the statement counter; the returned value is not needed here.
4235                        let _ = next_stmt_id.get_and_increment();
4236
                            // Leave this node's output ident on the stack in place of its input.
4237                        ident_stack.push(defer_tick_ident);
4238                    }
4239
4240                    HydroNode::Enumerate { input, .. } => {
                            // Emits a DFIR `enumerate::<lifetime>()` stage on the single input.
                            // The input's ident is taken from the top of the stack.
4241                        let input_ident = ident_stack.pop().unwrap();
4242
                            // Output ident is named after the current statement id.
4243                        let enumerate_ident =
4244                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4245
4246                        match builders_or_callback {
4247                            BuildersOrCallback::Builders(graph_builders) => {
4248                                let builder = graph_builders.get_dfir_mut(&out_location);
                                    // The lifetime is used only by the emitted DFIR, so it is computed
                                    // in this branch: `'static` when the input's location is
                                    // top-level, `'tick` otherwise.
4249                                let lifetime = if input.metadata().location_id.is_top_level() {
4250                                    quote!('static)
4251                                } else {
4252                                    quote!('tick)
4253                                };
4254                                builder.add_dfir(
4255                                    parse_quote! {
4256                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4257                                    },
4258                                    None,
4259                                    Some(&next_stmt_id.to_string()),
4260                                );
4261                            }
4262                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4263                                node_callback(node, next_stmt_id);
4264                            }
4265                        }
4266
                            // Advance the statement counter; the returned value is not needed here.
4267                        let _ = next_stmt_id.get_and_increment();
4268
                            // Leave this node's output ident on the stack in place of its input.
4269                        ident_stack.push(enumerate_ident);
4270                    }
4271
4272                    HydroNode::Inspect { f, .. } => {
                            // Emits a DFIR `inspect(..)` stage applying the closure `f` to the input.
                            // The input ident is popped first; `emit_tokens` then gets mutable access
                            // to the remaining stack.
                            // NOTE(review): presumably it consumes idents for values the closure
                            // captures — confirm against `emit_tokens`.
4273                        let input_ident = ident_stack.pop().unwrap();
4274                        let f_tokens = f.emit_tokens(&mut ident_stack);
4275
                            // Output ident is named after the current statement id.
4276                        let inspect_ident =
4277                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4278
4279                        match builders_or_callback {
4280                            BuildersOrCallback::Builders(graph_builders) => {
4281                                let builder = graph_builders.get_dfir_mut(&out_location);
4282                                builder.add_dfir(
4283                                    parse_quote! {
4284                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4285                                    },
4286                                    None,
4287                                    Some(&next_stmt_id.to_string()),
4288                                );
4289                            }
4290                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4291                                node_callback(node, next_stmt_id);
4292                            }
4293                        }
4294
                            // Advance the statement counter; the returned value is not needed here.
4295                        let _ = next_stmt_id.get_and_increment();
4296
                            // Leave this node's output ident on the stack in place of its input.
4297                        ident_stack.push(inspect_ident);
4298                    }
4299
4300                    HydroNode::Unique { input, .. } => {
                            // Emits a DFIR `unique::<lifetime>()` stage on the single input.
                            // The input's ident is taken from the top of the stack.
4301                        let input_ident = ident_stack.pop().unwrap();
4302
                            // Output ident is named after the current statement id.
4303                        let unique_ident =
4304                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4305
4306                        match builders_or_callback {
4307                            BuildersOrCallback::Builders(graph_builders) => {
4308                                let builder = graph_builders.get_dfir_mut(&out_location);
                                    // The lifetime is used only by the emitted DFIR, so it is computed
                                    // in this branch: `'static` when the input's location is
                                    // top-level, `'tick` otherwise.
4309                                let lifetime = if input.metadata().location_id.is_top_level() {
4310                                    quote!('static)
4311                                } else {
4312                                    quote!('tick)
4313                                };
4314
4315                                builder.add_dfir(
4316                                    parse_quote! {
4317                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4318                                    },
4319                                    None,
4320                                    Some(&next_stmt_id.to_string()),
4321                                );
4322                            }
4323                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only receives the
                                    // node and the current statement id.
4324                                node_callback(node, next_stmt_id);
4325                            }
4326                        }
4327
                            // Advance the statement counter; the returned value is not needed here.
4328                        let _ = next_stmt_id.get_and_increment();
4329
                            // Leave this node's output ident on the stack in place of its input.
4330                        ident_stack.push(unique_ident);
4331                    }
4332
4333                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4334                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4335                            if input.metadata().location_id.is_top_level()
4336                                && input.metadata().collection_kind.is_bounded()
4337                            {
4338                                parse_quote!(fold_no_replay)
4339                            } else {
4340                                parse_quote!(fold)
4341                            }
4342                        } else if matches!(node, HydroNode::Scan { .. }) {
4343                            parse_quote!(scan)
4344                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4345                            parse_quote!(scan_async_blocking)
4346                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4347                            if input.metadata().location_id.is_top_level()
4348                                && input.metadata().collection_kind.is_bounded()
4349                            {
4350                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4351                            } else {
4352                                parse_quote!(fold_keyed)
4353                            }
4354                        } else {
4355                            unreachable!()
4356                        };
4357
4358                        let (HydroNode::Fold { input, .. }
4359                        | HydroNode::FoldKeyed { input, .. }
4360                        | HydroNode::Scan { input, .. }
4361                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4362                        else {
4363                            unreachable!()
4364                        };
4365
4366                        let lifetime = if input.metadata().location_id.is_top_level() {
4367                            quote!('static)
4368                        } else {
4369                            quote!('tick)
4370                        };
4371
4372                        let input_ident = ident_stack.pop().unwrap();
4373
4374                        let (HydroNode::Fold { init, acc, .. }
4375                        | HydroNode::FoldKeyed { init, acc, .. }
4376                        | HydroNode::Scan { init, acc, .. }
4377                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4378                        else {
4379                            unreachable!()
4380                        };
4381
4382                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4383                        let init_tokens = init.emit_tokens(&mut ident_stack);
4384
4385                        let fold_ident =
4386                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4387
4388                        match builders_or_callback {
4389                            BuildersOrCallback::Builders(graph_builders) => {
4390                                if matches!(node, HydroNode::Fold { .. })
4391                                    && node.metadata().location_id.is_top_level()
4392                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4393                                    && graph_builders.singleton_intermediates()
4394                                    && !node.metadata().collection_kind.is_bounded()
4395                                {
4396                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4397                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4398                                        &input.metadata().location_id,
4399                                        &input_ident,
4400                                        &input.metadata().collection_kind,
4401                                        &node.metadata().op,
4402                                    );
4403
4404                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4405                                        let acc: syn::Expr = parse_quote!({
4406                                            let mut __inner = #acc_tokens;
4407                                            move |__state, __batch: Vec<_>| {
4408                                                if __batch.is_empty() {
4409                                                    return None;
4410                                                }
4411                                                for __value in __batch {
4412                                                    __inner(__state, __value);
4413                                                }
4414                                                Some(__state.clone())
4415                                            }
4416                                        });
4417                                        (hooked, acc)
4418                                    } else {
4419                                        let acc: syn::Expr = parse_quote!({
4420                                            let mut __inner = #acc_tokens;
4421                                            move |__state, __value| {
4422                                                __inner(__state, __value);
4423                                                Some(__state.clone())
4424                                            }
4425                                        });
4426                                        (&input_ident, acc)
4427                                    };
4428
4429                                    let builder = graph_builders.get_dfir_mut(&out_location);
4430                                    builder.add_dfir(
4431                                        parse_quote! {
4432                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4433                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4434                                            #fold_ident = chain();
4435                                        },
4436                                        None,
4437                                        Some(&next_stmt_id.to_string()),
4438                                    );
4439
4440                                    if hooked_input_ident.is_some() {
4441                                        fold_hooked_idents.insert(fold_ident.to_string());
4442                                    }
4443                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4444                                    && node.metadata().location_id.is_top_level()
4445                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4446                                    && graph_builders.singleton_intermediates()
4447                                    && !node.metadata().collection_kind.is_bounded()
4448                                {
4449                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4450                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4451                                        &input.metadata().location_id,
4452                                        &input_ident,
4453                                        &input.metadata().collection_kind,
4454                                        &node.metadata().op,
4455                                    );
4456                                    let builder = graph_builders.get_dfir_mut(&out_location);
4457
4458                                    let wrapped_acc: syn::Expr = parse_quote!({
4459                                        let mut __init = #init_tokens;
4460                                        let mut __inner = #acc_tokens;
4461                                        move |__state, __kv: (_, _)| {
4462                                            // TODO(shadaj): we can avoid the clone when the entry exists
4463                                            let __state = __state
4464                                                .entry(::std::clone::Clone::clone(&__kv.0))
4465                                                .or_insert_with(|| (__init)());
4466                                            __inner(__state, __kv.1);
4467                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4468                                        }
4469                                    });
4470
4471                                    if let Some(hooked_input_ident) = hooked_input_ident {
4472                                        builder.add_dfir(
4473                                            parse_quote! {
4474                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4475                                            },
4476                                            None,
4477                                            Some(&next_stmt_id.to_string()),
4478                                        );
4479
4480                                        fold_hooked_idents.insert(fold_ident.to_string());
4481                                    } else {
4482                                        builder.add_dfir(
4483                                            parse_quote! {
4484                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4485                                            },
4486                                            None,
4487                                            Some(&next_stmt_id.to_string()),
4488                                        );
4489                                    }
4490                                } else if (matches!(node, HydroNode::Fold { .. })
4491                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4492                                    && !node.metadata().location_id.is_top_level()
4493                                    && graph_builders.singleton_intermediates()
4494                                {
4495                                    let input_ref = match &*node {
4496                                        HydroNode::Fold { input, .. } => input,
4497                                        HydroNode::FoldKeyed { input, .. } => input,
4498                                        _ => unreachable!(),
4499                                    };
4500                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4501                                        &input_ref.metadata().location_id,
4502                                        &input_ident,
4503                                        &input_ref.metadata().collection_kind,
4504                                        &node.metadata().op,
4505                                    );
4506
4507                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4508                                    let builder = graph_builders.get_dfir_mut(&out_location);
4509                                    builder.add_dfir(
4510                                        parse_quote! {
4511                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4512                                        },
4513                                        None,
4514                                        Some(&next_stmt_id.to_string()),
4515                                    );
4516                                } else {
4517                                    let builder = graph_builders.get_dfir_mut(&out_location);
4518                                    builder.add_dfir(
4519                                        parse_quote! {
4520                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4521                                        },
4522                                        None,
4523                                        Some(&next_stmt_id.to_string()),
4524                                    );
4525                                }
4526                            }
4527                            BuildersOrCallback::Callback(_, node_callback) => {
4528                                node_callback(node, next_stmt_id);
4529                            }
4530                        }
4531
4532                        let _ = next_stmt_id.get_and_increment();
4533
4534                        ident_stack.push(fold_ident);
4535                    }
4536
4537                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4538                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4539                            if input.metadata().location_id.is_top_level()
4540                                && input.metadata().collection_kind.is_bounded()
4541                            {
4542                                parse_quote!(reduce_no_replay)
4543                            } else {
4544                                parse_quote!(reduce)
4545                            }
4546                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4547                            if input.metadata().location_id.is_top_level()
4548                                && input.metadata().collection_kind.is_bounded()
4549                            {
4550                                todo!(
4551                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4552                                )
4553                            } else {
4554                                parse_quote!(reduce_keyed)
4555                            }
4556                        } else {
4557                            unreachable!()
4558                        };
4559
4560                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4561                        else {
4562                            unreachable!()
4563                        };
4564
4565                        let lifetime = if input.metadata().location_id.is_top_level() {
4566                            quote!('static)
4567                        } else {
4568                            quote!('tick)
4569                        };
4570
4571                        let input_ident = ident_stack.pop().unwrap();
4572
4573                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4574                        else {
4575                            unreachable!()
4576                        };
4577
4578                        let f_tokens = f.emit_tokens(&mut ident_stack);
4579
4580                        let reduce_ident =
4581                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4582
4583                        match builders_or_callback {
4584                            BuildersOrCallback::Builders(graph_builders) => {
4585                                if matches!(node, HydroNode::Reduce { .. })
4586                                    && node.metadata().location_id.is_top_level()
4587                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4588                                    && graph_builders.singleton_intermediates()
4589                                    && !node.metadata().collection_kind.is_bounded()
4590                                {
4591                                    todo!(
4592                                        "Reduce with optional intermediates is not yet supported in simulator"
4593                                    );
4594                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4595                                    && node.metadata().location_id.is_top_level()
4596                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4597                                    && graph_builders.singleton_intermediates()
4598                                    && !node.metadata().collection_kind.is_bounded()
4599                                {
4600                                    todo!(
4601                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4602                                    );
4603                                } else {
4604                                    let builder = graph_builders.get_dfir_mut(&out_location);
4605                                    builder.add_dfir(
4606                                        parse_quote! {
4607                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4608                                        },
4609                                        None,
4610                                        Some(&next_stmt_id.to_string()),
4611                                    );
4612                                }
4613                            }
4614                            BuildersOrCallback::Callback(_, node_callback) => {
4615                                node_callback(node, next_stmt_id);
4616                            }
4617                        }
4618
4619                        let _ = next_stmt_id.get_and_increment();
4620
4621                        ident_stack.push(reduce_ident);
4622                    }
4623
4624                    HydroNode::ReduceKeyedWatermark {
4625                        f,
4626                        input,
4627                        metadata,
4628                        ..
4629                    } => {
4630                        let lifetime = if input.metadata().location_id.is_top_level() {
4631                            quote!('static)
4632                        } else {
4633                            quote!('tick)
4634                        };
4635
4636                        // watermark is processed second, so it's on top
4637                        let watermark_ident = ident_stack.pop().unwrap();
4638                        let input_ident = ident_stack.pop().unwrap();
4639                        let f_tokens = f.emit_tokens(&mut ident_stack);
4640
4641                        let chain_ident = syn::Ident::new(
4642                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4643                            Span::call_site(),
4644                        );
4645
4646                        let fold_ident =
4647                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4648
4649                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4650                            && input.metadata().collection_kind.is_bounded()
4651                        {
4652                            parse_quote!(fold_no_replay)
4653                        } else {
4654                            parse_quote!(fold)
4655                        };
4656
4657                        match builders_or_callback {
4658                            BuildersOrCallback::Builders(graph_builders) => {
4659                                if metadata.location_id.is_top_level()
4660                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4661                                    && graph_builders.singleton_intermediates()
4662                                    && !metadata.collection_kind.is_bounded()
4663                                {
4664                                    todo!(
4665                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4666                                    )
4667                                } else {
4668                                    let builder = graph_builders.get_dfir_mut(&out_location);
4669                                    builder.add_dfir(
4670                                        parse_quote! {
4671                                            #chain_ident = chain();
4672                                            #input_ident
4673                                                -> map(|x| (Some(x), None))
4674                                                -> [0]#chain_ident;
4675                                            #watermark_ident
4676                                                -> map(|watermark| (None, Some(watermark)))
4677                                                -> [1]#chain_ident;
4678
4679                                            #fold_ident = #chain_ident
4680                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4681                                                    let __reduce_keyed_fn = #f_tokens;
4682                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4683                                                        if let Some((k, v)) = opt_payload {
4684                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4685                                                                if k < curr_watermark {
4686                                                                    return;
4687                                                                }
4688                                                            }
4689                                                            match map.entry(k) {
4690                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4691                                                                    e.insert(v);
4692                                                                }
4693                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4694                                                                    __reduce_keyed_fn(e.get_mut(), v);
4695                                                                }
4696                                                            }
4697                                                        } else {
4698                                                            let watermark = opt_watermark.unwrap();
4699                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4700                                                                if watermark <= curr_watermark {
4701                                                                    return;
4702                                                                }
4703                                                            }
4704                                                            map.retain(|k, _| *k >= watermark);
4705                                                            *opt_curr_watermark = Some(watermark);
4706                                                        }
4707                                                    }
4708                                                })
4709                                                -> flat_map(|(map, _curr_watermark)| map);
4710                                        },
4711                                        None,
4712                                        Some(&next_stmt_id.to_string()),
4713                                    );
4714                                }
4715                            }
4716                            BuildersOrCallback::Callback(_, node_callback) => {
4717                                node_callback(node, next_stmt_id);
4718                            }
4719                        }
4720
4721                        let _ = next_stmt_id.get_and_increment();
4722
4723                        ident_stack.push(fold_ident);
4724                    }
4725
4726                    HydroNode::Network {
4727                        networking_info,
4728                        serialize_fn: serialize_pipeline,
4729                        instantiate_fn,
4730                        deserialize_fn: deserialize_pipeline,
4731                        input,
4732                        ..
4733                    } => {
4734                        let input_ident = ident_stack.pop().unwrap();
4735
4736                        let receiver_stream_ident =
4737                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4738
4739                        match builders_or_callback {
4740                            BuildersOrCallback::Builders(graph_builders) => {
4741                                let (sink_expr, source_expr) = match instantiate_fn {
4742                                    DebugInstantiate::Building => (
4743                                        syn::parse_quote!(DUMMY_SINK),
4744                                        syn::parse_quote!(DUMMY_SOURCE),
4745                                    ),
4746
4747                                    DebugInstantiate::Finalized(finalized) => {
4748                                        (finalized.sink.clone(), finalized.source.clone())
4749                                    }
4750                                };
4751
4752                                graph_builders.create_network(
4753                                    &input.metadata().location_id,
4754                                    &out_location,
4755                                    input_ident,
4756                                    &receiver_stream_ident,
4757                                    serialize_pipeline.as_ref(),
4758                                    sink_expr,
4759                                    source_expr,
4760                                    deserialize_pipeline.as_ref(),
4761                                    *next_stmt_id,
4762                                    networking_info,
4763                                );
4764                            }
4765                            BuildersOrCallback::Callback(_, node_callback) => {
4766                                node_callback(node, next_stmt_id);
4767                            }
4768                        }
4769
4770                        let _ = next_stmt_id.get_and_increment();
4771
4772                        ident_stack.push(receiver_stream_ident);
4773                    }
4774
4775                    HydroNode::ExternalInput {
4776                        instantiate_fn,
4777                        deserialize_fn: deserialize_pipeline,
4778                        ..
4779                    } => {
4780                        let receiver_stream_ident =
4781                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4782
4783                        match builders_or_callback {
4784                            BuildersOrCallback::Builders(graph_builders) => {
4785                                let (_, source_expr) = match instantiate_fn {
4786                                    DebugInstantiate::Building => (
4787                                        syn::parse_quote!(DUMMY_SINK),
4788                                        syn::parse_quote!(DUMMY_SOURCE),
4789                                    ),
4790
4791                                    DebugInstantiate::Finalized(finalized) => {
4792                                        (finalized.sink.clone(), finalized.source.clone())
4793                                    }
4794                                };
4795
4796                                graph_builders.create_external_source(
4797                                    &out_location,
4798                                    source_expr,
4799                                    &receiver_stream_ident,
4800                                    deserialize_pipeline.as_ref(),
4801                                    *next_stmt_id,
4802                                );
4803                            }
4804                            BuildersOrCallback::Callback(_, node_callback) => {
4805                                node_callback(node, next_stmt_id);
4806                            }
4807                        }
4808
4809                        let _ = next_stmt_id.get_and_increment();
4810
4811                        ident_stack.push(receiver_stream_ident);
4812                    }
4813
4814                    HydroNode::Counter {
4815                        tag,
4816                        duration,
4817                        prefix,
4818                        ..
4819                    } => {
4820                        let input_ident = ident_stack.pop().unwrap();
4821
4822                        let counter_ident =
4823                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4824
4825                        match builders_or_callback {
4826                            BuildersOrCallback::Builders(graph_builders) => {
4827                                let arg = format!("{}({})", prefix, tag);
4828                                let builder = graph_builders.get_dfir_mut(&out_location);
4829                                builder.add_dfir(
4830                                    parse_quote! {
4831                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4832                                    },
4833                                    None,
4834                                    Some(&next_stmt_id.to_string()),
4835                                );
4836                            }
4837                            BuildersOrCallback::Callback(_, node_callback) => {
4838                                node_callback(node, next_stmt_id);
4839                            }
4840                        }
4841
4842                        let _ = next_stmt_id.get_and_increment();
4843
4844                        ident_stack.push(counter_ident);
4845                    }
4846                }
4847            },
4848            seen_tees,
4849            false,
4850        );
4851
4852        let ret = ident_stack
4853            .pop()
4854            .expect("ident_stack should have exactly one element after traversal");
4855        assert!(
4856            ident_stack.is_empty(),
4857            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4858             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4859            ident_stack.len()
4860        );
4861        ret
4862    }
4863
4864    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4865        match self {
4866            HydroNode::Placeholder => {
4867                panic!()
4868            }
4869            HydroNode::Cast { .. }
4870            | HydroNode::ObserveNonDet { .. }
4871            | HydroNode::UnboundSingleton { .. }
4872            | HydroNode::AssertIsConsistent { .. } => {}
4873            HydroNode::Source { source, .. } => match source {
4874                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4875                HydroSource::ExternalNetwork()
4876                | HydroSource::Spin()
4877                | HydroSource::ClusterMembers(_, _)
4878                | HydroSource::Embedded(_)
4879                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4880            },
4881            HydroNode::SingletonSource { value, .. } => {
4882                transform(value);
4883            }
4884            HydroNode::CycleSource { .. }
4885            | HydroNode::Tee { .. }
4886            | HydroNode::Singleton { .. }
4887            | HydroNode::YieldConcat { .. }
4888            | HydroNode::BeginAtomic { .. }
4889            | HydroNode::EndAtomic { .. }
4890            | HydroNode::Batch { .. }
4891            | HydroNode::Chain { .. }
4892            | HydroNode::MergeOrdered { .. }
4893            | HydroNode::ChainFirst { .. }
4894            | HydroNode::CrossProduct { .. }
4895            | HydroNode::CrossSingleton { .. }
4896            | HydroNode::ResolveFutures { .. }
4897            | HydroNode::ResolveFuturesBlocking { .. }
4898            | HydroNode::ResolveFuturesOrdered { .. }
4899            | HydroNode::Join { .. }
4900            | HydroNode::JoinHalf { .. }
4901            | HydroNode::Difference { .. }
4902            | HydroNode::AntiJoin { .. }
4903            | HydroNode::DeferTick { .. }
4904            | HydroNode::Enumerate { .. }
4905            | HydroNode::Unique { .. }
4906            | HydroNode::Sort { .. } => {}
4907            HydroNode::Map { f, .. }
4908            | HydroNode::FlatMap { f, .. }
4909            | HydroNode::FlatMapStreamBlocking { f, .. }
4910            | HydroNode::Filter { f, .. }
4911            | HydroNode::FilterMap { f, .. }
4912            | HydroNode::Inspect { f, .. }
4913            | HydroNode::Partition { f, .. }
4914            | HydroNode::Reduce { f, .. }
4915            | HydroNode::ReduceKeyed { f, .. }
4916            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4917                transform(&mut f.expr);
4918            }
4919            HydroNode::Fold { init, acc, .. }
4920            | HydroNode::Scan { init, acc, .. }
4921            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4922            | HydroNode::FoldKeyed { init, acc, .. } => {
4923                transform(&mut init.expr);
4924                transform(&mut acc.expr);
4925            }
4926            HydroNode::Network {
4927                serialize_fn,
4928                deserialize_fn,
4929                ..
4930            } => {
4931                if let Some(serialize_fn) = serialize_fn {
4932                    transform(serialize_fn);
4933                }
4934                if let Some(deserialize_fn) = deserialize_fn {
4935                    transform(deserialize_fn);
4936                }
4937            }
4938            HydroNode::ExternalInput { deserialize_fn, .. } => {
4939                if let Some(deserialize_fn) = deserialize_fn {
4940                    transform(deserialize_fn);
4941                }
4942            }
4943            HydroNode::Counter { duration, .. } => {
4944                transform(duration);
4945            }
4946        }
4947    }
4948
4949    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4950        &self.metadata().op
4951    }
4952
4953    pub fn metadata(&self) -> &HydroIrMetadata {
4954        match self {
4955            HydroNode::Placeholder => {
4956                panic!()
4957            }
4958            HydroNode::Cast { metadata, .. }
4959            | HydroNode::ObserveNonDet { metadata, .. }
4960            | HydroNode::AssertIsConsistent { metadata, .. }
4961            | HydroNode::UnboundSingleton { metadata, .. }
4962            | HydroNode::Source { metadata, .. }
4963            | HydroNode::SingletonSource { metadata, .. }
4964            | HydroNode::CycleSource { metadata, .. }
4965            | HydroNode::Tee { metadata, .. }
4966            | HydroNode::Singleton { metadata, .. }
4967            | HydroNode::Partition { metadata, .. }
4968            | HydroNode::YieldConcat { metadata, .. }
4969            | HydroNode::BeginAtomic { metadata, .. }
4970            | HydroNode::EndAtomic { metadata, .. }
4971            | HydroNode::Batch { metadata, .. }
4972            | HydroNode::Chain { metadata, .. }
4973            | HydroNode::MergeOrdered { metadata, .. }
4974            | HydroNode::ChainFirst { metadata, .. }
4975            | HydroNode::CrossProduct { metadata, .. }
4976            | HydroNode::CrossSingleton { metadata, .. }
4977            | HydroNode::Join { metadata, .. }
4978            | HydroNode::JoinHalf { metadata, .. }
4979            | HydroNode::Difference { metadata, .. }
4980            | HydroNode::AntiJoin { metadata, .. }
4981            | HydroNode::ResolveFutures { metadata, .. }
4982            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4983            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4984            | HydroNode::Map { metadata, .. }
4985            | HydroNode::FlatMap { metadata, .. }
4986            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4987            | HydroNode::Filter { metadata, .. }
4988            | HydroNode::FilterMap { metadata, .. }
4989            | HydroNode::DeferTick { metadata, .. }
4990            | HydroNode::Enumerate { metadata, .. }
4991            | HydroNode::Inspect { metadata, .. }
4992            | HydroNode::Unique { metadata, .. }
4993            | HydroNode::Sort { metadata, .. }
4994            | HydroNode::Scan { metadata, .. }
4995            | HydroNode::ScanAsyncBlocking { metadata, .. }
4996            | HydroNode::Fold { metadata, .. }
4997            | HydroNode::FoldKeyed { metadata, .. }
4998            | HydroNode::Reduce { metadata, .. }
4999            | HydroNode::ReduceKeyed { metadata, .. }
5000            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5001            | HydroNode::ExternalInput { metadata, .. }
5002            | HydroNode::Network { metadata, .. }
5003            | HydroNode::Counter { metadata, .. } => metadata,
5004        }
5005    }
5006
5007    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5008        &mut self.metadata_mut().op
5009    }
5010
5011    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5012        match self {
5013            HydroNode::Placeholder => {
5014                panic!()
5015            }
5016            HydroNode::Cast { metadata, .. }
5017            | HydroNode::ObserveNonDet { metadata, .. }
5018            | HydroNode::AssertIsConsistent { metadata, .. }
5019            | HydroNode::UnboundSingleton { metadata, .. }
5020            | HydroNode::Source { metadata, .. }
5021            | HydroNode::SingletonSource { metadata, .. }
5022            | HydroNode::CycleSource { metadata, .. }
5023            | HydroNode::Tee { metadata, .. }
5024            | HydroNode::Singleton { metadata, .. }
5025            | HydroNode::Partition { metadata, .. }
5026            | HydroNode::YieldConcat { metadata, .. }
5027            | HydroNode::BeginAtomic { metadata, .. }
5028            | HydroNode::EndAtomic { metadata, .. }
5029            | HydroNode::Batch { metadata, .. }
5030            | HydroNode::Chain { metadata, .. }
5031            | HydroNode::MergeOrdered { metadata, .. }
5032            | HydroNode::ChainFirst { metadata, .. }
5033            | HydroNode::CrossProduct { metadata, .. }
5034            | HydroNode::CrossSingleton { metadata, .. }
5035            | HydroNode::Join { metadata, .. }
5036            | HydroNode::JoinHalf { metadata, .. }
5037            | HydroNode::Difference { metadata, .. }
5038            | HydroNode::AntiJoin { metadata, .. }
5039            | HydroNode::ResolveFutures { metadata, .. }
5040            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5041            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5042            | HydroNode::Map { metadata, .. }
5043            | HydroNode::FlatMap { metadata, .. }
5044            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5045            | HydroNode::Filter { metadata, .. }
5046            | HydroNode::FilterMap { metadata, .. }
5047            | HydroNode::DeferTick { metadata, .. }
5048            | HydroNode::Enumerate { metadata, .. }
5049            | HydroNode::Inspect { metadata, .. }
5050            | HydroNode::Unique { metadata, .. }
5051            | HydroNode::Sort { metadata, .. }
5052            | HydroNode::Scan { metadata, .. }
5053            | HydroNode::ScanAsyncBlocking { metadata, .. }
5054            | HydroNode::Fold { metadata, .. }
5055            | HydroNode::FoldKeyed { metadata, .. }
5056            | HydroNode::Reduce { metadata, .. }
5057            | HydroNode::ReduceKeyed { metadata, .. }
5058            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5059            | HydroNode::ExternalInput { metadata, .. }
5060            | HydroNode::Network { metadata, .. }
5061            | HydroNode::Counter { metadata, .. } => metadata,
5062        }
5063    }
5064
5065    pub fn input(&self) -> Vec<&HydroNode> {
5066        match self {
5067            HydroNode::Placeholder => {
5068                panic!()
5069            }
5070            HydroNode::Source { .. }
5071            | HydroNode::SingletonSource { .. }
5072            | HydroNode::ExternalInput { .. }
5073            | HydroNode::CycleSource { .. }
5074            | HydroNode::Tee { .. }
5075            | HydroNode::Singleton { .. }
5076            | HydroNode::Partition { .. } => {
5077                // Tee/Partition should find their input in separate special ways
5078                vec![]
5079            }
5080            HydroNode::Cast { inner, .. }
5081            | HydroNode::ObserveNonDet { inner, .. }
5082            | HydroNode::YieldConcat { inner, .. }
5083            | HydroNode::BeginAtomic { inner, .. }
5084            | HydroNode::EndAtomic { inner, .. }
5085            | HydroNode::Batch { inner, .. }
5086            | HydroNode::UnboundSingleton { inner, .. }
5087            | HydroNode::AssertIsConsistent { inner, .. } => {
5088                vec![inner]
5089            }
5090            HydroNode::Chain { first, second, .. } => {
5091                vec![first, second]
5092            }
5093            HydroNode::MergeOrdered { first, second, .. } => {
5094                vec![first, second]
5095            }
5096            HydroNode::ChainFirst { first, second, .. } => {
5097                vec![first, second]
5098            }
5099            HydroNode::CrossProduct { left, right, .. }
5100            | HydroNode::CrossSingleton { left, right, .. }
5101            | HydroNode::Join { left, right, .. }
5102            | HydroNode::JoinHalf { left, right, .. } => {
5103                vec![left, right]
5104            }
5105            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5106                vec![pos, neg]
5107            }
5108            HydroNode::Map { input, .. }
5109            | HydroNode::FlatMap { input, .. }
5110            | HydroNode::FlatMapStreamBlocking { input, .. }
5111            | HydroNode::Filter { input, .. }
5112            | HydroNode::FilterMap { input, .. }
5113            | HydroNode::Sort { input, .. }
5114            | HydroNode::DeferTick { input, .. }
5115            | HydroNode::Enumerate { input, .. }
5116            | HydroNode::Inspect { input, .. }
5117            | HydroNode::Unique { input, .. }
5118            | HydroNode::Network { input, .. }
5119            | HydroNode::Counter { input, .. }
5120            | HydroNode::ResolveFutures { input, .. }
5121            | HydroNode::ResolveFuturesBlocking { input, .. }
5122            | HydroNode::ResolveFuturesOrdered { input, .. }
5123            | HydroNode::Fold { input, .. }
5124            | HydroNode::FoldKeyed { input, .. }
5125            | HydroNode::Reduce { input, .. }
5126            | HydroNode::ReduceKeyed { input, .. }
5127            | HydroNode::Scan { input, .. }
5128            | HydroNode::ScanAsyncBlocking { input, .. } => {
5129                vec![input]
5130            }
5131            HydroNode::ReduceKeyedWatermark {
5132                input, watermark, ..
5133            } => {
5134                vec![input, watermark]
5135            }
5136        }
5137    }
5138
5139    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5140        self.input()
5141            .iter()
5142            .map(|input_node| input_node.metadata())
5143            .collect()
5144    }
5145
5146    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5147    /// has other live references, meaning the upstream is already driven
5148    /// by another consumer and does not need a Null sink.
5149    pub fn is_shared_with_others(&self) -> bool {
5150        match self {
5151            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5152                Rc::strong_count(&inner.0) > 1
5153            }
5154            // A zero-output singleton() is valid in DFIR (it drains itself at
5155            // end of tick), so it doesn't need to be driven by another consumer.
5156            HydroNode::Singleton { .. } => false,
5157            _ => false,
5158        }
5159    }
5160
5161    pub fn print_root(&self) -> String {
5162        match self {
5163            HydroNode::Placeholder => {
5164                panic!()
5165            }
5166            HydroNode::Cast { .. } => "Cast()".to_owned(),
5167            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5168            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5169            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5170            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5171            HydroNode::SingletonSource {
5172                value,
5173                first_tick_only,
5174                ..
5175            } => format!(
5176                "SingletonSource({:?}, first_tick_only={})",
5177                value, first_tick_only
5178            ),
5179            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5180            HydroNode::Tee { inner, .. } => {
5181                format!("Tee({})", inner.0.borrow().print_root())
5182            }
5183            HydroNode::Singleton { inner, .. } => {
5184                format!("Singleton({})", inner.0.borrow().print_root())
5185            }
5186            HydroNode::Partition { f, is_true, .. } => {
5187                format!("Partition({:?}, is_true={})", f, is_true)
5188            }
5189            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5190            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5191            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5192            HydroNode::Batch { .. } => "Batch()".to_owned(),
5193            HydroNode::Chain { first, second, .. } => {
5194                format!("Chain({}, {})", first.print_root(), second.print_root())
5195            }
5196            HydroNode::MergeOrdered { first, second, .. } => {
5197                format!(
5198                    "MergeOrdered({}, {})",
5199                    first.print_root(),
5200                    second.print_root()
5201                )
5202            }
5203            HydroNode::ChainFirst { first, second, .. } => {
5204                format!(
5205                    "ChainFirst({}, {})",
5206                    first.print_root(),
5207                    second.print_root()
5208                )
5209            }
5210            HydroNode::CrossProduct { left, right, .. } => {
5211                format!(
5212                    "CrossProduct({}, {})",
5213                    left.print_root(),
5214                    right.print_root()
5215                )
5216            }
5217            HydroNode::CrossSingleton { left, right, .. } => {
5218                format!(
5219                    "CrossSingleton({}, {})",
5220                    left.print_root(),
5221                    right.print_root()
5222                )
5223            }
5224            HydroNode::Join { left, right, .. } => {
5225                format!("Join({}, {})", left.print_root(), right.print_root())
5226            }
5227            HydroNode::JoinHalf { left, right, .. } => {
5228                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5229            }
5230            HydroNode::Difference { pos, neg, .. } => {
5231                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5232            }
5233            HydroNode::AntiJoin { pos, neg, .. } => {
5234                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5235            }
5236            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5237            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5238            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5239            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5240            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5241            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5242            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5243            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5244            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5245            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5246            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5247            HydroNode::Unique { .. } => "Unique()".to_owned(),
5248            HydroNode::Sort { .. } => "Sort()".to_owned(),
5249            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5250            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5251            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5252                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5253            }
5254            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5255            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5256            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5257            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5258            HydroNode::Network { .. } => "Network()".to_owned(),
5259            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5260            HydroNode::Counter { tag, duration, .. } => {
5261                format!("Counter({:?}, {:?})", tag, duration)
5262            }
5263        }
5264    }
5265}
5266
/// Sets up the network channel between two top-level locations.
///
/// Looks up the deployed node for `from_location` and `to_location` (in
/// `processes` or `clusters` depending on the location kind), allocates a
/// fresh port on each via `next_port()`, and then dispatches to the matching
/// `Deploy` hooks:
///
/// | from → to           | sink/source hook     | connect hook     |
/// |---------------------|----------------------|------------------|
/// | process → process   | `D::o2o_sink_source` | `D::o2o_connect` |
/// | process → cluster   | `D::o2m_sink_source` | `D::o2m_connect` |
/// | cluster → process   | `D::m2o_sink_source` | `D::m2o_connect` |
/// | cluster → cluster   | `D::m2m_sink_source` | `D::m2m_connect` |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// sink/source hook and the boxed closure produced by the connect hook.
///
/// # Panics
///
/// * If a referenced process or cluster is missing from `processes` /
///   `clusters`.
/// * If either location is a `Tick` or `Atomic` location rather than a
///   `Process` or `Cluster`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (and clones) the deployed process for `key`, panicking if it was never instantiated.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Same as above, for clusters.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            // Ports are allocated sender first, then receiver.
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only top-level Process/Cluster locations can be network endpoints.
        (LocationId::Tick(_, _), _) => {
            panic!("cannot instantiate a network whose sender is a Tick location")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network whose receiver is a Tick location")
        }
        (LocationId::Atomic(_), _) => {
            panic!("cannot instantiate a network whose sender is an Atomic location")
        }
        (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network whose receiver is an Atomic location")
        }
    };
    (sink, source, connect_fn)
}
5408
5409#[cfg(test)]
5410mod serde_test;
5411
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that accidental growth of the
    /// enum is noticed. Only meaningful with the `build` feature, which adds
    /// feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 264);
    }

    /// Pins the in-memory size of `HydroRoot`, for the same reason as
    /// `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression that is not a `q!` splice must pass through
    /// `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain_expr.clone());
        assert_eq!(result, plain_expr);
    }

    /// A real stageleft splice should be recognised by the visitor (via the
    /// `stageleft::runtime_support::fn_*` pattern) and collapsed to `q!(...)`;
    /// the exact output is checked against a snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but for a quoted block whose closure does not start
    /// with a pipe (it is preceded by a `let` binding).
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}