Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBaseInputFormat tests and fix #1269

Open
wants to merge 3 commits into
base: titan11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion titan-hadoop-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<properties>
<top.level.basedir>${project.parent.basedir}</top.level.basedir>
<skipCassandra>${skipTests}</skipCassandra>
<skipHBase>true</skipHBase>
<skipHBase>${skipTests}</skipHBase>
<skipPipeline>${skipTests}</skipPipeline>
</properties>

Expand Down
22 changes: 20 additions & 2 deletions titan-hadoop-parent/titan-hadoop-2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
<scope>test</scope>
<optional>true</optional>
</dependency>
<!-- Include titan-hbase-core to resolve Guava StopWatch error in HBase tests.
Can be removed when Guava version is updated in HBase -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>titan-hbase-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
Expand All @@ -50,22 +58,32 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import com.thinkaurelius.titan.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
Expand All @@ -32,7 +34,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);

private final TableInputFormat tableInputFormat = new TableInputFormat();
private TableRecordReader tableReader;
private RecordReader<ImmutableBytesWritable, Result> tableReader;
private byte[] inputCFBytes;
private RecordReader<StaticBuffer, Iterable<Entry>> titanRecordReader;

Expand All @@ -43,8 +45,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOExceptio

@Override
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
tableReader =
(TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
titanRecordReader =
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
return titanRecordReader;
Expand Down Expand Up @@ -90,7 +91,7 @@ public void setConf(final Configuration config) {
this.tableInputFormat.setConf(config);
}

public TableRecordReader getTableReader() {
public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
return tableReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
Expand All @@ -17,11 +19,11 @@

public class HBaseBinaryRecordReader extends RecordReader<StaticBuffer, Iterable<Entry>> {

private TableRecordReader reader;
private RecordReader<ImmutableBytesWritable, Result> reader;

private final byte[] edgestoreFamilyBytes;

public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
this.reader = reader;
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
}
Expand Down Expand Up @@ -52,7 +54,7 @@ public void close() throws IOException {
}

@Override
public float getProgress() {
public float getProgress() throws IOException, InterruptedException {
return this.reader.getProgress();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.thinkaurelius.titan.hadoop;

import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
import com.thinkaurelius.titan.example.GraphOfTheGodsFactory;
import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Shared integration tests for Titan Hadoop input formats.
 *
 * <p>Concrete subclasses supply the backend-specific {@link Graph} under test via
 * {@link #getGraph()} (e.g. one reading through the Cassandra or HBase input format).
 * Each test first writes data through the normal Titan API ({@code graph} from
 * {@code TitanGraphBaseTest}), then re-reads it with a {@link SparkGraphComputer}
 * traversal over the input-format-backed graph and checks the two views agree.
 */
public abstract class AbstractInputFormatIT extends TitanGraphBaseTest {


    /**
     * Loads the Graph of the Gods sample data and verifies the input format
     * sees the same vertex count (12) as a direct traversal.
     */
    @Test
    public void testReadGraphOfTheGods() throws Exception {
        GraphOfTheGodsFactory.load(graph, null, true);
        assertEquals(12L, (long) graph.traversal().V().count().next());
        Graph g = getGraph();
        GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
        assertEquals(12L, (long) t.V().count().next());
    }

    /**
     * Writes a single vertex carrying 2^16 LIST-cardinality property values and
     * verifies the input format returns all of them (guards against row-size /
     * column-paging truncation in the underlying storage scan).
     */
    @Test
    public void testReadWideVertexWithManyProperties() throws Exception {
        int numProps = 1 << 16;

        long numV  = 1;
        mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
        mgmt.commit();
        finishSchema();

        for (int j = 0; j < numV; j++) {
            Vertex v = graph.addVertex();
            for (int i = 0; i < numProps; i++) {
                v.property("p", i);
            }
        }
        graph.tx().commit();

        assertEquals(numV, (long) graph.traversal().V().count().next());
        Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
        List<?> valuesOnP = (List<?>) propertiesOnVertex.values().iterator().next();
        assertEquals(numProps, valuesOnP.size());
        Graph g = getGraph();
        GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
        assertEquals(numV, (long) t.V().count().next());
        propertiesOnVertex = t.V().valueMap().next();
        valuesOnP = (List<?>) propertiesOnVertex.values().iterator().next();
        assertEquals(numProps, valuesOnP.size());
    }

    /**
     * Adds a self-loop edge and verifies the input format reports it exactly
     * once per direction (a self-loop is easy to double-count when a reader
     * emits both the IN and OUT adjacency entries of the same row).
     */
    @Test
    public void testReadSelfEdge() throws Exception {
        GraphOfTheGodsFactory.load(graph, null, true);
        assertEquals(12L, (long) graph.traversal().V().count().next());

        // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
        TitanVertex sky = (TitanVertex) graph.query().has("name", "sky").vertices().iterator().next();
        assertNotNull(sky);
        assertEquals("sky", sky.value("name"));
        assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
        assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
        assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
        sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
        assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
        assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
        assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
        graph.tx().commit();

        // Read the new edge using the inputformat
        Graph g = getGraph();
        GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
        Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
        assertNotNull(edgeIdIter);
        assertTrue(edgeIdIter.hasNext());
        // Collect into a Set so the self-loop's IN and OUT appearances of the same
        // edge id collapse to a single entry; 2 = one "lives" edge to Jupiter + the loop.
        Set<Object> edges = Sets.newHashSet(edgeIdIter);
        assertEquals(2, edges.size());
    }

    /**
     * Opens the backend-specific graph that reads through the input format under test.
     *
     * @return a graph configured for Hadoop/Spark traversal over this backend
     * @throws IOException            if backend resources cannot be prepared
     * @throws ConfigurationException if the read properties cannot be loaded
     */
    protected abstract Graph getGraph() throws IOException, ConfigurationException;
}
Original file line number Diff line number Diff line change
@@ -1,101 +1,15 @@
package com.thinkaurelius.titan.hadoop;

import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
import com.thinkaurelius.titan.example.GraphOfTheGodsFactory;
import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class CassandraInputFormatIT extends AbstractInputFormatIT {

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class CassandraInputFormatIT extends TitanGraphBaseTest {


@Test
public void testReadGraphOfTheGods() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(12L, (long) t.V().count().next());
}

@Test
public void testReadWideVertexWithManyProperties() {
int numProps = 1 << 16;

long numV = 1;
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
mgmt.commit();
finishSchema();

for (int j = 0; j < numV; j++) {
Vertex v = graph.addVertex();
for (int i = 0; i < numProps; i++) {
v.property("p", i);
}
}
graph.tx().commit();

assertEquals(numV, (long) graph.traversal().V().count().next());
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(numV, (long) t.V().count().next());
propertiesOnVertex = t.V().valueMap().next();
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
}

@Test
public void testReadSelfEdge() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());

// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
TitanVertex sky = (TitanVertex)graph.query().has("name", "sky").vertices().iterator().next();
assertNotNull(sky);
assertEquals("sky", sky.value("name"));
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
graph.tx().commit();

// Read the new edge using the inputformat
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
assertNotNull(edgeIdIter);
assertTrue(edgeIdIter.hasNext());
Set<Object> edges = Sets.newHashSet(edgeIdIter);
assertEquals(2, edges.size());
protected Graph getGraph() {
return GraphFactory.open("target/test-classes/cassandra-read.properties");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.thinkaurelius.titan.hadoop;

import com.thinkaurelius.titan.HBaseStorageSetup;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Runs the shared input-format integration tests against an HBase backend.
 *
 * <p>A standalone HBase instance is started once for the whole class and the
 * graph is opened from {@code target/test-classes/hbase-read.properties} with a
 * fresh, per-call Gremlin output directory so repeated runs do not collide.
 */
public class HBaseInputFormatIT extends AbstractInputFormatIT {

    /** Starts a local HBase instance before any test in this class runs. */
    @BeforeClass
    public static void startHBase() throws IOException, BackendException {
        HBaseStorageSetup.startHBase();
    }

    /**
     * Kills the HBase instance after the suite, but only on 0.96.x.
     * Workaround for https://issues.apache.org/jira/browse/HBASE-10312:
     * later versions shut down cleanly on their own.
     */
    @AfterClass
    public static void stopHBase() {
        if (VersionInfo.getVersion().startsWith("0.96")) {
            HBaseStorageSetup.killIfRunning();
        }
    }

    /**
     * Opens a graph reading through the HBase input format, pointing
     * {@code gremlin.hadoop.outputLocation} at a unique temp subdirectory so
     * concurrent/repeated test runs never reuse stale Spark output.
     */
    @Override
    protected Graph getGraph() throws IOException, ConfigurationException {
        final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/hbase-read.properties");
        final Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation"));
        // Fail loudly if the base output dir cannot be created (mkdirs() would
        // silently ignore failure and the temp-dir creation below would then
        // throw a less informative error).
        Files.createDirectories(baseOutDir);
        final String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
        config.setProperty("gremlin.hadoop.outputLocation", outDir);
        return GraphFactory.open(config);
    }

    @Override
    public WriteConfiguration getConfiguration() {
        return HBaseStorageSetup.getHBaseGraphConfiguration();
    }
}
Loading