List.isEmpty()

    /**
     * Returns <tt>true</tt> if this list contains no elements.
     *
     * @return <tt>true</tt> if this list contains no elements
     */
    boolean isEmpty();

 

CollectionUtils.isEmpty(list)

 public static boolean isEmpty(final Collection<?> coll) {
        return coll == null || coll.isEmpty();
    }

 

In short, CollectionUtils.isEmpty also checks whether the list instance itself is null. Because using CollectionUtils adds a dependency, you can instead check whether the list is empty yourself, as shown below.

if (list == null || list.isEmpty())
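The check above can be wrapped in a small helper so the null test always runs before isEmpty(). This is a minimal sketch using only the JDK; the class and method names (NullSafeCollections, isNullOrEmpty) are hypothetical and not part of any library.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper class; the name is illustrative, not from any library.
final class NullSafeCollections {
    private NullSafeCollections() {}

    // True when the collection is null or has no elements.
    // The null check comes first so isEmpty() is never called on null.
    static boolean isNullOrEmpty(Collection<?> coll) {
        return coll == null || coll.isEmpty();
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        System.out.println(isNullOrEmpty(null));   // true
        System.out.println(isNullOrEmpty(names));  // true
        names.add("spark");
        System.out.println(isNullOrEmpty(names));  // false
    }
}
```

Reversing the two operands would call isEmpty() on a null reference and throw a NullPointerException, which is why the order matters.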

p.132~153 - basic DataFrame operations

p.168 - trim

p.170 - translate ***

p.173 - date and timestamp data types

p.212 - window functions

p.227 - joins

 


RDD

p.110 - map vs. flatMap vs. mapPartitions

p.121 - groupBy

p.125 - distinct

p.129 - Join

p.139 - repartition

p.144 - filter

p.162 - toDebugString 

 

DataFrame (= Dataset<Row>, the core abstraction of the Spark SQL module)

p.309 - isin, when

p.313 - count, countDistinct

p.317 - current_date(), unix_timestamp(), to_date()

p.319 - date arithmetic

p.320 - window() time windows

p.321 - desc, asc

p.329 - select, filter, where, agg

p.332 - groupby, distinct

p.335 - join

p.343 - withColumn

 

DataSet

p.356 - df.withColumn("count", lit("1")).groupBy("word").agg(sum("count"))

p.359 - creating a Dataset from objects, createDataset (the encoder produces an optimized binary format and handles serialization)

p.365 - flatMap

p.366 - groupByKey

 


Original code: I tried to create a symbolic link with the Hadoop FileSystem object.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

Result: an UnsupportedOperationException is thrown.

org.apache.hadoop.ipc.RemoteException(java.lang.UnsupportedOperationException): Symlinks not supported
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.createSymlink(FSNamesystem.java:2139)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.createSymlink(NameNodeRpcServer.java:1455)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.createSymlink(ClientNamenodeProtocolServerSideTranslatorPB.java:1066)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
    
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy10.createSymlink(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.createSymlink(ClientNamenodeProtocolTranslatorPB.java:873)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy11.createSymlink(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.createSymlink(DFSClient.java:1769)
    at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1336)
    at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1332)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.createSymlink(DistributedFileSystem.java:1345)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.collectData(LogCollectDriver.java:67)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.<init>(LogCollectDriver.java:27)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.main(LogCollectDriver.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)

Step 1. I enabled symlinks, but the result was the same.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem.enableSymlinks(); // added line
FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

Looking at the createSymlink method in FileSystem, it is written to throw an UnsupportedOperationException unconditionally.

public void createSymlink(Path target, Path link, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
        throw new UnsupportedOperationException("Filesystem does not support symlinks!");
    }

Step 2. Check the Javadoc

1) Check createSymlink in FileSystem

hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/FileSystem.html#createSymlink(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path,%20boolean)

 


2) FileContext also has a createSymlink method!

hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/FileContext.html#createSymlink(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path,%20boolean)

 

 


The createSymlink method in FileContext contains a working implementation!

public void createSymlink(final Path target, Path link, final boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
        if (!FileSystem.areSymlinksEnabled()) {
            throw new UnsupportedOperationException("Symlinks not supported");
        } else {
            Path nonRelLink = this.fixRelativePart(link);
            (new FSLinkResolver<Void>() {
                public Void next(AbstractFileSystem fs, Path p) throws IOException, UnresolvedLinkException {
                    fs.createSymlink(target, p, createParent);
                    return null;
                }
            }).resolve(this, nonRelLink);
        }
    }
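The guard at the top of this method is why symlinks must be enabled before the call can succeed. The sketch below reproduces only that gate in plain Java. SymlinkGate and its in-memory map are hypothetical stand-ins, not Hadoop classes, and the flag is an instance field here for simplicity, whereas the check in the listing above goes through a static method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the guard in FileContext.createSymlink; not Hadoop code.
final class SymlinkGate {
    // Plays the role of the flag that FileSystem.enableSymlinks() turns on.
    private boolean symlinksEnabled = false;
    // In-memory record of link -> target, used only for illustration.
    private final Map<String, String> links = new HashMap<>();

    void enableSymlinks() {
        symlinksEnabled = true;
    }

    void createSymlink(String target, String link) {
        if (!symlinksEnabled) {
            // Same message as the guard in the listing above.
            throw new UnsupportedOperationException("Symlinks not supported");
        }
        links.put(link, target);
    }

    String resolve(String link) {
        return links.get(link);
    }

    public static void main(String[] args) {
        SymlinkGate gate = new SymlinkGate();
        try {
            gate.createSymlink("/data/2020-08-26", "/data/current");
        } catch (UnsupportedOperationException e) {
            System.out.println("before enabling: " + e.getMessage());
        }
        gate.enableSymlinks();
        gate.createSymlink("/data/2020-08-26", "/data/current");
        System.out.println("after enabling: " + gate.resolve("/data/current"));
    }
}
```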

 

Step 3. Use FileContext instead of FileSystem

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; // still needed for enableSymlinks()
import org.apache.hadoop.fs.Path;

FileSystem.enableSymlinks(); // still required even when using FileContext
// Previous FileSystem-based attempt, kept for comparison:
//FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
//fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

FileContext fc = FileContext.getFileContext(session.getSession().sparkContext().hadoopConfiguration());
fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

 

Step 4. The following option must also be added to hdfs-site.xml.

<property>
         <name>test.SymlinkEnabledForTesting</name>
         <value>true</value>
 </property>

1. Task:

1) Read the data from the Elasticsearch index indexName.

2) Load the retrieved data into Hadoop.

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())
		.put("es.mapping.date.rich", "false")
		.put("pushdown", "true")
		.put("es.scroll.size", params.getScrollSize())
//		.put("es.read.unmapped.fields.ignore", "false")
		.put("es.input.json", "true")
		.put("es.read.metadata", "true").build();

Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfig);

dataSet.write().mode(SaveMode.Overwrite).option("compression","gzip").parquet("/hadoopPath");

2. Checking the error

Log:

2020-08-26 11:53:38,502 WARN scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 330, localhost, executor 6): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:248)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

3. Root cause: some fields hold arrays in Elasticsearch, but Spark read them as the string type, which caused the failure.

=> The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type

 

4. Checking the detailed logs:

1) Find the application ID => the application_xxxx part of the tracking URL in the log.

2020-08-26 11:52:20,174 INFO yarn.Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1598410339147
     final status: UNDEFINED
     tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/
     user: simon
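When this lookup has to be scripted, the application ID can be pulled out of the tracking URL line with a regular expression. The following is a rough sketch using only the JDK. TrackingUrlParser is a hypothetical name, and the pattern assumes the ID always has the form application_<digits>_<digits>, as in the log above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of YARN or Spark.
final class TrackingUrlParser {
    // Assumed ID shape, based on the log above: application_<digits>_<digits>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    private TrackingUrlParser() {}

    // Returns the first application ID found in the line, if any.
    static Optional<String> extractApplicationId(String logLine) {
        Matcher m = APP_ID.matcher(logLine);
        return m.find() ? Optional.of(m.group()) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "     tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/";
        // Prints application_1581712733365_333633
        System.out.println(extractApplicationId(line).orElse("not found"));
    }
}
```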

2) yarn logs -applicationId application_1581712733365_333633

There are a lot of logs, so use grep to show only the important part.

=> yarn logs -applicationId application_1581712733365_333633 | grep "is backed by an array"

[simon@:~/spark_batch/bin]# yarn-logs application_1581712733365_333633 | grep "is backed by an array"
2020-08-26 14:03:19,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-08-26 14:03:20,151 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2020-08-26 11:54:40,944 WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;
2020-08-26 11:52:51,814 WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;

The "Field 'field3.dataList' is backed by an array ..." lines show which fields need to be handled as arrays.
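If many fields are reported, the names can be collected from these WARN lines and joined into the comma-separated form that es.read.field.as.array.include expects. This is a minimal sketch using only the JDK. ArrayFieldCollector is a hypothetical name, and the pattern assumes the message wording shown in the log above.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of elasticsearch-hadoop.
final class ArrayFieldCollector {
    // Captures the field name between the single quotes of the WARN message.
    private static final Pattern ARRAY_FIELD =
            Pattern.compile("Field '([^']+)' is backed by an array");

    private ArrayFieldCollector() {}

    // Returns the distinct field names, in first-seen order, joined by commas.
    static String collect(List<String> logLines) {
        Set<String> fields = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = ARRAY_FIELD.matcher(line);
            if (m.find()) {
                fields.add(m.group(1));
            }
        }
        return String.join(",", fields);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;",
                "WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;");
        // Prints field3.dataList,field4
        System.out.println(collect(lines));
    }
}
```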

 

5. Array handling (add the es.read.field.as.array.include option)

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())
		.put("es.mapping.date.rich", "false")
		.put("pushdown", "true")
		.put("es.scroll.size", params.getScrollSize())
		.put("es.read.field.as.array.include", "field3.dataList,field4") // added
		.put("es.input.json", "true")
		.put("es.read.metadata", "true").build();

 

 
