cp-library

This documentation is automatically generated by online-judge-tools/verification-helper

View the Project on GitHub kobejean/cp-library

:warning: perf/list_view.py

Depends on

Code

#!/usr/bin/env python3
"""
Benchmark comparing operations on list slices vs view objects.
Tests various operations to measure the overhead of slice copying vs view indirection.
"""

import random
import sys
import os

from cp_library.ds.heap.fast_heapq import heapify, heappop, heapreplace, heappush, heappushpop
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.ds.view.view_cls import view

# Configure benchmark
config = BenchmarkConfig(
    name="list_view",
    sizes=[1000000, 100000, 10000, 1000, 100],  # Reverse order to warm up JIT
    operations=['construction', 'sum', 'modify', 'index', 'reverse', 'sort', 'nested_sum', 'pop', 'append'],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/list_view"
)

# Create benchmark instance
benchmark = Benchmark(config)

# Data generator
@benchmark.data_generator("default")
def generate_view_data(size: int, operation: str):
    """Generate test data for slice/view operations"""
    # Generate random list
    data = [random.randint(0, 1000000) for _ in range(size)]
    
    # Generate random slice boundaries (10-50% of list)
    slice_size = random.randint(size // 10, size // 2)
    start = random.randint(0, size - slice_size)
    end = start + slice_size
    
    return {
        'data': data,
        'start': start,
        'end': end,
        'slice_size': slice_size,
        'search_value': random.randint(0, 1000000),
        'size': size,
        'operation': operation
    }

# Construction operation
@benchmark.implementation("slice", "construction")
def construction_slice(data):
    """Create a slice copy of the list"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice (copies data)
    slice_copy = lst[start:end]
    
    # Return checksum to ensure it's not optimized away
    checksum = 0
    for x in slice_copy:
        checksum ^= x
    return checksum

@benchmark.implementation("view", "construction")
def construction_view(data):
    """Create a view of the list"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view (no copy)
    list_view = view(lst, start, end)
    
    # Return checksum to ensure it's not optimized away
    checksum = 0
    for i in range(len(list_view)):
        checksum ^= list_view[i]
    return checksum

# Sum operation
@benchmark.implementation("slice", "sum")
def sum_slice(data):
    """Sum elements in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and sum
    slice_copy = lst[start:end]
    return sum(slice_copy) & 0xFFFFFFFF  # Keep as 32-bit for checksum

@benchmark.implementation("view", "sum")
def sum_view(data):
    """Sum elements in a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and sum through it
    list_view = view(lst, start, end)
    total = 0
    for i in range(len(list_view)):
        total += list_view[i]
    return total & 0xFFFFFFFF  # Keep as 32-bit for checksum

@benchmark.implementation("view_direct", "sum")
def sum_view_direct(data):
    """Sum elements using direct indexing"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Sum directly without creating slice or view
    total = 0
    for i in range(start, end):
        total += lst[i]
    return total & 0xFFFFFFFF  # Keep as 32-bit for checksum

# Setup functions for operations that need copying
@benchmark.setup("slice", ["modify", "reverse", "sort", "pop", "append"])
def setup_slice_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

@benchmark.setup("view", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

@benchmark.setup("view_direct", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_direct_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

# Modify operation
@benchmark.implementation("slice", "modify")
def modify_slice(data):
    """Modify elements in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and modify
    slice_copy = lst[start:end]
    for i in range(len(slice_copy)):
        slice_copy[i] = (slice_copy[i] * 2) & 0xFFFFFFFF
    
    # Copy back to original positions manually
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "modify")
def modify_view(data):
    """Modify elements through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and modify through it
    list_view = view(lst, start, end)
    for i in range(len(list_view)):
        list_view[i] = (list_view[i] * 2) & 0xFFFFFFFF
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view_direct", "modify")
def modify_view_direct(data):
    """Modify elements using direct indexing"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Modify directly
    for i in range(start, end):
        lst[i] = (lst[i] * 2) & 0xFFFFFFFF
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

# Index operation
@benchmark.implementation("slice", "index")
def index_slice(data):
    """Find index of element in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Create slice and search
    slice_copy = lst[start:end]
    try:
        idx = slice_copy.index(search_value)
        return start + idx  # Return global index
    except ValueError:
        return -1

@benchmark.implementation("view", "index")
def index_view(data):
    """Find index of element in a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Create view and search in it
    list_view = view(lst, start, end)
    try:
        idx = list_view.index(search_value)
        return start + idx  # Return global index
    except ValueError:
        return -1

