This documentation is automatically generated by online-judge-tools/verification-helper
#!/usr/bin/env python3
"""
Benchmark comparing heap operations on list slices vs view objects vs direct indexing.
Tests heapify, heappop, heapreplace, heappush, heappushpop operations.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.ds.view.view_cls import view
from cp_library.ds.heap.fast_heapq import heapify, heappop, heapreplace, heappush, heappushpop
import heapq # Standard library for comparison
# Configure benchmark: five heap operations, each measured at five input sizes.
config = BenchmarkConfig(
    name="heap_view",
    sizes=[100000, 10000, 1000, 100, 50], # Reverse order to warm up JIT
    operations=['heapify', 'heappop', 'heapreplace', 'heappush', 'heappushpop'],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/heap_view"
)
# Create benchmark instance; the decorators below register against it.
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_heap_data(size: int, operation: str):
    """Generate test data for heap operations.

    Builds a list of ``size`` random integers and picks a sub-range
    ``[start, end)`` whose length lies between ``size // 3`` and
    ``size * 2 // 3`` (capped at ``size - 1``).

    Args:
        size: Length of the generated list.
        operation: Name of the operation the data is for; stored unchanged.

    Returns:
        A dict with keys 'data', 'start', 'end', 'slice_size', 'new_value',
        'replace_value', 'size' and 'operation'.
    """
    # Generate random list for heap operations
    data = [random.randint(1, 1000000) for _ in range(size)]
    # Random slice length: roughly one third to two thirds of the list
    slice_size = random.randint(size // 3, min(size * 2 // 3, size - 1))
    # The view-based heappush benchmark appends in place into lst[end:], so it
    # can push at most len(lst) - end items, while the other implementations
    # always push 5. Keep that much room after the range whenever the list is
    # long enough; otherwise the checksums disagree for ranges ending near the
    # tail of the list.
    push_headroom = 5
    start = random.randint(0, max(0, size - slice_size - push_headroom))
    end = start + slice_size
    return {
        'data': data,
        'start': start,
        'end': end,
        'slice_size': slice_size,
        'new_value': random.randint(1, 1000000),
        'replace_value': random.randint(1, 1000000),
        'size': size,
        'operation': operation
    }
# Setup functions for operations that modify data
@benchmark.setup("slice", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_slice_heap(data):
    """Return a copy of the test case whose [start, end) range is already a heap."""
    prepared = dict(data)
    values = list(data['data'])
    prepared['data'] = values
    lo, hi = prepared['start'], prepared['end']
    # Heapify a detached copy of the range, then mirror it back into the list.
    heap_part = values[lo:hi]
    heapify(heap_part)
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return prepared
@benchmark.setup("view", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_view_heap(data):
    """Return a copy of the test case whose [start, end) range is already a heap."""
    prepared = dict(data)
    values = list(data['data'])
    prepared['data'] = values
    # Heapify in place through a view over the range; no write-back needed.
    window = view(values, prepared['start'], prepared['end'])
    heapify(window)
    return prepared
@benchmark.setup("direct", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_direct_heap(data):
    """Return a copy of the test case whose [start, end) range is already a heap."""
    prepared = dict(data)
    values = list(data['data'])
    prepared['data'] = values
    lo, hi = prepared['start'], prepared['end']
    # Heapify a temporary copy of the range and write it back slot by slot.
    scratch = values[lo:hi]
    heapify(scratch)
    for idx, item in enumerate(scratch, lo):
        values[idx] = item
    return prepared
@benchmark.setup("stdlib", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_stdlib_heap(data):
    """Return a copy of the test case whose [start, end) range was heapified by heapq."""
    prepared = dict(data)
    values = list(data['data'])
    prepared['data'] = values
    lo, hi = prepared['start'], prepared['end']
    # Heapify a detached copy with the standard library, then mirror it back.
    heap_part = values[lo:hi]
    heapq.heapify(heap_part)
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return prepared
# For heapify operation, we need setup without pre-heapifying
@benchmark.setup("slice", ["heapify"])
def setup_slice_heapify(data):
    """Copy the test case (dict and list) without heapifying anything."""
    # 'data' already exists as a key, so the key order of the copy is unchanged.
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("view", ["heapify"])
def setup_view_heapify(data):
    """Copy the test case (dict and list) without heapifying anything."""
    # 'data' already exists as a key, so the key order of the copy is unchanged.
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("direct", ["heapify"])
def setup_direct_heapify(data):
    """Copy the test case (dict and list) without heapifying anything."""
    # 'data' already exists as a key, so the key order of the copy is unchanged.
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("stdlib", ["heapify"])
def setup_stdlib_heapify(data):
    """Copy the test case (dict and list) without heapifying anything."""
    # 'data' already exists as a key, so the key order of the copy is unchanged.
    return {**data, 'data': data['data'].copy()}
# Heapify operation
@benchmark.implementation("slice", "heapify")
def heapify_slice(data):
    """Heapify a copied slice, write it back, and return an XOR checksum."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # Heapify a detached copy of the range, then copy it back manually.
    heap_part = values[lo:hi]
    heapify(heap_part)
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    # Checksum: XOR of at most the first 10 elements of the range.
    digest = 0
    for item in values[lo:min(lo + 10, hi)]:
        digest ^= item
    return digest
@benchmark.implementation("view", "heapify")
def heapify_view(data):
    """Heapify the range in place through a view and return an XOR checksum."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # The view writes straight into the backing list, so no copy-back is needed.
    window = view(values, lo, hi)
    heapify(window)
    # Checksum: XOR of at most the first 10 elements of the range.
    digest = 0
    for item in values[lo:min(lo + 10, hi)]:
        digest ^= item
    return digest
@benchmark.implementation("direct", "heapify")
def heapify_direct(data):
    """Heapify the range via a temporary list and return an XOR checksum."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # Heapify a temporary copy of the range and write it back slot by slot.
    scratch = values[lo:hi]
    heapify(scratch)
    for idx, item in enumerate(scratch, lo):
        values[idx] = item
    # Checksum: XOR of at most the first 10 elements of the range.
    digest = 0
    for item in values[lo:min(lo + 10, hi)]:
        digest ^= item
    return digest
