Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sql): teach SqlStorageService.loadObjects and loadObjectsNewerThan to skip invalid objects #1450

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ default boolean supportsEventing(ObjectType objectType) {
<T extends Timestamped> T loadObject(ObjectType objectType, String objectKey)
throws NotFoundException;

/**
* Load objects of the given type with the given keys
*
* @param objectType {@link ObjectType} of the objects to load
* @return A list of the loaded objects. Any objects that fail to deserialize are skipped.
*/
default <T extends Timestamped> List<T> loadObjects(
ObjectType objectType, List<String> objectKeys) {
throw new UnsupportedOperationException();
Expand All @@ -47,7 +53,8 @@ default <T extends Timestamped> List<T> loadObjects(
* @param lastModifiedThreshold threshold to use for filtering based on the last_modified_at time
* in ms
* @return {@link Map<String, List<T>>} with two keys, "deleted" and "not_deleted". Each value is
* a list of fetched objects that satisfy the provided last_modified_at threshold.
* a list of fetched objects that satisfy the provided last_modified_at threshold. Any objects
* that fail to deserialize are skipped.
*/
default <T extends Timestamped> Map<String, List<T>> loadObjectsNewerThan(
ObjectType objectType, long lastModifiedThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.netflix.spinnaker.front50.model

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.config.Front50SqlProperties
import com.netflix.spinnaker.front50.api.model.Timestamped
Expand Down Expand Up @@ -91,6 +93,8 @@ class SqlStorageService(
private val lastModifiedField = field("last_modified_at", Long::class.java)
}

private val invalidJsonCounterId: Id = registry.createId("sqlStorageService.invalidJson");

override fun supportsVersioning(): Boolean {
return true
}
Expand Down Expand Up @@ -145,14 +149,18 @@ class SqlStorageService(
}
}

records.map {
objectMapper.readValue(
it.getValue(field("body", String::class.java)),
objectType.clazz as Class<T>
).apply {
this.createdAt = it.getValue(field("created_at", Long::class.java))

this.lastModified = it.getValue(field("last_modified_at", Long::class.java))
records.mapNotNull {
val body = it.getValue(field("body", String::class.java))
try {
objectMapper.readValue(body, objectType.clazz as Class<T>
).apply {
this.createdAt = it.getValue(field("created_at", Long::class.java))
this.lastModified = it.getValue(field("last_modified_at", Long::class.java))
}
} catch (e: JsonProcessingException) {
log.error("unable to deserialize {}", objectType.name, e)
registry.counter(invalidJsonCounterId.withTag("objectType", objectType.group)).increment();
null
}
}
}
Expand Down Expand Up @@ -208,10 +216,14 @@ class SqlStorageService(
} else {
notDeletedKey
}
resultMap[insertInto]!!.add(
record.get("body", String::class.java)
.let { objectMapper.readValue(it, objectType.clazz as Class<T>)
})
val bodyString = record.get("body", String::class.java)
try {
val thisObject = objectMapper.readValue(bodyString, objectType.clazz as Class<T>)
resultMap[insertInto]!!.add(thisObject)
} catch (e: JsonProcessingException) {
log.error("unable to deserialize {}", objectType.name, e)
registry.counter(invalidJsonCounterId.withTag("objectType", objectType.group)).increment();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.netflix.spinnaker.front50.model

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spinnaker.config.Front50SqlProperties
import com.netflix.spinnaker.front50.api.model.Timestamped
import com.netflix.spinnaker.front50.api.model.pipeline.Pipeline;
Expand All @@ -31,9 +31,12 @@ import dev.minutest.rootContext
import java.time.Clock
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
import strikt.api.expectThat
import strikt.api.expectThrows
import strikt.assertions.hasSize
import strikt.assertions.isEmpty
import strikt.assertions.isEqualTo
import strikt.assertions.isNotEmpty
import strikt.assertions.isNotEqualTo
Expand All @@ -51,9 +54,12 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
jooqConfig.jdbcUrl,
jooqConfig.dialect
)

val registry = DefaultRegistry()

val sqlStorageService = SqlStorageService(
ObjectMapper(),
NoopRegistry(),
registry,
jooq,
Clock.systemDefaultZone(),
SqlRetryProperties(),
Expand Down Expand Up @@ -137,6 +143,11 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
}

context("Pipeline") {

after {
registry.reset()
}

test("create, update and delete a pipeline") {
// verify that a pipeline can be created
sqlStorageService.storeObject(
Expand Down Expand Up @@ -185,7 +196,7 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
}

var lastModifiedMs : Long = 100
test("bulk load pipelines") {
test("loadObjects basic behavior") {
val objectKeys = mutableSetOf<String>()
val lastModifiedList = mutableSetOf<Long>()
(1..10).forEach {
Expand Down Expand Up @@ -218,9 +229,56 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
expectThat(
pipelines.map { it.lastModified }.toSet()
).isEqualTo(lastModifiedList)
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(0);
}

test("loadObjects with malformed pipelines") {
// populate one record that fails to deserialize
val lastModified: Long = Instant.now().toEpochMilli()

// Can't use storeObject since it serializes a valid object...
val invalidObjectKey = "new-id-pipeline001-busted"
val bustedPipeline = mapOf("id" to invalidObjectKey,
"name" to "new-pipeline001-busted",
"application" to "application001",
"body" to "not json",
"created_at" to lastModified,
"last_modified_at" to lastModified,
"last_modified_by" to "test-user",
"is_deleted" to false)
jooq.insertInto(table("pipelines"), *bustedPipeline.keys.map { field(it) }.toTypedArray())
.values(bustedPipeline.values)
.execute()

val onlyInvalid = sqlStorageService.loadObjects<Pipeline>(
ObjectType.PIPELINE,
listOf(invalidObjectKey)
)
expectThat(onlyInvalid).isEmpty()
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(1);

// Add a valid pipeline and repeat. Make sure we get only the valid pipeline.
val validObjectKey = "new-id-pipeline002-valid"
sqlStorageService.storeObject(
ObjectType.PIPELINE,
validObjectKey,
Pipeline().apply {
this.setId(validObjectKey)
this.setName("new-pipeline002")
this.setLastModified(lastModified)
this.setApplication("application001")
}
)

val withValidPipeline = sqlStorageService.loadObjects<Pipeline>(
ObjectType.PIPELINE,
listOf(invalidObjectKey, validObjectKey)
)
expectThat(withValidPipeline.map { it.id }.toList()).isEqualTo(listOf(validObjectKey))
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(2);
}

test("load pipelines newer than") {
test("loadObjectsNewerThan basic behavior") {
// populate 10 records <= the threshold and 5 newer than the threshold
// make sure loadObjectsNewerThan returns 5
val lastModifiedThreshold: Long = Instant.now().toEpochMilli()
Expand Down Expand Up @@ -269,6 +327,7 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
lastModifiedThreshold
)
verifyNewerThan(newerItems, newObjectKeys, emptySet())
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(0);

// Delete a newer item and verify the behavior
val newIdToDelete = newObjectKeys.first()
Expand Down Expand Up @@ -302,6 +361,51 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
)
verifyNewerThan(afterModifyOlder, newObjectKeys + oldIdToModify, setOf(newIdToDelete, oldIdToDelete))
}

test("loadObjectsNewerThan with malformed pipelines") {
// populate one record that's newer than the threshold that fails to deserialize
val lastModifiedThreshold: Long = Instant.now().toEpochMilli()

// Can't use storeObject since it serializes a valid object...
val bustedPipeline = mapOf("id" to "new-id-pipeline001-busted",
"name" to "new-pipeline001-busted",
"application" to "application001",
"body" to "not json",
"created_at" to lastModifiedThreshold + 1,
"last_modified_at" to lastModifiedThreshold + 1,
"last_modified_by" to "test-user",
"is_deleted" to false)
jooq.insertInto(table("pipelines"), *bustedPipeline.keys.map { field(it) }.toTypedArray())
.values(bustedPipeline.values)
.execute()

val onlyInvalid: Map<String, List<Pipeline>> = sqlStorageService.loadObjectsNewerThan(
ObjectType.PIPELINE,
lastModifiedThreshold
)
verifyNewerThan(onlyInvalid, setOf(), setOf())
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(1);

// Add a valid pipeline and repeat. Make sure we get only the valid pipeline.
val objectKey = "new-id-pipeline002-valid"
sqlStorageService.storeObject(
ObjectType.PIPELINE,
objectKey,
Pipeline().apply {
this.setId(objectKey)
this.setName("new-pipeline002")
this.setLastModified(lastModifiedThreshold + 1)
this.setApplication("application001")
}
)

val withValidPipeline: Map<String, List<Pipeline>> = sqlStorageService.loadObjectsNewerThan(
ObjectType.PIPELINE,
lastModifiedThreshold
)
verifyNewerThan(withValidPipeline, setOf(objectKey), setOf())
expectThat(registry.counter("sqlStorageService.invalidJson", "objectType", "pipelines").count()).isEqualTo(2);
}
}

context("Entity Tags") {
Expand Down