Hbase客户端编程

Hadoop 踩坑记(四)

Posted by Cheereus on May 24, 2020

环境

关于 Hbase 的安装与配置以及 Eclipse 的配置请参考前两篇文章

本系列选用的 hbase 版本为 1.4.13

本系列选用的 hadoop 版本为 2.8.5

请注意包名、服务器等个性化配置

引入jar包

需要将 Hbase 中与客户端相关的 jar 包引入到 Build Path

理论上只需要将 org.apache.hadoop.hbase.* 相关包引入即可,但实际操作中还是遇到了缺少情况,因此将 hbase 的 lib 目录下的所有 jar 包都引入了

这种粗暴的方法源于本人对于 java 开发的了解太少,经验丰富的朋友应该可以按需引入。

表的创建与删除

一个示例的 Student 表结构如下图所示

表1

代码如下

package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class HBaseTest {
//声明静态配置 HBaseConfiguration
   static Configuration cfg=HBaseConfiguration.create();
    //创建学生表
   public static void createStdTable() throws Exception {
   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
      //数据表名
    String tablename="Student";
    //列簇名列表
    String[] columnFamilys= new String[] {"Std","Course"};
    //建立连接
    Connection con = ConnectionFactory.createConnection(cfg);
    //获得Admin对象
    Admin admin = con.getAdmin();
    //获得表对象
    TableName tName  = TableName.valueOf(tablename);
    //判断表是否存在
        if (admin.tableExists(tName)) {
            System.out.println("table Exists!");
            System.exit(0);
        }
        else{
        HTableDescriptor tableDesc = new HTableDescriptor(tName);
        //添加列簇
        for(String cf:columnFamilys)
        {
          HColumnDescriptor cfDesc = new HColumnDescriptor(cf);
          if(cf.equals("Course"))//设置课程的最大历史版本
            cfDesc.setMaxVersions(3);
          tableDesc.addFamily(cfDesc);
        }
        //创建表
            admin.createTable(tableDesc);
            System.out.println("create table success!");
        }
        admin.close();
        con.close();
    }
   public static void  main (String [] agrs) throws Throwable {
      try {
        createStdTable();
      }
      catch (Exception e) {
        e.printStackTrace();
      }
  }

}

其中

cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");

导入了 hbase-site.xml 文件中的服务器配置,理论上应该要将该文件引用为项目配置,但本人水平所限,没有做到,因此采用代码中手动配置的方法,后面的代码中也一样采用了此种方式。

运行上述代码,得到 create table success! 的提示即为创建成功

删除表的代码如下

