18
18
*/
19
19
package io.floodplain.test
20
20
21
+ import com.github.dockerjava.api.command.InspectContainerResponse
21
22
import org.testcontainers.containers.GenericContainer
22
23
import org.testcontainers.containers.KafkaContainer
24
+ import org.testcontainers.containers.wait.strategy.Wait
25
+ import org.testcontainers.images.builder.Transferable
23
26
import org.testcontainers.utility.DockerImageName
27
+ import java.nio.charset.StandardCharsets
24
28
import java.time.Duration
25
29
26
30
val useIntegraton: Boolean by lazy {
27
31
System .getenv(" NO_INTEGRATION" ) == null
28
32
}
29
33
34
+ private val logger = mu.KotlinLogging .logger {}
35
+
30
36
class KGenericContainer (imageName : String ) : GenericContainer<KGenericContainer>(DockerImageName .parse(imageName))
31
37
32
38
/* *
33
39
* Kotlin wrapper, to make testcontainers easier to use
34
40
*/
35
- class InstantiatedContainer (image : String , port : Int , env : Map <String , String > = emptyMap(), customizer : ((KGenericContainer ) -> KGenericContainer )? = null ) {
41
+ class InstantiatedContainer (
42
+ image : String ,
43
+ port : Int ,
44
+ env : Map <String , String > = emptyMap(),
45
+ customizer : ((KGenericContainer ) -> KGenericContainer )? = null
46
+ ) {
36
47
37
48
var container: KGenericContainer ? = KGenericContainer (image)
38
49
.apply { withExposedPorts(port) }
39
50
.apply { withEnv(env) }
40
51
.apply { customizer?.invoke(this ) }
41
52
var host: String
42
53
var exposedPort: Int = - 1
54
+
43
55
init {
44
56
container?.start()
45
57
host = container?.host ? : " localhost"
46
58
exposedPort = container?.firstMappedPort ? : - 1
47
59
}
60
+
48
61
fun close () {
49
62
container?.close()
50
63
}
51
64
}
65
+
52
66
// KafkaContainer("5.5.3").withEmbeddedZookeeper().withExposedPorts(9092,9093)
53
67
class InstantiatedKafkaContainer (customizer : ((KafkaContainer ) -> KafkaContainer )? = null ) {
54
68
// class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(DockerImageName.parse(imageName))
@@ -59,12 +73,61 @@ class InstantiatedKafkaContainer(customizer: ((KafkaContainer) -> KafkaContainer
59
73
.apply { customizer?.invoke(this ) }
60
74
var host: String
61
75
var exposedPort: Int = - 1
76
+
62
77
init {
63
78
container.start()
64
79
host = container.host ? : " localhost"
65
80
exposedPort = container.getMappedPort(9093 ) ? : - 1
66
81
}
82
+
83
+ fun close () {
84
+ container.close()
85
+ }
86
+ }
87
+
88
+ class InstantiatedRedPandaContainer (customizer : ((RedpandaContainer ) -> RedpandaContainer )? = null ) {
89
+ private val container = RedpandaContainer ()
90
+ .apply { customizer?.invoke(this ) }
91
+ var host: String
92
+ var exposedPort: Int = - 1
93
+
94
+ init {
95
+ container.start()
96
+ host = container.host ? : " localhost"
97
+ exposedPort = container.getMappedPort(9092 ) ? : - 1
98
+ }
99
+
67
100
fun close () {
68
101
container.close()
69
102
}
70
103
}
104
+
105
+ // withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
106
+
107
+ private const val STARTER_SCRIPT = " /testcontainers_start.sh"
108
+
109
+ class RedpandaContainer : GenericContainer <RedpandaContainer ?>(" vectorized/redpanda:v21.4.13" ) {
110
+ override fun containerIsStarting (containerInfo : InspectContainerResponse ? ) {
111
+ super .containerIsStarting(containerInfo)
112
+ var command = " #!/bin/bash\n "
113
+ command + = " /usr/bin/rpk redpanda start --check=false --node-id 0 "
114
+ command + = " --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 "
115
+ command + = " --advertise-kafka-addr PLAINTEXT://broker:29092,OUTSIDE://$host :" + getMappedPort(
116
+ 29092
117
+ )
118
+ logger.info(" command: $command " )
119
+ logger.info(" mapped port: $host :${getMappedPort(9092 )} " )
120
+ copyFileToContainer(
121
+ Transferable .of(command.toByteArray(StandardCharsets .UTF_8 ), 511 ),
122
+ STARTER_SCRIPT
123
+ )
124
+ logger.info(" Copied to location: $STARTER_SCRIPT " )
125
+ }
126
+
127
+ init {
128
+ withExposedPorts(9092 )
129
+ withCreateContainerCmdModifier { cmd -> cmd.withEntrypoint(" sh" ) }
130
+ withCommand(" -c" , " while [ ! -f $STARTER_SCRIPT ]; do sleep 0.1; done; $STARTER_SCRIPT " )
131
+ waitingFor(Wait .forLogMessage(" .*Started Kafka API server.*" , 1 ))
132
+ }
133
+ }
0 commit comments