Bulk Importing in KijiMR

One of the most important considerations when using any kind of data store is how to get your data into the store. While abstraction layers like KijiSchema are useful for avoiding the need to understand the underlying implementation details of HBase in the random access use case, bulk importing often involves huge datasets that warrant special consideration from both a performance and an operational perspective. In addition, we’ll walk through a real world use case of bulk importing some actual data.

While we have a variety of canned bulk importers, described in the bulk importer userguide, for parsing and importing common file formats, there is another important aspect of job configuration regardless of which importer you use: how the import is executed. There are two options.

Option 1: Using the KijiSchema PUT API

In direct writing mode, KijiMR will create a MapReduce job that reads over the input and generates the desired put(s) associated with each entry in the data. These puts are directly applied to a Kiji table as the job progresses.
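
For illustration, here is a minimal sketch of what one of those puts looks like through the plain KijiSchema writer API, outside of MapReduce. The table name, column, row key, and value below are purely illustrative; they are not required by any importer.

    // Minimal sketch of a single KijiSchema put; all names and values are illustrative.
    import org.kiji.schema.EntityId;
    import org.kiji.schema.Kiji;
    import org.kiji.schema.KijiTable;
    import org.kiji.schema.KijiTableWriter;
    import org.kiji.schema.KijiURI;

    public class DirectPutSketch {
      public static void main(String[] args) throws Exception {
        Kiji kiji = Kiji.Factory.open(KijiURI.newBuilder("kiji://.env/default").build());
        KijiTable table = kiji.openTable("train");
        KijiTableWriter writer = table.openTableWriter();
        try {
          EntityId eid = table.getEntityId("1139246");
          // In direct writing mode, the bulk import job issues a write like this
          // for every cell of every input record.
          writer.put(eid, "info", "SalesID", "1139246");
        } finally {
          writer.close();
          table.release();
          kiji.release();
        }
      }
    }

Since every cell of every record turns into an individual write like this, direct writing is flexible but write-heavy, which drives the tradeoffs below.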

Benefits:

  • Ability to run this safely on an active cluster.
  • The data that has been loaded is available immediately while the job is in progress.

Drawbacks:

  • If this job fails partway for any reason, we’ll have partially loaded data.
  • This involves generating writes for every entity, which can put significant write load on the cluster.
  • Since Kiji doesn’t yet support atomic writes to the same row (but this is coming soon), bulk importers will generate a write for each individual cell, even if the cells belong to the same row. This results in greater disk utilization.

Option 2: Bulk Loading using HFiles

For bulk loading, Kiji can create a MapReduce job that reads over the input and generates HFiles containing the rows, which can then be loaded directly into HBase using HBase’s bulk load functionality. Once the job completes, the HFiles can be loaded into the Kiji table so that the data can be accessed.
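
Under the hood, this load step relies on HBase’s incremental bulk load mechanism (the same machinery behind HBase’s completebulkload tool). Here is a rough sketch of that underlying HBase call, assuming direct HBase access with the HBase client APIs of that era; the backing HBase table name is purely illustrative, since in practice the kiji bulk-load tool shown later resolves it for you.

    // Rough sketch of HBase's incremental bulk load, which the HFile load step builds on.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class HFileLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Illustrative backing table name; the real name is an implementation detail.
        HTable table = new HTable(conf, "kiji.default.table.train");
        // Moves the generated HFiles into the table's regions without going
        // through the regular write path (memstore/WAL).
        new LoadIncrementalHFiles(conf).doBulkLoad(
            new Path("hdfs://localhost:8020/train.bulkload"), table);
        table.close();
      }
    }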

Benefits:

  • Atomic commit of the job: either we get everything or we get nothing.
  • Orders of magnitude faster (see comparison below).

Drawbacks:

  • As a result of the ordering of the StoreFiles, a bulk load could trigger a compaction on a running cluster, which can be painful at large scale. See the commentary in HBASE-3404 for more information.

My Recommendation

For a new cluster, the recommended practice is to bulk load via HFiles. This allows the initial backlog of data to be imported quickly while avoiding the compaction issues above (since there is no existing data in the cluster). If the bulk import job is for an existing Kiji instance, using direct puts will allow the existing cluster to continue to respond to requests while still accepting the new data.

The Test Drive

As part of my testing of this new functionality, I tried bulk importing some of the sample data from the Kaggle Blue Book for Bulldozers challenge. The data comes as a CSV file of 401,126 lines, the first of which is a header naming the 53 fields. The rest of this example assumes that you have a bento box and a Kiji instance installed.

Specifying the layout to create a Kiji table with this many columns would have required a rather long-winded DDL. Since we need at least one line of DDL for each column (family:qualifier), this would involve a lot of tedious copying and pasting. To accelerate this process, I wrote a little script, generate-ddl.sh, that takes in said CSV file, parses the header, and auto-generates a default DDL assuming that every field is a string. The user can then modify this generated DDL to produce the layout they are looking for. Once we are happy with it, we can create the table using kiji-schema-shell:

    ./generate-ddl.sh Train.csv > Train.ddl
    kiji-schema-shell --file=Train.ddl

The CSVBulkImporter requires an import descriptor that defines the mapping from the source fields in the CSV to the destination Kiji columns. Since this also depends on the fields, I’ve written another little script, generate-import-descriptor.sh, that takes in said CSV file, parses the header, and auto-generates a default import descriptor JSON file.

    ./generate-import-descriptor.sh Train.csv > Train.json
    hadoop fs -copyFromLocal Train.json /

Finally we need to trim the header from the data file that we wish to bulk-import, and copy it over to HDFS so that the MapReduce job can get at it.

    tail -n +2 Train.csv > Train-no-header.csv
    hadoop fs -copyFromLocal Train-no-header.csv /

With Option 1: Using the KijiSchema PUT API

We can use Kiji to create a bulk importer job whose output is a Kiji table (note the --output parameter).

    kiji bulk-import \
      -Dkiji.import.text.column.header_row=`head -1 Train.csv` \
      -Dkiji.import.text.input.descriptor.path=/Train.json \
      --importer=org.kiji.mapreduce.lib.bulkimport.CSVBulkImporter \
      --output="format=kiji table=kiji://.env/default/train nsplits=1" \
      --input="format=text file=/Train-no-header.csv"

This took 78 minutes on a bento cluster running on my local machine, and consumed 3.2 GB of disk space.

With Option 2: Bulk Loading via HFiles

Alternatively, we can use Kiji to create a bulk importer job whose output is a set of HFiles. The main difference here is that the --output parameter now uses the hfile format and specifies the destination HFile path. Note that we still need to name the table so that the job knows what layout the HFiles should use.

    kiji bulk-import \
      -Dkiji.import.text.column.header_row=`head -1 Train.csv` \
      -Dkiji.import.text.input.descriptor.path=/Train.json \
      --importer=org.kiji.mapreduce.lib.bulkimport.CSVBulkImporter \
      --output="format=hfile table=kiji://.env/default/train nsplits=1 file=hdfs://localhost:8020/train.bulkload" \
      --input="format=text file=/Train-no-header.csv"

Finally, once these files are created, they can be bulk loaded with the bulk-load tool:

    kiji bulk-load \
      --hfile=hdfs://localhost:8020/train.bulkload/part-r-00000.hfile \
      --table=kiji://.env/default/train

This took 2 minutes on a bento cluster running on my local machine, and consumed 483.4 MB of disk space. This is vastly faster (nearly 40x) than the individual PUTs on my little laptop.

Now all of this data has been loaded into Kiji to do whatever you might like for post processing!

Performance Stats

    Method                        Processing Time (min.)    Disk Usage
    Using KijiSchema PUTs         78                        3.20 GB
    Bulk Loading using HFiles     2                         0.47 GB

Above are the results for bulk importing the data into uncompressed Kiji tables. As you can see, there is nearly a 40x difference in bulk import time and roughly a 6x difference in disk utilization. Using HFiles to bulk import data is far more performant than doing direct writes, but there are potential operational difficulties on a running cluster, with the possibility of an (expensive) compaction looming when the data files are loaded.

Addendum: Timestamps

In Common Pitfalls of Timestamps in HBase, we discuss many of the potential pain points of dealing with timestamps within HBase. While in general we don’t recommend manually setting timestamps, in the case of an initial bulk import it’s often beneficial to backfill the timestamps based on the data being imported. This way the initially imported data behaves the same as newly added data in your application.
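
As an example, here is a minimal sketch of writing a cell at a backfilled timestamp through the KijiSchema writer API. The info:saledate column is illustrative; the key point is the put overload that takes an explicit timestamp instead of defaulting to the current time.

    import java.io.IOException;
    import org.kiji.schema.EntityId;
    import org.kiji.schema.KijiTable;
    import org.kiji.schema.KijiTableWriter;

    public class BackfillTimestampSketch {
      /** Writes one cell at an explicit, backfilled timestamp rather than "now". */
      public static void writeBackfilled(KijiTable table, KijiTableWriter writer,
          String rowKey, long recordTimestampMs, String value) throws IOException {
        EntityId eid = table.getEntityId(rowKey);
        // The timestamp argument makes the cell look as if it had been written
        // when the record actually occurred (e.g. the bulldozer sale date),
        // so backfilled data behaves like data written by the live application.
        writer.put(eid, "info", "saledate", recordTimestampMs, value);
      }
    }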