Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/click/testing.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 import os | |
2 import sys | |
3 import shutil | |
4 import tempfile | |
5 import contextlib | |
6 | |
7 from ._compat import iteritems, PY2 | |
8 | |
9 | |
10 # If someone wants to vendor click, we want to ensure the | |
11 # correct package is discovered. Ideally we could use a | |
12 # relative import here but unfortunately Python does not | |
13 # support that. | |
14 clickpkg = sys.modules[__name__.rsplit('.', 1)[0]] | |
15 | |
16 | |
17 if PY2: | |
18 from cStringIO import StringIO | |
19 else: | |
20 import io | |
21 from ._compat import _find_binary_reader | |
22 | |
23 | |
class EchoingStdin(object):
    """Input-stream proxy that copies everything read from it to an output.

    Every read method defined here fetches data from the wrapped input
    stream, writes that data to the output stream and then returns it.
    Attributes that are not defined on the proxy are looked up on the
    wrapped input stream.

    :param input: the stream data is read from.
    :param output: the stream that receives a copy of all data read.
    """

    def __init__(self, input, output):
        self._input = input
        self._output = output

    def __getattr__(self, x):
        # Only called for attributes missing on the proxy itself, so
        # everything not overridden below is delegated to the input.
        return getattr(self._input, x)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and hand it back unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        return self._echo(self._input.read(n))

    def read1(self, n=-1):
        # Must be defined explicitly: io.TextIOWrapper (CPython) reads
        # through ``read1`` when the wrapped buffer exposes it, and
        # ``__getattr__`` would otherwise forward that call straight to
        # the input stream, bypassing the echo.
        return self._echo(self._input.read1(n))

    def readline(self, n=-1):
        return self._echo(self._input.readline(n))

    def readlines(self):
        return [self._echo(x) for x in self._input.readlines()]

    def __iter__(self):
        # Lazily echoes each line as it is consumed.
        return (self._echo(x) for x in self._input)

    def __repr__(self):
        return repr(self._input)
51 | |
52 | |
def make_input_stream(input, charset):
    """Turn ``input`` into a binary stream suitable for use as stdin.

    :param input: ``None``, a bytes object, a text string, or an object
                  with a ``read`` attribute (treated as a stream).
    :param charset: the encoding applied to non-bytes string input.
    :return: a binary input stream.
    :raises TypeError: on Python 3, if ``input`` is a stream for which no
                       binary reader can be found.
    """
    if not hasattr(input, 'read'):
        # Plain data: normalise to bytes and wrap it in an in-memory stream.
        if input is None:
            data = b''
        elif isinstance(input, bytes):
            data = input
        else:
            data = input.encode(charset)
        if PY2:
            return StringIO(data)
        return io.BytesIO(data)

    # Already a stream.  Python 2 streams are used as they are; on
    # Python 3 the underlying binary reader has to be located.
    if PY2:
        return input
    reader = _find_binary_reader(input)
    if reader is None:
        raise TypeError('Could not find binary reader for input stream.')
    return reader
70 | |
71 | |
class Result(object):
    """Holds the captured result of an invoked CLI script.

    :param runner: the runner that created the result; its ``charset``
                   is used to decode :attr:`output`.
    :param output_bytes: the captured output as bytes.
    :param exit_code: the exit code as integer.
    :param exception: the exception that happened, or ``None``.
    :param exc_info: the ``sys.exc_info()`` triple for the exception, if
                     one was recorded.
    """

    def __init__(self, runner, output_bytes, exit_code, exception,
                 exc_info=None):
        #: The runner that created the result
        self.runner = runner
        #: The output as bytes.
        self.output_bytes = output_bytes
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    @property
    def output(self):
        """The output as unicode string.

        Undecodable bytes are replaced and ``\\r\\n`` line endings are
        normalised to ``\\n``.
        """
        text = self.output_bytes.decode(self.runner.charset, 'replace')
        return text.replace('\r\n', '\n')

    def __repr__(self):
        # Test against None explicitly so that an exception object which
        # happens to be falsy is still reported instead of 'okay'.
        if self.exception is not None:
            status = repr(self.exception)
        else:
            status = 'okay'
        return '<Result %s>' % (status,)