@benchmark.implementation("view_direct", "index")
def index_view_direct(data):
    """Find index of element using direct search"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Search directly
    for i in range(start, end):
        if lst[i] == search_value:
            return i
    return -1

# Reverse operation
@benchmark.implementation("slice", "reverse")
def reverse_slice(data):
    """Reverse a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice, reverse, and copy back manually
    slice_copy = lst[start:end]
    slice_copy.reverse()
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "reverse")
def reverse_view(data):
    """Reverse through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and reverse through it
    list_view = view(lst, start, end)
    list_view.reverse()
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view_direct", "reverse")
def reverse_view_direct(data):
    """Reverse using direct manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Reverse directly
    left, right = start, end - 1
    while left < right:
        lst[left], lst[right] = lst[right], lst[left]
        left += 1
        right -= 1
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

# Sort operation
@benchmark.implementation("slice", "sort")
def sort_slice(data):
    """Sort a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice, sort, and copy back manually
    slice_copy = lst[start:end]
    slice_copy.sort()
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum of first elements
    checksum = 0
    for i in range(start, min(start + 100, end)):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "sort")
def sort_view(data):
    """Sort through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and sort through it
    list_view = view(lst, start, end)
    list_view.sort()
    
    # Return checksum of first elements
    checksum = 0
    for i in range(start, min(start + 100, end)):
        checksum ^= lst[i]
    return checksum

# Nested sum operation (multiple slices)
@benchmark.implementation("slice", "nested_sum")
def nested_sum_slice(data):
    """Sum multiple overlapping slices"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Create 4 overlapping slices
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        slice_copy = lst[s:e]
        total += sum(slice_copy)
    
    return total & 0xFFFFFFFF

@benchmark.implementation("view", "nested_sum")
def nested_sum_view(data):
    """Sum multiple overlapping views"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Create 4 overlapping views
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        list_view = view(lst, s, e)
        for i in range(len(list_view)):
            total += list_view[i]
    
    return total & 0xFFFFFFFF

@benchmark.implementation("view_direct", "nested_sum")
def nested_sum_view_direct(data):
    """Sum multiple overlapping ranges directly"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Sum 4 overlapping ranges
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        for i in range(s, e):
            total += lst[i]
    
    return total & 0xFFFFFFFF

# Pop operation
@benchmark.implementation("slice", "pop")
def pop_slice(data):
    """Pop from end of slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and pop from it
    slice_copy = lst[start:end]
    checksum = 0
    for _ in range(min(100, len(slice_copy))):  # Pop up to 100 elements
        if slice_copy:
            val = slice_copy.pop()
            checksum ^= val
    
    # Copy back to original (shortened)
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    return checksum

@benchmark.implementation("view", "pop")
def pop_view(data):
    """Pop from end of view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and pop from it
    list_view = view(lst, start, end)
    checksum = 0
    for _ in range(min(100, len(list_view))):  # Pop up to 100 elements
        if len(list_view) > 0:
            val = list_view.pop()
            checksum ^= val
    
    return checksum

@benchmark.implementation("view_direct", "pop")
def pop_view_direct(data):
    """Pop using direct list manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    checksum = 0
    current_end = end
    for _ in range(min(100, end - start)):  # Pop up to 100 elements
        if current_end > start:
            current_end -= 1
            val = lst[current_end]
            checksum ^= val
    
    return checksum

# Append operation
@benchmark.implementation("slice", "append")
def append_slice(data):
    """Append to slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and append to it
    slice_copy = lst[start:end]
    checksum = 0
    for i in range(100):  # Append 100 elements
        val = (i * 17) & 0xFFFFFFFF
        slice_copy.append(val)
        checksum ^= val
    
    # Note: Can't copy back easily since slice grew, so just return checksum
    return checksum

@benchmark.implementation("view", "append")
def append_view(data):
    """Append to view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and append to it
    list_view = view(lst, start, end)
    checksum = 0
    for i in range(min(100, len(lst) - end)):  # Append up to 100 elements or space available
        val = (i * 17) & 0xFFFFFFFF
        list_view.append(val)
        checksum ^= val
    
    return checksum

@benchmark.implementation("view_direct", "append")
def append_view_direct(data):
    """Append using direct list manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    checksum = 0
    current_end = end
    for i in range(min(100, len(lst) - end)):  # Append up to 100 elements or space available
        val = (i * 17) & 0xFFFFFFFF
        if current_end < len(lst):
            lst[current_end] = val
            current_end += 1
            checksum ^= val
    
    return checksum

if __name__ == "__main__":
    # Parse command line args and run appropriate mode
    runner = benchmark.parse_args()
    runner.run()
#!/usr/bin/env python3
"""
Benchmark comparing operations on list slices vs view objects.
Tests various operations to measure the overhead of slice copying vs view indirection.
"""

import random
import sys
import os

'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
             https://kobejean.github.io/cp-library               
'''



def heappush(heap: list, item):
    heap.append(item)
    heapsiftdown(heap, 0, len(heap)-1)

def heappop(heap: list):
    item = heap.pop()
    if heap: item, heap[0] = heap[0], item; heapsiftup(heap, 0)
    return item

def heapreplace(heap: list, item):
    item, heap[0] = heap[0], item; heapsiftup(heap, 0)
    return item

def heappushpop(heap: list, item):
    if heap and heap[0] < item: item, heap[0] = heap[0], item; heapsiftup(heap, 0)
    return item

def heapify(x: list):
    for i in reversed(range(len(x)//2)): heapsiftup(x, i)

def heapsiftdown(heap: list, root: int, pos: int):
    item = heap[pos]
    while root < pos and item < heap[p := (pos-1)>>1]: heap[pos], pos = heap[p], p
    heap[pos] = item

def heapsiftup(heap: list, pos: int):
    n, item, c = len(heap)-1, heap[pos], pos<<1|1
    while c < n and heap[c := c+(heap[c+1]<heap[c])] < item: heap[pos], pos, c = heap[c], c, c<<1|1
    if c == n and heap[c] < item: heap[pos], pos = heap[c], c
    heap[pos] = item

def heappop_max(heap: list):
    item = heap.pop()
    if heap: item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
    return item

def heapreplace_max(heap: list, item):
    item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
    return item

def heapify_max(x: list):
    for i in reversed(range(len(x)//2)): heapsiftup_max(x, i)

def heappush_max(heap: list, item):
    heap.append(item); heapsiftdown_max(heap, 0, len(heap)-1)

def heapreplace_max(heap: list, item):
    item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
    return item

def heappushpop_max(heap: list, item):
    if heap and heap[0] > item: item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
    return item

def heapsiftdown_max(heap: list, root: int, pos: int):
    item = heap[pos]
    while root < pos and heap[p := (pos-1)>>1] < item: heap[pos], pos = heap[p], p
    heap[pos] = item

def heapsiftup_max(heap: list, pos: int):
    n, item, c = len(heap)-1, heap[pos], pos<<1|1
    while c < n and item < heap[c := c+(heap[c]<heap[c+1])]: heap[pos], pos, c = heap[c], c, c<<1|1
    if c == n and item < heap[c]: heap[pos], pos = heap[c], c
    heap[pos] = item
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

"""
Declarative benchmark framework with minimal boilerplate.

Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""

import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict

@dataclass
class BenchmarkConfig:
    """Configuration for benchmark runs"""
    name: str
    sizes: List[int] = None
    operations: List[str] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    plot_scale: str = "loglog"  # Options: "loglog", "linear", "semilogx", "semilogy"
    progressive: bool = True  # Show results operation by operation across sizes
    # Profiling mode
    profile_mode: bool = False
    profile_size: int = None
    profile_operation: str = None
    profile_implementation: str = None
    
    def __post_init__(self):
        if self.sizes is None:
            self.sizes = [100, 1000, 10000, 100000]
        if self.operations is None:
            self.operations = ['default']

