Package co.decodable.sdk.pipeline.testing


package co.decodable.sdk.pipeline.testing
Infrastructure and utilities for (integration) testing custom Decodable pipelines.

Using a PipelineTestContext, you can produce elements for one or more Decodable streams, run your custom pipeline, and assert on the elements that the pipeline emits to another Decodable stream. It is recommended to use Testcontainers to start a Kafka or Redpanda broker for testing, as shown below:

/**
 * Example integration test for a custom pipeline job.
 *
 * <p>Two purchase orders are written to the {@code purchase-orders} stream, {@code
 * PurchaseOrderProcessingJob} is started asynchronously, and the records arriving on the {@code
 * purchase-orders-processed} stream are checked for upper-cased customer names.
 */
@Testcontainers
public class DataStreamJobTest {

  /** Name of the stream the test writes its input records to. */
  private static final String PURCHASE_ORDERS = "purchase-orders";

  /** Name of the stream the test reads the job's output records from. */
  private static final String PURCHASE_ORDERS_PROCESSED = "purchase-orders-processed";

  /** Redpanda broker container whose bootstrap servers are handed to the test environment. */
  @Container
  public RedpandaContainer broker =
      new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.2");

  /**
   * Verifies that the customer name of each processed purchase order is upper-cased.
   *
   * @throws Exception if closing the test context, waiting for an output record (at most 30
   *     seconds each), or parsing the resulting JSON fails
   */
  @Test
  public void shouldUpperCaseCustomerName() throws Exception {
    // Point the environment at the containerized broker and declare both streams.
    String bootstrapServers = broker.getBootstrapServers();
    TestEnvironment environment =
        TestEnvironment.builder()
            .withBootstrapServers(bootstrapServers)
            .withStreams(PURCHASE_ORDERS, PURCHASE_ORDERS_PROCESSED)
            .build();

    try (PipelineTestContext context = new PipelineTestContext(environment)) {
      String firstOrderJson =
          "{\n"
              + "  \"order_id\" : 19001,\n"
              + "  \"order_date\" : \"2023-06-09 10:18:38\",\n"
              + "  \"customer_name\" : \"Yolanda Hagenes\",\n"
              + "  \"price\" : 15.00,\n"
              + "  \"product_id\" : 108,\n"
              + "  \"order_status\" : false\n"
              + "}";
      String secondOrderJson =
          "{\n"
              + "  \"order_id\" : 19002,\n"
              + "  \"order_date\" : \"2023-06-09 11:25:33\",\n"
              + "  \"customer_name\" : \"Erwin Mausepeter\",\n"
              + "  \"price\" : 35.00,\n"
              + "  \"product_id\" : 22,\n"
              + "  \"order_status\" : false\n"
              + "}";

      // given: both orders are available on the input stream
      context.stream(PURCHASE_ORDERS).add(new StreamRecord<>(firstOrderJson));
      context.stream(PURCHASE_ORDERS).add(new StreamRecord<>(secondOrderJson));

      // when: the job under test runs
      // (as an example, PurchaseOrderProcessingJob upper-cases the customer name)
      context.runJobAsync(PurchaseOrderProcessingJob::main);

      // Take the two output records one after the other, waiting up to 30 seconds for each.
      StreamRecord<String> firstProcessed =
          context.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);
      StreamRecord<String> secondProcessed =
          context.stream(PURCHASE_ORDERS_PROCESSED).takeOne().get(30, TimeUnit.SECONDS);

      // One mapper is enough for parsing both output records.
      ObjectMapper jsonMapper = new ObjectMapper();
      ObjectNode firstOrder = (ObjectNode) jsonMapper.readTree(firstProcessed.value());
      ObjectNode secondOrder = (ObjectNode) jsonMapper.readTree(secondProcessed.value());

      // then: each customer name arrives upper-cased, in the order the records were added
      String firstCustomer = firstOrder.get("customer_name").asText();
      String secondCustomer = secondOrder.get("customer_name").asText();
      assertThat(firstCustomer).isEqualTo("YOLANDA HAGENES");
      assertThat(secondCustomer).isEqualTo("ERWIN MAUSEPETER");
    }
  }
}