comparison venv/lib/python2.7/site-packages/requests_toolbelt/threaded/pool.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 """Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""
2 import multiprocessing
3 try:
4 import queue # Python 3
5 except ImportError:
6 import Queue as queue
7
8 import requests
9
10 from . import thread
11
12
class Pool(object):
    """Pool that manages the threads containing sessions.

    :param job_queue:
        The queue you're expected to use to which you should add items.
        The alternate constructors on this class enqueue dictionaries of
        request keyword arguments.
    :type job_queue: queue.Queue
    :param initializer:
        Function used to initialize an instance of ``session``. It is called
        with the new session and must return the session to use.
    :type initializer: collections.Callable
    :param auth_generator:
        Function used to generate new auth credentials for the session. It is
        called with the initialized session and must return the session to
        use.
    :type auth_generator: collections.Callable
    :param int num_processes:
        Number of threads to create. Defaults to the value reported by
        :func:`multiprocessing.cpu_count`, or 1 when that value is
        unavailable.
    :param session:
        Callable invoked without arguments to create the session for each
        thread.
    :type session: requests.Session
    :raises ValueError: if ``num_processes`` is less than 1.
    """

    def __init__(self, job_queue, initializer=None, auth_generator=None,
                 num_processes=None, session=requests.Session):
        if num_processes is None:
            # cpu_count() is documented to raise NotImplementedError on
            # platforms where the count cannot be determined; fall back to a
            # single worker just as we do when it reports a falsy value.
            try:
                num_processes = multiprocessing.cpu_count() or 1
            except NotImplementedError:
                num_processes = 1

        if num_processes < 1:
            raise ValueError("Number of processes should at least be 1.")

        self._job_queue = job_queue
        self._response_queue = queue.Queue()
        self._exc_queue = queue.Queue()
        self._processes = num_processes
        self._initializer = initializer or _identity
        self._auth = auth_generator or _identity
        self._session = session
        # Every thread receives its own freshly built session and shares the
        # job, response and exception queues with the pool.
        self._pool = [
            thread.SessionThread(self._new_session(), self._job_queue,
                                 self._response_queue, self._exc_queue)
            for _ in range(self._processes)
        ]

    def _new_session(self):
        """Create a session, then apply the initializer and the auth hook."""
        return self._auth(self._initializer(self._session()))

    @classmethod
    def from_exceptions(cls, exceptions, **kwargs):
        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.

        Provided an iterable that provides :class:`~ThreadException` objects,
        this classmethod will generate a new pool to retry the requests that
        caused the exceptions.

        :param exceptions:
            Iterable that returns :class:`~ThreadException`
        :type exceptions: iterable
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        job_queue = queue.Queue()
        for exc in exceptions:
            # Re-enqueue the keyword arguments of the request that failed.
            job_queue.put(exc.request_kwargs)

        return cls(job_queue=job_queue, **kwargs)

    @classmethod
    def from_urls(cls, urls, request_kwargs=None, **kwargs):
        """Create a :class:`~Pool` from an iterable of URLs.

        :param urls:
            Iterable that returns URLs with which we create a pool.
        :type urls: iterable
        :param dict request_kwargs:
            Dictionary of other keyword arguments to provide to the request
            method. The method defaults to ``GET`` unless overridden here.
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        request_dict = {'method': 'GET'}
        request_dict.update(request_kwargs or {})
        job_queue = queue.Queue()
        for url in urls:
            # Each job gets its own copy so jobs never share a dictionary.
            job = request_dict.copy()
            job.update({'url': url})
            job_queue.put(job)

        return cls(job_queue=job_queue, **kwargs)

    def exceptions(self):
        """Iterate over all the exceptions in the pool.

        Iteration stops as soon as no exception is currently queued.

        :returns: Generator of :class:`~ThreadException`
        """
        while True:
            exc = self.get_exception()
            if exc is None:
                break
            yield exc

    def get_exception(self):
        """Get an exception from the pool.

        This does not block.

        :returns: The next queued exception, or ``None`` if none is queued.
        :rtype: :class:`~ThreadException`
        """
        try:
            (request, exc) = self._exc_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadException(request, exc)

    def get_response(self):
        """Get a response from the pool.

        This does not block.

        :returns: The next queued response, or ``None`` if none is queued.
        :rtype: :class:`~ThreadResponse`
        """
        try:
            (request, response) = self._response_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadResponse(request, response)

    def responses(self):
        """Iterate over all the responses in the pool.

        Iteration stops as soon as no response is currently queued.

        :returns: Generator of :class:`~ThreadResponse`
        """
        while True:
            resp = self.get_response()
            if resp is None:
                break
            yield resp

    def join_all(self):
        """Join all the threads to the master thread."""
        for session_thread in self._pool:
            session_thread.join()
152
153
class ThreadProxy(object):
    """Base class for wrappers that forward attribute access.

    Subclasses set ``proxied_attr`` to the name of the instance attribute
    holding the wrapped object and list the wrapper's own attribute names in
    ``attrs``. Any attribute that is not found on the wrapper and is not named
    in ``attrs`` is looked up on the wrapped object instead.
    """

    #: Name of the instance attribute that holds the wrapped object.
    proxied_attr = None
    #: Names of attributes that belong to the wrapper itself. The empty
    #: default keeps ``self.attrs`` resolvable on subclasses that do not
    #: define it; without it the lookup re-enters ``__getattr__`` forever.
    attrs = frozenset()

    def __getattr__(self, attr):
        """Proxy attribute accesses to the proxied object.

        :param str attr: Name of the attribute that normal lookup missed.
        :returns: The attribute taken from the proxied object.
        :raises AttributeError:
            if the attribute cannot be found, or if no proxied attribute has
            been configured.
        """
        get = object.__getattribute__
        if attr not in self.attrs:
            if self.proxied_attr is None:
                # Nothing to forward to; report a normal missing attribute.
                raise AttributeError(attr)
            response = get(self, self.proxied_attr)
            return getattr(response, attr)
        else:
            # The wrapper's own attributes are never forwarded; this raises
            # AttributeError when the attribute has not been set yet.
            return get(self, attr)
165
166
class ThreadResponse(ThreadProxy):
    """Pair a response with the request keyword arguments that produced it.

    Attribute access that is not satisfied by this wrapper is forwarded to
    the wrapped response, so the wrapper can be used much like the response
    itself. For example, to read the parsed JSON body:

    .. code-block:: python

        thread_response = pool.get_response()
        json = thread_response.json()

    """
    attrs = frozenset(('request_kwargs', 'response'))
    proxied_attr = 'response'

    def __init__(self, request_kwargs, response):
        #: The wrapped response
        self.response = response
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
187
188
class ThreadException(ThreadProxy):
    """Pair a captured exception with the request keyword arguments.

    Attribute access that is not satisfied by this wrapper is forwarded to
    the wrapped exception. For example, to read the exception's arguments:

    .. code-block:: python

        thread_exc = pool.get_exception()
        args = thread_exc.args

    """
    attrs = frozenset(('request_kwargs', 'exception'))
    proxied_attr = 'exception'

    def __init__(self, request_kwargs, exception):
        #: The captured and wrapped exception
        self.exception = exception
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
209
210
211 def _identity(session_obj):
212 return session_obj
213
214
# Names exported by a star-import of this module.
__all__ = [
    'ThreadException',
    'ThreadResponse',
    'Pool',
]