class Benchmark:
    """Declarative benchmark framework using decorators"""
    
    def __init__(self, config: BenchmarkConfig):
        self.config = config
        self.data_generators = {}
        self.implementations = {}
        self.validators = {}
        self.setups = {}
        self.results = []
    
    def profile(self, operation: str = None, size: int = None, implementation: str = None):
        """Create a profiling version of this benchmark"""
        profile_config = BenchmarkConfig(
            name=f"{self.config.name}_profile",
            sizes=self.config.sizes,
            operations=self.config.operations,
            profile_mode=True,
            profile_operation=operation,
            profile_size=size,
            profile_implementation=implementation,
            save_results=False,
            plot_results=False
        )
        
        profile_benchmark = Benchmark(profile_config)
        profile_benchmark.data_generators = self.data_generators
        profile_benchmark.implementations = self.implementations
        profile_benchmark.validators = self.validators
        profile_benchmark.setups = self.setups
        
        return profile_benchmark
    
    def parse_args(self):
        """Parse command line arguments for profiling mode"""
        parser = argparse.ArgumentParser(
            description=f"Benchmark {self.config.name} with optional profiling mode",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
  # Normal benchmark mode
  python benchmark.py
  
  # Profile specific operation and implementation
  python benchmark.py --profile --operation random_access --implementation grid
  
  # Profile with specific size
  python benchmark.py --profile --size 1000000
  
  # Profile all implementations of an operation
  python benchmark.py --profile --operation construction
"""
        )
        
        parser.add_argument('--profile', action='store_true',
                          help='Run in profiling mode (minimal overhead for profilers)')
        parser.add_argument('--operation', type=str, 
                          help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
        parser.add_argument('--size', type=int,
                          help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
        parser.add_argument('--implementation', type=str,
                          help='Specific implementation to profile (default: all)')
        
        args = parser.parse_args()
        
        # If profile mode requested, return a profiling benchmark
        if args.profile:
            return self.profile(
                operation=args.operation,
                size=args.size,
                implementation=args.implementation
            )
        
        # Otherwise return self for normal mode
        return self
        
    def data_generator(self, name: str = "default"):
        """Decorator to register data generator"""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator
    
    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]
            
        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator
    
    def validator(self, operation: str = "default"):
        """Decorator to register custom validator"""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator
    
    def setup(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register setup function that runs before timing"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]
            
        def decorator(func):
            for op in operations:
                if op not in self.setups:
                    self.setups[op] = {}
                self.setups[op][name] = func
            return func
        return decorator
    
    def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
        """Measure execution time with warmup and optional setup"""
        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                if setup_func:
                    setup_data = setup_func(data)
                    func(setup_data)
                else:
                    func(data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break
        
        # Actual measurement
        start = time.perf_counter()
        for _ in range(self.config.iterations):
            if setup_func:
                setup_data = setup_func(data)
                result = func(setup_data)
            else:
                result = func(data)
        elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
        
        return result, elapsed_ms
    
    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison"""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual
    
    def run(self):
        """Run all benchmarks"""
        if self.config.profile_mode:
            self._run_profile_mode()
        else:
            self._run_normal_mode()
    
    def _run_normal_mode(self):
        """Run normal benchmark mode"""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        
        # Always show progressive results: operation by operation across all sizes
        for operation in self.config.operations:
            for size in self.config.sizes:
                self._run_single(operation, size)
        
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        
        if self.config.plot_results:
            self._plot_results()
        
        # Print summary
        self._print_summary()
    
    def _run_profile_mode(self):
        """Run profiling mode with minimal overhead for use with vmprof"""
        operation = self.config.profile_operation or self.config.operations[0]
        size = self.config.profile_size or max(self.config.sizes)
        impl_name = self.config.profile_implementation
        
        print(f"PROFILING MODE: {self.config.name}")
        print(f"Operation: {operation}, Size: {size}")
        if impl_name:
            print(f"Implementation: {impl_name}")
        print("="*80)
        print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
        print("="*80)
        
        # Generate test data
        generator = self.data_generators.get(operation, self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        
        test_data = generator(size, operation)
        
        # Get implementations
        impls = self.implementations.get(operation, {})
        if not impls:
            raise ValueError(f"No implementations for operation: {operation}")
        
        # Filter to specific implementation if requested
        if impl_name:
            if impl_name not in impls:
                raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
            impls = {impl_name: impls[impl_name]}
        
        # Run with minimal overhead - no timing, no validation
        for name, func in impls.items():
            print(f"\nRunning {name}...")
            sys.stdout.flush()
            
            # Setup if needed
            setup_func = self.setups.get(operation, {}).get(name)
            if setup_func:
                data = setup_func(test_data)
            else:
                data = test_data
            
            # Run the actual function (this is what vmprof will profile)
            result = func(data)
            print(f"Completed {name}, result checksum: {result}")
            sys.stdout.flush()
    
    def _run_single(self, operation: str, size: int):
        """Run a single operation/size combination"""
        print(f"\nOperation: {operation}, Size: {size}")
        print("-" * 50)
        sys.stdout.flush()
        
        # Generate test data
        generator = self.data_generators.get(operation, 
                                           self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        
        test_data = generator(size, operation)
        
        # Get implementations for this operation
        impls = self.implementations.get(operation, {})
        if not impls:
            print(f"No implementations for operation: {operation}")
            return
        
        # Get setup functions for this operation
        setups = self.setups.get(operation, {})
        
        # Run reference implementation first
        ref_name, ref_impl = next(iter(impls.items()))
        ref_setup = setups.get(ref_name)
        expected_result, _ = self.measure_time(ref_impl, test_data, ref_setup)
        
        # Run all implementations
        for impl_name, impl_func in impls.items():
            try:
                setup_func = setups.get(impl_name)
                result, time_ms = self.measure_time(impl_func, test_data, setup_func)
                correct = self.validate_result(expected_result, result, operation)
                
                # Store result
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': time_ms,
                    'correct': correct,
                    'error': None
                })
                
                status = "OK" if correct else "FAIL"
                print(f"  {impl_name:<20} {time_ms:>8.3f} ms  {status}")
                sys.stdout.flush()
                
            except Exception as e:
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': float('inf'),
                    'correct': False,
                    'error': str(e)
                })
                print(f"  {impl_name:<20} ERROR: {str(e)[:40]}")
                sys.stdout.flush()
    
    def _save_results(self):
        """Save results to JSON"""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")
    
    def _plot_results(self):
        """Generate plots using matplotlib if available"""
        try:
            import matplotlib.pyplot as plt
            
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
                
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")
    
    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting"""
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op
    
    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation"""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        
        implementations = sorted(implementations)
        
        plt.figure(figsize=(10, 6))
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size] 
                        if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")
    
    def _print_summary(self):
        """Print performance summary"""
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages
            avg_times = [(impl, statistics.mean(times)) 
                        for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                speedup = worst_time / best_time if best_time > 0 else 0
                
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")



from typing import Generic
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')


def list_find(lst: list, value, start = 0, stop = sys.maxsize):
    try:
        return lst.index(value, start, stop)
    except:
        return -1


class view(Generic[_T]):
    __slots__ = 'A', 'l', 'r'
    def __init__(V, A: list[_T], l: int, r: int): V.A, V.l, V.r = A, l, r
    def __len__(V): return V.r - V.l
    def __getitem__(V, i: int): 
        if 0 <= i < V.r - V.l: return V.A[V.l+i]
        else: raise IndexError
    def __setitem__(V, i: int, v: _T): V.A[V.l+i] = v
    def __contains__(V, v: _T): return list_find(V.A, v, V.l, V.r) != -1
    def set_range(V, l: int, r: int): V.l, V.r = l, r
    def index(V, v: _T): return V.A.index(v, V.l, V.r) - V.l
    def reverse(V):
        l, r = V.l, V.r-1
        while l < r: V.A[l], V.A[r] = V.A[r], V.A[l]; l += 1; r -= 1
    def sort(V, /, *args, **kwargs):
        A = V.A[V.l:V.r]; A.sort(*args, **kwargs)
        for i,a in enumerate(A,V.l): V.A[i] = a
    def pop(V): V.r -= 1; return V.A[V.r]
    def append(V, v: _T): V.A[V.r] = v; V.r += 1
    def popleft(V): V.l += 1; return V.A[V.l-1]
    def appendleft(V, v: _T): V.l -= 1; V.A[V.l] = v; 
    def validate(V): return 0 <= V.l <= V.r <= len(V.A)

# Configure benchmark
config = BenchmarkConfig(
    name="list_view",
    sizes=[1000000, 100000, 10000, 1000, 100],  # Reverse order to warm up JIT
    operations=['construction', 'sum', 'modify', 'index', 'reverse', 'sort', 'nested_sum', 'pop', 'append'],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/list_view"
)

# Create benchmark instance
benchmark = Benchmark(config)

# Data generator
@benchmark.data_generator("default")
def generate_view_data(size: int, operation: str):
    """Generate test data for slice/view operations"""
    # Generate random list
    data = [random.randint(0, 1000000) for _ in range(size)]
    
    # Generate random slice boundaries (10-50% of list)
    slice_size = random.randint(size // 10, size // 2)
    start = random.randint(0, size - slice_size)
    end = start + slice_size
    
    return {
        'data': data,
        'start': start,
        'end': end,
        'slice_size': slice_size,
        'search_value': random.randint(0, 1000000),
        'size': size,
        'operation': operation
    }

# Construction operation
@benchmark.implementation("slice", "construction")
def construction_slice(data):
    """Create a slice copy of the list"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice (copies data)
    slice_copy = lst[start:end]
    
    # Return checksum to ensure it's not optimized away
    checksum = 0
    for x in slice_copy:
        checksum ^= x
    return checksum

@benchmark.implementation("view", "construction")
def construction_view(data):
    """Create a view of the list"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view (no copy)
    list_view = view(lst, start, end)
    
    # Return checksum to ensure it's not optimized away
    checksum = 0
    for i in range(len(list_view)):
        checksum ^= list_view[i]
    return checksum

# Sum operation
@benchmark.implementation("slice", "sum")
def sum_slice(data):
    """Sum elements in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and sum
    slice_copy = lst[start:end]
    return sum(slice_copy) & 0xFFFFFFFF  # Keep as 32-bit for checksum

@benchmark.implementation("view", "sum")
def sum_view(data):
    """Sum elements in a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and sum through it
    list_view = view(lst, start, end)
    total = 0
    for i in range(len(list_view)):
        total += list_view[i]
    return total & 0xFFFFFFFF  # Keep as 32-bit for checksum

@benchmark.implementation("view_direct", "sum")
def sum_view_direct(data):
    """Sum elements using direct indexing"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Sum directly without creating slice or view
    total = 0
    for i in range(start, end):
        total += lst[i]
    return total & 0xFFFFFFFF  # Keep as 32-bit for checksum

# Setup functions for operations that need copying
@benchmark.setup("slice", ["modify", "reverse", "sort", "pop", "append"])
def setup_slice_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

@benchmark.setup("view", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

@benchmark.setup("view_direct", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_direct_modify(data):
    """Setup function that copies data before modification"""
    new_data = data.copy()
    new_data['data'] = data['data'].copy()
    return new_data

# Modify operation
@benchmark.implementation("slice", "modify")
def modify_slice(data):
    """Modify elements in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and modify
    slice_copy = lst[start:end]
    for i in range(len(slice_copy)):
        slice_copy[i] = (slice_copy[i] * 2) & 0xFFFFFFFF
    
    # Copy back to original positions manually
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "modify")
def modify_view(data):
    """Modify elements through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and modify through it
    list_view = view(lst, start, end)
    for i in range(len(list_view)):
        list_view[i] = (list_view[i] * 2) & 0xFFFFFFFF
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view_direct", "modify")
def modify_view_direct(data):
    """Modify elements using direct indexing"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Modify directly
    for i in range(start, end):
        lst[i] = (lst[i] * 2) & 0xFFFFFFFF
    
    # Return checksum
    checksum = 0
    for i in range(start, end):
        checksum ^= lst[i]
    return checksum

# Index operation
@benchmark.implementation("slice", "index")
def index_slice(data):
    """Find index of element in a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Create slice and search
    slice_copy = lst[start:end]
    try:
        idx = slice_copy.index(search_value)
        return start + idx  # Return global index
    except ValueError:
        return -1

@benchmark.implementation("view", "index")
def index_view(data):
    """Find index of element in a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Create view and search in it
    list_view = view(lst, start, end)
    try:
        idx = list_view.index(search_value)
        return start + idx  # Return global index
    except ValueError:
        return -1

@benchmark.implementation("view_direct", "index")
def index_view_direct(data):
    """Find index of element using direct search"""
    lst = data['data']
    start, end = data['start'], data['end']
    search_value = data['search_value']
    
    # Search directly
    for i in range(start, end):
        if lst[i] == search_value:
            return i
    return -1

# Reverse operation
@benchmark.implementation("slice", "reverse")
def reverse_slice(data):
    """Reverse a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice, reverse, and copy back manually
    slice_copy = lst[start:end]
    slice_copy.reverse()
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "reverse")
def reverse_view(data):
    """Reverse through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and reverse through it
    list_view = view(lst, start, end)
    list_view.reverse()
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view_direct", "reverse")
def reverse_view_direct(data):
    """Reverse using direct manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Reverse directly
    left, right = start, end - 1
    while left < right:
        lst[left], lst[right] = lst[right], lst[left]
        left += 1
        right -= 1
    
    # Return checksum
    checksum = 0
    for i in range(start, min(start + 100, end)):  # First 100 elements
        checksum ^= lst[i]
    return checksum

# Sort operation
@benchmark.implementation("slice", "sort")
def sort_slice(data):
    """Sort a slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice, sort, and copy back manually
    slice_copy = lst[start:end]
    slice_copy.sort()
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    # Return checksum of first elements
    checksum = 0
    for i in range(start, min(start + 100, end)):
        checksum ^= lst[i]
    return checksum

@benchmark.implementation("view", "sort")
def sort_view(data):
    """Sort through a view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and sort through it
    list_view = view(lst, start, end)
    list_view.sort()
    
    # Return checksum of first elements
    checksum = 0
    for i in range(start, min(start + 100, end)):
        checksum ^= lst[i]
    return checksum

# Nested sum operation (multiple slices)
@benchmark.implementation("slice", "nested_sum")
def nested_sum_slice(data):
    """Sum multiple overlapping slices"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Create 4 overlapping slices
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        slice_copy = lst[s:e]
        total += sum(slice_copy)
    
    return total & 0xFFFFFFFF

@benchmark.implementation("view", "nested_sum")
def nested_sum_view(data):
    """Sum multiple overlapping views"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Create 4 overlapping views
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        list_view = view(lst, s, e)
        for i in range(len(list_view)):
            total += list_view[i]
    
    return total & 0xFFFFFFFF

@benchmark.implementation("view_direct", "nested_sum")
def nested_sum_view_direct(data):
    """Sum multiple overlapping ranges directly"""
    lst = data['data']
    start, end = data['start'], data['end']
    slice_size = (end - start) // 4
    
    total = 0
    # Sum 4 overlapping ranges
    for offset in range(4):
        s = start + offset * slice_size // 2
        e = min(s + slice_size, end)
        for i in range(s, e):
            total += lst[i]
    
    return total & 0xFFFFFFFF

# Pop operation
@benchmark.implementation("slice", "pop")
def pop_slice(data):
    """Pop from end of slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and pop from it
    slice_copy = lst[start:end]
    checksum = 0
    for _ in range(min(100, len(slice_copy))):  # Pop up to 100 elements
        if slice_copy:
            val = slice_copy.pop()
            checksum ^= val
    
    # Copy back to original (shortened)
    for i, val in enumerate(slice_copy):
        lst[start + i] = val
    
    return checksum

@benchmark.implementation("view", "pop")
def pop_view(data):
    """Pop from end of view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and pop from it
    list_view = view(lst, start, end)
    checksum = 0
    for _ in range(min(100, len(list_view))):  # Pop up to 100 elements
        if len(list_view) > 0:
            val = list_view.pop()
            checksum ^= val
    
    return checksum

@benchmark.implementation("view_direct", "pop")
def pop_view_direct(data):
    """Pop using direct list manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    checksum = 0
    current_end = end
    for _ in range(min(100, end - start)):  # Pop up to 100 elements
        if current_end > start:
            current_end -= 1
            val = lst[current_end]
            checksum ^= val
    
    return checksum

# Append operation
@benchmark.implementation("slice", "append")
def append_slice(data):
    """Append to slice"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create slice and append to it
    slice_copy = lst[start:end]
    checksum = 0
    for i in range(100):  # Append 100 elements
        val = (i * 17) & 0xFFFFFFFF
        slice_copy.append(val)
        checksum ^= val
    
    # Note: Can't copy back easily since slice grew, so just return checksum
    return checksum

@benchmark.implementation("view", "append")
def append_view(data):
    """Append to view"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    # Create view and append to it
    list_view = view(lst, start, end)
    checksum = 0
    for i in range(min(100, len(lst) - end)):  # Append up to 100 elements or space available
        val = (i * 17) & 0xFFFFFFFF
        list_view.append(val)
        checksum ^= val
    
    return checksum

@benchmark.implementation("view_direct", "append")
def append_view_direct(data):
    """Append using direct list manipulation"""
    lst = data['data']
    start, end = data['start'], data['end']
    
    checksum = 0
    current_end = end
    for i in range(min(100, len(lst) - end)):  # Append up to 100 elements or space available
        val = (i * 17) & 0xFFFFFFFF
        if current_end < len(lst):
            lst[current_end] = val
            current_end += 1
            checksum ^= val
    
    return checksum

if __name__ == "__main__":
    # Parse command line args and run appropriate mode
    runner = benchmark.parse_args()
    runner.run()
Back to top page