Spark on AWS EKS
This article analyzes how Spark Applications run on an AWS EKS Cluster. There are two ways to run a Spark Application on an AWS EKS Cluster: using the spark-submit CLI and Spark Operator from the open-source Spark ecosystem, or using the StartJobRun API provided by EMR on EKS.
1. spark-submit CLI & Spark Operator
On AWS EKS, as on any Kubernetes Cluster, a Spark Application can be run with the spark-submit CLI or the Spark Operator. In this case the architecture and behavior are the same as when the spark-submit CLI and Spark Operator are used on a general Kubernetes Cluster, as described in the following Link.
On AWS EKS, however, it is recommended to use the EMR on EKS Spark Container Image as the container image for the Driver and Executor Pods. The EMR on EKS Spark Container Image ships an optimized Spark build tuned for the EKS environment, which performs faster than open-source Spark, and it includes the AWS-related libraries and Spark Connectors listed below.
- EMRFS S3-optimized committer
- Spark Connector for AWS Redshift: used when a Spark Application accesses AWS Redshift
- Spark Library for AWS SageMaker: lets data stored in a Spark Application's DataFrame be used directly for training through AWS SageMaker
The EMR on EKS Spark Container Image is published in the public AWS ECR. If a Spark Application needs its own libraries or Spark Connectors, a custom container image has to be built, and in that case too it is recommended to use the EMR on EKS Spark Container Image as the base image.
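As a minimal sketch of the recommendation above, the following Python snippet assembles a spark-submit argument list that points both Driver and Executor Pods at an EMR on EKS Spark image through `spark.kubernetes.container.image`. The image URI is the one that appears in [File 1]. The helper name `build_spark_submit_args` and the API server address are hypothetical and only serve the illustration.

```python
import shlex

# Image URI as it appears in [File 1]; substitute the URI for your Region and release.
EMR_SPARK_IMAGE = "996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark/emr-6.8.0:latest"


def build_spark_submit_args(master_url, image, entry_point, extra_conf=None):
    """Return a spark-submit argument list that runs Driver and Executor Pods from one image."""
    conf = {"spark.kubernetes.container.image": image}
    conf.update(extra_conf or {})
    args = ["spark-submit", "--master", master_url, "--deploy-mode", "cluster"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(entry_point)
    return args


args = build_spark_submit_args(
    master_url="k8s://https://example-eks-endpoint:443",  # hypothetical API server address
    image=EMR_SPARK_IMAGE,
    entry_point="local:///usr/lib/spark/examples/src/main/python/pi.py",
    extra_conf={"spark.executor.instances": "2"},
)
# Print a shell-quoted command line that could be pasted into a terminal.
print(shlex.join(args))
```

A custom image built on top of the EMR on EKS image would be passed in the same way, by swapping the `image` argument.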
2. StartJobRun API
The StartJobRun API submits Spark Jobs in an EMR on EKS environment. To use the StartJobRun API, a Virtual Cluster, which is a virtual resource managed by AWS EMR, must be created first. Creating a Virtual Cluster requires one Namespace that exists in the EKS Cluster. By creating multiple Namespaces in one EKS Cluster and mapping a Virtual Cluster to each Namespace, multiple Virtual Clusters can be operated on a single EKS Cluster.
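The Namespace-to-Virtual-Cluster mapping can be sketched as plain request payloads. The dictionary below follows the shape of the EMR on EKS CreateVirtualCluster request as I understand it (a container provider of type `EKS` identified by the EKS Cluster name, with the Namespace under `eksInfo`), so it should be checked against the API reference before use. The cluster name, Namespace names, and the helper `virtual_cluster_request` are hypothetical.

```python
def virtual_cluster_request(name, eks_cluster_name, namespace):
    """Build a CreateVirtualCluster-style payload that binds one Virtual Cluster to one Namespace."""
    return {
        "name": name,
        "containerProvider": {
            "type": "EKS",
            "id": eks_cluster_name,
            "info": {"eksInfo": {"namespace": namespace}},
        },
    }


# Hypothetical Namespaces that already exist in a single EKS Cluster.
namespaces = ["team-a", "team-b"]
requests = [virtual_cluster_request(f"vc-{ns}", "example-eks", ns) for ns in namespaces]

# Every Virtual Cluster in this sketch gets its own Namespace.
mapped = [r["containerProvider"]["info"]["eksInfo"]["namespace"] for r in requests]
assert len(mapped) == len(set(mapped))

for r in requests:
    print(r["name"], "->", r["containerProvider"]["info"]["eksInfo"]["namespace"])
```

Each payload could then be passed as keyword arguments to an `emr-containers` client, which is left out here so the sketch runs without AWS credentials.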
[Figure 1] Spark on AWS EKS Architecture with StartJobRun API
[Figure 1] shows the architecture when a Spark Job is submitted through the StartJobRun API to an EKS Cluster that has one Virtual Cluster. Calling the StartJobRun API creates a job-runner Pod in the Namespace mapped to the Virtual Cluster, and the spark-submit CLI runs inside the job-runner Pod. In other words, the StartJobRun API also submits Spark Jobs through the spark-submit CLI internally.
$ aws emr-containers start-job-run \
  --virtual-cluster-id [virtual-cluster-id] \
  --name pi \
  --region ap-northeast-2 \
  --execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
  --release-label emr-6.8.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver":{
      "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
    }
  }'
[Shell 1] aws CLI StartJobRun API Example
[Shell 1] shows an example of submitting a Spark Job through the StartJobRun API with the aws CLI. It specifies the Virtual Cluster, the name of the submitted Spark Job, the AWS Region, the Role used to run the Spark Job, and the Spark-related settings. The sparkSubmitParameters field holds the --conf parameters that are passed on to the spark-submit CLI.
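The same request can be sketched programmatically. The snippet below builds a payload with the field names used by [Shell 1] (`sparkSubmitJobDriver`, `entryPoint`, `sparkSubmitParameters`) and derives the `--conf` string from a dictionary. The top-level keys follow the StartJobRun request shape as I understand it, the bracketed IDs are the same placeholders as in [Shell 1], and the helper names are hypothetical.

```python
import json


def spark_submit_parameters(conf):
    """Join Spark settings into the single --conf string expected by sparkSubmitParameters."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def start_job_run_request(virtual_cluster_id, name, role_arn, release_label, entry_point, conf):
    """Build a StartJobRun-style payload; nothing is sent to AWS in this sketch."""
    return {
        "virtualClusterId": virtual_cluster_id,
        "name": name,
        "executionRoleArn": role_arn,
        "releaseLabel": release_label,
        "jobDriver": {
            "sparkSubmitJobDriver": {
                "entryPoint": entry_point,
                "sparkSubmitParameters": spark_submit_parameters(conf),
            }
        },
    }


request = start_job_run_request(
    virtual_cluster_id="[virtual-cluster-id]",  # placeholder, as in [Shell 1]
    name="pi",
    role_arn="arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli",
    release_label="emr-6.8.0-latest",
    entry_point="local:///usr/lib/spark/examples/src/main/python/pi.py",
    conf={
        "spark.executor.instances": 2,
        "spark.executor.memory": "2G",
        "spark.executor.cores": 1,
        "spark.driver.cores": 1,
    },
)
print(json.dumps(request["jobDriver"], indent=2))
```

Keeping the Spark settings in a dictionary makes it harder to mistype the long `--conf` string by hand.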
apiVersion : v1
data :
spark-defaults.conf : |
spark.kubernetes.executor.podTemplateValidation.enabled true
spark.executor.extraClassPath \/usr\/lib\/hadoop-lzo\/lib\/*:\/usr\/lib\/hadoop\/hadoop-aws.jar:\/usr\/share\/aws\/aws-java-sdk\/*:\/usr\/share\/aws\/emr\/emrfs\/conf:\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/usr\/share\/aws\/emr\/security\/conf:\/usr\/share\/aws\/emr\/security\/lib\/*:\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar:\/docker\/usr\/lib\/hadoop-lzo\/lib\/*:\/docker\/usr\/lib\/hadoop\/hadoop-aws.jar:\/docker\/usr\/share\/aws\/aws-java-sdk\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/conf:\/docker\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/docker\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/docker\/usr\/share\/aws\/emr\/security\/conf:\/docker\/usr\/share\/aws\/emr\/security\/lib\/*:\/docker\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/docker\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/docker\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/docker\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar
spark.executor.extraLibraryPath \/usr\/lib\/hadoop\/lib\/native:\/usr\/lib\/hadoop-lzo\/lib\/native:\/docker\/usr\/lib\/hadoop\/lib\/native:\/docker\/usr\/lib\/hadoop-lzo\/lib\/native
spark.kubernetes.driver.internalPodTemplateFile \/etc\/spark\/conf\/driver-internal-pod.yaml
spark.resourceManager.cleanupExpiredHost true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
spark.kubernetes.executor.container.allowlistFile \/etc\/spark\/conf\/executor-pod-template-container-allowlist.txt
spark.kubernetes.executor.container.image 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com\/spark\/emr-6.8.0:latest
spark.history.fs.logDirectory file:\/\/\/var\/log\/spark\/apps
spark.kubernetes.pyspark.pythonVersion 3
spark.driver.memory 1G
spark.master k8s:\/\/https:\/\/kubernetes.default.svc:443
spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
spark.driver.cores 1
spark.kubernetes.driver.container.image 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com\/spark\/emr-6.8.0:latest
spark.driver.extraLibraryPath \/usr\/lib\/hadoop\/lib\/native:\/usr\/lib\/hadoop-lzo\/lib\/native:\/docker\/usr\/lib\/hadoop\/lib\/native:\/docker\/usr\/lib\/hadoop-lzo\/lib\/native
spark.kubernetes.executor.podTemplateContainerName spark-kubernetes-executor
spark.kubernetes.driver.podTemplateValidation.enabled true
spark.kubernetes.driver.pod.allowlistFile \/etc\/spark\/conf\/driver-pod-template-pod-allowlist.txt
spark.history.ui.port 18080
spark.hadoop.fs.s3.customAWSCredentialsProvider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.blacklist.decommissioning.timeout 1h
spark.driver.defaultJavaOptions -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
spark.hadoop.fs.defaultFS file:\/\/\/
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.dynamicAllocation.enabled false
spark.kubernetes.container.image.pullPolicy Always
spark.kubernetes.driver.podTemplateContainerName spark-kubernetes-driver
spark.eventLog.logBlockUpdates.enabled true
spark.driver.extraClassPath \/usr\/lib\/hadoop-lzo\/lib\/*:\/usr\/lib\/hadoop\/hadoop-aws.jar:\/usr\/share\/aws\/aws-java-sdk\/*:\/usr\/share\/aws\/emr\/emrfs\/conf:\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/usr\/share\/aws\/emr\/security\/conf:\/usr\/share\/aws\/emr\/security\/lib\/*:\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar:\/docker\/usr\/lib\/hadoop-lzo\/lib\/*:\/docker\/usr\/lib\/hadoop\/hadoop-aws.jar:\/docker\/usr\/share\/aws\/aws-java-sdk\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/conf:\/docker\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/docker\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/docker\/usr\/share\/aws\/emr\/security\/conf:\/docker\/usr\/share\/aws\/emr\/security\/lib\/*:\/docker\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/docker\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/docker\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/docker\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar
spark.executor.defaultJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
spark.kubernetes.namespace emr-cli
spark.stage.attempt.ignoreOnDecommissionFetchFailure true
spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds 2000
spark.kubernetes.executor.internalPodTemplateContainerName spark-kubernetes-executor
spark.kubernetes.driver.container.allowlistFile \/etc\/spark\/conf\/driver-pod-template-container-allowlist.txt
spark.kubernetes.executor.pod.allowlistFile \/etc\/spark\/conf\/executor-pod-template-pod-allowlist.txt
spark.eventLog.dir file:\/\/\/var\/log\/spark\/apps
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.executor.memory 1G
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.kubernetes.executor.internalPodTemplateFile \/etc\/spark\/conf\/executor-internal-pod.yaml
spark.decommissioning.timeout.threshold 20
spark.executor.cores 1
spark.hadoop.dynamodb.customAWSCredentialsProvider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.kubernetes.driver.internalPodTemplateContainerName spark-kubernetes-driver
spark.submit.deployMode cluster
spark.authenticate true
spark.blacklist.decommissioning.enabled true
spark.eventLog.enabled true
spark.shuffle.service.enabled false
spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
kind : ConfigMap
metadata :
creationTimestamp : "2024-01-11T13:12:38Z"
labels :
emr-containers.amazonaws.com/job.configuration : spark-defaults
emr-containers.amazonaws.com/job.id : 0000000337okspsc913
emr-containers.amazonaws.com/virtual-cluster-id : jk518skp01ys9ka8b0npx9nt0
name : 0000000337okspsc913-spark-defaults
namespace : emr-cli
ownerReferences :
- apiVersion : batch/v1
blockOwnerDeletion : true
controller : true
kind : Job
name : 0000000337okspsc913
uid : 77ead808-24e6-4c20-b02b-1b6db154674f
resourceVersion : "68634058"
uid : 14325ad5-cd76-4b32-98f4-599ee07be86f
[File 1] spark-default ConfigMap
apiVersion : v1
data :
driver : |-
apiVersion: v1
kind: Pod
metadata:
ownerReferences:
- apiVersion: batch/v1
blockOwnerDeletion: true
controller: true
kind: ConfigMap
name: 0000000337omlr0m19o-spark-defaults
uid: cfa61687-9915-4966-a02c-ead252e87f8a
spec:
serviceAccountName: emr-containers-sa-spark-driver-[account-id]-j3uv6jk0kk3sogu231qj91fmo3mvwfl561
volumes:
- name: emr-container-communicate
emptyDir: {}
- name: config-volume
configMap:
name: fluentd-jk518skp01ys9ka8b0npx9nt0-0000000337omlr0m19o
- name: emr-container-s3
secret:
secretName: emr-containers-s3-jk518skp01ys9ka8b0npx9nt0-0000000337omlr0m19o
- name: emr-container-application-log-dir
emptyDir: {}
- name: emr-container-event-log-dir
emptyDir: {}
- name: temp-data-dir
emptyDir: {}
- name: mnt-dir
emptyDir: {}
- name: home-dir
emptyDir: {}
- name: 0000000337omlr0m19o-spark-defaults
configMap:
name: 0000000337omlr0m19o-spark-defaults
securityContext:
fsGroup: 65534
containers:
- name: spark-kubernetes-driver
image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark/emr-6.8.0:latest
imagePullPolicy: Always
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 1000
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
volumeMounts:
- name: emr-container-communicate
mountPath: /var/log/fluentd
readOnly: false
- name: emr-container-application-log-dir
mountPath: /var/log/spark/user
readOnly: false
- name: emr-container-event-log-dir
mountPath: /var/log/spark/apps
readOnly: false
- name: temp-data-dir
mountPath: /tmp
readOnly: false
- name: mnt-dir
mountPath: /mnt
readOnly: false
- name: home-dir
mountPath: /home/hadoop
readOnly: false
- name: 0000000337omlr0m19o-spark-defaults
mountPath: /usr/lib/spark/conf/spark-defaults.conf
subPath: spark-defaults.conf
readOnly: false
env:
- name: SPARK_CONTAINER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SIDECAR_SIGNAL_FILE
value: /var/log/fluentd/main-container-terminated
- name: K8S_SPARK_LOG_ERROR_REGEX
value: (Error|Exception|Fail)
- name: TERMINATION_ERROR_LOG_FILE_PATH
value: /var/log/spark/error.log
terminationMessagePath: /var/log/spark/error.log
- name: emr-container-fluentd
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 1000
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/fluentd/emr-6.8.0:latest
imagePullPolicy: Always
resources:
requests:
memory: 200Mi
limits:
memory: 512Mi
env:
- name: SPARK_CONTAINER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SPARK_ROLE
valueFrom:
fieldRef:
fieldPath: metadata.labels['spark-role']
- name: K8S_SPARK_LOG_URL_STDERR
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr"
- name: K8S_SPARK_LOG_URL_STDOUT
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout"
- name: SIDECAR_SIGNAL_FILE
value: /var/log/fluentd/main-container-terminated
- name: FLUENTD_CONF
value: fluent.conf
- name: K8S_SPARK_EVENT_LOG_DIR
value: /var/log/spark/apps
- name: AWS_REGION
value: ap-northeast-2
- name: RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR
value: 0.9
volumeMounts:
- name: config-volume
mountPath: /etc/fluent/fluent.conf
subPath: driver
readOnly: false
- name: emr-container-s3
mountPath: /var/emr-container/s3
readOnly: true
- name: emr-container-communicate
mountPath: /var/log/fluentd
readOnly: false
- name: emr-container-application-log-dir
mountPath: /var/log/spark/user
readOnly: false
- name: emr-container-event-log-dir
mountPath: /var/log/spark/apps
readOnly: false
- name: temp-data-dir
mountPath: /tmp
readOnly: false
- name: home-dir
mountPath: /home/hadoop
readOnly: false
driver-container-allowlist : |-
container.env
container.envFrom
container.name
container.lifecycle
container.livenessProbe
container.readinessProbe
container.resources
container.startupProbe
container.stdin
container.stdinOnce
container.terminationMessagePath
container.terminationMessagePolicy
container.tty
container.volumeDevices
container.volumeMounts
container.workingDir
driver-pod-allowlist : |-
pod.apiVersion
pod.kind
pod.metadata
pod.spec.activeDeadlineSeconds
pod.spec.affinity
pod.spec.containers
pod.spec.enableServiceLinks
pod.spec.ephemeralContainers
pod.spec.hostAliases
pod.spec.hostname
pod.spec.imagePullSecrets
pod.spec.initContainers
pod.spec.nodeName
pod.spec.nodeSelector
pod.spec.overhead
pod.spec.preemptionPolicy
pod.spec.priority
pod.spec.priorityClassName
pod.spec.readinessGates
pod.spec.restartPolicy
pod.spec.runtimeClassName
pod.spec.schedulerName
pod.spec.subdomain
pod.spec.terminationGracePeriodSeconds
pod.spec.tolerations
pod.spec.topologySpreadConstraints
pod.spec.volumes
executor : |
apiVersion: v1
kind: Pod
spec:
serviceAccountName: emr-containers-sa-spark-executor-[account-id]-j3uv6jk0kk3sogu231qj91fmo3mvwfl561
volumes:
- name: 0000000337omlr0m19o-spark-defaults
configMap:
name: 0000000337omlr0m19o-spark-defaults
- name: emr-container-communicate
emptyDir: {}
- name: emr-container-application-log-dir
emptyDir: {}
- name: emr-container-event-log-dir
emptyDir: {}
- name: temp-data-dir
emptyDir: {}
- name: mnt-dir
emptyDir: {}
- name: home-dir
emptyDir: {}
securityContext:
fsGroup: 65534
containers:
- name: spark-kubernetes-executor
image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark/emr-6.8.0:latest
imagePullPolicy: Always
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 1000
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
env:
- name: SPARK_CONTAINER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SIDECAR_SIGNAL_FILE
value: /tmp/main-container-terminated
- name: SIDECAR_ERROR_FOLDER_PATH
value: /var/log/fluentd/fluentd-error/
- name: EXEC_POD_CPU_REQUEST
valueFrom:
resourceFieldRef:
containerName: spark-kubernetes-executor
resource: requests.cpu
divisor: 1
- name: EXEC_POD_CPU_LIMIT
valueFrom:
resourceFieldRef:
containerName: spark-kubernetes-executor
resource: limits.cpu
divisor: 1
- name: EXEC_POD_MEM_REQUEST
valueFrom:
resourceFieldRef:
containerName: spark-kubernetes-executor
resource: requests.memory
divisor: 1
- name: EXEC_POD_MEM_LIMIT
valueFrom:
resourceFieldRef:
containerName: spark-kubernetes-executor
resource: limits.memory
divisor: 1
volumeMounts:
- name: 0000000337omlr0m19o-spark-defaults
mountPath: /usr/lib/spark/conf/spark-defaults.conf
subPath: spark-defaults.conf
readOnly: false
- name: emr-container-communicate
mountPath: /var/log/fluentd
readOnly: false
- name: emr-container-application-log-dir
mountPath: /var/log/spark/user
readOnly: false
- name: emr-container-event-log-dir
mountPath: /var/log/spark/apps
readOnly: false
- name: temp-data-dir
mountPath: /tmp
readOnly: false
- name: mnt-dir
mountPath: /mnt
readOnly: false
- name: home-dir
mountPath: /home/hadoop
readOnly: false
executor-container-allowlist : |-
container.env
container.envFrom
container.name
container.lifecycle
container.livenessProbe
container.readinessProbe
container.resources
container.startupProbe
container.stdin
container.stdinOnce
container.terminationMessagePath
container.terminationMessagePolicy
container.tty
container.volumeDevices
container.volumeMounts
container.workingDir
executor-pod-allowlist : |-
pod.apiVersion
pod.kind
pod.metadata
pod.spec.activeDeadlineSeconds
pod.spec.affinity
pod.spec.containers
pod.spec.enableServiceLinks
pod.spec.ephemeralContainers
pod.spec.hostAliases
pod.spec.hostname
pod.spec.imagePullSecrets
pod.spec.initContainers
pod.spec.nodeName
pod.spec.nodeSelector
pod.spec.overhead
pod.spec.preemptionPolicy
pod.spec.priority
pod.spec.priorityClassName
pod.spec.readinessGates
pod.spec.restartPolicy
pod.spec.runtimeClassName
pod.spec.schedulerName
pod.spec.subdomain
pod.spec.terminationGracePeriodSeconds
pod.spec.tolerations
pod.spec.topologySpreadConstraints
pod.spec.volumes
kind : ConfigMap
metadata :
creationTimestamp : "2024-01-11T13:28:12Z"
labels :
emr-containers.amazonaws.com/job.configuration : pod-template
emr-containers.amazonaws.com/job.id : 0000000337omlr0m19o
emr-containers.amazonaws.com/virtual-cluster-id : jk518skp01ys9ka8b0npx9nt0
name : podtemplate-0000000337omlr0m19o
namespace : emr-cli
ownerReferences :
- apiVersion : batch/v1
blockOwnerDeletion : true
controller : true
kind : Job
name : 0000000337omlr0m19o
uid : 2bde5561-1ad4-46c2-9034-ae28d507746a
resourceVersion : "68639522"
uid : 1f139a71-51bf-4be3-a269-5971ee1aff66
[File 2] Pod Template ConfigMap
apiVersion : v1
data :
driver : "<system>\n workers 1\n</system>\n<worker 0>\n <source>\n tag emr-containers-spark-s3-event-logs\n
\ @label @emr-containers-spark-s3-event-logs\n @type tail\n path \"#{ENV['K8S_SPARK_EVENT_LOG_DIR']}/#{ENV['SPARK_APPLICATION_ID']}.inprogress,#{ENV['K8S_SPARK_EVENT_LOG_DIR']}/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_APPLICATION_ID']}.inprogress\"\n
\ pos_file \"/tmp/fluentd/event-logs/in-tail.pos\"\n read_from_head true\n
\ refresh_interval 30\n <parse>\n @type none\n </parse>\n </source>\n
\ \n <label @emr-containers-spark-s3-event-logs>\n <match emr-containers-spark-s3-event-logs>\n
\ @type s3\n <refreshing_file_presigned_post>\n presigned_post_path
/var/emr-container/s3/presigned-post\n </refreshing_file_presigned_post>\n
\ s3_bucket prod.ap-northeast-2.appinfo.src\n s3_region ap-northeast-2\n
\ check_bucket false\n check_apikey_on_start false\n check_object
false\n path \"[account-id]/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337olhmpguek/sparklogs/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ s3_object_key_format \"%{path}/events_%{index}_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ store_as text\n storage_class STANDARD\n overwrite true\n format
single_value\n <buffer time>\n @type file\n path /tmp/fluentd/event-logs/out-s3-buffer*\n
\ chunk_limit_size 32MB\n flush_at_shutdown true\n timekey
30\n timekey_wait 0\n retry_timeout 30s\n retry_type exponential_backoff\n
\ retry_exponential_backoff_base 2\n retry_wait 1s\n retry_randomize
true\n disable_chunk_backup true\n retry_max_times 5\n </buffer>\n
\ <secondary>\n @type secondary_file\n directory /var/log/fluentd/error/\n
\ basename s3-event-error.log\n </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker
0>\n <source>\n tag emr-containers-spark-s3-event-status-file\n @label
@emr-containers-spark-s3-event-status-file\n @type exec\n command echo \"
\"\n format none\n </source>\n \n <label @emr-containers-spark-s3-event-status-file>\n
\ <match emr-containers-spark-s3-event-status-file>\n @type s3\n <refreshing_file_presigned_post>\n
\ presigned_post_path /var/emr-container/s3/presigned-post\n </refreshing_file_presigned_post>\n
\ s3_bucket prod.ap-northeast-2.appinfo.src\n s3_region ap-northeast-2\n
\ check_bucket false\n check_apikey_on_start false\n check_object
false\n path \"[account-id]/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337olhmpguek/sparklogs/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ s3_object_key_format \"%{path}/appstatus_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ store_as text\n storage_class STANDARD\n overwrite true\n format
single_value\n <buffer time>\n @type file\n path /tmp/fluentd/event-logs/appstatus/out-s3-buffer*\n
\ chunk_limit_size 1MB\n flush_mode immediate\n flush_at_shutdown
true\n retry_timeout 30s\n retry_type exponential_backoff\n retry_exponential_backoff_base
2\n retry_wait 1s\n retry_randomize true\n disable_chunk_backup
true\n retry_max_times 5\n </buffer>\n <secondary>\n @type
secondary_file\n directory /var/log/fluentd/error/\n basename s3-appstatus-event-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n"
fluentd-pod-metadata.conf : "<system>\n workers 1\n</system>\n<worker 0>\n <source>\n
\ tag emr-containers-pod-metadata\n @label @emr-containers-pod-metadata\n
\ @type tail\n path \"#{ENV['POD_METADATA_PATH']}\"\n pos_file \"/tmp/emr-containers/pod-info-in-tail.pos\"\n
\ read_from_head true\n refresh_interval 120\n <parse>\n @type none\n
\ </parse>\n </source>\n \n <label @emr-containers-pod-metadata>\n <match
emr-containers-pod-metadata>\n @type stdout\n format single_value\n
\ output_to_console true\n </match>\n </label>\n</worker>\n"
kind : ConfigMap
metadata :
creationTimestamp : "2024-01-11T13:18:20Z"
labels :
emr-containers.amazonaws.com/job.configuration : fluentd
emr-containers.amazonaws.com/job.id : 0000000337olhmpguek
emr-containers.amazonaws.com/virtual-cluster-id : jk518skp01ys9ka8b0npx9nt0
name : fluentd-jk518skp01ys9ka8b0npx9nt0-0000000337olhmpguek
namespace : emr-cli
ownerReferences :
- apiVersion : batch/v1
blockOwnerDeletion : true
controller : true
kind : Job
name : 0000000337olhmpguek
uid : 60039ac0-b461-4c93-8488-06a3fe711383
resourceVersion : "68636201"
uid : 683034ea-45a8-4e96-99ed-20ae01be2a2d
[File 3] fluentd ConfigMap
The spark-submit CLI inside the job-runner Pod obtains the settings needed to create the Spark Job from files backed by the ConfigMaps attached to the job-runner Pod. AWS EMR creates these ConfigMaps according to the StartJobRun API settings before the job-runner Pod is created. The settings include those for the Driver Pod and the Executor Pods. Running the [Shell 1] command creates three ConfigMaps: [File 1], [File 2], and [File 3].
[File 1] is the spark-defaults.conf ConfigMap that passes the Spark Job settings to the spark-submit CLI, [File 2] is the ConfigMap that holds the Pod Templates passed to the spark-submit CLI, and [File 3] is the ConfigMap for the fluentd that is configured in the Driver Pod. When a Spark Job is submitted through the StartJobRun API, a fluentd Sidecar Container is always created in the Driver Pod. This is because the spark-submit CLI reads the [File 1], [File 2], and [File 3] ConfigMaps and uses its Pod Template feature to create the fluentd Container as a sidecar of the Driver.
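To inspect what the spark-submit CLI actually receives, the spark-defaults.conf entry of [File 1] can be read into a dictionary. The sketch below assumes each line is a key followed by whitespace and a value, and that the `\/` sequences in the dump are escaped slashes introduced when the ConfigMap was printed. The sample lines are copied from [File 1], and the function name `parse_spark_defaults` is hypothetical.

```python
def parse_spark_defaults(text):
    """Parse 'key value' lines of spark-defaults.conf into a dict, undoing escaped slashes."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)  # split on the first run of whitespace only
        key = parts[0]
        value = parts[1] if len(parts) > 1 else ""
        conf[key] = value.replace("\\/", "/")
    return conf


# A few lines taken from [File 1], kept in their escaped form.
sample = "\n".join([
    "spark.submit.deployMode cluster",
    "spark.kubernetes.namespace emr-cli",
    r"spark.kubernetes.driver.internalPodTemplateFile \/etc\/spark\/conf\/driver-internal-pod.yaml",
    "spark.driver.defaultJavaOptions -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC",
])

conf = parse_spark_defaults(sample)
for key, value in conf.items():
    print(f"{key} = {value}")
```

Splitting only on the first whitespace keeps values that contain spaces, such as the Java options, intact.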
The settings in the [File 3] fluentd ConfigMap show that the Event Log produced by the Driver Pod is stored in the prod.ap-northeast-2.appinfo.src Bucket. The appinfo.src Bucket is managed by AWS EMR and is linked to the Spark History Server that EMR manages, so that users can view the history of Spark Jobs submitted through the StartJobRun API. Users can also specify --conf spark.eventLog.dir=s3a://[s3-bucket] so that the Event Log is stored at a path of their choice.
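The object keys that fluentd writes can be read off the `path` and `s3_object_key_format` settings in [File 3]. The sketch below reproduces that layout with plain string formatting. The application ID and the index value are hypothetical, and how fluentd numbers `%{index}` is not derived from the source, so the index is simply taken as an argument.

```python
def event_log_prefix(account_id, virtual_cluster_id, job_id, app_id):
    """Mirror the 'path' setting of [File 3]."""
    return f"{account_id}/{virtual_cluster_id}/jobs/{job_id}/sparklogs/eventlog_v2_{app_id}"


def event_log_key(prefix, app_id, index):
    """Mirror s3_object_key_format '%{path}/events_%{index}_<app id>' for event log chunks."""
    return f"{prefix}/events_{index}_{app_id}"


def app_status_key(prefix, app_id):
    """Mirror s3_object_key_format '%{path}/appstatus_<app id>' for the status file."""
    return f"{prefix}/appstatus_{app_id}"


app_id = "spark-example-app-id"  # hypothetical SPARK_APPLICATION_ID
prefix = event_log_prefix(
    account_id="[account-id]",  # placeholder, as in [File 3]
    virtual_cluster_id="jk518skp01ys9ka8b0npx9nt0",
    job_id="0000000337olhmpguek",
    app_id=app_id,
)
print(event_log_key(prefix, app_id, 0))
print(app_status_key(prefix, app_id))
```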
$ aws emr-containers start-job-run \
  --virtual-cluster-id jk518skp01ys9ka8b0npx9nt0 \
  --name pi-logs \
  --region ap-northeast-2 \
  --execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
  --release-label emr-6.8.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver":{
      "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
      "sparkSubmitParameters": "--conf spark.driver.cores=1 --conf spark.driver.memory=512M --conf spark.executor.instances=1 --conf spark.executor.memory=512M --conf spark.executor.cores=1"
    }
  }' \
  --configuration-overrides '{
    "monitoringConfiguration": {
      "persistentAppUI": "ENABLED",
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "spark-startjobrun",
        "logStreamNamePrefix": "pi-logs"
      },
      "s3MonitoringConfiguration": {
        "logUri": "s3://ssup2-spark/startjobrun/"
      }
    }
  }'
[Shell 2] aws CLI StartJobRun API with Logging Example
The StartJobRun API also provides a feature that conveniently ships the stdout/stderr of the job-runner, driver, and executor Pods to CloudWatch or S3. [Shell 2] shows an example of submitting a Spark Job through the StartJobRun API with the aws CLI, together with Logging settings. Compared with [Shell 1], a monitoringConfiguration setting has been added, with separate CloudWatch and S3 settings beneath it.
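The monitoringConfiguration block of [Shell 2] can also be generated rather than written by hand. The sketch below builds the same structure and only includes the CloudWatch or S3 section when a destination is given. The key names are those used in [Shell 2], while the helper `monitoring_overrides` and its parameters are hypothetical.

```python
import json


def monitoring_overrides(log_group=None, stream_prefix=None, s3_log_uri=None, persistent_app_ui=True):
    """Build a configurationOverrides payload with optional CloudWatch and S3 log destinations."""
    monitoring = {"persistentAppUI": "ENABLED" if persistent_app_ui else "DISABLED"}
    if log_group:
        cloudwatch = {"logGroupName": log_group}
        if stream_prefix:
            cloudwatch["logStreamNamePrefix"] = stream_prefix
        monitoring["cloudWatchMonitoringConfiguration"] = cloudwatch
    if s3_log_uri:
        monitoring["s3MonitoringConfiguration"] = {"logUri": s3_log_uri}
    return {"monitoringConfiguration": monitoring}


# Same values as in [Shell 2].
overrides = monitoring_overrides(
    log_group="spark-startjobrun",
    stream_prefix="pi-logs",
    s3_log_uri="s3://ssup2-spark/startjobrun/",
)
# The JSON string can be passed to --configuration-overrides.
print(json.dumps(overrides, indent=2))
```

Leaving out both destinations yields a payload that only toggles `persistentAppUI`.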
apiVersion : v1
data :
spark-defaults.conf : |
spark.kubernetes.executor.podTemplateValidation.enabled true
spark.executor.extraClassPath \/usr\/lib\/hadoop-lzo\/lib\/*:\/usr\/lib\/hadoop\/hadoop-aws.jar:\/usr\/share\/aws\/aws-java-sdk\/*:\/usr\/share\/aws\/emr\/emrfs\/conf:\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/usr\/share\/aws\/emr\/security\/conf:\/usr\/share\/aws\/emr\/security\/lib\/*:\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar:\/docker\/usr\/lib\/hadoop-lzo\/lib\/*:\/docker\/usr\/lib\/hadoop\/hadoop-aws.jar:\/docker\/usr\/share\/aws\/aws-java-sdk\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/conf:\/docker\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/docker\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/docker\/usr\/share\/aws\/emr\/security\/conf:\/docker\/usr\/share\/aws\/emr\/security\/lib\/*:\/docker\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/docker\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/docker\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/docker\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar
spark.executor.extraLibraryPath \/usr\/lib\/hadoop\/lib\/native:\/usr\/lib\/hadoop-lzo\/lib\/native:\/docker\/usr\/lib\/hadoop\/lib\/native:\/docker\/usr\/lib\/hadoop-lzo\/lib\/native
spark.kubernetes.driver.internalPodTemplateFile \/etc\/spark\/conf\/driver-internal-pod.yaml
spark.resourceManager.cleanupExpiredHost true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
spark.kubernetes.executor.container.allowlistFile \/etc\/spark\/conf\/executor-pod-template-container-allowlist.txt
spark.kubernetes.executor.container.image 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com\/spark\/emr-6.8.0:latest
spark.history.fs.logDirectory file:\/\/\/var\/log\/spark\/apps
spark.kubernetes.pyspark.pythonVersion 3
spark.driver.memory 1G
spark.master k8s:\/\/https:\/\/kubernetes.default.svc:443
spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
spark.driver.cores 1
spark.kubernetes.driver.container.image 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com\/spark\/emr-6.8.0:latest
spark.driver.extraLibraryPath \/usr\/lib\/hadoop\/lib\/native:\/usr\/lib\/hadoop-lzo\/lib\/native:\/docker\/usr\/lib\/hadoop\/lib\/native:\/docker\/usr\/lib\/hadoop-lzo\/lib\/native
spark.kubernetes.executor.podTemplateContainerName spark-kubernetes-executor
spark.kubernetes.driver.podTemplateValidation.enabled true
spark.kubernetes.driver.pod.allowlistFile \/etc\/spark\/conf\/driver-pod-template-pod-allowlist.txt
spark.history.ui.port 18080
spark.hadoop.fs.s3.customAWSCredentialsProvider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.blacklist.decommissioning.timeout 1h
spark.driver.defaultJavaOptions -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
spark.hadoop.fs.defaultFS file:\/\/\/
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.dynamicAllocation.enabled false
spark.kubernetes.container.image.pullPolicy Always
spark.kubernetes.driver.podTemplateContainerName spark-kubernetes-driver
spark.eventLog.logBlockUpdates.enabled true
spark.driver.extraClassPath \/usr\/lib\/hadoop-lzo\/lib\/*:\/usr\/lib\/hadoop\/hadoop-aws.jar:\/usr\/share\/aws\/aws-java-sdk\/*:\/usr\/share\/aws\/emr\/emrfs\/conf:\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/usr\/share\/aws\/emr\/security\/conf:\/usr\/share\/aws\/emr\/security\/lib\/*:\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar:\/docker\/usr\/lib\/hadoop-lzo\/lib\/*:\/docker\/usr\/lib\/hadoop\/hadoop-aws.jar:\/docker\/usr\/share\/aws\/aws-java-sdk\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/conf:\/docker\/usr\/share\/aws\/emr\/emrfs\/lib\/*:\/docker\/usr\/share\/aws\/emr\/emrfs\/auxlib\/*:\/docker\/usr\/share\/aws\/emr\/goodies\/lib\/emr-spark-goodies.jar:\/docker\/usr\/share\/aws\/emr\/security\/conf:\/docker\/usr\/share\/aws\/emr\/security\/lib\/*:\/docker\/usr\/share\/aws\/hmclient\/lib\/aws-glue-datacatalog-spark-client.jar:\/docker\/usr\/share\/java\/Hive-JSON-Serde\/hive-openx-serde.jar:\/docker\/usr\/share\/aws\/sagemaker-spark-sdk\/lib\/sagemaker-spark-sdk.jar:\/docker\/usr\/share\/aws\/emr\/s3select\/lib\/emr-s3-select-spark-connector.jar
spark.executor.defaultJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
spark.kubernetes.namespace emr-cli
spark.stage.attempt.ignoreOnDecommissionFetchFailure true
spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds 2000
spark.kubernetes.executor.internalPodTemplateContainerName spark-kubernetes-executor
spark.kubernetes.driver.container.allowlistFile \/etc\/spark\/conf\/driver-pod-template-container-allowlist.txt
spark.kubernetes.executor.pod.allowlistFile \/etc\/spark\/conf\/executor-pod-template-pod-allowlist.txt
spark.eventLog.dir file:\/\/\/var\/log\/spark\/apps
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.executor.memory 1G
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.kubernetes.executor.internalPodTemplateFile \/etc\/spark\/conf\/executor-internal-pod.yaml
spark.decommissioning.timeout.threshold 20
spark.executor.cores 1
spark.hadoop.dynamodb.customAWSCredentialsProvider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.kubernetes.driver.internalPodTemplateContainerName spark-kubernetes-driver
spark.submit.deployMode cluster
spark.authenticate true
spark.blacklist.decommissioning.enabled true
spark.eventLog.enabled true
spark.shuffle.service.enabled false
spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
kind : ConfigMap
metadata :
creationTimestamp : "2024-01-11T14:50:32Z"
labels :
emr-containers.amazonaws.com/job.configuration : spark-defaults
emr-containers.amazonaws.com/job.id : 0000000337p03c0klg8
emr-containers.amazonaws.com/virtual-cluster-id : jk518skp01ys9ka8b0npx9nt0
name : 0000000337p03c0klg8-spark-defaults
namespace : emr-cli
ownerReferences :
- apiVersion : batch/v1
blockOwnerDeletion : true
controller : true
kind : Job
name : 0000000337p03c0klg8
uid : a0b14577-fe4f-4611-bd64-b45e9e898bb1
resourceVersion : "68662395"
uid : eb522369-1639-443b-a279-8e901d48b4ac
[File 4] spark-default ConfigMap with Logging
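The `spark-defaults.conf` entries in [File 4] are plain `key value` lines, and the paths in the dump carry escaped slashes (`\/`). As a reading aid, the minimal sketch below parses such text into a dictionary and undoes that escaping. The function name `parse_spark_defaults` is hypothetical, and the sketch assumes every entry uses whitespace between key and value, as the dump above does. It does not try to cover the full properties syntax that Spark accepts.

```python
def parse_spark_defaults(text):
    """Parse spark-defaults.conf style text into a dict (simplified sketch)."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # Assumption: the key and value are separated by the first run of whitespace.
        parts = line.split(None, 1)
        key = parts[0]
        value = parts[1] if len(parts) > 1 else ""
        # Undo the "\/" escaping seen in the dumped ConfigMap.
        conf[key] = value.replace("\\/", "/")
    return conf


# A few entries copied from the ConfigMap above.
sample = "\n".join([
    "spark.kubernetes.namespace emr-cli",
    "spark.executor.memory 1G",
    "spark.eventLog.dir file:\\/\\/\\/var\\/log\\/spark\\/apps",
    "spark.dynamicAllocation.enabled false",
])

conf = parse_spark_defaults(sample)
print(conf["spark.eventLog.dir"])  # file:///var/log/spark/apps
```

Parsed this way, the event log directory reads as `file:///var/log/spark/apps`, which matches the `emr-container-event-log-dir` mount path used by the driver Pod template.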
apiVersion : v1
data :
driver : |-
apiVersion: v1
kind: Pod
metadata:
ownerReferences:
- apiVersion: batch/v1
blockOwnerDeletion: true
controller: true
kind: ConfigMap
name: 0000000337p03c0klg8-spark-defaults
uid: eb522369-1639-443b-a279-8e901d48b4ac
spec:
serviceAccountName: emr-containers-sa-spark-driver-[account-id]-j3uv6jk0kk3sogu231qj91fmo3mvwfl561
volumes:
- name: emr-container-communicate
emptyDir: {}
- name: config-volume
configMap:
name: fluentd-jk518skp01ys9ka8b0npx9nt0-0000000337p03c0klg8
- name: emr-container-s3
secret:
secretName: emr-containers-s3-jk518skp01ys9ka8b0npx9nt0-0000000337p03c0klg8
- name: emr-container-application-log-dir
emptyDir: {}
- name: emr-container-event-log-dir
emptyDir: {}
- name: temp-data-dir
emptyDir: {}
- name: mnt-dir
emptyDir: {}
- name: home-dir
emptyDir: {}
- name: 0000000337p03c0klg8-spark-defaults
configMap:
name: 0000000337p03c0klg8-spark-defaults
securityContext:
fsGroup: 65534
containers:
- name: spark-kubernetes-driver
image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark/emr-6.8.0:latest
imagePullPolicy: Always
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 1000
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
volumeMounts:
- name: emr-container-communicate
mountPath: /var/log/fluentd
readOnly: false
- name: emr-container-application-log-dir
mountPath: /var/log/spark/user
readOnly: false
- name: emr-container-event-log-dir
mountPath: /var/log/spark/apps
readOnly: false
- name: temp-data-dir
mountPath: /tmp
readOnly: false
- name: mnt-dir
mountPath: /mnt
readOnly: false
- name: home-dir
mountPath: /home/hadoop
readOnly: false
- name: 0000000337p03c0klg8-spark-defaults
mountPath: /usr/lib/spark/conf/spark-defaults.conf
subPath: spark-defaults.conf
readOnly: false
env:
- name: SPARK_CONTAINER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SIDECAR_SIGNAL_FILE
value: /var/log/fluentd/main-container-terminated
- name: K8S_SPARK_LOG_ERROR_REGEX
value: (Error|Exception|Fail)
- name: K8S_SPARK_LOG_URL_STDOUT
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout"
- name: K8S_SPARK_LOG_URL_STDERR
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr"
- name: TERMINATION_ERROR_LOG_FILE_PATH
value: /var/log/spark/error.log
terminationMessagePath: /var/log/spark/error.log
- name: emr-container-fluentd
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 1000
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/fluentd/emr-6.8.0:latest
imagePullPolicy: Always
resources:
requests:
memory: 200Mi
limits:
memory: 512Mi
env:
- name: SPARK_CONTAINER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SPARK_ROLE
valueFrom:
fieldRef:
fieldPath: metadata.labels['spark-role']
- name: K8S_SPARK_LOG_URL_STDERR
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr"
- name: K8S_SPARK_LOG_URL_STDOUT
value: "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout"
- name: SIDECAR_SIGNAL_FILE
value: /var/log/fluentd/main-container-terminated
- name: FLUENTD_CONF
value: fluent.conf
- name: K8S_SPARK_EVENT_LOG_DIR
value: /var/log/spark/apps
- name: AWS_REGION
value: ap-northeast-2
- name: RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR
value: 0.9
volumeMounts:
- name: config-volume
mountPath: /etc/fluent/fluent.conf
subPath: driver
readOnly: false
- name: emr-container-s3
mountPath: /var/emr-container/s3
readOnly: true
- name: emr-container-communicate
mountPath: /var/log/fluentd
readOnly: false
- name: emr-container-application-log-dir
mountPath: /var/log/spark/user
readOnly: false
- name: emr-container-event-log-dir
mountPath: /var/log/spark/apps
readOnly: false
- name: temp-data-dir
mountPath: /tmp
readOnly: false
- name: home-dir
mountPath: /home/hadoop
readOnly: false
driver-container-allowlist : |-
container.env
container.envFrom
container.name
container.lifecycle
container.livenessProbe
container.readinessProbe
container.resources
container.startupProbe
container.stdin
container.stdinOnce
container.terminationMessagePath
container.terminationMessagePolicy
container.tty
container.volumeDevices
container.volumeMounts
container.workingDir
driver-pod-allowlist : |-
pod.apiVersion
pod.kind
pod.metadata
pod.spec.activeDeadlineSeconds
pod.spec.affinity
pod.spec.containers
pod.spec.enableServiceLinks
pod.spec.ephemeralContainers
pod.spec.hostAliases
pod.spec.hostname
pod.spec.imagePullSecrets
pod.spec.initContainers
pod.spec.nodeName
pod.spec.nodeSelector
pod.spec.overhead
pod.spec.preemptionPolicy
pod.spec.priority
pod.spec.priorityClassName
pod.spec.readinessGates
pod.spec.restartPolicy
pod.spec.runtimeClassName
pod.spec.schedulerName
pod.spec.subdomain
pod.spec.terminationGracePeriodSeconds
pod.spec.tolerations
pod.spec.topologySpreadConstraints
pod.spec.volumes
executor : "apiVersion: v1\nkind: Pod\n\nspec:\n serviceAccountName: emr-containers-sa-spark-executor-[account-id]-j3uv6jk0kk3sogu231qj91fmo3mvwfl561\n
\ volumes:\n - name: emr-container-communicate\n emptyDir: {}\n - name:
config-volume\n configMap:\n name: fluentd-jk518skp01ys9ka8b0npx9nt0-0000000337p03c0klg8\n
\ \n - name: emr-container-application-log-dir\n emptyDir: {}\n -
name: temp-data-dir\n emptyDir: {}\n - name: mnt-dir\n emptyDir:
{}\n - name: home-dir\n emptyDir: {}\n - name: 0000000337p03c0klg8-spark-defaults\n
\ configMap:\n name: 0000000337p03c0klg8-spark-defaults\n securityContext:\n
\ fsGroup: 65534\n containers:\n - name: spark-kubernetes-executor\n image:
996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark/emr-6.8.0:latest\n imagePullPolicy:
Always\n securityContext:\n runAsNonRoot: true\n runAsUser:
999\n runAsGroup: 1000\n privileged: false\n allowPrivilegeEscalation:
false\n readOnlyRootFilesystem: true\n capabilities:\n drop:
[\"ALL\"]\n volumeMounts:\n - name: emr-container-communicate\n mountPath:
/var/log/fluentd\n readOnly: false\n - name: emr-container-application-log-dir\n
\ mountPath: /var/log/spark/user\n readOnly: false\n -
name: temp-data-dir\n mountPath: /tmp\n readOnly: false\n -
name: mnt-dir\n mountPath: /mnt\n readOnly: false\n -
name: home-dir\n mountPath: /home/hadoop\n readOnly: false\n
\ - name: 0000000337p03c0klg8-spark-defaults\n mountPath: /usr/lib/spark/conf/spark-defaults.conf\n
\ subPath: spark-defaults.conf\n readOnly: false\n env:\n
\ - name: SPARK_CONTAINER_ID\n valueFrom:\n fieldRef:\n
\ fieldPath: metadata.name\n - name: K8S_SPARK_LOG_URL_STDERR\n
\ value: \"/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr\"\n -
name: K8S_SPARK_LOG_URL_STDOUT\n value: \"/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout\"\n
\ - name: SIDECAR_SIGNAL_FILE\n value: /var/log/fluentd/main-container-terminated\n
\ - name: EXEC_POD_CPU_REQUEST\n valueFrom:\n resourceFieldRef:\n
\ containerName: spark-kubernetes-executor\n resource:
requests.cpu\n divisor: 1\n - name: EXEC_POD_CPU_LIMIT\n valueFrom:\n
\ resourceFieldRef:\n containerName: spark-kubernetes-executor\n
\ resource: limits.cpu\n divisor: 1\n - name:
EXEC_POD_MEM_REQUEST\n valueFrom:\n resourceFieldRef:\n containerName:
spark-kubernetes-executor\n resource: requests.memory\n divisor:
1\n - name: EXEC_POD_MEM_LIMIT\n valueFrom:\n resourceFieldRef:\n
\ containerName: spark-kubernetes-executor\n resource:
limits.memory\n divisor: 1\n - name: SPARK_LOG_URL_STDERR\n
\ value: \"https://s3.console.aws.amazon.com/s3/object/ssup2-spark/startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p03c0klg8/containers/$(SPARK_APPLICATION_ID)/$(SPARK_CONTAINER_ID)/stderr.gz\"\n
\ - name: SPARK_LOG_URL_STDOUT\n value: \"https://s3.console.aws.amazon.com/s3/object/ssup2-spark/startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p03c0klg8/containers/$(SPARK_APPLICATION_ID)/$(SPARK_CONTAINER_ID)/stdout.gz\"\n
\ - name: SPARK_EXECUTOR_ATTRIBUTE_CONTAINER_ID\n value: \"$(SPARK_CONTAINER_ID)\"\n
\ - name: SPARK_EXECUTOR_ATTRIBUTE_LOG_FILES\n value: \"stderr,stdout\"\n\n
\ - name: emr-container-fluentd\n securityContext:\n runAsNonRoot:
true\n runAsUser: 999\n runAsGroup: 1000\n privileged: false\n
\ allowPrivilegeEscalation: false\n readOnlyRootFilesystem: true\n
\ capabilities:\n drop: [\"ALL\"]\n image: 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/fluentd/emr-6.8.0:latest\n
\ imagePullPolicy: Always\n resources:\n requests:\n memory:
200Mi\n limits:\n memory: 512Mi\n env:\n - name: SPARK_CONTAINER_ID\n
\ valueFrom:\n fieldRef:\n fieldPath: metadata.name\n
\ - name: SPARK_ROLE\n valueFrom:\n fieldRef:\n fieldPath:
metadata.labels['spark-role']\n - name: K8S_SPARK_LOG_URL_STDERR\n value:
\"/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr\"\n - name: K8S_SPARK_LOG_URL_STDOUT\n
\ value: \"/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout\"\n -
name: SIDECAR_SIGNAL_FILE\n value: /var/log/fluentd/main-container-terminated\n
\ - name: FLUENTD_CONF\n value: fluent.conf\n - name: AWS_REGION\n
\ value: ap-northeast-2\n - name: RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR\n
\ value: 0.9\n volumeMounts:\n - name: config-volume\n mountPath:
/etc/fluent/fluent.conf\n subPath: executor\n readOnly: false\n
\ \n - name: emr-container-communicate\n mountPath: /var/log/fluentd\n
\ readOnly: false\n - name: emr-container-application-log-dir\n
\ mountPath: /var/log/spark/user\n readOnly: false\n -
name: temp-data-dir\n mountPath: /tmp\n readOnly: false\n -
name: home-dir\n mountPath: /home/hadoop\n readOnly: false"
executor-container-allowlist : |-
container.env
container.envFrom
container.name
container.lifecycle
container.livenessProbe
container.readinessProbe
container.resources
container.startupProbe
container.stdin
container.stdinOnce
container.terminationMessagePath
container.terminationMessagePolicy
container.tty
container.volumeDevices
container.volumeMounts
container.workingDir
executor-pod-allowlist : |-
pod.apiVersion
pod.kind
pod.metadata
pod.spec.activeDeadlineSeconds
pod.spec.affinity
pod.spec.containers
pod.spec.enableServiceLinks
pod.spec.ephemeralContainers
pod.spec.hostAliases
pod.spec.hostname
pod.spec.imagePullSecrets
pod.spec.initContainers
pod.spec.nodeName
pod.spec.nodeSelector
pod.spec.overhead
pod.spec.preemptionPolicy
pod.spec.priority
pod.spec.priorityClassName
pod.spec.readinessGates
pod.spec.restartPolicy
pod.spec.runtimeClassName
pod.spec.schedulerName
pod.spec.subdomain
pod.spec.terminationGracePeriodSeconds
pod.spec.tolerations
pod.spec.topologySpreadConstraints
pod.spec.volumes
validation-script : |-
#!/usr/bin/python3
import boto3
import os
import time
import json
import logging

class UploadJobMetadata(object):
    LOGGING_BUCKET_NAME_ENV = 'LOGGING_BUCKET_NAME'
    CLOUDWATCH_LOG_GROUP_NAME_ENV = 'CW_LOG_GROUP_NAME'
    S3_UPLOAD_PATH_ENV = 'JOB_METADATA_UPLOAD_S3_PATH'
    CW_UPLOAD_STREAM_ENV = 'JOB_METADATA_CW_LOG_STREAM_NAME'

    def get_role_credentials(self):
        role_arn = os.getenv('AWS_ROLE_ARN')
        token_file = os.getenv('AWS_WEB_IDENTITY_TOKEN_FILE')
        aws_region = os.getenv('AWS_REGION')
        sts_endpoint_url = os.getenv('STS_ENDPOINT_URL')
        token_file_obj = open(token_file, 'r')
        token = token_file_obj.read()
        sts_client = boto3.client('sts', region_name=aws_region, endpoint_url=sts_endpoint_url)
        return sts_client.assume_role_with_web_identity(RoleArn=role_arn,
                                                        WebIdentityToken=token,
                                                        RoleSessionName='emr-containers')

    def generate_job_metadata(self, region):
        s3_path = os.getenv(self.S3_UPLOAD_PATH_ENV)
        log_stream_name = os.getenv(self.CW_UPLOAD_STREAM_ENV)
        job_metadata = {'region': region, 'job_id': os.getenv('JOB_ID')}
        if s3_path is not None:
            job_metadata['logging_bucket_name'] = os.getenv(self.LOGGING_BUCKET_NAME_ENV)
            job_metadata['logging_s3_metadata_path'] = s3_path
        if log_stream_name is not None:
            job_metadata['cloudwatch_log_group_name'] = os.getenv(self.CLOUDWATCH_LOG_GROUP_NAME_ENV)
            job_metadata['cloudwatch_log_metadata_stream'] = log_stream_name
        return json.dumps(job_metadata)

    def upload_job_metadata_to_s3(self, job_metadata, region, creds):
        local_file_name = '/tmp/job-metadata.log'
        local_file_obj = open(local_file_name, 'w')
        local_file_obj.write(job_metadata)
        local_file_obj.close()
        bucket_name = os.getenv(self.LOGGING_BUCKET_NAME_ENV)
        s3_path = os.getenv(self.S3_UPLOAD_PATH_ENV)
        aws_access_key_id = creds['Credentials']['AccessKeyId']
        aws_secret_access_key = creds['Credentials']['SecretAccessKey']
        aws_session_token = creds['Credentials']['SessionToken']
        s3_client = boto3.client('s3',
                                 region_name=region,
                                 aws_access_key_id=aws_access_key_id,
                                 aws_secret_access_key=aws_secret_access_key,
                                 aws_session_token=aws_session_token)
        s3_client.upload_file(local_file_name, bucket_name, s3_path)
        s3_client.download_file(bucket_name, s3_path, local_file_name)

    def upload_job_metadata_to_cw(self, job_metadata, region, creds):
        log_group_name = os.getenv(self.CLOUDWATCH_LOG_GROUP_NAME_ENV)
        log_stream_name = os.getenv(self.CW_UPLOAD_STREAM_ENV)
        aws_access_key_id = creds['Credentials']['AccessKeyId']
        aws_secret_access_key = creds['Credentials']['SecretAccessKey']
        aws_session_token = creds['Credentials']['SessionToken']
        logs_client = boto3.client('logs',
                                   region_name=region,
                                   aws_access_key_id=aws_access_key_id,
                                   aws_secret_access_key=aws_secret_access_key,
                                   aws_session_token=aws_session_token)
        log_groups = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name)
        if len(list(filter(lambda x: x['logGroupName'] == log_group_name, log_groups['logGroups']))) == 0:
            logs_client.create_log_group(logGroupName=log_group_name)
        log_streams = logs_client.describe_log_streams(logGroupName=log_group_name, logStreamNamePrefix=log_stream_name)
        log_stream = list(filter(lambda x: x['logStreamName'] == log_stream_name, log_streams['logStreams']))
        if len(log_stream) == 0:
            logs_client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
            self.put_log_event(logs_client, log_group_name, log_stream_name, job_metadata)
        else:
            if 'uploadSequenceToken' not in log_stream[0]:
                self.put_log_event(logs_client, log_group_name, log_stream_name, job_metadata)
            else:
                upload_token = log_stream[0]['uploadSequenceToken']
                self.put_log_event_with_upload_token(logs_client, log_group_name, log_stream_name, upload_token, job_metadata)

    def put_log_event(self, logs_client, log_group_name, log_stream_name, job_metadata):
        logs_client.put_log_events(logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=[
            {
                'timestamp': round(time.time() * 1000),
                'message': job_metadata
            }
        ])

    def put_log_event_with_upload_token(self, logs_client, log_group_name, log_stream_name, upload_token, job_metadata):
        logs_client.put_log_events(logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=[
            {
                'timestamp': round(time.time() * 1000),
                'message': job_metadata
            }
        ], sequenceToken = upload_token)

    def run(self):
        region = os.getenv('AWS_REGION')
        job_metadata = self.generate_job_metadata(region)
        creds = None
        s3_path = os.getenv(self.S3_UPLOAD_PATH_ENV)
        if s3_path is not None:
            creds = self.get_role_credentials()
            self.upload_job_metadata_to_s3(job_metadata, region, creds)
        log_stream_name = os.getenv(self.CW_UPLOAD_STREAM_ENV)
        if log_stream_name is not None:
            if creds is None:
                creds = self.get_role_credentials()
            self.upload_job_metadata_to_cw(job_metadata, region, creds)

if __name__ == "__main__":
    UploadJobMetadata().run()
kind : ConfigMap
metadata :
creationTimestamp : "2024-01-11T14:50:32Z"
labels :
emr-containers.amazonaws.com/job.configuration : pod-template
emr-containers.amazonaws.com/job.id : 0000000337p03c0klg8
emr-containers.amazonaws.com/virtual-cluster-id : jk518skp01ys9ka8b0npx9nt0
name : podtemplate-0000000337p03c0klg8
namespace : emr-cli
ownerReferences :
- apiVersion : batch/v1
blockOwnerDeletion : true
controller : true
kind : Job
name : 0000000337p03c0klg8
uid : a0b14577-fe4f-4611-bd64-b45e9e898bb1
resourceVersion : "68662402"
uid : b9903ad0-e49c-43e3-9d8d-e004a050f051
[File 5] Pod Template ConfigMap with Logging
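The Pod templates in [File 5] build the local log paths with Kubernetes dependent environment variables, for example `/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout`, where `SPARK_CONTAINER_ID` comes from the Pod name (`metadata.name`). The sketch below mimics that substitution in Python to show which path the Spark container and the Fluentd sidecar end up sharing. It is a simplified illustration, not the kubelet's implementation: the helper `expand_env` and the Pod name `spark-driver-pod` are hypothetical, and the `$$(VAR)` escape syntax is not handled.

```python
import re

# Matches a $(VAR_NAME) reference inside an environment variable value.
_REF = re.compile(r"\$\(([A-Za-z_][A-Za-z0-9_.-]*)\)")


def expand_env(env_list):
    """Resolve $(VAR) references against variables defined earlier in the list.

    A reference to an undefined variable is left unchanged, which is how
    Kubernetes treats unresolved references.
    """
    resolved = {}
    for name, value in env_list:
        resolved[name] = _REF.sub(
            lambda m: resolved.get(m.group(1), m.group(0)), value
        )
    return resolved


# In the real Pod, SPARK_CONTAINER_ID is filled in from metadata.name;
# "spark-driver-pod" is a made-up Pod name for illustration.
driver_env = [
    ("SPARK_CONTAINER_ID", "spark-driver-pod"),
    ("K8S_SPARK_LOG_URL_STDOUT", "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stdout"),
    ("K8S_SPARK_LOG_URL_STDERR", "/var/log/spark/user/$(SPARK_CONTAINER_ID)/stderr"),
]

resolved = expand_env(driver_env)
print(resolved["K8S_SPARK_LOG_URL_STDOUT"])  # /var/log/spark/user/spark-driver-pod/stdout
```

Because both containers mount the `emr-container-application-log-dir` volume at `/var/log/spark/user` and define the same variables, the resolved path is the same on both sides. The Fluentd configuration reads it through `ENV['K8S_SPARK_LOG_URL_STDOUT']` and `ENV['K8S_SPARK_LOG_URL_STDERR']`.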
apiVersion : v1
data :
driver : "<system>\n workers 3\n</system>\n<worker 0>\n <source>\n tag chicago-spark-stdout\n
\ @label @chicago-spark-stdout\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stdout>\n
\ <match chicago-spark-stdout>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stdout\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stdout-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 0>\n <source>\n tag chicago-spark-stderr\n
\ @label @chicago-spark-stderr\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stderr>\n
\ <match chicago-spark-stderr>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stderr\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stderr-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n tag chicago-spark-s3-container-stderr-logs\n
\ @label @chicago-spark-s3-container-stderr-logs\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-s3-container-log-in-tail.pos\"\n
\ read_from_head true\n <parse>\n @type none\n </parse>\n </source>\n
\ \n <label @chicago-spark-s3-container-stderr-logs>\n <match chicago-spark-s3-container-stderr-logs>\n
\ @type s3\n s3_bucket ssup2-spark\n s3_region ap-northeast-2\n
\ path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stderr.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stderr/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n
\ tag chicago-spark-s3-container-stdout-logs\n @label @chicago-spark-s3-container-stdout-logs\n
\ @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n pos_file
\"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-s3-container-log-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-s3-container-stdout-logs>\n
\ <match chicago-spark-s3-container-stdout-logs>\n @type s3\n s3_bucket
ssup2-spark\n s3_region ap-northeast-2\n path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stdout.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stdout/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker 2>\n <source>\n
\ tag emr-containers-spark-s3-event-logs\n @label @emr-containers-spark-s3-event-logs\n
\ @type tail\n path \"#{ENV['K8S_SPARK_EVENT_LOG_DIR']}/#{ENV['SPARK_APPLICATION_ID']}.inprogress,#{ENV['K8S_SPARK_EVENT_LOG_DIR']}/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_APPLICATION_ID']}.inprogress\"\n
\ pos_file \"/tmp/fluentd/event-logs/in-tail.pos\"\n read_from_head true\n
\ refresh_interval 30\n <parse>\n @type none\n </parse>\n </source>\n
\ \n <label @emr-containers-spark-s3-event-logs>\n <match emr-containers-spark-s3-event-logs>\n
\ @type s3\n <refreshing_file_presigned_post>\n presigned_post_path
/var/emr-container/s3/presigned-post\n </refreshing_file_presigned_post>\n
\ s3_bucket prod.ap-northeast-2.appinfo.src\n s3_region ap-northeast-2\n
\ check_bucket false\n check_apikey_on_start false\n check_object
false\n path \"[account-id]/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/sparklogs/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ s3_object_key_format \"%{path}/events_%{index}_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ store_as text\n storage_class STANDARD\n overwrite true\n format
single_value\n <buffer time>\n @type file\n path /tmp/fluentd/event-logs/out-s3-buffer*\n
\ chunk_limit_size 32MB\n flush_at_shutdown true\n timekey
30\n timekey_wait 0\n retry_timeout 30s\n retry_type exponential_backoff\n
\ retry_exponential_backoff_base 2\n retry_wait 1s\n retry_randomize
true\n disable_chunk_backup true\n retry_max_times 5\n </buffer>\n
\ <secondary>\n @type secondary_file\n directory /var/log/fluentd/error/\n
\ basename s3-event-error.log\n </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker
2>\n <source>\n tag emr-containers-spark-s3-event-status-file\n @label
@emr-containers-spark-s3-event-status-file\n @type exec\n command echo \"
\"\n format none\n </source>\n \n <label @emr-containers-spark-s3-event-status-file>\n
\ <match emr-containers-spark-s3-event-status-file>\n @type s3\n <refreshing_file_presigned_post>\n
\ presigned_post_path /var/emr-container/s3/presigned-post\n </refreshing_file_presigned_post>\n
\ s3_bucket prod.ap-northeast-2.appinfo.src\n s3_region ap-northeast-2\n
\ check_bucket false\n check_apikey_on_start false\n check_object
false\n path \"[account-id]/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/sparklogs/eventlog_v2_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ s3_object_key_format \"%{path}/appstatus_#{ENV['SPARK_APPLICATION_ID']}\"\n
\ store_as text\n storage_class STANDARD\n overwrite true\n format
single_value\n <buffer time>\n @type file\n path /tmp/fluentd/event-logs/appstatus/out-s3-buffer*\n
\ chunk_limit_size 1MB\n flush_mode immediate\n flush_at_shutdown
true\n retry_timeout 30s\n retry_type exponential_backoff\n retry_exponential_backoff_base
2\n retry_wait 1s\n retry_randomize true\n disable_chunk_backup
true\n retry_max_times 5\n </buffer>\n <secondary>\n @type
secondary_file\n directory /var/log/fluentd/error/\n basename s3-appstatus-event-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n"
executor : "<system>\n workers 2\n</system>\n<worker 0>\n <source>\n tag chicago-spark-stdout\n
\ @label @chicago-spark-stdout\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stdout>\n
\ <match chicago-spark-stdout>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stdout\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stdout-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 0>\n <source>\n tag chicago-spark-stderr\n
\ @label @chicago-spark-stderr\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stderr>\n
\ <match chicago-spark-stderr>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stderr\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}/stderr-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n tag chicago-spark-s3-container-stderr-logs\n
\ @label @chicago-spark-s3-container-stderr-logs\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-s3-container-log-in-tail.pos\"\n
\ read_from_head true\n <parse>\n @type none\n </parse>\n </source>\n
\ \n <label @chicago-spark-s3-container-stderr-logs>\n <match chicago-spark-s3-container-stderr-logs>\n
\ @type s3\n s3_bucket ssup2-spark\n s3_region ap-northeast-2\n
\ path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stderr.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stderr/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n
\ tag chicago-spark-s3-container-stdout-logs\n @label @chicago-spark-s3-container-stdout-logs\n
\ @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n pos_file
\"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-s3-container-log-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-s3-container-stdout-logs>\n
\ <match chicago-spark-s3-container-stdout-logs>\n @type s3\n s3_bucket
ssup2-spark\n s3_region ap-northeast-2\n path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stdout.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stdout/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n"
fluentd-pod-metadata.conf : "<system>\n workers 1\n</system>\n<worker 0>\n <source>\n
\ tag emr-containers-pod-metadata\n @label @emr-containers-pod-metadata\n
\ @type tail\n path \"#{ENV['POD_METADATA_PATH']}\"\n pos_file \"/tmp/emr-containers/pod-info-in-tail.pos\"\n
\ read_from_head true\n refresh_interval 120\n <parse>\n @type none\n
\ </parse>\n </source>\n \n <label @emr-containers-pod-metadata>\n <match
emr-containers-pod-metadata>\n @type stdout\n format single_value\n
\ output_to_console true\n </match>\n </label>\n</worker>\n"
job : "<system>\n workers 2\n</system>\n<worker 0>\n <source>\n tag chicago-spark-stdout\n
\ @label @chicago-spark-stdout\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stdout>\n
\ <match chicago-spark-stdout>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}/stdout\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}/stdout-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 0>\n <source>\n tag chicago-spark-stderr\n
\ @label @chicago-spark-stderr\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-stderr>\n
\ <match chicago-spark-stderr>\n @type cloudwatch_logs\n auto_create_stream
true\n region ap-northeast-2\n include_time_key true\n log_group_name
\"spark-startjobrun\"\n log_stream_name \"pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}/stderr\"\n
\ put_log_events_disable_retry_limit false\n put_log_events_retry_limit
0\n <buffer>\n @type file\n path \"/tmp/pi-logs/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}/stderr-out-cwl-buffer*\"\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 10s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <secondary>\n @type secondary_file\n directory
/var/log/fluentd/error/\n basename cw-container-error.log\n </secondary>\n
\ </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n tag chicago-spark-s3-container-stderr-logs\n
\ @label @chicago-spark-s3-container-stderr-logs\n @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ pos_file \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}-s3-container-log-in-tail.pos\"\n
\ read_from_head true\n <parse>\n @type none\n </parse>\n </source>\n
\ \n <label @chicago-spark-s3-container-stderr-logs>\n <match chicago-spark-s3-container-stderr-logs>\n
\ @type s3\n s3_bucket ssup2-spark\n s3_region ap-northeast-2\n
\ path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stderr.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stderr/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDERR']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n\n\n<worker 1>\n <source>\n
\ tag chicago-spark-s3-container-stdout-logs\n @label @chicago-spark-s3-container-stdout-logs\n
\ @type tail\n path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n pos_file
\"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}-s3-container-log-in-tail.pos\"\n read_from_head
true\n <parse>\n @type none\n </parse>\n </source>\n \n <label @chicago-spark-s3-container-stdout-logs>\n
\ <match chicago-spark-s3-container-stdout-logs>\n @type s3\n s3_bucket
ssup2-spark\n s3_region ap-northeast-2\n path \"startjobrun/jk518skp01ys9ka8b0npx9nt0/jobs/0000000337p0p4se23o/control-logs/#{ENV['SPARK_CONTAINER_ID']}\"\n
\ s3_object_key_format %{path}/stdout.gz\n check_apikey_on_start false\n
\ overwrite true\n <buffer>\n @type file\n path /tmp/fluentd/container-logs/stdout/out-s3-buffer*\n
\ chunk_limit_size 16MB\n flush_at_shutdown true\n flush_mode
interval\n flush_interval 120s\n retry_timeout 30s\n retry_type
exponential_backoff\n retry_exponential_backoff_base 2\n retry_wait
1s\n retry_randomize true\n disable_chunk_backup true\n retry_max_times
5\n </buffer>\n <local_file_upload>\n file_path \"#{ENV['K8S_SPARK_LOG_URL_STDOUT']}\"\n
\ </local_file_upload>\n <secondary>\n @type secondary_file\n
\ directory /var/log/fluentd/error/\n basename s3-container-error.log\n
\ </secondary>\n </match>\n </label>\n</worker>\n"
kind: ConfigMap
metadata:
  creationTimestamp: "2024-01-11T14:56:29Z"
  labels:
    emr-containers.amazonaws.com/job.configuration: fluentd
    emr-containers.amazonaws.com/job.id: 0000000337p0p4se23o
    emr-containers.amazonaws.com/virtual-cluster-id: jk518skp01ys9ka8b0npx9nt0
  name: fluentd-jk518skp01ys9ka8b0npx9nt0-0000000337p0p4se23o
  namespace: emr-cli
  ownerReferences:
  - apiVersion: batch/v1
    blockOwnerDeletion: true
    controller: true
    kind: Job
    name: 0000000337p0p4se23o
    uid: 21bbe42c-758b-42b4-9dba-aeb497bd1942
  resourceVersion: "68664513"
  uid: 68ec766a-2e38-46c3-a3f6-1c1ec9264cd8
[File 6] fluentd ConfigMap with Logging
Running the [Shell 2] command creates three ConfigMaps: [File 4], [File 5], and [File 6]. They show that fluentd is configured to run not only in the job-runner Pod but also in the Driver Pod and the Executor Pods, and that the fluentd instances running in the Driver and Executor Pods are configured to send stdout/stderr to CloudWatch or S3.
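The destinations in [File 6] follow a regular pattern, so the location of a container's logs can be derived from the Virtual Cluster ID and the Job ID. The following is a minimal sketch of that pattern as read from the `log_stream_name`, `path`, and `s3_object_key_format` values above. The function names, and the Application ID and Container ID values in the usage lines, are hypothetical and only serve the illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobLogTarget:
    """Identifiers that appear in every fluentd destination of [File 6]."""
    virtual_cluster_id: str
    job_id: str


def cloudwatch_stream(target, prefix, app_id, container_id, stream):
    """CloudWatch log stream name for a Driver/Executor container.

    Mirrors log_stream_name in [File 6]:
    <prefix>/<vc-id>/jobs/<job-id>/containers/<app-id>/<container-id>/<stream>
    """
    return (f"{prefix}/{target.virtual_cluster_id}/jobs/{target.job_id}"
            f"/containers/{app_id}/{container_id}/{stream}")


def s3_object_key(target, base_path, app_id, container_id, stream):
    """S3 object key for a Driver/Executor container.

    Mirrors path + s3_object_key_format (%{path}/<stream>.gz) in [File 6].
    """
    return (f"{base_path}/{target.virtual_cluster_id}/jobs/{target.job_id}"
            f"/containers/{app_id}/{container_id}/{stream}.gz")


# IDs taken from [File 6]; "app-1" and "driver-1" are placeholder values
# standing in for SPARK_APPLICATION_ID and SPARK_CONTAINER_ID.
target = JobLogTarget("jk518skp01ys9ka8b0npx9nt0", "0000000337p0p4se23o")
print(cloudwatch_stream(target, "pi-logs", "app-1", "driver-1", "stderr"))
print(s3_object_key(target, "startjobrun", "app-1", "driver-1", "stdout"))
```

The job-runner Pod uses the same layout with `control-logs/<container-id>` in place of `containers/<app-id>/<container-id>`, as the `job` entry of [File 6] shows.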
$ aws emr-containers start-job-run \
  --virtual-cluster-id jk518skp01ys9ka8b0npx9nt0 \
  --name pi \
  --region ap-northeast-2 \
  --execution-role-arn arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-cli \
  --release-label emr-6.8.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
    }
  }' \
  --configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "job-start-timeout": "1800",
          "spark.ui.prometheus.enabled": "true",
          "spark.executor.processTreeMetrics.enabled": "true",
          "spark.kubernetes.driver.annotation.prometheus.io/scrape": "true",
          "spark.kubernetes.driver.annotation.prometheus.io/path": "/metrics/executors/prometheus/",
          "spark.kubernetes.driver.annotation.prometheus.io/port": "4040"
        }
      }
    ]
  }'
[Shell 3] aws CLI StartJobRun API with Prometheus Monitoring
The StartJobRun API accepts the same settings that can be configured through the spark-submit CLI. [Shell 3] shows an example configured for monitoring with Prometheus.
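To make the effect of the `spark.kubernetes.driver.annotation.*` properties in [Shell 3] concrete, the sketch below derives the annotations that end up on the Driver Pod and the URL a scraper would request. It assumes Prometheus is set up with the common relabeling rules that honor the `prometheus.io/*` annotations, which is a convention rather than built-in Prometheus behavior. The function names and the Pod IP are hypothetical.

```python
ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."


def driver_annotations(spark_properties):
    """Return the Pod annotations requested through spark-defaults properties."""
    return {
        key[len(ANNOTATION_PREFIX):]: value
        for key, value in spark_properties.items()
        if key.startswith(ANNOTATION_PREFIX)
    }


def scrape_url(annotations, pod_ip):
    """Build the scrape URL implied by prometheus.io/* annotations, or None."""
    if annotations.get("prometheus.io/scrape") != "true":
        return None
    port = annotations.get("prometheus.io/port", "4040")
    path = annotations.get("prometheus.io/path", "/metrics")
    return f"http://{pod_ip}:{port}{path}"


# Properties copied from the spark-defaults classification in [Shell 3].
properties = {
    "spark.ui.prometheus.enabled": "true",
    "spark.executor.processTreeMetrics.enabled": "true",
    "spark.kubernetes.driver.annotation.prometheus.io/scrape": "true",
    "spark.kubernetes.driver.annotation.prometheus.io/path": "/metrics/executors/prometheus/",
    "spark.kubernetes.driver.annotation.prometheus.io/port": "4040",
}

annotations = driver_annotations(properties)
# 10.0.0.12 is a placeholder Driver Pod IP.
print(scrape_url(annotations, "10.0.0.12"))
```

Only the Driver Pod is annotated here because executor metrics are exposed through the Driver's UI port when `spark.ui.prometheus.enabled` is set.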
2.1. with ACK EMR Container Controller
[Figure 2] Spark on AWS EKS Architecture with ACK EMR Container Controller
AWS provides the ACK EMR Container Controller so that Spark Jobs submitted through the StartJobRun API can be managed as Kubernetes objects. [Figure 2] shows how a Spark Job is submitted through the StartJobRun API when the ACK EMR Container Controller is used.
Installing the ACK EMR Container Controller on an AWS EKS Cluster makes two Custom Resources available: Virtual Cluster and Job Run. Virtual Cluster is the Custom Resource used to set up an EMR on EKS Virtual Cluster for a specific Namespace of the AWS EKS Cluster where the controller is installed, and Job Run is the Custom Resource used to submit a Spark Job through the StartJobRun API.
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
  name: pi
  namespace: emr-ack
spec:
  name: pi
  virtualClusterID: kkm9hr2cypco1341w5b0iwuaj
  executionRoleARN: "arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-ack"
  releaseLabel: "emr-6.7.0-latest"
  jobDriver:
    sparkSubmitJobDriver:
      entryPoint: "local:///usr/lib/spark/examples/src/main/python/pi.py"
      entryPointArguments:
      sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
  configurationOverrides: |
    ApplicationConfiguration: null
    MonitoringConfiguration: null
[File 7] JobRun Example
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
  name: pi-logs
  namespace: emr-ack
spec:
  name: pi-logs
  virtualClusterID: kkm9hr2cypco1341w5b0iwuaj
  executionRoleARN: "arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-ack"
  releaseLabel: "emr-6.8.0-latest"
  jobDriver:
    sparkSubmitJobDriver:
      entryPoint: "local:///usr/lib/spark/examples/src/main/python/pi.py"
      entryPointArguments:
      sparkSubmitParameters: "--conf spark.driver.cores=1 --conf spark.driver.memory=512M --conf spark.executor.instances=1 --conf spark.executor.cores=1 --conf spark.executor.memory=512M"
  configurationOverrides: |
    MonitoringConfiguration:
      PersistentAppUI: "ENABLED"
      CloudWatchMonitoringConfiguration:
        LogGroupName: "spark-startjobrun"
        LogStreamNamePrefix: "pi-logs"
      S3MonitoringConfiguration:
        LogUri: "s3://ssup2-spark/startjobrun/"
[File 8] JobRun Example with Logging
[File 7] shows a simple Job Run, and [File 8] shows a Job Run with Logging configured. The fields show that the options set through the aws CLI can be set on a Job Run in the same way.
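The correspondence between the aws CLI options and the Job Run fields can be sketched as a simple mapping. The example below is illustrative only: the mapping table is read off by comparing [Shell 3] with [File 7], the function name `to_jobrun` is hypothetical, and `configurationOverrides` is deliberately left out because the Job Run expects it as a text block with differently cased keys, as [File 8] shows.

```python
import json

# aws CLI option -> JobRun spec field, read off from [Shell 3] and [File 7].
FIELD_MAP = {
    "name": "name",
    "virtual-cluster-id": "virtualClusterID",
    "execution-role-arn": "executionRoleARN",
    "release-label": "releaseLabel",
    "job-driver": "jobDriver",
}


def to_jobrun(cli_options, namespace):
    """Build a JobRun manifest (as a dict) from aws CLI style options."""
    spec = {
        field: cli_options[option]
        for option, field in FIELD_MAP.items()
        if option in cli_options
    }
    return {
        "apiVersion": "emrcontainers.services.k8s.aws/v1alpha1",
        "kind": "JobRun",
        "metadata": {"name": cli_options["name"], "namespace": namespace},
        "spec": spec,
    }


# Values taken from [File 7]; the account ID placeholder is kept as-is.
cli_options = {
    "name": "pi",
    "virtual-cluster-id": "kkm9hr2cypco1341w5b0iwuaj",
    "execution-role-arn": "arn:aws:iam::[account-id]:role/ts-eks-emr-eks-emr-ack",
    "release-label": "emr-6.7.0-latest",
    "job-driver": {
        "sparkSubmitJobDriver": {
            "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2",
        }
    },
}

# kubectl accepts JSON manifests as well as YAML, so JSON output is enough here.
print(json.dumps(to_jobrun(cli_options, "emr-ack"), indent=2))
```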
3. References