
Configuring a Local IDEA Development Environment to Access HDFS on a Remote Cluster, and Hive via Spark SQL

2024-05-06 20:48:41

Keywords: IDEA, Spark, Hive, HDFS

Configuring HDFS access in IDEA

The local IDEA environment needs two things: a hadoop-client dependency in pom.xml, and the cluster's Hadoop configuration files hdfs-site.xml and core-site.xml on the classpath as resources.

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
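The ${hadoop.version} property is assumed to be defined in the pom's <properties> block; a minimal sketch (the version shown is illustrative, match it to your cluster):

<properties>
  <hadoop.version>2.6.0</hadoop.version>
</properties>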

Place hdfs-site.xml and core-site.xml under the src/main/resources directory, as shown in the layout sketch below. hdfs-site.xml configuration details: the client reads and writes HDFS files through the NameNode RPC addresses declared here.
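The expected module layout (a sketch; the source file name is illustrative, only the two resource files matter here):

src/
  main/
    resources/
      core-site.xml
      hdfs-site.xml
    scala/
      HdfsUtils.scala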

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>ns1</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.ns1</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns1.nn1</name>
    <value>cloudera01:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns1.nn1</name>
    <value>cloudera01:50070</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns1.nn2</name>
    <value>cloudera02:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns1.nn2</name>
    <value>cloudera02:50070</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.ns1</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
</configuration>

core-site.xml configuration details:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ns1</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>cloudera01:2181,cloudera02:2181,cloudera14:2181,cloudera03:2181,cloudera16:2181</value>
  </property>
  <property>
    <name>fs.hdfs.impl.disable.cache</name>
    <value>true</value>
  </property>
</configuration>
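Because fs.defaultFS points at the nameservice hdfs://ns1 rather than a single host, client code never hard-codes a NameNode address. A minimal sketch (the path is illustrative), assuming both XML files above are on the classpath:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object NsProbe {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
    conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
    // FileSystem.get resolves hdfs://ns1 through the ConfiguredFailoverProxyProvider,
    // so the call works whichever NameNode (nn1 or nn2) is currently active.
    val fs = FileSystem.get(conf)
    println(fs.exists(new Path("/tmp")))
  }
}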

A Scala utility object for HDFS access: it loads core-site.xml and hdfs-site.xml into a Configuration, obtains a FileSystem handle, and wraps the common read and write operations.
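For orientation, a hypothetical calling sequence (all paths are illustrative); the full utility object follows:

object HdfsUtilsDemo {
  def main(args: Array[String]): Unit = {
    HdfsUtils.listFiles("/user/hdfs")                 // ls a directory
    HdfsUtils.put("data/local.csv", "/tmp/local.csv") // upload a local file
    println(HdfsUtils.exist("/tmp/local.csv"))        // true if the upload worked
    HdfsUtils.cat("/tmp/local.csv")                   // print the file to stdout
    HdfsUtils.rm("/tmp/local.csv", false)             // delete, non-recursive
  }
}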

import java.io._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, _}
import org.apache.hadoop.io.IOUtils

/**
 * Utility object for HDFS file and directory operations.
 */
object HdfsUtils {

  /** Build a FileSystem handle from the cluster config files on the classpath. */
  def getFS(): FileSystem = {
    this.synchronized {
      System.setProperty("HADOOP_USER_NAME", "hdfs")
      val conf = new Configuration()
      conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
      conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
      conf.set("mapred.remote.os", "Linux")
      println(conf)
      FileSystem.get(conf)
    }
  }

  /** Close a FileSystem. */
  def closeFS(fileSystem: FileSystem): Unit = {
    this.synchronized {
      if (fileSystem != null) {
        try {
          fileSystem.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }

  /** ls: list the files under a path. */
  def listFiles(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        val fstats = fileSystem.listStatus(new Path(hdfsFilePath))
        for (fstat <- fstats) {
          println(fstat.getPath.toString)
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /** ls against an existing FileSystem handle. */
  def ls(fileSystem: FileSystem, path: String): Unit = {
    println("list path:" + path)
    try {
      val fs = fileSystem.listStatus(new Path(path))
      val listPath = FileUtil.stat2Paths(fs)
      for (p <- listPath) {
        println(p.toString)
      }
    } catch {
      case e: IllegalArgumentException => e.printStackTrace()
      case e: IOException => e.printStackTrace()
    } finally {
      this.closeFS(fileSystem)
    }
  }

  /**
   * Delete a file or directory.
   *
   * @param hdfsFilePath
   * @param recursive delete recursively
   */
  def rm(hdfsFilePath: String, recursive: Boolean): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        val path = new Path(hdfsFilePath)
        if (fileSystem.exists(path)) {
          val success = fileSystem.delete(path, recursive)
          if (success) {
            System.out.println("delete successfully")
          }
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Upload a local file to HDFS.
   *
   * @param localPath
   * @param hdfspath
   */
  def write(localPath: String, hdfspath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FileInputStream = null
      var outStream: FSDataOutputStream = null
      try {
        inStream = new FileInputStream(new File(localPath))
        val writePath = new Path(hdfspath)
        outStream = fileSystem.create(writePath)
        IOUtils.copyBytes(inStream, outStream, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        IOUtils.closeStream(outStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Read a text file and return its lines as a list.
   *
   * @param hdfspath
   */
  def readAllLines(hdfspath: String): scala.collection.mutable.ListBuffer[String] = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStreamReader: InputStreamReader = null
      var isr: java.io.BufferedReader = null
      val allLines: scala.collection.mutable.ListBuffer[String] = scala.collection.mutable.ListBuffer()
      try {
        val readPath = new Path(hdfspath)
        inStreamReader = new InputStreamReader(fileSystem.open(readPath), "UTF-8")
        isr = new java.io.BufferedReader(inStreamReader)
        var line: String = null
        do {
          line = isr.readLine()
          if (line != null) {
            allLines += line
          }
        } while (line != null)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        if (isr != null) isr.close()
        if (inStreamReader != null) inStreamReader.close()
        this.closeFS(fileSystem)
      }
      allLines
    }
  }

  /**
   * Read a text file and return its whole content as a string.
   *
   * @param hdfspath
   */
  def readContent(hdfspath: String): String = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inputStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfspath)
        val buf = new Array[Byte](fileSystem.getFileStatus(readPath).getLen.toInt)
        inputStream = fileSystem.open(readPath)
        var toRead: Int = buf.length
        var off = 0
        while (toRead > 0) {
          val ret: Int = inputStream.read(buf, off, toRead)
          if (ret < 0) {
            throw new IOException("Premature EOF from inputStream")
          }
          toRead = toRead - ret
          off += ret
          Thread.sleep(10)
        }
        new String(buf, "UTF-8")
      } catch {
        case e: Exception =>
          e.printStackTrace()
          ""
      } finally {
        if (inputStream != null) inputStream.close()
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Upload a local file to HDFS.
   *
   * @param localFilePath
   * @param hdfsFilePath
   */
  def put(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fdos: FSDataOutputStream = null
      var fis: FileInputStream = null
      try {
        fdos = fileSystem.create(new Path(hdfsFilePath))
        fis = new FileInputStream(new File(localFilePath))
        IOUtils.copyBytes(fis, fdos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fdos)
        IOUtils.closeStream(fis)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Print the content of a file on HDFS.
   *
   * @param hdfsFilePath
   */
  def cat(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfsFilePath)
        inStream = fileSystem.open(readPath)
        IOUtils.copyBytes(inStream, System.out, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Check whether a path exists on HDFS.
   *
   * @param hdfsFilePath
   */
  def exist(hdfsFilePath: String): Boolean = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        fileSystem.exists(new Path(hdfsFilePath))
      } catch {
        case e: IOException =>
          e.printStackTrace()
          false
      } finally {
        // this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Download a file from HDFS to the local filesystem.
   *
   * @param localFilePath
   * @param hdfsFilePath
   */
  def get(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fsis: FSDataInputStream = null
      var fos: FileOutputStream = null
      try {
        fsis = fileSystem.open(new Path(hdfsFilePath))
        fos = new FileOutputStream(new File(localFilePath))
        IOUtils.copyBytes(fsis, fos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fsis)
        IOUtils.closeStream(fos)
        this.closeFS(fileSystem)
      }
    }
  }
}

Configuring Spark access to the remote cluster's Hive in IDEA

The local IDEA environment needs the spark-core, spark-sql and spark-hive dependencies in pom.xml, with the Spark dependencies left at the default compile scope. The cluster's hive-site.xml, which specifies the host and port of the metastore service, goes into resources. Finally, the remote cluster must have the Hive metastore service running:

hive --service metastore -p 9083 &
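A quick, hypothetical way to confirm the metastore is reachable from the development machine before wiring up Spark (host and port as configured above):

import java.net.Socket

object MetastoreProbe {
  def main(args: Array[String]): Unit = {
    // Opens a plain TCP connection to the thrift port; throws if nothing is listening.
    val socket = new Socket("cloudera01", 9083)
    println("metastore reachable: " + socket.isConnected)
    socket.close()
  }
}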

Add the project dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

hive-site.xml configuration: hive.metastore.uris specifies the host and port where the metastore service runs, and the service must be started manually and separately. Clients connect to the metastore service, and the metastore in turn connects to the MySQL database that stores Hive's metadata (the metainformation about databases, tables and so on created through Hive). With a metastore service in place, many clients can connect concurrently, and none of them needs the MySQL username and password; they only need to reach the metastore service.

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://cloudera01:9083</value>
  </property>
</configuration>

Test script: use Spark SQL to connect to the remote Hive. If you do not have permission to read the Hive tables, change HADOOP_USER_NAME, for example to hdfs.

import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    // Impersonate the hdfs user in case the local user lacks table permissions.
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    // enableHiveSupport() makes Spark SQL read hive-site.xml and use the remote metastore.
    val spark = SparkSession.builder()
      .appName("test")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("show databases").show()
    val df = spark.sql("select * from test_gp.student_info")
    df.show()
  }
}
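As a hypothetical follow-up inside the same main method (the column name and target table are illustrative, not part of the original script), results can also be written back to Hive through the same session:

// Filter the DataFrame and persist it as a new Hive table.
df.filter("age > 18")
  .write
  .mode("overwrite")
  .saveAsTable("test_gp.student_info_filtered")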

