This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Add new IndexLogEntryTags to cache InMemoryFileIndex (#324)
sezruby authored Jan 29, 2021
1 parent b64ef85 commit cd9a632
Showing 4 changed files with 280 additions and 24 deletions.
@@ -576,10 +576,13 @@ case class IndexLogEntry(
   }
 
   def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
-    getTagValue(plan, tag).foreach { return _ }
-    val ret = f
-    setTagValue(plan, tag, ret)
-    ret
+    getTagValue(plan, tag) match {
+      case Some(v) => v
+      case None =>
+        val ret = f
+        setTagValue(plan, tag, ret)
+        ret
+    }
   }
 
   def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
@@ -595,10 +598,7 @@ case class IndexLogEntry(
   }
 
   def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
-    getTagValue(tag).foreach { return _ }
-    val ret = f
-    setTagValue(tag, ret)
-    ret
+    withCachedTag(null, tag)(f)
   }
 }
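The `withCachedTag` rewrite above replaces a non-local `return` from inside the `foreach` closure (which Scala implements by throwing `NonLocalReturnControl`) with an explicit `match`, and the single-argument overload now simply delegates to the two-argument one with a `null` plan key. Below is a minimal, self-contained sketch of the memoization contract the new code implements; the `Tag` class and mutable map are illustrative stand-ins for Hyperspace's actual tag storage, not its real API:

```scala
import scala.collection.mutable

object TagCacheSketch {
  // Hypothetical stand-in for IndexLogEntryTag.
  final case class Tag[T](name: String)

  // Hypothetical stand-in for the per-entry tag storage, keyed by (plan, tag).
  private val tags = mutable.Map.empty[(AnyRef, Tag[_]), Any]

  def getTagValue[T](plan: AnyRef, tag: Tag[T]): Option[T] =
    tags.get((plan, tag)).map(_.asInstanceOf[T])

  def setTagValue[T](plan: AnyRef, tag: Tag[T], value: T): Unit =
    tags((plan, tag)) = value

  // Mirrors the new withCachedTag: the by-name argument `f` is evaluated
  // only on a cache miss; the result is stored and returned thereafter.
  def withCachedTag[T](plan: AnyRef, tag: Tag[T])(f: => T): T =
    getTagValue(plan, tag) match {
      case Some(v) => v
      case None =>
        val ret = f
        setTagValue(plan, tag, ret)
        ret
    }
}
```

Each `(plan, tag)` pair computes `f` at most once; subsequent calls return the cached value without evaluating the by-name argument again.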
@@ -16,6 +16,8 @@
 
 package com.microsoft.hyperspace.index
 
+import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
+
 object IndexLogEntryTags {
   // HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for the index or not.
   val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] =
@@ -37,4 +39,18 @@ object IndexLogEntryTags {
   // HYBRIDSCAN_RELATED_CONFIGS contains Seq of value strings of Hybrid Scan related configs.
   val HYBRIDSCAN_RELATED_CONFIGS: IndexLogEntryTag[Seq[String]] =
     IndexLogEntryTag[Seq[String]]("hybridScanRelatedConfigs")
+
+  // INMEMORYFILEINDEX_INDEX_ONLY stores InMemoryFileIndex for index only scan.
+  val INMEMORYFILEINDEX_INDEX_ONLY: IndexLogEntryTag[InMemoryFileIndex] =
+    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexIndexOnly")
+
+  // INMEMORYFILEINDEX_HYBRID_SCAN stores InMemoryFileIndex including index data files and also
+  // appended files for Hybrid Scan.
+  val INMEMORYFILEINDEX_HYBRID_SCAN: IndexLogEntryTag[InMemoryFileIndex] =
+    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScan")
+
+  // INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED stores InMemoryFileIndex including only appended files
+  // for Hybrid Scan.
+  val INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED: IndexLogEntryTag[InMemoryFileIndex] =
+    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended")
 }
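Note the key shapes in the hunks below: `INMEMORYFILEINDEX_INDEX_ONLY` is cached with the single-argument `withCachedTag`, since the index-only file listing depends only on the index content, while the two Hybrid Scan tags are cached per `(plan, tag)` pair, since appended files differ per source plan. The caching matters because constructing an `InMemoryFileIndex` performs a file listing on the driver. A hedged usage sketch, assuming `index`, `spark`, and `plan` are in scope as in the RuleUtils changes below (`filesToRead` is a stand-in for the combined index and appended files):

```scala
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

val location: InMemoryFileIndex =
  index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN) {
    // Constructed, and the underlying file listing performed, only on the
    // first call for this (plan, tag) pair; later calls reuse the result.
    new InMemoryFileIndex(spark, filesToRead, Map(), None)
  }
```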
@@ -273,8 +273,10 @@ object RuleUtils {
     // Project(A,B) -> Filter(C = 10) -> Index Scan (A,B,C)
     plan transformDown {
       case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) =>
-        val location =
+        val location = index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY) {
           new InMemoryFileIndex(spark, index.content.files, Map(), None)
+        }
+
         val relation = new IndexHadoopFsRelation(
           location,
           new StructType(),
@@ -382,7 +384,14 @@ object RuleUtils {
           baseRelation.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
             IndexConstants.DATA_FILE_NAME_ID))))
 
-      val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None)
+      def fileIndex: InMemoryFileIndex =
+        new InMemoryFileIndex(spark, filesToRead, Map(), None)
+      val newLocation = if (filesToRead.length == index.content.files.size) {
+        index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fileIndex)
+      } else {
+        index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fileIndex)
+      }
+
       val relation = new IndexHadoopFsRelation(
         newLocation,
         new StructType(),
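Making `fileIndex` a `def` rather than a `val` matters here: `withCachedTag` takes its block by name (`f: => T`), so on a cache hit no `InMemoryFileIndex` is constructed at all. A toy illustration of that by-name behavior (names hypothetical, not project API):

```scala
// `f` is evaluated only when the cache misses.
def cached[T](hit: Option[T])(f: => T): T = hit.getOrElse(f)

def expensive: String = { println("listing files..."); "file-index" }

cached(Some("cached-index"))(expensive) // prints nothing
cached(None)(expensive)                 // prints "listing files..."
```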
@@ -416,7 +425,7 @@ object RuleUtils {
       // For more details, see https://github.com/microsoft/hyperspace/issues/150.
 
       val planForAppended =
-        transformPlanToReadAppendedFiles(spark, index.schema, plan, unhandledAppendedFiles)
+        transformPlanToReadAppendedFiles(spark, index, plan, unhandledAppendedFiles)
       if (useBucketSpec) {
         // If Bucketing information of the index is used to read the index data, we need to
         // shuffle the appended data in the same way to correctly merge with bucketed index data.
@@ -447,14 +456,14 @@ object RuleUtils {
    * by using [[BucketUnion]] or [[Union]].
    *
    * @param spark Spark session.
-   * @param indexSchema Index schema used for the output.
+   * @param index Index used in transformation of plan.
    * @param originalPlan Original plan.
    * @param filesAppended Appended files to the source relation.
    * @return Transformed linear logical plan for appended files.
    */
   private def transformPlanToReadAppendedFiles(
       spark: SparkSession,
-      indexSchema: StructType,
+      index: IndexLogEntry,
       originalPlan: LogicalPlan,
       filesAppended: Seq[Path]): LogicalPlan = {
     // Transform the location of LogicalRelation with appended files.
@@ -474,14 +483,18 @@ object RuleUtils {
         }
         .getOrElse(Map())
 
-    val newLocation = new InMemoryFileIndex(spark, filesAppended, options, None)
+    val newLocation = index.withCachedTag(
+      originalPlan,
+      IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED) {
+      new InMemoryFileIndex(spark, filesAppended, options, None)
+    }
     // Set the same output schema with the index plan to merge them using BucketUnion.
     // Include partition columns for data loading.
     val partitionColumns = location.partitionSchema.map(_.name)
     val updatedSchema = StructType(baseRelation.schema.filter(col =>
-      indexSchema.contains(col) || location.partitionSchema.contains(col)))
+      index.schema.contains(col) || location.partitionSchema.contains(col)))
     val updatedOutput = baseOutput.filter(attr =>
-      indexSchema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
+      index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
     val newRelation = fsRelation.copy(
       location = newLocation,
       dataSchema = updatedSchema,
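The signature change from `indexSchema: StructType` to `index: IndexLogEntry` gives `transformPlanToReadAppendedFiles` access both to the schema (`index.schema`) and to the entry's tag storage for caching. The net effect of the commit is that repeated rule applications over the same plan reuse one `InMemoryFileIndex` per tag instead of re-listing files each time. A hedged sketch of that behavior, assuming `index`, `plan`, `spark`, `filesAppended`, and `options` are in scope as in the hunk above:

```scala
val first = index.withCachedTag(
  plan,
  IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED) {
  new InMemoryFileIndex(spark, filesAppended, options, None)
}
val second = index.withCachedTag(
  plan,
  IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED) {
  sys.error("never evaluated: the cached value is returned")
}
assert(first eq second) // same instance, file listing performed once
```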
