Additional fixes
Removed debug() output calls from many places; they were more confusing than
helpful.

Indexer class
- Changed the default block interval from 512 to 2048
- Added the postponeWrite field. It comes into play when an empty block
  occurs: empty blocks are no longer used for index entries, because they
  caused a lot of problems and led to faulty results (see the sketch after
  this list).
- Added debug code which helps a developer understand how the index is
  created. The new fields and methods are:
  - writeOutOfPartialDecompressedBlocks
  - storageForDecompressedBlocks
  - storageForPartialDecompressedBlocks
  - partialBlockinfoStream
  - enableWriteOutOfDecompressedBlocksAndStatistics()
  - enableWriteOutOfPartialDecompressedBlocks()
- Extracted the methods createIndexEntryFromBlockData(),
  storeDictionaryForEntry() and writeIndexEntryIfPossible() to make the
  code more readable.
- Reworked finalizeProcessingForCurrentBlock() to make it more readable and
  to make it use, and therefore output, the above-mentioned debug
  information.

Extractor class
- Added the roundtripBuffer field, which will later be used to enable:
  - error recognition, if the extracted data is not made up of
    extractionMultiplier-sized records
  - complete output with at least empty entries for each record.
  It is not used yet (see the sketch after this list).
- Removed the skipLines field, as the whole mechanism was reworked and
  both index and extract should now produce proper results.
- Reworked processDecompressedChunkOfData(); it is now much easier to read
  and far less complicated.
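
The intended roundtrip buffering could look roughly like the sketch below.
This is an illustration only, since the field is not wired up in this commit;
apart from roundtripBuffer, roundtripBufferPosition and extractionMultiplier
(which defaults to 4, i.e. one FASTQ record), all names are hypothetical.

    #include <iostream>
    #include <string>
    #include <vector>

    // Sketch of the planned roundtripBuffer behaviour: collect
    // extractionMultiplier lines per record, flush complete records, warn
    // about and pad an incomplete trailing record. Only roundtripBuffer,
    // roundtripBufferPosition and extractionMultiplier are names from this
    // commit; the rest is hypothetical.
    struct RoundtripSketch {
        unsigned extractionMultiplier = 4;   // one FASTQ record = 4 lines
        std::vector<std::string> roundtripBuffer =
                std::vector<std::string>(extractionMultiplier);
        unsigned roundtripBufferPosition = 0;

        void addLine(std::ostream &out, const std::string &line) {
            roundtripBuffer[roundtripBufferPosition++] = line;
            if (roundtripBufferPosition == extractionMultiplier)
                flush(out);
        }

        // At the end of extraction: an incomplete record hints at malformed
        // input. Emit it padded with empty lines instead of dropping it silently.
        void finish(std::ostream &out) {
            if (roundtripBufferPosition == 0)
                return;
            std::cerr << "Warning: last record has only " << roundtripBufferPosition
                      << " of " << extractionMultiplier << " lines.\n";
            flush(out);
        }

    private:
        void flush(std::ostream &out) {
            for (unsigned i = 0; i < extractionMultiplier; ++i)
                out << roundtripBuffer[i] << "\n";
            roundtripBuffer.assign(extractionMultiplier, std::string());
            roundtripBufferPosition = 0;
        }
    };

    int main() {
        RoundtripSketch sketch;
        for (const char *line : {"@read1", "ACGT", "+", "IIII", "@read2", "ACG"})
            sketch.addLine(std::cout, line);
        sketch.finish(std::cout);  // warns and pads the incomplete second record
    }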

Reworked the Starter to accept more command line options and pass them
to the specific runners.

Changed ZLibBasedFASTQProcessorBaseClass::splitStr so that it no longer adds
an empty line to the end of the list if the last character is a newline.
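
The new behaviour can be pictured with the following sketch; it illustrates
the described semantics and is not the actual implementation.

    #include <cassert>
    #include <string>
    #include <vector>

    // Sketch of the changed splitStr() semantics: split on '\n', but do not
    // append an empty element for a trailing newline; an incomplete last line
    // is kept.
    std::vector<std::string> splitStrSketch(const std::string &str) {
        std::vector<std::string> lines;
        std::string::size_type start = 0;
        while (start < str.size()) {
            auto end = str.find('\n', start);
            if (end == std::string::npos) {
                lines.push_back(str.substr(start));  // incomplete last line
                break;
            }
            lines.push_back(str.substr(start, end - start));
            start = end + 1;  // skip the newline
        }
        return lines;
    }

    int main() {
        assert(splitStrSketch("a\nb\n").size() == 2);  // previously this returned 3
        assert(splitStrSketch("a\nb").size() == 2);    // incomplete last line kept
        assert(splitStrSketch("").empty());
        return 0;
    }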

TestResourcesAndFunctions
- Added readLinesOfFile(), readFile(), readLinesOfResourceFile(),
  readResourceFile(), compareVectorContent() and
  getTestVectorWithSimulatedBlockData() to reduce code duplication and
  improve test readability. The code was taken from IndexerTest and
  ExtractorTest (see the sketch after this list).
- getTestVectorWithSimulatedBlockData() returns a vector of files that
  simulate decompressed gzip block data. The data mimics common pitfalls such
  as small blocks, empty blocks, blocks without newlines and so on.
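
For illustration, the file-reading and comparison helpers could look like the
framework-neutral sketch below; the real signatures and the assertion style in
TestResourcesAndFunctions may differ.

    #include <cstddef>
    #include <fstream>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hedged sketch of two of the shared helpers; signatures are assumptions.
    std::vector<std::string> readLinesOfFile(const std::string &file) {
        std::vector<std::string> lines;
        std::ifstream in(file);
        for (std::string line; std::getline(in, line);)
            lines.push_back(line);
        return lines;
    }

    bool compareVectorContent(const std::vector<std::string> &expected,
                              const std::vector<std::string> &actual) {
        if (expected.size() != actual.size()) {
            std::cerr << "Size mismatch: expected " << expected.size()
                      << " lines, got " << actual.size() << "\n";
            return false;
        }
        for (std::size_t i = 0; i < expected.size(); ++i) {
            if (expected[i] != actual[i]) {
                std::cerr << "Mismatch in line " << i << ": '" << expected[i]
                          << "' vs. '" << actual[i] << "'\n";
                return false;
            }
        }
        return true;
    }

    int main() {
        // Trivial usage example; "record.fastq" is a hypothetical test file.
        std::vector<std::string> expected = {"@read1", "ACGT", "+", "IIII"};
        return compareVectorContent(expected, readLinesOfFile("record.fastq")) ? 0 : 1;
    }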

IndexerTest
- Added a test for correct block line counting, which uses the block data
  from getTestVectorWithSimulatedBlockData().
- Moved code into the aforementioned helper methods (readLinesOfFile() and so
  on).

ExtractorTest
- Analogous to IndexerTest, added a test for correct extraction of lines from
  block data. It also uses getTestVectorWithSimulatedBlockData().
- As in IndexerTest, used the extracted methods for file reading and vector
  comparison.

Added the blockAndLineCalculations test resource directory. It contains
several files with "decompressed" block data and a file that describes the
layout of the joined block files.
heinold committed Jun 7, 2019
1 parent 3a2fa78 commit 16851f4
Showing 29 changed files with 625 additions and 214 deletions.
13 changes: 8 additions & 5 deletions .gitignore
@@ -1,13 +1,16 @@
.idea/workspace.xml
*.a
test/Testing
CMakeFiles
*.cmake
*.cbp
Makefile
*.fastq
*.fq
CMakeCache.txt
CMakeFiles
Makefile
Testing
cmake-build-debug/
release
debug
cmake-build-*
src/fastqindex
test/testapp

test/Testing
9 changes: 0 additions & 9 deletions src/ActualRunner.cpp
@@ -17,25 +17,16 @@ using experimental::filesystem::path;
ActualRunner::ActualRunner(const path &fastqfile, const path &indexfile) {
this->fastqFile = make_shared<PathInputSource>(fastqfile);
this->indexFile = indexfile;
debug(string(
"Created runner with fastq file: \"" + fastqfile.string() + "\" and index \"" + indexfile.string() + "\""));
}

ActualRunner::ActualRunner(istream *fastqStream, const path &indexfile) {
this->fastqFile = make_shared<StreamInputSource>(fastqStream);
this->indexFile = indexfile;
debug(string("Created runner with input stream and index \"" + indexfile.string() + "\""));
}

ActualRunner::ActualRunner(const shared_ptr<InputSource> &fastqfile, const path &indexfile) {
this->fastqFile = fastqfile;
this->indexFile = indexfile;
if (fastqfile->isStreamSource())
debug(string("Created runner with input stream and index \"" + indexfile.string() + "\""));
else
debug(string("Created runner with fastq file: \"" +
dynamic_pointer_cast<PathInputSource>(fastqfile)->getPath().string() +
"\" and index \"" + indexfile.string() + "\""));
}

bool ActualRunner::checkPremises() {
97 changes: 49 additions & 48 deletions src/Extractor.cpp
@@ -36,7 +36,8 @@ Extractor::Extractor(
this->indexReader = make_shared<IndexReader>(indexfile);
this->resultFile = resultfile;
this->forceOverwrite = forceOverwrite;
this->extractionMulitplier = extractionMulitplier == 0 ? 4 : extractionMulitplier;
this->extractionMultiplier = extractionMulitplier == 0 ? 4 : extractionMulitplier;
this->roundtripBuffer = new string[extractionMulitplier];

if (resultFile.empty() || resultFile.generic_u8string() == "-") {
this->forceOverwrite = false;
@@ -48,6 +49,8 @@
}
}

Extractor::~Extractor() { delete[] roundtripBuffer; }

bool Extractor::checkPremises() {
if (lineCount == 0) {
addErrorMessage("Can't extract a line count of 0 lines. The value needs to be a positive number.");
@@ -98,12 +101,6 @@ bool Extractor::extract() {
out = &outfilestream;
}

if (startingLine >= extractionMulitplier) {
startingLine -= extractionMulitplier;
skipLines = extractionMulitplier;
lineCount += extractionMulitplier;
}

timerStart();

shared_ptr<IndexEntry> previousEntry = indexReader->readIndexEntry();
@@ -117,7 +114,6 @@
break;
}
startingIndexLine = entry;
// previousEntry = entry;
}

timerRestart("Index entry search");
@@ -185,7 +181,7 @@
if (!decompressNextChunkOfData(checkForStreamEnd, Z_NO_FLUSH))
break;

if (!processDecompressedData(out, startingIndexLine))
if (!processDecompressedChunkOfData(out, currentDecompressedBlock.str(), startingIndexLine))
continue;

// Tell the extractor, that the inner loop was called at least once, so we don't remove the first line of
@@ -242,67 +238,72 @@ bool Extractor::checkAndPrepareForNextConcatenatedPart(bool finalAbort) {
return true;
}

bool Extractor::processDecompressedData(ostream *out, const shared_ptr<IndexEntry> &startingIndexLine) {
vector<string> splitLines;
string str = currentDecompressedBlock.str();
splitLines = splitStr(str);
bool
Extractor::processDecompressedChunkOfData(ostream *out, string str, const shared_ptr<IndexEntry> &startingIndexLine) {
if (extractedLines >= lineCount)
return false;
vector<string> splitLines = splitStr(str);
totalSplitCount += splitLines.size();

// On the first invocation, the index entry may point into the middle of a line; drop that incomplete first line.
bool removeIncompleteFirstLine = firstPass && startingIndexLine->offsetOfFirstValidLine > 0;
if (removeIncompleteFirstLine) splitLines.erase(splitLines.begin());
firstPass = false;

// Strip away incomplete last line, store this line for the next block.
string curIncompleteLastLine;
// Strip away incomplete line, store this line for the next block.
string lastSplitLine;
if (!splitLines.empty())
lastSplitLine = splitLines[splitLines.size() - 1];

char lastChar{0};
if (!lastSplitLine.empty())
lastChar = lastSplitLine.c_str()[lastSplitLine.size() - 1];
if (!str.empty())
lastChar = str[str.size() - 1];
if (lastChar != '\n') {
curIncompleteLastLine = lastSplitLine;
if (!splitLines.empty())
if (!splitLines.empty()) {
curIncompleteLastLine = splitLines[splitLines.size() - 1];
splitLines.pop_back();
}
totalSplitCount--;
}

// Even if the split string fails, there could still be a newline in the string. Add this and continue.
if (splitLines.empty()) {
incompleteLastLine = incompleteLastLine + curIncompleteLastLine;
incompleteLastLine += curIncompleteLastLine;
return false;
}

// Two options. Extraction began earlier OR we are processing another chunk of data.
u_int64_t iStart = 0;
if (extractedLines == 0) {
if (startingIndexLine->offsetOfFirstValidLine > 0 && firstPass) {
if (!splitLines.empty()) splitLines.erase(splitLines.begin());
}
iStart = skip;
} else {
if (!incompleteLastLine.empty() && extractedLines < lineCount) {
storeOrOutputLine(out, &skipLines, incompleteLastLine + splitLines[0]);
bool result = true;
// Basically two cases, first case, we have enough data here and can output something, or we skip the whole chunk
if (skip >= splitLines.size()) { // Ignore
result = false;
} else { // Output
u_int64_t iStart = skip;
if (iStart == 0) { // We start right away, also use incompleteLastLine.
storeOrOutputLine(out, incompleteLastLine + splitLines[0]);
extractedLines++;
iStart = 1;
}
}
// iStart is 0 or 1
for (int i = iStart; i < splitLines.size() && extractedLines < lineCount; ++i) {
storeOrOutputLine(out, &skipLines, splitLines[i]);
extractedLines++;
// iStart is 0 or 1
for (int i = iStart; i < splitLines.size() && extractedLines < lineCount; ++i) {
storeOrOutputLine(out, splitLines[i]);
extractedLines++;
}
}
incompleteLastLine = curIncompleteLastLine;
if (skip > 0) skip -= min(splitLines.size(), skip);

return true;
return result;
}

void Extractor::storeOrOutputLine(ostream *outStream, uint64_t *skipLines, string line) {
if (*skipLines > 0) {
(*skipLines)--;
debug(string("skip line: ") + line);
} else {
if (enableDebugging)
storedLines.emplace_back(line);
else
(*outStream) << line << "\n";
}
void Extractor::storeOrOutputLine(ostream *outStream, string line) {
roundtripBufferPosition = extractedLines % extractionMultiplier;
roundtripBuffer[roundtripBufferPosition] = line;
// if (roundtripBufferPosition != extractionMultiplier - 1) return;

// for (int i = 0; i < extractionMultiplier; i++) {
if (enableDebugging)
storedLines.emplace_back(line);
else
(*outStream) << line << "\n";
// }
}

void Extractor::storeLinesOfCurrentBlockForDebugMode() {
43 changes: 35 additions & 8 deletions src/Extractor.h
@@ -31,7 +31,7 @@ class Extractor : public ZLibBasedFASTQProcessorBaseClass {

u_int64_t lineCount;

uint extractionMulitplier;
uint extractionMultiplier;

path resultFile;

@@ -55,10 +55,9 @@ class Extractor : public ZLibBasedFASTQProcessorBaseClass {
// Keep track of all split lines. Merely for debugging
u_int64_t totalSplitCount = 0;

/**
* Lines to skip, when the extraction starts. This is for a bugfix...
*/
uint64_t skipLines = 0;
string *roundtripBuffer = nullptr;

uint roundtripBufferPosition = 0;

public:

@@ -75,25 +74,53 @@ class Extractor : public ZLibBasedFASTQProcessorBaseClass {
bool forceOverwrite, u_int64_t startingLine, u_int64_t lineCount, uint extractionMulitplier,
bool enableDebugging);

virtual ~Extractor() = default;
virtual ~Extractor();

/**
* Will call tryOpenAndReadHeader on the internal indexReader.
*/
bool checkPremises();

/**
* For debugging and testing; will be overridden by extract(). Sets the initial
* number of lines which will be skipped.
* TODO Move to test-aware subclass.
*/
void setSkip(uint skip) {
this->skip = skip;
}

/**
* For debugging and testing; will be overridden by extract(). Sets the
* firstPass flag.
* TODO Move to test-aware subclass.
*/
void setFirstPass(bool firstPass) {
this->firstPass = firstPass;
}

/**
* For now directly to cout?
* @param start
* @param count
*/
bool extract();

bool processDecompressedData(ostream *out, const shared_ptr<IndexEntry> &startingIndexLine);
/**
* Process a decompressed chunk (NOT a complete decompressed block!) of data.
*
* Will reset the firstPass variable, if called for the first time.
*
* @param out The stream to put the data to
* @param str The string of decompressed chunk data
* @param startingIndexLine The index entry which was used to step into the gzip file.
* @return true, if something was written out or false otherwise.
*/
bool processDecompressedChunkOfData(ostream *out, string str, const shared_ptr<IndexEntry> &startingIndexLine);

bool checkAndPrepareForNextConcatenatedPart(bool finalAbort);

void storeOrOutputLine(ostream *outStream, uint64_t *skipLines, string line);
void storeOrOutputLine(ostream *outStream, string line);

void storeLinesOfCurrentBlockForDebugMode();

1 change: 0 additions & 1 deletion src/IndexWriter.cpp
@@ -64,7 +64,6 @@ bool IndexWriter::writeIndexHeader(const shared_ptr<IndexHeader> &header) {
}

bool IndexWriter::writeIndexEntry(const shared_ptr<IndexEntryV1> &entry) {
debug("Write index entry to index file.");
if (!this->writerIsOpen) {
// Throw assertion errors? Would actually be better right?
addErrorMessage("Could not write index entry to index file, writer is not open.");