Implementing a Document Store

Although Riak wasn’t explicitly created as a document store, two features recently added to Riak—Riak Search and Riak Data Types—make it possible to use Riak as a highly scalable document store with rich querying capabilities. In this tutorial, we’ll build a basic implementation of a document store using Riak maps.

Basic Approach

Riak Search enables you to implement a document store in Riak in a variety of ways. You could, for example, store and query JSON objects or XML and then retrieve them later via Solr queries. In this tutorial, however, we will store data in Riak maps, index that data using Riak Search, and then run Solr queries against those stored objects.

You can think of these Search indexes as collections. Each indexed document will have an ID generated automatically by Search, and because we’re not interested in running normal key/value queries on these objects, we’ll allow Riak to assign keys automatically. This means that all we have to do is worry about the bucket type and/or bucket when storing objects.

Use Case

Let’s say that we’re building a WordPress-style CMS and storing blog posts in Riak. We will be storing the following information about each post:

  • Title
  • Author
  • Content (the body of the post)
  • Keywords associated with the post
  • Date posted
  • Whether the post has been published on the site

For each of those pieces of information, we’ll need to decide on (a) which Riak Data Type most directly corresponds and (b) which Solr type we want to associate with the info. It’s important to bear in mind that Riak Data Types can be indexed as a wide variety of things, e.g. registers as Solr text fields, sets as multi-valued datetimes, etc. The table below shows which Riak Data Type and Solr type we’ll be using for each field in our Riak maps.

Info Riak Data Type Solr type
Post title Register String
Post author Register String
Post content Register Text
Keywords Set Multi-valued string
Date posted Register Datetime
Whether the post is currently in draft form Flag Boolean

Before we start actually creating and storing blog posts, let’s set up Riak Search with an appropriate index and schema.

Creating a Schema and Index

In the documentation on search schemas, you’ll find a baseline schema to be used for creating custom schemas. We’ll use that baseline schema here and add the following fields to the <fields> list:

<field name="title_register"   type="string"   indexed="true" stored="true" />
<field name="author_register"  type="string"   indexed="true" stored="true" />
<field name="content_register" type="text"     indexed="true" stored="true" />
<field name="keywords_set"     type="string"   indexed="true" stored="true" multiValued="true" />
<field name="date_register"    type="datetime" indexed="true" stored="true" />
<field name="published_flag"   type="boolean"  indexed="true" stored="true" />

You can see the full schema on GitHub. Let’s store that schema in a file called blog_post_schema.xml and upload that schema to Riak:

import org.apache.commons.io.FileUtils;

File xml = new File("blog_post_schema.xml");
String xmlString = FileUtils.readFileToString(xml);
YokozunaSchema schema = new YokozunaSchema("blog_post_schema", xmlString);
StoreSchema storeSchemaOp = new StoreSchema.Builder(schema).build();
client.execute(storeSchemaOp);
schema_data = File.read('blog_post_schema.xml')
client.create_search_schema('blog_post_schema', schema_data)
$schema_string = file_get_contents('blog_post_schema.xml');
(new \Basho\Riak\Command\Builder\StoreSchema($riak))
  ->withName('blog_post_schema')
  ->withSchemaString($schema_string)
  ->build()
  ->execute();
xml_file = open('blog_post_schema.xml', 'r')
schema_data = xml_file.read()
client.create_search_schema('blog_post_schema', schema_data)
xml_file.close()
var schemaXml = File.ReadAllText("blog_post_schema.xml");
var schema = new SearchSchema("blog_post_schema", schemaXml);
var rslt = client.PutSearchSchema(schema);
/*
 * Full example here:
 *  https://github.com/basho/riak-nodejs-client-examples/blob/master/dev/search/document-store.js
 *
 */
var options = {
    schemaName: 'blog_post_schema',
    schema: schemaXml
};
client.storeSchema(options, function (err, rslt) {
    if (err) {
        throw new Error(err);
    }
});
{ok, SchemaData} = file:read_file("blog_post_schema.xml"),
riakc_pb_socket:create_search_schema(Pid, <<"blog_post_schema">>, SchemaData).
curl -XPUT $RIAK_HOST/search/schema/blog_post_schema \
     -H 'Content-Type: application/xml' \
     --data-binary @blog_post_schema.xml

