Docs
uhadoop
Developer Guide
Flink Development Guide

Flink Operation Guide

1. Basic Introduction

Apache Flink is a distributed processing engine and framework for stateful computations over bounded and unbounded data streams. Flink is designed to run in all common cluster environments, performing computations at *arbitrary scale and in-memory speed. For more details, see Appache Flink.

2. Job Submission Mode

Flink in UHadoop uses Flink on Yarn mode for deployment. You can SSH into any node of the UHadoop cluster and submit Flink jobs from the command line.

Flink on Yarn mode provides Session, Per-Job Cluster, and Application modes to submit jobs, as explained below:

ModeDescriptionAdvantages and Disadvantages
SessionThis mode creates a Flink cluster, all jobs will be submitted to this cluster to run, and the cluster will not be automatically released even after the jobs end. If some job encounters an exception causing a Task Manager to shut down, other jobs running on this Task Manager will fail as well.Advantages: Low resource allocation overhead for job submission.
Disadvantages: There is resource competition between jobs.
Applicable scenarios: Deploy jobs that need shorter startup time and relatively short running time.
Per-Job ClusterIn this mode, each time a Flink job is submitted, YARN will start a new Flink cluster for this job and run the job. When the job ends or is cancelled, the Flink cluster belonging to the job will also be released.Advantages: Resource isolation between jobs. The abnormal behavior of a job will not affect other jobs.
Disadvantages: Larger overhead for starting jobs.
Applicable scenarios: Jobs with longer running times.
ApplicationIn this mode, each submission of a Flink Application (an Application contains one or more jobs), YARN will start a new Flink cluster for this Application. When the Application ends or is cancelled, the Flink cluster belonging to the Application will also be released.
The difference between this mode and the Per-Job mode is that the main() method in the JAR file corresponding to the Application is executed in the Job Manager in the cluster.

If the submitted JAR file contains multiple jobs, these jobs will execute in the cluster belonging to the Application.
Advantages: Can alleviate the burden of the client when submitting jobs.
Disadvantages: Larger overhead for starting jobs.
Applicable scenarios: Jobs with longer running times.

3. Job Submission

Prerequisite: Login to the cluster node via SSH.

3.1 Session Mode

  • Execute the following command to start a yarn session

    yarn-session.sh --detached
    
  • Execute the following command to submit a job

    flink run /home/hadoop/flink/examples/streaming/TopSpeedWindowing.jar
    

    After successful submission, the Yarn ApplicationId corresponding to the job will be returned (used in subsequent steps), as shown in the red field below:

3.2 Per-Job Mode

  • Execute the following command to submit a job

    flink run -t yarn-per-job --detached /home/hadoop/flink/examples/streaming/TopSpeedWindowing.jar
    

    After successful submission, the Yarn ApplicationId corresponding to the job will be returned (used in subsequent steps), as shown in the red field below:

    If an error occurs during task submission: Exception in thread "Thread-6" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.

​ Add classloader.check-leaked-classloader: false in the /home/hadoop/flink/conf/flink-conf.yaml file and resubmit the task.

3.3 Application Mode

  • Execute the following command to submit a job

    flink run-application -t yarn-application /home/hadoop/flink/examples/streaming/TopSpeedWindowing.jar
    

    After successful submission, the Yarn ApplicationId corresponding to the job will be returned (used in subsequent steps), as shown in the red field below:

4. Job Operations

4.1 View Jobs

  • Visit Yarn WebUI, for accessing methods refer to UHadoop Service WebUI Access Guide, select the corresponding task, and click the task ID.

  • After entering the new page, click the Tracking URL link:

  • Enter the Flink page to view job-related information.

Note: For completed jobs, logs need to be viewed separately, as follows:

  • Visit Yarn WebUI, for accessing methods refer to UHadoop Service WebUI Access Guide, select the corresponding task, and click the task ID.

    You can view the job’s logs, as follows:

4.2 Stop Jobs

You can execute the following command to stop jobs:

flink cancel -t yarn-application -Dyarn.application.id=<applicationId> <jobId>

4.3 Specify Job Configuration

You can specify job configurations in the following ways:

  • Specify in the code: You can directly specify the configuration in the code, referred as below:

    // instantiate table environment
    Configuration configuration = new Configuration();
    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    
    ...
    

    For more details, refer to Flink Configuration.

  • Set in the job submission command, you can specify the configuration item through -D, as follows:

    flink run-application -t yarn-application -D classloader.check-leaked-classloader=false
    
  • Modify the Flink configuration file /home/hadoop/flink/conf/flink-conf.yaml, modify or add configuration items.