// vex_runtime/orchestrator.rs

1//! Orchestrator - manages hierarchical agent networks
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use vex_core::{
10    tournament_select, Agent, AgentConfig, Fitness, GeneticOperator, Genome, Hash, MerkleTree,
11    StandardOperator,
12};
13
14use crate::executor::{AgentExecutor, ExecutionResult, ExecutorConfig};
15use vex_llm::LlmProvider;
16
/// Configuration for the orchestrator
#[derive(Debug, Clone)]
pub struct OrchestratorConfig {
    /// Maximum depth of agent hierarchy (forwarded to the root agent's config)
    pub max_depth: u8,
    /// Number of agents per level; caps how many child agents are spawned per run
    pub agents_per_level: usize,
    /// Enable evolutionary selection after each orchestrated run
    pub enable_evolution: bool,
    /// Mutation rate for evolution (forwarded to `StandardOperator::mutate`)
    pub mutation_rate: f64,
    /// Executor configuration
    pub executor_config: ExecutorConfig,
    /// Maximum age for tracked agents before cleanup (prevents memory leaks)
    pub max_agent_age: Duration,
    /// Enable self-correcting genome evolution (also allocates the evolution
    /// memory and reflection agent in `Orchestrator::new`)
    pub enable_self_correction: bool,
    /// Minimum fitness improvement to accept change
    // NOTE(review): not consulted anywhere in this file — confirm intended use.
    pub improvement_threshold: f64,
    /// Number of tasks before reflection
    // NOTE(review): not consulted anywhere in this file — confirm intended use.
    pub reflect_every_n_tasks: usize,
}
39
40impl Default for OrchestratorConfig {
41    fn default() -> Self {
42        Self {
43            max_depth: 3,
44            agents_per_level: 2,
45            enable_evolution: true,
46            mutation_rate: 0.1,
47            executor_config: ExecutorConfig::default(),
48            max_agent_age: Duration::from_secs(3600), // 1 hour default
49            enable_self_correction: false,
50            improvement_threshold: 0.02,
51            reflect_every_n_tasks: 5,
52        }
53    }
54}
55
56use vex_anchor::{AnchorBackend, AnchorMetadata, AnchorReceipt};
57
/// Result from orchestrated execution
#[derive(Debug)]
pub struct OrchestrationResult {
    /// Root agent ID
    pub root_agent_id: Uuid,
    /// Final synthesized response (the root agent's output)
    pub response: String,
    /// Merkle root of all context packets (digest of `b"empty"` if there were no leaves)
    pub merkle_root: Hash,
    /// Aggregated trace root from all agents (`None` when no agent reported one)
    pub trace_root: Option<Hash>,
    /// All execution results (agent_id -> result), including the root's own entry
    pub agent_results: HashMap<Uuid, ExecutionResult>,
    /// Anchor receipts from blockchain backends
    pub anchor_receipts: Vec<AnchorReceipt>,
    /// Total levels processed
    pub levels_processed: u8,
    /// Overall confidence (arithmetic mean of all agents' confidences)
    pub confidence: f64,
}
78
/// Tracked agent with creation timestamp for TTL-based cleanup
///
/// Stored in the orchestrator's agent map; `cleanup_expired` evicts entries
/// whose age exceeds `OrchestratorConfig::max_agent_age`.
#[derive(Clone)]
struct TrackedAgent {
    /// The managed agent itself (its genome is mutated in place by evolution)
    agent: Agent,
    /// Owning tenant; retained for bookkeeping only (hence the underscore prefix)
    _tenant_id: String,
    /// Creation instant consulted by the TTL cleanup
    created_at: Instant,
}
86
/// Orchestrator manages hierarchical agent execution
///
/// Generic over the LLM provider `L`; the executor and (optional) reflection
/// agent share the same provider instance via `Arc`.
pub struct Orchestrator<L: LlmProvider> {
    /// Configuration
    pub config: OrchestratorConfig,
    /// All agents (id -> tracked agent with timestamp); entries are evicted by
    /// `cleanup_expired` once older than `config.max_agent_age`
    agents: RwLock<HashMap<Uuid, TrackedAgent>>,
    /// Executor used for both child and root agent runs
    executor: AgentExecutor<L>,
    /// Anchoring backends (Blockchain, Cloud, etc)
    anchors: Vec<Arc<dyn AnchorBackend>>,
    /// LLM backend (stored for future use)
    #[allow(dead_code)]
    llm: Arc<L>,
    /// Evolution memory for self-correction; `Some` only when
    /// `config.enable_self_correction` is set
    evolution_memory: Option<RwLock<vex_core::EvolutionMemory>>,
    /// Reflection agent for LLM-based suggestions; `Some` only when
    /// `config.enable_self_correction` is set
    reflection_agent: Option<vex_adversarial::ReflectionAgent<L>>,
    /// Persistence layer for cross-session learning (optional)
    persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
}
107
108impl<L: LlmProvider + 'static> Orchestrator<L> {
109    /// Create a new orchestrator
110    pub fn new(
111        llm: Arc<L>,
112        config: OrchestratorConfig,
113        persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
114    ) -> Self {
115        let executor = AgentExecutor::new(llm.clone(), config.executor_config.clone());
116        let evolution_memory = if config.enable_self_correction {
117            Some(RwLock::new(vex_core::EvolutionMemory::new()))
118        } else {
119            None
120        };
121        let reflection_agent = if config.enable_self_correction {
122            Some(vex_adversarial::ReflectionAgent::new(llm.clone()))
123        } else {
124            None
125        };
126        Self {
127            config,
128            agents: RwLock::new(HashMap::new()),
129            executor,
130            anchors: Vec::new(),
131            llm,
132            evolution_memory,
133            reflection_agent,
134            persistence_layer,
135        }
136    }
137
138    /// Add an anchoring backend
139    pub fn add_anchor(&mut self, anchor: Arc<dyn AnchorBackend>) {
140        self.anchors.push(anchor);
141    }
142
143    /// Cleanup expired agents to prevent memory leaks
144    /// Returns the number of agents removed
145    pub async fn cleanup_expired(&self) -> usize {
146        let mut agents = self.agents.write().await;
147        let before = agents.len();
148        agents.retain(|_, tracked| tracked.created_at.elapsed() < self.config.max_agent_age);
149        let removed = before - agents.len();
150        if removed > 0 {
151            tracing::info!(
152                removed = removed,
153                remaining = agents.len(),
154                "Cleaned up expired agents"
155            );
156        }
157        removed
158    }
159
160    /// Get current agent count
161    pub async fn agent_count(&self) -> usize {
162        self.agents.read().await.len()
163    }
164
165    /// Process a query with full hierarchical agent network
166    pub async fn process(
167        &self,
168        tenant_id: &str,
169        query: &str,
170    ) -> Result<OrchestrationResult, String> {
171        // Create root agent
172        let root_config = AgentConfig {
173            name: "Root".to_string(),
174            role: "You are a strategic coordinator. Synthesize information from sub-agents into a coherent response.".to_string(),
175            max_depth: self.config.max_depth,
176            spawn_shadow: true,
177        };
178        let mut root = Agent::new(root_config);
179        let root_id = root.id;
180
181        // Spawn child agents for research
182        let child_configs = vec![
183            AgentConfig {
184                name: "Researcher".to_string(),
185                role: "You are a thorough researcher. Analyze the query and provide detailed findings.".to_string(),
186                max_depth: 1,
187                spawn_shadow: true,
188            },
189            AgentConfig {
190                name: "Critic".to_string(),
191                role: "You are a critical analyzer. Identify potential issues, edge cases, and weaknesses.".to_string(),
192                max_depth: 1,
193                spawn_shadow: true,
194            },
195        ];
196
197        // Execute child agents in parallel (no lock held during await)
198        let mut execution_futures = Vec::new();
199        for config in child_configs.into_iter().take(self.config.agents_per_level) {
200            let mut child = root.spawn_child(config);
201            let executor = self.executor.clone();
202            let query_str = query.to_string();
203
204            execution_futures.push(tokio::spawn(async move {
205                let result = executor.execute(&mut child, &query_str).await;
206                (child.id, child, result)
207            }));
208        }
209
210        let task_results = futures::future::join_all(execution_futures).await;
211
212        // Re-acquire lock to update agents map
213        let mut agents = self.agents.write().await;
214
215        let mut all_results: HashMap<Uuid, ExecutionResult> = HashMap::new();
216        let mut child_results = Vec::new();
217        for task_result in task_results {
218            let (child_id, child, result) = task_result.map_err(|e| e.to_string())?;
219            let execution_result: ExecutionResult = result?;
220
221            child_results.push((child_id, execution_result.clone()));
222            all_results.insert(child_id, execution_result);
223            agents.insert(
224                child_id,
225                TrackedAgent {
226                    agent: child,
227                    _tenant_id: tenant_id.to_string(),
228                    created_at: Instant::now(),
229                },
230            );
231        }
232
233        // Drop lock before root synthesis
234        drop(agents);
235
236        // Synthesize child results at root level
237        let synthesis_prompt = format!(
238            "Based on the following research from your sub-agents, provide a comprehensive answer:\n\n\
239             Original Query: \"{}\"\n\n\
240             Researcher's Findings: \"{}\"\n\n\
241             Critic's Analysis: \"{}\"\n\n\
242             Synthesize these into a final, well-reasoned response.",
243            query,
244            child_results.first().map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
245            child_results.get(1).map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
246        );
247
248        let root_result = self.executor.execute(&mut root, &synthesis_prompt).await?;
249        all_results.insert(root_id, root_result.clone());
250
251        // Re-acquire lock to update root and run evolution
252        let mut agents = self.agents.write().await;
253
254        // Insert root after children are handled
255        agents.insert(
256            root_id,
257            TrackedAgent {
258                agent: root,
259                _tenant_id: tenant_id.to_string(),
260                created_at: Instant::now(),
261            },
262        );
263
264        // Build Merkle tree from all context packets
265        let leaves: Vec<(String, Hash)> = all_results
266            .iter()
267            .map(|(id, r)| (id.to_string(), r.context.hash.clone()))
268            .collect();
269        let merkle_tree = MerkleTree::from_leaves(leaves);
270
271        // Calculate overall confidence
272        let total_confidence: f64 = all_results.values().map(|r| r.confidence).sum();
273        let avg_confidence = total_confidence / all_results.len() as f64;
274
275        // Evolution step (if enabled)
276        if self.config.enable_evolution {
277            if self.config.enable_self_correction {
278                self.evolve_agents_self_correcting(tenant_id, &mut agents, &all_results)
279                    .await;
280            } else {
281                self.evolve_agents(tenant_id, &mut agents, &all_results);
282            }
283        }
284
285        // Build trace merkle tree from agent trace roots
286        let trace_leaves: Vec<(String, Hash)> = all_results
287            .iter()
288            .filter_map(|(id, r)| r.trace_root.clone().map(|tr| (id.to_string(), tr)))
289            .collect();
290        let trace_merkle = MerkleTree::from_leaves(trace_leaves);
291
292        // Anchoring Step
293        let mut anchor_receipts = Vec::new();
294        if let Some(root_hash) = merkle_tree.root_hash() {
295            let metadata = AnchorMetadata::new(tenant_id, all_results.len() as u64);
296            for anchor in &self.anchors {
297                match anchor.anchor(root_hash, metadata.clone()).await {
298                    Ok(receipt) => anchor_receipts.push(receipt),
299                    Err(e) => tracing::warn!("Anchoring to {} failed: {}", anchor.name(), e),
300                }
301            }
302        }
303
304        Ok(OrchestrationResult {
305            root_agent_id: root_id,
306            response: root_result.response,
307            merkle_root: merkle_tree
308                .root_hash()
309                .cloned()
310                .unwrap_or(Hash::digest(b"empty")),
311            trace_root: trace_merkle.root_hash().cloned(),
312            agent_results: all_results,
313            anchor_receipts,
314            levels_processed: 2,
315            confidence: avg_confidence,
316        })
317    }
318
319    /// Evolve agents based on fitness - persists evolved genome to fittest agent
320    fn evolve_agents(
321        &self,
322        _tenant_id: &str,
323        agents: &mut HashMap<Uuid, TrackedAgent>,
324        results: &HashMap<Uuid, ExecutionResult>,
325    ) {
326        let operator = StandardOperator;
327
328        // Build population with fitness scores from actual agent genomes
329        let population: Vec<(Genome, Fitness)> = agents
330            .values()
331            .map(|tracked| {
332                let fitness = results
333                    .get(&tracked.agent.id)
334                    .map(|r| r.confidence)
335                    .unwrap_or(0.5);
336                (tracked.agent.genome.clone(), Fitness::new(fitness))
337            })
338            .collect();
339
340        if population.len() < 2 {
341            return;
342        }
343
344        // Select parents via tournament selection and create offspring
345        let parent_a = tournament_select(&population, 2);
346        let parent_b = tournament_select(&population, 2);
347        let mut offspring = operator.crossover(parent_a, parent_b);
348        operator.mutate(&mut offspring, self.config.mutation_rate);
349
350        // Find the least fit agent and apply the evolved genome to it (Elitism)
351        // We preserve the 'best' and replace the 'worst' to ensure no regression.
352        if let Some((worst_id, _worst_fitness)) = results.iter().min_by(|a, b| {
353            a.1.confidence
354                .partial_cmp(&b.1.confidence)
355                .unwrap_or(std::cmp::Ordering::Equal)
356        }) {
357            if let Some(tracked) = agents.get_mut(worst_id) {
358                let old_traits = tracked.agent.genome.traits.clone();
359                tracked.agent.apply_evolved_genome(offspring.clone());
360
361                tracing::info!(
362                    agent_id = %worst_id,
363                    old_traits = ?old_traits,
364                    new_traits = ?offspring.traits,
365                    "Evolved genome applied to least fit agent (Elitism preserved fittest)"
366                );
367            }
368        }
369    }
370
    /// Self-correcting evolution using temporal memory and statistical learning
    ///
    /// This enhances basic evolution with:
    /// - Temporal memory of past experiments
    /// - Statistical correlation learning (Pearson)
    /// - Intelligent trait adjustment suggestions
    ///
    /// Lock discipline: experiments are recorded under a short-lived write lock
    /// that is released before the persistence awaits; a read lock on the
    /// memory *is* held across the reflection call (safe — this is a tokio
    /// `RwLock`, which may be held across `.await`).
    ///
    /// NOTE(review): `config.improvement_threshold` is not consulted here —
    /// confirm whether suggestions should also be gated on it.
    ///
    /// # Modular Design
    /// Users can override this for custom strategies.
    async fn evolve_agents_self_correcting(
        &self,
        tenant_id: &str,
        agents: &mut HashMap<Uuid, TrackedAgent>,
        results: &HashMap<Uuid, ExecutionResult>,
    ) {
        // Require evolution memory; degrade gracefully to standard evolution
        // if the feature flag was flipped without initializing the memory.
        let memory = match &self.evolution_memory {
            Some(mem) => mem,
            None => {
                tracing::warn!("Self-correction enabled but memory not initialized");
                return self.evolve_agents(tenant_id, agents, results);
            }
        };

        // Record experiments to memory and collect for persistence
        let experiments_to_save: Vec<vex_core::GenomeExperiment> = {
            let mut memory_guard = memory.write().await;
            let mut experiments = Vec::new();

            for (id, result) in results {
                if let Some(tracked) = agents.get(id) {
                    // Currently confidence is the only tracked fitness metric.
                    let mut fitness_scores = std::collections::HashMap::new();
                    fitness_scores.insert("confidence".to_string(), result.confidence);

                    let experiment = vex_core::GenomeExperiment::new(
                        &tracked.agent.genome,
                        fitness_scores,
                        result.confidence,
                        &format!("Depth {}", tracked.agent.depth),
                    );
                    memory_guard.record(experiment.clone());
                    experiments.push(experiment);
                }
            }
            experiments
        }; // Release lock here

        // Save to persistence (async, no lock held)
        if let Some(store) = &self.persistence_layer {
            for experiment in experiments_to_save {
                // Persistence is best-effort: failures are logged, not fatal.
                if let Err(e) = store.save_experiment(tenant_id, &experiment).await {
                    tracing::warn!("Failed to persist evolution experiment: {}", e);
                }
            }
        }

        // Find best performer (highest confidence; NaN-tolerant comparison)
        let best = results.iter().max_by(|a, b| {
            a.1.confidence
                .partial_cmp(&b.1.confidence)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        if let Some((best_id, best_result)) = best {
            if let Some(tracked) = agents.get_mut(best_id) {
                // Get suggestions using dual-mode learning (statistical + LLM)
                let suggestions = if let Some(ref reflection) = self.reflection_agent {
                    // Use ReflectionAgent for LLM + statistical suggestions.
                    // The read guard is held across the reflect await (tokio RwLock).
                    let memory_guard = memory.read().await;

                    let reflection_result = reflection
                        .reflect(
                            &tracked.agent,
                            &format!("Orchestrated task at depth {}", tracked.agent.depth),
                            &best_result.response,
                            best_result.confidence,
                            &memory_guard,
                        )
                        .await;

                    drop(memory_guard);

                    // Convert to trait adjustments format
                    reflection_result
                        .adjustments
                        .into_iter()
                        .map(|(name, current, suggested)| {
                            vex_core::TraitAdjustment {
                                trait_name: name,
                                current_value: current,
                                suggested_value: suggested,
                                correlation: 0.5, // From LLM, not pure statistical
                                confidence: reflection_result.expected_improvement,
                            }
                        })
                        .collect()
                } else {
                    // Fallback to statistical-only if ReflectionAgent not available
                    let memory_guard = memory.read().await;
                    let suggestions = memory_guard.suggest_adjustments(&tracked.agent.genome);
                    drop(memory_guard);
                    suggestions
                };

                if !suggestions.is_empty() {
                    let old_traits = tracked.agent.genome.traits.clone();

                    // Apply only suggestions whose confidence clears the bar;
                    // traits are matched positionally by name.
                    for (i, name) in tracked.agent.genome.trait_names.iter().enumerate() {
                        if let Some(sugg) = suggestions.iter().find(|s| &s.trait_name == name) {
                            if sugg.confidence >= 0.3 {
                                tracked.agent.genome.traits[i] = sugg.suggested_value;
                            }
                        }
                    }

                    // Only log when something actually changed.
                    if old_traits != tracked.agent.genome.traits {
                        let source = if self.reflection_agent.is_some() {
                            "LLM + Statistical"
                        } else {
                            "Statistical"
                        };

                        tracing::info!(
                            agent_id = %best_id,
                            old_traits = ?old_traits,
                            new_traits = ?tracked.agent.genome.traits,
                            suggestions = suggestions.len(),
                            source = source,
                            "Self-correcting genome applied"
                        );
                    }
                } else {
                    // No suggestions for the best agent: fall back to standard
                    // tournament-based evolution over the whole population.
                    self.evolve_agents(tenant_id, agents, results);
                }
            }
        }

        // Periodically check for memory consolidation
        self.maybe_consolidate_memory(tenant_id).await;
    }
513
514    /// Check if memory needs consolidation and perform it if necessary
515    async fn maybe_consolidate_memory(&self, tenant_id: &str) {
516        let memory = match &self.evolution_memory {
517            Some(m) => m,
518            None => return,
519        };
520
521        let reflection = match &self.reflection_agent {
522            Some(r) => r,
523            None => return,
524        };
525
526        // Check buffer size (Read lock)
527        // Maintain a safety buffer of 20 recent experiments for statistical continuity
528        let (count, snapshot, batch_size) = {
529            let guard = memory.read().await;
530            if guard.len() >= 70 {
531                (guard.len(), guard.get_experiments_oldest(50), 50)
532            } else {
533                (0, Vec::new(), 0)
534            }
535        };
536
537        if count >= 70 {
538            tracing::info!(
539                "Consolidating evolution memory ({} items, batch execution)...",
540                batch_size
541            );
542
543            // 1. Extract rules using LLM
544            let consolidation_result = reflection.consolidate_memory(&snapshot).await;
545
546            let success = consolidation_result.is_ok();
547
548            match consolidation_result {
549                Ok(rules) => {
550                    if !rules.is_empty() {
551                        // 2. Save rules to persistence
552                        if let Some(store) = &self.persistence_layer {
553                            for rule in &rules {
554                                if let Err(e) = store.save_rule(tenant_id, rule).await {
555                                    tracing::warn!("Failed to save optimization rule: {}", e);
556                                }
557                            }
558                        }
559
560                        tracing::info!(
561                            "Consolidated memory into {} rules. Draining batch.",
562                            rules.len()
563                        );
564                    } else {
565                        tracing::info!(
566                            "Consolidation completed with no patterns found. Draining batch."
567                        );
568                    }
569                }
570                Err(e) => {
571                    tracing::warn!("Consolidation failed: {}", e);
572                }
573            }
574
575            // 3. Manage Memory (Write lock)
576            let mut guard = memory.write().await;
577
578            // If success (even if no rules), remove the processed batch
579            if success {
580                guard.drain_oldest(batch_size);
581            }
582
583            // Overflow Protection: Hard cap at 100 to prevent DoS
584            if guard.len() > 100 {
585                let excess = guard.len() - 100;
586                tracing::warn!(
587                    "Memory overflow ({} > 100). Evicting {} oldest items.",
588                    guard.len(),
589                    excess
590                );
591                guard.drain_oldest(excess);
592            }
593        }
594    }
595}
596
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// Deterministic stand-in LLM: routes canned responses by keywords found
    /// in the request's system prompt, so orchestration tests need no network.
    #[derive(Debug)]
    struct MockLlm;

    #[async_trait]
    impl vex_llm::LlmProvider for MockLlm {
        fn name(&self) -> &str {
            "MockLLM"
        }

        async fn is_available(&self) -> bool {
            true
        }

        async fn complete(
            &self,
            request: vex_llm::LlmRequest,
        ) -> Result<vex_llm::LlmResponse, vex_llm::LlmError> {
            // The keywords below match the agent roles built in `process`:
            // "researcher" appears in the Researcher role and "critic" in
            // "critical analyzer". Anything else is treated as the root's
            // synthesis request.
            let content = if request.system.contains("researcher") {
                "Research finding: This is a detailed analysis of the topic.".to_string()
            } else if request.system.contains("critic") {
                "Critical analysis: The main concern is validation of assumptions.".to_string()
            } else if request.prompt.is_empty() {
                "Mock response".to_string()
            } else {
                "Synthesized response combining all findings into a coherent answer.".to_string()
            };

            Ok(vex_llm::LlmResponse {
                content,
                model: "mock".to_string(),
                tokens_used: Some(10),
                latency_ms: 10,
                trace_root: None,
            })
        }
    }

    /// End-to-end smoke test: a full orchestrated run against the mock LLM
    /// must yield a non-empty synthesis with positive confidence.
    #[tokio::test]
    async fn test_orchestrator() {
        let llm = Arc::new(MockLlm);
        let orchestrator = Orchestrator::new(llm, OrchestratorConfig::default(), None);

        let result = orchestrator
            .process("test-tenant", "What is the meaning of life?")
            .await
            .unwrap();

        assert!(!result.response.is_empty());
        assert!(result.confidence > 0.0);
        assert!(!result.agent_results.is_empty());
    }
}