HashWriteDistributionSortedDataframeSpec:
+ See https://iceberg.apache.org/docs/1.6.0/spark-writes/#writing-distribution-modes 
Using write.distribution-mode
- should create the appropriate number of Iceberg files
  + Given a table that has a distribution mode of hash and is created with:
CREATE TABLE polaris.my_namespace.HashWriteDistributionSortedDataframeSpec (
  id int,
  label String,
  partitionKey long,
  date Date,
  timestamp Timestamp
) USING iceberg TBLPROPERTIES (
  'format-version' = '2',
  'write.spark.fanout.enabled' = 'true',
  'write.distribution-mode' = 'hash'
) PARTITIONED BY (partitionKey); 
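The linked Iceberg documentation treats write.distribution-mode as an ordinary table property, so the hash mode shown in the DDL above could equally be applied to an existing table. A minimal sketch, assuming a SparkSession named spark with the polaris catalog already configured:

// Sketch only: spark is assumed to be a SparkSession with the polaris catalog configured.
// 'hash' asks Iceberg to cluster incoming rows by the table's partition key before writing.
spark.sql(
  """ALTER TABLE polaris.my_namespace.HashWriteDistributionSortedDataframeSpec
    |SET TBLPROPERTIES ('write.distribution-mode' = 'hash')""".stripMargin
)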
  + And the data is sorted on the partitionKey column 
  + And a query plan that looks like:
== Optimized Logical Plan ==
Sort [partitionKey#3208L ASC NULLS FIRST], true, Statistics(sizeInBytes=6.3 MiB)
+- Repartition 6, true, Statistics(sizeInBytes=6.3 MiB)
   +- Project [id#3206, partitionKey#3208L, date#3209, timestamp#3210, concat(cast(otherId#3224L as string), xxx) AS label#3243], Statistics(sizeInBytes=6.3 MiB)
      +- Join Inner, (partitionKey#3208L = otherId#3224L), Statistics(sizeInBytes=4.9 MiB)
         :- LocalRelation [id#3206, partitionKey#3208L, date#3209, timestamp#3210], Statistics(sizeInBytes=640.0 B, rowCount=20)
         +- Project [value#3221L AS otherId#3224L], Statistics(sizeInBytes=7.8 KiB)
            +- SerializeFromObject [input[0, bigint, false] AS value#3221L], Statistics(sizeInBytes=7.8 KiB)
               +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@15f87946, class java.lang.Long, [StructField(value,LongType,true)], obj#3220: bigint, Statistics(sizeInBytes=7.8 KiB)
                  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#3216L, true, false, true), obj#3219: java.lang.Long, Statistics(sizeInBytes=7.8 KiB)
                     +- Range (0, 1000, step=1, splits=Some(4)), Statistics(sizeInBytes=7.8 KiB, rowCount=1.00E+3)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [partitionKey#3208L ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(partitionKey#3208L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=5239]
      +- Exchange RoundRobinPartitioning(6), REPARTITION_BY_NUM, [plan_id=5237]
         +- Project [id#3206, partitionKey#3208L, date#3209, timestamp#3210, concat(cast(otherId#3224L as string), xxx) AS label#3243]
            +- BroadcastHashJoin [partitionKey#3208L], [otherId#3224L], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [plan_id=5234]
               :  +- LocalTableScan [id#3206, partitionKey#3208L, date#3209, timestamp#3210]
               +- Project [value#3221L AS otherId#3224L]
                  +- SerializeFromObject [input[0, bigint, false] AS value#3221L]
                     +- MapElements uk.co.odinconsultants.iceberg.distributions.AbstractWriteDistributionSpec$$Lambda$5586/0x0000000802566990@15f87946, obj#3220: bigint
                        +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#3216L, true, false, true), obj#3219: java.lang.Long
                           +- Range (0, 1000, step=1, splits=4)

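For orientation, the plan above corresponds to a pipeline of roughly the following shape. This is a sketch only: the names df, otherIds and the spark session are illustrative assumptions, not the spec's actual code.

// Sketch (illustrative names) of a pipeline that yields a plan like the one above.
import org.apache.spark.sql.functions.{col, concat, lit}
import spark.implicits._

val otherIds = spark.range(0, 1000).map(_.longValue).toDF("otherId")  // Range -> MapElements

val toWrite = df                                                       // 20-row LocalRelation: id, partitionKey, date, timestamp
  .join(otherIds, df("partitionKey") === otherIds("otherId"))          // Join Inner on partitionKey = otherId
  .select(col("id"), col("partitionKey"), col("date"), col("timestamp"),
          concat(otherIds("otherId").cast("string"), lit("xxx")).as("label"))
  .repartition(6)                                                      // RoundRobinPartitioning(6)
  .sort("partitionKey")                                                // Exchange rangepartitioning + Sort

toWrite.writeTo("polaris.my_namespace.HashWriteDistributionSortedDataframeSpec").append()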
  + And it has 20 rows over 5 data files when writing with 4 executor threads 
  + When we add another 20 rows of the same data, logically distributed over 5 partitions 
  + And the data is sorted on the partitionKey column 
  + Then there are now 5 more data files 
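One way the file counts asserted in these steps can be inspected is through Iceberg's files metadata table, which lists the content files currently referenced by the table. A minimal sketch, again assuming the spark session above:

// Sketch only: counts the data files currently referenced by the table.
val dataFileCount: Long = spark
  .sql("SELECT count(*) FROM polaris.my_namespace.HashWriteDistributionSortedDataframeSpec.files")
  .head()
  .getLong(0)

Reading this count before and after the second append would show the 5 extra files the assertion above expects, consistent with one data file per logical partition per write.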