Skip to content

Commit 37259b9

Browse files
author
Jon Haddad
committed
Getting connection pooling working with mocking library
Verified throwing exception when no servers are available, but correctly recovering and hitting the next server when one is. fixing minor pooling tests ensure we get the right exception back when no servers are available working on tests for retry connections working despite failure Removed old connection_manager and replaced with a simple context manager that allows for easy access to clients within the main pool
1 parent e6a7934 commit 37259b9

7 files changed

Lines changed: 167 additions & 188 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ html/
3939
#Mr Developer
4040
.mr.developer.cfg
4141
.noseids
42+
/commitlog
43+
/data
44+

cqlengine/connection.py

Lines changed: 40 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
import cql
1010
import logging
1111

12+
from copy import copy
1213
from cqlengine.exceptions import CQLEngineException
1314

15+
from contextlib import contextmanager
16+
1417
from thrift.transport.TTransport import TTransportException
1518

1619
LOG = logging.getLogger('cqlengine.cql')
@@ -20,7 +23,9 @@ class CQLConnectionError(CQLEngineException): pass
2023
Host = namedtuple('Host', ['name', 'port'])
2124

2225
_max_connections = 10
23-
_connection_pool = None
26+
27+
# global connection pool
28+
connection_pool = None
2429

2530
def setup(hosts, username=None, password=None, max_connections=10, default_keyspace=None):
2631
"""
@@ -29,7 +34,7 @@ def setup(hosts, username=None, password=None, max_connections=10, default_keysp
2934
:param hosts: list of hosts, strings in the <hostname>:<port>, or just <hostname>
3035
"""
3136
global _max_connections
32-
global _connection_pool
37+
global connection_pool
3338
_max_connections = max_connections
3439