With our schema uploaded, we can create an index called blog_posts and associate that index with our schema:

YokozunaIndex blogPostIndex = new YokozunaIndex("blog_posts", "blog_post_schema");
StoreIndex storeIndex = new StoreIndex.Builder(blogPostIndex).build();
client.execute(storeIndex);
client.create_search_index('blog_posts', 'blog_post_schema')
(new Command\Builder\Search\StoreIndex($riak))
  ->withName('blog_posts')
  ->usingSchema('blog_post_schema')
  ->build()
  ->execute();
client.create_search_index('blog_posts', 'blog_post_schema')
var idx = new SearchIndex("blog_posts", "blog_post_schema");
var rslt = client.PutSearchIndex(idx);
var options = {
    schemaName: 'blog_post_schema',
    indexName: 'blog_posts'
};
client.storeIndex(options, function (err, rslt) {
    if (err) {
        throw new Error(err);
    }
});
riakc_pb_socket:create_search_index(Pid, <<"blog_posts">>, <<"blog_post_schema">>, []).
curl -XPUT $RIAK_HOST/search/index/blog_posts \
     -H 'Content-Type: application/json' \
     -d '{"schema": "blog_post_schema"}'

How Collections will Work

Collections are not a concept that is native to Riak but we can easily mimic collections by thinking of a bucket type as a collection. When we associate a bucket type with a Riak Search index, all of the objects stored in any bucket of that bucket type will be queryable on the basis of that one index. For this tutorial, we’ll create a bucket type called cms and think of that as a collection. We could also restrict our blog_posts index to a single bucket just as easily and think of that as a queryable collection, but we will not do that in this tutorial.

The advantage of the bucket-type-based approach is that we could store blog posts from different blogs in different buckets and query them all at once as part of the same index. It depends on the use case at hand. In this tutorial, we’ll only be storing posts from one blog, which is called “Cat Pics Quarterly” and provides in-depth theoretical discussions of cat pics with a certain number of Reddit upvotes. All of the posts in this blog will be stored in the bucket cat_pics_quarterly.

First, let’s create our cms bucket type and associate it with the blog_posts index:

riak-admin bucket-type create cms \
  '{"props":{"datatype":"map","search_index":"blog_posts"}}'
riak-admin bucket-type activate cms

Now, any object stored in any bucket of the type cms will be indexed as part of our “collection.”

Storing Blog Posts as Maps

Now that we know how each element of a blog post can be translated into one of the Riak Data Types, we can create an interface in our application to serve as that translation layer. Using the method described in Data Modeling with Riak Data Types, we can construct a class that looks like this:

import java.util.Set;

public class BlogPost {
    private String title;
    private String author;
    private String content;
    private Set<String> keywords;
    private DateTime datePosted;
    private Boolean published;
    private static final String bucketType = "cms";

    private Location location;

    private RiakClient client;

    public BlogPost(RiakClient client
                    String bucketName,
                    String title,
                    String author,
                    String content,
                    Set<String> keywords,
                    DateTime datePosted,
                    Boolean published) {
      this.client = client;
      this.location = new Location(new Namespace(bucketType, bucketName), null);
      this.title = title;
      this.author = author;
      this.content = content;
      this.keywords = keywords;
      this.datePosted = datePosted;
      this.published = published;
    }

    public void store() throws Exception {
        RegisterUpdate titleUpdate = new RegisterUpdate(title);
        RegisterUpdate authorUpdate = new RegisterUpdate(author);
        RegisterUpdate contentUpdate = new RegisterUpdate(content);
        SetUpdate keywordsUpdate = new SetUpdate();
        for (String keyword : keywords) {
            keywordsUpdate.add(keyword);
        }
        RegisterUpdate dateUpdate =
            new RegisterUpdate(datePosted.toString("YYYY-MM-DD HH:MM"));
        if (published) {
            FlagUpdate published = new FlagUpdate(published);
        }
        FlagUpdate publishedUpdate = new FlagUpdate(published);
        MapUpdate mapUpdate = new MapUpdate()
            .update("title", titleUpdate)
            .update("author", authorUpdate)
            .update("content", contentUpdate)
            .update("keywords", keywordsUpdate)
            .update("date", dateUpdate)
            .update("published", publishedUpdate);
        UpdateMap storeBlogPost = new UpdateMap.Builder(location, mapUpdate)
            .build();
        client.execute(storeBlogPost);
    }
}
class BlogPost
  def initialize(bucket_name, title, author, content, keywords, date_posted, published)
    bucket = client.bucket_type('cms').bucket(bucket_name)
    map = Riak::Crdt::Map.new(bucket, nil)
    map.batch do |m|
      m.registers['title'] = title
      m.registers['author'] = author
      m.registers['content'] = content
      keywords.each do |k|
        m.sets['keywords'].add(k)
      end
      m.registers['date'] = date_posted
      if published
        m.flags['published'] = true
      end
  end
end
class BlogPost {
  private $title = '';
  private $author = '';
  private $content = '';
  private $keywords = [];
  private $datePosted = '';
  private $published = false;
  private $bucketType = "cms";

  private $bucket = null;

  private $riak = null;

  public function __construct(\Basho\Riak $riak, $bucket, $title, $author, $content, array $keywords, $date, $published)
  {
    this->riak = $riak;
    this->bucket = new Bucket($bucket, $this->bucketType);
    this->title = $title;
    this->author = $author;
    this->content = $content;
    this->keywords = $keywords;
    this->datePosted = $date;
    this->published = $published;
  }

  public function store()
  {
    $setBuilder = (new \Basho\Riak\Command\Builder\UpdateSet($this->riak));
      
    foreach($this->keywords as $keyword) {
      $setBuilder->add($keyword);
    }

    (new \Basho\Riak\Command\Builder\UpdateMap($this->riak))
      ->updateRegister('title', $this->title)
      ->updateRegister('author', $this->author)
      ->updateRegister('content', $this->content)
      ->updateRegister('date', $this->date)
      ->updateFlag('published', $this->published)
      ->updateSet('keywords', $setBuilder)
      ->withBucket($this->bucket)
      ->build()
      ->execute();
  }
}
from riak.datatypes import Map

class BlogPost:
    def __init__(bucket_name, title, author, content, keywords, date_posted, published):
        bucket = client.bucket_type('cms').bucket(bucket_name)
        map = Map(bucket, None)
        self.map.registers['title'].assign(title)
        self.map.registers['author'].assign(author)
        self.map.registers['content'].assign(content)
        for k in keywords:
            self.map.sets['keywords'].add(k)
        self.map.registers['date'] = date_posted
        if published:
            self.map.flags['published'].enable()
        self.map.store()
/*
 * Please see the code in the RiakClientExamples project:
 * https://github.com/basho/riak-dotnet-client/tree/develop/src/RiakClientExamples/Dev/Search
 */
/*
 * Please see the code in the examples repository:
 * https://github.com/basho/riak-nodejs-client-examples/blob/master/dev/search/
 */

Now, we can store some blog posts. We’ll start with just one:

Set<String> keywords = new HashSet<String>();
keywords.add("adorbs");
keywords.add("cheshire");

BlogPost post1 = new BlogPost(client, // client object
                              "cat_pics_quarterly", // bucket
                              "This one is so lulz!", // title
                              "Cat Stevens", // author
                              "Please check out these cat pics!", // content
                              keywords, // keywords
                              new DateTime(), // date posted
                              true); // published
try {
    post1.store();
} catch (Exception e) {
    System.out.println(e);
}
keywords = ['adorbs', 'cheshire']
date = Time.now.strftime('%Y-%m-%d %H:%M')
blog_post1 = BlogPost.new('cat_pics_quarterly',
                          'This one is so lulz!',
                          'Cat Stevens',
                          'Please check out these cat pics!',
                          keywords,
                          date,
                          true)
$keywords = ['adorbs', 'cheshire'];
$date = new \DateTime('now');

$post1 = new BlogPost(
  $riak, // client object
  'cat_pics_quarterly', // bucket
  'This one is so lulz!', // title
  'Cat Stevens', // author
  'Please check out these cat pics!', // content
  $keywords, // keywords
  $date, // date posted
  true // published
);
import datetime

keywords = ['adorbs', 'cheshire']
date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
blog_post1 = BlogPost('cat_pics_quarterly',
                      'This one is so lulz!',
                      'Cat Stevens',
                      'Please check out these cat pics!',
                      keywords,
                      date,
                      true)
var keywords = new HashSet<string> { "adorbs", "cheshire" };

var post = new BlogPost(
    "This one is so lulz!",
    "Cat Stevens",
    "Please check out these cat pics!",
    keywords,
    DateTime.Now,
    true);

var repo = new BlogPostRepository(client, "cat_pics_quarterly");
string id = repo.Save(post);
var post = new BlogPost(
    'This one is so lulz!',
    'Cat Stevens',
    'Please check out these cat pics!',
    [ 'adorbs', 'cheshire' ],
    new Date(),
    true
);

var repo = new BlogPostRepository(client, 'cat_pics_quarterly');

repo.save(post, function (err, rslt) {
    logger.info("key: '%s', model: '%s'", rslt.key, JSON.stringify(rslt.model));
});

Querying

Now that we have some blog posts stored in our “collection,” we can start querying for whatever we’d like. Let’s say that we want to find all blog posts with the keyword funny (after all, some cat pics are quite serious, and we may not want those).

String index = "blog_posts";
String query = "keywords_set:funny";

SearchOperation searchOp = new SearchOperation
    .Builder(BinaryValue.create(index), query)
    .build();
cluster.execute(searchOp);
List<Map<String, List<String>>> results = searchOp.get().getAllResults();
results = client.search('blog_posts', 'keywords_set:funny')
$response = (new \Basho\Riak\Command\Builder\Search\FetchObjects($riak))
  ->withIndexName('blog_posts')
  ->withQuery('keywords_set:funny')
  ->build()
  ->execute();
results = client.fulltext_search('blog_posts', 'keywords_set:funny')
var searchRequest = new RiakSearchRequest("blog_posts", "keywords_set:funny");
var rslt = client.Search(searchRequest);
var searchCmd = new Riak.Commands.YZ.Search.Builder()
    .withIndexName('blog_posts')
    .withQuery('keywords_set:funny')
    .withCallback(search_cb)
    .build();

client.execute(searchCmd);
curl "$RIAK_HOST/search/query/blog_posts?wt=json&q=keywords_set:funny"

Or we can find posts that contain the word furry:

String index = "blog_posts";
String query = "content_register:furry";

SearchOperation searchOp = new SearchOperation
    .Builder(BinaryValue.create(index), query)
    .build();
cluster.execute(searchOp);
List<Map<String, List<String>>> results = searchOp.get().getAllResults();
results = client.search('blog_posts', 'content_register:furry')
$response = (new \Basho\Riak\Command\Builder\Search\FetchObjects($riak))
  ->withIndexName('blog_posts')
  ->withQuery('content_register:furry')
  ->build()
  ->execute();
results = client.fulltext_search('blog_posts', 'content_register:furry')
var searchRequest = new RiakSearchRequest("blog_posts", "content_register:furry");
var rslt = client.Search(searchRequest);
var searchCmd = new Riak.Commands.YZ.Search.Builder()
    .withIndexName('blog_posts')
    .withQuery('content_register:furry')
    .withCallback(search_cb)
    .build();

client.execute(searchCmd);
curl "$RIAK_HOST/search/query/blog_posts?wt=json&q=content_register:furry"

Here are some more possible queries:

Info Query
Unpublished posts published_flag:false
Titles that begin with Loving* title_register:Loving*
Post bodies containing the words furry and jumping content_register:[furry AND jumping]