-
Notifications
You must be signed in to change notification settings - Fork 703
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CARBONDATA-4242]Improve cdc performance and introduce new APIs for U…
…PSERT, DELETE, INSERT and UPDATE Why is this PR needed? 1. In the exiting solution, when we perform join of the source and target dataset for tagging records to delete, update and insert records, we were scanning all the data of target table and then perform join with source dataset. But it can happen that the source data is less and its range may cover only some 100s of Carbondata files out of 1000s of files in the target table. So pruning is main bottleneck here, so scanning all records and involving in join results in so much of shuffle and reduces performance. 2. Source data caching was not there, caching source data will help to improve its multiple scans and since input source data will be of less size, we can persist the dataset. 3. When we were performing join, we used to first get the Row object and then operate on it and then for each datatype casting happens to convert to spark datatype and then covert to InternalRow object for further processing of joined data. This will add extra deserializeToObject and map nodes in DAG and increase time. 4. Initially during tagging records(Join operation), we were preparing a new projection of required columns, which basically involves operations of preparing an internal row object as explained in point 3, and then apply eval function on each row to prepare a projection, so this basically applying same eval of expression on joined data, a repeated work and increases time. 5. In join operation we were using all the columns of source dataset and the required columns of target table like, join key column and other columns of tupleID, status_on_mergeds etc. So when we there will be so many columns in the table, then it will increase the execution time due to lot of data shuffling. 6. The current APIs of merge are little bit complex and generalized and confusing to user for simple Upsert, delete and insert operations. What changes were proposed in this PR? 1. Add a pruning logic before the join operations. Compare the incoming row with an interval based tree data structure which contains the Carbondata file path and min and max to identify the Carbondata file where the incoming row can be present, so that in some use case scenario which will be explained in later section, can give benefit and help to scan less files rather than blindly scanning all the Carbondata files in the target table. 2. Cache the incoming source dataset srcDS.cache(), so that the cached data will be used in all the operations and speed will be improved. Uncache() after the merge operation 3. Instead of operating on row object and then converting to InternalRow, directly operate on the InternalRow object to avoid the data type conversions. 4. Instead of evaluating the expression again based on required project columns on matching conditions and making new projection, directly identify the indexes required for output row and then directly access these indices on the incoming internal row object after step3, so evaluation is avoided and array access with indices will give O(1) performance. 5. During join or the tagging of records, do not include all the column data, just include the join key columns and identify the tupleIDs to delete and the rows to insert, this will avoid lot of shuffle and improve performance significantly. 6. Introduce new APIs for UPSERT, UPDATE, DELETE and INSERT and make the user exposed APIs simple. So now user just needs to give the key column for join, source dataset and the operation type as mentioned above. These new APIs will make use of all the improvements mentioned above and avoid unnecessary operations of the existing merge APIs. Does this PR introduce any user interface change? No Is any new testcase added? Yes This closes #4148
- Loading branch information
1 parent
feb0521
commit 1e2fc4c
Showing
38 changed files
with
2,439 additions
and
221 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.