Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ datasource = "8.0.11-1836-95c6e774"
# Test Dependencies
kotest = "5.9.1"
turbine = "1.2.1"
mockk = "1.13.14"
mockk = "1.14.9"

# Spring BOM should manage other dependencies versions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import com.caplin.datasource.messaging.CachedMessageFactory
import com.caplin.datasource.messaging.container.ContainerMessage
import com.caplin.datasource.messaging.json.JsonMessage
import com.caplin.datasource.messaging.record.GenericMessage
import com.caplin.datasource.namespace.RegexNamespace
import com.caplin.datasource.publisher.CachingDataProvider
import com.caplin.datasource.publisher.CachingPublisher
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent.RowEvent.Remove
import com.caplin.integration.datasourcex.reactive.api.ContainerEvent.RowEvent.Upsert
import com.caplin.integration.datasourcex.reactive.api.InsertAt.HEAD
import com.caplin.integration.datasourcex.util.AntPatternNamespace
import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion
import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion.Completion
import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion.Value
Expand All @@ -37,6 +37,7 @@ import io.mockk.verifyOrder
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.properties.ReadOnlyProperty
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -82,7 +83,9 @@ class BindTest :

val dataSource =
mockk<DataSource> {
every { createCachingPublisher(any<RegexNamespace>(), capture(dataProviders)) } answers
every {
createCachingPublisher(any<AntPatternNamespace>(), capture(dataProviders))
} answers
{
secondArg<CachingDataProvider>().setPublisher(cachingPublisher)
cachingPublisher
Expand Down Expand Up @@ -134,11 +137,11 @@ class BindTest :

containerDataProviderCaptor.onRequest("/SUBJECT/1")

delay(10)
delay(10.milliseconds)

verify(exactly = 0) { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }

delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }

Expand All @@ -161,21 +164,21 @@ class BindTest :

containerDataProviderCaptor.onRequest("/SUBJECT/1")

delay(1)
delay(1.milliseconds)
requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf("1" to "a", "2" to "b")))

delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Upsert(key = "2", value = mapOf("1" to "x", "2" to "y")))

verify(exactly = 0) { cachingPublisher.publish(any()) }
delay(50)
delay(50.milliseconds)

verify(exactly = 0) { cachingPublisher.publish(any()) }
delay(51)
delay(51.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
containerMessages.last().also { containerMessage ->
Expand All @@ -187,7 +190,7 @@ class BindTest :
}

dataProviderCaptor.onRequest("/SUBJECT/1-items/1")
delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/1") }
genericMessages.last().also { genericMessage ->
Expand All @@ -199,7 +202,7 @@ class BindTest :
}

dataProviderCaptor.onRequest("/SUBJECT/1-items/2")
delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/2") }
genericMessages.last().also { genericMessage ->
Expand All @@ -212,11 +215,11 @@ class BindTest :

sharedFlow.emitUpdate(Remove(key = "2"))

delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Upsert(key = "3", value = mapOf("1" to "l", "2" to "m")))

delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
containerMessages.last().also { containerMessage ->
Expand All @@ -228,7 +231,7 @@ class BindTest :
}

dataProviderCaptor.onRequest("/SUBJECT/1-items/3")
delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/3") }
genericMessages.last().also { genericMessage ->
Expand Down Expand Up @@ -259,18 +262,18 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))

delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Upsert(key = "2", value = mapOf()))

delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
containerMessages.last().also { containerMessage ->
Expand All @@ -282,7 +285,7 @@ class BindTest :
}

containerDataProviderCaptor.onDiscard("/SUBJECT/1")
delay(1)
delay(1.milliseconds)
}

test("Generic Container - Re-request") {
Expand All @@ -299,18 +302,18 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))

delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Upsert(key = "2", value = mapOf()))

delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
containerMessages.last().also { containerMessage ->
Expand All @@ -322,23 +325,23 @@ class BindTest :
}

containerDataProviderCaptor.onDiscard("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

// Resubscribe

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow2 = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow2.emitUpdate(Upsert(key = "1", value = mapOf()))

delay(50)
delay(50.milliseconds)

sharedFlow2.emitUpdate(Upsert(key = "2", value = mapOf()))

delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }

Expand All @@ -351,7 +354,7 @@ class BindTest :
}

containerDataProviderCaptor.onDiscard("/SUBJECT/1")
delay(1)
delay(1.milliseconds)
}

