forked from artyom-beilis/cppcms
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdisco_test.py
More file actions
executable file
·80 lines (73 loc) · 2.54 KB
/
Copy pathdisco_test.py
File metadata and controls
executable file
·80 lines (73 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
# coding=utf-8
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
import sys
import socket
import time
import os.path
import traceback
import random
import toscgi
import tofcgi
def load_file(file_name):
    """Return the contents of a fixture file located next to this script.

    file_name -- name of the file, relative to the directory part of
                 sys.argv[0].

    The file is read in binary mode and returned in one piece.
    Errors from open()/read() propagate to the caller.
    """
    # os.path.join handles an empty dirname (script started as
    # "python disco_test.py"); gluing the parts with "/" would then yield
    # "/<file_name>", i.e. a path at the filesystem root.
    path = os.path.join(os.path.dirname(sys.argv[0]), file_name)
    # The context manager closes the file even if read() raises.
    with open(path, 'rb') as f:
        return f.read()
def test_io(input,socket_type,target):
try:
s=socket.socket(socket_type,socket.SOCK_STREAM);
try:
s.connect(target)
if socket_type==socket.AF_INET:
s.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY,1)
s.sendall(input)
for x in xrange(0,100):
chunk = s.recv(1024)
if chunk == '':
break
except socket.error:
pass
s.close();
except socket.error:
pass
def usege():
    """Print the command-line synopsis with the accepted test names."""
    # A parenthesised single-argument print emits the same text under
    # Python 2 and Python 3; the bare print statement is a SyntaxError on 3.
    print('./disco_test.py (http|fastcgi_tcp|scgi_tcp|fastcgi_unix|scgi_unix)')
# Command-line driver: the first argument names the protocol/transport to
# exercise; each canned request file is then sent through test_io().
if len(sys.argv) < 2:
    # No test name given: show the synopsis instead of failing with an
    # IndexError traceback. The exit status stays non-zero.
    usege()
    sys.exit(1)

test = sys.argv[1]

if test in ('http', 'fastcgi_tcp', 'scgi_tcp'):
    target = ('localhost', 8080)
    socket_type = socket.AF_INET
else:
    # An AF_UNIX address is a plain path string, not a (host, port) tuple.
    target = '/tmp/cppcms_test_socket'
    socket_type = socket.AF_UNIX

if test == 'http':
    # Request files for HTTP are sent exactly as stored on disk.
    for fixture in ('disco_test_norm.in',
                    'disco_test_gzip.in',
                    'disco_test_async_multiple.in',
                    'disco_test_async_single.in'):
        test_io(load_file(fixture), socket_type, target)
elif test in ('fastcgi_tcp', 'fastcgi_unix'):
    # The *_cgi.in files are passed through tofcgi.to_fcgi_request() first.
    for fixture in ('disco_test_norm_cgi.in',
                    'disco_test_gzip_cgi.in',
                    'disco_test_async_cgi_multiple.in',
                    'disco_test_async_cgi_single.in'):
        test_io(tofcgi.to_fcgi_request(load_file(fixture)), socket_type, target)
elif test in ('scgi_tcp', 'scgi_unix'):
    # Same request files, passed through toscgi.toscgi() instead.
    for fixture in ('disco_test_norm_cgi.in',
                    'disco_test_gzip_cgi.in',
                    'disco_test_async_cgi_multiple.in',
                    'disco_test_async_cgi_single.in'):
        test_io(toscgi.toscgi(load_file(fixture)), socket_type, target)
else:
    usege()

# NOTE(review): presumably gives the server a moment to process the dropped
# connections before the script exits -- confirm against the test harness.
time.sleep(0.5)