원본 코드 : 하둡 파일시스템 객체로 symbolic link를 걸려고 했다.

import org.apache.hadoop.fs.FileSystem;

FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

결과 : unsupportedOperationException 발생

org.apache.hadoop.ipc.RemoteException(java.lang.UnsupportedOperationException): Symlinks not supported
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.createSymlink(FSNamesystem.java:2139)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.createSymlink(NameNodeRpcServer.java:1455)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.createSymlink(ClientNamenodeProtocolServerSideTranslatorPB.java:1066)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
    
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy10.createSymlink(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.createSymlink(ClientNamenodeProtocolTranslatorPB.java:873)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy11.createSymlink(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.createSymlink(DFSClient.java:1769)
    at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1336)
    at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1332)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.createSymlink(DistributedFileSystem.java:1345)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.collectData(LogCollectDriver.java:67)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.<init>(LogCollectDriver.java:27)
    at com.tmoncorp.search.batch.logcollect.LogCollectDriver.main(LogCollectDriver.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)

 조치1. symlink enable 처리 하였다. 하지만 결과는 동일.

import org.apache.hadoop.fs.FileSystem;

FileSystem.enableSymlinks(); // add line
FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

createSymlink 함수를 살펴보니 무조껀 unsuppotedoperationException을 발생시키도록 해두었다.

public void createSymlink(Path target, Path link, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
        throw new UnsupportedOperationException("Filesystem does not support symlinks!");
    }

조치2. java doc 확인

1) FileSystem안의 createSymlink 확인

hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/FileSystem.html#createSymlink(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path,%20boolean)

 

FileSystem (Apache Hadoop Main 2.7.3 API)

FSDataOutputStream create(Path f, FsPermission permission, EnumSet  flags, int bufferSize, short replication, long blockSize, Progressable progress, org.apache.hadoop.fs.Options.ChecksumOpt checksumOpt) Create an FSDataOutputStream at the indicated

hadoop.apache.org

2) FileContext에도 createSymlink 가 있었다!

hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/FileContext.html#createSymlink(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path,%20boolean)

 

 

FileContext (Apache Hadoop Main 2.7.3 API)

The FileContext class provides an interface to the application writer for using the Hadoop file system. It provides a set of methods for the usual operation: create, open, list, etc *** Path Names *** The Hadoop file system supports a URI name space and UR

hadoop.apache.org

fileContext 안에 createSymlink는 작동하는 코드로 이루어져 있다!!

public void createSymlink(final Path target, Path link, final boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
        if (!FileSystem.areSymlinksEnabled()) {
            throw new UnsupportedOperationException("Symlinks not supported");
        } else {
            Path nonRelLink = this.fixRelativePart(link);
            (new FSLinkResolver<Void>() {
                public Void next(AbstractFileSystem fs, Path p) throws IOException, UnresolvedLinkException {
                    fs.createSymlink(target, p, createParent);
                    return null;
                }
            }).resolve(this, nonRelLink);
        }
    }

 

조치3. FileSystem => FileContext 사용

//import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;

//import org.apache.hadoop.fs.FileSystem;

FileSystem.enableSymlinks(); // FileContext를 사용해도 해당 코드 필요
//FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
//fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

FileContext fc = FileContext.getFileContext(session.getSession().sparkContext().hadoopConfiguration());
fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);

 

조치4. hdfs-site.xml 에 아래 옵션도 추가해주어야 한다.

<property>
         <name>test.SymlinkEnabledForTesting</name>
         <value>true</value>
 </property>

1. 작업 :

1) elasticsearch의 indexName의 데이터를 가져온다.

2) 가져온 데이터를 하둡에 적재한다.

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())
		.put("es.mapping.date.rich", "false")
		.put("pushdown", "true")
		.put("es.scroll.size", params.getScrollSize())
//		.put("es.read.unmapped.fields.ignore", "false")
		.put("es.input.json", "true")
		.put("es.read.metadata", "true").build();

Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfigParam());

dataSet.write().mode(SaveMode.Overwrite).option("compression","gzip").parquet("/hadoopPath");

2. 에러 확인

로그 :

2020-08-26 11:53:38,502 WARN scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 330, localhost, executor 6): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:248)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

3. 원인 분석 : 특정 필드가 array로 되어 있는데 spark에서 string 타입으로 읽어들여 발생함. 

=> The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type

 

4. 자세한 로그 확인 : 

1) application_id를 확인한다 => 로그에서 tracking URL의 application_xxxx 부분.

2020-08-26 11:52:20,174 INFO yarn.Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1598410339147
     final status: UNDEFINED
     tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/
     user: simon

2) yarn logs -appicationID application_1581712733365_333633

