Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update sync and tests
  • Loading branch information
chillaq committed Mar 11, 2025
commit 6611a43d98adef434693b271a9d88b5656506c96
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
'flake8',
'pytest==7.0.1',
'pytest-mock==3.11.1',
'coverage',
'coverage==7.0.0',
'pytest-cov==4.1.0',
'importlib-metadata==6.7',
'tomli==1.2.3',
Expand All @@ -17,7 +17,8 @@
'pytest-asyncio==0.21.0',
'aiohttp>=3.8.4',
'aiofiles>=23.1.0',
'requests-kerberos>=0.15.0'
'requests-kerberos>=0.15.0',
'urllib3==2.2.0'
]

INSTALL_REQUIRES = [
Expand Down
32 changes: 16 additions & 16 deletions splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

def _fetch_until(self, fetch_options, till=None):
def _fetch_until(self, fetch_options, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return when since==till.

Expand All @@ -109,7 +109,7 @@ def _fetch_until(self, fetch_options, till=None):
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number):
# the passed till is less than change_number, no need to perform updates
return change_number, rbs_change_number, segment_list

Expand All @@ -135,7 +135,7 @@ def _fetch_until(self, fetch_options, till=None):
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

def _attempt_feature_flag_sync(self, fetch_options, till=None):
def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return True if sync is complete.

Expand All @@ -153,9 +153,9 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till, rbs_till)
final_segment_list.update(segment_list)
if till is None or (till <= change_number and till <= rbs_change_number):
if (till is None or till <= change_number) and (rbs_till is None or rbs_till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
Expand All @@ -176,7 +176,7 @@ def _get_config_sets(self):

return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets)

def synchronize_splits(self, till=None):
def synchronize_splits(self, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return True if sync is complete.

Expand All @@ -186,15 +186,15 @@ def synchronize_splits(self, till=None):
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
till)
till, rbs_till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till, rbs_till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand Down Expand Up @@ -233,7 +233,7 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

async def _fetch_until(self, fetch_options, till=None):
async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return when since==till.

Expand All @@ -256,7 +256,7 @@ async def _fetch_until(self, fetch_options, till=None):
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
if (till is not None and till < change_number) or (rbs_till is not None and till < rbs_change_number):
# the passed till is less than change_number, no need to perform updates
return change_number, rbs_change_number, segment_list

Expand All @@ -282,7 +282,7 @@ async def _fetch_until(self, fetch_options, till=None):
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

async def _attempt_feature_flag_sync(self, fetch_options, till=None):
async def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return True if sync is complete.

Expand All @@ -300,9 +300,9 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till, rbs_till)
final_segment_list.update(segment_list)
if till is None or (till <= change_number and till <= rbs_change_number):
if (till is None or till <= change_number) and (rbs_till is None or rbs_till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
Expand All @@ -311,7 +311,7 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
how_long = self._backoff.get()
await asyncio.sleep(how_long)

async def synchronize_splits(self, till=None):
async def synchronize_splits(self, till=None, rbs_till=None):
"""
Hit endpoint, update storage and return True if sync is complete.

Expand All @@ -321,15 +321,15 @@ async def synchronize_splits(self, till=None):
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
till)
till, rbs_till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till, rbs_till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand Down
35 changes: 25 additions & 10 deletions tests/sync/test_splits_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,34 +322,44 @@ def rbs_change_number_mock():
rbs_change_number_mock._calls += 1
if rbs_change_number_mock._calls == 1:
return -1
return 12345 # Return proper cn for CDN Bypass
elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3:
return 555
elif change_number_mock._calls <= 9:
return 555
return 666 # Return proper cn for CDN Bypass

change_number_mock._calls = 0
rbs_change_number_mock._calls = 0
storage.get_change_number.side_effect = change_number_mock
rbs_storage.get_change_number.side_effect = rbs_change_number_mock

api = mocker.Mock()

rbs_1 = copy.deepcopy(json_body['rbs']['d'])
def get_changes(*args, **kwargs):
get_changes.called += 1
# pytest.set_trace()
if get_changes.called == 1:
return { 'ff': { 'd': self.splits, 's': -1, 't': 123 },
'rbs': {"t": 123, "s": -1, "d": []}}
'rbs': {"t": 555, "s": -1, "d": rbs_1}}
elif get_changes.called == 2:
return { 'ff': { 'd': [], 's': 123, 't': 123 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 3:
return { 'ff': { 'd': [], 's': 123, 't': 1234 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called >= 4 and get_changes.called <= 6:
return { 'ff': { 'd': [], 's': 1234, 't': 1234 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 7:
return { 'ff': { 'd': [], 's': 1234, 't': 12345 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 555, "s": 555, "d": []}}
elif get_changes.called == 8:
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
'rbs': {"t": 555, "s": 555, "d": []}}
rbs_1[0]['excluded']['keys'] = ['bilal@split.io']
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
'rbs': {"t": 123, "s": 123, "d": []}}
'rbs': {"t": 666, "s": 666, "d": rbs_1}}

get_changes.called = 0
api.fetch_splits.side_effect = get_changes

Expand Down Expand Up @@ -377,12 +387,17 @@ def intersect(sets):
split_synchronizer.synchronize_splits(12345)
assert api.fetch_splits.mock_calls[3][1][0] == 1234
assert api.fetch_splits.mock_calls[3][1][2].cache_control_headers == True
assert len(api.fetch_splits.mock_calls) == 10 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)

inserted_split = storage.update.mock_calls[0][1][0][0]
assert isinstance(inserted_split, Split)
assert inserted_split.name == 'some_name'

split_synchronizer._backoff = Backoff(1, 0.1)
split_synchronizer.synchronize_splits(None, 666)
inserted_rbs = rbs_storage.update.mock_calls[8][1][0][0]
assert inserted_rbs.excluded.get_excluded_keys() == ['bilal@split.io']

def test_sync_flag_sets_with_config_sets(self, mocker):
"""Test split sync with flag sets."""
storage = InMemorySplitStorage(['set1', 'set2'])
Expand Down Expand Up @@ -723,7 +738,7 @@ def intersect(sets):
split_synchronizer._backoff = Backoff(1, 0.1)
await split_synchronizer.synchronize_splits(12345)
assert (12345, True, 1234) == (self.change_number_3, self.fetch_options_3.cache_control_headers, self.fetch_options_3.change_number)
assert get_changes.called == 10 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
assert get_changes.called == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)

inserted_split = self.parsed_split[0]
assert isinstance(inserted_split, Split)
Expand Down