package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Delete {
//声明静态配置 HBaseConfiguration
   static Configuration cfg=HBaseConfiguration.create();
   public static void DeleteTable() throws Exception{
   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
   Connection con = ConnectionFactory.createConnection(cfg);
   //获得表对象
   TableName tablename = TableName.valueOf("Student");
    //获得Admin对象
   Admin admin = con.getAdmin();
   if(admin.tableExists(tablename)){
     try
       {
        admin.disableTable(tablename);
        admin.deleteTable(tablename);
       }catch(Exception ex){
         ex.printStackTrace();
       }
    }
 }
   public static void  main (String [] agrs) throws Throwable {
    try {
      DeleteTable();
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }
}

表模式的修改

增加新的列

下面的代码会在表中新增列 Test

public static void  AddStdColFamily () throws Throwable {
    Connection con = ConnectionFactory.createConnection(cfg);
    //获得表对象
    TableName tablename = TableName.valueOf("Student");
    //获得Admin对象
    Admin admin = con.getAdmin();
    HColumnDescriptor newCol = new HColumnDescriptor("Test");
    newCol.setMaxVersions(3);
    if(admin.tableExists(tablename)){
      try
        {
         admin.disableTable(tablename);
         admin.addColumn(tablename, newCol);
        }catch(Exception ex){
          ex.printStackTrace();
       }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}

修改列簇属性

修改 Test 列簇的最大历史版本数为 5

public static void  ModifyStdColFamily () throws Throwable {
  Connection con = ConnectionFactory.createConnection(cfg);
  //获得表对象
  TableName tablename = TableName.valueOf("Student");
  //获得Admin对象
   Admin admin = con.getAdmin();
   HColumnDescriptor modCol = new HColumnDescriptor("Test");
   modCol.setMaxVersions(5);
   if(admin.tableExists(tablename)){
     try
     {
       admin.disableTable(tablename);
       admin.modifyColumn(tablename, modCol);
      }catch(Exception ex){
          ex.printStackTrace();
      }
     }
      admin.enableTable(tablename);
      admin.close();
      con.close();
}  

删除列

删除 Test 列

public static void  DeleteStdColFamily() throws Throwable {
   Connection con = ConnectionFactory.createConnection(cfg);
   //获得表对象
   TableName tablename = TableName.valueOf("Student");
   //获得Admin对象
   Admin admin = con.getAdmin();
 if(admin.tableExists(tablename)){
     try
     {
       admin.disableTable(tablename);
       admin.deleteColumn(tablename, Bytes.toBytes("Test"));
      }catch(Exception ex){
         ex.printStackTrace();
       }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}

在表中插入和修改数据(略)

与MapReduce集成

读取 hdfs 文件后写入数据表

hdfs 文件 Std.txt 内容为

200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56
200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66
200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36

这里需要注意的是,此文件不能有空行,否则读取数据时会报错

代码如下,Map 过程读取 Std.txt 文件中的每一行,然后将学生的学号设置为 key,学生的其它信息设置为 value 后,写出到中间结果。Reduce 过程负责将 Map 形成的中间结果写入到 HBase 的 Student 表中,故 Reduce 继承至TableReducer,在 main 函数中使用

package wit;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class StdHdfsToHBase {
  public static class HDFSMap extends Mapper<Object, Text, Text, Text> {
      //实现map函数,读取hdfs上的std.txt文件
    public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException
{
    //取出学生的学号为rowKey
    String stdRowKey = value.toString().split(",")[0];
     System.out.println(stdRowKey);
    //学号后面的学生信息为value
    String stdInfo =
value.toString().substring(stdRowKey.length()+1);
     System.out.println(stdInfo);
    context.write(new Text(stdRowKey), new Text(stdInfo));
}
   }
  public static class HDFSReducer extends TableReducer<Text, Text,
ImmutableBytesWritable>{
     @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
        Put put = new Put(key.getBytes());
        for (Text val : values) {
         String[] stdInfo = val.toString().split(",");
             put.addColumn("Std".getBytes(), "Name".getBytes(), 
stdInfo[0].getBytes());
             put.addColumn("Std".getBytes(), "gender".getBytes(), 
stdInfo[1].getBytes());
             put.addColumn("Std".getBytes(), "birth".getBytes(), 
stdInfo[2].getBytes());
             put.addColumn("Std".getBytes(), "dept".getBytes(), 
stdInfo[3].getBytes());
             put.addColumn("Course".getBytes(), "math".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[4])));
             put.addColumn("Course".getBytes(), "arts".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[5])));
             put.addColumn("Course".getBytes(), "phy".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[6])));
             //写入学生信息到HBase表
           context.write(new ImmutableBytesWritable(key.getBytes()), put); 
          }
    }
}
  public static void main(String[] args) throws
    IOException, ClassNotFoundException, InterruptedException {
      Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
      Job job = Job.getInstance(conf, "StdHdfsToHBase");
      job.setJarByClass(StdHdfsToHBase.class);
      // 设置Map
      job.setMapperClass(HDFSMap.class); 
      job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);
          //设置Reducer
      TableMapReduceUtil.initTableReducerJob("Student",
      HDFSReducer.class, job);
          job.setOutputKeyClass(ImmutableBytesWritable.class);
          job.setOutputValueClass(Put.class);
      //设置std.txt的输入目录
      FileInputFormat.addInputPath(job, new
      Path("hdfs://hadoop1-ali:9000/input/std"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}