Remove file listing during query and invalidate cache of deleted segments
ShreelekhyaG committed Jul 8, 2022
1 parent b8511b6 commit 94c40e9
Showing 8 changed files with 29 additions and 102 deletions.
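At a glance: the query path no longer performs a location-aware listing of every carbondata file in a segment to pre-build a file-to-BlockMetaInfo map (the locationAwareListFiles API and all of its callers are removed); block metadata is now resolved per file, with a missing file treated as deleted. Separately, delete operations now clear the invalidated segments from the driver's index cache.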
AbstractDFSCarbonFile.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -480,19 +479,6 @@ public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
return getFiles(listStatus);
}

-  @Override
-  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
-    List<FileStatus> listStatus = new ArrayList<>();
-    RemoteIterator<LocatedFileStatus> iter = fileSystem.listLocatedStatus(path);
-    while (iter.hasNext()) {
-      LocatedFileStatus fileStatus = iter.next();
-      if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
-        listStatus.add(fileStatus);
-      }
-    }
-    return getFiles(listStatus.toArray(new FileStatus[listStatus.size()]));
-  }
-
protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
throws IOException {
List<CarbonFile> carbonFiles = new ArrayList<>();
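For context, the deleted override funneled one listLocatedStatus call over the segment path and kept every non-empty file that matched the filter. Below is a minimal sketch of the two access patterns this commit trades between, written against the plain Hadoop FileSystem API; the class and method names are illustrative, not part of CarbonData:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListingTradeoffSketch {

  /** Old pattern: one batched listing builds a (path -> size) map up front. */
  static Map<String, Long> batchList(FileSystem fs, Path segmentDir) throws IOException {
    Map<String, Long> sizes = new HashMap<>();
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentDir);
    while (iter.hasNext()) {
      LocatedFileStatus status = iter.next();
      if (status.getLen() > 0) {
        sizes.put(status.getPath().toString(), status.getLen());
      }
    }
    return sizes; // can go stale if files are deleted after the listing
  }

  /** New pattern: resolve one file on demand; a deleted file yields null. */
  static Long lookupOne(FileSystem fs, Path dataFile) throws IOException {
    if (!fs.exists(dataFile)) {
      return null;
    }
    return fs.getFileStatus(dataFile).getLen();
  }
}
```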
CarbonFile.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.List;

-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;

public interface CarbonFile {
@@ -41,11 +40,6 @@ public interface CarbonFile {

List<CarbonFile> listDirs() throws IOException;

-  /**
-   * It returns list of files with location details.
-   */
-  CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
-
String getName();

boolean isDirectory();
LocalCarbonFile.java
@@ -49,7 +49,6 @@
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.log4j.Logger;
import org.xerial.snappy.SnappyInputStream;
@@ -442,11 +441,6 @@ public boolean createNewLockFile() throws IOException {
return file.createNewFile();
}

-  @Override
-  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) {
-    return listFiles();
-  }
-
@Override
public String[] getLocations() {
return new String[]{"localhost"};
BlockletIndexStore.java
@@ -19,7 +19,6 @@

import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -67,11 +66,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {

@Override
public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
TableBlockIndexUniqueIdentifier identifier =
identifierWrapper.getTableBlockIndexUniqueIdentifier();
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +77,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifie
SegmentIndexFileStore indexFileStore =
new SegmentIndexFileStore(identifierWrapper.getConfiguration());
Set<String> filesRead = new HashSet<>();
-      String segmentFilePath = identifier.getIndexFilePath();
-      if (segInfoCache == null) {
-        segInfoCache = new HashMap<>();
-      }
-      Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-          segInfoCache.get(segmentFilePath);
-      if (carbonDataFileBlockMetaInfoMapping == null) {
-        carbonDataFileBlockMetaInfoMapping =
-            BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                identifierWrapper.getConfiguration());
-        segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-      }
// if the identifier is not a merge file we can directly load the indexes
if (identifier.getMergeIndexFileName() == null) {
List<DataFileFooter> indexInfos = new ArrayList<>();
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping, indexInfos);
+            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
BlockIndex blockIndex =
loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(),
@@ -120,8 +101,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifie
List<DataFileFooter> indexInfos = new ArrayList<>();
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping, indexInfos);
+                identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
if (!blockMetaInfoMap.isEmpty()) {
BlockIndex blockIndex =
loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +137,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifie
public List<BlockletIndexWrapper> getAll(
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();

List<BlockletIndexWrapper> blockletIndexWrappers =
new ArrayList<>(tableSegmentUniqueIdentifiers.size());
Expand All @@ -177,7 +155,7 @@ public List<BlockletIndexWrapper> getAll(
}
if (missedIdentifiersWrapper.size() > 0) {
for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-        blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+        blockletIndexWrapper = get(identifierWrapper);
blockletIndexWrappers.add(blockletIndexWrapper);
}
}
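Note that with the two-argument overload gone, get() and getAll() no longer thread a shared segInfoCache map across lookups; each cache miss resolves its block metadata independently. The point of the old map was to amortize directory listings across identifiers, but a listing cached at query start could also keep entries for files that an update or delete had since removed; the per-file path (see BlockletIndexUtil below) checks file existence instead.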
BlockletIndexUtil.java
@@ -31,16 +31,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
@@ -61,9 +58,6 @@
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.Logger;

public class BlockletIndexUtil {
@@ -79,8 +73,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S

public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
throws IOException {
boolean isTransactionalTable = true;
TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +123,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
}
String blockPath = footer.getBlockInfo().getFilePath();
if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
// if blockMetaInfo is null that means the file has been deleted from the file system.
// This can happen in case IUD scenarios where after deleting or updating the data the
// complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +135,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
return blockMetaInfoMap;
}

-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
throws IOException {
String carbonDataFile = blockInfo.getFilePath();
FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +154,11 @@ private static BlockMetaInfo createBlockMetaInfo(
CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        return new BlockMetaInfo(file.getLocations(), file.getSize());
}
}

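The removed helper's javadoc shows the original motivation: one location-aware listing could fetch on the order of a thousand entries per NameNode call. The replacement accepts one isFileExist plus getCarbonFile round trip per block in exchange for never serving stale metadata, and it returns null for a block whose file has been deleted, so blocks removed by IUD operations are silently skipped (per the comment in getBlockMetaInfoMap above).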
BlockletIndexInputFormat.java
@@ -20,11 +20,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -38,7 +36,6 @@
import org.apache.carbondata.core.index.dev.CacheableIndex;
import org.apache.carbondata.core.index.dev.IndexFactory;
import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.BlockletIndexStore;
import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -127,8 +124,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-  // Cache to avoid multiple times listing of files
-  private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +147,7 @@ public boolean nextKeyValue() {
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
false, true, true);
this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-      wrapper = ((BlockletIndexStore) cache)
-          .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+      wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
return true;
}
return false;
DeleteExecution.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
@@ -84,6 +84,9 @@ object DeleteExecution {
if (executorErrors.failureCauses == FailureCauses.NONE) {
operatedRowCount = res.flatten.map(_._2._3).sum
}
+    // clear invalid segments from cache
+    IndexStoreManager.getInstance()
+      .clearInvalidSegments(carbonTable, segmentsTobeDeleted.map(_.getSegmentNo).toList.asJava)
(segmentsTobeDeleted, operatedRowCount, isUpdateRequired, tblStatusWriteVersion)
}

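This is the cache-invalidation half of the commit: once the delete commits, any segments it invalidated must also leave the driver's index cache, otherwise SHOW METACACHE would keep counting index files for data that no longer exists. A hedged Java sketch of the same call follows; the throws clause is an assumption, and the call shape simply mirrors the Scala site above:

```java
import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class ClearInvalidSegmentsSketch {

  /** Evict cached index entries for segments invalidated by a delete. */
  static void evictDeletedSegments(CarbonTable table, List<String> deletedSegmentIds)
      throws IOException {
    // Mirrors DeleteExecution: pass the segment numbers of the deleted segments.
    IndexStoreManager.getInstance().clearInvalidSegments(table, deletedSegmentIds);
  }
}
```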
TestCarbonShowCacheCommand.scala
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists carbonTable2")
}

test("test cache after delete") {
sql("drop table if exists carbonTable1")
sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
sql("insert into carbonTable1 select 1, 'ab', 'vf'")
sql("insert into carbonTable1 select 1, 'ab', 'vf'")
var showCache = sql("show metacache on table carbonTable1").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
sql("delete from carbonTable1 where col3 ='vf'").collect()
showCache = sql("show metacache on table carbonTable1").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
sql("drop table if exists carbonTable1")
}

// Runs only when index server is enabled.
test("test embedded pruning", false) {
val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
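The new test exercises the whole loop: two inserts create two segments, so SHOW METACACHE reports "0/2 index files cached" before any index is loaded; the delete matches every row (both rows carry col3 = 'vf'), which invalidates both segments, and the post-delete count of "0/0 index files cached" confirms the entries were dropped from the cache rather than left dangling.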
