diff --git a/Sources/EventProducer/Events/Logic/EventScheduler.swift b/Sources/EventProducer/Events/Logic/EventScheduler.swift index 1b4f0fe0..462ee4b2 100644 --- a/Sources/EventProducer/Events/Logic/EventScheduler.swift +++ b/Sources/EventProducer/Events/Logic/EventScheduler.swift @@ -12,8 +12,10 @@ final class EventScheduler { private var timer: Timer? private let consumerUri: String? + private let maxDiskUsageBytes: Int? private let eventQueue: EventQueue private let networkService: NetworkingService + private var monitoring: Monitoring private var schedulingTime: TimeInterval { switch BuildEnvironment.system { @@ -24,11 +26,14 @@ final class EventScheduler { init( consumerUri: String?, + maxDiskUsageBytes: Int? = EventConfig.defaultQueueMaxDiskUsageBytes, eventQueue: EventQueue = .shared ) { self.consumerUri = consumerUri + self.maxDiskUsageBytes = maxDiskUsageBytes self.eventQueue = eventQueue self.networkService = NetworkingService(consumerUri: consumerUri) + self.monitoring = Monitoring.shared } /// The Scheduler applies a best-effort approach to sending events to the TL Consumer, meaning that it continuously tries to send @@ -72,6 +77,10 @@ final class EventScheduler { let batches = allEvents.chunkedBy(Constants.batchSize) for batch in batches { + let allowedBatchSize = getAllowedBatch(batch) + guard !allowedBatchSize.isEmpty else { + throw EventProducerError.eventSendBatchRequestFailure("Limit Exceeded") + } try await batchAndSend(batch, headerHelper: headerHelper) } } @@ -207,3 +216,26 @@ private extension EventScheduler { case valueDatatype = "Value.DataType" } } + +// MARK: Queue byte size checks + +extension EventScheduler { + func getAllowedBatch(_ batch: [Event]) -> [Event] { + var allowedBatch = batch + + while isExceedsMaxSize(for: allowedBatch) { + if allowedBatch.isEmpty { + return [] + } + if let droppedEvent = allowedBatch.popLast() { + monitoring.startOutage(eventName: droppedEvent.name) + } + } + return allowedBatch + } + + private func isExceedsMaxSize(for batch: [Event]) -> Bool { + guard let data = try? JSONEncoder().encode(batch) else { return false } + return FileManagerHelper.shared.exceedsMaximumSize(object: data, maximumSize: maxDiskUsageBytes ?? EventConfig.defaultQueueMaxDiskUsageBytes) + } +} diff --git a/Sources/EventProducer/Events/Models/EventConfig.swift b/Sources/EventProducer/Events/Models/EventConfig.swift index e2ac2337..ba31cd36 100644 --- a/Sources/EventProducer/Events/Models/EventConfig.swift +++ b/Sources/EventProducer/Events/Models/EventConfig.swift @@ -2,7 +2,8 @@ import Foundation import protocol Auth.CredentialsProvider public struct EventConfig: EventProducer { - static let defaultMaxDiskUsageBytes = 20480 + static let defaultQueueMaxDiskUsageBytes = 204800 + static let singleEventMaxDiskUsageBytes = 20480 /// An access token provider, used by the EventProducer to get access token. public var credentialsProvider: CredentialsProvider diff --git a/Sources/EventProducer/Monitoring/Monitoring.swift b/Sources/EventProducer/Monitoring/Monitoring.swift index 9b5ffdb6..93aa9320 100644 --- a/Sources/EventProducer/Monitoring/Monitoring.swift +++ b/Sources/EventProducer/Monitoring/Monitoring.swift @@ -1,9 +1,13 @@ import Foundation +import Combine struct Monitoring { static let eventName = "tep-tl-monitoring" static let shared = Monitoring() + var outageSubject: CurrentValueSubject? + private var cancellable: AnyCancellable? + private let monitoringQueue: MonitoringQueue private init(monitoringQueue: MonitoringQueue = .shared) { @@ -91,3 +95,24 @@ struct Monitoring { ) } } + +// MARK: Outage Info +extension Monitoring { + mutating func initOutageSubject(withEvent eventName: String) { + outageSubject = CurrentValueSubject( + .outageStart( + error: OutageStartError(code: "100", message: "Start outage error for \(eventName)"))) + } + + mutating func startOutage(eventName: String) { + guard let outageSubject = outageSubject else { + initOutageSubject(withEvent: eventName) + return + } + outageSubject.send(.outageStart(error: .init(code: "100", message: "Start outage error for \(eventName)"))) + } + + func endOutage(eventName: String) { + outageSubject?.send(.outageEnd(message: .init(message: "No outage for \(eventName)"))) + } +} diff --git a/Sources/EventProducer/Outage/OutageState.swift b/Sources/EventProducer/Outage/OutageState.swift index f8b1e708..92ca9200 100644 --- a/Sources/EventProducer/Outage/OutageState.swift +++ b/Sources/EventProducer/Outage/OutageState.swift @@ -2,6 +2,6 @@ import Common import Foundation public enum OutageState { - case outage(error: OutageStartError) - case noOutage(message: OutageEndMessage) + case outageStart(error: OutageStartError) + case outageEnd(message: OutageEndMessage) } diff --git a/Sources/EventProducer/TidalEventSender.swift b/Sources/EventProducer/TidalEventSender.swift index b3a5d4df..da21f01e 100644 --- a/Sources/EventProducer/TidalEventSender.swift +++ b/Sources/EventProducer/TidalEventSender.swift @@ -14,11 +14,21 @@ public final class TidalEventSender: EventSender { public var config: EventConfig? // MARK: - Private properties - - private let outageSubject = PassthroughSubject() + + var isOutage: Bool { + guard let outageState = monitoring.outageSubject?.value else { + return false + } + switch outageState { + case .outageStart: + return true + case .outageEnd: + return false + } + } private var scheduler: EventScheduler private let eventSubmitter: EventSubmitter - private let monitoring: Monitoring + private var monitoring: Monitoring private var monitoringScheduler: MonitoringScheduler private let fileManager: FileManagerHelper @@ -40,7 +50,7 @@ public final class TidalEventSender: EventSender { public func updateConfiguration(_ config: EventConfig) { self.config = config - self.scheduler = EventScheduler(consumerUri: config.consumerUri) + self.scheduler = EventScheduler(consumerUri: config.consumerUri, maxDiskUsageBytes: config.maxDiskUsageBytes) self.monitoringScheduler = MonitoringScheduler(consumerUri: config.consumerUri) } @@ -70,7 +80,7 @@ public final class TidalEventSender: EventSender { } guard !fileManager.exceedsMaximumSize(object: eventData, - maximumSize: config?.maxDiskUsageBytes ?? EventConfig.defaultMaxDiskUsageBytes) else { + maximumSize: EventConfig.singleEventMaxDiskUsageBytes) else { try await monitoring.updateMonitoringEvent(monitoringEventType: .failedStorage, eventName: event.name) startOutage(eventName: event.name) return @@ -104,11 +114,11 @@ public final class TidalEventSender: EventSender { monitoringScheduler.runScheduling(with: headerHelper) } - private func startOutage(eventName: String) { - outageSubject.send(.outage(error: .init(code: "100", message: "Start outage error for \(eventName)"))) + private func startOutage(eventName event: String) { + monitoring.startOutage(eventName: event) } - private func endOutage(eventName: String) { - outageSubject.send(.noOutage(message: .init(message: "No outage for \(eventName)"))) + private func endOutage(eventName event: String) { + monitoring.endOutage(eventName: event) } } diff --git a/Tests/EventProducerTests/EventsTests/EventsTests.swift b/Tests/EventProducerTests/EventsTests/EventsTests.swift index aeb83694..b5cfcbde 100644 --- a/Tests/EventProducerTests/EventsTests/EventsTests.swift +++ b/Tests/EventProducerTests/EventsTests/EventsTests.swift @@ -24,7 +24,7 @@ final class EventsTests: XCTestCase { private let testAccessToken = "testAccessToken" private let queue = EventQueue.shared private var headerHelper: HeaderHelper! - private let maxDiskUsageBytes = 20480 + private let maxDiskUsageBytes = 204800 override func setUp() async throws { try await super.setUp() @@ -260,4 +260,36 @@ final class EventsTests: XCTestCase { 1 ) } + + func testSchedulerBatchExceedsSize() async throws { + + guard let consumerUri = eventSender.config?.consumerUri else { + XCTFail("Default consumerUri should be set") + return + } + + let eventQueue = [ + Event( + name: "testEvent#1", + payload: "firstPayload"), + Event( + name: "testEvent#2", + payload: "secondPayload"), + Event( + name: "testEvent#3", + payload: "thirdPayload") + ] + + var eventScheduler = EventScheduler(consumerUri: consumerUri, maxDiskUsageBytes: 500) + + /// Check that the events are allowed based on the scheduler's allowed queue size limit + XCTAssertFalse(eventScheduler.getAllowedBatch(eventQueue).isEmpty) + + /// Reduce the maxDiskUsageBytes in order to drop the events + eventScheduler = EventScheduler(consumerUri: consumerUri, maxDiskUsageBytes: 50) + + /// Events that exceed the size will be dropped + XCTAssertTrue(eventScheduler.getAllowedBatch(eventQueue).isEmpty) + + } } diff --git a/Tests/EventProducerTests/HelperTests/FileManagerHelperTests.swift b/Tests/EventProducerTests/HelperTests/FileManagerHelperTests.swift index 9bb2687e..aee3d0b9 100644 --- a/Tests/EventProducerTests/HelperTests/FileManagerHelperTests.swift +++ b/Tests/EventProducerTests/HelperTests/FileManagerHelperTests.swift @@ -3,7 +3,7 @@ import XCTest final class FileManagerHelperTests: XCTestCase { private let sut: FileManagerHelper = .shared - private let maxDiskUsageBytes = 20480 + private let maxDiskUsageBytes = 204800 func testExceedsMaximumSize() { let data = Data(count: 10480) diff --git a/Tests/EventProducerTests/MonitoringTests/MonitoringTests.swift b/Tests/EventProducerTests/MonitoringTests/MonitoringTests.swift index 36478ea3..a8622962 100644 --- a/Tests/EventProducerTests/MonitoringTests/MonitoringTests.swift +++ b/Tests/EventProducerTests/MonitoringTests/MonitoringTests.swift @@ -19,10 +19,10 @@ final class MonitoringTests: XCTestCase { var isUserLoggedIn: Bool } - private let sut: Monitoring = .shared + private var sut: Monitoring = .shared private let monitoringQueue: MonitoringQueue = .shared private var headerHelper: HeaderHelper! - private let maxDiskUsageBytes = 20480 + private let maxDiskUsageBytes = 204800 private var mockCredentialsProvider: CredentialsProvider { MockCredentialsProvider(isUserLoggedIn: true) @@ -191,4 +191,26 @@ final class MonitoringTests: XCTestCase { let monitoringInfoAfterDeletion = await sut.getMonitoringInfo() XCTAssertEqual(sut.emptyMonitoringInfo, monitoringInfoAfterDeletion) } + + func testisOutageGetsTriggeredOnMonitoringUpdate() async throws { + + sut.startOutage(eventName: "test") + + guard sut.outageSubject != nil else { + XCTFail("outageSubject not accessible") + return + } + /// start outage + var isOutage: Bool = if case .outageStart? = sut.outageSubject?.value { true } else { false } + + /// Verify that the outage is active + XCTAssertTrue(isOutage) + + /// end outage + sut.endOutage(eventName: "test") + isOutage = if case .outageStart? = sut.outageSubject?.value { true } else { false } + + /// Verify that the outage is not active + XCTAssertFalse(isOutage) + } } diff --git a/Tests/EventProducerTests/NetworkingTests/NetworkingTests.swift b/Tests/EventProducerTests/NetworkingTests/NetworkingTests.swift index 41db84de..2e1ec7d8 100644 --- a/Tests/EventProducerTests/NetworkingTests/NetworkingTests.swift +++ b/Tests/EventProducerTests/NetworkingTests/NetworkingTests.swift @@ -12,7 +12,7 @@ final class NetworkingTests: XCTestCase { } } - private let maxDiskUsageBytes = 20480 + private let maxDiskUsageBytes = 204800 private var mockAuthProvider: AuthProvider { MockAuthProvider(token: "testAccessToken", clientID: "testURL")