java.lang.IllegalArgumentException: The value (Buffer(키워드,키워드2)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type

OKKY 2020. 8. 26. 12:11

1. Task:

1) Fetch the data from the indexName index in Elasticsearch.

2) Load the fetched data into Hadoop.

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())           // Elasticsearch nodes to connect to
		.put("es.mapping.date.rich", "false")           // return date fields as plain strings instead of rich Date objects
		.put("pushdown", "true")                        // push Spark SQL filters down to Elasticsearch
		.put("es.scroll.size", params.getScrollSize())  // number of documents per scroll request
//		.put("es.read.unmapped.fields.ignore", "false")
		.put("es.input.json", "true")
		.put("es.read.metadata", "true")                // expose document metadata (_id, _index, ...)
		.build();

Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfig);

dataSet.write().mode(SaveMode.Overwrite).option("compression","gzip").parquet("/hadoopPath");
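
For context, this job is submitted to YARN (the yarn.Client output in section 4 below comes from such a run). A minimal submission sketch; the class and jar names here are placeholders, not the actual job:

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.example.EsToParquetJob \
    es-to-parquet-job.jar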

2. Checking the error

Log:

2020-08-26 11:53:38,502 WARN scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 330, localhost, executor 6): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:248)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

3. Root cause analysis: the field is an array in Elasticsearch, but Spark reads it in as a string type, which causes the failure.

=> The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
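
This happens because an Elasticsearch mapping does not distinguish a single value from an array of values, so the connector infers the scalar type by default. A quick way to confirm the mismatch, reusing the dataSet from above:

// Without an array hint the inferred schema shows the field as a scalar, e.g.
//   |-- field4: string (nullable = true)
// while the actual documents hold arrays, which arrive as a Scala Buffer
// (JListWrapper) and fail the string conversion at write time.
dataSet.printSchema();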

 

4. Checking the detailed logs:

1) Find the application_id => the application_xxxx part of the tracking URL in the log.

2020-08-26 11:52:20,174 INFO yarn.Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1598410339147
     final status: UNDEFINED
     tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/
     user: simon

2) yarn logs -applicationId application_1581712733365_333633

There is a lot of log output... let's grep for just the important parts.

=> yarn logs -applicationId application_1581712733365_333633 | grep "is backed by an array"

[simon@:~/spark_batch/bin]# yarn-logs application_1581712733365_333633 | grep "is backed by an array"
2020-08-26 14:03:19,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-08-26 14:03:20,151 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2020-08-26 11:54:40,944 WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;
2020-08-26 11:52:51,814 WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;

The warning "Field 'field3.dataList' is backed by an array ~~" tells us which fields need array handling.
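
To collect every affected field in one pass, the warnings can be reduced to just the field names, for example (a sketch using standard shell tools; the output will match the warnings above):

yarn logs -applicationId application_1581712733365_333633 | grep "is backed by an array" | grep -o "Field '[^']*'" | sort -u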

 

5. Array handling (add the es.read.field.as.array.include option)

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())
		.put("es.mapping.date.rich", "false")
		.put("pushdown", "true")
		.put("es.scroll.size", params.getScrollSize())
		.put("es.read.field.as.array.include", "field3.dataList,field4") // added: fields to read as arrays
		.put("es.input.json", "true")
		.put("es.read.metadata", "true")
		.build();
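
With the option in place, the affected fields are inferred as arrays and the original write succeeds. A minimal re-check, reusing the names from the snippets above:

Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfig);

// field3.dataList and field4 should now appear as array<string>, e.g.
//   |-- field4: array (nullable = true)
//   |    |-- element: string (containsNull = true)
dataSet.printSchema();

dataSet.write().mode(SaveMode.Overwrite).option("compression", "gzip").parquet("/hadoopPath");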