개요
Flink에서 Execution Environment는 Flink Application을 개발할 때 가장 먼저 획득하는 인스턴스의 클래스입니다. 이 글에서는 Flink에서 Execution Environment의 개념을 용어 중심으로 파고들어 Flink의 아키텍처를 이해해 보겠습니다.
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
정의
TheStreamExecutionEnvironment
is the context in which a streaming program is executed.
ALocalStreamEnvironment
will cause execution in the current JVM, aRemoteStreamEnvironment
will cause execution on a remote setup.The environment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
StreamExecutionEnvironment
는 스트리밍 프로그램이 실행되는 컨텍스트입니다.LocalStreamEnvironment
은 현재 JVM에서 실행되고,RemoteStreamEnvironment
은 원격으로 설치된 환경에서 실행됩니다. 이 환경은 작업 실행을 제어(병렬 처리 또는 내결함성/체크포인트 매개변수 설정 등)하고 외부 세계와 상호 작용(데이터 액세스)하는 메서드를 제공합니다.
Flink API 문서 - StreamingExecutionEnvironment
Execution Environment는 프로그램이 실행되는 컨텍스트이다. 즉, Execution Environment는 Context이다.
Context란 무엇인가?
A system contains data, which must be generally available to divergent parts of the system, but we wish to avoid using long parameter lists to functions or global data; therefore, we place the necessary data in a Context container and pass this object from function to function.
시스템에는 시스템의 다양한 부분에서 일반적으로 사용할 수 있어야 하는 데이터가 포함되어 있지만 함수나 전역 데이터에 긴 매개변수 목록을 사용하는 것을 피하고 싶기 때문에 필요한 데이터를 Context 컨테이너에 배치하고 이 객체를 함수에서 함수로 전달합니다.
Core J2EE Patterns - Context Object
Encapsulate the context (state and behaviors) relevant to the user or the request being processed in order to decouple application components from the complexities of the environment.
사용자 또는 처리 중인 요청과 관련된 컨텍스트(상태 및 동작)를 캡슐화하여 애플리케이션 구성 요소를 환경의 복잡성으로부터 분리합니다.
Java Design Patterns - Context Object
Context Object는 애플리케이션 전체 또는 일부에 공유되는 정보를 캡슐화한 객체이다.
- 사용하는 이유
- 공유되는 정보를 매번 파라미터로 넘기는 불편함을 줄이기 위해서
- 전역 상태를 중앙 집중화해서 관리하기 위해서(코드 변경 지점을 하나로 모음)
- 쉽게 말해서 전역 변수처럼 값을 공유하기 위해서 쓴다고 생각하면 된다
- 이점
- Dependency Injection으로 환경 정보가 바뀔 때 기존 코드의 변경을 줄일 수 있다
- 주의점
- 잘못 사용하면 많은 책임을 갖는 복잡한 객체가 되어 코드를 이해하기 어렵게 만들 수도 있다
- 객체의 책임(Single Responsibility Principle)를 고려하여 주의 깊게 설계해야 한다
- 잘못 사용하면 많은 책임을 갖는 복잡한 객체가 되어 코드를 이해하기 어렵게 만들 수도 있다
- 구현
- 상태를 공유한다는 특징 때문에 주로 Singleton Pattern으로 생성한다.
- 공통된 상태가 한 개의 인스턴스로 존재하고, 같은 인스턴스를 공유하기 위해
- 상태를 공유한다는 특징 때문에 주로 Singleton Pattern으로 생성한다.
.getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment().
Returns:The execution environment of the context in which the program is executed.
프로그램이 현재 실행되고 있는 컨텍스트를 나타내는 실행 환경을 생성합니다. 프로그램이 독립적으로 호출되는 경우, 이 메서드는 createLocalEnvironment()가 반환한 로컬 실행 환경을 반환합니다.
return:
프로그램이 실행되는 컨텍스트의 실행 환경입니다.
.getExecutionEnvironment()
는 프로그램이 현재 실행되는 Context Object를 반환한다. Flink Application이 독립적으로 실행(IDE에서 실행. 다른 Flink 클러스터에 제출하지 않고 바로 실행)되면 LocalStreamEnvironment
Context를, Flink Cluster에서 실행되면 RemoteStreamEnvironment
Context를 반환한다.
Flink Application이란?
A Flink application is a Java Application that submits one or multiple Flink Jobs from the main() method (or by some other means). Submitting jobs is usually done by calling execute() on an execution environment.
The jobs of an application can either be submitted to a long running Flink Session Cluster, to a dedicated Flink Application Cluster, or to a Flink Job Cluster.
Flink 애플리케이션은 main() 메서드에서(또는 다른 방법으로) 하나 이상의 Flink Job을 제출하는 Java 애플리케이션입니다. 작업 제출은 일반적으로 실행 환경에서 execute()를 호출하여 수행됩니다. 애플리케이션의 작업은 장기 실행 중인 Flink 세션 클러스터, 전용 Flink 애플리케이션 클러스터 또는 Flink 잡 클러스터에 제출할 수 있습니다.
앞선 예시처럼 Flink를 사용하기 위해서 작성한 Java 프로그램이 Flink Application이다. 이 애플리케이션의 역할은 Flink Job을 Flink Cluster에 제출하는 것이다. Flink Job을 제출하는 것은 코드의 env.execute()
로 실행된다
Flink Job이란?
A Flink Job is the runtime representation of a logical graph (also often called dataflow graph) that is created and submitted by calling execute() in a Flink Application.
Flink Job은 Flink 애플리케이션에서 execute()을 호출하여 생성하고 제출하는 logical graph(데이터 흐름 그래프라고도 함)의 런타임 표현입니다.
Logical Graph
A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets. A logical graph is created by submitting jobs from a Flink Application.
Logical graphs are also often referred to as dataflow graphs.
논리 그래프는 노드가 연산자이고 간선이 연산자의 입력/출력 관계를 정의하며 데이터 스트림 또는 데이터 세트에 해당하는 방향성 그래프입니다. 논리 그래프는 Flink 애플리케이션에서 작업을 제출하여 생성됩니다. 논리 그래프는 데이터 흐름 그래프라고도 합니다.
Operator
Node of a Logical Graph. An Operator performs a certain operation, which is usually executed by a Function. Sources and Sinks are special Operators for data ingestion and data egress.
논리 그래프의 노드입니다. 연산자는 특정 연산을 수행하며, 일반적으로 함수에 의해 실행됩니다. 소스와 싱크는 데이터 수집과 데이터 송출을 위한 특수 연산자입니다.
정리하면, Flink Job은 위 그림과 같은 그래프이다. 코드로 source를 추가하고, map이나 filter로 변환하고, sink에 저장하는 과정을 거친 뒤 env.execute()
를 호출하면 이 (자료구조) 그래프가 Flink cluster에 제출된다.
Flink Cluster란?
A distributed system consisting of (typically) one JobManager and one or more Flink TaskManager processes.
(일반적으로) 하나의 JobManager와 하나 이상의 Flink TaskManager 프로세스로 구성된 분산 시스템입니다.
위 그림처럼 JobManager와 TaskManager로 이루어진 시스템이다. 이처럼 Flink는 분산 시스템이어서 여러 개의 서비스로 구동된다. 이 여러 서비스의 묶음을 Cluster라고 한다. 이 서비스가 여러 컴퓨터에서 구동될 수 있어서 Computer Cluster와 유사한 용어를 사용한 것으로 추정된다.
Flink Application의 정의에 나오는 Flink Session Cluster, Flink Application Cluster, Flink Job Cluster는 모두 Flink Cluster의 하위 분류이다.
Flink Session Cluster
A long-running Flink Cluster which accepts multiple Flink Jobs for execution. The lifetime of this Flink Cluster is not bound to the lifetime of any Flink Job. Formerly, a Flink Session Cluster was also known as a Flink Cluster in session mode. Compare to Flink Application Cluster.
실행을 위해 여러 개의 플링크 잡을 수락하는 장기 실행 플링크 클러스터입니다. 이 Flink 클러스터의 수명은 Flink 잡의 수명에 구속되지 않습니다. 이전에는 Flink 세션 클러스터를 세션 모드의 Flink 클러스터라고도 했습니다. Flink 애플리케이션 클러스터와 비교하세요.
= 계속 켜져 있으면서 여러 개의 Flink Job을 받는 클러스터.
Flink Application Cluster
A Flink Application Cluster is a dedicated Flink Cluster that only executes Flink Jobs from one Flink Application. The lifetime of the Flink Cluster is bound to the lifetime of the Flink Application.
플링크 애플리케이션 클러스터는 하나의 플링크 애플리케이션에서만 플링크 Job을 실행하는 전용 플링크 클러스터입니다. 플링크 클러스터의 수명은 플링크 애플리케이션의 수명에 묶여 있습니다.
= 1 애플리케이션 전용 클러스터. 플링크 애플리케이션이 종료되면 클러스터도 종료된다. 아래의 Job Cluster와 다른 점은 하나의 애플리케이션에 여러 개의 job이 있어도(env.execute()
가 여러 번 호출되어도) flink application cluster의 조건을 만족한다.
Flink Job Cluster
A Flink Job Cluster is a dedicated Flink Cluster that only executes a single Flink Job. The lifetime of the Flink Cluster is bound to the lifetime of the Flink Job. This deployment mode has been deprecated since Flink 1.15.
플링크 잡 클러스터는 하나의 플링크 잡만 실행하는 전용 플링크 클러스터입니다. 플링크 클러스터의 수명은 플링크 잡의 수명에 묶여 있습니다. 이 배포 모드는 Flink 1.15부터 더 이상 사용되지 않습니다.
= 1 Job 전용 클러스터. Job 하나가 종료되면 플링크 클러스터가 종료된다.
LocalStreamEnvironment
The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally, multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded Flink cluster in the background and executes the program on that cluster.
로컬 스트림 환경은 환경이 인스턴스화된 JVM에서 프로그램을 로컬로 멀티스레드로 실행하는 StreamExecutionEnvironment입니다. 이 환경은 백그라운드에서 임베디드 Flink 클러스터를 스폰하고 해당 클러스터에서 프로그램을 실행합니다.
.createLocalEnvironment()
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. The default parallelism of the local environment is the number of hardware contexts (CPU cores / threads), unless it was specified differently by
setParallelism(int).
로컬 스트림 환경을 생성합니다. 로컬 실행 환경은 환경이 생성된 것과 동일한 JVM에서 멀티 스레드 방식으로 프로그램을 실행합니다. 로컬 환경의 기본 병렬 처리는 하드웨어 컨텍스트 수(CPU 코어/스레드)입니다. setParallelism(int)으로 다르게 설정할 수 있습니다.
Local Execution Environment는 env 인스턴스를 생성한 JVM에서 임베디드 Flink 클러스터를 시작하고, 같은 JVM 프로세스에 임베드된 Flink cluster에서 프로그램을 실행하는 Context이다. 로컬에서 디버깅을 할 때 IDE에서 run 버튼을 누르면 적용되는 환경이다. flink를 별도로 설치하지 않아도 embedded flink cluster가 실행되어 breakpoint를 사용해서 디버깅할 수 있다. #
아래 그림으로 설명하면 "env 인스턴스를 생성한 JVM"은 Client 서비스이다. Job을 실행하는 서비스는 JobManager와 TaskManager로 구성된 Flink 클러스터이다. 원래 Client와 (JobManager와 같은) Flink cluster는 다른 서비스이고, 다른 프로세스다. 심지어는 다른 컴퓨터에 존재할 수도 있는 서비스다. 그런데 디버깅을 위해서 같은 JVM 프로세스 하나에서 Job 생성과 실행을 모두 하기 때문에 특별하다. 이는 하나의 컴퓨터에 Flink Cluster가 여러 개의 프로세스로 실행되는 standalone 모드와도 다르다.
RemoteStreamEnvironment
A StreamExecutionEnvironment for executing on a cluster.
클러스터에서 실행하기 위한 스트림 실행 환경입니다.
Flink API 문서 - RemoteStreamEnvironent
RemoteStreamEnvironment
는 여러 프로세스로 이루어진 원격 Flink Cluster에서 Job(=Graph) 를 실행하는 Context이다.
프로덕션 환경에서 쓰는 Standalone, Kubernetes, YARN 등 디버깅할 때 쓰는 JVM 임베드 환경을 제외한 프로덕션용 Context이다.
Standalone mode란?
This Getting Started section guides you through the local setup (on one machine, but in separate processes) of a Flink cluster. This can easily be expanded to set up a distributed standalone cluster, which we describe in the reference section.
Introduction
The standalone mode is the most barebone way of deploying Flink: The Flink services described in the deployment overview are just launched as processes on the operating system. Unlike deploying Flink with a resource provider such as Kubernetes or YARN, you have to take care of restarting failed processes, or allocation and de-allocation of resources during operation.
이 시작하기 섹션에서는 Flink 클러스터의 로컬 설정(한 컴퓨터에서 별도의 프로세스로)을 안내합니다. 참조 섹션에서 설명하는 분산형 독립 실행형 클러스터를 설정하기 위해 쉽게 확장할 수 있습니다.
소개
독립형 모드는 가장 기본적인 Flink 배포 방식입니다: 배포 개요에서 설명한 Flink 서비스는 운영 체제에서 프로세스로 실행됩니다. Kubernetes나 YARN과 같은 리소스 공급자와 함께 Flink를 배포하는 것과 달리, 실패한 프로세스를 다시 시작하거나 운영 중에 리소스를 할당 및 할당 해제하는 작업을 처리해야 합니다.
Flink Docs - Deployment - Standalone
Flink 문서에 나오는 Standalone mode는 Flink Cluster를 한 머신에서 여러 프로세스로 실행하는 모드이다.
Standalone 모드에서는 여러 서비스가 운영 체제에서 별도의 프로세스로 실행된다. 이 설명이 의미하는 것은 그림에 있는 서비스( JobManager, TaskManager)가 한 컴퓨터에서 별도의 프로세스로 실행된다는 의미이다.
이것이 왜 특징이냐면 Standalone과 대비되는 Kubernetes와 YARN을 이용한 배포는 그림에 있는 여러 서비스를 여러 대의 컴퓨터(노드)에서 실행하기 때문이다. Flink는 분산 처리 엔진이기 때문에 이러한 장점을 살리려면 프로덕션 환경에서는 Kubernetes나 YARN을 이용한 배포를, 개발 환경에서 간단하게 실행할 때는 Standalone 모드로 실행하게 될 것이다.
정리
- Execution Environent
- Flink Application이 실행되는 Context
- Local Environment
- 디버깅용
- IDE에서 run 했을 때 적용되는 모드
- 하나의 JVM 프로세스에 임베드된 Flink Cluster.
- Remote Environment
- 프로덕션. 실제 클러스터에서 여러 프로세스에 배포했을 때 적용됨
- Standalone, YARN, Kubernetes
- Standalone Mode
- 컴퓨터 하나에 여러 프로세스로 배포한 Flink Cluster. 개발자가 직접 배포해 줘야 한다. 프로덕션처럼 여러 프로세스로 분산된 환경을 테스트하기 위해서 사용.
- Kubernetes, YARN
- 프로덕션. 여러 프로세스를 여러 컴퓨터로 분산, 병렬 처리.
참고 자료
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/learn-flink/datastream_api/
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/flink-architecture/
http://www.corej2eepatterns.com/Patterns2ndEd/ContextObject.htm
'Flink' 카테고리의 다른 글
[Flink] Execution Configuration - Job별 실행 설정 (0) | 2024.06.12 |
---|