Skip to content
Snippets Groups Projects
Select Git revision
  • 5b11ae58d6337f7bb948de45d5b5ff61c831be0f
  • master default protected
2 results

paper-sorting-loc-new.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    sams_proxy.py 4.25 KiB
    from .exceptions import(
      ProxySpecKeyMssing, ProxySpecMissing
    )
    import requests
    from flask import request
    import sys
    from flask import session
    
    def eprint(*args, **kwargs):
      """Print to standard error.

      Takes the same positional and keyword arguments as the builtin print().
      The ``file`` keyword is already supplied here, so passing it again raises
      TypeError.
      """
      stream = sys.stderr
      print(*args, file=stream, **kwargs)
    
    class SAMSProxy:
      """Forwards the current Flask request to a target described by a proxy spec.

      The spec is a dict with the mandatory keys 'in' (URL prefix under which the
      proxy is reachable) and 'out' (base URL requests are forwarded to).
      An optional 'token' entry ('name', 'value', 'param-type') is injected into
      every forwarded request, and an optional 'session_passthrough' entry copies
      values from the Flask session into the forwarded request.

      NOTE(review): _check_proxy_spec validates the key 'session-passthrough'
      (hyphen) and expects 'name' / 'param-type' directly below 'default', while
      _filter_session reads 'session_passthrough' (underscore) and treats
      'default' as a mapping of session key -> {'name', 'param-type'}. Which
      spelling and shape is intended cannot be decided from this file, so both
      are left untouched -- confirm against real configurations.
      """

      # requests hands back an already decoded, fully read body in res.content,
      # so the upstream's encoding/length/framing headers no longer describe
      # what is returned to the client and must not be copied.
      _EXCLUDED_RESPONSE_HEADERS = frozenset((
        'content-encoding', 'content-length', 'transfer-encoding', 'connection'))

      def __init__(self, proxySpec: dict):
        """Stores proxySpec and validates it.

        Raises:
          ProxySpecMissing: proxySpec is falsy (None, empty dict, ...).
          TypeError: proxySpec is not a dict.
          ProxySpecKeyMssing: a mandatory key is absent.
        """
        self._proxySpec = proxySpec
        self._check_proxy_spec()

      @property
      def urlRule(self):
        """URL rule matching every path below the 'in' prefix."""
        prefix = self._normalized_in_prefix()
        # Test the normalised prefix, not the raw value: an 'in' of '/' would
        # otherwise produce the rule '//<path:path>'.
        if not prefix:
          return '/<path:path>'
        return '/' + prefix + '/<path:path>'

      @property
      def rootUrlRule(self):
        """URL rule matching the 'in' prefix itself (with trailing slash)."""
        prefix = self._normalized_in_prefix()
        if not prefix:
          return '/'
        return '/' + prefix + '/'

      def _normalized_in_prefix(self) -> str:
        """Returns the 'in' value without surrounding whitespace and slashes."""
        return self._proxySpec['in'].strip().strip('/')

      def proxy(self, path = ''):
        """Forwards the current Flask request to the 'out' target.

        Args:
          path: path below the 'out' base URL to request.

        Returns:
          A (body, status code, headers) tuple built from the upstream response.
        """
        url = '/'.join([self._proxySpec['out'].strip().rstrip('/'), path])
        headers = SAMSProxy.__cleaned_dict(request.headers)
        # requests derives the content type from the json=/data= argument.
        headers.pop('Content-Type', None)
        # Only parse JSON when the request declares it; newer Flask/Werkzeug
        # versions reject get_json() on other content types instead of
        # returning None, which would break forwarding of form requests.
        jsonBody = request.get_json() if request.is_json else None
        body = self._get_first_dict(jsonBody, dict(request.form))
        body = self._pass_param(request.method, 'body', body)
        res = requests.request(
          method=request.method, url=url,
          params=self._pass_param(request.method, 'url', dict(request.args)),
          headers=self._pass_param(request.method, 'header', headers),
          **SAMSProxy._json_or_data(body, request.is_json))
        responseHeaders = [
          (name, value) for name, value in res.headers.items()
          if name.lower() not in SAMSProxy._EXCLUDED_RESPONSE_HEADERS]
        return (res.content, res.status_code, responseHeaders)

      def _pass_param(self, method, paramType, paramDict = None):
        """Returns paramDict extended by session and token values for paramType.

        A new dict is returned; neither the argument nor a shared default is
        mutated, so injected values cannot leak between calls.
        """
        merged = {} if paramDict is None else dict(paramDict)
        merged.update(self._filter_session(method, paramType))
        merged.update(self._filter_token(paramType))
        return merged

      def _filter_session(self, method, paramType):
        """Returns the session values configured for method and paramType.

        Rules are looked up under the lower-cased method name, falling back to
        'default'. Each rule maps a session key to the outgoing 'name' and the
        'param-type' it applies to.
        """
        passthrough = self._proxySpec.get('session_passthrough') or {}
        rules = passthrough.get(method.lower(), passthrough.get('default', {}))
        # Compare by value: identity ('is') only matches by accident of string
        # interning. A session key that is not set is forwarded as 'None'.
        return {
          rule['name']: str(session.get(sessionKey))
          for sessionKey, rule in rules.items()
          if rule['param-type'] == paramType}

      def _filter_token(self, paramType: str) -> dict:
        """Returns {token name: token value} if the token applies to paramType."""
        token = self._proxySpec.get('token')
        if token and token.get('param-type') == paramType:
          return {token['name']: token['value']}
        return {}

      def _get_first_dict(self, *possibleDicts) -> dict:
        """Returns the first argument that is a dict, or {} if there is none."""
        return next(
          (candidate for candidate in possibleDicts if isinstance(candidate, dict)),
          {})

      def _check_proxy_spec(self):
        """Validates the proxy spec; see __init__ for the exceptions raised.

        Optional sections ('token', 'session-passthrough') are only checked
        when they are present.
        """
        self._check_proxy_spec_exists()
        self._check_proxy_spec_type()
        for mandatoryKey in ('in', 'out'):
          SAMSProxy.__check_key_exists(
            mandatoryKey = mandatoryKey, testDict = self._proxySpec)
        token = self._proxySpec.get('token')
        for tokenMandatoryKey in ('name', 'value', 'param-type'):
          SAMSProxy.__check_key_exists(
            mandatoryKey = tokenMandatoryKey, parentKey = 'token',
            testDict = token)
        passthrough = self._proxySpec.get('session-passthrough')
        SAMSProxy.__check_key_exists(
          mandatoryKey = 'default', parentKey = 'session-passthrough',
          testDict = passthrough)
        for passthroughMandatoryKey in ('name', 'param-type'):
          SAMSProxy.__check_key_exists(
            mandatoryKey = passthroughMandatoryKey,
            parentKey = "session-passthrough['default']",
            testDict = (passthrough or {}).get('default'))

      def _check_proxy_spec_exists(self):
        """Raises ProxySpecMissing if the spec is falsy."""
        if not self._proxySpec:
          raise ProxySpecMissing()

      def _check_proxy_spec_type(self):
        """Raises TypeError if the spec is not a dict."""
        if not isinstance(self._proxySpec, dict):
          raise TypeError('proxySpec ist not an instance of dict')

      @staticmethod
      def _json_or_data(body, is_json):
        """Returns body wrapped under the keyword 'json' or 'data'."""
        if is_json:
          return {'json': body}
        return {'data': body}

      @staticmethod
      def __cleaned_dict(uncleanDict):
        """returns copy of uncleanDict that does not contain elements with value ''
        """
        return {
          key: value for key, value in dict(uncleanDict).items() if value != ''}

      @staticmethod
      def __check_key_exists(mandatoryKey, testDict, parentKey = '') -> None:
        """Raises ProxySpecKeyMssing if mandatoryKey is not in testDict.

        A testDict of None means the optional section is absent and is accepted.
        """
        if testDict is None:
          return
        try:
          testDict[mandatoryKey]
        except KeyError:
          raise ProxySpecKeyMssing(
            mandatoryKey + ' is missing in proxySpec ' + parentKey)