comparison venv/lib/python2.7/site-packages/click/_compat.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 import re
2 import io
3 import os
4 import sys
5 import codecs
6 from weakref import WeakKeyDictionary
7
8 import click
9
10
# Interpreter / platform switches consulted throughout this module.
PY2 = sys.version_info[0] == 2
WIN = sys.platform.startswith('win')
# Fallback terminal width; the Windows branch later in this module
# lowers it to 79.
DEFAULT_COLUMNS = 80


# Matches one ANSI escape sequence: ESC, '[', a run of digits and ';',
# then a single letter.  The pattern is a raw string so that '\[' and
# '\d' are not parsed as (invalid) string escapes; the regex engine
# itself resolves the octal escape \033 to the ESC character.
_ansi_re = re.compile(r'\033\[((?:\d|;)*)([a-zA-Z])')
17
18
def _find_unicode_literals_frame():
    """Locate the nearest non-click caller compiled with unicode_literals.

    Starting at the caller's frame, frames whose module name begins with
    ``click.`` are skipped.  The first remaining frame decides the result:
    its 1-based depth is returned when its code object carries the
    ``unicode_literals`` compiler flag, otherwise ``0``.  ``0`` is also
    returned when the stack runs out.
    """
    import __future__
    flag = __future__.unicode_literals.compiler_flag
    depth = 1
    frame = sys._getframe(1)
    while frame is not None:
        module_name = frame.f_globals.get('__name__', '')
        if not module_name.startswith('click.'):
            # Only the first frame outside of click is inspected.
            return depth if frame.f_code.co_flags & flag else 0
        frame = frame.f_back
        depth += 1
    return 0
32
33
def _check_for_unicode_literals():
    """Warn if a calling module was compiled with ``unicode_literals``.

    Nothing happens when Python runs without ``__debug__``, on anything
    other than Python 2, or when ``click.disable_unicode_literals_warning``
    is set.
    """
    if not __debug__:
        return
    enabled = PY2 and not click.disable_unicode_literals_warning
    if not enabled:
        return
    # The frame lookup and the warning must both be issued from this
    # function so that the computed depth lines up with ``stacklevel``.
    stacklevel = _find_unicode_literals_frame()
    if stacklevel > 0:
        from warnings import warn
        message = ('Click detected the use of the unicode_literals '
                   '__future__ import.  This is heavily discouraged '
                   'because it can introduce subtle bugs in your '
                   'code.  You should instead use explicit u"" literals '
                   'for your unicode strings.  For more information see '
                   'http://click.pocoo.org/python3/')
        warn(Warning(message), stacklevel=stacklevel)
50
51
def get_filesystem_encoding():
    """Return the file system encoding, or the default encoding if unset."""
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding:
        return fs_encoding
    return sys.getdefaultencoding()
54
55
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A missing ``encoding`` is derived from the stream through
    ``get_best_encoding``; a missing ``errors`` handler becomes
    ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None \
        else encoding
    chosen_errors = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream, chosen_encoding, chosen_errors, line_buffering=True)
