HashDistributionSortedTableSpec:
+ See https://iceberg.apache.org/docs/1.6.0/spark-writes/#writing-distribution-modes
Using write.distribution-mode
- should create the appropriate number of Iceberg files
  + Given a table that has a distribution mode of hash and is created with:

CREATE TABLE polaris.my_namespace.HashDistributionSortedTableSpec (
  id           int,
  label        String,
  partitionKey long,
  date         Date,
  timestamp    Timestamp
) USING iceberg TBLPROPERTIES (
  'format-version'                                   = '2',
  'write.spark.fanout.enabled'                       = 'true',
  'write.distribution-mode'                          = 'hash',
  'sort-order'                                       = 'partitionKey ASC NULLS FIRST',
  'spark.sql.adaptive.advisoryPartitionSizeInBytes'  = '1073741824',
  'write.spark.advisory-partition-size-bytes'        = '1073741824',
  'write.target-file-size-bytes'                     = '1073741824'
) PARTITIONED BY (partitionKey);

  + And a query plan that looks like:

== Optimized Logical Plan ==
Repartition 6, true, Statistics(sizeInBytes=6.3 MiB)
+- Project [id#1453, partitionKey#1455L, date#1456, timestamp#1457, concat(cast(otherId#1471L as string), xxx) AS label#1490], Statistics(sizeInBytes=6.3 MiB)
   +- Join Inner, (partitionKey#1455L = otherId#1471L), Statistics(sizeInBytes=4.9 MiB)
      :- LocalRelation [id#1453, partitionKey#1455L, date#1456, timestamp#1457], Statistics(sizeInBytes=640.0 B, rowCount=20)
      +- Project [value#1468L AS otherId#1471L], Statistics(sizeInBytes=7.8 KiB)
         +- SerializeFromObject [input[0, bigint, false] AS value#1468L], Statistics(sizeInBytes=7.8 KiB)
            +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@2b704646, class java.lang.Long, [StructField(value,LongType,true)], obj#1467: bigint, Statistics(sizeInBytes=7.8 KiB)
               +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#1463L, true, false, true), obj#1466: java.lang.Long, Statistics(sizeInBytes=7.8 KiB)
                  +- Range (0, 1000, step=1, splits=Some(4)), Statistics(sizeInBytes=7.8 KiB, rowCount=1.00E+3)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(6), REPARTITION_BY_NUM, [plan_id=3047]
   +- Project [id#1453, partitionKey#1455L, date#1456, timestamp#1457, concat(cast(otherId#1471L as string), xxx) AS label#1490]
      +- BroadcastHashJoin [partitionKey#1455L], [otherId#1471L], Inner, BuildLeft, false
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [plan_id=3044]
         :  +- LocalTableScan [id#1453, partitionKey#1455L, date#1456, timestamp#1457]
         +- Project [value#1468L AS otherId#1471L]
            +- SerializeFromObject [input[0, bigint, false] AS value#1468L]
               +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@2b704646, obj#1467: bigint
                  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#1463L, true, false, true), obj#1466: java.lang.Long
                     +- Range (0, 1000, step=1, splits=4)

  + And it has 20 rows over 5 data file(s) when writing with 4 executor threads
  + When we add another 20 rows of the same data that is logically distributed over 5 partition(s)
  + Then there are now 5 more data files
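The steps above can be reproduced outside the spec harness. Below is a minimal sketch (not the spec's actual code) of how one might verify the Then step: it assumes a SparkSession already wired to the `polaris` Iceberg catalog used above, and the `countDataFiles` helper and the exact INSERT shape are illustrative assumptions. It counts rows in Iceberg's `files` metadata table before and after appending 20 rows spread over 5 distinct partitionKey values.

import org.apache.spark.sql.SparkSession

object HashDistributionFileCount {

  // Table name taken verbatim from the spec above.
  val table = "polaris.my_namespace.HashDistributionSortedTableSpec"

  // Hypothetical helper: Iceberg exposes a `files` metadata table whose
  // row count is the number of live data files in the current snapshot.
  def countDataFiles(spark: SparkSession): Long =
    spark.sql(s"SELECT * FROM $table.files").count()

  def main(args: Array[String]): Unit = {
    val spark  = SparkSession.builder().getOrCreate()
    val before = countDataFiles(spark)

    // 20 rows over 5 distinct partitionKey values (0..4), mirroring the
    // "20 rows ... distributed over 5 partition(s)" wording in the spec.
    spark.sql(
      s"""INSERT INTO $table
         |SELECT CAST(id AS int)      AS id,
         |       CONCAT('label_', id) AS label,
         |       CAST(id % 5 AS long) AS partitionKey,
         |       CURRENT_DATE()       AS date,
         |       CURRENT_TIMESTAMP()  AS timestamp
         |FROM range(20)""".stripMargin)

    val after = countDataFiles(spark)
    assert(after - before == 5, s"expected 5 new files, saw ${after - before}")
  }
}

With 'write.distribution-mode' = 'hash', Spark shuffles the incoming rows by the table's partition expression before the write, so all rows sharing a partitionKey land in the same write task; 5 logical partitions therefore produce 5 new data files, which is what the Then step asserts.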