Package co.decodable.sdk.pipeline

An SDK for implementing Apache Flink jobs and running them on Decodable.

The SDK provides Source and StatefulSink implementations which integrate custom Flink jobs seamlessly with Decodable managed streams and, in turn, connections. For instance, you can retrieve the elements of a stream, apply a custom mapping function to them, and write them back to another stream using a Flink job like this:

@SourceStreams(PURCHASE_ORDERS_STREAM)
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class PurchaseOrderProcessingJob {

  static final String PURCHASE_ORDERS_STREAM = "purchase-orders";
  static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source reading from the managed "purchase-orders" stream
    DecodableStreamSource<PurchaseOrder> source = DecodableStreamSource.<PurchaseOrder>builder()
        .withStreamName(PURCHASE_ORDERS_STREAM)
        .withDeserializationSchema(new JsonDeserializationSchema<>(PurchaseOrder.class))
        .build();

    // Sink writing to the managed "purchase-orders-processed" stream
    DecodableStreamSink<PurchaseOrder> sink = DecodableStreamSink.<PurchaseOrder>builder()
        .withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
        .withSerializationSchema(new JsonSerializationSchema<>())
        .build();

    DataStream<PurchaseOrder> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                    "[stream-purchase-orders] Purchase Orders Source")
        .map(new PurchaseOrderProcessor());

    stream.sinkTo(sink).name("[stream-purchase-orders-processed] Purchase Orders Sink");

    env.execute("Purchase Order Processor");
  }
}

Stream Metadata

While not required, it is good practice for custom pipeline authors to provide metadata about the source and sink streams accessed by their pipelines. That way, the streams referenced by a pipeline can be displayed in the Decodable user interface. To do so, add a file named META-INF/decodable/stream-names.properties to your Flink job JAR. Within that file, specify the name(s) of all source and sink streams for each job class as comma-separated lists, using the property keys "<fully-qualified class name>.source-streams" and "<fully-qualified class name>.sink-streams":

com.example.MyJob.source-streams=my_source_stream_1,my_source_stream_2
com.example.MyJob.sink-streams=my_sink_stream_1,my_sink_stream_2
com.example.MyOtherJob.source-streams=my_other_source_stream
com.example.MyOtherJob.sink-streams=my_other_sink_stream
Instead of creating this file manually, it is recommended to generate it automatically using the annotation processor that ships with this SDK. To do so, specify the stream names using the SourceStreams and SinkStreams annotations on the job class, as shown in the example listing above.
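
For illustration, the following job class would yield the two MyJob entries from the properties file above. This is a minimal sketch; it assumes the annotations accept multiple stream names, as the comma-separated property values suggest (the example at the top of this page passes a single name per annotation):

// A sketch, assuming SourceStreams/SinkStreams accept multiple stream names;
// compiling this class with the SDK's annotation processor on the compiler's
// annotation path would emit the MyJob entries shown above.
@SourceStreams({"my_source_stream_1", "my_source_stream_2"})
@SinkStreams({"my_sink_stream_1", "my_sink_stream_2"})
public class MyJob {

  public static void main(String[] args) throws Exception {
    // job definition as in the example above
  }
}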

Operator naming convention

Optionally, naming operators with a prefix of [stream-<stream_name>] allows Decodable to attribute input and output metrics to the corresponding stream at runtime. For example, a source operator named [stream-my_source_stream_1] Purchase Order Source or a sink operator named [stream-my_sink_stream_1] Purchase Order Sink would produce metrics for that source and sink stream, respectively.
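
Concretely, using the stream names from the metadata example above, the operator names would be passed where the first example passes them; this sketch assumes source and sink are configured as in that example:

// Hypothetical operator names following the [stream-<stream_name>] convention
DataStream<PurchaseOrder> orders =
    env.fromSource(source, WatermarkStrategy.noWatermarks(),
        "[stream-my_source_stream_1] Purchase Order Source");

orders.sinkTo(sink).name("[stream-my_sink_stream_1] Purchase Order Sink");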

Custom Metrics

By default, Decodable custom pipelines expose a set of Flink metrics. To expose additional metrics from your job, register them under the DecodableMetrics metric group:

private transient Counter recordsProcessed;

@Override
public void open(Configuration parameters) throws Exception {
  // Registering the counter under the "DecodableMetrics" group exposes it
  // alongside the default pipeline metrics
  recordsProcessed = getRuntimeContext()
          .getMetricGroup()
          .addGroup("DecodableMetrics")
          .counter("recordsProcessed", new SimpleCounter());
}
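
Putting it together, here is a minimal, self-contained sketch of a RichMapFunction that registers and updates such a counter. The class body is illustrative, not the actual PurchaseOrderProcessor implementation from the first example:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

// Illustrative processor; the map logic is a placeholder
public class PurchaseOrderProcessor extends RichMapFunction<PurchaseOrder, PurchaseOrder> {

  private transient Counter recordsProcessed;

  @Override
  public void open(Configuration parameters) throws Exception {
    recordsProcessed = getRuntimeContext()
        .getMetricGroup()
        .addGroup("DecodableMetrics")
        .counter("recordsProcessed", new SimpleCounter());
  }

  @Override
  public PurchaseOrder map(PurchaseOrder order) {
    recordsProcessed.inc(); // count every record passing through this operator
    return order;
  }
}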