forked from algorithmiaio/algorithmia-python
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathalgo_response.py
More file actions
55 lines (47 loc) · 1.79 KB
/
Copy pathalgo_response.py
File metadata and controls
55 lines (47 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import base64
from Algorithmia.errors import AlgorithmException
class AlgoResponse(object):
    """Outcome of an algorithm call: a result payload plus its metadata."""

    def __init__(self, result, metadata):
        self.result = result
        self.metadata = metadata

    def __repr__(self):
        fields = (self.result, self.metadata)
        return 'AlgoResponse(result=%s,metadata=%s)' % fields

    @staticmethod
    def create_algo_response(responseJson):
        """Build an AlgoResponse from a parsed API response mapping.

        If the response has an ``'error'`` entry, the exception built by
        ``parse_exception`` from that entry is raised. Otherwise the
        ``'metadata'`` entry is wrapped in ``Metadata`` and the result is
        chosen by its ``'content_type'``:

        * ``'binary'`` -- ``'result'`` is Base64-decoded,
        * ``'void'``   -- the result is ``None``,
        * anything else -- ``'result'`` is passed through unchanged.
        """
        # Failure: the response carries an error instead of a result.
        if 'error' in responseJson:
            raise parse_exception(responseJson['error'])

        # Success: wrap the metadata, then pick the payload by content type.
        raw_metadata = responseJson['metadata']
        metadata = Metadata(raw_metadata)
        content_type = raw_metadata['content_type']

        if content_type == 'binary':
            # Binary results arrive Base64 encoded.
            payload = base64.b64decode(responseJson['result'])
        elif content_type == 'void':
            payload = None
        else:
            payload = responseJson['result']
        return AlgoResponse(payload, metadata)
def parse_exception(error):
    """Build an AlgorithmException from an API error mapping.

    ``error`` must contain ``'message'`` (a missing key raises ``KeyError``).
    ``'code'`` and ``'stacktrace'`` are optional and default to ``None``.

    The exception is returned rather than raised, so the caller decides
    when to raise it.
    """
    exc = AlgorithmException(message=error['message'], code=error.get('code'))
    # The stack trace is attached as an attribute after construction
    # instead of being passed to the constructor.
    exc.stacktrace = error.get('stacktrace')
    return exc
class Metadata(object):
    """Metadata that accompanies an algorithm result.

    Built from a mapping: ``'content_type'`` and ``'duration'`` are required
    keys (a missing one raises ``KeyError``), while ``'stdout'`` is optional
    and becomes ``None`` when absent. The mapping itself is kept, unmodified,
    as ``full_metadata``.
    """

    def __init__(self, metadata):
        self.content_type = metadata['content_type']
        self.duration = metadata['duration']
        # 'stdout' is optional; .get() yields None when the key is absent.
        self.stdout = metadata.get('stdout')
        self.full_metadata = metadata

    def __repr__(self):
        return "Metadata(content_type='%s',duration=%s,stdout=%s)" % (
            self.content_type, self.duration, self.stdout)