From 6a00ec2ae510d14066bf826097a823910788fa61 Mon Sep 17 00:00:00 2001
From: litao
Date: Fri, 30 Jul 2021 22:07:26 +0800
Subject: [PATCH] Support querying only the latest segment via a table
 property

Scenario: the number of rows in the table never changes, but the values
in each column keep growing and changing. In this scenario it is faster
to load the full data set each time with the LOAD command, so a query
only needs to read the latest segment. This patch adds a table property,
query_latest_segment, to control that behaviour.
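A minimal usage sketch (the table name and CSV path here are
hypothetical; the property name matches the constant added below):

    CREATE TABLE snapshot_table (id INT, value DOUBLE)
    STORED AS carbondata
    TBLPROPERTIES ('QUERY_LATEST_SEGMENT'='true');

    -- every load writes a full snapshot into a new segment
    LOAD DATA LOCAL INPATH '/tmp/snapshot.csv' INTO TABLE snapshot_table;
    LOAD DATA LOCAL INPATH '/tmp/snapshot.csv' INTO TABLE snapshot_table;

    -- with the property enabled, this scans only the newest segment
    SELECT * FROM snapshot_table;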
---
 .../core/constants/CarbonCommonConstants.java |   3 +
 .../hadoop/api/CarbonTableInputFormat.java    |  68 ++-
 .../joins/BroadCastSIFilterPushJoin.scala     |  14 +-
 .../src/test/resources/latest-table-data.csv  |  11 +
 .../src/test/resources/latest-table-data1.csv |  11 +
 .../carbondata/LatestSegmentTestCases.scala   | 403 ++++++++++++++++++
 6 files changed, 496 insertions(+), 14 deletions(-)
 create mode 100644 integration/spark/src/test/resources/latest-table-data.csv
 create mode 100644 integration/spark/src/test/resources/latest-table-data1.csv
 create mode 100644 integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1a8131f0072..75b94bb90ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -484,6 +484,9 @@ private CarbonCommonConstants() {
   // default blocklet size value in MB
   public static final String TABLE_BLOCKLET_SIZE_DEFAULT = "64";
 
+  // table property: when 'true', queries read only the latest segment
+  public static final String TABLE_QUERY_LATEST_SEGMENT = "query_latest_segment";
+
   /**
    * set in column level to disable inverted index
    * @Deprecated :This property is deprecated, it is kept just for compatibility
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f24a2f19038..58ed09d3e91 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.hadoop.api;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -222,7 +223,9 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
    */
   private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
       boolean validationRequired, ReadCommittedScope readCommittedScope) throws IOException {
-    Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
+    // first honour mapreduce.input.carboninputformat.segmentnumbers,
+    // then the query_latest_segment table property
+    Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope, validSegments);
     if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
       return validSegments;
     }
@@ -421,9 +424,11 @@ public void updateLoadMetaDataDetailsToSegments(List<Segment> validSegments,
 
   /**
    * return valid segment to access
+   * check for SET carbon.input.segments.<database_name>.<table_name>
    */
   public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+
     if (segmentString.trim().isEmpty()) {
       return new Segment[0];
     }
@@ -601,4 +606,58 @@ public String getSegmentIdFromFilePath(String filePath) {
     }
     return CarbonCommonConstants.INVALID_SEGMENT_ID;
   }
+
+  /**
+   * return valid segments to access:
+   * first check mapreduce.input.carboninputformat.segmentnumbers,
+   * then the query_latest_segment table property
+   */
+  public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope,
+      List<Segment> validSegments) {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    boolean queryLatestSegment = Boolean.parseBoolean(carbonTable.getTableInfo()
+        .getFactTable().getTableProperties()
+        .getOrDefault(CarbonCommonConstants.TABLE_QUERY_LATEST_SEGMENT, "false"));
+
+    if (segmentString.trim().isEmpty()) {
+      if (!queryLatestSegment) {
+        return new Segment[0];
+      } else {
+        List<Segment> segments = getLatestSegment(validSegments);
+        return segments.toArray(new Segment[0]);
+      }
+    } else {
+      List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope);
+      if (!queryLatestSegment) {
+        return segments.toArray(new Segment[0]);
+      } else {
+        List<Segment> latestSegment;
+        if (segments.size() > 0 && segments.get(0).getSegmentNo().equalsIgnoreCase("*")) {
+          // '*' means all segments: pick the latest of all valid segments
+          latestSegment = getLatestSegment(validSegments);
+        } else {
+          // pick the latest of the explicitly requested segments
+          latestSegment = getLatestSegment(segments);
+        }
+        return latestSegment.toArray(new Segment[0]);
+      }
+    }
+  }
+
+  /**
+   * get the latest segment
+   * @param validSegments the input segments to search
+   * @return a singleton list holding the latest segment, or an empty list
+   */
+  public List<Segment> getLatestSegment(List<Segment> validSegments) {
+    if (validSegments.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // segment numbers can be decimal after compaction (e.g. "0.1"),
+    // so compare them numerically rather than as strings
+    Segment segment = validSegments.stream().max((a, b) -> {
+      double aNo = Double.parseDouble(a.getSegmentNo());
+      double bNo = Double.parseDouble(b.getSegmentNo());
+      return Double.compare(aNo, bNo);
+    }).get();
+    return Collections.singletonList(segment);
+  }
 }
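For reviewers, a sketch of the precedence the new getSegmentsToAccess
overload implements (assuming a table db.t with segments 0, 1 and 2 and
QUERY_LATEST_SEGMENT set to 'true'):

    -- no explicit segment list: only the latest segment (2) is read
    SELECT COUNT(*) FROM db.t;

    -- explicit list: the latest of the listed segments (1) is read
    SET carbon.input.segments.db.t = 0,1;
    SELECT COUNT(*) FROM db.t;

    -- '*' behaves like no list: only the latest valid segment (2) is read
    SET carbon.input.segments.db.t = *;
    SELECT COUNT(*) FROM db.t;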
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
index eeda653d58c..21b5b414114 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
@@ -273,17 +273,19 @@ object BroadCastSIFilterPushJoin {
     val identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val readCommittedScope: ReadCommittedScope =
       carbonTableInputFormat.getReadCommitted(job, identifier)
-    val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess(job,
-      readCommittedScope)
-    val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment]
-    for (segId <- segmentsToAccess) {
-      segmentsToAccessSet.add(segId)
-    }
     // get all valid segments and set them into the configuration
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
     val segments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = segmentStatusManager
       .getValidAndInvalidSegments(carbonTable.isMV)
     val validSegments: util.List[Segment] = segments.getValidSegments
+
+    val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess(job,
+      readCommittedScope, validSegments)
+    val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment]
+    for (segId <- segmentsToAccess) {
+      segmentsToAccessSet.add(segId)
+    }
+
     // if no segments in table
     val validSegmentsToAccess: util.List[Segment] = new util.ArrayList[Segment]
     if (validSegments.size == 0) {
diff --git a/integration/spark/src/test/resources/latest-table-data.csv b/integration/spark/src/test/resources/latest-table-data.csv
new file mode 100644
index 00000000000..bad595b6279
--- /dev/null
+++ b/integration/spark/src/test/resources/latest-table-data.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
+1,10,1100,48.4,,,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
+5,17,1140,43.4,,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
+1,11,1100,44.4,,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
+1,10,1150,43.4,spark,,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
+1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
+3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
+2,,,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
+,10,1100,43.4,spark,,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
+4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,
+1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,,'foo'#'bar'#'world'
diff --git a/integration/spark/src/test/resources/latest-table-data1.csv b/integration/spark/src/test/resources/latest-table-data1.csv
new file mode 100644
index 00000000000..d12df4bcfde
--- /dev/null
+++ b/integration/spark/src/test/resources/latest-table-data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
+1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
+5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
+1,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
+1,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
+1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
+3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
+2,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
+1,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
+4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world'
+1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world'
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala
new file mode 100644
index 00000000000..c7f0532cdc2
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class LatestSegmentTestCases extends QueryTest with BeforeAndAfterAll {
+  val path = s"$resourcesPath/latest-table-data.csv"
+  val path1 = s"$resourcesPath/latest-table-data1.csv"
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS latest_table_latest_segment")
+    sql("DROP TABLE IF EXISTS latest_table")
+    sql(
+      s"""
+         | CREATE TABLE latest_table_latest_segment(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED AS carbondata
+         | TBLPROPERTIES('QUERY_LATEST_SEGMENT'='true')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE latest_table(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+  }
+
+  test("test latest segment query for load") {
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    val last_segment_result = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE stringField = 'spark'
+       """.stripMargin).collect().length
+    val result = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table
+         | WHERE stringField = 'spark'
+       """.stripMargin).collect().length
+
+    // the property limits the scan to one load (6 rows); without it both loads match (12)
+    assert(last_segment_result == 6)
+    assert(result == 12)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    val last_segment_result_2 = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE intField = 10
+       """.stripMargin).collect().length
+    val last_segment_result_3 = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE stringField = 'spark'
+       """.stripMargin).collect().length
+    // both counts now come from latest-table-data.csv, the newest segment
+    assert(last_segment_result_2 == 5)
+    assert(last_segment_result_3 == 4)
+  }
+
+  test("test latest segment query for load, set carbon.input.segments") {
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path1'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    val last_segment_result = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE stringField = 'spark'
+       """.stripMargin).collect().length
+    assert(last_segment_result == 6)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE latest_table_latest_segment
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    // an explicit segment list narrows the choice to the latest of segments 0 and 1
+    sql("SET carbon.input.segments.default.latest_table_latest_segment = 0,1")
+    val last_segment_result_2 = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE intField = 10
+       """.stripMargin).collect().length
+    assert(last_segment_result_2 == 6)
+
+    // '*' restores the default: only the latest valid segment is read
+    sql("SET carbon.input.segments.default.latest_table_latest_segment = *")
+    val last_segment_result_3 = sql(
+      s"""
+         | SELECT *
+         | FROM latest_table_latest_segment
+         | WHERE stringField = 'spark'
+       """.stripMargin).collect().length
+    sqlContext.sparkSession.conf.unset("carbon.input.segments.default.latest_table_latest_segment")
+    assert(last_segment_result_3 == 4)
+  }
""".stripMargin).collect().length + + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result_2 == 5) + assert(last_segment_result_3 == 4) + } + + test("test latest segment query for partition table set carbon.input.segments") { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sql( + s""" + | CREATE TABLE latest_table_latest_segment( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | charField CHAR(5), + | floatField FLOAT + | ) + | partitioned by (dateField DATE) + | STORED AS carbondata + | TBLPROPERTIES('QUERY_LATEST_SEGMENT'='true') + """.stripMargin) + sql( + s""" + | CREATE TABLE latest_table( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED AS carbondata + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + + val last_segment_result = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + val result = sql( + s""" + | SELECT * + | FROM latest_table + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + assert(last_segment_result == 6) + assert(result == 12) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = 0,1") + + val last_segment_result_2 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE intField = 10 + """.stripMargin).collect().length + assert(last_segment_result_2 == 6) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = *") + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result_3 == 4) + } + + override def afterAll { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sqlContext.sparkSession.conf.unset("carbon.input.segments.default.carbon_table") + } +}