Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
fb2723a
Merge pull request #555 from splitio/development
chillaq Jan 17, 2025
163afc8
model and memory storage
chillaq Mar 6, 2025
a64a06e
update storage helper
chillaq Mar 6, 2025
c07651e
polish
chillaq Mar 6, 2025
2c9c47e
Merge pull request #557 from splitio/rbs-models-mem-storage
chillaq Mar 7, 2025
06a84f7
update evaluator
chillaq Mar 7, 2025
8228d94
Revert "update evaluator"
chillaq Mar 7, 2025
7a143cc
updated evaluator
chillaq Mar 7, 2025
93a9fdb
Merge pull request #558 from splitio/rbs-evaluator
chillaq Mar 10, 2025
5bda502
Updated sync and api classes
chillaq Mar 10, 2025
3b6780e
Revert "Updated sync and api classes"
chillaq Mar 10, 2025
58d5ddd
Update sync and api classes
chillaq Mar 10, 2025
6611a43
Update sync and tests
chillaq Mar 11, 2025
7df86ef
polishing
chillaq Mar 11, 2025
4cd84cd
Merge pull request #559 from splitio/sync-api-classes
chillaq Mar 11, 2025
3396b5f
Updated SSE classes
chillaq Mar 12, 2025
7cd34eb
updated redis, pluggable and localjson storages
chillaq Mar 12, 2025
4d8327c
Updated redis, pluggable and localjson storages
chillaq Mar 13, 2025
2cbc647
Update splitio/storage/pluggable.py
chillaq Mar 14, 2025
d0b2c67
Update splitio/storage/pluggable.py
chillaq Mar 14, 2025
cc990a9
Update splitio/storage/pluggable.py
chillaq Mar 14, 2025
db5eafc
Merge pull request #561 from splitio/rbs_redis_pluggable
chillaq Mar 14, 2025
1d8b448
Merge pull request #560 from splitio/rbs_sse
chillaq Mar 14, 2025
4f7d8dc
Updated tests
chillaq Mar 19, 2025
e070b90
fixed tests
chillaq Mar 19, 2025
db38e3e
Merge pull request #562 from splitio/rbs_factory
chillaq Mar 19, 2025
2e7f5d3
updated storage helper and evaluator
chillaq Mar 24, 2025
6e8188d
Merge pull request #563 from splitio/update-evaluator-rbs-storage
chillaq Mar 25, 2025
9aa56a1
Added support for old spec in fetcher
chillaq May 1, 2025
d7b06a0
Added old spec for Localhost
chillaq May 3, 2025
5530baa
polish and integration tests
chillaq May 5, 2025
e649a3c
polish
chillaq May 6, 2025
2de48b9
Merge pull request #564 from splitio/rbs-old-spec-fetcher
chillaq May 7, 2025
3eff00c
polish
chillaq May 9, 2025
f3e9137
Update rb segment matcher
chillaq May 13, 2025
6fccf99
updated test
chillaq May 13, 2025
98a6852
polish
chillaq May 13, 2025
333919c
fix matcher and test
chillaq May 14, 2025
1bd96ab
Update splitio/models/grammar/matchers/rule_based_segment.py
chillaq May 14, 2025
ba4e347
Fix initial segment fetch
chillaq May 15, 2025
066b78f
polish
chillaq May 16, 2025
ca2e3cb
updated split api
chillaq May 16, 2025
533740b
Merge pull request #567 from splitio/rbs-oldspec-restore-since
chillaq May 16, 2025
3fea6cb
Merge pull request #566 from splitio/rbs-fix-segment-initial-fetch
chillaq May 19, 2025
b3f3f36
Merge pull request #565 from splitio/rbs-old-spec-localhost
chillaq May 20, 2025
338ac89
Fixed proxy error
chillaq May 21, 2025
6dcac32
Fixed matcher
chillaq May 21, 2025
0043805
Merge pull request #569 from splitio/rbs-fix-proxy-error
chillaq May 21, 2025
c093206
Added models
chillaq May 29, 2025
8281dec
Added matcher
chillaq May 29, 2025
2214cd5
Updated evaluator
chillaq May 30, 2025
3692161
Merge pull request #570 from splitio/prereq-models
chillaq May 30, 2025
e153509
polish
chillaq May 30, 2025
488757f
Merge pull request #571 from splitio/prereq-matcher
chillaq May 30, 2025
249d9c6
Merge pull request #573 from splitio/T-FME-3998-prereq-evaluator
chillaq May 30, 2025
b64948d
fixed rbs matcher
chillaq Jun 2, 2025
c30a18b
fixed tests
chillaq Jun 2, 2025
c174578
Updated localhostjson sync
chillaq Jun 3, 2025
de2f013
Updated integrations tests
chillaq Jun 3, 2025
c830bc3
Merge pull request #574 from splitio/T-FME-4182-prereq-localhost-json
chillaq Jun 5, 2025
971f9ed
Merge pull request #575 from splitio/T-FME-4178-prereq-integration
chillaq Jun 5, 2025
21635a8
Merge branch 'feature/rule-based-segment' into feature/prerequisites
chillaq Jun 5, 2025
5abf718
Merge pull request #576 from splitio/feature/prerequisites
chillaq Jun 5, 2025
94f0755
updated version and changes
chillaq Jun 5, 2025
24c65c1
Update ci.yml
chillaq Jun 5, 2025
f876ebe
Update ci.yml
chillaq Jun 5, 2025
596ebed
Update ci.yml
chillaq Jun 5, 2025
ff90620
Update ci.yml
chillaq Jun 5, 2025
ace5a58
Update ci.yml
chillaq Jun 5, 2025
b64f84e
Update ci.yml
chillaq Jun 5, 2025
a462819
Update ci.yml
chillaq Jun 5, 2025
9349b47
downgrade urllib version for tests
chillaq Jun 5, 2025
c1bc9b9
Merge branch 'feature/rule-based-segment' of https://github.com/split…
chillaq Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated sync and api classes
  • Loading branch information
chillaq committed Mar 10, 2025
commit 5bda502b3b14917cce7c7268d08a1624ab4f66d7
20 changes: 18 additions & 2 deletions splitio/api/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def record_telemetry(status_code, elapsed, metric_name, telemetry_runtime_produc
class FetchOptions(object):
"""Fetch Options object."""

def __init__(self, cache_control_headers=False, change_number=None, sets=None, spec=SPEC_VERSION):
def __init__(self, cache_control_headers=False, change_number=None, rbs_change_number=None, sets=None, spec=SPEC_VERSION):
"""
Class constructor.

Expand All @@ -72,6 +72,7 @@ def __init__(self, cache_control_headers=False, change_number=None, sets=None, s
"""
self._cache_control_headers = cache_control_headers
self._change_number = change_number
self._rbs_change_number = rbs_change_number
self._sets = sets
self._spec = spec

Expand All @@ -85,6 +86,11 @@ def change_number(self):
"""Return change number."""
return self._change_number

@property
def rbs_change_number(self):
"""Return change number."""
return self._rbs_change_number

@property
def sets(self):
"""Return sets."""
Expand All @@ -103,14 +109,19 @@ def __eq__(self, other):
if self._change_number != other._change_number:
return False

if self._rbs_change_number != other._rbs_change_number:
return False

if self._sets != other._sets:
return False

if self._spec != other._spec:
return False

return True


def build_fetch(change_number, fetch_options, metadata):
def build_fetch(change_number, fetch_options, metadata, rbs_change_number=None):
"""
Build fetch with new flags if that is the case.

Expand All @@ -123,11 +134,16 @@ def build_fetch(change_number, fetch_options, metadata):
:param metadata: Metadata Headers.
:type metadata: dict

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:return: Objects for fetch
:rtype: dict, dict
"""
query = {'s': fetch_options.spec} if fetch_options.spec is not None else {}
query['since'] = change_number
if rbs_change_number is not None:
query['rbSince'] = rbs_change_number
extra_headers = metadata
if fetch_options is None:
return query, extra_headers
Expand Down
14 changes: 10 additions & 4 deletions splitio/api/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
self._telemetry_runtime_producer = telemetry_runtime_producer
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)

def fetch_splits(self, change_number, fetch_options):
def fetch_splits(self, change_number, rbs_change_number, fetch_options):
"""
Fetch feature flags from backend.

:param change_number: Last known timestamp of a split modification.
:type change_number: int

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:param fetch_options: Fetch options for getting feature flag definitions.
:type fetch_options: splitio.api.commons.FetchOptions

:return: Json representation of a splitChanges response.
:rtype: dict
"""
try:
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = self._client.get(
'sdk',
'splitChanges',
Expand Down Expand Up @@ -86,12 +89,15 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
self._telemetry_runtime_producer = telemetry_runtime_producer
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)

