1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use vex_core::{
10 tournament_select, Agent, AgentConfig, Fitness, GeneticOperator, Genome, Hash, MerkleTree,
11 StandardOperator,
12};
13
14use crate::executor::{AgentExecutor, ExecutionResult, ExecutorConfig};
15use vex_llm::LlmProvider;
16
17#[derive(Debug, Clone)]
19pub struct OrchestratorConfig {
20 pub max_depth: u8,
22 pub agents_per_level: usize,
24 pub enable_evolution: bool,
26 pub mutation_rate: f64,
28 pub executor_config: ExecutorConfig,
30 pub max_agent_age: Duration,
32 pub enable_self_correction: bool,
34 pub improvement_threshold: f64,
36 pub reflect_every_n_tasks: usize,
38}
39
40impl Default for OrchestratorConfig {
41 fn default() -> Self {
42 Self {
43 max_depth: 3,
44 agents_per_level: 2,
45 enable_evolution: true,
46 mutation_rate: 0.1,
47 executor_config: ExecutorConfig::default(),
48 max_agent_age: Duration::from_secs(3600), enable_self_correction: false,
50 improvement_threshold: 0.02,
51 reflect_every_n_tasks: 5,
52 }
53 }
54}
55
56use vex_anchor::{AnchorBackend, AnchorMetadata, AnchorReceipt};
57
58#[derive(Debug)]
60pub struct OrchestrationResult {
61 pub root_agent_id: Uuid,
63 pub response: String,
65 pub merkle_root: Hash,
67 pub trace_root: Option<Hash>,
69 pub agent_results: HashMap<Uuid, ExecutionResult>,
71 pub anchor_receipts: Vec<AnchorReceipt>,
73 pub levels_processed: u8,
75 pub confidence: f64,
77}
78
79#[derive(Clone)]
81struct TrackedAgent {
82 agent: Agent,
83 _tenant_id: String,
84 created_at: Instant,
85}
86
87pub struct Orchestrator<L: LlmProvider> {
89 pub config: OrchestratorConfig,
91 agents: RwLock<HashMap<Uuid, TrackedAgent>>,
93 executor: AgentExecutor<L>,
95 anchors: Vec<Arc<dyn AnchorBackend>>,
97 #[allow(dead_code)]
99 llm: Arc<L>,
100 evolution_memory: Option<RwLock<vex_core::EvolutionMemory>>,
102 reflection_agent: Option<vex_adversarial::ReflectionAgent<L>>,
104 persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
106}
107
108impl<L: LlmProvider + 'static> Orchestrator<L> {
109 pub fn new(
111 llm: Arc<L>,
112 config: OrchestratorConfig,
113 persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
114 ) -> Self {
115 let executor = AgentExecutor::new(llm.clone(), config.executor_config.clone());
116 let evolution_memory = if config.enable_self_correction {
117 Some(RwLock::new(vex_core::EvolutionMemory::new()))
118 } else {
119 None
120 };
121 let reflection_agent = if config.enable_self_correction {
122 Some(vex_adversarial::ReflectionAgent::new(llm.clone()))
123 } else {
124 None
125 };
126 Self {
127 config,
128 agents: RwLock::new(HashMap::new()),
129 executor,
130 anchors: Vec::new(),
131 llm,
132 evolution_memory,
133 reflection_agent,
134 persistence_layer,
135 }
136 }
137
138 pub fn add_anchor(&mut self, anchor: Arc<dyn AnchorBackend>) {
140 self.anchors.push(anchor);
141 }
142
143 pub async fn cleanup_expired(&self) -> usize {
146 let mut agents = self.agents.write().await;
147 let before = agents.len();
148 agents.retain(|_, tracked| tracked.created_at.elapsed() < self.config.max_agent_age);
149 let removed = before - agents.len();
150 if removed > 0 {
151 tracing::info!(
152 removed = removed,
153 remaining = agents.len(),
154 "Cleaned up expired agents"
155 );
156 }
157 removed
158 }
159
160 pub async fn agent_count(&self) -> usize {
162 self.agents.read().await.len()
163 }
164
165 pub async fn process(
167 &self,
168 tenant_id: &str,
169 query: &str,
170 ) -> Result<OrchestrationResult, String> {
171 let root_config = AgentConfig {
173 name: "Root".to_string(),
174 role: "You are a strategic coordinator. Synthesize information from sub-agents into a coherent response.".to_string(),
175 max_depth: self.config.max_depth,
176 spawn_shadow: true,
177 };
178 let mut root = Agent::new(root_config);
179 let root_id = root.id;
180
181 let child_configs = vec![
183 AgentConfig {
184 name: "Researcher".to_string(),
185 role: "You are a thorough researcher. Analyze the query and provide detailed findings.".to_string(),
186 max_depth: 1,
187 spawn_shadow: true,
188 },
189 AgentConfig {
190 name: "Critic".to_string(),
191 role: "You are a critical analyzer. Identify potential issues, edge cases, and weaknesses.".to_string(),
192 max_depth: 1,
193 spawn_shadow: true,
194 },
195 ];
196
197 let mut execution_futures = Vec::new();
199 for config in child_configs.into_iter().take(self.config.agents_per_level) {
200 let mut child = root.spawn_child(config);
201 let executor = self.executor.clone();
202 let query_str = query.to_string();
203
204 execution_futures.push(tokio::spawn(async move {
205 let result = executor.execute(&mut child, &query_str).await;
206 (child.id, child, result)
207 }));
208 }
209
210 let task_results = futures::future::join_all(execution_futures).await;
211
212 let mut agents = self.agents.write().await;
214
215 let mut all_results: HashMap<Uuid, ExecutionResult> = HashMap::new();
216 let mut child_results = Vec::new();
217 for task_result in task_results {
218 let (child_id, child, result) = task_result.map_err(|e| e.to_string())?;
219 let execution_result: ExecutionResult = result?;
220
221 child_results.push((child_id, execution_result.clone()));
222 all_results.insert(child_id, execution_result);
223 agents.insert(
224 child_id,
225 TrackedAgent {
226 agent: child,
227 _tenant_id: tenant_id.to_string(),
228 created_at: Instant::now(),
229 },
230 );
231 }
232
233 drop(agents);
235
236 let synthesis_prompt = format!(
238 "Based on the following research from your sub-agents, provide a comprehensive answer:\n\n\
239 Original Query: \"{}\"\n\n\
240 Researcher's Findings: \"{}\"\n\n\
241 Critic's Analysis: \"{}\"\n\n\
242 Synthesize these into a final, well-reasoned response.",
243 query,
244 child_results.first().map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
245 child_results.get(1).map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
246 );
247
248 let root_result = self.executor.execute(&mut root, &synthesis_prompt).await?;
249 all_results.insert(root_id, root_result.clone());
250
251 let mut agents = self.agents.write().await;
253
254 agents.insert(
256 root_id,
257 TrackedAgent {
258 agent: root,
259 _tenant_id: tenant_id.to_string(),
260 created_at: Instant::now(),
261 },
262 );
263
264 let leaves: Vec<(String, Hash)> = all_results
266 .iter()
267 .map(|(id, r)| (id.to_string(), r.context.hash.clone()))
268 .collect();
269 let merkle_tree = MerkleTree::from_leaves(leaves);
270
271 let total_confidence: f64 = all_results.values().map(|r| r.confidence).sum();
273 let avg_confidence = total_confidence / all_results.len() as f64;
274
275 if self.config.enable_evolution {
277 if self.config.enable_self_correction {
278 self.evolve_agents_self_correcting(tenant_id, &mut agents, &all_results)
279 .await;
280 } else {
281 self.evolve_agents(tenant_id, &mut agents, &all_results);
282 }
283 }
284
285 let trace_leaves: Vec<(String, Hash)> = all_results
287 .iter()
288 .filter_map(|(id, r)| r.trace_root.clone().map(|tr| (id.to_string(), tr)))
289 .collect();
290 let trace_merkle = MerkleTree::from_leaves(trace_leaves);
291
292 let mut anchor_receipts = Vec::new();
294 if let Some(root_hash) = merkle_tree.root_hash() {
295 let metadata = AnchorMetadata::new(tenant_id, all_results.len() as u64);
296 for anchor in &self.anchors {
297 match anchor.anchor(root_hash, metadata.clone()).await {
298 Ok(receipt) => anchor_receipts.push(receipt),
299 Err(e) => tracing::warn!("Anchoring to {} failed: {}", anchor.name(), e),
300 }
301 }
302 }
303
304 Ok(OrchestrationResult {
305 root_agent_id: root_id,
306 response: root_result.response,
307 merkle_root: merkle_tree
308 .root_hash()
309 .cloned()
310 .unwrap_or(Hash::digest(b"empty")),
311 trace_root: trace_merkle.root_hash().cloned(),
312 agent_results: all_results,
313 anchor_receipts,
314 levels_processed: 2,
315 confidence: avg_confidence,
316 })
317 }
318
319 fn evolve_agents(
321 &self,
322 _tenant_id: &str,
323 agents: &mut HashMap<Uuid, TrackedAgent>,
324 results: &HashMap<Uuid, ExecutionResult>,
325 ) {
326 let operator = StandardOperator;
327
328 let population: Vec<(Genome, Fitness)> = agents
330 .values()
331 .map(|tracked| {
332 let fitness = results
333 .get(&tracked.agent.id)
334 .map(|r| r.confidence)
335 .unwrap_or(0.5);
336 (tracked.agent.genome.clone(), Fitness::new(fitness))
337 })
338 .collect();
339
340 if population.len() < 2 {
341 return;
342 }
343
344 let parent_a = tournament_select(&population, 2);
346 let parent_b = tournament_select(&population, 2);
347 let mut offspring = operator.crossover(parent_a, parent_b);
348 operator.mutate(&mut offspring, self.config.mutation_rate);
349
350 if let Some((worst_id, _worst_fitness)) = results.iter().min_by(|a, b| {
353 a.1.confidence
354 .partial_cmp(&b.1.confidence)
355 .unwrap_or(std::cmp::Ordering::Equal)
356 }) {
357 if let Some(tracked) = agents.get_mut(worst_id) {
358 let old_traits = tracked.agent.genome.traits.clone();
359 tracked.agent.apply_evolved_genome(offspring.clone());
360
361 tracing::info!(
362 agent_id = %worst_id,
363 old_traits = ?old_traits,
364 new_traits = ?offspring.traits,
365 "Evolved genome applied to least fit agent (Elitism preserved fittest)"
366 );
367 }
368 }
369 }
370
371 async fn evolve_agents_self_correcting(
381 &self,
382 tenant_id: &str,
383 agents: &mut HashMap<Uuid, TrackedAgent>,
384 results: &HashMap<Uuid, ExecutionResult>,
385 ) {
386 let memory = match &self.evolution_memory {
388 Some(mem) => mem,
389 None => {
390 tracing::warn!("Self-correction enabled but memory not initialized");
391 return self.evolve_agents(tenant_id, agents, results);
392 }
393 };
394
395 let experiments_to_save: Vec<vex_core::GenomeExperiment> = {
397 let mut memory_guard = memory.write().await;
398 let mut experiments = Vec::new();
399
400 for (id, result) in results {
401 if let Some(tracked) = agents.get(id) {
402 let mut fitness_scores = std::collections::HashMap::new();
403 fitness_scores.insert("confidence".to_string(), result.confidence);
404
405 let experiment = vex_core::GenomeExperiment::new(
406 &tracked.agent.genome,
407 fitness_scores,
408 result.confidence,
409 &format!("Depth {}", tracked.agent.depth),
410 );
411 memory_guard.record(experiment.clone());
412 experiments.push(experiment);
413 }
414 }
415 experiments
416 }; if let Some(store) = &self.persistence_layer {
420 for experiment in experiments_to_save {
421 if let Err(e) = store.save_experiment(tenant_id, &experiment).await {
422 tracing::warn!("Failed to persist evolution experiment: {}", e);
423 }
424 }
425 }
426
427 let best = results.iter().max_by(|a, b| {
429 a.1.confidence
430 .partial_cmp(&b.1.confidence)
431 .unwrap_or(std::cmp::Ordering::Equal)
432 });
433
434 if let Some((best_id, best_result)) = best {
435 if let Some(tracked) = agents.get_mut(best_id) {
436 let suggestions = if let Some(ref reflection) = self.reflection_agent {
438 let memory_guard = memory.read().await;
440
441 let reflection_result = reflection
442 .reflect(
443 &tracked.agent,
444 &format!("Orchestrated task at depth {}", tracked.agent.depth),
445 &best_result.response,
446 best_result.confidence,
447 &memory_guard,
448 )
449 .await;
450
451 drop(memory_guard);
452
453 reflection_result
455 .adjustments
456 .into_iter()
457 .map(|(name, current, suggested)| {
458 vex_core::TraitAdjustment {
459 trait_name: name,
460 current_value: current,
461 suggested_value: suggested,
462 correlation: 0.5, confidence: reflection_result.expected_improvement,
464 }
465 })
466 .collect()
467 } else {
468 let memory_guard = memory.read().await;
470 let suggestions = memory_guard.suggest_adjustments(&tracked.agent.genome);
471 drop(memory_guard);
472 suggestions
473 };
474
475 if !suggestions.is_empty() {
476 let old_traits = tracked.agent.genome.traits.clone();
477
478 for (i, name) in tracked.agent.genome.trait_names.iter().enumerate() {
480 if let Some(sugg) = suggestions.iter().find(|s| &s.trait_name == name) {
481 if sugg.confidence >= 0.3 {
482 tracked.agent.genome.traits[i] = sugg.suggested_value;
483 }
484 }
485 }
486
487 if old_traits != tracked.agent.genome.traits {
488 let source = if self.reflection_agent.is_some() {
489 "LLM + Statistical"
490 } else {
491 "Statistical"
492 };
493
494 tracing::info!(
495 agent_id = %best_id,
496 old_traits = ?old_traits,
497 new_traits = ?tracked.agent.genome.traits,
498 suggestions = suggestions.len(),
499 source = source,
500 "Self-correcting genome applied"
501 );
502 }
503 } else {
504 self.evolve_agents(tenant_id, agents, results);
506 }
507 }
508 }
509
510 self.maybe_consolidate_memory(tenant_id).await;
512 }
513
514 async fn maybe_consolidate_memory(&self, tenant_id: &str) {
516 let memory = match &self.evolution_memory {
517 Some(m) => m,
518 None => return,
519 };
520
521 let reflection = match &self.reflection_agent {
522 Some(r) => r,
523 None => return,
524 };
525
526 let (count, snapshot, batch_size) = {
529 let guard = memory.read().await;
530 if guard.len() >= 70 {
531 (guard.len(), guard.get_experiments_oldest(50), 50)
532 } else {
533 (0, Vec::new(), 0)
534 }
535 };
536
537 if count >= 70 {
538 tracing::info!(
539 "Consolidating evolution memory ({} items, batch execution)...",
540 batch_size
541 );
542
543 let consolidation_result = reflection.consolidate_memory(&snapshot).await;
545
546 let success = consolidation_result.is_ok();
547
548 match consolidation_result {
549 Ok(rules) => {
550 if !rules.is_empty() {
551 if let Some(store) = &self.persistence_layer {
553 for rule in &rules {
554 if let Err(e) = store.save_rule(tenant_id, rule).await {
555 tracing::warn!("Failed to save optimization rule: {}", e);
556 }
557 }
558 }
559
560 tracing::info!(
561 "Consolidated memory into {} rules. Draining batch.",
562 rules.len()
563 );
564 } else {
565 tracing::info!(
566 "Consolidation completed with no patterns found. Draining batch."
567 );
568 }
569 }
570 Err(e) => {
571 tracing::warn!("Consolidation failed: {}", e);
572 }
573 }
574
575 let mut guard = memory.write().await;
577
578 if success {
580 guard.drain_oldest(batch_size);
581 }
582
583 if guard.len() > 100 {
585 let excess = guard.len() - 100;
586 tracing::warn!(
587 "Memory overflow ({} > 100). Evicting {} oldest items.",
588 guard.len(),
589 excess
590 );
591 guard.drain_oldest(excess);
592 }
593 }
594 }
595}
596
597#[cfg(test)]
598mod tests {
599 use super::*;
600 use async_trait::async_trait;
601
602 #[derive(Debug)]
603 struct MockLlm;
604
605 #[async_trait]
606 impl vex_llm::LlmProvider for MockLlm {
607 fn name(&self) -> &str {
608 "MockLLM"
609 }
610
611 async fn is_available(&self) -> bool {
612 true
613 }
614
615 async fn complete(
616 &self,
617 request: vex_llm::LlmRequest,
618 ) -> Result<vex_llm::LlmResponse, vex_llm::LlmError> {
619 let content = if request.system.contains("researcher") {
620 "Research finding: This is a detailed analysis of the topic.".to_string()
621 } else if request.system.contains("critic") {
622 "Critical analysis: The main concern is validation of assumptions.".to_string()
623 } else if request.prompt.is_empty() {
624 "Mock response".to_string()
625 } else {
626 "Synthesized response combining all findings into a coherent answer.".to_string()
627 };
628
629 Ok(vex_llm::LlmResponse {
630 content,
631 model: "mock".to_string(),
632 tokens_used: Some(10),
633 latency_ms: 10,
634 trace_root: None,
635 })
636 }
637 }
638
639 #[tokio::test]
640 async fn test_orchestrator() {
641 let llm = Arc::new(MockLlm);
642 let orchestrator = Orchestrator::new(llm, OrchestratorConfig::default(), None);
643
644 let result = orchestrator
645 .process("test-tenant", "What is the meaning of life?")
646 .await
647 .unwrap();
648
649 assert!(!result.response.is_empty());
650 assert!(result.confidence > 0.0);
651 assert!(!result.agent_results.is_empty());
652 }
653}