This documentation is automatically generated by online-judge-tools/verification-helper
#!/usr/bin/env python3
"""
Simple edge list benchmark using the new declarative framework.
It needs much less boilerplate and is easier to understand and extend.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.alg.graph.edge.edge_list_weighted_cls import EdgeListWeighted
# Configure benchmark
config = BenchmarkConfig(
name="edge_list",
sizes=[1000000, 100000, 10000, 1000, 100, 10, 1], # Reverse order to warm up JIT
operations=['sum_weights', 'filter', 'degree_count', 'transform', 'sort', 'construction'],
iterations=10,
warmup=2,
output_dir="./output/benchmark_results/edge_list"
)
# Create benchmark instance
benchmark = Benchmark(config)
# Data generators
@benchmark.data_generator("default")
def generate_edge_data(size: int, operation: str):
"""Generate edge list data in all formats"""
max_node = max(1, int(size ** 0.5) * 2)
# Generate raw edge data
U = [random.randint(0, max_node) for _ in range(size)]
V = [random.randint(0, max_node) for _ in range(size)]
W = [random.randint(1, 1000) for _ in range(size)]
# Create different representations
edges_tuple = [(U[i], V[i], W[i]) for i in range(size)]
edge_list = EdgeListWeighted(max_node + 1, U, V, W)
# Pre-initialize data for fair timing (exclude initialization)
preinitialized = {
'edges_tuple': list(edges_tuple),
'edge_list': EdgeListWeighted(max_node + 1, list(U), list(V), list(W)),
'U': list(U), 'V': list(V), 'W': list(W),
'threshold': 500,
'max_node': max_node,
'degree_array': [0] * (max_node + 1)
}
return {
'edges_tuple': edges_tuple,
'edge_list': edge_list,
'U': U, 'V': V, 'W': W,
'size': size,
'operation': operation,
'threshold': 500,
'max_node': max_node,
'preinitialized': preinitialized
}
# Construction benchmarks - all should do equivalent work
@benchmark.implementation("construct_tuple", "construction")
def construct_tuple_list(data):
"""Construct list of tuples from raw data"""
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i]) for i in range(len(U))]
@benchmark.implementation("construct_edge_list_ref", "construction")
def construct_edge_list_ref(data):
"""Construct EdgeListWeighted from raw data (reference assignment)"""
U, V, W = data['U'], data['V'], data['W']
return EdgeListWeighted(data['max_node'] + 1, U, V, W)
@benchmark.implementation("construct_edge_list_copy", "construction")
def construct_edge_list_copy(data):
"""Construct EdgeListWeighted from copied data (fair comparison)"""
U, V, W = data['U'], data['V'], data['W']
return EdgeListWeighted(data['max_node'] + 1, list(U), list(V), list(W))
@benchmark.implementation("construct_separated", "construction")
def construct_separated_lists(data):
"""Create separated lists (copy data)"""
U, V, W = data['U'], data['V'], data['W']
return (list(U), list(V), list(W))
# Sum weights operation
@benchmark.implementation("tuple_direct", "sum_weights")
def sum_weights_tuple_direct(data):
"""Sum weights using direct tuple iteration"""
return sum(w for u, v, w in data['edges_tuple'])
@benchmark.implementation("edge_list_iter", "sum_weights")
def sum_weights_edge_list_iter(data):
"""Sum weights using EdgeListWeighted iteration"""
return sum(w for u, v, w in data['edge_list'])
@benchmark.implementation("edge_list_direct", "sum_weights")
def sum_weights_edge_list_direct(data):
"""Sum weights using EdgeListWeighted direct access"""
return sum(data['edge_list'].W)
@benchmark.implementation("separated_lists", "sum_weights")
def sum_weights_separated(data):
"""Sum weights using separated lists"""
return sum(data['W'])
# Filter operation
@benchmark.implementation("tuple_direct", "filter")
def filter_tuple_direct(data):
"""Filter edges using tuple iteration"""
threshold = data['threshold']
return [(u, v, w) for u, v, w in data['edges_tuple'] if w > threshold]
@benchmark.implementation("edge_list_iter", "filter")
def filter_edge_list_iter(data):
"""Filter edges using EdgeListWeighted iteration"""
threshold = data['threshold']
return [(u, v, w) for u, v, w in data['edge_list'] if w > threshold]
@benchmark.implementation("edge_list_direct", "filter")
def filter_edge_list_direct(data):
"""Filter edges using EdgeListWeighted direct access"""
threshold = data['threshold']
edge_list = data['edge_list']
result = []
for i in range(len(edge_list)):
if edge_list.W[i] > threshold:
result.append((edge_list.U[i], edge_list.V[i], edge_list.W[i]))
return result
@benchmark.implementation("separated_lists", "filter")
def filter_separated(data):
"""Filter edges using separated lists"""
threshold = data['threshold']
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i]) for i in range(len(W)) if W[i] > threshold]
# Degree count operation
@benchmark.implementation("tuple_direct", "degree_count")
def degree_count_tuple_direct(data):
"""Count degrees using tuple iteration"""
degree = [0] * (data['max_node'] + 1)
for u, v, w in data['edges_tuple']:
degree[u] += 1
return degree
@benchmark.implementation("edge_list_iter", "degree_count")
def degree_count_edge_list_iter(data):
"""Count degrees using EdgeListWeighted iteration"""
degree = [0] * (data['max_node'] + 1)
for u, v, w in data['edge_list']:
degree[u] += 1
return degree
@benchmark.implementation("edge_list_direct", "degree_count")
def degree_count_edge_list_direct(data):
"""Count degrees using EdgeListWeighted direct access"""
degree = [0] * (data['max_node'] + 1)
for u in data['edge_list'].U:
degree[u] += 1
return degree
@benchmark.implementation("separated_lists", "degree_count")
def degree_count_separated(data):
"""Count degrees using separated lists"""
degree = [0] * (data['max_node'] + 1)
for u in data['U']:
degree[u] += 1
return degree
# Transform operation
@benchmark.implementation("tuple_direct", "transform")
def transform_tuple_direct(data):
"""Transform edges using tuple iteration"""
return [(u, v, w * 2) for u, v, w in data['edges_tuple']]
@benchmark.implementation("edge_list_iter", "transform")
def transform_edge_list_iter(data):
"""Transform edges using EdgeListWeighted iteration"""
return [(u, v, w * 2) for u, v, w in data['edge_list']]
@benchmark.implementation("edge_list_direct", "transform")
def transform_edge_list_direct(data):
"""Transform edges using EdgeListWeighted direct access"""
edge_list = data['edge_list']
return [(edge_list.U[i], edge_list.V[i], edge_list.W[i] * 2)
for i in range(len(edge_list))]
@benchmark.implementation("separated_lists", "transform")
def transform_separated(data):
"""Transform edges using separated lists"""
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i] * 2) for i in range(len(W))]
# Sort operation
@benchmark.implementation("tuple_direct", "sort")
def sort_tuple_direct(data):
"""Sort edges using tuple list"""
edges = list(data['edges_tuple'])
edges.sort(key=lambda x: x[2])
return edges
@benchmark.implementation("edge_list_sort", "sort")
def sort_edge_list_builtin(data):
"""Sort edges using EdgeListWeighted built-in sort"""
edge_list = EdgeListWeighted(data['edge_list'].N,
list(data['edge_list'].U),
list(data['edge_list'].V),
list(data['edge_list'].W))
edge_list.sort()
return edge_list
@benchmark.implementation("separated_lists", "sort")
def sort_separated(data):
"""Sort edges using separated lists"""
U, V, W = list(data['U']), list(data['V']), list(data['W'])
# Sort by weight using indices
indices = sorted(range(len(W)), key=lambda i: W[i])
return ([U[i] for i in indices], [V[i] for i in indices], [W[i] for i in indices])
# Custom validators
@benchmark.validator("sort")
def validate_sort(expected, actual):
"""Validate that sort results are equivalent"""
# Convert both to comparable format
if hasattr(expected, 'W'): # EdgeListWeighted
expected_weights = expected.W
elif isinstance(expected, tuple): # separated lists
expected_weights = expected[2]
else: # list of tuples
expected_weights = [w for u, v, w in expected]
if hasattr(actual, 'W'): # EdgeListWeighted
actual_weights = actual.W
elif isinstance(actual, tuple): # separated lists
actual_weights = actual[2]
else: # list of tuples
actual_weights = [w for u, v, w in actual]
return expected_weights == actual_weights
@benchmark.validator("construction")
def validate_construction(expected, actual):
"""Validate construction results"""
# Just check that a non-empty result of the expected kind was created
if actual is None:
return False
# Check based on type
if isinstance(actual, list): # tuple list
return len(actual) > 0
elif hasattr(actual, 'M'): # EdgeListWeighted
return actual.M > 0
elif isinstance(actual, tuple): # separated lists
return len(actual[0]) > 0
return True
if __name__ == "__main__":
# Parse command line args and run appropriate mode
runner = benchmark.parse_args()
runner.run()
#!/usr/bin/env python3
"""
Simple edge list benchmark using the new declarative framework.
It needs much less boilerplate and is easier to understand and extend.
"""
import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
"""
Declarative benchmark framework with minimal boilerplate.
Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""
import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict
@dataclass
class BenchmarkConfig:
"""Configuration for benchmark runs"""
name: str
sizes: List[int] = None
operations: List[str] = None
iterations: int = 10
warmup: int = 2
output_dir: str = "./output/benchmark_results"
save_results: bool = True
plot_results: bool = True
plot_scale: str = "loglog" # Options: "loglog", "linear", "semilogx", "semilogy"
progressive: bool = True # Show results operation by operation across sizes
# Profiling mode
profile_mode: bool = False
profile_size: int = None
profile_operation: str = None
profile_implementation: str = None
def __post_init__(self):
if self.sizes is None:
self.sizes = [100, 1000, 10000, 100000]
if self.operations is None:
self.operations = ['default']
class Benchmark:
"""Declarative benchmark framework using decorators"""
def __init__(self, config: BenchmarkConfig):
self.config = config
self.data_generators = {}
self.implementations = {}
self.validators = {}
self.setups = {}
self.results = []
def profile(self, operation: str = None, size: int = None, implementation: str = None):
"""Create a profiling version of this benchmark"""
profile_config = BenchmarkConfig(
name=f"{self.config.name}_profile",
sizes=self.config.sizes,
operations=self.config.operations,
profile_mode=True,
profile_operation=operation,
profile_size=size,
profile_implementation=implementation,
save_results=False,
plot_results=False
)
profile_benchmark = Benchmark(profile_config)
profile_benchmark.data_generators = self.data_generators
profile_benchmark.implementations = self.implementations
profile_benchmark.validators = self.validators
profile_benchmark.setups = self.setups
return profile_benchmark
def parse_args(self):
"""Parse command line arguments for profiling mode"""
parser = argparse.ArgumentParser(
description=f"Benchmark {self.config.name} with optional profiling mode",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Normal benchmark mode
python benchmark.py
# Profile specific operation and implementation
python benchmark.py --profile --operation random_access --implementation grid
# Profile with specific size
python benchmark.py --profile --size 1000000
# Profile all implementations of an operation
python benchmark.py --profile --operation construction
"""
)
parser.add_argument('--profile', action='store_true',
help='Run in profiling mode (minimal overhead for profilers)')
parser.add_argument('--operation', type=str,
help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
parser.add_argument('--size', type=int,
help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
parser.add_argument('--implementation', type=str,
help='Specific implementation to profile (default: all)')
args = parser.parse_args()
# If profile mode requested, return a profiling benchmark
if args.profile:
return self.profile(
operation=args.operation,
size=args.size,
implementation=args.implementation
)
# Otherwise return self for normal mode
return self
def data_generator(self, name: str = "default"):
"""Decorator to register data generator"""
def decorator(func):
self.data_generators[name] = func
return func
return decorator
def implementation(self, name: str, operations: Union[str, List[str]] = None):
"""Decorator to register implementation"""
if operations is None:
operations = ['default']
elif isinstance(operations, str):
operations = [operations]
def decorator(func):
for op in operations:
if op not in self.implementations:
self.implementations[op] = {}
self.implementations[op][name] = func
return func
return decorator
def validator(self, operation: str = "default"):
"""Decorator to register custom validator"""
def decorator(func):
self.validators[operation] = func
return func
return decorator
def setup(self, name: str, operations: Union[str, List[str]] = None):
"""Decorator to register setup function that runs before timing"""
if operations is None:
operations = ['default']
elif isinstance(operations, str):
operations = [operations]
def decorator(func):
for op in operations:
if op not in self.setups:
self.setups[op] = {}
self.setups[op][name] = func
return func
return decorator
def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
"""Measure execution time with warmup and optional setup"""
# Warmup runs
for _ in range(self.config.warmup):
try:
if setup_func:
setup_data = setup_func(data)
func(setup_data)
else:
func(data)
except Exception:
# If warmup fails, let the main measurement handle the error
break
# Actual measurement
start = time.perf_counter()
for _ in range(self.config.iterations):
if setup_func:
setup_data = setup_func(data)
result = func(setup_data)
else:
result = func(data)
elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
return result, elapsed_ms
def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
"""Validate result using custom validator or default comparison"""
if operation in self.validators:
return self.validators[operation](expected, actual)
return expected == actual
def run(self):
"""Run all benchmarks"""
if self.config.profile_mode:
self._run_profile_mode()
else:
self._run_normal_mode()
def _run_normal_mode(self):
"""Run normal benchmark mode"""
print(f"Running {self.config.name}")
print(f"Sizes: {self.config.sizes}")
print(f"Operations: {self.config.operations}")
print("="*80)
# Always show progressive results: operation by operation across all sizes
for operation in self.config.operations:
for size in self.config.sizes:
self._run_single(operation, size)
# Save and plot results
if self.config.save_results:
self._save_results()
if self.config.plot_results:
self._plot_results()
# Print summary
self._print_summary()
def _run_profile_mode(self):
"""Run profiling mode with minimal overhead for use with vmprof"""
operation = self.config.profile_operation or self.config.operations[0]
size = self.config.profile_size or max(self.config.sizes)
impl_name = self.config.profile_implementation
print(f"PROFILING MODE: {self.config.name}")
print(f"Operation: {operation}, Size: {size}")
if impl_name:
print(f"Implementation: {impl_name}")
print("="*80)
print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
print("="*80)
# Generate test data
generator = self.data_generators.get(operation, self.data_generators.get('default'))
if not generator:
raise ValueError(f"No data generator for operation: {operation}")
test_data = generator(size, operation)
# Get implementations
impls = self.implementations.get(operation, {})
if not impls:
raise ValueError(f"No implementations for operation: {operation}")
# Filter to specific implementation if requested
if impl_name:
if impl_name not in impls:
raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
impls = {impl_name: impls[impl_name]}
# Run with minimal overhead - no timing, no validation
for name, func in impls.items():
print(f"\nRunning {name}...")
sys.stdout.flush()
# Setup if needed
setup_func = self.setups.get(operation, {}).get(name)
if setup_func:
data = setup_func(test_data)
else:
data = test_data
# Run the actual function (this is what vmprof will profile)
result = func(data)
print(f"Completed {name}, result checksum: {result}")
sys.stdout.flush()
def _run_single(self, operation: str, size: int):
"""Run a single operation/size combination"""
print(f"\nOperation: {operation}, Size: {size}")
print("-" * 50)
sys.stdout.flush()
# Generate test data
generator = self.data_generators.get(operation,
self.data_generators.get('default'))
if not generator:
raise ValueError(f"No data generator for operation: {operation}")
test_data = generator(size, operation)
# Get implementations for this operation
impls = self.implementations.get(operation, {})
if not impls:
print(f"No implementations for operation: {operation}")
return
# Get setup functions for this operation
setups = self.setups.get(operation, {})
# Run reference implementation first
ref_name, ref_impl = next(iter(impls.items()))
ref_setup = setups.get(ref_name)
expected_result, _ = self.measure_time(ref_impl, test_data, ref_setup)
# Run all implementations
for impl_name, impl_func in impls.items():
try:
setup_func = setups.get(impl_name)
result, time_ms = self.measure_time(impl_func, test_data, setup_func)
correct = self.validate_result(expected_result, result, operation)
# Store result
self.results.append({
'operation': operation,
'size': size,
'implementation': impl_name,
'time_ms': time_ms,
'correct': correct,
'error': None
})
status = "OK" if correct else "FAIL"
print(f" {impl_name:<20} {time_ms:>8.3f} ms {status}")
sys.stdout.flush()
except Exception as e:
self.results.append({
'operation': operation,
'size': size,
'implementation': impl_name,
'time_ms': float('inf'),
'correct': False,
'error': str(e)
})
print(f" {impl_name:<20} ERROR: {str(e)[:40]}")
sys.stdout.flush()
def _save_results(self):
"""Save results to JSON"""
output_dir = Path(self.config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
with open(filename, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"\nResults saved to {filename}")
def _plot_results(self):
"""Generate plots using matplotlib if available"""
try:
import matplotlib.pyplot as plt
output_dir = Path(self.config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Group and prepare data for plotting
data_by_op = self._group_results_by_operation()
# Create plots for each operation
for operation, operation_data in data_by_op.items():
self._create_performance_plot(plt, operation, operation_data, output_dir)
except ImportError:
print("Matplotlib not available - skipping plots")
except Exception as e:
print(f"Plotting failed: {e}")
def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
"""Group results by operation and size for plotting"""
data_by_op = defaultdict(lambda: defaultdict(list))
for r in self.results:
if r['time_ms'] != float('inf') and r['correct']:
data_by_op[r['operation']][r['size']].append({
'implementation': r['implementation'],
'time_ms': r['time_ms']
})
return data_by_op
def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
"""Create a performance plot for a single operation"""
sizes = sorted(operation_data.keys())
implementations = set()
for size_data in operation_data.values():
for entry in size_data:
implementations.add(entry['implementation'])
implementations = sorted(implementations)
plt.figure(figsize=(10, 6))
for impl in implementations:
impl_times = []
impl_sizes = []
for size in sizes:
times = [entry['time_ms'] for entry in operation_data[size]
if entry['implementation'] == impl]
if times:
impl_times.append(statistics.mean(times))
impl_sizes.append(size)
if impl_times:
plt.plot(impl_sizes, impl_times, 'o-', label=impl)
plt.xlabel('Input Size')
plt.ylabel('Time (ms)')
plt.title(f'{self.config.name} - {operation} Operation')
plt.legend()
plt.grid(True, alpha=0.3)
# Apply the configured scaling
if self.config.plot_scale == "loglog":
plt.loglog()
elif self.config.plot_scale == "linear":
pass # Default linear scale
elif self.config.plot_scale == "semilogx":
plt.semilogx()
elif self.config.plot_scale == "semilogy":
plt.semilogy()
else:
# Default to loglog if invalid option
plt.loglog()
plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
plt.savefig(plot_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"Plot saved: {plot_file}")
def _print_summary(self):
"""Print performance summary"""
print("\n" + "="*80)
print("PERFORMANCE SUMMARY")
print("="*80)
# Group by operation
by_operation = defaultdict(lambda: defaultdict(list))
for r in self.results:
if r['error'] is None and r['time_ms'] != float('inf'):
by_operation[r['operation']][r['implementation']].append(r['time_ms'])
print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
print("-" * 70)
for op, impl_times in sorted(by_operation.items()):
# Calculate averages
avg_times = [(impl, statistics.mean(times))
for impl, times in impl_times.items()]
avg_times.sort(key=lambda x: x[1])
if avg_times:
best_impl, best_time = avg_times[0]
worst_time = avg_times[-1][1]
speedup = worst_time / best_time if best_time > 0 else 0
print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
https://kobejean.github.io/cp-library
'''
def argsort(A: list[int], reverse=False):
P = Packer(len(I := list(A))-1); P.ienumerate(I, reverse); I.sort(); P.iindices(I)
return I
class Packer:
__slots__ = 's', 'm'
def __init__(P, mx: int): P.s = mx.bit_length(); P.m = (1 << P.s) - 1
def enc(P, a: int, b: int): return a << P.s | b
def dec(P, x: int) -> tuple[int, int]: return x >> P.s, x & P.m
def enumerate(P, A, reverse=False): P.ienumerate(A:=list(A), reverse); return A
def ienumerate(P, A, reverse=False):
if reverse:
for i,a in enumerate(A): A[i] = P.enc(-a, i)
else:
for i,a in enumerate(A): A[i] = P.enc(a, i)
def indices(P, A: list[int]): P.iindices(A:=list(A)); return A
def iindices(P, A):
for i,a in enumerate(A): A[i] = P.m&a
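# Illustrative sketch (not executed): Packer packs a (value, index) pair into a
# single int so that sorting a plain list of packed ints orders by value.
#   P = Packer(3)      # indices up to 3 -> low 2 bits reserved for the index
#   x = P.enc(7, 2)    # 7 << 2 | 2 == 30
#   P.dec(x)           # (7, 2)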
def isort_parallel(*L: list, reverse=False):
inv, order = [0]*len(L[0]), argsort(L[0], reverse=reverse)
for i, j in enumerate(order): inv[j] = i
for i, j in enumerate(order):
for A in L: A[i], A[j] = A[j], A[i]
order[inv[i]], inv[j] = j, inv[i]
return L
def sort_parallel(*L: list, reverse=False):
N, K, order = len(L[0]), len(L), argsort(L[0], reverse)
R = tuple([0]*N for _ in range(K))
for k, Lk in enumerate(L):
Rk = R[k]
for i, j in enumerate(order): Rk[i] = Lk[j]
return R
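# Illustrative sketch (not executed): argsort returns the permutation that sorts
# A; sort_parallel reorders several lists by the first one, and isort_parallel
# does the same in place.
#   argsort([30, 10, 20])                       # [1, 2, 0]
#   sort_parallel([3, 1, 2], ['c', 'a', 'b'])   # ([1, 2, 3], ['a', 'b', 'c'])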
import typing
from collections import deque
from numbers import Number
from types import GenericAlias
from typing import Callable, Collection, Iterator, Union
from io import BytesIO, IOBase
class FastIO(IOBase):
BUFSIZE = 8192
newlines = 0
def __init__(self, file):
self._fd = file.fileno()
self.buffer = BytesIO()
self.writable = "x" in file.mode or "r" not in file.mode
self.write = self.buffer.write if self.writable else None
def read(self):
BUFSIZE = self.BUFSIZE
while True:
b = os.read(self._fd, max(os.fstat(self._fd).st_size, BUFSIZE))
if not b: break
ptr = self.buffer.tell()
self.buffer.seek(0, 2), self.buffer.write(b), self.buffer.seek(ptr)
self.newlines = 0
return self.buffer.read()
def readline(self):
BUFSIZE = self.BUFSIZE
while self.newlines == 0:
b = os.read(self._fd, max(os.fstat(self._fd).st_size, BUFSIZE))
self.newlines = b.count(b"\n") + (not b)
ptr = self.buffer.tell()
self.buffer.seek(0, 2), self.buffer.write(b), self.buffer.seek(ptr)
self.newlines -= 1
return self.buffer.readline()
def flush(self):
if self.writable:
os.write(self._fd, self.buffer.getvalue())
self.buffer.truncate(0), self.buffer.seek(0)
class IOWrapper(IOBase):
stdin: 'IOWrapper' = None
stdout: 'IOWrapper' = None
def __init__(self, file):
self.buffer = FastIO(file)
self.flush = self.buffer.flush
self.writable = self.buffer.writable
def write(self, s):
return self.buffer.write(s.encode("ascii"))
def read(self):
return self.buffer.read().decode("ascii")
def readline(self):
return self.buffer.readline().decode("ascii")
try:
sys.stdin = IOWrapper.stdin = IOWrapper(sys.stdin)
sys.stdout = IOWrapper.stdout = IOWrapper(sys.stdout)
except:
pass
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')
class TokenStream(Iterator):
stream = IOWrapper.stdin
def __init__(self):
self.queue = deque()
def __next__(self):
if not self.queue: self.queue.extend(self._line())
return self.queue.popleft()
def wait(self):
if not self.queue: self.queue.extend(self._line())
while self.queue: yield
def _line(self):
return TokenStream.stream.readline().split()
def line(self):
if self.queue:
A = list(self.queue)
self.queue.clear()
return A
return self._line()
TokenStream.default = TokenStream()
class CharStream(TokenStream):
def _line(self):
return TokenStream.stream.readline().rstrip()
CharStream.default = CharStream()
ParseFn = Callable[[TokenStream],_T]
class Parser:
def __init__(self, spec: Union[type[_T],_T]):
self.parse = Parser.compile(spec)
def __call__(self, ts: TokenStream) -> _T:
return self.parse(ts)
@staticmethod
def compile_type(cls: type[_T], args = ()) -> _T:
if issubclass(cls, Parsable):
return cls.compile(*args)
elif issubclass(cls, (Number, str)):
def parse(ts: TokenStream): return cls(next(ts))
return parse
elif issubclass(cls, tuple):
return Parser.compile_tuple(cls, args)
elif issubclass(cls, Collection):
return Parser.compile_collection(cls, args)
elif callable(cls):
def parse(ts: TokenStream):
return cls(next(ts))
return parse
else:
raise NotImplementedError()
@staticmethod
def compile(spec: Union[type[_T],_T]=int) -> ParseFn[_T]:
if isinstance(spec, (type, GenericAlias)):
cls = typing.get_origin(spec) or spec
args = typing.get_args(spec) or tuple()
return Parser.compile_type(cls, args)
elif isinstance(offset := spec, Number):
cls = type(spec)
def parse(ts: TokenStream): return cls(next(ts)) + offset
return parse
elif isinstance(args := spec, tuple):
return Parser.compile_tuple(type(spec), args)
elif isinstance(args := spec, Collection):
return Parser.compile_collection(type(spec), args)
elif isinstance(fn := spec, Callable):
def parse(ts: TokenStream): return fn(next(ts))
return parse
else:
raise NotImplementedError()
@staticmethod
def compile_line(cls: _T, spec=int) -> ParseFn[_T]:
if spec is int:
fn = Parser.compile(spec)
def parse(ts: TokenStream): return cls([int(token) for token in ts.line()])
return parse
else:
fn = Parser.compile(spec)
def parse(ts: TokenStream): return cls([fn(ts) for _ in ts.wait()])
return parse
@staticmethod
def compile_repeat(cls: _T, spec, N) -> ParseFn[_T]:
fn = Parser.compile(spec)
def parse(ts: TokenStream): return cls([fn(ts) for _ in range(N)])
return parse
@staticmethod
def compile_children(cls: _T, specs) -> ParseFn[_T]:
fns = tuple((Parser.compile(spec) for spec in specs))
def parse(ts: TokenStream): return cls([fn(ts) for fn in fns])
return parse
@staticmethod
def compile_tuple(cls: type[_T], specs) -> ParseFn[_T]:
if isinstance(specs, (tuple,list)) and len(specs) == 2 and specs[1] is ...:
return Parser.compile_line(cls, specs[0])
else:
return Parser.compile_children(cls, specs)
@staticmethod
def compile_collection(cls, specs):
if not specs or len(specs) == 1 or isinstance(specs, set):
return Parser.compile_line(cls, *specs)
elif (isinstance(specs, (tuple,list)) and len(specs) == 2 and isinstance(specs[1], int)):
return Parser.compile_repeat(cls, specs[0], specs[1])
else:
raise NotImplementedError()
class Parsable:
@classmethod
def compile(cls):
def parser(ts: TokenStream): return cls(next(ts))
return parser
@classmethod
def __class_getitem__(cls, item):
return GenericAlias(cls, item)
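# Illustrative sketch (not executed): Parser.compile turns a type spec into a
# parse function over a TokenStream. A Parsable class subscripted with its
# compile() arguments (here a hypothetical 3-node, 2-edge input on stdin) would
# parse as:
#   parse = Parser.compile(EdgeListWeighted[3, 2])
#   E = parse(TokenStream.default)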
class EdgeListWeighted(Parsable):
def __init__(E, N: int, U: list[int], V: list[int], W: list[int]): E.N, E.M, E.U, E.V, E.W = N, len(U), U, V, W
def __len__(E): return E.M
def __getitem__(E, e): return E.U[e], E.V[e], E.W[e]
@classmethod
def compile(cls, N: int, M: int, I: int = -1):
def parse(ts: TokenStream):
U, V, W = [0]*M, [0]*M, [0]*M
for e in range(M): u, v, w = ts.line(); U[e], V[e], W[e] = int(u)+I, int(v)+I, int(w)
return cls(N, U, V, W)
return parse
def kruskal(E):
dsu, I = DSU(E.N), elist(E.N-1)
for e in argsort(E.W):
x, y = dsu.merge(E.U[e], E.V[e])
if x != y: I.append(e)
return I
def edmond(E, root):
U, W, F, pkr, dsu = [0]*E.N, [0]*E.N, SkewHeapForrest(E.N, E.M), Packer(E.M), DSU(E.N)
Ein, stem, cyc, I, C = [-1]*E.M, [-1]*E.N, [], [], []; vis = [0]*E.N; vis[root] = 2
for e in range(E.M): F.push(E.V[e], pkr.enc(E.W[e], e))
for v in range(E.N):
if vis[v := dsu.root(v)]: continue
cycle = 0; C.clear()
while vis[v] != 2:
if F.empty(v): return None
vis[v] = 1; cyc.append(v); W[v], e = pkr.dec(F.pop(v)); U[v] = dsu.root(E.U[e])
if stem[v] == -1: stem[v] = e
if U[v] == v: continue
while cycle: Ein[C.pop()] = e; cycle -= 1
I.append(e); C.append(e)
if vis[U[v]] == 1:
if not F.empty(v): F.add(v, -W[v]<<pkr.s)
U[v] = p = dsu.root(U[v]); cycle += 1
while p != v:
if not F.empty(p): F.add(p, -W[p]<<pkr.s)
F.roots[v := dsu.merge_dest(v, p)] = F.merge(F.roots[v], F.roots[p])
U[p] = p = dsu.root(U[p]); cycle += 1
else:
v = U[v]
while cyc: vis[cyc.pop()] = 2
vis, T = [0]*E.M, []
for e in reversed(I):
if vis[e]: continue
f = stem[E.V[e]]; T.append(e)
while f != e: vis[f] = 1; f = Ein[f]
return T
def sort(E):
isort_parallel(E.W, E.U, E.V)
def sub(E, I: list[int]):
U, V, W = elist(E.N-1), elist(E.N-1), elist(E.N-1)
for e in I: U.append(E.U[e]); V.append(E.V[e]); W.append(E.W[e])
return E.__class__(E.N, U, V, W)
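# Illustrative sketch (not executed): build a small weighted edge list and take
# the minimum spanning tree with kruskal(), which returns edge indices.
#   E = EdgeListWeighted(3, [0, 1, 0], [1, 2, 2], [5, 1, 3])
#   E.kruskal()   # [1, 2] for this input
#   E.sort()      # in-place sort of (W, U, V) by weight via isort_parallel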
class DSU(Parsable):
def __init__(dsu, N): dsu.N, dsu.cc, dsu.par = N, N, [-1]*N
def merge(dsu, u, v):
x, y = dsu.root(u), dsu.root(v)
if x == y: return x,y
if dsu.par[x] > dsu.par[y]: x, y = y, x
dsu.par[x] += dsu.par[y]; dsu.par[y] = x; dsu.cc -= 1
return x, y
def root(dsu, i) -> int:
p = (par := dsu.par)[i]
while p >= 0:
if par[p] < 0: return p
par[i], i, p = par[p], par[p], par[par[p]]
return i
def groups(dsu) -> 'CSRIncremental[int]':
sizes, row, p = [0]*dsu.cc, [-1]*dsu.N, 0
for i in range(dsu.cc):
while dsu.par[p] >= 0: p += 1
sizes[i], row[p] = -dsu.par[p], i; p += 1
csr = CSRIncremental(sizes)
for i in range(dsu.N): csr.append(row[dsu.root(i)], i)
return csr
__iter__ = groups
def merge_dest(dsu, u, v): return dsu.merge(u, v)[0]
def same(dsu, u: int, v: int): return dsu.root(u) == dsu.root(v)
def size(dsu, i) -> int: return -dsu.par[dsu.root(i)]
def __len__(dsu): return dsu.cc
def __contains__(dsu, uv): u, v = uv; return dsu.same(u, v)
@classmethod
def compile(cls, N: int, M: int, shift = -1):
def parse_fn(ts: TokenStream):
dsu = cls(N)
for _ in range(M): u, v = ts._line(); dsu.merge(int(u)+shift, int(v)+shift)
return dsu
return parse_fn
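# Illustrative sketch (not executed): union-find with union by size and path
# compression; merge() returns the two group roots (the surviving root first).
#   dsu = DSU(4)
#   dsu.merge(0, 1); dsu.merge(1, 2)
#   dsu.same(0, 2)   # True
#   dsu.size(0)      # 3
#   len(dsu)         # 2 components remain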
from typing import Sequence
class CSRIncremental(Sequence[list[_T]]):
def __init__(csr, sizes: list[int]):
csr.L, N = [0]*len(sizes), 0
for i,sz in enumerate(sizes):
csr.L[i] = N; N += sz
csr.R, csr.A = csr.L[:], [0]*N
def append(csr, i: int, x: _T):
csr.A[csr.R[i]] = x; csr.R[i] += 1
def __iter__(csr):
for i,l in enumerate(csr.L):
yield csr.A[l:csr.R[i]]
def __getitem__(csr, i: int) -> _T:
return csr.A[i]
def __len__(dsu):
return len(dsu.L)
def range(csr, i: int) -> _T:
return range(csr.L[i], csr.R[i])
def elist(est_len: int) -> list: ...
try:
from __pypy__ import newlist_hint
except:
def newlist_hint(hint):
return []
elist = newlist_hint
import operator
from typing import Generic
class SkewHeapForrest(Generic[_T]):
def __init__(shf, N, M, e: _T = 0, op = operator.add):
shf.V, shf.A, shf.L, shf.R, shf.roots = [e]*M, [e]*M, [-1]*M, [-1]*M, [-1]*N
shf.id, shf.st, shf.e, shf.op = 0, elist(M), e, op
def propagate(shf, u: int):
if (a := shf.A[u]) != shf.e:
if ~(l := shf.L[u]): shf.A[l] = shf.op(shf.A[l], a)
if ~(r := shf.R[u]): shf.A[r] = shf.op(shf.A[r], a)
shf.V[u] = shf.op(shf.V[u], a); shf.A[u] = shf.e
def merge(shf, u: int, v: int):
while ~u and ~v:
shf.propagate(u); shf.propagate(v)
if shf.V[v] < shf.V[u]: u, v = v, u
shf.st.append(u); shf.R[u], u = shf.L[u], shf.R[u]
u = u if ~u else v
while shf.st: shf.L[u := shf.st.pop()] = u
return u
def min(shf, i: int):
assert ~(root := shf.roots[i])
shf.propagate(root)
return shf.V[root]
def push(shf, i: int, x: _T):
shf.V[shf.id] = x
shf.roots[i] = shf.merge(shf.roots[i], shf.id)
shf.id += 1
def pop(shf, i: int) -> _T:
assert ~(root := shf.roots[i])
shf.propagate(root)
val, shf.roots[i] = shf.V[root], shf.merge(shf.L[root], shf.R[root])
return val
def add(shf, i: int, val: _T): shf.A[shf.roots[i]] = shf.op(shf.A[shf.roots[i]], val)
def empty(shf, i: int): return shf.roots[i] == -1
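# Illustrative sketch (not executed): a forest of lazily-updated skew heaps, one
# root per node index; add() lazily offsets every value in heap i (this is what
# edmond() above uses to reweight incoming edges when contracting a cycle).
#   shf = SkewHeapForrest(2, 4)
#   shf.push(0, 5); shf.push(0, 3)
#   shf.min(0)      # 3
#   shf.add(0, 10)
#   shf.pop(0)      # 13 (== 3 + 10)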
# Configure benchmark
config = BenchmarkConfig(
name="edge_list",
sizes=[1000000, 100000, 10000, 1000, 100, 10, 1], # Reverse order to warm up JIT
operations=['sum_weights', 'filter', 'degree_count', 'transform', 'sort', 'construction'],
iterations=10,
warmup=2,
output_dir="./output/benchmark_results/edge_list"
)
# Create benchmark instance
benchmark = Benchmark(config)
# Data generators
@benchmark.data_generator("default")
def generate_edge_data(size: int, operation: str):
"""Generate edge list data in all formats"""
max_node = max(1, int(size ** 0.5) * 2)
# Generate raw edge data
U = [random.randint(0, max_node) for _ in range(size)]
V = [random.randint(0, max_node) for _ in range(size)]
W = [random.randint(1, 1000) for _ in range(size)]
# Create different representations
edges_tuple = [(U[i], V[i], W[i]) for i in range(size)]
edge_list = EdgeListWeighted(max_node + 1, U, V, W)
# Pre-initialize data for fair timing (exclude initialization)
preinitialized = {
'edges_tuple': list(edges_tuple),
'edge_list': EdgeListWeighted(max_node + 1, list(U), list(V), list(W)),
'U': list(U), 'V': list(V), 'W': list(W),
'threshold': 500,
'max_node': max_node,
'degree_array': [0] * (max_node + 1)
}
return {
'edges_tuple': edges_tuple,
'edge_list': edge_list,
'U': U, 'V': V, 'W': W,
'size': size,
'operation': operation,
'threshold': 500,
'max_node': max_node,
'preinitialized': preinitialized
}
# Construction benchmarks - all should do equivalent work
@benchmark.implementation("construct_tuple", "construction")
def construct_tuple_list(data):
"""Construct list of tuples from raw data"""
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i]) for i in range(len(U))]
@benchmark.implementation("construct_edge_list_ref", "construction")
def construct_edge_list_ref(data):
"""Construct EdgeListWeighted from raw data (reference assignment)"""
U, V, W = data['U'], data['V'], data['W']
return EdgeListWeighted(data['max_node'] + 1, U, V, W)
@benchmark.implementation("construct_edge_list_copy", "construction")
def construct_edge_list_copy(data):
"""Construct EdgeListWeighted from copied data (fair comparison)"""
U, V, W = data['U'], data['V'], data['W']
return EdgeListWeighted(data['max_node'] + 1, list(U), list(V), list(W))
@benchmark.implementation("construct_separated", "construction")
def construct_separated_lists(data):
"""Create separated lists (copy data)"""
U, V, W = data['U'], data['V'], data['W']
return (list(U), list(V), list(W))
# Sum weights operation
@benchmark.implementation("tuple_direct", "sum_weights")
def sum_weights_tuple_direct(data):
"""Sum weights using direct tuple iteration"""
return sum(w for u, v, w in data['edges_tuple'])
@benchmark.implementation("edge_list_iter", "sum_weights")
def sum_weights_edge_list_iter(data):
"""Sum weights using EdgeListWeighted iteration"""
return sum(w for u, v, w in data['edge_list'])
@benchmark.implementation("edge_list_direct", "sum_weights")
def sum_weights_edge_list_direct(data):
"""Sum weights using EdgeListWeighted direct access"""
return sum(data['edge_list'].W)
@benchmark.implementation("separated_lists", "sum_weights")
def sum_weights_separated(data):
"""Sum weights using separated lists"""
return sum(data['W'])
# Filter operation
@benchmark.implementation("tuple_direct", "filter")
def filter_tuple_direct(data):
"""Filter edges using tuple iteration"""
threshold = data['threshold']
return [(u, v, w) for u, v, w in data['edges_tuple'] if w > threshold]
@benchmark.implementation("edge_list_iter", "filter")
def filter_edge_list_iter(data):
"""Filter edges using EdgeListWeighted iteration"""
threshold = data['threshold']
return [(u, v, w) for u, v, w in data['edge_list'] if w > threshold]
@benchmark.implementation("edge_list_direct", "filter")
def filter_edge_list_direct(data):
"""Filter edges using EdgeListWeighted direct access"""
threshold = data['threshold']
edge_list = data['edge_list']
result = []
for i in range(len(edge_list)):
if edge_list.W[i] > threshold:
result.append((edge_list.U[i], edge_list.V[i], edge_list.W[i]))
return result
@benchmark.implementation("separated_lists", "filter")
def filter_separated(data):
"""Filter edges using separated lists"""
threshold = data['threshold']
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i]) for i in range(len(W)) if W[i] > threshold]
# Degree count operation
@benchmark.implementation("tuple_direct", "degree_count")
def degree_count_tuple_direct(data):
"""Count degrees using tuple iteration"""
degree = [0] * (data['max_node'] + 1)
for u, v, w in data['edges_tuple']:
degree[u] += 1
return degree
@benchmark.implementation("edge_list_iter", "degree_count")
def degree_count_edge_list_iter(data):
"""Count degrees using EdgeListWeighted iteration"""
degree = [0] * (data['max_node'] + 1)
for u, v, w in data['edge_list']:
degree[u] += 1
return degree
@benchmark.implementation("edge_list_direct", "degree_count")
def degree_count_edge_list_direct(data):
"""Count degrees using EdgeListWeighted direct access"""
degree = [0] * (data['max_node'] + 1)
for u in data['edge_list'].U:
degree[u] += 1
return degree
@benchmark.implementation("separated_lists", "degree_count")
def degree_count_separated(data):
"""Count degrees using separated lists"""
degree = [0] * (data['max_node'] + 1)
for u in data['U']:
degree[u] += 1
return degree
# Transform operation
@benchmark.implementation("tuple_direct", "transform")
def transform_tuple_direct(data):
"""Transform edges using tuple iteration"""
return [(u, v, w * 2) for u, v, w in data['edges_tuple']]
@benchmark.implementation("edge_list_iter", "transform")
def transform_edge_list_iter(data):
"""Transform edges using EdgeListWeighted iteration"""
return [(u, v, w * 2) for u, v, w in data['edge_list']]
@benchmark.implementation("edge_list_direct", "transform")
def transform_edge_list_direct(data):
"""Transform edges using EdgeListWeighted direct access"""
edge_list = data['edge_list']
return [(edge_list.U[i], edge_list.V[i], edge_list.W[i] * 2)
for i in range(len(edge_list))]
@benchmark.implementation("separated_lists", "transform")
def transform_separated(data):
"""Transform edges using separated lists"""
U, V, W = data['U'], data['V'], data['W']
return [(U[i], V[i], W[i] * 2) for i in range(len(W))]
# Sort operation
@benchmark.implementation("tuple_direct", "sort")
def sort_tuple_direct(data):
"""Sort edges using tuple list"""
edges = list(data['edges_tuple'])
edges.sort(key=lambda x: x[2])
return edges
@benchmark.implementation("edge_list_sort", "sort")
def sort_edge_list_builtin(data):
"""Sort edges using EdgeListWeighted built-in sort"""
edge_list = EdgeListWeighted(data['edge_list'].N,
list(data['edge_list'].U),
list(data['edge_list'].V),
list(data['edge_list'].W))
edge_list.sort()
return edge_list
@benchmark.implementation("separated_lists", "sort")
def sort_separated(data):
"""Sort edges using separated lists"""
U, V, W = list(data['U']), list(data['V']), list(data['W'])
# Sort by weight using indices
indices = sorted(range(len(W)), key=lambda i: W[i])
return ([U[i] for i in indices], [V[i] for i in indices], [W[i] for i in indices])
# Custom validators
@benchmark.validator("sort")
def validate_sort(expected, actual):
"""Validate that sort results are equivalent"""
# Convert both to comparable format
if hasattr(expected, 'W'): # EdgeListWeighted
expected_weights = expected.W
elif isinstance(expected, tuple): # separated lists
expected_weights = expected[2]
else: # list of tuples
expected_weights = [w for u, v, w in expected]
if hasattr(actual, 'W'): # EdgeListWeighted
actual_weights = actual.W
elif isinstance(actual, tuple): # separated lists
actual_weights = actual[2]
else: # list of tuples
actual_weights = [w for u, v, w in actual]
return expected_weights == actual_weights
@benchmark.validator("construction")
def validate_construction(expected, actual):
"""Validate construction results"""
# Just check that a non-empty result of the expected kind was created
if actual is None:
return False
# Check based on type
if isinstance(actual, list): # tuple list
return len(actual) > 0
elif hasattr(actual, 'M'): # EdgeListWeighted
return actual.M > 0
elif isinstance(actual, tuple): # separated lists
return len(actual[0]) > 0
return True
if __name__ == "__main__":
# Parse command line args and run appropriate mode
runner = benchmark.parse_args()
runner.run()