Exploratory Data Analysis using Kiji and Hive

When we left off in Part 1 – Building a Hive Adapter, we had discussed some of the challenges and implementation details of writing a Hive adapter for Kiji tables. Separately, the Kiji Music Tutorial uses KijiMR to generate song recommendations from users’ play data with an existing recommendation algorithm. But what if we don’t yet have a concrete idea of which models would be useful to apply to our data? Writing Java code and kicking off (and debugging) MapReduce jobs can be cumbersome.

So for this blog post we’ll start with the Kiji Music data loaded in a Kiji cluster (via the bulk importing portion of that tutorial). From there we can use the Kiji Hive Adapter to do some ad hoc analytics and learn more about this data set.

If you’re running this from a Bento Box, you can start a Hive shell with the Kiji Hive Adapter jar preloaded by running the command:

$ $KIJI_HOME/hive-adapter/bin/bento-hive.sh shell

This will start a Hive shell that looks something like this:

hive> 

From here we can enter HiveQL commands. As in the previous post, we can create a view of the Kiji Music users table using the following CREATE EXTERNAL TABLE statement:

CREATE EXTERNAL TABLE users (
  user STRING,
  track_plays ARRAY<STRUCT<ts: TIMESTAMP, value: STRING>>,
  next_song_rec STRUCT<ts: TIMESTAMP, value: STRING>
)
STORED BY 'org.kiji.hive.KijiTableStorageHandler'
WITH SERDEPROPERTIES (
  'kiji.columns' = ':entity_id[0],info:track_plays,info:next_song_rec[0]'
)
TBLPROPERTIES (
  'kiji.table.uri' = 'kiji://.env/kiji_music/users'
);

Now that we’ve created a table within Hive, let’s poke around the data a bit to see if we can identify similar users with a heuristic based on their song play data.

As an example, we can find all users who ever listened to song-44 with the following query:

SELECT user FROM users WHERE array_contains(track_plays.value, "song-44");

Yielding:


user
user-47
user-40
user-46
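To make the filter above concrete, here is a minimal Python sketch of what it checks for each row. The toy rows and the users_who_played helper are invented for illustration and are not part of Kiji or Hive; the point is that track_plays.value projects the value field out of every struct in the array, and array_contains then tests membership in that projected array.

```python
# Toy stand-in for the Hive users table: one row per user, each holding an
# array of play structs. These rows are invented, not the tutorial's data.
users = [
    {"user": "user-a", "track_plays": [
        {"ts": "2012-01-05 03:23:00", "value": "song-1"},
        {"ts": "2012-01-05 03:19:00", "value": "song-2"},
    ]},
    {"user": "user-b", "track_plays": [
        {"ts": "2012-01-05 03:17:00", "value": "song-2"},
    ]},
    {"user": "user-c", "track_plays": [
        {"ts": "2012-01-05 03:14:00", "value": "song-3"},
    ]},
]


def users_who_played(rows, song):
    """Return users whose projected list of song names contains `song`."""
    matches = []
    for row in rows:
        # Equivalent of track_plays.value: project one field across the array.
        song_names = [play["value"] for play in row["track_plays"]]
        # Equivalent of array_contains(track_plays.value, song).
        if song in song_names:
            matches.append(row["user"])
    return matches


listeners = users_who_played(users, "song-2")
print(listeners)
```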

Since our goal is to identify like-minded users based on their song play history, we’ll want to dive deeper into their song play data. This query lists each user along with all of the songs that they’ve played (it returns a fair amount of raw results, as it is basically a SELECT *).

SELECT user, track_plays.value FROM users;

Possibly more interesting is a list of users ordered by the number of total song plays. This could be useful for targeting the most active users within models.

SELECT user, size(track_plays) total_plays FROM users ORDER BY total_plays DESC;

The track_plays.value “column” in the listing query above is a complex ARRAY object holding all of a user’s song plays. A relational database would more likely store a single row per user and play timestamp, which also makes it easier to reason about what a query is doing. To get that shape, we use the explode() UDF to create many rows from the array, and Hive’s LATERAL VIEW functionality to attach each of those rows to the user data:

SELECT user, tracks FROM users LATERAL VIEW explode(track_plays) tks AS tracks;

Running this query results in (... refers to omitted results beyond the first 5):


user     {ts, value}
user-41  {"ts":"2012-01-05 03:23:00","value":"song-41"}
user-41  {"ts":"2012-01-05 03:19:00","value":"song-45"}
user-41  {"ts":"2012-01-05 03:17:00","value":"song-46"}
user-41  {"ts":"2012-01-05 03:14:00","value":"song-49"}
user-41  {"ts":"2012-01-05 03:11:00","value":"song-49"}
...      ...
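As a rough mental model of the explode() and LATERAL VIEW combination, the following Python sketch flattens a nested structure in the same way. The rows and the lateral_view_explode helper are hypothetical. One behavior worth noting is that a plain LATERAL VIEW (without OUTER) emits no row for a user whose array is empty, which the sketch mirrors.

```python
# Toy stand-in for the users table; the data is invented for illustration.
users = [
    {"user": "user-a", "track_plays": [
        {"ts": "2012-01-05 03:23:00", "value": "song-1"},
        {"ts": "2012-01-05 03:19:00", "value": "song-2"},
    ]},
    {"user": "user-b", "track_plays": [
        {"ts": "2012-01-05 03:17:00", "value": "song-2"},
    ]},
    {"user": "user-c", "track_plays": []},  # no plays, so no output rows
]


def lateral_view_explode(rows, array_column):
    """Yield one (user, element) pair per element of the array column."""
    for row in rows:
        for element in row[array_column]:
            yield row["user"], element


exploded = list(lateral_view_explode(users, "track_plays"))
for user, track in exploded:
    print(user, track)
```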

To refine the exploded rows a bit further, we can use a GROUP BY to aggregate on just the user and song name and get a play count for each pair:

SELECT user, tracks.value, count(1) AS count FROM users LATERAL VIEW explode(track_plays) tks AS tracks GROUP BY user, tracks.value;

Running this query results in (... refers to omitted results beyond the first 5):


user    track_plays.value  count
user-0  song-0             29
user-0  song-1             7
user-0  song-11            3
user-0  song-16            2
user-0  song-2             11
...     ...                ...

These results should be useful for later analysis. So let's save this query as a view within Hive to be able to reason about queries that use these results more easily:

CREATE VIEW usertracks AS SELECT user, tracks.value, count(1) AS count FROM users LATERAL VIEW explode(track_plays) tks AS tracks GROUP BY user, tracks.value;
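The per-user, per-song counts that the usertracks view holds can be pictured with a small Python sketch. The plays list below is invented and stands in for the exploded (user, song) rows; collections.Counter plays the role of GROUP BY user, value with count(1).

```python
from collections import Counter

# Hypothetical exploded rows: one (user, song) pair per individual play.
plays = [
    ("user-a", "song-1"),
    ("user-a", "song-1"),
    ("user-a", "song-2"),
    ("user-b", "song-2"),
]

# Counting identical (user, song) pairs is the GROUP BY ... count(1) step.
user_tracks = Counter(plays)

for (user, song), count in sorted(user_tracks.items()):
    print(user, song, count)
```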

As a first pass, let's define the similarity of two users as the number of songs that both have played, with no regard to play counts. To compute this across all users, we can do a self-join on the usertracks view (since similarity is symmetric, we’ll save some effort by only computing the result when the left user sorts before the right user):

SELECT A.user, B.user, count(1) AS similarity FROM usertracks A JOIN usertracks B ON A.value = B.value WHERE A.user < B.user GROUP BY A.user, B.user ORDER BY similarity DESC LIMIT 10;

Running this query spawns 5 MapReduce jobs and results in the following (... refers to omitted results beyond the first 5):


user1    user2    similarity
user-20  user-24  16
user-45  user-46  14
user-12  user-15  14
user-21  user-28  14
user-6   user-8   14
...      ...      ...


Note: We could probably optimize these queries to avoid some recomputation, both here and in the subsequent queries, but this exercise is purely exploratory, using ad hoc queries.
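For readers who prefer to see the self-join spelled out, here is a small Python sketch of the same first-pass similarity, using invented data and a hypothetical overlap_similarity helper. It assumes the usertracks rows have already been reduced to the distinct set of songs per user, and it compares user names as strings, which is also how the A.user < B.user condition orders them.

```python
from itertools import combinations

# Hypothetical distinct songs per user (the usertracks view without counts).
songs_by_user = {
    "user-a": {"song-1", "song-2", "song-3"},
    "user-b": {"song-2", "song-3"},
    "user-c": {"song-4"},
}


def overlap_similarity(songs_by_user):
    """Count shared songs for every pair where left sorts before right."""
    pairs = {}
    # combinations() over sorted names visits each unordered pair exactly once.
    for left, right in combinations(sorted(songs_by_user), 2):
        shared = len(songs_by_user[left] & songs_by_user[right])
        if shared:  # the inner join emits no row for pairs with no common song
            pairs[(left, right)] = shared
    return pairs


similarity = overlap_similarity(songs_by_user)
# Highest similarity first, like ORDER BY similarity DESC.
for pair, score in sorted(similarity.items(), key=lambda kv: -kv[1]):
    print(pair, score)
```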

There is a lot of noise, as many user-user combinations have 14 songs in common. Perhaps this analysis could produce better results by using, for each user, the ratio of the number of times a particular song was played to the total number of song plays by that user. To compute a similarity matrix from this, we create two views: one to track each user’s total plays and one to normalize the user track data that we explored above:

CREATE VIEW totalplays AS SELECT user, size(track_plays) total_plays FROM users ORDER BY total_plays DESC;
CREATE VIEW normalizedusertracks AS SELECT UT.user, UT.value, (UT.count/TP.total_plays) freq FROM usertracks UT JOIN totalplays TP on UT.user = TP.user;

Then we run a similar query summing the product of the frequency ratios of each song between users:

SELECT A.user, B.user, sum(A.freq * B.freq) AS similarity FROM normalizedusertracks A JOIN normalizedusertracks B ON A.value = B.value WHERE A.user < B.user GROUP BY A.user, B.user ORDER BY similarity DESC LIMIT 10;

Running this query spawns 9 MapReduce jobs and results in the following (... refers to omitted results beyond the first 5):


user1    user2    similarity
user-32  user-38  0.12640845070422535
user-32  user-39  0.12614819350887935
user-38  user-39  0.1255175983436853
user-30  user-32  0.12370546810273404
user-30  user-38  0.12263655462184873
...      ...      ...

Looking at this list, we notice that user-30, user-32, user-38, and user-39 form a cluster, as they share similar listening habits (which is to say that they listen to the same set of songs a lot). Looking into the specific track play data of these users could help us develop an actionable model that can be used in KijiExpress. We could investigate further and classify which other users might also fit into this cluster (for example, user-31 seems fairly close) and then recommend songs to listen to based on the preferences of that cluster.
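For reference, the frequency-weighted score used above amounts to a dot product of each pair of users' play-frequency vectors over the songs they share. The Python sketch below works through that arithmetic on invented play counts; the normalize and frequency_similarity helpers are hypothetical names, not part of Kiji or Hive.

```python
from itertools import combinations

# Hypothetical play counts per user and song (the usertracks view).
play_counts = {
    "user-a": {"song-1": 3, "song-2": 1},
    "user-b": {"song-1": 1, "song-2": 1},
    "user-c": {"song-3": 4},
}


def normalize(counts):
    """Turn raw play counts into fractions of the user's total plays."""
    total = sum(counts.values())
    return {song: n / total for song, n in counts.items()}


def frequency_similarity(play_counts):
    """Sum freq products over shared songs, for pairs with left < right."""
    freqs = {user: normalize(counts) for user, counts in play_counts.items()}
    scores = {}
    for left, right in combinations(sorted(freqs), 2):
        shared = freqs[left].keys() & freqs[right].keys()
        if shared:  # pairs with no song in common produce no joined rows
            scores[(left, right)] = sum(
                freqs[left][song] * freqs[right][song] for song in shared
            )
    return scores


scores = frequency_similarity(play_counts)
print(scores)
```

In this toy data, user-a has frequencies 0.75 and 0.25 and user-b has 0.5 and 0.5, so their score is 0.75 * 0.5 + 0.25 * 0.5 = 0.5. Users who spread their plays evenly over many songs get small products, which is why the scores in the real results are well below 1.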

The ability to do ad hoc analysis on this data is an extremely powerful tool. Exploratory analysis executed in Hive saves many cycles of writing and debugging MapReduce jobs in Java, eliminating many headaches and wasted person-hours.

Happy Querying!