Incompatible table partitioning specification using Spark BQ connector


I want to run a Spark job that was originally built on Scala 2.12.15 and Spark 3.3.4 with sparkBigQueryConnectorVersion = "0.28.1".

However, I just upgraded the Dataproc Serverless runtime from 1.1 to 1.2, since I need to move from Java 11 to Java 17. Along with that, I changed the library versions (especially the BigQuery connector) in build.sbt to the following:

val sparkVersion = "3.5.1"
val sparkBigQueryConnectorVersion = "0.36.4"
val sparkAvroVersion = "3.5.1"

val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
val sparkHive = "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
val sparkBigQueryConnector = "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % sparkBigQueryConnectorVersion
val sparkAvro = "org.apache.spark" %% "spark-avro" % sparkAvroVersion

to match the versions specified in the Dataproc Serverless 1.2 runtime documentation.
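
For completeness, this is roughly how those values are wired into the dependency list. It is only a sketch: the scalaVersion is my assumption based on the 1.2 runtime requirements, and the surrounding settings are illustrative, not a copy of my actual build.sbt.

// Sketch of the dependency wiring; scalaVersion is an assumption (runtime 1.2
// requires Scala 2.13), the rest reuses the vals defined above.
ThisBuild / scalaVersion := "2.13.12"

libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkHive,
  sparkBigQueryConnector,
  sparkAvro
)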

I run an Airflow DAG with the following batch config:

batch_config = {
    "spark_batch": {  # Use 'spark_batch' for Spark jobs in Dataproc Serverless
        "jar_file_uris": [
            f"{artifact_bucket}/bq-ingestion-{release_id}.jar"
        ],
        "main_class": "de.mobile.dmh.campaign.ingestion.ExecutableCampaignsIngestion",
        "args": ["--startDate", start_date, "--endDate", end_date, "--outputDs", target_ds]
    },
    "environment_config": {  # Corrected to use environment_config for serverless
        "execution_config": {
            "subnetwork_uri": subnetwork_uri,  # Specify subnetwork
            "service_account": service_account  # Specify service account
        }
    },
    "runtime_config": {  # Specify the runtime version for Dataproc Serverless
        "version": "1.2",
        "properties": {  # Spark properties moved here under 'properties'
            "spark.executor.memory": "4g",
            "spark.executor.cores": "4",
        }
    }
}

The job needs to write a partitioned DataFrame to a partitioned BigQuery table using this:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkInit {

  def initSpark(moduleName: String, appName: String, conf: Option[SparkConf]): SparkSession = {
    val spark: SparkSession = SparkSession.builder()
    .appName(s"[$moduleName] $appName")
    .config(conf.getOrElse(defaultConfig()))
    .enableHiveSupport()
    .getOrCreate()

    spark.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")
    spark.sparkContext.hadoopConfiguration.set("hive.metastore.client.socket.timeout", "1500")
    spark
  }

  def initSpark(moduleName: String, appName: String): SparkSession = initSpark(moduleName, appName, None)

  def defaultConfig(): SparkConf = {
    new SparkConf()
    .set("spark.files.overwrite", "true")
    .set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    .set("spark.sql.caseSensitive", "false")
    .set("hive.metastore.client.socket.timeout", "1500") // if we need to write a lot of partitions meta store will fail with default of 600
    .set("spark.sql.adaptive.optimizeSkewedJoin.enabled", "true")
    .set("viewsEnabled", "true")
  }

}

ExecutableCampaignsIngestion.scala

// SOME VARIABLE DEFINITIONS HERE  

private def createSparkConfig(): SparkConf = {
    new SparkConf()
    .set("spark.files.overwrite", "true")
    .set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .set("spark.sql.caseSensitive", "false")
    .set("spark.sql.legacy.timeParserPolicy","LEGACY")
    .set("temporaryGcsBucket", temporaryGcsBucket)
    .set("viewsEnabled", "true")
    .set("materializationDataset", materializationDataset)
  }

val spark: SparkSession = SparkInit.initSpark(AppConfig.moduleName, "ExecutableCampaignsIngestion", Some(createSparkConfig()))

dataFrame
.withColumn(partitionColumn, to_date(col(partitionColumn)))
.write
.format(StorageOptions.BiqQuery)
.mode(SaveMode.Overwrite)
.option("createDisposition", "CREATE_IF_NEEDED")
//.option("intermediateFormat", StorageOptions.Avro)
//.option("useAvroLogicalTypes", "true")
.option("partitionField", partitionColumn)
.save(temporaryTableName)

However, I receive an error:

Caused by: com.google.cloud.spark.bigquery.repackaged.google.cloud.bigquery.BigQueryException: Incompatible table partitioning specification. Expects partitioning specification none, but input partitioning specification is interval(type:day,field:event_date)

I only pasted the relevant code. It's very strange, because I checked the BigQuery table characteristics and partitioning is already enabled on the partitionColumn field. If I delete the table and let my job recreate it, the DataFrame is written without any error. But when I run the job again, it fails with the same partitioning error. The older Dataproc runtime and library versions worked correctly.
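
For reference, this is a minimal standalone sketch of how the table's partitioning spec can be inspected outside Spark with the google-cloud-bigquery Java client (the project, dataset, and table names below are placeholders, not my real ones):

import com.google.cloud.bigquery.{BigQueryOptions, StandardTableDefinition, TableId}

object CheckPartitioning {
  def main(args: Array[String]): Unit = {
    // Placeholder IDs; the real project/dataset/table names differ.
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    val table = bigquery.getTable(TableId.of("my-project", "my_dataset", "my_temporary_table"))

    // For a day-partitioned table this prints e.g. "DAY on event_date";
    // getTimePartitioning returns null for an unpartitioned table.
    val partitioning = table.getDefinition[StandardTableDefinition].getTimePartitioning
    if (partitioning == null) println("Table is not time-partitioned")
    else println(s"${partitioning.getType} on ${partitioning.getField}")
  }
}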
