This documentation is automatically generated by online-judge-tools/verification-helper
#!/usr/bin/env python3
"""
Benchmark comparing CSR2 vs direct arrays for dual-array sparse data.
CSR2 provides view2 objects for efficient row access patterns.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.ds.view.csr2_cls import CSR2
from cp_library.ds.view.view2_cls import view2
# Configure benchmark
# Sizes are listed largest-first (reverse order) so the JIT is warmed up early.
config = BenchmarkConfig(
    name="csr2",
    sizes=[10_000_000, 1_000_000, 100_000, 10_000, 1_000, 100],
    operations=[
        'copy_construction',
        'direct_access',
        'random_access',
        'indexed_iter',
        'foreach_iter',
        'modification',
        'bucketize',
        'col1_indexed_iter',
        'col1_foreach_iter',
    ],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/csr2",
)
# Registry object whose decorators are used by every definition below.
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_csr2_data(size: int, operation: str):
    """Generate test data for CSR2 operations.

    Args:
        size: Total number of elements stored across all rows.
        operation: Name of the operation being benchmarked (not used here).

    Returns:
        A dict with the two value arrays, the row offset array, the row
        count, the equivalent list-of-lists structure, and the
        keys/values used by the bucketize operation.
    """
    # Split `size` elements into rows of 50-150 elements; the last row is
    # clipped so the row sizes add up to exactly `size`.
    row_sizes = []
    total = 0
    while total < size:
        row_size = min(random.randint(50, 150), size - total)
        row_sizes.append(row_size)
        total += row_size
    # The real row count comes from the generated sizes, not from an estimate.
    num_rows = len(row_sizes)

    # offsets[i] .. offsets[i + 1] delimit row i inside the flat arrays
    offsets = [0]
    for row_size in row_sizes:
        offsets.append(offsets[-1] + row_size)

    # Flat value arrays
    array_a = [random.randint(0, 1000000) for _ in range(size)]
    array_b = [random.randint(0, 1000000) for _ in range(size)]

    # Same content as a list of rows of (a, b) tuples
    list_structure = [
        [(array_a[j], array_b[j]) for j in range(offsets[i], offsets[i + 1])]
        for i in range(num_rows)
    ]

    # Inputs for the bucketize operation: one key per element, each a valid row index
    keys = [random.randint(0, max(0, num_rows - 1)) for _ in range(size)]
    values_v = [random.randint(0, 1000000) for _ in range(size)]
    values_w = [random.randint(0, 1000000) for _ in range(size)]

    return {
        'array_a': array_a,
        'array_b': array_b,
        'offsets': offsets,
        'num_rows': num_rows,
        'list_structure': list_structure,
        'keys': keys,
        'values_v': values_v,
        'values_w': values_w,
        'size': size
    }
# Specialized data generator for single-element rows
@benchmark.data_generator("col1_indexed_iter")
def generate_col1_indexed_data(size: int, operation: str):
    """Data for the indexed-iteration benchmark: `size` rows holding one element each."""
    # Thin wrapper; the shared helper builds the actual structures.
    return _generate_col1_data(size=size, operation=operation)
@benchmark.data_generator("col1_foreach_iter")
def generate_col1_foreach_data(size: int, operation: str):
    """Data for the foreach-iteration benchmark: `size` rows holding one element each."""
    # Thin wrapper; the shared helper builds the actual structures.
    return _generate_col1_data(size=size, operation=operation)
def _generate_col1_data(size: int, operation: str):
"""Helper to generate single-element row data"""
# Each row has exactly 1 element
num_rows = size
# Generate offset array for single-element rows
offsets = list(range(size + 1)) # [0, 1, 2, 3, ..., size]
# Generate data arrays
array_a = [random.randint(0, 1000000) for _ in range(size)]
array_b = [random.randint(0, 1000000) for _ in range(size)]
# Create list of lists structure (each row has 1 element)
list_structure = [[(array_a[i], array_b[i])] for i in range(size)]
# For bucketize operation - distribute across fewer buckets to avoid empty ones
bucket_count = max(1, size // 10) # 10 elements per bucket on average
keys = [random.randint(0, bucket_count - 1) for _ in range(size)]
values_v = [random.randint(0, 1000000) for _ in range(size)]
values_w = [random.randint(0, 1000000) for _ in range(size)]
return {
'array_a': array_a,
'array_b': array_b,
'offsets': offsets,
'num_rows': num_rows,
'list_structure': list_structure,
'keys': keys,
'values_v': values_v,
'values_w': values_w,
'size': size,
'bucket_count': bucket_count
}
# Copy construction operation
@benchmark.implementation("csr2", "copy_construction")
def copy_construction_csr2(data):
    """Copy the three backing lists, wrap them in a CSR2 and return its row count."""
    # The copies are part of the measured work so all variants pay the same cost.
    a_dup = data['array_a'].copy()
    b_dup = data['array_b'].copy()
    o_dup = data['offsets'].copy()
    return len(CSR2(a_dup, b_dup, o_dup))
@benchmark.implementation("direct_arrays", "copy_construction")
def copy_construction_direct(data):
    """Copy the three backing lists (no wrapper object) and return the row count."""
    # The value-array copies are unused afterwards; they exist only so the
    # measured work matches the CSR2 variant.
    a_dup = data['array_a'].copy()
    b_dup = data['array_b'].copy()
    o_dup = data['offsets'].copy()
    # offsets has one more entry than there are rows
    return len(o_dup) - 1
@benchmark.implementation("list_of_lists", "copy_construction")
def copy_construction_list_of_lists(data):
    """Copy every row of the pre-built list of lists and return the row count."""
    rows_dup = [r.copy() for r in data['list_structure']]
    return len(rows_dup)
# Direct access operation
@benchmark.implementation("csr2", "direct_access")
def direct_access_csr2(data):
    """XOR-fold a+b over all elements, reading the backing lists via each row view's bounds."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for row in range(len(csr)):
        v = csr[row]
        for k in range(len(v)):
            # Index the view's underlying lists directly from its left bound
            first = v.A[v.l + k]
            second = v.B[v.l + k]
            acc ^= (first + second)
    # Clamp to 32 bits so all implementations report a comparable checksum
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "direct_access")
def direct_access_direct(data):
    """XOR-fold a+b over all elements by walking the flat arrays row by row."""
    col_a = data['array_a']
    col_b = data['array_b']
    starts = data['offsets']
    acc = 0
    for row in range(data['num_rows']):
        # Row `row` occupies positions starts[row] .. starts[row + 1] - 1
        for pos in range(starts[row], starts[row + 1]):
            acc ^= (col_a[pos] + col_b[pos])
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "direct_access")
def direct_access_list_of_lists(data):
    """XOR-fold a+b over all (a, b) tuples stored in the list of lists."""
    rows = data['list_structure']
    acc = 0
    for r in rows:
        for first, second in r:
            acc ^= (first + second)
    return acc & 0xFFFFFFFF
# Random access operation
@benchmark.implementation("csr2", "random_access")
def random_access_csr2(data):
    """Read up to 1000 pseudo-random (row, column) cells through csr(i, j) and XOR-fold them."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    n_probes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation probes the same cells
    gen = random.Random(42)
    for _ in range(n_probes):
        row = gen.randint(0, data['num_rows'] - 1)
        width = data['offsets'][row + 1] - data['offsets'][row]
        if width <= 0:
            continue  # empty row, nothing to read
        col = gen.randint(0, width - 1)
        first, second = csr(row, col)
        acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "random_access")
def random_access_direct(data):
    """Read up to 1000 pseudo-random cells straight from the flat arrays and XOR-fold them."""
    col_a = data['array_a']
    col_b = data['array_b']
    starts = data['offsets']
    acc = 0
    n_probes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation probes the same cells
    gen = random.Random(42)
    for _ in range(n_probes):
        row = gen.randint(0, data['num_rows'] - 1)
        lo = starts[row]
        hi = starts[row + 1]
        if hi <= lo:
            continue  # empty row, nothing to read
        col = gen.randint(0, hi - lo - 1)
        acc ^= (col_a[lo + col] + col_b[lo + col])
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "random_access")
def random_access_list_of_lists(data):
    """Read up to 1000 pseudo-random cells from the list of lists and XOR-fold them."""
    rows = data['list_structure']
    acc = 0
    n_probes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation probes the same cells
    gen = random.Random(42)
    for _ in range(n_probes):
        row = gen.randint(0, data['num_rows'] - 1)
        if len(rows[row]) == 0:
            continue  # empty row, nothing to read
        col = gen.randint(0, len(rows[row]) - 1)
        first, second = rows[row][col]
        acc ^= (first + second)
    return acc & 0xFFFFFFFF
# Setup for modify operations
@benchmark.setup("csr2", ["modification"])
def setup_csr2_modify(data):
    """Return a shallow copy of `data` whose two value arrays are fresh copies."""
    fresh = data.copy()
    # Only the arrays the modification benchmark writes to are duplicated
    fresh.update(
        array_a=data['array_a'].copy(),
        array_b=data['array_b'].copy(),
    )
    return fresh
@benchmark.setup("direct_arrays", ["modification"])
def setup_direct_modify(data):
    """Return a shallow copy of `data` whose two value arrays are fresh copies."""
    fresh = data.copy()
    # Only the arrays the modification benchmark writes to are duplicated
    fresh.update(
        array_a=data['array_a'].copy(),
        array_b=data['array_b'].copy(),
    )
    return fresh
@benchmark.setup("list_of_lists", ["modification"])
def setup_list_of_lists_modify(data):
    """Return a shallow copy of `data` with every row of the list structure copied."""
    fresh = data.copy()
    # Rows are copied individually so in-place writes don't touch the shared data
    fresh['list_structure'] = [r.copy() for r in data['list_structure']]
    return fresh
# Modification operation
@benchmark.implementation("csr2", "modification")
def modification_csr2(data):
    """Overwrite up to 1000 pseudo-random cells via csr.set(i, j, val); checksum the written values."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    n_writes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation writes the same values to the same cells
    gen = random.Random(42)
    for _ in range(n_writes):
        row = gen.randint(0, data['num_rows'] - 1)
        width = data['offsets'][row + 1] - data['offsets'][row]
        if width <= 0:
            continue  # empty row, nothing to overwrite
        col = gen.randint(0, width - 1)
        pair = (gen.randint(0, 1000000), gen.randint(0, 1000000))
        csr.set(row, col, pair)
        acc ^= (pair[0] + pair[1])
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "modification")
def modification_direct(data):
    """Overwrite up to 1000 pseudo-random cells in the flat arrays; checksum the written values."""
    col_a = data['array_a']
    col_b = data['array_b']
    starts = data['offsets']
    acc = 0
    n_writes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation writes the same values to the same cells
    gen = random.Random(42)
    for _ in range(n_writes):
        row = gen.randint(0, data['num_rows'] - 1)
        lo = starts[row]
        hi = starts[row + 1]
        if hi <= lo:
            continue  # empty row, nothing to overwrite
        col = gen.randint(0, hi - lo - 1)
        fresh_a = gen.randint(0, 1000000)
        fresh_b = gen.randint(0, 1000000)
        col_a[lo + col] = fresh_a
        col_b[lo + col] = fresh_b
        acc ^= (fresh_a + fresh_b)
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "modification")
def modification_list_of_lists(data):
    """Overwrite up to 1000 pseudo-random cells in the list of lists; checksum the written values."""
    rows = data['list_structure']
    acc = 0
    n_writes = min(1000, data['size'] // 10)
    # Fixed seed: every implementation writes the same values to the same cells
    gen = random.Random(42)
    for _ in range(n_writes):
        row = gen.randint(0, data['num_rows'] - 1)
        if len(rows[row]) == 0:
            continue  # empty row, nothing to overwrite
        col = gen.randint(0, len(rows[row]) - 1)
        pair = (gen.randint(0, 1000000), gen.randint(0, 1000000))
        rows[row][col] = pair
        acc ^= (pair[0] + pair[1])
    return acc & 0xFFFFFFFF
# Indexed iteration using [i] access
@benchmark.implementation("csr2", "indexed_iter")
def indexed_iter_csr2(data):
    """Visit every element as csr[i][j] and XOR-fold a+b."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for row in range(len(csr)):
        v = csr[row]
        for k in range(len(v)):
            # view indexing yields the (a, b) pair at column k
            first, second = v[k]
            acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "indexed_iter")
def indexed_iter_direct(data):
    """Visit every element by flat index, row by row, and XOR-fold a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    starts = data['offsets']
    acc = 0
    for row in range(data['num_rows']):
        for pos in range(starts[row], starts[row + 1]):
            acc ^= (col_a[pos] + col_b[pos])
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "indexed_iter")
def indexed_iter_list_of_lists(data):
    """Visit every element as rows[i][j] and XOR-fold a+b."""
    rows = data['list_structure']
    acc = 0
    for r_idx in range(len(rows)):
        current = rows[r_idx]
        for c_idx in range(len(current)):
            first, second = current[c_idx]
            acc ^= (first + second)
    return acc & 0xFFFFFFFF
# Foreach iteration using for-in pattern
@benchmark.implementation("csr2", "foreach_iter")
def foreach_iter_csr2(data):
    """Visit every element with nested for-in loops over the CSR2 and its row views."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for v in csr:
        for first, second in v:
            acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "foreach_iter")
def foreach_iter_direct(data):
    """Baseline for foreach iteration: walk the flat arrays row by row using the offsets."""
    col_a = data['array_a']
    col_b = data['array_b']
    starts = data['offsets']
    acc = 0
    for row in range(data['num_rows']):
        # Each row is the chunk starts[row] .. starts[row + 1] - 1
        for pos in range(starts[row], starts[row + 1]):
            acc ^= (col_a[pos] + col_b[pos])
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "foreach_iter")
def foreach_iter_list_of_lists(data):
    """Visit every element with nested for-in loops over the list of lists."""
    rows = data['list_structure']
    acc = 0
    for r in rows:
        for first, second in r:
            acc ^= (first + second)
    return acc & 0xFFFFFFFF
# Bucketize operation
@benchmark.implementation("csr2", "bucketize")
def bucketize_csr2(data):
    """Group the value pairs by key with CSR2.bucketize, then XOR-fold every bucket entry."""
    grouped = CSR2.bucketize(data['num_rows'], data['keys'], data['values_v'], data['values_w'])
    acc = 0
    for bucket in range(len(grouped)):
        v = grouped[bucket]
        for k in range(len(v)):
            acc ^= (v.A[v.l + k] + v.B[v.l + k])
    return acc & 0xFFFFFFFF
@benchmark.implementation("manual_bucketize", "bucketize")
def bucketize_manual(data):
    """Group the value pairs by key into plain per-bucket lists, then XOR-fold every entry."""
    bucket_keys = data['keys']
    vals_v = data['values_v']
    vals_w = data['values_w']
    n_buckets = data['num_rows']
    # Two parallel bucket tables, one per value array
    left = [[] for _ in range(n_buckets)]
    right = [[] for _ in range(n_buckets)]
    for idx in range(len(bucket_keys)):
        key = bucket_keys[idx]
        # Keys outside [0, n_buckets) are silently dropped
        if 0 <= key < n_buckets:
            left[key].append(vals_v[idx])
            right[key].append(vals_w[idx])
    acc = 0
    for b in range(n_buckets):
        for k in range(len(left[b])):
            acc ^= (left[b][k] + right[b][k])
    return acc & 0xFFFFFFFF
# Column-1 iteration - indexed access
@benchmark.implementation("csr2", "col1_indexed_iter")
def col1_indexed_csr2(data):
    """Indexed pass over a CSR2 whose rows all hold one element; XOR-folds a+b."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    # One view object is created per single-element row
    for row in range(len(csr)):
        v = csr[row]
        # The only element of the row sits at the view's left bound
        acc ^= (v.A[v.l] + v.B[v.l])
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "col1_indexed_iter")
def col1_indexed_lists(data):
    """Indexed pass over a list of single-element lists; XOR-folds a+b."""
    rows = data['list_structure']
    acc = 0
    for r_idx in range(len(rows)):
        # Every row holds exactly one (a, b) tuple at position 0
        first, second = rows[r_idx][0]
        acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "col1_indexed_iter")
