From 88f426beeba7d78b808d43e106243d8c6edcc5bc Mon Sep 17 00:00:00 2001 From: qaate47 Date: Tue, 7 Nov 2023 23:46:02 +0100 Subject: [PATCH] GH-462 Delta file --- qendpoint-cli/bin/qepSearch.ps1 | 2 + qendpoint-core/datastructures/deltafile.abs | 28 +++ .../qendpoint/core/enums/RDFNotation.java | 10 + .../core/enums/WikidataChangesFlavor.java | 58 ++++++ .../core/exceptions/ParserException.java | 4 + .../qendpoint/core/options/HDTOptions.java | 3 +- .../core/options/HDTOptionsKeys.java | 11 ++ .../qendpoint/core/rdf/RDFParserFactory.java | 3 + .../core/rdf/parsers/RDFDeltaFileParser.java | 182 ++++++++++++++++++ .../qendpoint/tools/QEPSearch.java | 49 ++++- .../qendpoint/utils/FormatUtils.java | 36 ++++ 11 files changed, 384 insertions(+), 2 deletions(-) create mode 100644 qendpoint-core/datastructures/deltafile.abs create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/WikidataChangesFlavor.java create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java diff --git a/qendpoint-cli/bin/qepSearch.ps1 b/qendpoint-cli/bin/qepSearch.ps1 index a8acbedd..916b0374 100644 --- a/qendpoint-cli/bin/qepSearch.ps1 +++ b/qendpoint-cli/bin/qepSearch.ps1 @@ -24,6 +24,8 @@ param( [Parameter()] [Switch] $memory, + [Switch] + $nocrc, [Parameter(ValueFromRemainingArguments, Position = 0)] [string[]] $OtherParams diff --git a/qendpoint-core/datastructures/deltafile.abs b/qendpoint-core/datastructures/deltafile.abs new file mode 100644 index 00000000..846d7990 --- /dev/null +++ b/qendpoint-core/datastructures/deltafile.abs @@ -0,0 +1,28 @@ +// delta file structure version 0, com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser + +WikidataChangesFlavor : byte { + DUMP = 0x63, + SIMPLE = 0x64, + FULL = 0x65 +} + +DeltaFile { + struct { + string magic = "$DltF0\n\r"; + long count; + long start; + long end; + WikidataChangesFlavor flavor; + byte __pad[3]; + crc8 crc; + } header; + struct { + vlong sizeName; + byte buff[sizeName]; + vlong sizeBuff; + byte buff[sizeBuff]; + } elements[header.count]; + crc32 crc; +} + +file(DeltaFile, ".*\\.df"); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/RDFNotation.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/RDFNotation.java index 648b4ecd..87cbad14 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/RDFNotation.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/RDFNotation.java @@ -89,6 +89,11 @@ public enum RDFNotation { */ JSONLD, + /** + * Delta file + */ + DELTAFILE, + /** * List of URIs with RDF content in other RDF Formats */ @@ -148,6 +153,9 @@ public static RDFNotation parse(String str) { case "trix" -> { return TRIX; } + case "df", "deltafile" -> { + return DELTAFILE; + } } throw new IllegalArgumentException(); } @@ -195,6 +203,8 @@ public static RDFNotation guess(String fileName) throws IllegalArgumentException return TRIG; } else if (str.endsWith("trix")) { return TRIX; + } else if (str.endsWith("df") || str.endsWith("deltafile")) { + return DELTAFILE; } throw new IllegalArgumentException("Could not guess the format for " + fileName); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/WikidataChangesFlavor.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/WikidataChangesFlavor.java new file mode 100644 index 00000000..6aaae917 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/enums/WikidataChangesFlavor.java @@ -0,0 +1,58 @@ +package com.the_qa_company.qendpoint.core.enums; + +import java.util.HashMap; + +public enum WikidataChangesFlavor { + /** + * Excludes descriptions of entities referred to in the data + */ + DUMP("dump", true, "Excludes descriptions of entities referred to in the data.", (byte) 0x63), + /** + * Provides only truthy statements, along with sitelinks and version + * information. + */ + SIMPLE("simple", true, "Provides only truthy statements, along with sitelinks and version information.", + (byte) 0x64), + /** + * An argument of "full" returns all data. + */ + FULL("full", false, "An argument of \"full\" returns all data.", (byte) 0x65); + + private static final HashMap FLAVOR_HASH_MAP = new HashMap<>(); + + static { + for (WikidataChangesFlavor fl : values()) { + FLAVOR_HASH_MAP.put(fl.id, fl); + } + } + + public final String title; + public final boolean shouldSpecify; + public final String description; + public final byte id; + + WikidataChangesFlavor(String title, boolean shouldSpecify, String description, byte id) { + this.title = title; + this.shouldSpecify = shouldSpecify; + this.description = description; + this.id = id; + } + + /** + * @return the default flavor + */ + public static WikidataChangesFlavor getDefaultFlavor() { + return FULL; + } + + /** + * get a flavor from its id + * + * @param id id + * @return flavor or null + */ + public static WikidataChangesFlavor getFromId(byte id) { + return FLAVOR_HASH_MAP.get(id); + } + +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/exceptions/ParserException.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/exceptions/ParserException.java index 01194582..1a1b02f8 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/exceptions/ParserException.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/exceptions/ParserException.java @@ -40,6 +40,10 @@ public ParserException(String message) { super(message); } + public ParserException(String message, Throwable e) { + super(message, e); + } + public ParserException(String message, String line, int location) { this(createMessage(message, line, location)); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java index 260947bf..873f0d5a 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptions.java @@ -23,6 +23,7 @@ import com.the_qa_company.qendpoint.core.hdt.HDTManager; import com.the_qa_company.qendpoint.core.rdf.RDFFluxStop; import com.the_qa_company.qendpoint.core.util.Profiler; +import com.the_qa_company.qendpoint.core.util.UnicodeEscape; import java.io.File; import java.io.IOException; @@ -617,7 +618,7 @@ default void write(Writer w, boolean withComment) throws IOException { w.write("# " + opt.getKeyInfo().desc() + "\n# Type: " + opt.getKeyInfo().type().getTitle() + "\n"); } } - w.write(key + "=" + value + "\n"); + w.write(key + "=" + UnicodeEscape.escapeString(value) + "\n"); } } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsKeys.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsKeys.java index 1d0eed3b..d53ae4ae 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsKeys.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsKeys.java @@ -260,6 +260,17 @@ public class HDTOptionsKeys { */ @Key(type = Key.Type.BOOLEAN, desc = "Use the canonical NT file parser, removing checks") public static final String NT_SIMPLE_PARSER_KEY = "parser.ntSimpleParser"; + /** + * No crc check with deltafile reader, default to false. Boolean value + */ + @Key(type = Key.Type.BOOLEAN, desc = "No crc check with deltafile reader") + public static final String PARSER_DELTAFILE_NO_CRC = "parser.deltafile.nocrc"; + /** + * No exception, only a stop with deltafile reader, default to false. + * Boolean value + */ + @Key(type = Key.Type.BOOLEAN, desc = "No exception, only a stop with deltafile reader") + public static final String PARSER_DELTAFILE_NO_EXCEPTION = "parser.deltafile.noExceptionOnlyStop"; /** * Key for setting the maximum amount of file loaded with the directory * parser, 1 for no async parsing, 0 for the number of processors, default diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java index f1101e77..6653451f 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java @@ -23,6 +23,7 @@ import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; import com.the_qa_company.qendpoint.core.options.HDTOptions; import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; +import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser; import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserDir; import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserHDT; import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserList; @@ -72,6 +73,8 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio return new RDFParserRAR(spec); case HDT: return new RDFParserHDT(); + case DELTAFILE: + return new RDFDeltaFileParser(spec); case JSONLD: // FIXME: Implement throw new NotImplementedException("RDFParserJSONLD not implemented"); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java new file mode 100644 index 00000000..5fe00558 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java @@ -0,0 +1,182 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import com.the_qa_company.qendpoint.core.enums.RDFNotation; +import com.the_qa_company.qendpoint.core.enums.WikidataChangesFlavor; +import com.the_qa_company.qendpoint.core.exceptions.ParserException; +import com.the_qa_company.qendpoint.core.iterator.utils.FetcherExceptionIterator; +import com.the_qa_company.qendpoint.core.listener.ProgressListener; +import com.the_qa_company.qendpoint.core.options.HDTOptions; +import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; +import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; +import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory; +import com.the_qa_company.qendpoint.core.util.crc.CRC32; +import com.the_qa_company.qendpoint.core.util.crc.CRC8; +import com.the_qa_company.qendpoint.core.util.crc.CRCInputStream; +import com.the_qa_company.qendpoint.core.util.io.IOUtil; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.Arrays; +import java.util.zip.GZIPInputStream; + +import static java.nio.charset.StandardCharsets.US_ASCII; + +public class RDFDeltaFileParser implements RDFParserCallback { + public static final byte[] COOKIE = "$DltF0\n\r".getBytes(US_ASCII); + + public record DeltaFileComponent(String fileName, byte[] data) {} + + public static class DeltaFileReader extends FetcherExceptionIterator + implements Closeable { + private final long count; + private long id; + private final InputStream stream; + private final long start; + private final long end; + private final WikidataChangesFlavor flavor; + private boolean noExceptionOnlyStop; + + public DeltaFileReader(InputStream is, HDTOptions spec) throws IOException { + boolean nocrc = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_CRC, false); + noExceptionOnlyStop = spec.getBoolean(HDTOptionsKeys.PARSER_DELTAFILE_NO_EXCEPTION, false); + + stream = nocrc ? is : new CRCInputStream(is, new CRC8()); + + if (!Arrays.equals(stream.readNBytes(8), COOKIE)) { + throw new IOException("Bad cookie"); + } + + this.count = IOUtil.readLong(stream); + this.start = IOUtil.readLong(stream); + this.end = IOUtil.readLong(stream); + this.flavor = WikidataChangesFlavor.getFromId((byte) stream.read()); + if (flavor == null) { + throw new IOException("Bad flavor"); + } + stream.skipNBytes(3); + + if (!nocrc) { + CRCInputStream crcis = (CRCInputStream) stream; + if (!crcis.readCRCAndCheck()) { + throw new IOException("Bad header crc"); + } + crcis.setCRC(new CRC32()); + } else { + stream.skipNBytes(1); // skip header crc + } + } + + public Instant getStart() { + return Instant.ofEpochSecond(start / 1000000, (start % 1000000) * 1000); + } + + public Instant getEnd() { + return Instant.ofEpochSecond(end / 1000000, (end % 1000000) * 1000); + } + + @Override + public long getSize() { + return count; + } + + public void setNoExceptionOnlyStop(boolean noExceptionOnlyStop) { + this.noExceptionOnlyStop = noExceptionOnlyStop; + } + + public WikidataChangesFlavor getFlavor() { + return flavor; + } + + @Override + protected DeltaFileComponent getNext() throws IOException { + if (id >= count) { + if (id == count) { // last + id++; + try { + if (stream instanceof CRCInputStream crcis) { + if (!crcis.readCRCAndCheck()) { + throw new IOException("Bad data crc!"); + } + } else { + stream.readNBytes(4); // read crc + } + } catch (Throwable t) { + if (noExceptionOnlyStop) { + return null; + } + throw t; + } + } + return null; + } + id++; + + try { + // name + byte[] name = IOUtil.readSizedBuffer(stream, ProgressListener.ignore()); // title + // + + // .ttl? + // buffer + byte[] bytes = IOUtil.readSizedBuffer(stream, ProgressListener.ignore()); + + return new DeltaFileComponent(new String(name, US_ASCII), bytes); + } catch (Throwable e) { + if (noExceptionOnlyStop) { + return null; + } + throw e; + } + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + + private final HDTOptions spec; + + public RDFDeltaFileParser(HDTOptions spec) { + this.spec = spec; + } + + @Override + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) + throws ParserException { + try (InputStream is = IOUtil.getFileInputStream(fileName)) { + doParse(is, baseUri, notation, keepBNode, callback); + } catch (IOException e) { + throw new ParserException(e); + } + } + + @Override + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) + throws ParserException { + try { + // read df file + DeltaFileReader reader = new DeltaFileReader(in, spec); + while (reader.hasNext()) { + DeltaFileComponent next = reader.next(); + if (next.data.length == 0) { + continue; // deleted + } + RDFNotation not = RDFNotation.guess(next.fileName); + RDFParserCallback parser = RDFParserFactory.getParserCallback(not, spec); + try { + // read the next byte information + parser.doParse(new GZIPInputStream(new ByteArrayInputStream(next.data)), baseUri, not, keepBNode, + callback); + } catch (IOException e) { + throw new ParserException("Error when reading " + next.fileName + " size: " + next.data.length, e); + } + } + } catch (IOException e) { + throw new ParserException(e); + } + } + +} diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/tools/QEPSearch.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/tools/QEPSearch.java index d7f3eec8..a66c4aaf 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/tools/QEPSearch.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/tools/QEPSearch.java @@ -16,6 +16,7 @@ import com.the_qa_company.qendpoint.core.hdt.HDTVersion; import com.the_qa_company.qendpoint.core.options.HDTOptions; import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; +import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser; import com.the_qa_company.qendpoint.core.tools.HDTVerify; import com.the_qa_company.qendpoint.core.triples.IteratorTripleString; import com.the_qa_company.qendpoint.core.triples.TripleString; @@ -32,6 +33,7 @@ import com.the_qa_company.qendpoint.model.SimpleLiteralHDT; import com.the_qa_company.qendpoint.store.EndpointStore; import com.the_qa_company.qendpoint.store.HDTConverter; +import com.the_qa_company.qendpoint.utils.FormatUtils; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; @@ -46,15 +48,19 @@ import org.eclipse.rdf4j.sail.helpers.AbstractNotifyingSail; import org.eclipse.rdf4j.sail.nativerdf.NativeStore; +import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,6 +110,8 @@ public class QEPSearch { public String searchCfg; @Parameter(names = "-binindex", description = "Prints bin index if implemented") public boolean showBinIndex; + @Parameter(names = "-nocrc", description = "Avoid CRC checks") + public boolean noCRC; public String input; @@ -908,7 +916,13 @@ public void execute() throws IOException { } else if (path.getFileName().toString().endsWith(".prof")) { type = "profiler"; } else { - throw new IllegalArgumentException("Can't guess type for store " + path + "!"); + byte[] cookie = FormatUtils.readCookie(path, 8); + // search for the right magic + if (Arrays.equals(cookie, "$DltF0\n\r".getBytes(UTF_8))) { + type = "deltafile"; + } else { + throw new IllegalArgumentException("Can't guess type for store " + path + "!"); + } } } else { type = this.type.toLowerCase(); @@ -920,6 +934,7 @@ public void execute() throws IOException { case "hdt" -> executeHDT(); case "reader" -> executeReader(); case "profiler" -> executeProfiler(); + case "deltafile" -> executeDeltaFile(); default -> throw new IllegalArgumentException("Can't understand store of type " + this.type + "!"); } } @@ -1006,6 +1021,38 @@ private void executeProfiler() throws IOException { } } + private void executeDeltaFile() throws IOException { + Path file = Path.of(input); + MultiThreadListenerConsole console = colorTool.getConsole(); + + HDTOptions spec = HDTOptions.of(HDTOptionsKeys.PARSER_DELTAFILE_NO_CRC, noCRC, + HDTOptionsKeys.PARSER_DELTAFILE_NO_EXCEPTION, true); + + try (RDFDeltaFileParser.DeltaFileReader reader = new RDFDeltaFileParser.DeltaFileReader( + new BufferedInputStream(Files.newInputStream(file)), spec)) { + + console.printLine(console.color(5, 5, 1) + "files .. " + console.colorReset() + reader.getSize()); + console.printLine(console.color(5, 5, 1) + "start .. " + console.colorReset() + reader.getStart()); + console.printLine(console.color(5, 5, 1) + "end .... " + console.colorReset() + reader.getEnd()); + console.printLine(console.color(5, 5, 1) + "flavor . " + console.colorReset() + reader.getFlavor()); + + long i = 0; + long size = reader.getSize(); + while (reader.hasNext()) { + i++; + RDFDeltaFileParser.DeltaFileComponent comp = reader.next(); + + console.notifyProgress((float) (i * 1000 / size) / 10, "reading files " + i + "/" + size + ": " + + console.color(2, 2, 2) + comp.fileName() + console.colorReset()); + } + if (i != size) { + console.printLine(console.color(5, 1, 1) + "Error, not everything was read: " + i + " != " + size + " " + + (100 * i / size) + "%"); + } + console.notifyProgress(100, "done"); + } + } + private void executeReader() throws IOException { colorTool.log("Reader REPL"); diff --git a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/FormatUtils.java b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/FormatUtils.java index c1cc1a74..74aa956b 100644 --- a/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/FormatUtils.java +++ b/qendpoint-store/src/main/java/com/the_qa_company/qendpoint/utils/FormatUtils.java @@ -1,12 +1,18 @@ package com.the_qa_company.qendpoint.utils; +import com.the_qa_company.qendpoint.core.options.ControlInfo; +import com.the_qa_company.qendpoint.core.options.ControlInformation; import org.eclipse.rdf4j.query.resultio.QueryResultFormat; import org.eclipse.rdf4j.query.resultio.TupleQueryResultWriterRegistry; import org.eclipse.rdf4j.rio.RDFFormat; import org.eclipse.rdf4j.rio.Rio; +import java.io.BufferedInputStream; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Comparator; @@ -71,4 +77,34 @@ public static Optional getRDFWriterFormat(String acceptHeader) { return sortedFormat(acceptHeader).map(m -> Rio.getWriterFormatForMIMEType(m.mime)).filter(Optional::isPresent) .map(Optional::get).findFirst(); } + + /** + * Read n bytes from the start of a file + * + * @param file file + * @param size n + * @return n bytes, or null if error + */ + public static byte[] readCookie(Path file, int size) { + try (InputStream stream = Files.newInputStream(file)) { + return (size > 0x1000 ? new BufferedInputStream(stream) : stream).readNBytes(size); + } catch (IOException e) { + return new byte[0]; + } + } + + /** + * Read control info from the start of a file + * + * @param file file + * @return ci + * @throws IOException ioe + */ + public static ControlInfo readCookieInfo(Path file) throws IOException { + ControlInformation ci = new ControlInformation(); + try (InputStream stream = Files.newInputStream(file)) { + ci.load(stream); + } + return ci; + } }