Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Fix grace period after kill for health check failure #7221

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/marathon/core/health/Health.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ case class Health(

def update(result: HealthResult): Health =
result match {
case Healthy(_, _, time, _) =>
case Healthy(_, _, _, time, _) =>
copy(
firstSuccess = firstSuccess.orElse(Some(time)),
lastSuccess = Some(time),
consecutiveFailures = 0
)
case Unhealthy(_, _, cause, time, _) =>
case Unhealthy(_, _, _, cause, time, _) =>
copy(
lastFailure = Some(time),
lastFailureCause = Some(cause),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package mesosphere.marathon
package core.health

import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.state.Timestamp

/**
 * Common interface of a single health check outcome. The result types used with it in this
 * package are [[Healthy]], [[Unhealthy]] and [[Ignored]].
 */
sealed trait HealthResult {
/** Id of the instance the health check was performed for. */
def instanceId: Instance.Id
/** Id of the task the result refers to; callers pass e.g. `instance.appTask.taskId` or the id parsed from a task status. */
def taskId: Task.Id
/** Version associated with the result; health check workers pass `instance.runSpecVersion` here. */
def version: Timestamp
/** Time of the result; the implementing case classes default this to `Timestamp.now()`. */
def time: Timestamp
/**
 * Flag read by the health check actor when the alive state changes.
 * NOTE(review): presumably gates publishing of a health-changed event - confirm against HealthCheckActor.
 */
def publishEvent: Boolean
}

case class Healthy(instanceId: Instance.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = true)
case class Healthy(instanceId: Instance.Id, taskId: Task.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = true)
extends HealthResult

case class Unhealthy(
instanceId: Instance.Id,
taskId: Task.Id,
version: Timestamp,
cause: String,
time: Timestamp = Timestamp.now(),
Expand All @@ -26,5 +29,5 @@ case class Unhealthy(
* Represents an ignored HTTP response code (see [[MarathonHttpHealthCheck.ignoreHttp1xx]]). It will not update the
* health check state and will not be published.
*/
case class Ignored(instanceId: Instance.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = false)
case class Ignored(instanceId: Instance.Id, taskId: Task.Id, version: Timestamp, time: Timestamp = Timestamp.now(), publishEvent: Boolean = false)
extends HealthResult
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import mesosphere.marathon.core.health.impl.AppHealthCheckActor.{
}
import mesosphere.marathon.core.health.impl.HealthCheckActor._
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.termination.{KillReason, KillService}
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.state.{AppDefinition, Timestamp}
Expand All @@ -40,7 +41,7 @@ private[health] class HealthCheckActor(
implicit val mat = ActorMaterializer()
import context.dispatcher

val healthByInstanceId = TrieMap.empty[Instance.Id, Health]
val healthByInstanceId = TrieMap.empty[Task.Id, Health]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we ought to rename this map, since it is now keyed by task ID rather than instance ID?


private case class HealthCheckStreamStopped(thisInstance: this.type)

Expand Down Expand Up @@ -83,10 +84,8 @@ private[health] class HealthCheckActor(
def purgeStatusOfDoneInstances(instances: Seq[Instance]): Unit = {
logger.debug(s"Purging health status of inactive instances for app ${app.id} version ${app.version} and healthCheck ${healthCheck}")

val inactiveInstanceIds: Set[Instance.Id] = instances.filterNot(_.isActive).iterator.map(_.instanceId).toSet
inactiveInstanceIds.foreach { inactiveId =>
healthByInstanceId.remove(inactiveId)
}
val activeTaskIds: Set[Task.Id] = instances.map(_.appTask).filter(_.isActive).map(_.taskId).to(Set)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val activeTaskIds: Set[Task.Id] = instances.map(_.appTask).filter(_.isActive).map(_.taskId).to(Set)
val activeTaskIds: Set[Task.Id] = instances.iterator.map(_.appTask).filter(_.isActive).map(_.taskId).to(Set)

healthByInstanceId.retain((taskId, health) => activeTaskIds(taskId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏


val checksToPurge = instances
.withFilter(!_.isActive)
Expand Down Expand Up @@ -144,12 +143,12 @@ private[health] class HealthCheckActor(

def handleHealthResult(result: HealthResult): Unit = {
val instanceId = result.instanceId
val health = healthByInstanceId.getOrElse(instanceId, Health(instanceId))
val health = healthByInstanceId.getOrElse(result.taskId, Health(instanceId))

val updatedHealth = result match {
case Healthy(_, _, _, _) =>
case Healthy(_, _, _, _, _) =>
Future.successful(health.update(result))
case Unhealthy(_, _, _, _, _) =>
case Unhealthy(_, _, _, _, _, _) =>
instanceTracker.instance(instanceId).map {
case Some(instance) =>
if (ignoreFailures(instance, health)) {
Expand Down Expand Up @@ -183,7 +182,7 @@ private[health] class HealthCheckActor(
val newHealth = instanceHealth.newHealth

logger.info(s"Received health result for app [${app.id}] version [${app.version}]: [$result]")
healthByInstanceId += (instanceId -> instanceHealth.newHealth)
healthByInstanceId += (result.taskId -> instanceHealth.newHealth)
appHealthCheckActor ! HealthCheckStatusChanged(ApplicationKey(app.id, app.version), healthCheck, newHealth)

if (health.alive != newHealth.alive && result.publishEvent) {
Expand All @@ -192,7 +191,9 @@ private[health] class HealthCheckActor(
}

def receive: Receive = {
case GetInstanceHealth(instanceId) => sender() ! healthByInstanceId.getOrElse(instanceId, Health(instanceId))
case GetInstanceHealth(instanceId) =>
sender() ! healthByInstanceId.find(_._1.instanceId == instanceId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... so we're going to scan the whole map on every lookup? It seems like this could perform poorly with lots of health checks. Maybe we should keep it indexed by instance ID and evict the health status if the task ID changes?

.map(_._2).getOrElse(Health(instanceId))

case GetAppHealth =>
sender() ! AppHealth(healthByInstanceId.values.to(Seq))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object HealthCheckWorker extends StrictLogging {
Success(
Unhealthy(
instance.instanceId,
instance.appTask.taskId,
instance.runSpecVersion,
s"${ex.getClass.getSimpleName}: ${ex.getMessage}"
)
Expand Down Expand Up @@ -93,18 +94,18 @@ object HealthCheckWorker extends StrictLogging {
singleRequest(RequestBuilding.Get(url), check.timeout).map { response =>
response.discardEntityBytes() //forget about the body
if (acceptableResponses.contains(response.status.intValue())) {
Healthy(instance.instanceId, instance.runSpecVersion)
Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion)
} else if (check.ignoreHttp1xx && (toIgnoreResponses.contains(response.status.intValue))) {
logger.debug(s"Ignoring health check HTTP response ${response.status.intValue} for instance=${instance.instanceId}")
Ignored(instance.instanceId, instance.runSpecVersion)
Ignored(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion)
} else {
logger.debug(s"Health check for instance=${instance.instanceId} responded with ${response.status}")
Unhealthy(instance.instanceId, instance.runSpecVersion, response.status.toString())
Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, response.status.toString())
}
}.recover {
case NonFatal(e) =>
logger.debug(s"Health check for instance=${instance.instanceId} did not respond due to ${e.getMessage}.")
Unhealthy(instance.instanceId, instance.runSpecVersion, e.getMessage)
Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, e.getMessage)
}
}

Expand All @@ -121,7 +122,7 @@ object HealthCheckWorker extends StrictLogging {
socket.connect(address, timeoutMillis)
socket.close()
}
Healthy(instance.instanceId, instance.runSpecVersion, Timestamp.now())
Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, Timestamp.now())
}(ThreadPoolContext.ioContext)
}

Expand All @@ -139,15 +140,15 @@ object HealthCheckWorker extends StrictLogging {
singleRequestHttps(RequestBuilding.Get(url), check.timeout).map { response =>
response.discardEntityBytes() // forget about the body
if (acceptableResponses.contains(response.status.intValue())) {
Healthy(instance.instanceId, instance.runSpecVersion)
Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion)
} else {
logger.debug(s"Health check for ${instance.instanceId} responded with ${response.status}")
Unhealthy(instance.instanceId, instance.runSpecVersion, response.status.toString())
Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, response.status.toString())
}
}.recover {
case NonFatal(e) =>
logger.debug(s"Health check for instance=${instance.instanceId} failed to respond due to ${e.getMessage}.")
Unhealthy(instance.instanceId, instance.runSpecVersion, e.getMessage)
Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, e.getMessage)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ class MarathonHealthCheckManager(
instance.tasksMap.values.withFilter(_.isRunning).map(_.status.mesosStatus).foreach {
case Some(mesosStatus) if mesosStatus.hasHealthy =>
val health =
if (mesosStatus.getHealthy) Healthy(instance.instanceId, instance.runSpecVersion, publishEvent = false)
else Unhealthy(instance.instanceId, instance.runSpecVersion, "", publishEvent = false)
if (mesosStatus.getHealthy) Healthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, publishEvent = false)
else Unhealthy(instance.instanceId, instance.appTask.taskId, instance.runSpecVersion, "", publishEvent = false)
ref ! health
case None =>
}
Expand Down Expand Up @@ -237,12 +237,13 @@ class MarathonHealthCheckManager(
override def update(taskStatus: TaskStatus, version: Timestamp): Unit =
appHealthChecks.readLock { ahcs =>
// construct a health result from the incoming task status
val instanceId = Task.Id.parse(taskStatus.getTaskId).instanceId
val taskId = Task.Id.parse(taskStatus.getTaskId)
val instanceId = taskId.instanceId
val maybeResult: Option[HealthResult] =
if (taskStatus.hasHealthy) {
val healthy = taskStatus.getHealthy
logger.info(s"Received status for $instanceId with version [$version] and healthy [$healthy]")
Some(if (healthy) Healthy(instanceId, version) else Unhealthy(instanceId, version, ""))
Some(if (healthy) Healthy(instanceId, taskId, version) else Unhealthy(instanceId, taskId, version, ""))
} else {
logger.debug(s"Ignoring status for $instanceId with no health information")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HealthCheckActorTest extends AkkaUnitTest {
val healthCheckWorkerHub: Sink[(AppDefinition, Instance, MarathonHealthCheck, ActorRef), NotUsed] =
MergeHub
.source[(AppDefinition, Instance, MarathonHealthCheck, ActorRef)](1)
.map { case (_, instance, _, ref) => ref ! Healthy(instance.instanceId, Timestamp.now()) }
.map { case (_, instance, _, ref) => ref ! Healthy(instance.instanceId, instance.appTask.taskId, Timestamp.now()) }
.to(Sink.ignore)
.run()

Expand Down