Description
Script Used To Test
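import airbyte
from airbyte.models import operations, shared

# Client pointed at the local Airbyte API; replace the placeholders with real values.
airbyte_client = airbyte.Airbyte(
    server_url="http://localhost:8006/v1",
    security=shared.Security(
        basic_auth=shared.SchemeBasicAuth(
            password="<pwd>",
            username="<username>",
        ),
    ),
)

# Request a custom source by its ID.
req = operations.GetSourceRequest(
    source_id="<custom-source-id>",
)

res = airbyte_client.sources.get_source(req)  # fails here with KeyError: 'source_type'

if res.source_response is not None:
    print("source fetched")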
Error Observed
Traceback (most recent call last):
  File "/mnt/c/ACA-Group/alpha-airbyte/web/api/source/test.py", line 173, in <module>
    result = AirbyteConnectorConfig.process_get({'source_id':'c5f9b2ed-60ec-4e46-93dc-b00a54fcab11'})
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/source/test.py", line 50, in process_get
    res = airbyte_client.sources.get_source(req)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/airbyte/sources.py", line 102, in get_source
    out = utils.unmarshal_json(http_res.text, Optional[shared.SourceResponse])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/airbyte/utils/utils.py", line 695, in unmarshal_json
    out = unmarshal.from_dict({"res": json_dict})
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/dataclasses_json/api.py", line 70, in from_dict
    return _decode_dataclass(cls, kvs, infer_missing)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/dataclasses_json/core.py", line 220, in _decode_dataclass
    init_kwargs[field.name] = _decode_generic(field_type,
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/dataclasses_json/core.py", line 309, in _decode_generic
    res = _decode_dataclass(type_arg, value, infer_missing)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/web/api/.venv/lib/python3.11/site-packages/dataclasses_json/core.py", line 172, in _decode_dataclass
    field_value = kvs[field.name]
                  ~~~^^^^^^^^^^^^
KeyError: 'source_type'
Additionally
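The SDK expects source_type to always be present in SourceResponse, but that holds only for non-custom connectors, so deserializing the response for a custom source fails.
Calling the same API directly from Postman (http://localhost:8006/v1/sources/) works without issues, which suggests the problem is in how the SDK parses the response rather than in the API itself.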
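For anyone who wants to reproduce the Postman check from Python, here is a minimal sketch that calls the endpoint without going through the SDK models. It uses only the standard library. The function names are hypothetical, and the `/sources/{source_id}` path and Basic auth header are assumptions based on the URL and credentials used in the script above.

```python
import base64
import json
import urllib.request


def build_get_source_request(base_url: str, source_id: str,
                             username: str, password: str) -> urllib.request.Request:
    """Build a GET request for {base_url}/sources/{source_id} with HTTP Basic auth.

    The path layout is an assumption taken from the URL in this report.
    """
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/sources/{source_id}",
        headers={"Authorization": f"Basic {token}", "Accept": "application/json"},
        method="GET",
    )


def fetch_source_raw(request: urllib.request.Request) -> dict:
    """Send the request and return the JSON body as a plain dict (no SDK models)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `fetch_source_raw(build_get_source_request("http://localhost:8006/v1", "<custom-source-id>", "<username>", "<pwd>"))` should return the raw dictionary, which makes it easy to confirm whether the response for a custom source actually lacks the source_type key.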
The dataclass for the source response also looks restrictive and tailored to non-custom connectors only: source_type is not declared as Optional[str], and the configuration type is a Union of the predefined sources only, so it cannot accommodate custom sources with custom configurations.
https://github.com/airbytehq/airbyte-api-python-sdk/blob/main/src/airbyte/models/shared/sourceresponse.py#L207
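To illustrate why declaring the field as Optional would matter, here is a small standard-library sketch. The `decode` function is a simplified stand-in for a strict dict-to-dataclass decoder, not the actual dataclasses_json code, and both model classes are hypothetical, trimmed-down versions of SourceResponse. It only mirrors the last frame of the traceback, where a missing key is looked up directly.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, Dict, Optional


def decode(cls, kvs: Dict[str, Any]):
    """Simplified stand-in for a strict dict-to-dataclass decoder.

    Fields with a default fall back to it when the key is absent;
    fields without a default are looked up directly and raise KeyError.
    """
    init_kwargs = {}
    for field in fields(cls):
        if field.name not in kvs and field.default is not MISSING:
            init_kwargs[field.name] = field.default
        else:
            init_kwargs[field.name] = kvs[field.name]
    return cls(**init_kwargs)


@dataclass
class StrictSourceResponse:
    # Hypothetical model: source_type is required, as in the reported behaviour.
    source_id: str
    name: str
    source_type: str


@dataclass
class LenientSourceResponse:
    # Hypothetical model: source_type may be absent for custom connectors.
    source_id: str
    name: str
    source_type: Optional[str] = None


# A payload shaped like a custom-connector response: no "source_type" key.
payload = {"source_id": "<custom-source-id>", "name": "my custom source"}

try:
    decode(StrictSourceResponse, payload)
    strict_error = None
except KeyError as exc:
    strict_error = exc.args[0]  # name of the missing key

lenient = decode(LenientSourceResponse, payload)
print("strict model failed on:", strict_error)
print("lenient model source_type:", lenient.source_type)
```

The same idea would apply to the configuration field: a model that only accepts a fixed Union of known sources would presumably need a generic fallback (for example a plain dict) to accept custom configurations.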