Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/requests_toolbelt/multipart/decoder.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """ | |
3 | |
4 requests_toolbelt.multipart.decoder | |
5 =================================== | |
6 | |
7 This holds all the implementation details of the MultipartDecoder | |
8 | |
9 """ | |
10 | |
11 import sys | |
12 import email.parser | |
13 from .encoder import encode_with | |
14 from requests.structures import CaseInsensitiveDict | |
15 | |
16 | |
17 def _split_on_find(content, bound): | |
18 point = content.find(bound) | |
19 return content[:point], content[point + len(bound):] | |
20 | |
21 | |
class ImproperBodyPartContentException(Exception):
    """Raised when a body part lacks the CR-LF-CR-LF header separator."""
24 | |
25 | |
class NonMultipartContentTypeException(Exception):
    """Raised when a Content-Type's major type is not ``multipart``."""
28 | |
29 | |
def _header_parser(string, encoding):
    """Parse a raw header section into encoded ``(name, value)`` pairs.

    :param string: the header section of a body part.
    :param encoding: codec name used to decode ``string`` on Python 3 and
        handed to ``encode_with`` for every header name and value.
    :returns: a generator of ``(name, value)`` tuples, each element having
        been passed through ``encode_with``.
    """
    # On Python 3 the input is decoded before it reaches the email parser.
    if sys.version_info[0] == 3:
        string = string.decode(encoding)
    parsed = email.parser.HeaderParser().parsestr(string)
    return (
        (encode_with(name, encoding), encode_with(value, encoding))
        for name, value in parsed.items()
    )
39 | |
40 | |
class BodyPart(object):
    """

    A ``BodyPart`` is a ``Response``-like view of a single subpart of a
    multipart response. Instances are normally created by a
    ``MultipartDecoder`` rather than directly.

    It exposes ``headers`` (a ``CaseInsensitiveDict``), ``content`` (the raw
    payload), ``text`` (the payload decoded to unicode) and ``encoding`` (the
    codec name used for that decoding).

    """

    def __init__(self, content, encoding):
        """Split ``content`` into a header section and a payload.

        :param content: raw body part; must contain a CR-LF-CR-LF sequence
            separating the (possibly empty) header section from the payload.
        :param encoding: codec name used for the headers and for ``text``.
        :raises ImproperBodyPartContentException: if ``content`` has no
            CR-LF-CR-LF sequence.
        """
        self.encoding = encoding
        separator = b'\r\n\r\n'
        if separator not in content:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        raw_headers, self.content = _split_on_find(content, separator)
        # An empty header section yields an empty header mapping.
        if raw_headers != b'':
            parsed = _header_parser(raw_headers.lstrip(), encoding)
        else:
            parsed = {}
        self.headers = CaseInsensitiveDict(parsed)

    @property
    def text(self):
        """Content of the ``BodyPart`` in unicode."""
        codec = self.encoding
        return self.content.decode(codec)
72 | |
73 | |
class MultipartDecoder(object):
    """

    The ``MultipartDecoder`` object parses the multipart payload of
    a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.

    The basic usage is::

        import requests
        from requests_toolbelt import MultipartDecoder

        response = requests.get(url)
        decoder = MultipartDecoder.from_response(response)
        for part in decoder.parts:
            print(part.headers['content-type'])

    If the multipart content is not from a response, basic usage is::

        from requests_toolbelt import MultipartDecoder

        decoder = MultipartDecoder(content, content_type)
        for part in decoder.parts:
            print(part.headers['content-type'])

    For both these usages, there is an optional ``encoding`` parameter. This is
    a string, which is the name of the unicode codec to use (default is
    ``'utf-8'``).

    """
    def __init__(self, content, content_type, encoding='utf-8'):
        """Parse ``content`` using the boundary named in ``content_type``.

        :param content: the raw multipart body as a bytestring.
        :param content_type: the Content-Type header value, including the
            ``boundary`` parameter.
        :param encoding: codec name used for the boundary and the parts.
        :raises NonMultipartContentTypeException: if the major type of
            ``content_type`` is not ``multipart``.
        :raises ImproperBodyPartContentException: if a part has no
            CR-LF-CR-LF header separator.
        """
        #: Original content
        self.content = content
        #: Original Content-Type header
        self.content_type = content_type
        #: Response body encoding
        self.encoding = encoding
        #: Parsed parts of the multipart response body
        self.parts = tuple()
        self._find_boundary()
        self._parse_body()

    def _find_boundary(self):
        """Validate the media type and store the boundary as ``self.boundary``.

        :raises NonMultipartContentTypeException: if the major type is not
            ``multipart``.
        """
        ct_info = tuple(x.strip() for x in self.content_type.split(';'))
        mimetype = ct_info[0]
        # Media type names are case-insensitive, so ``Multipart/Mixed`` must
        # be accepted just like ``multipart/mixed``.
        if mimetype.split('/')[0].lower() != 'multipart':
            raise NonMultipartContentTypeException(
                "Unexpected mimetype in content-type: '{0}'".format(mimetype)
            )
        for item in ct_info[1:]:
            attr, value = _split_on_find(
                item,
                '='
            )
            if attr.lower() == 'boundary':
                self.boundary = encode_with(value.strip('"'), self.encoding)

    @staticmethod
    def _fix_first_part(part, boundary_marker):
        """Strip a leading ``boundary_marker`` from ``part`` if present.

        The body is split on CR-LF + marker, so the very first part still
        carries the opening marker (which has no preceding CR-LF).
        """
        bm_len = len(boundary_marker)
        if boundary_marker == part[:bm_len]:
            return part[bm_len:]
        else:
            return part

    @staticmethod
    def _is_body_part(part):
        """Return True if ``part`` is real content, not delimiter residue.

        Splitting on the boundary leaves empty fragments and the tail of the
        close delimiter, which is ``--`` optionally followed by CR-LF.
        """
        return (
            part != b'' and
            part != b'\r\n' and
            # Close delimiter with no trailing CR-LF at the end of the body.
            part != b'--' and
            part[:4] != b'--\r\n'
        )

    def _parse_body(self):
        """Split ``self.content`` on the boundary and build ``self.parts``."""
        boundary = b''.join((b'--', self.boundary))

        def body_part(part):
            fixed = MultipartDecoder._fix_first_part(part, boundary)
            return BodyPart(fixed, self.encoding)

        parts = self.content.split(b''.join((b'\r\n', boundary)))
        self.parts = tuple(
            body_part(x) for x in parts if self._is_body_part(x)
        )

    @classmethod
    def from_response(cls, response, encoding='utf-8'):
        """Build a decoder from a response object.

        :param response: object exposing ``content`` and a ``headers``
            mapping from which ``content-type`` is read.
        :param encoding: codec name passed through to the constructor.
        :returns: a new instance of ``cls``.
        """
        content = response.content
        content_type = response.headers.get('content-type', None)
        return cls(content, content_type, encoding)