Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/requests_toolbelt/threaded/pool.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 """Module implementing the Pool for :mod:``requests_toolbelt.threaded``.""" | |
2 import multiprocessing | |
3 try: | |
4 import queue # Python 3 | |
5 except ImportError: | |
6 import Queue as queue | |
7 | |
8 import requests | |
9 | |
10 from . import thread | |
11 | |
12 | |
13 class Pool(object): | |
14 """Pool that manages the threads containing sessions. | |
15 | |
16 :param queue: | |
17 The queue you're expected to use to which you should add items. | |
18 :type queue: queue.Queue | |
19 :param initializer: | |
20 Function used to initialize an instance of ``session``. | |
21 :type initializer: collections.Callable | |
22 :param auth_generator: | |
23 Function used to generate new auth credentials for the session. | |
24 :type auth_generator: collections.Callable | |
25 :param int num_threads: | |
26 Number of threads to create. | |
27 :param session: | |
28 :type session: requests.Session | |
29 """ | |
30 | |
31 def __init__(self, job_queue, initializer=None, auth_generator=None, | |
32 num_processes=None, session=requests.Session): | |
33 if num_processes is None: | |
34 num_processes = multiprocessing.cpu_count() or 1 | |
35 | |
36 if num_processes < 1: | |
37 raise ValueError("Number of processes should at least be 1.") | |
38 | |
39 self._job_queue = job_queue | |
40 self._response_queue = queue.Queue() | |
41 self._exc_queue = queue.Queue() | |
42 self._processes = num_processes | |
43 self._initializer = initializer or _identity | |
44 self._auth = auth_generator or _identity | |
45 self._session = session | |
46 self._pool = [ | |
47 thread.SessionThread(self._new_session(), self._job_queue, | |
48 self._response_queue, self._exc_queue) | |
49 for _ in range(self._processes) | |
50 ] | |
51 | |
52 def _new_session(self): | |
53 return self._auth(self._initializer(self._session())) | |
54 | |
55 @classmethod | |
56 def from_exceptions(cls, exceptions, **kwargs): | |
57 r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s. | |
58 | |
59 Provided an iterable that provides :class:`~ThreadException` objects, | |
60 this classmethod will generate a new pool to retry the requests that | |
61 caused the exceptions. | |
62 | |
63 :param exceptions: | |
64 Iterable that returns :class:`~ThreadException` | |
65 :type exceptions: iterable | |
66 :param kwargs: | |
67 Keyword arguments passed to the :class:`~Pool` initializer. | |
68 :returns: An initialized :class:`~Pool` object. | |
69 :rtype: :class:`~Pool` | |
70 """ | |
71 job_queue = queue.Queue() | |
72 for exc in exceptions: | |
73 job_queue.put(exc.request_kwargs) | |
74 | |
75 return cls(job_queue=job_queue, **kwargs) | |
76 | |
77 @classmethod | |
78 def from_urls(cls, urls, request_kwargs=None, **kwargs): | |
79 """Create a :class:`~Pool` from an iterable of URLs. | |
80 | |
81 :param urls: | |
82 Iterable that returns URLs with which we create a pool. | |
83 :type urls: iterable | |
84 :param dict request_kwargs: | |
85 Dictionary of other keyword arguments to provide to the request | |
86 method. | |
87 :param kwargs: | |
88 Keyword arguments passed to the :class:`~Pool` initializer. | |
89 :returns: An initialized :class:`~Pool` object. | |
90 :rtype: :class:`~Pool` | |
91 """ | |
92 request_dict = {'method': 'GET'} | |
93 request_dict.update(request_kwargs or {}) | |
94 job_queue = queue.Queue() | |
95 for url in urls: | |
96 job = request_dict.copy() | |
97 job.update({'url': url}) | |
98 job_queue.put(job) | |
99 | |
100 return cls(job_queue=job_queue, **kwargs) | |
101 | |
102 def exceptions(self): | |
103 """Iterate over all the exceptions in the pool. | |
104 | |
105 :returns: Generator of :class:`~ThreadException` | |
106 """ | |
107 while True: | |
108 exc = self.get_exception() | |
109 if exc is None: | |
110 break | |
111 yield exc | |
112 | |
113 def get_exception(self): | |
114 """Get an exception from the pool. | |
115 | |
116 :rtype: :class:`~ThreadException` | |
117 """ | |
118 try: | |
119 (request, exc) = self._exc_queue.get_nowait() | |
120 except queue.Empty: | |
121 return None | |
122 else: | |
123 return ThreadException(request, exc) | |
124 | |
125 def get_response(self): | |
126 """Get a response from the pool. | |
127 | |
128 :rtype: :class:`~ThreadResponse` | |
129 """ | |
130 try: | |
131 (request, response) = self._response_queue.get_nowait() | |
132 except queue.Empty: | |
133 return None | |
134 else: | |
135 return ThreadResponse(request, response) | |
136 | |
137 def responses(self): | |
138 """Iterate over all the responses in the pool. | |
139 | |
140 :returns: Generator of :class:`~ThreadResponse` | |
141 """ | |
142 while True: | |
143 resp = self.get_response() | |
144 if resp is None: | |
145 break | |
146 yield resp | |
147 | |
148 def join_all(self): | |
149 """Join all the threads to the master thread.""" | |
150 for session_thread in self._pool: | |
151 session_thread.join() | |
152 | |
153 | |
154 class ThreadProxy(object): | |
155 proxied_attr = None | |
156 | |
157 def __getattr__(self, attr): | |
158 """Proxy attribute accesses to the proxied object.""" | |
159 get = object.__getattribute__ | |
160 if attr not in self.attrs: | |
161 response = get(self, self.proxied_attr) | |
162 return getattr(response, attr) | |
163 else: | |
164 return get(self, attr) | |
165 | |
166 | |
167 class ThreadResponse(ThreadProxy): | |
168 """A wrapper around a requests Response object. | |
169 | |
170 This will proxy most attribute access actions to the Response object. For | |
171 example, if you wanted the parsed JSON from the response, you might do: | |
172 | |
173 .. code-block:: python | |
174 | |
175 thread_response = pool.get_response() | |
176 json = thread_response.json() | |
177 | |
178 """ | |
179 proxied_attr = 'response' | |
180 attrs = frozenset(['request_kwargs', 'response']) | |
181 | |
182 def __init__(self, request_kwargs, response): | |
183 #: The original keyword arguments provided to the queue | |
184 self.request_kwargs = request_kwargs | |
185 #: The wrapped response | |
186 self.response = response | |
187 | |
188 | |
189 class ThreadException(ThreadProxy): | |
190 """A wrapper around an exception raised during a request. | |
191 | |
192 This will proxy most attribute access actions to the exception object. For | |
193 example, if you wanted the message from the exception, you might do: | |
194 | |
195 .. code-block:: python | |
196 | |
197 thread_exc = pool.get_exception() | |
198 msg = thread_exc.message | |
199 | |
200 """ | |
201 proxied_attr = 'exception' | |
202 attrs = frozenset(['request_kwargs', 'exception']) | |
203 | |
204 def __init__(self, request_kwargs, exception): | |
205 #: The original keyword arguments provided to the queue | |
206 self.request_kwargs = request_kwargs | |
207 #: The captured and wrapped exception | |
208 self.exception = exception | |
209 | |
210 | |
211 def _identity(session_obj): | |
212 return session_obj | |
213 | |
214 | |
215 __all__ = ['ThreadException', 'ThreadResponse', 'Pool'] |