Currently, the ACID reader uses shaded Hive ORC ACID readers to read Hive ACID tables. We can definitely improve performance by moving to native readers for those tables. Especially in the case of Parquet, Spark’s native Parquet readers are optimized and we don't want to miss out on that. Another reason is that every row currently has to be converted from a Hive structure to an InternalRow, which adds a penalty as well.
Following are the two important objectives for this exercise:
Performance: Figure out a performant way of using the native Spark readers.
Maintainability: Avoid changes to the Spark readers for Hive ACID. We would not like to make any changes to the Spark readers or keep a fork of them that needs to be maintained later.
One of the solutions we are planning to evaluate is an anti-join between the base_files + delta files (let’s call this the base relation) and the delete_delta files (let’s call this the delete relation). An anti-join of the base and delete relations on (rowId, bucket, transactionId) gives us the rows to read, i.e.,
AntiJoin(base, delete)
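A minimal sketch of what this could look like with the DataFrame API, assuming the base/delta and delete_delta files could be read natively and exposed with (rowId, bucket, transactionId) columns; the paths and column names here are illustrative assumptions, not the final reader API:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: basePaths/deleteDeltaPaths and the key column names
// (rowId, bucket, transactionId) are assumptions for illustration.
def readValidRows(spark: SparkSession,
                  basePaths: Seq[String],
                  deleteDeltaPaths: Seq[String]): DataFrame = {
  // base + delta files: rows that were inserted
  val base = spark.read.format("orc").load(basePaths: _*)
  // delete_delta files: tombstones identifying deleted rows
  val delete = spark.read.format("orc").load(deleteDeltaPaths: _*)
    .select("rowId", "bucket", "transactionId")
  // Keep only rows whose key has no matching tombstone in the delete relation.
  base.join(delete, Seq("rowId", "bucket", "transactionId"), "left_anti")
}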
However, Sort Merge Join introduces an extra shuffle that might cause a performance issue, and if the delete relation is larger than the broadcast threshold, the join falls back to SMJ. One way to ensure broadcast happens more frequently is to split the base and delete relations by bucket id and do the anti-join per bucket id, i.e.,
AntiJoin(base, delete) = Union(AntiJoin(base_bucket_1, delete_bucket_1), AntiJoin(base_bucket_2, delete_bucket_2), …, AntiJoin(base_bucket_n, delete_bucket_n))
This way the joins are split up, making sure broadcast joins come into play, as sketched below.
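To make the per-bucket variant concrete, here is a hedged sketch that filters both relations by bucket id, broadcasts each (presumably small) per-bucket delete slice, and unions the per-bucket anti-joins. The bucketIds argument and the column names are assumptions for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

// Sketch only: assumes `base` and `delete` expose a `bucket` column and that
// `bucketIds` enumerates the bucket ids present in the table.
def perBucketAntiJoin(base: DataFrame,
                      delete: DataFrame,
                      bucketIds: Seq[Int]): DataFrame = {
  val perBucket = bucketIds.map { b =>
    val baseB = base.filter(col("bucket") === b)
    // Hint Spark to broadcast the per-bucket delete slice; each slice is far
    // more likely to fit under spark.sql.autoBroadcastJoinThreshold than the
    // full delete relation, avoiding a shuffle for the anti-join.
    val deleteB = broadcast(delete.filter(col("bucket") === b))
    baseB.join(deleteB, Seq("rowId", "bucket", "transactionId"), "left_anti")
  }
  perBucket.reduce(_ union _)
}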
One of the issues specific to ORC is that the reader currently understands the ACID format. So if we just do this:
spark.read.format("orc").load(".../warehouse/acidtbl/delta_0000002_0000002_0000/bucket_00000")
the following exception is thrown in SchemaEvolution:
if (this.readerIncluded != null &&
    this.readerIncluded.length + this.readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
  throw new IllegalArgumentException("Include vector the wrong length: " +
      this.readerSchema.toJson() + " with include length " + this.readerIncluded.length);
}
This means the delta files are not even accessible to normal readers, which would otherwise read them like normal ORC files with normal row ids.
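For reference, a Hive ACID ORC file wraps each user row in an ACID metadata struct (operation, originalTransaction, bucket, rowId, currentTransaction, row). If the native reader could read such a delta bucket file, the read would only need to flatten that wrapper to surface the user columns plus the key columns used by the anti-join. The sketch below is hypothetical (this exact read currently fails with the exception above), the path is the same truncated example as before, and the user columns inside `row` are whatever the table defines:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Hypothetical: this currently fails in SchemaEvolution as shown above.
val delta = spark.read.format("orc")
  .load(".../warehouse/acidtbl/delta_0000002_0000002_0000/bucket_00000")

// Flatten the ACID wrapper: user columns from `row` plus the ACID metadata
// fields that the (rowId, bucket, transactionId) key in the sketches above
// would map onto.
val flattened = delta.select(
  "row.*",
  "rowId", "bucket",
  "originalTransaction", "currentTransaction")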
What should be done when rowIds have not been determined for the base files yet?