Select Git revision
paper-sorting-loc-new.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
sams_proxy.py 4.25 KiB
from .exceptions import(
ProxySpecKeyMssing, ProxySpecMissing
)
import requests
from flask import request
import sys
from flask import session
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
class SAMSProxy:
def __init__(self, proxySpec: dict):
self._proxySpec = proxySpec
self._check_proxy_spec()
@property
def urlRule(self):
urlRule = self._proxySpec['in'].strip()
urlRule = urlRule.strip('/')
if 0 == len(self._proxySpec['in']):
return '/<path:path>'
return '/' + urlRule + '/<path:path>'
@property
def rootUrlRule(self):
urlRule = self._proxySpec['in'].strip()
urlRule = urlRule.strip('/')
if 0 == len(self._proxySpec['in']):
return '/'
return '/' + urlRule + '/'
def proxy(self, path = ''):
url = '/'.join([self._proxySpec['out'].strip().rstrip('/'), path])
headers = SAMSProxy.__cleaned_dict(request.headers)
headers.pop('Content-Type', None)
body = self._get_first_dict(request.get_json(), dict(request.form))
body = self._pass_param(request.method, 'body', body)
res = requests.request(
method=request.method, url=url,
params=dict(self._pass_param(request.method, 'url', dict(request.args))),
**SAMSProxy._json_or_data(body, request.is_json),
headers=dict(self._pass_param(request.method, 'header', headers)))
return (res.content, res.status_code, res.headers.items())
def _pass_param(self, method, paramType, paramDict = {}):
paramDict.update(self._filter_session(method, paramType))
paramDict.update(self._filter_token(paramType))
return paramDict
def _filter_session(self, method, paramType):
filteredParams = self._proxySpec.get('session_passthrough', {})
filteredParams = filteredParams.get(method.lower(),
filteredParams.get('default', {}))
filteredSession = {}
for param in filteredParams.keys():
if filteredParams[param]['param-type'] is paramType:
filteredSession[filteredParams[param]['name']] = str(session.get(param))
return filteredSession
def _filter_token(self, paramType: str) -> dict:
if paramType == self._proxySpec.get('token', {}).get('param-type'):
return {
self._proxySpec['token']['name']: self._proxySpec['token']['value']}
return {}
def _get_first_dict(self, *possibleDicts) -> dict:
for possibleDict in possibleDicts:
if isinstance(possibleDict, dict):
return possibleDict
return {}
def _check_proxy_spec(self):
self._check_proxy_spec_exists()
self._check_proxy_spec_type()
for mandatoryKey in ('in', 'out'):
SAMSProxy.__check_key_exists(
mandatoryKey = mandatoryKey, testDict = self._proxySpec)
for tokenMandatoryKey in ('name', 'value', 'param-type'):
SAMSProxy.__check_key_exists(
mandatoryKey = tokenMandatoryKey, parentKey = 'token',
testDict = self._proxySpec.get('token'))
SAMSProxy.__check_key_exists(
mandatoryKey = 'default', parentKey = 'session-paassthrough'
, testDict = self._proxySpec.get('session-passthrough'))
for passthroughMandatoryKey in ('name', 'param-type'):
SAMSProxy.__check_key_exists(
mandatoryKey = passthroughMandatoryKey,
parentKey = "session-passthrough['default']",
testDict = self._proxySpec.get('session-passthrough', {}).get('default')
)
def _check_proxy_spec_exists(self):
if not self._proxySpec:
raise(ProxySpecMissing())
def _check_proxy_spec_type(self):
if not isinstance(self._proxySpec, dict):
raise(TypeError('proxySpec ist not an instance of dict'))
@staticmethod
def _json_or_data(body, is_json):
if is_json:
return {'json': body}
return {'data': body}
@staticmethod
def __cleaned_dict(uncleanDict):
"""returns copy of uncleanDict that does not contain elements with value ''
"""
cleanDict = dict(uncleanDict)
for key in list(cleanDict.keys()):
if cleanDict[key] is '':
del(cleanDict[key])
return cleanDict
@staticmethod
def __check_key_exists(mandatoryKey, testDict, parentKey = '') -> None:
if testDict is None:
return
try:
testDict[mandatoryKey]
except KeyError:
raise(ProxySpecKeyMssing(
mandatoryKey + ' is missing in proxySpec ' + parentKey))