This documentation is automatically generated by online-judge-tools/verification-helper
#!/usr/bin/env python3
"""
Benchmark comparing operations on list slices vs view objects.
Tests various operations to measure the overhead of slice copying vs view indirection.
"""
import random
import sys
import os
from cp_library.ds.heap.fast_heapq import heapify, heappop, heapreplace, heappush, heappushpop
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.ds.view.view_cls import view
# Configure benchmark
# Sizes are listed largest-first (reverse order) to warm up the JIT.
config = BenchmarkConfig(
    name="list_view",
    sizes=[1000000, 100000, 10000, 1000, 100],
    operations=[
        'construction', 'sum', 'modify', 'index', 'reverse',
        'sort', 'nested_sum', 'pop', 'append',
    ],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/list_view",
)
# Registry object the decorators below attach generators and implementations to.
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_view_data(size: int, operation: str):
    """Build the input dict shared by every slice/view implementation.

    The dict holds a random integer list of length ``size``, a random
    ``[start, end)`` window whose length is between ``size // 10`` and
    ``size // 2``, a random ``search_value``, and the ``size`` and
    ``operation`` arguments.
    """
    values = [random.randint(0, 1000000) for _ in range(size)]
    # Window length first, then a start that keeps the window inside the list.
    window = random.randint(size // 10, size // 2)
    first = random.randint(0, size - window)
    return {
        'data': values,
        'start': first,
        'end': first + window,
        'slice_size': window,
        'search_value': random.randint(0, 1000000),
        'size': size,
        'operation': operation,
    }
# Construction operation
@benchmark.implementation("slice", "construction")
def construction_slice(data):
    """Copy ``data['data'][start:end]`` and return the XOR of the copy's elements."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]  # slicing builds a new list
    # Fold the copy into a checksum so the slice is actually consumed.
    acc = 0
    for value in window:
        acc ^= value
    return acc
@benchmark.implementation("view", "construction")
def construction_view(data):
    """Wrap ``[start, end)`` of ``data['data']`` in a ``view`` and XOR its elements by index."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    # Read every element through the view so the result depends on all of them.
    acc = 0
    for k in range(len(window)):
        acc ^= window[k]
    return acc
# Sum operation
@benchmark.implementation("slice", "sum")
def sum_slice(data):
    """Return the sum of a copied ``[start, end)`` slice, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    total = sum(window)
    return total & 0xFFFFFFFF
@benchmark.implementation("view", "sum")
def sum_view(data):
    """Return the sum of ``[start, end)`` read element by element through a ``view``, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    acc = 0
    for k in range(len(window)):
        acc += window[k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("view_direct", "sum")
def sum_view_direct(data):
    """Return the sum of ``data['data'][start:end]`` by direct indexing, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    # Neither a slice nor a view is created; the list is indexed in place.
    acc = 0
    for pos in range(lo, hi):
        acc += seq[pos]
    return acc & 0xFFFFFFFF
# Setup functions for operations that need copying
@benchmark.setup("slice", ["modify", "reverse", "sort", "pop", "append"])
def setup_slice_modify(data):
    """Return a shallow copy of ``data`` whose ``'data'`` list is itself a fresh copy.

    Mutating implementations then work on their own list instead of the
    shared generated one.
    """
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("view", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_modify(data):
    """Return a shallow copy of ``data`` whose ``'data'`` list is itself a fresh copy.

    Mutating implementations then work on their own list instead of the
    shared generated one.
    """
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("view_direct", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_direct_modify(data):
    """Return a shallow copy of ``data`` whose ``'data'`` list is itself a fresh copy.

    Mutating implementations then work on their own list instead of the
    shared generated one.
    """
    return {**data, 'data': data['data'].copy()}
# Modify operation
@benchmark.implementation("slice", "modify")
def modify_slice(data):
    """Double every element of ``[start, end)`` via a slice copy and return the XOR of the range.

    The doubled values (masked to 32 bits) are written back into
    ``data['data']`` one element at a time.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    for k in range(len(window)):
        window[k] = (window[k] * 2) & 0xFFFFFFFF
    # Element-wise write-back of the modified copy into the source list.
    for pos, value in enumerate(window, lo):
        seq[pos] = value
    acc = 0
    for pos in range(lo, hi):
        acc ^= seq[pos]
    return acc
@benchmark.implementation("view", "modify")
def modify_view(data):
    """Double every element of ``[start, end)`` through a ``view`` and return the XOR of the range."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    # Writes go through the view straight into the underlying list.
    for k in range(len(window)):
        window[k] = (window[k] * 2) & 0xFFFFFFFF
    acc = 0
    for pos in range(lo, hi):
        acc ^= seq[pos]
    return acc
@benchmark.implementation("view_direct", "modify")
def modify_view_direct(data):
    """Double every element of ``[start, end)`` in place and return the XOR of the range."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    for pos in range(lo, hi):
        seq[pos] = (seq[pos] * 2) & 0xFFFFFFFF
    acc = 0
    for pos in range(lo, hi):
        acc ^= seq[pos]
    return acc
# Index operation
@benchmark.implementation("slice", "index")
def index_slice(data):
    """Search a copied ``[start, end)`` slice for ``search_value``.

    Returns the position in the full list, or -1 when the value is absent.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    target = data['search_value']
    window = seq[lo:hi]
    try:
        offset = window.index(target)
    except ValueError:
        return -1
    # Translate the slice-relative offset back to a list index.
    return lo + offset
@benchmark.implementation("view", "index")
def index_view(data):
    """Search a ``view`` over ``[start, end)`` for ``search_value``.

    Returns the position in the full list, or -1 when the value is absent.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    target = data['search_value']
    window = view(seq, lo, hi)
    try:
        offset = window.index(target)
    except ValueError:
        return -1
    # Translate the view-relative offset back to a list index.
    return lo + offset
@benchmark.implementation("view_direct", "index")
def index_view_direct(data):
    """Linear-scan ``[start, end)`` of the list for ``search_value``; return its index or -1."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    target = data['search_value']
    for pos in range(lo, hi):
        if seq[pos] == target:
            return pos
    return -1
# Reverse operation
@benchmark.implementation("slice", "reverse")
def reverse_slice(data):
    """Reverse ``[start, end)`` via a slice copy written back element by element.

    Returns the XOR of the first (up to) 100 elements of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    window.reverse()
    for pos, value in enumerate(window, lo):
        seq[pos] = value
    acc = 0
    for pos in range(lo, min(lo + 100, hi)):
        acc ^= seq[pos]
    return acc
@benchmark.implementation("view", "reverse")
def reverse_view(data):
    """Reverse ``[start, end)`` through ``view.reverse()``.

    Returns the XOR of the first (up to) 100 elements of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    window.reverse()
    acc = 0
    for pos in range(lo, min(lo + 100, hi)):
        acc ^= seq[pos]
    return acc
@benchmark.implementation("view_direct", "reverse")
def reverse_view_direct(data):
    """Reverse ``[start, end)`` in place with a two-pointer swap loop.

    Returns the XOR of the first (up to) 100 elements of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    i, j = lo, hi - 1
    while i < j:
        seq[i], seq[j] = seq[j], seq[i]
        i += 1
        j -= 1
    acc = 0
    for pos in range(lo, min(lo + 100, hi)):
        acc ^= seq[pos]
    return acc
# Sort operation
@benchmark.implementation("slice", "sort")
def sort_slice(data):
    """Sort ``[start, end)`` via a slice copy written back element by element.

    Returns the XOR of the first (up to) 100 elements of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    window.sort()
    for pos, value in enumerate(window, lo):
        seq[pos] = value
    acc = 0
    for pos in range(lo, min(lo + 100, hi)):
        acc ^= seq[pos]
    return acc
@benchmark.implementation("view", "sort")
def sort_view(data):
    """Sort ``[start, end)`` through ``view.sort()``.

    Returns the XOR of the first (up to) 100 elements of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    window.sort()
    acc = 0
    for pos in range(lo, min(lo + 100, hi)):
        acc ^= seq[pos]
    return acc
# Nested sum operation (multiple slices)
@benchmark.implementation("slice", "nested_sum")
def nested_sum_slice(data):
    """Sum four overlapping sub-slices of ``[start, end)``, masked to 32 bits.

    Each sub-slice is a quarter of the range long and starts half a quarter
    after the previous one.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    quarter = (hi - lo) // 4
    acc = 0
    for k in range(4):
        s = lo + k * quarter // 2
        e = min(s + quarter, hi)
        acc += sum(seq[s:e])
    return acc & 0xFFFFFFFF
@benchmark.implementation("view", "nested_sum")
def nested_sum_view(data):
    """Sum four overlapping views over ``[start, end)`` by index, masked to 32 bits.

    Each view is a quarter of the range long and starts half a quarter
    after the previous one.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    quarter = (hi - lo) // 4
    acc = 0
    for k in range(4):
        s = lo + k * quarter // 2
        e = min(s + quarter, hi)
        window = view(seq, s, e)
        for j in range(len(window)):
            acc += window[j]
    return acc & 0xFFFFFFFF
@benchmark.implementation("view_direct", "nested_sum")
def nested_sum_view_direct(data):
    """Sum four overlapping index ranges inside ``[start, end)``, masked to 32 bits.

    Each range is a quarter of the window long and starts half a quarter
    after the previous one.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    quarter = (hi - lo) // 4
    acc = 0
    for k in range(4):
        s = lo + k * quarter // 2
        e = min(s + quarter, hi)
        for pos in range(s, e):
            acc += seq[pos]
    return acc & 0xFFFFFFFF
# Pop operation
@benchmark.implementation("slice", "pop")
def pop_slice(data):
    """Pop up to 100 elements off the end of a slice copy and return their XOR.

    The remaining (shortened) copy is written back over the start of the range.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    acc = 0
    for _ in range(min(100, len(window))):
        if window:
            acc ^= window.pop()
    # Write the surviving prefix back to its original positions.
    for pos, value in enumerate(window, lo):
        seq[pos] = value
    return acc
@benchmark.implementation("view", "pop")
def pop_view(data):
    """Pop up to 100 elements off the end of a ``view`` and return their XOR."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    acc = 0
    for _ in range(min(100, len(window))):
        if len(window) > 0:
            acc ^= window.pop()
    return acc
@benchmark.implementation("view_direct", "pop")
def pop_view_direct(data):
    """Emulate popping up to 100 elements from the end of ``[start, end)``.

    A moving end index is decremented instead of shrinking any list; the
    XOR of the visited elements is returned.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    acc = 0
    cursor = hi
    for _ in range(min(100, hi - lo)):
        if cursor > lo:
            cursor -= 1
            acc ^= seq[cursor]
    return acc
# Append operation
@benchmark.implementation("slice", "append")
def append_slice(data):
    """Append to a slice copy of ``[start, end)`` and return the XOR of the appended values.

    The number of appended values is ``min(100, len(lst) - end)``, the same
    bound the view-based implementations use (a view can only grow into the
    space that follows ``end`` in the underlying list).  Using the same count
    keeps this implementation's checksum comparable with theirs; appending a
    fixed 100 values made the checksums disagree whenever fewer than 100
    slots followed ``end``.

    The grown copy is not written back to ``data['data']``.
    """
    lst = data['data']
    start, end = data['start'], data['end']
    # Create slice (copies data)
    slice_copy = lst[start:end]
    checksum = 0
    for i in range(min(100, len(lst) - end)):
        val = (i * 17) & 0xFFFFFFFF
        slice_copy.append(val)
        checksum ^= val
    return checksum
@benchmark.implementation("view", "append")
def append_view(data):
    """Append up to 100 values to a ``view`` over ``[start, end)`` and return their XOR.

    The count is limited to the number of list slots that follow ``end``.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    acc = 0
    for k in range(min(100, len(seq) - hi)):
        value = (k * 17) & 0xFFFFFFFF
        window.append(value)
        acc ^= value
    return acc
@benchmark.implementation("view_direct", "append")
def append_view_direct(data):
    """Emulate appending up to 100 values after ``end`` by overwriting list slots.

    The count is limited to the number of list slots that follow ``end``;
    the XOR of the written values is returned.
    """
    seq = data['data']
    lo, hi = data['start'], data['end']
    acc = 0
    cursor = hi
    for k in range(min(100, len(seq) - hi)):
        value = (k * 17) & 0xFFFFFFFF
        if cursor < len(seq):
            seq[cursor] = value
            cursor += 1
        acc ^= value
    return acc
if __name__ == "__main__":
    # parse_args() reads the command line and hands back the object to run.
    benchmark.parse_args().run()
#!/usr/bin/env python3
"""
Benchmark comparing operations on list slices vs view objects.
Tests various operations to measure the overhead of slice copying vs view indirection.
"""
import random
import sys
import os
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
https://kobejean.github.io/cp-library
'''
def heappush(heap: list, item):
    """Append ``item`` to ``heap`` and move it toward the root until min-heap order holds."""
    heap.append(item)
    last = len(heap) - 1
    heapsiftdown(heap, 0, last)
def heappop(heap: list):
    """Remove and return the smallest element of the min-heap ``heap``."""
    tail = heap.pop()
    if not heap:
        # The heap held a single element; it is the answer.
        return tail
    smallest = heap[0]
    heap[0] = tail
    heapsiftup(heap, 0)
    return smallest
def heapreplace(heap: list, item):
    """Replace the root of the min-heap with ``item`` and return the old root."""
    smallest = heap[0]
    heap[0] = item
    heapsiftup(heap, 0)
    return smallest
def heappushpop(heap: list, item):
    """Push ``item`` then pop the smallest element, returning the popped value.

    When the heap is empty or its root is not smaller than ``item``, the
    heap is left untouched and ``item`` itself is returned.
    """
    if not heap or not heap[0] < item:
        return item
    smallest = heap[0]
    heap[0] = item
    heapsiftup(heap, 0)
    return smallest
def heapify(x: list):
    """Rearrange ``x`` in place into min-heap order."""
    # Sift every node that has a child, from the last such node back to the root.
    for i in range(len(x) // 2 - 1, -1, -1):
        heapsiftup(x, i)
def heapsiftdown(heap: list, root: int, pos: int):
    """Move the element at ``pos`` toward ``root`` while it is smaller than its parent."""
    item = heap[pos]
    while root < pos:
        parent = (pos - 1) >> 1
        if not item < heap[parent]:
            break
        # Pull the larger parent down into the hole and continue from its slot.
        heap[pos] = heap[parent]
        pos = parent
    heap[pos] = item
def heapsiftup(heap: list, pos: int):
    """Move the element at ``pos`` toward the leaves while a child is smaller than it."""
    last = len(heap) - 1
    item = heap[pos]
    child = pos << 1 | 1
    # Nodes with two children: compare against the smaller child.
    while child < last:
        if heap[child + 1] < heap[child]:
            child += 1
        if not heap[child] < item:
            break
        heap[pos] = heap[child]
        pos = child
        child = child << 1 | 1
    # A node whose only child is the last element of the heap.
    if child == last and heap[child] < item:
        heap[pos] = heap[child]
        pos = child
    heap[pos] = item
def heappop_max(heap: list):
    """Remove and return the largest element of the max-heap ``heap``."""
    tail = heap.pop()
    if not heap:
        # The heap held a single element; it is the answer.
        return tail
    largest = heap[0]
    heap[0] = tail
    heapsiftup_max(heap, 0)
    return largest
def heapreplace_max(heap: list, item):
    """Replace the root of the max-heap with ``item`` and return the old root."""
    largest = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return largest
def heapify_max(x: list):
    """Rearrange ``x`` in place into max-heap order."""
    # Sift every node that has a child, from the last such node back to the root.
    for i in range(len(x) // 2 - 1, -1, -1):
        heapsiftup_max(x, i)
def heappush_max(heap: list, item):
    """Append ``item`` to ``heap`` and move it toward the root until max-heap order holds."""
    heap.append(item)
    last = len(heap) - 1
    heapsiftdown_max(heap, 0, last)
def heapreplace_max(heap: list, item):
    """Replace the root of the max-heap with ``item`` and return the old root."""
    largest = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return largest
def heappushpop_max(heap: list, item):
    """Push ``item`` then pop the largest element, returning the popped value.

    When the heap is empty or its root is not greater than ``item``, the
    heap is left untouched and ``item`` itself is returned.
    """
    if not heap or not heap[0] > item:
        return item
    largest = heap[0]
    heap[0] = item
    heapsiftup_max(heap, 0)
    return largest
def heapsiftdown_max(heap: list, root: int, pos: int):
    """Move the element at ``pos`` toward ``root`` while its parent is smaller than it."""
    item = heap[pos]
    while root < pos:
        parent = (pos - 1) >> 1
        if not heap[parent] < item:
            break
        # Pull the smaller parent down into the hole and continue from its slot.
        heap[pos] = heap[parent]
        pos = parent
    heap[pos] = item
def heapsiftup_max(heap: list, pos: int):
    """Move the element at ``pos`` toward the leaves while a child is larger than it."""
    last = len(heap) - 1
    item = heap[pos]
    child = pos << 1 | 1
    # Nodes with two children: compare against the larger child.
    while child < last:
        if heap[child] < heap[child + 1]:
            child += 1
        if not item < heap[child]:
            break
        heap[pos] = heap[child]
        pos = child
        child = child << 1 | 1
    # A node whose only child is the last element of the heap.
    if child == last and item < heap[child]:
        heap[pos] = heap[child]
        pos = child
    heap[pos] = item
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
"""
Declarative benchmark framework with minimal boilerplate.
Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""
import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict
@dataclass
class BenchmarkConfig:
    """Configuration for benchmark runs.

    ``sizes`` and ``operations`` default to ``None`` and are replaced with
    fresh default lists in ``__post_init__``.  Fields whose default is
    ``None`` are annotated as such so the annotations match the values the
    fields can actually hold.
    """
    name: str
    sizes: Union[List[int], None] = None
    operations: Union[List[str], None] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    plot_scale: str = "loglog"  # Options: "loglog", "linear", "semilogx", "semilogy"
    progressive: bool = True  # Show results operation by operation across sizes
    # Profiling mode
    profile_mode: bool = False
    profile_size: Union[int, None] = None
    profile_operation: Union[str, None] = None
    profile_implementation: Union[str, None] = None

    def __post_init__(self):
        """Fill in the default size and operation lists when none were given."""
        if self.sizes is None:
            self.sizes = [100, 1000, 10000, 100000]
        if self.operations is None:
            self.operations = ['default']
class Benchmark:
    """Declarative benchmark framework using decorators.

    Data generators, implementations, validators and setup functions are
    registered through the decorator methods and kept in plain dicts keyed by
    operation name.  ``run()`` then executes every configured operation/size
    pair, or a single pair in profiling mode.
    """

    def __init__(self, config: "BenchmarkConfig"):
        """Store ``config`` and create empty registries and an empty result list."""
        self.config = config
        self.data_generators = {}  # generator name -> callable(size, operation)
        self.implementations = {}  # operation -> {implementation name -> callable(data)}
        self.validators = {}       # operation -> callable(expected, actual)
        self.setups = {}           # operation -> {implementation name -> callable(data)}
        self.results = []          # one dict per implementation/operation/size run

    def profile(self, operation: str = None, size: int = None, implementation: str = None):
        """Create a profiling version of this benchmark.

        The returned benchmark shares this instance's registries (the same
        dict objects) but has saving and plotting disabled.
        """
        profile_config = BenchmarkConfig(
            name=f"{self.config.name}_profile",
            sizes=self.config.sizes,
            operations=self.config.operations,
            profile_mode=True,
            profile_operation=operation,
            profile_size=size,
            profile_implementation=implementation,
            save_results=False,
            plot_results=False
        )
        profile_benchmark = Benchmark(profile_config)
        profile_benchmark.data_generators = self.data_generators
        profile_benchmark.implementations = self.implementations
        profile_benchmark.validators = self.validators
        profile_benchmark.setups = self.setups
        return profile_benchmark

    def parse_args(self):
        """Parse command line arguments for profiling mode.

        Returns a profiling benchmark when ``--profile`` is given, otherwise
        ``self``.
        """
        parser = argparse.ArgumentParser(
            description=f"Benchmark {self.config.name} with optional profiling mode",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
# Normal benchmark mode
python benchmark.py
# Profile specific operation and implementation
python benchmark.py --profile --operation random_access --implementation grid
# Profile with specific size
python benchmark.py --profile --size 1000000
# Profile all implementations of an operation
python benchmark.py --profile --operation construction
"""
        )
        parser.add_argument('--profile', action='store_true',
                            help='Run in profiling mode (minimal overhead for profilers)')
        parser.add_argument('--operation', type=str,
                            help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
        parser.add_argument('--size', type=int,
                            help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
        parser.add_argument('--implementation', type=str,
                            help='Specific implementation to profile (default: all)')
        args = parser.parse_args()
        # If profile mode requested, return a profiling benchmark
        if args.profile:
            return self.profile(
                operation=args.operation,
                size=args.size,
                implementation=args.implementation
            )
        # Otherwise return self for normal mode
        return self

    def data_generator(self, name: str = "default"):
        """Decorator to register data generator under ``name``."""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator

    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation ``name`` for one or more operations."""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator

    def validator(self, operation: str = "default"):
        """Decorator to register custom validator for ``operation``."""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator

    def setup(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register setup function that runs before timing."""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.setups:
                    self.setups[op] = {}
                self.setups[op][name] = func
            return func
        return decorator

    def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
        """Measure execution time with warmup and optional setup.

        ``setup_func`` (when given) is called before every warmup and timed
        call to produce the argument for ``func``; the time spent in
        ``setup_func`` is excluded from the measurement, so only ``func``
        itself is timed.

        Returns:
            ``(result, elapsed_ms)`` where ``result`` is the return value of
            the last timed call and ``elapsed_ms`` is the mean time per call
            in milliseconds.

        Raises:
            ValueError: if ``config.iterations`` is not positive.
        """
        iterations = self.config.iterations
        if iterations <= 0:
            raise ValueError("iterations must be a positive integer")
        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                func(setup_func(data) if setup_func else data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break
        # Actual measurement: accumulate only the time spent inside func.
        timer = time.perf_counter
        elapsed = 0.0
        result = None
        for _ in range(iterations):
            run_data = setup_func(data) if setup_func else data
            begin = timer()
            result = func(run_data)
            elapsed += timer() - begin
        elapsed_ms = elapsed * 1000 / iterations
        return result, elapsed_ms

    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison."""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual

    def run(self):
        """Run all benchmarks (or the profiling run when in profile mode)."""
        if self.config.profile_mode:
            self._run_profile_mode()
        else:
            self._run_normal_mode()

    def _run_normal_mode(self):
        """Run normal benchmark mode: every operation at every size, then report."""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        # Always show progressive results: operation by operation across all sizes
        for operation in self.config.operations:
            for size in self.config.sizes:
                self._run_single(operation, size)
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        if self.config.plot_results:
            self._plot_results()
        # Print summary
        self._print_summary()

    def _run_profile_mode(self):
        """Run profiling mode with minimal overhead for use with vmprof.

        Raises:
            ValueError: if no data generator or implementation is registered
                for the operation, or the requested implementation is unknown.
        """
        operation = self.config.profile_operation or self.config.operations[0]
        size = self.config.profile_size or max(self.config.sizes)
        impl_name = self.config.profile_implementation
        print(f"PROFILING MODE: {self.config.name}")
        print(f"Operation: {operation}, Size: {size}")
        if impl_name:
            print(f"Implementation: {impl_name}")
        print("="*80)
        print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
        print("="*80)
        # Generate test data
        generator = self.data_generators.get(operation, self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations
        impls = self.implementations.get(operation, {})
        if not impls:
            raise ValueError(f"No implementations for operation: {operation}")
        # Filter to specific implementation if requested
        if impl_name:
            if impl_name not in impls:
                raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
            impls = {impl_name: impls[impl_name]}
        # Run with minimal overhead - no timing, no validation
        for name, func in impls.items():
            print(f"\nRunning {name}...")
            sys.stdout.flush()
            # Setup if needed
            setup_func = self.setups.get(operation, {}).get(name)
            if setup_func:
                data = setup_func(test_data)
            else:
                data = test_data
            # Run the actual function (this is what vmprof will profile)
            result = func(data)
            print(f"Completed {name}, result checksum: {result}")
            sys.stdout.flush()

    def _run_single(self, operation: str, size: int):
        """Run a single operation/size combination.

        Every registered implementation is measured once.  The first
        implementation that completes without raising supplies the reference
        result against which all results are validated.  An implementation
        that raises is recorded with ``time_ms`` of infinity and its error
        message; it never aborts the remaining implementations.

        Raises:
            ValueError: if no data generator is registered for the operation.
        """
        print(f"\nOperation: {operation}, Size: {size}")
        print("-" * 50)
        sys.stdout.flush()
        # Generate test data
        generator = self.data_generators.get(operation,
                                             self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations for this operation
        impls = self.implementations.get(operation, {})
        if not impls:
            print(f"No implementations for operation: {operation}")
            return
        # Get setup functions for this operation
        setups = self.setups.get(operation, {})
        # Reference result: taken from the first successful measurement, so the
        # reference implementation is neither run twice nor able to crash the run.
        have_reference = False
        expected_result = None
        # Run all implementations
        for impl_name, impl_func in impls.items():
            try:
                setup_func = setups.get(impl_name)
                result, time_ms = self.measure_time(impl_func, test_data, setup_func)
                if not have_reference:
                    expected_result = result
                    have_reference = True
                correct = self.validate_result(expected_result, result, operation)
                # Store result
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': time_ms,
                    'correct': correct,
                    'error': None
                })
                status = "OK" if correct else "FAIL"
                print(f" {impl_name:<20} {time_ms:>8.3f} ms {status}")
                sys.stdout.flush()
            except Exception as e:
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': float('inf'),
                    'correct': False,
                    'error': str(e)
                })
                print(f" {impl_name:<20} ERROR: {str(e)[:40]}")
                sys.stdout.flush()

    def _save_results(self):
        """Save results to JSON in ``config.output_dir`` (created if missing)."""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        # The file name carries the current Unix time in whole seconds.
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")

    def _plot_results(self):
        """Generate plots using matplotlib if available.

        Plotting is best-effort: a missing matplotlib or any plotting error is
        reported with a message and otherwise ignored.
        """
        try:
            import matplotlib.pyplot as plt
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")

    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting.

        Only results that are marked correct and have a finite time are kept.
        """
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op

    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation and save it as a PNG."""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        implementations = sorted(implementations)
        plt.figure(figsize=(10, 6))
        # One line per implementation: mean time at each size it has data for.
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size]
                         if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")

    def _print_summary(self):
        """Print performance summary: best implementation per operation and its speedup.

        Speedup is the slowest mean time divided by the fastest mean time.
        Results that raised an error are left out.
        """
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages
            avg_times = [(impl, statistics.mean(times))
                         for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                speedup = worst_time / best_time if best_time > 0 else 0
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")
from typing import Generic
from typing import TypeVar
# A TypeVar's name argument must equal the variable it is assigned to.
_S = TypeVar('_S')
_T = TypeVar('_T')
_U = TypeVar('_U')


def list_find(lst: list, value, start = 0, stop = sys.maxsize):
    """Return the first index ``i`` with ``start <= i < stop`` and ``lst[i] == value``, or -1.

    Only the ``ValueError`` that ``list.index`` raises for a missing value is
    translated to -1; any other exception propagates.
    """
    try:
        return lst.index(value, start, stop)
    except ValueError:
        return -1


class view(Generic[_T]):
    """A window ``[l, r)`` onto a list ``A`` that shares storage with the list.

    No elements are copied: reads and writes go to ``A`` itself.  Reading by
    index is bounds-checked against the window; writing by index
    (``__setitem__``) is not.  The window can shrink or grow at either end
    with ``pop``/``popleft``/``append``/``appendleft``, which overwrite or
    expose slots of ``A`` rather than resizing it.
    """
    __slots__ = 'A', 'l', 'r'

    def __init__(V, A: list[_T], l: int, r: int):
        V.A, V.l, V.r = A, l, r

    def __len__(V):
        return V.r - V.l

    def __getitem__(V, i: int):
        # Raising IndexError past the end is what lets ``for x in v`` and
        # ``list(v)`` terminate, since the class defines no ``__iter__``.
        if 0 <= i < V.r - V.l:
            return V.A[V.l+i]
        raise IndexError

    def __setitem__(V, i: int, v: _T):
        # No bounds check: ``i`` is simply offset by ``l``.
        V.A[V.l+i] = v

    def __contains__(V, v: _T):
        return list_find(V.A, v, V.l, V.r) != -1

    def set_range(V, l: int, r: int):
        """Point the view at ``[l, r)`` of the same list."""
        V.l, V.r = l, r

    def index(V, v: _T):
        """Return the view-relative index of ``v``; raises ValueError if absent."""
        return V.A.index(v, V.l, V.r) - V.l

    def reverse(V):
        """Reverse the window in place by swapping from both ends."""
        l, r = V.l, V.r-1
        while l < r:
            V.A[l], V.A[r] = V.A[r], V.A[l]
            l += 1
            r -= 1

    def sort(V, /, *args, **kwargs):
        """Sort the window in place; arguments are forwarded to ``list.sort``."""
        A = V.A[V.l:V.r]
        A.sort(*args, **kwargs)
        for i, a in enumerate(A, V.l):
            V.A[i] = a

    def pop(V):
        """Shrink the window by one at the right and return the dropped element.

        Raises:
            IndexError: if the view is empty.
        """
        if V.r <= V.l:
            raise IndexError('pop from empty view')
        V.r -= 1
        return V.A[V.r]

    def append(V, v: _T):
        """Grow the window by one at the right, storing ``v`` in the list slot it now covers.

        Raises IndexError (from the list assignment) when the window already
        ends at the end of the list.
        """
        V.A[V.r] = v
        V.r += 1

    def popleft(V):
        """Shrink the window by one at the left and return the dropped element.

        Raises:
            IndexError: if the view is empty.
        """
        if V.r <= V.l:
            raise IndexError('pop from empty view')
        V.l += 1
        return V.A[V.l-1]

    def appendleft(V, v: _T):
        """Grow the window by one at the left, storing ``v`` in the list slot it now covers.

        Raises:
            IndexError: if the window already starts at index 0 (a negative
                index would otherwise wrap around and overwrite the end of
                the list).
        """
        if V.l <= 0:
            raise IndexError('appendleft past the start of the list')
        V.l -= 1
        V.A[V.l] = v

    def validate(V):
        """Return True when ``0 <= l <= r <= len(A)``."""
        return 0 <= V.l <= V.r <= len(V.A)
# Configure benchmark
# Sizes are listed largest-first (reverse order) to warm up the JIT.
config = BenchmarkConfig(
    name="list_view",
    sizes=[1000000, 100000, 10000, 1000, 100],
    operations=[
        'construction', 'sum', 'modify', 'index', 'reverse',
        'sort', 'nested_sum', 'pop', 'append',
    ],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/list_view",
)
# Registry object the decorators below attach generators and implementations to.
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_view_data(size: int, operation: str):
    """Build the input dict shared by every slice/view implementation.

    The dict holds a random integer list of length ``size``, a random
    ``[start, end)`` window whose length is between ``size // 10`` and
    ``size // 2``, a random ``search_value``, and the ``size`` and
    ``operation`` arguments.
    """
    values = [random.randint(0, 1000000) for _ in range(size)]
    # Window length first, then a start that keeps the window inside the list.
    window = random.randint(size // 10, size // 2)
    first = random.randint(0, size - window)
    return {
        'data': values,
        'start': first,
        'end': first + window,
        'slice_size': window,
        'search_value': random.randint(0, 1000000),
        'size': size,
        'operation': operation,
    }
# Construction operation
@benchmark.implementation("slice", "construction")
def construction_slice(data):
    """Copy ``data['data'][start:end]`` and return the XOR of the copy's elements."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]  # slicing builds a new list
    # Fold the copy into a checksum so the slice is actually consumed.
    acc = 0
    for value in window:
        acc ^= value
    return acc
@benchmark.implementation("view", "construction")
def construction_view(data):
    """Wrap ``[start, end)`` of ``data['data']`` in a ``view`` and XOR its elements by index."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    # Read every element through the view so the result depends on all of them.
    acc = 0
    for k in range(len(window)):
        acc ^= window[k]
    return acc
# Sum operation
@benchmark.implementation("slice", "sum")
def sum_slice(data):
    """Return the sum of a copied ``[start, end)`` slice, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = seq[lo:hi]
    total = sum(window)
    return total & 0xFFFFFFFF
@benchmark.implementation("view", "sum")
def sum_view(data):
    """Return the sum of ``[start, end)`` read element by element through a ``view``, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    window = view(seq, lo, hi)
    acc = 0
    for k in range(len(window)):
        acc += window[k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("view_direct", "sum")
def sum_view_direct(data):
    """Return the sum of ``data['data'][start:end]`` by direct indexing, masked to 32 bits."""
    seq = data['data']
    lo, hi = data['start'], data['end']
    # Neither a slice nor a view is created; the list is indexed in place.
    acc = 0
    for pos in range(lo, hi):
        acc += seq[pos]
    return acc & 0xFFFFFFFF
# Setup functions for operations that need copying
@benchmark.setup("slice", ["modify", "reverse", "sort", "pop", "append"])
def setup_slice_modify(data):
    """Return a shallow copy of ``data`` whose ``'data'`` list is itself a fresh copy.

    Mutating implementations then work on their own list instead of the
    shared generated one.
    """
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("view", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_modify(data):
    """Return a shallow copy of ``data`` whose ``'data'`` list is itself a fresh copy.

    Mutating implementations then work on their own list instead of the
    shared generated one.
    """
    return {**data, 'data': data['data'].copy()}
@benchmark.setup("view_direct", ["modify", "reverse", "sort", "pop", "append"])
def setup_view_direct_modify(data):
    """Prepare an isolated payload for the direct-indexing mutating variants.

    Args:
        data: Benchmark payload; must contain a 'data' entry with a
            ``copy()`` method (a list in this benchmark).

    Returns:
        A shallow copy of ``data`` with its 'data' list duplicated, so
        in-place edits made by the benchmark never reach the input.
    """
    list_copy = data['data'].copy()
    payload = data.copy()
    payload['data'] = list_copy
    return payload
# Modify operation
@benchmark.implementation("slice", "modify")
def modify_slice(data):
    """Double every element of the window via a slice copy, then write back.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of the list elements in [start, end) after the update.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    # Work on a copy of the window (the slice cost is part of the measurement).
    window = source[lo:hi]
    for offset, value in enumerate(window):
        window[offset] = (value * 2) & 0xFFFFFFFF
    # Write the results back element by element, as the slice variant does
    # throughout this benchmark.
    for offset in range(len(window)):
        source[lo + offset] = window[offset]
    folded = 0
    for pos in range(lo, hi):
        folded ^= source[pos]
    return folded
@benchmark.implementation("view", "modify")
def modify_view(data):
    """Double every element of the window by assigning through a view.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the underlying list's elements in [start, end) afterwards.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    window = view(source, lo, hi)
    for pos in range(len(window)):
        doubled = window[pos] * 2
        window[pos] = doubled & 0xFFFFFFFF
    # The checksum is read from the list itself, not from the view.
    folded = 0
    for pos in range(lo, hi):
        folded ^= source[pos]
    return folded
@benchmark.implementation("view_direct", "modify")
def modify_view_direct(data):
    """Double every element of the window by indexing the list directly.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of the list elements in [start, end) after the update.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    positions = range(lo, hi)
    # First pass: in-place update, masked to 32 bits.
    for pos in positions:
        source[pos] = (source[pos] * 2) & 0xFFFFFFFF
    # Second pass: fold the updated window into a checksum.
    folded = 0
    for pos in positions:
        folded ^= source[pos]
    return folded
# Index operation
@benchmark.implementation("slice", "index")
def index_slice(data):
    """Locate 'search_value' inside a slice copy of the window.

    Args:
        data: Benchmark payload with keys 'data' (list), 'start', 'end'
            (window bounds) and 'search_value' (the value to find).

    Returns:
        The index of the first match expressed in the full list's
        coordinates, or -1 when the window does not contain the value.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    needle = data['search_value']
    window = source[lo:hi]
    try:
        offset = window.index(needle)
    except ValueError:
        # list.index signals "not found" by raising.
        return -1
    # Translate the slice-relative offset back to a global index.
    return lo + offset
@benchmark.implementation("view", "index")
def index_view(data):
    """Locate 'search_value' using the view's own ``index`` method.

    Args:
        data: Benchmark payload with keys 'data' (list), 'start', 'end'
            (window bounds) and 'search_value' (the value to find).

    Returns:
        ``start`` plus whatever ``view.index`` returns, or -1 when that
        call raises ValueError.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    needle = data['search_value']
    window = view(source, lo, hi)
    try:
        offset = window.index(needle)
    except ValueError:
        return -1
    # NOTE(review): assumes view.index yields a view-relative offset, like
    # list.index on a slice -- confirm against cp_library's view class.
    return lo + offset
@benchmark.implementation("view_direct", "index")
def index_view_direct(data):
    """Linear-scan the window of the list itself for 'search_value'.

    Args:
        data: Benchmark payload with keys 'data' (list), 'start', 'end'
            (window bounds) and 'search_value' (the value to find).

    Returns:
        The global index of the first match in [start, end), or -1 when
        no element in that range equals the value.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    needle = data['search_value']
    for pos in range(lo, hi):
        if source[pos] == needle:
            break
    else:
        # Loop ran to completion (or the window was empty): nothing matched.
        return -1
    return pos
# Reverse operation
@benchmark.implementation("slice", "reverse")
def reverse_slice(data):
    """Reverse the window by reversing a slice copy and writing it back.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of up to the first 100 elements of the reversed window.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    window = source[lo:hi]
    window.reverse()
    # Manual element-wise write-back (kept explicit on purpose).
    for offset in range(len(window)):
        source[lo + offset] = window[offset]
    # Only sample the head of the window for the checksum.
    sample_end = min(lo + 100, hi)
    folded = 0
    for pos in range(lo, sample_end):
        folded ^= source[pos]
    return folded
@benchmark.implementation("view", "reverse")
def reverse_view(data):
    """Reverse the window by calling ``reverse()`` on a view of it.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of up to the first 100 elements of the underlying list's
        window after the call.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    view(source, lo, hi).reverse()
    # Only sample the head of the window, reading from the list itself.
    sample_end = min(lo + 100, hi)
    folded = 0
    for pos in range(lo, sample_end):
        folded ^= source[pos]
    return folded
@benchmark.implementation("view_direct", "reverse")
def reverse_view_direct(data):
    """Reverse the window in place by swapping mirrored positions.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of up to the first 100 elements of the reversed window.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    # Swap the k-th element from the front with the k-th from the back;
    # an odd-length window leaves its middle element where it is.
    for step in range((hi - lo) // 2):
        front = lo + step
        back = hi - 1 - step
        held = source[front]
        source[front] = source[back]
        source[back] = held
    # Only sample the head of the window for the checksum.
    sample_end = min(lo + 100, hi)
    folded = 0
    for pos in range(lo, sample_end):
        folded ^= source[pos]
    return folded
# Sort operation
@benchmark.implementation("slice", "sort")
def sort_slice(data):
    """Sort the window by sorting a slice copy and writing it back.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of up to the first 100 elements of the sorted window.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    window = source[lo:hi]
    window.sort()
    # Manual element-wise write-back (kept explicit on purpose).
    for offset in range(len(window)):
        source[lo + offset] = window[offset]
    # Only sample the head of the window for the checksum.
    sample_end = min(lo + 100, hi)
    folded = 0
    for pos in range(lo, sample_end):
        folded ^= source[pos]
    return folded
@benchmark.implementation("view", "sort")
def sort_view(data):
    """Sort the window by calling ``sort()`` on a view of it.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of up to the first 100 elements of the underlying list's
        window after the call.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    view(source, lo, hi).sort()
    # Only sample the head of the window, reading from the list itself.
    sample_end = min(lo + 100, hi)
    folded = 0
    for pos in range(lo, sample_end):
        folded ^= source[pos]
    return folded
# Nested sum operation (multiple slices)
@benchmark.implementation("slice", "nested_sum")
def nested_sum_slice(data):
    """Sum four overlapping sub-windows, each materialised as a slice copy.

    Each sub-window is a quarter of the main window long and begins half a
    sub-window after the previous one.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        The combined sum of the four sub-windows masked to 32 bits.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    span = (hi - lo) // 4
    acc = 0
    for k in range(4):
        # Multiply before the floor division so odd spans round per step.
        first = lo + k * span // 2
        last = min(first + span, hi)
        acc += sum(source[first:last])
    return acc & 0xFFFFFFFF
@benchmark.implementation("view", "nested_sum")
def nested_sum_view(data):
    """Sum four overlapping sub-windows, each accessed through its own view.

    Each sub-window is a quarter of the main window long and begins half a
    sub-window after the previous one.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        The combined sum of the four sub-windows masked to 32 bits.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    span = (hi - lo) // 4
    acc = 0
    for k in range(4):
        # Multiply before the floor division so odd spans round per step.
        first = lo + k * span // 2
        last = min(first + span, hi)
        window = view(source, first, last)
        # Element-wise access through the view is the thing being timed.
        for pos in range(len(window)):
            acc += window[pos]
    return acc & 0xFFFFFFFF
@benchmark.implementation("view_direct", "nested_sum")
def nested_sum_view_direct(data):
    """Sum four overlapping sub-windows by indexing the list directly.

    Each sub-window is a quarter of the main window long and begins half a
    sub-window after the previous one.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        The combined sum of the four sub-windows masked to 32 bits.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    span = (hi - lo) // 4
    acc = 0
    for k in range(4):
        # Multiply before the floor division so odd spans round per step.
        first = lo + k * span // 2
        last = min(first + span, hi)
        for pos in range(first, last):
            acc += source[pos]
    return acc & 0xFFFFFFFF
# Pop operation
@benchmark.implementation("slice", "pop")
def pop_slice(data):
    """Pop up to 100 trailing elements from a slice copy of the window.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the popped values.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    window = source[lo:hi]
    folded = 0
    # The count never exceeds the copy's length, so every pop() succeeds.
    for _ in range(min(100, len(window))):
        folded ^= window.pop()
    # Write the surviving (shortened) prefix back over the same positions.
    for offset in range(len(window)):
        source[lo + offset] = window[offset]
    return folded
@benchmark.implementation("view", "pop")
def pop_view(data):
    """Pop up to 100 trailing elements through a view of the window.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the values returned by the view's ``pop()``.
    """
    window = view(data['data'], data['start'], data['end'])
    folded = 0
    for _ in range(min(100, len(window))):
        # Length is re-checked each round because how pop() changes the
        # view's length is defined in cp_library, not here.
        if len(window) <= 0:
            continue
        folded ^= window.pop()
    return folded
@benchmark.implementation("view_direct", "pop")
def pop_view_direct(data):
    """Emulate popping up to 100 trailing window elements with a cursor.

    Nothing is removed from the list; the window's end is walked backwards
    and the visited values are folded together.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the up-to-100 last elements of the window.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    count = min(100, hi - lo)
    folded = 0
    # Visit hi-1, hi-2, ... for `count` steps; never goes below `lo`.
    for pos in range(hi - 1, hi - 1 - count, -1):
        folded ^= source[pos]
    return folded
# Append operation
@benchmark.implementation("slice", "append")
def append_slice(data):
    """Append generated values to a slice copy of the window.

    The number of appended values is capped at the free space after 'end'
    in the underlying list, the same bound the "view" and "view_direct"
    append variants use, so all three perform the same amount of work and
    return the same checksum for a given payload.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the appended values ``(i * 17) & 0xFFFFFFFF``.
    """
    lst = data['data']
    start, end = data['start'], data['end']
    # Create slice and append to it (the original list is left untouched)
    slice_copy = lst[start:end]
    checksum = 0
    # Append up to 100 elements, or as many as the other variants have room for
    for i in range(min(100, len(lst) - end)):
        val = (i * 17) & 0xFFFFFFFF
        slice_copy.append(val)
        checksum ^= val
    # Note: the grown copy is not written back, so only the checksum is returned
    return checksum
@benchmark.implementation("view", "append")
def append_view(data):
    """Append generated values through a view of the window.

    Args:
        data: Benchmark payload with keys 'data' (list of ints), 'start'
            and 'end' (window bounds).

    Returns:
        XOR of the appended values ``(i * 17) & 0xFFFFFFFF``.
    """
    source = data['data']
    lo, hi = data['start'], data['end']
    window = view(source, lo, hi)
    folded = 0
    # At most 100 appends, and never more than the slots after `hi`.
    room = len(source) - hi
    for k in range(min(100, room)):
        item = (k * 17) & 0xFFFFFFFF
        window.append(item)
        folded ^= item
    return folded
@benchmark.implementation("view_direct", "append")
def append_view_direct(data):
    """Emulate appending to the window by overwriting the slots after 'end'.

    Args:
        data: Benchmark payload with keys 'data' (list of ints, mutated in
            place), 'start' and 'end' (window bounds).

    Returns:
        XOR of the written values ``(i * 17) & 0xFFFFFFFF``.
    """
    source = data['data']
    _lo, hi = data['start'], data['end']
    # At most 100 writes, and never past the end of the list.
    count = min(100, len(source) - hi)
    folded = 0
    for k in range(count):
        item = (k * 17) & 0xFFFFFFFF
        # Slot hi + k is always in range because of the cap on `count`.
        source[hi + k] = item
        folded ^= item
    return folded
if __name__ == "__main__":
    # Script entry point: the benchmark object turns the command-line
    # arguments into a runner, which is then executed.
    benchmark.parse_args().run()