Skip to content

Commit

Permalink
Better performance for reads during resharding
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 22, 2023
1 parent e25babd commit c8e6fc1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public void test() throws Exception {

server.addDiff("diff", testDiff);

server.start();
server.stop();
// server.start();
// server.stop();
}

@Test
Expand All @@ -26,7 +26,7 @@ public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception

server.addDiff("diff", testDiff);

server.start();
server.stop();
// server.start();
// server.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,9 @@ static class ShardsHolder {
final HollowObjectTypeReadStateShard shards[];
final int shardNumberMask;

private ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElements, int[] shardOrdinalShifts) {
int numShards = dataElements.length;
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, dataElements[i], shardOrdinalShifts[i]);
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
private ShardsHolder(HollowObjectTypeReadStateShard[] fromShards) {
this.shards = fromShards;
this.shardNumberMask = fromShards.length - 1;
}

private ShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) {
Expand Down Expand Up @@ -93,8 +88,8 @@ public HollowObjectTypeReadState(HollowObjectSchema schema, HollowObjectTypeData
this.sampler = new HollowObjectSampler(schema, DisabledSamplingDirector.INSTANCE);
this.unfilteredSchema = schema;

int shardOrdinalShift = 0;
this.shardsVolatile = new ShardsHolder(schema, new HollowObjectTypeDataElements[] {dataElements}, new int[] {shardOrdinalShift});
HollowObjectTypeReadStateShard newShard = new HollowObjectTypeReadStateShard(schema, dataElements, 0);
this.shardsVolatile = new ShardsHolder(new HollowObjectTypeReadStateShard[] {newShard});
this.maxOrdinal = dataElements.maxOrdinal;
}

Expand All @@ -118,15 +113,14 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
if(numShards > 1)
maxOrdinal = VarInt.readVInt(in);

HollowObjectTypeDataElements[] snapshotData = new HollowObjectTypeDataElements[numShards];
int shardOrdinalShifts[] = new int[numShards];
HollowObjectTypeReadStateShard[] newShards = new HollowObjectTypeReadStateShard[numShards];
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
for(int i=0; i<numShards; i++) {
snapshotData[i] = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData[i].readSnapshot(in, unfilteredSchema);
shardOrdinalShifts[i] = 31 - Integer.numberOfLeadingZeros(numShards);
HollowObjectTypeDataElements shardDataElements = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
shardDataElements.readSnapshot(in, unfilteredSchema);
newShards[i] = new HollowObjectTypeReadStateShard(getSchema(), shardDataElements, shardOrdinalShift);
}

shardsVolatile = new ShardsHolder(getSchema(), snapshotData, shardOrdinalShifts);
shardsVolatile = new ShardsHolder(newShards);

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;
Expand Down Expand Up @@ -261,9 +255,8 @@ void reshard(int newNumShards) {
newDataElements = new HollowObjectTypeDataElements[shardsVolatile.shards.length];
shardOrdinalShifts = new int[shardsVolatile.shards.length];
copyShardElements(shardsVolatile, newDataElements, shardOrdinalShifts);
shardsVolatile = new ShardsHolder(schema,
Arrays.copyOfRange(newDataElements, 0, newNumShards),
Arrays.copyOfRange(shardOrdinalShifts, 0, newNumShards));
shardsVolatile = new ShardsHolder(Arrays.copyOfRange(shardsVolatile.shards, 0, newNumShards));

// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a join of original data elements.
Expand All @@ -290,56 +283,43 @@ ShardsHolder joinDataElementsForOneShard(ShardsHolder shardsHolder, int currentI
int newNumShards = shardsHolder.shards.length / shardingFactor;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowObjectTypeDataElements[] joinCandidates = joinCandidates(shardsHolder.shards, currentIndex, shardingFactor);

HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner();
HollowObjectTypeDataElements[] joinCandidates = joinCandidates(shardsHolder.shards, currentIndex, shardingFactor);
HollowObjectTypeDataElements joined = joiner.join(joinCandidates);

HollowObjectTypeDataElements[] newDataElements = new HollowObjectTypeDataElements[shardsHolder.shards.length];
int[] shardOrdinalShifts = new int[shardsHolder.shards.length];
copyShardElements(shardsHolder, newDataElements, shardOrdinalShifts);

HollowObjectTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.shards, shardsHolder.shards.length);
for (int i=0; i<shardingFactor; i++) {
newDataElements[currentIndex + (newNumShards*i)] = joined;
shardOrdinalShifts[currentIndex + (newNumShards*i)] = newShardOrdinalShift;
};
return new ShardsHolder(schema, newDataElements, shardOrdinalShifts);
newShards[currentIndex + (newNumShards*i)] = new HollowObjectTypeReadStateShard(getSchema(), joined, newShardOrdinalShift);
}
return new ShardsHolder(newShards);
}

ShardsHolder expandWithOriginalDataElements(ShardsHolder shardsHolder, int shardingFactor) {
int prevNumShards = shardsHolder.shards.length;
int newNumShards = prevNumShards * shardingFactor;
HollowObjectTypeDataElements[] newDataElements = new HollowObjectTypeDataElements[newNumShards];
int[] shardOrdinalShifts = new int[newNumShards];
HollowObjectTypeReadStateShard[] newShards = new HollowObjectTypeReadStateShard[newNumShards];

for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newDataElements[i+(prevNumShards*j)] = shardsHolder.shards[i].dataElements;
shardOrdinalShifts[i+(prevNumShards*j)] = 31 - Integer.numberOfLeadingZeros(prevNumShards);
newShards[i+(prevNumShards*j)] = shardsHolder.shards[i];
}
}
return new ShardsHolder(schema, newDataElements, shardOrdinalShifts);
return new ShardsHolder(newShards);
}

ShardsHolder splitDataElementsForOneShard(ShardsHolder shardsHolder, int currentIndex, int prevNumShards, int shardingFactor) {
int newNumShards = shardsHolder.shards.length;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowObjectTypeDataElements dataElementsToSplit = shardsHolder.shards[currentIndex].dataElements;

HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter();
HollowObjectTypeDataElements dataElementsToSplit = shardsHolder.shards[currentIndex].dataElements;
HollowObjectTypeDataElements[] splits = splitter.split(dataElementsToSplit, shardingFactor);

HollowObjectTypeDataElements[] newDataElements = new HollowObjectTypeDataElements[shardsHolder.shards.length];
int[] shardOrdinalShifts = new int[shardsHolder.shards.length];
copyShardElements(shardsHolder, newDataElements, shardOrdinalShifts);

HollowObjectTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.shards, shardsHolder.shards.length);
for (int i = 0; i < shardingFactor; i ++) {
newDataElements[currentIndex + (prevNumShards*i)] = splits[i];
shardOrdinalShifts[currentIndex + (prevNumShards*i)] = newShardOrdinalShift;
newShards[currentIndex + (prevNumShards*i)] = new HollowObjectTypeReadStateShard(getSchema(), splits[i], newShardOrdinalShift);
}

return new ShardsHolder(schema, newDataElements, shardOrdinalShifts);
return new ShardsHolder(newShards);
}

private void destroyOriginalDataElements(HollowObjectTypeDataElements dataElements) {
Expand Down Expand Up @@ -375,7 +355,7 @@ public boolean isNull(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
fixedLengthValue = shard.isNull(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

switch(((HollowObjectSchema) schema).getFieldType(fieldIndex)) {
case BYTES:
Expand Down Expand Up @@ -403,7 +383,7 @@ public int readOrdinal(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
refOrdinal = shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(refOrdinal == shard.dataElements.nullValueForField[fieldIndex])
return ORDINAL_NONE;
Expand All @@ -422,7 +402,7 @@ public int readInt(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(value == shard.dataElements.nullValueForField[fieldIndex])
return Integer.MIN_VALUE;
Expand All @@ -441,7 +421,7 @@ public float readFloat(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(value == HollowObjectWriteRecord.NULL_FLOAT_BITS)
return Float.NaN;
Expand All @@ -460,7 +440,7 @@ public double readDouble(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS)
return Double.NaN;
Expand All @@ -479,7 +459,7 @@ public long readLong(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(value == shard.dataElements.nullValueForField[fieldIndex])
return Long.MIN_VALUE;
Expand All @@ -498,7 +478,7 @@ public Boolean readBoolean(int ordinal, int fieldIndex) {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

if(value == shard.dataElements.nullValueForField[fieldIndex])
return null;
Expand Down Expand Up @@ -528,10 +508,10 @@ public byte[] readBytes(int ordinal, int fieldIndex) {
currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex);
endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField);
startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0;
} while (readWasUnsafe(shardsHolder));
} while (readWasUnsafe(shardsHolder, ordinal, shard));

result = shard.readBytes(startByte, endByte, numBitsForField, fieldIndex);
} while (readWasUnsafe(shardsHolder));
} while (readWasUnsafe(shardsHolder, ordinal, shard));

return result;
}
Expand Down Expand Up @@ -559,10 +539,10 @@ public String readString(int ordinal, int fieldIndex) {
currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex);
endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField);
startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0;
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

result = shard.readString(startByte, endByte, numBitsForField, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

return result;
}
Expand Down Expand Up @@ -590,10 +570,10 @@ public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue)
currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex);
endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField);
startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0;
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

