Skip to content

Commit

Permalink
Allow configuring channel capacity of websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
mlake committed May 12, 2024
1 parent ae1e96c commit 40745a1
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 7 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@
<artifactId>okio-jvm</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core-jvm</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-jdk8</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-slf4j</artifactId>
<version>1.8.1</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
29 changes: 24 additions & 5 deletions src/main/kotlin/bybit/sdk/websocket/ByBitWebSocketClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ constructor(
reconnectWebSocket = false
}

fun getWebSocketEventChannel(): ReceiveChannel<ByBitWebSocketMessage> {
fun getWebSocketEventChannel(capacity: Int, onBufferOverflow: BufferOverflow): ReceiveChannel<ByBitWebSocketMessage> {
// CONFLATED means events might drop, UNLIMITED should use lots of memory
logger.warn("we are creating a Channel for websocket which capacity = Channel.UNLIMITED")
val c = Channel<ByBitWebSocketMessage>(capacity = Channel.UNLIMITED, onUndeliveredElement = { value -> logger.warn("Dropped value: $value") } )
logger.warn("we are creating a Channel for websocket which capacity = $capacity")
val c = Channel<ByBitWebSocketMessage>(
capacity = capacity,
onBufferOverflow = onBufferOverflow,
onUndeliveredElement = { value -> logger.warn("Dropped value: $value") } )
eventChannelList.add(c as SendChannel<ByBitWebSocketMessage>)
return c
}
Expand Down Expand Up @@ -261,11 +264,27 @@ constructor(
for (msg in messageList) {
logger.trace("Incoming WebSocket message {}: {}", msg::class.java.canonicalName, msg)

if (eventChannelList.isEmpty()) {
logger.error("eventChannelList is empty")
continue
}

for (c in eventChannelList) {
Concurrency.run {
try {
c.send(msg)
} catch (_: ClosedSendChannelException) {
when (msg) {
is TopicResponse.PublicTrade -> {
val channelResult = c.trySend(msg)
if (channelResult.isFailure) {
logger.error("trySend failed: $channelResult")
}
}
else -> {
c.send(msg)
}
}
} catch (e: ClosedSendChannelException) {
logger.error { "SendChannel closed: ${e.message}"}
delay(10)
eventChannelList.remove(c)
} catch (t: Throwable) {
Expand Down
28 changes: 28 additions & 0 deletions src/test/kotlin/bybit/sdk/rest/PositionClientTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package bybit.sdk.rest


import bybit.sdk.properties.ByBitProperties
import bybit.sdk.rest.position.PositionInfoParams
import bybit.sdk.shared.Category
import kotlin.test.Test
import kotlin.test.assertEquals

internal class PositionClientTest {

private var restClient: ByBitRestClient =
ByBitRestClient(
apiKey = System.getenv("BYBIT_API_KEY") ?: ByBitProperties.APIKEY,
secret = System.getenv("BYBIT_SECRET") ?: ByBitProperties.SECRET,
testnet = true,
// httpClientProvider = okHttpClientProvider
)

@Test
fun getPositionInfo() {
val resp = restClient.positionClient.getPositionInfoBlocking(
PositionInfoParams(Category.inverse, settleCoin = "BTC")
)
assertEquals(0, resp.retCode)
assertEquals("OK", resp.retMsg)
}
}
5 changes: 3 additions & 2 deletions src/test/kotlin/bybit/sdk/websocket/WebSocketTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bybit.sdk.websocket

import bybit.sdk.properties.ByBitProperties
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import org.junit.jupiter.api.Test

internal class WebSocketTest {
Expand Down Expand Up @@ -43,7 +44,7 @@ internal class WebSocketTest {
scopeOne.launch {
wsClientOne.connect(listOf(ByBitWebSocketSubscription(ByBitWebsocketTopic.Trades, "BTCUSD")))

val channel = wsClientOne.getWebSocketEventChannel()
val channel = wsClientOne.getWebSocketEventChannel(10, BufferOverflow.SUSPEND)

while (true) {
val msg = channel.receive()
Expand Down Expand Up @@ -88,7 +89,7 @@ internal class WebSocketTest {

wsClientPrivate.connect(privateSubs)

val channel = wsClientPrivate.getWebSocketEventChannel()
val channel = wsClientPrivate.getWebSocketEventChannel(10, BufferOverflow.SUSPEND)
while (true) {
when (val message = channel.receive()) {
is ByBitWebSocketMessage.StatusMessage -> {
Expand Down

0 comments on commit 40745a1

Please sign in to comment.