博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hbase 基础API
阅读量:6890 次
发布时间:2019-06-27

本文共 13288 字,大约阅读时间需要 44 分钟。

 

本文参考:https://www.cnblogs.com/skyl/p/4803738.html

package com.Hbaseimport org.apache.hadoop.hbase._import org.apache.hadoop.hbase.client._import org.apache.hadoop.hbase.filter.CompareFilter.CompareOpimport org.apache.hadoop.hbase.filter.SingleColumnValueFilterobject HbaseDDL {  def main(args: Array[String]): Unit = {    // 确认表是否存在    def ensureHbaseTableExist(tableName:TableName) = {      // 配置 Hbase      val hbaseconf = HBaseConfiguration.create()      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val ifExist = adminHbase.tableExists(tableName)      ifExist    }    // 确认表是否存在 测试//    val result = ensureHbaseTableExist("user_info1")//    if (result) {//      println("表存在")//    } else {//      println("表不存在")//    }    /**      * Hbase建表  两个参数      * @param tableName   形式为 ns:tb  或者  tb    API 创建 namespace 机会不多,一般通过 hbase shell 创建      * @param columnFamilys  cf1,cf2,cf3      */    def createHbaseTable(tableName:String, columnFamilys:String) = {      // 配置 Hbase      val hbaseconf = HBaseConfiguration.create()      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      // 列簇逗号分隔      val CFS = columnFamilys.split(",")      // 表名,判断是否带了namespace,带了则判断是否存在 namespace, 不存在则创建      val nameSpace = tableName.split(":")(0)      if (nameSpace != tableName) {        adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build())        println("NameSpace 创建成功!")      }      // 判断表是否存在,不存在新建,存在则提示      if (!ensureHbaseTableExist(TableName.valueOf(tableName))) {        // 实例化 HTableDescriptor        val htable = new HTableDescriptor(TableName.valueOf(tableName))        // 循环添加所有列簇        for ( columnFamily <- CFS) {          // 实例化 HColumnDescriptor          val htableColumnFamily1 = new HColumnDescriptor((columnFamily))          // 调用 HColumnDescriptor 设置列簇属性          htableColumnFamily1.setMaxVersions(3)          // 表增加列族          htable.addFamily(new HColumnDescriptor(columnFamily))        }        // 创建表        adminHbase.createTable(htable)        println("表创建成功")      } else {        println("表已存在")      }       adminHbase.close()    }    // 测试建表   // createHbaseTable("scTable3", "info,base")    /**      *  列出所有表      */    def listAllHbaseTable() ={      // 配置 Hbase      val hbaseconf = HBaseConfiguration.create()      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val listTables =  adminHbase.listTableNames()      for(table <- listTables){        println(table)      }      adminHbase.close()    }    //listAllHbaseTable()    /**      * 删除一张表,输入表名      * 判断是否存在,是否失效,否则不能删除      *      * @param tableName      */    def deleteHbaseTable(tableName: String) ={      val hbaseconf = HBaseConfiguration.create()      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val tbName = TableName.valueOf(tableName)      if(ensureHbaseTableExist(tbName)){        // 若表不失效,则使失效        if(!adminHbase.isTableDisabled(tbName)){          adminHbase.disableTable(tbName)        }        adminHbase.deleteTable(tbName)        println("删除成功")      } else {        println("表不存在")      }      adminHbase.close()    }    //deleteHbaseTable("scTable3")    /**      *      * 删除表的某个列族  ,得到 HTableDescriptor , 调用该类的 removeFamily 方法      * @param tableName   表名  --->   ns:tb   or    tb   [String]      * @param columnFamily   列族名  --->   String      */    def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={      val hbaseconf = HBaseConfiguration.create()      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val tbName = TableName.valueOf(tableName)      // disable table      adminHbase.disableTable(tbName)      // get HTableDescriptor      val htd = adminHbase.getTableDescriptor(tbName)      // delete family      htd.removeFamily(columnFamily.getBytes())      // apply htd to table      adminHbase.modifyTable(tbName, htd)      // enable table      adminHbase.enableTable(tbName)      println("删除成功")      adminHbase.close()    }   // deleteHbaseColumnFamily("scTable3", "base")    /**      * 给表增加列族    先得到表的 HTableDescriptor, 然后使用 HColumnDescriptor 初始化 新增列,并设置属性      * 调用 HTableDescriptor 的 addFamily 方法,将初始化好的 HCD 添加到  HTableDescriptor ,然后使用admin 的 modifyTable 方法将修改应用      * @param tableName      * @param columnFamily      */    def addHbaseColumnFamily(tableName:String, columnFamily:String) ={      val hbaseconf = HBaseConfiguration.create()      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val tbName = TableName.valueOf(tableName)      // disable table      adminHbase.disableTable(tbName)      // get HTableDescriptor      val htd = adminHbase.getTableDescriptor(tbName)      //      val hcd = new HColumnDescriptor(columnFamily)      hcd.setMaxVersions(3)      // add family      htd.addFamily(hcd)      // apply htd to table      adminHbase.modifyTable(tbName, htd)      // enable table      adminHbase.enableTable(tbName)      println("添加成功")      adminHbase.close()    }    //addHbaseColumnFamily("scTable3", "base")    /**      * 修改列簇功能  get 到  HTableDescriptor ,再 get 到 Family ,设置 Family ,admin modifyTable  应用      * @param tableName      * @param columnFamily      */    def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String)  ={      val hbaseconf = HBaseConfiguration.create()      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)      val adminHbase = HbaseConn.getAdmin()      val tbName = TableName.valueOf(tableName)      adminHbase.disableTable(tbName)      val htd = adminHbase.getTableDescriptor(tbName)      val modifyCol  = htd.getFamily(columnFamily.getBytes())      modifyCol.setMaxVersions(3)      adminHbase.modifyTable(tbName,htd)      adminHbase.enableTable(tbName)      adminHbase.close()      println("修改成功!")    }    //modifyHbaseTableColumnFamily("scTable3", "info")    /**      * 插入数据,五个参数      * @param tableName      * @param columnFamily      * @param column      * @param rowkey      * @param value      */    def putDataHbaseTable(tableName:String, columnFamily:String, column:String,                          rowkey:String, value:String) ={      val hbaseconf = HBaseConfiguration.create()      // table      val hTable = new HTable(hbaseconf, tableName)      // row key      val putData = new Put(rowkey.getBytes())      // put value      putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes())      /**        * 插入方式        * ASYNC_WAL : 当数据变动时,异步写WAL日志        * SYNC_WAL : 当数据变动时,同步写WAL日志        * FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘        * SKIP_WAL : 不写WAL日志        * USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL        */      putData.setDurability(Durability.SYNC_WAL)      // put data to table      hTable.put(putData)      println("插入数据成功!")      // close table      hTable.close()    }    //putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala")    def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String,                             column:String = null)={      val hbaseConf = HBaseConfiguration.create()      val hTable = new HTable(hbaseConf, tableName)      // 初始化 Delete ,表名后可接时间戳      val deletaData = new Delete(rowkey.getBytes())      /**        *   1).删除指定列的 最新版本 的数据:Delete addColumn (byte[] family, byte[] qualifier)        *   2).删除指定列的 指定版本 的数据:Delete addColumn (byte[] family, byte[] qualifier, long timestamp )        *   3).删除指定列的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier)        *   4).删除指定列的,时间戳 小于等于 给定时间戳的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier, long timestamp )        *   5).删除指定列族的所有列的 所有版本 数据:Delete addFamily (byte[] family)    默认使用当前时间的时间戳,时间戳大于当前时间的数据删除不掉        *   6).删除指定列族的所有列中 时间戳 小于等于 指定时间戳 的所有数据:Delete addFamily (byte[] family, long timestamp)        *   7).删除指定列族中 所有列的时间戳 等于 指定时间戳的版本数据:Delete addFamilyVersion (byte[] family, long timestamp)        */      deletaData.addColumn(columnFamily.getBytes(),column.getBytes())      //deletaData.addColumns(columnFamily.getBytes(),column.getBytes())      //deletaData.addFamily(columnFamily.getBytes())      hTable.delete(deletaData)      println("删除成功")      hTable.close()    }   // deleteDataHbaseTable("scTable3", "rk0002", "info")    def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={      val hbaseCOnf =  HBaseConfiguration.create()      val hTable = new HTable(hbaseCOnf, tableName)      val getData = new Get(rowkey.getBytes())      /**        *    1). Get addFamily(byte[] family) 指定希望获取的列族        *    2). Get addColumn(byte[] family, byte[] qualifier) 指定希望获取的列        *    3). Get setTimeRange(long minStamp, long maxStamp) 设置获取数据的 时间戳范围        *    4). Get setTimeStamp(long timestamp) 设置获取数据的时间戳        *    5). Get setMaxVersions(int maxVersions) 设定获取数据的版本数        *    6). Get setMaxVersions() 设定获取数据的所有版本        *    7). Get setFilter(Filter filter) 为Get对象添加过滤器        *    8). void setCacheBlocks(boolean cacheBlocks) 设置该Get获取的数据是否缓存在内存中        */      //getData.addFamily(columnFamily.getBytes())      //getData.addColumn(columnFamily.getBytes(), column.getBytes())      //getData.setTimeStamp("1535680422860".toLong)      getData.setMaxVersions()      val results = hTable.get(getData)      for (result <- results.rawCells()){        println( new String(CellUtil.cloneRow(result)) + "\t" +          new String(CellUtil.cloneFamily(result)) + "\t" +          new String(CellUtil.cloneQualifier(result)) + "\t" +          new String(CellUtil.cloneValue(result)) + "\t" +          result.getTimestamp)      }      hTable.close()    }    //getDataHbaseTable("scTable", "rk0002", "info", "age")    def scanDataHbaseTable(tableName:String, startRow:String, stopRow:String,                           columnFamily:String, column:String)={      val hBaseConf = HBaseConfiguration.create()      val hTable = new HTable(hBaseConf, tableName)      /**        *         1). 创建扫描所有行的Scan:Scan()        *   2). 创建Scan,从指定行开始扫描:Scan(byte[] startRow)        *   注意:如果指定行不存在,从下一个最近的行开始        *   3). 创建Scan,指定起止行:Scan(byte[] startRow, byte[] stopRow)        *   注意: startRow <= 结果集 < stopRow        *   4). 创建Scan,指定起始行和过滤器:Scan(byte[] startRow, Filter filter)        */      val scanData = new Scan()      val filter1 = new SingleColumnValueFilter(columnFamily.getBytes(), column.getBytes(), CompareOp.GREATER_OR_EQUAL, "60".getBytes() )      /**        * Scan setStartRow (byte[] startRow) 设置Scan的开始行,默认 结果集 包含该行。如果希望结果集不包含该行,可以在行键末尾加上0。        * Scan setStopRow (byte[] stopRow) 设置Scan的结束行,默认 结果集 不包含该行。如果希望结果集包含该行,可以在行键末尾加上0。        * Scan setBatch(int batch) 指定最多返回的Cell数目.用于防止一行中有过多的数据,导致OutofMemory错误        * Scan setTimeRange (long minStamp, long maxStamp) 扫描指定 时间范围 的数据        * Scan setTimeStamp (long timestamp) 扫描 指定时间 的数据        * Scan addColumn (byte[] family, byte[] qualifier) 指定扫描的列        * Scan addFamily (byte[] family) 指定扫描的列族        * Scan setFilter (Filter filter) 为Scan设置过滤器,详见HBase API Filter过滤器        * Scan setReversed (boolean reversed) 设置Scan的扫描顺序,默认是正向扫描(false),可以设置为逆向扫描(true)。注意:该方法0.98版本以后才可用!!        * Scan setMaxVersions () 获取所有版本的数据        * Scan setMaxVersions (int maxVersions) 设置获取的最大版本数! 不调用上下两个setMaxVersions() 方法,只返回最新版本数据        * void setCaching (int caching) 设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存        * void setRaw (boolean raw) 激活或者禁用raw模式。如果raw模式被激活,Scan将返回 所有已经被打上删除标记但尚未被真正删除 的数据。该功能仅用于激活了 KEEP_DELETED_ROWS的列族,即列族开启了 hcd.setKeepDeletedCells(true)        * Scan激活raw模式后,只能浏览所有的列,而不能指定任意的列,否则会报错        */      scanData.setFilter(filter1)      val resultsScan:ResultScanner = hTable.getScanner(scanData)      while (resultsScan.iterator().hasNext){        val results = resultsScan.iterator().next()        for (result:Cell <- results.rawCells()) {                    println(new String(CellUtil.cloneRow(result)) + "\t" +                      new String(CellUtil.cloneFamily(result)) + "\t" +                      new String(CellUtil.cloneQualifier(result)) + "\t" +                      new String(CellUtil.cloneValue(result)) + "\t" +                      result.getTimestamp)                  }      }      /**        * for 循环无法直接遍历 ResultScanner 暂无办法        *///      for(results:Result <- resultsScan){////        for (result:Cell <- results.rawCells()) {////          println(new String(CellUtil.cloneRow(result)) + "\t" +//            new String(CellUtil.cloneFamily(result)) + "\t" +//            new String(CellUtil.cloneQualifier(result)) + "\t" +//            new String(CellUtil.cloneValue(result)) + "\t" +//            result.getTimestamp)//        }////      }      hTable.close()    }    //scanDataHbaseTable("scTable", "rk0001", "rk0002", "info", "age")  }}

  

转载于:https://www.cnblogs.com/mlxx9527/p/9668733.html

你可能感兴趣的文章
Thread类常用方法
查看>>
Yarn大体框架和工作流程研究
查看>>
vue学习笔记(一)
查看>>
微软专家推荐11个Chrome 插件
查看>>
三天学会HTML5——SVG和Canvas的使用
查看>>
MySql基本操作(二)
查看>>
我的友情链接
查看>>
文件上传时几个Content-type
查看>>
我的友情链接
查看>>
Exchange Server 2013 集成Office Web App
查看>>
字节转换工具,在线字节转换工具
查看>>
实验心得
查看>>
mysql 生成行号
查看>>
Control your Thinkpad T430 fan speed in Ubuntu 12.
查看>>
【OSC手机App技术解析】- 在WebView中组装HTML
查看>>
转载 Linux 整合 AD 實戰:CentOS 7.0 整合 Active Directory 驗證管理
查看>>
Android应用升级,检测更新,下载,检验,安装
查看>>
Elasticsearch refresh vs. flush
查看>>
质量管理:测试基础架构图
查看>>
Windows Server 2008安装SQL Server 2008
查看>>