GH-462 add support for delta file #463

Merged · 1 commit · Mar 26, 2024
2 changes: 2 additions & 0 deletions qendpoint-cli/bin/qepSearch.ps1
@@ -24,6 +24,8 @@ param(
[Parameter()]
[Switch]
$memory,
[Switch]
$nocrc,
[Parameter(ValueFromRemainingArguments, Position = 0)]
[string[]]
$OtherParams
28 changes: 28 additions & 0 deletions qendpoint-core/datastructures/deltafile.abs
@@ -0,0 +1,28 @@
// delta file structure version 0, com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser

WikidataChangesFlavor : byte {
DUMP = 0x63,
SIMPLE = 0x64,
FULL = 0x65
}

DeltaFile {
struct {
string magic = "$DltF0\n\r";
long count;
long start;
long end;
WikidataChangesFlavor flavor;
byte __pad[3];
crc8 crc;
} header;
struct {
vlong sizeName;
byte buff[sizeName];
vlong sizeBuff;
byte buff[sizeBuff];
} elements[header.count];
crc32 crc;
}

file(DeltaFile, ".*\\.df");
@@ -89,6 +89,11 @@ public enum RDFNotation {
*/
JSONLD,

/**
* Delta file
*/
DELTAFILE,

/**
* List of URIs with RDF content in other RDF Formats
*/
@@ -148,6 +153,9 @@ public static RDFNotation parse(String str) {
case "trix" -> {
return TRIX;
}
case "df", "deltafile" -> {
return DELTAFILE;
}
}
throw new IllegalArgumentException();
}
@@ -195,6 +203,8 @@ public static RDFNotation guess(String fileName) throws IllegalArgumentException
return TRIG;
} else if (str.endsWith("trix")) {
return TRIX;
} else if (str.endsWith("df") || str.endsWith("deltafile")) {
return DELTAFILE;
}

throw new IllegalArgumentException("Could not guess the format for " + fileName);
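A quick sketch of how the additions above resolve in practice (the file name is illustrative):

RDFNotation byName = RDFNotation.parse("deltafile"); // DELTAFILE, same for "df"
RDFNotation byExt = RDFNotation.guess("wikidata-changes.df"); // DELTAFILE, matched on the "df" suffix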
@@ -0,0 +1,58 @@
package com.the_qa_company.qendpoint.core.enums;

import java.util.HashMap;

public enum WikidataChangesFlavor {
/**
* Excludes descriptions of entities referred to in the data
*/
DUMP("dump", true, "Excludes descriptions of entities referred to in the data.", (byte) 0x63),
/**
* Provides only truthy statements, along with sitelinks and version
* information.
*/
SIMPLE("simple", true, "Provides only truthy statements, along with sitelinks and version information.",
(byte) 0x64),
/**
* An argument of "full" returns all data.
*/
FULL("full", false, "An argument of \"full\" returns all data.", (byte) 0x65);

private static final HashMap<Byte, WikidataChangesFlavor> FLAVOR_HASH_MAP = new HashMap<>();

static {
for (WikidataChangesFlavor fl : values()) {
FLAVOR_HASH_MAP.put(fl.id, fl);
}
}

public final String title;
public final boolean shouldSpecify;
public final String description;
public final byte id;

WikidataChangesFlavor(String title, boolean shouldSpecify, String description, byte id) {
this.title = title;
this.shouldSpecify = shouldSpecify;
this.description = description;
this.id = id;
}

/**
* @return the default flavor
*/
public static WikidataChangesFlavor getDefaultFlavor() {
return FULL;
}

/**
* get a flavor from its id
*
* @param id id
* @return flavor or null
*/
public static WikidataChangesFlavor getFromId(byte id) {
return FLAVOR_HASH_MAP.get(id);
}

}
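A small sketch of the id round trip this enum provides (the raw byte comes from the delta file header described in deltafile.abs above):

WikidataChangesFlavor flavor = WikidataChangesFlavor.getFromId((byte) 0x64); // SIMPLE
if (flavor == null) {
    flavor = WikidataChangesFlavor.getDefaultFlavor(); // FULL for unknown ids
}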
@@ -40,6 +40,10 @@ public ParserException(String message) {
super(message);
}

public ParserException(String message, Throwable e) {
super(message, e);
}

public ParserException(String message, String line, int location) {
this(createMessage(message, line, location));
}
@@ -23,6 +23,7 @@
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.rdf.RDFFluxStop;
import com.the_qa_company.qendpoint.core.util.Profiler;
import com.the_qa_company.qendpoint.core.util.UnicodeEscape;

import java.io.File;
import java.io.IOException;
@@ -617,7 +618,7 @@ default void write(Writer w, boolean withComment) throws IOException {
w.write("# " + opt.getKeyInfo().desc() + "\n# Type: " + opt.getKeyInfo().type().getTitle() + "\n");
}
}
w.write(key + "=" + value + "\n");
w.write(key + "=" + UnicodeEscape.escapeString(value) + "\n");
}
}

@@ -260,6 +260,17 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.BOOLEAN, desc = "Use the canonical NT file parser, removing checks")
public static final String NT_SIMPLE_PARSER_KEY = "parser.ntSimpleParser";
/**
* No crc check with deltafile reader, default to false. Boolean value
*/
@Key(type = Key.Type.BOOLEAN, desc = "No crc check with deltafile reader")
public static final String PARSER_DELTAFILE_NO_CRC = "parser.deltafile.nocrc";
/**
* No exception, only a stop with deltafile reader, default to false.
* Boolean value
*/
@Key(type = Key.Type.BOOLEAN, desc = "No exception, only a stop with deltafile reader")
public static final String PARSER_DELTAFILE_NO_EXCEPTION = "parser.deltafile.noExceptionOnlyStop";
/**
* Key for setting the maximum amount of file loaded with the directory
* parser, 1 for no async parsing, 0 for the number of processors, default
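A hedged sketch of how the two new keys might be set before parsing; this assumes the usual string key/value set(...) call on an existing HDTOptions instance named spec:

spec.set(HDTOptionsKeys.PARSER_DELTAFILE_NO_CRC, "true"); // skip the header and data CRC checks
spec.set(HDTOptionsKeys.PARSER_DELTAFILE_NO_EXCEPTION, "true"); // stop at the first broken element instead of throwing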
@@ -23,6 +23,7 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserDir;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserHDT;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserList;
@@ -72,6 +73,8 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio
return new RDFParserRAR(spec);
case HDT:
return new RDFParserHDT();
case DELTAFILE:
return new RDFDeltaFileParser(spec);
case JSONLD:
// FIXME: Implement
throw new NotImplementedException("RDFParserJSONLD not implemented");
@@ -0,0 +1,182 @@
package com.the_qa_company.qendpoint.core.rdf.parsers;

import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.enums.WikidataChangesFlavor;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.iterator.utils.FetcherExceptionIterator;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
import com.the_qa_company.qendpoint.core.util.crc.CRC8;
import com.the_qa_company.qendpoint.core.util.crc.CRCInputStream;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;

import static java.nio.charset.StandardCharsets.US_ASCII;

public class RDFDeltaFileParser implements RDFParserCallback {
public static final byte[] COOKIE = "$DltF0\n\r".getBytes(US_ASCII);

public record DeltaFileComponent(String fileName, byte[] data) {}

public static class DeltaFileReader extends FetcherExceptionIterator<DeltaFileComponent, IOException>
implements Closeable {
private final long count;
private long id;
private final InputStream stream;
private final long start;
private final long end;
private final WikidataChangesFlavor flavor;
private boolean noExceptionOnlyStop;

public DeltaFileReader(InputStream is, HDTOptions spec) throws IOException {
boolean nocrc = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_CRC, false);
noExceptionOnlyStop = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_EXCEPTION, false);

stream = nocrc ? is : new CRCInputStream(is, new CRC8());

if (!Arrays.equals(stream.readNBytes(8), COOKIE)) {
throw new IOException("Bad cookie");
}

this.count = IOUtil.readLong(stream);
this.start = IOUtil.readLong(stream);
this.end = IOUtil.readLong(stream);
this.flavor = WikidataChangesFlavor.getFromId((byte) stream.read());
if (flavor == null) {
throw new IOException("Bad flavor");
}
stream.skipNBytes(3);

if (!nocrc) {
CRCInputStream crcis = (CRCInputStream) stream;
if (!crcis.readCRCAndCheck()) {
throw new IOException("Bad header crc");
}
crcis.setCRC(new CRC32());
} else {
stream.skipNBytes(1); // skip header crc
}
}

public Instant getStart() {
return Instant.ofEpochSecond(start / 1000000, (start % 1000000) * 1000);
}

public Instant getEnd() {
return Instant.ofEpochSecond(end / 1000000, (end % 1000000) * 1000);
}

@Override
public long getSize() {
return count;
}

public void setNoExceptionOnlyStop(boolean noExceptionOnlyStop) {
this.noExceptionOnlyStop = noExceptionOnlyStop;
}

public WikidataChangesFlavor getFlavor() {
return flavor;
}

@Override
protected DeltaFileComponent getNext() throws IOException {
if (id >= count) {
if (id == count) { // last
id++;
try {
if (stream instanceof CRCInputStream crcis) {
if (!crcis.readCRCAndCheck()) {
throw new IOException("Bad data crc!");
}
} else {
stream.readNBytes(4); // read crc
}
} catch (Throwable t) {
if (noExceptionOnlyStop) {
return null;
}
throw t;
}
}
return null;
}
id++;

try {
// name buffer (title + .ttl?)
byte[] name = IOUtil.readSizedBuffer(stream, ProgressListener.ignore());
// data buffer
byte[] bytes = IOUtil.readSizedBuffer(stream, ProgressListener.ignore());

return new DeltaFileComponent(new String(name, US_ASCII), bytes);
} catch (Throwable e) {
if (noExceptionOnlyStop) {
return null;
}
throw e;
}
}

@Override
public void close() throws IOException {
stream.close();
}
}

private final HDTOptions spec;

public RDFDeltaFileParser(HDTOptions spec) {
this.spec = spec;
}

@Override
public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback)
throws ParserException {
try (InputStream is = IOUtil.getFileInputStream(fileName)) {
doParse(is, baseUri, notation, keepBNode, callback);
} catch (IOException e) {
throw new ParserException(e);
}
}

@Override
public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback)
throws ParserException {
try {
// read df file
DeltaFileReader reader = new DeltaFileReader(in, spec);
while (reader.hasNext()) {
DeltaFileComponent next = reader.next();
if (next.data().length == 0) {
continue; // deleted
}
RDFNotation not = RDFNotation.guess(next.fileName());
RDFParserCallback parser = RDFParserFactory.getParserCallback(not, spec);
try {
// each element buffer is gzipped RDF in the notation guessed from its file name
parser.doParse(new GZIPInputStream(new ByteArrayInputStream(next.data())), baseUri, not, keepBNode,
callback);
} catch (IOException e) {
throw new ParserException("Error when reading " + next.fileName() + " size: " + next.data().length, e);
}
}
} catch (IOException e) {
throw new ParserException(e);
}
}

}
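To tie it together, a minimal usage sketch: route a .df file through the factory wiring added above. The file name and base URI are illustrative, HDTSpecification is assumed to be the stock HDTOptions implementation, and the lambda follows the existing RDFCallback contract (TripleString plus position):

import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.options.HDTSpecification;
import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory;

public class DeltaFileParseExample {
    public static void main(String[] args) throws Exception {
        HDTSpecification spec = new HDTSpecification();
        RDFNotation notation = RDFNotation.guess("wikidata-changes.df"); // DELTAFILE
        RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, spec); // RDFDeltaFileParser
        // prints every triple contained in the gzipped element buffers of the delta file
        parser.doParse("wikidata-changes.df", "http://example.org/", notation, true,
                (triple, pos) -> System.out.println(triple));
    }
}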