Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/setuptools/archive_util.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 """Utilities for extracting common archive formats""" | |
2 | |
3 | |
4 __all__ = [ | |
5 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter", | |
6 "UnrecognizedFormat", "extraction_drivers", "unpack_directory", | |
7 ] | |
8 | |
9 import zipfile | |
10 import tarfile | |
11 import os | |
12 import shutil | |
13 import posixpath | |
14 import contextlib | |
15 from pkg_resources import ensure_directory, ContextualZipFile | |
16 from distutils.errors import DistutilsError | |
17 | |
18 class UnrecognizedFormat(DistutilsError): | |
19 """Couldn't recognize the archive type""" | |
20 | |
21 def default_filter(src,dst): | |
22 """The default progress/filter callback; returns True for all files""" | |
23 return dst | |
24 | |
25 | |
26 def unpack_archive(filename, extract_dir, progress_filter=default_filter, | |
27 drivers=None): | |
28 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat`` | |
29 | |
30 `progress_filter` is a function taking two arguments: a source path | |
31 internal to the archive ('/'-separated), and a filesystem path where it | |
32 will be extracted. The callback must return the desired extract path | |
33 (which may be the same as the one passed in), or else ``None`` to skip | |
34 that file or directory. The callback can thus be used to report on the | |
35 progress of the extraction, as well as to filter the items extracted or | |
36 alter their extraction paths. | |
37 | |
38 `drivers`, if supplied, must be a non-empty sequence of functions with the | |
39 same signature as this function (minus the `drivers` argument), that raise | |
40 ``UnrecognizedFormat`` if they do not support extracting the designated | |
41 archive type. The `drivers` are tried in sequence until one is found that | |
42 does not raise an error, or until all are exhausted (in which case | |
43 ``UnrecognizedFormat`` is raised). If you do not supply a sequence of | |
44 drivers, the module's ``extraction_drivers`` constant will be used, which | |
45 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that | |
46 order. | |
47 """ | |
48 for driver in drivers or extraction_drivers: | |
49 try: | |
50 driver(filename, extract_dir, progress_filter) | |
51 except UnrecognizedFormat: | |
52 continue | |
53 else: | |
54 return | |
55 else: | |
56 raise UnrecognizedFormat( | |
57 "Not a recognized archive type: %s" % filename | |
58 ) | |
59 | |
60 | |
61 def unpack_directory(filename, extract_dir, progress_filter=default_filter): | |
62 """"Unpack" a directory, using the same interface as for archives | |
63 | |
64 Raises ``UnrecognizedFormat`` if `filename` is not a directory | |
65 """ | |
66 if not os.path.isdir(filename): | |
67 raise UnrecognizedFormat("%s is not a directory" % filename) | |
68 | |
69 paths = { | |
70 filename: ('', extract_dir), | |
71 } | |
72 for base, dirs, files in os.walk(filename): | |
73 src, dst = paths[base] | |
74 for d in dirs: | |
75 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d) | |
76 for f in files: | |
77 target = os.path.join(dst, f) | |
78 target = progress_filter(src + f, target) | |
79 if not target: | |
80 # skip non-files | |
81 continue | |
82 ensure_directory(target) | |
83 f = os.path.join(base, f) | |
84 shutil.copyfile(f, target) | |
85 shutil.copystat(f, target) | |
86 | |
87 | |
88 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter): | |
89 """Unpack zip `filename` to `extract_dir` | |
90 | |
91 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined | |
92 by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation | |
93 of the `progress_filter` argument. | |
94 """ | |
95 | |
96 if not zipfile.is_zipfile(filename): | |
97 raise UnrecognizedFormat("%s is not a zip file" % (filename,)) | |
98 | |
99 with ContextualZipFile(filename) as z: | |
100 for info in z.infolist(): | |
101 name = info.filename | |
102 | |
103 # don't extract absolute paths or ones with .. in them | |
104 if name.startswith('/') or '..' in name.split('/'): | |
105 continue | |
106 | |
107 target = os.path.join(extract_dir, *name.split('/')) | |
108 target = progress_filter(name, target) | |
109 if not target: | |
110 continue | |
111 if name.endswith('/'): | |
112 # directory | |
113 ensure_directory(target) | |
114 else: | |
115 # file | |
116 ensure_directory(target) | |
117 data = z.read(info.filename) | |
118 with open(target, 'wb') as f: | |
119 f.write(data) | |
120 unix_attributes = info.external_attr >> 16 | |
121 if unix_attributes: | |
122 os.chmod(target, unix_attributes) | |
123 | |
124 | |
125 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter): | |
126 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` | |
127 | |
128 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined | |
129 by ``tarfile.open()``). See ``unpack_archive()`` for an explanation | |
130 of the `progress_filter` argument. | |
131 """ | |
132 try: | |
133 tarobj = tarfile.open(filename) | |
134 except tarfile.TarError: | |
135 raise UnrecognizedFormat( | |
136 "%s is not a compressed or uncompressed tar file" % (filename,) | |
137 ) | |
138 with contextlib.closing(tarobj): | |
139 # don't do any chowning! | |
140 tarobj.chown = lambda *args: None | |
141 for member in tarobj: | |
142 name = member.name | |
143 # don't extract absolute paths or ones with .. in them | |
144 if not name.startswith('/') and '..' not in name.split('/'): | |
145 prelim_dst = os.path.join(extract_dir, *name.split('/')) | |
146 | |
147 # resolve any links and to extract the link targets as normal | |
148 # files | |
149 while member is not None and (member.islnk() or member.issym()): | |
150 linkpath = member.linkname | |
151 if member.issym(): | |
152 base = posixpath.dirname(member.name) | |
153 linkpath = posixpath.join(base, linkpath) | |
154 linkpath = posixpath.normpath(linkpath) | |
155 member = tarobj._getmember(linkpath) | |
156 | |
157 if member is not None and (member.isfile() or member.isdir()): | |
158 final_dst = progress_filter(name, prelim_dst) | |
159 if final_dst: | |
160 if final_dst.endswith(os.sep): | |
161 final_dst = final_dst[:-1] | |
162 try: | |
163 # XXX Ugh | |
164 tarobj._extract_member(member, final_dst) | |
165 except tarfile.ExtractError: | |
166 # chown/chmod/mkfifo/mknode/makedev failed | |
167 pass | |
168 return True | |
169 | |
170 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile |