diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 1a8131f0072..75b94bb90ec 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -484,6 +484,9 @@ private CarbonCommonConstants() { // default blocklet size value in MB public static final String TABLE_BLOCKLET_SIZE_DEFAULT = "64"; + // does query with latest segment + public static final String TABLE_QUERY_LATEST_SEGMENT = "query_latest_segment"; + /** * set in column level to disable inverted index * @Deprecated :This property is deprecated, it is kept just for compatibility diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index f24a2f19038..d42d42e0d59 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -18,13 +18,7 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -222,7 +216,9 @@ public List getSplits(JobContext job) throws IOException { */ private List getFilteredSegment(JobContext job, List validSegments, boolean validationRequired, ReadCommittedScope readCommittedScope) throws IOException { - Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope); + // first check for mapreduce.input.carboninputformat.segmentnumbers + // second check for table property of latest_segment for query + Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope, validSegments); if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) { return validSegments; } @@ -421,9 +417,11 @@ public void updateLoadMetaDataDetailsToSegments(List validSegments, /** * return valid segment to access + * check for SET carbon.input.segments.. */ public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) { String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); + if (segmentString.trim().isEmpty()) { return new Segment[0]; } @@ -601,4 +599,60 @@ public String getSegmentIdFromFilePath(String filePath) { } return CarbonCommonConstants.INVALID_SEGMENT_ID; } + + /** + * return valid segment to access + * first check for mapreduce.input.carboninputformat.segmentnumbers" + * second check for table property of latest_segment for query + */ + public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope, + List validSegments) { + String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); + boolean queryLatestSegment = false; + if (null != carbonTable) { + queryLatestSegment = Boolean.parseBoolean(carbonTable.getTableInfo() + .getFactTable().getTableProperties() + .getOrDefault(CarbonCommonConstants.TABLE_QUERY_LATEST_SEGMENT, "false")); + } + if (segmentString.trim().isEmpty()) { + if (!queryLatestSegment) { + return new Segment[0]; + } else { + List segments = getLatestSegment(validSegments); + return segments.toArray(new Segment[0]); + } + } else { + List segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope); + if (!queryLatestSegment) { + return segments.toArray(new Segment[0]); + } else { + List latestSegment; + if (segments.size() > 0 && segments.get(0).getSegmentNo().equalsIgnoreCase("*")) { + latestSegment = getLatestSegment(validSegments); + } else { + latestSegment = getLatestSegment(segments); + } + return latestSegment.toArray(new Segment[0]); + } + } + } + + /** + * get the latest segment + * @param validSegments the in put segment for search + * @return the latest segment for query + */ + public List getLatestSegment(List validSegments) { + if (validSegments.isEmpty()) { + return Collections.emptyList(); + } else { + Segment segment = validSegments.stream().max((a, b) -> { + double aNo = Double.parseDouble(a.getSegmentNo()); + double bNo = Double.parseDouble(b.getSegmentNo()); + return Double.compare(aNo, bNo); + }).get(); + return Collections.singletonList(segment); + } + } + } \ No newline at end of file diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala index eeda653d58c..21b5b414114 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala @@ -273,17 +273,19 @@ object BroadCastSIFilterPushJoin { val identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val readCommittedScope: ReadCommittedScope = carbonTableInputFormat.getReadCommitted(job, identifier) - val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess(job, - readCommittedScope) - val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment] - for (segId <- segmentsToAccess) { - segmentsToAccessSet.add(segId) - } // get all valid segments and set them into the configuration val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) val segments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = segmentStatusManager .getValidAndInvalidSegments(carbonTable.isMV) val validSegments: util.List[Segment] = segments.getValidSegments + + val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess (job, + readCommittedScope, validSegments) + val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment] + for (segId <- segmentsToAccess) { + segmentsToAccessSet.add(segId) + } + // if no segments in table val validSegmentsToAccess: util.List[Segment] = new util.ArrayList[Segment] if (validSegments.size == 0) { diff --git a/integration/spark/src/test/resources/latest-table-data.csv b/integration/spark/src/test/resources/latest-table-data.csv new file mode 100644 index 00000000000..bad595b6279 --- /dev/null +++ b/integration/spark/src/test/resources/latest-table-data.csv @@ -0,0 +1,11 @@ +shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData +1,10,1100,48.4,,,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world' +5,17,1140,43.4,,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world' +1,11,1100,44.4,,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world' +1,10,1150,43.4,spark,,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world' +1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world' +3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world' +2,,,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world' +,10,1100,43.4,spark,,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world' +4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5, +1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,,'foo'#'bar'#'world' diff --git a/integration/spark/src/test/resources/latest-table-data1.csv b/integration/spark/src/test/resources/latest-table-data1.csv new file mode 100644 index 00000000000..d12df4bcfde --- /dev/null +++ b/integration/spark/src/test/resources/latest-table-data1.csv @@ -0,0 +1,11 @@ +shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData +1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world' +5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world' +1,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world' +1,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world' +1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world' +3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world' +2,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world' +1,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world' +4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world' +1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world' diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala new file mode 100644 index 00000000000..c7f0532cdc2 --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/LatestSegmentTestCases.scala @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.carbondata + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + + +class LatestSegmentTestCases extends QueryTest with BeforeAndAfterAll { + val path = s"$resourcesPath/latest-table-data.csv" + val path1 = s"$resourcesPath/latest-table-data1.csv" + + override def beforeAll { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sql( + s""" + | CREATE TABLE latest_table_latest_segment( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED AS carbondata + | TBLPROPERTIES('QUERY_LATEST_SEGMENT'='true') + """.stripMargin) + sql( + s""" + | CREATE TABLE latest_table( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED AS carbondata + """.stripMargin) + } + + test("test latest segment query for load") { + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + val last_segment_result = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + val result = sql( + s""" + | SELECT * + | FROM latest_table + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + assert(last_segment_result == 6) + assert(result == 12) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + val last_segment_result_2 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE intField = 10 + """.stripMargin).collect().length + + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result_2 == 5) + assert(last_segment_result_3 == 4) + } + + test("test latest segment query for load, set carbon.input.segments") { + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + val last_segment_result = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result == 6) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = 0,1") + val last_segment_result_2 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE intField = 10 + """.stripMargin).collect().length + assert(last_segment_result_2 == 6) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = *") + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + sqlContext.sparkSession.conf.unset("carbon.input.segments.default.latest_table_latest_segment") + assert(last_segment_result_3 == 4) + } + + + test("test latest segment query for partition table ") { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sql( + s""" + | CREATE TABLE latest_table_latest_segment( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | charField CHAR(5), + | floatField FLOAT + | ) + | partitioned by (dateField DATE) + | STORED AS carbondata + | TBLPROPERTIES('QUERY_LATEST_SEGMENT'='true') + """.stripMargin) + sql( + s""" + | CREATE TABLE latest_table( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED AS carbondata + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + + val last_segment_result = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + val result = sql( + s""" + | SELECT * + | FROM latest_table + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + assert(last_segment_result == 6) + assert(result == 12) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + val last_segment_result_2 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE intField = 10 + """.stripMargin).collect().length + + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result_2 == 5) + assert(last_segment_result_3 == 4) + } + + test("test latest segment query for partition table set carbon.input.segments") { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sql( + s""" + | CREATE TABLE latest_table_latest_segment( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | charField CHAR(5), + | floatField FLOAT + | ) + | partitioned by (dateField DATE) + | STORED AS carbondata + | TBLPROPERTIES('QUERY_LATEST_SEGMENT'='true') + """.stripMargin) + sql( + s""" + | CREATE TABLE latest_table( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED AS carbondata + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$path1' + | INTO TABLE latest_table + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + + val last_segment_result = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + val result = sql( + s""" + | SELECT * + | FROM latest_table + | WHERE stringField = 'spark' + """.stripMargin).collect().length + + assert(last_segment_result == 6) + assert(result == 12) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE latest_table_latest_segment + | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') + """.stripMargin) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = 0,1") + + val last_segment_result_2 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE intField = 10 + """.stripMargin).collect().length + assert(last_segment_result_2 == 6) + + sql("SET carbon.input.segments.default.latest_table_latest_segment = *") + val last_segment_result_3 = sql( + s""" + | SELECT * + | FROM latest_table_latest_segment + | WHERE stringField = 'spark' + """.stripMargin).collect().length + assert(last_segment_result_3 == 4) + } + + override def afterAll { + sql("DROP TABLE IF EXISTS latest_table_latest_segment") + sql("DROP TABLE IF EXISTS latest_table") + sqlContext.sparkSession.conf.unset("carbon.input.segments.default.carbon_table") + } +}