comparison venv/lib/python2.7/site-packages/setuptools/archive_util.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 """Utilities for extracting common archive formats"""
2
3
4 __all__ = [
5 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
6 "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
7 ]
8
9 import zipfile
10 import tarfile
11 import os
12 import shutil
13 import posixpath
14 import contextlib
15 from pkg_resources import ensure_directory, ContextualZipFile
16 from distutils.errors import DistutilsError
17
18 class UnrecognizedFormat(DistutilsError):
19 """Couldn't recognize the archive type"""
20
21 def default_filter(src,dst):
22 """The default progress/filter callback; returns True for all files"""
23 return dst
24
25
26 def unpack_archive(filename, extract_dir, progress_filter=default_filter,
27 drivers=None):
28 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
29
30 `progress_filter` is a function taking two arguments: a source path
31 internal to the archive ('/'-separated), and a filesystem path where it
32 will be extracted. The callback must return the desired extract path
33 (which may be the same as the one passed in), or else ``None`` to skip
34 that file or directory. The callback can thus be used to report on the
35 progress of the extraction, as well as to filter the items extracted or
36 alter their extraction paths.
37
38 `drivers`, if supplied, must be a non-empty sequence of functions with the
39 same signature as this function (minus the `drivers` argument), that raise
40 ``UnrecognizedFormat`` if they do not support extracting the designated
41 archive type. The `drivers` are tried in sequence until one is found that
42 does not raise an error, or until all are exhausted (in which case
43 ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
44 drivers, the module's ``extraction_drivers`` constant will be used, which
45 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
46 order.
47 """
48 for driver in drivers or extraction_drivers:
49 try:
50 driver(filename, extract_dir, progress_filter)
51 except UnrecognizedFormat:
52 continue
53 else:
54 return
55 else:
56 raise UnrecognizedFormat(
57 "Not a recognized archive type: %s" % filename
58 )
59
60
61 def unpack_directory(filename, extract_dir, progress_filter=default_filter):
62 """"Unpack" a directory, using the same interface as for archives
63
64 Raises ``UnrecognizedFormat`` if `filename` is not a directory
65 """
66 if not os.path.isdir(filename):
67 raise UnrecognizedFormat("%s is not a directory" % filename)
68
69 paths = {
70 filename: ('', extract_dir),
71 }
72 for base, dirs, files in os.walk(filename):
73 src, dst = paths[base]
74 for d in dirs:
75 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
76 for f in files:
77 target = os.path.join(dst, f)
78 target = progress_filter(src + f, target)
79 if not target:
80 # skip non-files
81 continue
82 ensure_directory(target)
83 f = os.path.join(base, f)
84 shutil.copyfile(f, target)
85 shutil.copystat(f, target)
86
87
88 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
89 """Unpack zip `filename` to `extract_dir`
90
91 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
92 by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
93 of the `progress_filter` argument.
94 """
95
96 if not zipfile.is_zipfile(filename):
97 raise UnrecognizedFormat("%s is not a zip file" % (filename,))
98
99 with ContextualZipFile(filename) as z:
100 for info in z.infolist():
101 name = info.filename
102
103 # don't extract absolute paths or ones with .. in them
104 if name.startswith('/') or '..' in name.split('/'):
105 continue
106
107 target = os.path.join(extract_dir, *name.split('/'))
108 target = progress_filter(name, target)
109 if not target:
110 continue
111 if name.endswith('/'):
112 # directory
113 ensure_directory(target)
114 else:
115 # file
116 ensure_directory(target)
117 data = z.read(info.filename)
118 with open(target, 'wb') as f:
119 f.write(data)
120 unix_attributes = info.external_attr >> 16
121 if unix_attributes:
122 os.chmod(target, unix_attributes)
123
124
125 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
126 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
127
128 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
129 by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
130 of the `progress_filter` argument.
131 """
132 try:
133 tarobj = tarfile.open(filename)
134 except tarfile.TarError:
135 raise UnrecognizedFormat(
136 "%s is not a compressed or uncompressed tar file" % (filename,)
137 )
138 with contextlib.closing(tarobj):
139 # don't do any chowning!
140 tarobj.chown = lambda *args: None
141 for member in tarobj:
142 name = member.name
143 # don't extract absolute paths or ones with .. in them
144 if not name.startswith('/') and '..' not in name.split('/'):
145 prelim_dst = os.path.join(extract_dir, *name.split('/'))
146
147 # resolve any links and to extract the link targets as normal
148 # files
149 while member is not None and (member.islnk() or member.issym()):
150 linkpath = member.linkname
151 if member.issym():
152 base = posixpath.dirname(member.name)
153 linkpath = posixpath.join(base, linkpath)
154 linkpath = posixpath.normpath(linkpath)
155 member = tarobj._getmember(linkpath)
156
157 if member is not None and (member.isfile() or member.isdir()):
158 final_dst = progress_filter(name, prelim_dst)
159 if final_dst:
160 if final_dst.endswith(os.sep):
161 final_dst = final_dst[:-1]
162 try:
163 # XXX Ugh
164 tarobj._extract_member(member, final_dst)
165 except tarfile.ExtractError:
166 # chown/chmod/mkfifo/mknode/makedev failed
167 pass
168 return True
169
170 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile