1. Task:
1) Fetch the data in the indexName index from Elasticsearch.
2) Load the fetched data into Hadoop.
Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
.put("es.nodes", params.getEsNodes())
.put("es.mapping.date.rich", "false")
.put("pushdown", "true")
.put("es.scroll.size", params.getScrollSize())
// .put("es.read.unmapped.fields.ignore", "false")
.put("es.input.json", "true")
.put("es.read.metadata", "true").build();
Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfig);
dataSet.write().mode(SaveMode.Overwrite).option("compression","gzip").parquet("/hadoopPath");
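For reference, these are the imports the snippet above relies on (elasticsearch-hadoop's Java Spark SQL API, Spark SQL, and Guava's ImmutableMap); `session` is assumed to be the job's own wrapper exposing a SQLContext (e.g. a SparkSession's sqlContext()):
// Imports assumed by the read/write snippet above.
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;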
2. Checking the error
Log:
2020-08-26 11:53:38,502 WARN scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 330, localhost, executor 6): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:248)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
3. Root cause analysis: a particular field is an array in Elasticsearch, but Spark reads it in as a string type, which causes the failure.
=> The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
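Elasticsearch mappings do not mark a field as an array (any field may hold one value or many), so the connector infers a scalar schema unless told otherwise. A quick way to confirm what Spark inferred, reusing the dataSet from the snippet above, is to print the schema before writing:
// Inspect the schema the connector inferred for the index (same dataSet as above).
dataSet.printSchema();
// Without array hints the problem fields are inferred as scalars, e.g.
//  |-- field4: string (nullable = true)
// while the documents actually contain arrays such as ["제주도", "하와이"],
// so the Parquet write fails when a JListWrapper value hits a StringType column.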
4. Checking the detailed logs:
1) Find the application id => the application_xxxx part of the tracking URL in the log.
2020-08-26 11:52:20,174 INFO yarn.Client:
client token: N/A
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1598410339147
final status: UNDEFINED
tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/
user: simon
2) yarn logs -applicationId application_1581712733365_333633
The output is huge... use grep to pull out just the important part.
=> yarn logs -applicationId application_1581712733365_333633 | grep "is backed by an array"
[simon@:~/spark_batch/bin]# yarn-logs application_1581712733365_333633 | grep "is backed by an array"
2020-08-26 14:03:19,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-08-26 14:03:20,151 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2020-08-26 11:54:40,944 WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;
2020-08-26 11:52:51,814 WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;
The "Field 'field3.dataList' is backed by an array ~~" warnings identify the fields that need array handling.
5. Array handling (add the es.read.field.as.array.include option)
Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
.put("es.nodes", params.getEsNodes())
.put("es.mapping.date.rich", "false")
.put("pushdown", "true")
.put("es.scroll.size", params.getScrollSize())
.put("es.read.field.as.array.include", "field3.dataList,field4") //추가
.put("es.input.json", "true")
.put("es.read.metadata", "true").build();