cp-library

This documentation is automatically generated by online-judge-tools/verification-helper

View the Project on GitHub kobejean/cp-library

:warning: cp_library/perf/examples/rank_benchmark.py

Depends on

Code

from cp_library.perf.benchmark import Benchmark, BenchmarkConfig, TestCase
from cp_library.perf.generators import create_generator
from cp_library.alg.iter.rank.irank_fn import irank
from cp_library.alg.iter.rank.irank_multi_fn import irank as irank_multi
from itertools import product
from typing import Dict, List, Any, Callable, Tuple

class RankingBenchmark(Benchmark):
    """Benchmark for comparing ranking algorithm implementations"""
    
    def generate_test_cases(self, param_grid: Dict[str, List[Any]]) -> List[TestCase]:
        """Generate test cases from parameter grid"""
        test_cases = []
        
        # Extract parameters
        param_names = list(param_grid.keys())
        param_values = [param_grid[name] for name in param_names]
        
        for values in product(*param_values):
            params = dict(zip(param_names, values))
            
            # Generate data based on distribution parameter
            size = params['size']
            distribution = params['distribution']
            
            # Use the generator factory
            generator = create_generator(distribution)
            data = generator.generate(size=size)
            
            # Create descriptive name
            name = f"size={size}, dist={distribution}, distinct={params.get('distinct', False)}"
            
            test_cases.append(TestCase(
                name=name,
                params=params,
                data={'data': data, 'distinct': params.get('distinct', False)}
            ))
        
        return test_cases
    
    def get_implementations(self) -> Dict[str, Callable]:
        """Return implementations to benchmark"""
        return {
            'irank': lambda data, distinct=False: irank(data.copy(), distinct=distinct),
            'irank_multi': lambda data, distinct=False: irank_multi(data.copy(), distinct=distinct)
        }
    
    def measure_time(self, func: Callable, test_data: Dict) -> Tuple[Any, float]:
        """Override to handle dict test data"""
        import time
        
        # Warmup
        for _ in range(self.config.warmup):
            func(**test_data)
        
        # Actual measurement
        start = time.perf_counter()
        for _ in range(self.config.iterations):
            result = func(**test_data)
        elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
        
        return result, elapsed_ms

def run_ranking_benchmark():
    """Run a comprehensive ranking algorithm benchmark"""
    
    # Configure benchmark
    config = BenchmarkConfig(
        name="ranking_algorithms",
        iterations=10,
        warmup=2,
        save_results=True,
        plot_results=False,  # Set to True if matplotlib is installed
        output_dir="./benchmark_results/ranking"
    )
    
    # Create benchmark instance
    benchmark = RankingBenchmark(config)
    
    # Define parameter grid
    param_grid = {
        'size': [100, 1000, 10000, 50000],
        'distribution': ['random', 'sorted', 'reverse', 'duplicates', 'almost_sorted', 'plateau'],
        'distinct': [True, False]
    }
    
    # Run benchmark
    benchmark.run(param_grid)
    
    # Print summary statistics
    print_summary(benchmark.results)

def print_summary(results):
    """Print summary statistics from benchmark results"""
    from collections import defaultdict
    import statistics
    
    # Group results by implementation
    impl_times = defaultdict(list)
    
    for result in results:
        if result.time_ms != float('inf'):
            impl_times[result.implementation].append(result.time_ms)
    
    print("\n" + "="*60)
    print("SUMMARY STATISTICS")
    print("="*60)
    
    print(f"\n{'Implementation':<20} {'Mean (ms)':<12} {'Std Dev':<12} {'Min':<12} {'Max':<12} {'Samples':<10}")
    print("-"*80)
    
    for impl, times in impl_times.items():
        if times:
            mean_time = statistics.mean(times)
            std_time = statistics.stdev(times) if len(times) > 1 else 0
            min_time = min(times)
            max_time = max(times)
            
            print(f"{impl:<20} {mean_time:<12.3f} {std_time:<12.3f} {min_time:<12.3f} {max_time:<12.3f} {len(times):<10}")
    
    # Calculate speedup ratios
    if len(impl_times) == 2:
        impls = list(impl_times.keys())
        impl1_mean = statistics.mean(impl_times[impls[0]])
        impl2_mean = statistics.mean(impl_times[impls[1]])
        speedup = impl1_mean / impl2_mean
        
        faster = impls[1] if speedup > 1 else impls[0]
        ratio = max(speedup, 1/speedup)
        
        print(f"\n{faster} is {ratio:.2f}x faster on average")

if __name__ == "__main__":
    run_ranking_benchmark()