test("Generic Container - Upsert and remove quicker than debounce") {
Expand All @@ -362,27 +365,27 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))
delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Remove(key = "1"))
delay(100)
delay(100.milliseconds)

verify(exactly = 0) { cachingPublisher.publish(any()) }

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))
delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Upsert(key = "2", value = mapOf()))
delay(50)
delay(50.milliseconds)

sharedFlow.emitUpdate(Remove(key = "1"))
delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
containerMessages.last().also { containerMessage ->
Expand All @@ -393,15 +396,15 @@ class BindTest :
}

dataProviderCaptor.onRequest("/SUBJECT/1-items/2")
delay(1)
delay(1.milliseconds)
verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/2") }
genericMessages.last().also { genericMessage ->
verify { cachingPublisher.publish(genericMessage) }
}

containerDataProviderCaptor.onDiscard("/SUBJECT/1")
dataProviderCaptor.onDiscard("/SUBJECT/1-items/2")
delay(1)
delay(1.milliseconds)

confirmVerified(cachedMessageFactory)
}
Expand All @@ -414,23 +417,23 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.subscriptionCount.value shouldNotBeEqual 0

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))
delay(50)
delay(50.milliseconds)

dataProviderCaptor.onRequest("/SUBJECT/1-items/1")
delay(1)
delay(1.milliseconds)

sharedFlow.subscriptionCount.value shouldNotBeEqual 0

sharedFlow.emit(Completion(IllegalStateException()))
delay(1)
delay(1.milliseconds)

sharedFlow.subscriptionCount.value shouldBeEqual 0

Expand All @@ -450,23 +453,23 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.subscriptionCount.value shouldNotBeEqual 0

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))
delay(50)
delay(50.milliseconds)

dataProviderCaptor.onRequest("/SUBJECT/1-items/1")
delay(1)
delay(1.milliseconds)

sharedFlow.subscriptionCount.value shouldNotBeEqual 0

sharedFlow.emit(Completion())
delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/1") }
Expand All @@ -489,7 +492,7 @@ class BindTest :
}

dataProviderCaptor.onRequest("/SUBJECT/1-items/1")
delay(1)
delay(1.milliseconds)

verify { cachedMessageFactory.createSubjectErrorEvent("/SUBJECT/1-items/1", NotFound) }
subjectErrorEvents.last().also { subjectErrorEvent ->
Expand All @@ -505,32 +508,32 @@ class BindTest :
}

containerDataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedContainerSubjects.keys shouldContain "/SUBJECT/1"
val sharedFlow = requestedContainerSubjects.getValue("/SUBJECT/1")

sharedFlow.subscriptionCount.value shouldNotBeEqual 0

sharedFlow.emitUpdate(Upsert(key = "1", value = mapOf()))
delay(101)
delay(101.milliseconds)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }

dataProviderCaptor.onRequest("/SUBJECT/1-items/2")
delay(9000)
delay(9000.milliseconds)
verify(exactly = 0) {
cachedMessageFactory.createSubjectErrorEvent("/SUBJECT/1-items/2", NotFound)
}
delay(1001)
delay(1001.milliseconds)

verify { cachedMessageFactory.createSubjectErrorEvent("/SUBJECT/1-items/2", NotFound) }
subjectErrorEvents.last().also { subjectErrorEvent ->
verify { cachingPublisher.publishSubjectErrorEvent(subjectErrorEvent) }
}

containerDataProviderCaptor.onDiscard("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

confirmVerified(cachedMessageFactory)
}
Expand All @@ -543,7 +546,7 @@ class BindTest :
requestedGenericSubjects.keys shouldNotContain "/SUBJECT/1"

dataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedGenericSubjects.keys shouldContain "/SUBJECT/1"

Expand Down Expand Up @@ -591,7 +594,7 @@ class BindTest :
requestedGenericSubjects.keys shouldNotContain "/SUBJECT/1"

dataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)

requestedGenericSubjects.keys shouldContain "/SUBJECT/1"

Expand Down Expand Up @@ -627,7 +630,7 @@ class BindTest :

requestedJsonSubjects.keys shouldNotContain "/SUBJECT/1"
dataProviderCaptor.onRequest("/SUBJECT/1")
delay(1)
delay(1.milliseconds)
requestedJsonSubjects.keys shouldContain "/SUBJECT/1"

val sharedFlow = requestedJsonSubjects.getValue("/SUBJECT/1")
Expand Down
Loading