Setting up 10 node Riak cluster on a single machine

When evaluating NoSQL databases, it's usually better to try them out. While trying them out, it's better to use multi-node configurations instead of running a single node: a cluster in Riak, or a replica set (maybe even a sharded setup) in MongoDB. On our project we evaluated a 10-node Riak cluster so that we could experiment with N, R and W values and decide which values were optimal for us. In Riak, here is what N, R and W mean:

    N = Number of Riak nodes to which data will be replicated
    R = Number of Riak nodes which have to return results for the read to be considered successful
    W = Number of Riak nodes which have to return a write success before the write is considered successful
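One way to reason about these values before running anything is the common rule of thumb that a read overlaps the latest successful write when R + W > N. That rule is not part of the definitions above; it is an assumption of this sketch, it describes a simple strict-quorum model, and the helper name is made up for illustration.

```ruby
# Hypothetical helper: in a simple strict-quorum model, every read quorum
# shares at least one node with every write quorum when R + W > N.
def quorums_overlap?(n, r, w)
  unless (1..n).cover?(r) && (1..n).cover?(w)
    raise ArgumentError, 'R and W must be between 1 and N'
  end
  r + w > n
end

# Try a few combinations that we might want to compare on the cluster.
[[3, 2, 2], [3, 1, 1], [5, 1, 5]].each do |n, r, w|
  puts "N=#{n} R=#{r} W=#{w} overlap=#{quorums_overlap?(n, r, w)}"
end
```

A combination that reports no overlap is not wrong, it just trades read consistency for latency and availability, which is exactly the kind of trade-off worth testing on the cluster.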

These N, R and W settings give us the ability to tune our CAP requirements, so they need to be carefully considered when architecting the system. What better way is there to test our assumptions than to try them out with some code against running Riak nodes? To experiment, we built a script that downloads Riak 1.3.1 and creates 10 nodes, giving each node its own pb_port, http port and handoff_port in app.config and its own -name in vm.args.
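The per-node settings can be sketched as a small lookup. The port bases (8000, 8100, 8200) and the node name prefix match what the shell script in this post uses; the constant and method names are only for illustration.

```ruby
# Port layout for the nodes: a base port per setting plus the node number.
PORT_BASES = { pb_port: 8000, http_port: 8100, handoff_port: 8200 }.freeze

# Returns the node name and ports for node number i (1-based).
def node_settings(i, prefix = 'node')
  ports = PORT_BASES.transform_values { |base| base + i }
  { name: "#{prefix}#{i}@127.0.0.1" }.merge(ports)
end

(1..10).each { |i| puts node_settings(i).inspect }
```

Printing the table up front makes it easy to check that none of the ports collide with something already running on the machine.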

The first node is used as a master node. Every other node joins it after starting, using the riak-admin cluster join master_node_name@127.0.0.1 command. When all the nodes have been started, we can look at the cluster plan using riak-admin cluster plan and then commit it using riak-admin cluster commit. This commits the cluster changes and makes all the nodes part of one cluster. We can view the status of the cluster using riak-admin status, or see the nodes of the cluster using riak-admin status | grep 'ring_members'.

The shell script to set up the 10-node Riak cluster:

#!/bin/bash -e
working_dir=$(pwd)
master_node_id='1'
node_directory='riak-node'
node_name_prefix='node'
master_node_name=$node_name_prefix$master_node_id
tarball='riak-1.3.1-osx-x86_64.tar.gz'
echo "Checking if Riak is downloaded"
# Download only if neither the .tar.gz nor the gunzipped .tar is present
if [ ! -f "$tarball" ] && [ ! -f "${tarball%.gz}" ]; then
  wget http://s3.amazonaws.com/downloads.basho.com/riak/1.3/1.3.1/osx/10.6/riak-1.3.1-osx-x86_64.tar.gz
fi
echo "Unzip and Set up Riak"
if [ -f "$tarball" ]; then
  gunzip -f "$tarball"
fi
tar -xf "${tarball%.gz}"
echo "Setting up 10 nodes"
for i in {1..10}
do
  protocolbuffer_port=$((8000 + i))
  http_port=$((8100 + i))
  handoff_port=$((8200 + i))
  cd "$working_dir"
  nodeid=$node_directory$i
  cp -r riak-1.3.1 "$nodeid"
  # Give each node its own ports and node name (sed -i '' is the OSX/BSD form)
  sed -e s/8087/$protocolbuffer_port/g -i '' $nodeid/etc/app.config
  sed -e s/8098/$http_port/g -i '' $nodeid/etc/app.config
  sed -e s/8099/$handoff_port/g -i '' $nodeid/etc/app.config
  sed -e s/riak@/$node_name_prefix$i@/g -i '' $nodeid/etc/vm.args
  cd "$nodeid/bin"
  ./riak start
  # Every node except the master joins the master
  if [ $i -ne $master_node_id ]; then
    ./riak-admin cluster join $master_node_name@127.0.0.1
  fi
done
cd "$working_dir/$node_directory$master_node_id/bin/"
./riak-admin cluster plan
./riak-admin cluster commit
./riak-admin status | grep 'node.@'
echo "10 node Riak cluster setup"

*These shell scripts work on my Mac and have not been tested on anything other than OSX 10.7+.

Once experimenting and prototyping with the Riak cluster is done, we can gracefully shut down the Riak cluster and clean up all the folders using this shell script:

#!/bin/bash -e
working_dir=$(pwd)
node_directory='riak-node'
echo "Cleaning up riak cluster"
for i in {1..10}
do
  echo "Shutdown node: $i"
  nodeid=$node_directory$i
  cd "$working_dir/$nodeid/bin"
  # Keep going even if a node is already stopped
  ./riak stop || true
done
cd "$working_dir"
# Stop the Erlang port mapper daemon left behind by the nodes
./riak-node1/erts-5.9.1/bin/epmd -kill
rm -rf riak-node*
rm -f riak-1.3.*.tar*
rm -rf riak-1.3.1

These scripts and the approach can be modified to work on other platforms.

When using Groovy with the Spring framework, interacting with the database can be done using the Groovy.SQL class (groovy.sql.Sql), which provides an easy-to-use interface. When we need transactions with Groovy.SQL, we have the .withTransaction method, which accepts a closure containing the code to execute within the transaction.

On our project we were already using Spring, so using annotations to define transactions would be great. Standard @Transactional annotations will not work with Groovy.SQL, since every place where Groovy.SQL is used acquires a new connection from the connection pool. The database work then spans multiple connections, which can result in deadlocks on the database. What we really want is for the database connection to be the same across all invocations of Groovy.SQL within the same transaction started by the annotated method.

This series of Spring configuration steps helped us get the right behavior.

Set up annotations for Spring using:

<mvc:annotation-driven/>

Set up connection pooling using the BoneCP connection pool library. Note that we have set defaultAutoCommit to false, since autoCommit is true by default.

<bean id="datasource" class="com.jolbox.bonecp.BoneCPDataSource" 
destroy-method="close">
<property name="driverClass" value="${db.driver}"/>
<property name="jdbcUrl" value="${db.url}"/>
<property name="username" value="${db.user}"/>
<property name="password" value="${db.passwd}"/>
<property name="defaultAutoCommit" value="false"/>
</bean>

Set up the annotation-driven transaction manager and ensure that proxy-target-class is true:

<tx:annotation-driven transaction-manager="txManager" 
proxy-target-class="true"/>

Next, set up a TransactionAwareDataSourceProxy using the datasource defined previously. The proxy is named datasourceProxy here:

<bean id="datasourceProxy" 
class="org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy">
<constructor-arg ref="datasource"/>
</bean>

Next, pass the datasourceProxy to the DataSourceTransactionManager:

<bean id="txManager" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="datasourceProxy"/>
</bean>

This datasourceProxy can now be used to set up Groovy.SQL:

<bean id="sql" class="groovy.sql.Sql">
<constructor-arg ref="datasourceProxy"/>
</bean>

This sql bean can now be autowired into any class and used by read or write methods, with annotations providing the transactions.

class ProductGateway {
    @Autowired
    SqlUtil sql

    List findBy(productId) {
        def productRows = sql.rows(....)
        // retrieve and return list of products
        // handle exceptions
    }

    @Transactional
    int create(product) {
        // Create rows, update data under transaction
        sql.executeInsert(...)
        sql.executeInsert(...)
        // handle exceptions
    }
}

Backup in mongodb replica-set configurations

There are multiple ways to take backups of mongodb in different configurations. One of the configurations that I have been involved with recently is replica sets. When mongodb is running in a replica-set configuration, there is a single primary node and multiple secondary nodes. To back up the replica set we can either do a mongodump of one of the nodes, or shut down one of the secondary nodes and take file copies, since in a replica set all nodes have the same data (except arbiters). Let's see how we could deal with the mongodump method of taking a backup.

Sync the primary node and lock the database for writes. Doing an fsync flushes all pending writes to disk.

use admin
db.fsyncLock()

Now we can issue the mongodump command. There are many more options for mongodump that can be changed; we are using the defaults here.

mongodump -h node_name  --out /data/backups/backup_file_name

Once the mongodump command is done, we can unlock the database so that writes can be issued again.

use admin
db.fsyncUnlock()

The downside of this approach is that the primary is not available for writes, although reads are fine. If a write is issued while the lock is held, all reads are also blocked after that, which is pretty drastic. The other option is to operate on one of the secondaries: this keeps the primary available for writes and reads, while the secondary is used for the backup. Which node is PRIMARY or SECONDARY can be determined dynamically by running some JavaScript on the command line:

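#!/usr/bin/env ruby
require 'rubygems'
require 'json'

# JavaScript that prints each member's name and state as JSON.
# "return {" stays on one line so the object literal is actually returned.
js = "printjson(rs.status().members.map(" \
     "function(m) { return {'name': m.name, 'stateStr': m.stateStr} }))"
# The whole mongo command is kept on one line so the shell runs it as one command.
mongo_nodes = JSON.parse(`mongo node_name --quiet --eval "#{js}"`)
primary_node = mongo_nodes.detect { |member| member['stateStr'] == 'PRIMARY' }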

This dynamic script allows us to find the node that we want to take the backup from, either the primary or a secondary.
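Picking the backup node from that list could look like the sketch below. The sample JSON and host names are made up so that the snippet runs without a mongo server, and backup_node is a hypothetical helper that prefers a secondary and falls back to the primary.

```ruby
require 'json'

# Sample of the JSON the mongo command prints (host names are made up).
SAMPLE_OUTPUT = <<~JSON
  [ { "name" : "db1:27017", "stateStr" : "PRIMARY" },
    { "name" : "db2:27017", "stateStr" : "SECONDARY" },
    { "name" : "db3:27017", "stateStr" : "ARBITER" } ]
JSON

# Prefer a secondary for backups; fall back to the primary if there is none.
# Arbiters are never chosen because they hold no data.
def backup_node(members)
  members.detect { |m| m['stateStr'] == 'SECONDARY' } ||
    members.detect { |m| m['stateStr'] == 'PRIMARY' }
end

node = backup_node(JSON.parse(SAMPLE_OUTPUT))
puts "mongodump -h #{node['name']} --out /data/backups/backup_file_name"
```

The script only prints the mongodump command; whether to run it directly or hand it to a scheduler is a separate choice.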

MSSQL JDBC Driver behavior

My latest project involves talking to MS-SQL Server using the JDBC driver and Java. While doing this we set up the database connection and had a simple SQL statement to get the first_name and last_name for a unique user_id from the application_user table in the database.

SELECT first_name,last_name 
FROM application_user 
WHERE user_id = ?

Given the above SQL, we did not think too much about performance, as user_id was indexed. The Java code below was used to run the SQL.

Connection conn = db.conn(DATABASE_URL);
try {
    PreparedStatement stmt = prepare(conn,
            "SELECT first_name, last_name " +
            "FROM application_user " +
            "WHERE user_id = ?");
    stmt.setString(1, username);
    ResultSet resultSet = stmt.executeQuery();
    return extractResults(resultSet);
} catch (SQLException e) {
    e.printStackTrace();
}
return null;

When writing integration tests we started noticing that the SQL was taking about 6 seconds to execute. The same SQL would execute within 100 milliseconds in the MSSQL query analyzer. The friendly DBAs on our team pointed out that the SQL was doing a data type conversion: the user_id column was of type VARCHAR, but the JDBC driver sent the parameter as NVARCHAR. Because of this the index was not being used and the SQL took more than 6 seconds to execute. Researching this topic further, we decided to cast the parameter to VARCHAR as shown below.

Connection conn = db.conn(DATABASE_URL);
try {
    // Cast the parameter so it matches the VARCHAR column and the index is used
    PreparedStatement stmt = prepare(conn,
            "SELECT first_name, last_name " +
            "FROM application_user " +
            "WHERE user_id = cast(? AS VARCHAR)");
    stmt.setString(1, username);
    ResultSet resultSet = stmt.executeQuery();
    return extractResults(resultSet);
} catch (SQLException e) {
    e.printStackTrace();
}
return null;

The above code executed in under 100 milliseconds and showed us that the data type being sent did not match the data type in the database. We later found out that the MS-SQL JDBC driver does this to properly deal with unicode characters. This behavior can be turned off using the sendStringParametersAsUnicode flag on the database connection. Once this flag is set to false on the connection, none of the SQL we issue needs the cast.

Connection conn = db.conn(DATABASE_URL
                + ";sendStringParametersAsUnicode=false");
try {
    PreparedStatement stmt = prepare(conn,
            "SELECT first_name, last_name " +
            "FROM application_user " +
            "WHERE user_id = ? ");
    stmt.setString(1, username);
    ResultSet resultSet = stmt.executeQuery();
    return extractResults(resultSet);
} catch (SQLException e) {
    e.printStackTrace();
}
return null;

Of course, this only works if there is no unicode data in your database. If there is any unicode data in the database, we will have to revert to casting in individual SQL statements.

Back to blogging

There has been a long pause in my blogging activity. I was trying to finish off my latest writing engagement on NoSQL. Working with Martin Fowler on NoSQL Distilled was really fun. The book provides a concise, easy way for everyone to understand the rise of the NoSQL movement, and helps with the kinds of trade-offs that need to be made while working with NoSQL.
The book should soon be in print and e-book formats. Martin has written more about it here

With so much pain, why are stored procedures used so much?

I keep encountering situations where all the business logic for an application is in stored procedures, and the application layer just calls the stored procedures to get the work done and return the data. There are many problems with this approach. Some of them are:
  • Writing stored procedure code is fraught with danger, as there are no modern IDEs that support refactoring or flag code smells like "variable not used" or "variable out of scope".
  • Finding usages of a given stored procedure or function usually means doing a text search of the whole code base for its name. Refactoring to change a name is therefore painful, which means names that do not make any sense are propagated, causing pain and loss of developer productivity.
  • Once stored procedure code is written, you need a database to compile it. This usually means a large database install on your desktop or laptop, the other option being to connect to the central database server. Either way, developers have to carry a lot of dependent systems just to compile their code. This could be solved by database vendors providing a way to compile the code outside of the database.
  • Code complexity tools such as PMD metrics, Checkstyle etc. are very rare for stored procedures, which makes visualizing metrics around stored procedure code very hard, if not impossible.
  • Unit testing stored procedures with the *Unit frameworks out there, like pl/sql unit, ounit and tsql unit, is hard, since these frameworks need to run inside the database. Integrating them with Continuous Integration further exacerbates the problem.
  • Order of creation becomes important as you start creating lots of stored procedures and they become interdependent. While creating them in a brand new database, false notifications are thrown about missing stored procedures. To get around this problem, I have seen teams either maintain a master list of stored procedures ordered for creation, or just recompile all stored procedures once they are created ("ALTER RECOMPILE" was built for this). Both of these solutions have their own overhead.
  • While running CPU-intensive stored procedures, the database engine is the only runtime (like a JVM) available for the code to run in. If you want to start more processes to handle more requests, it's not possible without another database engine, so the only solution left is to get a bigger box (vertical scaling).
There certainly are lots of other problems associated with using stored procedures, which I will not get into.
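
As an aside on the creation-order problem in the list above, the hand-maintained master list could in principle be generated from a dependency map. This is only a sketch using Ruby's standard tsort library; the procedure names and the dependency map are made up, and extracting real dependencies from stored procedure source is a separate problem.

```ruby
require 'tsort'

# Hypothetical dependency map: procedure name => procedures it calls.
class ProcedureGraph
  include TSort

  def initialize(dependencies)
    @dependencies = dependencies
  end

  # TSort asks for every node...
  def tsort_each_node(&block)
    @dependencies.each_key(&block)
  end

  # ...and for the nodes each one depends on.
  def tsort_each_child(name, &block)
    @dependencies.fetch(name, []).each(&block)
  end
end

# Returns the procedures ordered so that each one comes after everything it calls.
def creation_order(dependencies)
  ProcedureGraph.new(dependencies).tsort
end

deps = {
  'place_order' => %w[check_stock calc_tax],
  'check_stock' => [],
  'calc_tax'    => ['lookup_rate'],
  'lookup_rate' => []
}
puts creation_order(deps).inspect
```

If two procedures call each other, tsort raises TSort::Cyclic, which is at least a clearer signal than a false "missing procedure" notification.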

Replica sets in MongoDB

Replica sets are a MongoDB feature for automatic failover. In this setup there is a primary server and the rest are secondary servers. If the primary server goes down, the secondary servers choose a new primary via an election process. Each server can also be assigned a number of votes, so that you can decide the next primary based on data-center location, machine properties etc. You can also start mongo database processes that act only as election tie-breakers. These are known as arbiters; they never hold data, but just act as agents that break the tie.
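Why an arbiter helps can be seen with a little vote counting. The sketch below assumes the usual rule that a primary can only be elected while the reachable members hold a strict majority of all votes; the Member struct and the helper name are made up for illustration and are not part of any MongoDB driver.

```ruby
# Hypothetical member record: name, number of votes, and whether it is reachable.
Member = Struct.new(:name, :votes, :up)

# A primary can be elected only while reachable members hold more than half the votes.
def can_elect_primary?(members)
  total = members.sum(&:votes)
  reachable = members.select(&:up).sum(&:votes)
  reachable * 2 > total
end

two_nodes = [Member.new('node1', 1, true), Member.new('node2', 1, false)]
with_arbiter = two_nodes + [Member.new('arbiter', 1, true)]

puts "two data nodes, one down: #{can_elect_primary?(two_nodes)}"
puts "same, plus an arbiter:    #{can_elect_primary?(with_arbiter)}"
```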

All operations are directed at the primary server. The primary writes the operations to its operation log (also known as the oplog), and the secondary servers get their updates from the primary. Because data is written to the primary first and replicated to the secondaries later, if the primary goes down after a write happens but before it is replicated, you will lose the data that was written to the primary but never replicated. You can get around this by specifying how many servers should have the data before the write is considered good:

db.runCommand( { getlasterror : 1 , w : 3 } )

In the above command, you are saying that the write to the database is considered good only if it has been propagated to at least 3 servers. Of course, doing this for every write is going to be very expensive, so you should batch all your writes for a user action and then issue getlasterror.

This is how you start the mongod servers in a replica set. They can run on any machine and any port, as long as they can talk to each other over the network and all of them have the same "--replSet" parameter; in the example below it's "prod".

mongod --replSet prod --port 27017 --dbpath /data/node1 
mongod --replSet prod --port 27027 --dbpath /data/node2 
mongod --replSet prod --port 27037 --dbpath /data/node3 

Once the three servers are up, you have to create a replica configuration as shown below. If you use localhost as a server name, then all the members of the replica set have to be on localhost. If the mongo servers are on different machines, you should use distinct machine names and not localhost for any of them. Once the replica config is defined, you initiate the replica set using the configuration as shown below.

replica_config = {_id: 'prod', members: [
                          {_id: 0, host: 'localhost:27017'},
                          {_id: 1, host: 'localhost:27027'},
                          {_id: 2, host: 'localhost:27037'}]}
// Now initiate the replica_config
rs.initiate(replica_config);

When you are connecting to a replica set, you have to connect to at least one server which is alive. Using the ruby driver you can connect to more than one server using the "multi" method. One part you should be careful about: let's say you define all the servers in the replica set in your connection string, but one of the members is down. You will get connection failures. So the best thing to do is to give the members of the replica set that are up, and the driver will discover the other servers as they come online or go offline. Here is a sample ruby program to find a doc in a loop.

#!/usr/bin/env ruby
require 'mongo'
begin
  @connection = Mongo::Connection.multi([
                                       ['localhost',27017],
                                       ['localhost',27027],
                                       ['localhost',27037]])
  @collection = @connection.db("sales").collection("products")
  product = { "name" => "Refactoring",
              "code" => "023XX3",
              "type" => "book",
              "in_stock" => 100}
  @collection.insert(product)
  # Keep reading the same document so a failover shows up as a burst of errors
  100.times do
    sleep 0.5
    begin
      product = @collection.find_one "code" => "023XX3"
      puts "Found Book: "+product["name"]
    rescue StandardError => e
      # Print the connection error and try again on the next iteration
      puts e.message
      next
    end
  end
end

While the ruby program is running, you can kill the current primary. You will see that the program gets connection exceptions while the replica set is figuring out the next primary. Once the next primary is picked, the program goes on its way, finding the same data from the newly elected primary. Here is a screencast of the replica sets in action: Replica Sets screencast

Schema-less databases and their ramifications.

In No-SQL land, schema-less is a powerful feature that is advertised a lot. Schema-less basically means you don't have to worry about column names and table names in the traditional sense: if you want to change a column name, you just start saving the data using the new column name. Let's say you have a document database like mongoDB and you have a JSON document as shown below.

{  "_id":"4bc9157e201f254d204226bf",
   "FIRST_NAME":"JOHN",
   "MIDDLE_NAME":"D",
   "LAST_NAME":"DOE",
   "CREATED":"2010-10-12"
}

You have some corresponding code to read the documents from the database, and let's say you have lots of data in the database, on the order of millions of documents. Suppose you want to change the names of some attributes or columns at this point, so that the new JSON would look like

{  "_id":"4bc9157e201f254d204226bf",
   "first_name":"JOHN",
   "middle_name":"D",
   "last_name":"DOE",
   "created":"2010-10-12"
}

You will either have to change every document in the database to match the new attribute names, or make sure your code can handle both types of attribute names, like

   first_name = doc["first_name"]
   first_name = doc["FIRST_NAME"] if first_name.nil?
   middle_name = doc["middle_name"]
   middle_name = doc["MIDDLE_NAME"] if middle_name.nil?
   last_name = doc["last_name"]
   last_name = doc["LAST_NAME"] if last_name.nil?

This attribute name change also affects the indexes created in mongoDB. Since the attribute name change is not applied across all the documents, an index created on

   db.people.ensureIndex({first_name:1})

will not index documents where the attribute name is FIRST_NAME, so you have to keep another index for the old attribute name

   db.people.ensureIndex({FIRST_NAME:1})

As you can see, this gets really complicated if you do multiple refactorings over a period of time. So when you hear schema-less, make sure you understand the ramifications of refactoring attribute names at will and the effect on the code base and the database.
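
One way to keep the dual-name handling in a single place is a small reader plus a migration step that rewrites old documents as they are touched. This is only a sketch: read_attr and migrate are made-up helpers working on plain hashes, not part of any mongoDB driver, and they assume the old names are simply the upper-case form of the new ones.

```ruby
# Reads an attribute under its new name, falling back to the old upper-case name.
def read_attr(doc, name)
  doc.fetch(name) { doc[name.upcase] }
end

# Returns a copy of the document with the known attributes renamed to lower case.
def migrate(doc, names = %w[first_name middle_name last_name created])
  # Drop the old upper-case keys, keep everything else (for example _id).
  migrated = doc.reject { |key, _| names.include?(key.downcase) && key != key.downcase }
  names.each do |name|
    value = read_attr(doc, name)
    migrated[name] = value unless value.nil?
  end
  migrated
end

old_doc = { '_id' => '4bc9157e201f254d204226bf', 'FIRST_NAME' => 'JOHN', 'LAST_NAME' => 'DOE' }
puts migrate(old_doc).inspect
```

Once every document has been rewritten this way, the fallback in read_attr and the index on the old attribute name can both be dropped.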

Effective use of data for better customer experience.

For more than seven years I have been getting offers for credit cards from airlines and banks. One particular bank has been sending me these solicitations for more than seven years. That is 12 mailings per year, more than 84 mailings so far; remember, these are physical paper mailings, not the electronic kind. I don't like the junk, it hurts the environment, and worst of all I think it's not a good use of the data they have. How hard is it to design a system around the data they have?

Let's say they have a table of all the targeted customers they want to send credit card applications to. Why not have an attribute on the table for counting how many times the solicitation was sent? Or they could even store the date the first solicitation was sent.

Customer
    Name
    Address
    City
    FirstSolicitationSent

or

Customer
    Name
    Address
    City
    Solicitations

So they could say: if the number of days between today and FirstSolicitationSent is more than 90, do not send another solicitation; or if the number of solicitations is more than three, do not send another solicitation.
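
The rule is small enough to write down. The sketch below combines both variants in one check; the hash keys, the constants and the method name are made up for illustration, with the limits of 90 days and three solicitations taken from the rule above.

```ruby
require 'date'

# Limits from the rule above.
MAX_DAYS = 90
MAX_SOLICITATIONS = 3

# customer is a hash with :first_solicitation_sent (Date or nil)
# and :solicitations (how many have been sent so far).
def send_solicitation?(customer, today = Date.today)
  first_sent = customer[:first_solicitation_sent]
  # Stop once the first mailing is more than 90 days old.
  return false if first_sent && (today - first_sent) > MAX_DAYS
  # Stop once more than three mailings have gone out.
  return false if customer.fetch(:solicitations, 0) > MAX_SOLICITATIONS
  true
end

prospect = { first_solicitation_sent: Date.new(2012, 1, 1), solicitations: 2 }
puts send_solicitation?(prospect, Date.new(2012, 2, 1))
puts send_solicitation?(prospect, Date.new(2012, 6, 1))
```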

This lets them avoid sending solicitations for years and ultimately losing the customer. I understand the argument that the customer needs time to react to the solicitation, but seven years of trying to convert a prospect is a pure waste of time and effort. The data available can be used in better ways.

Schema design in a document database

We are using MongoDB on our project. Since mongo is a document store, schema design is somewhat different. With traditional RDBMS data stores one thinks about tables and rows, while with a document database you have to think about the schema in a somewhat different way. Let's say we want to save a customer object. When using an RDBMS we would come up with Customer, Address, Phone and Email tables, related to each other as shown below.

[Figure: customer.jpg]

With a document database, the schema design actually does not change much: the Customer document contains an array of Addresses, a one-to-many relationship. You will not need the FK columns or the primary key columns on the child tables, since the child rows are embedded in the parent object. The JSON object below shows how the data would look.

{
    "_id" : ObjectId("4bd8ae97c47016442af4a580"),
    "customerid" : 99999,
    "name" : "Foo Sushi Inc",
    "type" : "Good",
    "since" : "12/12/2001",
    "addresses" : [
        {   "address" : "4821 Big Street",
            "city" : "Stone",
            "state" : "IL",
            "country" : "USA"
        },
        {   "address" : "1248 Barlow Ln",
            "city" : "Hedgestone",
            "country" : "UK"
        }
    ],
    "emails" : [
        {"email" : "foousa@sushi.com"},
        {"email" : "foouk@sushi.com"}
    ],
    "phones" : [
        {"phone" : "773-7777-7777"},
        {"phone" : "020-6666-6666"}
    ]
}

So instead of 1 row for customer and 2 rows each for address, phone and email, you get one Customer document. If you want to query for customers in the USA, using an RDBMS you would do

SELECT customer.name FROM customer, address
WHERE customer.customerid = address.customerid
AND address.country = 'USA'

The same query in mongo would look like

db.customers.find({"addresses.country":"USA"},{"name":true})

where customers is the collection in which we store our customers.
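
To make the "addresses.country" condition concrete, here is an in-memory sketch of what it matches: a customer qualifies when any one of its embedded addresses has the given country. The helpers and the second sample document are made up for illustration and do not talk to mongo.

```ruby
# Does any embedded address of the customer document have the given country?
def in_country?(customer, country)
  customer.fetch('addresses', []).any? { |address| address['country'] == country }
end

# In-memory counterpart of find({"addresses.country": country}, {"name": true}).
def customer_names_in(customers, country)
  customers.select { |c| in_country?(c, country) }.map { |c| c['name'] }
end

customers = [
  { 'name' => 'Foo Sushi Inc',
    'addresses' => [{ 'city' => 'Stone', 'country' => 'USA' },
                    { 'city' => 'Hedgestone', 'country' => 'UK' }] },
  { 'name' => 'Hypothetical Noodles Ltd',
    'addresses' => [{ 'city' => 'Hedgestone', 'country' => 'UK' }] }
]
puts customer_names_in(customers, 'USA').inspect
```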

All Content Copyright © 2012 Pramod Sadalage. All Rights Reserved.