@benchmark.implementation("stdlib", "heapify")
def heapify_stdlib(data):
    """Heapify a copied slice with heapq, write it back, and return an XOR checksum."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # Heapify a detached copy with the standard library, then copy back manually.
    heap_part = values[lo:hi]
    heapq.heapify(heap_part)
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    # Checksum: XOR of at most the first 10 elements of the range.
    digest = 0
    for item in values[lo:min(lo + 10, hi)]:
        digest ^= item
    return digest
# Heappop operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappop")
def heappop_slice(data):
    """Pop up to 10 items from a copied heap slice and XOR them together."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    digest = 0
    for _ in range(min(10, len(heap_part))):
        if not heap_part:
            continue
        digest ^= heappop(heap_part)
    # Write the (now shorter) heap back over the front of the range.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
@benchmark.implementation("view", "heappop")
def heappop_view(data):
    """Pop up to 10 items through a heap view and XOR them together."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # The range was heapified by the setup function; pop in place via the view.
    window = view(values, lo, hi)
    digest = 0
    for _ in range(min(10, len(window))):
        if len(window) <= 0:
            continue
        digest ^= heappop(window)
    return digest
@benchmark.implementation("direct", "heappop")
def heappop_direct(data):
    """Pop up to 10 items from a temporary copy of the heap range and XOR them."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # The range was heapified by the setup function.
    scratch = values[lo:hi]
    digest = 0
    for _ in range(min(10, len(scratch))):
        if not scratch:
            continue
        digest ^= heappop(scratch)
    # Write the (now shorter) heap back over the front of the range.
    for idx, item in enumerate(scratch, lo):
        values[idx] = item
    return digest
@benchmark.implementation("stdlib", "heappop")
def heappop_stdlib(data):
    """Pop up to 10 items from a copied heap slice with heapq and XOR them."""
    values = data['data']
    lo, hi = data['start'], data['end']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    digest = 0
    for _ in range(min(10, len(heap_part))):
        if not heap_part:
            continue
        digest ^= heapq.heappop(heap_part)
    # Write the (now shorter) heap back over the front of the range.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
# Heapreplace operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heapreplace")
def heapreplace_slice(data):
    """Run up to 5 heapreplace calls on a copied heap slice; XOR the removed items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    candidate = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    if not heap_part:
        return 0
    digest = 0
    for _ in range(min(5, len(heap_part))):
        if heap_part:
            digest ^= heapreplace(heap_part, candidate)
            # Next replacement value, kept within 32 bits.
            candidate = (candidate + 1) & 0xFFFFFFFF
    # Copy the updated heap back into the list.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
@benchmark.implementation("view", "heapreplace")
def heapreplace_view(data):
    """Run up to 5 heapreplace calls through a heap view; XOR the removed items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    candidate = data['new_value']
    # The range was heapified by the setup function; replace in place via the view.
    window = view(values, lo, hi)
    if len(window) <= 0:
        return 0
    digest = 0
    for _ in range(min(5, len(window))):
        if len(window) > 0:
            digest ^= heapreplace(window, candidate)
            # Next replacement value, kept within 32 bits.
            candidate = (candidate + 1) & 0xFFFFFFFF
    return digest
@benchmark.implementation("direct", "heapreplace")
def heapreplace_direct(data):
    """Run up to 5 heapreplace calls on a temporary copy of the range; XOR the removed items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    candidate = data['new_value']
    # The range was heapified by the setup function.
    scratch = values[lo:hi]
    if not scratch:
        return 0
    digest = 0
    for _ in range(min(5, len(scratch))):
        if scratch:
            digest ^= heapreplace(scratch, candidate)
            # Next replacement value, kept within 32 bits.
            candidate = (candidate + 1) & 0xFFFFFFFF
    # Copy the updated heap back into the list.
    for idx, item in enumerate(scratch, lo):
        values[idx] = item
    return digest
@benchmark.implementation("stdlib", "heapreplace")
def heapreplace_stdlib(data):
    """Run up to 5 heapq.heapreplace calls on a copied heap slice; XOR the removed items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    candidate = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    if not heap_part:
        return 0
    digest = 0
    for _ in range(min(5, len(heap_part))):
        if heap_part:
            digest ^= heapq.heapreplace(heap_part, candidate)
            # Next replacement value, kept within 32 bits.
            candidate = (candidate + 1) & 0xFFFFFFFF
    # Copy the updated heap back into the list.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
# Heappush operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappush")
def heappush_slice(data):
    """Push 5 values onto a copied heap slice and return the XOR of the pushed values."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    digest = 0
    for offset in range(5):
        pushed = (base + offset) & 0xFFFFFFFF
        heappush(heap_part, pushed)
        digest ^= pushed
    # The copy grew, so it is not written back; only the checksum is returned.
    return digest
@benchmark.implementation("view", "heappush")
def heappush_view(data):
    """Push up to 5 values through a heap view and return the XOR of the pushed values."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; push in place via the view.
    window = view(values, lo, hi)
    digest = 0
    # The view grows into the list slots after 'end', so available space caps the count.
    room = min(5, len(values) - hi)
    for offset in range(room):
        pushed = (base + offset) & 0xFFFFFFFF
        heappush(window, pushed)
        digest ^= pushed
    return digest
@benchmark.implementation("direct", "heappush")
def heappush_direct(data):
    """Push 5 values onto a temporary copy of the range and return the XOR of the pushed values."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function.
    scratch = values[lo:hi]
    digest = 0
    for offset in range(5):
        pushed = (base + offset) & 0xFFFFFFFF
        heappush(scratch, pushed)
        digest ^= pushed
    # The copy grew, so it is not written back; only the checksum is returned.
    return digest
@benchmark.implementation("stdlib", "heappush")
def heappush_stdlib(data):
    """Push 5 values onto a copied heap slice with heapq and return the XOR of the pushed values."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    digest = 0
    for offset in range(5):
        pushed = (base + offset) & 0xFFFFFFFF
        heapq.heappush(heap_part, pushed)
        digest ^= pushed
    # The copy grew, so it is not written back; only the checksum is returned.
    return digest
