Source code for tea.utils.compress

__author__ = 'Viktor Kerkez <alefnula@gmail.com>'
__date__ = '07 January 2009'
__copyright__ = 'Copyright (c) 2009 Viktor Kerkez'

import os
import zipfile
import logging
from tea import shell
from tea.system import platform
from tea.process import execute_and_report as er

logger = logging.getLogger(__name__)


def _extract_file(archive, destination, filename):
    try:
        output_path = os.path.join(destination, filename)
        output_dir = os.path.dirname(output_path)
        if not os.path.isdir(output_dir):
            shell.mkdir(output_dir)
        # Cannot write big chunks of data to windows shares
        MAX_BYTES = 5242880  # 5MB
        reader = archive.open(filename)
        writer = open(output_path, 'wb')
        while True:
            data = reader.read(MAX_BYTES)
            if len(data) > 0:
                writer.write(data)
            else:
                break
        return True
    except:
        logger.exception('Error while unzipping filename %s from archive %s' %
                         (filename, archive.filename))
        return False


[docs]def unzip(archive, destination, filenames=None): """Unzip the a complete zip archive into destination directory, or unzip a specific file(s) from the archive. Usage: >>> output = os.path.join(os.getcwd(), 'output') >>> # Archive can be an instance of a ZipFile class >>> archive = zipfile.ZipFile('test.zip', 'r') >>> # Or just a filename >>> archive = 'test.zip' >>> # Extracts all files >>> unzip(archive, output) >>> # Extract only one file >>> unzip(archive, output, 'my_file.txt') >>> # Extract a list of files >>> unzip(archive, output, ['my_file1.txt', 'my_file2.txt']) >>> unzip_file('test.zip', 'my_file.txt', output) :type archive: :class:`zipfile.ZipFile` or :obj:`str` :param archive: Zipfile object to extract from or path to the zip archive. :type destination: str :param destination: Path to the output directory :type filenames: str, list or None :param filenames: Path(s) to the filename(s) inside the zip archive that you want to extract. """ close = False try: if not isinstance(archive, zipfile.ZipFile): archive = zipfile.ZipFile(archive, 'r', allowZip64=True) close = True logger.info('Extracting: %s -> %s' % (archive.filename, destination)) if isinstance(filenames, str): filenames = [filenames] if filenames is None: # extract all filenames = archive.namelist() for filename in filenames: if filename.endswith('/'): # it's a directory shell.mkdir(os.path.join(destination, filename)) else: if not _extract_file(archive, destination, filename): raise Exception() logger.info('Extracting zip archive "%s" succeeded' % archive.filename) return True except: logger.exception('Error while unzipping archive %s' % archive.filename) return False finally: if close: archive.close()
[docs]def mkzip(archive, items, mode='w', save_full_paths=False): """Recursively zip a directory :type archive: :class:`zipfile.ZipFile` or :obj:`str` :param archive: ZipFile object add to or path to the output zip archive. :type items: str or list :param items: Single item or list of items (files and directories) to be added to zipfile :type mode: str :param mode: w for create new and write a for append to :type save_full_paths: bool :param save_full_paths: preserve full paths """ close = False try: if not isinstance(archive, zipfile.ZipFile): archive = zipfile.ZipFile(archive, mode, allowZip64=True) close = True logger.info('mkdzip: Creating %s, from: %s', archive.filename, items) if isinstance(items, str): items = [items] for item in items: item = os.path.abspath(item) basename = os.path.basename(item) if os.path.isdir(item): for root, directoires, filenames in os.walk(item): for filename in filenames: path = os.path.join(root, filename) if save_full_paths: archive_path = path.encode('utf-8') else: archive_path = os.path.join( basename, path.replace(item, '').strip('\\/') ).encode('utf-8') archive.write(path, archive_path) elif os.path.isfile(item): if save_full_paths: archive_name = item.encode('utf-8') else: archive_name = basename.encode('utf-8') archive.write(item, archive_name) # , zipfile.ZIP_DEFLATED) return True except Exception as e: logger.error('Error occurred during mkzip: %s' % e) return False finally: if close: archive.close()
_SZ_EXECUTABLE = None def _get_sz(): global _SZ_EXECUTABLE if _SZ_EXECUTABLE is None: _SZ_EXECUTABLE = '7z' if platform.is_a(platform.WINDOWS): for pf in ('ProgramFiles', 'ProgramFiles(x86)', 'ProgramW6432'): executable = os.path.join(os.environ.get(pf, ''), '7-Zip', '7z.exe') if os.path.exists(executable): _SZ_EXECUTABLE = executable break return _SZ_EXECUTABLE
[docs]def seven_zip(archive, items, self_extracting=False): """Create a 7z archive""" if not isinstance(items, (list, tuple)): items = [items] if self_extracting: return er(_get_sz(), 'a', '-ssw', '-sfx', archive, *items) else: return er(_get_sz(), 'a', '-ssw', archive, *items)
[docs]def seven_unzip(archive, output): """Extract a 7z archive""" return er(_get_sz(), 'x', archive, '-o%s' % output, '-aoa')