def col1_indexed_direct(data):
    """Baseline: indexed pass over the two flat arrays; XOR-folds a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    acc = 0
    # With one element per row, the flat index doubles as the row index
    for pos in range(len(col_a)):
        acc ^= (col_a[pos] + col_b[pos])
    return acc & 0xFFFFFFFF
@benchmark.implementation("csr2", "col1_foreach_iter")
def col1_foreach_csr2(data):
    """For-in pass over a CSR2 whose rows all hold one element; XOR-folds a+b."""
    csr = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for v in csr:
        # Each yielded view has a single element, read through view indexing
        first, second = v[0]
        acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "col1_foreach_iter")
def col1_foreach_lists(data):
    """For-in pass over a list of single-element lists; XOR-folds a+b."""
    rows = data['list_structure']
    acc = 0
    for r in rows:
        # Every row holds exactly one (a, b) tuple at position 0
        first, second = r[0]
        acc ^= (first + second)
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "col1_foreach_iter")
def col1_foreach_direct(data):
    """Direct foreach iteration through arrays (baseline for single elements).

    Walks the two flat arrays in lockstep with a for-in loop, so this baseline
    actually measures foreach-style iteration instead of repeating the
    index-based loop of the `col1_indexed_iter` baseline. The checksum is the
    same as the index-based loop produces.

    Returns:
        XOR of (a + b) over all element pairs, masked to 32 bits.
    """
    array_a = data['array_a']
    array_b = data['array_b']
    checksum = 0
    # Direct foreach iteration - each element is its own "row"
    for a_val, b_val in zip(array_a, array_b):
        checksum ^= (a_val + b_val)
    return checksum & 0xFFFFFFFF
if __name__ == "__main__":
    # Let the command line pick the mode, then run whatever parse_args() returns
    benchmark.parse_args().run()
#!/usr/bin/env python3
"""
Benchmark comparing CSR2 vs direct arrays for dual-array sparse data.
CSR2 provides view2 objects for efficient row access patterns.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
"""
Declarative benchmark framework with minimal boilerplate.
Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""
import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict
@dataclass
class BenchmarkConfig:
    """Settings for a benchmark run.

    `sizes` and `operations` default to None and are replaced in
    `__post_init__` by [100, 1000, 10000, 100000] and ['default'].
    """
    name: str
    sizes: List[int] = None
    operations: List[str] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    # One of "loglog", "linear", "semilogx", "semilogy"
    plot_scale: str = "loglog"
    # Show results operation by operation across sizes
    progressive: bool = True
    # Profiling-mode settings
    profile_mode: bool = False
    profile_size: int = None
    profile_operation: str = None
    profile_implementation: str = None

    def __post_init__(self):
        # Fill in list defaults here so each instance gets its own list objects
        fallbacks = (
            ('sizes', [100, 1000, 10000, 100000]),
            ('operations', ['default']),
        )
        for attr, fallback in fallbacks:
            if getattr(self, attr) is None:
                setattr(self, attr, fallback)
class Benchmark:
    """Declarative benchmark framework using decorators.

    Data generators, implementations, validators and setup functions are
    registered through the decorator methods and kept in dicts keyed by
    operation name. `run()` executes either the normal timed mode or the
    untimed profiling mode, depending on `config.profile_mode`.
    """

    def __init__(self, config: "BenchmarkConfig"):
        self.config = config
        self.data_generators = {}  # operation -> generator(size, operation)
        self.implementations = {}  # operation -> {implementation name -> func(data)}
        self.validators = {}  # operation -> validator(expected, actual)
        self.setups = {}  # operation -> {implementation name -> setup(data)}
        self.results = []  # one dict per measured (operation, size, implementation)

    def profile(self, operation: str = None, size: int = None, implementation: str = None):
        """Create a profiling version of this benchmark.

        The returned Benchmark shares this instance's registries (the same
        dict objects) and has profile mode on with saving/plotting off.
        """
        profile_config = BenchmarkConfig(
            name=f"{self.config.name}_profile",
            sizes=self.config.sizes,
            operations=self.config.operations,
            profile_mode=True,
            profile_operation=operation,
            profile_size=size,
            profile_implementation=implementation,
            save_results=False,
            plot_results=False
        )
        profile_benchmark = Benchmark(profile_config)
        profile_benchmark.data_generators = self.data_generators
        profile_benchmark.implementations = self.implementations
        profile_benchmark.validators = self.validators
        profile_benchmark.setups = self.setups
        return profile_benchmark

    def parse_args(self):
        """Parse command line arguments for profiling mode.

        Returns:
            A profiling Benchmark when `--profile` is given, otherwise `self`.
        """
        parser = argparse.ArgumentParser(
            description=f"Benchmark {self.config.name} with optional profiling mode",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
# Normal benchmark mode
python benchmark.py
# Profile specific operation and implementation
python benchmark.py --profile --operation random_access --implementation grid
# Profile with specific size
python benchmark.py --profile --size 1000000
# Profile all implementations of an operation
python benchmark.py --profile --operation construction
"""
        )
        parser.add_argument('--profile', action='store_true',
                            help='Run in profiling mode (minimal overhead for profilers)')
        parser.add_argument('--operation', type=str,
                            help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
        parser.add_argument('--size', type=int,
                            help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
        parser.add_argument('--implementation', type=str,
                            help='Specific implementation to profile (default: all)')
        args = parser.parse_args()
        # If profile mode requested, return a profiling benchmark
        if args.profile:
            return self.profile(
                operation=args.operation,
                size=args.size,
                implementation=args.implementation
            )
        # Otherwise return self for normal mode
        return self

    def data_generator(self, name: str = "default"):
        """Decorator to register a data generator under operation `name`."""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator

    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation `name` for one or more operations.

        `operations` may be a single name or a list; None means ['default'].
        """
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator

    def validator(self, operation: str = "default"):
        """Decorator to register a custom validator(expected, actual) for `operation`."""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator

    def setup(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register a setup function that runs before timing.

        The setup function receives the generated data and its return value
        is what the implementation `name` is called with.
        """
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]

        def decorator(func):
            for op in operations:
                if op not in self.setups:
                    self.setups[op] = {}
                self.setups[op][name] = func
            return func
        return decorator

    def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
        """Measure execution time with warmup and optional setup.

        The setup function is called before every run, and its time is NOT
        included in the measurement: only the calls to `func` are timed.

        Args:
            func: Implementation to time; called with the (set up) data.
            data: Generated input data.
            setup_func: Optional callable producing the per-run input from `data`.

        Returns:
            (result of the last run, mean time per run in milliseconds).

        Raises:
            ValueError: If `config.iterations` is less than 1.
        """
        iterations = self.config.iterations
        if iterations < 1:
            raise ValueError("iterations must be at least 1")

        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                if setup_func:
                    func(setup_func(data))
                else:
                    func(data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break

        # Actual measurement: accumulate only the time spent inside func
        total_seconds = 0.0
        result = None
        for _ in range(iterations):
            run_data = setup_func(data) if setup_func else data
            start = time.perf_counter()
            result = func(run_data)
            total_seconds += time.perf_counter() - start
        elapsed_ms = total_seconds * 1000 / iterations
        return result, elapsed_ms

    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison"""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual

    def run(self):
        """Run all benchmarks (profile mode or normal mode, per the config)."""
        if self.config.profile_mode:
            self._run_profile_mode()
        else:
            self._run_normal_mode()

    def _run_normal_mode(self):
        """Run every configured operation at every size, then save, plot and summarize."""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        # Always show progressive results: operation by operation across all sizes
        for operation in self.config.operations:
            for size in self.config.sizes:
                self._run_single(operation, size)
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        if self.config.plot_results:
            self._plot_results()
        # Print summary
        self._print_summary()

    def _run_profile_mode(self):
        """Run profiling mode with minimal overhead for use with vmprof.

        Each selected implementation is called exactly once, without timing
        or validation. Defaults: first configured operation, largest size.
        """
        operation = self.config.profile_operation or self.config.operations[0]
        size = self.config.profile_size or max(self.config.sizes)
        impl_name = self.config.profile_implementation
        print(f"PROFILING MODE: {self.config.name}")
        print(f"Operation: {operation}, Size: {size}")
        if impl_name:
            print(f"Implementation: {impl_name}")
        print("="*80)
        print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
        print("="*80)
        # Generate test data (operation-specific generator, else the default one)
        generator = self.data_generators.get(operation, self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations
        impls = self.implementations.get(operation, {})
        if not impls:
            raise ValueError(f"No implementations for operation: {operation}")
        # Filter to specific implementation if requested
        if impl_name:
            if impl_name not in impls:
                raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
            impls = {impl_name: impls[impl_name]}
        # Run with minimal overhead - no timing, no validation
        for name, func in impls.items():
            print(f"\nRunning {name}...")
            sys.stdout.flush()
            # Setup if needed
            setup_func = self.setups.get(operation, {}).get(name)
            if setup_func:
                data = setup_func(test_data)
            else:
                data = test_data
            # Run the actual function (this is what vmprof will profile)
            result = func(data)
            print(f"Completed {name}, result checksum: {result}")
            sys.stdout.flush()

    def _run_single(self, operation: str, size: int):
        """Run a single operation/size combination.

        The first registered implementation acts as the reference whose
        result all implementations are validated against. Failures of the
        other runs are recorded in `self.results`; the reference run itself
        is not guarded, so an exception there propagates.
        """
        print(f"\nOperation: {operation}, Size: {size}")
        print("-" * 50)
        sys.stdout.flush()
        # Generate test data (operation-specific generator, else the default one)
        generator = self.data_generators.get(operation,
                                             self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        test_data = generator(size, operation)
        # Get implementations for this operation
        impls = self.implementations.get(operation, {})
        if not impls:
            print(f"No implementations for operation: {operation}")
            return
        # Get setup functions for this operation
        setups = self.setups.get(operation, {})
        # Run reference implementation first
        ref_name, ref_impl = next(iter(impls.items()))
        ref_setup = setups.get(ref_name)
        expected_result, _ = self.measure_time(ref_impl, test_data, ref_setup)
        # Run all implementations
        for impl_name, impl_func in impls.items():
            try:
                setup_func = setups.get(impl_name)
                result, time_ms = self.measure_time(impl_func, test_data, setup_func)
                correct = self.validate_result(expected_result, result, operation)
                # Store result
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': time_ms,
                    'correct': correct,
                    'error': None
                })
                status = "OK" if correct else "FAIL"
                print(f" {impl_name:<20} {time_ms:>8.3f} ms {status}")
                sys.stdout.flush()
            except Exception as e:
                # Record the failure and carry on with the other implementations
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': float('inf'),
                    'correct': False,
                    'error': str(e)
                })
                print(f" {impl_name:<20} ERROR: {str(e)[:40]}")
                sys.stdout.flush()

    def _save_results(self):
        """Save results to a timestamped JSON file in the configured output directory."""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")

    def _plot_results(self):
        """Generate plots using matplotlib if available.

        Plotting is best-effort: a missing matplotlib or any plotting error
        is reported on stdout and otherwise ignored.
        """
        try:
            import matplotlib.pyplot as plt
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")

    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting.

        Only results that are correct and have a finite time are included.
        """
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op

    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation and save it as a PNG."""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        implementations = sorted(implementations)
        plt.figure(figsize=(10, 6))
        # One line per implementation: mean time at each size it has data for
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size]
                         if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")

    def _print_summary(self):
        """Print, per operation, the fastest implementation and its speedup over the slowest."""
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages (across all sizes) and order fastest first
            avg_times = [(impl, statistics.mean(times))
                         for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                speedup = worst_time / best_time if best_time > 0 else 0
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
https://kobejean.github.io/cp-library
'''
from typing import Generic
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')
def argsort_ranged(A: list[int], l: int, r: int, reverse=False):
    """Return the indices of A[l:r] (as absolute indices in [l, r)) in sorted order.

    Each value is packed together with its offset inside the range, the
    packed integers are sorted, and the offsets are unpacked again. With
    `reverse` the values are negated before packing.
    """
    span = r - l
    packer = Packer(span - 1)
    packed = [A[l + k] for k in range(span)]
    packer.ienumerate(packed, reverse)
    packed.sort()
    for k in range(span):
        # The low bits hold the offset within the range; shift it back by l
        packed[k] = (packed[k] & packer.m) + l
    return packed
class Packer:
    """Pack a pair (a, b) into one integer as `a << s | b`.

    `s` is the bit length of the largest `b` to be stored (`mx`), and `m`
    is the matching low-bit mask.
    """
    __slots__ = 's', 'm'

    def __init__(self, mx: int):
        self.s = mx.bit_length()
        self.m = (1 << self.s) - 1

    def enc(self, a: int, b: int):
        """Combine `a` (high bits) and `b` (low `s` bits) into one integer."""
        return (a << self.s) | b

    def dec(self, x: int) -> tuple[int, int]:
        """Split a packed integer back into (high part, low part)."""
        return x >> self.s, x & self.m

    def enumerate(self, A, reverse=False):
        """Return a new list where element i is enc(A[i], i) (or enc(-A[i], i) if reverse)."""
        packed = list(A)
        self.ienumerate(packed, reverse)
        return packed

    def ienumerate(self, A, reverse=False):
        """In-place variant of `enumerate`: overwrite A[i] with its packed form."""
        if reverse:
            for pos, val in enumerate(A):
                A[pos] = self.enc(-val, pos)
        else:
            for pos, val in enumerate(A):
                A[pos] = self.enc(val, pos)

    def indices(self, A: list[int]):
        """Return a new list holding only the low (index) part of each packed value."""
        lows = list(A)
        self.iindices(lows)
        return lows

    def iindices(self, A):
        """In-place variant of `indices`: overwrite A[i] with its low part."""
        for pos, val in enumerate(A):
            A[pos] = self.m & val
def isort_ranged(*L: list, l: int, r: int, reverse=False):
    """Reorder the range [l, r) of every list in L in place, ordered by the first list.

    The ordering comes from argsort_ranged on L[0]; it is then applied to
    all lists with pairwise swaps. Returns the tuple L.
    """
    span = r - l
    order = argsort_ranged(L[0], l, r, reverse=reverse)
    # order holds absolute indices in [l, r); inv maps a relative position
    # in [0, span) back to its slot in `order`.
    inv = [0] * span
    for rank in range(span):
        inv[order[rank] - l] = rank
    for dst in range(span):
        src = order[dst] - l  # relative position currently holding the wanted element
        for arr in L:
            arr[l + dst], arr[l + src] = arr[l + src], arr[l + dst]
        # Keep order/inv consistent with the swap that was just applied
        slot_dst, slot_src = inv[dst], inv[src]
        order[slot_dst], order[slot_src] = order[slot_src], order[slot_dst]
        inv[dst], inv[src] = slot_src, slot_dst
    return L
class view2(Generic[_S, _T]):
    """Window [l, r) over two parallel lists A and B, exposing (A[i], B[i]) pairs.

    The view does not copy: reads and writes go straight to the underlying
    lists. `pop`/`append`/`popleft`/`appendleft` only move the bounds (and
    write into the existing lists); they never resize A or B.
    """
    __slots__ = 'A', 'B', 'l', 'r'

    def __init__(V, A: list[_S], B: list[_T], l: int, r: int):
        V.A, V.B, V.l, V.r = A, B, l, r

    def __len__(V):
        return V.r - V.l

    def __getitem__(V, i: int):
        """Return the pair at relative index i; raises IndexError outside [0, len)."""
        if 0 <= i < V.r - V.l:
            return V.A[V.l+i], V.B[V.l+i]
        else:
            raise IndexError

    def __setitem__(V, i: int, v: tuple[_S, _T]):
        """Store the pair v at relative index i (no bounds check against the view)."""
        V.A[V.l+i], V.B[V.l+i] = v

    def __contains__(V, v: tuple[_S, _T]):
        # NotImplementedError is the exception class; raising the
        # NotImplemented constant would fail with a TypeError instead.
        raise NotImplementedError

    def set_range(V, l: int, r: int):
        """Point the view at a new [l, r) window of the same lists."""
        V.l, V.r = l, r

    def index(V, v: tuple[_S, _T]):
        # See __contains__: not supported, signalled with the proper exception.
        raise NotImplementedError

    def reverse(V):
        """Reverse the viewed range of both lists in place."""
        l, r = V.l, V.r-1
        while l < r:
            V.A[l], V.A[r] = V.A[r], V.A[l]
            V.B[l], V.B[r] = V.B[r], V.B[l]
            l += 1
            r -= 1

    def sort(V, reverse=False):
        """Sort the viewed range in place via isort_ranged (A is passed as the first list)."""
        isort_ranged(V.A, V.B, l=V.l, r=V.r, reverse=reverse)

    def pop(V):
        """Shrink the view from the right and return the pair that was dropped."""
        V.r -= 1
        return V.A[V.r], V.B[V.r]

    def append(V, v: tuple[_S, _T]):
        """Write v just past the right end and extend the view over it."""
        V.A[V.r], V.B[V.r] = v
        V.r += 1

    def popleft(V):
        """Shrink the view from the left and return the pair that was dropped."""
        V.l += 1
        return V.A[V.l-1], V.B[V.l-1]

    def appendleft(V, v: tuple[_S, _T]):
        """Extend the view one slot to the left and write v there."""
        V.l -= 1
        V.A[V.l], V.B[V.l] = v

    def validate(V):
        """Return True when 0 <= l <= r <= len(A)."""
        return 0 <= V.l <= V.r <= len(V.A)
class CSR2(Generic[_T]):
    """Rows of (a, b) pairs stored in two flat lists A and B plus an offset list O.

    Row i occupies positions O[i] .. O[i+1]-1 of A and B.
    """
    __slots__ = 'A', 'B', 'O'

    def __init__(csr, A: list[_S], B: list[_T], O: list[int]):
        csr.A = A
        csr.B = B
        csr.O = O

    def __len__(csr):
        # One more offset than rows
        return len(csr.O)-1

    def __getitem__(csr, i: int):
        """Return a view2 over row i of the shared lists."""
        return view2(csr.A, csr.B, csr.O[i], csr.O[i+1])

    def __call__(csr, i: int, j: int):
        """Return the pair at column j of row i."""
        pos = csr.O[i]+j
        return csr.A[pos], csr.B[pos]

    def set(csr, i: int, j: int, v: _T):
        """Store the pair v at column j of row i."""
        pos = csr.O[i]+j
        csr.A[pos], csr.B[pos] = v

    @classmethod
    def bucketize(cls, N: int, K: list[int], V: list[_T], W: list[_T]):
        """Build a CSR2 with N rows where row k holds the (V[e], W[e]) with K[e] == k."""
        count = len(K)
        A: list[_S] = [0]*count
        B: list[_T] = [0]*count
        O = [0]*(N+1)
        # Count keys, then turn the counts into running totals
        for key in K:
            O[key] += 1
        for row in range(N):
            O[row+1] += O[row]
        # Fill from the last input backwards; each placement moves the row's
        # offset down by one, so O ends up holding the row start positions.
        for e in range(count):
            back = ~e
            key = K[back]
            O[key] -= 1
            A[O[key]] = V[back]
            B[O[key]] = W[back]
        return cls(A, B, O)
# Configure benchmark
# NOTE(review): the exact meaning of iterations / warmup / output_dir is defined by
# BenchmarkConfig, which is outside this chunk - confirm there before changing them.
config = BenchmarkConfig(
    name="csr2",
    sizes=[10000000, 1000000, 100000, 10000, 1000, 100], # Reverse order to warm up JIT
    # Every name below is matched by implementations registered further down the file
    operations=['copy_construction', 'direct_access', 'random_access', 'indexed_iter', 'foreach_iter', 'modification', 'bucketize', 'col1_indexed_iter', 'col1_foreach_iter'],
    iterations=10,
    warmup=3,
    output_dir="./output/benchmark_results/csr2"
)
# Create benchmark instance (the decorators below register generators, setups and implementations on it)
benchmark = Benchmark(config)
# Data generator
@benchmark.data_generator("default")
def generate_csr2_data(size: int, operation: str):
    """Generate test data for CSR2 operations.

    Rows get a random length in [50, 150]; the last row is clipped so that the
    lengths add up to exactly `size`.

    Args:
        size: total number of (a, b) elements across all rows.
        operation: operation name; not used by this generator.

    Returns:
        dict with the flat columns (`array_a`, `array_b`), the row boundaries
        (`offsets`, `num_rows`), the same data as a list of rows of tuples
        (`list_structure`), the bucketize inputs (`keys`, `values_v`,
        `values_w`) and `size`.
    """
    # Build the row boundaries in one pass: row i spans offsets[i] .. offsets[i + 1].
    offsets = [0]
    total = 0
    while total < size:
        row_size = min(random.randint(50, 150), size - total)
        total += row_size
        offsets.append(total)
    num_rows = len(offsets) - 1

    # Generate data arrays
    array_a = [random.randint(0, 1000000) for _ in range(size)]
    array_b = [random.randint(0, 1000000) for _ in range(size)]

    # Same data as one list of (a, b) tuples per row
    list_structure = [
        list(zip(array_a[start:end], array_b[start:end]))
        for start, end in zip(offsets, offsets[1:])
    ]

    # For bucketize operation: one key per element, each naming an existing row
    keys = [random.randint(0, max(0, num_rows - 1)) for _ in range(size)]
    values_v = [random.randint(0, 1000000) for _ in range(size)]
    values_w = [random.randint(0, 1000000) for _ in range(size)]

    return {
        'array_a': array_a,
        'array_b': array_b,
        'offsets': offsets,
        'num_rows': num_rows,
        'list_structure': list_structure,
        'keys': keys,
        'values_v': values_v,
        'values_w': values_w,
        'size': size
    }
# Specialized data generator for single-element rows
@benchmark.data_generator("col1_indexed_iter")
def generate_col1_indexed_data(size: int, operation: str):
    """Input for the indexed-iteration benchmark: delegates to `_generate_col1_data` (one element per row)."""
    col1_data = _generate_col1_data(size, operation)
    return col1_data
@benchmark.data_generator("col1_foreach_iter")
def generate_col1_foreach_data(size: int, operation: str):
    """Input for the foreach-iteration benchmark: delegates to `_generate_col1_data` (one element per row)."""
    col1_data = _generate_col1_data(size, operation)
    return col1_data
def _generate_col1_data(size: int, operation: str):
"""Helper to generate single-element row data"""
# Each row has exactly 1 element
num_rows = size
# Generate offset array for single-element rows
offsets = list(range(size + 1)) # [0, 1, 2, 3, ..., size]
# Generate data arrays
array_a = [random.randint(0, 1000000) for _ in range(size)]
array_b = [random.randint(0, 1000000) for _ in range(size)]
# Create list of lists structure (each row has 1 element)
list_structure = [[(array_a[i], array_b[i])] for i in range(size)]
# For bucketize operation - distribute across fewer buckets to avoid empty ones
bucket_count = max(1, size // 10) # 10 elements per bucket on average
keys = [random.randint(0, bucket_count - 1) for _ in range(size)]
values_v = [random.randint(0, 1000000) for _ in range(size)]
values_w = [random.randint(0, 1000000) for _ in range(size)]
return {
'array_a': array_a,
'array_b': array_b,
'offsets': offsets,
'num_rows': num_rows,
'list_structure': list_structure,
'keys': keys,
'values_v': values_v,
'values_w': values_w,
'size': size,
'bucket_count': bucket_count
}
# Copy construction operation
@benchmark.implementation("csr2", "copy_construction")
def copy_construction_csr2(data):
    """Wrap fresh copies of the three input lists in a CSR2 and return its row count."""
    a_dup = data['array_a'].copy()
    b_dup = data['array_b'].copy()
    offsets_dup = data['offsets'].copy()
    return len(CSR2(a_dup, b_dup, offsets_dup))
@benchmark.implementation("direct_arrays", "copy_construction")
def copy_construction_direct(data):
    """Copy the same three lists the CSR2 variant copies and return the row count."""
    # The a/b copies are never read; they exist so this baseline does the same copying work.
    a_dup = data['array_a'].copy()
    b_dup = data['array_b'].copy()
    offsets_dup = data['offsets'].copy()
    row_count = len(offsets_dup) - 1
    return row_count
@benchmark.implementation("list_of_lists", "copy_construction")
def copy_construction_list_of_lists(data):
    """Shallow-copy every row of the prepared list of lists and return the row count."""
    duplicated_rows = [r.copy() for r in data['list_structure']]
    return len(duplicated_rows)
# Direct access operation
@benchmark.implementation("csr2", "direct_access")
def direct_access_csr2(data):
    """XOR-fold a+b over every element, reading each row view's backing lists directly."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for r in range(len(rows)):
        v = rows[r]
        for k in range(len(v)):
            # index the view's underlying lists rather than going through v[k]
            acc ^= v.A[v.l + k] + v.B[v.l + k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "direct_access")
def direct_access_direct(data):
    """XOR-fold a+b over every element by indexing the flat arrays row by row."""
    col_a = data['array_a']
    col_b = data['array_b']
    bounds = data['offsets']
    acc = 0
    for r in range(data['num_rows']):
        for k in range(bounds[r], bounds[r + 1]):
            acc ^= col_a[k] + col_b[k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "direct_access")
def direct_access_list_of_lists(data):
    """XOR-fold a+b over every tuple stored in the list of rows."""
    acc = 0
    for current_row in data['list_structure']:
        for first, second in current_row:
            acc ^= first + second
    return acc & 0xFFFFFFFF
# Random access operation
@benchmark.implementation("csr2", "random_access")
def random_access_csr2(data):
    """Read up to 1000 pseudo-random (row, column) cells through `csr(i, j)` and XOR-fold a+b."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    probes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation visits the same cells
    for _ in range(probes):
        r = picker.randint(0, data['num_rows'] - 1)
        width = data['offsets'][r + 1] - data['offsets'][r]
        if width > 0:
            c = picker.randint(0, width - 1)
            first, second = rows(r, c)
            acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "random_access")
def random_access_direct(data):
    """Read up to 1000 pseudo-random (row, column) cells straight from the flat arrays and XOR-fold a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    bounds = data['offsets']
    acc = 0
    probes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation visits the same cells
    for _ in range(probes):
        r = picker.randint(0, data['num_rows'] - 1)
        lo = bounds[r]
        hi = bounds[r + 1]
        if hi > lo:
            c = picker.randint(0, hi - lo - 1)
            acc ^= col_a[lo + c] + col_b[lo + c]
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "random_access")
def random_access_list_of_lists(data):
    """Read up to 1000 pseudo-random (row, column) cells from the list of rows and XOR-fold a+b."""
    table = data['list_structure']
    acc = 0
    probes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation visits the same cells
    for _ in range(probes):
        r = picker.randint(0, data['num_rows'] - 1)
        if len(table[r]) > 0:
            c = picker.randint(0, len(table[r]) - 1)
            first, second = table[r][c]
            acc ^= first + second
    return acc & 0xFFFFFFFF
# Setup for modify operations
@benchmark.setup("csr2", ["modification"])
def setup_csr2_modify(data):
    """Return a shallow copy of `data` whose two value arrays are fresh copies, so modification cannot touch the originals."""
    fresh = data.copy()
    for column in ('array_a', 'array_b'):
        fresh[column] = data[column].copy()
    return fresh
@benchmark.setup("direct_arrays", ["modification"])
def setup_direct_modify(data):
    """Return a shallow copy of `data` whose two value arrays are fresh copies, so modification cannot touch the originals."""
    fresh = data.copy()
    for column in ('array_a', 'array_b'):
        fresh[column] = data[column].copy()
    return fresh
@benchmark.setup("list_of_lists", ["modification"])
def setup_list_of_lists_modify(data):
    """Return a shallow copy of `data` in which every row list has been copied, so modification cannot touch the originals."""
    fresh = data.copy()
    copied_rows = [r.copy() for r in data['list_structure']]
    fresh['list_structure'] = copied_rows
    return fresh
# Modification operation
@benchmark.implementation("csr2", "modification")
def modification_csr2(data):
    """Overwrite up to 1000 pseudo-random cells via `csr.set(i, j, pair)` and XOR-fold the written sums."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    writes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation writes the same cells and values
    for _ in range(writes):
        r = picker.randint(0, data['num_rows'] - 1)
        width = data['offsets'][r + 1] - data['offsets'][r]
        if width > 0:
            c = picker.randint(0, width - 1)
            pair = (picker.randint(0, 1000000), picker.randint(0, 1000000))
            rows.set(r, c, pair)
            acc ^= pair[0] + pair[1]
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "modification")
def modification_direct(data):
    """Overwrite up to 1000 pseudo-random cells directly in the flat arrays and XOR-fold the written sums."""
    col_a = data['array_a']
    col_b = data['array_b']
    bounds = data['offsets']
    acc = 0
    writes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation writes the same cells and values
    for _ in range(writes):
        r = picker.randint(0, data['num_rows'] - 1)
        lo = bounds[r]
        hi = bounds[r + 1]
        if hi > lo:
            c = picker.randint(0, hi - lo - 1)
            fresh_a = picker.randint(0, 1000000)
            fresh_b = picker.randint(0, 1000000)
            col_a[lo + c] = fresh_a
            col_b[lo + c] = fresh_b
            acc ^= fresh_a + fresh_b
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "modification")
def modification_list_of_lists(data):
    """Overwrite up to 1000 pseudo-random cells in the list of rows and XOR-fold the written sums."""
    table = data['list_structure']
    acc = 0
    writes = min(1000, data['size'] // 10)
    picker = random.Random(42)  # fixed seed: every implementation writes the same cells and values
    for _ in range(writes):
        r = picker.randint(0, data['num_rows'] - 1)
        if len(table[r]) > 0:
            c = picker.randint(0, len(table[r]) - 1)
            pair = (picker.randint(0, 1000000), picker.randint(0, 1000000))
            table[r][c] = pair
            acc ^= pair[0] + pair[1]
    return acc & 0xFFFFFFFF
# Indexed iteration using [i] access
@benchmark.implementation("csr2", "indexed_iter")
def indexed_iter_csr2(data):
    """Visit every row via `csr[i]` and every element via `row[j]`, XOR-folding a+b."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for r in range(len(rows)):
        current = rows[r]
        for k in range(len(current)):
            first, second = current[k]
            acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "indexed_iter")