3540
if default_keyspace:
@@ -50,16 +55,13 @@ def setup(hosts, username=None, password=None, max_connections=10, default_keysp
5055
if not _hosts:
5156
raise CQLConnectionError("At least one host required")
5257

53-
_connection_pool = ConnectionPool(_hosts)
58+
connection_pool = ConnectionPool(_hosts, username, password)
5459

5560

5661
class ConnectionPool(object):
5762
"""Handles pooling of database connections."""
5863

59-
# Connection pool queue
60-
_queue = None
61-
62-
def __init__(self, hosts, username, password):
64+
def __init__(self, hosts, username=None, password=None):
6365
self._hosts = hosts
6466
self._username = username
6567
self._password = password
@@ -113,58 +115,40 @@ def _create_connection(self):
113115
if not self._hosts:
114116
raise CQLConnectionError("At least one host required")
115117

116-
host = _hosts[_host_idx]
117-
118-
new_conn = cql.connect(host.name, host.port, user=_username, password=_password)
119-
new_conn.set_cql_version('3.0.0')
120-
return new_conn
121-
122-
123-
class connection_manager(object):
124-
"""
125-
Connection failure tolerant connection manager. Written to be used in a 'with' block for connection pooling
126-
"""
127-
def __init__(self):
128-
if not _hosts:
129-
raise CQLConnectionError("No connections have been configured, call cqlengine.connection.setup")
130-
self.keyspace = None
131-
self.con = ConnectionPool.get()
132-
self.cur = None
118+
hosts = copy(self._hosts)
119+
random.shuffle(hosts)
133120

134-
def close(self):
135-
if self.cur: self.cur.close()
136-
ConnectionPool.put(self.con)
137-
138-
def __enter__(self):
139-
return self
121+
for host in hosts:
122+
try:
123+
new_conn = cql.connect(host.name, host.port, user=self._username, password=self._password)
124+
new_conn.set_cql_version('3.0.0')
125+
return new_conn
126+
except Exception as e:
127+
logging.debug("Could not establish connection to {}:{}".format(host.name, host.port))
128+
pass
140129

141-
def __exit__(self, type, value, traceback):
142-
self.close()
130+
raise CQLConnectionError("Could not connect to any server in cluster")
143131

144-
def execute(self, query, params={}):
145-
"""
146-
Gets a connection from the pool and executes the given query, returns the cursor
132+
def execute(self, query, params):
133+
try:
134+
con = self.get()
135+
cur = con.cursor()
136+
cur.execute(query, params)
137+
self.put(con)
138+
return cur
139+
except cql.ProgrammingError as ex:
140+
raise CQLEngineException(unicode(ex))
141+
except TTransportException:
142+
pass
147143

148-
if there's a connection problem, this will silently create a new connection pool
149-
from the available hosts, and remove the problematic host from the host list
150-
"""
151-
global _host_idx
144+
raise CQLEngineException("Could not execute query against the cluster")
152145

153-
for i in range(len(_hosts)):
154-
try:
155-
LOG.debug('{} {}'.format(query, repr(params)))
156-
self.cur = self.con.cursor()
157-
self.cur.execute(query, params)
158-
return self.cur
159-
except cql.ProgrammingError as ex:
160-
raise CQLEngineException(unicode(ex))
161-
except TTransportException:
162-
#TODO: check for other errors raised in the event of a connection / server problem
163-
#move to the next connection and set the connection pool
164-
_host_idx += 1
165-
_host_idx %= len(_hosts)
166-
self.con.close()
167-
self.con = ConnectionPool._create_connection()
168-
169-
raise CQLConnectionError("couldn't reach a Cassandra server")
146+
def execute(query, params={}):
147+
return connection_pool.execute(query, params)
170148

149+
@contextmanager
150+
def connection_manager():
151+
global connection_pool
152+
tmp = connection_pool.get()
153+
yield tmp
154+
connection_pool.put(tmp)

cqlengine/management.py

Lines changed: 74 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
22

3-
from cqlengine.connection import connection_manager
3+
from cqlengine.connection import connection_manager, execute
44
from cqlengine.exceptions import CQLEngineException
55

66
def create_keyspace(name, strategy_class='SimpleStrategy', replication_factor=3, durable_writes=True, **replication_values):
@@ -15,11 +15,11 @@ def create_keyspace(name, strategy_class='SimpleStrategy', replication_factor=3,
1515
"""
1616
with connection_manager() as con:
1717
#TODO: check system tables instead of using cql thrifteries
18-
if not any([name == k.name for k in con.con.client.describe_keyspaces()]):
19-
# if name not in [k.name for k in con.con.client.describe_keyspaces()]:
18+
if not any([name == k.name for k in con.client.describe_keyspaces()]):
19+
# if name not in [k.name for k in con.con.client.describe_keyspaces()]:
2020
try:
2121
#Try the 1.1 method
22-
con.execute("""CREATE KEYSPACE {}
22+
execute("""CREATE KEYSPACE {}
2323
WITH strategy_class = '{}'
2424
AND strategy_options:replication_factor={};""".format(name, strategy_class, replication_factor))
2525
except CQLEngineException:
@@ -38,12 +38,12 @@ def create_keyspace(name, strategy_class='SimpleStrategy', replication_factor=3,
3838
if strategy_class != 'SimpleStrategy':
3939
query += " AND DURABLE_WRITES = {}".format('true' if durable_writes else 'false')
4040

41-
con.execute(query)
41+
execute(query)
4242

4343
def delete_keyspace(name):
4444
with connection_manager() as con:
45-
if name in [k.name for k in con.con.client.describe_keyspaces()]:
46-
con.execute("DROP KEYSPACE {}".format(name))
45+
if name in [k.name for k in con.client.describe_keyspaces()]:
46+
execute("DROP KEYSPACE {}".format(name))
4747

4848
def create_table(model, create_missing_keyspace=True):
4949
#construct query string
@@ -55,78 +55,81 @@ def create_table(model, create_missing_keyspace=True):
5555
create_keyspace(model._get_keyspace())
5656

5757
with connection_manager() as con:
58-
#check for an existing column family
59-
#TODO: check system tables instead of using cql thrifteries
60-
ks_info = con.con.client.describe_keyspace(model._get_keyspace())
61-
if not any([raw_cf_name == cf.name for cf in ks_info.cf_defs]):
62-
qs = ['CREATE TABLE {}'.format(cf_name)]
63-
64-
#add column types
65-
pkeys = []
66-
ckeys = []
67-
qtypes = []
68-
def add_column(col):
69-
s = col.get_column_def()
70-
if col.primary_key:
71-
keys = (pkeys if col.partition_key else ckeys)
72-
keys.append('"{}"'.format(col.db_field_name))
73-
qtypes.append(s)
74-
for name, col in model._columns.items():
75-
add_column(col)
76-
77-
qtypes.append('PRIMARY KEY (({}){})'.format(', '.join(pkeys), ckeys and ', ' + ', '.join(ckeys) or ''))
78-
79-
qs += ['({})'.format(', '.join(qtypes))]
80-
81-
with_qs = ['read_repair_chance = {}'.format(model.read_repair_chance)]
82-
83-
_order = ["%s %s" % (c.db_field_name, c.clustering_order or 'ASC') for c in model._clustering_keys.values()]
84-
if _order:
85-
with_qs.append("clustering order by ({})".format(', '.join(_order)))
86-
87-
# add read_repair_chance
88-
qs += ['WITH {}'.format(' AND '.join(with_qs))]
58+
ks_info = con.client.describe_keyspace(model._get_keyspace())
59+
60+
#check for an existing column family
61+
#TODO: check system tables instead of using cql thrifteries
62+
if not any([raw_cf_name == cf.name for cf in ks_info.cf_defs]):
63+
qs = ['CREATE TABLE {}'.format(cf_name)]
64+
65+
#add column types
66+
pkeys = []
67+
ckeys = []
68+
qtypes = []
69+
def add_column(col):
70+
s = col.get_column_def()
71+
if col.primary_key:
72+
keys = (pkeys if col.partition_key else ckeys)
73+
keys.append('"{}"'.format(col.db_field_name))
74+
qtypes.append(s)
75+
for name, col in model._columns.items():
76+
add_column(col)
77+
78+
qtypes.append('PRIMARY KEY (({}){})'.format(', '.join(pkeys), ckeys and ', ' + ', '.join(ckeys) or ''))
79+
80+
qs += ['({})'.format(', '.join(qtypes))]
81+
82+
with_qs = ['read_repair_chance = {}'.format(model.read_repair_chance)]
83+
84+
_order = ["%s %s" % (c.db_field_name, c.clustering_order or 'ASC') for c in model._clustering_keys.values()]
85+
if _order:
86+
with_qs.append("clustering order by ({})".format(', '.join(_order)))
87+
88+
# add read_repair_chance
89+
qs += ['WITH {}'.format(' AND '.join(with_qs))]
90+
qs = ' '.join(qs)
91+
92+
try:
93+
execute(qs)
94+
except CQLEngineException as ex:
95+
# 1.2 doesn't return cf names, so we have to examine the exception
96+
# and ignore if it says the column family already exists
97+
if "Cannot add already existing column family" not in unicode(ex):
98+
raise
99+
100+
#get existing index names, skip ones that already exist
101+
with connection_manager() as con:
102+
ks_info = con.client.describe_keyspace(model._get_keyspace())
103+
104+
cf_defs = [cf for cf in ks_info.cf_defs if cf.name == raw_cf_name]
105+
idx_names = [i.index_name for i in cf_defs[0].column_metadata] if cf_defs else []
106+
idx_names = filter(None, idx_names)
107+
108+
indexes = [c for n,c in model._columns.items() if c.index]
109+
if indexes:
110+
for column in indexes:
111+
if column.db_index_name in idx_names: continue
112+
qs = ['CREATE INDEX index_{}_{}'.format(raw_cf_name, column.db_field_name)]
113+
qs += ['ON {}'.format(cf_name)]
114+
qs += ['("{}")'.format(column.db_field_name)]
89115
qs = ' '.join(qs)
90116

91117
try:
92-
con.execute(qs)
118+
execute(qs)
93119
except CQLEngineException as ex:
94120
# 1.2 doesn't return cf names, so we have to examine the exception
95-
# and ignore if it says the column family already exists
96-
if "Cannot add already existing column family" not in unicode(ex):
121+
# and ignore if it says the index already exists
122+
if "Index already exists" not in unicode(ex):
97123
raise
98124

99-
#get existing index names, skip ones that already exist
100-
ks_info = con.con.client.describe_keyspace(model._get_keyspace())
101-
cf_defs = [cf for cf in ks_info.cf_defs if cf.name == raw_cf_name]
102-
idx_names = [i.index_name for i in cf_defs[0].column_metadata] if cf_defs else []
103-
idx_names = filter(None, idx_names)
104-
105-
indexes = [c for n,c in model._columns.items() if c.index]
106-
if indexes:
107-
for column in indexes:
108-
if column.db_index_name in idx_names: continue
109-
qs = ['CREATE INDEX index_{}_{}'.format(raw_cf_name, column.db_field_name)]
110-
qs += ['ON {}'.format(cf_name)]
111-
qs += ['("{}")'.format(column.db_field_name)]
112-
qs = ' '.join(qs)
113-
114-
try:
115-
con.execute(qs)
116-
except CQLEngineException as ex:
117-
# 1.2 doesn't return cf names, so we have to examine the exception
118-
# and ignore if it says the index already exists
119-
if "Index already exists" not in unicode(ex):
120-
raise
121-
122125

123126
def delete_table(model):
124127
cf_name = model.column_family_name()
125-
with connection_manager() as con:
126-
try:
127-
con.execute('drop table {};'.format(cf_name))
128-
except CQLEngineException as ex:
129-
#don't freak out if the table doesn't exist
130-
if 'Cannot drop non existing column family' not in unicode(ex):
131-
raise
128+
129+
try:
130+
execute('drop table {};'.format(cf_name))
131+
except CQLEngineException as ex:
132+
#don't freak out if the table doesn't exist
133+
if 'Cannot drop non existing column family' not in unicode(ex):
134+
raise
132135

0 commit comments

Comments
 (0)