Run a cell: Shift + Enter
a: insert a cell above
b: insert a cell below
Putting # at the start of a Markdown cell changes the text size (heading level)
List.isEmpty()
/**
* Returns <tt>true</tt> if this list contains no elements.
*
* @return <tt>true</tt> if this list contains no elements
*/
boolean isEmpty();
CollectionUtils.isEmpty(list)
public static boolean isEmpty(final Collection<?> coll) {
return coll == null || coll.isEmpty();
}
In other words, CollectionUtils.isEmpty also checks whether the list instance itself is null. Since CollectionUtils adds an extra dependency, you can check whether the list is empty yourself, as shown below.
if (list == null || list.isEmpty())
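For reference, a small self-contained version of that check (the helper name and sample lists here are just for illustration):
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyCheckExample {
    // Same behavior as CollectionUtils.isEmpty: true when the list is null or has no elements.
    static boolean isEmpty(List<?> list) {
        return list == null || list.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isEmpty(null));                      // true
        System.out.println(isEmpty(Collections.emptyList()));   // true
        System.out.println(isEmpty(Arrays.asList("a", "b")));   // false
    }
}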
p.132~153 - basic DataFrame operations
p.168 - trim
p.170 - translate *** (see the sketch after this list)
p.173 - date and timestamp data types
p.212 - window functions
p.227 - joins
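A rough sketch of the trim and translate functions flagged above, using the Spark Java API (the column name and sample values are made up for illustration):
import static org.apache.spark.sql.functions.*;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StringFunctionsExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("string-fns").getOrCreate();

        // Illustrative single-column DataFrame.
        Dataset<Row> df = spark.createDataset(Arrays.asList("  hello  ", "abc"), Encoders.STRING()).toDF("value");

        df.select(
                trim(col("value")).alias("trimmed"),                      // strip leading/trailing spaces
                translate(col("value"), "abc", "xyz").alias("translated") // map a->x, b->y, c->z per character
        ).show();

        spark.stop();
    }
}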
RDD
p.110 - map vs. flatMap vs. mapPartitions
p.121 - groupBy
p.125 - distinct
p.129 - Join
p.139 - repartition
p.144 - filter
p.162 - toDebugString
DataFrame (= Dataset<Row>, the core abstraction of the Spark SQL module)
p.309 - isin, when
p.313 - count, countDistinct
p.317 - current_date(), unix_timestamp(), to_date()
p.319 - date arithmetic
p.320 - window() time windows
p.321 - desc, asc
p.329 - select, filter, where, agg
p.332 - groupby, distinct
p.335 - join
p.343 - withColumn
DataSet
p.356 - df.withColumn("count", lit("1")).groupBy("word").agg(sum("count")) (see the sketch after this list)
p.359 - creating a Dataset from objects, createDataset (encoders produce an optimized binary format and handle serialization)
p.365 - flatMap
p.366 - groupByKey
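A minimal sketch of the p.356 snippet above, assuming a DataFrame with a "word" column (the input data is invented for illustration):
import static org.apache.spark.sql.functions.*;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WordCountExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("word-count").getOrCreate();

        // Illustrative input: one word per row, column named "word".
        Dataset<Row> df = spark.createDataset(Arrays.asList("a", "b", "a"), Encoders.STRING()).toDF("word");

        // Add a constant count column, then sum it per word
        // (lit(1L) is used instead of the book's lit("1") so the sum runs on a numeric column).
        Dataset<Row> counts = df.withColumn("count", lit(1L))
                .groupBy("word")
                .agg(sum("count").alias("count"));

        counts.show(); // a -> 2, b -> 1

        spark.stop();
    }
}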
Original code: I tried to create a symbolic link using the Hadoop FileSystem object.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);
Result: an UnsupportedOperationException occurred.
org.apache.hadoop.ipc.RemoteException(java.lang.UnsupportedOperationException): Symlinks not supported
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.createSymlink(FSNamesystem.java:2139)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.createSymlink(NameNodeRpcServer.java:1455)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.createSymlink(ClientNamenodeProtocolServerSideTranslatorPB.java:1066)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy10.createSymlink(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.createSymlink(ClientNamenodeProtocolTranslatorPB.java:873)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy11.createSymlink(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.createSymlink(DFSClient.java:1769)
at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1336)
at org.apache.hadoop.hdfs.DistributedFileSystem$23.doCall(DistributedFileSystem.java:1332)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.createSymlink(DistributedFileSystem.java:1345)
at com.tmoncorp.search.batch.logcollect.LogCollectDriver.collectData(LogCollectDriver.java:67)
at com.tmoncorp.search.batch.logcollect.LogCollectDriver.<init>(LogCollectDriver.java:27)
at com.tmoncorp.search.batch.logcollect.LogCollectDriver.main(LogCollectDriver.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
Attempt 1: enabled symlinks, but the result was the same.
import org.apache.hadoop.fs.FileSystem;
FileSystem.enableSymlinks(); // added line
FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);
Looking at FileSystem.createSymlink, it is written to unconditionally throw an UnsupportedOperationException:
public void createSymlink(Path target, Path link, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
throw new UnsupportedOperationException("Filesystem does not support symlinks!");
}
Attempt 2: check the Javadoc.
1) Looked at createSymlink inside FileSystem.
2) FileContext also has a createSymlink!
The createSymlink in FileContext is actually implemented!
public void createSymlink(final Path target, Path link, final boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
if (!FileSystem.areSymlinksEnabled()) {
throw new UnsupportedOperationException("Symlinks not supported");
} else {
Path nonRelLink = this.fixRelativePart(link);
(new FSLinkResolver<Void>() {
public Void next(AbstractFileSystem fs, Path p) throws IOException, UnresolvedLinkException {
fs.createSymlink(target, p, createParent);
return null;
}
}).resolve(this, nonRelLink);
}
}
Attempt 3: use FileContext instead of FileSystem.
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; // still needed for enableSymlinks()

FileSystem.enableSymlinks(); // still required even when using FileContext
//FileSystem fs = FileSystem.get(session.getSession().sparkContext().hadoopConfiguration());
//fs.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);
FileContext fc = FileContext.getFileContext(session.getSession().sparkContext().hadoopConfiguration());
fc.createSymlink(new Path(params.getWriteAppendDatePath()), new Path(params.getWritePath()), true);
Attempt 4: the option below also has to be added to hdfs-site.xml (a consolidated sketch follows the property block).
<property>
    <name>test.SymlinkEnabledForTesting</name>
    <value>true</value>
</property>
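Putting attempts 3 and 4 together, a minimal sketch of the client-side code that ends up working (a plain Configuration and made-up paths stand in for the session/params objects used above):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SymlinkExample {
    public static void main(String[] args) throws Exception {
        // Client side: allow the symlink API to be called at all.
        FileSystem.enableSymlinks();

        Configuration conf = new Configuration(); // stand-in for sparkContext().hadoopConfiguration()
        FileContext fc = FileContext.getFileContext(conf);

        // Hypothetical paths: link /logs/latest -> /logs/2020-09-05, creating parent dirs if needed.
        fc.createSymlink(new Path("/logs/2020-09-05"), new Path("/logs/latest"), true);
    }
}
Without the test.SymlinkEnabledForTesting property on the NameNode side (attempt 4), the same "Symlinks not supported" RemoteException still comes back from the server.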
1. Task:
1) Fetch the data of indexName from Elasticsearch.
2) Write the fetched data to Hadoop.
Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
.put("es.nodes", params.getEsNodes())
.put("es.mapping.date.rich", "false")
.put("pushdown", "true")
.put("es.scroll.size", params.getScrollSize())
// .put("es.read.unmapped.fields.ignore", "false")
.put("es.input.json", "true")
.put("es.read.metadata", "true").build();
Dataset<Row> dataSet = JavaEsSparkSQL.esDF(session.getSqlContext(), "indexName", esConfig);
dataSet.write().mode(SaveMode.Overwrite).option("compression","gzip").parquet("/hadoopPath");
2. Check the error
Log:
2020-08-26 11:53:38,502 WARN scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 330, localhost, executor 6): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:248)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
3. Root cause: certain fields are arrays in Elasticsearch, but Spark reads them as string types, which triggers the error.
=> The value (Buffer(제주도, 하와이)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
4. Check the detailed logs:
1) Find the application_id => the application_xxxx part of the tracking URL in the log.
2020-08-26 11:52:20,174 INFO yarn.Client:
client token: N/A
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1598410339147
final status: UNDEFINED
tracking URL: http://localhost:8088/proxy/application_1581712733365_333633/
user: simon
2) yarn logs -applicationId application_1581712733365_333633
There are a lot of logs... let's grep only the important part.
=> yarn logs -applicationId application_1581712733365_333633 | grep "is backed by an array"
[simon@:~/spark_batch/bin]# yarn-logs application_1581712733365_333633 | grep "is backed by an array"
2020-08-26 14:03:19,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-08-26 14:03:20,151 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2020-08-26 11:54:40,944 WARN sql.ScalaRowValueReader: Field 'field3.dataList' is backed by an array but the associated Spark Schema does not reflect this;
2020-08-26 11:52:51,814 WARN sql.ScalaRowValueReader: Field 'field4' is backed by an array but the associated Spark Schema does not reflect this;
The fields that need array handling can be identified from the "Field 'field3.dataList' is backed by an array ..." warnings.
5. Handle the arrays (add the es.read.field.as.array.include option; a quick schema check follows the snippet)
Map<String, String> esConfig = new ImmutableMap.Builder<String, String>()
.put("es.nodes", params.getEsNodes())
.put("es.mapping.date.rich", "false")
.put("pushdown", "true")
.put("es.scroll.size", params.getScrollSize())
.put("es.read.field.as.array.include", "field3.dataList,field4") //추가
.put("es.input.json", "true")
.put("es.read.metadata", "true").build();