def indexed_iter_direct(data):
    """Visit every element by flat index, row by row, XOR-folding a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    bounds = data['offsets']
    acc = 0
    for r in range(data['num_rows']):
        for k in range(bounds[r], bounds[r + 1]):
            acc ^= col_a[k] + col_b[k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "indexed_iter")
def indexed_iter_list_of_lists(data):
    """Visit every row via `rows[i]` and every tuple via `row[j]`, XOR-folding a+b."""
    table = data['list_structure']
    acc = 0
    for r in range(len(table)):
        current = table[r]
        for k in range(len(current)):
            first, second = current[k]
            acc ^= first + second
    return acc & 0xFFFFFFFF
# Foreach iteration using for-in pattern
@benchmark.implementation("csr2", "foreach_iter")
def foreach_iter_csr2(data):
    """Visit every element with nested `for ... in` loops over the CSR2 and its row views, XOR-folding a+b."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for current in rows:
        for first, second in current:
            acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "foreach_iter")
def foreach_iter_direct(data):
    """Baseline for the foreach benchmark: walk the flat arrays, chunked by the offsets, XOR-folding a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    bounds = data['offsets']
    acc = 0
    for r in range(data['num_rows']):
        for k in range(bounds[r], bounds[r + 1]):
            acc ^= col_a[k] + col_b[k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "foreach_iter")
def foreach_iter_list_of_lists(data):
    """Visit every tuple with nested `for ... in` loops over the list of rows, XOR-folding a+b."""
    acc = 0
    for current in data['list_structure']:
        for first, second in current:
            acc ^= first + second
    return acc & 0xFFFFFFFF
# Bucketize operation
@benchmark.implementation("csr2", "bucketize")
def bucketize_csr2(data):
    """Group the (v, w) values by key with `CSR2.bucketize`, then XOR-fold v+w over every bucket."""
    grouped = CSR2.bucketize(data['num_rows'], data['keys'], data['values_v'], data['values_w'])
    acc = 0
    for r in range(len(grouped)):
        bucket = grouped[r]
        for k in range(len(bucket)):
            # index the view's underlying lists rather than going through bucket[k]
            acc ^= bucket.A[bucket.l + k] + bucket.B[bucket.l + k]
    return acc & 0xFFFFFFFF
@benchmark.implementation("manual_bucketize", "bucketize")
def bucketize_manual(data):
    """Group the (v, w) values by key into plain per-row lists, then XOR-fold v+w over every bucket.

    Keys outside [0, num_rows) are skipped.
    """
    key_list = data['keys']
    v_list = data['values_v']
    w_list = data['values_w']
    row_total = data['num_rows']
    per_row_v = [[] for _ in range(row_total)]
    per_row_w = [[] for _ in range(row_total)]
    for idx in range(len(key_list)):
        bucket = key_list[idx]
        if 0 <= bucket < row_total:
            per_row_v[bucket].append(v_list[idx])
            per_row_w[bucket].append(w_list[idx])
    acc = 0
    for r in range(row_total):
        for k in range(len(per_row_v[r])):
            acc ^= per_row_v[r][k] + per_row_w[r][k]
    return acc & 0xFFFFFFFF
# Column-1 iteration - indexed access
@benchmark.implementation("csr2", "col1_indexed_iter")
def col1_indexed_csr2(data):
    """Index every row of a CSR2 and XOR-fold a+b of the first element of each row view."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for r in range(len(rows)):
        single = rows[r]
        # read the element at the view's left bound straight from its backing lists
        acc ^= single.A[single.l] + single.B[single.l]
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "col1_indexed_iter")
def col1_indexed_lists(data):
    """Index every row of the list of lists and XOR-fold a+b of each row's first tuple."""
    table = data['list_structure']
    acc = 0
    for r in range(len(table)):
        first, second = table[r][0]  # only element 0 of each row is read
        acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "col1_indexed_iter")
