Skip to content

Commit

Permalink
rewrote to use inputstreams and added .tar.gz
Browse files Browse the repository at this point in the history
  • Loading branch information
jgwoolley committed Jan 8, 2025
1 parent e04aefd commit c9cb44a
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 128 deletions.
1 change: 1 addition & 0 deletions README.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ I've developed several versions of this project over the years, and created a We

== TODO

* Can pass variables with asciidoc link:https://stackoverflow.com/questions/63622953/asciidoctor-activate-substitution-in-cli-attributes[]
* Remove concept of version as provided by CLI arg, or make version and prefix additional options for configuring how FlowFileStream(s) are parsed.
* Add a seperate file metadata section w/ uuids, and connect parents w/ uuids, and flowfiles to uuids
* Add JavaDocs Comments to page to stop "warning: no comment" errors.
Expand Down
209 changes: 98 additions & 111 deletions src/main/java/com/yelloowstone/nf2t/cli/App.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.yelloowstone.nf2t.cli;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -18,7 +15,12 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFileUnpackager;
Expand Down Expand Up @@ -53,136 +55,103 @@ public App() {
this.mapper = new ObjectMapper();
}

private void unpackageFlowFileInputStream(final FlowFileStreamResult result,
final Path flowFilePath,
final InputStream in) {
// Unpack Frequently Used Variables
private void unpackageFlowFileInputStream(final FlowFileStreamResult result, final InputStream is,
final SourceFile source, final FlowFilePackageVersion packageVersion) {
final List<FlowFileResult> outputFiles = result.getOutputFiles();
final List<FlowFileErrorResult> errors = result.getErrors();
final FlowFilePackageVersion packageVersion = this.getPackageVersions().get(result.getVersion());
final FlowFileUnpackager unpackager = packageVersion.getUnpackager();

try {
do {
final Path contentPath = result.getOutputPath().resolve(UUID.randomUUID().toString() + ".dat");
FlowFileResult flowFileResult = null;
try (OutputStream out = Files.newOutputStream(contentPath)) {
final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
final Map<String, String> attributes = unpackager.unpackageFlowFile(is, out);
final long contentSize = Files.size(contentPath);
flowFileResult = new FlowFileResult(flowFilePath.toUri(), contentPath.toUri(), attributes,
contentSize);
flowFileResult = new FlowFileResult(source, null, attributes, contentSize);
outputFiles.add(flowFileResult);
} catch (Exception e) {
throw new Exception("Could not unpackage " + flowFilePath, e);
throw new Exception("Could not unpackage " + source.getAbsolutePath(), e);
}

if (result.isUuidFilenames() && flowFileResult != null) {
String filename = flowFileResult.getAttributes().get(CoreAttributes.FILENAME.key());
if (filename != null) {
Path newContentPath = contentPath.getParent().resolve(filename);
Files.move(contentPath, newContentPath);
flowFileResult.setContentPath(newContentPath.toUri());
flowFileResult.setContentPath(new SourceFile(newContentPath.toAbsolutePath().toString(),
newContentPath.getFileName().getFileName().toString(),
flowFileResult.getContentSize()));
}
}

} while (unpackager.hasMoreData());
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
errors.add(new FlowFileErrorResult(e, source));
}
}

private URI updatePath(final Path originalPath, String scheme) throws URISyntaxException {
final URI originalUri = originalPath.toUri();
final String host = originalUri.getHost();
final int port = originalUri.getPort();
final String path = originalUri.getPath();
final String query = originalUri.getQuery();
final String fragment = originalUri.getFragment();

final URI updatedURI = new URI(scheme, null, host, port, path, query, fragment);

return updatedURI;
}

private void unpackageFlowFilePath(final FlowFileStreamResult result,
final Path flowFilePath) {
private void unpackageInputStream(final FlowFileStreamResult result, final InputStream is,
final SourceFile source) {
final List<FlowFileErrorResult> errors = result.getErrors();
final String absolutePath = source.getAbsolutePath();

if (Files.isRegularFile(flowFilePath)) {
final String fileName = flowFilePath.getFileName().toString();

if (fileName.endsWith(".zip")) {
try {
final URI updatedPath = updatePath(flowFilePath, "jar:file");
if(absolutePath == null) {
errors.add(new FlowFileErrorResult(new NullPointerException("Given Absolute Path was null."), source));
return;
}

if (absolutePath.endsWith(".zip")) {
try (ZipInputStream zipIs = new ZipInputStream(is)) {
ZipEntry entry;
while ((entry = zipIs.getNextEntry()) != null) {
final String newAbsolutePath = entry.getName();
final String newFilename = new File(entry.getName()).getName();
unpackageInputStream(result, zipIs, new SourceFile(newAbsolutePath, newFilename, entry.getSize()));
}
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, source));
}
return;
}

final Map<String, Object> env = new HashMap<>();
env.put("create", "false");
if (absolutePath.endsWith(".tar.gz")) {
try (final GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(is);
final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(gzipInputStream);) {
ArchiveEntry entry;
while ((entry = tarInputStream.getNextEntry()) != null) {
if (entry.isDirectory()) {
byte[] buffer = new byte[1024];
while (tarInputStream.read(buffer) > 0) {

try (final FileSystem fs = FileSystems.newFileSystem(updatedPath, env)) {
for (Path path : fs.getRootDirectories()) {
unpackageFlowFilePath(result, path);
}
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, updatedPath));
} else {
final String newAbsolutePath = entry.getName();
final String newFilename = new File(entry.getName()).getName();
unpackageInputStream(result, tarInputStream, new SourceFile(newAbsolutePath, newFilename, entry.getSize()));
}

} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
}
} else if (fileName.endsWith(".tar.gz")) {
// TODO: ZIP File support
// TODO: TAR / GZIP support TarArchiveEntry
// this.fs = FileSystems.newFileSystem(zipfile, null);
errors.add(new FlowFileErrorResult(new IllegalArgumentException("Doesn't support .tar.gz files yet"),
flowFilePath.toUri()));

// try(final InputStream fis = Files.newInputStream(flowFilePath);final GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fis);final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(gzipInputStream);) {
// ArchiveEntry entry;
// while ((entry = tarInputStream.getNextEntry()) != null) {
// System.out.println("Entry Name: " + entry.getName());
// System.out.println("Entry Size: " + entry.getSize());
//
// // Process the entry data (if it's a file)
// if (!entry.isDirectory()) {
// result.getErrors();
//
// if(entry.getName().endsWith(fileName)) {
// unpackageFlowFileInputStream(result, packageVersion, uuidFilenames, flowFilePath, tarInputStream);
// continue;
// }
//
// byte[] buffer = new byte[1024];
// while (tarInputStream.read(buffer) > 0) {
//
// }
// }
// }
//
// }catch (Exception e) {
// errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
// }
} else {
// TODO: Currently assumes all files are flowFiles that are regular files and
// not archives.

try (final InputStream in = Files.newInputStream(flowFilePath)) {
unpackageFlowFileInputStream(result, flowFilePath, in);
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
}
}
} else if (Files.isDirectory(flowFilePath)) {
try (final Stream<Path> files = Files.list(flowFilePath)) {
files.forEach(x -> {
unpackageFlowFilePath(result, x);
});
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, source));
}
} else {
errors.add(new FlowFileErrorResult("Could not parse", flowFilePath.toUri()));
return;
}

if (absolutePath.endsWith(result.getExtension())) {
final FlowFilePackageVersion packageVersion = this.getPackageVersions().get(result.getVersion());
unpackageFlowFileInputStream(result, is, source, packageVersion);
return;
}

for (int version = 1; version <= 3; version++) {
final FlowFilePackageVersion packageVersion = this.getPackageVersions().get(version);
if (absolutePath.endsWith(packageVersion.getFileExtension())) {
unpackageFlowFileInputStream(result, is, source, packageVersion);
return;
}
}
}

@Command(name = "unpackage", description = "Unpackages FlowFileStream(s), information regarding this operation sent to standard out. See command arguments for furher details.")
Expand All @@ -195,42 +164,54 @@ public Integer unpackageFlowFileStream(
+ FlowFileStreamResult.INPUTPATH_UNPACKAGE_DESCRIPTION, required = true) final String inputOption,
@Option(names = { "-o", "--out" }, description = "The output path."
+ FlowFileStreamResult.OUTPUTPATH_UNPACKAGE_DESCRIPTION, required = true) final String outputOption,
@Option(names = { "-u", "--uuid" }, description = FlowFileStreamResult.UUID_DESCRIPTION, defaultValue = "true") final boolean uuidFilenames) {
@Option(names = { "-u",
"--uuid" }, description = FlowFileStreamResult.UUID_DESCRIPTION, defaultValue = "true") final boolean uuidFilenames) {
final FlowFileStreamResult result = createResult(version, extension, uuidFilenames, inputOption, outputOption);

// Unpack Frequently Used Variables
final Path inputPath = result.getInputPath();
final Path outputPath = result.getOutputPath();
final List<FlowFileErrorResult> errors = result.getErrors();

final SourceFile source = SourceFile.fromPath(inputPath);

// Get Packager For Current Version
final FlowFilePackageVersion packageVersion = packageVersions.get(version);
if (packageVersion == null) {
errors.add(new FlowFileErrorResult(new Exception("Bad FlowFile Package Version Given: " + version),
inputPath.toUri()));
source));
printResult(result);
return 1;
}

