Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/click/_compat.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 import re | |
2 import io | |
3 import os | |
4 import sys | |
5 import codecs | |
6 from weakref import WeakKeyDictionary | |
7 | |
8 import click | |
9 | |
10 | |
11 PY2 = sys.version_info[0] == 2 | |
12 WIN = sys.platform.startswith('win') | |
13 DEFAULT_COLUMNS = 80 | |
14 | |
15 | |
16 _ansi_re = re.compile('\033\[((?:\d|;)*)([a-zA-Z])') | |
17 | |
18 | |
19 def _find_unicode_literals_frame(): | |
20 import __future__ | |
21 frm = sys._getframe(1) | |
22 idx = 1 | |
23 while frm is not None: | |
24 if frm.f_globals.get('__name__', '').startswith('click.'): | |
25 frm = frm.f_back | |
26 idx += 1 | |
27 elif frm.f_code.co_flags & __future__.unicode_literals.compiler_flag: | |
28 return idx | |
29 else: | |
30 break | |
31 return 0 | |
32 | |
33 | |
34 def _check_for_unicode_literals(): | |
35 if not __debug__: | |
36 return | |
37 if not PY2 or click.disable_unicode_literals_warning: | |
38 return | |
39 bad_frame = _find_unicode_literals_frame() | |
40 if bad_frame <= 0: | |
41 return | |
42 from warnings import warn | |
43 warn(Warning('Click detected the use of the unicode_literals ' | |
44 '__future__ import. This is heavily discouraged ' | |
45 'because it can introduce subtle bugs in your ' | |
46 'code. You should instead use explicit u"" literals ' | |
47 'for your unicode strings. For more information see ' | |
48 'http://click.pocoo.org/python3/'), | |
49 stacklevel=bad_frame) | |
50 | |
51 | |
52 def get_filesystem_encoding(): | |
53 return sys.getfilesystemencoding() or sys.getdefaultencoding() | |
54 | |
55 | |
56 def _make_text_stream(stream, encoding, errors): | |
57 if encoding is None: | |
58 encoding = get_best_encoding(stream) | |
59 if errors is None: | |
60 errors = 'replace' | |
61 return _NonClosingTextIOWrapper(stream, encoding, errors, | |
62 line_buffering=True) | |
63 | |
64 | |
65 def is_ascii_encoding(encoding): | |
66 """Checks if a given encoding is ascii.""" | |
67 try: | |
68 return codecs.lookup(encoding).name == 'ascii' | |
69 except LookupError: | |
70 return False | |
71 | |
72 | |
73 def get_best_encoding(stream): | |
74 """Returns the default stream encoding if not found.""" | |
75 rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding() | |
76 if is_ascii_encoding(rv): | |
77 return 'utf-8' | |
78 return rv | |
79 | |
80 | |
81 class _NonClosingTextIOWrapper(io.TextIOWrapper): | |
82 | |
83 def __init__(self, stream, encoding, errors, **extra): | |
84 self._stream = stream = _FixupStream(stream) | |
85 io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra) | |
86 | |
87 # The io module is a place where the Python 3 text behavior | |
88 # was forced upon Python 2, so we need to unbreak | |
89 # it to look like Python 2. | |
90 if PY2: | |
91 def write(self, x): | |
92 if isinstance(x, str) or is_bytes(x): | |
93 try: | |
94 self.flush() | |
95 except Exception: | |
96 pass | |
97 return self.buffer.write(str(x)) | |
98 return io.TextIOWrapper.write(self, x) | |
99 | |
100 def writelines(self, lines): | |
101 for line in lines: | |
102 self.write(line) | |
103 | |
104 def __del__(self): | |
105 try: | |
106 self.detach() | |
107 except Exception: | |
108 pass | |
109 | |
110 def isatty(self): | |
111 # https://bitbucket.org/pypy/pypy/issue/1803 | |
112 return self._stream.isatty() | |
113 | |
114 | |
115 class _FixupStream(object): | |
116 """The new io interface needs more from streams than streams | |
117 traditionally implement. As such, this fix-up code is necessary in | |
118 some circumstances. | |
119 """ | |
120 | |
121 def __init__(self, stream): | |
122 self._stream = stream | |
123 | |
124 def __getattr__(self, name): | |
125 return getattr(self._stream, name) | |
126 | |
127 def read1(self, size): | |
128 f = getattr(self._stream, 'read1', None) | |
129 if f is not None: | |
130 return f(size) | |
131 # We only dispatch to readline instead of read in Python 2 as we | |
132 # do not want cause problems with the different implementation | |
133 # of line buffering. | |
134 if PY2: | |
135 return self._stream.readline(size) | |
136 return self._stream.read(size) | |
137 | |
138 def readable(self): | |
139 x = getattr(self._stream, 'readable', None) | |
140 if x is not None: | |
141 return x() | |
142 try: | |
143 self._stream.read(0) | |
144 except Exception: | |
145 return False | |
146 return True | |
147 | |
148 def writable(self): | |
149 x = getattr(self._stream, 'writable', None) | |
150 if x is not None: | |
151 return x() | |
152 try: | |
153 self._stream.write('') | |
154 except Exception: | |
155 try: | |
156 self._stream.write(b'') | |
157 except Exception: | |
158 return False | |
159 return True | |
160 | |
161 def seekable(self): | |
162 x = getattr(self._stream, 'seekable', None) | |
163 if x is not None: | |
164 return x() | |
165 try: | |
166 self._stream.seek(self._stream.tell()) | |
167 except Exception: | |
168 return False | |
169 return True | |
170 | |
171 | |
172 if PY2: | |
173 text_type = unicode | |
174 bytes = str | |
175 raw_input = raw_input | |
176 string_types = (str, unicode) | |
177 iteritems = lambda x: x.iteritems() | |
178 range_type = xrange | |
179 | |
180 def is_bytes(x): | |
181 return isinstance(x, (buffer, bytearray)) | |
182 | |
183 _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') | |
184 | |
185 # For Windows, we need to force stdout/stdin/stderr to binary if it's | |
186 # fetched for that. This obviously is not the most correct way to do | |
187 # it as it changes global state. Unfortunately, there does not seem to | |
188 # be a clear better way to do it as just reopening the file in binary | |
189 # mode does not change anything. | |
190 # | |
191 # An option would be to do what Python 3 does and to open the file as | |
192 # binary only, patch it back to the system, and then use a wrapper | |
193 # stream that converts newlines. It's not quite clear what's the | |
194 # correct option here. | |
195 if WIN: | |
196 import msvcrt | |
197 def set_binary_mode(f): | |
198 try: | |
199 fileno = f.fileno() | |
200 except Exception: | |
201 pass | |
202 else: | |
203 msvcrt.setmode(fileno, os.O_BINARY) | |
204 return f | |
205 else: | |
206 set_binary_mode = lambda x: x | |
207 | |
208 def isidentifier(x): | |
209 return _identifier_re.search(x) is not None | |
210 | |
211 def get_binary_stdin(): | |
212 return set_binary_mode(sys.stdin) | |
213 | |
214 def get_binary_stdout(): | |
215 return set_binary_mode(sys.stdout) | |
216 | |
217 def get_binary_stderr(): | |
218 return set_binary_mode(sys.stderr) | |
219 | |
220 def get_text_stdin(encoding=None, errors=None): | |
221 return _make_text_stream(sys.stdin, encoding, errors) | |
222 | |
223 def get_text_stdout(encoding=None, errors=None): | |
224 return _make_text_stream(sys.stdout, encoding, errors) | |
225 | |
226 def get_text_stderr(encoding=None, errors=None): | |
227 return _make_text_stream(sys.stderr, encoding, errors) | |
228 | |
229 def filename_to_ui(value): | |
230 if isinstance(value, bytes): | |
231 value = value.decode(get_filesystem_encoding(), 'replace') | |
232 return value | |
233 else: | |
234 import io | |
235 text_type = str | |
236 raw_input = input | |
237 string_types = (str,) | |
238 range_type = range | |
239 isidentifier = lambda x: x.isidentifier() | |
240 iteritems = lambda x: iter(x.items()) | |
241 | |
242 def is_bytes(x): | |
243 return isinstance(x, (bytes, memoryview, bytearray)) | |
244 | |
245 def _is_binary_reader(stream, default=False): | |
246 try: | |
247 return isinstance(stream.read(0), bytes) | |
248 except Exception: | |
249 return default | |
250 # This happens in some cases where the stream was already | |
251 # closed. In this case, we assume the default. | |
252 | |
253 def _is_binary_writer(stream, default=False): | |
254 try: | |
255 stream.write(b'') | |
256 except Exception: | |
257 try: | |
258 stream.write('') | |
259 return False | |
260 except Exception: | |
261 pass | |
262 return default | |
263 return True | |
264 | |
265 def _find_binary_reader(stream): | |
266 # We need to figure out if the given stream is already binary. | |
267 # This can happen because the official docs recommend detaching | |
268 # the streams to get binary streams. Some code might do this, so | |
269 # we need to deal with this case explicitly. | |
270 if _is_binary_reader(stream, False): | |
271 return stream | |
272 | |
273 buf = getattr(stream, 'buffer', None) | |
274 | |
275 # Same situation here; this time we assume that the buffer is | |
276 # actually binary in case it's closed. | |
277 if buf is not None and _is_binary_reader(buf, True): | |
278 return buf | |
279 | |
280 def _find_binary_writer(stream): | |
281 # We need to figure out if the given stream is already binary. | |
282 # This can happen because the official docs recommend detatching | |
283 # the streams to get binary streams. Some code might do this, so | |
284 # we need to deal with this case explicitly. | |
285 if _is_binary_writer(stream, False): | |
286 return stream | |
287 | |
288 buf = getattr(stream, 'buffer', None) | |
289 | |
290 # Same situation here; this time we assume that the buffer is | |
291 # actually binary in case it's closed. | |
292 if buf is not None and _is_binary_writer(buf, True): | |
293 return buf | |
294 | |
295 def _stream_is_misconfigured(stream): | |
296 """A stream is misconfigured if its encoding is ASCII.""" | |
297 # If the stream does not have an encoding set, we assume it's set | |
298 # to ASCII. This appears to happen in certain unittest | |
299 # environments. It's not quite clear what the correct behavior is | |
300 # but this at least will force Click to recover somehow. | |
301 return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii') | |
302 | |
303 def _is_compatible_text_stream(stream, encoding, errors): | |
304 stream_encoding = getattr(stream, 'encoding', None) | |
305 stream_errors = getattr(stream, 'errors', None) | |
306 | |
307 # Perfect match. | |
308 if stream_encoding == encoding and stream_errors == errors: | |
309 return True | |
310 | |
311 # Otherwise, it's only a compatible stream if we did not ask for | |
312 # an encoding. | |
313 if encoding is None: | |
314 return stream_encoding is not None | |
315 | |
316 return False | |
317 | |
318 def _force_correct_text_reader(text_reader, encoding, errors): | |
319 if _is_binary_reader(text_reader, False): | |
320 binary_reader = text_reader | |
321 else: | |
322 # If there is no target encoding set, we need to verify that the | |
323 # reader is not actually misconfigured. | |
324 if encoding is None and not _stream_is_misconfigured(text_reader): | |
325 return text_reader | |
326 | |
327 if _is_compatible_text_stream(text_reader, encoding, errors): | |
328 return text_reader | |
329 | |
330 # If the reader has no encoding, we try to find the underlying | |
331 # binary reader for it. If that fails because the environment is | |
332 # misconfigured, we silently go with the same reader because this | |
333 # is too common to happen. In that case, mojibake is better than | |
334 # exceptions. | |
335 binary_reader = _find_binary_reader(text_reader) | |
336 if binary_reader is None: | |
337 return text_reader | |
338 | |
339 # At this point, we default the errors to replace instead of strict | |
340 # because nobody handles those errors anyways and at this point | |
341 # we're so fundamentally fucked that nothing can repair it. | |
342 if errors is None: | |
343 errors = 'replace' | |
344 return _make_text_stream(binary_reader, encoding, errors) | |
345 | |
346 def _force_correct_text_writer(text_writer, encoding, errors): | |
347 if _is_binary_writer(text_writer, False): | |
348 binary_writer = text_writer | |
349 else: | |
350 # If there is no target encoding set, we need to verify that the | |
351 # writer is not actually misconfigured. | |
352 if encoding is None and not _stream_is_misconfigured(text_writer): | |
353 return text_writer | |
354 | |
355 if _is_compatible_text_stream(text_writer, encoding, errors): | |
356 return text_writer | |
357 | |
358 # If the writer has no encoding, we try to find the underlying | |
359 # binary writer for it. If that fails because the environment is | |
360 # misconfigured, we silently go with the same writer because this | |
361 # is too common to happen. In that case, mojibake is better than | |
362 # exceptions. | |
363 binary_writer = _find_binary_writer(text_writer) | |
364 if binary_writer is None: | |
365 return text_writer | |
366 | |
367 # At this point, we default the errors to replace instead of strict | |
368 # because nobody handles those errors anyways and at this point | |
369 # we're so fundamentally fucked that nothing can repair it. | |
370 if errors is None: | |
371 errors = 'replace' | |
372 return _make_text_stream(binary_writer, encoding, errors) | |
373 | |
374 def get_binary_stdin(): | |
375 reader = _find_binary_reader(sys.stdin) | |
376 if reader is None: | |
377 raise RuntimeError('Was not able to determine binary ' | |
378 'stream for sys.stdin.') | |
379 return reader | |
380 | |
381 def get_binary_stdout(): | |
382 writer = _find_binary_writer(sys.stdout) | |
383 if writer is None: | |
384 raise RuntimeError('Was not able to determine binary ' | |
385 'stream for sys.stdout.') | |
386 return writer | |
387 | |
388 def get_binary_stderr(): | |
389 writer = _find_binary_writer(sys.stderr) | |
390 if writer is None: | |
391 raise RuntimeError('Was not able to determine binary ' | |
392 'stream for sys.stderr.') | |
393 return writer | |
394 | |
395 def get_text_stdin(encoding=None, errors=None): | |
396 return _force_correct_text_reader(sys.stdin, encoding, errors) | |
397 | |
398 def get_text_stdout(encoding=None, errors=None): | |
399 return _force_correct_text_writer(sys.stdout, encoding, errors) | |
400 | |
401 def get_text_stderr(encoding=None, errors=None): | |
402 return _force_correct_text_writer(sys.stderr, encoding, errors) | |
403 | |
404 def filename_to_ui(value): | |
405 if isinstance(value, bytes): | |
406 value = value.decode(get_filesystem_encoding(), 'replace') | |
407 else: | |
408 value = value.encode('utf-8', 'surrogateescape') \ | |
409 .decode('utf-8', 'replace') | |
410 return value | |
411 | |
412 | |
413 def get_streerror(e, default=None): | |
414 if hasattr(e, 'strerror'): | |
415 msg = e.strerror | |
416 else: | |
417 if default is not None: | |
418 msg = default | |
419 else: | |
420 msg = str(e) | |
421 if isinstance(msg, bytes): | |
422 msg = msg.decode('utf-8', 'replace') | |
423 return msg | |
424 | |
425 | |
426 def open_stream(filename, mode='r', encoding=None, errors='strict', | |
427 atomic=False): | |
428 # Standard streams first. These are simple because they don't need | |
429 # special handling for the atomic flag. It's entirely ignored. | |
430 if filename == '-': | |
431 if 'w' in mode: | |
432 if 'b' in mode: | |
433 return get_binary_stdout(), False | |
434 return get_text_stdout(encoding=encoding, errors=errors), False | |
435 if 'b' in mode: | |
436 return get_binary_stdin(), False | |
437 return get_text_stdin(encoding=encoding, errors=errors), False | |
438 | |
439 # Non-atomic writes directly go out through the regular open functions. | |
440 if not atomic: | |
441 if encoding is None: | |
442 return open(filename, mode), True | |
443 return io.open(filename, mode, encoding=encoding, errors=errors), True | |
444 | |
445 # Some usability stuff for atomic writes | |
446 if 'a' in mode: | |
447 raise ValueError( | |
448 'Appending to an existing file is not supported, because that ' | |
449 'would involve an expensive `copy`-operation to a temporary ' | |
450 'file. Open the file in normal `w`-mode and copy explicitly ' | |
451 'if that\'s what you\'re after.' | |
452 ) | |
453 if 'x' in mode: | |
454 raise ValueError('Use the `overwrite`-parameter instead.') | |
455 if 'w' not in mode: | |
456 raise ValueError('Atomic writes only make sense with `w`-mode.') | |
457 | |
458 # Atomic writes are more complicated. They work by opening a file | |
459 # as a proxy in the same folder and then using the fdopen | |
460 # functionality to wrap it in a Python file. Then we wrap it in an | |
461 # atomic file that moves the file over on close. | |
462 import tempfile | |
463 fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename), | |
464 prefix='.__atomic-write') | |
465 | |
466 if encoding is not None: | |
467 f = io.open(fd, mode, encoding=encoding, errors=errors) | |
468 else: | |
469 f = os.fdopen(fd, mode) | |
470 | |
471 return _AtomicFile(f, tmp_filename, filename), True | |
472 | |
473 | |
474 # Used in a destructor call, needs extra protection from interpreter cleanup. | |
475 _rename = os.rename | |
476 | |
477 | |
478 class _AtomicFile(object): | |
479 | |
480 def __init__(self, f, tmp_filename, real_filename): | |
481 self._f = f | |
482 self._tmp_filename = tmp_filename | |
483 self._real_filename = real_filename | |
484 self.closed = False | |
485 | |
486 @property | |
487 def name(self): | |
488 return self._real_filename | |
489 | |
490 def close(self, delete=False): | |
491 if self.closed: | |
492 return | |
493 self._f.close() | |
494 _rename(self._tmp_filename, self._real_filename) | |
495 self.closed = True | |
496 | |
497 def __getattr__(self, name): | |
498 return getattr(self._f, name) | |
499 | |
500 def __enter__(self): | |
501 return self | |
502 | |
503 def __exit__(self, exc_type, exc_value, tb): | |
504 self.close(delete=exc_type is not None) | |
505 | |
506 def __repr__(self): | |
507 return repr(self._f) | |
508 | |
509 | |
510 auto_wrap_for_ansi = None | |
511 colorama = None | |
512 get_winterm_size = None | |
513 | |
514 | |
515 def strip_ansi(value): | |
516 return _ansi_re.sub('', value) | |
517 | |
518 | |
519 def should_strip_ansi(stream=None, color=None): | |
520 if color is None: | |
521 if stream is None: | |
522 stream = sys.stdin | |
523 return not isatty(stream) | |
524 return not color | |
525 | |
526 | |
527 # If we're on Windows, we provide transparent integration through | |
528 # colorama. This will make ANSI colors through the echo function | |
529 # work automatically. | |
530 if WIN: | |
531 # Windows has a smaller terminal | |
532 DEFAULT_COLUMNS = 79 | |
533 | |
534 try: | |
535 import colorama | |
536 except ImportError: | |
537 pass | |
538 else: | |
539 _ansi_stream_wrappers = WeakKeyDictionary() | |
540 | |
541 def auto_wrap_for_ansi(stream, color=None): | |
542 """This function wraps a stream so that calls through colorama | |
543 are issued to the win32 console API to recolor on demand. It | |
544 also ensures to reset the colors if a write call is interrupted | |
545 to not destroy the console afterwards. | |
546 """ | |
547 try: | |
548 cached = _ansi_stream_wrappers.get(stream) | |
549 except Exception: | |
550 cached = None | |
551 if cached is not None: | |
552 return cached | |
553 strip = should_strip_ansi(stream, color) | |
554 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip) | |
555 rv = ansi_wrapper.stream | |
556 _write = rv.write | |
557 | |
558 def _safe_write(s): | |
559 try: | |
560 return _write(s) | |
561 except: | |
562 ansi_wrapper.reset_all() | |
563 raise | |
564 | |
565 rv.write = _safe_write | |
566 try: | |
567 _ansi_stream_wrappers[stream] = rv | |
568 except Exception: | |
569 pass | |
570 return rv | |
571 | |
572 def get_winterm_size(): | |
573 win = colorama.win32.GetConsoleScreenBufferInfo( | |
574 colorama.win32.STDOUT).srWindow | |
575 return win.Right - win.Left, win.Bottom - win.Top | |
576 | |
577 | |
578 def term_len(x): | |
579 return len(strip_ansi(x)) | |
580 | |
581 | |
582 def isatty(stream): | |
583 try: | |
584 return stream.isatty() | |
585 except Exception: | |
586 return False | |
587 | |
588 | |
589 def _make_cached_stream_func(src_func, wrapper_func): | |
590 cache = WeakKeyDictionary() | |
591 def func(): | |
592 stream = src_func() | |
593 try: | |
594 rv = cache.get(stream) | |
595 except Exception: | |
596 rv = None | |
597 if rv is not None: | |
598 return rv | |
599 rv = wrapper_func() | |
600 try: | |
601 cache[stream] = rv | |
602 except Exception: | |
603 pass | |
604 return rv | |
605 return func | |
606 | |
607 | |
608 _default_text_stdout = _make_cached_stream_func( | |
609 lambda: sys.stdout, get_text_stdout) | |
610 _default_text_stderr = _make_cached_stream_func( | |
611 lambda: sys.stderr, get_text_stderr) | |
612 | |
613 | |
614 binary_streams = { | |
615 'stdin': get_binary_stdin, | |
616 'stdout': get_binary_stdout, | |
617 'stderr': get_binary_stderr, | |
618 } | |
619 | |
620 text_streams = { | |
621 'stdin': get_text_stdin, | |
622 'stdout': get_text_stdout, | |
623 'stderr': get_text_stderr, | |
624 } |