NoneWriteDistributionSortedTableOnePartitionSpec:
+ See https://iceberg.apache.org/docs/1.6.0/spark-writes/#writing-distribution-modes
Using write.distribution-mode
- should create the appropriate number of Iceberg files
+ Given a table that has a distribution mode of 'none' and is created with:
CREATE TABLE polaris.my_namespace.NoneWriteDistributionSortedTableOnePartitionSpec (
id int,
label String,
partitionKey long,
date Date,
timestamp Timestamp
) USING iceberg TBLPROPERTIES (
'format-version' = '2',
'write.spark.fanout.enabled' = 'true',
'write.distribution-mode' = 'none',
'sort-order' = 'partitionKey ASC NULLS FIRST',
'spark.sql.adaptive.advisoryPartitionSizeInBytes' = '1073741824',
'write.spark.advisory-partition-size-bytes' = '1073741824',
'write.target-file-size-bytes' = '1073741824'
) PARTITIONED BY (partitionKey);
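Because the distribution mode is 'none', Iceberg requests no write-time shuffle from Spark: every task in the final stage of the job writes its own data file(s) into each table partition it touches, so the file count tracks the task count rather than the partition count. A minimal Scala sketch of such a write (the catalog, namespace and table name are taken from the DDL above; the session setup and row values are illustrative):

import org.apache.spark.sql.SparkSession

object NoneDistributionWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("none-distribution-write")
      .master("local[4]") // 4 executor threads, as in the scenario below
      .getOrCreate()
    import spark.implicits._

    // 20 rows that all share partitionKey = 1, i.e. one logical partition.
    val rows = (0 until 20)
      .map { i =>
        (i, s"label_$i", 1L, java.sql.Date.valueOf("2024-01-01"),
         java.sql.Timestamp.valueOf("2024-01-01 00:00:00"))
      }
      .toDF("id", "label", "partitionKey", "date", "timestamp")

    // With write.distribution-mode = 'none' there is no shuffle before the
    // write, so the 6 tasks produced by repartition(6) each write a file
    // into the single Iceberg partition: 6 data files in total.
    rows
      .repartition(6)
      .writeTo("polaris.my_namespace.NoneWriteDistributionSortedTableOnePartitionSpec")
      .append()
  }
}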
+ And a query plan that looks like:
== Optimized Logical Plan ==
Repartition 6, true, Statistics(sizeInBytes=6.3 MiB)
+- Project [id#1335, partitionKey#1337L, date#1338, timestamp#1339, concat(cast(otherId#1353L as string), xxx) AS label#1372], Statistics(sizeInBytes=6.3 MiB)
   +- Join Inner, (partitionKey#1337L = otherId#1353L), Statistics(sizeInBytes=4.9 MiB)
      :- LocalRelation [id#1335, partitionKey#1337L, date#1338, timestamp#1339], Statistics(sizeInBytes=640.0 B, rowCount=20)
      +- Project [value#1350L AS otherId#1353L], Statistics(sizeInBytes=7.8 KiB)
         +- SerializeFromObject [input[0, bigint, false] AS value#1350L], Statistics(sizeInBytes=7.8 KiB)
            +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@492fbf58, class java.lang.Long, [StructField(value,LongType,true)], obj#1349: bigint, Statistics(sizeInBytes=7.8 KiB)
               +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#1345L, true, false, true), obj#1348: java.lang.Long, Statistics(sizeInBytes=7.8 KiB)
                  +- Range (0, 1000, step=1, splits=Some(4)), Statistics(sizeInBytes=7.8 KiB, rowCount=1.00E+3)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(6), REPARTITION_BY_NUM, [plan_id=2719]
   +- Project [id#1335, partitionKey#1337L, date#1338, timestamp#1339, concat(cast(otherId#1353L as string), xxx) AS label#1372]
      +- BroadcastHashJoin [partitionKey#1337L], [otherId#1353L], Inner, BuildLeft, false
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [plan_id=2716]
         :  +- LocalTableScan [id#1335, partitionKey#1337L, date#1338, timestamp#1339]
         +- Project [value#1350L AS otherId#1353L]
            +- SerializeFromObject [input[0, bigint, false] AS value#1350L]
               +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@492fbf58, obj#1349: bigint
                  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#1345L, true, false, true), obj#1348: java.lang.Long
                     +- Range (0, 1000, step=1, splits=4)
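The Repartition 6 at the root of the logical plan becomes the Exchange RoundRobinPartitioning(6) in the physical plan, and because the distribution mode is 'none' no further exchange is added for the write, so 6 tasks reach the writer. The test code itself lives in AbstractWriteDistributionSpec; a sketch that produces a plan of the same shape (the typed map over spark.range is what yields the DeserializeToObject / MapElements / SerializeFromObject chain) might look like:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, lit}

val spark = SparkSession.builder().master("local[4]").getOrCreate()
import spark.implicits._

// LocalRelation: 20 in-memory rows (id, partitionKey, date, timestamp).
val left = Seq.tabulate(20) { i =>
  (i, 1L, java.sql.Date.valueOf("2024-01-01"),
   java.sql.Timestamp.valueOf("2024-01-01 00:00:00"))
}.toDF("id", "partitionKey", "date", "timestamp")

// A typed map over a Range: spark.range gives Dataset[java.lang.Long],
// and the lambda unboxes each value, producing the operator chain above.
val right = spark.range(0, 1000).map(_.longValue).toDF("otherId")

// Inner equi-join; the 20-row side is small enough to be broadcast.
val joined = left
  .join(right, $"partitionKey" === $"otherId")
  .withColumn("label", concat($"otherId".cast("string"), lit("xxx")))
  .drop("otherId")

joined.repartition(6).explain("extended")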
+ And it has 20 rows over 6 data files when written with 4 executor threads
+ When we add another 20 rows of the same data, logically distributed over 1 partition
+ Then there are now 6 more data files
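One way to check the Then step is to count rows in Iceberg's standard files metadata table before and after the second append (table names as in the DDL above; the helper is illustrative and assumes an active SparkSession in scope):

import org.apache.spark.sql.SparkSession

// Count Iceberg data files via the standard `files` metadata table.
def dataFileCount(spark: SparkSession): Long =
  spark
    .sql("SELECT count(*) FROM polaris.my_namespace.NoneWriteDistributionSortedTableOnePartitionSpec.files")
    .head()
    .getLong(0)

val before = dataFileCount(spark)
// ... append another 20 rows, exactly as in the first write ...
val after  = dataFileCount(spark)
assert(after - before == 6, s"expected 6 new data files, got ${after - before}")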
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ The corner case of all the data being in one partition
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +