Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update sync and api classes
  • Loading branch information
chillaq committed Mar 10, 2025
commit 58d5ddda54f0556731664adf3b9e925f47943fdc
20 changes: 18 additions & 2 deletions splitio/api/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def record_telemetry(status_code, elapsed, metric_name, telemetry_runtime_produc
class FetchOptions(object):
"""Fetch Options object."""

def __init__(self, cache_control_headers=False, change_number=None, sets=None, spec=SPEC_VERSION):
def __init__(self, cache_control_headers=False, change_number=None, rbs_change_number=None, sets=None, spec=SPEC_VERSION):
"""
Class constructor.

Expand All @@ -72,6 +72,7 @@ def __init__(self, cache_control_headers=False, change_number=None, sets=None, s
"""
self._cache_control_headers = cache_control_headers
self._change_number = change_number
self._rbs_change_number = rbs_change_number
self._sets = sets
self._spec = spec

Expand All @@ -85,6 +86,11 @@ def change_number(self):
"""Return change number."""
return self._change_number

@property
def rbs_change_number(self):
"""Return change number."""
return self._rbs_change_number

@property
def sets(self):
"""Return sets."""
Expand All @@ -103,14 +109,19 @@ def __eq__(self, other):
if self._change_number != other._change_number:
return False

if self._rbs_change_number != other._rbs_change_number:
return False

if self._sets != other._sets:
return False

if self._spec != other._spec:
return False

return True


def build_fetch(change_number, fetch_options, metadata):
def build_fetch(change_number, fetch_options, metadata, rbs_change_number=None):
"""
Build fetch with new flags if that is the case.

Expand All @@ -123,11 +134,16 @@ def build_fetch(change_number, fetch_options, metadata):
:param metadata: Metadata Headers.
:type metadata: dict

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:return: Objects for fetch
:rtype: dict, dict
"""
query = {'s': fetch_options.spec} if fetch_options.spec is not None else {}
query['since'] = change_number
if rbs_change_number is not None:
query['rbSince'] = rbs_change_number
extra_headers = metadata
if fetch_options is None:
return query, extra_headers
Expand Down
14 changes: 10 additions & 4 deletions splitio/api/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
self._telemetry_runtime_producer = telemetry_runtime_producer
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)

def fetch_splits(self, change_number, fetch_options):
def fetch_splits(self, change_number, rbs_change_number, fetch_options):
"""
Fetch feature flags from backend.

:param change_number: Last known timestamp of a split modification.
:type change_number: int

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:param fetch_options: Fetch options for getting feature flag definitions.
:type fetch_options: splitio.api.commons.FetchOptions

:return: Json representation of a splitChanges response.
:rtype: dict
"""
try:
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = self._client.get(
'sdk',
'splitChanges',
Expand Down Expand Up @@ -86,12 +89,15 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
self._telemetry_runtime_producer = telemetry_runtime_producer
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)

