Spark on AWS EKS
AWS EKS Cluster에서 Spark Application 동작을 분석한다. AWS EKS Cluster에서 Spark Application을 동작시키기 위해서는 Spark에서 제공하는 spark-submit CLI 및 Spark Operator를 이용하는 방식과 EMR on EKS에서 제공하는 StartJobRun API를 이용하는 방식 2가지가 존재한다.
1. spark-submit CLI & Spark Operator
AWS EKS에서도 일반적인 Kubernetes Cluster처럼 spark-submit CLI 및 Spark Operator를 이용하여 Spark Application을 동작시킬 수 있다. 이 경우 Architecture 및 동작 방식은 다음의 Link의 내용처럼 일반적인 Kubernetes Cluster에서 spark-submit CLI 및 Spark Operator를 이용하는 방식과 동일하다.
다만 AWS EKS에서는 Driver, Executor Pod의 Container Image를 EMR on EKS Spark Container Image로 이용하는 것을 권장한다. EMR on EKS Spark Container Image에는 EKS 환경에 최적화된 Optimized Spark가 내장되어 있어 Open Source Spark 대비 더 빠른 성능을 보이며, 아래에 명시된 AWS와 연관된 Library 및 Spark Connector가 포함되어 있기 때문이다.
- EMRFS S3-optimized comitter
- AWS Redshift용 Spark Connector : Spark Application에서 AWS Redshift 접근시 이용
- AWS SageMaker용 Spark Library : Spark Application의 DataFrame에 저장되어 있는 Data를 바로 AWS SageMaker를 통해서 Training 수행 가능
EMR on EKS Spark Container Image는 Public AWS ECR에 공개되어 있다. Spark Application에서 고유한 Library 및 Spark Connector를 이용하는 경우 Custom Container Image를 구축해야 하는데, 이 경우에도 EMR on EKS Spark Container Image를 Base Image로 이용하는 것을 권장한다.
2. StartJobRun API
StartJobRun API는 EMR on EKS 환경에서 Spark Job을 제출하는 API이다. StartJobRun API를 이용하기 위해서는 AWS EMR에서 관리하는 가상의 Resource인 Virtual Cluster를 생성해야 한다. Virtual Cluster를 생성하기 위해서는 EKS Cluster에 존재하는 하나의 Namespace가 필요하다. 하나의 EKS Cluster에 다수의 Namespace를 생성하고 다수의 Virtual Cluster를 각 Namespace에 Mapping하여 하나의 EKS Cluster에서 다수의 Virtual Cluster를 운영할 수 있다.
[Figure 1]은 하나의 Virtual Cluster가 있는 EKS Cluster에 StartJobRun API를 통해서 Spark Job을 제출할 경우의 Architecture를 나타내고 있다. StartJobRun API를 호출하면 Virtual Cluster와 Mapping 되어 있는 Namespace에 job-runner Pod가 생성되며, job-runner Pod 내부에서 spark-submit CLI가 동작한다. 즉 StartJobRun API 방식도 내부적으로는 spark-submit CLI를 이용하여 Spark Job을 제출하는 방식이다.
$ aws emr-containers start-job-run \
--virtual-cluster-id [virtual-cluster-id] \
--name=pi \
--region ap-northeast-2 \
--execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
--release-label emr-6.8.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
}'
[Shell 1]은 aws CLI를 활용하여 StartJobRun API를 통해서 Spark Job을 제출하는 예제를 나타내고 있다. Virtual Cluster, 제출한 Spark Job의 이름, AWS Region, Spark Job 실행을 위한 Role, Spark 관련 설정들이 명시되어 있는것을 확인할 수 있다. sparkSubmitParameters
항목에는 spark-submit CLI를 통해서 전달하는 --conf
Parameter들이 설정되어 있는것도 확인할 수 있다.
|
|
|
|
|
|
job-runner Pod 내부의 spark-submit CLI는 job-runner Pod에 붙는 ConfigMap 기반의 File을 통해서 Spark Job 생성에 필요한 각종 설정 정보들을 얻는다. ConfigMap은 StartJobRun API의 설정에 따라서 AWS EMR에서 job-runner Pod가 생성 되기전에 생성된다. 설정 정보에는 Driver Pod, Executor Pod 관련 설정들이 포함되어 있다. [Shell 1] 명령어를 실행하면 [File 1], [File 2], [File 3] 3개의 ConfigMap이 생성된다.
[File 1]은 spark-submit CLI에게 Spark Job 설정을 전달하기 위한 spark-defauls.conf ConfigMap, [File 2]는 spark-submit CLI에게 전달하기 위한 Pod Template을 위한 ConfigMap, [File 3]은 Driver Pod에 설정되는 fluentd를 위한 ConfigMap을 나타내고 있다. StartJobRun API를 통해서 Spark Job을 제출하면 Driver Pod에는 반드시 fluentd Sidecar Container가 생성이 된다. 이유는 spark-submit CLI가 [File 1], [File 2], [File 3] ConfigMap을 통해서 spark-submit CLI의 Pod Template 기능을 통해서 fluentd Container를 Driver의 Sidecar Container로 생성하기 때문이다.
[File 3] fluentd ConfigMap의 설정을 살펴보면 prod.ap-northeast-2.appinfo.src
Bucket에 Driver Pod에서 발생하는 Event Log를 저장하는 것을 확인할 수 있다. appinfo.src
Bucket은 AWS EMR에서 관리하는 Bucket이며, EMR이 관리하는 Spark History Server와 연동되어 사용자에게 SparkJobRun API를 통해서 제출된 Spark Job의 History를 확인할 수 있게 설정된다. 물론 --conf spark.eventLog.dir=s3a://][s3-bucket]
설정을 명시하여 사용자가 원하는 경로에 Event Log를 적제하도록 설정도 가능하다.
$ aws emr-containers start-job-run \
--virtual-cluster-id jk518skp01ys9ka8b0npx9nt0 \
--name=pi-logs \
--region ap-northeast-2 \
--execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
--release-label emr-6.8.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
"sparkSubmitParameters": "--conf spark.driver.cores=1 --conf spark.driver.memory=512M --conf spark.executor.instances=1 --conf spark.executor.memory=512M --conf spark.executor.cores=1"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"persistentAppUI": "ENABLED",
"cloudWatchMonitoringConfiguration": {
"logGroupName": "spark-startjobrun",
"logStreamNamePrefix": "pi-logs"
},
"s3MonitoringConfiguration": {
"logUri": "s3://ssup2-spark/startjobrun/"
}
}
}'
StartJobRun API는 job-runner, driver, executor Pod의 stdout/stderr를 CloudWatch 또는 S3에 간편하게 전송해주는 기능도 제공하고 있다. [Shell 2]는 Logging 설정과 함께 aws CLI를 통해서 StartJobRun API를 통해서 Spark Job을 제출하는 예제를 나타내고 있다. [Shell 1]과 비교하면 monitoringConfigruation
설정이 추가된 것을 확인할 수 있으며 하위에 CloudWatch와 S3 설정이 각각 존재하는것을 확인할 수 있다.
|
|
|
|
|
|
[Shell 2] 명령어를 실행하면 [File 4], [File 5], [File 6] 3개의 ConfigMap이 생성된다. job-runner Pod 뿐만 아니라, Driver Pod, Executor Pod 또한 fluentd가 실행 되도록 설정되어 있는것을 확인할 수 있으며, Drvier Pod, Executor Pod에서 실행되는 fluentd는 stdout/stderr를 CloudWatch 또는 S3로 전송되도록 설정되어 있는것을 확인할 수 있다.
$ aws emr-containers start-job-run \
--virtual-cluster-id jk518skp01ys9ka8b0npx9nt0 \
--name=pi \
--region ap-northeast-2 \
--execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
--release-label emr-6.8.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
}' \
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"job-start-timeout":"1800",
"spark.ui.prometheus.enabled":"true",
"spark.executor.processTreeMetrics.enabled":"true",
"spark.kubernetes.driver.annotation.prometheus.io/scrape":"true",
"spark.kubernetes.driver.annotation.prometheus.io/path":"/metrics/executors/prometheus/",
"spark.kubernetes.driver.annotation.prometheus.io/port":"4040"
}
}
]
}'
StartJobRun API를 통해서도 spark-submit CLI에서 설정 가능한 다양한 설정들을 동일하게 설정이 가능하다. [Shell 3]은 Promethues로 Monitoring 수행을 위한 예제를 나타내고 있다.
2.1. with ACK EMR Container Controller
AWS에서는 StartJobRun API를 기반으로 Spark Job을 제출하는 방식을 Kubernetes의 Object를 활용할 수 있도록 ACK EMR Container Controller를 제공하고 있다. [Figure 2]는 ACK EMR Container Controller를 기반으로 StartJobRun API를 통해서 Spark Job을 제출하는 과정을 나타내고 있다.
ACK EMR Container Controller를 AWS EKS Cluster에 설치하면 Virtual Cluster
와 Job Run
두 가지 Custom Resource 이용이 가능해진다. Virtual Cluster
는 ACK EMR Container Controller가 설치된 AWS EKS Cluster의 특정 Namespace를 대상으로 EMR on EKS의 Virtual Cluster를 설정하는데 이용하는 Custome Resource이며, Job Run
은 StartJobRun API를 통해서 Spark Job을 제출하는 경우 이용하는 Custom Resource이다.
|
|
|
|
[File 7]은 단순한 Job Run의 예제를 나타내고 있으며, [File 8]은 Logging 설정이 적용된 Job Run의 예제를 나타내고 있다. 설정 값들을 살펴보면 aws CLI를 통해서 설정하는 옵션들을 동일하게 Job Run에 설장할 수 있는것을 확인할 수 있다.
3. 참조
- EMR on EKS Container Image : https://gallery.ecr.aws/emr-on-eks
- StartJobRun Parameter : https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-jobs-CLI.html#emr-eks-jobs-parameters