로그가 많다... grep으로 중요한 부분만 보자.

=> yarn logs -appicationID application_1581712733365_333633 | grep "is backed by an array"

[simon@:~/spark_batch/bin]# yarn-logs application_1581712733365_333633 | grep "is backed by an array"
2020-08-26 14:03:19,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-08-26 14:03:20,151 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2020-08-26 11:54:40,944 WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;
2020-08-26 11:52:51,814 WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;

Field 'field3.dataList' is backed by an array ~~ 에서 array 처리가 필요한 필드를 확인할 수 있다.

 

4. array 처리 (es.read.field.as.array.include 옵션 추가)

Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
		.put("es.nodes", params.getEsNodes())
		.put("es.mapping.date.rich", "false")
		.put("pushdown", "true")
		.put("es.scroll.size", params.getScrollSize())
		.put("es.read.field.as.array.include", "field3.dataList,field4") //추가
		.put("es.input.json", "true")
		.put("es.read.metadata", "true").build();

 

빅데이터 : Volume. Veriety, Velocity -크기, 다양성, 속도

Variability(가벼성), Veracity(정확성), Complexity(복잡성), Visibillity(시인성)

 

스파크 : 하둡 기반의 맵리듀스 작업의 단점(파일방식,속도느림,복잡성)을 보완하기 위해 개발, 메모리 방식

 

데이터 모델 1. RDD (Resilient distributed dataset) : 분산 방식으로 저장된 데이터 요소들의 집합, 병렬처리가 가능하고 장애 복구 내성(tolerance)를 가진 모델

  • RDD는 파티션 단위로 나뉘어 병렬 처리 됨.
  • 작업중 파티션은 재구성 되면서 네트워크를 통해 다른 서버의 파티션으로 이동하는 셔플링이 발생. 셔플링은 작업 성능에 큰 영향을 주기 때문에 주의해야 함.
  • RDD는 연산이 수행 시 새롭게 생성되며, 변경되지 않는 읽기 전용 모델이다. 전 단계 RDD와 어떤 연산을 적용해서 만들어 지는지를 기억(lineage)하고 있어서 특정 절차에 장애가 발생해도 해당 이력을 바탕으로 RDD 생성을 재시도 하게 된다.
  • RDD 제공 연산 1. - 트랜스포메이션(Transformation) : 새로운 RDD를 만들어 내는 변환 연산, 액션 연산이 호출될 때 여러 변환 연산들의 최적화를 분석해서 실행(lazy 실행방식)이 된다.
  • RDD 제공 연산 2. - 액션(Action) : 연산의 결과(RDD가 아닌 다른 값, 반환하지 않는 연산 포함)를 반환하는 동작, ex) sum(), stddev() 등

DAG 스케쥴러 : 셔플의 유무에 따라 스테이지를 나누고 셔플로 인해 발생하는 부하를 최대한 줄이는 역활을 함.

  • Driver 메인 함수 > Spark Context > RDD 연산 정보를 DAG 스케쥴러에게 전달 : 전체 데이터 처리 흐름을 분석해서 네트워크를 통한 데이터 이동이 최소화 되도록 Stage 구성 > 클러스터 매니저에게 전달 > Stage 별 task 실행
  • 좁은 의존성 : 연산이 실행되는데 필요한 파티션이 적은 경우, 파티션간의 셔플이 발생하지 않음. (더하기 연산)
  • 넓은 의존성 : 여러 파티션의 정보가 연산에 필요한 경우, 각 파티션들의 데이터를 셔플하여 시간이 오래 걸림. (SUM 등)

Spark Context와 Spark Session : 스파크 애플리케이션이 동작하기 위한 서버 프로세스가 필요. 

  • 클러스터를 구성하는 개별 서버 간에 메시지와 데이터를 주고 받을 수 있는 채널(백엔드 서버 프로세스) 구동
  • 개별 작업 수행에 필요한 메타 정보를 저장하고 관리
  • 스파크 2.0부터 세션안에 컨텍스트가 포함됨.

데이터 프레임

  • RDD는 실행할 함수를 직접 전달 받아 사용하는 대신에 DataFrame은 사전에 정의된 표현식과 내부 함수를 이용해 코드를 작성
  • 논리적 작업 트리로 변환한 후 최적화 과정을 거쳐서 최종 동작을 수행 => RDD에 비해 훨씬 효율적이고 최적화를 수행.
RDD : 직접 함수를 전달받아 처리
val words = rdd.flatMap(str => str.split(" "))
val wcPair = words.map(v => (v,1))

DF : 내장 함수 사용
val wordDF = df.select(explode(split(col("value"), " ")).as("word"))

 

+ Recent posts