async def fetch_splits(self, change_number, fetch_options):
async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
"""
Fetch feature flags from backend.

:param change_number: Last known timestamp of a split modification.
:type change_number: int

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:param fetch_options: Fetch options for getting feature flag definitions.
:type fetch_options: splitio.api.commons.FetchOptions
Expand All @@ -100,7 +106,7 @@ async def fetch_splits(self, change_number, fetch_options):
:rtype: dict
"""
try:
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = await self._client.get(
'sdk',
'splitChanges',
Expand Down
103 changes: 63 additions & 40 deletions splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from splitio.api import APIException, APIUriException
from splitio.api.commons import FetchOptions
from splitio.client.input_validator import validate_flag_sets
from splitio.models import splits
from splitio.models import splits, rule_based_segments
from splitio.util.backoff import Backoff
from splitio.util.time import get_current_epoch_time_ms
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async, \
update_rule_based_segment_storage, update_rule_based_segment_storage_async
from splitio.sync import util
from splitio.optional.loaders import asyncio, aiofiles

Expand All @@ -32,7 +33,7 @@
class SplitSynchronizerBase(object):
"""Feature Flag changes synchronizer."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -44,6 +45,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
"""
self._api = feature_flag_api
self._feature_flag_storage = feature_flag_storage
self._rule_based_segment_storage = rule_based_segment_storage
self._backoff = Backoff(
_ON_DEMAND_FETCH_BACKOFF_BASE,
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
Expand All @@ -53,6 +55,11 @@ def feature_flag_storage(self):
"""Return Feature_flag storage object"""
return self._feature_flag_storage

@property
def rule_based_segment_storage(self):
"""Return rule base segment storage object"""
return self._rule_based_segment_storage

def _get_config_sets(self):
"""
Get all filter flag sets cnverrted to string, if no filter flagsets exist return None
Expand All @@ -67,7 +74,7 @@ def _get_config_sets(self):
class SplitSynchronizer(SplitSynchronizerBase):
"""Feature Flag changes synchronizer."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -77,7 +84,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

def _fetch_until(self, fetch_options, till=None):
"""
Expand All @@ -97,12 +104,17 @@ def _fetch_until(self, fetch_options, till=None):
change_number = self._feature_flag_storage.get_change_number()
if change_number is None:
change_number = -1
if till is not None and till < change_number:

rbs_change_number = self._rule_based_segment_storage.get_change_number()
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
# the passed till is less than change_number, no need to perform updates
return change_number, segment_list
return change_number, rbs_change_number, segment_list

try:
feature_flag_changes = self._api.fetch_splits(change_number, fetch_options)
feature_flag_changes = self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
except APIException as exc:
if exc._status_code is not None and exc._status_code == 414:
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
Expand All @@ -112,15 +124,16 @@ def _fetch_until(self, fetch_options, till=None):
_LOGGER.error('Exception raised while fetching feature flags')
_LOGGER.debug('Exception information: ', exc_info=True)
raise exc
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list

fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

def _attempt_feature_flag_sync(self, fetch_options, till=None):
"""
Expand All @@ -140,13 +153,13 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, segment_list = self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till)
final_segment_list.update(segment_list)
if till is None or till <= change_number:
return True, remaining_attempts, change_number, final_segment_list
if till is None or (till <= change_number and till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
return False, remaining_attempts, change_number, final_segment_list
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list

how_long = self._backoff.get()
time.sleep(how_long)
Expand All @@ -172,16 +185,16 @@ def synchronize_splits(self, till=None):
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand All @@ -208,7 +221,7 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
class SplitSynchronizerAsync(SplitSynchronizerBase):
"""Feature Flag changes synchronizer async."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -218,7 +231,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

async def _fetch_until(self, fetch_options, till=None):
"""
Expand All @@ -238,12 +251,17 @@ async def _fetch_until(self, fetch_options, till=None):
change_number = await self._feature_flag_storage.get_change_number()
if change_number is None:
change_number = -1
if till is not None and till < change_number:

rbs_change_number = await self._rule_based_segment_storage.get_change_number()
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
# the passed till is less than change_number, no need to perform updates
return change_number, segment_list
return change_number, rbs_change_number, segment_list

try:
feature_flag_changes = await self._api.fetch_splits(change_number, fetch_options)
feature_flag_changes = await self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
except APIException as exc:
if exc._status_code is not None and exc._status_code == 414:
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
Expand All @@ -254,10 +272,15 @@ async def _fetch_until(self, fetch_options, till=None):
_LOGGER.debug('Exception information: ', exc_info=True)
raise exc

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
rbs_segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

async def _attempt_feature_flag_sync(self, fetch_options, till=None):
"""
Expand All @@ -277,13 +300,13 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, segment_list = await self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till)
final_segment_list.update(segment_list)
if till is None or till <= change_number:
return True, remaining_attempts, change_number, final_segment_list
if till is None or (till <= change_number and till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
return False, remaining_attempts, change_number, final_segment_list
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list

how_long = self._backoff.get()
await asyncio.sleep(how_long)
Expand All @@ -297,16 +320,16 @@ async def synchronize_splits(self, till=None):
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand Down
Loading