1010$ pytest
1111"""
1212
13- from datetime import timedelta
13+ from collections import namedtuple
14+ from datetime import datetime , timedelta
1415import base64
16+ import json
1517import os
1618import re
19+ import shutil
20+ from typing import Union
1721import unittest
18- from urllib .parse import urlparse
22+ from urllib .parse import urlencode , urlparse
1923
2024from Crypto .Hash import SHA256 , SHA512
2125from Crypto .PublicKey import ECC , RSA
26+ from Crypto .Hash .SHA512 import SHA512Hash
27+ from Crypto .Hash .SHA256 import SHA256Hash
2228from Crypto .Signature import pkcs1_15 , pss , DSS
2329
2430import petstore_api
2531#from petstore_api.models import Category, Tag, Pet
2632from petstore_api .api .pet_api import PetApi
2733from petstore_api import Configuration , signing
34+ from petstore_api .rest import (
35+ RESTClientObject ,
36+ RESTResponse
37+ )
38+
39+ from petstore_api .exceptions import (
40+ ApiException ,
41+ ApiValueError ,
42+ ApiTypeError ,
43+ )
2844
2945from .util import id_gen
3046
3147import urllib3
3248
49+ from unittest .mock import patch
3350
3451HOST = 'http://localhost/v2'
3552
@@ -59,7 +76,7 @@ def __init__(self, *arg, **kwargs):
5976 def __eq__ (self , other ):
6077 return self ._read == other ._read and self ._connect == other ._connect and self .total == other .total
6178
62- class MockPoolManager (object ):
79+ class MockPoolManager (urllib3 . PoolManager ):
6380 def __init__ (self , tc ):
6481 self ._tc = tc
6582 self ._reqs = []
@@ -69,9 +86,9 @@ def expect_request(self, *args, **kwargs):
6986
7087 def set_signing_config (self , signing_cfg ):
7188 self .signing_cfg = signing_cfg
72- self ._tc . assertIsNotNone ( self . signing_cfg )
89+ assert self .signing_cfg is not None
7390 self .pubkey = self .signing_cfg .get_public_key ()
74- self ._tc . assertIsNotNone ( self . pubkey )
91+ assert self .pubkey is not None
7592
7693 def request (self , * actual_request_target , ** actual_request_headers_and_body ):
7794 self ._tc .assertTrue (len (self ._reqs ) > 0 )
@@ -114,13 +131,13 @@ def _validate_authorization_header(self, request_target, actual_headers, authori
114131 # Extract (created)
115132 r1 = re .compile (r'created=([0-9]+)' )
116133 m1 = r1 .search (authorization_header )
117- self . _tc . assertIsNotNone ( m1 )
134+ assert m1 is not None
118135 created = m1 .group (1 )
119136
120137 # Extract list of signed headers
121138 r1 = re .compile (r'headers="([^"]+)"' )
122139 m1 = r1 .search (authorization_header )
123- self . _tc . assertIsNotNone ( m1 )
140+ assert m1 is not None
124141 headers = m1 .group (1 ).split (' ' )
125142 signed_headers_list = []
126143 for h in headers :
@@ -132,25 +149,26 @@ def _validate_authorization_header(self, request_target, actual_headers, authori
132149 signed_headers_list .append ((h , "{0} {1}" .format (request_target [0 ].lower (), target_path )))
133150 else :
134151 value = next ((v for k , v in actual_headers .items () if k .lower () == h ), None )
135- self . _tc . assertIsNotNone ( value )
152+ assert value is not None
136153 signed_headers_list .append ((h , value ))
137154 header_items = [
138155 "{0}: {1}" .format (key .lower (), value ) for key , value in signed_headers_list ]
139156 string_to_sign = "\n " .join (header_items )
140- digest = None
157+ digest : Union [ SHA512Hash , SHA256Hash ]
141158 if self .signing_cfg .hash_algorithm == signing .HASH_SHA512 :
142159 digest = SHA512 .new ()
143160 elif self .signing_cfg .hash_algorithm == signing .HASH_SHA256 :
144161 digest = SHA256 .new ()
145162 else :
146163 self ._tc .fail ("Unsupported hash algorithm: {0}" .format (self .signing_cfg .hash_algorithm ))
164+
147165 digest .update (string_to_sign .encode ())
148166 b64_body_digest = base64 .b64encode (digest .digest ()).decode ()
149167
150168 # Extract the signature
151169 r2 = re .compile (r'signature="([^"]+)"' )
152170 m2 = r2 .search (authorization_header )
153- self . _tc . assertIsNotNone ( m2 )
171+ assert m2 is not None
154172 b64_signature = m2 .group (1 )
155173 signature = base64 .b64decode (b64_signature )
156174 # Build the message
@@ -180,6 +198,12 @@ def _validate_authorization_header(self, request_target, actual_headers, authori
180198 self ._tc .fail ("Unsupported signing algorithm: {0}" .format (signing_alg ))
181199
182200class PetApiTests (unittest .TestCase ):
201+ rsa_key_path : str
202+ rsa4096_key_path : str
203+ ec_p521_key_path : str
204+ pet : petstore_api .Pet
205+ test_file_dir : str
206+ private_key_passphrase : str
183207
184208 @classmethod
185209 def setUpClass (cls ):
@@ -198,19 +222,19 @@ def tearDownClass(cls):
198222
199223 @classmethod
200224 def setUpModels (cls ):
201- cls . category = petstore_api .Category (name = "dog" )
202- cls . category .id = id_gen ()
203- cls . tag = petstore_api .Tag ()
204- cls . tag .id = id_gen ()
205- cls . tag .name = "python-pet-tag"
225+ category = petstore_api .Category (name = "dog" )
226+ category .id = id_gen ()
227+ tag = petstore_api .Tag ()
228+ tag .id = id_gen ()
229+ tag .name = "python-pet-tag"
206230 cls .pet = petstore_api .Pet (
207231 name = "hello kity" ,
208- photo_urls = ["http://foo.bar.com/1" , "http://foo.bar.com/2" ]
232+ photoUrls = ["http://foo.bar.com/1" , "http://foo.bar.com/2" ]
209233 )
210234 cls .pet .id = id_gen ()
211235 cls .pet .status = "sold"
212- cls .pet .category = cls . category
213- cls .pet .tags = [cls . tag ]
236+ cls .pet .category = category
237+ cls .pet .tags = [tag ]
214238
215239 @classmethod
216240 def setUpFiles (cls ):
@@ -229,6 +253,8 @@ def setUpFiles(cls):
229253 with open (cls .rsa_key_path , 'w' ) as f :
230254 f .write (RSA_TEST_PRIVATE_KEY )
231255
256+ key : Union [RSA .RsaKey , ECC .EccKey ]
257+
232258 if not os .path .exists (cls .rsa4096_key_path ):
233259 key = RSA .generate (4096 )
234260 private_key = key .export_key (
@@ -240,13 +266,17 @@ def setUpFiles(cls):
240266
241267 if not os .path .exists (cls .ec_p521_key_path ):
242268 key = ECC .generate (curve = 'P-521' )
243- private_key = key .export_key (
269+ pkey = key .export_key (
244270 format = 'PEM' ,
245271 passphrase = cls .private_key_passphrase ,
246272 use_pkcs8 = True ,
247273 protection = 'PBKDF2WithHMAC-SHA1AndAES128-CBC'
248274 )
249- with open (cls .ec_p521_key_path , "wt" ) as f :
275+ if isinstance (pkey , str ):
276+ private_key = pkey .encode ("ascii" )
277+ else :
278+ private_key = pkey
279+ with open (cls .ec_p521_key_path , "wb" ) as f :
250280 f .write (private_key )
251281
252282 def test_valid_http_signature (self ):
@@ -449,7 +479,7 @@ def test_invalid_configuration(self):
449479 signing_cfg = signing .HttpSigningConfiguration (
450480 key_id = "my-key-id" ,
451481 private_key_path = self .ec_p521_key_path ,
452- signing_scheme = None
482+ signing_scheme = None # type: ignore
453483 )
454484 self .assertTrue (re .match ('Unsupported security scheme' , str (cm .exception )),
455485 'Exception message: {0}' .format (str (cm .exception )))
0 commit comments