# Heappushpop operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappushpop")
def heappushpop_slice(data):
    """Run up to 5 heappushpop calls on a copied heap slice; XOR the returned items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    if not heap_part:
        return 0
    digest = 0
    for offset in range(min(5, len(heap_part))):
        pushed = (base + offset) & 0xFFFFFFFF
        digest ^= heappushpop(heap_part, pushed)
    # Copy the updated heap back into the list.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
@benchmark.implementation("view", "heappushpop")
def heappushpop_view(data):
    """Run up to 5 heappushpop calls through a heap view; XOR the returned items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; operate in place via the view.
    window = view(values, lo, hi)
    if len(window) <= 0:
        return 0
    digest = 0
    for offset in range(min(5, len(window))):
        pushed = (base + offset) & 0xFFFFFFFF
        digest ^= heappushpop(window, pushed)
    return digest
@benchmark.implementation("direct", "heappushpop")
def heappushpop_direct(data):
    """Run up to 5 heappushpop calls on a temporary copy of the range; XOR the returned items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function.
    scratch = values[lo:hi]
    if not scratch:
        return 0
    digest = 0
    for offset in range(min(5, len(scratch))):
        pushed = (base + offset) & 0xFFFFFFFF
        digest ^= heappushpop(scratch, pushed)
    # Copy the updated heap back into the list.
    for idx, item in enumerate(scratch, lo):
        values[idx] = item
    return digest
@benchmark.implementation("stdlib", "heappushpop")
def heappushpop_stdlib(data):
    """Run up to 5 heapq.heappushpop calls on a copied heap slice; XOR the returned items."""
    values = data['data']
    lo, hi = data['start'], data['end']
    base = data['new_value']
    # The range was heapified by the setup function; work on a detached copy.
    heap_part = values[lo:hi]
    if not heap_part:
        return 0
    digest = 0
    for offset in range(min(5, len(heap_part))):
        pushed = (base + offset) & 0xFFFFFFFF
        digest ^= heapq.heappushpop(heap_part, pushed)
    # Copy the updated heap back into the list.
    for idx, item in enumerate(heap_part, lo):
        values[idx] = item
    return digest
if __name__ == "__main__":
    # Parse command line args and run appropriate mode:
    # parse_args() returns either this benchmark or a profiling variant of it.
    runner = benchmark.parse_args()
    runner.run()
#!/usr/bin/env python3
"""
Benchmark comparing heap operations on list slices vs view objects vs direct indexing.
Tests heapify, heappop, heapreplace, heappush, heappushpop operations.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
"""
Declarative benchmark framework with minimal boilerplate.
Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""
import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict
@dataclass
class BenchmarkConfig:
    """Settings for one benchmark: what to run, how often, and where output goes.

    ``sizes`` and ``operations`` may be left as None; ``__post_init__`` then
    substitutes ``[100, 1000, 10000, 100000]`` and ``['default']``.
    """
    name: str
    sizes: List[int] = None
    operations: List[str] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    plot_scale: str = "loglog"  # Options: "loglog", "linear", "semilogx", "semilogy"
    progressive: bool = True  # Show results operation by operation across sizes
    # Profiling mode
    profile_mode: bool = False
    profile_size: int = None
    profile_operation: str = None
    profile_implementation: str = None

    def __post_init__(self):
        """Fill in the default sizes and operations when none were supplied."""
        self.sizes = [100, 1000, 10000, 100000] if self.sizes is None else self.sizes
        self.operations = ['default'] if self.operations is None else self.operations
class Benchmark:
    """Declarative benchmark framework using decorators.

    Data generators, implementations, validators and setup functions are
    registered through the decorator methods. ``run()`` then measures every
    registered implementation of every configured operation at every
    configured size, and optionally saves and plots the results.
    """

    def __init__(self, config: "BenchmarkConfig"):
        self.config = config
        self.data_generators = {}  # name (operation or 'default') -> generator(size, operation)
        self.implementations = {}  # operation -> {implementation name -> callable(data)}
        self.validators = {}       # operation -> validator(expected, actual)
        self.setups = {}           # operation -> {implementation name -> setup(data)}
        self.results = []          # one dict per measured (operation, size, implementation)

    def profile(self, operation: str = None, size: int = None, implementation: str = None):
        """Create a profiling version of this benchmark.

        The returned Benchmark shares this instance's registries and has
        result saving and plotting disabled.
        """
        profile_config = BenchmarkConfig(
            name=f"{self.config.name}_profile",
            sizes=self.config.sizes,
            operations=self.config.operations,
            profile_mode=True,
            profile_operation=operation,
            profile_size=size,
            profile_implementation=implementation,
            save_results=False,
            plot_results=False
        )
        profile_benchmark = Benchmark(profile_config)
        profile_benchmark.data_generators = self.data_generators
        profile_benchmark.implementations = self.implementations
        profile_benchmark.validators = self.validators
        profile_benchmark.setups = self.setups
        return profile_benchmark

    def parse_args(self):
        """Parse command line arguments for profiling mode.

        Returns a profiling Benchmark when ``--profile`` is given, otherwise self.
        """
        parser = argparse.ArgumentParser(
            description=f"Benchmark {self.config.name} with optional profiling mode",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
# Normal benchmark mode
python benchmark.py
# Profile specific operation and implementation
python benchmark.py --profile --operation random_access --implementation grid
# Profile with specific size
python benchmark.py --profile --size 1000000
# Profile all implementations of an operation
python benchmark.py --profile --operation construction
"""
        )
        parser.add_argument('--profile', action='store_true',
                            help='Run in profiling mode (minimal overhead for profilers)')
        parser.add_argument('--operation', type=str,
                            help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
        parser.add_argument('--size', type=int,
                            help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
        parser.add_argument('--implementation', type=str,
                            help='Specific implementation to profile (default: all)')
        args = parser.parse_args()
        # If profile mode requested, return a profiling benchmark
        if args.profile:
            return self.profile(
                operation=args.operation,
                size=args.size,
                implementation=args.implementation
            )
        # Otherwise return self for normal mode
        return self

    def data_generator(self, name: str = "default"):
        """Decorator to register data generator"""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator

    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation ``name`` for one or more operations"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator

    def validator(self, operation: str = "default"):
        """Decorator to register custom validator"""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator

    def setup(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register setup function that runs before timing"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.setups:
                    self.setups[op] = {}
                self.setups[op][name] = func
            return func
        return decorator

    def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
        """Measure execution time with warmup and optional setup.

        Args:
            func: Callable under test; receives the (possibly set-up) data.
            data: Input passed to ``setup_func`` or, without one, to ``func``.
            setup_func: Optional callable producing fresh input for each call.
                It runs before every warmup and measured call, and its cost
                is excluded from the reported time.

        Returns:
            ``(result, elapsed_ms)``: the result of the last measured call and
            the mean time per call in milliseconds.

        Raises:
            ZeroDivisionError: if ``config.iterations`` is 0.
        """
        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                func(setup_func(data) if setup_func else data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break
        # Actual measurement: the clock only runs around func itself, so that
        # setup work (copying, pre-heapifying, ...) does not distort the result.
        total = 0.0
        result = None
        for _ in range(self.config.iterations):
            run_data = setup_func(data) if setup_func else data
            start = time.perf_counter()
            result = func(run_data)
            total += time.perf_counter() - start
        elapsed_ms = total * 1000 / self.config.iterations
        return result, elapsed_ms

    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison"""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual

    def run(self):
        """Run all benchmarks"""
        if self.config.profile_mode:
            self._run_profile_mode()
        else:
            self._run_normal_mode()

    def _run_normal_mode(self):
        """Run normal benchmark mode"""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        # Always show progressive results: operation by operation across all sizes
        for operation in self.config.operations:
            for size in self.config.sizes:
                self._run_single(operation, size)
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        if self.config.plot_results:
            self._plot_results()
        # Print summary
        self._print_summary()

    def _run_profile_mode(self):
        """Run profiling mode with minimal overhead for use with vmprof"""
        operation = self.config.profile_operation or self.config.operations[0]
        size = self.config.profile_size or max(self.config.sizes)
        impl_name = self.config.profile_implementation
        print(f"PROFILING MODE: {self.config.name}")
        print(f"Operation: {operation}, Size: {size}")
        if impl_name:
            print(f"Implementation: {impl_name}")
        print("="*80)
        print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
        print("="*80)
        # Generate test data
        generator = self.data_generators.get(operation, self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations
        impls = self.implementations.get(operation, {})
        if not impls:
            raise ValueError(f"No implementations for operation: {operation}")
        # Filter to specific implementation if requested
        if impl_name:
            if impl_name not in impls:
                raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
            impls = {impl_name: impls[impl_name]}
        # Run with minimal overhead - no timing, no validation
        for name, func in impls.items():
            print(f"\nRunning {name}...")
            sys.stdout.flush()
            # Setup if needed
            setup_func = self.setups.get(operation, {}).get(name)
            if setup_func:
                data = setup_func(test_data)
            else:
                data = test_data
            # Run the actual function (this is what vmprof will profile)
            result = func(data)
            print(f"Completed {name}, result checksum: {result}")
            sys.stdout.flush()

    def _run_single(self, operation: str, size: int):
        """Run a single operation/size combination.

        Every implementation is measured once. The first implementation that
        completes without raising supplies the expected result against which
        the others are validated. A failing implementation, including the
        first one, is recorded as an error instead of aborting the run.
        """
        print(f"\nOperation: {operation}, Size: {size}")
        print("-" * 50)
        sys.stdout.flush()
        # Generate test data
        generator = self.data_generators.get(operation,
                                             self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations for this operation
        impls = self.implementations.get(operation, {})
        if not impls:
            print(f"No implementations for operation: {operation}")
            return
        # Get setup functions for this operation
        setups = self.setups.get(operation, {})
        expected_result = None
        have_reference = False
        # Run all implementations
        for impl_name, impl_func in impls.items():
            try:
                setup_func = setups.get(impl_name)
                result, time_ms = self.measure_time(impl_func, test_data, setup_func)
                if not have_reference:
                    # First successful implementation becomes the reference
                    expected_result, have_reference = result, True
                correct = self.validate_result(expected_result, result, operation)
                # Store result
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': time_ms,
                    'correct': correct,
                    'error': None
                })
                status = "OK" if correct else "FAIL"
                print(f" {impl_name:<20} {time_ms:>8.3f} ms {status}")
                sys.stdout.flush()
            except Exception as e:
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': float('inf'),
                    'correct': False,
                    'error': str(e)
                })
                print(f" {impl_name:<20} ERROR: {str(e)[:40]}")
                sys.stdout.flush()

    def _save_results(self):
        """Save results to JSON"""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")

    def _plot_results(self):
        """Generate plots using matplotlib if available"""
        try:
            import matplotlib.pyplot as plt
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")

    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting.

        Only results that finished and validated as correct are included.
        """
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op

    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation"""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        implementations = sorted(implementations)
        plt.figure(figsize=(10, 6))
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size]
                         if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")

    def _print_summary(self):
        """Print performance summary: fastest implementation per operation"""
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages
            avg_times = [(impl, statistics.mean(times))
                         for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                # Speedup of the fastest over the slowest implementation
                speedup = worst_time / best_time if best_time > 0 else 0
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
https://kobejean.github.io/cp-library
'''
from typing import Generic
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')
def list_find(lst: list, value, start = 0, stop = sys.maxsize):
    """Return the index of ``value`` in ``lst[start:stop]``, or -1 if absent.

    The returned index is relative to the start of ``lst``, as with
    ``list.index``. Only the "not found" ``ValueError`` is translated to -1;
    any other exception (for example one raised by an element's ``__eq__``)
    propagates to the caller.
    """
    try:
        return lst.index(value, start, stop)
    except ValueError:
        return -1
class view(Generic[_T]):
    """Window onto the half-open range ``[l, r)`` of a backing list ``A``.

    Reads and writes go straight to ``A``. ``__getitem__`` accepts only
    indices ``0 <= i < len(view)``; ``__setitem__`` and the push/pop helpers
    perform no bounds checking of their own.
    """
    __slots__ = 'A', 'l', 'r'

    def __init__(V, A: list[_T], l: int, r: int):
        V.A = A
        V.l = l
        V.r = r

    def __len__(V):
        return V.r - V.l

    def __getitem__(V, i: int):
        if not 0 <= i < V.r - V.l:
            raise IndexError
        return V.A[V.l + i]

    def __setitem__(V, i: int, v: _T):
        V.A[V.l + i] = v

    def __contains__(V, v: _T):
        found = list_find(V.A, v, V.l, V.r)
        return found != -1

    def set_range(V, l: int, r: int):
        """Move the window to ``[l, r)``."""
        V.l = l
        V.r = r

    def index(V, v: _T):
        """Return the position of ``v`` relative to the window start."""
        absolute = V.A.index(v, V.l, V.r)
        return absolute - V.l

    def reverse(V):
        """Reverse the window's elements in place."""
        lo = V.l
        hi = V.r - 1
        while lo < hi:
            V.A[lo], V.A[hi] = V.A[hi], V.A[lo]
            lo += 1
            hi -= 1

    def sort(V, /, *args, **kwargs):
        """Sort the window's elements; arguments are forwarded to ``list.sort``."""
        chunk = V.A[V.l:V.r]
        chunk.sort(*args, **kwargs)
        for pos, item in enumerate(chunk, V.l):
            V.A[pos] = item

    def pop(V):
        """Shrink the window from the right and return the element dropped."""
        V.r -= 1
        return V.A[V.r]

    def append(V, v: _T):
        """Write ``v`` into the slot just past the window and grow right."""
        V.A[V.r] = v
        V.r += 1

    def popleft(V):
        """Shrink the window from the left and return the element dropped."""
        first = V.l
        V.l = first + 1
        return V.A[first]

    def appendleft(V, v: _T):
        """Grow the window to the left and write ``v`` into the new slot."""
        V.l -= 1
        V.A[V.l] = v

    def validate(V):
        """Return whether ``0 <= l <= r <= len(A)`` holds."""
        return 0 <= V.l <= V.r <= len(V.A)
def heappush(heap: list, item):
    """Append ``item`` to the min-heap and restore order from the new last slot."""
    heap.append(item)
    last = len(heap) - 1
    heapsiftdown(heap, 0, last)
def heappop(heap: list):
    """Remove and return the root (smallest item) of the min-heap."""
    tail = heap.pop()
    if not heap:
        # The heap held a single element, which is the result.
        return tail
    # Move the former last element to the root and let it sink into place.
    top = heap[0]
    heap[0] = tail
    heapsiftup(heap, 0)
    return top
def heapreplace(heap: list, item):
    """Replace the root with ``item``, restore heap order, and return the old root."""
    top = heap[0]
    heap[0] = item
    heapsiftup(heap, 0)
    return top
def heappushpop(heap: list, item):
    """Return the smaller of ``item`` and the heap root, keeping the other in the heap."""
    if not heap or not heap[0] < item:
        # Empty heap, or item is no larger than the root: the heap is untouched.
        return item
    top = heap[0]
    heap[0] = item
    heapsiftup(heap, 0)
    return top
def heapify(x: list):
    """Rearrange ``x`` in place into min-heap order."""
    # Visit every node that has a child, from the last such node up to the root.
    for node in range(len(x) // 2 - 1, -1, -1):
        heapsiftup(x, node)
def heapsiftdown(heap: list, root: int, pos: int):
    """Move ``heap[pos]`` toward index ``root`` while it is smaller than its parent."""
    item = heap[pos]
    while root < pos:
        parent = (pos - 1) >> 1
        if not item < heap[parent]:
            break
        # Shift the parent down one level and continue from its slot.
        heap[pos] = heap[parent]
        pos = parent
    heap[pos] = item
def heapsiftup(heap: list, pos: int):
    """Sink ``heap[pos]`` toward the leaves while a child is smaller than it."""
    last = len(heap) - 1
    item = heap[pos]
    child = 2 * pos + 1
    while child < last:
        # Both children exist: take the smaller one (the left one on ties).
        if heap[child + 1] < heap[child]:
            child += 1
        if not heap[child] < item:
            break
        heap[pos] = heap[child]
        pos = child
        child = 2 * pos + 1
    # A lone left child sitting in the very last slot.
    if child == last and heap[child] < item:
        heap[pos] = heap[child]
        pos = child
    heap[pos] = item
def heappop_max(heap: list):
    """Remove and return the root (largest item) of the max-heap."""
    tail = heap.pop()
    if not heap:
        # The heap held a single element, which is the result.
        return tail
    # Move the former last element to the root and let it sink into place.
    top = heap[0]
    heap[0] = tail
    heapsiftup_max(heap, 0)
    return top
def heapreplace_max(heap: list, item):
    """Replace the max-heap root with ``item``, restore order, and return the old root."""
    top = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return top
def heapify_max(x: list):
    """Rearrange ``x`` in place into max-heap order."""
    # Visit every node that has a child, from the last such node up to the root.
    for node in range(len(x) // 2 - 1, -1, -1):
        heapsiftup_max(x, node)
def heappush_max(heap: list, item):
    """Append ``item`` to the max-heap and restore order from the new last slot."""
    heap.append(item)
    last = len(heap) - 1
    heapsiftdown_max(heap, 0, last)
def heapreplace_max(heap: list, item):
    """Replace the max-heap root with ``item``, restore order, and return the old root.

    NOTE: a function with this same name and body is already defined earlier
    in this file; this later definition is the one that stays bound.
    """
    top = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return top
def heappushpop_max(heap: list, item):
    """Return the larger of ``item`` and the max-heap root, keeping the other in the heap."""
    if not heap or not heap[0] > item:
        # Empty heap, or item is no smaller than the root: the heap is untouched.
        return item
    top = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return top
def heapsiftdown_max(heap: list, root: int, pos: int):
    """Move ``heap[pos]`` toward index ``root`` while its parent is smaller than it."""
    item = heap[pos]
    while root < pos:
        parent = (pos - 1) >> 1
        if not heap[parent] < item:
            break
        # Shift the parent down one level and continue from its slot.
        heap[pos] = heap[parent]
        pos = parent
    heap[pos] = item
def heapsiftup_max(heap: list, pos: int):
    """Sink ``heap[pos]`` toward the leaves while a child is larger than it."""
    last = len(heap) - 1
    item = heap[pos]
    child = 2 * pos + 1
    while child < last:
        # Both children exist: take the larger one (the left one on ties).
        if heap[child] < heap[child + 1]:
            child += 1
        if not item < heap[child]:
            break
        heap[pos] = heap[child]
        pos = child
        child = 2 * pos + 1
    # A lone left child sitting in the very last slot.
    if child == last and item < heap[child]:
        heap[pos] = heap[child]
        pos = child
    heap[pos] = item
import heapq # Standard library for comparison
# Configure benchmark: five heap operations, each measured at five input sizes.
config = BenchmarkConfig(
    name="heap_view",
    sizes=[100000, 10000, 1000, 100, 50], # Reverse order to warm up JIT
    operations=['heapify', 'heappop', 'heapreplace', 'heappush', 'heappushpop'],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/heap_view"
)
# Create benchmark instance; the decorators below register against it.
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_heap_data(size: int, operation: str):
    """Generate test data for heap operations.

    Builds a list of ``size`` random integers and picks a sub-range
    ``[start, end)`` whose length lies between ``size // 3`` and
    ``size * 2 // 3`` (capped at ``size - 1``).

    Args:
        size: Length of the generated list.
        operation: Name of the operation the data is for; stored unchanged.

    Returns:
        A dict with keys 'data', 'start', 'end', 'slice_size', 'new_value',
        'replace_value', 'size' and 'operation'.
    """
    # Generate random list for heap operations
    data = [random.randint(1, 1000000) for _ in range(size)]
    # Random slice length: roughly one third to two thirds of the list
    slice_size = random.randint(size // 3, min(size * 2 // 3, size - 1))
    # The view-based heappush benchmark appends in place into lst[end:], so it
    # can push at most len(lst) - end items, while the other implementations
    # always push 5. Keep that much room after the range whenever the list is
    # long enough; otherwise the checksums disagree for ranges ending near the
    # tail of the list.
    push_headroom = 5
    start = random.randint(0, max(0, size - slice_size - push_headroom))
    end = start + slice_size
    return {
        'data': data,
        'start': start,
        'end': end,
        'slice_size': slice_size,
        'new_value': random.randint(1, 1000000),
        'replace_value': random.randint(1, 1000000),
        'size': size,
        'operation': operation
    }
# Setup functions for operations that modify data
@benchmark.setup("slice", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_slice_heap(data):
    """Return a copy of the test case whose [start, end) range is already a heap.

    The dict is shallow-copied and its 'data' list is copied separately, so
    the incoming test case is not modified.
    """
    prepared = data.copy()
    prepared['data'] = data['data'].copy()
    target = prepared['data']
    start = prepared['start']
    end = prepared['end']
    # Heapify a copy of the range, then store it back element by element
    heap_part = target[start:end]
    heapify(heap_part)
    for idx, item in enumerate(heap_part, start):
        target[idx] = item
    return prepared
@benchmark.setup("view", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_view_heap(data):
    """Return a copy of the test case with heapify applied through a view.

    The dict is shallow-copied and its 'data' list is copied separately, so
    the incoming test case is not modified.
    """
    prepared = data.copy()
    prepared['data'] = data['data'].copy()
    # heapify runs on a view over [start, end) of the copied list; presumably
    # this reorders that range of the list in place — confirm against view
    heapify(view(prepared['data'], prepared['start'], prepared['end']))
    return prepared
@benchmark.setup("direct", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_direct_heap(data):
    """Return a copy of the test case whose [start, end) range is already a heap.

    The dict is shallow-copied and its 'data' list is copied separately, so
    the incoming test case is not modified.
    """
    prepared = data.copy()
    prepared['data'] = data['data'].copy()
    target = prepared['data']
    lo = prepared['start']
    hi = prepared['end']
    # The range is heapified in a temporary list and then written back
    scratch = target[lo:hi]
    heapify(scratch)
    for idx, item in enumerate(scratch, lo):
        target[idx] = item
    return prepared
@benchmark.setup("stdlib", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_stdlib_heap(data):
    """Return a copy of the test case whose [start, end) range is a heapq heap.

    The dict is shallow-copied and its 'data' list is copied separately, so
    the incoming test case is not modified.
    """
    prepared = data.copy()
    prepared['data'] = data['data'].copy()
    target = prepared['data']
    start = prepared['start']
    end = prepared['end']
    # Heapify a copy of the range with the standard library, then store it back
    heap_part = target[start:end]
    heapq.heapify(heap_part)
    for idx, item in enumerate(heap_part, start):
        target[idx] = item
    return prepared
# For heapify operation, we need setup without pre-heapifying
@benchmark.setup("slice", ["heapify"])
def setup_slice_heapify(data):
    """Return a copy of the test case with its own 'data' list, not heapified."""
    fresh = data.copy()
    # The shallow copy still shares the list, so give the copy its own
    fresh['data'] = fresh['data'].copy()
    return fresh
@benchmark.setup("view", ["heapify"])
def setup_view_heapify(data):
    """Return a copy of the test case with its own 'data' list, not heapified."""
    unsorted_list = data['data'].copy()
    fresh = data.copy()
    fresh['data'] = unsorted_list
    return fresh
@benchmark.setup("direct", ["heapify"])
def setup_direct_heapify(data):
    """Return a copy of the test case with its own 'data' list, not heapified."""
    clone = data.copy()
    # Replace the shared list with a private copy so the run cannot alter the input
    clone['data'] = clone['data'].copy()
    return clone
@benchmark.setup("stdlib", ["heapify"])
def setup_stdlib_heapify(data):
    """Return a copy of the test case with its own 'data' list, not heapified."""
    values = data['data'].copy()
    clone = data.copy()
    clone['data'] = values
    return clone
# Heapify operation
@benchmark.implementation("slice", "heapify")
def heapify_slice(data):
    """Heapify a copy of lst[start:end], write it back, and checksum the range.

    Returns the XOR of up to the first 10 elements of the range after the
    write-back.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    # Independent copy of the range
    heap_part = lst[start:end]
    heapify(heap_part)
    # Manual element-by-element write-back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    # XOR-fold the leading elements of the range
    stop = min(start + 10, end)
    checksum = 0
    for idx in range(start, stop):
        checksum ^= lst[idx]
    return checksum
@benchmark.implementation("view", "heapify")
def heapify_view(data):
    """Heapify a view over [start, end) of the list and checksum the range.

    Returns the XOR of up to the first 10 list elements starting at start.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    # No copy-back step here: heapify is handed the view object itself
    window = view(lst, start, end)
    heapify(window)
    # XOR-fold the leading elements of the range
    stop = min(start + 10, end)
    checksum = 0
    for idx in range(start, stop):
        checksum ^= lst[idx]
    return checksum
@benchmark.implementation("direct", "heapify")
def heapify_direct(data):
    """Heapify the range via a temporary list and checksum the result.

    Despite the "direct" label, the range is copied into a temporary list,
    heapified there, and written back element by element.  Returns the XOR of
    up to the first 10 elements of the range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    scratch = lst[start:end]
    heapify(scratch)
    for offset, item in enumerate(scratch):
        lst[start + offset] = item
    # XOR-fold the leading elements of the range
    stop = min(start + 10, end)
    checksum = 0
    for idx in range(start, stop):
        checksum ^= lst[idx]
    return checksum
@benchmark.implementation("stdlib", "heapify")
def heapify_stdlib(data):
    """Heapify a copy of lst[start:end] with heapq, write it back, and checksum.

    Returns the XOR of up to the first 10 elements of the range after the
    write-back.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    # Independent copy of the range, ordered by the standard library
    heap_part = lst[start:end]
    heapq.heapify(heap_part)
    # Manual element-by-element write-back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    # XOR-fold the leading elements of the range
    stop = min(start + 10, end)
    checksum = 0
    for idx in range(start, stop):
        checksum ^= lst[idx]
    return checksum
# Heappop operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappop")
def heappop_slice(data):
    """Pop up to 10 values from a copy of the pre-heapified range.

    The remaining heap is written back starting at start; list positions
    beyond the shortened heap are left as they were.  Returns the XOR of the
    popped values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    heap_part = lst[start:end]
    rounds = min(10, len(heap_part))
    checksum = 0
    for _ in range(rounds):
        if not heap_part:
            continue
        checksum ^= heappop(heap_part)
    # Write the remaining heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("view", "heappop")
def heappop_view(data):
    """Pop up to 10 values through a view over the pre-heapified range.

    Returns the XOR of the popped values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    window = view(lst, start, end)
    rounds = min(10, len(window))
    checksum = 0
    for _ in range(rounds):
        # Skip the pop once the view reports no elements
        if len(window) == 0:
            continue
        checksum ^= heappop(window)
    return checksum
@benchmark.implementation("direct", "heappop")
def heappop_direct(data):
    """Pop up to 10 values from a temporary copy of the pre-heapified range.

    Despite the "direct" label, the pops run on a temporary list that is
    written back afterwards; list positions beyond the shortened heap are left
    as they were.  Returns the XOR of the popped values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    scratch = lst[start:end]
    rounds = min(10, len(scratch))
    checksum = 0
    for _ in range(rounds):
        if not scratch:
            continue
        checksum ^= heappop(scratch)
    # Write the remaining heap back
    for offset, item in enumerate(scratch):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("stdlib", "heappop")
def heappop_stdlib(data):
    """Pop up to 10 values with heapq from a copy of the pre-heapified range.

    The remaining heap is written back starting at start; list positions
    beyond the shortened heap are left as they were.  Returns the XOR of the
    popped values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    heap_part = lst[start:end]
    rounds = min(10, len(heap_part))
    checksum = 0
    for _ in range(rounds):
        if not heap_part:
            continue
        checksum ^= heapq.heappop(heap_part)
    # Write the remaining heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
# Heapreplace operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heapreplace")
def heapreplace_slice(data):
    """Run up to 5 heapreplace calls on a copy of the pre-heapified range.

    The inserted value starts at data['new_value'] and grows by one (masked to
    32 bits) after each call.  The heap is written back afterwards.  Returns
    the XOR of the replaced values, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    if not heap_part:
        return 0
    checksum = 0
    for _ in range(min(5, len(heap_part))):
        if not heap_part:
            continue
        checksum ^= heapreplace(heap_part, new_value)
        new_value = (new_value + 1) & 0xFFFFFFFF
    # Write the heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("view", "heapreplace")
def heapreplace_view(data):
    """Run up to 5 heapreplace calls through a view over the pre-heapified range.

    The inserted value starts at data['new_value'] and grows by one (masked to
    32 bits) after each call.  Returns the XOR of the replaced values, or 0
    when the view is empty.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    window = view(lst, start, end)
    if len(window) == 0:
        return 0
    checksum = 0
    for _ in range(min(5, len(window))):
        if len(window) == 0:
            continue
        checksum ^= heapreplace(window, new_value)
        new_value = (new_value + 1) & 0xFFFFFFFF
    return checksum
@benchmark.implementation("direct", "heapreplace")
def heapreplace_direct(data):
    """Run up to 5 heapreplace calls on a temporary copy of the range.

    Despite the "direct" label, the calls run on a temporary list that is
    written back afterwards.  The inserted value starts at data['new_value']
    and grows by one (masked to 32 bits) after each call.  Returns the XOR of
    the replaced values, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    scratch = lst[start:end]
    if not scratch:
        return 0
    checksum = 0
    for _ in range(min(5, len(scratch))):
        if not scratch:
            continue
        checksum ^= heapreplace(scratch, new_value)
        new_value = (new_value + 1) & 0xFFFFFFFF
    # Write the heap back
    for offset, item in enumerate(scratch):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("stdlib", "heapreplace")
def heapreplace_stdlib(data):
    """Run up to 5 heapq.heapreplace calls on a copy of the pre-heapified range.

    The inserted value starts at data['new_value'] and grows by one (masked to
    32 bits) after each call.  The heap is written back afterwards.  Returns
    the XOR of the replaced values, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    if not heap_part:
        return 0
    checksum = 0
    for _ in range(min(5, len(heap_part))):
        if not heap_part:
            continue
        checksum ^= heapq.heapreplace(heap_part, new_value)
        new_value = (new_value + 1) & 0xFFFFFFFF
    # Write the heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
# Heappush operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappush")
def heappush_slice(data):
    """Push 5 consecutive values onto a copy of the pre-heapified range.

    The values are data['new_value'] + 0..4, each masked to 32 bits.  The
    grown copy is not written back to the list.  Returns the XOR of the pushed
    values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    checksum = 0
    for step in range(5):
        pushed = (new_value + step) & 0xFFFFFFFF
        heappush(heap_part, pushed)
        checksum ^= pushed
    # The copy has grown past the original range, so there is no write-back
    return checksum
@benchmark.implementation("view", "heappush")
def heappush_view(data):
    """Push consecutive values through a view over the pre-heapified range.

    At most min(5, len(lst) - end) values are pushed, so fewer than 5
    (possibly none) when the range ends close to the end of the list; the
    list-based variants of this operation always push 5.  Returns the XOR of
    the pushed values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    window = view(lst, start, end)
    # Number of pushes is capped by the list elements that follow the range
    room = len(lst) - end
    max_push = min(5, room)
    checksum = 0
    for step in range(max_push):
        pushed = (new_value + step) & 0xFFFFFFFF
        heappush(window, pushed)
        checksum ^= pushed
    return checksum
@benchmark.implementation("direct", "heappush")
def heappush_direct(data):
    """Push 5 consecutive values onto a temporary copy of the range.

    Despite the "direct" label, the pushes run on a temporary list that is not
    written back.  The values are data['new_value'] + 0..4, each masked to 32
    bits.  Returns the XOR of the pushed values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    scratch = lst[start:end]
    checksum = 0
    for step in range(5):
        pushed = (new_value + step) & 0xFFFFFFFF
        heappush(scratch, pushed)
        checksum ^= pushed
    # The temporary list has grown past the original range, so there is no write-back
    return checksum
@benchmark.implementation("stdlib", "heappush")
def heappush_stdlib(data):
    """Push 5 consecutive values with heapq onto a copy of the pre-heapified range.

    The values are data['new_value'] + 0..4, each masked to 32 bits.  The
    grown copy is not written back to the list.  Returns the XOR of the pushed
    values.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    checksum = 0
    for step in range(5):
        pushed = (new_value + step) & 0xFFFFFFFF
        heapq.heappush(heap_part, pushed)
        checksum ^= pushed
    # The copy has grown past the original range, so there is no write-back
    return checksum
# Heappushpop operation (heap is already heapified in setup)
@benchmark.implementation("slice", "heappushpop")
def heappushpop_slice(data):
    """Run up to 5 heappushpop calls on a copy of the pre-heapified range.

    The pushed values are data['new_value'] + 0..4, each masked to 32 bits.
    The heap is written back afterwards.  Returns the XOR of the values that
    heappushpop returned, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    if not heap_part:
        return 0
    checksum = 0
    for step in range(min(5, len(heap_part))):
        candidate = (new_value + step) & 0xFFFFFFFF
        checksum ^= heappushpop(heap_part, candidate)
    # Write the heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("view", "heappushpop")
def heappushpop_view(data):
    """Run up to 5 heappushpop calls through a view over the pre-heapified range.

    The pushed values are data['new_value'] + 0..4, each masked to 32 bits.
    Returns the XOR of the values that heappushpop returned, or 0 when the
    view is empty.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    window = view(lst, start, end)
    if len(window) == 0:
        return 0
    checksum = 0
    for step in range(min(5, len(window))):
        candidate = (new_value + step) & 0xFFFFFFFF
        checksum ^= heappushpop(window, candidate)
    return checksum
@benchmark.implementation("direct", "heappushpop")
def heappushpop_direct(data):
    """Run up to 5 heappushpop calls on a temporary copy of the range.

    Despite the "direct" label, the calls run on a temporary list that is
    written back afterwards.  The pushed values are data['new_value'] + 0..4,
    each masked to 32 bits.  Returns the XOR of the values that heappushpop
    returned, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    scratch = lst[start:end]
    if not scratch:
        return 0
    checksum = 0
    for step in range(min(5, len(scratch))):
        candidate = (new_value + step) & 0xFFFFFFFF
        checksum ^= heappushpop(scratch, candidate)
    # Write the heap back
    for offset, item in enumerate(scratch):
        lst[start + offset] = item
    return checksum
@benchmark.implementation("stdlib", "heappushpop")
def heappushpop_stdlib(data):
    """Run up to 5 heapq.heappushpop calls on a copy of the pre-heapified range.

    The pushed values are data['new_value'] + 0..4, each masked to 32 bits.
    The heap is written back afterwards.  Returns the XOR of the values that
    heappushpop returned, or 0 for an empty range.
    """
    lst = data['data']
    start = data['start']
    end = data['end']
    new_value = data['new_value']
    heap_part = lst[start:end]
    if not heap_part:
        return 0
    checksum = 0
    for step in range(min(5, len(heap_part))):
        candidate = (new_value + step) & 0xFFFFFFFF
        checksum ^= heapq.heappushpop(heap_part, candidate)
    # Write the heap back
    for offset, item in enumerate(heap_part):
        lst[start + offset] = item
    return checksum
if __name__ == "__main__":
    # Let the benchmark build a runner from the command-line arguments, then execute it
    benchmark.parse_args().run()