"""
Declarative benchmark framework with minimal boilerplate.

Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""

import time
import json
import statistics
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict

@dataclass
class BenchmarkConfig:
    """Configuration for benchmark runs"""
    name: str
    sizes: List[int] = None
    operations: List[str] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    plot_scale: str = "loglog"  # Options: "loglog", "linear", "semilogx", "semilogy"
    
    def __post_init__(self):
        if self.sizes is None:
            self.sizes = [100, 1000, 10000, 100000]
        if self.operations is None:
            self.operations = ['default']

class Benchmark:
    """Declarative benchmark framework using decorators"""
    
    def __init__(self, config: BenchmarkConfig):
        self.config = config
        self.data_generators = {}
        self.implementations = {}
        self.validators = {}
        self.results = []
        
    def data_generator(self, name: str = "default"):
        """Decorator to register data generator"""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator
    
    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]
            
        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator
    
    def validator(self, operation: str = "default"):
        """Decorator to register custom validator"""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator
    
    def measure_time(self, func: Callable, data: Any) -> tuple[Any, float]:
        """Measure execution time with warmup"""
        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                func(data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break
        
        # Actual measurement
        start = time.perf_counter()
        for _ in range(self.config.iterations):
            result = func(data)
        elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
        
        return result, elapsed_ms
    
    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison"""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual
    
    def run(self):
        """Run all benchmarks"""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        
        for size in self.config.sizes:
            for operation in self.config.operations:
                print(f"\nOperation: {operation}, Size: {size}")
                print("-" * 50)
                
                # Generate test data
                generator = self.data_generators.get(operation, 
                                                   self.data_generators.get('default'))
                if not generator:
                    raise ValueError(f"No data generator for operation: {operation}")
                
                test_data = generator(size, operation)
                
                # Get implementations for this operation
                impls = self.implementations.get(operation, {})
                if not impls:
                    print(f"No implementations for operation: {operation}")
                    continue
                
                # Run reference implementation first
                ref_name, ref_impl = next(iter(impls.items()))
                expected_result, _ = self.measure_time(ref_impl, test_data)
                
                # Run all implementations
                for impl_name, impl_func in impls.items():
                    try:
                        result, time_ms = self.measure_time(impl_func, test_data)
                        correct = self.validate_result(expected_result, result, operation)
                        
                        # Store result
                        self.results.append({
                            'operation': operation,
                            'size': size,
                            'implementation': impl_name,
                            'time_ms': time_ms,
                            'correct': correct,
                            'error': None
                        })
                        
                        status = "OK" if correct else "FAIL"
                        print(f"  {impl_name:<20} {time_ms:>8.3f} ms  {status}")
                        
                    except Exception as e:
                        self.results.append({
                            'operation': operation,
                            'size': size,
                            'implementation': impl_name,
                            'time_ms': float('inf'),
                            'correct': False,
                            'error': str(e)
                        })
                        print(f"  {impl_name:<20} ERROR: {str(e)[:40]}")
        
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        
        if self.config.plot_results:
            self._plot_results()
        
        # Print summary
        self._print_summary()
    
    def _save_results(self):
        """Save results to JSON"""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")
    
    def _plot_results(self):
        """Generate plots using matplotlib if available"""
        try:
            import matplotlib.pyplot as plt
            
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
                
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")
    
    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting"""
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op
    
    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation"""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        
        implementations = sorted(implementations)
        
        plt.figure(figsize=(10, 6))
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size] 
                        if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")
    
    def _print_summary(self):
        """Print performance summary"""
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages
            avg_times = [(impl, statistics.mean(times)) 
                        for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                speedup = worst_time / best_time if best_time > 0 else 0
                
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")


from cp_library.perf.generators import create_generator
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
             https://kobejean.github.io/cp-library               
'''




def irank(A: list[int], distinct = False):
    P = Packer(len(A)-1); V = P.enumerate(A); V.sort()
    if distinct:
        for r, ai in enumerate(V): a, i = P.dec(ai); A[i], V[r] = r, a
    elif V:
        r, p = -1, V[-1]+1 # set p to unique value to trigger `if a != p` on first elm
        for ai in V:
            a, i = P.dec(ai)
            if a!=p: V[r:=r+1] = p = a
            A[i] = r
        del V[r+1:]
    return V



class Packer:
    def __init__(P, mx: int):
        P.s = mx.bit_length()
        P.m = (1 << P.s) - 1
    def enc(P, a: int, b: int): return a << P.s | b
    def dec(P, x: int) -> tuple[int, int]: return x >> P.s, x & P.m
    def enumerate(P, A, reverse=False): P.ienumerate(A:=A.copy(), reverse); return A
    def ienumerate(P, A, reverse=False):
        if reverse:
            for i,a in enumerate(A): A[i] = P.enc(-a, i)
        else:
            for i,a in enumerate(A): A[i] = P.enc(a, i)
    def indices(P, A: list[int]): P.iindices(A:=A.copy()); return A
    def iindices(P, A):
        for i,a in enumerate(A): A[i] = P.m&a


def max2(a, b):
    return a if a > b else b

def irank(*A: list[int], distinct = False):
    N = mxj = 0
    for Ai in A: N += len(Ai); mxj = max2(mxj, len(Ai))
    P = Packer3(len(A)-1, mxj); V = P.enumerate(A, N); V.sort()
    if distinct:
        for r,aij in enumerate(V):a,i,j=P.dec(aij);A[i][j],V[r]=r,a
    elif V:
        r, p = -1, V[-1]+1 # set p to unique value to trigger `if a != p` on first elm
        for aij in V:
            a,i,j=P.dec(aij)
            if a!=p:V[r:=r+1]=p=a
            A[i][j]=r
        del V[r+1:]
    return V

class Packer3:
    def __init__(P, mxb: int, mxc: int):
        bb, bc = mxb.bit_length(), mxc.bit_length()
        P.mc, P.mb, P.sb, P.sa = (1<<bc)-1, (1<<bb)-1, bc, bc+bb
    def enc(P, a: int, b: int, c: int): return a << P.sa | b << P.sb | c
    def dec(P, x: int) -> tuple[int, int, int]: return x >> P.sa, (x >> P.sb) & P.mb, x & P.mc
    def enumerate(P, A, N, reverse=False): 
        V, k = [0]*N, 0
        if reverse:
            for i,Ai in enumerate(A):
                for j, a in enumerate(Ai):V[k]=P.enc(-a, i, j);k+=1
        else:
            for i,Ai in enumerate(A):
                for j, a in enumerate(Ai):V[k]=P.enc(a, i, j);k+=1
        return V
from itertools import product
from typing import Dict, List, Any, Callable, Tuple

class RankingBenchmark(Benchmark):
    """Benchmark for comparing ranking algorithm implementations"""
    
    def generate_test_cases(self, param_grid: Dict[str, List[Any]]) -> List[TestCase]:
        """Generate test cases from parameter grid"""
        test_cases = []
        
        # Extract parameters
        param_names = list(param_grid.keys())
        param_values = [param_grid[name] for name in param_names]
        
        for values in product(*param_values):
            params = dict(zip(param_names, values))
            
            # Generate data based on distribution parameter
            size = params['size']
            distribution = params['distribution']
            
            # Use the generator factory
            generator = create_generator(distribution)
            data = generator.generate(size=size)
            
            # Create descriptive name
            name = f"size={size}, dist={distribution}, distinct={params.get('distinct', False)}"
            
            test_cases.append(TestCase(
                name=name,
                params=params,
                data={'data': data, 'distinct': params.get('distinct', False)}
            ))
        
        return test_cases
    
    def get_implementations(self) -> Dict[str, Callable]:
        """Return implementations to benchmark"""
        return {
            'irank': lambda data, distinct=False: irank(data.copy(), distinct=distinct),
            'irank_multi': lambda data, distinct=False: irank_multi(data.copy(), distinct=distinct)
        }
    
    def measure_time(self, func: Callable, test_data: Dict) -> Tuple[Any, float]:
        """Override to handle dict test data"""
        
        # Warmup
        for _ in range(self.config.warmup):
            func(**test_data)
        
        # Actual measurement
        start = time.perf_counter()
        for _ in range(self.config.iterations):
            result = func(**test_data)
        elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
        
        return result, elapsed_ms

def run_ranking_benchmark():
    """Run a comprehensive ranking algorithm benchmark"""
    
    # Configure benchmark
    config = BenchmarkConfig(
        name="ranking_algorithms",
        iterations=10,
        warmup=2,
        save_results=True,
        plot_results=False,  # Set to True if matplotlib is installed
        output_dir="./benchmark_results/ranking"
    )
    
    # Create benchmark instance
    benchmark = RankingBenchmark(config)
    
    # Define parameter grid
    param_grid = {
        'size': [100, 1000, 10000, 50000],
        'distribution': ['random', 'sorted', 'reverse', 'duplicates', 'almost_sorted', 'plateau'],
        'distinct': [True, False]
    }
    
    # Run benchmark
    benchmark.run(param_grid)
    
    # Print summary statistics
    print_summary(benchmark.results)

def print_summary(results):
    """Print summary statistics from benchmark results"""
    
    # Group results by implementation
    impl_times = defaultdict(list)
    
    for result in results:
        if result.time_ms != float('inf'):
            impl_times[result.implementation].append(result.time_ms)
    
    print("\n" + "="*60)
    print("SUMMARY STATISTICS")
    print("="*60)
    
    print(f"\n{'Implementation':<20} {'Mean (ms)':<12} {'Std Dev':<12} {'Min':<12} {'Max':<12} {'Samples':<10}")
    print("-"*80)
    
    for impl, times in impl_times.items():
        if times:
            mean_time = statistics.mean(times)
            std_time = statistics.stdev(times) if len(times) > 1 else 0
            min_time = min(times)
            max_time = max(times)
            
            print(f"{impl:<20} {mean_time:<12.3f} {std_time:<12.3f} {min_time:<12.3f} {max_time:<12.3f} {len(times):<10}")
    
    # Calculate speedup ratios
    if len(impl_times) == 2:
        impls = list(impl_times.keys())
        impl1_mean = statistics.mean(impl_times[impls[0]])
        impl2_mean = statistics.mean(impl_times[impls[1]])
        speedup = impl1_mean / impl2_mean
        
        faster = impls[1] if speedup > 1 else impls[0]
        ratio = max(speedup, 1/speedup)
        
        print(f"\n{faster} is {ratio:.2f}x faster on average")

if __name__ == "__main__":
    run_ranking_benchmark()
Back to top page