def col1_indexed_direct(data):
    """Baseline: index both flat arrays position by position and XOR-fold a+b."""
    col_a = data['array_a']
    col_b = data['array_b']
    acc = 0
    for pos in range(len(col_a)):
        acc ^= col_a[pos] + col_b[pos]
    return acc & 0xFFFFFFFF
@benchmark.implementation("csr2", "col1_foreach_iter")
def col1_foreach_csr2(data):
    """Loop over the CSR2 with `for view in csr` and XOR-fold a+b of `view[0]` for each row."""
    rows = CSR2(data['array_a'], data['array_b'], data['offsets'])
    acc = 0
    for single in rows:
        first, second = single[0]  # only element 0 of each row view is read
        acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("list_of_lists", "col1_foreach_iter")
def col1_foreach_lists(data):
    """Loop over the list of rows with `for row in rows` and XOR-fold a+b of each row's first tuple."""
    acc = 0
    for current in data['list_structure']:
        first, second = current[0]  # only element 0 of each row is read
        acc ^= first + second
    return acc & 0xFFFFFFFF
@benchmark.implementation("direct_arrays", "col1_foreach_iter")
def col1_foreach_direct(data):
    """Direct foreach iteration through arrays (baseline for single elements).

    Walks `array_a` and `array_b` in lockstep without indexing, so this
    baseline matches the foreach pattern of the other `col1_foreach_iter`
    implementations instead of repeating the indexed loop of
    `col1_indexed_direct`.

    Returns the XOR of a+b over all positions, masked to 32 bits.
    """
    array_a = data['array_a']
    array_b = data['array_b']
    checksum = 0
    # Direct foreach iteration - each element is its own "row"
    # NOTE: zip stops at the shorter array; the generators always produce equal lengths.
    for a_val, b_val in zip(array_a, array_b):
        checksum ^= (a_val + b_val)
    return checksum & 0xFFFFFFFF
if __name__ == "__main__":
    # Parse command line args and run appropriate mode
    # (parse_args returns the runner object; its modes are defined by Benchmark, outside this chunk)
    runner = benchmark.parse_args()
    runner.run()