Package co.decodable.sdk.pipeline.testing
package co.decodable.sdk.pipeline.testing
Infrastructure and utilities for (integration) testing custom Decodable pipelines.
Using a PipelineTestContext
, you can produce elements for one or more Decodable
streams, run your custom pipeline, and assert the output elements of this pipeline on another
Decodable stream. It is recommended to use Testcontainers for starting a Kafka or Redpanda broker to
be used for testing, as shown below:
@Testcontainers
public class DataStreamJobTest {
private static final String PURCHASE_ORDERS = "purchase-orders";
private static final String PURCHASE_ORDERS_PROCESSED = "purchase-orders-processed";
@Container
public RedpandaContainer broker =
new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.2");
@Test
public void shouldUpperCaseCustomerName() throws Exception {
TestEnvironment testEnvironment =
TestEnvironment.builder()
.withBootstrapServers(broker.getBootstrapServers())
.withStreams(PURCHASE_ORDERS, PURCHASE_ORDERS_PROCESSED)
.build();
try (PipelineTestContext ctx = new PipelineTestContext(testEnvironment)) {
String value =
"{\n"
+ " \"order_id\" : 19001,\n"
+ " \"order_date\" : \"2023-06-09 10:18:38\",\n"
+ " \"customer_name\" : \"Yolanda Hagenes\",\n"
+ " \"price\" : 15.00,\n"
+ " \"product_id\" : 108,\n"
+ " \"order_status\" : false\n"
+ "}";
String value2 =
"{\n"
+ " \"order_id\" : 19002,\n"
+ " \"order_date\" : \"2023-06-09 11:25:33\",\n"
+ " \"customer_name\" : \"Erwin Mausepeter\",\n"
+ " \"price\" : 35.00,\n"
+ " \"product_id\" : 22,\n"
+ " \"order_status\" : false\n"
+ "}";
// given
ctx.stream(PURCHASE_ORDERS).add(new StreamRecord<>(value));
ctx.stream(PURCHASE_ORDERS).add(new StreamRecord<>(value2));
// when (as an example, PurchaseOrderProcessingJob upper-cases the customer name)
ctx.runJobAsync(PurchaseOrderProcessingJob::main);
StreamRecord<String> result =
ctx.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);
StreamRecord<String> result2 =
ctx.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);
ObjectNode purchaseOrder = (ObjectNode) new ObjectMapper().readTree(result.value());
ObjectNode purchaseOrder2 = (ObjectNode) new ObjectMapper().readTree(result2.value());
// then
assertThat(purchaseOrder.get("customer_name").asText()).isEqualTo("YOLANDA HAGENES");
assertThat(purchaseOrder2.get("customer_name").asText()).isEqualTo("ERWIN MAUSEPETER");
}
}
}
-
ClassDescriptionRepresents a data stream on the Decodable platform.Provides access to Decodable streams during testing as well as the ability to run custom Flink jobs.A
Consumer
variant which allows for declared checked exception types.StreamRecord<T>Represents one element on a Decodable stream.AnEnvironmentAccess.Environment
implementation for testing purposes, allowing to define one or more Decodable streams which then can be accessed from the job under test.A builder for creating newTestEnvironment
instances.