User:Milimetric/Learning iceberg/Spark streaming append
Tiny local spark SQL environment, just making a test table:
spark3-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive
Simple table with schema (string_key, string_value), fed by a stream of events with keys cycling through 0 to 100. Partition by the first character of the key, so that we'll have roughly 10 partitions and lots of opportunities for key collisions as time goes on.
create table milimetric.iceberg_test_update (
string_key string not null,
string_value string
)
using iceberg
partitioned by (
truncate(1, string_key)
)
tblproperties (
'format-version'='2',
'write.format.default'='parquet',
'write.parquet.row-group-size-bytes'='268435456',
'write.parquet.page-size-bytes'='2097152',
'write.metadata.previous-versions-max'='10',
'write.metadata.delete-after-commit.enabled'='true',
'write.delete.mode'='copy-on-write',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
);
insert into milimetric.iceberg_test_update
select '14' as string_key,
'first value' as string_value
;
insert into milimetric.iceberg_test_update
select '14' as string_key,
'second value' as string_value
;
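Before streaming into it, the partition transform can be sanity-checked with Iceberg's metadata tables. A quick sketch from the same spark3-sql session (exact column names can vary a bit across Iceberg versions); both '14' rows should land under the same truncated partition value '1':
select partition, record_count, file_count
from milimetric.iceberg_test_update.partitions;
-- expect a single partition entry for the truncated value '1' with record_count = 2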
Then feed data into it in different ways and see what happens.
The simplest way, according to the docs, is structured streaming's append output mode.
spark3-shell --master yarn \
--packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
val sdf = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
.selectExpr("coalesce(cast(value % 101 as string), 0) as string_key", "cast(value as string) as string_value")
/**
 * Test that the stream looks right by writing a few micro-batches to the console:
 * sdf.writeStream.format("console").start()
 */
val job = sdf.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.option("path", "spark_catalog.milimetric.iceberg_test_update")
.option("fanout-enabled", "true")
.option("checkpointLocation", "/tmp/milimetric_test")
.start()
job.awaitTermination()
Run that for a while and evaluate (same spark3-sql as above):
select * from milimetric.iceberg_test_update where string_key = '14';
-- of course this returns all the values appended for this key, including the initial ones inserted when setting up the table
-- so how do we get it to update?
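Iceberg's metadata tables also show what the stream is doing physically; a rough sketch, again in spark3-sql, assuming the same table: each ~10 second trigger that received rows commits an append snapshot, and small data files keep piling up in the same partitions.
select committed_at, operation, summary['added-data-files'] as added_files
from milimetric.iceberg_test_update.snapshots
order by committed_at;
-- one 'append' row per streaming trigger that had data

select partition, count(*) as data_files, sum(record_count) as row_count
from milimetric.iceberg_test_update.files
group by partition;
-- duplicates for a key just accumulate as extra rows in extra files; nothing gets rewritten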
Try 1. update identifier fields and insert again - no good
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.iceberg.SortOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.NullOrder
import org.apache.iceberg.actions.RewriteDataFiles
// open the Hive catalog directly through the Iceberg Java API, reusing the session's Hadoop/Hive configuration
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new java.util.HashMap[String, String]()
catalog.initialize("spark_catalog", properties)

// load the table and declare string_key as an identifier field (Iceberg's row-identity metadata)
val ice = TableIdentifier.of("milimetric", "iceberg_test_update")
val iceT = catalog.loadTable(ice)
iceT.updateSchema.setIdentifierFields("string_key").commit()
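Letting the stream keep appending after this commit and re-checking from spark3-sql confirms the "no good": identifier fields are only metadata about row identity, they don't change how Spark's append mode writes. A minimal check (hypothetical, not from the original run):
select string_key, count(*) as row_versions
from milimetric.iceberg_test_update
where string_key = '14'
group by string_key;
-- the count keeps growing, so rows are still appended rather than replaced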
Try 2. with the schema set up as above, see if there's some upsert...