Skip to content

Commit

Permalink
messy wip - needs cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
naknomum committed Jan 9, 2025
1 parent 46cfa2d commit f31f077
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 10 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/ecocean/Annotation.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ public void opensearchDocumentSerializer(JsonGenerator jgen, Shepherd myShepherd
return null;
}

@Override public String getAllVersionsSql() {
return "SELECT \"ID\", \"VERSION\" AS version FROM \"ANNOTATION\" ORDER BY version";
}

@Override public Base getById(Shepherd myShepherd, String id) {
return myShepherd.getAnnotation(id);
}

@Override public void setComments(final String comments) {
}

Expand Down
72 changes: 72 additions & 0 deletions src/main/java/org/ecocean/Base.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,78 @@ public static Map<String, Long> getAllVersions(Shepherd myShepherd, String sql)
return rtn;
}

// these two methods are kinda hacky needs for opensearchSyncIndex (e.g. the fact
// they are not static)
public abstract Base getById(Shepherd myShepherd, String id);

public abstract String getAllVersionsSql();

// contains some reflection; not pretty, but gets the job done
public static int[] opensearchSyncIndex(Shepherd myShepherd, Class cls, int stopAfter)
throws IOException {
int[] rtn = new int[2];
Object tmpObj = null;

try {
tmpObj = cls.newInstance();
} catch (Exception ex) {
throw new IOException("FAIL: " + ex);
}
Base baseObj = (Base)tmpObj;
String indexName = baseObj.opensearchIndexName();
if (OpenSearch.indexingActive()) {
System.out.println("Base.opensearchSyncIndex(" + indexName +
") skipped due to indexingActive()");
rtn[0] = -1;
rtn[1] = -1;
return rtn;
}
OpenSearch.setActiveIndexingBackground();
OpenSearch os = new OpenSearch();
System.out.println(">>>>>>>>>>>>>>>>>>>> " + baseObj.getAllVersionsSql());
List<List<String> > changes = os.resolveVersions(getAllVersions(myShepherd,
baseObj.getAllVersionsSql()), os.getAllVersions(indexName));
if (changes.size() != 2) throw new IOException("invalid resolveVersions results");
List<String> needIndexing = changes.get(0);
List<String> needRemoval = changes.get(1);
rtn[0] = needIndexing.size();
rtn[1] = needRemoval.size();
System.out.println("Base.opensearchSyncIndex(" + indexName + "): stopAfter=" + stopAfter +
", needIndexing=" + rtn[0] + ", needRemoval=" + rtn[1]);
int ct = 0;
for (String id : needIndexing) {
Base obj = baseObj.getById(myShepherd, id);
try {
if (obj != null) os.index(indexName, obj);
} catch (Exception ex) {
System.out.println("Base.opensearchSyncIndex(" + indexName + "): index failed " +
obj + " => " + ex.toString());
ex.printStackTrace();
}
if (ct % 500 == 0)
System.out.println("Base.opensearchSyncIndex(" + indexName + ") needIndexing: " +
ct + "/" + rtn[0]);
ct++;
if ((stopAfter > 0) && (ct > stopAfter)) {
System.out.println("Base.opensearchSyncIndex(" + indexName +
") breaking due to stopAfter");
break;
}
}
System.out.println("Base.opensearchSyncIndex(" + indexName + ") finished needIndexing");
ct = 0;
for (String id : needRemoval) {
os.delete(indexName, id);
if (ct % 500 == 0)
System.out.println("Base.opensearchSyncIndex(" + indexName + ") needRemoval: " +
ct + "/" + rtn[1]);
ct++;
}
System.out.println("Base.opensearchSyncIndex(" + indexName + ") finished needRemoval");
OpenSearch.unsetActiveIndexingBackground();
return rtn;
}

