Skip to main content

hydro_lang/compile/
built.rs

1use std::marker::PhantomData;
2
3use dfir_lang::graph::{
4    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
5};
6use slotmap::{SecondaryMap, SlotMap};
7
8use super::compiled::CompiledFlow;
9use super::deploy::{DeployFlow, DeployResult};
10use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
11use super::ir::{HydroRoot, emit};
12use crate::location::{Cluster, External, LocationKey, LocationType, Process};
13#[cfg(stageleft_runtime)]
14#[cfg(feature = "sim")]
15use crate::sim::{flow::SimFlow, graph::SimNode};
16use crate::staging_util::Invariant;
17#[cfg(stageleft_runtime)]
18#[cfg(feature = "viz")]
19use crate::viz::api::GraphApi;
20
/// A flow whose Hydro IR has been built and can now be optimized
/// ([`BuiltFlow::optimize_with`]), simulated (`sim`, with the `sim` feature), or
/// converted into a [`DeployFlow`] for a particular deployment backend.
///
/// Fields are `pub(super)`, so code outside the parent module reads them through
/// accessors such as [`BuiltFlow::ir`] and [`BuiltFlow::location_names`].
pub struct BuiltFlow<'a> {
    /// Root nodes of the Hydro IR for the whole flow.
    pub(super) ir: Vec<HydroRoot>,
    /// Every location in the flow, keyed by [`LocationKey`], together with its
    /// kind (process, cluster, or external).
    pub(super) locations: SlotMap<LocationKey, LocationType>,
    /// Name of each location, keyed by [`LocationKey`].
    pub(super) location_names: SecondaryMap<LocationKey, String>,

    /// Compile-time sidecar directives extracted from the flow state.
    pub(super) sidecars: Vec<super::builder::Sidecar>,

    /// Application name used in telemetry.
    pub(super) flow_name: String,

    /// Lifetime marker for `'a` (see `crate::staging_util::Invariant`).
    pub(super) _phantom: Invariant<'a>,
}
34
35pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
36    emit(ir)
37        .into_iter()
38        .map(|(k, v)| {
39            let FlatGraphBuilderOutput { mut flat_graph, .. } =
40                v.build().expect("Failed to build DFIR flat graph.");
41            eliminate_extra_unions_tees(&mut flat_graph);
42            let partitioned_graph =
43                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
44            (k, partitioned_graph)
45        })
46        .collect()
47}
48
49impl<'a> BuiltFlow<'a> {
50    /// Returns all [`HydroRoot`]s in the IR.
51    pub fn ir(&self) -> &[HydroRoot] {
52        &self.ir
53    }
54
55    /// Serialize the IR as JSON.
56    #[cfg(feature = "runtime_support")]
57    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
58        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
59    }
60
61    /// Returns all raw location ID -> location name mappings.
62    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
63        &self.location_names
64    }
65
66    /// Analyze consistency labels for all observable sinks using forward analysis.
67    ///
68    /// Derives labels from sink output types (ordering + retry) and checks for
69    /// untrusted nondeterminism upstream. Equivalent to coord-analysis but operates
70    /// directly on the in-memory IR (no serialization needed).
71    pub fn analyze_consistency(&self) -> Vec<super::consistency_label::SinkConsistency> {
72        super::consistency_label::analyze_sink_labels(&self.ir)
73    }
74
75    /// Get a GraphApi instance for this built flow
76    #[cfg(stageleft_runtime)]
77    #[cfg(feature = "viz")]
78    pub fn graph_api(&self) -> GraphApi<'_> {
79        GraphApi::new(&self.ir, self.location_names())
80    }
81
82    /// Render graph to string in the given format.
83    #[cfg(feature = "viz")]
84    pub fn render_graph(
85        &self,
86        format: crate::viz::config::GraphType,
87        use_short_labels: bool,
88        show_metadata: bool,
89    ) -> String {
90        self.graph_api()
91            .render(format, use_short_labels, show_metadata)
92    }
93
94    /// Write graph to file.
95    #[cfg(feature = "viz")]
96    pub fn write_graph_to_file(
97        &self,
98        format: crate::viz::config::GraphType,
99        filename: &str,
100        use_short_labels: bool,
101        show_metadata: bool,
102    ) -> Result<(), Box<dyn std::error::Error>> {
103        self.graph_api()
104            .write_to_file(format, filename, use_short_labels, show_metadata)
105    }
106
107    /// Generate graph based on CLI config. Returns Some(path) if written.
108    #[cfg(feature = "viz")]
109    pub fn generate_graph(
110        &self,
111        config: &crate::viz::config::GraphConfig,
112    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
113        self.graph_api().generate_graph(config)
114    }
115
116    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
117        f(&mut self.ir);
118        self
119    }
120
121    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
122        self.into_deploy()
123    }
124
125    #[cfg(feature = "sim")]
126    /// Creates a simulation for this builder, which can be used to run deterministic simulations
127    /// of the Hydro program.
128    pub fn sim(self) -> SimFlow<'a> {
129        use std::cell::RefCell;
130        use std::rc::Rc;
131
132        use slotmap::SparseSecondaryMap;
133
134        use crate::sim::graph::SimNodePort;
135
136        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));
137
138        let mut processes = SparseSecondaryMap::new();
139        let mut clusters = SparseSecondaryMap::new();
140        let externals = SparseSecondaryMap::new();
141
142        for (key, loc) in self.locations.iter() {
143            match loc {
144                LocationType::Process => {
145                    processes.insert(
146                        key,
147                        SimNode {
148                            shared_port_counter: shared_port_counter.clone(),
149                        },
150                    );
151                }
152                LocationType::Cluster => {
153                    clusters.insert(
154                        key,
155                        SimNode {
156                            shared_port_counter: shared_port_counter.clone(),
157                        },
158                    );
159                }
160                LocationType::External => {
161                    panic!("Sim cannot have externals");
162                }
163            }
164        }
165
166        SimFlow {
167            ir: self.ir,
168            processes,
169            clusters,
170            externals,
171            cluster_max_sizes: SparseSecondaryMap::new(),
172            externals_port_registry: Default::default(),
173            test_safety_only: false,
174            skip_consistency_assertions: false,
175            unit_test_fuzz_iterations: 8192,
176            _phantom: PhantomData,
177        }
178    }
179
180    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
181        let (processes, clusters, externals) = Default::default();
182        DeployFlow {
183            ir: self.ir,
184            locations: self.locations,
185            location_names: self.location_names,
186            processes,
187            clusters,
188            externals,
189            sidecars: self.sidecars,
190            flow_name: self.flow_name,
191            _phantom: PhantomData,
192        }
193    }
194
195    pub fn with_process<P, D: Deploy<'a>>(
196        self,
197        process: &Process<P>,
198        spec: impl IntoProcessSpec<'a, D>,
199    ) -> DeployFlow<'a, D> {
200        self.into_deploy().with_process(process, spec)
201    }
202
203    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
204        self,
205        spec: impl Fn() -> S,
206    ) -> DeployFlow<'a, D> {
207        self.into_deploy().with_remaining_processes(spec)
208    }
209
210    pub fn with_external<P, D: Deploy<'a>>(
211        self,
212        process: &External<P>,
213        spec: impl ExternalSpec<'a, D>,
214    ) -> DeployFlow<'a, D> {
215        self.into_deploy().with_external(process, spec)
216    }
217
218    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
219        self,
220        spec: impl Fn() -> S,
221    ) -> DeployFlow<'a, D> {
222        self.into_deploy().with_remaining_externals(spec)
223    }
224
225    pub fn with_cluster<C, D: Deploy<'a>>(
226        self,
227        cluster: &Cluster<C>,
228        spec: impl ClusterSpec<'a, D>,
229    ) -> DeployFlow<'a, D> {
230        self.into_deploy().with_cluster(cluster, spec)
231    }
232
233    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
234        self,
235        spec: impl Fn() -> S,
236    ) -> DeployFlow<'a, D> {
237        self.into_deploy().with_remaining_clusters(spec)
238    }
239
240    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
241        self.into_deploy::<D>().compile()
242    }
243
244    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
245        self.into_deploy::<D>().deploy(env)
246    }
247}