11import json
22
3- from cqlengine .connection import connection_manager
3+ from cqlengine .connection import connection_manager , execute
44from cqlengine .exceptions import CQLEngineException
55
66def create_keyspace (name , strategy_class = 'SimpleStrategy' , replication_factor = 3 , durable_writes = True , ** replication_values ):
@@ -15,11 +15,11 @@ def create_keyspace(name, strategy_class='SimpleStrategy', replication_factor=3,
1515 """
1616 with connection_manager () as con :
1717 #TODO: check system tables instead of using cql thrifteries
18- if not any ([name == k .name for k in con .con . client .describe_keyspaces ()]):
19- # if name not in [k.name for k in con.con.client.describe_keyspaces()]:
18+ if not any ([name == k .name for k in con .client .describe_keyspaces ()]):
19+ # if name not in [k.name for k in con.con.client.describe_keyspaces()]:
2020 try :
2121 #Try the 1.1 method
22- con . execute ("""CREATE KEYSPACE {}
22+ execute ("""CREATE KEYSPACE {}
2323 WITH strategy_class = '{}'
2424 AND strategy_options:replication_factor={};""" .format (name , strategy_class , replication_factor ))
2525 except CQLEngineException :
@@ -38,12 +38,12 @@ def create_keyspace(name, strategy_class='SimpleStrategy', replication_factor=3,
3838 if strategy_class != 'SimpleStrategy' :
3939 query += " AND DURABLE_WRITES = {}" .format ('true' if durable_writes else 'false' )
4040
41- con . execute (query )
41+ execute (query )
4242
4343def delete_keyspace (name ):
4444 with connection_manager () as con :
45- if name in [k .name for k in con .con . client .describe_keyspaces ()]:
46- con . execute ("DROP KEYSPACE {}" .format (name ))
45+ if name in [k .name for k in con .client .describe_keyspaces ()]:
46+ execute ("DROP KEYSPACE {}" .format (name ))
4747
4848def create_table (model , create_missing_keyspace = True ):
4949 #construct query string
@@ -55,78 +55,81 @@ def create_table(model, create_missing_keyspace=True):
5555 create_keyspace (model ._get_keyspace ())
5656
5757 with connection_manager () as con :
58- #check for an existing column family
59- #TODO: check system tables instead of using cql thrifteries
60- ks_info = con .con .client .describe_keyspace (model ._get_keyspace ())
61- if not any ([raw_cf_name == cf .name for cf in ks_info .cf_defs ]):
62- qs = ['CREATE TABLE {}' .format (cf_name )]
63-
64- #add column types
65- pkeys = []
66- ckeys = []
67- qtypes = []
68- def add_column (col ):
69- s = col .get_column_def ()
70- if col .primary_key :
71- keys = (pkeys if col .partition_key else ckeys )
72- keys .append ('"{}"' .format (col .db_field_name ))
73- qtypes .append (s )
74- for name , col in model ._columns .items ():
75- add_column (col )
76-
77- qtypes .append ('PRIMARY KEY (({}){})' .format (', ' .join (pkeys ), ckeys and ', ' + ', ' .join (ckeys ) or '' ))
78-
79- qs += ['({})' .format (', ' .join (qtypes ))]
80-
81- with_qs = ['read_repair_chance = {}' .format (model .read_repair_chance )]
82-
83- _order = ["%s %s" % (c .db_field_name , c .clustering_order or 'ASC' ) for c in model ._clustering_keys .values ()]
84- if _order :
85- with_qs .append ("clustering order by ({})" .format (', ' .join (_order )))
86-
87- # add read_repair_chance
88- qs += ['WITH {}' .format (' AND ' .join (with_qs ))]
58+ ks_info = con .client .describe_keyspace (model ._get_keyspace ())
59+
60+ #check for an existing column family
61+ #TODO: check system tables instead of using cql thrifteries
62+ if not any ([raw_cf_name == cf .name for cf in ks_info .cf_defs ]):
63+ qs = ['CREATE TABLE {}' .format (cf_name )]
64+
65+ #add column types
66+ pkeys = []
67+ ckeys = []
68+ qtypes = []
69+ def add_column (col ):
70+ s = col .get_column_def ()
71+ if col .primary_key :
72+ keys = (pkeys if col .partition_key else ckeys )
73+ keys .append ('"{}"' .format (col .db_field_name ))
74+ qtypes .append (s )
75+ for name , col in model ._columns .items ():
76+ add_column (col )
77+
78+ qtypes .append ('PRIMARY KEY (({}){})' .format (', ' .join (pkeys ), ckeys and ', ' + ', ' .join (ckeys ) or '' ))
79+
80+ qs += ['({})' .format (', ' .join (qtypes ))]
81+
82+ with_qs = ['read_repair_chance = {}' .format (model .read_repair_chance )]
83+
84+ _order = ["%s %s" % (c .db_field_name , c .clustering_order or 'ASC' ) for c in model ._clustering_keys .values ()]
85+ if _order :
86+ with_qs .append ("clustering order by ({})" .format (', ' .join (_order )))
87+
88+ # add read_repair_chance
89+ qs += ['WITH {}' .format (' AND ' .join (with_qs ))]
90+ qs = ' ' .join (qs )
91+
92+ try :
93+ execute (qs )
94+ except CQLEngineException as ex :
95+ # 1.2 doesn't return cf names, so we have to examine the exception
96+ # and ignore if it says the column family already exists
97+ if "Cannot add already existing column family" not in unicode (ex ):
98+ raise
99+
100+ #get existing index names, skip ones that already exist
101+ with connection_manager () as con :
102+ ks_info = con .client .describe_keyspace (model ._get_keyspace ())
103+
104+ cf_defs = [cf for cf in ks_info .cf_defs if cf .name == raw_cf_name ]
105+ idx_names = [i .index_name for i in cf_defs [0 ].column_metadata ] if cf_defs else []
106+ idx_names = filter (None , idx_names )
107+
108+ indexes = [c for n ,c in model ._columns .items () if c .index ]
109+ if indexes :
110+ for column in indexes :
111+ if column .db_index_name in idx_names : continue
112+ qs = ['CREATE INDEX index_{}_{}' .format (raw_cf_name , column .db_field_name )]
113+ qs += ['ON {}' .format (cf_name )]
114+ qs += ['("{}")' .format (column .db_field_name )]
89115 qs = ' ' .join (qs )
90116
91117 try :
92- con . execute (qs )
118+ execute (qs )
93119 except CQLEngineException as ex :
94120 # 1.2 doesn't return cf names, so we have to examine the exception
95- # and ignore if it says the column family already exists
96- if "Cannot add already existing column family " not in unicode (ex ):
121+ # and ignore if it says the index already exists
122+ if "Index already exists " not in unicode (ex ):
97123 raise
98124
99- #get existing index names, skip ones that already exist
100- ks_info = con .con .client .describe_keyspace (model ._get_keyspace ())
101- cf_defs = [cf for cf in ks_info .cf_defs if cf .name == raw_cf_name ]
102- idx_names = [i .index_name for i in cf_defs [0 ].column_metadata ] if cf_defs else []
103- idx_names = filter (None , idx_names )
104-
105- indexes = [c for n ,c in model ._columns .items () if c .index ]
106- if indexes :
107- for column in indexes :
108- if column .db_index_name in idx_names : continue
109- qs = ['CREATE INDEX index_{}_{}' .format (raw_cf_name , column .db_field_name )]
110- qs += ['ON {}' .format (cf_name )]
111- qs += ['("{}")' .format (column .db_field_name )]
112- qs = ' ' .join (qs )
113-
114- try :
115- con .execute (qs )
116- except CQLEngineException as ex :
117- # 1.2 doesn't return cf names, so we have to examine the exception
118- # and ignore if it says the index already exists
119- if "Index already exists" not in unicode (ex ):
120- raise
121-
122125
123126def delete_table (model ):
124127 cf_name = model .column_family_name ()
125- with connection_manager () as con :
126- try :
127- con . execute ('drop table {};' .format (cf_name ))
128- except CQLEngineException as ex :
129- #don't freak out if the table doesn't exist
130- if 'Cannot drop non existing column family' not in unicode (ex ):
131- raise
128+
129+ try :
130+ execute ('drop table {};' .format (cf_name ))
131+ except CQLEngineException as ex :
132+ #don't freak out if the table doesn't exist
133+ if 'Cannot drop non existing column family' not in unicode (ex ):
134+ raise
132135
0 commit comments