async def fetch_splits(self, change_number, fetch_options):
async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
"""
Fetch feature flags from backend.

:param change_number: Last known timestamp of a split modification.
:type change_number: int

:param rbs_change_number: Last known timestamp of a rule based segment modification.
:type rbs_change_number: int

:param fetch_options: Fetch options for getting feature flag definitions.
:type fetch_options: splitio.api.commons.FetchOptions
Expand All @@ -100,7 +106,7 @@ async def fetch_splits(self, change_number, fetch_options):
:rtype: dict
"""
try:
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = await self._client.get(
'sdk',
'splitChanges',
Expand Down
103 changes: 63 additions & 40 deletions splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from splitio.api import APIException, APIUriException
from splitio.api.commons import FetchOptions
from splitio.client.input_validator import validate_flag_sets
from splitio.models import splits
from splitio.models import splits, rule_based_segments
from splitio.util.backoff import Backoff
from splitio.util.time import get_current_epoch_time_ms
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async, \
update_rule_based_segment_storage, update_rule_based_segment_storage_async
from splitio.sync import util
from splitio.optional.loaders import asyncio, aiofiles

Expand All @@ -32,7 +33,7 @@
class SplitSynchronizerBase(object):
"""Feature Flag changes synchronizer."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -44,6 +45,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
"""
self._api = feature_flag_api
self._feature_flag_storage = feature_flag_storage
self._rule_based_segment_storage = rule_based_segment_storage
self._backoff = Backoff(
_ON_DEMAND_FETCH_BACKOFF_BASE,
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
Expand All @@ -53,6 +55,11 @@ def feature_flag_storage(self):
"""Return Feature_flag storage object"""
return self._feature_flag_storage

@property
def rule_based_segment_storage(self):
"""Return rule base segment storage object"""
return self._rule_based_segment_storage

def _get_config_sets(self):
"""
Get all filter flag sets cnverrted to string, if no filter flagsets exist return None
Expand All @@ -67,7 +74,7 @@ def _get_config_sets(self):
class SplitSynchronizer(SplitSynchronizerBase):
"""Feature Flag changes synchronizer."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -77,7 +84,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

def _fetch_until(self, fetch_options, till=None):
"""
Expand All @@ -97,12 +104,17 @@ def _fetch_until(self, fetch_options, till=None):
change_number = self._feature_flag_storage.get_change_number()
if change_number is None:
change_number = -1
if till is not None and till < change_number:

rbs_change_number = self._rule_based_segment_storage.get_change_number()
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
# the passed till is less than change_number, no need to perform updates
return change_number, segment_list
return change_number, rbs_change_number, segment_list

try:
feature_flag_changes = self._api.fetch_splits(change_number, fetch_options)
feature_flag_changes = self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
except APIException as exc:
if exc._status_code is not None and exc._status_code == 414:
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
Expand All @@ -112,15 +124,16 @@ def _fetch_until(self, fetch_options, till=None):
_LOGGER.error('Exception raised while fetching feature flags')
_LOGGER.debug('Exception information: ', exc_info=True)
raise exc
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list

fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

def _attempt_feature_flag_sync(self, fetch_options, till=None):
"""
Expand All @@ -140,13 +153,13 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, segment_list = self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till)
final_segment_list.update(segment_list)
if till is None or till <= change_number:
return True, remaining_attempts, change_number, final_segment_list
if till is None or (till <= change_number and till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
return False, remaining_attempts, change_number, final_segment_list
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list

how_long = self._backoff.get()
time.sleep(how_long)
Expand All @@ -172,16 +185,16 @@ def synchronize_splits(self, till=None):
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand All @@ -208,7 +221,7 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
class SplitSynchronizerAsync(SplitSynchronizerBase):
"""Feature Flag changes synchronizer async."""

def __init__(self, feature_flag_api, feature_flag_storage):
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
"""
Class constructor.

Expand All @@ -218,7 +231,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
:param feature_flag_storage: Feature Flag Storage.
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
"""
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)

async def _fetch_until(self, fetch_options, till=None):
"""
Expand All @@ -238,12 +251,17 @@ async def _fetch_until(self, fetch_options, till=None):
change_number = await self._feature_flag_storage.get_change_number()
if change_number is None:
change_number = -1
if till is not None and till < change_number:

rbs_change_number = await self._rule_based_segment_storage.get_change_number()
if rbs_change_number is None:
rbs_change_number = -1

if till is not None and till < change_number and till < rbs_change_number:
# the passed till is less than change_number, no need to perform updates
return change_number, segment_list
return change_number, rbs_change_number, segment_list

try:
feature_flag_changes = await self._api.fetch_splits(change_number, fetch_options)
feature_flag_changes = await self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
except APIException as exc:
if exc._status_code is not None and exc._status_code == 414:
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
Expand All @@ -254,10 +272,15 @@ async def _fetch_until(self, fetch_options, till=None):
_LOGGER.debug('Exception information: ', exc_info=True)
raise exc

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
rbs_segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])

fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

async def _attempt_feature_flag_sync(self, fetch_options, till=None):
"""
Expand All @@ -277,13 +300,13 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
while True:
remaining_attempts -= 1
change_number, segment_list = await self._fetch_until(fetch_options, till)
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till)
final_segment_list.update(segment_list)
if till is None or till <= change_number:
return True, remaining_attempts, change_number, final_segment_list
if till is None or (till <= change_number and till <= rbs_change_number):
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list

elif remaining_attempts <= 0:
return False, remaining_attempts, change_number, final_segment_list
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list

how_long = self._backoff.get()
await asyncio.sleep(how_long)
Expand All @@ -297,16 +320,16 @@ async def synchronize_splits(self, till=None):
"""
final_segment_list = set()
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
till)
final_segment_list.update(segment_list)
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if successful_sync: # succedeed sync
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
return final_segment_list

with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
final_segment_list.update(segment_list)
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
if without_cdn_successful_sync:
Expand Down
Loading