This documentation is automatically generated by online-judge-tools/verification-helper
#!/usr/bin/env python3
"""
Benchmark comparing heap operations on CSR sparse rows vs list of lists/heaps.
Tests heapify, heappop, heapreplace, heappush, heappushpop operations.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.ds.view.csr_cls import CSR
from cp_library.ds.heap.fast_heapq import heapify, heappop, heapreplace, heappush, heappushpop
# Configure benchmark
config = BenchmarkConfig(
name="heap_csr",
sizes=[10000000, 1000000, 100000, 10000, 1000, 100, 10], # Reverse order to warm up JIT
operations=['initialization', 'initialization_bucketize', 'heapify', 'heappop', 'heapreplace', 'heappush', 'heappushpop'],
iterations=10,
warmup=3,
output_dir="./output/benchmark_results/heap_csr"
)
# Create benchmark instance
benchmark = Benchmark(config)
# Data generator for initialization
@benchmark.data_generator("initialization")
def generate_initialization_data(size: int, operation: str):
"""Generate raw data for initialization benchmark"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Generate raw row data
raw_rows = []
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
row_data = [random.randint(1, 1000000) for _ in range(row_size)]
raw_rows.append(row_data)
return {
'raw_rows': raw_rows,
'num_rows': num_rows,
'size': size,
'operation': operation
}
# Data generator for bucketize initialization
@benchmark.data_generator("initialization_bucketize")
def generate_bucketize_data(size: int, operation: str):
"""Generate (key, value) pairs for bucketize initialization benchmark"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Generate (key, value) pairs
keys = []
values = []
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
for _ in range(row_size):
keys.append(i) # Row index as key
values.append(random.randint(1, 1000000)) # Random value
# Shuffle to simulate unsorted input
combined = list(zip(keys, values))
random.shuffle(combined)
keys, values = zip(*combined)
keys, values = list(keys), list(values)
# Create equivalent raw rows for list_of_lists comparison
raw_rows = [[] for _ in range(num_rows)]
for key, value in zip(keys, values):
raw_rows[key].append(value)
return {
'keys': keys,
'values': values,
'raw_rows': raw_rows,
'num_rows': num_rows,
'size': size,
'operation': operation
}
# Data generator for other operations
@benchmark.data_generator("default")
def generate_csr_heap_data(size: int, operation: str):
"""Generate test data for CSR heap operations"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Create sparse row data
A = [] # All elements concatenated
O = [0] # Offsets for each row
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
row_data = [random.randint(1, 1000000) for _ in range(row_size)]
A.extend(row_data)
O.append(len(A))
# Create list of lists equivalent
list_of_lists = []
for i in range(num_rows):
start, end = O[i], O[i + 1]
list_of_lists.append(A[start:end].copy())
return {
'A': A,
'O': O,
'list_of_lists': list_of_lists,
'num_rows': num_rows,
'new_value': random.randint(1, 1000000),
'target_row': random.randint(0, num_rows - 1),
'size': size,
'operation': operation
}
# Setup functions for operations that modify data
@benchmark.setup("csr", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_csr_heap(data):
"""Setup function that copies data and heapifies CSR rows"""
new_data = data.copy()
new_data['A'] = data['A'].copy()
new_data['O'] = data['O'].copy()
# Create CSR and pre-heapify all rows
csr = CSR(new_data['A'], new_data['O'])
for row_view in csr:
heapify(row_view)
new_data['csr'] = csr
return new_data
@benchmark.setup("list_of_lists", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_list_of_lists_heap(data):
"""Setup function that copies data and heapifies list of lists"""
new_data = data.copy()
# Deep copy list of lists
new_data['list_of_lists'] = [row.copy() for row in data['list_of_lists']]
# Pre-heapify all lists
for row in new_data['list_of_lists']:
heapify(row)
return new_data
# For heapify operation, we need setup without pre-heapifying
@benchmark.setup("csr", ["heapify"])
def setup_csr_heapify(data):
"""Setup function that only copies data for heapify operation"""
new_data = data.copy()
new_data['A'] = data['A'].copy()
new_data['O'] = data['O'].copy()
new_data['csr'] = CSR(new_data['A'], new_data['O'])
return new_data
@benchmark.setup("list_of_lists", ["heapify"])
def setup_list_of_lists_heapify(data):
"""Setup function that only copies data for heapify operation"""
new_data = data.copy()
new_data['list_of_lists'] = [row.copy() for row in data['list_of_lists']]
return new_data
# Initialization operation
@benchmark.implementation("csr", "initialization")
def initialization_csr(data):
"""Initialize CSR from raw row data"""
raw_rows = data['raw_rows']
# Create CSR structure
A = [] # All elements concatenated
O = [0] # Offsets for each row
for row_data in raw_rows:
A.extend(row_data)
O.append(len(A))
# Create CSR object
csr = CSR(A, O)
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(csr))): # First 10 rows
row_view = csr[i]
for j in range(min(3, len(row_view))): # First 3 elements per row
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "initialization")
def initialization_list_of_lists(data):
"""Initialize list of lists from raw row data"""
raw_rows = data['raw_rows']
# Create list of lists (deep copy)
list_of_lists = [row.copy() for row in raw_rows]
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(list_of_lists))): # First 10 rows
row = list_of_lists[i]
for j in range(min(3, len(row))): # First 3 elements per row
checksum ^= row[j]
return checksum
# Bucketize initialization operation
@benchmark.implementation("csr", "initialization_bucketize")
def initialization_bucketize_csr(data):
"""Initialize CSR using bucketize from (key, value) pairs"""
keys = data['keys']
values = data['values']
num_rows = data['num_rows']
# Create CSR using bucketize
csr = CSR.bucketize(num_rows, keys, values)
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(csr))): # First 10 rows
row_view = csr[i]
for j in range(min(3, len(row_view))): # First 3 elements per row
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "initialization_bucketize")
def initialization_bucketize_list_of_lists(data):
"""Initialize list of lists by grouping (key, value) pairs manually"""
keys = data['keys']
values = data['values']
num_rows = data['num_rows']
# Group by key manually (simulating what bucketize does)
list_of_lists = [[] for _ in range(num_rows)]
for e, k in enumerate(keys):
list_of_lists[k].append(values[e])
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(list_of_lists))): # First 10 rows
row = list_of_lists[i]
for j in range(min(3, len(row))): # First 3 elements per row
checksum ^= row[j]
return checksum
# Heapify operation
@benchmark.implementation("csr", "heapify")
def heapify_csr(data):
"""Heapify all CSR rows"""
csr = data['csr']
checksum = 0
for row_view in csr:
heapify(row_view)
# Add first few elements to checksum
for j in range(min(3, len(row_view))):
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "heapify")
def heapify_list_of_lists(data):
"""Heapify all lists in list of lists"""
list_of_lists = data['list_of_lists']
checksum = 0
for row in list_of_lists:
heapify(row)
# Add first few elements to checksum
for j in range(min(3, len(row))):
checksum ^= row[j]
return checksum
# Heappop operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappop")
def heappop_csr(data):
"""Pop from CSR heaps"""
csr = data['csr']
checksum = 0
# Pop from multiple rows
for row_view in csr:
pop_count = min(3, len(row_view)) # Pop up to 3 elements per row
for _ in range(pop_count):
if row_view:
val = heappop(row_view)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heappop")
def heappop_list_of_lists(data):
"""Pop from list of lists heaps"""
list_of_lists = data['list_of_lists']
checksum = 0
# Pop from multiple rows
for row in list_of_lists:
pop_count = min(3, len(row)) # Pop up to 3 elements per row
for _ in range(pop_count):
if row:
val = heappop(row)
checksum ^= val
return checksum
# Heapreplace operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heapreplace")
def heapreplace_csr(data):
"""Replace in CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Replace in multiple rows
for row_view in csr:
if row_view:
replace_count = min(2, len(row_view)) # Replace up to 2 elements per row
for j in range(replace_count):
if row_view:
val = heapreplace(row_view, new_value + j)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heapreplace")
def heapreplace_list_of_lists(data):
"""Replace in list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Replace in multiple rows
for row in list_of_lists:
if row:
replace_count = min(2, len(row)) # Replace up to 2 elements per row
for j in range(replace_count):
if row:
val = heapreplace(row, new_value + j)
checksum ^= val
return checksum
# Heappush operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappush")
def heappush_csr(data):
"""Push to CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Push to multiple rows
for i in range(min(5, len(csr))): # Push to first 5 rows
row_view = csr[i]
# Check if there's space to expand (view can grow within A bounds)
if row_view.r < len(csr.A):
push_count = min(2, len(csr.A) - row_view.r) # Push up to 2 elements per row
for j in range(push_count):
val = new_value + i * 10 + j
heappush(row_view, val)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heappush")
def heappush_list_of_lists(data):
"""Push to list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Push to multiple rows
for i in range(min(5, len(list_of_lists))): # Push to first 5 rows
row = list_of_lists[i]
push_count = 2 # Push 2 elements per row
for j in range(push_count):
val = new_value + i * 10 + j
heappush(row, val)
checksum ^= val
return checksum
# Heappushpop operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappushpop")
def heappushpop_csr(data):
"""Push and pop from CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Pushpop from multiple rows
for i, row_view in enumerate(csr):
if row_view:
pushpop_count = min(2, len(row_view)) # Pushpop up to 2 elements per row
for j in range(pushpop_count):
val = new_value + i * 10 + j
popped = heappushpop(row_view, val)
checksum ^= popped
return checksum
@benchmark.implementation("list_of_lists", "heappushpop")
def heappushpop_list_of_lists(data):
"""Push and pop from list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Pushpop from multiple rows
for i, row in enumerate(list_of_lists):
if row:
pushpop_count = min(2, len(row)) # Pushpop up to 2 elements per row
for j in range(pushpop_count):
val = new_value + i * 10 + j
popped = heappushpop(row, val)
checksum ^= popped
return checksum
if __name__ == "__main__":
# Parse command line args and run appropriate mode
runner = benchmark.parse_args()
runner.run()
#!/usr/bin/env python3
"""
Benchmark comparing heap operations on CSR sparse rows vs list of lists/heaps.
Tests heapify, heappop, heapreplace, heappush, heappushpop operations.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
"""
Declarative benchmark framework with minimal boilerplate.
Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""
import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict
@dataclass
class BenchmarkConfig:
"""Configuration for benchmark runs"""
name: str
sizes: List[int] = None
operations: List[str] = None
iterations: int = 10
warmup: int = 2
output_dir: str = "./output/benchmark_results"
save_results: bool = True
plot_results: bool = True
plot_scale: str = "loglog" # Options: "loglog", "linear", "semilogx", "semilogy"
progressive: bool = True # Show results operation by operation across sizes
# Profiling mode
profile_mode: bool = False
profile_size: int = None
profile_operation: str = None
profile_implementation: str = None
def __post_init__(self):
if self.sizes is None:
self.sizes = [100, 1000, 10000, 100000]
if self.operations is None:
self.operations = ['default']
class Benchmark:
"""Declarative benchmark framework using decorators"""
def __init__(self, config: BenchmarkConfig):
self.config = config
self.data_generators = {}
self.implementations = {}
self.validators = {}
self.setups = {}
self.results = []
def profile(self, operation: str = None, size: int = None, implementation: str = None):
"""Create a profiling version of this benchmark"""
profile_config = BenchmarkConfig(
name=f"{self.config.name}_profile",
sizes=self.config.sizes,
operations=self.config.operations,
profile_mode=True,
profile_operation=operation,
profile_size=size,
profile_implementation=implementation,
save_results=False,
plot_results=False
)
profile_benchmark = Benchmark(profile_config)
profile_benchmark.data_generators = self.data_generators
profile_benchmark.implementations = self.implementations
profile_benchmark.validators = self.validators
profile_benchmark.setups = self.setups
return profile_benchmark
def parse_args(self):
"""Parse command line arguments for profiling mode"""
parser = argparse.ArgumentParser(
description=f"Benchmark {self.config.name} with optional profiling mode",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Normal benchmark mode
python benchmark.py
# Profile specific operation and implementation
python benchmark.py --profile --operation random_access --implementation grid
# Profile with specific size
python benchmark.py --profile --size 1000000
# Profile all implementations of an operation
python benchmark.py --profile --operation construction
"""
)
parser.add_argument('--profile', action='store_true',
help='Run in profiling mode (minimal overhead for profilers)')
parser.add_argument('--operation', type=str,
help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
parser.add_argument('--size', type=int,
help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
parser.add_argument('--implementation', type=str,
help='Specific implementation to profile (default: all)')
args = parser.parse_args()
# If profile mode requested, return a profiling benchmark
if args.profile:
return self.profile(
operation=args.operation,
size=args.size,
implementation=args.implementation
)
# Otherwise return self for normal mode
return self
def data_generator(self, name: str = "default"):
"""Decorator to register data generator"""
def decorator(func):
self.data_generators[name] = func
return func
return decorator
def implementation(self, name: str, operations: Union[str, List[str]] = None):
"""Decorator to register implementation"""
if operations is None:
operations = ['default']
elif isinstance(operations, str):
operations = [operations]
def decorator(func):
for op in operations:
if op not in self.implementations:
self.implementations[op] = {}
self.implementations[op][name] = func
return func
return decorator
def validator(self, operation: str = "default"):
"""Decorator to register custom validator"""
def decorator(func):
self.validators[operation] = func
return func
return decorator
def setup(self, name: str, operations: Union[str, List[str]] = None):
"""Decorator to register setup function that runs before timing"""
if operations is None:
operations = ['default']
elif isinstance(operations, str):
operations = [operations]
def decorator(func):
for op in operations:
if op not in self.setups:
self.setups[op] = {}
self.setups[op][name] = func
return func
return decorator
def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
"""Measure execution time with warmup and optional setup"""
# Warmup runs
for _ in range(self.config.warmup):
try:
if setup_func:
setup_data = setup_func(data)
func(setup_data)
else:
func(data)
except Exception:
# If warmup fails, let the main measurement handle the error
break
# Actual measurement
start = time.perf_counter()
for _ in range(self.config.iterations):
if setup_func:
setup_data = setup_func(data)
result = func(setup_data)
else:
result = func(data)
elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
return result, elapsed_ms
def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
"""Validate result using custom validator or default comparison"""
if operation in self.validators:
return self.validators[operation](expected, actual)
return expected == actual
def run(self):
"""Run all benchmarks"""
if self.config.profile_mode:
self._run_profile_mode()
else:
self._run_normal_mode()
def _run_normal_mode(self):
"""Run normal benchmark mode"""
print(f"Running {self.config.name}")
print(f"Sizes: {self.config.sizes}")
print(f"Operations: {self.config.operations}")
print("="*80)
# Always show progressive results: operation by operation across all sizes
for operation in self.config.operations:
for size in self.config.sizes:
self._run_single(operation, size)
# Save and plot results
if self.config.save_results:
self._save_results()
if self.config.plot_results:
self._plot_results()
# Print summary
self._print_summary()
def _run_profile_mode(self):
"""Run profiling mode with minimal overhead for use with vmprof"""
operation = self.config.profile_operation or self.config.operations[0]
size = self.config.profile_size or max(self.config.sizes)
impl_name = self.config.profile_implementation
print(f"PROFILING MODE: {self.config.name}")
print(f"Operation: {operation}, Size: {size}")
if impl_name:
print(f"Implementation: {impl_name}")
print("="*80)
print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
print("="*80)
# Generate test data
generator = self.data_generators.get(operation, self.data_generators.get('default'))
if not generator:
raise ValueError(f"No data generator for operation: {operation}")
test_data = generator(size, operation)
# Get implementations
impls = self.implementations.get(operation, {})
if not impls:
raise ValueError(f"No implementations for operation: {operation}")
# Filter to specific implementation if requested
if impl_name:
if impl_name not in impls:
raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
impls = {impl_name: impls[impl_name]}
# Run with minimal overhead - no timing, no validation
for name, func in impls.items():
print(f"\nRunning {name}...")
sys.stdout.flush()
# Setup if needed
setup_func = self.setups.get(operation, {}).get(name)
if setup_func:
data = setup_func(test_data)
else:
data = test_data
# Run the actual function (this is what vmprof will profile)
result = func(data)
print(f"Completed {name}, result checksum: {result}")
sys.stdout.flush()
def _run_single(self, operation: str, size: int):
"""Run a single operation/size combination"""
print(f"\nOperation: {operation}, Size: {size}")
print("-" * 50)
sys.stdout.flush()
# Generate test data
generator = self.data_generators.get(operation,
self.data_generators.get('default'))
if not generator:
raise ValueError(f"No data generator for operation: {operation}")
test_data = generator(size, operation)
# Get implementations for this operation
impls = self.implementations.get(operation, {})
if not impls:
print(f"No implementations for operation: {operation}")
return
# Get setup functions for this operation
setups = self.setups.get(operation, {})
# Run reference implementation first
ref_name, ref_impl = next(iter(impls.items()))
ref_setup = setups.get(ref_name)
expected_result, _ = self.measure_time(ref_impl, test_data, ref_setup)
# Run all implementations
for impl_name, impl_func in impls.items():
try:
setup_func = setups.get(impl_name)
result, time_ms = self.measure_time(impl_func, test_data, setup_func)
correct = self.validate_result(expected_result, result, operation)
# Store result
self.results.append({
'operation': operation,
'size': size,
'implementation': impl_name,
'time_ms': time_ms,
'correct': correct,
'error': None
})
status = "OK" if correct else "FAIL"
print(f" {impl_name:<20} {time_ms:>8.3f} ms {status}")
sys.stdout.flush()
except Exception as e:
self.results.append({
'operation': operation,
'size': size,
'implementation': impl_name,
'time_ms': float('inf'),
'correct': False,
'error': str(e)
})
print(f" {impl_name:<20} ERROR: {str(e)[:40]}")
sys.stdout.flush()
def _save_results(self):
"""Save results to JSON"""
output_dir = Path(self.config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
with open(filename, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"\nResults saved to {filename}")
def _plot_results(self):
"""Generate plots using matplotlib if available"""
try:
import matplotlib.pyplot as plt
output_dir = Path(self.config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Group and prepare data for plotting
data_by_op = self._group_results_by_operation()
# Create plots for each operation
for operation, operation_data in data_by_op.items():
self._create_performance_plot(plt, operation, operation_data, output_dir)
except ImportError:
print("Matplotlib not available - skipping plots")
except Exception as e:
print(f"Plotting failed: {e}")
def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
"""Group results by operation and size for plotting"""
data_by_op = defaultdict(lambda: defaultdict(list))
for r in self.results:
if r['time_ms'] != float('inf') and r['correct']:
data_by_op[r['operation']][r['size']].append({
'implementation': r['implementation'],
'time_ms': r['time_ms']
})
return data_by_op
def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
"""Create a performance plot for a single operation"""
sizes = sorted(operation_data.keys())
implementations = set()
for size_data in operation_data.values():
for entry in size_data:
implementations.add(entry['implementation'])
implementations = sorted(implementations)
plt.figure(figsize=(10, 6))
for impl in implementations:
impl_times = []
impl_sizes = []
for size in sizes:
times = [entry['time_ms'] for entry in operation_data[size]
if entry['implementation'] == impl]
if times:
impl_times.append(statistics.mean(times))
impl_sizes.append(size)
if impl_times:
plt.plot(impl_sizes, impl_times, 'o-', label=impl)
plt.xlabel('Input Size')
plt.ylabel('Time (ms)')
plt.title(f'{self.config.name} - {operation} Operation')
plt.legend()
plt.grid(True, alpha=0.3)
# Apply the configured scaling
if self.config.plot_scale == "loglog":
plt.loglog()
elif self.config.plot_scale == "linear":
pass # Default linear scale
elif self.config.plot_scale == "semilogx":
plt.semilogx()
elif self.config.plot_scale == "semilogy":
plt.semilogy()
else:
# Default to loglog if invalid option
plt.loglog()
plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
plt.savefig(plot_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"Plot saved: {plot_file}")
def _print_summary(self):
"""Print performance summary"""
print("\n" + "="*80)
print("PERFORMANCE SUMMARY")
print("="*80)
# Group by operation
by_operation = defaultdict(lambda: defaultdict(list))
for r in self.results:
if r['error'] is None and r['time_ms'] != float('inf'):
by_operation[r['operation']][r['implementation']].append(r['time_ms'])
print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
print("-" * 70)
for op, impl_times in sorted(by_operation.items()):
# Calculate averages
avg_times = [(impl, statistics.mean(times))
for impl, times in impl_times.items()]
avg_times.sort(key=lambda x: x[1])
if avg_times:
best_impl, best_time = avg_times[0]
worst_time = avg_times[-1][1]
speedup = worst_time / best_time if best_time > 0 else 0
print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
https://kobejean.github.io/cp-library
'''
from typing import Generic
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')
def list_find(lst: list, value, start = 0, stop = sys.maxsize):
try:
return lst.index(value, start, stop)
except:
return -1
class view(Generic[_T]):
__slots__ = 'A', 'l', 'r'
def __init__(V, A: list[_T], l: int, r: int): V.A, V.l, V.r = A, l, r
def __len__(V): return V.r - V.l
def __getitem__(V, i: int):
if 0 <= i < V.r - V.l: return V.A[V.l+i]
else: raise IndexError
def __setitem__(V, i: int, v: _T): V.A[V.l+i] = v
def __contains__(V, v: _T): return list_find(V.A, v, V.l, V.r) != -1
def set_range(V, l: int, r: int): V.l, V.r = l, r
def index(V, v: _T): return V.A.index(v, V.l, V.r) - V.l
def reverse(V):
l, r = V.l, V.r-1
while l < r: V.A[l], V.A[r] = V.A[r], V.A[l]; l += 1; r -= 1
def sort(V, /, *args, **kwargs):
A = V.A[V.l:V.r]; A.sort(*args, **kwargs)
for i,a in enumerate(A,V.l): V.A[i] = a
def pop(V): V.r -= 1; return V.A[V.r]
def append(V, v: _T): V.A[V.r] = v; V.r += 1
def popleft(V): V.l += 1; return V.A[V.l-1]
def appendleft(V, v: _T): V.l -= 1; V.A[V.l] = v;
def validate(V): return 0 <= V.l <= V.r <= len(V.A)
class CSR(Generic[_T]):
__slots__ = 'A', 'O'
def __init__(csr, A: list[_T], O: list[int]): csr.A, csr.O = A, O
def __len__(csr): return len(csr.O)-1
def __getitem__(csr, i: int): return view(csr.A, csr.O[i], csr.O[i+1])
def __call__(csr, i: int, j: int): return csr.A[csr.O[i]+j]
def set(csr, i: int, j: int, v: _T): csr.A[csr.O[i]+j] = v
@classmethod
def bucketize(cls, N: int, K: list[int], V: list[_T]):
A: list[_T] = [0]*len(K); O = [0]*(N+1)
for k in K: O[k] += 1
for i in range(N): O[i+1] += O[i]
for e in range(len(K)): k = K[~e]; O[k] -= 1; A[O[k]] = V[~e]
return cls(A, O)
def heappush(heap: list, item):
heap.append(item)
heapsiftdown(heap, 0, len(heap)-1)
def heappop(heap: list):
item = heap.pop()
if heap: item, heap[0] = heap[0], item; heapsiftup(heap, 0)
return item
def heapreplace(heap: list, item):
item, heap[0] = heap[0], item; heapsiftup(heap, 0)
return item
def heappushpop(heap: list, item):
if heap and heap[0] < item: item, heap[0] = heap[0], item; heapsiftup(heap, 0)
return item
def heapify(x: list):
for i in reversed(range(len(x)//2)): heapsiftup(x, i)
def heapsiftdown(heap: list, root: int, pos: int):
item = heap[pos]
while root < pos and item < heap[p := (pos-1)>>1]: heap[pos], pos = heap[p], p
heap[pos] = item
def heapsiftup(heap: list, pos: int):
n, item, c = len(heap)-1, heap[pos], pos<<1|1
while c < n and heap[c := c+(heap[c+1]<heap[c])] < item: heap[pos], pos, c = heap[c], c, c<<1|1
if c == n and heap[c] < item: heap[pos], pos = heap[c], c
heap[pos] = item
def heappop_max(heap: list):
item = heap.pop()
if heap: item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
return item
def heapreplace_max(heap: list, item):
item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
return item
def heapify_max(x: list):
for i in reversed(range(len(x)//2)): heapsiftup_max(x, i)
def heappush_max(heap: list, item):
heap.append(item); heapsiftdown_max(heap, 0, len(heap)-1)
def heapreplace_max(heap: list, item):
item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
return item
def heappushpop_max(heap: list, item):
if heap and heap[0] > item: item, heap[0] = heap[0], item; heapsiftup_max(heap, 0)
return item
def heapsiftdown_max(heap: list, root: int, pos: int):
item = heap[pos]
while root < pos and heap[p := (pos-1)>>1] < item: heap[pos], pos = heap[p], p
heap[pos] = item
def heapsiftup_max(heap: list, pos: int):
n, item, c = len(heap)-1, heap[pos], pos<<1|1
while c < n and item < heap[c := c+(heap[c]<heap[c+1])]: heap[pos], pos, c = heap[c], c, c<<1|1
if c == n and item < heap[c]: heap[pos], pos = heap[c], c
heap[pos] = item
# Configure benchmark
config = BenchmarkConfig(
name="heap_csr",
sizes=[10000000, 1000000, 100000, 10000, 1000, 100, 10], # Reverse order to warm up JIT
operations=['initialization', 'initialization_bucketize', 'heapify', 'heappop', 'heapreplace', 'heappush', 'heappushpop'],
iterations=10,
warmup=3,
output_dir="./output/benchmark_results/heap_csr"
)
# Create benchmark instance
benchmark = Benchmark(config)
# Data generator for initialization
@benchmark.data_generator("initialization")
def generate_initialization_data(size: int, operation: str):
"""Generate raw data for initialization benchmark"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Generate raw row data
raw_rows = []
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
row_data = [random.randint(1, 1000000) for _ in range(row_size)]
raw_rows.append(row_data)
return {
'raw_rows': raw_rows,
'num_rows': num_rows,
'size': size,
'operation': operation
}
# Data generator for bucketize initialization
@benchmark.data_generator("initialization_bucketize")
def generate_bucketize_data(size: int, operation: str):
"""Generate (key, value) pairs for bucketize initialization benchmark"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Generate (key, value) pairs
keys = []
values = []
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
for _ in range(row_size):
keys.append(i) # Row index as key
values.append(random.randint(1, 1000000)) # Random value
# Shuffle to simulate unsorted input
combined = list(zip(keys, values))
random.shuffle(combined)
keys, values = zip(*combined)
keys, values = list(keys), list(values)
# Create equivalent raw rows for list_of_lists comparison
raw_rows = [[] for _ in range(num_rows)]
for key, value in zip(keys, values):
raw_rows[key].append(value)
return {
'keys': keys,
'values': values,
'raw_rows': raw_rows,
'num_rows': num_rows,
'size': size,
'operation': operation
}
# Data generator for other operations
@benchmark.data_generator("default")
def generate_csr_heap_data(size: int, operation: str):
"""Generate test data for CSR heap operations"""
# Generate multiple sparse rows (each row is a heap)
num_rows = max(10, size // 100) # 10-100 rows depending on size
total_elements = size
# Create sparse row data
A = [] # All elements concatenated
O = [0] # Offsets for each row
elements_per_row = total_elements // num_rows
remaining = total_elements % num_rows
for i in range(num_rows):
# Variable row sizes (some rows get extra elements)
row_size = elements_per_row + (1 if i < remaining else 0)
row_size = max(1, row_size) # Ensure at least 1 element per row
# Generate random heap data for this row
row_data = [random.randint(1, 1000000) for _ in range(row_size)]
A.extend(row_data)
O.append(len(A))
# Create list of lists equivalent
list_of_lists = []
for i in range(num_rows):
start, end = O[i], O[i + 1]
list_of_lists.append(A[start:end].copy())
return {
'A': A,
'O': O,
'list_of_lists': list_of_lists,
'num_rows': num_rows,
'new_value': random.randint(1, 1000000),
'target_row': random.randint(0, num_rows - 1),
'size': size,
'operation': operation
}
# Setup functions for operations that modify data
@benchmark.setup("csr", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_csr_heap(data):
"""Setup function that copies data and heapifies CSR rows"""
new_data = data.copy()
new_data['A'] = data['A'].copy()
new_data['O'] = data['O'].copy()
# Create CSR and pre-heapify all rows
csr = CSR(new_data['A'], new_data['O'])
for row_view in csr:
heapify(row_view)
new_data['csr'] = csr
return new_data
@benchmark.setup("list_of_lists", ["heappop", "heapreplace", "heappush", "heappushpop"])
def setup_list_of_lists_heap(data):
"""Setup function that copies data and heapifies list of lists"""
new_data = data.copy()
# Deep copy list of lists
new_data['list_of_lists'] = [row.copy() for row in data['list_of_lists']]
# Pre-heapify all lists
for row in new_data['list_of_lists']:
heapify(row)
return new_data
# For heapify operation, we need setup without pre-heapifying
@benchmark.setup("csr", ["heapify"])
def setup_csr_heapify(data):
"""Setup function that only copies data for heapify operation"""
new_data = data.copy()
new_data['A'] = data['A'].copy()
new_data['O'] = data['O'].copy()
new_data['csr'] = CSR(new_data['A'], new_data['O'])
return new_data
@benchmark.setup("list_of_lists", ["heapify"])
def setup_list_of_lists_heapify(data):
"""Setup function that only copies data for heapify operation"""
new_data = data.copy()
new_data['list_of_lists'] = [row.copy() for row in data['list_of_lists']]
return new_data
# Initialization operation
@benchmark.implementation("csr", "initialization")
def initialization_csr(data):
"""Initialize CSR from raw row data"""
raw_rows = data['raw_rows']
# Create CSR structure
A = [] # All elements concatenated
O = [0] # Offsets for each row
for row_data in raw_rows:
A.extend(row_data)
O.append(len(A))
# Create CSR object
csr = CSR(A, O)
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(csr))): # First 10 rows
row_view = csr[i]
for j in range(min(3, len(row_view))): # First 3 elements per row
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "initialization")
def initialization_list_of_lists(data):
"""Initialize list of lists from raw row data"""
raw_rows = data['raw_rows']
# Create list of lists (deep copy)
list_of_lists = [row.copy() for row in raw_rows]
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(list_of_lists))): # First 10 rows
row = list_of_lists[i]
for j in range(min(3, len(row))): # First 3 elements per row
checksum ^= row[j]
return checksum
# Bucketize initialization operation
@benchmark.implementation("csr", "initialization_bucketize")
def initialization_bucketize_csr(data):
"""Initialize CSR using bucketize from (key, value) pairs"""
keys = data['keys']
values = data['values']
num_rows = data['num_rows']
# Create CSR using bucketize
csr = CSR.bucketize(num_rows, keys, values)
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(csr))): # First 10 rows
row_view = csr[i]
for j in range(min(3, len(row_view))): # First 3 elements per row
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "initialization_bucketize")
def initialization_bucketize_list_of_lists(data):
"""Initialize list of lists by grouping (key, value) pairs manually"""
keys = data['keys']
values = data['values']
num_rows = data['num_rows']
# Group by key manually (simulating what bucketize does)
list_of_lists = [[] for _ in range(num_rows)]
for e, k in enumerate(keys):
list_of_lists[k].append(values[e])
# Return checksum to ensure it's not optimized away
checksum = 0
for i in range(min(10, len(list_of_lists))): # First 10 rows
row = list_of_lists[i]
for j in range(min(3, len(row))): # First 3 elements per row
checksum ^= row[j]
return checksum
# Heapify operation
@benchmark.implementation("csr", "heapify")
def heapify_csr(data):
"""Heapify all CSR rows"""
csr = data['csr']
checksum = 0
for row_view in csr:
heapify(row_view)
# Add first few elements to checksum
for j in range(min(3, len(row_view))):
checksum ^= row_view[j]
return checksum
@benchmark.implementation("list_of_lists", "heapify")
def heapify_list_of_lists(data):
"""Heapify all lists in list of lists"""
list_of_lists = data['list_of_lists']
checksum = 0
for row in list_of_lists:
heapify(row)
# Add first few elements to checksum
for j in range(min(3, len(row))):
checksum ^= row[j]
return checksum
# Heappop operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappop")
def heappop_csr(data):
"""Pop from CSR heaps"""
csr = data['csr']
checksum = 0
# Pop from multiple rows
for row_view in csr:
pop_count = min(3, len(row_view)) # Pop up to 3 elements per row
for _ in range(pop_count):
if row_view:
val = heappop(row_view)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heappop")
def heappop_list_of_lists(data):
"""Pop from list of lists heaps"""
list_of_lists = data['list_of_lists']
checksum = 0
# Pop from multiple rows
for row in list_of_lists:
pop_count = min(3, len(row)) # Pop up to 3 elements per row
for _ in range(pop_count):
if row:
val = heappop(row)
checksum ^= val
return checksum
# Heapreplace operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heapreplace")
def heapreplace_csr(data):
"""Replace in CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Replace in multiple rows
for row_view in csr:
if row_view:
replace_count = min(2, len(row_view)) # Replace up to 2 elements per row
for j in range(replace_count):
if row_view:
val = heapreplace(row_view, new_value + j)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heapreplace")
def heapreplace_list_of_lists(data):
"""Replace in list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Replace in multiple rows
for row in list_of_lists:
if row:
replace_count = min(2, len(row)) # Replace up to 2 elements per row
for j in range(replace_count):
if row:
val = heapreplace(row, new_value + j)
checksum ^= val
return checksum
# Heappush operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappush")
def heappush_csr(data):
"""Push to CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Push to multiple rows
for i in range(min(5, len(csr))): # Push to first 5 rows
row_view = csr[i]
# Check if there's space to expand (view can grow within A bounds)
if row_view.r < len(csr.A):
push_count = min(2, len(csr.A) - row_view.r) # Push up to 2 elements per row
for j in range(push_count):
val = new_value + i * 10 + j
heappush(row_view, val)
checksum ^= val
return checksum
@benchmark.implementation("list_of_lists", "heappush")
def heappush_list_of_lists(data):
"""Push to list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Push to multiple rows
for i in range(min(5, len(list_of_lists))): # Push to first 5 rows
row = list_of_lists[i]
push_count = 2 # Push 2 elements per row
for j in range(push_count):
val = new_value + i * 10 + j
heappush(row, val)
checksum ^= val
return checksum
# Heappushpop operation (heaps are already heapified in setup)
@benchmark.implementation("csr", "heappushpop")
def heappushpop_csr(data):
"""Push and pop from CSR heaps"""
csr = data['csr']
new_value = data['new_value']
checksum = 0
# Pushpop from multiple rows
for i, row_view in enumerate(csr):
if row_view:
pushpop_count = min(2, len(row_view)) # Pushpop up to 2 elements per row
for j in range(pushpop_count):
val = new_value + i * 10 + j
popped = heappushpop(row_view, val)
checksum ^= popped
return checksum
@benchmark.implementation("list_of_lists", "heappushpop")
def heappushpop_list_of_lists(data):
"""Push and pop from list of lists heaps"""
list_of_lists = data['list_of_lists']
new_value = data['new_value']
checksum = 0
# Pushpop from multiple rows
for i, row in enumerate(list_of_lists):
if row:
pushpop_count = min(2, len(row)) # Pushpop up to 2 elements per row
for j in range(pushpop_count):
val = new_value + i * 10 + j
popped = heappushpop(row, val)
checksum ^= popped
return checksum
if __name__ == "__main__":
# Parse command line args and run appropriate mode
runner = benchmark.parse_args()
runner.run()