cp-library

This documentation is automatically generated by online-judge-tools/verification-helper

View the Project on GitHub kobejean/cp-library

:warning: perf/grid.py

Depends on

Code

#!/usr/bin/env python3
"""
Simple Grid benchmark - minimal overhead, focused on core operations.
"""

import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from cp_library.perf.benchmark import Benchmark, BenchmarkConfig
from cp_library.perf.checksum import update_checksum
from cp_library.ds.grid.grid_cls import Grid

config = BenchmarkConfig(
    name="grid",
    sizes=[10000000, 1000000, 100000, 10000, 1000, 100],
    operations=['construction', 'random_access', 'row_access', 'sequential_access'],
    iterations=10,
    warmup=2,
    output_dir="./output/benchmark_results/grid"
)

benchmark = Benchmark(config)

@benchmark.data_generator("default")
def generate_data(size: int, _: str):
    H = int(size ** 0.5)
    W = size // H
    
    data = [[random.randint(1, 1000000) for _ in range(W)] for _ in range(H)]
    flat = [val for row in data for val in row]
    
    return {
        'grid': Grid(H, W, data),
        'lists': data,
        'flat': flat,
        'H': H, 'W': W,
        'coords': [(random.randint(0, H-1), random.randint(0, W-1)) for _ in range(min(100, size))]
    }

# Construction
@benchmark.implementation("grid", "construction")
def construction_grid(data):
    H, W = data['H'], data['W']
    grid = Grid(H, W, data['flat'])
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "construction")
def construction_lists(data):
    H, W = data['H'], data['W']
    lists = [row[:] for row in data['lists']]
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "construction")
def construction_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat'][:]
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, flat[i * W + j])
    return result

# Random access
@benchmark.implementation("grid", "random_access")
def random_access_grid(data):
    grid = data['grid']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "random_access")
def random_access_lists(data):
    lists = data['lists']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "random_access")
def random_access_flat(data):
    flat, W = data['flat'], data['W']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, flat[i * W + j])
    return result

# Row access
@benchmark.implementation("grid", "row_access")
def row_access_grid(data):
    H, W = data['H'], data['W']
    grid = data['grid']
    return sum(W for i in range(H) if grid[i])  # Count logical width

@benchmark.implementation("lists", "row_access")
def row_access_lists(data):
    H = data['H']
    lists = data['lists']
    return sum(len(lists[i]) for i in range(H))

@benchmark.implementation("flat", "row_access")
def row_access_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat']
    return sum(len(flat[i * W:(i + 1) * W]) for i in range(H))

# Sequential access
@benchmark.implementation("grid", "sequential_access")
def sequential_access_grid(data):
    H, W = data['H'], data['W']
    grid = data['grid']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "sequential_access")
def sequential_access_lists(data):
    H, W = data['H'], data['W']
    lists = data['lists']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "sequential_access")
def sequential_access_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, flat[i * W + j])
    return result

if __name__ == "__main__":
    # Parse command line args and run appropriate mode
    runner = benchmark.parse_args()
    runner.run()
#!/usr/bin/env python3
"""
Simple Grid benchmark - minimal overhead, focused on core operations.
"""

import random
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

"""
Declarative benchmark framework with minimal boilerplate.

Features:
- Decorator-based benchmark registration
- Automatic data generation and validation
- Built-in timing with warmup
- Configurable operations and sizes
- JSON results and matplotlib plotting
"""

import time
import json
import statistics
import argparse
from typing import Dict, List, Any, Callable, Union
from dataclasses import dataclass
from pathlib import Path
from collections import defaultdict

@dataclass
class BenchmarkConfig:
    """Configuration for benchmark runs"""
    name: str
    sizes: List[int] = None
    operations: List[str] = None
    iterations: int = 10
    warmup: int = 2
    output_dir: str = "./output/benchmark_results"
    save_results: bool = True
    plot_results: bool = True
    plot_scale: str = "loglog"  # Options: "loglog", "linear", "semilogx", "semilogy"
    progressive: bool = True  # Show results operation by operation across sizes
    # Profiling mode
    profile_mode: bool = False
    profile_size: int = None
    profile_operation: str = None
    profile_implementation: str = None
    
    def __post_init__(self):
        if self.sizes is None:
            self.sizes = [100, 1000, 10000, 100000]
        if self.operations is None:
            self.operations = ['default']