public static Base createFromApi(JSONObject payload, List<File> files, Shepherd myShepherd)
throws ApiException {
throw new ApiException("not yet supported");
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/org/ecocean/Encounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4326,15 +4326,23 @@ public static boolean opensearchAccess(org.json.JSONObject doc, User user,
return false;
}

@Override public Base getById(Shepherd myShepherd, String id) {
return myShepherd.getEncounter(id);
}

@Override public String getAllVersionsSql() {
return
"SELECT \"CATALOGNUMBER\", CAST(COALESCE(EXTRACT(EPOCH FROM CAST(\"MODIFIED\" AS TIMESTAMP))*1000,-1) AS BIGINT) AS version FROM \"ENCOUNTER\" ORDER BY version";
}

@Override public long getVersion() {
return Util.getVersionFromModified(modified);
}

public static Map<String, Long> getAllVersions(Shepherd myShepherd) {
String sql =
"SELECT \"CATALOGNUMBER\", CAST(COALESCE(EXTRACT(EPOCH FROM CAST(\"MODIFIED\" AS TIMESTAMP))*1000,-1) AS BIGINT) AS version FROM \"ENCOUNTER\" ORDER BY version";
Encounter enc = new Encounter();

return getAllVersions(myShepherd, sql);
return getAllVersions(myShepherd, enc.getAllVersionsSql());
}

public org.json.JSONObject opensearchMapping() {
Expand Down Expand Up @@ -4382,6 +4390,7 @@ public org.json.JSONObject opensearchMapping() {
return map;
}

/*
public static int[] opensearchSyncIndex(Shepherd myShepherd)
throws IOException {
return opensearchSyncIndex(myShepherd, 0);
Expand Down Expand Up @@ -4441,7 +4450,7 @@ public static int[] opensearchSyncIndex(Shepherd myShepherd, int stopAfter)
OpenSearch.unsetActiveIndexingBackground();
return rtn;
}

*/
public static Base createFromApi(org.json.JSONObject payload, List<File> files,
Shepherd myShepherd)
throws ApiException {
Expand Down Expand Up @@ -4704,5 +4713,4 @@ public void sendCreationEmails(Shepherd myShepherd, String langCode) {
myShepherd.rollbackDBTransaction();
}
}

}
10 changes: 9 additions & 1 deletion src/main/java/org/ecocean/MarkedIndividual.java
Original file line number Diff line number Diff line change
Expand Up @@ -2609,7 +2609,7 @@ public void run() {
try {
MarkedIndividual indiv = bgShepherd.getMarkedIndividual(indivId);
if ((indiv == null) || (indiv.getEncounters() == null)) {
//bgShepherd.rollbackAndClose();
// bgShepherd.rollbackAndClose();
executor.shutdown();
return;
}
Expand Down Expand Up @@ -2657,11 +2657,19 @@ public String toString() {
.toString();
}

@Override public Base getById(Shepherd myShepherd, String id) {
return myShepherd.getMarkedIndividual(id);
}

@Override public long getVersion() {
// Returning 0 for now since the class does not have a 'modified' attribute to compute this value, to be fixed in future.
return 0;
}

@Override public String getAllVersionsSql() {
return "SELECT \"INDIVIDUALID\", CAST(0 AS BIGINT) FROM \"MARKEDINDIVIDUAL\"";
}

public static Map<String, Long> getAllVersions(Shepherd myShepherd) {
// see above
String sql = "SELECT \"INDIVIDUALID\", CAST(0 AS BIGINT) FROM \"MARKEDINDIVIDUAL\"";
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/org/ecocean/Occurrence.java
Original file line number Diff line number Diff line change
Expand Up @@ -1408,11 +1408,19 @@ public void run() {
return Util.getVersionFromModified(modified);
}

@Override public Base getById(Shepherd myShepherd, String id) {
return myShepherd.getOccurrence(id);
}

@Override public String getAllVersionsSql() {
return
"SELECT \"OCCURRENCEID\", CAST(COALESCE(EXTRACT(EPOCH FROM CAST(\"MODIFIED\" AS TIMESTAMP))*1000,-1) AS BIGINT) AS version FROM \"OCCURRENCE\" ORDER BY version";
}

public static Map<String, Long> getAllVersions(Shepherd myShepherd) {
// note: some Occurrences do not have ids. :(
String sql =
"SELECT \"OCCURRENCEID\", CAST(COALESCE(EXTRACT(EPOCH FROM CAST(\"MODIFIED\" AS TIMESTAMP))*1000,-1) AS BIGINT) AS version FROM \"ENCOUNTER\" ORDER BY version";
Occurrence occ = new Occurrence();

return getAllVersions(myShepherd, sql);
return getAllVersions(myShepherd, occ.getAllVersionsSql());
}
}
3 changes: 2 additions & 1 deletion src/main/java/org/ecocean/OpenSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public void run() {
try {
myShepherd.beginDBTransaction();
System.out.println("OpenSearch background indexing running...");
Encounter.opensearchSyncIndex(myShepherd, BACKGROUND_SLICE_SIZE);
Base.opensearchSyncIndex(myShepherd, Encounter.class,
BACKGROUND_SLICE_SIZE);
System.out.println("OpenSearch background indexing finished.");
myShepherd.rollbackAndClose();
} catch (Exception ex) {
Expand Down

0 comments on commit f31f077

Please sign in to comment.