result = shard.isStringFieldEqual(startByte, endByte, numBitsForField, fieldIndex, testValue);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

return result;
}
Expand Down Expand Up @@ -621,15 +601,15 @@ public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) {
currentBitOffset = shard.fieldOffset(shardOrdinal, fieldIndex);
endByte = shard.dataElements.fixedLengthData.getElementValue(currentBitOffset, numBitsForField);
startByte = shardOrdinal != 0 ? shard.dataElements.fixedLengthData.getElementValue(currentBitOffset - shard.dataElements.bitsPerRecord, numBitsForField) : 0;
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

hashCode = shard.findVarLengthFieldHashCode(startByte, endByte, numBitsForField, fieldIndex);
} while(readWasUnsafe(shardsHolder));
} while(readWasUnsafe(shardsHolder, ordinal, shard));

return hashCode;
}

private boolean readWasUnsafe(ShardsHolder shardsHolder) {
private boolean readWasUnsafe(ShardsHolder shardsHolder, int ordinal, HollowObjectTypeReadStateShard shard) {
// Use a load (acquire) fence to constrain the compiler reordering prior plain loads so
// that they cannot "float down" below the volatile load of shardsVolatile.
// This ensures data is checked against current shard holder *after* optimistic calculations
Expand Down Expand Up @@ -657,7 +637,14 @@ private boolean readWasUnsafe(ShardsHolder shardsHolder) {
// [Comment credit: Paul Sandoz]
//
HollowUnsafeHandle.getUnsafe().loadFence();
return shardsHolder != shardsVolatile;
ShardsHolder currShardsHolder = shardsVolatile;
// Validate against the underlying shard so that, during re-sharding, the maximum times a read will be invalidated
// is 3: when shards are expanded or truncated, when a shard is affected by a split or join, and finally when
// delta is applied to a shard. If only shardsHolder was checked here, the worst-case scenario could lead to
// read invalidation (numShards+2) times: once for shards expansion/truncation, once for split/join on any shard, and
// then once when delta is applied.
return shardsHolder != currShardsHolder
&& (currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask] != shard);
}

/**
Expand Down Expand Up @@ -688,11 +675,11 @@ protected void invalidate() {
stateListeners = EMPTY_LISTENERS;
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
int numShards = shards.length;
HollowObjectTypeDataElements[] nullDataElements = new HollowObjectTypeDataElements[numShards];
int[] shardOridnalShifts = new int[numShards];
for (int i=0;i<numShards;i++)
shardOridnalShifts[i] = shards[i].shardOrdinalShift;
this.shardsVolatile = new ShardsHolder(getSchema(), nullDataElements, shardOridnalShifts);
HollowObjectTypeReadStateShard[] newShards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0;i<numShards;i++) {
newShards[i] = new HollowObjectTypeReadStateShard(getSchema(), null, shards[i].shardOrdinalShift);
}
this.shardsVolatile = new ShardsHolder(newShards);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.write.HollowObjectTypeWriteState;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -71,8 +72,13 @@ public void testResharding() throws Exception {
{
for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(1000))
{
final int iterNumRecords = numRecords;
HollowObjectTypeReadState objectTypeReadState = populateTypeStateWith(numRecords);
assertDataUnchanged(objectTypeReadState, numRecords);
CompletableFuture<Void> reads = CompletableFuture.runAsync(() -> {
for (int i=0; i<100; i++) {
assertDataUnchanged(objectTypeReadState, iterNumRecords);
}
});

// Splitting shards
{
Expand All @@ -95,6 +101,10 @@ public void testResharding() throws Exception {
assertEquals(shardingFactor * newShardCount, prevShardCount);
}
assertDataUnchanged(objectTypeReadState, numRecords);

reads.get();
if(reads.isCompletedExceptionally())
throw new IllegalStateException();
}
}
}
Expand Down

0 comments on commit c8e6fc1

Please sign in to comment.