Skip to content

[SUPPORT] AWS Glue 3.0 fail to write dataset with hudi (hive sync issue) #6832

Closed
@dragonH

Description

@dragonH

Describe the problem you faced

I was trying to use hudi with AWS Glue

At first, i create a simple dataframe

from pyspark.sql import Row
import time

ut = time.time()

product = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00005', 'product_name': 'USB chargers', 'price': 50, 'category': 'Electronics-t2', 'updated_at': ut}
]

df_products = spark.createDataFrame(Row(**x) for x in product)

then set the hudi config

hudi_options = {
    'hoodie.table.name': 'Customer_Sample_Hudi',
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'product_id',
    'hoodie.datasource.write.partitionpath.field': 'product_id',
    'hoodie.datasource.write.table.name': 'Customer_Sample_Hudi',
    'hoodie.datasource.write.operation': 'insert_overwrite',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'path': 's3://my_staging_bucket/Customer_Sample_Hudi/',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': 'Customer_Sample_Hudi',
    'hoodie.datasource.hive_sync.partition_fields': 'product_id',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.mode': 'hms'
}

then write the dataframe to AWS s3

df_products.write.format("hudi")  \
    .options(**hudi_options)  \
    .mode("append")  \
    .save()

sometimes it could write the dataframe successfully

sometimes it would fail (with the same dataframe but different table name)

please see the error message below

the data was write to AWS S3 but meet some error

i've checked that the table name not exists in aws glue catalog before writing it

the path in aws s3 was not exist either
To Reproduce

Steps to reproduce the behavior:

Steps are above

Expected behavior

To write the dataset with hudi successfully

Environment Description

  • Hudi version : 0.10.1

  • Spark version : AWS Glue 3.0 with Spark 3.1.1-amzn-0

  • Hive version : glue catalog

  • Hadoop version : 3.2.1-amzn-3

  • Storage (HDFS/S3/GCS..) : AWS S3

  • Running on Docker? (yes/no) : no

Additional context

I was follow this guideline refer

using 3 jars

hudi-utilities-bundle_2.12-0.10.1.jar
hudi-spark3.1.1-bundle_2.12-0.10.1.jar
spark-avro_2.12-3.1.1.jar

Stacktrace

Py4JJavaError: An error occurred while calling o235.save.
: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing Customer_Sample_Hudi
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:118)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:539)
	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:595)
	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2$adapted(HoodieSparkSqlWriter.scala:591)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:77)
	at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:591)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:665)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to sync partitions for table Customer_Sample_Hudi
	at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:363)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:197)
	at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:129)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:115)
	... 46 more
Caused by: java.lang.IllegalArgumentException: Partitions must be in the same table
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:92)
	at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.validateInputForBatchCreatePartitions(GlueMetastoreClientDelegate.java:800)
	at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.batchCreatePartitions(GlueMetastoreClientDelegate.java:736)
	at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.addPartitions(GlueMetastoreClientDelegate.java:718)
	at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.add_partitions(AWSCatalogMetastoreClient.java:339)
	at org.apache.hudi.hive.ddl.HMSDDLExecutor.addPartitionsToTable(HMSDDLExecutor.java:198)
	at org.apache.hudi.hive.HoodieHiveClient.addPartitionsToTable(HoodieHiveClient.java:115)
	at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:346)
	... 49 more

Metadata

Metadata

Assignees

Type

No type

Projects

Status

✅ Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions