From f15eeee44a0c545d78e859a650c6375dc10f23f2 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Mon, 25 Nov 2024 15:27:32 +0100 Subject: [PATCH 1/5] Add Gauge for in-progress jobs --- Sources/Queues/QueueWorker.swift | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index 96d4970..a94d765 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -13,7 +13,7 @@ extension Queue { /// The worker that runs ``Job``s. public struct QueueWorker: Sendable { let queue: any Queue - + /// Run the queue until there is no more work to be done. /// This is a thin wrapper for ELF-style callers. public func run() -> EventLoopFuture { @@ -35,6 +35,11 @@ public struct QueueWorker: Sendable { logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" logger.trace("Popping job from queue") + Gauge( + label: "jobs.in.progress.gauge", + dimensions: [("queueName", self.queue.queueName.string)] + ).record(1) + guard let id = try await self.queue.pop().get() else { // No job found, go around again. logger.trace("No pending jobs") @@ -122,17 +127,15 @@ public struct QueueWorker: Sendable { queue: any Queue, error: (any Error)? = nil ) { - // Checks how long the job took to complete Timer( - label: "\(jobName).jobDurationTimer", + label: "\(jobName).duration.timer", dimensions: [ ("success", error == nil ? "true" : "false"), ("jobName", jobName), ], - preferredDisplayUnit: .seconds + preferredDisplayUnit: .milliseconds ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) - // Adds the completed job to a different counter depending on its result if error != nil { Counter( label: "error.completed.jobs.counter", @@ -144,5 +147,10 @@ public struct QueueWorker: Sendable { dimensions: [("queueName", queue.queueName.string)] ).increment() } + + Gauge( + label: "jobs.in.progress.gauge", + dimensions: [("queueName", queue.queueName.string)] + ).record(-1) } } From 3c86781c00bbff1aee0e9fc298ffeb13202d3ed4 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Mon, 25 Nov 2024 15:40:44 +0100 Subject: [PATCH 2/5] Whitespace nit --- Sources/Queues/QueueWorker.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index a94d765..fedeaa1 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -13,7 +13,7 @@ extension Queue { /// The worker that runs ``Job``s. public struct QueueWorker: Sendable { let queue: any Queue - + /// Run the queue until there is no more work to be done. /// This is a thin wrapper for ELF-style callers. public func run() -> EventLoopFuture { From 19823fa7c508d5067a1edb0bc1427b7cd842c32b Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Mon, 25 Nov 2024 15:51:26 +0100 Subject: [PATCH 3/5] Move the gauge to the right place --- Sources/Queues/QueueWorker.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index fedeaa1..34ea635 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -35,11 +35,6 @@ public struct QueueWorker: Sendable { logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" logger.trace("Popping job from queue") - Gauge( - label: "jobs.in.progress.gauge", - dimensions: [("queueName", self.queue.queueName.string)] - ).record(1) - guard let id = try await self.queue.pop().get() else { // No job found, go around again. logger.trace("No pending jobs") @@ -70,6 +65,11 @@ public struct QueueWorker: Sendable { try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } + Gauge( + label: "jobs.in.progress.gauge", + dimensions: [("queueName", self.queue.queueName.string)] + ).record(1) + try await self.runOneJob(id: id, job: job, jobData: data, logger: logger) return true } From d3256ebd2cfbd7a8e139dce1e5b96cb61b93e155 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Mon, 25 Nov 2024 16:28:23 +0100 Subject: [PATCH 4/5] Make `Gauge` a `Meter` and add test --- Sources/Queues/QueueWorker.swift | 12 ++++++------ Tests/QueuesTests/MetricsTests.swift | 27 ++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index 34ea635..8aaa32f 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -65,10 +65,10 @@ public struct QueueWorker: Sendable { try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } - Gauge( - label: "jobs.in.progress.gauge", + Meter( + label: "jobs.in.progress.meter", dimensions: [("queueName", self.queue.queueName.string)] - ).record(1) + ).increment() try await self.runOneJob(id: id, job: job, jobData: data, logger: logger) return true @@ -148,9 +148,9 @@ public struct QueueWorker: Sendable { ).increment() } - Gauge( - label: "jobs.in.progress.gauge", + Meter( + label: "jobs.in.progress.meter", dimensions: [("queueName", queue.queueName.string)] - ).record(-1) + ).decrement() } } diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index cfef6f8..c52252d 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -39,7 +39,7 @@ final class MetricsTests: XCTestCase { try await self.app.queues.queue.worker.run() - let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "MyAsyncJob.jobDurationTimer" })) + let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "MyAsyncJob.duration.timer" })) let successDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "success" })) let idDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "jobName" })) XCTAssertEqual(successDimension.1, "true") @@ -66,6 +66,7 @@ final class MetricsTests: XCTestCase { let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "success.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(counter.lastValue, 1) } func testErroringJobsCounter() async throws { @@ -86,6 +87,7 @@ final class MetricsTests: XCTestCase { let counter = try XCTUnwrap(self.metrics.counters.first(where: { $0.label == "error.completed.jobs.counter" })) let queueNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "queueName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(counter.lastValue, 1) } func testDispatchedJobsCounter() async throws { @@ -110,5 +112,28 @@ final class MetricsTests: XCTestCase { let jobNameDimension = try XCTUnwrap(counter.dimensions.first(where: { $0.0 == "jobName" })) XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) XCTAssertEqual(jobNameDimension.1, MyAsyncJob.name) + XCTAssertEqual(counter.totalValue, 2) + } + + func testInProgressJobsGauge() async throws { + let promise = self.app.eventLoopGroup.next().makePromise(of: Void.self) + self.app.queues.add(MyAsyncJob(promise: promise)) + + self.app.get("foo") { req async throws in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + try await self.app.queues.queue.worker.run() + + let meter = try XCTUnwrap(self.metrics.meters.first(where: { $0.label == "jobs.in.progress.meter" })) + let queueNameDimension = try XCTUnwrap(meter.dimensions.first(where: { $0.0 == "queueName" })) + XCTAssertEqual(queueNameDimension.1, self.app.queues.queue.queueName.string) + XCTAssertEqual(meter.values, [1, 0]) } } From dadc6ac20ba5cc1667b9806eea8ec5a101e4baf8 Mon Sep 17 00:00:00 2001 From: Paul Toffoloni Date: Tue, 26 Nov 2024 09:57:24 +0100 Subject: [PATCH 5/5] Change back job duration timer label --- Sources/Queues/QueueWorker.swift | 2 +- Tests/QueuesTests/MetricsTests.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index 8aaa32f..ff585ea 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -128,7 +128,7 @@ public struct QueueWorker: Sendable { error: (any Error)? = nil ) { Timer( - label: "\(jobName).duration.timer", + label: "\(jobName).jobDurationTimer", dimensions: [ ("success", error == nil ? "true" : "false"), ("jobName", jobName), diff --git a/Tests/QueuesTests/MetricsTests.swift b/Tests/QueuesTests/MetricsTests.swift index c52252d..6051c92 100644 --- a/Tests/QueuesTests/MetricsTests.swift +++ b/Tests/QueuesTests/MetricsTests.swift @@ -39,7 +39,7 @@ final class MetricsTests: XCTestCase { try await self.app.queues.queue.worker.run() - let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "MyAsyncJob.duration.timer" })) + let timer = try XCTUnwrap(self.metrics.timers.first(where: { $0.label == "MyAsyncJob.jobDurationTimer" })) let successDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "success" })) let idDimension = try XCTUnwrap(timer.dimensions.first(where: { $0.0 == "jobName" })) XCTAssertEqual(successDimension.1, "true")