-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hubspot cell balancer & normalizer #126
base: hubspot-2.5
Are you sure you want to change the base?
Changes from 126 commits
9c62045
e4f5a14
aad121f
9b5002b
9a954dd
6271c26
995b8cb
d94d862
8202674
f83dc2e
8c6c48c
ab52ea6
0ac73bb
275ba6b
1f58743
57205da
a9e1547
b59c17c
840496d
d7081eb
856e440
7df5fc2
a1d849f
45c8182
1a67f81
5328064
4490e1b
580e31c
ba831d9
cdd6e77
426aeca
4a0a7fa
867f492
6120bc8
b3a664b
a3b9b88
45ed606
941d2d2
0123942
43fb3da
3d45b4d
cf1064d
04fecfe
35adc46
3a744ab
fe67705
1d9b7ef
0ac31f5
06c2ad5
d25a9ac
235fe34
3517598
c4c6296
1b1bc44
6ac45ba
583aca9
26b4f21
bbed188
dbe3263
41a0b41
1873181
adf28a6
06e6b83
e270d6a
4f61691
c6a84ec
f40e83b
7011733
bd83602
5d69edc
d8cef32
e1f1da1
eb52446
eca1dfe
d68f91a
aad1e1b
5a1dc79
9199cd8
a986195
0677ff3
258caf9
e95ef44
221a8c4
20cbb95
9eedc86
3526567
83dad7f
378ad33
faf41c9
c0895ba
5ba25ae
3bd6efb
61b89c5
66b2adf
ddb3017
bf9fe28
6044fc7
edc89ff
6a7511b
ff1ef1f
e0695c0
e6a9d7c
7b69247
9a41c05
6b2df75
4cae482
28b0271
f8085d5
e8361df
ca45b36
b8ac383
ce825c9
5a42bb3
b685173
cf40b93
b7f108a
2a22aea
d87742d
0f67a71
b425e9a
d8e9f9a
2337b4c
d5b2b9a
6caeb48
e865850
f8d48d9
70bc20f
36d4fd2
f0cf9ff
71ecc2b
da0834e
59b41c2
25e0cf9
c2fde1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
package org.apache.hadoop.hbase.hubspot; | ||
|
||
import org.agrona.collections.Int2IntCounterMap; | ||
import org.apache.hadoop.hbase.TableName; | ||
import org.apache.hadoop.hbase.client.RegionInfo; | ||
import org.apache.hadoop.hbase.client.RegionInfoBuilder; | ||
import org.apache.hadoop.hbase.util.Bytes; | ||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; | ||
import org.apache.hbase.thirdparty.com.google.common.primitives.Shorts; | ||
import org.apache.hbase.thirdparty.com.google.gson.ExclusionStrategy; | ||
import org.apache.hbase.thirdparty.com.google.gson.FieldAttributes; | ||
import org.apache.hbase.thirdparty.com.google.gson.Gson; | ||
import org.apache.hbase.thirdparty.com.google.gson.GsonBuilder; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonArray; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonDeserializationContext; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonDeserializer; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonElement; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonObject; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonParseException; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializationContext; | ||
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import java.lang.reflect.Field; | ||
import java.lang.reflect.Type; | ||
import java.util.Arrays; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
@InterfaceAudience.Private | ||
public final class HubSpotCellUtilities { | ||
// TODO: this should be dynamically configured, not hard-coded, but this dramatically simplifies the initial version | ||
public static final short MAX_CELL_COUNT = 360; | ||
public static final int MAX_CELLS_PER_RS = 36; | ||
|
||
public static final Gson OBJECT_MAPPER = new GsonBuilder() | ||
.excludeFieldsWithoutExposeAnnotation() | ||
.enableComplexMapKeySerialization() | ||
.registerTypeAdapter(Int2IntCounterMap.class, new Int2IntCounterMapAdapter()) | ||
.registerTypeAdapter(RegionInfo.class, (JsonDeserializer) (json, typeOfT, context) -> { | ||
JsonObject obj = json.getAsJsonObject(); | ||
|
||
boolean split = obj.get("split").getAsBoolean(); | ||
long regionId = obj.get("regionId").getAsLong(); | ||
int replicaId = obj.get("replicaId").getAsInt(); | ||
JsonObject tableName = obj.get("tableName").getAsJsonObject(); | ||
JsonArray startKey = obj.get("startKey").getAsJsonArray(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An array of bytes? Wouldn't this be better as a Base64 encoded string or some such? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, you may want to extend/use/copy the existing Json configuration produced by GsonFactory ; it provides byte[] serialization. |
||
JsonArray endKey = obj.get("endKey").getAsJsonArray(); | ||
|
||
byte[] startKeyBytes = new byte[startKey.size()]; | ||
byte[] endKeyBytes = new byte[endKey.size()]; | ||
|
||
for (int i = 0; i < startKey.size(); i++) { | ||
startKeyBytes[i] = startKey.get(i).getAsByte(); | ||
} | ||
for (int i = 0; i < endKey.size(); i++) { | ||
endKeyBytes[i] = endKey.get(i).getAsByte(); | ||
} | ||
|
||
TableName tb = TableName.valueOf( | ||
tableName.get("namespaceAsString").getAsString(), | ||
tableName.get("qualifierAsString").getAsString() | ||
); | ||
|
||
RegionInfo result = | ||
RegionInfoBuilder.newBuilder(tb).setSplit(split).setRegionId(regionId) | ||
.setReplicaId(replicaId).setStartKey(startKeyBytes).setEndKey(endKeyBytes).build(); | ||
return result; | ||
}) | ||
.addDeserializationExclusionStrategy(new ExclusionStrategy() { | ||
@Override public boolean shouldSkipField(FieldAttributes f) { | ||
return f.getName().equals("serversToIndex") | ||
|| f.getName().equals("regionsToIndex") | ||
|| f.getName().equals("clusterState") | ||
; | ||
} | ||
|
||
@Override public boolean shouldSkipClass(Class<?> clazz) { | ||
return false; | ||
} | ||
}) | ||
.create(); | ||
|
||
public static final ImmutableSet<String> CELL_AWARE_TABLES = ImmutableSet.of("objects-3"); | ||
|
||
private HubSpotCellUtilities() {} | ||
|
||
public static String toCellSetString(Set<Short> cells) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a name conflict -- HBase already uses the term "Cell" extensively, to mean a key-value pair with all of its trimmings. If you want this code to colocate in HBase, I recommend a name like "tenant partition" or something like that. |
||
return cells.stream().sorted().map(x -> Short.toString(x)).collect(Collectors.joining(", ", "{", "}")); | ||
} | ||
|
||
public static boolean isStopInclusive(byte[] endKey) { | ||
return (endKey == null || endKey.length != 2) && (endKey == null || endKey.length <= 2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's with this magic number |
||
|| !areSubsequentBytesAllZero(endKey, 2)); | ||
} | ||
|
||
public static short calcNumCells(RegionInfo[] regionInfos, short totalCellCount) { | ||
if (regionInfos == null || regionInfos.length == 0) { | ||
return 0; | ||
} | ||
|
||
Set<Short> cellsInRegions = Arrays.stream(regionInfos) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, I wonder if it'll be easier to represent the cells as a fixed-length BitSet. |
||
.map(region -> toCells(region.getStartKey(), region.getEndKey(), totalCellCount)) | ||
.flatMap(Set::stream).collect(Collectors.toSet()); | ||
return Shorts.checkedCast(cellsInRegions.size()); | ||
} | ||
|
||
public static Set<Short> toCells(byte[] rawStart, byte[] rawStop, short numCells) { | ||
return range(padToTwoBytes(rawStart, (byte) 0), padToTwoBytes(rawStop, (byte) -1), numCells); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You've assumed no change to the rowkey? Or, how do you know that the first two bytes are the cell? You've pre-migrated the schema and hard-coded cell assignments to tenant id? |
||
} | ||
|
||
public static byte[] padToTwoBytes(byte[] key, byte pad) { | ||
if (key == null || key.length == 0) { | ||
return new byte[] { pad, pad }; | ||
} | ||
|
||
if (key.length == 1) { | ||
return new byte[] { pad, key[0] }; | ||
} | ||
|
||
return key; | ||
} | ||
|
||
public static Set<Short> range(byte[] start, byte[] stop) { | ||
return range(start, stop, MAX_CELL_COUNT); | ||
} | ||
|
||
public static Set<Short> range(byte[] start, byte[] stop, short numCells) { | ||
short stopCellId = toCell(stop, (byte) -1, (short) (numCells - 1)); | ||
if (stopCellId < 0 || stopCellId > numCells) { | ||
stopCellId = numCells; | ||
} | ||
short startCellId = toCell(start, (byte) 0, (short) 0); | ||
|
||
if (startCellId == stopCellId) { | ||
return ImmutableSet.of(startCellId); | ||
} | ||
|
||
boolean isStopExclusive = areSubsequentBytesAllZero(stop, 2); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest adding a unit test over this stop cell exclusivity stuff. I assume that you've seen that the |
||
|
||
final IntStream cellStream; | ||
if (isStopExclusive) { | ||
cellStream = IntStream.range(startCellId, stopCellId); | ||
} else { | ||
int stopCellIdForcedToIncludeStart = Math.max(stopCellId, startCellId + 1); | ||
cellStream = IntStream.rangeClosed(startCellId, stopCellIdForcedToIncludeStart); | ||
} | ||
|
||
return cellStream.mapToObj(val -> (short) val).collect(Collectors.toSet()); | ||
} | ||
|
||
private static boolean areSubsequentBytesAllZero(byte[] stop, int offset) { | ||
for (int i = offset; i < stop.length; i++) { | ||
if (stop[i] != (byte) 0) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
private static short toCell(byte[] key, byte pad, short ifAbsent) { | ||
if (key == null) { | ||
throw new IllegalArgumentException( | ||
"Key must be nonnull"); | ||
} | ||
|
||
return key.length == 0 | ||
? ifAbsent | ||
: (key.length >= 2 | ||
? Bytes.toShort(key, 0, 2) | ||
: Bytes.toShort(new byte[] { pad, key[0] })); | ||
} | ||
|
||
static class Int2IntCounterMapAdapter implements JsonSerializer<Int2IntCounterMap>, | ||
JsonDeserializer<Int2IntCounterMap> { | ||
@Override public JsonElement serialize(Int2IntCounterMap src, Type typeOfSrc, | ||
JsonSerializationContext context) { | ||
JsonObject obj = new JsonObject(); | ||
|
||
obj.addProperty("loadFactor", src.loadFactor()); | ||
obj.addProperty("initialValue", src.initialValue()); | ||
obj.addProperty("resizeThreshold", src.resizeThreshold()); | ||
obj.addProperty("size", src.size()); | ||
|
||
Field entryField = null; | ||
try { | ||
entryField = Int2IntCounterMap.class.getDeclaredField("entries"); | ||
} catch (NoSuchFieldException e) { | ||
throw new RuntimeException(e); | ||
} | ||
entryField.setAccessible(true); | ||
int[] entries = null; | ||
try { | ||
entries = (int[]) entryField.get(src); | ||
} catch (IllegalAccessException e) { | ||
throw new RuntimeException(e); | ||
} | ||
JsonArray entryArray = new JsonArray(entries.length); | ||
for (int entry : entries) { | ||
entryArray.add(entry); | ||
} | ||
obj.add("entries", entryArray); | ||
|
||
return obj; | ||
} | ||
|
||
@Override public Int2IntCounterMap deserialize(JsonElement json, Type typeOfT, | ||
JsonDeserializationContext context) throws JsonParseException { | ||
JsonObject obj = json.getAsJsonObject(); | ||
|
||
float loadFactor = obj.get("loadFactor").getAsFloat(); | ||
int initialValue = obj.get("initialValue").getAsInt(); | ||
int resizeThreshold = obj.get("resizeThreshold").getAsInt(); | ||
int size = obj.get("size").getAsInt(); | ||
|
||
JsonArray entryArray = obj.get("entries").getAsJsonArray(); | ||
int[] entries = new int[entryArray.size()]; | ||
|
||
for (int i = 0; i < entryArray.size(); i++) { | ||
entries[i] = entryArray.get(i).getAsInt(); | ||
} | ||
|
||
Int2IntCounterMap result = new Int2IntCounterMap(0, loadFactor, initialValue); | ||
|
||
Field resizeThresholdField = null; | ||
Field entryField = null; | ||
Field sizeField = null; | ||
|
||
try { | ||
resizeThresholdField = Int2IntCounterMap.class.getDeclaredField("resizeThreshold"); | ||
entryField = Int2IntCounterMap.class.getDeclaredField("entries"); | ||
sizeField = Int2IntCounterMap.class.getDeclaredField("size"); | ||
} catch (NoSuchFieldException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
resizeThresholdField.setAccessible(true); | ||
entryField.setAccessible(true); | ||
sizeField.setAccessible(true); | ||
|
||
try { | ||
resizeThresholdField.set(result, resizeThreshold); | ||
entryField.set(result, entries); | ||
sizeField.set(result, size); | ||
} catch (IllegalAccessException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
return result; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is agrona? It's already on the master classpath?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure - I used this after seeing it in the balancer cluster state representation. It seems to be a fairly efficient counter map