Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/github/tests/Framework.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 | |
3 # ########################## Copyrights and license ############################ | |
4 # # | |
5 # Copyright 2012 Vincent Jacques <vincent@vincent-jacques.net> # | |
6 # Copyright 2012 Zearin <zearin@gonk.net> # | |
7 # Copyright 2013 AKFish <akfish@gmail.com> # | |
8 # Copyright 2013 Vincent Jacques <vincent@vincent-jacques.net> # | |
9 # # | |
10 # This file is part of PyGithub. http://jacquev6.github.com/PyGithub/ # | |
11 # # | |
12 # PyGithub is free software: you can redistribute it and/or modify it under # | |
13 # the terms of the GNU Lesser General Public License as published by the Free # | |
14 # Software Foundation, either version 3 of the License, or (at your option) # | |
15 # any later version. # | |
16 # # | |
17 # PyGithub is distributed in the hope that it will be useful, but WITHOUT ANY # | |
18 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # | |
19 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # | |
20 # details. # | |
21 # # | |
22 # You should have received a copy of the GNU Lesser General Public License # | |
23 # along with PyGithub. If not, see <http://www.gnu.org/licenses/>. # | |
24 # # | |
25 # ############################################################################## | |
26 | |
27 import os | |
28 import sys | |
29 import unittest | |
30 import httplib | |
31 import traceback | |
32 | |
33 import github | |
34 | |
35 atLeastPython26 = sys.hexversion >= 0x02060000 | |
36 atLeastPython3 = sys.hexversion >= 0x03000000 | |
37 atMostPython32 = sys.hexversion < 0x03030000 | |
38 | |
39 if atLeastPython26: | |
40 import json | |
41 else: # pragma no cover (Covered by all tests with Python 2.5) | |
42 import simplejson as json # pragma no cover (Covered by all tests with Python 2.5) | |
43 | |
44 | |
45 def readLine(file): | |
46 if atLeastPython3: | |
47 return file.readline().decode("utf-8").strip() | |
48 else: | |
49 return file.readline().strip() | |
50 | |
51 | |
52 class FakeHttpResponse: | |
53 def __init__(self, status, headers, output): | |
54 self.status = status | |
55 self.__headers = headers | |
56 self.__output = output | |
57 | |
58 def getheaders(self): | |
59 return self.__headers | |
60 | |
61 def read(self): | |
62 return self.__output | |
63 | |
64 | |
65 def fixAuthorizationHeader(headers): | |
66 if "Authorization" in headers: | |
67 if headers["Authorization"].endswith("ZmFrZV9sb2dpbjpmYWtlX3Bhc3N3b3Jk"): | |
68 # This special case is here to test the real Authorization header | |
69 # sent by PyGithub. It would have avoided issue https://github.com/jacquev6/PyGithub/issues/153 | |
70 # because we would have seen that Python 3 was not generating the same | |
71 # header as Python 2 | |
72 pass | |
73 elif headers["Authorization"].startswith("token "): | |
74 headers["Authorization"] = "token private_token_removed" | |
75 elif headers["Authorization"].startswith("Basic "): | |
76 headers["Authorization"] = "Basic login_and_password_removed" | |
77 | |
78 | |
79 class RecordingConnection: # pragma no cover (Class useful only when recording new tests, not used during automated tests) | |
80 def __init__(self, file, protocol, host, port, *args, **kwds): | |
81 self.__file = file | |
82 self.__protocol = protocol | |
83 self.__host = host | |
84 self.__port = str(port) | |
85 self.__cnx = self._realConnection(host, port, *args, **kwds) | |
86 | |
87 def request(self, verb, url, input, headers): | |
88 print verb, url, input, headers, | |
89 self.__cnx.request(verb, url, input, headers) | |
90 fixAuthorizationHeader(headers) | |
91 self.__writeLine(self.__protocol) | |
92 self.__writeLine(verb) | |
93 self.__writeLine(self.__host) | |
94 self.__writeLine(self.__port) | |
95 self.__writeLine(url) | |
96 self.__writeLine(str(headers)) | |
97 self.__writeLine(input.replace('\n', '').replace('\r', '')) | |
98 | |
99 def getresponse(self): | |
100 res = self.__cnx.getresponse() | |
101 | |
102 status = res.status | |
103 print "=>", status | |
104 headers = res.getheaders() | |
105 output = res.read() | |
106 | |
107 self.__writeLine(str(status)) | |
108 self.__writeLine(str(headers)) | |
109 self.__writeLine(str(output)) | |
110 | |
111 return FakeHttpResponse(status, headers, output) | |
112 | |
113 def close(self): | |
114 self.__writeLine("") | |
115 return self.__cnx.close() | |
116 | |
117 def __writeLine(self, line): | |
118 self.__file.write(line + "\n") | |
119 | |
120 | |
121 class RecordingHttpConnection(RecordingConnection): # pragma no cover (Class useful only when recording new tests, not used during automated tests) | |
122 _realConnection = httplib.HTTPConnection | |
123 | |
124 def __init__(self, file, *args, **kwds): | |
125 RecordingConnection.__init__(self, file, "http", *args, **kwds) | |
126 | |
127 | |
128 class RecordingHttpsConnection(RecordingConnection): # pragma no cover (Class useful only when recording new tests, not used during automated tests) | |
129 _realConnection = httplib.HTTPSConnection | |
130 | |
131 def __init__(self, file, *args, **kwds): | |
132 RecordingConnection.__init__(self, file, "https", *args, **kwds) | |
133 | |
134 | |
135 class ReplayingConnection: | |
136 def __init__(self, testCase, file, protocol, host, port, *args, **kwds): | |
137 self.__testCase = testCase | |
138 self.__file = file | |
139 self.__protocol = protocol | |
140 self.__host = host | |
141 self.__port = str(port) | |
142 | |
143 def request(self, verb, url, input, headers): | |
144 fixAuthorizationHeader(headers) | |
145 self.__testCase.assertEqual(self.__protocol, readLine(self.__file)) | |
146 self.__testCase.assertEqual(verb, readLine(self.__file)) | |
147 self.__testCase.assertEqual(self.__host, readLine(self.__file)) | |
148 self.__testCase.assertEqual(self.__port, readLine(self.__file)) | |
149 self.__testCase.assertEqual(self.__splitUrl(url), self.__splitUrl(readLine(self.__file))) | |
150 self.__testCase.assertEqual(headers, eval(readLine(self.__file))) | |
151 expectedInput = readLine(self.__file) | |
152 if input.startswith("{"): | |
153 self.__testCase.assertEqual(json.loads(input.replace('\n', '').replace('\r', '')), json.loads(expectedInput)) | |
154 elif atMostPython32: # @todo Test in all cases, including Python 3.3 | |
155 # In Python 3.3, dicts are not output in the same order as in Python 2.5 -> 3.2. | |
156 # So, form-data encoding is not deterministic and is difficult to test. | |
157 self.__testCase.assertEqual(input.replace('\n', '').replace('\r', ''), expectedInput) | |
158 | |
159 def __splitUrl(self, url): | |
160 splitedUrl = url.split("?") | |
161 if len(splitedUrl) == 1: | |
162 return splitedUrl | |
163 self.__testCase.assertEqual(len(splitedUrl), 2) | |
164 base, qs = splitedUrl | |
165 return (base, sorted(qs.split("&"))) | |
166 | |
167 def getresponse(self): | |
168 status = int(readLine(self.__file)) | |
169 headers = eval(readLine(self.__file)) | |
170 output = readLine(self.__file) | |
171 | |
172 return FakeHttpResponse(status, headers, output) | |
173 | |
174 def close(self): | |
175 readLine(self.__file) | |
176 | |
177 | |
178 def ReplayingHttpConnection(testCase, file, *args, **kwds): | |
179 return ReplayingConnection(testCase, file, "http", *args, **kwds) | |
180 | |
181 | |
182 def ReplayingHttpsConnection(testCase, file, *args, **kwds): | |
183 return ReplayingConnection(testCase, file, "https", *args, **kwds) | |
184 | |
185 | |
186 class BasicTestCase(unittest.TestCase): | |
187 recordMode = False | |
188 | |
189 def setUp(self): | |
190 unittest.TestCase.setUp(self) | |
191 self.__fileName = "" | |
192 self.__file = None | |
193 if self.recordMode: # pragma no cover (Branch useful only when recording new tests, not used during automated tests) | |
194 github.Requester.Requester.injectConnectionClasses( | |
195 lambda ignored, *args, **kwds: RecordingHttpConnection(self.__openFile("wb"), *args, **kwds), | |
196 lambda ignored, *args, **kwds: RecordingHttpsConnection(self.__openFile("wb"), *args, **kwds) | |
197 ) | |
198 import GithubCredentials | |
199 self.login = GithubCredentials.login | |
200 self.password = GithubCredentials.password | |
201 self.oauth_token = GithubCredentials.oauth_token | |
202 # @todo Remove client_id and client_secret from ReplayData (as we already remove login, password and oauth_token) | |
203 # self.client_id = GithubCredentials.client_id | |
204 # self.client_secret = GithubCredentials.client_secret | |
205 else: | |
206 github.Requester.Requester.injectConnectionClasses( | |
207 lambda ignored, *args, **kwds: ReplayingHttpConnection(self, self.__openFile("rb"), *args, **kwds), | |
208 lambda ignored, *args, **kwds: ReplayingHttpsConnection(self, self.__openFile("rb"), *args, **kwds) | |
209 ) | |
210 self.login = "login" | |
211 self.password = "password" | |
212 self.oauth_token = "oauth_token" | |
213 self.client_id = "client_id" | |
214 self.client_secret = "client_secret" | |
215 | |
216 def tearDown(self): | |
217 unittest.TestCase.tearDown(self) | |
218 self.__closeReplayFileIfNeeded() | |
219 github.Requester.Requester.resetConnectionClasses() | |
220 | |
221 def __openFile(self, mode): | |
222 for (_, _, functionName, _) in traceback.extract_stack(): | |
223 if functionName.startswith("test") or functionName == "setUp" or functionName == "tearDown": | |
224 if functionName != "test": # because in class Hook(Framework.TestCase), method testTest calls Hook.test | |
225 fileName = os.path.join(os.path.dirname(__file__), "ReplayData", self.__class__.__name__ + "." + functionName + ".txt") | |
226 if fileName != self.__fileName: | |
227 self.__closeReplayFileIfNeeded() | |
228 self.__fileName = fileName | |
229 self.__file = open(self.__fileName, mode) | |
230 return self.__file | |
231 | |
232 def __closeReplayFileIfNeeded(self): | |
233 if self.__file is not None: | |
234 if not self.recordMode: # pragma no branch (Branch useful only when recording new tests, not used during automated tests) | |
235 self.assertEqual(readLine(self.__file), "") | |
236 self.__file.close() | |
237 | |
238 def assertListKeyEqual(self, elements, key, expectedKeys): | |
239 realKeys = [key(element) for element in elements] | |
240 self.assertEqual(realKeys, expectedKeys) | |
241 | |
242 def assertListKeyBegin(self, elements, key, expectedKeys): | |
243 realKeys = [key(element) for element in elements[: len(expectedKeys)]] | |
244 self.assertEqual(realKeys, expectedKeys) | |
245 | |
246 | |
247 class TestCase(BasicTestCase): | |
248 def doCheckFrame(self, obj, frame): | |
249 if obj._headers == {} and frame is None: | |
250 return | |
251 if obj._headers is None and frame == {}: | |
252 return | |
253 self.assertEqual(obj._headers, frame[2]) | |
254 | |
255 def getFrameChecker(self): | |
256 return lambda requester, obj, frame: self.doCheckFrame(obj, frame) | |
257 | |
258 def setUp(self): | |
259 BasicTestCase.setUp(self) | |
260 | |
261 # Set up frame debugging | |
262 github.GithubObject.GithubObject.setCheckAfterInitFlag(True) | |
263 github.Requester.Requester.setDebugFlag(True) | |
264 github.Requester.Requester.setOnCheckMe(self.getFrameChecker()) | |
265 | |
266 self.g = github.Github(self.login, self.password) | |
267 | |
268 | |
269 def activateRecordMode(): # pragma no cover (Function useful only when recording new tests, not used during automated tests) | |
270 BasicTestCase.recordMode = True |