Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "lz4.h"
21 : #include "pk.h"
22 : #include "sql.h"
23 : #include "utils.h"
24 : #include "dbutils.h"
25 : #include "block.h"
26 :
27 : #ifdef _WIN32
28 : #include <winsock2.h>
29 : #include <ws2tcpip.h>
30 : #else
31 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
32 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
33 : #endif
34 :
35 : #ifndef htonll
36 : #if __BIG_ENDIAN__
37 : #define htonll(x) (x)
38 : #define ntohll(x) (x)
39 : #else
40 : #ifndef htobe64
41 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
42 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
43 : #else
44 : #define htonll(x) htobe64(x)
45 : #define ntohll(x) be64toh(x)
46 : #endif
47 : #endif
48 : #endif
49 :
50 : #define CLOUDSYNC_INIT_NTABLES 64
51 : #define CLOUDSYNC_MIN_DB_VERSION 0
52 :
53 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
54 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
55 : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
56 : #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
57 : #define CLOUDSYNC_PAYLOAD_VERSION_2 2
58 : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
59 : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
60 :
61 : #ifndef MAX
62 : #define MAX(a, b) (((a)>(b))?(a):(b))
63 : #endif
64 :
65 : #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
66 :
67 : typedef enum {
68 : CLOUDSYNC_PK_INDEX_TBL = 0,
69 : CLOUDSYNC_PK_INDEX_PK = 1,
70 : CLOUDSYNC_PK_INDEX_COLNAME = 2,
71 : CLOUDSYNC_PK_INDEX_COLVALUE = 3,
72 : CLOUDSYNC_PK_INDEX_COLVERSION = 4,
73 : CLOUDSYNC_PK_INDEX_DBVERSION = 5,
74 : CLOUDSYNC_PK_INDEX_SITEID = 6,
75 : CLOUDSYNC_PK_INDEX_CL = 7,
76 : CLOUDSYNC_PK_INDEX_SEQ = 8
77 : } CLOUDSYNC_PK_INDEX;
78 :
79 : typedef enum {
80 : DBVM_VALUE_ERROR = -1,
81 : DBVM_VALUE_UNCHANGED = 0,
82 : DBVM_VALUE_CHANGED = 1,
83 : } DBVM_VALUE;
84 :
85 : #define SYNCBIT_SET(_data) _data->insync = 1
86 : #define SYNCBIT_RESET(_data) _data->insync = 0
87 :
88 : // MARK: - Deferred column-batch merge -
89 :
90 : typedef struct {
91 : const char *col_name; // pointer into table_context->col_name[idx] (stable)
92 : dbvalue_t *col_value; // duplicated via database_value_dup (owned)
93 : int64_t col_version;
94 : int64_t db_version;
95 : uint8_t site_id[UUID_LEN];
96 : int site_id_len;
97 : int64_t seq;
98 : } merge_pending_entry;
99 :
100 : typedef struct {
101 : cloudsync_table_context *table;
102 : char *pk; // malloc'd copy, freed on flush
103 : int pk_len;
104 : int64_t cl;
105 : bool sentinel_pending;
106 : bool row_exists; // true when the PK already exists locally
107 : int count;
108 : int capacity;
109 : merge_pending_entry *entries;
110 :
111 : // Statement cache — reuse the prepared statement when the column
112 : // combination and row_exists flag match between consecutive PK flushes.
113 : dbvm_t *cached_vm;
114 : bool cached_row_exists;
115 : int cached_col_count;
116 : const char **cached_col_names; // array of pointers into table_context (not owned)
117 : } merge_pending_batch;
118 :
119 : // MARK: -
120 :
121 : struct cloudsync_pk_decode_bind_context {
122 : dbvm_t *vm;
123 : char *tbl;
124 : int64_t tbl_len;
125 : const void *pk;
126 : int64_t pk_len;
127 : char *col_name;
128 : int64_t col_name_len;
129 : int64_t col_version;
130 : int64_t db_version;
131 : const void *site_id;
132 : int64_t site_id_len;
133 : int64_t cl;
134 : int64_t seq;
135 : };
136 :
137 : struct cloudsync_context {
138 : void *db;
139 : char errmsg[1024];
140 : int errcode;
141 :
142 : char *libversion;
143 : uint8_t site_id[UUID_LEN];
144 : int insync;
145 : int debug;
146 : bool merge_equal_values;
147 : void *aux_data;
148 :
149 : // stmts and context values
150 : dbvm_t *schema_version_stmt;
151 : dbvm_t *data_version_stmt;
152 : dbvm_t *db_version_stmt;
153 : dbvm_t *getset_siteid_stmt;
154 : int data_version;
155 : int schema_version;
156 : uint64_t schema_hash;
157 :
158 : // set at transaction start and reset on commit/rollback
159 : int64_t db_version;
160 : // version the DB would have if the transaction committed now
161 : int64_t pending_db_version;
162 : // used to set an order inside each transaction
163 : int seq;
164 :
165 : // optional schema_name to be set in the cloudsync_table_context
166 : char *current_schema;
167 :
168 : // augmented tables are stored in-memory so we do not need to retrieve information about
169 : // col_names and cid from the disk each time a write statement is performed
170 : // we do also not need to use an hash map here because for few tables the direct
171 : // in-memory comparison with table name is faster
172 : cloudsync_table_context **tables; // dense vector: [0..tables_count-1] are valid
173 : int tables_count; // size
174 : int tables_cap; // capacity
175 :
176 : int skip_decode_idx; // -1 in sqlite, col_value index in postgresql
177 :
178 : // deferred column-batch merge (active during payload_apply)
179 : merge_pending_batch *pending_batch;
180 : };
181 :
182 : struct cloudsync_table_context {
183 : table_algo algo; // CRDT algoritm associated to the table
184 : char *name; // table name
185 : char *schema; // table schema
186 : char *meta_ref; // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
187 : char *base_ref; // schema-qualified base table name (e.g. "schema"."name")
188 : char **col_name; // array of column names
189 : dbvm_t **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
190 : dbvm_t **col_value_stmt; // array of column value stmt (indexed by col_name)
191 : int *col_id; // array of column id
192 : col_algo_t *col_algo; // per-column algorithm (normal or block)
193 : char **col_delimiter; // per-column delimiter for block splitting (NULL = default "\n")
194 : bool has_block_cols; // quick check: does this table have any block columns?
195 : dbvm_t *block_value_read_stmt; // SELECT col_value FROM blocks table
196 : dbvm_t *block_value_write_stmt; // INSERT OR REPLACE into blocks table
197 : dbvm_t *block_value_delete_stmt; // DELETE from blocks table
198 : dbvm_t *block_list_stmt; // SELECT block entries for materialization
199 : char *blocks_ref; // schema-qualified blocks table name
200 : int ncols; // number of non primary key cols
201 : int npks; // number of primary key cols
202 : bool enabled; // flag to check if a table is enabled or disabled
203 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
204 : bool rowid_only; // a table with no primary keys other than the implicit rowid
205 : #endif
206 :
207 : char **pk_name; // array of primary key names
208 :
209 : // precompiled statements
210 : dbvm_t *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
211 : dbvm_t *meta_sentinel_update_stmt; // update a local sentinel row
212 : dbvm_t *meta_sentinel_insert_stmt; // insert a local sentinel row
213 : dbvm_t *meta_row_insert_update_stmt; // insert/update a local row
214 : dbvm_t *meta_row_drop_stmt; // delete rows from meta
215 : dbvm_t *meta_update_move_stmt; // update rows in meta when pk changes
216 : dbvm_t *meta_local_cl_stmt; // compute local cl value
217 : dbvm_t *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
218 : dbvm_t *meta_merge_delete_drop;
219 : dbvm_t *meta_zero_clock_stmt;
220 : dbvm_t *meta_col_version_stmt;
221 : dbvm_t *meta_site_id_stmt;
222 :
223 : dbvm_t *real_col_values_stmt; // retrieve all column values based on pk
224 : dbvm_t *real_merge_delete_stmt;
225 : dbvm_t *real_merge_sentinel_stmt;
226 :
227 : bool is_altering; // flag to track if a table alteration is in progress
228 :
229 : // context
230 : cloudsync_context *context;
231 : };
232 :
233 : struct cloudsync_payload_context {
234 : char *buffer;
235 : size_t bsize;
236 : size_t balloc;
237 : size_t bused;
238 : uint64_t nrows;
239 : uint16_t ncols;
240 : };
241 :
242 : #ifdef _MSC_VER
243 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
244 : #define PACKED
245 : #else
246 : #define PACKED __attribute__((__packed__))
247 : #endif
248 :
249 : typedef struct PACKED {
250 : uint32_t signature; // 'CLSY'
251 : uint8_t version; // protocol version
252 : uint8_t libversion[3]; // major.minor.patch
253 : uint32_t expanded_size;
254 : uint16_t ncols;
255 : uint32_t nrows;
256 : uint64_t schema_hash;
257 : uint8_t checksum[6]; // 48 bits checksum (to ensure struct is 32 bytes)
258 : } cloudsync_payload_header;
259 :
260 : #ifdef _MSC_VER
261 : #pragma pack(pop)
262 : #endif
263 :
264 : #if CLOUDSYNC_UNITTEST
265 : bool force_uncompressed_blob = false;
266 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
267 : #else
268 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
269 : #endif
270 :
271 : // Internal prototypes
272 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
273 :
274 : // MARK: - CRDT algos -
275 :
276 622 : table_algo cloudsync_algo_from_name (const char *algo_name) {
277 622 : if (algo_name == NULL) return table_algo_none;
278 :
279 622 : if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
280 266 : if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
281 259 : if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
282 258 : if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
283 :
284 : // if nothing is found
285 257 : return table_algo_none;
286 622 : }
287 :
288 110 : const char *cloudsync_algo_name (table_algo algo) {
289 110 : switch (algo) {
290 105 : case table_algo_crdt_cls: return "cls";
291 1 : case table_algo_crdt_gos: return "gos";
292 1 : case table_algo_crdt_dws: return "dws";
293 1 : case table_algo_crdt_aws: return "aws";
294 1 : case table_algo_none: return NULL;
295 : }
296 1 : return NULL;
297 110 : }
298 :
299 : // MARK: - DBVM Utils -
300 :
301 32049 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
302 32049 : if (!stmt) return DBVM_VALUE_ERROR;
303 :
304 32047 : int rc = databasevm_step(stmt);
305 32047 : if (rc != DBRES_ROW && rc != DBRES_DONE) {
306 2 : if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
307 2 : databasevm_reset(stmt);
308 2 : return DBVM_VALUE_ERROR;
309 : }
310 :
311 32045 : DBVM_VALUE result = DBVM_VALUE_CHANGED;
312 32045 : if (stmt == data->data_version_stmt) {
313 29639 : int version = (int)database_column_int(stmt, 0);
314 29639 : if (version != data->data_version) {
315 225 : data->data_version = version;
316 225 : } else {
317 29414 : result = DBVM_VALUE_UNCHANGED;
318 : }
319 32045 : } else if (stmt == data->schema_version_stmt) {
320 1203 : int version = (int)database_column_int(stmt, 0);
321 1203 : if (version > data->schema_version) {
322 380 : data->schema_version = version;
323 380 : } else {
324 823 : result = DBVM_VALUE_UNCHANGED;
325 : }
326 :
327 2406 : } else if (stmt == data->db_version_stmt) {
328 1203 : data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
329 1203 : }
330 :
331 32045 : databasevm_reset(stmt);
332 32045 : return result;
333 32049 : }
334 :
335 3587 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
336 3587 : int result = -1;
337 3587 : int rc = DBRES_OK;
338 :
339 3587 : if (value) {
340 3586 : rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
341 3586 : if (rc != DBRES_OK) goto cleanup;
342 3586 : }
343 :
344 3587 : rc = databasevm_step(stmt);
345 7174 : if (rc == DBRES_DONE) {
346 1 : result = 0;
347 1 : rc = DBRES_OK;
348 3587 : } else if (rc == DBRES_ROW) {
349 3586 : result = (int)database_column_int(stmt, 0);
350 3586 : rc = DBRES_OK;
351 3586 : }
352 :
353 : cleanup:
354 3587 : databasevm_reset(stmt);
355 3587 : return result;
356 : }
357 :
358 255556 : void dbvm_reset (dbvm_t *stmt) {
359 255556 : if (!stmt) return;
360 232595 : databasevm_clear_bindings(stmt);
361 232595 : databasevm_reset(stmt);
362 255556 : }
363 :
364 : // MARK: - Database Version -
365 :
366 671 : int cloudsync_dbversion_build_query (cloudsync_context *data, char **sql_out) {
367 : // this function must be manually called each time tables changes
368 : // because the query plan changes too and it must be re-prepared
369 : // unfortunately there is no other way
370 :
371 : // we need to execute a query like:
372 : /*
373 : SELECT max(version) as version FROM (
374 : SELECT max(db_version) as version FROM "table1_cloudsync"
375 : UNION ALL
376 : SELECT max(db_version) as version FROM "table2_cloudsync"
377 : UNION ALL
378 : SELECT max(db_version) as version FROM "table3_cloudsync"
379 : UNION
380 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
381 : )
382 : */
383 :
384 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
385 :
386 671 : *sql_out = NULL;
387 671 : return database_select_text(data, SQL_DBVERSION_BUILD_QUERY, sql_out);
388 : }
389 :
390 894 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
391 894 : if (data->db_version_stmt) {
392 443 : databasevm_finalize(data->db_version_stmt);
393 443 : data->db_version_stmt = NULL;
394 443 : }
395 :
396 894 : int64_t count = dbutils_table_settings_count_tables(data);
397 894 : if (count == 0) return DBRES_OK;
398 671 : else if (count == -1) return cloudsync_set_dberror(data);
399 :
400 671 : char *sql = NULL;
401 671 : int rc = cloudsync_dbversion_build_query(data, &sql);
402 671 : if (rc != DBRES_OK) return cloudsync_set_dberror(data);
403 :
404 : // A NULL SQL with rc == OK means the generator produced a NULL row:
405 : // sqlite_master has no *_cloudsync meta-tables (for example, the user
406 : // dropped the base table and its meta-table without calling
407 : // cloudsync_cleanup, leaving stale cloudsync_table_settings rows).
408 : // Treat this the same as count == 0: no prepared statement, db_version
409 : // stays at the minimum and will be rebuilt on the next cloudsync_init.
410 : // Genuine errors from database_select_text are handled above.
411 670 : if (!sql) return DBRES_OK;
412 : DEBUG_SQL("db_version_stmt: %s", sql);
413 :
414 669 : rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
415 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
416 669 : cloudsync_memory_free(sql);
417 669 : return rc;
418 894 : }
419 :
420 1203 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
421 1203 : DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
422 1203 : if (schema_changed == DBVM_VALUE_ERROR) return -1;
423 :
424 1203 : if (schema_changed == DBVM_VALUE_CHANGED) {
425 380 : int rc = cloudsync_dbversion_rebuild(data);
426 380 : if (rc != DBRES_OK) return -1;
427 380 : }
428 :
429 1203 : if (!data->db_version_stmt) {
430 0 : data->db_version = CLOUDSYNC_MIN_DB_VERSION;
431 0 : return 0;
432 : }
433 :
434 1203 : DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
435 1203 : if (rc == DBVM_VALUE_ERROR) return -1;
436 1203 : return 0;
437 1203 : }
438 :
439 29641 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
440 : // perform a PRAGMA data_version to check if some other process write any data
441 29641 : DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
442 29641 : if (rc == DBVM_VALUE_ERROR) return -1;
443 :
444 : // db_version is already set and there is no need to update it
445 29639 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
446 :
447 1203 : return cloudsync_dbversion_rerun(data);
448 29641 : }
449 :
450 29615 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
451 29615 : int rc = cloudsync_dbversion_check_uptodate(data);
452 29615 : if (rc != DBRES_OK) return -1;
453 :
454 29614 : int64_t result = data->db_version + 1;
455 29614 : if (result < data->pending_db_version) result = data->pending_db_version;
456 29614 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
457 29614 : data->pending_db_version = result;
458 :
459 29614 : return result;
460 29615 : }
461 :
462 : // MARK: - PK Context -
463 :
464 0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
465 0 : *tbl_len = ctx->tbl_len;
466 0 : return ctx->tbl;
467 : }
468 :
469 0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
470 0 : *pk_len = ctx->pk_len;
471 0 : return (void *)ctx->pk;
472 : }
473 :
474 0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
475 0 : *colname_len = ctx->col_name_len;
476 0 : return ctx->col_name;
477 : }
478 :
479 0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
480 0 : return ctx->cl;
481 : }
482 :
483 0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
484 0 : return ctx->db_version;
485 : }
486 :
487 : // MARK: - CloudSync Context -
488 :
489 12673 : int cloudsync_insync (cloudsync_context *data) {
490 12673 : return data->insync;
491 : }
492 :
493 797 : void *cloudsync_siteid (cloudsync_context *data) {
494 797 : return (void *)data->site_id;
495 : }
496 :
497 2 : void cloudsync_reset_siteid (cloudsync_context *data) {
498 2 : memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
499 2 : }
500 :
501 229 : int cloudsync_load_siteid (cloudsync_context *data) {
502 : // check if site_id was already loaded
503 229 : if (data->site_id[0] != 0) return DBRES_OK;
504 :
505 : // load site_id
506 227 : char *buffer = NULL;
507 227 : int64_t size = 0;
508 227 : int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
509 227 : if (rc != DBRES_OK) return rc;
510 227 : if (!buffer || size != UUID_LEN) {
511 0 : if (buffer) cloudsync_memory_free(buffer);
512 0 : return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
513 : }
514 :
515 227 : memcpy(data->site_id, buffer, UUID_LEN);
516 227 : cloudsync_memory_free(buffer);
517 :
518 227 : return DBRES_OK;
519 229 : }
520 :
521 2 : int64_t cloudsync_dbversion (cloudsync_context *data) {
522 2 : return data->db_version;
523 : }
524 :
525 12158 : int cloudsync_bumpseq (cloudsync_context *data) {
526 12158 : int value = data->seq;
527 12158 : data->seq += 1;
528 12158 : return value;
529 : }
530 :
531 286 : void cloudsync_update_schema_hash (cloudsync_context *data) {
532 286 : database_update_schema_hash(data, &data->schema_hash);
533 286 : }
534 :
535 62354 : void *cloudsync_db (cloudsync_context *data) {
536 62354 : return data->db;
537 : }
538 :
539 513 : int cloudsync_add_dbvms (cloudsync_context *data) {
540 : DEBUG_DBFUNCTION("cloudsync_add_stmts");
541 :
542 513 : if (data->data_version_stmt == NULL) {
543 227 : int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
544 : DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
545 227 : if (rc != DBRES_OK) return rc;
546 : DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
547 227 : }
548 :
549 513 : if (data->schema_version_stmt == NULL) {
550 227 : int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
551 : DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
552 227 : if (rc != DBRES_OK) return rc;
553 : DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
554 227 : }
555 :
556 513 : if (data->getset_siteid_stmt == NULL) {
557 : // get and set index of the site_id
558 : // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
559 : // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
560 227 : int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
561 : DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
562 227 : if (rc != DBRES_OK) return rc;
563 : DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
564 227 : }
565 :
566 513 : return cloudsync_dbversion_rebuild(data);
567 513 : }
568 :
569 23 : int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
570 : // force err_code to be something different than OK
571 23 : if (err_code == DBRES_OK) err_code = database_errcode(data);
572 23 : if (err_code == DBRES_OK) err_code = DBRES_ERROR;
573 :
574 : // compute a meaningful error message
575 23 : if (err_user == NULL) {
576 6 : snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
577 6 : } else {
578 17 : const char *db_error = database_errmsg(data);
579 : char db_error_copy[sizeof(data->errmsg)];
580 17 : int rc = database_errcode(data);
581 17 : if (rc == DBRES_OK) {
582 17 : snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
583 17 : } else {
584 0 : if (db_error == data->errmsg) {
585 0 : snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
586 0 : db_error = db_error_copy;
587 0 : }
588 0 : snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
589 : }
590 : }
591 :
592 23 : data->errcode = err_code;
593 23 : return err_code;
594 : }
595 :
596 6 : int cloudsync_set_dberror (cloudsync_context *data) {
597 6 : return cloudsync_set_error(data, NULL, DBRES_OK);
598 : }
599 :
600 14 : const char *cloudsync_errmsg (cloudsync_context *data) {
601 14 : return data->errmsg;
602 : }
603 :
604 4 : int cloudsync_errcode (cloudsync_context *data) {
605 4 : return data->errcode;
606 : }
607 :
608 2 : void cloudsync_reset_error (cloudsync_context *data) {
609 2 : data->errmsg[0] = 0;
610 2 : data->errcode = DBRES_OK;
611 2 : }
612 :
613 3 : void *cloudsync_auxdata (cloudsync_context *data) {
614 3 : return data->aux_data;
615 : }
616 :
617 2 : void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
618 2 : data->aux_data = xdata;
619 2 : }
620 :
621 6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
622 6 : if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
623 5 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
624 5 : data->current_schema = NULL;
625 5 : if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
626 6 : }
627 :
628 1289 : const char *cloudsync_schema (cloudsync_context *data) {
629 1289 : return data->current_schema;
630 : }
631 :
632 4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
633 4 : cloudsync_table_context *table = table_lookup(data, table_name);
634 4 : if (!table) return NULL;
635 :
636 1 : return table->schema;
637 4 : }
638 :
639 : // MARK: - Table Utils -
640 :
641 69 : void table_pknames_free (char **names, int nrows) {
642 69 : if (!names) return;
643 132 : for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
644 46 : cloudsync_memory_free(names);
645 69 : }
646 :
647 284 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
648 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
649 : if (table->rowid_only) {
650 : char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
651 : return sql;
652 : }
653 : #endif
654 :
655 284 : return sql_build_delete_by_pk(table->context, table->name, table->schema);
656 : }
657 :
658 1367 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
659 1367 : char *sql = NULL;
660 :
661 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
662 : if (table->rowid_only) {
663 : if (colname == NULL) {
664 : // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
665 : sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
666 : } else {
667 : // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
668 : sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
669 : }
670 : return sql;
671 : }
672 : #endif
673 :
674 1367 : if (colname == NULL) {
675 : // is sentinel insert
676 284 : sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
677 284 : } else {
678 1083 : sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
679 : }
680 1367 : return sql;
681 : }
682 :
683 1082 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
684 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
685 : if (table->rowid_only) {
686 : char *colnamequote = "\"";
687 : char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
688 : return sql;
689 : }
690 : #endif
691 :
692 : // SELECT age FROM customers WHERE first_name=? AND last_name=?;
693 1082 : return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
694 : }
695 :
696 284 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
697 : DEBUG_DBFUNCTION("table_create %s", name);
698 :
699 284 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
700 284 : if (!table) return NULL;
701 :
702 284 : table->context = data;
703 284 : table->algo = algo;
704 284 : table->name = cloudsync_string_dup_lowercase(name);
705 :
706 : // Detect schema from metadata table location. If metadata table doesn't
707 : // exist yet (during initialization), fall back to cloudsync_schema() which
708 : // returns the explicitly set schema or current_schema().
709 284 : table->schema = database_table_schema(name);
710 284 : if (!table->schema) {
711 284 : const char *fallback_schema = cloudsync_schema(data);
712 284 : if (fallback_schema) {
713 0 : table->schema = cloudsync_string_dup(fallback_schema);
714 0 : }
715 284 : }
716 :
717 284 : if (!table->name) {
718 0 : cloudsync_memory_free(table);
719 0 : return NULL;
720 : }
721 284 : table->meta_ref = database_build_meta_ref(table->schema, table->name);
722 284 : table->base_ref = database_build_base_ref(table->schema, table->name);
723 284 : table->enabled = true;
724 :
725 284 : return table;
726 284 : }
727 :
728 284 : void table_free (cloudsync_table_context *table) {
729 : DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
730 284 : if (!table) return;
731 :
732 284 : if (table->col_name) {
733 1327 : for (int i=0; i<table->ncols; ++i) {
734 1082 : cloudsync_memory_free(table->col_name[i]);
735 1082 : }
736 245 : cloudsync_memory_free(table->col_name);
737 245 : }
738 284 : if (table->col_merge_stmt) {
739 1327 : for (int i=0; i<table->ncols; ++i) {
740 1082 : databasevm_finalize(table->col_merge_stmt[i]);
741 1082 : }
742 245 : cloudsync_memory_free(table->col_merge_stmt);
743 245 : }
744 284 : if (table->col_value_stmt) {
745 1327 : for (int i=0; i<table->ncols; ++i) {
746 1082 : databasevm_finalize(table->col_value_stmt[i]);
747 1082 : }
748 245 : cloudsync_memory_free(table->col_value_stmt);
749 245 : }
750 284 : if (table->col_id) {
751 245 : cloudsync_memory_free(table->col_id);
752 245 : }
753 284 : if (table->col_algo) {
754 245 : cloudsync_memory_free(table->col_algo);
755 245 : }
756 284 : if (table->col_delimiter) {
757 1327 : for (int i=0; i<table->ncols; ++i) {
758 1082 : if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
759 1082 : }
760 245 : cloudsync_memory_free(table->col_delimiter);
761 245 : }
762 :
763 284 : if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
764 284 : if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
765 284 : if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
766 284 : if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
767 284 : if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
768 :
769 284 : if (table->name) cloudsync_memory_free(table->name);
770 284 : if (table->schema) cloudsync_memory_free(table->schema);
771 284 : if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
772 284 : if (table->base_ref) cloudsync_memory_free(table->base_ref);
773 284 : if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
774 284 : if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
775 284 : if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
776 284 : if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
777 284 : if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
778 284 : if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
779 284 : if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
780 284 : if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
781 284 : if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
782 284 : if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
783 284 : if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
784 284 : if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
785 284 : if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
786 :
787 284 : if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
788 284 : if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
789 284 : if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
790 :
791 284 : cloudsync_memory_free(table);
792 284 : }
793 :
794 284 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
795 284 : int rc = DBRES_OK;
796 284 : char *sql = NULL;
797 284 : cloudsync_context *data = table->context;
798 :
799 : // META TABLE statements
800 :
801 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
802 :
803 : // precompile the pk exists statement
804 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
805 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
806 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
807 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
808 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
809 :
810 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
811 284 : cloudsync_memory_free(sql);
812 284 : if (rc != DBRES_OK) goto cleanup;
813 :
814 : // precompile the update local sentinel statement
815 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
816 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
817 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
818 :
819 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
820 284 : cloudsync_memory_free(sql);
821 284 : if (rc != DBRES_OK) goto cleanup;
822 :
823 : // precompile the insert local sentinel statement
824 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
825 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
826 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
827 :
828 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
829 284 : cloudsync_memory_free(sql);
830 284 : if (rc != DBRES_OK) goto cleanup;
831 :
832 : // precompile the insert/update local row statement
833 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
834 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
835 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
836 :
837 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
838 284 : cloudsync_memory_free(sql);
839 284 : if (rc != DBRES_OK) goto cleanup;
840 :
841 : // precompile the delete rows from meta
842 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
843 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
844 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
845 :
846 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
847 284 : cloudsync_memory_free(sql);
848 284 : if (rc != DBRES_OK) goto cleanup;
849 :
850 : // precompile the update rows from meta when pk changes
851 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
852 284 : sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
853 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
854 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
855 :
856 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
857 284 : cloudsync_memory_free(sql);
858 284 : if (rc != DBRES_OK) goto cleanup;
859 :
860 : // local cl
861 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
862 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
863 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
864 :
865 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
866 284 : cloudsync_memory_free(sql);
867 284 : if (rc != DBRES_OK) goto cleanup;
868 :
869 : // rowid of the last inserted/updated row in the meta table
870 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
871 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
872 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
873 :
874 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
875 284 : cloudsync_memory_free(sql);
876 284 : if (rc != DBRES_OK) goto cleanup;
877 :
878 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
879 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
880 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
881 :
882 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
883 284 : cloudsync_memory_free(sql);
884 284 : if (rc != DBRES_OK) goto cleanup;
885 :
886 : // zero clock
887 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
888 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
889 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
890 :
891 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
892 284 : cloudsync_memory_free(sql);
893 284 : if (rc != DBRES_OK) goto cleanup;
894 :
895 : // col_version
896 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
897 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
898 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
899 :
900 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
901 284 : cloudsync_memory_free(sql);
902 284 : if (rc != DBRES_OK) goto cleanup;
903 :
904 : // site_id
905 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
906 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
907 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
908 :
909 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
910 284 : cloudsync_memory_free(sql);
911 284 : if (rc != DBRES_OK) goto cleanup;
912 :
913 : // REAL TABLE statements
914 :
915 : // precompile the get column value statement
916 284 : if (ncols > 0) {
917 245 : sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
918 245 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
919 : DEBUG_SQL("real_col_values_stmt: %s", sql);
920 :
921 245 : rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
922 245 : cloudsync_memory_free(sql);
923 245 : if (rc != DBRES_OK) goto cleanup;
924 245 : }
925 :
926 284 : sql = table_build_mergedelete_sql(table);
927 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
928 : DEBUG_SQL("real_merge_delete: %s", sql);
929 :
930 284 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
931 284 : cloudsync_memory_free(sql);
932 284 : if (rc != DBRES_OK) goto cleanup;
933 :
934 284 : sql = table_build_mergeinsert_sql(table, NULL);
935 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
936 : DEBUG_SQL("real_merge_sentinel: %s", sql);
937 :
938 284 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
939 284 : cloudsync_memory_free(sql);
940 284 : if (rc != DBRES_OK) goto cleanup;
941 :
942 : cleanup:
943 284 : if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
944 284 : return rc;
945 : }
946 :
947 184142 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
948 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
949 :
950 184142 : if (table_name) {
951 185768 : for (int i=0; i<data->tables_count; ++i) {
952 185189 : if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
953 1626 : }
954 579 : }
955 :
956 579 : return NULL;
957 184142 : }
958 :
959 169306 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
960 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
961 :
962 315503 : for (int i=0; i<table->ncols; ++i) {
963 315503 : if (strcasecmp(table->col_name[i], col_name) == 0) {
964 169306 : if (index) *index = i;
965 169306 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
966 : }
967 146197 : }
968 :
969 0 : if (index) *index = -1;
970 0 : return NULL;
971 169306 : }
972 :
973 283 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
974 283 : const char *table_name = table->name;
975 : DEBUG_DBFUNCTION("table_remove %s", table_name);
976 :
977 333 : for (int i = 0; i < data->tables_count; ++i) {
978 333 : cloudsync_table_context *t = data->tables[i];
979 :
980 : // pointer compare is fastest but fallback to strcasecmp if not same pointer
981 333 : if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
982 283 : int last = data->tables_count - 1;
983 283 : data->tables[i] = data->tables[last]; // move last into the hole (keeps array dense)
984 283 : data->tables[last] = NULL; // NULLify tail (as an extra security measure)
985 283 : data->tables_count--;
986 283 : return data->tables_count;
987 : }
988 50 : }
989 :
990 0 : return -1;
991 283 : }
992 :
993 1083 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
994 1083 : cloudsync_table_context *table = (cloudsync_table_context *)xdata;
995 1083 : cloudsync_context *data = table->context;
996 :
997 1083 : int index = table->ncols;
998 2165 : for (int i=0; i<ncols; i+=2) {
999 1083 : const char *name = values[i];
1000 1083 : int cid = (int)strtol(values[i+1], NULL, 0);
1001 :
1002 1083 : table->col_id[index] = cid;
1003 1083 : table->col_name[index] = cloudsync_string_dup_lowercase(name);
1004 1083 : if (!table->col_name[index]) goto error;
1005 :
1006 1083 : char *sql = table_build_mergeinsert_sql(table, name);
1007 1083 : if (!sql) goto error;
1008 : DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
1009 :
1010 1083 : int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
1011 1083 : cloudsync_memory_free(sql);
1012 1083 : if (rc != DBRES_OK) goto error;
1013 1082 : if (!table->col_merge_stmt[index]) goto error;
1014 :
1015 1082 : sql = table_build_value_sql(table, name);
1016 1082 : if (!sql) goto error;
1017 : DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
1018 :
1019 1082 : rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
1020 1082 : cloudsync_memory_free(sql);
1021 1082 : if (rc != DBRES_OK) goto error;
1022 1082 : if (!table->col_value_stmt[index]) goto error;
1023 1082 : }
1024 1082 : table->ncols += 1;
1025 :
1026 1082 : return 0;
1027 :
1028 : error:
1029 : // clean up partially-initialized entry at index
1030 1 : if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
1031 1 : if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
1032 1 : if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
1033 1 : return 1;
1034 1083 : }
1035 :
1036 284 : bool table_ensure_capacity (cloudsync_context *data) {
1037 284 : if (data->tables_count < data->tables_cap) return true;
1038 :
1039 0 : int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
1040 0 : size_t bytes = (size_t)new_cap * sizeof(*data->tables);
1041 0 : void *p = cloudsync_memory_realloc(data->tables, bytes);
1042 0 : if (!p) return false;
1043 :
1044 0 : data->tables = (cloudsync_table_context **)p;
1045 0 : data->tables_cap = new_cap;
1046 0 : return true;
1047 284 : }
1048 :
1049 289 : bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
1050 : DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
1051 :
1052 : // Check if table already initialized in this connection's context.
1053 : // Note: This prevents same-connection duplicate initialization.
1054 : // SQLite clients cannot distinguish schemas, so having 'public.users'
1055 : // and 'auth.users' would cause sync ambiguity. Users should avoid
1056 : // initializing tables with the same name in different schemas.
1057 : // If two concurrent connections initialize tables with the same name
1058 : // in different schemas, the behavior is undefined.
1059 289 : cloudsync_table_context *table = table_lookup(data, table_name);
1060 289 : if (table) return true;
1061 :
1062 : // check for space availability
1063 284 : if (!table_ensure_capacity(data)) return false;
1064 :
1065 : // setup a new table
1066 284 : table = table_create(data, table_name, algo);
1067 284 : if (!table) return false;
1068 :
1069 : // fill remaining metadata in the table
1070 284 : int count = database_count_pk(data, table_name, false, table->schema);
1071 284 : if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
1072 284 : table->npks = count;
1073 284 : if (table->npks == 0) {
1074 : #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
1075 0 : goto abort_add_table;
1076 : #else
1077 : table->rowid_only = true;
1078 : table->npks = 1; // rowid
1079 : #endif
1080 : }
1081 :
1082 284 : int ncols = database_count_nonpk(data, table_name, table->schema);
1083 284 : if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
1084 284 : int rc = table_add_stmts(table, ncols);
1085 284 : if (rc != DBRES_OK) goto abort_add_table;
1086 :
1087 : // a table with only pk(s) is totally legal
1088 284 : if (ncols > 0) {
1089 245 : table->col_name = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
1090 245 : if (!table->col_name) goto abort_add_table;
1091 :
1092 245 : table->col_id = (int *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(int) * ncols));
1093 245 : if (!table->col_id) goto abort_add_table;
1094 :
1095 245 : table->col_merge_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
1096 245 : if (!table->col_merge_stmt) goto abort_add_table;
1097 :
1098 245 : table->col_value_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
1099 245 : if (!table->col_value_stmt) goto abort_add_table;
1100 :
1101 245 : table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
1102 245 : if (!table->col_algo) goto abort_add_table;
1103 :
1104 245 : table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
1105 245 : if (!table->col_delimiter) goto abort_add_table;
1106 :
1107 : // Pass empty string when schema is NULL; SQL will fall back to current_schema()
1108 245 : const char *schema = table->schema ? table->schema : "";
1109 490 : char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
1110 245 : table_name, schema, table_name, schema);
1111 245 : if (!sql) goto abort_add_table;
1112 245 : rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
1113 245 : cloudsync_memory_free(sql);
1114 245 : if (rc == DBRES_ABORT) goto abort_add_table;
1115 244 : }
1116 :
1117 : // append newly created table
1118 283 : data->tables[data->tables_count++] = table;
1119 283 : return true;
1120 :
1121 : abort_add_table:
1122 1 : table_free(table);
1123 1 : return false;
1124 289 : }
1125 :
1126 0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
1127 0 : dbvm_t *vm = NULL;
1128 0 : *persistent = false;
1129 :
1130 0 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1131 0 : if (table) {
1132 0 : char *col_name = NULL;
1133 0 : if (table->ncols > 0) {
1134 0 : col_name = table->col_name[0];
1135 : // retrieve col_value precompiled statement
1136 0 : vm = table_column_lookup(table, col_name, false, NULL);
1137 0 : *persistent = true;
1138 0 : } else {
1139 0 : char *sql = table_build_value_sql(table, "*");
1140 0 : databasevm_prepare(data, sql, (void **)&vm, 0);
1141 0 : cloudsync_memory_free(sql);
1142 0 : *persistent = false;
1143 : }
1144 0 : }
1145 :
1146 0 : return vm;
1147 : }
1148 :
1149 6877 : bool table_enabled (cloudsync_table_context *table) {
1150 6877 : return table->enabled;
1151 : }
1152 :
1153 6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
1154 6 : table->enabled = value;
1155 6 : }
1156 :
1157 19854 : int table_count_cols (cloudsync_table_context *table) {
1158 19854 : return table->ncols;
1159 : }
1160 :
1161 6771 : int table_count_pks (cloudsync_table_context *table) {
1162 6771 : return table->npks;
1163 : }
1164 :
1165 32938 : const char *table_colname (cloudsync_table_context *table, int index) {
1166 32938 : return table->col_name[index];
1167 : }
1168 :
1169 3586 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
1170 : // check if a row with the same primary key already exists
1171 : // if so, this means the row might have been previously deleted (sentinel)
1172 3586 : return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
1173 : }
1174 :
1175 0 : char **table_pknames (cloudsync_table_context *table) {
1176 0 : return table->pk_name;
1177 : }
1178 :
1179 23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
1180 23 : table_pknames_free(table->pk_name, table->npks);
1181 23 : table->pk_name = pknames;
1182 23 : }
1183 :
1184 49273 : bool table_algo_isgos (cloudsync_table_context *table) {
1185 49273 : return (table->algo == table_algo_crdt_gos);
1186 : }
1187 :
1188 0 : const char *table_schema (cloudsync_table_context *table) {
1189 0 : return table->schema;
1190 : }
1191 :
1192 : // MARK: - Merge Insert -
1193 :
1194 46123 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
1195 46123 : dbvm_t *vm = table->meta_local_cl_stmt;
1196 46123 : int64_t result = -1;
1197 :
1198 46123 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1199 46123 : if (rc != DBRES_OK) goto cleanup;
1200 :
1201 46123 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1202 46123 : if (rc != DBRES_OK) goto cleanup;
1203 :
1204 46123 : rc = databasevm_step(vm);
1205 46123 : if (rc == DBRES_ROW) result = database_column_int(vm, 0);
1206 0 : else if (rc == DBRES_DONE) result = 0;
1207 :
1208 : cleanup:
1209 46123 : if (result == -1) cloudsync_set_dberror(table->context);
1210 46123 : dbvm_reset(vm);
1211 46123 : return result;
1212 : }
1213 :
1214 45090 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
1215 45090 : dbvm_t *vm = table->meta_col_version_stmt;
1216 :
1217 45090 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1218 45090 : if (rc != DBRES_OK) goto cleanup;
1219 :
1220 45090 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1221 45090 : if (rc != DBRES_OK) goto cleanup;
1222 :
1223 45090 : rc = databasevm_step(vm);
1224 70033 : if (rc == DBRES_ROW) {
1225 24943 : *version = database_column_int(vm, 0);
1226 24943 : rc = DBRES_OK;
1227 24943 : }
1228 :
1229 : cleanup:
1230 45090 : if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
1231 45090 : dbvm_reset(vm);
1232 45090 : return rc;
1233 : }
1234 :
1235 25163 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1236 :
1237 : // get/set site_id
1238 25163 : dbvm_t *vm = data->getset_siteid_stmt;
1239 25163 : int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
1240 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1241 :
1242 25163 : rc = databasevm_step(vm);
1243 25163 : if (rc != DBRES_ROW) goto cleanup_merge;
1244 :
1245 25163 : int64_t ord = database_column_int(vm, 0);
1246 25163 : dbvm_reset(vm);
1247 :
1248 25163 : vm = table->meta_winner_clock_stmt;
1249 25163 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
1250 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1251 :
1252 25163 : rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
1253 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1254 :
1255 25163 : rc = databasevm_bind_int(vm, 3, col_version);
1256 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1257 :
1258 25163 : rc = databasevm_bind_int(vm, 4, db_version);
1259 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1260 :
1261 25163 : rc = databasevm_bind_int(vm, 5, seq);
1262 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1263 :
1264 25163 : rc = databasevm_bind_int(vm, 6, ord);
1265 25163 : if (rc != DBRES_OK) goto cleanup_merge;
1266 :
1267 25163 : rc = databasevm_step(vm);
1268 50326 : if (rc == DBRES_ROW) {
1269 25163 : *rowid = database_column_int(vm, 0);
1270 25163 : rc = DBRES_OK;
1271 25163 : }
1272 :
1273 : cleanup_merge:
1274 25163 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1275 25163 : dbvm_reset(vm);
1276 25163 : return rc;
1277 : }
1278 :
1279 : // MARK: - Deferred column-batch merge functions -
1280 :
1281 21362 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
1282 21362 : merge_pending_batch *batch = data->pending_batch;
1283 :
1284 : // Store table and PK on first entry
1285 21362 : if (batch->table == NULL) {
1286 7707 : batch->table = table;
1287 7707 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1288 7707 : if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
1289 7707 : memcpy(batch->pk, pk, pklen);
1290 7707 : batch->pk_len = pklen;
1291 7707 : }
1292 :
1293 : // Ensure capacity
1294 21362 : if (batch->count >= batch->capacity) {
1295 504 : int new_cap = batch->capacity ? batch->capacity * 2 : 8;
1296 504 : merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
1297 504 : if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
1298 504 : batch->entries = new_entries;
1299 504 : batch->capacity = new_cap;
1300 504 : }
1301 :
1302 : // Resolve col_name to a stable pointer from the table context
1303 : // (the incoming col_name may point to VM-owned memory that gets freed on reset)
1304 21362 : int col_idx = -1;
1305 21362 : table_column_lookup(table, col_name, true, &col_idx);
1306 21362 : const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
1307 21362 : if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
1308 :
1309 21362 : merge_pending_entry *e = &batch->entries[batch->count];
1310 21362 : e->col_name = stable_col_name;
1311 21362 : e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
1312 21362 : e->col_version = col_version;
1313 21362 : e->db_version = db_version;
1314 21362 : e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
1315 21362 : memcpy(e->site_id, site_id, e->site_id_len);
1316 21362 : e->seq = seq;
1317 :
1318 21362 : batch->count++;
1319 21362 : return DBRES_OK;
1320 21362 : }
1321 :
1322 18917 : static void merge_pending_free_entries (merge_pending_batch *batch) {
1323 18917 : if (batch->entries) {
1324 35625 : for (int i = 0; i < batch->count; i++) {
1325 21362 : if (batch->entries[i].col_value) {
1326 21362 : database_value_free(batch->entries[i].col_value);
1327 21362 : batch->entries[i].col_value = NULL;
1328 21362 : }
1329 21362 : }
1330 14263 : }
1331 18917 : if (batch->pk) {
1332 7740 : cloudsync_memory_free(batch->pk);
1333 7740 : batch->pk = NULL;
1334 7740 : }
1335 18917 : batch->table = NULL;
1336 18917 : batch->pk_len = 0;
1337 18917 : batch->cl = 0;
1338 18917 : batch->sentinel_pending = false;
1339 18917 : batch->row_exists = false;
1340 18917 : batch->count = 0;
1341 18917 : }
1342 :
1343 18917 : static int merge_flush_pending (cloudsync_context *data) {
1344 18917 : merge_pending_batch *batch = data->pending_batch;
1345 18917 : if (!batch) return DBRES_OK;
1346 :
1347 18917 : int rc = DBRES_OK;
1348 18917 : bool flush_savepoint = false;
1349 :
1350 : // Nothing to write — handle sentinel-only case or skip
1351 18917 : if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
1352 11177 : goto cleanup;
1353 : }
1354 :
1355 : // Wrap database operations in a savepoint so that on failure (e.g. RLS
1356 : // denial) the rollback properly releases all executor resources (open
1357 : // relations, snapshots, plan cache) acquired during the failed statement.
1358 7740 : flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
1359 :
1360 7740 : if (batch->count == 0) {
1361 : // Sentinel with no winning columns (PK-only row)
1362 31 : dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
1363 31 : rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
1364 31 : if (rc < 0) {
1365 0 : cloudsync_set_dberror(data);
1366 0 : dbvm_reset(vm);
1367 0 : goto cleanup;
1368 : }
1369 31 : SYNCBIT_SET(data);
1370 31 : rc = databasevm_step(vm);
1371 31 : dbvm_reset(vm);
1372 31 : SYNCBIT_RESET(data);
1373 31 : if (rc == DBRES_DONE) rc = DBRES_OK;
1374 31 : if (rc != DBRES_OK) {
1375 0 : cloudsync_set_dberror(data);
1376 0 : goto cleanup;
1377 : }
1378 31 : goto cleanup;
1379 : }
1380 :
1381 : // Check if cached prepared statement can be reused
1382 7709 : cloudsync_table_context *table = batch->table;
1383 7709 : dbvm_t *vm = NULL;
1384 7709 : bool cache_hit = false;
1385 :
1386 14610 : if (batch->cached_vm &&
1387 7205 : batch->cached_row_exists == batch->row_exists &&
1388 6901 : batch->cached_col_count == batch->count) {
1389 6877 : cache_hit = true;
1390 26587 : for (int i = 0; i < batch->count; i++) {
1391 19736 : if (batch->cached_col_names[i] != batch->entries[i].col_name) {
1392 26 : cache_hit = false;
1393 26 : break;
1394 : }
1395 19710 : }
1396 6877 : }
1397 :
1398 7709 : if (cache_hit) {
1399 6851 : vm = batch->cached_vm;
1400 6851 : dbvm_reset(vm);
1401 6851 : } else {
1402 : // Invalidate old cache
1403 858 : if (batch->cached_vm) {
1404 354 : databasevm_finalize(batch->cached_vm);
1405 354 : batch->cached_vm = NULL;
1406 354 : }
1407 :
1408 : // Build multi-column SQL
1409 858 : const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
1410 858 : if (!colnames) {
1411 0 : rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
1412 0 : goto cleanup;
1413 : }
1414 2510 : for (int i = 0; i < batch->count; i++) {
1415 1652 : colnames[i] = batch->entries[i].col_name;
1416 1652 : }
1417 :
1418 858 : char *sql = batch->row_exists
1419 418 : ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
1420 440 : : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
1421 858 : cloudsync_memory_free(colnames);
1422 :
1423 858 : if (!sql) {
1424 0 : rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
1425 0 : goto cleanup;
1426 : }
1427 :
1428 858 : rc = databasevm_prepare(data, sql, &vm, 0);
1429 858 : cloudsync_memory_free(sql);
1430 858 : if (rc != DBRES_OK) {
1431 0 : rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
1432 0 : goto cleanup;
1433 : }
1434 :
1435 : // Update cache
1436 858 : batch->cached_vm = vm;
1437 858 : batch->cached_row_exists = batch->row_exists;
1438 858 : batch->cached_col_count = batch->count;
1439 : // Reallocate cached_col_names if needed
1440 858 : if (batch->cached_col_count > 0) {
1441 858 : const char **new_names = (const char **)cloudsync_memory_realloc(
1442 858 : batch->cached_col_names, batch->count * sizeof(const char *));
1443 858 : if (new_names) {
1444 2510 : for (int i = 0; i < batch->count; i++) {
1445 1652 : new_names[i] = batch->entries[i].col_name;
1446 1652 : }
1447 858 : batch->cached_col_names = new_names;
1448 858 : }
1449 858 : }
1450 : }
1451 :
1452 : // Bind PKs (positions 1..npks)
1453 7709 : int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
1454 7709 : if (npks < 0) {
1455 0 : cloudsync_set_dberror(data);
1456 0 : dbvm_reset(vm);
1457 0 : rc = DBRES_ERROR;
1458 0 : goto cleanup;
1459 : }
1460 :
1461 : // Bind column values (positions npks+1..npks+count)
1462 29071 : for (int i = 0; i < batch->count; i++) {
1463 21362 : merge_pending_entry *e = &batch->entries[i];
1464 21362 : int bind_idx = npks + 1 + i;
1465 21362 : if (e->col_value) {
1466 21362 : rc = databasevm_bind_value(vm, bind_idx, e->col_value);
1467 21362 : } else {
1468 0 : rc = databasevm_bind_null(vm, bind_idx);
1469 : }
1470 21362 : if (rc != DBRES_OK) {
1471 0 : cloudsync_set_dberror(data);
1472 0 : dbvm_reset(vm);
1473 0 : goto cleanup;
1474 : }
1475 21362 : }
1476 :
1477 : // Execute with SYNCBIT and GOS handling
1478 7709 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1479 7709 : SYNCBIT_SET(data);
1480 7709 : rc = databasevm_step(vm);
1481 7709 : dbvm_reset(vm);
1482 7709 : SYNCBIT_RESET(data);
1483 7709 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1484 :
1485 7709 : if (rc != DBRES_DONE) {
1486 3 : cloudsync_set_dberror(data);
1487 3 : goto cleanup;
1488 : }
1489 7706 : rc = DBRES_OK;
1490 :
1491 : // Call merge_set_winner_clock for each buffered entry
1492 7706 : int64_t rowid = 0;
1493 29060 : for (int i = 0; i < batch->count; i++) {
1494 21354 : merge_pending_entry *e = &batch->entries[i];
1495 42708 : int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
1496 21354 : e->col_name, e->col_version, e->db_version,
1497 21354 : (const char *)e->site_id, e->site_id_len,
1498 21354 : e->seq, &rowid);
1499 21354 : if (clock_rc != DBRES_OK) {
1500 0 : rc = clock_rc;
1501 0 : goto cleanup;
1502 : }
1503 29060 : }
1504 :
1505 : cleanup:
1506 18917 : merge_pending_free_entries(batch);
1507 18917 : if (flush_savepoint) {
1508 7740 : if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
1509 3 : else database_rollback_savepoint(data, "merge_flush");
1510 7740 : }
1511 18917 : return rc;
1512 18917 : }
1513 :
1514 3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1515 : int index;
1516 3167 : dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
1517 3167 : if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
1518 :
1519 : // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1520 :
1521 : // bind primary key(s)
1522 3167 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1523 3167 : if (rc < 0) {
1524 0 : cloudsync_set_dberror(data);
1525 0 : dbvm_reset(vm);
1526 0 : return rc;
1527 : }
1528 :
1529 : // bind value (always bind all expected parameters for correct prepared statement handling)
1530 3167 : if (col_value) {
1531 3167 : rc = databasevm_bind_value(vm, table->npks+1, col_value);
1532 3167 : if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
1533 3167 : } else {
1534 0 : rc = databasevm_bind_null(vm, table->npks+1);
1535 0 : if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
1536 : }
1537 3167 : if (rc != DBRES_OK) {
1538 0 : cloudsync_set_dberror(data);
1539 0 : dbvm_reset(vm);
1540 0 : return rc;
1541 : }
1542 :
1543 : // perform real operation and disable triggers
1544 :
1545 : // in case of GOS we reused the table->col_merge_stmt statement
1546 : // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1547 : // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
1548 : // the trick is to disable that trigger before executing the statement
1549 3167 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1550 3167 : SYNCBIT_SET(data);
1551 3167 : rc = databasevm_step(vm);
1552 : DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
1553 3167 : dbvm_reset(vm);
1554 3167 : SYNCBIT_RESET(data);
1555 3167 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1556 :
1557 3167 : if (rc != DBRES_DONE) {
1558 0 : cloudsync_set_dberror(data);
1559 0 : return rc;
1560 : }
1561 :
1562 3167 : return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
1563 3167 : }
1564 :
1565 164 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1566 164 : int rc = DBRES_OK;
1567 :
1568 : // reset return value
1569 164 : *rowid = 0;
1570 :
1571 : // bind pk
1572 164 : dbvm_t *vm = table->real_merge_delete_stmt;
1573 164 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1574 164 : if (rc < 0) {
1575 0 : rc = cloudsync_set_dberror(data);
1576 0 : dbvm_reset(vm);
1577 0 : return rc;
1578 : }
1579 :
1580 : // perform real operation and disable triggers
1581 164 : SYNCBIT_SET(data);
1582 164 : rc = databasevm_step(vm);
1583 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
1584 164 : dbvm_reset(vm);
1585 164 : SYNCBIT_RESET(data);
1586 164 : if (rc == DBRES_DONE) rc = DBRES_OK;
1587 164 : if (rc != DBRES_OK) {
1588 0 : cloudsync_set_dberror(data);
1589 0 : return rc;
1590 : }
1591 :
1592 164 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
1593 164 : if (rc != DBRES_OK) return rc;
1594 :
1595 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1596 : // this must never come before `set_winner_clock`
1597 164 : vm = table->meta_merge_delete_drop;
1598 164 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1599 164 : if (rc == DBRES_OK) rc = databasevm_step(vm);
1600 164 : dbvm_reset(vm);
1601 :
1602 164 : if (rc == DBRES_DONE) rc = DBRES_OK;
1603 164 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1604 164 : return rc;
1605 164 : }
1606 :
1607 66 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
1608 66 : dbvm_t *vm = table->meta_zero_clock_stmt;
1609 :
1610 66 : int rc = databasevm_bind_int(vm, 1, db_version);
1611 66 : if (rc != DBRES_OK) goto cleanup;
1612 :
1613 66 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1614 66 : if (rc != DBRES_OK) goto cleanup;
1615 :
1616 66 : rc = databasevm_step(vm);
1617 66 : if (rc == DBRES_DONE) rc = DBRES_OK;
1618 :
1619 : cleanup:
1620 66 : if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
1621 66 : dbvm_reset(vm);
1622 66 : return rc;
1623 : }
1624 :
1625 : // executed only if insert_cl == local_cl
1626 45090 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {
1627 :
1628 45090 : if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
1629 :
1630 : int64_t local_version;
1631 45090 : int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
1632 45090 : if (rc == DBRES_DONE) {
1633 : // no rows returned, the incoming change wins if there's nothing there locally
1634 20147 : *didwin_flag = true;
1635 20147 : return DBRES_OK;
1636 : }
1637 24943 : if (rc != DBRES_OK) return rc;
1638 :
1639 : // rc == DBRES_OK, means that a row with a version exists
1640 24943 : if (local_version != col_version) {
1641 1982 : if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
1642 743 : if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
1643 0 : }
1644 :
1645 : // rc == DBRES_ROW and col_version == local_version, need to compare values
1646 :
1647 : // retrieve col_value precompiled statement
1648 22961 : bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
1649 : dbvm_t *vm;
1650 22961 : if (is_block_col) {
1651 : // Block column: read value from blocks table (pk + col_name bindings)
1652 43 : vm = table_block_value_read_stmt(table);
1653 43 : if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
1654 43 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1655 43 : if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
1656 43 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1657 43 : if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
1658 43 : } else {
1659 22918 : vm = table_column_lookup(table, col_name, false, NULL);
1660 22918 : if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
1661 :
1662 : // bind primary key values
1663 22918 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
1664 22918 : if (rc < 0) {
1665 0 : rc = cloudsync_set_dberror(data);
1666 0 : dbvm_reset(vm);
1667 0 : return rc;
1668 : }
1669 : }
1670 :
1671 : // execute vm
1672 : dbvalue_t *local_value;
1673 22961 : rc = databasevm_step(vm);
1674 22961 : if (rc == DBRES_DONE) {
1675 : // meta entry exists but the actual value is missing
1676 : // we should allow the value_compare function to make a decision
1677 : // value_compare has been modified to handle the case where lvalue is NULL
1678 2 : local_value = NULL;
1679 2 : rc = DBRES_OK;
1680 22961 : } else if (rc == DBRES_ROW) {
1681 22959 : local_value = database_column_value(vm, 0);
1682 22959 : rc = DBRES_OK;
1683 22959 : } else {
1684 0 : goto cleanup;
1685 : }
1686 :
1687 : // compare values
1688 22961 : int ret = dbutils_value_compare(insert_value, local_value);
1689 : // reset after compare, otherwise local value would be deallocated
1690 22961 : dbvm_reset(vm);
1691 22961 : vm = NULL;
1692 :
1693 22961 : bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
1694 22961 : if (!compare_site_id) {
1695 22961 : *didwin_flag = (ret > 0);
1696 22961 : goto cleanup;
1697 : }
1698 :
1699 : // values are the same and merge_equal_values is true
1700 0 : vm = table->meta_site_id_stmt;
1701 0 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1702 0 : if (rc != DBRES_OK) goto cleanup;
1703 :
1704 0 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1705 0 : if (rc != DBRES_OK) goto cleanup;
1706 :
1707 0 : rc = databasevm_step(vm);
1708 0 : if (rc == DBRES_ROW) {
1709 0 : const void *local_site_id = database_column_blob(vm, 0, NULL);
1710 0 : if (!local_site_id) {
1711 0 : dbvm_reset(vm);
1712 0 : return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
1713 : }
1714 0 : ret = memcmp(site_id, local_site_id, site_len);
1715 0 : *didwin_flag = (ret > 0);
1716 0 : dbvm_reset(vm);
1717 0 : return DBRES_OK;
1718 : }
1719 :
1720 : // handle error condition here
1721 0 : dbvm_reset(vm);
1722 0 : return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);
1723 :
1724 : cleanup:
1725 22961 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1726 22961 : dbvm_reset(vm);
1727 22961 : return rc;
1728 45090 : }
1729 :
1730 66 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1731 :
1732 : // reset return value
1733 66 : *rowid = 0;
1734 :
1735 66 : if (data->pending_batch == NULL) {
1736 : // Immediate mode: execute base table INSERT
1737 0 : dbvm_t *vm = table->real_merge_sentinel_stmt;
1738 0 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1739 0 : if (rc < 0) {
1740 0 : rc = cloudsync_set_dberror(data);
1741 0 : dbvm_reset(vm);
1742 0 : return rc;
1743 : }
1744 :
1745 0 : SYNCBIT_SET(data);
1746 0 : rc = databasevm_step(vm);
1747 0 : dbvm_reset(vm);
1748 0 : SYNCBIT_RESET(data);
1749 0 : if (rc == DBRES_DONE) rc = DBRES_OK;
1750 0 : if (rc != DBRES_OK) {
1751 0 : cloudsync_set_dberror(data);
1752 0 : return rc;
1753 : }
1754 0 : } else {
1755 : // Batch mode: skip base table INSERT, the batch flush will create the row
1756 66 : merge_pending_batch *batch = data->pending_batch;
1757 66 : batch->sentinel_pending = true;
1758 66 : if (batch->table == NULL) {
1759 33 : batch->table = table;
1760 33 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1761 33 : if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
1762 33 : memcpy(batch->pk, pk, pklen);
1763 33 : batch->pk_len = pklen;
1764 33 : }
1765 : }
1766 :
1767 : // Metadata operations always execute regardless of batch mode
1768 66 : int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
1769 66 : if (rc != DBRES_OK) return rc;
1770 :
1771 66 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
1772 66 : }
1773 :
1774 : // MARK: - Block-level merge helpers -
1775 :
1776 : // Store a block value in the blocks table
1777 379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
1778 379 : dbvm_t *vm = table->block_value_write_stmt;
1779 379 : if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
1780 :
1781 379 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1782 379 : if (rc != DBRES_OK) goto cleanup;
1783 379 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1784 379 : if (rc != DBRES_OK) goto cleanup;
1785 379 : if (col_value) {
1786 379 : rc = databasevm_bind_value(vm, 3, col_value);
1787 379 : } else {
1788 0 : rc = databasevm_bind_null(vm, 3);
1789 : }
1790 379 : if (rc != DBRES_OK) goto cleanup;
1791 :
1792 379 : rc = databasevm_step(vm);
1793 379 : if (rc == DBRES_DONE) rc = DBRES_OK;
1794 :
1795 : cleanup:
1796 379 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1797 379 : databasevm_reset(vm);
1798 379 : return rc;
1799 379 : }
1800 :
1801 : // Delete a block value from the blocks table
1802 74 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
1803 74 : dbvm_t *vm = table->block_value_delete_stmt;
1804 74 : if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
1805 :
1806 74 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1807 74 : if (rc != DBRES_OK) goto cleanup;
1808 74 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1809 74 : if (rc != DBRES_OK) goto cleanup;
1810 :
1811 74 : rc = databasevm_step(vm);
1812 74 : if (rc == DBRES_DONE) rc = DBRES_OK;
1813 :
1814 : cleanup:
1815 74 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1816 74 : databasevm_reset(vm);
1817 74 : return rc;
1818 74 : }
1819 :
1820 : // Materialize all alive blocks for a base column into the base table
1821 462 : int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
1822 462 : if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);
1823 :
1824 : // Find column index and delimiter
1825 462 : int col_idx = -1;
1826 468 : for (int i = 0; i < table->ncols; i++) {
1827 468 : if (strcasecmp(table->col_name[i], base_col_name) == 0) {
1828 462 : col_idx = i;
1829 462 : break;
1830 : }
1831 6 : }
1832 462 : if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
1833 462 : const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
1834 :
1835 : // Build the LIKE pattern for block col_names: "base_col\x1F%"
1836 462 : char *like_pattern = block_build_colname(base_col_name, "%");
1837 462 : if (!like_pattern) return DBRES_NOMEM;
1838 :
1839 : // Query alive blocks from blocks table joined with metadata
1840 : // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
1841 : // ON b.pk = m.pk AND b.col_name = m.col_name
1842 : // WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
1843 : // ORDER BY b.col_name
1844 462 : dbvm_t *vm = table->block_list_stmt;
1845 462 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1846 462 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
1847 462 : rc = databasevm_bind_text(vm, 2, like_pattern, -1);
1848 462 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
1849 : // Bind pk again for the join condition (parameter 3)
1850 462 : rc = databasevm_bind_blob(vm, 3, pk, pklen);
1851 462 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
1852 462 : rc = databasevm_bind_text(vm, 4, like_pattern, -1);
1853 462 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
1854 :
1855 : // Collect block values
1856 462 : const char **block_values = NULL;
1857 462 : int block_count = 0;
1858 462 : int block_cap = 0;
1859 :
1860 22695 : while ((rc = databasevm_step(vm)) == DBRES_ROW) {
1861 22233 : const char *value = database_column_text(vm, 0);
1862 22233 : if (block_count >= block_cap) {
1863 1076 : int new_cap = block_cap ? block_cap * 2 : 16;
1864 1076 : const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
1865 1076 : if (!new_arr) { rc = DBRES_NOMEM; break; }
1866 1076 : block_values = new_arr;
1867 1076 : block_cap = new_cap;
1868 1076 : }
1869 22233 : block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
1870 22233 : block_count++;
1871 : }
1872 462 : databasevm_reset(vm);
1873 462 : cloudsync_memory_free(like_pattern);
1874 :
1875 462 : if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
1876 : // Free collected values
1877 0 : for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
1878 0 : if (block_values) cloudsync_memory_free((void *)block_values);
1879 0 : return cloudsync_set_dberror(data);
1880 : }
1881 :
1882 : // Materialize text (NULL when no alive blocks)
1883 462 : char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
1884 22695 : for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
1885 462 : if (block_values) cloudsync_memory_free((void *)block_values);
1886 462 : if (block_count > 0 && !text) return DBRES_NOMEM;
1887 :
1888 : // Update the base table column via the col_merge_stmt (with triggers disabled)
1889 462 : dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
1890 462 : if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }
1891 :
1892 : // Bind PKs
1893 462 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
1894 462 : if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }
1895 :
1896 : // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
1897 462 : int npks = table->npks;
1898 462 : if (text) {
1899 458 : rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
1900 458 : if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
1901 458 : rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
1902 458 : if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
1903 458 : } else {
1904 4 : rc = databasevm_bind_null(merge_vm, npks + 1);
1905 4 : if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
1906 4 : rc = databasevm_bind_null(merge_vm, npks + 2);
1907 4 : if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
1908 : }
1909 :
1910 : // Execute with triggers disabled
1911 462 : table->enabled = 0;
1912 462 : SYNCBIT_SET(data);
1913 462 : rc = databasevm_step(merge_vm);
1914 462 : databasevm_reset(merge_vm);
1915 462 : SYNCBIT_RESET(data);
1916 462 : table->enabled = 1;
1917 :
1918 462 : cloudsync_memory_free(text);
1919 :
1920 462 : if (rc == DBRES_DONE) rc = DBRES_OK;
1921 462 : if (rc != DBRES_OK) return cloudsync_set_dberror(data);
1922 462 : return DBRES_OK;
1923 462 : }
1924 :
1925 : // Accessor for has_block_cols flag
1926 1024 : bool table_has_block_cols (cloudsync_table_context *table) {
1927 1024 : return table && table->has_block_cols;
1928 : }
1929 :
1930 : // Get block column algo for a given column index
1931 11585 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
1932 11585 : if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
1933 11585 : return table->col_algo[index];
1934 11585 : }
1935 :
1936 : // Get block delimiter for a given column index
1937 137 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
1938 137 : if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
1939 137 : return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
1940 137 : }
1941 :
1942 : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
1943 1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
1944 506 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
1945 93 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
1946 93 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
1947 :
1948 3 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
1949 3 : if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
1950 3 : if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
1951 3 : table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
1952 3 : }
1953 :
1954 : // Find column index by name
1955 127 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
1956 127 : if (!table || !col_name) return -1;
1957 131 : for (int i = 0; i < table->ncols; i++) {
1958 131 : if (strcasecmp(table->col_name[i], col_name) == 0) return i;
1959 4 : }
1960 0 : return -1;
1961 127 : }
1962 :
1963 46123 : int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
1964 : // Handle DWS and AWS algorithms here
1965 : // Delete-Wins Set (DWS): table_algo_crdt_dws
1966 : // Add-Wins Set (AWS): table_algo_crdt_aws
1967 :
1968 : // Causal-Length Set (CLS) Algorithm (default)
1969 :
1970 : // compute the local causal length for the row based on the primary key
1971 : // the causal length is used to determine the order of operations and resolve conflicts.
1972 46123 : int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
1973 46123 : if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);
1974 :
1975 : // if the incoming causal length is older than the local causal length, we can safely ignore it
1976 : // because the local changes are more recent
1977 46123 : if (insert_cl < local_cl) return DBRES_OK;
1978 :
1979 : // check if the operation is a delete by examining the causal length
1980 : // even causal lengths typically signify delete operations
1981 45903 : bool is_delete = (insert_cl % 2 == 0);
1982 45903 : if (is_delete) {
1983 : // if it's a delete, check if the local state is at the same causal length
1984 : // if it is, no further action is needed
1985 614 : if (local_cl == insert_cl) return DBRES_OK;
1986 :
1987 : // perform a delete merge if the causal length is newer than the local one
1988 328 : int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
1989 164 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
1990 164 : if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
1991 164 : return rc;
1992 : }
1993 :
1994 : // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
1995 45289 : bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
1996 45289 : if (is_sentinel_only) {
1997 199 : if (local_cl == insert_cl) return DBRES_OK;
1998 :
1999 : // perform a sentinel-only insert to track the existence of the row
2000 132 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
2001 66 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
2002 66 : if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
2003 66 : return rc;
2004 : }
2005 :
2006 : // from this point I can be sure that insert_name is not sentinel
2007 :
2008 : // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
2009 : // odd causal lengths can "resurrect" rows
2010 45090 : bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
2011 45090 : bool row_exists_locally = local_cl != 0;
2012 :
2013 : // if a resurrection is needed, insert a sentinel to mark the row as alive
2014 : // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
2015 45090 : if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
2016 0 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
2017 0 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
2018 0 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
2019 0 : }
2020 :
2021 : // at this point, we determine whether the incoming change wins based on causal length
2022 : // this can be due to a resurrection, a non-existent local row, or a conflict resolution
2023 45090 : bool flag = false;
2024 45090 : int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
2025 45090 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);
2026 :
2027 : // check if the incoming change wins and should be applied
2028 45090 : bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
2029 45090 : if (!does_cid_win) return DBRES_OK;
2030 :
2031 : // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
2032 : // bypass the normal base-table write and instead store the value in the blocks table.
2033 : // The base table column will be materialized from all alive blocks.
2034 21791 : if (block_is_block_colname(insert_name) && table->has_block_cols) {
2035 : // Store or delete block value in blocks table depending on tombstone status
2036 412 : if (insert_col_version % 2 == 0) {
2037 : // Tombstone: remove from blocks table
2038 33 : rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
2039 33 : } else {
2040 379 : rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
2041 : }
2042 412 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);
2043 :
2044 : // Set winner clock in metadata
2045 824 : rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
2046 412 : insert_col_version, insert_db_version,
2047 412 : insert_site_id, insert_site_id_len, insert_seq, rowid);
2048 412 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);
2049 :
2050 : // Materialize the full column from blocks into the base table
2051 412 : char *base_col = block_extract_base_colname(insert_name);
2052 412 : if (base_col) {
2053 412 : rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
2054 412 : cloudsync_memory_free(base_col);
2055 412 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
2056 412 : }
2057 :
2058 412 : return DBRES_OK;
2059 : }
2060 :
2061 : // perform the final column insert or update if the incoming change wins
2062 21379 : if (data->pending_batch) {
2063 : // Propagate row_exists_locally to the batch on the first winning column.
2064 : // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
2065 : // which matters when RLS policies reference columns not in the payload.
2066 21362 : if (data->pending_batch->table == NULL) {
2067 7707 : data->pending_batch->row_exists = row_exists_locally;
2068 7707 : }
2069 21362 : rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
2070 21362 : if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
2071 21362 : } else {
2072 17 : rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
2073 17 : if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
2074 : }
2075 :
2076 21379 : return rc;
2077 46123 : }
2078 :
2079 : // MARK: - Block column setup -
2080 :
2081 : // Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
2082 : // Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
2083 : // reads each row's current value from the base table, splits it into blocks, and inserts
2084 : // the block entries into both the blocks table and the metadata table.
2085 : // Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
2086 73 : static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
2087 73 : const char *col_name = table->col_name[col_idx];
2088 73 : if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
2089 :
2090 73 : const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
2091 73 : int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2092 :
2093 : // Phase 1: collect all existing PKs that have an alive regular col_name entry
2094 : // AND do not yet have any entries in the blocks table for this column.
2095 : // The NOT IN filter makes this idempotent: rows that were already migrated
2096 : // (or had their blocks created via INSERT) are skipped on subsequent calls.
2097 : // We collect PKs before writing so that writes to the metadata table (Phase 2)
2098 : // do not perturb the read cursor on the same table.
2099 73 : char *like_pattern = block_build_colname(col_name, "%");
2100 73 : if (!like_pattern) return DBRES_NOMEM;
2101 :
2102 73 : char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
2103 73 : if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
2104 73 : dbvm_t *scan_vm = NULL;
2105 73 : int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
2106 73 : cloudsync_memory_free(scan_sql);
2107 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
2108 :
2109 73 : rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
2110 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2111 : // Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
2112 : // because databasevm_bind_text uses SQLITE_STATIC (no copy).
2113 73 : rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
2114 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2115 :
2116 : // Collect pk blobs into a dynamically-grown array of owned copies
2117 73 : void **pks = NULL;
2118 73 : size_t *pklens = NULL;
2119 73 : int pk_count = 0;
2120 73 : int pk_cap = 0;
2121 :
2122 75 : while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
2123 2 : size_t pklen = 0;
2124 2 : const void *pk = database_column_blob(scan_vm, 0, &pklen);
2125 2 : if (!pk || pklen == 0) continue;
2126 :
2127 2 : if (pk_count >= pk_cap) {
2128 1 : int new_cap = pk_cap ? pk_cap * 2 : 8;
2129 1 : void **new_pks = (void **)cloudsync_memory_realloc(pks, (uint64_t)(new_cap * sizeof(void *)));
2130 1 : size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
2131 1 : if (!new_pks || !new_pklens) {
2132 0 : cloudsync_memory_free(new_pks ? new_pks : pks);
2133 0 : cloudsync_memory_free(new_pklens ? new_pklens : pklens);
2134 0 : databasevm_finalize(scan_vm);
2135 0 : return DBRES_NOMEM;
2136 : }
2137 1 : pks = new_pks;
2138 1 : pklens = new_pklens;
2139 1 : pk_cap = new_cap;
2140 1 : }
2141 :
2142 2 : pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
2143 2 : if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
2144 2 : memcpy(pks[pk_count], pk, pklen);
2145 2 : pklens[pk_count] = pklen;
2146 2 : pk_count++;
2147 : }
2148 :
2149 73 : databasevm_finalize(scan_vm);
2150 73 : cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
2151 73 : if (rc != DBRES_DONE && rc != DBRES_OK) {
2152 0 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2153 0 : cloudsync_memory_free(pks);
2154 0 : cloudsync_memory_free(pklens);
2155 0 : return rc;
2156 : }
2157 :
2158 73 : if (pk_count == 0) {
2159 72 : cloudsync_memory_free(pks);
2160 72 : cloudsync_memory_free(pklens);
2161 72 : return DBRES_OK;
2162 : }
2163 :
2164 : // Phase 2: for each collected PK, read the column value, split into blocks,
2165 : // and insert into the blocks table + metadata using INSERT OR IGNORE.
2166 :
2167 1 : char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
2168 1 : if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
2169 1 : dbvm_t *meta_vm = NULL;
2170 1 : rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
2171 1 : cloudsync_memory_free(meta_sql);
2172 1 : if (rc != DBRES_OK) goto cleanup_pks;
2173 :
2174 1 : char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
2175 1 : if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
2176 1 : dbvm_t *blocks_vm = NULL;
2177 1 : rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
2178 1 : cloudsync_memory_free(blocks_sql);
2179 1 : if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
2180 :
2181 1 : dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
2182 :
2183 3 : for (int p = 0; p < pk_count; p++) {
2184 2 : const void *pk = pks[p];
2185 2 : size_t pklen = pklens[p];
2186 :
2187 2 : if (!val_vm) continue;
2188 :
2189 : // Read current column value from the base table
2190 2 : int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
2191 2 : if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
2192 :
2193 2 : int step_rc = databasevm_step(val_vm);
2194 2 : const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
2195 : // Make a copy of text before resetting val_vm, as the pointer is only valid until reset
2196 2 : char *text_copy = text ? cloudsync_string_dup(text) : NULL;
2197 2 : databasevm_reset(val_vm);
2198 :
2199 2 : if (!text_copy) continue; // NULL column value: nothing to migrate
2200 :
2201 : // Split text into blocks and store each one
2202 2 : block_list_t *blocks = block_split(text_copy, delim);
2203 2 : cloudsync_memory_free(text_copy);
2204 2 : if (!blocks) continue;
2205 :
2206 2 : char **positions = block_initial_positions(blocks->count);
2207 2 : if (positions) {
2208 7 : for (int b = 0; b < blocks->count; b++) {
2209 5 : char *block_cn = block_build_colname(col_name, positions[b]);
2210 5 : if (block_cn) {
2211 : // Metadata entry (skip if this block position already exists)
2212 5 : databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
2213 5 : databasevm_bind_text(meta_vm, 2, block_cn, -1);
2214 5 : databasevm_bind_int(meta_vm, 3, 1); // col_version = 1 (alive)
2215 5 : databasevm_bind_int(meta_vm, 4, db_version);
2216 5 : databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
2217 5 : databasevm_step(meta_vm);
2218 5 : databasevm_reset(meta_vm);
2219 :
2220 : // Block value (skip if this block position already exists)
2221 5 : databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
2222 5 : databasevm_bind_text(blocks_vm, 2, block_cn, -1);
2223 5 : databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
2224 5 : databasevm_step(blocks_vm);
2225 5 : databasevm_reset(blocks_vm);
2226 :
2227 5 : cloudsync_memory_free(block_cn);
2228 5 : }
2229 5 : cloudsync_memory_free(positions[b]);
2230 5 : }
2231 2 : cloudsync_memory_free(positions);
2232 2 : }
2233 2 : block_list_free(blocks);
2234 2 : }
2235 :
2236 1 : databasevm_finalize(meta_vm);
2237 1 : databasevm_finalize(blocks_vm);
2238 1 : rc = DBRES_OK;
2239 :
2240 : cleanup_pks:
2241 3 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2242 1 : cloudsync_memory_free(pks);
2243 1 : cloudsync_memory_free(pklens);
2244 1 : return rc;
2245 73 : }
2246 :
2247 74 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
2248 74 : cloudsync_table_context *table = table_lookup(data, table_name);
2249 74 : if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
2250 :
2251 : // Find column index
2252 74 : int col_idx = table_col_index(table, col_name);
2253 74 : if (col_idx < 0) {
2254 : char buf[1024];
2255 0 : snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
2256 0 : return cloudsync_set_error(data, buf, DBRES_ERROR);
2257 : }
2258 :
2259 : // Set column algo
2260 74 : table->col_algo[col_idx] = col_algo_block;
2261 74 : table->has_block_cols = true;
2262 :
2263 : // Set delimiter (can be NULL for default)
2264 74 : if (table->col_delimiter[col_idx]) {
2265 0 : cloudsync_memory_free(table->col_delimiter[col_idx]);
2266 0 : table->col_delimiter[col_idx] = NULL;
2267 0 : }
2268 74 : if (delimiter) {
2269 1 : table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
2270 1 : }
2271 :
2272 : // Create blocks table if not already done
2273 74 : if (!table->blocks_ref) {
2274 71 : table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
2275 71 : if (!table->blocks_ref) return DBRES_NOMEM;
2276 :
2277 : // CREATE TABLE IF NOT EXISTS
2278 71 : char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
2279 71 : if (!sql) return DBRES_NOMEM;
2280 :
2281 71 : int rc = database_exec(data, sql);
2282 71 : cloudsync_memory_free(sql);
2283 71 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
2284 :
2285 : // Prepare block statements
2286 : // Write: upsert into blocks (pk, col_name, col_value)
2287 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
2288 71 : if (!sql) return DBRES_NOMEM;
2289 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
2290 71 : cloudsync_memory_free(sql);
2291 71 : if (rc != DBRES_OK) return rc;
2292 :
2293 : // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
2294 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
2295 71 : if (!sql) return DBRES_NOMEM;
2296 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
2297 71 : cloudsync_memory_free(sql);
2298 71 : if (rc != DBRES_OK) return rc;
2299 :
2300 : // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
2301 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
2302 71 : if (!sql) return DBRES_NOMEM;
2303 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
2304 71 : cloudsync_memory_free(sql);
2305 71 : if (rc != DBRES_OK) return rc;
2306 :
2307 : // List alive blocks for materialization
2308 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
2309 71 : if (!sql) return DBRES_NOMEM;
2310 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
2311 71 : cloudsync_memory_free(sql);
2312 71 : if (rc != DBRES_OK) return rc;
2313 71 : }
2314 :
2315 : // Persist settings (skipped when called from the settings loader, since
2316 : // writing to cloudsync_table_settings while sqlite3_exec is iterating it
2317 : // re-feeds the rewritten row to the cursor and causes an infinite loop).
2318 74 : if (persist) {
2319 73 : int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
2320 73 : if (rc != DBRES_OK) return rc;
2321 :
2322 73 : if (delimiter) {
2323 0 : rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
2324 0 : if (rc != DBRES_OK) return rc;
2325 0 : }
2326 :
2327 : // Migrate any existing tracked rows: populate the blocks table and metadata with
2328 : // block entries derived from the current column value, so that subsequent UPDATE
2329 : // operations can diff against the real existing state instead of treating everything
2330 : // as new, and so this node participates correctly in LWW conflict resolution.
2331 73 : rc = block_migrate_existing_rows(data, table, col_idx);
2332 73 : if (rc != DBRES_OK) return rc;
2333 73 : }
2334 :
2335 74 : return DBRES_OK;
2336 74 : }
2337 :
2338 : // MARK: - Private -
2339 :
2340 234 : bool cloudsync_config_exists (cloudsync_context *data) {
2341 234 : return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
2342 : }
2343 :
2344 713 : bool cloudsync_context_is_initialized (cloudsync_context *data) {
2345 : // A fully initialized context has its persistent "is the DB stale" probe
2346 : // prepared. cloudsync_context_init prepares data_version_stmt (via
2347 : // cloudsync_add_dbvms) only after the cloudsync_site_id table exists, so
2348 : // a non-NULL pointer means cloudsync_init has been called at least once
2349 : // on this connection. Used to produce actionable error messages when
2350 : // callers hit a function before calling cloudsync_init.
2351 713 : return data != NULL && data->data_version_stmt != NULL;
2352 : }
2353 :
2354 249 : cloudsync_context *cloudsync_context_create (void *db) {
2355 249 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
2356 249 : if (!data) return NULL;
2357 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
2358 :
2359 249 : data->libversion = CLOUDSYNC_VERSION;
2360 249 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2361 : #if CLOUDSYNC_DEBUG
2362 : data->debug = 1;
2363 : #endif
2364 :
2365 : // allocate space for 64 tables (it can grow if needed)
2366 249 : uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
2367 249 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
2368 249 : if (!data->tables) {cloudsync_memory_free(data); return NULL;}
2369 :
2370 249 : data->tables_cap = CLOUDSYNC_INIT_NTABLES;
2371 249 : data->tables_count = 0;
2372 249 : data->db = db;
2373 :
2374 : // SQLite exposes col_value as ANY, but other databases require a concrete type.
2375 : // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
2376 : // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
2377 : // It is decoded to the target column type just before applying changes to the base table.
2378 249 : data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
2379 :
2380 249 : return data;
2381 249 : }
2382 :
2383 249 : void cloudsync_context_free (void *ctx) {
2384 249 : cloudsync_context *data = (cloudsync_context *)ctx;
2385 : DEBUG_SETTINGS("cloudsync_context_free %p", data);
2386 249 : if (!data) return;
2387 :
2388 : // free all table contexts and prepared statements
2389 249 : cloudsync_terminate(data);
2390 :
2391 249 : cloudsync_memory_free(data->tables);
2392 249 : cloudsync_memory_free(data);
2393 249 : }
2394 :
2395 344 : const char *cloudsync_context_init (cloudsync_context *data) {
2396 344 : if (!data) return NULL;
2397 :
2398 : // perform init just the first time, if the site_id field is not set.
2399 : // The data->site_id value could exists while settings tables don't exists if the
2400 : // cloudsync_context_init was previously called in init transaction that was rolled back
2401 : // because of an error during the init process.
2402 344 : if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
2403 229 : if (dbutils_settings_init(data) != DBRES_OK) return NULL;
2404 229 : if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
2405 229 : if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
2406 229 : data->schema_hash = database_schema_hash(data);
2407 229 : }
2408 :
2409 344 : return (const char *)data->site_id;
2410 344 : }
2411 :
2412 1370 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
2413 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
2414 :
2415 : // sync data
2416 1370 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
2417 229 : data->schema_version = (int)strtol(value, NULL, 0);
2418 229 : return;
2419 : }
2420 :
2421 1141 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
2422 0 : data->debug = 0;
2423 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
2424 0 : return;
2425 : }
2426 :
2427 1141 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
2428 0 : cloudsync_set_schema(data, value);
2429 0 : return;
2430 : }
2431 1370 : }
2432 :
2433 : #if 0
2434 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
2435 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
2436 : // Unused in this version
2437 : return;
2438 : }
2439 : #endif
2440 :
2441 8319 : int cloudsync_commit_hook (void *ctx) {
2442 8319 : cloudsync_context *data = (cloudsync_context *)ctx;
2443 :
2444 8319 : data->db_version = data->pending_db_version;
2445 8319 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2446 8319 : data->seq = 0;
2447 :
2448 8319 : return DBRES_OK;
2449 : }
2450 :
2451 3 : void cloudsync_rollback_hook (void *ctx) {
2452 3 : cloudsync_context *data = (cloudsync_context *)ctx;
2453 :
2454 3 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2455 3 : data->seq = 0;
2456 3 : }
2457 :
2458 24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
2459 : // init cloudsync_settings
2460 24 : if (cloudsync_context_init(data) == NULL) {
2461 0 : return DBRES_MISUSE;
2462 : }
2463 :
2464 : // lookup table
2465 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2466 24 : if (!table) {
2467 : char buffer[1024];
2468 1 : snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
2469 1 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
2470 : }
2471 :
2472 : // idempotent: if already altering, return OK
2473 23 : if (table->is_altering) return DBRES_OK;
2474 :
2475 : // retrieve primary key(s)
2476 23 : char **names = NULL;
2477 23 : int nrows = 0;
2478 23 : int rc = database_pk_names(data, table_name, &names, &nrows);
2479 23 : if (rc != DBRES_OK) {
2480 : char buffer[1024];
2481 0 : snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
2482 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2483 0 : goto rollback_begin_alter;
2484 : }
2485 :
2486 : // sanity check the number of primary keys
2487 23 : if (nrows != table_count_pks(table)) {
2488 : char buffer[1024];
2489 0 : snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
2490 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2491 0 : goto rollback_begin_alter;
2492 : }
2493 :
2494 : // drop original triggers
2495 23 : rc = database_delete_triggers(data, table_name);
2496 23 : if (rc != DBRES_OK) {
2497 : char buffer[1024];
2498 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
2499 0 : cloudsync_set_error(data, buffer, DBRES_ERROR);
2500 0 : goto rollback_begin_alter;
2501 : }
2502 :
2503 23 : table_set_pknames(table, names);
2504 23 : table->is_altering = true;
2505 23 : return DBRES_OK;
2506 :
2507 : rollback_begin_alter:
2508 0 : if (names) table_pknames_free(names, nrows);
2509 0 : return rc;
2510 24 : }
2511 :
2512 23 : int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
2513 : // check if dbversion needed to be updated
2514 23 : cloudsync_dbversion_check_uptodate(data);
2515 :
2516 : // if primary-key columns change, all row identities change.
2517 : // In that case, the clock table must be dropped, recreated,
2518 : // and backfilled. We detect this by comparing the unique index
2519 : // in the lookaside table with the source table's PKs.
2520 :
2521 : // retrieve primary keys (to check is they changed)
2522 23 : char **result = NULL;
2523 23 : int nrows = 0;
2524 23 : int rc = database_pk_names (data, table->name, &result, &nrows);
2525 23 : if (rc != DBRES_OK || nrows == 0) {
2526 0 : if (nrows == 0) rc = DBRES_MISUSE;
2527 0 : goto finalize;
2528 : }
2529 :
2530 : // check if there are differences
2531 23 : bool pk_diff = (nrows != table->npks);
2532 23 : if (!pk_diff) {
2533 45 : for (int i = 0; i < nrows; ++i) {
2534 34 : if (strcmp(table->pk_name[i], result[i]) != 0) {
2535 6 : pk_diff = true;
2536 6 : break;
2537 : }
2538 28 : }
2539 17 : }
2540 :
2541 23 : if (pk_diff) {
2542 : // drop meta-table, it will be recreated
2543 12 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
2544 12 : rc = database_exec(data, sql);
2545 12 : cloudsync_memory_free(sql);
2546 12 : if (rc != DBRES_OK) {
2547 0 : DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
2548 0 : goto finalize;
2549 : }
2550 12 : } else {
2551 : // compact meta-table
2552 : // delete entries for removed columns
2553 11 : const char *schema = table->schema ? table->schema : "";
2554 11 : char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
2555 11 : rc = database_exec(data, sql);
2556 11 : cloudsync_memory_free(sql);
2557 11 : if (rc != DBRES_OK) {
2558 0 : DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
2559 0 : goto finalize;
2560 : }
2561 :
2562 11 : sql = sql_build_pk_qualified_collist_query(schema, table->name);
2563 11 : if (!sql) {rc = DBRES_NOMEM; goto finalize;}
2564 :
2565 11 : char *pkclause = NULL;
2566 11 : rc = database_select_text(data, sql, &pkclause);
2567 11 : cloudsync_memory_free(sql);
2568 11 : if (rc != DBRES_OK) goto finalize;
2569 11 : char *pkvalues = (pkclause) ? pkclause : "rowid";
2570 :
2571 : // delete entries related to rows that no longer exist in the original table, but preserve tombstone
2572 11 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
2573 11 : rc = database_exec(data, sql);
2574 11 : if (pkclause) cloudsync_memory_free(pkclause);
2575 11 : cloudsync_memory_free(sql);
2576 11 : if (rc != DBRES_OK) {
2577 0 : DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
2578 0 : goto finalize;
2579 : }
2580 :
2581 : }
2582 :
2583 : // update key to be later used in cloudsync_dbversion_rebuild
2584 : char buf[256];
2585 23 : snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
2586 23 : dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);
2587 :
2588 : finalize:
2589 23 : table_pknames_free(result, nrows);
2590 23 : return rc;
2591 : }
2592 :
2593 24 : int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
2594 24 : int rc = DBRES_MISUSE;
2595 24 : cloudsync_table_context *table = NULL;
2596 :
2597 : // init cloudsync_settings
2598 24 : if (cloudsync_context_init(data) == NULL) {
2599 0 : cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
2600 0 : goto rollback_finalize_alter;
2601 : }
2602 :
2603 : // lookup table
2604 24 : table = table_lookup(data, table_name);
2605 24 : if (!table) {
2606 : char buffer[1024];
2607 1 : snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
2608 1 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2609 1 : goto rollback_finalize_alter;
2610 : }
2611 :
2612 : // idempotent: if not altering, return OK
2613 23 : if (!table->is_altering) return DBRES_OK;
2614 :
2615 23 : rc = cloudsync_finalize_alter(data, table);
2616 23 : if (rc != DBRES_OK) goto rollback_finalize_alter;
2617 :
2618 : // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
2619 : // is_altering is reset implicitly because table_free + cloudsync_init_table
2620 : // will reallocate the table context with zero-initialized memory
2621 23 : table_remove(data, table);
2622 23 : table_free(table);
2623 23 : table = NULL;
2624 :
2625 : // init again cloudsync for the table
2626 23 : table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
2627 23 : if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
2628 23 : rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK);
2629 23 : if (rc != DBRES_OK) goto rollback_finalize_alter;
2630 :
2631 23 : return DBRES_OK;
2632 :
2633 : rollback_finalize_alter:
2634 1 : if (table) {
2635 0 : table_set_pknames(table, NULL);
2636 0 : table->is_altering = false;
2637 0 : }
2638 1 : return rc;
2639 24 : }
2640 :
2641 : // MARK: - Filter Rewrite -
2642 :
2643 : // Replace bare column names in a filter expression with prefix-qualified names.
2644 : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
2645 : // Columns must be sorted by length descending by the caller to avoid partial matches.
2646 : // Skips content inside single-quoted string literals.
2647 : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
2648 : // Helper: check if an identifier token matches a column name.
2649 112 : static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
2650 450 : for (int i = 0; i < ncols; ++i) {
2651 388 : if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
2652 50 : return true;
2653 338 : }
2654 62 : return false;
2655 112 : }
2656 :
2657 : // Helper: check if character is part of a SQL identifier.
2658 796 : static bool filter_is_ident_char (char c) {
2659 1262 : return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
2660 466 : (c >= '0' && c <= '9') || c == '_';
2661 : }
2662 :
2663 40 : char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
2664 40 : if (!filter || !prefix || !columns || ncols <= 0) return NULL;
2665 :
2666 40 : size_t filter_len = strlen(filter);
2667 40 : size_t prefix_len = strlen(prefix);
2668 :
2669 : // Each identifier match grows by at most (prefix_len + 3) bytes.
2670 : // Worst case: the entire filter is one repeated column reference separated by
2671 : // single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
2672 40 : size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
2673 40 : size_t cap = filter_len + max_growth + 64;
2674 40 : char *result = (char *)cloudsync_memory_alloc(cap);
2675 40 : if (!result) return NULL;
2676 40 : size_t out = 0;
2677 :
2678 : // Single pass: tokenize into identifiers, quoted strings, and everything else.
2679 40 : size_t i = 0;
2680 336 : while (i < filter_len) {
2681 : // Skip single-quoted string literals verbatim (handle '' escape)
2682 296 : if (filter[i] == '\'') {
2683 6 : result[out++] = filter[i++];
2684 38 : while (i < filter_len) {
2685 38 : if (filter[i] == '\'') {
2686 6 : result[out++] = filter[i++];
2687 : // '' is an escaped quote — keep going
2688 6 : if (i < filter_len && filter[i] == '\'') {
2689 0 : result[out++] = filter[i++];
2690 0 : continue;
2691 : }
2692 6 : break; // single ' ends the literal
2693 : }
2694 32 : result[out++] = filter[i++];
2695 : }
2696 6 : continue;
2697 : }
2698 :
2699 : // Extract identifier token
2700 290 : if (filter_is_ident_char(filter[i])) {
2701 112 : size_t start = i;
2702 540 : while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
2703 112 : size_t token_len = i - start;
2704 :
2705 112 : if (filter_is_column(&filter[start], token_len, columns, ncols)) {
2706 : // Emit PREFIX."column_name"
2707 50 : memcpy(&result[out], prefix, prefix_len); out += prefix_len;
2708 50 : result[out++] = '.';
2709 50 : result[out++] = '"';
2710 50 : memcpy(&result[out], &filter[start], token_len); out += token_len;
2711 50 : result[out++] = '"';
2712 50 : } else {
2713 : // Not a column — copy as-is
2714 62 : memcpy(&result[out], &filter[start], token_len); out += token_len;
2715 : }
2716 112 : continue;
2717 : }
2718 :
2719 : // Any other character — copy as-is
2720 178 : result[out++] = filter[i++];
2721 : }
2722 :
2723 40 : result[out] = '\0';
2724 40 : return result;
2725 40 : }
2726 :
2727 24 : int cloudsync_reset_metatable (cloudsync_context *data, const char *table_name) {
2728 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2729 24 : if (!table) return DBRES_ERROR;
2730 :
2731 24 : char *sql = cloudsync_memory_mprintf(SQL_DELETE_ALL_FROM_CLOUDSYNC_TABLE, table->meta_ref);
2732 24 : int rc = database_exec(data, sql);
2733 24 : cloudsync_memory_free(sql);
2734 24 : if (rc != DBRES_OK) return rc;
2735 :
2736 24 : return cloudsync_refill_metatable(data, table_name);
2737 24 : }
2738 :
2739 308 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
2740 308 : cloudsync_table_context *table = table_lookup(data, table_name);
2741 308 : if (!table) return DBRES_ERROR;
2742 :
2743 308 : dbvm_t *vm = NULL;
2744 308 : int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2745 :
2746 : // Read row-level filter from settings (if any)
2747 : char filter_buf[2048];
2748 308 : int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
2749 308 : const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
2750 :
2751 308 : const char *schema = table->schema ? table->schema : "";
2752 308 : char *sql = sql_build_pk_collist_query(schema, table_name);
2753 308 : char *pkclause_identifiers = NULL;
2754 308 : int rc = database_select_text(data, sql, &pkclause_identifiers);
2755 308 : cloudsync_memory_free(sql);
2756 308 : if (rc != DBRES_OK) goto finalize;
2757 308 : char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
2758 :
2759 : // Use database-specific query builder to handle type differences in composite PKs
2760 308 : sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
2761 308 : if (!sql) {rc = DBRES_NOMEM; goto finalize;}
2762 308 : rc = database_exec(data, sql);
2763 308 : cloudsync_memory_free(sql);
2764 308 : if (rc != DBRES_OK) goto finalize;
2765 :
2766 : // fill missing colums
2767 : // for each non-pk column:
2768 : // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
2769 : // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
2770 :
2771 308 : if (filter) {
2772 20 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
2773 20 : } else {
2774 288 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
2775 : }
2776 308 : rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
2777 308 : cloudsync_memory_free(sql);
2778 308 : if (rc != DBRES_OK) goto finalize;
2779 :
2780 1457 : for (int i=0; i<table->ncols; ++i) {
2781 1149 : char *col_name = table->col_name[i];
2782 :
2783 1149 : rc = databasevm_bind_text(vm, 1, col_name, -1);
2784 1149 : if (rc != DBRES_OK) goto finalize;
2785 :
2786 1187 : while (1) {
2787 1187 : rc = databasevm_step(vm);
2788 1187 : if (rc == DBRES_ROW) {
2789 38 : size_t pklen = 0;
2790 38 : const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
2791 38 : if (!pk) { rc = DBRES_ERROR; break; }
2792 38 : rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
2793 1187 : } else if (rc == DBRES_DONE) {
2794 1149 : rc = DBRES_OK;
2795 1149 : break;
2796 : } else {
2797 0 : break;
2798 : }
2799 : }
2800 1149 : if (rc != DBRES_OK) goto finalize;
2801 :
2802 1149 : databasevm_reset(vm);
2803 1457 : }
2804 :
2805 : finalize:
2806 308 : if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
2807 308 : if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
2808 308 : if (vm) databasevm_finalize(vm);
2809 308 : return rc;
2810 308 : }
2811 :
2812 : // MARK: - Local -
2813 :
2814 4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2815 4 : dbvm_t *vm = table->meta_sentinel_update_stmt;
2816 4 : if (!vm) return -1;
2817 :
2818 4 : int rc = databasevm_bind_int(vm, 1, db_version);
2819 4 : if (rc != DBRES_OK) goto cleanup;
2820 :
2821 4 : rc = databasevm_bind_int(vm, 2, seq);
2822 4 : if (rc != DBRES_OK) goto cleanup;
2823 :
2824 4 : rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
2825 4 : if (rc != DBRES_OK) goto cleanup;
2826 :
2827 4 : rc = databasevm_step(vm);
2828 4 : if (rc == DBRES_DONE) rc = DBRES_OK;
2829 :
2830 : cleanup:
2831 4 : DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
2832 4 : databasevm_reset(vm);
2833 4 : return rc;
2834 4 : }
2835 :
2836 129 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2837 129 : dbvm_t *vm = table->meta_sentinel_insert_stmt;
2838 129 : if (!vm) return -1;
2839 :
2840 129 : int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
2841 129 : if (rc != DBRES_OK) goto cleanup;
2842 :
2843 129 : rc = databasevm_bind_int(vm, 2, db_version);
2844 129 : if (rc != DBRES_OK) goto cleanup;
2845 :
2846 129 : rc = databasevm_bind_int(vm, 3, seq);
2847 129 : if (rc != DBRES_OK) goto cleanup;
2848 :
2849 129 : rc = databasevm_bind_int(vm, 4, db_version);
2850 129 : if (rc != DBRES_OK) goto cleanup;
2851 :
2852 129 : rc = databasevm_bind_int(vm, 5, seq);
2853 129 : if (rc != DBRES_OK) goto cleanup;
2854 :
2855 129 : rc = databasevm_step(vm);
2856 129 : if (rc == DBRES_DONE) rc = DBRES_OK;
2857 :
2858 : cleanup:
2859 129 : DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
2860 129 : databasevm_reset(vm);
2861 129 : return rc;
2862 129 : }
2863 :
2864 11953 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
2865 :
2866 11953 : dbvm_t *vm = table->meta_row_insert_update_stmt;
2867 11953 : if (!vm) return -1;
2868 :
2869 11953 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2870 11953 : if (rc != DBRES_OK) goto cleanup;
2871 :
2872 11953 : rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
2873 11953 : if (rc != DBRES_OK) goto cleanup;
2874 :
2875 11953 : rc = databasevm_bind_int(vm, 3, col_version);
2876 11953 : if (rc != DBRES_OK) goto cleanup;
2877 :
2878 11953 : rc = databasevm_bind_int(vm, 4, db_version);
2879 11953 : if (rc != DBRES_OK) goto cleanup;
2880 :
2881 11953 : rc = databasevm_bind_int(vm, 5, seq);
2882 11953 : if (rc != DBRES_OK) goto cleanup;
2883 :
2884 11953 : rc = databasevm_bind_int(vm, 6, db_version);
2885 11953 : if (rc != DBRES_OK) goto cleanup;
2886 :
2887 11953 : rc = databasevm_bind_int(vm, 7, seq);
2888 11953 : if (rc != DBRES_OK) goto cleanup;
2889 :
2890 11953 : rc = databasevm_step(vm);
2891 11953 : if (rc == DBRES_DONE) rc = DBRES_OK;
2892 :
2893 : cleanup:
2894 11953 : DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
2895 11953 : databasevm_reset(vm);
2896 11953 : return rc;
2897 11953 : }
2898 :
2899 11844 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
2900 11844 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
2901 : }
2902 :
2903 41 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
2904 : // Mark a block as deleted by setting col_version = 2 (even = deleted)
2905 41 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
2906 : }
2907 :
2908 41 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
2909 41 : return block_delete_value(data, table, pk, (int)pklen, block_colname);
2910 : }
2911 :
2912 68 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2913 68 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
2914 : }
2915 :
2916 36 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
2917 36 : dbvm_t *vm = table->meta_row_drop_stmt;
2918 36 : if (!vm) return -1;
2919 :
2920 36 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2921 36 : if (rc != DBRES_OK) goto cleanup;
2922 :
2923 36 : rc = databasevm_step(vm);
2924 36 : if (rc == DBRES_DONE) rc = DBRES_OK;
2925 :
2926 : cleanup:
2927 36 : DEBUG_DBERROR(rc, "local_drop_meta", table->context);
2928 36 : databasevm_reset(vm);
2929 36 : return rc;
2930 36 : }
2931 :
2932 32 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
2933 : /*
2934 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
2935 : * to a new primary key (NEW.pk) when a primary key change occurs.
2936 : *
2937 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
2938 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
2939 : *
2940 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
2941 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
2942 : * may be applied incorrectly, leading to data inconsistency.
2943 : *
2944 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
2945 : * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
2946 : * that generates a unique sequence for each row. The update query should ensure that each row moved
2947 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
2948 : */
2949 :
2950 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2951 : // pk2 is the old pk
2952 :
2953 32 : dbvm_t *vm = table->meta_update_move_stmt;
2954 32 : if (!vm) return -1;
2955 :
2956 : // new primary key
2957 32 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2958 32 : if (rc != DBRES_OK) goto cleanup;
2959 :
2960 : // new db_version
2961 32 : rc = databasevm_bind_int(vm, 2, db_version);
2962 32 : if (rc != DBRES_OK) goto cleanup;
2963 :
2964 : // old primary key
2965 32 : rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
2966 32 : if (rc != DBRES_OK) goto cleanup;
2967 :
2968 32 : rc = databasevm_step(vm);
2969 32 : if (rc == DBRES_DONE) rc = DBRES_OK;
2970 :
2971 : cleanup:
2972 32 : DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
2973 32 : databasevm_reset(vm);
2974 32 : return rc;
2975 32 : }
2976 :
2977 : // MARK: - Payload Encode / Decode -
2978 :
2979 687 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
2980 687 : uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
2981 687 : header->checksum[0] = (uint8_t)(h >> 40);
2982 687 : header->checksum[1] = (uint8_t)(h >> 32);
2983 687 : header->checksum[2] = (uint8_t)(h >> 24);
2984 687 : header->checksum[3] = (uint8_t)(h >> 16);
2985 687 : header->checksum[4] = (uint8_t)(h >> 8);
2986 687 : header->checksum[5] = (uint8_t)(h >> 0);
2987 687 : }
2988 :
2989 682 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
2990 2046 : return ((uint64_t)header->checksum[0] << 40) |
2991 1364 : ((uint64_t)header->checksum[1] << 32) |
2992 1364 : ((uint64_t)header->checksum[2] << 24) |
2993 1364 : ((uint64_t)header->checksum[3] << 16) |
2994 1364 : ((uint64_t)header->checksum[4] << 8) |
2995 682 : ((uint64_t)header->checksum[5] << 0);
2996 : }
2997 :
2998 682 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
2999 682 : uint64_t checksum1 = cloudsync_payload_checksum_load(header);
3000 682 : uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
3001 682 : return (checksum1 == checksum2);
3002 : }
3003 :
3004 49217 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
3005 49217 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
3006 :
3007 : // alloc/resize buffer
3008 49217 : if (payload->bused + needed > payload->balloc) {
3009 696 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
3010 696 : size_t balloc = payload->balloc + needed;
3011 :
3012 696 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
3013 696 : if (!buffer) {
3014 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3015 0 : memset(payload, 0, sizeof(cloudsync_payload_context));
3016 0 : return false;
3017 : }
3018 :
3019 696 : payload->buffer = buffer;
3020 696 : payload->balloc = balloc;
3021 696 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
3022 696 : }
3023 :
3024 49217 : return true;
3025 49217 : }
3026 :
3027 50600 : size_t cloudsync_payload_context_size (size_t *header_size) {
3028 50600 : if (header_size) *header_size = sizeof(cloudsync_payload_header);
3029 50600 : return sizeof(cloudsync_payload_context);
3030 : }
3031 :
3032 687 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
3033 687 : memset(header, 0, sizeof(cloudsync_payload_header));
3034 : assert(sizeof(cloudsync_payload_header)==32);
3035 :
3036 : int major, minor, patch;
3037 687 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
3038 :
3039 687 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
3040 687 : header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
3041 687 : header->libversion[0] = (uint8_t)major;
3042 687 : header->libversion[1] = (uint8_t)minor;
3043 687 : header->libversion[2] = (uint8_t)patch;
3044 687 : header->expanded_size = htonl(expanded_size);
3045 687 : header->ncols = htons(ncols);
3046 687 : header->nrows = htonl(nrows);
3047 687 : header->schema_hash = htonll(hash);
3048 687 : }
3049 :
3050 49217 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
3051 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
3052 : // debug_values(argc, argv);
3053 :
3054 : // check if the step function is called for the first time
3055 49217 : if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
3056 :
3057 49217 : size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
3058 49217 : if (cloudsync_payload_encode_check(payload, breq) == false) {
3059 0 : return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
3060 : }
3061 :
3062 49217 : char *buffer = payload->buffer + payload->bused;
3063 49217 : size_t bsize = payload->balloc - payload->bused;
3064 49217 : char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
3065 49217 : if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
3066 :
3067 : // update buffer
3068 49217 : payload->bused += breq;
3069 :
3070 : // increment row counter
3071 49217 : ++payload->nrows;
3072 :
3073 49217 : return DBRES_OK;
3074 49217 : }
3075 :
3076 695 : int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
3077 : DEBUG_FUNCTION("cloudsync_payload_encode_final");
3078 :
3079 695 : if (payload->nrows == 0) {
3080 8 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3081 8 : payload->buffer = NULL;
3082 8 : payload->bsize = 0;
3083 8 : return DBRES_OK;
3084 : }
3085 :
3086 687 : if (payload->nrows > UINT32_MAX) {
3087 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3088 0 : payload->buffer = NULL;
3089 0 : payload->bsize = 0;
3090 0 : cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
3091 0 : return DBRES_ERROR;
3092 : }
3093 :
3094 : // sanity check about buffer size
3095 687 : int header_size = (int)sizeof(cloudsync_payload_header);
3096 687 : int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
3097 687 : if (buffer_size < 0) {
3098 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3099 0 : payload->buffer = NULL;
3100 0 : payload->bsize = 0;
3101 0 : cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
3102 0 : return DBRES_ERROR;
3103 : }
3104 687 : if (buffer_size > INT_MAX) {
3105 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3106 0 : payload->buffer = NULL;
3107 0 : payload->bsize = 0;
3108 0 : cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
3109 0 : return DBRES_ERROR;
3110 : }
3111 : // try to allocate buffer used for compressed data
3112 687 : int real_buffer_size = (int)buffer_size;
3113 687 : int zbound = LZ4_compressBound(real_buffer_size);
3114 687 : char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression
3115 :
3116 : // skip the reserved header from the buffer to compress
3117 687 : char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
3118 687 : int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
3119 687 : bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
3120 687 : CHECK_FORCE_UNCOMPRESSED_BUFFER();
3121 :
3122 : // setup payload header
3123 687 : cloudsync_payload_header header = {0};
3124 687 : uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
3125 687 : cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
3126 :
3127 : // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
3128 687 : if (use_uncompressed_buffer) {
3129 16 : if (zbuffer) cloudsync_memory_free(zbuffer);
3130 16 : zbuffer = payload->buffer;
3131 16 : zused = real_buffer_size;
3132 16 : }
3133 :
3134 : // compute checksum of the buffer
3135 687 : uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
3136 687 : cloudsync_payload_checksum_store(&header, checksum);
3137 :
3138 : // copy header and data to SQLite BLOB
3139 687 : memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
3140 687 : int blob_size = zused + sizeof(cloudsync_payload_header);
3141 687 : payload->bsize = blob_size;
3142 :
3143 : // cleanup memory
3144 687 : if (zbuffer != payload->buffer) {
3145 671 : cloudsync_memory_free (payload->buffer);
3146 671 : payload->buffer = zbuffer;
3147 671 : }
3148 :
3149 687 : return DBRES_OK;
3150 695 : }
3151 :
3152 695 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
3153 : DEBUG_FUNCTION("cloudsync_payload_blob");
3154 :
3155 695 : if (blob_size) *blob_size = (int64_t)payload->bsize;
3156 695 : if (nrows) *nrows = (int64_t)payload->nrows;
3157 695 : return payload->buffer;
3158 : }
3159 :
3160 442233 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
3161 442233 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
3162 442233 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
3163 :
3164 442233 : if (rc == DBRES_OK) {
3165 : // the dbversion index is smaller than seq index, so it is processed first
3166 : // when processing the dbversion column: save the value to the tmp_dbversion field
3167 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
3168 442233 : switch (index) {
3169 : case CLOUDSYNC_PK_INDEX_TBL:
3170 49137 : if (type == DBTYPE_TEXT) {
3171 49137 : decode_context->tbl = pval;
3172 49137 : decode_context->tbl_len = ival;
3173 49137 : }
3174 49137 : break;
3175 : case CLOUDSYNC_PK_INDEX_PK:
3176 49137 : if (type == DBTYPE_BLOB) {
3177 49137 : decode_context->pk = pval;
3178 49137 : decode_context->pk_len = ival;
3179 49137 : }
3180 49137 : break;
3181 : case CLOUDSYNC_PK_INDEX_COLNAME:
3182 49137 : if (type == DBTYPE_TEXT) {
3183 49137 : decode_context->col_name = pval;
3184 49137 : decode_context->col_name_len = ival;
3185 49137 : }
3186 49137 : break;
3187 : case CLOUDSYNC_PK_INDEX_COLVERSION:
3188 49137 : if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
3189 49137 : break;
3190 : case CLOUDSYNC_PK_INDEX_DBVERSION:
3191 49137 : if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
3192 49137 : break;
3193 : case CLOUDSYNC_PK_INDEX_SITEID:
3194 49137 : if (type == DBTYPE_BLOB) {
3195 49137 : decode_context->site_id = pval;
3196 49137 : decode_context->site_id_len = ival;
3197 49137 : }
3198 49137 : break;
3199 : case CLOUDSYNC_PK_INDEX_CL:
3200 49137 : if (type == DBTYPE_INTEGER) decode_context->cl = ival;
3201 49137 : break;
3202 : case CLOUDSYNC_PK_INDEX_SEQ:
3203 49137 : if (type == DBTYPE_INTEGER) decode_context->seq = ival;
3204 49137 : break;
3205 : }
3206 442233 : }
3207 :
3208 442233 : return rc;
3209 : }
3210 :
3211 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
3212 :
3213 685 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
3214 : // Guard against calling payload_apply before cloudsync_init: without this,
3215 : // the settings lookups at the top of this function would each emit a
3216 : // "no such table: cloudsync_settings" debug line, control would fall
3217 : // through to the meta-table insert, and the function would ultimately
3218 : // return an error with an empty errmsg — SQLite then surfaces that as
3219 : // the confusing "Runtime error: not an error".
3220 685 : if (!cloudsync_context_is_initialized(data)) {
3221 1 : return cloudsync_set_error(data,
3222 : "cloudsync is not initialized: call SELECT cloudsync_init('<table_name>') "
3223 : "to enable sync on a table before calling cloudsync_payload_apply().",
3224 : DBRES_MISUSE);
3225 : }
3226 :
3227 : // sanity check
3228 684 : if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
3229 :
3230 : // decode header
3231 : cloudsync_payload_header header;
3232 684 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
3233 :
3234 684 : header.signature = ntohl(header.signature);
3235 684 : header.expanded_size = ntohl(header.expanded_size);
3236 684 : header.ncols = ntohs(header.ncols);
3237 684 : header.nrows = ntohl(header.nrows);
3238 684 : header.schema_hash = ntohll(header.schema_hash);
3239 :
3240 : // compare schema_hash only if not disabled and if the received payload was created with the current header version
3241 : // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
3242 684 : if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
3243 684 : if (header.schema_hash != data->schema_hash) {
3244 4 : if (!database_check_schema_hash(data, header.schema_hash)) {
3245 : char buffer[1024];
3246 2 : snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
3247 2 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3248 : }
3249 2 : }
3250 682 : }
3251 :
3252 : // sanity check header
3253 682 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
3254 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
3255 : }
3256 :
3257 682 : const char *buffer = payload + sizeof(cloudsync_payload_header);
3258 682 : size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
3259 :
3260 : // sanity check checksum (only if version is >= 2)
3261 682 : if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
3262 682 : uint64_t checksum = pk_checksum(buffer, buf_len);
3263 682 : if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
3264 1 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
3265 : }
3266 681 : }
3267 :
3268 : // check if payload is compressed
3269 681 : char *clone = NULL;
3270 681 : if (header.expanded_size != 0) {
3271 665 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
3272 665 : if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
3273 :
3274 665 : int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
3275 665 : if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
3276 0 : if (clone) cloudsync_memory_free(clone);
3277 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
3278 : }
3279 :
3280 665 : buffer = (const char *)clone;
3281 665 : buf_len = (size_t)header.expanded_size;
3282 665 : }
3283 :
3284 : // precompile the insert statement
3285 681 : dbvm_t *vm = NULL;
3286 681 : int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
3287 681 : if (rc != DBRES_OK) {
3288 0 : if (clone) cloudsync_memory_free(clone);
3289 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
3290 : }
3291 :
3292 : // process buffer, one row at a time
3293 681 : uint16_t ncols = header.ncols;
3294 681 : uint32_t nrows = header.nrows;
3295 681 : int64_t last_payload_db_version = -1;
3296 681 : int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
3297 681 : int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
3298 681 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
3299 :
3300 : // Initialize deferred column-batch merge
3301 681 : merge_pending_batch batch = {0};
3302 681 : data->pending_batch = &batch;
3303 681 : bool in_savepoint = false;
3304 681 : const void *last_pk = NULL;
3305 681 : int64_t last_pk_len = 0;
3306 681 : const char *last_tbl = NULL;
3307 681 : int64_t last_tbl_len = 0;
3308 :
3309 49818 : for (uint32_t i=0; i<nrows; ++i) {
3310 49137 : size_t seek = 0;
3311 49137 : int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
3312 49137 : if (res == -1) {
3313 0 : merge_flush_pending(data);
3314 0 : data->pending_batch = NULL;
3315 0 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3316 0 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3317 0 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3318 0 : if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
3319 0 : rc = DBRES_ERROR;
3320 0 : goto cleanup;
3321 : }
3322 :
3323 : // Detect PK/table/db_version boundary to flush pending batch
3324 97593 : bool pk_changed = (last_pk != NULL &&
3325 48456 : (last_pk_len != decoded_context.pk_len ||
3326 46530 : memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
3327 97593 : bool tbl_changed = (last_tbl != NULL &&
3328 48456 : (last_tbl_len != decoded_context.tbl_len ||
3329 48384 : memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
3330 49137 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
3331 :
3332 : // Flush pending batch before any boundary change
3333 49137 : if (pk_changed || tbl_changed || db_version_changed) {
3334 18236 : int flush_rc = merge_flush_pending(data);
3335 18236 : if (flush_rc != DBRES_OK) {
3336 1 : rc = flush_rc;
3337 : // continue processing remaining rows
3338 1 : }
3339 18236 : }
3340 :
3341 : // Per-db_version savepoints group rows with the same source db_version
3342 : // into one transaction. In SQLite autocommit mode, the RELEASE triggers
3343 : // the commit hook which bumps data->db_version and resets seq, ensuring
3344 : // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
3345 : // database_in_transaction() is always true so this block is inactive —
3346 : // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
3347 49137 : if (in_savepoint && db_version_changed) {
3348 4293 : rc = database_commit_savepoint(data, "cloudsync_payload_apply");
3349 4293 : if (rc != DBRES_OK) {
3350 0 : merge_pending_free_entries(&batch);
3351 0 : data->pending_batch = NULL;
3352 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
3353 0 : goto cleanup;
3354 : }
3355 4293 : in_savepoint = false;
3356 4293 : }
3357 :
3358 49137 : if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
3359 4974 : rc = database_begin_savepoint(data, "cloudsync_payload_apply");
3360 4974 : if (rc != DBRES_OK) {
3361 0 : merge_pending_free_entries(&batch);
3362 0 : data->pending_batch = NULL;
3363 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
3364 0 : goto cleanup;
3365 : }
3366 4974 : in_savepoint = true;
3367 4974 : }
3368 :
3369 : // Track db_version for batch-flush boundary detection
3370 49137 : if (db_version_changed) {
3371 4974 : last_payload_db_version = decoded_context.db_version;
3372 4974 : }
3373 :
3374 : // Update PK/table tracking
3375 49137 : last_pk = decoded_context.pk;
3376 49137 : last_pk_len = decoded_context.pk_len;
3377 49137 : last_tbl = decoded_context.tbl;
3378 49137 : last_tbl_len = decoded_context.tbl_len;
3379 :
3380 49137 : rc = databasevm_step(vm);
3381 49137 : if (rc != DBRES_DONE) {
3382 : // don't "break;", the error can be due to a RLS policy.
3383 : // in case of error we try to apply the following changes
3384 2 : }
3385 :
3386 49137 : buffer += seek;
3387 49137 : buf_len -= seek;
3388 49137 : dbvm_reset(vm);
3389 49137 : }
3390 :
3391 : // Final flush after loop
3392 : {
3393 681 : int flush_rc = merge_flush_pending(data);
3394 681 : if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
3395 : }
3396 681 : data->pending_batch = NULL;
3397 :
3398 681 : if (in_savepoint) {
3399 681 : int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
3400 681 : if (rc1 != DBRES_OK) rc = rc1;
3401 681 : }
3402 :
3403 : // save last error (unused if function returns OK)
3404 681 : if (rc != DBRES_OK && rc != DBRES_DONE) {
3405 1 : cloudsync_set_dberror(data);
3406 1 : }
3407 :
3408 681 : if (rc == DBRES_DONE) rc = DBRES_OK;
3409 1361 : if (rc == DBRES_OK) {
3410 : char buf[256];
3411 680 : if (decoded_context.db_version >= dbversion) {
3412 546 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
3413 546 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
3414 :
3415 546 : if (decoded_context.seq != seq) {
3416 338 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
3417 338 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
3418 338 : }
3419 546 : }
3420 680 : }
3421 :
3422 : cleanup:
3423 : // cleanup merge_pending_batch
3424 681 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3425 681 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3426 681 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3427 :
3428 : // cleanup vm
3429 681 : if (vm) databasevm_finalize(vm);
3430 :
3431 : // cleanup memory
3432 681 : if (clone) cloudsync_memory_free(clone);
3433 :
3434 : // error already saved in (save last error)
3435 681 : if (rc != DBRES_OK) return rc;
3436 :
3437 : // return the number of processed rows
3438 680 : if (pnrows) *pnrows = nrows;
3439 680 : return DBRES_OK;
3440 685 : }
3441 :
3442 : // MARK: - Payload load/store -
3443 :
3444 0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
3445 : // retrieve current db_version and seq
3446 0 : *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
3447 0 : if (*db_version < 0) return DBRES_ERROR;
3448 :
3449 : // retrieve BLOB
3450 : char sql[1024];
3451 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
3452 : "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
3453 :
3454 0 : int64_t len = 0;
3455 0 : int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
3456 0 : *blob_size = (int)len;
3457 0 : if (rc != DBRES_OK) return rc;
3458 :
3459 : // exit if there is no data to send
3460 0 : if (*blob == NULL || *blob_size == 0) return DBRES_OK;
3461 0 : return rc;
3462 0 : }
3463 :
3464 : #ifdef CLOUDSYNC_DESKTOP_OS
3465 0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
3466 : DEBUG_FUNCTION("cloudsync_payload_save");
3467 :
3468 : // silently delete any other payload with the same name
3469 0 : cloudsync_file_delete(payload_path);
3470 :
3471 : // retrieve payload
3472 0 : char *blob = NULL;
3473 0 : int blob_size = 0, db_version = 0;
3474 0 : int64_t new_db_version = 0;
3475 0 : int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
3476 0 : if (rc != DBRES_OK) {
3477 0 : if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
3478 0 : return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
3479 : }
3480 :
3481 : // exit if there is no data to save
3482 0 : if (blob == NULL || blob_size == 0) {
3483 0 : if (size) *size = 0;
3484 0 : return DBRES_OK;
3485 : }
3486 :
3487 : // write payload to file
3488 0 : bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
3489 0 : cloudsync_memory_free(blob);
3490 0 : if (res == false) {
3491 0 : return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
3492 : }
3493 :
3494 : // returns blob size
3495 0 : if (size) *size = blob_size;
3496 0 : return DBRES_OK;
3497 0 : }
3498 : #endif
3499 :
3500 : // MARK: - Core -
3501 :
3502 294 : int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, CLOUDSYNC_INIT_FLAG init_flags) {
3503 : DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
3504 : char buffer[2048];
3505 :
3506 : // sanity check table name
3507 294 : if (name == NULL) {
3508 1 : return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
3509 : }
3510 :
3511 : // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
3512 : // for table names. This limit is reasonable and helps prevent memory management issues.
3513 293 : const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
3514 293 : if (strlen(name) > maxlen) {
3515 1 : snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
3516 1 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3517 : }
3518 :
3519 : // check if already initialized
3520 292 : cloudsync_table_context *table = table_lookup(data, name);
3521 292 : if (table) return DBRES_OK;
3522 :
3523 : // check if table exists
3524 288 : if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
3525 2 : snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
3526 2 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3527 : }
3528 :
3529 : // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
3530 286 : int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
3531 286 : if (npri_keys < 0) return cloudsync_set_dberror(data);
3532 286 : if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
3533 :
3534 : #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
3535 : // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
3536 286 : if (npri_keys == 0) {
3537 1 : snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
3538 1 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3539 : }
3540 : #endif
3541 :
3542 285 : bool skip_int_pk_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK) != 0;
3543 285 : if (!skip_int_pk_check) {
3544 222 : if (npri_keys == 1) {
3545 : // the affinity of a column is determined by the declared type of the column,
3546 : // according to the following rules in the order shown:
3547 : // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
3548 134 : int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
3549 134 : if (npri_keys_int < 0) return cloudsync_set_dberror(data);
3550 134 : if (npri_keys == npri_keys_int) {
3551 1 : snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
3552 1 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3553 : }
3554 :
3555 133 : }
3556 221 : }
3557 :
3558 : // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
3559 : #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
3560 : bool skip_notnull_prikeys_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_PRIKEYS_CHECK) != 0;
3561 : if (!skip_notnull_prikeys_check) {
3562 : if (npri_keys > 0) {
3563 : int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
3564 : if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
3565 : if (npri_keys != npri_keys_notnull) {
3566 : snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
3567 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3568 : }
3569 : }
3570 : }
3571 : #endif
3572 :
3573 : // check for columns declared as NOT NULL without a DEFAULT value.
3574 : // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
3575 284 : bool skip_notnull_default_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_DEFAULT_CHECK) != 0;
3576 284 : if (!skip_notnull_default_check) {
3577 284 : int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
3578 284 : if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
3579 284 : if (n_notnull_nodefault > 0) {
3580 0 : snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
3581 0 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3582 : }
3583 284 : }
3584 :
3585 284 : return DBRES_OK;
3586 294 : }
3587 :
3588 4 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
3589 4 : if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
3590 :
3591 : // drop meta-table
3592 4 : const char *table_name = table->name;
3593 4 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
3594 4 : int rc = database_exec(data, sql);
3595 4 : cloudsync_memory_free(sql);
3596 4 : if (rc != DBRES_OK) {
3597 : char buffer[1024];
3598 0 : snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
3599 0 : return cloudsync_set_error(data, buffer, rc);
3600 : }
3601 :
3602 : // drop blocks table if this table has block LWW columns
3603 4 : if (table->blocks_ref) {
3604 1 : sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->blocks_ref);
3605 1 : rc = database_exec(data, sql);
3606 1 : cloudsync_memory_free(sql);
3607 1 : if (rc != DBRES_OK) {
3608 : char buffer[1024];
3609 0 : snprintf(buffer, sizeof(buffer), "Unable to drop blocks table %s_cloudsync_blocks in cloudsync_cleanup", table_name);
3610 0 : return cloudsync_set_error(data, buffer, rc);
3611 : }
3612 1 : }
3613 :
3614 : // drop original triggers
3615 4 : rc = database_delete_triggers(data, table_name);
3616 4 : if (rc != DBRES_OK) {
3617 : char buffer[1024];
3618 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
3619 0 : return cloudsync_set_error(data, buffer, rc);
3620 : }
3621 :
3622 : // remove all table related settings
3623 4 : dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
3624 4 : return DBRES_OK;
3625 4 : }
3626 :
3627 4 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
3628 4 : cloudsync_table_context *table = table_lookup(data, table_name);
3629 4 : if (!table) return DBRES_OK;
3630 :
3631 : // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
3632 :
3633 4 : int rc = cloudsync_cleanup_internal(data, table);
3634 4 : if (rc != DBRES_OK) return rc;
3635 :
3636 4 : int counter = table_remove(data, table);
3637 4 : table_free(table);
3638 :
3639 4 : if (counter == 0) {
3640 : // cleanup database on last table
3641 2 : cloudsync_reset_siteid(data);
3642 2 : dbutils_settings_cleanup(data);
3643 2 : } else {
3644 2 : if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
3645 2 : cloudsync_update_schema_hash(data);
3646 2 : }
3647 : }
3648 :
3649 4 : return DBRES_OK;
3650 4 : }
3651 :
3652 0 : int cloudsync_cleanup_all (cloudsync_context *data) {
3653 0 : return database_cleanup(data);
3654 : }
3655 :
3656 484 : int cloudsync_terminate (cloudsync_context *data) {
3657 : // can't use for/loop here because data->tables_count is changed by table_remove
3658 740 : while (data->tables_count > 0) {
3659 256 : cloudsync_table_context *t = data->tables[data->tables_count - 1];
3660 256 : table_remove(data, t);
3661 256 : table_free(t);
3662 : }
3663 :
3664 484 : if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
3665 484 : if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
3666 484 : if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
3667 484 : if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
3668 484 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
3669 :
3670 484 : data->schema_version_stmt = NULL;
3671 484 : data->data_version_stmt = NULL;
3672 484 : data->db_version_stmt = NULL;
3673 484 : data->getset_siteid_stmt = NULL;
3674 484 : data->current_schema = NULL;
3675 :
3676 : // reset the site_id so the cloudsync_context_init will be executed again
3677 : // if any other cloudsync function is called after terminate
3678 484 : data->site_id[0] = 0;
3679 :
3680 484 : return 1;
3681 : }
3682 :
3683 289 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, CLOUDSYNC_INIT_FLAG init_flags) {
3684 : // sanity check table and its primary key(s)
3685 289 : int rc = cloudsync_table_sanity_check(data, table_name, init_flags);
3686 289 : if (rc != DBRES_OK) return rc;
3687 :
3688 : // init cloudsync_settings
3689 287 : if (cloudsync_context_init(data) == NULL) {
3690 0 : return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
3691 : }
3692 :
3693 : // sanity check algo name (if exists)
3694 287 : table_algo algo_new = table_algo_none;
3695 287 : if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
3696 :
3697 287 : algo_new = cloudsync_algo_from_name(algo_name);
3698 287 : if (algo_new == table_algo_none) {
3699 : char buffer[1024];
3700 1 : snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
3701 1 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3702 : }
3703 :
3704 : // DWS and AWS algorithms are not yet implemented in the merge logic
3705 286 : if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
3706 : char buffer[1024];
3707 2 : snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
3708 2 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3709 : }
3710 :
3711 : // check if table name was already augmented
3712 284 : table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
3713 :
3714 : // sanity check algorithm
3715 284 : if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
3716 : // if table algorithms and the same and not none, do nothing
3717 284 : } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
3718 : // nothing is written into settings because the default table_algo_crdt_cls will be used
3719 0 : algo_new = algo_current = table_algo_crdt_cls;
3720 256 : } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
3721 : // algo is already written into settins so just use it
3722 0 : algo_new = algo_current;
3723 256 : } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
3724 : // write table algo name in settings
3725 256 : dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
3726 256 : } else {
3727 : // error condition
3728 0 : return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
3729 : }
3730 :
3731 : // Run the following function even if table was already augmented.
3732 : // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
3733 : // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
3734 :
3735 : // sync algo with table (unused in this version)
3736 : // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
3737 :
3738 : // read row-level filter from settings (if any)
3739 : char init_filter_buf[2048];
3740 284 : int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
3741 284 : const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
3742 :
3743 : // check triggers
3744 284 : rc = database_create_triggers(data, table_name, algo_new, init_filter);
3745 284 : if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
3746 :
3747 : // check meta-table
3748 284 : rc = database_create_metatable(data, table_name);
3749 284 : if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
3750 :
3751 : // add prepared statements
3752 284 : if (cloudsync_add_dbvms(data) != DBRES_OK) {
3753 0 : return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
3754 : }
3755 :
3756 : // add table to in-memory data context
3757 284 : if (table_add_to_context(data, algo_new, table_name) == false) {
3758 : char buffer[1024];
3759 0 : snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
3760 0 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3761 : }
3762 :
3763 284 : if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
3764 0 : return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
3765 : }
3766 :
3767 284 : return DBRES_OK;
3768 289 : }
|