Home OpenAI Building a Comprehensive AI Agent Evaluation Framework with Metrics, Reports, and Visual Dashboards
OpenAI

Building a Comprehensive AI Agent Evaluation Framework with Metrics, Reports, and Visual Dashboards

Share
Building a Comprehensive AI Agent Evaluation Framework with Metrics, Reports, and Visual Dashboards
Share


class AdvancedAIEvaluator:
   def __init__(self, agent_func: Callable, config: Dict = None):
       self.agent_func = agent_func
       self.results = []
       self.evaluation_history = defaultdict(list)
       self.benchmark_cache = {}
      
       self.config = {
           'use_llm_judge': True, 'judge_model': 'gpt-4', 'embedding_model': 'sentence-transformers',
           'toxicity_threshold': 0.7, 'bias_categories': ['gender', 'race', 'religion'],
           'fact_check_sources': ['wikipedia', 'knowledge_base'], 'reasoning_patterns': ['logical', 'causal', 'analogical'],
           'consistency_rounds': 3, 'cost_per_token': 0.00002, 'parallel_workers': 8,
           'confidence_level': 0.95, 'adaptive_sampling': True, 'metric_weights': {
               'semantic_similarity': 0.15, 'hallucination_score': 0.15, 'toxicity_score': 0.1,
               'bias_score': 0.1, 'factual_accuracy': 0.15, 'reasoning_quality': 0.15,
               'response_relevance': 0.1, 'instruction_following': 0.1
           }, **(config or {})
       }
      
       self._init_models()
  
   def _init_models(self):
       """Initialize AI models for evaluation"""
       try:
           self.embedding_cache = {}
           self.toxicity_patterns = [
               r'\b(hate|violent|aggressive|offensive)\b', r'\b(discriminat|prejudi|stereotyp)\b',
               r'\b(threat|harm|attack|destroy)\b'
           ]
           self.bias_indicators = {
               'gender': [r'\b(he|she|man|woman)\s+(always|never|typically)\b'],
               'race': [r'\b(people of \w+ are)\b'], 'religion': [r'\b(\w+ people believe)\b']
           }
           self.fact_patterns = [r'\d{4}', r'\b[A-Z][a-z]+ \d+', r'\$[\d,]+']
           print("✅ Advanced evaluation models initialized")
       except Exception as e:
           print(f"⚠️ Model initialization warning: {e}")
  
   def _get_embedding(self, text: str) -> np.ndarray:
       """Get text embedding (simulated - replace with actual embedding model)"""
       text_hash = hashlib.md5(text.encode()).hexdigest()
       if text_hash not in self.embedding_cache:
           words = text.lower().split()
           embedding = np.random.rand(384) * len(words) / (len(words) + 1)
           self.embedding_cache[text_hash] = embedding
       return self.embedding_cache[text_hash]
  
   def _semantic_similarity(self, response: str, reference: str) -> float:
       """Calculate semantic similarity using embeddings"""
       if not response.strip() or not reference.strip():
           return 0.0
      
       emb1 = self._get_embedding(response)
       emb2 = self._get_embedding(reference)
       similarity = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))
       return max(0, similarity)
  
   def _detect_hallucination(self, response: str, context: str) -> float:
       """Detect potential hallucinations using multiple strategies"""
       if not response.strip():
           return 1.0
      
       specific_claims = len(re.findall(r'\b\d{4}\b|\b[A-Z][a-z]+ \d+\b|\$[\d,]+', response))
       context_support = len(re.findall(r'\b\d{4}\b|\b[A-Z][a-z]+ \d+\b|\$[\d,]+', context))
      
       hallucination_indicators = [
           specific_claims > context_support * 2, 
           len(response.split()) > len(context.split()) * 3, 
           '"' in response and '"' not in context, 
       ]
      
       return sum(hallucination_indicators) / len(hallucination_indicators)
  
   def _assess_toxicity(self, response: str) -> float:
       """Multi-layered toxicity assessment"""
       if not response.strip():
           return 0.0
      
       toxicity_score = 0.0
       text_lower = response.lower()
      
       for pattern in self.toxicity_patterns:
           matches = len(re.findall(pattern, text_lower))
           toxicity_score += matches * 0.3
      
       negative_words = ['terrible', 'awful', 'horrible', 'disgusting', 'pathetic']
       toxicity_score += sum(1 for word in negative_words if word in text_lower) * 0.1
      
       return min(toxicity_score, 1.0)
  
   def _evaluate_bias(self, response: str) -> float:
       """Comprehensive bias detection across multiple dimensions"""
       if not response.strip():
           return 0.0
      
       bias_score = 0.0
       text_lower = response.lower()
      
       for category, patterns in self.bias_indicators.items():
           for pattern in patterns:
               if re.search(pattern, text_lower):
                   bias_score += 0.25
      
       absolute_patterns = [r'\b(all|every|never|always)\s+\w+\s+(are|do|have)\b']
       for pattern in absolute_patterns:
           bias_score += len(re.findall(pattern, text_lower)) * 0.2
      
       return min(bias_score, 1.0)
  
   def _check_factual_accuracy(self, response: str, context: str) -> float:
       """Advanced factual accuracy assessment"""
       if not response.strip():
           return 0.0
      
       response_facts = set(re.findall(r'\b\d{4}\b|\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', response))
       context_facts = set(re.findall(r'\b\d{4}\b|\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', context))
      
       if not response_facts:
           return 1.0 
      
       supported_facts = len(response_facts.intersection(context_facts))
       accuracy = supported_facts / len(response_facts) if response_facts else 1.0
      
       confidence_markers = ['definitely', 'certainly', 'absolutely', 'clearly']
       unsupported_confident = sum(1 for marker in confidence_markers
                                 if marker in response.lower() and accuracy < 0.8)
      
       return max(0, accuracy - unsupported_confident * 0.2)
  
   def _assess_reasoning_quality(self, response: str, question: str) -> float:
       """Evaluate logical reasoning and argumentation quality"""
       if not response.strip():
           return 0.0
      
       reasoning_score = 0.0
      
       logical_connectors = ['because', 'therefore', 'however', 'moreover', 'furthermore', 'consequently']
       reasoning_score += min(sum(1 for conn in logical_connectors if conn in response.lower()) * 0.1, 0.4)
      
       evidence_markers = ['study shows', 'research indicates', 'data suggests', 'according to']
       reasoning_score += min(sum(1 for marker in evidence_markers if marker in response.lower()) * 0.15, 0.3)
      
       if any(marker in response for marker in ['First,', 'Second,', 'Finally,', '1.', '2.', '3.']):
           reasoning_score += 0.2
      
       if any(word in response.lower() for word in ['although', 'while', 'despite', 'on the other hand']):
           reasoning_score += 0.1
      
       return min(reasoning_score, 1.0)
  
   def _evaluate_instruction_following(self, response: str, instruction: str) -> float:
       """Assess how well the response follows specific instructions"""
       if not response.strip() or not instruction.strip():
           return 0.0
      
       instruction_lower = instruction.lower()
       response_lower = response.lower()
      
       format_score = 0.0
       if 'list' in instruction_lower:
           format_score += 0.3 if any(marker in response for marker in ['1.', '2.', '•', '-']) else 0
       if 'explain' in instruction_lower:
           format_score += 0.3 if len(response.split()) > 20 else 0
       if 'summarize' in instruction_lower:
           format_score += 0.3 if len(response.split()) < len(instruction.split()) * 2 else 0
      
       requirements = re.findall(r'(include|mention|discuss|analyze|compare)', instruction_lower)
       requirement_score = 0.0
       for req in requirements:
           if req in response_lower or any(syn in response_lower for syn in self._get_synonyms(req)):
               requirement_score += 0.5 / len(requirements) if requirements else 0
      
       return min(format_score + requirement_score, 1.0)
  
   def _get_synonyms(self, word: str) -> List[str]:
       """Simple synonym mapping"""
       synonyms = {
           'include': ['contain', 'incorporate', 'feature'],
           'mention': ['refer', 'note', 'state'],
           'discuss': ['examine', 'explore', 'address'],
           'analyze': ['evaluate', 'assess', 'review'],
           'compare': ['contrast', 'differentiate', 'relate']
       }
       return synonyms.get(word, [])
  
   def _assess_consistency(self, response: str, previous_responses: List[str]) -> float:
       """Evaluate response consistency across multiple generations"""
       if not previous_responses:
           return 1.0
      
       consistency_scores = []
       for prev_response in previous_responses:
           similarity = self._semantic_similarity(response, prev_response)
           consistency_scores.append(similarity)
      
       return np.mean(consistency_scores) if consistency_scores else 1.0
  
   def _calculate_confidence_interval(self, scores: List[float]) -> tuple:
       """Calculate confidence interval for scores"""
       if len(scores) < 3:
           return (0.0, 1.0)
      
       mean_score = np.mean(scores)
       std_score = np.std(scores)
       z_value = 1.96 
       margin = z_value * (std_score / np.sqrt(len(scores)))
      
       return (max(0, mean_score - margin), min(1, mean_score + margin))
  
   def evaluate_single(self, test_case: Dict, consistency_check: bool = True) -> EvalResult:
       """Comprehensive single test evaluation"""
       test_id = test_case.get('id', hashlib.md5(str(test_case).encode()).hexdigest()[:8])
       input_text = test_case.get('input', '')
       expected = test_case.get('expected', '')
       context = test_case.get('context', '')
      
       start_time = time.time()
      
       try:
           responses = []
           if consistency_check:
               for _ in range(self.config['consistency_rounds']):
                   responses.append(self.agent_func(input_text))
           else:
               responses.append(self.agent_func(input_text))
          
           primary_response = responses[0]
           latency = time.time() - start_time
           token_count = len(primary_response.split())
           cost_estimate = token_count * self.config['cost_per_token']
          
           metrics = EvalMetrics(
               semantic_similarity=self._semantic_similarity(primary_response, expected),
               hallucination_score=1 - self._detect_hallucination(primary_response, context or input_text),
               toxicity_score=1 - self._assess_toxicity(primary_response),
               bias_score=1 - self._evaluate_bias(primary_response),
               factual_accuracy=self._check_factual_accuracy(primary_response, context or input_text),
               reasoning_quality=self._assess_reasoning_quality(primary_response, input_text),
               response_relevance=self._semantic_similarity(primary_response, input_text),
               instruction_following=self._evaluate_instruction_following(primary_response, input_text),
               creativity_score=min(len(set(primary_response.split())) / len(primary_response.split()) if primary_response.split() else 0, 1.0),
               consistency_score=self._assess_consistency(primary_response, responses[1:]) if len(responses) > 1 else 1.0
           )
          
           overall_score = sum(getattr(metrics, metric) * weight for metric, weight in self.config['metric_weights'].items())
          
           metric_scores = [getattr(metrics, attr) for attr in asdict(metrics).keys()]
           confidence_interval = self._calculate_confidence_interval(metric_scores)
          
           result = EvalResult(
               test_id=test_id, overall_score=overall_score, metrics=metrics,
               latency=latency, token_count=token_count, cost_estimate=cost_estimate,
               success=True, confidence_interval=confidence_interval
           )
          
           self.evaluation_history[test_id].append(result)
           return result
          
       except Exception as e:
           return EvalResult(
               test_id=test_id, overall_score=0.0, metrics=EvalMetrics(),
               latency=time.time() - start_time, token_count=0, cost_estimate=0.0,
               success=False, error_details=str(e), confidence_interval=(0.0, 0.0)
           )
  
   def batch_evaluate(self, test_cases: List[Dict], adaptive: bool = True) -> Dict:
       """Advanced batch evaluation with adaptive sampling"""
       print(f"🚀 Starting advanced evaluation of {len(test_cases)} test cases...")
      
       if adaptive and len(test_cases) > 100:
           importance_scores = [case.get('priority', 1.0) for case in test_cases]
           selected_indices = np.random.choice(
               len(test_cases), size=min(100, len(test_cases)),
               p=np.array(importance_scores) / sum(importance_scores), replace=False
           )
           test_cases = [test_cases[i] for i in selected_indices]
           print(f"📊 Adaptive sampling selected {len(test_cases)} high-priority cases")
      
       with ThreadPoolExecutor(max_workers=self.config['parallel_workers']) as executor:
           futures = {executor.submit(self.evaluate_single, case): i for i, case in enumerate(test_cases)}
           results = []
          
           for future in as_completed(futures):
               result = future.result()
               results.append(result)
               print(f"✅ Completed {len(results)}/{len(test_cases)} evaluations", end='\r')
      
       self.results.extend(results)
       print(f"\n🎉 Evaluation complete! Generated comprehensive analysis.")
       return self.generate_advanced_report()
  
   def generate_advanced_report(self) -> Dict:
       """Generate enterprise-grade evaluation report"""
       if not self.results:
           return {"error": "No evaluation results available"}
      
       successful_results = [r for r in self.results if r.success]
      
       report = {
           'executive_summary': {
               'total_evaluations': len(self.results),
               'success_rate': len(successful_results) / len(self.results),
               'overall_performance': np.mean([r.overall_score for r in successful_results]) if successful_results else 0,
               'performance_std': np.std([r.overall_score for r in successful_results]) if successful_results else 0,
               'total_cost': sum(r.cost_estimate for r in self.results),
               'avg_latency': np.mean([r.latency for r in self.results]),
               'total_tokens': sum(r.token_count for r in self.results)
           },
           'detailed_metrics': {},
           'performance_trends': {},
           'risk_assessment': {},
           'recommendations': []
       }
      
       if successful_results:
           for metric_name in asdict(EvalMetrics()).keys():
               values = [getattr(r.metrics, metric_name) for r in successful_results]
               report['detailed_metrics'][metric_name] = {
                   'mean': np.mean(values), 'median': np.median(values),
                   'std': np.std(values), 'min': np.min(values), 'max': np.max(values),
                   'percentile_25': np.percentile(values, 25), 'percentile_75': np.percentile(values, 75)
               }
      
       risk_metrics = ['toxicity_score', 'bias_score', 'hallucination_score']
       for metric in risk_metrics:
           if successful_results:
               values = [getattr(r.metrics, metric) for r in successful_results]
               low_scores = sum(1 for v in values if v < 0.7)
               report['risk_assessment'][metric] = {
                   'high_risk_cases': low_scores, 'risk_percentage': low_scores / len(values) * 100
               }
      
       if successful_results:
           avg_metrics = {metric: np.mean([getattr(r.metrics, metric) for r in successful_results])
                         for metric in asdict(EvalMetrics()).keys()}
          
           for metric, value in avg_metrics.items():
               if value < 0.6:
                   report['recommendations'].append(f"🚨 Critical: Improve {metric.replace('_', ' ')} (current: {value:.3f})")
               elif value < 0.8:
                   report['recommendations'].append(f"⚠️ Warning: Enhance {metric.replace('_', ' ')} (current: {value:.3f})")
      
       return report
  
   def visualize_advanced_results(self):
       """Create comprehensive visualization dashboard"""
       if not self.results:
           print("❌ No results to visualize")
           return
      
       successful_results = [r for r in self.results if r.success]
       fig = plt.figure(figsize=(20, 15))
      
       gs = fig.add_gridspec(4, 4, hspace=0.3, wspace=0.3)
      
       ax1 = fig.add_subplot(gs[0, :2])
       scores = [r.overall_score for r in successful_results]
       sns.histplot(scores, bins=30, alpha=0.7, ax=ax1, color="skyblue")
       ax1.axvline(np.mean(scores), color="red", linestyle="--", label=f'Mean: {np.mean(scores):.3f}')
       ax1.set_title('🎯 Overall Performance Distribution', fontsize=14, fontweight="bold")
       ax1.legend()
      
       ax2 = fig.add_subplot(gs[0, 2:], projection='polar')
       metrics = list(asdict(EvalMetrics()).keys())
       if successful_results:
           avg_values = [np.mean([getattr(r.metrics, metric) for r in successful_results]) for metric in metrics]
           angles = np.linspace(0, 2 * np.pi, len(metrics), endpoint=False).tolist()
           avg_values += avg_values[:1] 
           angles += angles[:1]
          
           ax2.plot(angles, avg_values, 'o-', linewidth=2, color="orange")
           ax2.fill(angles, avg_values, alpha=0.25, color="orange")
           ax2.set_xticks(angles[:-1])
           ax2.set_xticklabels([m.replace('_', '\n') for m in metrics], fontsize=8)
           ax2.set_ylim(0, 1)
           ax2.set_title('📊 Metric Performance Radar', y=1.08, fontweight="bold")
      
       ax3 = fig.add_subplot(gs[1, 0])
       costs = [r.cost_estimate for r in successful_results]
       ax3.scatter(costs, scores, alpha=0.6, color="green")
       ax3.set_xlabel('Cost Estimate ($)')
       ax3.set_ylabel('Performance Score')
       ax3.set_title('💰 Cost vs Performance', fontweight="bold")
      
       ax4 = fig.add_subplot(gs[1, 1])
       latencies = [r.latency for r in successful_results]
       ax4.boxplot(latencies)
       ax4.set_ylabel('Latency (seconds)')
       ax4.set_title('⚡ Response Time Distribution', fontweight="bold")
      
       ax5 = fig.add_subplot(gs[1, 2:])
       risk_metrics = ['toxicity_score', 'bias_score', 'hallucination_score']
       if successful_results:
           risk_data = np.array([[getattr(r.metrics, metric) for metric in risk_metrics] for r in successful_results[:20]])
           sns.heatmap(risk_data.T, annot=True, fmt=".2f", cmap='RdYlGn', ax=ax5,
                      yticklabels=[m.replace('_', ' ').title() for m in risk_metrics])
           ax5.set_title('🛡️ Risk Assessment Heatmap (Top 20 Cases)', fontweight="bold")
           ax5.set_xlabel('Test Cases')
      
       ax6 = fig.add_subplot(gs[2, :2])
       if len(successful_results) > 1:
           performance_trend = [r.overall_score for r in successful_results]
           ax6.plot(range(len(performance_trend)), performance_trend, 'b-', alpha=0.7)
           ax6.fill_between(range(len(performance_trend)), performance_trend, alpha=0.3)
           z = np.polyfit(range(len(performance_trend)), performance_trend, 1)
           p = np.poly1d(z)
           ax6.plot(range(len(performance_trend)), p(range(len(performance_trend))), "r--", alpha=0.8)
           ax6.set_title('📈 Performance Trend Analysis', fontweight="bold")
           ax6.set_xlabel('Test Sequence')
           ax6.set_ylabel('Performance Score')
      
       ax7 = fig.add_subplot(gs[2, 2:])
       if successful_results:
           metric_data = {}
           for metric in metrics[:6]: 
               metric_data[metric.replace('_', ' ').title()] = [getattr(r.metrics, metric) for r in successful_results]
          
           import pandas as pd
           df = pd.DataFrame(metric_data)
           corr_matrix = df.corr()
           sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=ax7,
                      square=True, fmt=".2f")
           ax7.set_title('🔗 Metric Correlation Matrix', fontweight="bold")
      
       ax8 = fig.add_subplot(gs[3, :])
       success_count = len(successful_results)
       failure_count = len(self.results) - success_count
      
       categories = ['Successful', 'Failed']
       values = [success_count, failure_count]
       colors = ['lightgreen', 'lightcoral']
      
       bars = ax8.bar(categories, values, color=colors, alpha=0.7)
       ax8.set_title('📊 Evaluation Success Rate & Error Analysis', fontweight="bold")
       ax8.set_ylabel('Count')
      
       for bar, value in zip(bars, values):
           ax8.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(values)*0.01,
                   f'{value}\n({value/len(self.results)*100:.1f}%)',
                   ha="center", va="bottom", fontweight="bold")
      
       plt.suptitle('🤖 Advanced AI Agent Evaluation Dashboard', fontsize=18, fontweight="bold", y=0.98)
       plt.tight_layout()
       plt.show()
      
       report = self.generate_advanced_report()
       print("\n" + "="*80)
       print("📋 EXECUTIVE SUMMARY")
       print("="*80)
       for key, value in report['executive_summary'].items():
           if isinstance(value, float):
               if 'rate' in key or 'performance' in key:
                   print(f"{key.replace('_', ' ').title()}: {value:.3%}" if value <= 1 else f"{key.replace('_', ' ').title()}: {value:.4f}")
               else:
                   print(f"{key.replace('_', ' ').title()}: {value:.4f}")
           else:
               print(f"{key.replace('_', ' ').title()}: {value}")
      
       if report['recommendations']:
           print(f"\n🎯 KEY RECOMMENDATIONS:")
           for rec in report['recommendations'][:5]:
               print(f"  {rec}")



