使用BulkLoad快速向HBase导入数据

背景

某些时候,我们在初始化HBase表的同时需要向其中快速导入大量的数据(比如搭建压力测试环境),如果我们通过调用HBase的PUT接口来插入,速度是十分慢的。这个时候我们可以采用BulkLoad的方式来快速导入数据。采用BulkLoad,有以下2个优势:

  1. BulkLoad不会写WAL,也不会产生flush以及split。
  2. 如果我们大量调用PUT接口插入数据,可能会导致大量的GC操作。除了影响性能之外,严重时甚至可能会对HBase节点的稳定性造成影响。但是采用BulkLoad就不会有这个顾虑。

步骤

BulkLoad一共可以分为三步:数据生成,HFile转换以及HFile加载。下面这幅图(源于网络)生动地展现了各个步骤所执行的操作:

源数据生成

俗话说的好,“巧妇难为无米之炊”,BulkLoad的第一步就是生成源数据。假设我们现在需要往HBase导入10亿条用户登录记录,其中rowkey是用户id,value是用户登录时间戳。那么我们首先需要将这10亿条数据存入在某个文本文件中,并且每条数据最好都是固定的格式。我们假设文件为/home/test/data.txt并且每行的rowkey和用户登录时间都用空格隔开:

1
2
3
4
5
6
$ more /home/test/data.txt
1 1474800800
1 1474801700
2 1474800900
3 1474801500
...

然后我们将数据上传至hdfs中为下一步做准备,我们假设将data.txt上传至hdfs的/test目录下:

1
$ hadoop fs -put /home/data/test.txt /test

HFile转换

当拥有了原始数据后,我们需要将其转换成HFile(HBase的底层数据存储格式)。通常我们采用MapReduce来做转换。在map阶段,我们需要生成对应的rowKey以及Put对象,然后在接下来组装成HFile。如何将数据按照HFile的格式进行组装不需要我们关心,HFileOutputFormat2类会替我们解决。但是一般我们需要自己手动实现mapper类来实现数据的预处理以及转换。

我们还是接着刚才的例子,首先我们需要构建一个MapReduce job。值得一提的是,HBase提供了一个静态方法来方便我们对job进行配置,即HFileOutputFormat2类中的configureIncrementalLoad方法。然后在mapper类中,我们对每一行按照空格拆分成rowKey和value,然后组装成一个Put对象和rowKey一起输出。最后job若执行成功,则可以在hdfs下的/test/hfiles文件夹中查看具体结果。具体代码如下:

MapReduce的job:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class HFileGenerator {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
Job job = Job.getInstance(conf, "HFile Generator");
job.setJarByClass(HFileGenerator.class);
// set mapper class
job.setMapperClass(TestHFileMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// set input and output path
String inputPath = "/test/data.txt";
String outputPath = "/test/hfiles";
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// other config
HFileOutputFormat2.configureIncrementalLoad(job, (HTable) connection.getTable("users_login"));
// begin
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Mapper类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private static final byte[] FAMILY_BYTE = Bytes.toBytes("c");
private static final byte[] QUALIFIER_INDEX = Bytes.toBytes("time");
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 得到rowKey以及value,并转成字节
String line = value.toString();
String[] items = line.split(" ", -1);
long userId = Long.parseLong(items[0]);
long timestamp = Long.parseLong(items[1]);
byte[] userIdBytes = Bytes.toBytes(userId);
byte[] timestampBytes = Bytes.toBytes(timestamp);
// 生成Put对象
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(userIdBytes);
Put put = new Put(rowKey.copyBytes());
put.add(FAMILY_BYTE, QUALIFIER_INDEX, timestampBytes); // 测试的时候value和key相同
// 输出
context.write(rowKey, put);
}
}

除了采用MapReduce,如果原始数据已经是TSV格式的文件,我们还可以直接用importTsv命令生成HFile,具体使用方法请参考官方文档

HFile加载

一旦HFile转换完成,我们就可以将其加载至HBase集群中。加载的方式其实十分简单,就是将HFile移动至HBase对应的RegionServer的存储目录下,所以往往该操作执行地十分快。我们可以通过命令行调用completebulkload工具加载,也可以通过代码执行。这里我们采用代码的方式:

1
2
3
4
5
6
7
8
9
10
public class CompleteBulkLoad {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
String hFilePath = "hdfs://master:9000/test/hfiles";
HBaseConfiguration.addHbaseResources(conf);
LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(conf);
loadFiles.doBulkLoad(new Path(hFilePath), (HTable) connection.getTable("users_login"));
}
}

加载完成后,我们可以去对应的表中查看数据是否加载成功。如果加载成功的话,原来转换好的HFile即/test/hfiles将会消失。

总结

BulkLoad将原来位于HBase集群的写入消耗挪到了Hadoop集群上,并且通过直接拷贝HFile的方式,极大地提高了数据的写入速度。BulkLoad通常用在HBase表的预初始化,增量数据定时导入以及数据迁移等涉及到大规模数据批量导入的场景。使用BulkLoad时,还有以下几个注意点:

  1. HFile转换成功后,我们磁盘中既存在着一份源数据,又存在着一份HFile。所以假设我们需要导入100G的数据,那么我们至少需要准备200G的磁盘空间。在HFile转换成功之后,我们就可以删去源数据以节省磁盘空间。
  2. 当你的HBase表已经在被其他应用程序写入,那么在HFile转换成功之后,尽量马上执行HFile加载操作。这是因为如果不及时加载HFile,此时可能因为其他应用程序的写入而导致region又出现了分裂。那么HFile在加载的时候又会按照当前region的划分再重新分裂一遍,而这个操作往往是十分低效而又耗时的。所以一般建议HFile的加载需要紧接着转换一起做。

参考资料

  1. Importing Data Into HBase
  2. 使用Bulk Load快速向HBase中导入数据
  3. MapReduce生成HFile入库到HBase
坚持原创技术分享,您的支持将鼓励我继续创作!