// This example implements the asynchronous request and callback with Futures that have the
  // interface of Java 8's futures (which is the same one followed by Flink's Future)
  * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
  class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
  /** The database specific client that can issue concurrent requests with callbacks */
  private transient DatabaseClient client;
  public void open(Configuration parameters) throws Exception {
  client = new DatabaseClient(host, post, credentials);
  public void close() throws Exception {
  public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
  // issue the asynchronous request, receive a future for result
  final Future<String> result = client.query(key);
  // set the callback to be executed once the request by the client is complete
  // the callback simply forwards the result to the result future
  CompletableFuture.supplyAsync(new Supplier<String>() {
  public String get() {
  try {
  return result.get();
  } catch (InterruptedException | ExecutionException e) {
  // Normally handled explicitly.
  return null;
  }).thenAccept( (String dbResult) -> {
  resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
  // create the original stream (the source is elided in this example)
  DataStream<String> stream = ...;
  // apply the async I/O transformation: results are emitted in UNORDERED mode, the timeout is
  // 1000 milliseconds and the capacity argument is 100
  DataStream<Tuple2<String, String>> resultStream =
  AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
  本示例展示了 Flink Async I/O 的基本用法:首先实现 AsyncFunction 接口,在其中编写异步请求逻辑,并将结果或异常设置到 resultFuture;然后使用 AsyncDataStream 的 unorderedWait 或 orderedWait 方法,将 AsyncFunction 作为 transformation 作用到 DataStream 上。AsyncDataStream 的 unorderedWait 和 orderedWait 都有两个与异步操作相关的参数:timeout 用于设置异步请求的超时时间,capacity 用于指定同一时刻最多允许多少个异步请求并发执行。
  * A function to trigger Async I/O operation.
  * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
  * the result can be collected by calling {@link ResultFuture#complete}. For each async
  * operation, its context is stored in the operator immediately after invoking
  * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
  * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
  * An error can also be propagated to the async IO operator by
  * {@link ResultFuture#completeExceptionally(Throwable)}.
  * <p>Callback example usage:
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  * public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
  * HBaseCallback cb = new HBaseCallback(result);
  * Get get = new Get(Bytes.toBytes(row));
  * hbase.asyncGet(get, cb);
  * }
  * }
  * }</pre>
  * <p>Future example usage:
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  * public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
  * Get get = new Get(Bytes.toBytes(row));
  * ListenableFuture<Result> future = hbase.asyncGet(get);
  * Futures.addCallback(future, new FutureCallback<Result>() {
  * public void onSuccess(Result result) {
  * List<String> ret = process(result);
  * result.complete(ret);
  * }
  * public void onFailure(Throwable thrown) {
  * result.completeExceptionally(thrown);
  * }
  * });
  * }
  * }
  * }</pre>
  * @param <IN> The type of the input elements.
  * @param <OUT> The type of the returned elements.
  public interface AsyncFunction<IN, OUT> extends Function, Serializable {
   * Trigger async operation for each stream input.
   * @param input element coming from an upstream task
   * @param resultFuture to be completed with the result data
   * @exception Exception in case of a user code error. An exception will make the task fail and
   * trigger fail-over process.
  // Starts the asynchronous operation for one input element; the outcome is reported through
  // resultFuture, whose element type is the function's output type OUT.
  void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
   * {@link AsyncFunction#asyncInvoke} timeout occurred.
   * By default, the result future is exceptionally completed with a timeout exception.
   * @param input element coming from an upstream task
   * @param resultFuture to be completed with the result data
  default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
  new TimeoutException("Async function call has timed out."));
  public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
  private static final long serialVersionUID = 3858030061138121840L;
  public void setRuntimeContext(RuntimeContext runtimeContext) {
  if (runtimeContext instanceof IterationRuntimeContext) {
  new RichAsyncFunctionIterationRuntimeContext(
  (IterationRuntimeContext) runtimeContext));
  } else {
  super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
  // Declared abstract here with the same signature as in AsyncFunction, which this class implements.
  public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
   * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
   * context only supports basic operations which are thread safe. Consequently, state access,
   * accumulators, broadcast variables and the distributed cache are disabled.
  private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
  private final RuntimeContext runtimeContext;
  RichAsyncFunctionRuntimeContext(RuntimeContext context) {
  runtimeContext = Preconditions.checkNotNull(context);
  public String getTaskName() {
  return runtimeContext.getTaskName();
  public MetricGroup getMetricGroup() {
  return runtimeContext.getMetricGroup();
  public int getNumberOfParallelSubtasks() {
  return runtimeContext.getNumberOfParallelSubtasks();
  public int getMaxNumberOfParallelSubtasks() {
  return runtimeContext.getMaxNumberOfParallelSubtasks();
  public int getIndexOfThisSubtask() {
  return runtimeContext.getIndexOfThisSubtask();
  public int getAttemptNumber() {
  return runtimeContext.getAttemptNumber();
  public String getTaskNameWithSubtasks() {
  return runtimeContext.getTaskNameWithSubtasks();
  public ExecutionConfig getExecutionConfig() {
  return runtimeContext.getExecutionConfig();
  public ClassLoader getUserCodeClassLoader() {
  return runtimeContext.getUserCodeClassLoader();
  // -----------------------------------------------------------------------------------
  // Unsupported operations
  // -----------------------------------------------------------------------------------
  public DistributedCache getDistributedCache() {
  throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
  public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
  throw new UnsupportedOperationException("State is not supported in rich async functions.");
  public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
  throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
  public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
  throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
  public Map<String, Accumulator<?, ?>> getAllAccumulators() {
  throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
  public IntCounter getIntCounter(String name) {
  throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
  public LongCounter getLongCounter(String name) {
  throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
  public DoubleCounter getDoubleCounter(String name) {
  throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
  public Histogram getHistogram(String name) {
  throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
  public boolean hasBroadcastVariable(String name) {
  throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
  public <RT> List<RT> getBroadcastVariable(String name) {
  throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
  public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
  throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
  private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
  private final IterationRuntimeContext iterationRuntimeContext;
  RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
  this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
  public int getSuperstepNumber() {
  return iterationRuntimeContext.getSuperstepNumber();
  // -----------------------------------------------------------------------------------
  // Unsupported operations
  // -----------------------------------------------------------------------------------
  public <T extends Aggregator<?>> T getIterationAggregator(String name) {
  throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
  public <T extends Value> T getPreviousIterationAggregate(String name) {
  throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
  public class AsyncDataStream {
   * Output mode for asynchronous operations.
  /** Selects how results of asynchronous operations are emitted. */
  public enum OutputMode {
      /** Ordered emission mode; used by the {@code orderedWait} methods. */
      ORDERED,
      /** Unordered emission mode; used by the {@code unorderedWait} methods. */
      UNORDERED
  }
  private static final int DEFAULT_QUEUE_CAPACITY = 100;
  private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
  DataStream<IN> in,
  AsyncFunction<IN, OUT> func,
  long timeout,
  int bufSize,
  OutputMode mode) {
  TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
  new int[]{1, 0},
  // create transform
  AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
  return in.transform("async wait operator", outTypeInfo, operator);
  public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
  DataStream<IN> in,
  AsyncFunction<IN, OUT> func,
  long timeout,
  TimeUnit timeUnit,
  int capacity) {
  return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
  public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
  DataStream<IN> in,
  AsyncFunction<IN, OUT> func,
  long timeout,
  TimeUnit timeUnit) {
  return addOperator(
  public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
  DataStream<IN> in,
  AsyncFunction<IN, OUT> func,
  long timeout,
  TimeUnit timeUnit,
  int capacity) {
  return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
  public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
  DataStream<IN> in,
  AsyncFunction<IN, OUT> func,
  long timeout,
  TimeUnit timeUnit) {
  return addOperator(
  Flink 为外部数据访问提供了 Asynchronous I/O 的 API,用于提升 streaming 的吞吐量。其基本用法是:定义一个实现 AsyncFunction 接口的 function,然后使用 AsyncDataStream 的 unorderedWait 或 orderedWait 方法,将 AsyncFunction 作为 transformation 作用到 DataStream 上。
  AsyncDataStream 的 unorderedWait 和 orderedWait 都有两个与异步操作相关的参数:timeout 用于设置异步请求的超时时间,capacity 用于指定同一时刻最多允许多少个异步请求并发执行。AsyncDataStream 提供了两种 OutputMode:UNORDERED 是无序的,即异步操作一旦完成就 emit 结果,在使用 TimeCharacteristic.ProcessingTime 时这种模式的延迟和开销最低;ORDERED 是有序的,即按 element 的输入顺序 emit 结果,为了保证有序,operator 需要缓冲数据,因而会带来一定的延迟和开销。


