diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 25337c72b01..c7f1f0fe6c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -44,7 +44,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -480,19 +479,6 @@ public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
     return getFiles(listStatus);
   }
 
-  @Override
-  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
-    List<FileStatus> listStatus = new ArrayList<>();
-    RemoteIterator<LocatedFileStatus> iter = fileSystem.listLocatedStatus(path);
-    while (iter.hasNext()) {
-      LocatedFileStatus fileStatus = iter.next();
-      if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
-        listStatus.add(fileStatus);
-      }
-    }
-    return getFiles(listStatus.toArray(new FileStatus[listStatus.size()]));
-  }
-
   protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
       throws IOException {
     List<CarbonFile> carbonFiles = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 1b439e14729..5943e474f4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 public interface CarbonFile {
@@ -41,11 +40,6 @@ public interface CarbonFile {
 
   List<CarbonFile> listDirs() throws IOException;
 
-  /**
-   * It returns list of files with location details.
-   */
-  CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
-
   String getName();
 
   boolean isDirectory();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 9cc442101de..1855d59cf8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -49,7 +49,6 @@
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.log4j.Logger;
 import org.xerial.snappy.SnappyInputStream;
@@ -442,11 +441,6 @@ public boolean createNewLockFile() throws IOException {
     return file.createNewFile();
   }
 
-  @Override
-  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) {
-    return listFiles();
-  }
-
   @Override
   public String[] getLocations() {
     return new String[]{"localhost"};
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
index 14d0f775226..2320bdc579d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,11 +66,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
 
   @Override
   public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +77,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
       SegmentIndexFileStore indexFileStore =
           new SegmentIndexFileStore(identifierWrapper.getConfiguration());
       Set<String> filesRead = new HashSet<>();
-      String segmentFilePath = identifier.getIndexFilePath();
-      if (segInfoCache == null) {
-        segInfoCache = new HashMap<>();
-      }
-      Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-          segInfoCache.get(segmentFilePath);
-      if (carbonDataFileBlockMetaInfoMapping == null) {
-        carbonDataFileBlockMetaInfoMapping =
-            BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                identifierWrapper.getConfiguration());
-        segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-      }
       // if the identifier is not a merge file we can directly load the indexes
       if (identifier.getMergeIndexFileName() == null) {
         List<DataFileFooter> indexInfos = new ArrayList<>();
         Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping, indexInfos);
+            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
         BlockIndex blockIndex =
             loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
                 identifierWrapper.getCarbonTable(),
@@ -120,8 +101,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
          List<DataFileFooter> indexInfos = new ArrayList<>();
          Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
              new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
          if (!blockMetaInfoMap.isEmpty()) {
            BlockIndex blockIndex =
                loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +137,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
 
   public List<BlockletIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();
     List<BlockletIndexWrapper> blockletIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +155,7 @@ public List<BlockletIndexWrapper> getAll(
     }
     if (missedIdentifiersWrapper.size() > 0) {
       for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-        blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+        blockletIndexWrapper = get(identifierWrapper);
         blockletIndexWrappers.add(blockletIndexWrapper);
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
index 6f427682e27..8352a44a40a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
@@ -31,16 +31,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
@@ -61,9 +58,6 @@
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.Logger;
 
 public class BlockletIndexUtil {
@@ -79,8 +73,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
       throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +123,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       }
       String blockPath = footer.getBlockInfo().getFilePath();
       if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
         // if blockMetaInfo is null that means the file has been deleted from the file system.
        // This can happen in case IUD scenarios where after deleting or updating the data the
        // complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +135,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
     return blockMetaInfoMap;
   }
 
-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap<String, BlockMetaInfo>();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
       throws IOException {
     String carbonDataFile = blockInfo.getFilePath();
     FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +154,11 @@ private static BlockMetaInfo createBlockMetaInfo(
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        return new BlockMetaInfo(file.getLocations(), file.getSize());
     }
   }
 
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
index 19a48eeec33..dab102d3cae 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
@@ -20,11 +20,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -38,7 +36,6 @@
 import org.apache.carbondata.core.index.dev.CacheableIndex;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.BlockletIndexStore;
 import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -127,8 +124,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
     Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
     private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-    // Cache to avoid multiple times listing of files
-    private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +147,7 @@ public boolean nextKeyValue() {
           new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
               false, true, true);
       this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-      wrapper = ((BlockletIndexStore) cache)
-          .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+      wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
       return true;
     }
     return false;
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 1f42421fdf9..790f963479b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
@@ -84,6 +84,9 @@ object DeleteExecution {
     if (executorErrors.failureCauses == FailureCauses.NONE) {
       operatedRowCount = res.flatten.map(_._2._3).sum
     }
+    // clear invalid segments from cache
+    IndexStoreManager.getInstance()
+      .clearInvalidSegments(carbonTable, segmentsTobeDeleted.map(_.getSegmentNo).toList.asJava)
     (segmentsTobeDeleted, operatedRowCount, isUpdateRequired, tblStatusWriteVersion)
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index a282fe01447..22d2a571f26 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  test("test cache after delete") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    var showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    sql("delete from carbonTable1 where col3 ='vf'").collect()
='vf'").collect() + showCache = sql("show metacache on table carbonTable1").collect() + assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached")) + sql("drop table if exists carbonTable1") + } + // Runs only when index server is enabled. test("test embedded pruning", false) { val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {