Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,32 @@ class InvalidRetryRequest(RuntimeError):
"""Exception raised when retry request is invalid."""


RETRYABLE_INTERNAL_ERROR_MESSAGES = (
"rst_stream",
"rst stream",
"received unexpected eos on data frame from server",
)
"""Internal error messages that can be retried during read row and mutation."""


def _retriable_internal_server_error(exc):
"""
Return True if the internal server error is retriable.
"""
return isinstance(exc, exceptions.InternalServerError) and any(
retryable_message in exc.message.lower()
for retryable_message in RETRYABLE_INTERNAL_ERROR_MESSAGES
)


def _retry_read_rows_exception(exc):
"""Return True if the exception is retriable for read row requests."""
if isinstance(exc, grpc.RpcError):
exc = exceptions.from_grpc_error(exc)
return isinstance(exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded))

return _retriable_internal_server_error(exc) or isinstance(
exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)
)


DEFAULT_RETRY_READ_ROWS = retry.Retry(
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import InternalServerError
from google.api_core.gapic_v1.method import DEFAULT
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
Expand All @@ -37,7 +38,10 @@
from google.cloud.bigtable.row import AppendRow
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.cloud.bigtable.row_data import (
PartialRowsData,
_retriable_internal_server_error,
)
from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
from google.cloud.bigtable.row_set import RowSet
from google.cloud.bigtable.row_set import RowRange
Expand All @@ -55,9 +59,15 @@
_MAX_BULK_MUTATIONS = 100000
VIEW_NAME_ONLY = enums.Table.View.NAME_ONLY

RETRYABLE_MUTATION_ERRORS = (Aborted, DeadlineExceeded, ServiceUnavailable)
RETRYABLE_MUTATION_ERRORS = (
Aborted,
DeadlineExceeded,
ServiceUnavailable,
InternalServerError,
)
"""Errors which can be retried during row mutation."""


RETRYABLE_CODES: Set[int] = set()

for retryable in RETRYABLE_MUTATION_ERRORS:
Expand Down Expand Up @@ -1130,11 +1140,18 @@ def _do_mutate_retryable_rows(self):
retry=None,
**kwargs
)
except RETRYABLE_MUTATION_ERRORS:
except RETRYABLE_MUTATION_ERRORS as exc:
# If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is
# returned from the initial call, consider
# it to be retryable. Wrap as a Bigtable Retryable Error.
raise _BigtableRetryableError
# For InternalServerError, it is only retriable if the message is related to RST Stream messages
if _retriable_internal_server_error(exc) or not isinstance(
exc, InternalServerError
):
raise _BigtableRetryableError
else:
# re-raise the original exception
raise

num_responses = 0
num_retryable_responses = 0
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,31 @@ def test__retry_read_rows_exception_deadline_exceeded():
assert _retry_read_rows_exception(exception)


def test__retry_read_rows_exception_internal_server_not_retriable():
from google.api_core.exceptions import InternalServerError
from google.cloud.bigtable.row_data import (
_retry_read_rows_exception,
RETRYABLE_INTERNAL_ERROR_MESSAGES,
)

err_message = "500 Error"
exception = InternalServerError(err_message)
assert err_message not in RETRYABLE_INTERNAL_ERROR_MESSAGES
assert not _retry_read_rows_exception(exception)


def test__retry_read_rows_exception_internal_server_retriable():
from google.api_core.exceptions import InternalServerError
from google.cloud.bigtable.row_data import (
_retry_read_rows_exception,
RETRYABLE_INTERNAL_ERROR_MESSAGES,
)

for err_message in RETRYABLE_INTERNAL_ERROR_MESSAGES:
exception = InternalServerError(err_message)
assert _retry_read_rows_exception(exception)


def test__retry_read_rows_exception_miss_wrapped_in_grpc():
from google.api_core.exceptions import Conflict
from google.cloud.bigtable.row_data import _retry_read_rows_exception
Expand Down
55 changes: 54 additions & 1 deletion tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
RETRYABLE_3 = StatusCode.UNAVAILABLE.value[0]
RETRYABLES = (RETRYABLE_1, RETRYABLE_2, RETRYABLE_3)
NON_RETRYABLE = StatusCode.CANCELLED.value[0]
STATUS_INTERNAL = StatusCode.INTERNAL.value[0]


@mock.patch("google.cloud.bigtable.table._MAX_BULK_MUTATIONS", new=3)
Expand Down Expand Up @@ -1636,6 +1637,7 @@ def _do_mutate_retryable_rows_helper(
raising_retry=False,
retryable_error=False,
timeout=None,
mutate_rows_side_effect=None,
):
from google.api_core.exceptions import ServiceUnavailable
from google.cloud.bigtable.row import DirectRow
Expand Down Expand Up @@ -1664,8 +1666,13 @@ def _do_mutate_retryable_rows_helper(

data_api = client._table_data_client = _make_data_api()
if retryable_error:
data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
if mutate_rows_side_effect is not None:
data_api.mutate_rows.side_effect = mutate_rows_side_effect
else:
data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
else:
if mutate_rows_side_effect is not None:
data_api.mutate_rows.side_effect = mutate_rows_side_effect
data_api.mutate_rows.return_value = [response]

worker = _make_worker(client, table.name, rows=rows)
Expand Down Expand Up @@ -1785,6 +1792,52 @@ def test_rmrw_do_mutate_retryable_rows_w_retryable_error():
)


def test_rmrw_do_mutate_retryable_rows_w_retryable_error_internal_rst_stream_error():
# Mutate two rows
# Raise internal server error with RST STREAM error messages
# There should be no error raised and that the request is retried
from google.api_core.exceptions import InternalServerError
from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES

row_cells = [
(b"row_key_1", ("cf", b"col", b"value1")),
(b"row_key_2", ("cf", b"col", b"value2")),
]
responses = ()

for retryable_internal_error_message in RETRYABLE_INTERNAL_ERROR_MESSAGES:
for message in [
retryable_internal_error_message,
retryable_internal_error_message.upper(),
]:
_do_mutate_retryable_rows_helper(
row_cells,
responses,
retryable_error=True,
mutate_rows_side_effect=InternalServerError(message),
)


def test_rmrw_do_mutate_rows_w_retryable_error_internal_not_retryable():
# Mutate two rows
# Raise internal server error but not RST STREAM error messages
# mutate_rows should raise Internal Server Error
from google.api_core.exceptions import InternalServerError

row_cells = [
(b"row_key_1", ("cf", b"col", b"value1")),
(b"row_key_2", ("cf", b"col", b"value2")),
]
responses = ()

with pytest.raises(InternalServerError):
_do_mutate_retryable_rows_helper(
row_cells,
responses,
mutate_rows_side_effect=InternalServerError("Error not retryable."),
)


def test_rmrw_do_mutate_retryable_rows_retry():
#
# Setup:
Expand Down