Source code for arandomness.argparse.Open

#! /usr/bin/env python

"""Detects and opens compressed files for reading or writing

Copyright:
    open.py  detects and opens compressed files for reading and writing
    Copyright (C) 2017  Alex Hyer

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

import argparse
from importlib import import_module
from inspect import getfullargspec
from os import linesep
from warnings import warn


__author__ = 'Alex Hyer'
__credit__ = 'Lauritz V. Thaulow'
__email__ = 'theonehyer@gmail.com'
__license__ = 'GPLv3'
__maintainer__ = 'Alex Hyer'
__status__ = 'Production/Stable'
__version__ = '1.0.0'


[docs]class Open(argparse.Action): """Argparse Action that detects and opens compressed files for rw Attributes: option_strings (list): list of str giving command line flags that call this action dest (str): Namespace reference to value mode (str): mode to pass to (de)compression algorithm nargs (bool): True if multiple arguments specified **kwargs (various): optional arguments to pass to argparse and algo """ def __init__(self, option_strings, dest, mode='rb', nargs=None, **kwargs): """Initialize class and spawn self as Base Class w/o nargs Warns: ImportError: if Open cannot import a compression library, it warns the user that it cannot open the corresponding file type Raises: ValueError: if nargs is not None, Open does not accept nargs """ # Only accept a single value to analyze if nargs is not None: raise ValueError('nargs not allowed for Open') # Call self again but without nargs super(Open, self).__init__(option_strings, dest) # Store and establish variables used in __call__ self.kwargs = kwargs self.mode = mode.lower().strip() self.modules = {} modules_to_import = { 'bz2': 'BZ2File', 'gzip': 'GzipFile', 'lzma': 'LZMAFile' } # Dynamically import compression libraries and warn about failures for mod, _class in modules_to_import.items(): try: self.modules[_class] = getattr(import_module(mod), _class) except (ImportError, AttributeError) as e: self.modules[_class] = open warn('Cannot process {0} files due to following error:' '{1}{2}{1}You will need to install the {0} library to ' 'properly use these files. Currently, such files will ' 'open in text mode.'.format(mod, linesep, e)) # Credits: https://stackoverflow.com/questions/13044562/ # python-mechanism-to-identify-compressed-file-type-and-uncompress
[docs] def __call__(self, parser, namespace, value, option_string=None, **kwargs): """Detects and opens compressed files Args: parser (ArgumentParser): parser used to generate values namespace (Namespace): namespace to set values for value (str): actual value specified by user option_string (str): argument flag used to call this function **kwargs (various): optional arguments later passed to the compression algorithm """ filename = value # For readability algo = open # Default to plaintext # Capture any mode that isn't read, such as write or append if self.mode.lstrip('U')[0] != 'r': algo_map = { 'bz2': self.modules['BZ2File'], 'gz': self.modules['GzipFile'], 'xz': self.modules['LZMAFile'] } # Base compression algorithm on file extension ext = value.split('.')[-1] try: algo = algo_map[ext] except KeyError: pass # Basically read mode else: file_sigs = { b'\x42\x5a\x68': self.modules['BZ2File'], b'\x1f\x8b\x08': self.modules['GzipFile'], b'\xfd7zXZ\x00': self.modules['LZMAFile'] } max_len = max(len(x) for x in file_sigs.keys()) # Check beginning of file for signature with open(filename, 'rb') as in_handle: start = in_handle.read(max_len) for sig in file_sigs.keys(): if start.startswith(sig): algo = file_sigs[sig] break # Filter all **kwargs by the args accepted by the compression algo algo_args = set(getfullargspec(algo).args) good_args = set(self.kwargs.keys()).intersection(algo_args) _kwargs = {arg: self.kwargs[arg] for arg in good_args} # Open the file using parameters defined above and store in namespace handle = algo(value, mode=self.mode, **_kwargs) setattr(namespace, self.dest, handle)