Skip to content

Commit

Permalink
refactored unpackage methods to require less arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
jgwoolley committed Jan 8, 2025
1 parent b5e38d0 commit e04aefd
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 63 deletions.
118 changes: 62 additions & 56 deletions src/main/java/com/yelloowstone/nf2t/cli/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public App() {
}

private void unpackageFlowFileInputStream(final FlowFileStreamResult result,
final FlowFilePackageVersion packageVersion, final boolean uuidFilenames, final Path flowFilePath,
final Path flowFilePath,
final InputStream in) {
// Unpack Frequently Used Variables
final List<FlowFileResult> outputFiles = result.getOutputFiles();
final List<FlowFileErrorResult> errors = result.getErrors();

final FlowFilePackageVersion packageVersion = this.getPackageVersions().get(result.getVersion());
final FlowFileUnpackager unpackager = packageVersion.getUnpackager();

try {
Expand All @@ -69,13 +69,14 @@ private void unpackageFlowFileInputStream(final FlowFileStreamResult result,
try (OutputStream out = Files.newOutputStream(contentPath)) {
final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
final long contentSize = Files.size(contentPath);
flowFileResult = new FlowFileResult(flowFilePath.toUri(), contentPath.toUri(), attributes, contentSize);
flowFileResult = new FlowFileResult(flowFilePath.toUri(), contentPath.toUri(), attributes,
contentSize);
outputFiles.add(flowFileResult);
} catch (Exception e) {
throw new Exception("Could not unpackage " + flowFilePath, e);
}

if (uuidFilenames && flowFileResult != null) {
if (result.isUuidFilenames() && flowFileResult != null) {
String filename = flowFileResult.getAttributes().get(CoreAttributes.FILENAME.key());
if (filename != null) {
Path newContentPath = contentPath.getParent().resolve(filename);
Expand All @@ -91,51 +92,50 @@ private void unpackageFlowFileInputStream(final FlowFileStreamResult result,
}

private URI updatePath(final Path originalPath, String scheme) throws URISyntaxException {
final URI originalUri = originalPath.toUri();
final String host = originalUri.getHost();
final int port = originalUri.getPort();
final String path = originalUri.getPath();
final String query = originalUri.getQuery();
final String fragment = originalUri.getFragment();

final URI updatedURI = new URI(scheme, null, host, port,
path, query, fragment);

final URI originalUri = originalPath.toUri();
final String host = originalUri.getHost();
final int port = originalUri.getPort();
final String path = originalUri.getPath();
final String query = originalUri.getQuery();
final String fragment = originalUri.getFragment();

final URI updatedURI = new URI(scheme, null, host, port, path, query, fragment);

return updatedURI;
}

private void unpackageFlowFilePath(final FlowFileStreamResult result,
final FlowFilePackageVersion packageVersion, final boolean uuidFilenames, final Path flowFilePath) {
final Path flowFilePath) {
final List<FlowFileErrorResult> errors = result.getErrors();

if(Files.isRegularFile(flowFilePath)) {
if (Files.isRegularFile(flowFilePath)) {
final String fileName = flowFilePath.getFileName().toString();
if(fileName.endsWith(".zip")) {

if (fileName.endsWith(".zip")) {
try {
final URI updatedPath = updatePath(flowFilePath, "jar:file");
final Map<String, Object> env = new HashMap<>();
env.put("create", "false");
try(final FileSystem fs = FileSystems.newFileSystem(updatedPath, env)) {
final URI updatedPath = updatePath(flowFilePath, "jar:file");

final Map<String, Object> env = new HashMap<>();
env.put("create", "false");

try (final FileSystem fs = FileSystems.newFileSystem(updatedPath, env)) {
for (Path path : fs.getRootDirectories()) {
unpackageFlowFilePath(result, packageVersion, uuidFilenames, path);
}
} catch(Exception e) {
errors.add(new FlowFileErrorResult(e, updatedPath));
unpackageFlowFilePath(result, path);
}
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, updatedPath));
}



} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
}
} else if(fileName.endsWith(".tar.gz")) {
} else if (fileName.endsWith(".tar.gz")) {
// TODO: ZIP File support
// TODO: TAR / GZIP support TarArchiveEntry
// this.fs = FileSystems.newFileSystem(zipfile, null);
errors.add(new FlowFileErrorResult(new IllegalArgumentException("Doesn't support .tar.gz files yet"), flowFilePath.toUri()));

errors.add(new FlowFileErrorResult(new IllegalArgumentException("Doesn't support .tar.gz files yet"),
flowFilePath.toUri()));

// try(final InputStream fis = Files.newInputStream(flowFilePath);final GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fis);final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(gzipInputStream);) {
// ArchiveEntry entry;
// while ((entry = tarInputStream.getNextEntry()) != null) {
Expand All @@ -162,19 +162,19 @@ private void unpackageFlowFilePath(final FlowFileStreamResult result,
// errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
// }
} else {
// TODO: Currently assumes all files are flowFiles that are regular files and not archives.
// TODO: Currently assumes all files are flowFiles that are regular files and
// not archives.

try (final InputStream in = Files.newInputStream(flowFilePath)) {
unpackageFlowFileInputStream(result, packageVersion, uuidFilenames, flowFilePath, in);
unpackageFlowFileInputStream(result, flowFilePath, in);
} catch (Exception e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
}
}
}
else if(Files.isDirectory(flowFilePath)) {
} else if (Files.isDirectory(flowFilePath)) {
try (final Stream<Path> files = Files.list(flowFilePath)) {
files.forEach(x -> {
unpackageFlowFilePath(result, packageVersion, uuidFilenames, x);
unpackageFlowFilePath(result, x);
});
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, flowFilePath.toUri()));
Expand All @@ -184,19 +184,19 @@ else if(Files.isDirectory(flowFilePath)) {
}

}


@Command(name = "unpackage", description="Unpackages FlowFileStream(s), information regarding this operation sent to standard out. See command arguments for furher details.")

@Command(name = "unpackage", description = "Unpackages FlowFileStream(s), information regarding this operation sent to standard out. See command arguments for furher details.")
public Integer unpackageFlowFileStream(
@Option(names = { "-v", "--version" }, defaultValue = "3", description = {
FlowFileStreamResult.VERSION_DESCRIPTION + " incoming FlowFileStream(s)." }) final int version,
@Option(names = { "-e",
"--extension" }, defaultValue = ".pkg", description = FlowFileStreamResult.EXTENSION_UNPACKAGE_DESCRIPTION) final String extension,
@Option(names = { "-i", "--in" }, description = "The input path."
+ FlowFileStreamResult.INPUTPATH_UNPACKAGE_DESCRIPTION, required = true) final String inputOption,
@Option(names = { "-o", "--out" }, description = "The output path."
+ FlowFileStreamResult.OUTPUTPATH_UNPACKAGE_DESCRIPTION, required = true) final String outputOption,
@Option(names = { "-u", "--uuid" }, description = {
"Will make all unpackaged content filename(s) UUIDs, to prevent clobering." }, defaultValue = "true") final boolean uuidFilenames) {
final FlowFileStreamResult result = createResult(version, inputOption, outputOption);
@Option(names = { "-u", "--uuid" }, description = FlowFileStreamResult.UUID_DESCRIPTION, defaultValue = "true") final boolean uuidFilenames) {
final FlowFileStreamResult result = createResult(version, extension, uuidFilenames, inputOption, outputOption);

// Unpack Frequently Used Variables
final Path inputPath = result.getInputPath();
Expand All @@ -220,13 +220,13 @@ public Integer unpackageFlowFileStream(
if (Files.isDirectory(inputPath)) {
try (final Stream<Path> files = Files.list(inputPath)) {
files.forEach(flowFilePath -> {
unpackageFlowFilePath(result, packageVersion, uuidFilenames, flowFilePath);
unpackageFlowFilePath(result, flowFilePath);
});
} catch (IOException e) {
errors.add(new FlowFileErrorResult(e, inputPath.toUri()));
}
} else if (Files.isRegularFile(inputPath)) {
unpackageFlowFilePath(result, packageVersion, uuidFilenames, inputPath);
unpackageFlowFilePath(result, inputPath);
} else {
final Exception exception = new FileNotFoundException(
"Input path not found: " + inputPath.toAbsolutePath().toString());
Expand All @@ -240,16 +240,18 @@ public Integer unpackageFlowFileStream(

return 0;
}
@Command(name = "package", description="Packages FlowFileStream(s), information regarding this operation sent to standard out. See command arguments for furher details.")

@Command(name = "package", description = "Packages FlowFileStream(s), information regarding this operation sent to standard out. See command arguments for furher details.")
public Integer packageFlowFileStream(
@Option(names = { "-v", "--version" }, defaultValue = "3", description = {
FlowFileStreamResult.VERSION_DESCRIPTION + " resulting FlowFile." }) final int version,
@Option(names = { "-e",
"--extension" }, defaultValue = "", description = FlowFileStreamResult.EXTENSION_PACKAGE_DESCRIPTION) final String extension,
@Option(names = { "-i", "--in" }, description = "The input path."
+ FlowFileStreamResult.INPUTPATH_PACKAGE_DESCRIPTION, required = true) final String inputOption,
@Option(names = { "-o", "--out" }, description = "The output path."
+ FlowFileStreamResult.OUTPUTPATH_PACKAGE_DESCRIPTION, required = true) final String outputOption) {
final FlowFileStreamResult result = createResult(version, inputOption, outputOption);
final FlowFileStreamResult result = createResult(version, extension, true, inputOption, outputOption);

// Unpack Frequently Used Variables
final Path inputPath = result.getInputPath();
Expand Down Expand Up @@ -300,8 +302,8 @@ public Integer packageFlowFileStream(
try (InputStream inputStream = Files.newInputStream(contentPath)) {
packager.packageFlowFile(inputStream, outputStream, attributes, contentSize);

final FlowFileResult packageResult = new FlowFileResult(outputPath.toUri(), inputPath.toUri(), attributes,
contentSize);
final FlowFileResult packageResult = new FlowFileResult(outputPath.toUri(), inputPath.toUri(),
attributes, contentSize);
result.getOutputFiles().add(packageResult);
}

Expand All @@ -324,7 +326,7 @@ public Integer packageFlowFileStream(
return 0;
}

@Command(name = "generateSchema", description="Generates a JSONSchema for the result of the Unpackage/Package commands.")
@Command(name = "generateSchema", description = "Generates a JSONSchema for the result of the Unpackage/Package commands.")
public Integer generateSchema() throws JsonProcessingException {
SchemaFactoryWrapper visitor = new SchemaFactoryWrapper();
mapper.acceptJsonFormatVisitor(FlowFileStreamResult.class, visitor);
Expand All @@ -335,7 +337,7 @@ public Integer generateSchema() throws JsonProcessingException {

@Override
public Integer call() throws Exception {
throw new picocli.CommandLine.ParameterException(spec.commandLine(), "Missing required subcommand.");
throw new picocli.CommandLine.ParameterException(spec.commandLine(), "Missing required subcommand.");
}

public FlowFilePackageVersions getPackageVersions() {
Expand All @@ -354,13 +356,17 @@ public boolean printResult(final FlowFileStreamResult result) {
return false;
}

public static FlowFileStreamResult createResult(final int version, final String inputOption,
final String outputOption) {
public FlowFileStreamResult createResult(final int version, String extension, final boolean uuidFilenames, final String inputOption,
String outputOption) {
final Path inputPath = Paths.get(inputOption == null ? "." : inputOption);
final Path outputPath = outputOption == null ? inputPath : Paths.get(outputOption);

if (extension.length() <= 0) {
extension = packageVersions.get(version).getFileExtension();
}

long unixTime = System.currentTimeMillis() / 1000L;
return new FlowFileStreamResult(version, inputPath, outputPath, unixTime);
return new FlowFileStreamResult(version, extension, uuidFilenames, inputPath, outputPath, unixTime);
}

public static long updateDefaultAttributes(Map<String, String> attributes, Path path) throws IOException {
Expand Down
46 changes: 39 additions & 7 deletions src/main/java/com/yelloowstone/nf2t/cli/FlowFileStreamResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,37 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

public class FlowFileStreamResult {

protected static final String VERSION_DESCRIPTION = "This is the FlowFile version of the ";

private static final String UNPACKAGE_DESCRIPTION = " For the unpackage command, ";
private static final String PACKAGE_DESCRIPTION = " For the package command, ";

protected static final String EXTENSION_DESCRIPTION = "This is the extension ";
protected static final String EXTENSION_UNPACKAGE_DESCRIPTION = UNPACKAGE_DESCRIPTION + "it is used to determine whether or not an incoming file is a FlowFileStream of the specified version.";
protected static final String EXTENSION_PACKAGE_DESCRIPTION = PACKAGE_DESCRIPTION + "it will be used to generate the outgoing extension of the specified version.";
@JsonPropertyDescription(EXTENSION_DESCRIPTION + EXTENSION_UNPACKAGE_DESCRIPTION + EXTENSION_PACKAGE_DESCRIPTION)
@JsonProperty("extension")
final private String extension;

protected static final String VERSION_DESCRIPTION = "This is the FlowFileStream version of the ";
@JsonPropertyDescription(VERSION_DESCRIPTION + "file.")
@JsonProperty("version")
final private int version;

protected static final String INPUTPATH_UNPACKAGE_DESCRIPTION = " For the unpackage command, a single FlowFileStream file, a directory of FlowFileStream files, a directory containing .ZIP or .TAR.GZ files containing FlowFileStream(s), or a single .ZIP or .TAR.GZ file containing FlowFileStream(s).";
protected static final String INPUTPATH_PACKAGE_DESCRIPTION = " For the package command, a directory or file containing FlowFile content.";
protected static final String UUID_DESCRIPTION = UNPACKAGE_DESCRIPTION + "Will make all unpackaged content filename(s) UUIDs, to prevent clobering.";

@JsonPropertyDescription(UUID_DESCRIPTION)
@JsonProperty("uuidFilenames")
private boolean uuidFilenames;


protected static final String INPUTPATH_UNPACKAGE_DESCRIPTION = UNPACKAGE_DESCRIPTION + "a single FlowFileStream file, a directory of FlowFileStream files, a directory containing .ZIP or .TAR.GZ files containing FlowFileStream(s), or a single .ZIP or .TAR.GZ file containing FlowFileStream(s).";
protected static final String INPUTPATH_PACKAGE_DESCRIPTION = PACKAGE_DESCRIPTION + "a directory or file containing FlowFile content.";
@JsonPropertyDescription("The input path. " + INPUTPATH_UNPACKAGE_DESCRIPTION + INPUTPATH_PACKAGE_DESCRIPTION + "Represented by Java URI format.")
@JsonProperty("inputPath")
final private Path inputPath;

protected static final String OUTPUTPATH_UNPACKAGE_DESCRIPTION = " For the unpackage command, a directory containing the FlowFile content.";
protected static final String OUTPUTPATH_PACKAGE_DESCRIPTION = " For the package command, a directory where a FlowFileStream will be created containing all the incoming content with a default filename, or the full path to the FlowFileStream.";
protected static final String OUTPUTPATH_UNPACKAGE_DESCRIPTION = UNPACKAGE_DESCRIPTION + "a directory containing the FlowFile content.";
protected static final String OUTPUTPATH_PACKAGE_DESCRIPTION = PACKAGE_DESCRIPTION + "a directory where a FlowFileStream will be created containing all the incoming content with a default filename, or the full path to the FlowFileStream.";

@JsonPropertyDescription("The output path. " + OUTPUTPATH_UNPACKAGE_DESCRIPTION + OUTPUTPATH_PACKAGE_DESCRIPTION + "Represented by Java URI format.")
@JsonProperty("outputPath")
Expand All @@ -41,9 +58,11 @@ public class FlowFileStreamResult {

@JsonCreator
public FlowFileStreamResult(@JsonProperty("version") final int version,
@JsonProperty("inputPath") final Path inputPath, @JsonProperty("outputPath") final Path outputPath,
@JsonProperty("extension") String extension, @JsonProperty("uuidFilenames") final boolean uuidFilenames, @JsonProperty("inputPath") final Path inputPath, @JsonProperty("outputPath") final Path outputPath,
@JsonProperty("unixTime") final long unixTime) {
this.version = version;
this.extension = extension;
this.uuidFilenames = uuidFilenames;
this.inputPath = inputPath;
this.outputPath = outputPath;
this.unixTime = unixTime;
Expand All @@ -53,6 +72,19 @@ public int getVersion() {
return version;
}

public String getExtension() {
return extension;
}


public boolean isUuidFilenames() {
return uuidFilenames;
}

public void setUuidFilenames(boolean uuidFilenames) {
this.uuidFilenames = uuidFilenames;
}

public Path getInputPath() {
return inputPath;
}
Expand Down

0 comments on commit e04aefd

Please sign in to comment.