if (!Files.isDirectory(outputPath)) {
final Exception exception = new FileNotFoundException(
"Output path not found: " + outputPath.toAbsolutePath().toString());
errors.add(new FlowFileErrorResult(exception, inputPath.toUri()));
errors.add(new FlowFileErrorResult(exception, source));
} else {
if (Files.isDirectory(inputPath)) {
try (final Stream<Path> files = Files.list(inputPath)) {
files.forEach(flowFilePath -> {
unpackageFlowFilePath(result, flowFilePath);
files.forEach(x -> {
final SourceFile newSource = SourceFile.fromPath(x);
try(final InputStream is = Files.newInputStream(x)) {
unpackageInputStream(result, is, newSource);
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, newSource));
}
});
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, inputPath.toUri()));
errors.add(new FlowFileErrorResult(e, source));
}
} else if (Files.isRegularFile(inputPath)) {
unpackageFlowFilePath(result, inputPath);
try(final InputStream is = Files.newInputStream(inputPath)) {
unpackageInputStream(result, is, source);
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, source));
}
} else {
final Exception exception = new FileNotFoundException(
"Input path not found: " + inputPath.toAbsolutePath().toString());
errors.add(new FlowFileErrorResult(exception, inputPath.toUri()));
errors.add(new FlowFileErrorResult(exception, source));
}
}

Expand Down Expand Up @@ -258,11 +239,15 @@ public Integer packageFlowFileStream(
Path outputPath = result.getOutputPath();
final List<FlowFileErrorResult> errors = result.getErrors();

final SourceFile source = SourceFile.fromPath(inputPath);
final SourceFile output = SourceFile.fromPath(outputPath);


// Get Packager For Current Version
final FlowFilePackageVersion packageVersion = packageVersions.get(version);
if (packageVersion == null) {
errors.add(new FlowFileErrorResult(new Exception("Bad FlowFile Package Version Given: " + version),
inputPath.toUri()));
source));
printResult(result);
return 1;
}
Expand All @@ -279,14 +264,14 @@ public Integer packageFlowFileStream(
try {
Files.list(inputPath).filter(Files::isRegularFile).forEach(x -> contentPaths.add(x));
} catch (IOException e) {
result.getErrors().add(new FlowFileErrorResult(e, outputPath.toUri()));
result.getErrors().add(new FlowFileErrorResult(e, output));
printResult(result);
return 1;
}
} else {
final Exception exception = new FileNotFoundException(
"Flowfile content not found at path: " + inputPath.toAbsolutePath().toString());
result.getErrors().add(new FlowFileErrorResult(exception, inputPath.toUri()));
result.getErrors().add(new FlowFileErrorResult(exception, output));
printResult(result);
return 1;
}
Expand All @@ -295,26 +280,28 @@ public Integer packageFlowFileStream(

try (OutputStream outputStream = Files.newOutputStream(outputPath)) {
for (Path contentPath : contentPaths) {
final SourceFile content = SourceFile.fromPath(outputPath);

try {
final Map<String, String> attributes = new HashMap<>();
final long contentSize = updateDefaultAttributes(attributes, contentPath);

try (InputStream inputStream = Files.newInputStream(contentPath)) {
packager.packageFlowFile(inputStream, outputStream, attributes, contentSize);

final FlowFileResult packageResult = new FlowFileResult(outputPath.toUri(), inputPath.toUri(),
final FlowFileResult packageResult = new FlowFileResult(output, source,
attributes, contentSize);
result.getOutputFiles().add(packageResult);
}

} catch (IOException e) {
result.getErrors().add(new FlowFileErrorResult(e, contentPath.toUri()));
result.getErrors().add(new FlowFileErrorResult(e, content));
printResult(result);
return 1;
}
}
} catch (IOException e) {
result.getErrors().add(new FlowFileErrorResult(e, outputPath.toUri()));
result.getErrors().add(new FlowFileErrorResult(e, output));
printResult(result);
return 1;
}
Expand Down Expand Up @@ -356,8 +343,8 @@ public boolean printResult(final FlowFileStreamResult result) {
return false;
}

public FlowFileStreamResult createResult(final int version, String extension, final boolean uuidFilenames, final String inputOption,
String outputOption) {
public FlowFileStreamResult createResult(final int version, String extension, final boolean uuidFilenames,
final String inputOption, String outputOption) {
final Path inputPath = Paths.get(inputOption == null ? "." : inputOption);
final Path outputPath = outputOption == null ? inputPath : Paths.get(outputOption);

Expand Down
Loading

0 comments on commit c9cb44a

Please sign in to comment.