From 4b52ce6650b68973fceea6bd464cdce3307c4e03 Mon Sep 17 00:00:00 2001 From: Marc Mance Date: Fri, 21 Mar 2025 13:26:14 -0400 Subject: [PATCH] update readme --- ioztat | 904 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 904 insertions(+) create mode 100755 ioztat diff --git a/ioztat b/ioztat new file mode 100755 index 0000000..8525600 --- /dev/null +++ b/ioztat @@ -0,0 +1,904 @@ +#!/usr/bin/env python3 +# Based on https://www.reddit.com/r/zfs/comments/s0gxp0/ok_i_made_it_tool_to_show_io_for_individual/ + +# BSD 2-Clause License +# +# Copyright (c) 2022, Openoid LLC, on behalf of the r/zfs community +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import copy +import math +import os +import re +import shutil +import signal +import sys +import time + +PROGRAM_VERSION = '2.0.2-dev' + +# We have little interesting to do with these signals - restore default handlers +# so our WIFSIGNALED() state propagates properly to the shell +signal.signal(signal.SIGINT, signal.SIG_DFL) + +# Docs warn against this, but we're not writing to anything but stdout/err. +# If we ever need to restore standard Python behaviour, catch BrokenPipeError in +# our print() calls, restore the handler and then re-raise the signal. +signal.signal(signal.SIGPIPE, signal.SIG_DFL) + +def fail(message): + """Print message to stderr and exit with a failure""" + print(message, file=sys.stderr) + sys.exit(1) + +# Platform specific section +# +# Must implement: +# +# DatasetDict = dict[dataset_name=str, timestamp=int|str, reads=int|str, +# writes=int|str, nread=int|str, nwritten=int|str, +# nunlinks=int|str, nunlinked=int|str] +# FetchDatasets(pools=Optional[list[str]]): Callable: Iterable[DatasetDict] +# +# i.e. a callable factory (likely a class) that takes an optional list of pools, +# and returns a callable that returns a new batch of dicts conforming to +# DatasetDict. +# +# timestamp is a nanosecond monotonic counter, e.g. 
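+# time.monotonic_ns().
+#
+# An illustrative DatasetDict for a single dataset (field values may be
+# strings or ints, exactly as read from the kernel; the names and values
+# below are made up for illustration):
+#
+#   {'dataset_name': 'tank/home', 'timestamp': 1634992995579,
+#    'reads': '7', 'nread': '512', 'writes': '123', 'nwritten': '4096',
+#    'nunlinks': '0', 'nunlinked': '0'}
+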
+if sys.platform.startswith("linux"):
+    import glob
+
+    class FetchDatasets:
+        def __init__(self, pools=None):
+            if pools:
+                patterns = map(glob.escape, pools)
+            else:
+                patterns = ["*"]
+
+            self.globs = [
+                os.path.join("/proc/spl/kstat/zfs", pattern, "objset-*")
+                for pattern in patterns
+            ]
+
+        def __call__(self):
+            datasets = (
+                self.parse_dataset(file)
+                for pattern in self.globs
+                for file in glob.glob(pattern)
+            )
+            return list(filter(None, datasets))
+
+        @staticmethod
+        def parse_dataset(file):
+            """Parse a single ZFS objset file"""
+            # Shortened example of these files:
+            #############################################
+            # 31 1 0x01 7 2160 6165792836 1634992995579
+            # name         type data
+            # dataset_name 7    rpool/ROOT/default
+            #############################################
+            # Field 7 of the header is a nanosecond data snapshot timestamp.
+            # Conveniently, dataset names may not contain spaces.
+            try:
+                with open(file, "r", encoding="latin-1") as f:
+                    header, _fieldnames, *fields = [line.split() for line in f]
+                    fields.append(("timestamp", None, header[6]))
+                    return {
+                        field[0]: field[2]
+                        for field in fields
+                        if len(field) > 2
+                    }
+            except FileNotFoundError:
+                # Datasets may be destroyed between our globbing and opening
+                return None
+
+    def preflight():
+        try:
+            with open("/sys/module/zfs/version", "r", encoding="latin-1") as f:
+                version = f.read().rstrip()
+                match = re.match(r'^(\d+)\.(\d+)', version)
+                if not match:
+                    raise ValueError(f"could not parse '{version}'")
+
+                major, minor = int(match.group(1)), int(match.group(2))
+
+                if major == 0 and minor < 8:
+                    fail(f"OpenZFS {major}.{minor} does not support dataset statistics. Please update to 0.8 or higher.")
+        except (ValueError, OSError) as e:
+            fail(f"Unable to determine OpenZFS version from /sys/module/zfs/version: {e}")
+
+    preflight()
+
+elif sys.platform.startswith("freebsd"):
+    try:
+        # Attempt to use py-sysctl if available for best performance
+        import sysctl
+
+        def fetch_sysctl(oids):
+            return (value for oid in oids for value in sysctl.filter(oid))
+
+    except ImportError:
+        import subprocess
+        from collections import namedtuple
+
+        SysctlValue = namedtuple('SysctlValue', ['name', 'value'])
+
+        def fetch_sysctl(oids):
+            r = subprocess.run(['/sbin/sysctl', '-e', '--', *oids],
+                               capture_output=True, check=False, encoding='latin-1')
+            stats = (line.split("=", 1) for line in r.stdout.split("\n"))
+            return (SysctlValue(*nv) for nv in stats if len(nv) == 2)
+
+    from collections import defaultdict
+
+    class FetchDatasets:
+        def __init__(self, pools=None):
+            # We're interested in kstat.zfs.*.dataset.objset-*.*
+            if pools:
+                self.oids = tuple(f'kstat.zfs.{pool}.dataset.' for pool in pools)
+            else:
+                self.oids = ('kstat.zfs.',)
+
+        def __call__(self):
+            timestamp = time.monotonic_ns()
+            datasets = defaultdict(lambda: {'timestamp': timestamp})
+
+            # Note objset IDs are only unique to individual pools
+            for ctl in fetch_sysctl(self.oids):
+                name = ctl.name.rsplit(".", 4)
+                if len(name) == 5 and name[2] == 'dataset':
+                    _, pool, _, objset, key = name
+                    datasets[(pool, objset)][key] = ctl.value
+
+            return datasets.values()
+
+    def preflight():
+        match = re.match(r'(\d+)\.(\d+)', os.uname().release)
+        if not match:
+            fail(f"Unable to determine FreeBSD version from {os.uname().release}")
+
+        major, minor = int(match.group(1)), int(match.group(2))
+
+        if major < 12 or (major == 12 and minor < 2):
+            fail(f"FreeBSD {major}.{minor} does not support dataset statistics. Please update to 12.2 or higher.")
+
+    preflight()
+
+else:
+    fail("Unsupported platform: " + sys.platform)
+
+
+################################################################################
+# Core type
+
+class Dataset:
+    """
+    ZFS dataset statistics over a timespan in seconds
+    """
+    name = ''
+    reads = 0
+    nread = 0
+    writes = 0
+    nwritten = 0
+    nunlinks = 0
+    nunlinked = 0
+    timespan = 0
+
+    def __init__(self, name=''):
+        self.name = name
+
+    @classmethod
+    def from_dict(cls, data):
+        d = cls(data['dataset_name'])
+        d.reads = int(data['reads'])
+        d.nread = int(data['nread'])
+        d.writes = int(data['writes'])
+        d.nwritten = int(data['nwritten'])
+        d.nunlinks = int(data.get('nunlinks', 0))
+        d.nunlinked = int(data.get('nunlinked', 0))
+        d.timespan = int(data['timestamp']) / 1e9
+        return d
+
+    @property
+    def rareq_sz(self):
+        return self.nread / self.reads if self.reads else 0
+
+    @property
+    def wareq_sz(self):
+        return self.nwritten / self.writes if self.writes else 0
+
+    @property
+    def operations(self):
+        return self.reads + self.writes
+
+    @property
+    def throughput(self):
+        return self.nread + self.nwritten
+
+    def is_nonzero(self):
+        """True if this Dataset has any non-zero deltas"""
+        return self.operations or self.throughput or self.nunlinks or self.nunlinked
+
+    def per_second(self):
+        """
+        Return a copy of Dataset with values normalized to per-second rates
+        """
+        if self.timespan:
+            d = copy.copy(self)
+            d.reads /= self.timespan
+            d.nread /= self.timespan
+            d.writes /= self.timespan
+            d.nwritten /= self.timespan
+            d.nunlinks /= self.timespan
+            d.nunlinked /= self.timespan
+            d.timespan = 1.0
+            return d
+        return self
+
+    def __sub__(self, other):
+        """
+        Return a new Dataset with other's values subtracted from this one's
+        """
+        d = copy.copy(self)
+        d.reads -= other.reads
+        d.nread -= other.nread
+        d.writes -= other.writes
+        d.nwritten -= other.nwritten
+        d.nunlinks -= other.nunlinks
+        d.nunlinked -= other.nunlinked
+        d.timespan -= other.timespan
+        return d
+
+    def __add__(self, other):
+        """
+        Return a new Dataset with this one's values added to other's
+
+        Note timespan is set to the maximum of the two values instead of being
+        added.
+        """
+        d = copy.copy(self)
+        d.reads += other.reads
+        d.nread += other.nread
+        d.writes += other.writes
+        d.nwritten += other.nwritten
+        d.nunlinks += other.nunlinks
+        d.nunlinked += other.nunlinked
+        d.timespan = max(d.timespan, other.timespan)
+        return d
+
+
+################################################################################
+# String formatting
+
+class Column:
+    """
+    Formats (translates raw value to string) and optionally justifies (adjusts
+    the string to fit within a given width) a single column.
+    """
+    def __init__(self, key, heading, width=0, format=str, just=None):
+        self.key = key
+        self.heading = heading
+        self.width = width
+        self.format = format
+        self.just = just
+
+    def render(self, values):
+        return self.justify(str(self.format(values[self.key])))
+
+    def render_heading(self):
+        return self.justify(self.heading)
+
+    def justify(self, string):
+        return self.just(string, self.width) if self.just else string
+
+
+class ColumnGroup:
+    """
+    A simplified column that justifies a string to span the set of Column
+    objects it is meant to cover. Does not support formatting.
+
+    Width here is the size of separators not covered by the raw width of the
+    spanned columns themselves.
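+
+    For example, a group spanning two 5-wide columns joined by a single
+    two-character separator is created with width=2, so group_width()
+    returns 2 + 5 + 5 == 12.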
+    """
+    def __init__(self, heading, columns, width=0, just=str.center):
+        self.heading = heading
+        self.columns = columns
+        self.width = width
+        self.just = just
+
+    def group_width(self):
+        return self.width + sum((c.width for c in self.columns))
+
+    def render_heading(self):
+        return self.justify(self.heading)
+
+    def justify(self, string):
+        return self.just(string, self.group_width()) if self.just else string
+
+
+class ColumnFormatter:
+    """
+    A buffered formatter for columnar data with optional group headings.
+    """
+    def __init__(self, column_separator='  ', row_separator='-', buffer=None):
+        self.columns = []
+        self.column_index = {}
+        self.groups = None
+        self.column_separator = column_separator
+        self.row_separator = row_separator
+        self.buffer = buffer if buffer is not None else []
+
+    def add_column(self, name, cls=Column, **args):
+        self.column_index[name] = len(self.columns)
+        self.columns.append(cls(name, **args))
+
+    def add_group(self, name='', colspan=1, just=str.center):
+        """Add a group heading that spans the previous colspan columns"""
+        if self.groups is None:
+            self.groups = ColumnFormatter(column_separator=self.column_separator,
+                                          buffer=self.buffer)
+
+        self.groups.add_column(name, cls=ColumnGroup, columns=self.columns[-colspan:],
+                               width=len(self.column_separator) * (colspan - 1), just=just)
+
+    def set_column_width(self, key, width):
+        """Set the width of the column"""
+        self.columns[self.column_index[key]].width = width
+
+    def get_printed_width(self, exclude=()):
+        """
+        Return the printed width of columns and their separators, excluding the
+        keys in exclude.
+        """
+        return sum((c.width + len(self.column_separator)
+                    for c in self.columns
+                    if c.key not in exclude))
+
+    def print_row(self, **data):
+        """Add a row of data to be formatted into the internal buffer"""
+        formatted = (column.render(data) for column in self.columns)
+        self.print(self.column_separator.join(formatted))
+
+    def print_header(self):
+        """Add headings and optionally group headings to the internal buffer"""
+        if self.groups:
+            self.groups.print_header()
+
+        formatted = (column.render_heading() for column in self.columns)
+        self.print(self.column_separator.join(formatted))
+
+    def print_divider(self):
+        """
+        Add a line of row_separator characters in the space given to each
+        column to the internal buffer.
+        """
+        self.print(
+            self.column_separator.join(
+                [''.ljust(c.width, self.row_separator) for c in self.columns]))
+
+    def print(self, string, end="\n"):
+        """
+        Add the specified string to the internal buffer.
+
+        Must eventually be followed by a call to flush()
+        """
+        self.buffer.append(str(string) + end)
+
+    def flush(self, lines=None):
+        """Print the internal buffer, up to the optional lines limit."""
+        buffer = ''.join(self.buffer)
+        if lines:
+            buffer = "\n".join(buffer.split("\n", lines)[:lines])
+        print(buffer, end='')
+        self.buffer.clear()
+
+
+class NumberToHuman:
+    """Number formatting functions"""
+
+    SIZE_PREFIX = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
+    FORMATS = ('{:.2f}{}', '{:.1f}{}', '{:.0f}{}')
+
+    @classmethod
+    def formatter(cls, length, decimal=False):
+        """
+        Return a function that will format a number to a shortened SI decimal or
+        binary form that fits within a given length.
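+
+        For example, with fmt = formatter(length=5), fmt(2048) returns '2K'
+        (exact multiples drop the fraction) and fmt(1536) returns '1.50K'.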
+        """
+        divisor = 1000 if decimal else 1024
+        formats = cls.FORMATS
+        size_prefix = cls.SIZE_PREFIX
+        size_prefix_max = len(size_prefix) - 1
+        powers = [pow(divisor, i) for i in range(len(size_prefix))]
+
+        def fmt(num):
+            n = num
+            index = 0
+            while n >= divisor and index < size_prefix_max:
+                n /= divisor
+                index += 1
+
+            u = size_prefix[index]
+
+            if index == 0 or num % powers[index] == 0:
+                return str(int(n)) + u
+
+            for fstr in formats:
+                ret = fstr.format(n, u)
+                if len(ret) <= length:
+                    return ret
+            return ret
+        return fmt
+
+
+class DatasetName:
+    """Dataset name formatting functions"""
+
+    PATH_INDENT = '  '
+    ELLIPSIS = '+'
+
+    @classmethod
+    def shorten(cls, path, limit):
+        """Shorten a dataset name to fit within limit"""
+        if len(path) <= limit:
+            return path
+
+        # Always display the pool name
+        components = path.split('/', 1)
+        ret = components[0]
+        if len(components) > 1:
+            last = components[1][-(limit - (len(ret) + len(cls.ELLIPSIS) + 1)):]
+            ret += '/' + cls.ELLIPSIS + last
+        return ret
+
+    @classmethod
+    def _indent(cls, name, depth, limit):
+        """Indent name by depth PATH_INDENTs constrained by limit"""
+        if (len(cls.PATH_INDENT) * depth) + len(name) > limit:
+            return (cls.ELLIPSIS + name).rjust(limit - 1)[:limit]
+        return (cls.PATH_INDENT * depth) + name
+
+    @classmethod
+    def indent(cls, name, last_path, limit):
+        """
+        Take a name and a prior split path (or an empty list), returning a
+        tuple of: the indented lines to print as a prefix for components not
+        shared with the prior path, the indented final component, and the
+        current split path to pass to the next call to indent().
+        """
+        cur_path = name.split('/')
+        common = os.path.commonprefix([last_path, cur_path])
+        prefix = []
+        for i, segment in enumerate(cur_path[len(common):-1]):
+            prefix.append(cls._indent(segment, len(common) + i, limit))
+        last_segment = cls._indent(cur_path[-1], len(cur_path) - 1, limit)
+        return ("\n".join(prefix), last_segment, cur_path)
+
+    @classmethod
+    def indent_len(cls, path):
+        """
+        Calculate the maximum width of this path as rendered by indent()
+        """
+        segments = path.split('/')
+        return (len(segments[0:-1]) * len(cls.PATH_INDENT)) + len(segments[-1])
+
+
+################################################################################
+# Argument processing, column configuration, and main loop
+
+SORTS = {
+    'name': {'key': lambda x: x.name},
+    'operations': {'key': lambda x: x.operations, 'reverse': True},
+    'reads': {'key': lambda x: x.reads, 'reverse': True},
+    'writes': {'key': lambda x: x.writes, 'reverse': True},
+    'throughput': {'key': lambda x: x.throughput, 'reverse': True},
+    'nread': {'key': lambda x: x.nread, 'reverse': True},
+    'nwritten': {'key': lambda x: x.nwritten, 'reverse': True},
+}
+SORT_DISPLAY = list(SORTS.keys())
+SORT_ALIAS = {
+    'operations': ['io', 'ops', 'iops'],
+    'reads': ['read', 'rps'],
+    'writes': ['write', 'wps'],
+    'throughput': ['bandwidth'],
+    'nread': ['nreads', 'rmbps'],
+    'nwritten': ['nwrite', 'nwrites', 'wmbps'],
+}
+SORTS.update({alias: SORTS[name] for name, aliases in SORT_ALIAS.items() for alias in aliases})
+
+
+def parse_args():
+    """Parse command-line arguments list"""
+    parser = argparse.ArgumentParser(description='iostat for ZFS datasets', add_help=False)
+    parser.add_argument('dataset', type=str, nargs='*', help='ZFS dataset')
+    parser.add_argument(argparse.SUPPRESS, metavar='interval [count]', nargs='?',
+                        help='seconds between reports and number of reports')
+    parser.add_argument('-c', dest='count', type=int,
+                        help='number of reports generated')
+    parser.add_argument('-D', dest='decimal', action='store_true',
+                        help='display size in decimal powers of 1000 instead of 1024')
+    parser.add_argument('-e', dest='exact', action='store_true',
+                        help='display exact values without truncation or scaling')
+    parser.add_argument('-H', dest='scripted', action='store_true',
+                        help='scripted mode, omit headers and tab-separate fields')
+    parser.add_argument('-h', '--help', action='help',
+                        help='display this help message and exit')
+    parser.add_argument('-I', dest='per_interval', action='store_true',
+                        help='display totals since the last report rather than averaged per-second')
+    parser.add_argument('-i', dest='interval', type=float,
+                        help='interval between reports in seconds')
+    parser.add_argument('-N', dest='header_once', action='store_true',
+                        help='display headers at most once')
+    parser.add_argument('-n', dest='non_recursive', action='store_true',
+                        help='omit child datasets when filtering')
+    parser.add_argument('-o', dest='overwrite', action='store_true',
+                        help='overwrite old reports in terminal')
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('-P', dest='fullname', action='store_true', default=None,
+                       help='display dataset names on a single line')
+    group.add_argument('-p', dest='fullname', action='store_false', default=None,
+                       help='display dataset names as an abbreviated tree')
+    parser.add_argument('-S', dest='sum_children', action='store_true',
+                        help='include statistics for child datasets in parents')
+    parser.add_argument('-s', dest='sort', type=str.lower, default='name',
+                        choices=SORTS.keys(), metavar='{%s}' % ','.join(SORT_DISPLAY),
+                        help='sort by the specified field')
+    parser.add_argument('-T', dest='timestamp', choices=['u', 'd'],
+                        help='prefix reports with a Unix timestamp or formatted date')
+    parser.add_argument('-V', '--version', action='version', version='%(prog)s ' + PROGRAM_VERSION,
+                        help='display version number and exit')
+    parser.add_argument('-x', dest='extended', action='count', default=0,
+                        help='display extended statistics: once for average I/O size, \
+                              twice for unlink queue')
+    parser.add_argument('-y', dest='skip', action='store_const', default=0, const=1,
+                        help='omit the initial "summary" report')
+    parser.add_argument('-z', dest='nonzero', action='store_true',
+                        help='omit datasets with zero activity')
+
+    args = parser.parse_args()
+
+    # Handle [interval [count]]
+    def is_positive_float(value):
+        try:
+            value = float(value)
+            # We want to avoid parsing 'inf' and 'nan', as these are valid pool names.
+            # NaN always compares False with another float, so just guard against inf
+            return value > 0.0 and not math.isinf(value)
+        except ValueError:
+            return False
+
+    if len(args.dataset) > 0 and is_positive_float(args.dataset[-1]):
+        args.interval = float(args.dataset.pop())
+        if len(args.dataset) > 0 and is_positive_float(args.dataset[-1]):
+            args.count = int(round(args.interval))
+            args.interval = float(args.dataset.pop())
+
+    # Past here interval and count are either None or non-zero
+    if args.count is not None and not args.count > 0:
+        parser.error('count must be positive')
+    if args.interval is not None and not is_positive_float(args.interval):
+        parser.error('interval must be positive')
+
+    # If there's a count but no interval, default to one second
+    if args.count and not args.interval:
+        args.interval = 1.0
+
+    # If neither is specified, default to one iteration and one second
+    if not (args.count or args.interval):
+        args.count = 1
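+        # e.g. a bare `ioztat` prints a single summary report and exits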
+        args.interval = 1.0
+
+    # Sanitize specified datasets according to ZFS rules.
+    #
+    # This helps defend against platform-specific code being passed arbitrary
+    # strings which might interact unpredictably with constructing paths or
+    # command lines, and also makes typos a bit more obvious to the user.
+    #
+    # Technically the regexp is enough, but try to provide helpful error messages
+    # for common errors.
+    dataset_pattern = re.compile(r'\A[a-zA-Z][a-zA-Z0-9_.:-]*(?:/[a-zA-Z0-9_.:-]+)*\Z')
+    for ds in args.dataset:
+        if ds.endswith('/'):
+            parser.error(f"trailing slash in dataset name: '{ds}'")
+
+        if '//' in ds:
+            parser.error(f"empty component in dataset name: '{ds}'")
+
+        if not dataset_pattern.match(ds):
+            parser.error(f"invalid dataset name: '{ds}'")
+
+    # Unless otherwise specified, enable full paths when sorting by anything
+    # other than name, or in scripted, exact, or nonzero modes
+    if args.fullname is None:
+        args.fullname = args.sort != 'name' or args.scripted or args.exact or args.nonzero
+
+    if args.overwrite and not sys.stdout.isatty():
+        parser.error('overwrite mode only supported in a terminal')
+
+    if args.extended > 1 and sys.platform.startswith("freebsd12"):
+        print("warning: unlink statistics require FreeBSD 13+", file=sys.stderr)
+
+    return args
+
+
+def create_column_formatter(args):
+    """Create a ColumnFormatter instance configured for the given args"""
+    if args.scripted:
+        column_separator = "\t"
+        name_just = None
+        num_just = None
+        field_width = 0
+    else:
+        column_separator = '  '
+        name_just = str.ljust
+        num_just = str.rjust
+        field_width = 11 if args.exact else 5
+
+    if args.exact:
+        num_format = round
+        bytes_format = round
+    else:
+        num_format = NumberToHuman.formatter(length=field_width, decimal=True)
+        bytes_format = NumberToHuman.formatter(length=field_width, decimal=args.decimal)
+
+    # Define our two types of data column from a shared base style
+    base = {'width': field_width, 'just': num_just}
+    num = {'format': num_format, **base}
+    byte = {'format': bytes_format, **base}
+
+    formatter = ColumnFormatter(column_separator=column_separator, row_separator='-')
+    formatter.add_column('name', heading='dataset', just=name_just)
+    formatter.add_group()
+    formatter.add_column('reads', heading='read', **num)
+    formatter.add_column('writes', heading='write', **num)
+    formatter.add_group('operations', 2)
+    formatter.add_column('nread', heading='read', **byte)
+    formatter.add_column('nwritten', heading='write', **byte)
+    formatter.add_group('throughput', 2)
+    if args.extended > 0:
+        formatter.add_column('rareq_sz', heading='read', **byte)
+        formatter.add_column('wareq_sz', heading='write', **byte)
+        formatter.add_group('opsize', 2)
+    if args.extended > 1:
+        formatter.add_column('nunlinks', heading='queue', **num)
+        formatter.add_column('nunlinked', heading='done', **num)
+        formatter.add_group('unlinks', 2)
+    return formatter
+
+
+class DatasetDiffIter:
+    """
+    Take an argument object and create an iterator over lists of Dataset diffs
+    with appropriate filtering and sorting applied.
+    """
+    def __init__(self, args):
+        # Configure a chain of iterators appropriate for our arguments
+        self.iter = self._diff_iter(args)
+
+        if args.nonzero:
+            self.filter(lambda d: d.is_nonzero())
+
+        if args.sum_children:
+            self.apply(self._sum_children)
+
+        if args.dataset:
+            # Cast to a faster structure for lookups
+            # In my testing a set is faster even for a single item.
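+            # (These membership tests run once per dataset on every report.)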
+            args.dataset = set(args.dataset)
+            if args.non_recursive:
+                # Look only for the exact datasets specified
+                self.filter(lambda d: d.name in args.dataset)
+            else:
+                # Also look for any that start with those plus a slash
+                starts = tuple(ds + '/' for ds in args.dataset)
+                self.filter(lambda d: d.name in args.dataset or d.name.startswith(starts))
+
+        if args.interval:
+            self.iter = self._interval_iter(self.iter, args.interval)
+
+        for _ in range(args.skip):
+            next(self.iter, None)
+
+        if args.count:
+            self.iter = self._take_iter(self.iter, args.count)
+
+        if not args.per_interval:
+            self.map(lambda d: d.per_second())
+
+        if args.sort != 'name':
+            # Sorting by name first makes it a secondary sort field
+            self.apply(lambda diffs: sorted(sorted(diffs, **SORTS['name']), **SORTS[args.sort]))
+        else:
+            self.apply(lambda diffs: sorted(diffs, **SORTS['name']))
+
+        if not args.overwrite:
+            # Filter out empty iterations if we're in normal mode.
+            # We must coerce diffs into a list so their emptiness can be
+            # tested, as they may still be generators at this point
+            self.iter = filter(None, map(list, self.iter))
+
+        # Ensure we return a list from each iteration
+        self.apply(list)
+
+    def __iter__(self):
+        return self.iter
+
+    def filter(self, operation):
+        """Filter diffs by operation"""
+        self.iter = (filter(operation, diffs) for diffs in self.iter)
+
+    def map(self, operation):
+        """Apply operation to each diff"""
+        self.iter = (map(operation, diffs) for diffs in self.iter)
+
+    def apply(self, operation):
+        """Apply operation to each set of diffs"""
+        self.iter = (operation(diffs) for diffs in self.iter)
+
+    @classmethod
+    def _diff_iter(cls, args):
+        """Iterate over Iterable[Dataset] deltas"""
+
+        pools = {dataset.split('/')[0] for dataset in args.dataset}
+        dataset_fetcher = FetchDatasets(pools)
+
+        def fetch_datasets():
+            return {d.name: d for d in
+                    map(Dataset.from_dict, dataset_fetcher())}
+
+        summary = fetch_datasets()
+        cls._validate_dataset_args(args, summary.keys())
+        yield summary.values()
+
+        prevdatasets = summary
+        for datasets in iter(fetch_datasets, None):
+            yield (datasets[key] - prevdatasets[key]
+                   for key in datasets.keys() & prevdatasets.keys())
+            prevdatasets = datasets
+
+    @staticmethod
+    def _validate_dataset_args(args, found):
+        """
+        If any specified datasets are not present in the found list, exit with
+        an error message.
+        """
+        failed = False
+        for ds in args.dataset:
+            if ds in found:
+                continue
+
+            children = any(os.path.commonpath((ds, path)) == ds for path in found)
+            if args.non_recursive:
+                if children:
+                    # Accept unmounted datasets as arguments if we're going to
+                    # sum child datasets into them
+                    if args.sum_children:
+                        continue
+                    msg = 'mounted'
+                else:
+                    msg = 'found'
+                print(f"dataset not {msg}: '{ds}'", file=sys.stderr)
+                failed = True
+            elif not children:
+                print(f"dataset not found: '{ds}'", file=sys.stderr)
+                failed = True
+
+        if failed:
+            sys.exit(1)
+
+    @staticmethod
+    def _sum_children(diffs):
+        """
+        Return Iterable[Dataset] for Iterable[Dataset] having added child
+        datasets to parents, creating them if necessary.
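+
+        For example, a diff for 'tank/a/b' is added into the sums for
+        'tank/a/b', 'tank/a', and 'tank'.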
+        """
+        sums = {}
+        for diff in diffs:
+            for depth in range(diff.name.count('/') + 1):
+                path, *_ = diff.name.rsplit('/', depth)
+                if path in sums:
+                    sums[path] += diff
+                else:
+                    sums[path] = Dataset(path) + diff
+
+        return sums.values()
+
+    @staticmethod
+    def _interval_iter(it, interval):
+        """Yield an item from the provided Iterable every interval seconds"""
+        deadline = time.monotonic() + interval
+        for item in it:
+            yield item
+            sleep = deadline - time.monotonic()
+            if sleep > 0:
+                time.sleep(sleep)
+            else:
+                # We've fallen behind, possibly due to being suspended, or the user
+                # has asked for an interval below our redraw time.
+                # Reset the deadline, repeat the loop, and hope for better next time.
+                deadline = time.monotonic()
+            deadline += interval
+
+    @staticmethod
+    def _take_iter(it, count):
+        """Yield at most count items from Iterable"""
+        for _ in range(count):
+            item = next(it, None)
+            if item is None:
+                return
+            yield item
+
+
+def main():
+    args = parse_args()
+    formatter = create_column_formatter(args)
+
+    stats_width = formatter.get_printed_width(exclude=('name',))
+    calc_name_width = len if args.fullname else DatasetName.indent_len
+    max_name_width = 0
+    isatty = sys.stdout.isatty()
+
+    for iteration, diff in enumerate(DatasetDiffIter(args)):
+        width, height = shutil.get_terminal_size()
+        width, height = width or 80, height or 24
+        avail_width = width - stats_width
+        max_name_width = max(max_name_width,
+                             max((calc_name_width(d.name) for d in diff), default=10))
+        name_width = max(10, min(avail_width, max_name_width))
+        formatter.set_column_width('name', name_width)
+
+        if args.overwrite:
+            # Clear the screen and move the cursor to the upper-left
+            formatter.print("\033[2J\033[1;1H", end='')
+
+        if iteration > 0 and not (args.scripted or args.overwrite):
+            formatter.print_divider()
+
+        if args.timestamp == 'u':
+            formatter.print(int(time.time()))
+        elif args.timestamp == 'd':
+            formatter.print(time.strftime('%c'))
+
+        if not args.scripted:
+            if (iteration == 0 or not args.header_once and
+                    (args.overwrite or isatty and iteration % height == 0)):
+                formatter.print_header()
+                formatter.print_divider()
+
+        last_path = []
+        for d in diff:
+            if args.fullname:
+                name = d.name if args.exact else DatasetName.shorten(d.name, name_width)
+            else:
+                prefix, name, last_path = DatasetName.indent(d.name, last_path, name_width)
+                if prefix:
+                    formatter.print(prefix)
+
+            formatter.print_row(name=name, reads=d.reads, writes=d.writes,
+                                nread=d.nread, nwritten=d.nwritten,
+                                rareq_sz=d.rareq_sz, wareq_sz=d.wareq_sz,
+                                nunlinks=d.nunlinks, nunlinked=d.nunlinked)
+
+        formatter.flush(lines=height if args.overwrite else None)
+
+
+if __name__ == '__main__':
+    main()
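+
+# Example invocations (illustrative; actual output depends on the system):
+#
+#   ioztat                     one summary report of all datasets since boot
+#   ioztat -z tank 5           continuous 5-second reports for pool 'tank',
+#                              hiding datasets with no activity
+#   ioztat -I -s writes 1 10   ten 1-second reports of per-interval totals,
+#                              sorted by write operations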