class Benchmark:
    """Declarative benchmark framework using decorators"""
    
    def __init__(self, config: BenchmarkConfig):
        self.config = config
        self.data_generators = {}
        self.implementations = {}
        self.validators = {}
        self.setups = {}
        self.results = []
    
    def profile(self, operation: str = None, size: int = None, implementation: str = None):
        """Create a profiling version of this benchmark"""
        profile_config = BenchmarkConfig(
            name=f"{self.config.name}_profile",
            sizes=self.config.sizes,
            operations=self.config.operations,
            profile_mode=True,
            profile_operation=operation,
            profile_size=size,
            profile_implementation=implementation,
            save_results=False,
            plot_results=False
        )
        
        profile_benchmark = Benchmark(profile_config)
        profile_benchmark.data_generators = self.data_generators
        profile_benchmark.implementations = self.implementations
        profile_benchmark.validators = self.validators
        profile_benchmark.setups = self.setups
        
        return profile_benchmark
    
    def parse_args(self):
        """Parse command line arguments for profiling mode"""
        parser = argparse.ArgumentParser(
            description=f"Benchmark {self.config.name} with optional profiling mode",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
  # Normal benchmark mode
  python benchmark.py
  
  # Profile specific operation and implementation
  python benchmark.py --profile --operation random_access --implementation grid
  
  # Profile with specific size
  python benchmark.py --profile --size 1000000
  
  # Profile all implementations of an operation
  python benchmark.py --profile --operation construction
"""
        )
        
        parser.add_argument('--profile', action='store_true',
                          help='Run in profiling mode (minimal overhead for profilers)')
        parser.add_argument('--operation', type=str, 
                          help=f'Operation to profile. Options: {", ".join(self.config.operations)}')
        parser.add_argument('--size', type=int,
                          help=f'Size to profile. Options: {", ".join(map(str, self.config.sizes))}')
        parser.add_argument('--implementation', type=str,
                          help='Specific implementation to profile (default: all)')
        
        args = parser.parse_args()
        
        # If profile mode requested, return a profiling benchmark
        if args.profile:
            return self.profile(
                operation=args.operation,
                size=args.size,
                implementation=args.implementation
            )
        
        # Otherwise return self for normal mode
        return self
        
    def data_generator(self, name: str = "default"):
        """Decorator to register data generator"""
        def decorator(func):
            self.data_generators[name] = func
            return func
        return decorator
    
    def implementation(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register implementation"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]
            
        def decorator(func):
            for op in operations:
                if op not in self.implementations:
                    self.implementations[op] = {}
                self.implementations[op][name] = func
            return func
        return decorator
    
    def validator(self, operation: str = "default"):
        """Decorator to register custom validator"""
        def decorator(func):
            self.validators[operation] = func
            return func
        return decorator
    
    def setup(self, name: str, operations: Union[str, List[str]] = None):
        """Decorator to register setup function that runs before timing"""
        if operations is None:
            operations = ['default']
        elif isinstance(operations, str):
            operations = [operations]
            
        def decorator(func):
            for op in operations:
                if op not in self.setups:
                    self.setups[op] = {}
                self.setups[op][name] = func
            return func
        return decorator
    
    def measure_time(self, func: Callable, data: Any, setup_func: Callable = None) -> tuple[Any, float]:
        """Measure execution time with warmup and optional setup"""
        # Warmup runs
        for _ in range(self.config.warmup):
            try:
                if setup_func:
                    setup_data = setup_func(data)
                    func(setup_data)
                else:
                    func(data)
            except Exception:
                # If warmup fails, let the main measurement handle the error
                break
        
        # Actual measurement
        start = time.perf_counter()
        for _ in range(self.config.iterations):
            if setup_func:
                setup_data = setup_func(data)
                result = func(setup_data)
            else:
                result = func(data)
        elapsed_ms = (time.perf_counter() - start) * 1000 / self.config.iterations
        
        return result, elapsed_ms
    
    def validate_result(self, expected: Any, actual: Any, operation: str) -> bool:
        """Validate result using custom validator or default comparison"""
        if operation in self.validators:
            return self.validators[operation](expected, actual)
        return expected == actual
    
    def run(self):
        """Run all benchmarks"""
        if self.config.profile_mode:
            self._run_profile_mode()
        else:
            self._run_normal_mode()
    
    def _run_normal_mode(self):
        """Run normal benchmark mode"""
        print(f"Running {self.config.name}")
        print(f"Sizes: {self.config.sizes}")
        print(f"Operations: {self.config.operations}")
        print("="*80)
        
        # Always show progressive results: operation by operation across all sizes
        for operation in self.config.operations:
            for size in self.config.sizes:
                self._run_single(operation, size)
        
        # Save and plot results
        if self.config.save_results:
            self._save_results()
        
        if self.config.plot_results:
            self._plot_results()
        
        # Print summary
        self._print_summary()
    
    def _run_profile_mode(self):
        """Run profiling mode with minimal overhead for use with vmprof"""
        operation = self.config.profile_operation or self.config.operations[0]
        size = self.config.profile_size or max(self.config.sizes)
        impl_name = self.config.profile_implementation
        
        print(f"PROFILING MODE: {self.config.name}")
        print(f"Operation: {operation}, Size: {size}")
        if impl_name:
            print(f"Implementation: {impl_name}")
        print("="*80)
        print("Run with vmprof: vmprof --web " + ' '.join(sys.argv))
        print("="*80)
        
        # Generate test data
        generator = self.data_generators.get(operation, self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        
        test_data = generator(size, operation)
        
        # Get implementations
        impls = self.implementations.get(operation, {})
        if not impls:
            raise ValueError(f"No implementations for operation: {operation}")
        
        # Filter to specific implementation if requested
        if impl_name:
            if impl_name not in impls:
                raise ValueError(f"Implementation '{impl_name}' not found for operation '{operation}'")
            impls = {impl_name: impls[impl_name]}
        
        # Run with minimal overhead - no timing, no validation
        for name, func in impls.items():
            print(f"\nRunning {name}...")
            sys.stdout.flush()
            
            # Setup if needed
            setup_func = self.setups.get(operation, {}).get(name)
            if setup_func:
                data = setup_func(test_data)
            else:
                data = test_data
            
            # Run the actual function (this is what vmprof will profile)
            result = func(data)
            print(f"Completed {name}, result checksum: {result}")
            sys.stdout.flush()
    
    def _run_single(self, operation: str, size: int):
        """Run a single operation/size combination"""
        print(f"\nOperation: {operation}, Size: {size}")
        print("-" * 50)
        sys.stdout.flush()
        
        # Generate test data
        generator = self.data_generators.get(operation, 
                                           self.data_generators.get('default'))
        if not generator:
            raise ValueError(f"No data generator for operation: {operation}")
        
        test_data = generator(size, operation)
        
        # Get implementations for this operation
        impls = self.implementations.get(operation, {})
        if not impls:
            print(f"No implementations for operation: {operation}")
            return
        
        # Get setup functions for this operation
        setups = self.setups.get(operation, {})
        
        # Run reference implementation first
        ref_name, ref_impl = next(iter(impls.items()))
        ref_setup = setups.get(ref_name)
        expected_result, _ = self.measure_time(ref_impl, test_data, ref_setup)
        
        # Run all implementations
        for impl_name, impl_func in impls.items():
            try:
                setup_func = setups.get(impl_name)
                result, time_ms = self.measure_time(impl_func, test_data, setup_func)
                correct = self.validate_result(expected_result, result, operation)
                
                # Store result
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': time_ms,
                    'correct': correct,
                    'error': None
                })
                
                status = "OK" if correct else "FAIL"
                print(f"  {impl_name:<20} {time_ms:>8.3f} ms  {status}")
                sys.stdout.flush()
                
            except Exception as e:
                self.results.append({
                    'operation': operation,
                    'size': size,
                    'implementation': impl_name,
                    'time_ms': float('inf'),
                    'correct': False,
                    'error': str(e)
                })
                print(f"  {impl_name:<20} ERROR: {str(e)[:40]}")
                sys.stdout.flush()
    
    def _save_results(self):
        """Save results to JSON"""
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        
        filename = output_dir / f"{self.config.name}_{int(time.time())}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {filename}")
    
    def _plot_results(self):
        """Generate plots using matplotlib if available"""
        try:
            import matplotlib.pyplot as plt
            
            output_dir = Path(self.config.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            
            # Group and prepare data for plotting
            data_by_op = self._group_results_by_operation()
            
            # Create plots for each operation
            for operation, operation_data in data_by_op.items():
                self._create_performance_plot(plt, operation, operation_data, output_dir)
                
        except ImportError:
            print("Matplotlib not available - skipping plots")
        except Exception as e:
            print(f"Plotting failed: {e}")
    
    def _group_results_by_operation(self) -> Dict[str, Dict[int, List[Dict[str, Any]]]]:
        """Group results by operation and size for plotting"""
        data_by_op = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['time_ms'] != float('inf') and r['correct']:
                data_by_op[r['operation']][r['size']].append({
                    'implementation': r['implementation'],
                    'time_ms': r['time_ms']
                })
        return data_by_op
    
    def _create_performance_plot(self, plt, operation: str, operation_data: Dict[int, List[Dict[str, Any]]], output_dir: Path):
        """Create a performance plot for a single operation"""
        sizes = sorted(operation_data.keys())
        implementations = set()
        for size_data in operation_data.values():
            for entry in size_data:
                implementations.add(entry['implementation'])
        
        implementations = sorted(implementations)
        
        plt.figure(figsize=(10, 6))
        for impl in implementations:
            impl_times = []
            impl_sizes = []
            for size in sizes:
                times = [entry['time_ms'] for entry in operation_data[size] 
                        if entry['implementation'] == impl]
                if times:
                    impl_times.append(statistics.mean(times))
                    impl_sizes.append(size)
            
            if impl_times:
                plt.plot(impl_sizes, impl_times, 'o-', label=impl)
        
        plt.xlabel('Input Size')
        plt.ylabel('Time (ms)')
        plt.title(f'{self.config.name} - {operation} Operation')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # Apply the configured scaling
        if self.config.plot_scale == "loglog":
            plt.loglog()
        elif self.config.plot_scale == "linear":
            pass  # Default linear scale
        elif self.config.plot_scale == "semilogx":
            plt.semilogx()
        elif self.config.plot_scale == "semilogy":
            plt.semilogy()
        else:
            # Default to loglog if invalid option
            plt.loglog()
        
        plot_file = output_dir / f"{self.config.name}_{operation}_performance.png"
        plt.savefig(plot_file, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Plot saved: {plot_file}")
    
    def _print_summary(self):
        """Print performance summary"""
        print("\n" + "="*80)
        print("PERFORMANCE SUMMARY")
        print("="*80)
        
        # Group by operation
        by_operation = defaultdict(lambda: defaultdict(list))
        for r in self.results:
            if r['error'] is None and r['time_ms'] != float('inf'):
                by_operation[r['operation']][r['implementation']].append(r['time_ms'])
        
        print(f"{'Operation':<15} {'Best Implementation':<20} {'Avg Time (ms)':<15} {'Speedup':<10}")
        print("-" * 70)
        
        for op, impl_times in sorted(by_operation.items()):
            # Calculate averages
            avg_times = [(impl, statistics.mean(times)) 
                        for impl, times in impl_times.items()]
            avg_times.sort(key=lambda x: x[1])
            
            if avg_times:
                best_impl, best_time = avg_times[0]
                worst_time = avg_times[-1][1]
                speedup = worst_time / best_time if best_time > 0 else 0
                
                print(f"{op:<15} {best_impl:<20} {best_time:<15.3f} {speedup:<10.1f}x")


"""
Minimal checksum utilities for benchmark validation.
"""

def update_checksum(current: int, value: int) -> int:
    """Update checksum with a single value."""
    return (current * 31 + value) & 0xFFFFFFFF
'''
╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸
             https://kobejean.github.io/cp-library               
'''
import typing
from collections import deque
from numbers import Number
from types import GenericAlias 
from typing import Callable, Collection, Iterator, Union
from io import BytesIO, IOBase


class FastIO(IOBase):
    BUFSIZE = 8192
    newlines = 0

    def __init__(self, file):
        self._fd = file.fileno()
        self.buffer = BytesIO()
        self.writable = "x" in file.mode or "r" not in file.mode
        self.write = self.buffer.write if self.writable else None

    def read(self):
        BUFSIZE = self.BUFSIZE
        while True:
            b = os.read(self._fd, max(os.fstat(self._fd).st_size, BUFSIZE))
            if not b: break
            ptr = self.buffer.tell()
            self.buffer.seek(0, 2), self.buffer.write(b), self.buffer.seek(ptr)
        self.newlines = 0
        return self.buffer.read()

    def readline(self):
        BUFSIZE = self.BUFSIZE
        while self.newlines == 0:
            b = os.read(self._fd, max(os.fstat(self._fd).st_size, BUFSIZE))
            self.newlines = b.count(b"\n") + (not b)
            ptr = self.buffer.tell()
            self.buffer.seek(0, 2), self.buffer.write(b), self.buffer.seek(ptr)
        self.newlines -= 1
        return self.buffer.readline()

    def flush(self):
        if self.writable:
            os.write(self._fd, self.buffer.getvalue())
            self.buffer.truncate(0), self.buffer.seek(0)


class IOWrapper(IOBase):
    stdin: 'IOWrapper' = None
    stdout: 'IOWrapper' = None
    
    def __init__(self, file):
        self.buffer = FastIO(file)
        self.flush = self.buffer.flush
        self.writable = self.buffer.writable

    def write(self, s):
        return self.buffer.write(s.encode("ascii"))
    
    def read(self):
        return self.buffer.read().decode("ascii")
    
    def readline(self):
        return self.buffer.readline().decode("ascii")
try:
    sys.stdin = IOWrapper.stdin = IOWrapper(sys.stdin)
    sys.stdout = IOWrapper.stdout = IOWrapper(sys.stdout)
except:
    pass
from typing import TypeVar
_S = TypeVar('S')
_T = TypeVar('T')
_U = TypeVar('U')

class TokenStream(Iterator):
    stream = IOWrapper.stdin

    def __init__(self):
        self.queue = deque()

    def __next__(self):
        if not self.queue: self.queue.extend(self._line())
        return self.queue.popleft()
    
    def wait(self):
        if not self.queue: self.queue.extend(self._line())
        while self.queue: yield
 
    def _line(self):
        return TokenStream.stream.readline().split()

    def line(self):
        if self.queue:
            A = list(self.queue)
            self.queue.clear()
            return A
        return self._line()
TokenStream.default = TokenStream()

class CharStream(TokenStream):
    def _line(self):
        return TokenStream.stream.readline().rstrip()
CharStream.default = CharStream()

ParseFn = Callable[[TokenStream],_T]
class Parser:
    def __init__(self, spec: Union[type[_T],_T]):
        self.parse = Parser.compile(spec)

    def __call__(self, ts: TokenStream) -> _T:
        return self.parse(ts)
    
    @staticmethod
    def compile_type(cls: type[_T], args = ()) -> _T:
        if issubclass(cls, Parsable):
            return cls.compile(*args)
        elif issubclass(cls, (Number, str)):
            def parse(ts: TokenStream): return cls(next(ts))              
            return parse
        elif issubclass(cls, tuple):
            return Parser.compile_tuple(cls, args)
        elif issubclass(cls, Collection):
            return Parser.compile_collection(cls, args)
        elif callable(cls):
            def parse(ts: TokenStream):
                return cls(next(ts))              
            return parse
        else:
            raise NotImplementedError()
    
    @staticmethod
    def compile(spec: Union[type[_T],_T]=int) -> ParseFn[_T]:
        if isinstance(spec, (type, GenericAlias)):
            cls = typing.get_origin(spec) or spec
            args = typing.get_args(spec) or tuple()
            return Parser.compile_type(cls, args)
        elif isinstance(offset := spec, Number): 
            cls = type(spec)  
            def parse(ts: TokenStream): return cls(next(ts)) + offset
            return parse
        elif isinstance(args := spec, tuple):      
            return Parser.compile_tuple(type(spec), args)
        elif isinstance(args := spec, Collection):
            return Parser.compile_collection(type(spec), args)
        elif isinstance(fn := spec, Callable): 
            def parse(ts: TokenStream): return fn(next(ts))
            return parse
        else:
            raise NotImplementedError()

    @staticmethod
    def compile_line(cls: _T, spec=int) -> ParseFn[_T]:
        if spec is int:
            fn = Parser.compile(spec)
            def parse(ts: TokenStream): return cls([int(token) for token in ts.line()])
            return parse
        else:
            fn = Parser.compile(spec)
            def parse(ts: TokenStream): return cls([fn(ts) for _ in ts.wait()])
            return parse

    @staticmethod
    def compile_repeat(cls: _T, spec, N) -> ParseFn[_T]:
        fn = Parser.compile(spec)
        def parse(ts: TokenStream): return cls([fn(ts) for _ in range(N)])
        return parse

    @staticmethod
    def compile_children(cls: _T, specs) -> ParseFn[_T]:
        fns = tuple((Parser.compile(spec) for spec in specs))
        def parse(ts: TokenStream): return cls([fn(ts) for fn in fns])  
        return parse
            
    @staticmethod
    def compile_tuple(cls: type[_T], specs) -> ParseFn[_T]:
        if isinstance(specs, (tuple,list)) and len(specs) == 2 and specs[1] is ...:
            return Parser.compile_line(cls, specs[0])
        else:
            return Parser.compile_children(cls, specs)

    @staticmethod
    def compile_collection(cls, specs):
        if not specs or len(specs) == 1 or isinstance(specs, set):
            return Parser.compile_line(cls, *specs)
        elif (isinstance(specs, (tuple,list)) and len(specs) == 2 and isinstance(specs[1], int)):
            return Parser.compile_repeat(cls, specs[0], specs[1])
        else:
            raise NotImplementedError()

class Parsable:
    @classmethod
    def compile(cls):
        def parser(ts: TokenStream): return cls(next(ts))
        return parser
    
    @classmethod
    def __class_getitem__(cls, item):
        return GenericAlias(cls, item)

from typing import Generic



def list_find(lst: list, value, start = 0, stop = sys.maxsize):
    try:
        return lst.index(value, start, stop)
    except:
        return -1


class view(Generic[_T]):
    __slots__ = 'A', 'l', 'r'
    def __init__(V, A: list[_T], l: int, r: int): V.A, V.l, V.r = A, l, r
    def __len__(V): return V.r - V.l
    def __getitem__(V, i: int): 
        if 0 <= i < V.r - V.l: return V.A[V.l+i]
        else: raise IndexError
    def __setitem__(V, i: int, v: _T): V.A[V.l+i] = v
    def __contains__(V, v: _T): return list_find(V.A, v, V.l, V.r) != -1
    def set_range(V, l: int, r: int): V.l, V.r = l, r
    def index(V, v: _T): return V.A.index(v, V.l, V.r) - V.l
    def reverse(V):
        l, r = V.l, V.r-1
        while l < r: V.A[l], V.A[r] = V.A[r], V.A[l]; l += 1; r -= 1
    def sort(V, /, *args, **kwargs):
        A = V.A[V.l:V.r]; A.sort(*args, **kwargs)
        for i,a in enumerate(A,V.l): V.A[i] = a
    def pop(V): V.r -= 1; return V.A[V.r]
    def append(V, v: _T): V.A[V.r] = v; V.r += 1
    def popleft(V): V.l += 1; return V.A[V.l-1]
    def appendleft(V, v: _T): V.l -= 1; V.A[V.l] = v; 
    def validate(V): return 0 <= V.l <= V.r <= len(V.A)



class Packer:
    __slots__ = 's', 'm'
    def __init__(P, mx: int): P.s = mx.bit_length(); P.m = (1 << P.s) - 1
    def enc(P, a: int, b: int): return a << P.s | b
    def dec(P, x: int) -> tuple[int, int]: return x >> P.s, x & P.m
    def enumerate(P, A, reverse=False): P.ienumerate(A:=list(A), reverse); return A
    def ienumerate(P, A, reverse=False):
        if reverse:
            for i,a in enumerate(A): A[i] = P.enc(-a, i)
        else:
            for i,a in enumerate(A): A[i] = P.enc(a, i)
    def indices(P, A: list[int]): P.iindices(A:=list(A)); return A
    def iindices(P, A):
        for i,a in enumerate(A): A[i] = P.m&a


class Grid(Generic[_T], Parsable):
    __slots__ = 'pkr', 'size', 'H', 'W', 'A'
    def __init__(G, H: int, W: int, A: Union[_T, list[_T], list[list[_T]]], pkr = None):
        G.pkr = pkr or Packer(W-1); G.size = H << G.pkr.s; G.H, G.W = H, W
        if isinstance(A, list):
            if isinstance(A[0], list):
                G.A = [A[0][0]]*G.size
                for i in range(H):
                    ii = i << G.pkr.s
                    for j in range(W): G.A[ii|j] = A[i][j]
            elif len(A) < G.size:
                G.A = [A[0]]*G.size
                for i in range(H):
                    ii = i << G.pkr.s
                    for j in range(W): G.A[ii|j] = A[i*W+j]
            else:
                G.A = A
        else:
            G.A = [A] * G.size
    def __len__(G): return G.H
    def __getitem__(G, i: int): 
        if 0 <= i < G.H: return view(G.A, i<<G.pkr.s, (i+1)<<G.pkr.s)
        else: raise IndexError
    def __call__(G, i: int, j: int): return G.A[G.pkr.enc(i,j)]
    def set(G, i: int, j: int, v: _T): G.A[G.pkr.enc(i,j)] = v

    @classmethod
    def compile(cls, H: int, W: int, T: type = int):
        pkr = Packer(W-1); size = H << pkr.s
        if T is int:
            def parse(ts: TokenStream):
                A = [0]*size
                for i in range(H):
                    for j,s in ts.line(): A[pkr.enc(i,j)] = int(s)
                return cls(H, W, A, pkr)
        elif T is str:
            def parse(ts: TokenStream):
                A = ['']*size
                for i in range(H):
                    for j,s in ts.line(): A[pkr.enc(i,j)] = s
                return cls(H, W, A, pkr)
        else:
            elm = Parser.compile(T)
            def parse(ts: TokenStream):
                A = [None]*size
                for i in range(H):
                    for j in range(W): A[pkr.enc(i,j)] = elm(ts)
                return cls(H, W, A, pkr)
        return parse

config = BenchmarkConfig(
    name="grid",
    sizes=[10000000, 1000000, 100000, 10000, 1000, 100],
    operations=['construction', 'random_access', 'row_access', 'sequential_access'],
    iterations=10,
    warmup=2,
    output_dir="./output/benchmark_results/grid"
)

benchmark = Benchmark(config)

@benchmark.data_generator("default")
def generate_data(size: int, _: str):
    H = int(size ** 0.5)
    W = size // H
    
    data = [[random.randint(1, 1000000) for _ in range(W)] for _ in range(H)]
    flat = [val for row in data for val in row]
    
    return {
        'grid': Grid(H, W, data),
        'lists': data,
        'flat': flat,
        'H': H, 'W': W,
        'coords': [(random.randint(0, H-1), random.randint(0, W-1)) for _ in range(min(100, size))]
    }

# Construction
@benchmark.implementation("grid", "construction")
def construction_grid(data):
    H, W = data['H'], data['W']
    grid = Grid(H, W, data['flat'])
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "construction")
def construction_lists(data):
    H, W = data['H'], data['W']
    lists = [row[:] for row in data['lists']]
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "construction")
def construction_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat'][:]
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, flat[i * W + j])
    return result

# Random access
@benchmark.implementation("grid", "random_access")
def random_access_grid(data):
    grid = data['grid']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "random_access")
def random_access_lists(data):
    lists = data['lists']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "random_access")
def random_access_flat(data):
    flat, W = data['flat'], data['W']
    result = 0
    for i, j in data['coords']:
        result = update_checksum(result, flat[i * W + j])
    return result

# Row access
@benchmark.implementation("grid", "row_access")
def row_access_grid(data):
    H, W = data['H'], data['W']
    grid = data['grid']
    return sum(W for i in range(H) if grid[i])  # Count logical width

@benchmark.implementation("lists", "row_access")
def row_access_lists(data):
    H = data['H']
    lists = data['lists']
    return sum(len(lists[i]) for i in range(H))

@benchmark.implementation("flat", "row_access")
def row_access_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat']
    return sum(len(flat[i * W:(i + 1) * W]) for i in range(H))

# Sequential access
@benchmark.implementation("grid", "sequential_access")
def sequential_access_grid(data):
    H, W = data['H'], data['W']
    grid = data['grid']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, grid(i, j))
    return result

@benchmark.implementation("lists", "sequential_access")
def sequential_access_lists(data):
    H, W = data['H'], data['W']
    lists = data['lists']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, lists[i][j])
    return result

@benchmark.implementation("flat", "sequential_access")
def sequential_access_flat(data):
    H, W = data['H'], data['W']
    flat = data['flat']
    result = 0
    for i in range(H):
        for j in range(W):
            result = update_checksum(result, flat[i * W + j])
    return result

if __name__ == "__main__":
    # Parse command line args and run appropriate mode
    runner = benchmark.parse_args()
    runner.run()
Back to top page