Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
polishing
  • Loading branch information
chillaq committed Mar 11, 2025
commit 7df86efd82013854f86ed873a6e01ed73294187b
29 changes: 28 additions & 1 deletion splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st

:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage

:param rule_based_segment_storage: Rule based segment Storage.
:type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
"""
self._api = feature_flag_api
self._feature_flag_storage = feature_flag_storage
Expand Down Expand Up @@ -83,6 +86,9 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st

:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage

:param rule_based_segment_storage: Rule based segment Storage.
:type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

Expand All @@ -96,6 +102,9 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None):
:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int

:return: last change number
:rtype: int
"""
Expand Down Expand Up @@ -145,6 +154,9 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int

:return: Flags to check if it should perform bypass or operation ended
:rtype: bool, int, int
"""
Expand Down Expand Up @@ -182,6 +194,9 @@ def synchronize_splits(self, till=None, rbs_till=None):

:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
Expand Down Expand Up @@ -230,6 +245,9 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st

:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage

:param rule_based_segment_storage: Rule based segment Storage.
:type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

Expand All @@ -243,6 +261,9 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int

:return: last change number
:rtype: int
"""
Expand All @@ -256,7 +277,7 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
if rbs_change_number is None:
rbs_change_number = -1

if (till is not None and till < change_number) or (rbs_till is not None and till < rbs_change_number):
if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number):
# the passed till is less than change_number, no need to perform updates
return change_number, rbs_change_number, segment_list

Expand Down Expand Up @@ -292,6 +313,9 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=No
:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int

:return: Flags to check if it should perform bypass or operation ended
:rtype: bool, int, int
"""
Expand All @@ -317,6 +341,9 @@ async def synchronize_splits(self, till=None, rbs_till=None):

:param till: Passed till from Streaming.
:type till: int

:param rbs_till: Passed rbs till from Streaming.
:type rbs_till: int
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
Expand Down
44 changes: 32 additions & 12 deletions tests/sync/test_splits_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ def rbs_change_number_mock():
rbs_1 = copy.deepcopy(json_body['rbs']['d'])
def get_changes(*args, **kwargs):
get_changes.called += 1
# pytest.set_trace()
if get_changes.called == 1:
return { 'ff': { 'd': self.splits, 's': -1, 't': 123 },
'rbs': {"t": 555, "s": -1, "d": rbs_1}}
Expand Down Expand Up @@ -392,6 +391,8 @@ def intersect(sets):
inserted_split = storage.update.mock_calls[0][1][0][0]
assert isinstance(inserted_split, Split)
assert inserted_split.name == 'some_name'
inserted_rbs = rbs_storage.update.mock_calls[0][1][0][0]
assert inserted_rbs.excluded.get_excluded_keys() == ["mauro@split.io","gaston@split.io"]

split_synchronizer._backoff = Backoff(1, 0.1)
split_synchronizer.synchronize_splits(None, 666)
Expand Down Expand Up @@ -664,7 +665,11 @@ async def rbs_change_number_mock():
rbs_change_number_mock._calls += 1
if rbs_change_number_mock._calls == 1:
return -1
return 12345 # Return proper cn for CDN Bypass
elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3:
return 555
elif change_number_mock._calls <= 9:
return 555
return 666 # Return proper cn for CDN Bypass

change_number_mock._calls = 0
rbs_change_number_mock._calls = 0
Expand All @@ -677,8 +682,10 @@ async def update(parsed_split, deleted, change_number):
self.parsed_split = parsed_split
storage.update = update

self.parsed_rbs = None
async def rbs_update(parsed, deleted, change_number):
pass
if len(parsed) > 0:
self.parsed_rbs = parsed
rbs_storage.update = rbs_update

api = mocker.Mock()
Expand All @@ -688,32 +695,38 @@ async def rbs_update(parsed, deleted, change_number):
self.fetch_options_2 = None
self.change_number_3 = None
self.fetch_options_3 = None
rbs_1 = copy.deepcopy(json_body['rbs']['d'])

async def get_changes(change_number, rbs_change_number, fetch_options):
get_changes.called += 1
if get_changes.called == 1:
self.change_number_1 = change_number
self.fetch_options_1 = fetch_options
return { 'ff': { 'd': self.splits, 's': -1, 't': 123 },
'rbs': {"t": 123, "s": -1, "d": []}}
'rbs': {"t": 555, "s": -1, "d": rbs_1}}
elif get_changes.called == 2:
self.change_number_2 = change_number
self.fetch_options_2 = fetch_options
return { 'ff': { 'd': [], 's': 123, 't': 123 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 3:
return { 'ff': { 'd': [], 's': 123, 't': 1234 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called >= 4 and get_changes.called <= 6:
return { 'ff': { 'd': [], 's': 1234, 't': 1234 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 7:
return { 'ff': { 'd': [], 's': 1234, 't': 12345 },
'rbs': {"t": 123, "s": 123, "d": []}}
self.change_number_3 = change_number
self.fetch_options_3 = fetch_options
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 8:
self.change_number_3 = change_number
self.fetch_options_3 = fetch_options
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
'rbs': {"t": 555, "s": 555, "d": []}}
rbs_1[0]['excluded']['keys'] = ['bilal@split.io']
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
'rbs': {"t": 123, "s": 123, "d": []}}

'rbs': {"t": 666, "s": 666, "d": rbs_1}}
get_changes.called = 0
api.fetch_splits = get_changes

Expand Down Expand Up @@ -743,7 +756,14 @@ def intersect(sets):
inserted_split = self.parsed_split[0]
assert isinstance(inserted_split, Split)
assert inserted_split.name == 'some_name'
inserted_rbs = self.parsed_rbs[0]
assert inserted_rbs.excluded.get_excluded_keys() == ["mauro@split.io","gaston@split.io"]

split_synchronizer._backoff = Backoff(1, 0.1)
await split_synchronizer.synchronize_splits(None, 666)
inserted_rbs = self.parsed_rbs[0]
assert inserted_rbs.excluded.get_excluded_keys() == ['bilal@split.io']

@pytest.mark.asyncio
async def test_sync_flag_sets_with_config_sets(self, mocker):
"""Test split sync with flag sets."""
Expand Down