98 | |
99 | |
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    """

    def __init__(self, charset=None, env=None, echo_stdin=False):
        if charset is None:
            charset = 'utf-8'
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or 'root'

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The runner's own ``env`` is copied and then updated with
        ``overrides``; neither input dictionary is modified.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @staticmethod
    def _set_environ(key, value):
        """Set ``os.environ[key]`` to ``value``; ``None`` removes the key."""
        if value is None:
            try:
                del os.environ[key]
            except KeyError:
                # The variable was not set in the first place.
                pass
        else:
            os.environ[key] = value

    @contextlib.contextmanager
    def _environ_overrides(self, env):
        """Apply ``env`` to `os.environ` and undo the changes on exit.

        A value of ``None`` removes the variable for the duration of the
        block.  Only variables that were actually touched are restored.
        """
        saved = {}
        try:
            for key, value in env.items():
                # Remember the previous value of *this* variable (``None``
                # if it was unset) before overriding it.
                saved[key] = os.environ.get(key)
                self._set_environ(key, value)
            yield
        finally:
            for key, value in saved.items():
                self._set_environ(key, value)

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        :return: yields the in-memory stream that collects everything
                 written to stdout and stderr.
        """
        input = make_input_stream(input, self.charset)
        env = self.make_env(env)

        # Capture everything that is about to be replaced up front so the
        # ``finally`` clause can always put it back.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_visible_prompt_func = clickpkg.termui.visible_prompt_func
        old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func
        old__getchar_func = clickpkg.termui._getchar
        old_should_strip_ansi = clickpkg.utils.should_strip_ansi

        try:
            if PY2:
                sys.stdout = sys.stderr = bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = sys.stderr = io.TextIOWrapper(
                    bytes_output, encoding=self.charset)

            sys.stdin = input

            def visible_input(prompt=None):
                # Echo the typed value like a terminal would.
                sys.stdout.write(prompt or '')
                val = input.readline().rstrip('\r\n')
                sys.stdout.write(val + '\n')
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts show the prompt only, never the value.
                sys.stdout.write((prompt or '') + '\n')
                sys.stdout.flush()
                return input.readline().rstrip('\r\n')

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(stream=None, color=None):
                # An explicit ``color`` argument wins over the default
                # requested for this isolation.
                if color is None:
                    return not default_color
                return not color

            clickpkg.termui.visible_prompt_func = visible_input
            clickpkg.termui.hidden_prompt_func = hidden_input
            clickpkg.termui._getchar = _getchar
            clickpkg.utils.should_strip_ansi = should_strip_ansi

            with self._environ_overrides(env):
                yield bytes_output
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            clickpkg.termui.visible_prompt_func = old_visible_prompt_func
            clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func
            clickpkg.termui._getchar = old__getchar_func
            clickpkg.utils.should_strip_ansi = old_should_strip_ansi

    def invoke(self, cli, args=None, input=None, env=None,
               catch_exceptions=True, color=False, **extra):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as out:
            exception = None
            exit_code = 0

            try:
                cli.main(args=args or (),
                         prog_name=self.get_default_prog_name(cli), **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()

                exit_code = e.code
                # ``sys.exit()`` without an argument carries ``None``,
                # which means a successful (zero) exit status.
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                # A non-integer code is a message: it is printed and the
                # exit status becomes 1.
                if not isinstance(exit_code, int):
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write('\n')
                    exit_code = 1
            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = -1
                exc_info = sys.exc_info()
            finally:
                # Flush while the captured stdout is still installed so
                # that ``out`` holds the complete output.
                sys.stdout.flush()
                output = out.getvalue()

        return Result(runner=self,
                      output_bytes=output,
                      exit_code=exit_code,
                      exception=exception,
                      exc_info=exc_info)

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        On exit the previous working directory is restored and the
        temporary folder is removed.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        os.chdir(t)
        try:
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):
                # Best-effort cleanup: a folder that cannot be removed
                # must not fail the test that used it.
                pass