Skip to content

multipart

pigwig.multipart

MultipartFile(data: bytes, filename: str)

Source code in pigwig/multipart.py
126
127
128
def __init__(self, data: bytes, filename: str) -> None:
	"""keep the given ``data`` and ``filename`` on the instance, unchanged."""
	self.data, self.filename = data, filename

data: bytes = data instance-attribute

filename: str = filename instance-attribute

parse_multipart(fp: io.BufferedIOBase, pdict: dict) -> dict[str, list[bytes | MultipartFile]]

Most of this code is copied straight from cgi.parse_multipart. The only difference is that any part with a filename param in its content-disposition is returned as a MultipartFile (instead of just the bytes).

Source code in pigwig/multipart.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def parse_multipart(fp: io.BufferedIOBase, pdict: dict) -> dict[str, list[bytes | MultipartFile]]:
	"""
		parse a multipart form body from ``fp`` into a dict of field name -> list of values.

		adapted from
		[cgi.parse_multipart](https://github.com/python/cpython/blob/ad76602f69d884e491b0913c641dd8e42902c36c/Lib/cgi.py#L201);
		unlike the original, a part whose content-disposition carries a ``filename`` param
		is returned as a [MultipartFile][] rather than as plain bytes.

		``pdict['boundary']`` must be a bytes boundary accepted by the validation regex below,
		otherwise ``ValueError`` is raised. ``ValueError`` is also raised when a part declares a
		content-length larger than a non-zero ``maxlen``. parts without a content-disposition
		header, with a disposition other than ``form-data``, or without a ``name`` param are
		skipped.
	"""
	boundary = pdict.get('boundary', b'')
	if re.match(b'^[ -~]{0,200}[!-~]$', boundary) is None:
		raise ValueError('Invalid boundary in multipart form: %r' % boundary)

	nextpart = b'--' + boundary
	lastpart = nextpart + b'--'
	partdict: dict[str, list[bytes | MultipartFile]] = {}
	terminator = b''
	while terminator != lastpart:
		read = -1
		data: bytes | MultipartFile | None = None
		if terminator:
			# a delimiter was just consumed, so the headers of the next part come first
			headers = http.client.parse_headers(fp)
			clength = headers.get('content-length')
			if clength:
				try:
					read = int(clength)
				except ValueError:
					# unparseable length: fall back to line-by-line reading
					pass
			if read > 0:
				if maxlen and read > maxlen:
					raise ValueError('Maximum content length exceeded')
				data = fp.read(read)
			else:
				data = b''

		# collect lines up to the next delimiter; running out of input counts as the final one
		body_lines = []
		while raw := fp.readline():
			if raw.startswith(b'--'):
				terminator = raw.rstrip()
				if terminator == nextpart or terminator == lastpart:
					break
			body_lines.append(raw)
		else:
			terminator = lastpart

		if data is None:
			# everything before the first delimiter is preamble, not a part
			continue

		if read < 0 and body_lines:
			# no usable content-length: the body is the collected lines, minus the
			# line terminator that belongs to the delimiter
			tail = body_lines[-1]
			if tail.endswith(b'\r\n'):
				tail = tail[:-2]
			elif tail.endswith(b'\n'):
				tail = tail[:-1]
			body_lines[-1] = tail
			data = b''.join(body_lines)

		disposition = headers['content-disposition']
		if not disposition:
			continue
		kind, params = parse_header(disposition)
		if kind != 'form-data' or 'name' not in params:
			continue
		field = params['name']

		if 'filename' in params:
			assert isinstance(data, bytes)
			data = MultipartFile(data, params['filename'])
		partdict.setdefault(field, []).append(data)

	return partdict

parse_header(line: str) -> tuple[str, dict[str, str]]

Parse a Content-type like header.

Return the main content-type and a dictionary of options.

Source code in pigwig/multipart.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def parse_header(line: str) -> tuple[str, dict[str, str]]:
	"""Split a Content-type like header into its main value and its options.

	The first item produced by ``_parseparam`` is returned as the key. Each
	remaining item that contains ``=`` becomes an entry in the options dict:
	the name is stripped and lower-cased, the value is stripped, and a value
	wrapped in double quotes has the quotes removed and its backslash-escaped
	backslashes and double quotes unescaped. Items without ``=`` are ignored.
	"""
	segments = _parseparam(';' + line)
	key = next(segments)
	options = {}
	for segment in segments:
		raw_name, sep, raw_value = segment.partition('=')
		if not sep:
			continue
		value = raw_value.strip()
		if len(value) >= 2 and value[0] == value[-1] == '"':
			value = value[1:-1].replace('\\\\', '\\').replace('\\"', '"')
		options[raw_name.strip().lower()] = value
	return key, options