diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 97fb70a24d9..1a8131f0072 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -2670,4 +2670,14 @@ private CarbonCommonConstants() { */ public static final String CARBON_SPARK3_VERSION = "2.2.0"; + /** + * This property is to enable the min max pruning of target carbon table based on input/source + * data + */ + @CarbonProperty + public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED = + "carbon.cdc.minmax.pruning.enabled"; + + public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = "false"; + } diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java index 263fde9b721..47644458398 100644 --- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java @@ -36,6 +36,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CdcVO; import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; @@ -102,6 +103,8 @@ public class IndexInputFormat extends FileInputFormat private Set missingSISegments; + private CdcVO cdcVO; + IndexInputFormat() { } @@ -275,6 +278,10 @@ public void write(DataOutput out) throws IOException { out.writeUTF(segment); } } + out.writeBoolean(cdcVO != null); + if (cdcVO != null) { + cdcVO.write(out); + } } @Override @@ -330,6 +337,11 @@ public void readFields(DataInput in) throws IOException { missingSISegments.add(in.readUTF()); } } + boolean isCDCJob = in.readBoolean(); + if (isCDCJob) { + this.cdcVO = new CdcVO(); + cdcVO.readFields(in); + } } private void initReadCommittedScope() throws IOException { @@ -353,6 +365,14 @@ public boolean isFallbackJob() { return isFallbackJob; } + public CdcVO getCdcVO() { + return cdcVO; + } + + public void setCdcVO(CdcVO cdcVO) { + this.cdcVO = cdcVO; + } + /** * @return Whether asyncCall to the IndexServer. 
*/ diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java index 4a4e8023a7a..c4498062089 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java @@ -31,7 +31,7 @@ public class Blocklet implements Writable, Serializable { /** file path of this blocklet */ - private String filePath; + protected String filePath; /** id to identify the blocklet inside the block (it is a sequential number) */ private String blockletId; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index a25bd90d16e..b28d7ff8a34 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -21,15 +21,21 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.carbondata.core.index.IndexInputFormat; import org.apache.carbondata.core.index.Segment; import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes; import org.apache.carbondata.core.indexstore.row.IndexRow; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.mutate.CdcVO; +import org.apache.carbondata.core.mutate.FilePathMinMaxVO; import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.CarbonInputSplit; /** @@ -47,6 +53,8 @@ public class ExtendedBlocklet extends Blocklet { private boolean isCgIndexPresent = false; + private Map> columnToMinMaxMapping; + public ExtendedBlocklet() { } @@ -163,20 +171,36 @@ public void setColumnSchema(List columnSchema) { * Method to serialize extended blocklet and input split for index server * DataFormat * - * @param out - * @param uniqueLocation + * @param out data output to write the primitives of the extended blocklet + * @param uniqueLocation location to write the blocklet in case of distributed pruning, ex: Lucene + * @param isExternalPath indicates whether the blocklet belongs to an external segment * @throws IOException */ - public void serializeData(DataOutput out, Map uniqueLocation, boolean isCountJob, - boolean isExternalPath) + public void serializeData(DataOutput out, Map uniqueLocation, + IndexInputFormat indexInputFormat, boolean isExternalPath) throws IOException { super.write(out); - if (isCountJob) { + if (indexInputFormat.isCountStarJob()) { // In CarbonInputSplit, getDetailInfo() is a lazy call. we want to avoid this during // countStar query. As rowCount is filled inside getDetailInfo(). In countStar case we may // not have proper row count. So, always take row count from indexRow. out.writeLong(inputSplit.getIndexRow().getInt(BlockletIndexRowIndexes.ROW_COUNT_INDEX)); out.writeUTF(inputSplit.getSegmentId()); + } else if (indexInputFormat.getCdcVO() != null) { + // In case of CDC, we just need the file path and the min max of the blocklet, so just serialize + // this data to reduce network transfer cost and allow faster cache access from the index server.
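+      // Serialized layout per blocklet in the CDC case (mirrored by deserializeFields below):
+      //   writeUTF(filePath), then for every projected column index:
+      //   writeInt(min.length), write(min), writeInt(max.length), write(max)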
+ out.writeUTF(inputSplit.getFilePath()); + List indexesToFetch = indexInputFormat.getCdcVO().getIndexesToFetch(); + for (Integer indexToFetch : indexesToFetch) { + byte[] minValues = CarbonUtil.getMinMaxValue(inputSplit.getIndexRow(), + BlockletIndexRowIndexes.MIN_VALUES_INDEX)[indexToFetch]; + out.writeInt(minValues.length); + out.write(minValues); + byte[] maxValues = CarbonUtil.getMinMaxValue(inputSplit.getIndexRow(), + BlockletIndexRowIndexes.MAX_VALUES_INDEX)[indexToFetch]; + out.writeInt(maxValues.length); + out.write(maxValues); + } } else { if (indexUniqueId == null) { out.writeBoolean(false); @@ -207,19 +231,35 @@ public void serializeData(DataOutput out, Map uniqueLocation, boo /** * Method to deserialize extended blocklet and input split for index server - * @param in - * @param locations - * @param tablePath + * @param in data input stream to read the primitives of extended blocklet + * @param locations locations of the input split + * @param tablePath carbon table path * @throws IOException */ public void deserializeFields(DataInput in, String[] locations, String tablePath, - boolean isCountJob) + boolean isCountJob, CdcVO cdcVO) throws IOException { super.readFields(in); if (isCountJob) { count = in.readLong(); segmentNo = in.readUTF(); return; + } else if (cdcVO != null) { + filePath = in.readUTF(); + this.columnToMinMaxMapping = new HashMap<>(); + for (String column : cdcVO.getColumnToIndexMap().keySet()) { + List minMaxOfColumnInList = new ArrayList<>(); + int minLength = in.readInt(); + byte[] minValuesForBlocklets = new byte[minLength]; + in.readFully(minValuesForBlocklets); + int maxLength = in.readInt(); + byte[] maxValuesForBlocklets = new byte[maxLength]; + in.readFully(maxValuesForBlocklets); + minMaxOfColumnInList + .add(new FilePathMinMaxVO(filePath, minValuesForBlocklets, maxValuesForBlocklets)); + this.columnToMinMaxMapping.put(column, minMaxOfColumnInList); + } + return; } if (in.readBoolean()) { indexUniqueId = in.readUTF(); @@ -243,4 +283,8 @@ public void deserializeFields(DataInput in, String[] locations, String tablePath public void setCgIndexPresent(boolean cgIndexPresent) { isCgIndexPresent = cgIndexPresent; } + + public Map> getColumnToMinMaxMapping() { + return columnToMinMaxMapping; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java index acc796e7d51..37ad4bd2bc2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java @@ -36,7 +36,9 @@ import org.apache.carbondata.core.datastore.compression.SnappyCompressor; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexInputFormat; import org.apache.carbondata.core.metadata.schema.table.Writable; +import org.apache.carbondata.core.mutate.CdcVO; import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream; import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream; import org.apache.carbondata.core.stream.ExtendedDataInputStream; @@ -65,20 +67,20 @@ public ExtendedBlockletWrapper() { } - public ExtendedBlockletWrapper(List extendedBlockletList, String tablePath, - String queryId, boolean isWriteToFile, boolean isCountJob) { + public ExtendedBlockletWrapper(List extendedBlockletList, + 
IndexInputFormat indexInputFormat) { Map uniqueLocations = new HashMap<>(); - byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob); + byte[] bytes = convertToBytes(indexInputFormat, uniqueLocations, extendedBlockletList); int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD, CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024; DataOutputStream stream = null; // if data size is more then data will be written in file and file name will be sent from // executor to driver, in case of any failure data will send through network - if (bytes.length > serializeAllowedSize && isWriteToFile) { + if (bytes.length > serializeAllowedSize && indexInputFormat.isWriteToFile()) { final String fileName = UUID.randomUUID().toString(); String folderPath = CarbonUtil.getIndexServerTempPath() - + CarbonCommonConstants.FILE_SEPARATOR + queryId; + + CarbonCommonConstants.FILE_SEPARATOR + indexInputFormat.getQueryId(); try { final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath); boolean isFolderExists = true; @@ -115,15 +117,16 @@ public ExtendedBlockletWrapper(List extendedBlockletList, Stri } } - private byte[] convertToBytes(String tablePath, Map uniqueLocations, - List extendedBlockletList, boolean isCountJob) { + private byte[] convertToBytes(IndexInputFormat indexInputFormat, + Map uniqueLocations, List extendedBlockletList) { ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream(); DataOutputStream stream = new DataOutputStream(bos); + String tablePath = indexInputFormat.getCarbonTable().getTablePath(); try { for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) { boolean isExternalPath = !extendedBlocklet.getFilePath().startsWith(tablePath); extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, "")); - extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob, isExternalPath); + extendedBlocklet.serializeData(stream, uniqueLocations, indexInputFormat, isExternalPath); } byte[] input = bos.toByteArray(); return new SnappyCompressor().compressByte(input, input.length); @@ -171,8 +174,8 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data, * @return * @throws IOException */ - public List readBlocklet(String tablePath, String queryId, boolean isCountJob) - throws IOException { + public List readBlocklet(String tablePath, String queryId, boolean isCountJob, + CdcVO cdcVO) throws IOException { byte[] data; if (bytes != null) { if (isWrittenToFile) { @@ -216,7 +219,7 @@ public List readBlocklet(String tablePath, String queryId, boo try { for (int i = 0; i < numberOfBlocklet; i++) { ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet(); - extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob); + extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob, cdcVO); extendedBlockletList.add(extendedBlocklet); } } finally { diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java index 7e998afbd8e..4d473d9a028 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java @@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit; import 
org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.index.IndexInputFormat; +import org.apache.carbondata.core.mutate.CdcVO; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; @@ -62,8 +64,8 @@ public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockl this.isFallbackJob = isFallbackJob; } - public List getExtendedBlocklets(String tablePath, String queryId, - boolean isCountJob) throws IOException { + public List getExtendedBlocklets(IndexInputFormat indexInputFormat) + throws IOException { if (!isFallbackJob) { int numOfThreads = CarbonProperties.getNumOfThreadsForPruning(); ExecutorService executorService = Executors @@ -85,8 +87,9 @@ public List getExtendedBlocklets(String tablePath, String quer List>> futures = new ArrayList<>(); for (int value : split) { end += value; - futures.add(executorService.submit( - new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob))); + futures.add(executorService.submit(new ExtendedBlockletDeserializerThread(start, end, + indexInputFormat.getCarbonTable().getTablePath(), indexInputFormat.getQueryId(), + indexInputFormat.isCountStarJob(), indexInputFormat.getCdcVO()))); start += value; } executorService.shutdown(); @@ -109,8 +112,10 @@ public List getExtendedBlocklets(String tablePath, String quer } else { List extendedBlocklets = new ArrayList<>(); for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) { - extendedBlocklets - .addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob)); + extendedBlocklets.addAll(extendedBlockletWrapper + .readBlocklet(indexInputFormat.getCarbonTable().getTablePath(), + indexInputFormat.getQueryId(), indexInputFormat.isCountStarJob(), + indexInputFormat.getCdcVO())); } return extendedBlocklets; } @@ -128,13 +133,16 @@ private class ExtendedBlockletDeserializerThread implements Callable call() throws Exception { List extendedBlocklets = new ArrayList<>(); for (int i = start; i < end; i++) { extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId, - isCountJob)); + isCountJob, cdcVO)); } return extendedBlocklets; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java index 7aef9516d4c..01da0b287df 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java @@ -67,6 +67,8 @@ import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; +import static org.apache.carbondata.core.util.CarbonUtil.getMinMaxValue; + import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; @@ -886,15 +888,6 @@ public String getTableTaskInfo(int index) { } } - private byte[][] getMinMaxValue(IndexRow row, int index) { - IndexRow minMaxRow = row.getRow(index); - byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; - for (int i = 0; i < minMax.length; i++) { - minMax[i] = minMaxRow.getByteArray(i); - } - return minMax; - } - private boolean[] getMinMaxFlag(IndexRow row, int index) { IndexRow minMaxFlagRow = row.getRow(index); boolean[] minMaxFlag = new boolean[minMaxFlagRow.getColumnCount()]; diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java b/core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java new file mode 100644 index 00000000000..41919a63c1a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.mutate; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.io.Writable; + +/** + * VO object which contains the info used in CDC case during cache loading in Index server + */ +public class CdcVO implements Serializable, Writable { + + /** + * This collection contains column to index mapping which gives info about the index for a column + * in IndexRow object to fetch min max + */ + private Map columnToIndexMap; + + /** + * This list will contain the column indexes to fetch from the min max row in blocklet + */ + private List indexesToFetch; + + public CdcVO(Map columnToIndexMap) { + this.columnToIndexMap = columnToIndexMap; + } + + public CdcVO() { + } + + public Map getColumnToIndexMap() { + return columnToIndexMap; + } + + public List getIndexesToFetch() { + return indexesToFetch; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + Collection indexesToFetch = columnToIndexMap.values(); + dataOutput.writeInt(indexesToFetch.size()); + for (Integer index : indexesToFetch) { + dataOutput.writeInt(index); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + this.indexesToFetch = new ArrayList<>(); + int lengthOfIndexes = dataInput.readInt(); + for (int i = 0; i < lengthOfIndexes; i++) { + indexesToFetch.add(dataInput.readInt()); + } + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/FilePathMinMaxVO.java b/core/src/main/java/org/apache/carbondata/core/mutate/FilePathMinMaxVO.java new file mode 100644 index 00000000000..a904a8c3072 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/mutate/FilePathMinMaxVO.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.mutate; + +import java.io.Serializable; + +/** + * VO class for filePath and Min Max for each blocklet + */ +public class FilePathMinMaxVO implements Serializable { + + private String filePath; + + private byte[] min; + + private byte[] max; + + public FilePathMinMaxVO(String filePath, byte[] min, byte[] max) { + this.filePath = filePath; + this.min = min; + this.max = max; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public byte[] getMin() { + return min; + } + + public void setMin(byte[] min) { + this.min = min; + } + + public byte[] getMax() { + return max; + } + + public void setMax(byte[] max) { + this.max = max; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java b/core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java new file mode 100644 index 00000000000..3c503d0d9a2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.range; + +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.Set; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.comparator.SerializableComparator; + +import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET; + +/** + * This class prepares a tree for pruning using min-max of block + */ +public class BlockMinMaxTree implements Serializable { + + private MinMaxNode root; + + private final boolean isPrimitiveAndNotDate; + private final boolean isDimensionColumn; + private final DataType joinDataType; + private final SerializableComparator comparator; + + public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean isDimensionColumn, + DataType joinDataType, SerializableComparator comparator) { + this.isPrimitiveAndNotDate = isPrimitiveAndNotDate; + this.isDimensionColumn = isDimensionColumn; + this.joinDataType = joinDataType; + this.comparator = comparator; + } + + public MinMaxNode getRoot() { + return root; + } + + public void insert(MinMaxNode newMinMaxNode) { + root = insert(getRoot(), newMinMaxNode); + } + + private MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) { + /* 1. check if the root null, then insert and make new node + * 2. check if the new node completely overlaps with the root, where minCompare and maxCompare + * both are zero, if yes add the filepaths and return + * 3. if root is less than new node, check if the root has right subtree, + * if(yes) { + * replace the right node with the newnode's min and max based on comparison and then + * call insert with right node as new root and newnode + * insert(root.getRight, newnode) + * } else { + * make the new node as right node and set right node and return + * } + * 4. 
if root is more than new node, check if the root has left subtree, + * if(yes) { + * replace the left node with the newnode's min and max based on comparison and then + * call insert with left node as new root and newnode + * insert(root.getLeft, newnode) + * } else { + * make the new node as left node and set left node and return + * } + * */ + if (root == null) { + root = newMinMaxNode; + return root; + } + + if (compareNodesBasedOnMinMax(root, newMinMaxNode) == 0) { + root.addFilePats(newMinMaxNode.getFilePaths()); + return root; + } + + if (compareNodesBasedOnMinMax(root, newMinMaxNode) < 0) { + if (root.getRightSubTree() == null) { + root.setRightSubTree(newMinMaxNode); + root.setRightSubTreeMax(newMinMaxNode.getMax()); + root.setRightSubTreeMin(newMinMaxNode.getMin()); + } else { + if (compareMinMax(root.getRightSubTreeMax(), newMinMaxNode.getMax()) < 0) { + root.setRightSubTreeMax(newMinMaxNode.getMax()); + } + if (compareMinMax(root.getRightSubTreeMin(), newMinMaxNode.getMin()) > 0) { + root.setRightSubTreeMin(newMinMaxNode.getMin()); + } + insert(root.getRightSubTree(), newMinMaxNode); + } + } else { + if (root.getLeftSubTree() == null) { + root.setLeftSubTree(newMinMaxNode); + root.setLeftSubTreeMax(newMinMaxNode.getMax()); + root.setLeftSubTreeMin(newMinMaxNode.getMin()); + } else { + if (compareMinMax(root.getLeftSubTreeMax(), newMinMaxNode.getMax()) < 0) { + root.setLeftSubTreeMax(newMinMaxNode.getMax()); + } + if (compareMinMax(root.getLeftSubTreeMin(), newMinMaxNode.getMin()) > 0) { + root.setLeftSubTreeMin(newMinMaxNode.getMin()); + } + insert(root.getLeftSubTree(), newMinMaxNode); + } + } + return root; + } + + private int compareNodesBasedOnMinMax(MinMaxNode root, MinMaxNode newMinMaxNode) { + int minCompare = compareMinMax(root.getMin(), newMinMaxNode.getMin()); + int maxCompare = compareMinMax(root.getMax(), newMinMaxNode.getMax()); + if (minCompare == 0) { + return maxCompare; + } else { + return minCompare; + } + } + + private int compareMinMax(Object key1, Object key2) { + if (isDimensionColumn) { + if (isPrimitiveAndNotDate) { + return comparator.compare(key1, key2); + } else { + return ByteUtil.UnsafeComparer.INSTANCE + .compareTo(key1.toString().getBytes(Charset.forName(DEFAULT_CHARSET)), + key2.toString().getBytes(Charset.forName(DEFAULT_CHARSET))); + } + } else { + return comparator.compare(key1, key2); + } + } + + /** + * This method returns the list of carbondata files where the input fieldValue might present + */ + public Set getMatchingFiles(byte[] fieldValue, Set matchedFilesSet) { + getMatchingFiles(getRoot(), fieldValue, matchedFilesSet); + return matchedFilesSet; + } + + private void getMatchingFiles(MinMaxNode root, byte[] fieldValue, Set matchedFilesSet) { + Object data; + if (root == null) { + return; + } + if (isDimensionColumn) { + if (isPrimitiveAndNotDate) { + data = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(fieldValue, joinDataType); + } else { + // for string comparator is Unsafe comparator, so cannot include in common code + getMatchingFilesForString(root, fieldValue, matchedFilesSet); + return; + } + } else { + data = DataTypeUtil.getMeasureObjectFromDataType(fieldValue, joinDataType); + } + + if (comparator.compare(data, root.getMax()) <= 0 + && comparator.compare(data, root.getMin()) >= 0) { + matchedFilesSet.addAll(root.getFilePaths()); + } + + if (root.getLeftSubTree() != null && comparator.compare(data, root.getLeftSubTreeMax()) <= 0 + && comparator.compare(data, root.getLeftSubTreeMin()) >= 0) { + 
getMatchingFiles(root.getLeftSubTree(), fieldValue, matchedFilesSet); + } + + if (root.getRightSubTree() != null && comparator.compare(data, root.getRightSubTreeMax()) <= 0 + && comparator.compare(data, root.getRightSubTreeMin()) >= 0) { + getMatchingFiles(root.getRightSubTree(), fieldValue, matchedFilesSet); + } + } + + private void getMatchingFilesForString(MinMaxNode root, byte[] fieldValue, + Set matchedFilesSet) { + if (root == null) { + return; + } + if (ByteUtil.UnsafeComparer.INSTANCE + .compareTo(fieldValue, root.getMin().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) + >= 0 && ByteUtil.UnsafeComparer.INSTANCE + .compareTo(fieldValue, root.getMax().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) + <= 0) { + matchedFilesSet.addAll(root.getFilePaths()); + } + if (root.getLeftSubTree() != null && ByteUtil.UnsafeComparer.INSTANCE.compareTo(fieldValue, + root.getLeftSubTreeMin().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) >= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(fieldValue, + root.getLeftSubTreeMax().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) <= 0) { + getMatchingFilesForString(root.getLeftSubTree(), fieldValue, matchedFilesSet); + } + + if (root.getRightSubTree() != null && ByteUtil.UnsafeComparer.INSTANCE.compareTo(fieldValue, + root.getRightSubTreeMin().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) >= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(fieldValue, + root.getRightSubTreeMax().toString().getBytes(Charset.forName(DEFAULT_CHARSET))) <= 0) { + getMatchingFilesForString(root.getRightSubTree(), fieldValue, matchedFilesSet); + } + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java b/core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java new file mode 100644 index 00000000000..2fc99293cfe --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.range; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Each node to be inserted in BlockMinMaxTree for pruning. 
+ */ +public class MinMaxNode implements Serializable { + + // list of files present in same range of min max of this node + private List filePaths = new ArrayList<>(); + + private Object min; + + private Object max; + + private MinMaxNode leftSubTree; + private MinMaxNode rightSubTree; + private Object leftSubTreeMax; + private Object leftSubTreeMin; + private Object rightSubTreeMax; + private Object rightSubTreeMin; + + public MinMaxNode(String filePaths, Object min, Object max) { + this.filePaths.add(filePaths); + this.min = min; + this.max = max; + } + + public void addFilePats(List filePaths) { + this.filePaths.addAll(filePaths); + } + + public List getFilePaths() { + return filePaths; + } + + public void setFilePaths(List filePaths) { + this.filePaths = filePaths; + } + + public Object getMin() { + return min; + } + + public void setMin(Object min) { + this.min = min; + } + + public Object getMax() { + return max; + } + + public void setMax(Object max) { + this.max = max; + } + + public MinMaxNode getLeftSubTree() { + return leftSubTree; + } + + public void setLeftSubTree(MinMaxNode leftSubTree) { + this.leftSubTree = leftSubTree; + } + + public MinMaxNode getRightSubTree() { + return rightSubTree; + } + + public void setRightSubTree(MinMaxNode rightSubTree) { + this.rightSubTree = rightSubTree; + } + + public Object getLeftSubTreeMax() { + return leftSubTreeMax; + } + + public void setLeftSubTreeMax(Object leftSubTreeMax) { + this.leftSubTreeMax = leftSubTreeMax; + } + + public Object getLeftSubTreeMin() { + return leftSubTreeMin; + } + + public void setLeftSubTreeMin(Object leftSubTreeMin) { + this.leftSubTreeMin = leftSubTreeMin; + } + + public Object getRightSubTreeMax() { + return rightSubTreeMax; + } + + public void setRightSubTreeMax(Object rightSubTreeMax) { + this.rightSubTreeMax = rightSubTreeMax; + } + + public Object getRightSubTreeMin() { + return rightSubTreeMin; + } + + public void setRightSubTreeMin(Object rightSubTreeMin) { + this.rightSubTreeMin = rightSubTreeMin; + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java new file mode 100644 index 00000000000..49325509e09 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.scan.expression.conditional; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.ExpressionResult; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; + +/** + * This expression will be added to Index filter when CDC pruning is enabled. + */ +public class CDCBlockImplicitExpression extends Expression { + + Set blocksToScan; + + public CDCBlockImplicitExpression(String blockPathValues) { + blocksToScan = + Arrays.stream(blockPathValues.split(",")).map(String::trim).collect(Collectors.toSet()); + } + + @Override + public ExpressionResult evaluate(RowIntf value) { + throw new UnsupportedOperationException("Not allowed on Implicit expression"); + } + + @Override + public ExpressionType getFilterExpressionType() { + return ExpressionType.IMPLICIT; + } + + @Override + public void findAndSetChild(Expression oldExpr, Expression newExpr) { + throw new UnsupportedOperationException("Not allowed on Implicit expression"); + } + + @Override + public String getString() { + return null; + } + + @Override + public String getStatement() { + return null; + } + + public Set getBlocksToScan() { + return blocksToScan; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java index 1acae54340e..571eb2e3d1a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java @@ -55,6 +55,7 @@ import org.apache.carbondata.core.scan.expression.logical.OrExpression; import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.scan.filter.executer.AndFilterExecutorImpl; +import org.apache.carbondata.core.scan.filter.executer.CDCBlockImplicitExecutorImpl; import org.apache.carbondata.core.scan.filter.executer.DimColumnExecutorFilterInfo; import org.apache.carbondata.core.scan.filter.executer.ExcludeFilterExecutorImpl; import org.apache.carbondata.core.scan.filter.executer.FalseFilterExecutor; @@ -196,6 +197,12 @@ private static FilterExecutor createFilterExecutorTree( return filterExecutor; } } + if (filterExpressionResolverTree + .getFilterExpression() instanceof CDCBlockImplicitExpression) { + return new CDCBlockImplicitExecutorImpl( + ((CDCBlockImplicitExpression) filterExpressionResolverTree.getFilterExpression()) + .getBlocksToScan()); + } return new RowLevelFilterExecutorImpl( ((RowLevelFilterResolverImpl) filterExpressionResolverTree) .getDimColEvaluatorInfoList(), diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java new file mode 100644 index 00000000000..385423eb6a1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.filter.executer; + +import java.util.BitSet; +import java.util.Set; + +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; +import org.apache.carbondata.core.util.BitSetGroup; + +/** + * This filter executor class will be called when the CDC pruning is enabled. + */ +public class CDCBlockImplicitExecutorImpl implements FilterExecutor, ImplicitColumnFilterExecutor { + + private final Set blocksToScan; + + public CDCBlockImplicitExecutorImpl(Set blocksToScan) { + this.blocksToScan = blocksToScan; + } + + @Override + public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue, + String uniqueBlockPath, boolean[] isMinMaxSet) { + BitSet bitSet = new BitSet(1); + if (blocksToScan.contains(uniqueBlockPath)) { + bitSet.set(0); + } + return bitSet; + } + + @Override + public Boolean isFilterValuesPresentInAbstractIndex(byte[][] maxValue, byte[][] minValue, + boolean[] isMinMaxSet) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + @Override + public BitSetGroup applyFilter(RawBlockletColumnChunks rawBlockletColumnChunks, + boolean useBitsetPipeLine) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + @Override + public BitSet prunePages(RawBlockletColumnChunks rawBlockletColumnChunks) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + @Override + public boolean applyFilter(RowIntf value, int dimOrdinalMax) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + @Override + public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue, + boolean[] isMinMaxSet) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + @Override + public void readColumnChunks(RawBlockletColumnChunks rawBlockletColumnChunks) { + throw new UnsupportedOperationException("Unsupported operation"); + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 68919ba7813..14ad2c9f1db 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -70,6 +70,7 @@ import org.apache.carbondata.core.index.Segment; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.indexstore.row.IndexRow; import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; @@ -3508,4 +3509,17 @@ public static void 
updateNullValueBasedOnDatatype(DataOutputStream dataOutputStr public static boolean isComplexColumn(String colName) { return colName.contains(".val") || colName.contains(CarbonCommonConstants.POINT); } + + /** + * This method returns the min max values from the index row object + * @return min max byte arrays + */ + public static byte[][] getMinMaxValue(IndexRow row, int index) { + IndexRow minMaxRow = row.getRow(index); + byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; + for (int i = 0; i < minMax.length; i++) { + minMax[i] = minMaxRow.getByteArray(i); + } + return minMax; + } } diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md index f50728a15c4..7a1b610b4e2 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -50,7 +50,8 @@ This section provides the details of all the configurations required for the Car | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. | | carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. | | carbon.trash.retention.days | 7 | This parameter specifies the number of days after which the timestamp based subdirectories are expired in the trash folder. Allowed Min value = 0, Allowed Max Value = 365 days| -| carbon.clean.file.force.allowed | false | This paramter specifies if the clean files operation with force option is allowed or not.| +| carbon.clean.file.force.allowed | false | This parameter specifies if the clean files operation with force option is allowed or not.| +| carbon.cdc.minmax.pruning.enabled | false | This parameter defines whether min max pruning is to be performed on the target table based on the source data. It is useful when the data is not sparse across the target table, which results in better pruning.| ## Data Loading Configuration diff --git a/docs/scd-and-cdc-guide.md b/docs/scd-and-cdc-guide.md index 38a2973cf46..e7c14f47311 100644 --- a/docs/scd-and-cdc-guide.md +++ b/docs/scd-and-cdc-guide.md @@ -15,7 +15,7 @@ limitations under the License. --> -# Upsert into a Carbon DataSet using Merge +# Upsert into a Carbon DataSet using Merge and UPSERT APIs ## SCD and CDC Scenarios Change Data Capture (CDC), is to apply all data changes generated from an external data set @@ -25,7 +25,7 @@ table needs to be applied to a target table. Slowly Changing Dimensions (SCD), are the dimensions in which the data changes slowly, rather than changing regularly on a time basis. -SCD and CDC data changes can be merged to a carbon dataset online using the data frame level `MERGE` API. +SCD and CDC data changes can be merged to a carbon dataset online using the data frame level `MERGE`, `UPSERT`, `UPDATE`, `DELETE` and `INSERT` APIs. #### MERGE API @@ -43,6 +43,38 @@ Below API merges the datasets online and applies the actions as per the conditio .execute() ``` + +### UPSERT API +Below API upserts the input source dataset onto the target carbondata table based on the key column and dataset provided by the user or the application.
+ +``` + targetDS.upsert(sourceDS, ) + .execute() +``` + +### DELETE API +Below API deletes the data present in the target carbondata table based on the key column and dataset provided by the user or the application. + +``` + targetDS.delete(sourceDS, ) + .execute() +``` + +### UPDATE API +Below API updates the data present in the target carbondata table based on the key column and dataset provided by the user or the application. + +``` + targetDS.update(sourceDS, ) + .execute() +``` + +### INSERT API +Below API inserts the input source dataset onto the target carbondata table based on the key column and dataset provided by the user or the application. + +``` + targetDS.insert(sourceDS, ) + .execute() +``` + #### MERGE API Operation Semantics Below is the detailed description of the `merge` API operation. * `merge` will merge the datasets based on a condition. @@ -55,6 +87,10 @@ Below is the detailed description of the `merge` API operation. * `whenNotMatched` clause can have only the `insertExpr` action. The new row is generated based on the specified column and corresponding expressions. Users do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted. * `whenNotMatchedAndExistsOnlyOnTarget` clause is executed when row does not match source and exists only in target. This clause can have only delete action. +#### UPSERT API Operation Semantics +* `upsert`, `delete`, `insert` and `update` APIs will help to perform specified operations on the target carbondata table. +* All the APIs expects two parameters source dataset and the key column on which the merge has to be performed. + #### MERGE SQL Below sql merges a set of updates, insertions, and deletions based on a source table @@ -90,7 +126,9 @@ clauses can have at most one UPDATE and one DELETE action, These clauses have th * To insert all the columns of the target carbondata table with the corresponding columns of the source dataset, use INSERT *. This is equivalent to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]) for all the columns of the target carbondata table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query will throw an error. * `not_matched_action` can be INSERT * | INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...]) +**Note: Merge SQL is not yet supported for the UPSERT, DELETE, INSERT and UPDATE APIs.** ##### Example code to implement cdc/scd scenario -Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using api. -Please refer example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using sql. +* Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using APIs. +* Please refer example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using sql. 
+* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs. \ No newline at end of file diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala new file mode 100644 index 00000000000..5e38f7fe319 --- /dev/null +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.examples + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{Row, SaveMode, SparkSession} +import org.apache.spark.sql.CarbonSession.DataSetMerge +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +import org.apache.carbondata.examples.util.ExampleUtils + +/** + * Example for UPSERT APIs + */ +object DataUPSERTExample { + + def main(args: Array[String]): Unit = { + val spark = ExampleUtils.createSparkSession("DataUPSERTExample") + performUPSERT(spark) + } + + def performUPSERT(spark: SparkSession): Unit = { + spark.sql("drop table if exists target") + val initframe = spark.createDataFrame(Seq( + Row("a", "0"), + Row("b", "1"), + Row("c", "2"), + Row("d", "3") + ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) + initframe.write + .format("carbondata") + .option("tableName", "target") + .mode(SaveMode.Overwrite) + .save() + val target = spark.read.format("carbondata").option("tableName", "target").load() + var cdc = + spark.createDataFrame(Seq( + Row("a", "7"), + Row("b", null), + Row("g", null), + Row("e", "3") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + spark.sql("select * from target").show(false) + // upsert API updates a and b, inserts e and g + target.as("A").upsert(cdc.as("B"), "key").execute() + spark.sql("select * from target").show(false) + + cdc = + spark.createDataFrame(Seq( + Row("a", "7"), + Row("e", "3") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // delete API, deletes a and e + target.as("A").delete(cdc.as("B"), "key").execute() + spark.sql("select * from target").show(false) + + cdc = + spark.createDataFrame(Seq( + Row("g", "56") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // update API, updates g + target.as("A").update(cdc.as("B"), "key").execute() + spark.sql("select * from target").show(false) + + cdc = + spark.createDataFrame(Seq( + Row("z", "234"), + Row("x", "2") + 
).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // insert API, inserts z and x. + target.as("A").insert(cdc.as("B"), "key" ).execute() + + spark.sql("select * from target").show(false) + } + +} diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala index 36e60ba2d26..23de19d6503 100644 --- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala +++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala @@ -26,12 +26,12 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Expression, NamedExpression, SortOrder} -import org.apache.spark.sql.catalyst.expressions.codegen.{ExprCode, GeneratePredicate} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, InterpretedPredicate, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, QueryContext, SkewSpecContext, TablePropertyListContext} import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoTable, Join, LogicalPlan, OneRowRelation, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan, OneRowRelation} import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode} @@ -427,6 +427,9 @@ trait SparkVersionAdapter { s.typeName } + def evaluateWithPredicate(exp: Expression, schema: Seq[Attribute], row: InternalRow): Any = { + InterpretedPredicate.create(exp, schema).expression.eval(row) + } } case class CarbonBuildSide(buildSide: BuildSide) { diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala index b492298984f..dac47ffad01 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala @@ -126,8 +126,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS SparkEnv.get.blockManager.blockManagerId.executorId }" val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList - .asJava, indexInputFormat.getCarbonTable.getTablePath, indexInputFormat.getQueryId, - indexInputFormat.isWriteToFile, indexInputFormat.isCountStarJob)) + .asJava, indexInputFormat)) Iterator(value) } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala index 5eb1fd4382e..1994e15b96c 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala @@ -426,7 +426,6 @@ object DistributedRDDUtils { request.getValidSegments, new java.util.ArrayList(), new java.util.ArrayList()) - new ExtendedBlockletWrapper(blocklets, request.getCarbonTable.getTablePath, request.getQueryId, - request.isWriteToFile, request.isCountStarJob) + new ExtendedBlockletWrapper(blocklets, request) } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala index ce344a610b6..363e1c4fbe8 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala @@ -88,8 +88,7 @@ class DistributedIndexJob extends AbstractIndexJob { IndexServer.getClient } client.getSplits(indexFormat) - .getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath, indexFormat - .getQueryId, indexFormat.isCountStarJob) + .getExtendedBlocklets(indexFormat) } finally { if (null != splitFolderPath && !splitFolderPath.deleteFile()) { LOGGER.error("Problem while deleting the temp directory:" @@ -162,8 +161,7 @@ class EmbeddedIndexJob extends AbstractIndexJob { val originalJobDesc = spark.sparkContext.getLocalProperty("spark.job.description") indexFormat.setIsWriteToFile(false) indexFormat.setFallbackJob() - val splits = IndexServer.getSplits(indexFormat).getExtendedBlocklets(indexFormat - .getCarbonTable.getTablePath, indexFormat.getQueryId, indexFormat.isCountStarJob) + val splits = IndexServer.getSplits(indexFormat).getExtendedBlocklets(indexFormat) // Fire a job to clear the cache from executors as Embedded mode does not maintain the cache. 
if (!indexFormat.isJobToClearIndexes) { IndexServer.invalidateSegmentCache(indexFormat.getCarbonTable, indexFormat diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 4d36fdf43b7..fcb379ce251 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -52,9 +52,9 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.readcommitter.ReadCommittedScope -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression -import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.expression.{BinaryExpression, Expression} +import org.apache.carbondata.core.scan.expression.conditional.{CDCBlockImplicitExpression, ImplicitExpression} +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, TrueExpression} import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} @@ -699,18 +699,43 @@ class CarbonScanRDD[T: ClassTag]( private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = { CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz) - val tableInfo1 = getTableInfo - CarbonInputFormat.setTableInfo(conf, tableInfo1) + val tableInfo = getTableInfo + CarbonInputFormat.setTableInfo(conf, tableInfo) if (indexFilter != null) { - indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo1)) + indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo)) + val children = indexFilter.getExpression.getChildren + // if the children of the filter contains CDCBlockImplicitExpression, set that to true + // expression here, as we don't need this in executor evaluation + children.asScala.zipWithIndex.foreach { case (child, index) => + if (child.isInstanceOf[CDCBlockImplicitExpression]) { + indexFilter.getExpression.getChildren.set(index, new TrueExpression(null)) + setCDCExpressionToTrue(indexFilter) + } + } } CarbonInputFormat.setFilterPredicates(conf, indexFilter) - CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) - CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) + CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) + CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz) createInputFormat(conf) } + /** + * When min max pruning is enabled in merge operation, CDCBlockImplicitExpression will be present + * which will be used for pruning in driver side. For executor make this as true expression. 
+ * @param indexFilter + */ + def setCDCExpressionToTrue(indexFilter: IndexFilter): Unit = { + if (indexFilter.getExpression.asInstanceOf[BinaryExpression].getLeft + .isInstanceOf[CDCBlockImplicitExpression]) { + indexFilter.setExpression(new AndExpression(new TrueExpression(null), + indexFilter.getExpression.asInstanceOf[BinaryExpression].getRight)) + } else { + indexFilter.setExpression(new AndExpression( + indexFilter.getExpression.asInstanceOf[BinaryExpression].getLeft, new TrueExpression(null))) + } + } + private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = { val format = new CarbonFileInputFormat[Object] CarbonInputFormat.setTablePath(conf, diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index f08bf9e2a7b..4508c14da66 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -58,6 +58,13 @@ object CarbonSparkUtil { } } + def collectNonCarbonRelation(plan: LogicalPlan): Seq[LogicalRelation] = { + plan.collect { + case l: LogicalRelation => + l + } + } + /** * return's the formatted column comment if column comment is present else empty("") * diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index b52bc200c6a..b73772f0975 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTa import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener} import org.apache.spark.sql.execution.command.CreateFunctionCommand +import org.apache.spark.sql.execution.command.mutation.merge.udf.BlockPathsUDF import org.apache.spark.sql.hive._ import org.apache.spark.sql.listeners._ import org.apache.spark.sql.profiler.Profiler @@ -81,6 +82,7 @@ class CarbonEnv { sparkSession.udf.register("getTupleId", () => "") sparkSession.udf.register("getPositionId", () => "") + sparkSession.udf.register("getBlockPaths", new BlockPathsUDF) // add NI as a temp function, for queries to not hit SI table, it will be added as HiveSimpleUDF CreateFunctionCommand( databaseName = None, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala index df679af52d6..52e7d514eb3 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -25,7 +25,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.command.mutation.merge.MergeDataSetBuilder +import org.apache.spark.sql.execution.command.mutation.merge.{MergeDataSetBuilder, MergeOperationType, UpsertBuilder} import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.sql.profiler.{Profiler, SQLStart} import org.apache.spark.sql.util.SparkSQLUtil @@ -293,5 
+293,29 @@ object CarbonSession { def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = { new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession) } + + def update(srcDS: Dataset[Row], keyColumn: String): UpsertBuilder = { + merge(srcDS, keyColumn, MergeOperationType.UPDATE.toString) + } + + def delete(srcDS: Dataset[Row], keyColumn: String): UpsertBuilder = { + merge(srcDS, keyColumn, MergeOperationType.DELETE.toString) + } + + def insert(srcDS: Dataset[Row], keyColumn: String): UpsertBuilder = { + merge(srcDS, keyColumn, MergeOperationType.INSERT.toString) + } + + def upsert(srcDS: Dataset[Row], keyColumn: String): UpsertBuilder = { + merge(srcDS, keyColumn, MergeOperationType.UPSERT.toString) + } + + private def merge( + srcDS: Dataset[Row], + keyColumn: String, + operationType: String): UpsertBuilder = { + new UpsertBuilder(ds, srcDS, keyColumn, operationType, ds.sparkSession) + } + } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala index 895f22a0520..d29b531c79a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala @@ -20,37 +20,39 @@ import java.util import java.util.UUID import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.util.control.Breaks._ import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, CarbonThreadUtil, Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, CarbonToSparkAdapter, Column, DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.avro.AvroFileFormatFactory import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression, GenericInternalRow, GenericRowWithSchema} -import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.execution.{LogicalRDD, ProjectExec} import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors, UpdateTableModel} -import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.util.SparkSQLUtil -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import 
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.mutate.FilePathMinMaxVO +import org.apache.carbondata.core.range.{BlockMinMaxTree, MinMaxNode} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil} +import org.apache.carbondata.core.util.comparator.{Comparator, SerializableComparator} import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -63,7 +65,9 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil case class CarbonMergeDataSetCommand( targetDsOri: Dataset[Row], srcDS: Dataset[Row], - var mergeMatches: MergeDataSetMatches) + var mergeMatches: MergeDataSetMatches = null, + keyColumn: String = null, + operationType: String = null) extends DataCommand { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -79,90 +83,313 @@ case class CarbonMergeDataSetCommand( */ override def processData(sparkSession: SparkSession): Seq[Row] = { val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan) - // Target dataset must be backed by carbondata table. + val st = System.currentTimeMillis() + val targetDsAliasName = targetDsOri.logicalPlan match { + case alias: SubqueryAlias => + alias.alias + case _ => null + } + val sourceAliasName = srcDS.logicalPlan match { + case alias: SubqueryAlias => + alias.alias + case _ => null + } if (relations.length != 1) { throw new UnsupportedOperationException( "Carbon table supposed to be present in merge dataset") } - // validate the merge matches and actions. - validateMergeActions(mergeMatches, targetDsOri, sparkSession) - val carbonTable = relations.head.carbonRelation.carbonTable - val hasDelAction = mergeMatches.matchList - .exists(_.getActions.exists(_.isInstanceOf[DeleteAction])) - val hasUpdateAction = mergeMatches.matchList - .exists(_.getActions.exists(_.isInstanceOf[UpdateAction])) - val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches) - // Get all the required columns of targetDS by going through all match conditions and actions. - val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession) + // Target dataset must be backed by carbondata table. + val targetCarbonTable = relations.head.carbonRelation.carbonTable // select only the required columns, it can avoid lot of and shuffling. - val targetDs = targetDsOri.select(columns: _*) - // Update the update mapping with unfilled columns.From here on system assumes all mappings - // are existed. - mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs) - // Lets generate all conditions combinations as one column and add them as 'status'. 
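// NOTE (editor's illustrative sketch, not part of this patch): the keyColumn/operationType
// branch that begins below backs the new single-key APIs added on CarbonSession in this same
// patch. With the CarbonSession dataset implicits in scope and a hypothetical key column "id",
// the intended call shapes are roughly:
    targetDs.upsert(srcDs, "id").execute()   // right_outer join: update matching rows, insert the rest
    targetDs.update(srcDs, "id").execute()   // inner join: update matching rows only
    targetDs.delete(srcDs, "id").execute()   // inner join: delete matching rows
    targetDs.insert(srcDs, "id").execute()   // no join: plain insert of the source rows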
- val condition = generateStatusColumnWithAllCombinations(mergeMatches) - - // decide join type based on match conditions - val joinType = decideJoinType + val targetDs = if (mergeMatches == null && operationType != null) { + targetDsOri.select(keyColumn) + } else { + // Get all the required columns of targetDS by going through all match conditions and actions. + val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession) + targetDsOri.select(columns: _*) + } + // decide join type based on match conditions or based on merge operation type + val joinType = if (mergeMatches == null && operationType != null) { + MergeOperationType.withName(operationType.toUpperCase) match { + case MergeOperationType.UPDATE | MergeOperationType.DELETE => + "inner" + case MergeOperationType.UPSERT => + "right_outer" + case MergeOperationType.INSERT => + null + } + } else { + decideJoinType + } - val joinColumns = mergeMatches.joinExpr.expr.collect { - case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty => - // Let's say the join condition will be something like A.id = B.id, then it will be an - // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will - // be a Literal(B.id). Since we need the column name here, we can directly check the left - // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an - // ArrayBuffer containing "A" and "id", since "id" is column name, we take - // nameparts.tail.head which gives us "id" column name. - unresolvedAttribute.nameParts.tail.head - }.distinct + val joinColumns = if (mergeMatches == null) { + Seq(keyColumn) + } else { + mergeMatches.joinExpr.expr.collect { + case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty => + // Let's say the join condition will be something like A.id = B.id, then it will be an + // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will + // be a Literal(B.id). Since we need the column name here, we can directly check the left + // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an + // ArrayBuffer containing "A" and "id", since "id" is column name, we take + // nameparts.tail.head which gives us "id" column name. 
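// NOTE (editor's illustrative sketch, not part of this patch): concretely, for a join
// condition written as "A.id = B.id" the left side parses to an UnresolvedAttribute whose
// nameParts hold the alias and the column name, so tail.head drops the alias:
    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    val attr = UnresolvedAttribute(Seq("A", "id"))   // parsed left side of "A.id = B.id"
    assert(attr.nameParts == Seq("A", "id"))
    assert(attr.nameParts.tail.head == "id")         // bare join column name, alias dropped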
+ unresolvedAttribute.nameParts.tail.head + }.distinct + } // repartition the srsDs, if the target has bucketing and the bucketing columns contains join // columns val repartitionedSrcDs = - if (carbonTable.getBucketingInfo != null && - carbonTable.getBucketingInfo + if (targetCarbonTable.getBucketingInfo != null && + targetCarbonTable.getBucketingInfo .getListOfColumns .asScala .map(_.getColumnName).containsSlice(joinColumns)) { - srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, + srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges, joinColumns.map(srcDS.col): _*) } else { srcDS + } + + // cache the source data as we will be scanning multiple times + repartitionedSrcDs.cache() + val deDuplicatedRecords = repartitionedSrcDs.count() + LOGGER.info(s"Number of records from source data: $deDuplicatedRecords") + // Create accumulators to log the stats + val stats = Stats(createLongAccumulator("insertedRows"), + createLongAccumulator("updatedRows"), + createLongAccumulator("deletedRows")) + + var finalCarbonFilesToScan: Array[String] = Array.empty[String] + // the pruning will happen when the join type is not full_outer, in case of full_outer, + // we will be needing all the records from left table which is target table, so no need to prune + // target table based on min max of source table. + val isMinMaxPruningEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED, + CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT).toBoolean + var didNotPrune = false + breakable { + if (isMinMaxPruningEnabled && joinType != null && !joinType.equalsIgnoreCase("full_outer")) { + // 1. get all the join columns of equal to conditions or equi joins + val targetKeyColumns = CarbonMergeDataSetUtil.getTargetTableKeyColumns(keyColumn, + targetDsAliasName, + targetCarbonTable, + mergeMatches) + val joinCarbonColumns = targetKeyColumns.collect { + case column => targetCarbonTable.getColumnByName(column) + } + + LOGGER + .info(s"Key columns for join are: ${ joinCarbonColumns.map(_.getColName).mkString(",") }") + + val columnToIndexMap: util.Map[String, Integer] = new util.LinkedHashMap[String, Integer] + // get the min max cache column and based on that determine the index to check in min-max + // array or Index Row + val minMaxColumns = targetCarbonTable.getMinMaxCachedColumnsInCreateOrder + if (minMaxColumns.size() != 0) { + if (minMaxColumns.size() == + targetCarbonTable.getTableInfo.getFactTable.getListOfColumns.size() || + minMaxColumns.size() == 1 && minMaxColumns.get(0).equalsIgnoreCase("All columns")) { + joinCarbonColumns.foreach { column => + if (column.isDimension) { + columnToIndexMap.put(column.getColName, column.getOrdinal) + } else { + columnToIndexMap.put(column.getColName, + targetCarbonTable.getVisibleDimensions.size() + column.getOrdinal) + } + } + } else { + // handing case where only some columns are present as cached columns and check if those + // columns has the target key columns or join columns + val joinColumnsPresentInMinMaxCacheCols = joinCarbonColumns.map(_.getColName) + .intersect(minMaxColumns.asScala.toSet) + if (joinColumnsPresentInMinMaxCacheCols.isEmpty || + joinColumnsPresentInMinMaxCacheCols.size == joinCarbonColumns.size) { + // 1. if none of the join columns are present in cache columns, then all blocklets + // will be selected, so pruning is not required + // 2. 
when one of the columns is not present in cache columns, no need to prune, as it + // may lead to wrong data due to different filter conditions like OR + didNotPrune = true + break() + } + } + } + + // get the splits required, which will also load the cache based on the configuration either + // in index server or driver + val columnMinMaxInBlocklet: util.LinkedHashMap[String, util.List[FilePathMinMaxVO]] = + new util.LinkedHashMap[String, util.List[FilePathMinMaxVO]] + val colToSplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]] = + CarbonMergeDataSetUtil.getSplitsAndLoadToCache(targetCarbonTable, + repartitionedSrcDs, + columnMinMaxInBlocklet, + columnToIndexMap, + sparkSession) + + LOGGER.info("Finished getting splits from driver or index server") + + // 2. get the tuple of filepath, min, max of the columns required, the min max should be + // converted to actual value based on the datatype logic collection to store only block and + // block level min and max + val fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, + (AnyRef, AnyRef)], CarbonColumn)] = + mutable.ArrayBuffer.empty[(mutable.Map[String, (AnyRef, AnyRef)], CarbonColumn)] + + val joinColumnsToComparatorMap: + mutable.LinkedHashMap[CarbonColumn, SerializableComparator] = + mutable.LinkedHashMap.empty[CarbonColumn, SerializableComparator] + joinCarbonColumns.map { joinColumn => + val joinDataType = joinColumn.getDataType + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) && + (joinDataType != DataTypes.DATE) + val comparator = if (isPrimitiveAndNotDate) { + Comparator.getComparator(joinDataType) + } else if (joinDataType == DataTypes.STRING) { + null + } else { + Comparator.getComparatorByDataTypeForMeasure(joinDataType) + } + joinColumnsToComparatorMap += (joinColumn -> comparator) + } + + // 3. prepare (filepath, (min, max)) at a block level. + CarbonMergeDataSetUtil.addFilePathAndMinMaxTuples(colToSplitsFilePathAndMinMaxMap, + targetCarbonTable, + joinColumnsToComparatorMap, + fileMinMaxMapListOfAllJoinColumns) + + // 4. prepare mapping of column and a range tree based on filepath, min and max for that + // column. Here assumption is join expression columns will be less in actual use case. + // Basically a primary column + val joinColumnToTreeMapping: mutable.LinkedHashMap[CarbonColumn, BlockMinMaxTree] = + mutable.LinkedHashMap.empty[CarbonColumn, BlockMinMaxTree] + fileMinMaxMapListOfAllJoinColumns.foreach { case (fileMinMaxMap, joinCarbonColumn) => + val joinDataType = joinCarbonColumn.getDataType + val isDimension = joinCarbonColumn.isDimension + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) && + (joinDataType != DataTypes.DATE) + val comparator = joinColumnsToComparatorMap(joinCarbonColumn) + val rangeIntervalTree = new BlockMinMaxTree(isPrimitiveAndNotDate, + isDimension, joinDataType, comparator) + fileMinMaxMap.foreach { case (filePath, minMax) => + rangeIntervalTree.insert(new MinMaxNode(filePath, minMax._1, minMax._2)) + } + joinColumnToTreeMapping += ((joinCarbonColumn, rangeIntervalTree)) + } + + // 5.from srcRDD, do map and then for each row search in min max tree prepared above and + // find the file paths to scan. + finalCarbonFilesToScan = CarbonMergeDataSetUtil.getFilesToScan(joinCarbonColumns, + joinColumnToTreeMapping, + repartitionedSrcDs) + + LOGGER.info(s"Finished min-max pruning. 
Carbondata files to scan during merge is: ${ + finalCarbonFilesToScan.length}") + } + } + + // check if its just upsert/update/delete/insert operation and go to UpsertHandler + if (mergeMatches == null && operationType != null) { + val isInsertOperation = operationType.equalsIgnoreCase(MergeOperationType.INSERT.toString) + val frame = if (isMinMaxPruningEnabled && !didNotPrune) { + // if min-max pruning is enabled then we need to add blockUDFs filter to scan only the + // pruned carbondata files from target carbon table. + if (!isInsertOperation) { + targetDs + .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()")) + .where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')") + .join(repartitionedSrcDs.select(keyColumn), + expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"), + joinType) + } else { + null + } + } else { + if (!isInsertOperation) { + targetDs + .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()")) + .join(repartitionedSrcDs.select(keyColumn), + expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"), + joinType) + } else { + null + } + } + val mergeHandler: MergeHandler = + MergeOperationType.withName(operationType.toUpperCase) match { + case MergeOperationType.UPSERT => + UpsertHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs) + case MergeOperationType.UPDATE => + UpdateHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs) + case MergeOperationType.DELETE => + DeleteHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs) + case MergeOperationType.INSERT => + InsertHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs) + } + + // execute merge handler + mergeHandler.handleMerge() + LOGGER.info( + " Time taken to merge data :: " + (System.currentTimeMillis() - st)) + // clear the cached src + repartitionedSrcDs.unpersist() + return Seq() } + // validate the merge matches and actions. + validateMergeActions(mergeMatches, targetDsOri, sparkSession) + val hasDelAction = mergeMatches.matchList + .exists(_.getActions.exists(_.isInstanceOf[DeleteAction])) + val hasUpdateAction = mergeMatches.matchList + .exists(_.getActions.exists(_.isInstanceOf[UpdateAction])) + val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches) + // Update the update mapping with unfilled columns.From here on system assumes all mappings + // are existed. + mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs) + // Lets generate all conditions combinations as one column and add them as 'status'. + val condition = generateStatusColumnWithAllCombinations(mergeMatches) + // Add the getTupleId() udf to get the tuple id to generate delete delta. 
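// NOTE (editor's sketch; the encoding is an assumption inferred from the
// (isInt & (1 << i)) == (1 << i) decode further down in this hunk): the generated 'status'
// column appears to pack which match conditions a joined row satisfied as bit flags, so
// projection i is applied when bit i is set:
    val status = 5   // hypothetical value: conditions 0 and 2 matched
    val matchedConditions = (0 until 3).filter(i => (status & (1 << i)) == (1 << i))
    // matchedConditions == Vector(0, 2)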
- val frame = + val frame = if (isMinMaxPruningEnabled && !didNotPrune) { targetDs .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()")) .withColumn("exist_on_target", lit(1)) + .where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')") .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)), mergeMatches.joinExpr, joinType) .withColumn(status_on_mergeds, condition) + } else { + targetDs + .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()")) + .withColumn("exist_on_target", lit(1)) + .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)), + mergeMatches.joinExpr, + joinType) + .withColumn(status_on_mergeds, condition) + } if (LOGGER.isDebugEnabled) { frame.explain() } val tableCols = - carbonTable.getCreateOrderColumn.asScala.map(_.getColName). + targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName). filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) val header = tableCols.mkString(",") + val frameWithoutStatusCol = frame.drop(status_on_mergeds) val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m => m.getActions.map { case u: UpdateAction => MergeProjection(tableCols, - status_on_mergeds, - frame, + frameWithoutStatusCol, relations.head, sparkSession, u) case i: InsertAction => MergeProjection(tableCols, - status_on_mergeds, - frame, + frameWithoutStatusCol, relations.head, sparkSession, i) case d: DeleteAction => MergeProjection(tableCols, - status_on_mergeds, - frame, + frameWithoutStatusCol, relations.head, sparkSession, d) @@ -170,22 +397,17 @@ case class CarbonMergeDataSetCommand( }.filter(_ != null) } - val st = System.currentTimeMillis() - // Create accumulators to log the stats - val stats = Stats(createLongAccumulator("insertedRows"), - createLongAccumulator("updatedRows"), - createLongAccumulator("deletedRows")) val targetSchema = StructType(tableCols.map { f => relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get } ++ Seq(StructField(status_on_mergeds, IntegerType))) - val (processedRDD, deltaPath) = processIUD(sparkSession, frame, carbonTable, projections, + val (processedRDD, deltaPath) = processIUD(sparkSession, frame, targetCarbonTable, projections, targetSchema, stats) val executorErrors = ExecutionErrors(FailureCauses.NONE, "") val trxMgr = TranxManager(System.currentTimeMillis()) val mutationAction = MutationActionFactory.getMutationAction(sparkSession, - carbonTable, hasDelAction, hasUpdateAction, + targetCarbonTable, hasDelAction, hasUpdateAction, insertHistOfUpdate, insertHistOfDelete) val loadDF = Dataset.ofRows(sparkSession, @@ -198,12 +420,8 @@ case class CarbonMergeDataSetCommand( val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath) val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr) FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath)) - if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava, - carbonTable, - trxMgr.getLatestTrx.toString, false, false)) { - LOGGER.error("writing of update status file failed") - throw new CarbonMergeDataSetException("writing of update status file failed") - } + MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, + trxMgr.getLatestTrx, tuple) Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx, executorErrors, tuple._2, Option.empty)) } else { @@ -211,28 +429,14 @@ case class CarbonMergeDataSetCommand( } val dataFrame = loadDF.select(tableCols.map(col): _*) - 
CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName), - tableName = carbonTable.getTableName, - options = Map("fileheader" -> header), - isOverwriteTable = false, - dataFrame.queryExecution.logical, - carbonTable.getTableInfo, - Map.empty, - Map.empty, - new OperationContext, - updateTableModel - ).run(sparkSession) + MergeUtil.insertDataToTargetTable(sparkSession, + targetCarbonTable, + header, + updateTableModel, + dataFrame) if (hasDelAction && count == 0) { - val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath - .getTableStatusFilePath(carbonTable.getTablePath)) - CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => - new Segment(loadMetadataDetail.getMergedLoadName, - loadMetadataDetail.getSegmentFile)).toSet.asJava, - carbonTable, - trxMgr.getLatestTrx.toString, - true, - true, new util.ArrayList[Segment]()) + MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx) } LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}") LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}") @@ -240,12 +444,14 @@ case class CarbonMergeDataSetCommand( LOGGER.info( " Time taken to merge data :: " + (System.currentTimeMillis() - st)) - // Load the history table if the insert history table action is added by user. - HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable, + // Load the history table if the insert history table action is added by user. + HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, targetCarbonTable, trxMgr, mutationAction, mergeMatches) // Do IUD Compaction. HorizontalCompaction.tryHorizontalCompaction( - sparkSession, carbonTable) + sparkSession, targetCarbonTable) + // clear the cached src + repartitionedSrcDs.unpersist() Seq.empty } @@ -289,7 +495,6 @@ case class CarbonMergeDataSetCommand( targetSchema: StructType, stats: Stats): (RDD[InternalRow], String) = { val frameCols = frame.queryExecution.analyzed.output - val status = frameCols.length - 1 val tupleId = frameCols.zipWithIndex .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2 val insertedRows = stats.insertedRows @@ -308,7 +513,10 @@ case class CarbonMergeDataSetCommand( StructField(status_on_mergeds, IntegerType))) val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema) val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration) - (frame.rdd.mapPartitionsWithIndex { case (index, iter) => + val expr = frame.queryExecution.sparkPlan.asInstanceOf[ProjectExec].projectList.last + val frameWithoutStatusCol = frame.drop(status_on_mergeds) + val colSchemaWithoutStatusCol = frameWithoutStatusCol.queryExecution.logical.output + (frameWithoutStatusCol.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iter) => val confB = config.value.value val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index) val attemptID = new TaskAttemptID(task, index) @@ -318,16 +526,23 @@ case class CarbonMergeDataSetCommand( val projLen = projections.length new Iterator[InternalRow] { val queue = new util.LinkedList[InternalRow]() - override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else { - writer.close() - false + + override def hasNext: Boolean = { + if (!queue.isEmpty || iter.hasNext) { + true + } else { + writer.close() + false + } } override def next(): InternalRow = { - if (!queue.isEmpty) return queue.poll() + if (!queue.isEmpty) { + return 
queue.poll() + } val row = iter.next() - val rowWithSchema = row.asInstanceOf[GenericRowWithSchema] - val is = row.get(status) + val is = CarbonToSparkAdapter.evaluateWithPredicate(expr, + colSchemaWithoutStatusCol, row) var isUpdate = false var isDelete = false var insertedCount = 0 @@ -335,18 +550,24 @@ case class CarbonMergeDataSetCommand( val isInt = is.asInstanceOf[Int] var i = 0 while (i < projLen) { - if ((isInt & (1 << i)) == (1 << i)) projections(i).foreach { p => - if (!p.isDelete) { - if (p.isUpdate) isUpdate = p.isUpdate - queue.add(p(rowWithSchema)) - insertedCount += 1 - } else isDelete = true + if ((isInt & (1 << i)) == (1 << i)) { + projections(i).foreach { p => + if (!p.isDelete) { + if (p.isUpdate) { + isUpdate = p.isUpdate + } + queue.add(p.getInternalRowFromIndex(row, is.asInstanceOf[Int])) + insertedCount += 1 + } else { + isDelete = true + } + } } i = i + 1 } } val newArray = new Array[Any](2) - newArray(0) = UTF8String.fromString(row.getString(tupleId)) + newArray(0) = row.getUTF8String(tupleId) if (isUpdate && isDelete) { newArray(1) = 102 writer.write(new GenericInternalRow(newArray)) @@ -364,16 +585,15 @@ case class CarbonMergeDataSetCommand( writer.write(new GenericInternalRow(newArray)) } insertedRows.add(insertedCount) - if (!queue.isEmpty) queue.poll() else { + if (!queue.isEmpty) { + queue.poll() + } else { val values = new Array[Any](targetSchema.length) new GenericInternalRow(values) } } } - }.filter { row => - val status = row.get(targetSchema.length-1, IntegerType) - status != null - }, path) + }.filter { row => !row.isNullAt(targetSchema.length - 1)}, path) } private def createLongAccumulator(name: String) = { diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala new file mode 100644 index 00000000000..d719152108f --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.EqualTo +import org.apache.spark.sql.execution.CastExpressionOptimization +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.types.DateType + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.index.{IndexChooser, IndexInputFormat, IndexStoreManager, IndexUtil} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.mutate.{CdcVO, FilePathMinMaxVO} +import org.apache.carbondata.core.range.BlockMinMaxTree +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.comparator.SerializableComparator +import org.apache.carbondata.indexserver.IndexServer +import org.apache.carbondata.spark.util.CarbonSparkUtil + +/** + * The utility class for Merge operations + */ +object CarbonMergeDataSetUtil { + + /** + * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of + * the src dataset + * @param carbonTable target carbon table object + * @param colToSplitsFilePathAndMinMaxMap CarbonInputSplit whose min max cached in driver or + * the index server + * @param joinColumnsToComparatorMap This map contains the column to comparator mapping + * which will help in compare and update min max + * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the + * join columns involved + */ + def addFilePathAndMinMaxTuples( + colToSplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]], + carbonTable: CarbonTable, + joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator], + fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)], + CarbonColumn)]): Unit = { + joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) => + val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] = + collection.mutable.Map.empty[String, (AnyRef, AnyRef)] + val joinDataType = joinColumn.getDataType + val isDimension = joinColumn.isDimension + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) && + (joinDataType != DataTypes.DATE) + colToSplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach { + filePathMinMiax => + val filePath = filePathMinMiax.getFilePath + val minBytes = filePathMinMiax.getMin + val maxBytes = filePathMinMiax.getMax + val uniqBlockPath = if (carbonTable.isHivePartitionTable) { + // While data loading to SI created on Partition table, on + // partition directory, '/' will be + // replaced with '#', to support multi level partitioning. For example, BlockId will be + // look like `part1=1#part2=2/xxxxxxxxx`. 
During query also, blockId should be + // replaced by '#' in place of '/', to match and prune data on SI table. + CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier, + filePath, + "", + true, + false, + true) + } else { + filePath.substring(filePath.lastIndexOf("/Part") + 1) + } + if (isDimension) { + if (isPrimitiveAndNotDate) { + val minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes, + joinDataType) + val maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes, + joinDataType) + // check here if present in map, if it is, compare and update min and amx + if (fileMinMaxMap.contains(uniqBlockPath)) { + val isMinLessThanMin = + comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0 + val isMaxMoreThanMax = + comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0 + updateMapIfRequiredBasedOnMinMax(fileMinMaxMap, + minValue, + maxValue, + uniqBlockPath, + isMinLessThanMin, + isMaxMoreThanMax) + } else { + fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue)) + } + } else { + if (fileMinMaxMap.contains(uniqBlockPath)) { + val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(fileMinMaxMap(uniqBlockPath)._1 + .asInstanceOf[String].getBytes(), minBytes) > 0 + val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(maxBytes, fileMinMaxMap(uniqBlockPath)._2 + .asInstanceOf[String].getBytes()) > 0 + updateMapIfRequiredBasedOnMinMax(fileMinMaxMap, + new String(minBytes), + new String(maxBytes), + uniqBlockPath, + isMinLessThanMin, + isMaxMoreThanMax) + } else { + fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new String(maxBytes))) + } + } + } else { + val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes, joinDataType) + val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes, joinDataType) + if (fileMinMaxMap.contains(uniqBlockPath)) { + val isMinLessThanMin = + comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0 + val isMaxMoreThanMin = + comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0 + updateMapIfRequiredBasedOnMinMax(fileMinMaxMap, + minValue, + maxValue, + uniqBlockPath, + isMinLessThanMin, + isMaxMoreThanMin) + } else { + fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue)) + } + } + } + fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn)) + } + } + + /** + * This method updates the min max map of the block if the value is less than min or more + * than max + */ + private def updateMapIfRequiredBasedOnMinMax( + fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)], + minValue: AnyRef, + maxValue: AnyRef, + uniqBlockPath: String, + isMinLessThanMin: Boolean, + isMaxMoreThanMin: Boolean): Unit = { + (isMinLessThanMin, isMaxMoreThanMin) match { + case (true, true) => fileMinMaxMap(uniqBlockPath) = (minValue, maxValue) + case (true, false) => fileMinMaxMap(uniqBlockPath) = (minValue, + fileMinMaxMap(uniqBlockPath)._2) + case (false, true) => fileMinMaxMap(uniqBlockPath) = (fileMinMaxMap(uniqBlockPath)._1, + maxValue) + case _ => + } + } + + /** + * This method returns the partitions required to scan in the target table based on the + * partitions present in the src table or dataset + */ + def getPartitionSpecToConsiderForPruning( + sparkSession: SparkSession, + srcCarbonTable: CarbonTable, + targetCarbonTable: CarbonTable, + identifier: TableIdentifier = null): util.List[PartitionSpec] = { + val partitionsToConsider = if (targetCarbonTable.isHivePartitionTable) { + // handle the case of multiple partition 
columns in src and target and subset of + // partition columns + val srcTableIdentifier = if (identifier == null) { + TableIdentifier(srcCarbonTable.getTableName, Some(srcCarbonTable.getDatabaseName)) + } else { + identifier + } + val srcPartitions = CarbonFilters.getPartitions( + Seq.empty, + sparkSession, + srcTableIdentifier) + .map(_.toList.flatMap(_.getPartitions.asScala)) + .orNull + // get all the partitionSpec of target table which intersects with source partitions + // example if the target has partitions as a=1/b=2/c=3, and src has e=1/a=1/b=2/d=4 + // we will consider the specific target partition as intersect gives results and also we + // don't want to go very fine grain for partitions as location will be a single for nested + // partitions. + CarbonFilters.getPartitions( + Seq.empty, + sparkSession, + TableIdentifier( + targetCarbonTable.getTableName, + Some(targetCarbonTable.getDatabaseName))).map(_.toList.filter { + partitionSpec => + partitionSpec.getPartitions.asScala.intersect(srcPartitions).nonEmpty + }).orNull + } else { + null + } + partitionsToConsider.asJava + } + + /** + * This method get the files to scan by searching the tree prepared for each column with its + * min-max + * @param joinCarbonColumns join carbon columns + * @param joinColumnToTreeMapping mapping of join column to interval tree + * @param repartitionedSrcDs source dataset + * @return carbondata files required to scan + */ + def getFilesToScan( + joinCarbonColumns: mutable.Set[CarbonColumn], + joinColumnToTreeMapping: mutable.LinkedHashMap[CarbonColumn, BlockMinMaxTree], + repartitionedSrcDs: Dataset[Row]): Array[String] = { + var finalCarbonFilesToScan: Array[String] = Array.empty[String] + val timeStampFormat = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT); + joinCarbonColumns.foreach { joinColumn => + val srcDeduplicatedRDD = repartitionedSrcDs.select(joinColumn.getColName).rdd + finalCarbonFilesToScan ++= srcDeduplicatedRDD.mapPartitions { iter => + val filesPerTask = new util.HashSet[String]() + new Iterator[util.HashSet[String]] { + override def hasNext: Boolean = { + iter.hasNext + } + + override def next(): util.HashSet[String] = { + val row = iter.next() + joinColumnToTreeMapping + .foreach { joinColumnWithRangeTree => + val joinCarbonColumn = joinColumnWithRangeTree._1 + val rangeIntervalTree = joinColumnWithRangeTree._2 + val joinDataType = joinCarbonColumn.getDataType + val isDimension = joinCarbonColumn.isDimension + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) && + (joinDataType != DataTypes.DATE) + val fieldIndex = row.fieldIndex(joinCarbonColumn.getColName) + val fieldValue = if (!row.isNullAt(fieldIndex)) { + if (isDimension) { + if (joinDataType != DataTypes.DATE) { + DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(row + .getAs(fieldIndex) + .toString, + joinDataType, timeStampFormat) + } else { + // if date, then get the key from direct dict generator and then get bytes + val actualValue = row.getAs(fieldIndex) + val dateSurrogateValue = CastExpressionOptimization + .typeCastStringToLong(actualValue, DateType).asInstanceOf[Int] + ByteUtil.convertIntToBytes(dateSurrogateValue) + } + } else { + CarbonUtil.getValueAsBytes(joinDataType, row.getAs(fieldIndex)) + } + } else { + // here handling for null values + val value: Long = 0 + if (isDimension) { + if (isPrimitiveAndNotDate) { + CarbonCommonConstants.EMPTY_BYTE_ARRAY + } else { + 
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY + } + } else { + val nullValueForMeasure = if ((joinDataType eq DataTypes.BOOLEAN) || + (joinDataType eq DataTypes.BYTE)) { + value.toByte + } else if (joinDataType eq DataTypes.SHORT) { + value.toShort + } else if (joinDataType eq DataTypes.INT) { + value.toInt + } else if ((joinDataType eq DataTypes.LONG) || + (joinDataType eq DataTypes.TIMESTAMP)) { + value + } else if (joinDataType eq DataTypes.DOUBLE) { + 0d + } else if (joinDataType eq DataTypes.FLOAT) { + 0f + } else if (DataTypes.isDecimal(joinDataType)) { + value + } + CarbonUtil.getValueAsBytes(joinDataType, nullValueForMeasure) + } + } + rangeIntervalTree.getMatchingFiles(fieldValue, filesPerTask) + } + filesPerTask + } + } + }.flatMap(_.asScala.toList).map(filePath => (filePath, 0)).reduceByKey((m, n) => m + n) + .collect().map(_._1) + } + finalCarbonFilesToScan + } + + /** + * This method gets the key columnsrequired for joining based on merge condition + * @param keyColumn key columns if specified in the upsert APIs + * @param targetDsAliasName target table alias name + * @param targetCarbonTable target carbon table + * @param mergeMatches merge match conditions from the user + * @return set of key columns required to be used in pruning + */ + def getTargetTableKeyColumns( + keyColumn: String, + targetDsAliasName: String, + targetCarbonTable: CarbonTable, + mergeMatches: MergeDataSetMatches): mutable.Set[String] = { + var targetKeyColumns: mutable.Set[String] = mutable.Set.empty[String] + if (mergeMatches != null) { + mergeMatches.joinExpr.expr.collect { + case EqualTo(left, right) => + left match { + case attribute: UnresolvedAttribute if right.isInstanceOf[UnresolvedAttribute] => + val leftAlias = attribute.nameParts.head + if (targetDsAliasName != null) { + if (targetDsAliasName.equalsIgnoreCase(leftAlias)) { + targetKeyColumns += attribute.nameParts.tail.head + } else { + targetKeyColumns += + right.asInstanceOf[UnresolvedAttribute].nameParts.tail.head + } + } else { + if (leftAlias.equalsIgnoreCase(targetCarbonTable.getTableName)) { + targetKeyColumns += attribute.nameParts.tail.head + } else { + targetKeyColumns += + right.asInstanceOf[UnresolvedAttribute].nameParts.tail.head + } + } + case _ => + } + } + targetKeyColumns + } else { + targetKeyColumns += keyColumn + } + } + + /** + * This method get the blocklets and cache either in driver or index server and returns column + * FilePathMinMaxVO object + * @param targetCarbonTable target carbondata table + * @param repartitionedSrcDs source dataset + * @param columnMinMaxInBlocklet mapping of column to it's min max in each blocklet + * @param columnToIndexMap mapping of column to its index to fetch from the index row + * @param sparkSession spark session + * @return mapping of column to FilePathMinMaxVO object which contains filepath and min, max + */ + def getSplitsAndLoadToCache( + targetCarbonTable: CarbonTable, + repartitionedSrcDs: Dataset[Row], + columnMinMaxInBlocklet: util.LinkedHashMap[String, util.List[FilePathMinMaxVO]], + columnToIndexMap: util.Map[String, Integer], + sparkSession: SparkSession): mutable.Map[String, util.List[FilePathMinMaxVO]] = { + val isDistributedPruningEnabled: Boolean = CarbonProperties.getInstance + .isDistributedPruningEnabled(targetCarbonTable.getDatabaseName, + targetCarbonTable.getTableName) + // if the index server is enabled, call index server to cache the index and get all the + // blocklets of the target table. 
If the index server disabled, just call the getSplits of + // the driver side to cache and get the splits. These CarbonInputSplits basically contain + // the filePaths and the min max of each columns. + val ssm = new SegmentStatusManager(targetCarbonTable.getAbsoluteTableIdentifier) + val validSegments = ssm.getValidAndInvalidSegments.getValidSegments + val defaultIndex = IndexStoreManager.getInstance.getDefaultIndex(targetCarbonTable) + + // 1. identify if src is partition table, if both src and target target for partition table + // on same column(s), then only get the src partitions and send those partitions to scan in + // target handling only for carbon src dataset now + val srcDataSetRelations = CarbonSparkUtil.collectCarbonRelation(repartitionedSrcDs.logicalPlan) + val partitionsToConsider = + if (srcDataSetRelations.lengthCompare(1) == 0 && + srcDataSetRelations.head.isInstanceOf[CarbonDatasourceHadoopRelation]) { + val srcCarbonTable = srcDataSetRelations.head.carbonRelation.carbonTable + if (srcCarbonTable.isHivePartitionTable) { + CarbonMergeDataSetUtil.getPartitionSpecToConsiderForPruning( + sparkSession, + srcCarbonTable, + targetCarbonTable) + } else { + null + } + } else { + val nonCarbonRelations = CarbonSparkUtil.collectNonCarbonRelation(repartitionedSrcDs + .logicalPlan) + // when the relations are not empty, it means the source dataset is prepared from table + if (nonCarbonRelations.nonEmpty && + nonCarbonRelations.head.catalogTable.isDefined && + nonCarbonRelations.head.catalogTable.get.partitionColumnNames != null) { + CarbonMergeDataSetUtil.getPartitionSpecToConsiderForPruning( + sparkSession, + null, + targetCarbonTable, + nonCarbonRelations.head.catalogTable.get.identifier) + } else { + null + } + } + + if (isDistributedPruningEnabled) { + val indexFormat = new IndexInputFormat(targetCarbonTable, null, validSegments, + Nil.asJava, partitionsToConsider, false, null, false, false) + val cdcVO = new CdcVO(columnToIndexMap) + indexFormat.setCdcVO(cdcVO) + IndexServer.getClient.getSplits(indexFormat).getExtendedBlocklets(indexFormat).asScala + .flatMap { blocklet => + blocklet.getColumnToMinMaxMapping.asScala.map { + case (columnName, minMaxListWithFilePath) => + val filePathMinMaxList = columnMinMaxInBlocklet.get(columnName) + if (filePathMinMaxList != null) { + filePathMinMaxList.addAll(minMaxListWithFilePath) + columnMinMaxInBlocklet.put(columnName, filePathMinMaxList) + } else { + columnMinMaxInBlocklet.put(columnName, minMaxListWithFilePath) + } + } + } + columnMinMaxInBlocklet.asScala + } else { + if (targetCarbonTable.isTransactionalTable) { + val indexExprWrapper = IndexChooser.getDefaultIndex(targetCarbonTable, null) + IndexUtil.loadIndexes(targetCarbonTable, indexExprWrapper, validSegments) + } + val blocklets = defaultIndex.prune(validSegments, null, partitionsToConsider).asScala + columnToIndexMap.asScala.foreach { + case (columnName, index) => + val filePathAndMinMaxList = new util.ArrayList[FilePathMinMaxVO]() + blocklets.map { blocklet => + val filePathMinMax = new FilePathMinMaxVO(blocklet.getFilePath, + CarbonUtil.getMinMaxValue(blocklet + .getInputSplit + .getIndexRow, + BlockletIndexRowIndexes.MIN_VALUES_INDEX)(index), + CarbonUtil.getMinMaxValue(blocklet + .getInputSplit + .getIndexRow, + BlockletIndexRowIndexes.MAX_VALUES_INDEX)(index)) + filePathAndMinMaxList.add(filePathMinMax) + } + columnMinMaxInBlocklet.put(columnName, filePathAndMinMaxList) + } + columnMinMaxInBlocklet.asScala + } + } +} diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala new file mode 100644 index 00000000000..e75f11b265b --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.avro.AvroFileFormatFactory +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel} +import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{StringType, StructField} +import org.apache.spark.sql.util.SparkSQLUtil + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.SegmentUpdateDetails +import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.spark.util.CarbonSparkUtil + +/** + * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT + */ +abstract class MergeHandler( + sparkSession: SparkSession, + frame: DataFrame, + targetCarbonTable: CarbonTable, + stats: Stats, + srcDS: DataFrame) { + + protected def performTagging: (RDD[Row], String) = { + val tupleId = frame.queryExecution.analyzed.output.zipWithIndex + .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2 + val schema = + org.apache.spark.sql.types.StructType(Seq( + StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType))) + val job = CarbonSparkUtil.createHadoopJob() + job.setOutputKeyClass(classOf[Void]) + job.setOutputValueClass(classOf[InternalRow]) + val insertedRows = stats.insertedRows + val updatedRows = stats.updatedRows + val uuid = UUID.randomUUID.toString + job.setJobID(new JobID(uuid, 0)) + val path = targetCarbonTable.getTablePath 
+ CarbonCommonConstants.FILE_SEPARATOR + "avro" + FileOutputFormat.setOutputPath(job, new Path(path)) + val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema) + val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration) + frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) => + val confB = config.value.value + val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index) + val attemptID = new TaskAttemptID(task, index) + val context = new TaskAttemptContextImpl(confB, attemptID) + val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString, + schema, context) + new Iterator[InternalRow] { + override def hasNext: Boolean = { + if (iterator.hasNext) { + true + } else { + writer.close() + false + } + } + + override def next(): InternalRow = { + val row = iterator.next() + val newArray = new Array[Any](1) + val tupleID = row.getUTF8String(tupleId) + if (tupleID == null) { + insertedRows.add(1) + } else { + newArray(0) = tupleID + writer.write(new GenericInternalRow(newArray)) + updatedRows.add(1) + } + null + } + } + }.count() + val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path) + (deltaRdd, path) + } + + protected def triggerAction( + factTimestamp: Long, + executorErrors: ExecutionErrors, + deltaRdd: RDD[Row], + deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + val tuple = MergeUtil.triggerAction(sparkSession, + targetCarbonTable, + factTimestamp, + executorErrors, + deltaRdd) + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath)) + MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple) + tuple + } + + protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = { + val tableCols = + targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName). + filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + val header = tableCols.mkString(",") + val dataFrame = srcDS.select(tableCols.map(col): _*) + MergeUtil.insertDataToTargetTable(sparkSession, + targetCarbonTable, + header, + updateTableModel, + dataFrame) + } + + protected def tryHorizontalCompaction(): Unit = { + // Do IUD Compaction. 
+ HorizontalCompaction.tryHorizontalCompaction( + sparkSession, targetCarbonTable) + } + + def handleMerge() +} + +case class UpdateHandler( + sparkSession: SparkSession, + frame: DataFrame, + targetCarbonTable: CarbonTable, + stats: Stats, + srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) { + + override def handleMerge(): Unit = { + assert(frame != null, "The dataframe used to perform merge can be null only for insert operation") + val factTimestamp = System.currentTimeMillis() + val executorErrors = ExecutionErrors(FailureCauses.NONE, "") + val (deltaRdd, path) = performTagging + if (deltaRdd.isEmpty()) { + return + } + val tuple = triggerAction(factTimestamp, executorErrors, deltaRdd, path) + val updateTableModel = Some(UpdateTableModel(isUpdate = true, factTimestamp, + executorErrors, tuple._2, Option.empty)) + insertDataToTargetTable(updateTableModel) + tryHorizontalCompaction() + } + +} + +case class DeleteHandler( + sparkSession: SparkSession, + frame: DataFrame, + targetCarbonTable: CarbonTable, + stats: Stats, + srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) { + override def handleMerge(): Unit = { + assert(frame != null, "The dataframe used to perform merge can be null only for insert operation") + val factTimestamp = System.currentTimeMillis() + val executorErrors = ExecutionErrors(FailureCauses.NONE, "") + val (deleteRDD, path) = performTagging + if (deleteRDD.isEmpty()) { + return + } + triggerAction(factTimestamp, executorErrors, deleteRDD, path) + MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, factTimestamp) + tryHorizontalCompaction() + } +} + +case class InsertHandler( + sparkSession: SparkSession, + frame: DataFrame, + targetCarbonTable: CarbonTable, + stats: Stats, + srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) { + override def handleMerge(): Unit = { + insertDataToTargetTable(None) + } +} + +case class UpsertHandler( + sparkSession: SparkSession, + frame: DataFrame, + targetCarbonTable: CarbonTable, + stats: Stats, + srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) { + override def handleMerge(): Unit = { + assert(frame != null, "The dataframe used to perform merge can be null only for insert operation") + val factTimestamp = System.currentTimeMillis() + val executorErrors = ExecutionErrors(FailureCauses.NONE, "") + val (updateDataRDD, path) = performTagging + val tuple = triggerAction(factTimestamp, executorErrors, updateDataRDD, path) + val updateTableModel = Some(UpdateTableModel(isUpdate = true, factTimestamp, + executorErrors, tuple._2, Option.empty)) + insertDataToTargetTable(updateTableModel) + tryHorizontalCompaction() + } +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeOperationType.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeOperationType.scala new file mode 100644 index 00000000000..42ea16f43e2 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeOperationType.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +/** + * Operation Type for the merge APIs + */ +object MergeOperationType extends Enumeration { + + type MergeOperationType = Value + + val UPSERT: MergeOperationType.Value = Value("UPSERT") + + val UPDATE: MergeOperationType.Value = Value("UPDATE") + + val DELETE: MergeOperationType.Value = Value("DELETE") + + val INSERT: MergeOperationType.Value = Value("INSERT") + +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala index 68458590c7f..96b765a7fe5 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala @@ -17,51 +17,39 @@ package org.apache.spark.sql.execution.command.mutation.merge -import java.sql.{Date, Timestamp} +import scala.collection.mutable -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonToSparkAdapter, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema, InterpretedMutableProjection, Projection} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.types.StructType /** * Creates the projection for each action like update,delete or insert. */ case class MergeProjection( @transient tableCols: Seq[String], - @transient statusCol: String, @transient ds: Dataset[Row], @transient rltn: CarbonDatasourceHadoopRelation, @transient sparkSession: SparkSession, @transient mergeAction: MergeAction) { - private val cutOffDate = Integer.MAX_VALUE >> 1 - val isUpdate: Boolean = mergeAction.isInstanceOf[UpdateAction] val isDelete: Boolean = mergeAction.isInstanceOf[DeleteAction] - val (expressions, inputSchema) = generateProjection - lazy val projection = new InterpretedMutableProjection(expressions, inputSchema) + val schema: StructType = ds.schema - def apply(row: GenericRowWithSchema): InternalRow = { - // TODO we can avoid these multiple conversions if this is added as a SparkPlan node. 
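The MergeHandler subclasses and the MergeOperationType enumeration introduced above form the dispatch surface for the new merge APIs. As a rough illustration (not part of this patch; the handlerFor helper and its parameter names are made up), a caller could map an operation-type string to the matching handler like this:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Hypothetical helper: resolve the MergeOperationType value and build the
// corresponding MergeHandler; handleMerge() then drives the whole operation.
def handlerFor(operationType: String, sparkSession: SparkSession, taggedFrame: DataFrame,
    targetTable: CarbonTable, stats: Stats, srcDS: DataFrame): MergeHandler = {
  MergeOperationType.withName(operationType.toUpperCase) match {
    case MergeOperationType.UPSERT => UpsertHandler(sparkSession, taggedFrame, targetTable, stats, srcDS)
    case MergeOperationType.UPDATE => UpdateHandler(sparkSession, taggedFrame, targetTable, stats, srcDS)
    case MergeOperationType.DELETE => DeleteHandler(sparkSession, taggedFrame, targetTable, stats, srcDS)
    case MergeOperationType.INSERT => InsertHandler(sparkSession, taggedFrame, targetTable, stats, srcDS)
  }
}

// e.g. handlerFor("upsert", spark, joinedFrame, carbonTable, stats, sourceDf).handleMerge()
```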
- val values = row.values.map { - case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s) - case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d) - case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b) - case d: Date => DateTimeUtils.fromJavaDate(d) - case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) - case value => value - } + val outputListOfDataset: Seq[Attribute] = ds.queryExecution.logical.output - projection(new GenericInternalRow(values)).asInstanceOf[GenericInternalRow] - } + val targetTableAttributes = rltn.carbonRelation.output - private def generateProjection: (Array[Expression], Seq[Attribute]) = { + val indexesToFetch: Seq[(Expression, Int)] = { val existingDsOutput = rltn.carbonRelation.schema.toAttributes + val literalToAttributeMap: collection.mutable.Map[Expression, Attribute] = + collection.mutable.Map.empty[Expression, Attribute] val colsMap = mergeAction match { - case UpdateAction(updateMap, isStar: Boolean) => updateMap - case InsertAction(insertMap, isStar: Boolean) => insertMap + case UpdateAction(updateMap, _: Boolean) => updateMap + case InsertAction(insertMap, _: Boolean) => insertMap case _ => null } if (colsMap != null) { @@ -72,23 +60,67 @@ case class MergeProjection( if (tableIndex < 0) { throw new CarbonMergeDataSetException(s"Mapping is wrong $colsMap") } - output(tableIndex) = v.expr.transform { + val resolvedValue = v.expr.transform { case a: Attribute if !a.resolved => ds.queryExecution.analyzed.resolveQuoted(a.name, sparkSession.sessionState.analyzer.resolver).get } + output(tableIndex) = resolvedValue + val attributesInResolvedVal = resolvedValue.collect { + case attribute: Attribute => attribute + } + if (attributesInResolvedVal.isEmpty) { + val resolvedKey = k.expr.collect { + case a: Attribute if !a.resolved => + targetTableAttributes.find(col => col.name + .equalsIgnoreCase(a.name)) + }.head.get + literalToAttributeMap += ((resolvedValue -> resolvedKey.asInstanceOf[Attribute])) + } expectOutput(tableIndex) = existingDsOutput.find(_.name.equalsIgnoreCase(tableCols(tableIndex))).get } if (output.contains(null)) { throw new CarbonMergeDataSetException(s"Not all columns are mapped") } - (output ++ Seq( - ds.queryExecution.analyzed.resolveQuoted(statusCol, - sparkSession.sessionState.analyzer.resolver).get), - ds.queryExecution.analyzed.output) + var exprToIndexMapping: scala.collection.mutable.Buffer[(Expression, Int)] = + collection.mutable.Buffer.empty + (ds.queryExecution.logical.output ++ targetTableAttributes).distinct.zipWithIndex.collect { + case (attribute, index) => + output.map { exp => + val attributeInExpression = exp.collect { + case attribute: Attribute => attribute + } + if (attributeInExpression.isEmpty) { + if (literalToAttributeMap(exp).semanticEquals(attribute)) { + exprToIndexMapping += ((exp, index)) + } else if (literalToAttributeMap(exp).name.equals(attribute.name)) { + exprToIndexMapping += ((exp, index)) + } + } else { + if (attributeInExpression.nonEmpty && + attributeInExpression.head.semanticEquals(attribute)) { + exprToIndexMapping += ((exp, index)) + } + } + } + } + output zip output.map(exprToIndexMapping.toMap) } else { - (null, null) + Seq.empty } } + + def getInternalRowFromIndex(row: InternalRow, status_on_mergeds: Int): InternalRow = { + val rowValues = row.toSeq(schema) + val requiredOutput = indexesToFetch.map { case (expr, index) => + if (expr.isInstanceOf[Attribute]) { + rowValues(index) + } else { + 
CarbonToSparkAdapter.evaluateWithPredicate(expr, outputListOfDataset, row) + } + } + InternalRow.fromSeq(requiredOutput ++ Seq(status_on_mergeds)) + } + } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala new file mode 100644 index 00000000000..3841b38acd8 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel} +import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand +import org.apache.spark.sql.execution.command.mutation.DeleteExecution + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.OperationContext + + +object MergeUtil { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + /** + * This method triggers the merge (update/delete) action on behalf of the merge handler + * @param carbonTable target carbon table + * @param factTimestamp the timestamp used for update and delete actions + * @param executorErrors executor errors returned from the operations like update and delete + * @param update RDD[Row] which contains the rows to update or delete + * @return the segment list and the metadata details of the segments updated or deleted + */ + def triggerAction(sparkSession: SparkSession, + carbonTable: CarbonTable, + factTimestamp: Long, + executorErrors: ExecutionErrors, + update: RDD[Row]): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + val tuple1 = DeleteExecution.deleteDeltaExecutionInternal(Some(carbonTable.getDatabaseName), + carbonTable.getTableName, + sparkSession, update, + factTimestamp.toString, + isUpdateOperation = true, executorErrors, Some(0)) + MutationActionFactory.checkErrors(executorErrors) + val tupleProcessed1 = DeleteExecution.processSegments(executorErrors, tuple1._1, carbonTable, + factTimestamp.toString, tuple1._2) + MutationActionFactory.checkErrors(executorErrors) + tupleProcessed1 + } + + /** + * This 
method updates the segment status after update or delete operation + * @param targetCarbonTable target carbon table + * @param factTimeStamp timestamp to update in the status which is used in update/delete operation + * @param tuple contains the segment list and the metadata details of the segments updated/deleted + */ + def updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable: CarbonTable, + factTimeStamp: Long, + tuple: (util.List[SegmentUpdateDetails], Seq[Segment])): Unit = { + if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1, targetCarbonTable, + factTimeStamp.toString, false, false)) { + LOGGER.error("writing of update status file failed") + throw new CarbonMergeDataSetException("writing of update status file failed") + } + } + + /** + * This method inserts the data into the target carbon table. + * @param targetCarbonTable target carbon table to insert + * @param header header of the data to be inserted + * @param updateTableModel update table model, if any, for the insert + * @param dataFrame dataframe to write into the target carbon table. + * @return the segment ID created after this insert operation. + */ + def insertDataToTargetTable(sparkSession: SparkSession, + targetCarbonTable: CarbonTable, + header: String, + updateTableModel: Option[UpdateTableModel], + dataFrame: DataFrame): Seq[Row] = { + CarbonInsertIntoCommand(databaseNameOp = Some(targetCarbonTable.getDatabaseName), + tableName = targetCarbonTable.getTableName, + options = Map("fileheader" -> header), + isOverwriteTable = false, + dataFrame.queryExecution.logical, + targetCarbonTable.getTableInfo, + Map.empty, + Map.empty, + new OperationContext, + updateTableModel + ).run(sparkSession) + } + + /** + * This method is to update the status only for delete operation. + * @param targetCarbonTable target carbon table + * @param factTimestamp timestamp to update in the status which is used in update/delete operation + * @return whether update status is successful or not + */ + def updateStatusIfJustDeleteOperation(targetCarbonTable: CarbonTable, + factTimestamp: Long): Boolean = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath + .getTableStatusFilePath(targetCarbonTable.getTablePath)) + CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => + new Segment(loadMetadataDetail.getMergedLoadName, + loadMetadataDetail.getSegmentFile)).toSet.asJava, + targetCarbonTable, + factTimestamp.toString, + true, + true, new util.ArrayList[Segment]()) + } +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala index d6ea288cac4..07034203e0e 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala @@ -52,16 +52,7 @@ abstract class MutationAction(sparkSession: SparkSession, carbonTable: CarbonTab val status = row.get(1) status != null && condition(status.asInstanceOf[Int]) } - val tuple1 = DeleteExecution.deleteDeltaExecutionInternal(Some(carbonTable.getDatabaseName), - carbonTable.getTableName, - sparkSession, update, - factTimestamp.toString, - true, executorErrors, Some(0)) - MutationActionFactory.checkErrors(executorErrors) - val tupleProcessed1 = DeleteExecution.processSegments(executorErrors, tuple1._1, carbonTable, - 
factTimestamp.toString, tuple1._2) - MutationActionFactory.checkErrors(executorErrors) - tupleProcessed1 + MergeUtil.triggerAction(sparkSession, carbonTable, factTimestamp, executorErrors, update) } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/UpsertBuilder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/UpsertBuilder.scala new file mode 100644 index 00000000000..7a372320e55 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/UpsertBuilder.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} + +class UpsertBuilder(existingDsOri: Dataset[Row], currDs: Dataset[Row], + keyColumn: String, operationType: String, sparkSession: SparkSession) { + + def build(): CarbonMergeDataSetCommand = { + CarbonMergeDataSetCommand(existingDsOri, currDs, null, keyColumn, operationType) + } + + def execute(): Unit = { + build().run(sparkSession) + } +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/udf/BlockPathsUDF.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/udf/BlockPathsUDF.scala new file mode 100644 index 00000000000..ee54c795c91 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/udf/BlockPathsUDF.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
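The UpsertBuilder above is a thin builder that turns a dataset-level merge call into a CarbonMergeDataSetCommand. A hedged sketch of how it is expected to be driven (targetDs, srcDs and spark are placeholders; the dataset-level upsert API exercised in the tests later in this patch is expected to reduce to the same command):

```scala
// Illustrative only: the direct-builder equivalent of
// target.as("A").upsert(src.as("B"), "key").execute(); variable names are placeholders.
val builder = new UpsertBuilder(
  targetDs,                            // existing carbon table as Dataset[Row]
  srcDs,                               // incoming change data
  "key",                               // key column used to match rows
  MergeOperationType.UPSERT.toString,  // operation type
  spark)
builder.execute()                      // same as builder.build().run(spark)
```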
+ */ +package org.apache.spark.sql.execution.command.mutation.merge.udf + +import org.apache.spark.sql.sources.Filter + +import org.apache.carbondata.common.annotations.InterfaceAudience + +@InterfaceAudience.Internal +class BlockPathsUDF extends (String => Boolean) with Serializable { + override def apply(v1: String): Boolean = { + v1.nonEmpty + } + +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index b6666689f04..09bdf18cefb 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import scala.util.Try import org.apache.spark.sql.{CarbonBoundReference, CarbonDatasourceHadoopRelation, CarbonEnv, - Dataset, SparkSession, SparkUnknownExpression} + SparkSession, SparkUnknownExpression} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, ArrayContains, Attribute, ScalaUDF, StartsWith, StringTrim} import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression} import org.apache.spark.sql.execution.CastExpressionOptimization +import org.apache.spark.sql.execution.command.mutation.merge.udf.BlockPathsUDF import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonSessionCatalogUtil} import org.apache.spark.sql.types.{ArrayType, BooleanType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructType, TimestampType} @@ -47,9 +48,9 @@ import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, LiteralExpression, MatchExpression} -import org.apache.carbondata.core.scan.expression.conditional.{EqualToExpression, - GreaterThanEqualToExpression, GreaterThanExpression, ImplicitExpression, InExpression, - LessThanEqualToExpression, LessThanExpression, ListExpression, NotEqualsExpression, +import org.apache.carbondata.core.scan.expression.conditional.{CDCBlockImplicitExpression, + EqualToExpression, GreaterThanEqualToExpression, GreaterThanExpression, ImplicitExpression, + InExpression, LessThanEqualToExpression, LessThanExpression, ListExpression, NotEqualsExpression, NotInExpression, StartsWithExpression} import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} @@ -286,6 +287,12 @@ object CarbonFilters { val (columnName, instance) = getGeoHashHandler(relation.carbonTable) Some(new PolygonRangeListExpression(children.head.toString(), children.last.toString(), columnName, instance)) + case _: BlockPathsUDF => + if (children.size > 1) { + throw new MalformedCarbonCommandException( + "Expected one comma separated values of block paths") + } + Some(new CDCBlockImplicitExpression(children.head.toString())) case _ => None } } diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala index cc65a3d436a..2d606678297 100644 --- 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala +++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala @@ -21,23 +21,25 @@ import java.net.URI import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule} import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule import org.apache.spark.sql.types.{DataType, Metadata, StringType} -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF} object CarbonToSparkAdapter extends SparkVersionAdapter { @@ -227,6 +229,10 @@ object CarbonToSparkAdapter extends SparkVersionAdapter { scalaUdf.outputEncoder, scalaUdf.udfName) } + + def evaluateWithPredicate(exp: Expression, schema: Seq[Attribute], row: InternalRow): Any = { + Predicate.createInterpreted(bindReference(exp, schema)).expression.eval(row) + } } class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala index b2c99dad42a..710303988fa 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala @@ -780,6 +780,140 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll { Seq(Row("c", "200"), Row("e", "100"))) } + test("test upsert APIs on partition table") { + sql("drop table if exists target") + val initframe = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 0, "CHINA"), + Row("b", 1, "INDIA"), + Row("c", 2, "INDIA"), + Row("d", 3, "US") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), + StructField("country", StringType)))) + initframe.write + 
.format("carbondata") + .option("tableName", "target") + .option("partitionColumns", "country") + .mode(SaveMode.Overwrite) + .save() + val target = sqlContext.read.format("carbondata").option("tableName", "target").load() + var cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 7, "CHINA"), + Row("b", 1, "UK"), // b was just deleted once + Row("g", null, "UK"), // c was deleted and then updated twice + Row("e", 3, "US") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), StructField("country", StringType)))) + // upsert API + target.as("A").upsert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("a", 7, "CHINA"), Row("b", 1, "UK"), Row("g", null, "UK"), Row("e", 3, "US"), + Row("c", 2, "INDIA"), Row("d", 3, "US"))) + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 7, "CHINA"), + Row("e", 3, "US") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), StructField("country", StringType)))) + // delete API + target.as("A").delete(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("b", 1, "UK"), Row("g", null, "UK"), Row("c", 2, "INDIA"), Row("d", 3, "US"))) + // update API + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("g", 8, "RUSSIA") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), StructField("country", StringType)))) + target.as("A").update(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("b", 1, "UK"), Row("g", 8, "RUSSIA"), Row("c", 2, "INDIA"), Row("d", 3, "US"))) + // insert API + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("j", 2, "RUSSIA"), + Row("k", 0, "INDIA") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), StructField("country", StringType)))) + target.as("A").insert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("b", 1, "UK"), Row("g", 8, "RUSSIA"), Row("c", 2, "INDIA"), Row("d", 3, "US"), + Row("j", 2, "RUSSIA"), Row("k", 0, "INDIA"))) + } + + test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") { + sql("drop table if exists target") + val initframe = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "0"), + Row("b", "1"), + Row("c", "2"), + Row("d", "3") + ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) + initframe.write + .format("carbondata") + .option("tableName", "target") + .mode(SaveMode.Overwrite) + .save() + val target = sqlContext.read.format("carbondata").option("tableName", "target").load() + var cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "7"), + Row("b", null), // b was just deleted once + Row("g", null), // c was deleted and then updated twice + Row("e", "3") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // upsert API + target.as("A").upsert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("a", "7"), Row("b", null), Row("g", null), Row("e", "3"), Row("c", "2"), + Row("d", "3"))) + + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "7"), // c was deleted and then updated twice + Row("e", "3") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // delete API + target.as("A").delete(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + 
Seq(Row("b", null), Row("g", null), Row("c", "2"), Row("d", "3"))) + + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("g", "56") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // update API + target.as("A").update(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("b", null), Row("g", "56"), Row("c", "2"), Row("d", "3"))) + + cdc = + sqlContext.sparkSession.createDataFrame(Seq( + Row("z", "234"), + Row("x", "2") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", StringType)))) + // insert API + target.as("A").insert(cdc.as("B"), "key").execute() + + checkAnswer(sql("select * from target"), + Seq(Row("b", null), Row("g", "56"), Row("c", "2"), Row("d", "3"), Row("z", "234"), + Row("x", "2"))) + } + test("check the cdc ") { sql("drop table if exists target")