63
64
def is_ascii_encoding(encoding):
    """Tell whether ``encoding`` names the ASCII codec.

    Aliases are resolved through the codec registry; an unknown encoding
    name yields ``False``.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == 'ascii'
71
72
def get_best_encoding(stream):
    """Choose the encoding to use for ``stream``.

    The stream's ``encoding`` attribute is used when present and non-empty,
    otherwise the interpreter's default encoding.  If that candidate turns
    out to be ASCII, ``'utf-8'`` is returned in its place.
    """
    candidate = getattr(stream, 'encoding', None)
    if not candidate:
        candidate = sys.getdefaultencoding()
    return 'utf-8' if is_ascii_encoding(candidate) else candidate
79
80
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """``io.TextIOWrapper`` around a stream wrapped in ``_FixupStream``.

    On destruction the wrapper detaches itself from the underlying
    stream (see ``__del__``).  On Python 2 ``write`` additionally accepts
    byte strings and forwards them to the underlying buffer.
    """

    def __init__(self, stream, encoding, errors, **extra):
        # The proxy is stored as well so that ``isatty`` can ask it
        # directly.
        self._stream = stream = _FixupStream(stream)
        io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)

    # The io module is a place where the Python 3 text behavior
    # was forced upon Python 2, so we need to unbreak
    # it to look like Python 2.
    if PY2:
        def write(self, x):
            # Byte-like payloads bypass the text layer: pending text is
            # flushed first (failures ignored) and the data is written
            # to the underlying buffer as ``str(x)``.
            if isinstance(x, str) or is_bytes(x):
                try:
                    self.flush()
                except Exception:
                    pass
                return self.buffer.write(str(x))
            return io.TextIOWrapper.write(self, x)

        def writelines(self, lines):
            # Route every line through ``write`` so the byte handling
            # above applies to each of them.
            for line in lines:
                self.write(line)

    def __del__(self):
        # Detach from the underlying stream; any error raised while
        # doing so is ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self):
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()
113
114
class _FixupStream(object):
    """Proxy supplying the stream methods the ``io`` wrappers rely on.

    Unknown attributes are forwarded to the wrapped stream.  ``read1``,
    ``readable``, ``writable`` and ``seekable`` call the wrapped stream's
    own method when it has one and otherwise fall back to probing it.
    """

    def __init__(self, stream):
        self._stream = stream

    def __getattr__(self, name):
        return getattr(self._stream, name)

    def read1(self, size):
        native = getattr(self._stream, 'read1', None)
        if native is None:
            # We only dispatch to readline instead of read in Python 2 as
            # we do not want to cause problems with the different
            # implementation of line buffering.
            reader = self._stream.readline if PY2 else self._stream.read
            return reader(size)
        return native(size)

    def readable(self):
        native = getattr(self._stream, 'readable', None)
        if native is not None:
            return native()
        # No ``readable`` method: a zero-length read serves as the probe.
        try:
            self._stream.read(0)
        except Exception:
            return False
        else:
            return True

    def writable(self):
        native = getattr(self._stream, 'writable', None)
        if native is not None:
            return native()
        # Probe with an empty text payload first, then an empty bytes one.
        for empty in ('', b''):
            try:
                self._stream.write(empty)
            except Exception:
                continue
            return True
        return False

    def seekable(self):
        native = getattr(self._stream, 'seekable', None)
        if native is not None:
            return native()
        # Seeking to the current position serves as the probe.
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        else:
            return True
170
171
if PY2:
    text_type = unicode
    bytes = str
    raw_input = raw_input
    string_types = (str, unicode)
    range_type = xrange

    def iteritems(x):
        """Return an iterator over the ``(key, value)`` pairs of ``x``."""
        return x.iteritems()

    def is_bytes(x):
        """Return whether ``x`` is a ``buffer`` or a ``bytearray``."""
        return isinstance(x, (buffer, bytearray))

    # ``\Z`` anchors at the very end of the string.  ``$`` would also
    # match right before a trailing newline and thereby accept names such
    # as 'foo\n', which Python 3's ``str.isidentifier`` rejects.
    _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*\Z')

    # For Windows, we need to force stdout/stdin/stderr to binary if it's
    # fetched for that.  This obviously is not the most correct way to do
    # it as it changes global state.  Unfortunately, there does not seem to
    # be a clear better way to do it as just reopening the file in binary
    # mode does not change anything.
    #
    # An option would be to do what Python 3 does and to open the file as
    # binary only, patch it back to the system, and then use a wrapper
    # stream that converts newlines.  It's not quite clear what's the
    # correct option here.
    if WIN:
        import msvcrt

        def set_binary_mode(f):
            """Switch the descriptor behind ``f`` to binary mode.

            A stream whose ``fileno()`` call fails is returned untouched.
            """
            try:
                fileno = f.fileno()
            except Exception:
                pass
            else:
                msvcrt.setmode(fileno, os.O_BINARY)
            return f
    else:
        def set_binary_mode(x):
            """Return ``x`` unchanged (nothing to do off Windows)."""
            return x

    def isidentifier(x):
        """Return whether ``x`` is an ASCII identifier."""
        return _identifier_re.search(x) is not None

    def get_binary_stdin():
        """Return ``sys.stdin`` after switching it to binary mode."""
        return set_binary_mode(sys.stdin)

    def get_binary_stdout():
        """Return ``sys.stdout`` after switching it to binary mode."""
        return set_binary_mode(sys.stdout)

    def get_binary_stderr():
        """Return ``sys.stderr`` after switching it to binary mode."""
        return set_binary_mode(sys.stderr)

    def get_text_stdin(encoding=None, errors=None):
        """Return a text wrapper around ``sys.stdin``."""
        return _make_text_stream(sys.stdin, encoding, errors)

    def get_text_stdout(encoding=None, errors=None):
        """Return a text wrapper around ``sys.stdout``."""
        return _make_text_stream(sys.stdout, encoding, errors)

    def get_text_stderr(encoding=None, errors=None):
        """Return a text wrapper around ``sys.stderr``."""
        return _make_text_stream(sys.stderr, encoding, errors)

    def filename_to_ui(value):
        """Decode a byte filename with the file system encoding.

        Undecodable bytes are replaced; non-byte values pass through.
        """
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
else:
    import io
    text_type = str
    raw_input = input
    string_types = (str,)
    range_type = range

    def isidentifier(x):
        """Return ``x.isidentifier()``."""
        return x.isidentifier()

    def iteritems(x):
        """Return an iterator over the ``(key, value)`` pairs of ``x``."""
        return iter(x.items())

    def is_bytes(x):
        """Return whether ``x`` is bytes, a memoryview or a bytearray."""
        return isinstance(x, (bytes, memoryview, bytearray))

    def _is_binary_reader(stream, default=False):
        """Return whether a zero-length read from ``stream`` yields bytes.

        ``default`` is returned when the read itself fails.
        """
        try:
            return isinstance(stream.read(0), bytes)
        except Exception:
            # This happens in some cases where the stream was already
            # closed.  In this case, we assume the default.
            return default

    def _is_binary_writer(stream, default=False):
        """Return whether ``stream`` accepts a bytes payload.

        If it rejects bytes but accepts text the answer is ``False``; if
        both probes fail, ``default`` is returned.
        """
        try:
            stream.write(b'')
        except Exception:
            try:
                stream.write('')
            except Exception:
                return default
            return False
        return True

    def _find_binary_reader(stream):
        """Return the binary reader behind ``stream`` or ``None``."""
        # We need to figure out if the given stream is already binary.
        # This can happen because the official docs recommend detaching
        # the streams to get binary streams.  Some code might do this, so
        # we need to deal with this case explicitly.
        if _is_binary_reader(stream, False):
            return stream

        buf = getattr(stream, 'buffer', None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if buf is not None and _is_binary_reader(buf, True):
            return buf
        return None

    def _find_binary_writer(stream):
        """Return the binary writer behind ``stream`` or ``None``."""
        # We need to figure out if the given stream is already binary.
        # This can happen because the official docs recommend detaching
        # the streams to get binary streams.  Some code might do this, so
        # we need to deal with this case explicitly.
        if _is_binary_writer(stream, False):
            return stream

        buf = getattr(stream, 'buffer', None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if buf is not None and _is_binary_writer(buf, True):
            return buf
        return None

    def _stream_is_misconfigured(stream):
        """A stream is misconfigured if its encoding is ASCII."""
        # If the stream does not have an encoding set, we assume it's set
        # to ASCII.  This appears to happen in certain unittest
        # environments.  It's not quite clear what the correct behavior is
        # but this at least will force Click to recover somehow.
        return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii')

    def _is_compatible_text_stream(stream, encoding, errors):
        """Return whether ``stream`` already satisfies the request."""
        stream_encoding = getattr(stream, 'encoding', None)
        stream_errors = getattr(stream, 'errors', None)

        # Perfect match.
        if stream_encoding == encoding and stream_errors == errors:
            return True

        # Otherwise, it's only a compatible stream if we did not ask for
        # an encoding.
        if encoding is None:
            return stream_encoding is not None

        return False

    def _force_correct_text_stream(text_stream, encoding, errors,
                                   is_binary, find_binary):
        """Shared implementation for the reader and writer variants.

        ``is_binary`` and ``find_binary`` are the direction-specific
        probes (``_is_binary_reader`` / ``_find_binary_reader`` or their
        writer counterparts).
        """
        if is_binary(text_stream, False):
            binary_stream = text_stream
        else:
            # If there is no target encoding set, we need to verify that
            # the stream is not actually misconfigured.
            if encoding is None and \
               not _stream_is_misconfigured(text_stream):
                return text_stream

            if _is_compatible_text_stream(text_stream, encoding, errors):
                return text_stream

            # If the stream has no encoding, we try to find the underlying
            # binary stream for it.  If that fails because the environment
            # is misconfigured, we silently go with the same stream
            # because this is too common to happen.  In that case,
            # mojibake is better than exceptions.
            binary_stream = find_binary(text_stream)
            if binary_stream is None:
                return text_stream

        # At this point, we default the errors to replace instead of
        # strict because nobody handles those errors anyway and the
        # environment is too broken at this point to be repaired.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_stream, encoding, errors)

    def _force_correct_text_reader(text_reader, encoding, errors):
        """Return a text reader honouring ``encoding`` and ``errors``."""
        return _force_correct_text_stream(
            text_reader, encoding, errors,
            _is_binary_reader, _find_binary_reader)

    def _force_correct_text_writer(text_writer, encoding, errors):
        """Return a text writer honouring ``encoding`` and ``errors``."""
        return _force_correct_text_stream(
            text_writer, encoding, errors,
            _is_binary_writer, _find_binary_writer)

    def get_binary_stdin():
        """Return the binary reader behind ``sys.stdin``.

        Raises ``RuntimeError`` when none can be determined.
        """
        reader = _find_binary_reader(sys.stdin)
        if reader is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stdin.')
        return reader

    def get_binary_stdout():
        """Return the binary writer behind ``sys.stdout``.

        Raises ``RuntimeError`` when none can be determined.
        """
        writer = _find_binary_writer(sys.stdout)
        if writer is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stdout.')
        return writer

    def get_binary_stderr():
        """Return the binary writer behind ``sys.stderr``.

        Raises ``RuntimeError`` when none can be determined.
        """
        writer = _find_binary_writer(sys.stderr)
        if writer is None:
            raise RuntimeError('Was not able to determine binary '
                               'stream for sys.stderr.')
        return writer

    def get_text_stdin(encoding=None, errors=None):
        """Return a text reader for ``sys.stdin``."""
        return _force_correct_text_reader(sys.stdin, encoding, errors)

    def get_text_stdout(encoding=None, errors=None):
        """Return a text writer for ``sys.stdout``."""
        return _force_correct_text_writer(sys.stdout, encoding, errors)

    def get_text_stderr(encoding=None, errors=None):
        """Return a text writer for ``sys.stderr``."""
        return _force_correct_text_writer(sys.stderr, encoding, errors)

    def filename_to_ui(value):
        """Return ``value`` as text that is safe to display.

        Bytes are decoded with the file system encoding; text has any
        surrogate escapes substituted by a UTF-8 round trip.
        """
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value
411
412
def get_streerror(e, default=None):
    """Return a text description of the exception ``e``.

    The exception's ``strerror`` attribute is preferred.  When it is
    missing or ``None`` (for example ``OSError('boom')``), ``default`` is
    used if given and ``str(e)`` otherwise, so the result is never
    ``None`` merely because ``strerror`` was unset.  A bytes message is
    decoded as UTF-8 with replacement of undecodable bytes.
    """
    msg = getattr(e, 'strerror', None)
    if msg is None:
        msg = default if default is not None else str(e)
    if isinstance(msg, bytes):
        msg = msg.decode('utf-8', 'replace')
    return msg
424
425
def open_stream(filename, mode='r', encoding=None, errors='strict',
                atomic=False):
    """Open ``filename``, or hand out a standard stream for ``'-'``.

    Returns a ``(stream, should_close)`` tuple.  ``should_close`` is
    ``False`` for the standard streams and ``True`` for real files.

    With ``atomic=True`` the data goes to a temporary file in the target's
    directory, wrapped in an ``_AtomicFile``.  Atomic mode requires a
    ``w`` mode and raises ``ValueError`` for ``a``, ``x`` or a mode
    without ``w``.  If wrapping the temporary file fails, the descriptor
    is closed and the temporary file is removed before the error
    propagates.
    """
    # Standard streams first.  These are simple because they don't need
    # special handling for the atomic flag.  It's entirely ignored.
    if filename == '-':
        if 'w' in mode:
            if 'b' in mode:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if 'b' in mode:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        if encoding is None:
            return open(filename, mode), True
        return io.open(filename, mode, encoding=encoding, errors=errors), True

    # Some usability stuff for atomic writes
    if 'a' in mode:
        raise ValueError(
            'Appending to an existing file is not supported, because that '
            'would involve an expensive `copy`-operation to a temporary '
            'file. Open the file in normal `w`-mode and copy explicitly '
            'if that\'s what you\'re after.'
        )
    if 'x' in mode:
        raise ValueError('Use the `overwrite`-parameter instead.')
    if 'w' not in mode:
        raise ValueError('Atomic writes only make sense with `w`-mode.')

    # Atomic writes are more complicated.  They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file.  Then we wrap it in an
    # atomic file that moves the file over on close.
    import tempfile
    fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename),
                                        prefix='.__atomic-write')

    wrapped = False
    try:
        if encoding is not None:
            f = io.open(fd, mode, encoding=encoding, errors=errors)
        else:
            f = os.fdopen(fd, mode)
        wrapped = True
    finally:
        if not wrapped:
            # Wrapping failed (bad mode, unknown encoding, ...): do not
            # leak the descriptor or leave the temporary file behind.
            # The descriptor may already have been closed by the failed
            # open call, hence the guard.  A ``finally`` clause is used
            # so the original exception keeps propagating untouched.
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.remove(tmp_filename)
            except OSError:
                pass

    return _AtomicFile(f, tmp_filename, filename), True
472
473
# Used in a destructor call, needs extra protection from interpreter cleanup.
# ``os.replace`` is preferred where it exists because, unlike ``os.rename``,
# it also overwrites an existing target on Windows.
_rename = getattr(os, 'replace', os.rename)
_remove = os.remove


class _AtomicFile(object):
    """File proxy that moves a temporary file into place when closed.

    All writes go to ``f``, which is backed by ``tmp_filename``.  Closing
    the proxy closes ``f`` and then renames the temporary file to
    ``real_filename``.  Unknown attributes are forwarded to ``f``.
    """

    def __init__(self, f, tmp_filename, real_filename):
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self):
        """The final file name (not the temporary one)."""
        return self._real_filename

    def close(self, delete=False):
        """Close the file and commit or discard what was written.

        With ``delete=False`` the temporary file replaces the real file.
        With ``delete=True`` the temporary file is removed instead and the
        real file is left untouched.  Repeated calls are no-ops.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Discard the partial output; a temporary file that is
            # already gone is not an error.
            try:
                _remove(self._tmp_filename)
            except OSError:
                pass
        else:
            _rename(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name):
        return getattr(self._f, name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # An exception inside the ``with`` body discards the output.
        self.close(delete=exc_type is not None)

    def __repr__(self):
        return repr(self._f)
508
509
# Placeholders for the Windows-only helpers; each stays ``None`` unless a
# later platform-specific section of this module rebinds it.
auto_wrap_for_ansi = colorama = get_winterm_size = None
513
514
def strip_ansi(value):
    """Return ``value`` with every ANSI escape sequence removed."""
    cleaned = _ansi_re.sub('', value)
    return cleaned
517
518
def should_strip_ansi(stream=None, color=None):
    """Decide whether ANSI sequences should be stripped.

    An explicit ``color`` setting wins: stripping happens exactly when it
    is falsy.  Without one, stripping happens unless ``stream``
    (``sys.stdin`` when omitted) reports being a terminal.
    """
    if color is not None:
        return not color
    target = sys.stdin if stream is None else stream
    return not isatty(target)
525
526
# If we're on Windows, we provide transparent integration through
# colorama.  This will make ANSI colors through the echo function
# work automatically.
if WIN:
    # Windows has a smaller terminal
    DEFAULT_COLUMNS = 79

    try:
        import colorama
    except ImportError:
        # Without colorama the helpers below are simply not defined here.
        pass
    else:
        # Maps an original stream to the wrapped stream created for it.
        _ansi_stream_wrappers = WeakKeyDictionary()

        def auto_wrap_for_ansi(stream, color=None):
            """This function wraps a stream so that calls through colorama
            are issued to the win32 console API to recolor on demand.  It
            also ensures to reset the colors if a write call is interrupted
            to not destroy the console afterwards.
            """
            # A failing cache lookup is treated like a cache miss
            # (presumably for streams that cannot be used as weak
            # dictionary keys -- TODO confirm).
            try:
                cached = _ansi_stream_wrappers.get(stream)
            except Exception:
                cached = None
            if cached is not None:
                return cached
            strip = should_strip_ansi(stream, color)
            ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
            rv = ansi_wrapper.stream
            # Keep the wrapper's own write so it can be called from the
            # replacement installed below.
            _write = rv.write

            def _safe_write(s):
                # The bare ``except`` is deliberate: whatever interrupts
                # the write, the wrapper is reset before the exception is
                # re-raised.
                try:
                    return _write(s)
                except:
                    ansi_wrapper.reset_all()
                    raise

            rv.write = _safe_write
            # Caching is best effort; a stream that cannot be stored is
            # wrapped again on the next call.
            try:
                _ansi_stream_wrappers[stream] = rv
            except Exception:
                pass
            return rv

        def get_winterm_size():
            # Returns ``(Right - Left, Bottom - Top)`` of the ``srWindow``
            # rectangle colorama reports for the stdout console buffer.
            win = colorama.win32.GetConsoleScreenBufferInfo(
                colorama.win32.STDOUT).srWindow
            return win.Right - win.Left, win.Bottom - win.Top
576
577
def term_len(x):
    """Return the length of ``x`` once ANSI escape sequences are removed."""
    visible = strip_ansi(x)
    return len(visible)
580
581
def isatty(stream):
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        result = stream.isatty()
    except Exception:
        result = False
    return result
587
588
def _make_cached_stream_func(src_func, wrapper_func):
    """Build a zero-argument accessor that caches ``wrapper_func()``.

    Each call obtains the current stream from ``src_func`` and returns the
    wrapper cached for it, calling ``wrapper_func`` on a miss.  Cache
    lookups and stores that raise are ignored, so such a stream just gets
    a fresh wrapper on every call.
    """
    wrapped_by_stream = WeakKeyDictionary()

    def func():
        key = src_func()
        try:
            hit = wrapped_by_stream.get(key)
        except Exception:
            hit = None
        if hit is None:
            hit = wrapper_func()
            try:
                wrapped_by_stream[key] = hit
            except Exception:
                pass
        return hit

    return func
606
607
# Accessors for the text wrappers of the current ``sys.stdout`` and
# ``sys.stderr``; the wrapper is cached per underlying stream object.
_default_text_stdout = _make_cached_stream_func(
    lambda: sys.stdout,
    get_text_stdout,
)
_default_text_stderr = _make_cached_stream_func(
    lambda: sys.stderr,
    get_text_stderr,
)


# Lookup tables from a standard stream name to its accessor function.
binary_streams = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)

text_streams = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)