An Introduction to KijiScoring

Scoring is the act of applying a trained model to input data to produce an actionable result such as a categorization (fraud detection) or the next step in a pattern (retail product recommendation). Broadly interpreted, ‘scoring’ could be something as sophisticated as applying a trained linear regression model or as straightforward as extracting the domain from an email address. Readers familiar with KijiMR’s ‘producers’ will recognize them as a way to score a large number of entities in batch. However, what if we are only interested in scoring a single entity on demand, perhaps at read time as part of a web application? Launching a producer as a MapReduce job across an entire Kiji table could involve minutes of spin-up time.

KijiScoring is the project that makes these kinds of single-entity, real-time calculations possible using a technique we call ‘Freshening’. Freshening is the conditional application of a calculation to entity-centric data to produce score results on read (i.e. lazily) without launching an entire MapReduce job. The trade-off for these computational savings and up-to-date results is that a freshening score must be computable within the latency budget of a real-time application. In order to load a web page with acceptable latency, the entire freshening process might be given as little as a few dozen milliseconds to compute a score.

In this blog post we’ll describe the components of KijiScoring’s real-time scoring process and give an outline of the steps involved.

Scoring Components

The process of scoring in Kiji consists of four cardinal components:

  1. A Kiji table with a column that will hold the scored result.
  2. A KijiProducer class, as defined by KijiMR. This is the piece of code actually responsible for generating the result. While producers generally run against many entities in a Kiji table when launched through MapReduce, in KijiScoring they run against one entity at a time, possibly leveraging key-value stores (e.g. to look up trained model weights).
  3. A FreshnessPolicy instance. This is responsible for deciding, on a per-request basis, whether or not to run scoring. KijiScoring comes with a few stock policies with simple strategies such as ‘always freshen’ or ‘freshen any value which is older than X milliseconds’. Users may write their own, more complicated policies in Java (e.g. ‘freshen values if the customer represented by this entity has bought a product since our last freshening’).
  4. A FreshKijiTableReader. This has an API like a regular KijiTableReader, but will check for registered Producers and FreshnessPolicies on any column requested through it and will run freshening if appropriate. In the case of a timeout, stale data will be returned.
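
To illustrate the kinds of strategies a policy can encode, here is a minimal sketch in plain Java. It does not use the KijiScoring API: FreshnessRule and FreshnessRules are hypothetical names, and the rule receives plain timestamps as arguments, which is a simplifying assumption made only for this illustration.

```java
/** Hypothetical, simplified policy shape used only for illustration. */
interface FreshnessRule {
  /** Returns true if the stored score may be served without re-scoring. */
  boolean isFresh(long lastScoredMillis, long lastPurchaseMillis, long nowMillis);
}

final class FreshnessRules {
  private FreshnessRules() {}

  /** 'Always freshen': no stored value is ever considered fresh. */
  static FreshnessRule alwaysFreshen() {
    return (lastScored, lastPurchase, now) -> false;
  }

  /** 'Freshen any value which is older than X milliseconds'. */
  static FreshnessRule olderThan(long maxAgeMillis) {
    return (lastScored, lastPurchase, now) -> now - lastScored <= maxAgeMillis;
  }

  /** Custom rule: stale once the customer has bought something since the last score. */
  static FreshnessRule purchasedSinceLastScore() {
    return (lastScored, lastPurchase, now) -> lastPurchase <= lastScored;
  }
}
```

Each rule answers the same question, ‘is the stored score still good enough to serve?’, so a reader can treat them interchangeably.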

Below is a diagram showing two requests issued through a FreshKijiTableReader. In the top request, the freshness policy determines that the entity’s data is stale and that running a Producer to generate fresh values from the trained model is necessary. KijiScoring executes the Producer against the requested entity, writes the result into the table, and then returns the updated data for the request. In the bottom request, the freshness policy indicates that the data for this entity is fresh. Running the Producer is unnecessary and the existing data in the table is returned directly.

Two requests through KijiScoring
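
The two request paths, plus the timeout fallback, can be sketched in plain Java as follows. This is not the KijiScoring implementation or its API: FresheningReadSketch, Cell, and the in-memory map standing in for a Kiji table are hypothetical names introduced for illustration, and the policy and producer are reduced to a Predicate and a Function.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a freshening reader; not the KijiScoring API. */
final class FresheningReadSketch {
  /** A stored score together with the time it was written. */
  static final class Cell {
    final String value;
    final long timestampMillis;

    Cell(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  private final Map<String, Cell> table = new ConcurrentHashMap<>();
  private final Predicate<Cell> isFresh;            // plays the role of the freshness policy
  private final Function<String, String> producer;  // plays the role of the producer
  private final long timeoutMillis;

  FresheningReadSketch(
      Predicate<Cell> isFresh, Function<String, String> producer, long timeoutMillis) {
    this.isFresh = isFresh;
    this.producer = producer;
    this.timeoutMillis = timeoutMillis;
  }

  void put(String entityId, Cell cell) {
    table.put(entityId, cell);
  }

  /** Returns the stored value, re-scoring first if the policy reports it stale. */
  String get(String entityId) {
    final Cell existing = table.get(entityId);
    if (existing != null && isFresh.test(existing)) {
      return existing.value;  // Bottom request: data is fresh, return it directly.
    }
    // Top request: score this single entity, write the result, then return it.
    final CompletableFuture<String> scoring = CompletableFuture.supplyAsync(() -> {
      final String score = producer.apply(entityId);
      table.put(entityId, new Cell(score, System.currentTimeMillis()));
      return score;
    });
    try {
      return scoring.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
      // Scoring took too long: fall back to the stale value (null if none exists).
      return existing == null ? null : existing.value;
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      return existing == null ? null : existing.value;
    } catch (ExecutionException ee) {
      throw new IllegalStateException("Scoring failed", ee.getCause());
    }
  }
}
```

In this sketch a timed-out scoring task keeps running in the background and still writes its result, so a later read may find a fresh value. That is a design choice of the sketch, not a statement about how KijiScoring behaves.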

Using KijiScoring then means registering a KijiProducer and a FreshnessPolicy with a column of a Kiji table and then accessing that data via a FreshKijiTableReader. To make this a little more concrete, we will show an example of a FreshnessPolicy and how to install it below:

Sample FreshnessPolicy Class

This is an abridged and slightly modified form of the ShelfLife FreshnessPolicy included in KijiScoring, which runs scoring if an entity’s data hasn’t been updated in a configured amount of time. The changes are there to better highlight the relevant sections.

The salient methods in this class are its isFresh() method, whose return value determines whether or not to launch a Producer to calculate a new score for this entity, and the serialize()/deserialize() methods, responsible for saving and restoring the policy’s state. In the case of ShelfLife, the isFresh() method compares the timestamp of the data currently stored in the Kiji table against the current time minus the ‘shelf life’. Its serialize()/deserialize() methods simply manipulate a JSON representation of the shelf life in milliseconds.

/**
 * A stock {@link org.kiji.scoring.KijiFreshnessPolicy} which returns fresh 
 * if requested data was modified within a specified number of milliseconds of the 
 * current time.
 *
 * <p>We only show a modified excerpt of this class here, to help illustrate how
 * FreshnessPolicys work. Consult the kiji-scoring project on GitHub for the full
 * class.</p>
 */
public final class ShelfLife implements KijiFreshnessPolicy {
  private long mShelfLifeMillis = -1;

  /**
   * Default empty constructor for automatic construction. This is for reflection
   * utils that create this class at runtime. Users should use
   * {@link #ShelfLife(long)} instead.
   */
  public ShelfLife() {}

  // …

  /**
   * Constructor which initializes all state.  No call to 
   * {@link #deserialize(String)} is necessary.
   *
   * @param shelfLife the age in milliseconds beyond which data becomes stale.
   */
  public ShelfLife(long shelfLife) {
    if (shelfLife < 0) {
      throw new IllegalArgumentException(
          "Shelf life must be a non-negative number of milliseconds.");
    }
    mShelfLifeMillis = shelfLife;
  }

  /** {@inheritDoc} */
  @Override
  public boolean isFresh(KijiRowData rowData, PolicyContext policyContext) {
    // This method is where the actual work takes place. Given a KijiRowData for
    // an entity, is it 'stale'?
    final KijiColumnName columnName = policyContext.getAttachedColumn();
    if (mShelfLifeMillis == -1) {
      throw new RuntimeException(
          "Shelf life not set.  Did you call ShelfLife.deserialize()?");
    }
    if (columnName == null) {
      throw new RuntimeException("Target column was not set in the PolicyContext.");
    }

    // If the column does not exist in the row data, it is not fresh.
    if (!rowData.containsColumn(columnName.getFamily(), columnName.getQualifier())) {
      return false;
    }
    NavigableSet<Long> timestamps =
        rowData.getTimestamps(columnName.getFamily(), columnName.getQualifier());
    // If there are no values in the column in the row data, it is not fresh.
    // If there are values but the newest is more than mShelfLifeMillis old, it is
    // not fresh.
    return !timestamps.isEmpty()
        && System.currentTimeMillis() - timestamps.first() <= mShelfLifeMillis;
  }

  // …

  /** {@inheritDoc} */
  @Override
  public String serialize() {
    // This method serializes the state of this policy to a string.
    // Custom freshness policies with persistent settings should implement
    // this and deserialize() below.
    //
    // The only required state for this policy is the shelf life duration.
    final JsonObject jsonObject = new JsonObject();
    jsonObject.add("shelfLife", new JsonPrimitive(mShelfLifeMillis));

    return jsonObject.toString();
  }

  /** {@inheritDoc} */
  @Override
  public void deserialize(String policyState) {
    // Complements serialize() above. Restores state from a JSON representation.
    final JsonParser parser = new JsonParser();
    final JsonObject jsonObject = (JsonObject) parser.parse(policyState);
    // Load the shelf life from the policy state.
    mShelfLifeMillis = jsonObject.get("shelfLife").getAsLong();
  }
}
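
The final return statement of isFresh() reads the newest timestamp with first(), which only works if the set is ordered newest-first. The sketch below isolates that comparison in plain Java so it can be run without Kiji. ShelfLifeCheckSketch and its helper are hypothetical names, the newest-first ordering is an assumption inferred from the excerpt, and the clock is passed in as a parameter to make the result deterministic.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical, Kiji-free version of the shelf-life comparison. */
final class ShelfLifeCheckSketch {
  private ShelfLifeCheckSketch() {}

  /** Mirrors the last statement of isFresh(), with the current time passed in. */
  static boolean isFresh(NavigableSet<Long> newestFirst, long shelfLifeMillis, long nowMillis) {
    return !newestFirst.isEmpty()
        && nowMillis - newestFirst.first() <= shelfLifeMillis;
  }

  /** Builds a set in descending order, so first() is the most recent timestamp. */
  static NavigableSet<Long> newestFirst(long... timestamps) {
    final NavigableSet<Long> set = new TreeSet<>(Comparator.reverseOrder());
    for (long timestamp : timestamps) {
      set.add(timestamp);
    }
    return set;
  }
}
```

With timestamps 1000, 3000 and 5000 and a shelf life of 1000 milliseconds, the data counts as fresh at time 5500 and as stale at time 6001, because only the newest timestamp (5000) is compared.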

Command Line Interface for installing freshness policies and producers

To register a FreshnessPolicy and Producer on a Kiji table, KijiScoring includes a ‘fresh’ command-line tool. Here’s an example of installing the above ShelfLife freshness policy with a shelf life of 1 hour (3600000 = 60*60*1000 milliseconds) and a producer named ProductRecommendationProducer.

kiji fresh \
    kiji://.env/myInstance/users/derived:recommendations \
    --do=register \
    org.kiji.scoring.lib.ShelfLife '{"shelfLife":3600000}' \
    com.mycompany.freshening.ProductRecommendationProducer

If there is already a freshness policy attached to derived:recommendations in users, you will be prompted to either continue and overwrite it or cancel your registration. If you continue, you will see the following message:

Freshness policy: org.kiji.scoring.lib.ShelfLife with state {"shelfLife":3600000} and producer: com.mycompany.freshening.ProductRecommendationProducer
attached to column: derived:recommendations in table: users
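
Because the shelf life is given in raw milliseconds, it is easy to drop a zero. One way to avoid that is to compute the state string rather than type it by hand. The sketch below uses only the Java standard library; ShelfLifeStateSketch and stateForHours are hypothetical helper names, not part of KijiScoring.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that builds the JSON state string for a ShelfLife policy. */
final class ShelfLifeStateSketch {
  private ShelfLifeStateSketch() {}

  /** Converts hours to milliseconds and wraps the result in the JSON state format. */
  static String stateForHours(long hours) {
    final long shelfLifeMillis = TimeUnit.HOURS.toMillis(hours);  // 1 hour = 60 * 60 * 1000 ms
    return "{\"shelfLife\":" + shelfLifeMillis + "}";
  }

  public static void main(String[] args) {
    System.out.println(stateForHours(1));  // prints {"shelfLife":3600000}
  }
}
```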

Now that you have stored a freshness policy in your meta-table, you can construct a FreshKijiTableReader to use it. Here’s a code snippet along those lines:

// Open a FreshKijiTableReader for the table with a timeout of 200 milliseconds.
final FreshKijiTableReader freshReader = FreshKijiTableReaderBuilder.create()
        .withTable(userTable)
        .withTimeout(200)
        .build();

// EntityId for the row we want.
final EntityId eid = userTable.getEntityId("foo");
// Create a data request for the desired column.
final KijiDataRequest request = KijiDataRequest.create("derived", "recommendations");

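// Read through the fresh reader. If freshening does not finish within the
// timeout, the value returned may be the stale one already in the table.
final Product value = freshReader.get(eid, request)
    .getMostRecentValue("derived", "recommendations");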

This is just a simple example. For more information, consult the JavaDoc for KijiScoring and the kiji-scoring GitHub project.

What comes next?

KijiScoring is currently home to a local implementation of a freshening service accessed via a specialized KijiTableReader. In the coming months we plan to add support for remotely executed freshening. We also plan to integrate with a model repository that houses KijiExpress trained models, making them readily available to a scoring application without the need for painful classpath management. Finally, we’re exploring interfacing KijiScoring with KijiRest so that clients of the Kiji REST server can also access real-time scores.

Expect more blog posts on these developments in the future.