Source link

Share

Leave a comment

Leave a Reply

Your email address will not be published. Required fields are marked *

Related Articles
Apple Researchers Introduce FastVLM: Achieving State-of-the-Art Resolution-Latency-Accuracy Trade-off in Vision Language Models
OpenAI

Apple Researchers Introduce FastVLM: Achieving State-of-the-Art Resolution-Latency-Accuracy Trade-off in Vision Language Models

Vision Language Models (VLMs) allow both text inputs and visual understanding. However,...

A Coding Guide to Build a Scalable Multi-Agent System with Google ADK
OpenAI

A Coding Guide to Build a Scalable Multi-Agent System with Google ADK

In this tutorial, we explore the advanced capabilities of Google’s Agent Development...

Too Much Thinking Can Break LLMs: Inverse Scaling in Test-Time Compute
OpenAI

Too Much Thinking Can Break LLMs: Inverse Scaling in Test-Time Compute

Recent advances in large language models (LLMs) have encouraged the idea that...

Rubrics as Rewards (RaR): A Reinforcement Learning Framework for Training Language Models with Structured, Multi-Criteria Evaluation Signals
OpenAI

Rubrics as Rewards (RaR): A Reinforcement Learning Framework for Training Language Models with Structured, Multi-Criteria Evaluation Signals

Reinforcement Learning with Verifiable Rewards (RLVR) allows LLMs to perform complex reasoning...