-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathauthorization.py
More file actions
132 lines (109 loc) · 4.53 KB
/
Copy pathauthorization.py
File metadata and controls
132 lines (109 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import os
import json
from werkzeug.wrappers import Response
from . import utils
__author__ = "asaka <lan@leancloud.rocks>"

# Application credentials, read from the process environment once, at import
# time. Each name is None when its environment variable is not set.
APP_ID = os.getenv("LEANCLOUD_APP_ID")
APP_KEY = os.getenv("LEANCLOUD_APP_KEY")
ANDX_KEY = os.getenv("LEANCLOUD_APP_ANDX_KEY")
MASTER_KEY = os.getenv("LEANCLOUD_APP_MASTER_KEY")
HOOK_KEY = os.getenv("LEANCLOUD_APP_HOOK_KEY")

# Test hook: while _ENABLE_TEST is true, AuthorizationMiddleware stores the WSGI
# environ of the request it is handling in current_environ.
_ENABLE_TEST = False
current_environ = None
class AuthorizationMiddleware(object):
    """WSGI middleware that only forwards requests carrying valid credentials.

    Credentials are read from the request headers and, when the headers carry
    nothing at all, from a JSON body sent as ``text/plain``. The parsed values
    are stored in ``environ["_app_params"]``. Requests whose application id and
    key (or master key) do not match the configured ones get a 401 JSON
    response instead of reaching the wrapped application.
    """

    def __init__(self, app):
        """Wrap *app*, the WSGI application called for authorized requests."""
        self.app = app

    def __call__(self, environ, start_response):
        """Authorize one WSGI request and dispatch it.

        Returns the wrapped application's response when the request is
        authorized, otherwise a 401 ``application/json`` response.
        """
        global current_environ
        if _ENABLE_TEST:
            current_environ = environ

        self.parse_header(environ)
        app_params = environ["_app_params"]
        if not any(app_params.values()):
            # Every header-derived value is empty: fall back to the body.
            # parse_body() mutates this same dict in place.
            self.parse_body(environ)

        if self._is_authorized(app_params):
            return self.app(environ, start_response)

        unauth_response = Response(
            json.dumps({"code": 401, "error": "Unauthorized."}),
            status=401,
            mimetype="application/json",
        )
        return unauth_response(environ, start_response)

    @staticmethod
    def _is_authorized(app_params):
        """Return True when *app_params* match the configured credentials."""
        app_id = app_params["id"]
        if app_id is None or app_id != APP_ID:
            return False

        # A missing or empty key must never match: comparing None against an
        # unset (None) configured key would otherwise authorize a request that
        # supplies nothing but the application id.
        supplied_key = app_params["key"]
        configured_keys = [key for key in (MASTER_KEY, APP_KEY, ANDX_KEY) if key]
        if supplied_key and supplied_key in configured_keys:
            return True

        supplied_master_key = app_params["master_key"]
        return bool(supplied_master_key) and supplied_master_key == MASTER_KEY

    @staticmethod
    def _first_header(headers, *names):
        """Return the first non-empty header among *names*.

        When none is non-empty, the value looked up last is returned (None if
        that header is absent), matching an ``a or b or c`` chain.
        """
        value = None
        for name in names:
            value = headers.get(name)
            if value:
                return value
        return value

    @staticmethod
    def _sign_matches(sign, timestamp, key):
        """Return True when *sign* is the signature of *timestamp* under *key*.

        An unset (None) key never matches.
        """
        if key is None:
            return False
        return sign == utils.sign_by_key(timestamp, key)

    @classmethod
    def parse_header(cls, environ):
        """Extract credentials from the request headers.

        Reads the request object from ``environ["leanengine.request"]`` and
        stores a dict with the keys ``id``, ``key``, ``master_key``,
        ``session_token`` and ``hook_key`` in ``environ["_app_params"]``.
        Values that were not supplied (or could not be verified) stay None.
        """
        headers = environ["leanengine.request"].headers

        app_id = cls._first_header(
            headers,
            "x-avoscloud-application-id",
            "x-uluru-application-id",
            "x-lc-id",
        )
        app_key = cls._first_header(
            headers,
            "x-avoscloud-application-key",
            "x-uluru-application-key",
            "x-lc-key",
        )
        session_token = cls._first_header(
            headers,
            "x-uluru-session-token",
            "x-avoscloud-session-token",
            "x-lc-session",
        )
        master_key = cls._first_header(
            headers, "x-uluru-master-key", "x-avoscloud-master-key"
        )
        hook_key = headers.get("x-lc-hook-key")

        if app_key and ",master" in app_key:
            # "<key>,master" in the key header carries the master key. Split
            # only once so extra commas cannot raise on unpacking.
            master_key = app_key.split(",", 1)[0]
            app_key = None

        if app_key is None:
            request_sign = cls._first_header(
                headers, "x-avoscloud-request-sign", "x-lc-sign"
            )
            # Expected form: "<signature>,<timestamp>[,<tag>]".
            parts = request_sign.split(",") if request_sign else []
            # Without both a signature and a timestamp there is nothing to
            # verify; leave the keys unset so the request is rejected later.
            if len(parts) >= 2:
                sign = parts[0].lower()
                timestamp = parts[1]
                if len(parts) == 3:
                    # The third field selects which key signed the request.
                    tag = parts[2]
                    if tag == "master" and cls._sign_matches(
                        sign, timestamp, MASTER_KEY
                    ):
                        master_key = MASTER_KEY
                    elif tag == "ax-sig-1" and cls._sign_matches(
                        sign, timestamp, ANDX_KEY
                    ):
                        app_key = ANDX_KEY
                elif cls._sign_matches(sign, timestamp, APP_KEY):
                    app_key = APP_KEY

        environ["_app_params"] = {
            "id": app_id,
            "key": app_key,
            "master_key": master_key,
            "session_token": session_token,
            "hook_key": hook_key,
        }

    @classmethod
    def parse_body(cls, environ):
        """Extract credentials from a JSON body sent as ``text/plain``.

        Updates ``environ["_app_params"]`` in place from the body fields
        ``_ApplicationId``, ``_ApplicationKey``, ``_MasterKey`` and
        ``_SessionToken``. Does nothing when the content type is missing or
        not ``text/plain``, or when the body is not a JSON object.
        """
        request = environ["leanengine.request"]
        content_type = request.content_type
        if not content_type or "text/plain" not in content_type:
            return

        try:
            body = json.loads(request.get_data(as_text=True))
        except ValueError:
            # Malformed JSON or undecodable text: treat it as "no credentials"
            # so the caller answers 401 rather than crashing.
            return
        if not isinstance(body, dict):
            return

        app_params = environ["_app_params"]
        app_params["id"] = body.get("_ApplicationId")
        app_params["key"] = body.get("_ApplicationKey")
        app_params["master_key"] = body.get("_MasterKey")
        app_params["session_token"] = body.get("_SessionToken")