개요
StreamExecutionEnvrionment에는 런타임에 대한 Job별 구성 값을 설정할 수 있는 ExecutionConfig가 포함되어 있습니다. 모든 작업에 영향을 미치는 기본값은 conf 디렉토리의 Configuration File에서 설정할 수 있다
설정 목록
setClosureCleanerLevel()
: 클로저 클리너 레벨은 기본적으로ClosureCleanerLevel.RECURSIVE
로 설정되어 있습니다. 클로저 클리너는 Flink 프로그램 내부의 익명 함수 주변 클래스에 대한 불필요한 참조를 제거합니다. 클로저 클리너가 비활성화되면, 익명 사용자 함수가 주변 클래스를 참조할 수 있으며, 이는 일반적으로 직렬화할 수 없습니다. 이로 인해 직렬화 도중 예외가 발생할 수 있습니다. 설정값은 다음과 같습니다:NONE
: 클로저 클리너를 완전히 비활성화합니다.TOP_LEVEL
: 필드로 재귀하지 않고 최상위 클래스만 정리합니다.RECURSIVE
: 모든 필드를 재귀적으로 정리합니다.
getParallelism()
/setParallelism(int parallelism)
: 작업의 기본 병렬 처리 수준을 설정합니다.getMaxParallelism()
/setMaxParallelism(int parallelism)
: 작업의 기본 최대 병렬 처리 수준을 설정합니다. 이 설정은 최대 병렬 처리 수준을 결정하고 동적 스케일링의 상한선을 지정합니다.getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
: 실패한 작업을 재실행하는 횟수를 설정합니다. 값이 0이면 내결함성이 효과적으로 비활성화됩니다. 값이 -1이면 시스템 기본값(구성에 정의된 대로)을 사용해야 함을 나타냅니다. 이는 더 이상 사용되지 않으며, 대신 재시작 전략을 사용하세요.getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
: 작업이 실패한 후 재실행하기 전에 시스템이 대기하는 지연 시간(밀리초)을 설정합니다. 지연은 모든 작업이 TaskManager에서 성공적으로 중지된 후에 시작되며, 지연이 지나면 작업이 재시작됩니다. 이 매개변수는 재실행을 지연시켜 특정 시간 초과 관련 오류(완전히 시간 초과되지 않은 손상된 연결 등)가 완전히 표면화되도록 한 다음, 재실행을 시도하고 동일한 문제로 인해 즉시 실패하는 것을 방지하는 데 유용합니다. 이 매개변수는 실행 재시도 횟수가 1회 이상인 경우에만 효과가 있습니다. 이는 더 이상 사용되지 않으며, 대신 재시작 전략을 사용하세요.getExecutionMode()
/setExecutionMode()
: 기본 실행 모드는PIPELINED
입니다. 프로그램을 실행할 실행 모드를 설정합니다. 실행 모드는 데이터 교환이 배치 방식으로 수행되는지 아니면 파이프라인 방식으로 수행되는지를 정의합니다.enableForceKryo()
/disableForceKryo
: Kryo는 기본적으로 강제 적용되지 않습니다. POJO를 분석할 수 있더라도 GenericTypeInformation이 POJO에 대해 Kryo 직렬화기를 사용하도록 강제합니다. 경우에 따라 이것이 더 바람직할 수 있습니다. 예를 들어, Flink의 내부 직렬화기가 POJO를 제대로 처리하지 못할 때 사용할 수 있습니다.enableForceAvro()
/disableForceAvro()
: Avro는 기본적으로 강제 적용되지 않습니다. Flink AvroTypeInfo가 Avro POJO를 직렬화할 때 Kryo 대신 Avro 직렬화기를 사용하도록 강제합니다.enableObjectReuse()
/disableObjectReuse()
: 기본적으로 Flink에서는 객체가 재사용되지 않습니다. 객체 재사용 모드를 활성화하면 런타임이 성능 향상을 위해 사용자 객체를 재사용하도록 지시합니다. 작업의 사용자 코드 함수가 이러한 동작을 인식하지 못하면 버그가 발생할 수 있습니다.getGlobalJobParameters()
/setGlobalJobParameters()
: 이 메서드를 사용하면 사용자가 사용자 정의 객체를 작업의 전역 구성으로 설정할 수 있습니다. ExecutionConfig는 모든 사용자 정의 함수에서 액세스할 수 있으므로, 이는 작업에서 구성을 전역적으로 사용할 수 있는 쉬운 방법입니다.addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
: 주어진 유형에 대한 Kryo 직렬화기 인스턴스를 등록합니다.addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
: 주어진 유형에 대한 Kryo 직렬화기 클래스를 등록합니다.registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
: 주어진 유형을 Kryo에 등록하고 해당 유형에 대한 직렬화기를 지정합니다. 유형을 Kryo에 등록하면 해당 유형의 직렬화가 훨씬 더 효율적이 됩니다.registerKryoType(Class<?> type)
: 유형이 Kryo로 직렬화되는 경우, Kryo에 등록되어 태그(정수 ID)만 작성되도록 합니다. 유형이 Kryo에 등록되지 않은 경우, 모든 인스턴스와 함께 전체 클래스 이름이 직렬화되어 I/O 비용이 훨씬 더 높아집니다.registerPojoType(Class<?> type)
: 주어진 유형을 직렬화 스택에 등록합니다. 유형이 결국 POJO로 직렬화되면 해당 유형은 POJO 직렬화기에 등록됩니다. 유형이 Kryo로 직렬화되는 경우, Kryo에 등록되어 태그만 작성되도록 합니다. 유형이 Kryo에 등록되지 않은 경우, 모든 인스턴스와 함께 전체 클래스 이름이 직렬화되어 I/O 비용이 훨씬 더 높아집니다.registerKryoType()
으로 등록된 유형은 Flink의 POJO 직렬화기 인스턴스에서 사용할 수 없습니다.disableAutoTypeRegistration()
: 자동 유형 등록은 기본적으로 활성화되어 있습니다. 자동 유형 등록은 사용자 코드에서 사용되는 모든 유형(하위 유형 포함)을 Kryo 및 POJO 직렬화기에 등록합니다.setTaskCancellationInterval(long interval)
: 실행 중인 작업을 취소하기 위한 연속적인 시도 사이에 대기할 간격(밀리초)을 설정합니다. 작업이 취소되면 작업 스레드가 특정 시간 내에 종료되지 않으면 작업 스레드에서 주기적으로interrupt()
를 호출하는 새 스레드가 생성됩니다. 이 매개변수는interrupt()
에 대한 연속 호출 사이의 시간을 나타내며 기본적으로 30000밀리초(30초)로 설정됩니다.
getRuntimeContext()
메서드를 통해 Rich* 함수에서 액세스할 수 있는 RuntimeContext도 모든 사용자 정의 함수에서 ExecutionConfig에 액세스할 수 있습니다.
출처
'Flink' 카테고리의 다른 글
[Flink] Execution Environment 이해 (0) | 2024.05.27 |
---|