Skip to content

Commit

Permalink
Merge pull request #31 from tidal-music/improvement/check-queue-size
Browse files Browse the repository at this point in the history
Check Queue Size (iOS)
  • Loading branch information
emin-grbo authored Jun 24, 2024
2 parents f8f0a8e + cb47ecd commit 4426ab7
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 17 deletions.
32 changes: 32 additions & 0 deletions Sources/EventProducer/Events/Logic/EventScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ final class EventScheduler {
private var timer: Timer?

private let consumerUri: String?
private let maxDiskUsageBytes: Int?
private let eventQueue: EventQueue
private let networkService: NetworkingService
private var monitoring: Monitoring

private var schedulingTime: TimeInterval {
switch BuildEnvironment.system {
Expand All @@ -24,11 +26,14 @@ final class EventScheduler {

init(
consumerUri: String?,
maxDiskUsageBytes: Int? = EventConfig.defaultQueueMaxDiskUsageBytes,
eventQueue: EventQueue = .shared
) {
self.consumerUri = consumerUri
self.maxDiskUsageBytes = maxDiskUsageBytes
self.eventQueue = eventQueue
self.networkService = NetworkingService(consumerUri: consumerUri)
self.monitoring = Monitoring.shared
}

/// The Scheduler applies a best-effort approach to sending events to the TL Consumer, meaning that it continuously tries to send
Expand Down Expand Up @@ -72,6 +77,10 @@ final class EventScheduler {
let batches = allEvents.chunkedBy(Constants.batchSize)

for batch in batches {
let allowedBatchSize = getAllowedBatch(batch)
guard !allowedBatchSize.isEmpty else {
throw EventProducerError.eventSendBatchRequestFailure("Limit Exceeded")
}
try await batchAndSend(batch, headerHelper: headerHelper)
}
}
Expand Down Expand Up @@ -207,3 +216,26 @@ private extension EventScheduler {
case valueDatatype = "Value.DataType"
}
}

// MARK: Queue byte size checks

extension EventScheduler {
func getAllowedBatch(_ batch: [Event]) -> [Event] {
var allowedBatch = batch

while isExceedsMaxSize(for: allowedBatch) {
if allowedBatch.isEmpty {
return []
}
if let droppedEvent = allowedBatch.popLast() {
monitoring.startOutage(eventName: droppedEvent.name)
}
}
return allowedBatch
}

private func isExceedsMaxSize(for batch: [Event]) -> Bool {
guard let data = try? JSONEncoder().encode(batch) else { return false }
return FileManagerHelper.shared.exceedsMaximumSize(object: data, maximumSize: maxDiskUsageBytes ?? EventConfig.defaultQueueMaxDiskUsageBytes)
}
}
3 changes: 2 additions & 1 deletion Sources/EventProducer/Events/Models/EventConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import Foundation
import protocol Auth.CredentialsProvider

public struct EventConfig: EventProducer {
static let defaultMaxDiskUsageBytes = 20480
static let defaultQueueMaxDiskUsageBytes = 204800
static let singleEventMaxDiskUsageBytes = 20480

/// An access token provider, used by the EventProducer to get access token.
public var credentialsProvider: CredentialsProvider
Expand Down
25 changes: 25 additions & 0 deletions Sources/EventProducer/Monitoring/Monitoring.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import Foundation
import Combine

struct Monitoring {
static let eventName = "tep-tl-monitoring"
static let shared = Monitoring()

var outageSubject: CurrentValueSubject<OutageState, Never>?
private var cancellable: AnyCancellable?

private let monitoringQueue: MonitoringQueue

private init(monitoringQueue: MonitoringQueue = .shared) {
Expand Down Expand Up @@ -91,3 +95,24 @@ struct Monitoring {
)
}
}

// MARK: Outage Info
extension Monitoring {
mutating func initOutageSubject(withEvent eventName: String) {
outageSubject = CurrentValueSubject<OutageState, Never>(
.outageStart(
error: OutageStartError(code: "100", message: "Start outage error for \(eventName)")))
}

mutating func startOutage(eventName: String) {
guard let outageSubject = outageSubject else {
initOutageSubject(withEvent: eventName)
return
}
outageSubject.send(.outageStart(error: .init(code: "100", message: "Start outage error for \(eventName)")))
}

func endOutage(eventName: String) {
outageSubject?.send(.outageEnd(message: .init(message: "No outage for \(eventName)")))
}
}
4 changes: 2 additions & 2 deletions Sources/EventProducer/Outage/OutageState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ import Common
import Foundation

public enum OutageState {
case outage(error: OutageStartError)
case noOutage(message: OutageEndMessage)
case outageStart(error: OutageStartError)
case outageEnd(message: OutageEndMessage)
}
28 changes: 19 additions & 9 deletions Sources/EventProducer/TidalEventSender.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ public final class TidalEventSender: EventSender {
public var config: EventConfig?

// MARK: - Private properties

private let outageSubject = PassthroughSubject<OutageState, Never>()

var isOutage: Bool {
guard let outageState = monitoring.outageSubject?.value else {
return false
}
switch outageState {
case .outageStart:
return true
case .outageEnd:
return false
}
}
private var scheduler: EventScheduler
private let eventSubmitter: EventSubmitter
private let monitoring: Monitoring
private var monitoring: Monitoring
private var monitoringScheduler: MonitoringScheduler
private let fileManager: FileManagerHelper

Expand All @@ -40,7 +50,7 @@ public final class TidalEventSender: EventSender {

public func updateConfiguration(_ config: EventConfig) {
self.config = config
self.scheduler = EventScheduler(consumerUri: config.consumerUri)
self.scheduler = EventScheduler(consumerUri: config.consumerUri, maxDiskUsageBytes: config.maxDiskUsageBytes)
self.monitoringScheduler = MonitoringScheduler(consumerUri: config.consumerUri)
}

Expand Down Expand Up @@ -70,7 +80,7 @@ public final class TidalEventSender: EventSender {
}

guard !fileManager.exceedsMaximumSize(object: eventData,
maximumSize: config?.maxDiskUsageBytes ?? EventConfig.defaultMaxDiskUsageBytes) else {
maximumSize: EventConfig.singleEventMaxDiskUsageBytes) else {
try await monitoring.updateMonitoringEvent(monitoringEventType: .failedStorage, eventName: event.name)
startOutage(eventName: event.name)
return
Expand Down Expand Up @@ -104,11 +114,11 @@ public final class TidalEventSender: EventSender {
monitoringScheduler.runScheduling(with: headerHelper)
}

private func startOutage(eventName: String) {
outageSubject.send(.outage(error: .init(code: "100", message: "Start outage error for \(eventName)")))
private func startOutage(eventName event: String) {
monitoring.startOutage(eventName: event)
}

private func endOutage(eventName: String) {
outageSubject.send(.noOutage(message: .init(message: "No outage for \(eventName)")))
private func endOutage(eventName event: String) {
monitoring.endOutage(eventName: event)
}
}
34 changes: 33 additions & 1 deletion Tests/EventProducerTests/EventsTests/EventsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ final class EventsTests: XCTestCase {
private let testAccessToken = "testAccessToken"
private let queue = EventQueue.shared
private var headerHelper: HeaderHelper!
private let maxDiskUsageBytes = 20480
private let maxDiskUsageBytes = 204800

override func setUp() async throws {
try await super.setUp()
Expand Down Expand Up @@ -260,4 +260,36 @@ final class EventsTests: XCTestCase {
1
)
}

func testSchedulerBatchExceedsSize() async throws {

guard let consumerUri = eventSender.config?.consumerUri else {
XCTFail("Default consumerUri should be set")
return
}

let eventQueue = [
Event(
name: "testEvent#1",
payload: "firstPayload"),
Event(
name: "testEvent#2",
payload: "secondPayload"),
Event(
name: "testEvent#3",
payload: "thirdPayload")
]

var eventScheduler = EventScheduler(consumerUri: consumerUri, maxDiskUsageBytes: 500)

/// Check that the events are allowed based on the scheduler's allowed queue size limit
XCTAssertFalse(eventScheduler.getAllowedBatch(eventQueue).isEmpty)

/// Reduce the maxDiskUsageBytes in order to drop the events
eventScheduler = EventScheduler(consumerUri: consumerUri, maxDiskUsageBytes: 50)

/// Events that exceed the size will be dropped
XCTAssertTrue(eventScheduler.getAllowedBatch(eventQueue).isEmpty)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import XCTest

final class FileManagerHelperTests: XCTestCase {
private let sut: FileManagerHelper = .shared
private let maxDiskUsageBytes = 20480
private let maxDiskUsageBytes = 204800

func testExceedsMaximumSize() {
let data = Data(count: 10480)
Expand Down
26 changes: 24 additions & 2 deletions Tests/EventProducerTests/MonitoringTests/MonitoringTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ final class MonitoringTests: XCTestCase {
var isUserLoggedIn: Bool
}

private let sut: Monitoring = .shared
private var sut: Monitoring = .shared
private let monitoringQueue: MonitoringQueue = .shared
private var headerHelper: HeaderHelper!
private let maxDiskUsageBytes = 20480
private let maxDiskUsageBytes = 204800

private var mockCredentialsProvider: CredentialsProvider {
MockCredentialsProvider(isUserLoggedIn: true)
Expand Down Expand Up @@ -191,4 +191,26 @@ final class MonitoringTests: XCTestCase {
let monitoringInfoAfterDeletion = await sut.getMonitoringInfo()
XCTAssertEqual(sut.emptyMonitoringInfo, monitoringInfoAfterDeletion)
}

func testisOutageGetsTriggeredOnMonitoringUpdate() async throws {

sut.startOutage(eventName: "test")

guard sut.outageSubject != nil else {
XCTFail("outageSubject not accessible")
return
}
/// start outage
var isOutage: Bool = if case .outageStart? = sut.outageSubject?.value { true } else { false }

/// Verify that the outage is active
XCTAssertTrue(isOutage)

/// end outage
sut.endOutage(eventName: "test")
isOutage = if case .outageStart? = sut.outageSubject?.value { true } else { false }

/// Verify that the outage is not active
XCTAssertFalse(isOutage)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ final class NetworkingTests: XCTestCase {
}
}

private let maxDiskUsageBytes = 20480
private let maxDiskUsageBytes = 204800

private var mockAuthProvider: AuthProvider {
MockAuthProvider(token: "testAccessToken", clientID: "testURL")
Expand Down

0 comments on commit 4426ab7

Please sign in to comment.