Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
_BaseResumptionStrategy,
)
from google.cloud._storage_v2.types.storage import BidiReadObjectRedirectedError

class _DownloadState:
"""A helper class to track the state of a single range download."""
Expand Down Expand Up @@ -65,7 +66,9 @@ def update_state_from_response(self, response: storage_v2.BidiReadObjectResponse
raise DataCorruption(response, f"Byte count mismatch for read_id {read_id}")

async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
"""Handles BidiReadObjectRedirectError for reads."""
"""Handles BidiReadObjectRedirectedError for reads."""
# This would parse the gRPC error details, extract the routing_token,
# and store it on the shared state object.
pass
cause = getattr(error, "cause", error)
if isinstance(cause, BidiReadObjectRedirectedError):
state['routing_token'] = cause.routing_token
20 changes: 20 additions & 0 deletions tests/unit/asyncio/retry/test_reads_resumption_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import unittest
import pytest
from google.cloud.storage.exceptions import DataCorruption
from google.api_core import exceptions

from google.cloud import _storage_v2 as storage_v2
from google.cloud.storage._experimental.asyncio.retry.reads_resumption_strategy import (
_DownloadState,
_ReadResumptionStrategy,
)
from google.cloud._storage_v2.types.storage import BidiReadObjectRedirectedError

_READ_ID = 1

Expand Down Expand Up @@ -204,3 +206,21 @@ def test_update_state_from_response_completes_download_zero_length(self):

self.assertTrue(read_state.is_complete)
self.assertEqual(read_state.bytes_written, len(data))

async def test_recover_state_on_failure_handles_redirect(self):
"""Verify recover_state_on_failure correctly extracts routing_token."""
strategy = _ReadResumptionStrategy()

state = {}
self.assertIsNone(state.get("routing_token"))

dummy_token = "dummy-routing-token"
redirect_error = BidiReadObjectRedirectedError(
routing_token=dummy_token
)
Comment thread
Pulkit0110 marked this conversation as resolved.

final_error = exceptions.RetryError("Retry failed", cause=redirect_error)

await strategy.recover_state_on_failure(final_error, state)

self.assertEqual(state.get("routing_token"), dummy_token)