Package co.decodable.sdk.pipeline.testing
Infrastructure and utilities for (integration) testing custom Decodable pipelines.
Using a PipelineTestContext or a KeyedPipelineTestContext, you can produce elements for one or
more Decodable streams, run your custom pipeline, and assert the output elements of this pipeline
on another Decodable stream. It is recommended to use Testcontainers for starting a Kafka or
Redpanda broker to be used for testing, as shown below:
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import co.decodable.sdk.pipeline.testing.PipelineTestContext;
import co.decodable.sdk.pipeline.testing.StreamRecord;
import co.decodable.sdk.pipeline.testing.TestEnvironment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.redpanda.RedpandaContainer;

@Testcontainers
public class DataStreamJobTest {

  private static final String PURCHASE_ORDERS = "purchase-orders";
  private static final String PURCHASE_ORDERS_PROCESSED = "purchase-orders-processed";

  // Broker started by Testcontainers for the test
  @Container
  public RedpandaContainer broker =
      new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.2");

  @Test
  public void shouldUpperCaseCustomerName() throws Exception {
    // Declare the streams the job under test reads from and writes to
    TestEnvironment testEnvironment =
        TestEnvironment.builder()
            .withBootstrapServers(broker.getBootstrapServers())
            .withStreams(PURCHASE_ORDERS, PURCHASE_ORDERS_PROCESSED)
            .build();

    try (PipelineTestContext ctx = new PipelineTestContext(testEnvironment)) {
      String value =
          "{\n"
              + "  \"order_id\" : 19001,\n"
              + "  \"order_date\" : \"2023-06-09 10:18:38\",\n"
              + "  \"customer_name\" : \"Yolanda Hagenes\",\n"
              + "  \"price\" : 15.00,\n"
              + "  \"product_id\" : 108,\n"
              + "  \"order_status\" : false\n"
              + "}";
      String value2 =
          "{\n"
              + "  \"order_id\" : 19002,\n"
              + "  \"order_date\" : \"2023-06-09 11:25:33\",\n"
              + "  \"customer_name\" : \"Erwin Mausepeter\",\n"
              + "  \"price\" : 35.00,\n"
              + "  \"product_id\" : 22,\n"
              + "  \"order_status\" : false\n"
              + "}";

      // given
      ctx.stream(PURCHASE_ORDERS).add(new StreamRecord<>(value));
      ctx.stream(PURCHASE_ORDERS).add(new StreamRecord<>(value2));

      // when (as an example, PurchaseOrderProcessingJob upper-cases the customer name)
      ctx.runJobAsync(PurchaseOrderProcessingJob::main);

      // Wait up to 30 seconds for each output record
      StreamRecord<String> result =
          ctx.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);
      StreamRecord<String> result2 =
          ctx.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);
      ObjectNode purchaseOrder = (ObjectNode) new ObjectMapper().readTree(result.value());
      ObjectNode purchaseOrder2 = (ObjectNode) new ObjectMapper().readTree(result2.value());

      // then
      assertThat(purchaseOrder.get("customer_name").asText()).isEqualTo("YOLANDA HAGENES");
      assertThat(purchaseOrder2.get("customer_name").asText()).isEqualTo("ERWIN MAUSEPETER");
    }
  }
}
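The test above waits for each output record with takeOne().get(30, TimeUnit.SECONDS), which either returns a record or fails once the timeout elapses. The following stdlib-only sketch illustrates that waiting pattern. FakeStream and its methods are hypothetical stand-ins written for this illustration, and it assumes a future-based takeOne(); it is not the SDK's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TakeOneSketch {

  /** Hypothetical in-memory stand-in for an output stream (not an SDK class). */
  static class FakeStream {
    private final Deque<String> records = new ArrayDeque<>();
    private final Deque<CompletableFuture<String>> waiters = new ArrayDeque<>();

    /** Hands the record to the oldest waiting consumer, or buffers it. */
    synchronized void add(String value) {
      CompletableFuture<String> waiter = waiters.poll();
      if (waiter != null) {
        waiter.complete(value);
      } else {
        records.add(value);
      }
    }

    /** Returns a future that completes as soon as a record is available. */
    synchronized CompletableFuture<String> takeOne() {
      String value = records.poll();
      if (value != null) {
        return CompletableFuture.completedFuture(value);
      }
      CompletableFuture<String> waiter = new CompletableFuture<>();
      waiters.add(waiter);
      return waiter;
    }
  }

  public static void main(String[] args) throws Exception {
    FakeStream stream = new FakeStream();
    stream.add("first");

    // A record is already buffered, so this returns immediately.
    System.out.println(stream.takeOne().get(5, TimeUnit.SECONDS));

    // No record arrives, so the bounded wait ends with a TimeoutException.
    try {
      stream.takeOne().get(200, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      System.out.println("timed out");
    }
  }
}
```

Bounding the wait keeps a test from hanging indefinitely when the job under test never emits the expected record; the test fails with a timeout instead.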
Class                                          Description
DecodableStream<T>                             Represents a data stream on the Decodable platform.
KeyedDecodableStream<K,V>                      Represents a keyed data stream on the Decodable platform.
KeyedPipelineTestContext                       Provides access to keyed Decodable streams during testing as well as the ability to run custom Flink jobs.
KeyedPipelineTestContext.ThrowingConsumer<T>   A Consumer variant which allows for declared checked exception types.
KeyedStreamRecord<K,V>                         Represents one element on a keyed Decodable stream.
PipelineTestContext                            Provides access to Decodable streams during testing as well as the ability to run custom Flink jobs.
PipelineTestContext.ThrowingConsumer<T>        A Consumer variant which allows for declared checked exception types.
StreamRecord<T>                                Represents one element on a Decodable stream.
TestEnvironment                                An EnvironmentAccess.Environment implementation for testing purposes, allowing you to define one or more Decodable streams which can then be accessed from the job under test.
TestEnvironment.Builder                        A builder for creating new TestEnvironment instances.
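The Consumer variant listed above exists because a job's main method typically declares checked exceptions, so a method reference such as PurchaseOrderProcessingJob::main cannot be passed where a plain java.util.function.Consumer is expected. The sketch below illustrates the idea using only the standard library. The interface, the jobMain method, and the runAsync helper are all written for this illustration and are assumptions, not the SDK's actual definitions.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ThrowingConsumerSketch {

  /** Illustrative interface; the SDK's own type may differ in detail. */
  @FunctionalInterface
  interface ThrowingConsumer<T> {
    void accept(T t) throws Exception;
  }

  /** Hypothetical job entry point that declares a checked exception. */
  static void jobMain(String[] args) throws Exception {
    if (args.length > 0 && args[0].equals("fail")) {
      throw new IOException("simulated job failure");
    }
  }

  /** Runs the job on another thread; a failure surfaces through the future. */
  static CompletableFuture<Void> runAsync(ThrowingConsumer<String[]> job, String... args) {
    return CompletableFuture.runAsync(() -> {
      try {
        job.accept(args);
      } catch (Exception e) {
        // Wrap the checked exception so the Runnable lambda compiles.
        throw new CompletionException(e);
      }
    });
  }

  public static void main(String[] args) {
    // Compiles because ThrowingConsumer.accept declares "throws Exception".
    runAsync(ThrowingConsumerSketch::jobMain).join();
    System.out.println("job finished");

    try {
      runAsync(ThrowingConsumerSketch::jobMain, "fail").join();
    } catch (CompletionException e) {
      System.out.println("job failed: " + e.getCause().getMessage());
    }
  }
}
```

With java.util.function.Consumer<String[]> in place of the custom interface, the method references in main would be rejected by the compiler because of the unhandled checked exception.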