Friday, 22 February 2013

Cheap archival NAS storage using BackBlaze design

BackBlaze have a great low-cost storage product.
The best part is they have open sourced their design.

They have just announced a new 3rd generation storage design as reported by TheRegister costing $2000 for the chassis holding 45 disks (disks not included in the price).

Interestingly Netflix were influenced by the BackBlaze design for their 100TB (36 x 3TB) design.


Thursday, 21 February 2013

BI, DW DBMS, Big Data articles

Articles covering the BI, DW, DBMSs, NoSQL and Big Data



13 Big Data Vendors to watch in 2013 - including AWS, 10gen, Cloudera, Hortonworks, 

Random entry - Graph DB Neo4j overview but the first 5 mins gives an interesting overview of Key-Vaue Pair vs ColumnStore vs Document vs Graph Databases


Big Data Architectures patterns by Eddie Satterley

Wednesday, 20 February 2013

Balancing an HDFS cluster (including java LeaseChecker OutOfmemoryError - still unresolved)

HDFS Balancer

Read the following articles for starters:

Yahoo tutorial module on Hadoop rebalancing 
Rebalancer Design PDF

Architecture for Open Source Applications HDFS - see rebalancing paragraph but take care talks about the threshold being between 0 and 1

Log on a the hadoop user (the user that runs our cluster is called hadoop) 
Change to the ${HADOOP_HOME}/bin where the hadoop scripts reside.
Then run the start-balancer.sh.
The default is a balancing threshold of 10% so choose something a little lower.
I chose 5%.
I should have started closer to 10% like 9% or 8%.
Why? Because start_balancer.sh TAKES FOREVER!
Use hadoop dfsadmin -report to check the redistribution of the space.

[hadoop@mynode hadoop]$ cd $HADOOP_HOME/bin


[hadoop@mynode bin]$ ./start-balancer.sh -threshold 5
starting balancer, logging to /opt/hadoop-0.20.2-cdh3u3/bin/../logs/hadoop-hadoop-balancer-mynode.out
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Feb 19, 2013 6:44:27 PM           0                 0 KB           516.65 GB              20 GB
[hadoop@mynode bin]$ hadoop dfsadmin -report


[hadoop@mynode bin]$ cat /opt/hadoop-0.20.2-cdh3u3/bin/../logs/hadoop-hadoop-balancer-mynode.out
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Feb 19, 2013 6:44:27 PM           0                 0 KB           516.65 GB              20 GB
Feb 19, 2013 7:05:57 PM           1              2.39 GB           514.07 GB              20 GB
Feb 19, 2013 7:28:28 PM           2              4.89 GB           511.59 GB              20 GB
Feb 19, 2013 7:50:29 PM           3              7.32 GB            509.2 GB              20 GB
Feb 19, 2013 8:12:29 PM           4              9.74 GB           506.67 GB              20 GB
Feb 19, 2013 8:34:30 PM           5             12.18 GB           504.51 GB              20 GB
Feb 19, 2013 8:56:30 PM           6             14.66 GB           502.14 GB              20 GB
Exception in thread "LeaseChecker" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:78)
at org.apache.hadoop.ipc.Client$Connection.sendParam(Client.java:754)
at org.apache.hadoop.ipc.Client.call(Client.java:1080)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
at $Proxy1.renewLease(Unknown Source)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at $Proxy1.renewLease(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.renew(DFSClient.java:1282)
at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.run(DFSClient.java:1294)
at java.lang.Thread.run(Thread.java:662)


[hadoop@mynode bin]$ ./stop-balancer.sh 
./stop-balancer.sh: fork: retry: Resource temporarily unavailable
./stop-balancer.sh: fork: retry: Resource temporarily unavailable
./stop-balancer.sh: fork: retry: Resource temporarily unavailable
./stop-balancer.sh: fork: retry: Resource temporarily unavailable
./stop-balancer.sh: fork: Resource temporarily unavailable
[hadoop@mynode bin]$ w
 21:19:18 up 231 days, 11:44,  2 users,  load average: 0.03, 0.01, 0.00
USER     TTY      FROM              LOGIN@   IDLE   JCPU   PCPU WHAT

[hadoop@mynode bin]$ hadoop job -list
/opt/hadoop/bin/hadoop: fork: retry: Resource temporarily unavailable
/opt/hadoop/bin/hadoop: fork: retry: Resource temporarily unavailable
/opt/hadoop/bin/hadoop: fork: retry: Resource temporarily unavailable
/opt/hadoop/bin/hadoop: fork: retry: Resource temporarily unavailable
/opt/hadoop/bin/hadoop: fork: Resource temporarily unavailable


[hadoop@mynode bin]$ cd ../pids
[hadoop@mynode pids]$ ls -atlr
total 20
drwxr-xr-x 17 hadoop hadoop 4096 Mar  8  2012 ..
-rw-rw-r--  1 hadoop hadoop    5 Feb 13 12:20 hadoop-hadoop-namenode.pid
-rw-rw-r--  1 hadoop hadoop    5 Feb 13 12:21 hadoop-hadoop-jobtracker.pid
-rw-rw-r--  1 hadoop hadoop    5 Feb 19 18:44 hadoop-hadoop-balancer.pid
drwxr-xr-x  2 hadoop hadoop 4096 Feb 19 18:44 .


[hadoop@mynode bin]$ kill -0 2329
[hadoop@mynode bin]$ echo $?
0
[hadoop@mynode bin]$ kill 2329
[hadoop@mynode bin]$ echo $?
0
[hadoop@mynode bin]$ ps -ef | grep 2329 | grep -v grep
[hadoop@mynode bin]$ 


Sometime later ... restarted a start_balancer.sh using 9% then 8% threshold ...


[hadoop@mynode bin]$ ./start-balancer.sh -threshold 9
starting balancer, logging to /opt/hadoop-0.20.2-cdh3u3/bin/../logs/hadoop-hadoop-balancer-mynode.out
[hadoop@mynode bin]$ tail -10f /opt/hadoop-0.20.2-cdh3u3/bin/../logs/hadoop-hadoop-balancer-mynode.out
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
The cluster is balanced. Exiting...
Balancing took 629.0 milliseconds

[hadoop@mynode bin]$ ./start-balancer.sh -threshold 8
starting balancer, logging to /opt/hadoop-0.20.2-cdh3u3/bin/../logs/hadoop-hadoop-balancer-mynode.out

Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Mar 15, 2013 6:21:37 PM           0                 0 KB            63.46 GB              10 GB
Mar 15, 2013 6:42:37 PM           1              1.22 GB            62.13 GB              10 GB
...









Thursday, 14 February 2013

Hadoop Hive insert into partition table example script

Here's an example of creating Hadoop hive daily summary partitions and loading data from a Hive transaction table into newly created partitioned summary table.

The Hadoop Hive Manual has the insert syntax covered neatly but sometimes it's good to see an example.

Background

Colleagues wanted us to produce a smaller query set based on a large (billion rows per day) transaction table called big_txns that was partitioned by load date (load_dt).
They wanted the following in the smaller query set (1,000s of records):

  • Transaction Day - the day the transaction occurred (txn_dt was datetime yyyymmddHHMMSS so needed substringing for yyyymmdd)
  • Transaction Type - an interesting attribute
  • Counts - totals x transaction day x txn_type


Hive Create Partitioned External Table


DROP TABLE IF EXISTS txn_summ_x_txn_type;

CREATE EXTERNAL TABLE txn_summ_x_txn_type
    ( 
txn_dt     STRING,
        txn_type   STRING,
        cnt        BIGINT
     )
    COMMENT 'Transaction summary table showing counts x txn_dt x txn_type partitioned by load_dt'
    PARTITIONED BY( load_dt STRING )
    ROW FORMAT DELIMITED FIELDS TERMINATED by '|'
    STORED AS TEXTFILE
    LOCATION '/data/txn/summ/txn_summ_x_txn_type';


Hive Insert into Daily Partitions

Here is a very basic shell script to build this summary table.
[Aside - originally I had the hive statement using self-consuming input a la ...
hive <
...
EOF
but when I ran it in the background it kept stopping for I don't know what reason.
So resorted to hive -e "hive cmds".
Need to go back and look at this]


#!/bin/bash
# Descrption: Insert the txn_type counts x txn_dt partitioned by load_dt
# Usage:      ins_txn_summ_x_txn_type.sh START_DAY=YYYYMMDD END_DAY=YYYYMMDD
# --------------------------------------------------------------------------


dohive() {

load_dt=$1

hive -e "
SET mapred.child.java.opts=-Xmx1024M;

alter table txn_summ_x_txn_type add if not exists partition (load_dt='${load_dt}');

insert overwrite table txn_summ_x_txn_type
partition ( load_dt = '${load_dt}' )
select SUBSTR(t.txn_dt,1,8) as txn_dt,
       t.txn_type, 
       count(*) as cnt
from big_txns t
where t.load_dt='${load_dt}'
group by SUBSTR(t.txn_dt,1,8),
         t.txn_type;
"

}

#START_DAY=20121001
#END_DAY=20121031

# Allow one to change the START_DAY and END_DAY
eval $@

if [ $# -ne 2 ]
then
  echo "ERROR: usage: ins_txn_summ_x_txn_type.sh START_DAY=YYYYMMDD END_DAY=YYYYMMDD (do not cross month boundaries - if non-Linux - see comment below)"
  exit 1
fi

DAY=$START_DAY

while [ $DAY -le $END_DAY ]
do
  echo DAY $DAY
  dohive $DAY 
  # DAY=$(($DAY+1)) # use this and don't span months if non-Linux
  # on linux this DAY increment works a treat - tx Matt Livesey for reminding
  DAY=`date --date=${DAY}' +1 day' +%Y%m%d`
done

Friday, 1 February 2013

Move data from one Hadoop cluster to another using distcp (including a note at the end re distcp'ing between different hadoop versions)

Cluster Specs
I have 2 HDFS clusters in the same network domain (1G networking).
The source cluster has 9 data nodes (16 core, 48G RAM, 4x2TB disks) and the target cluster has 18 data nodes (16 core, 48 G RAM, 4x2TB disks).

Task
I needed to transfer a week's worth of data from one to the other - 80G per day. 

Attempt 1 - Slow and Fiddly
Initially I used the following which was slow and fiddly:
  • copyToLocal - HDFS to NFS - 45 mins
  • copyFromLocal - NFS to HDFS - 1 hr 45 mins
Attempt 2 - Fast and Simple
Then I used hadoop distcp
  • hadoop distcp - 4mins
  • Rename path in HDFS cluster #2  (to be in line with naming convention there) - 1 sec
  • Create hive partitions in HDFS cluster 2 - 1 min

Conclusion
Hadoop distcp is pretty impressive running across 2 quiet clusters.

Misc Notes
Read more about hadoop distcp here.

Command used (note - just happened that hdfs nn was listening on different ports in the 2 clusters):

nohup hadoop distcp hdfs://hdfssvr1:8020/data/abc/stg/load_dt=20121127 hdfs://hdfssvr2:54310/user/hive/warehouse/abc > /var/tmp/distcp_20121127.log 2>&1 &

(Note the above command copies the dir load_dt=20121127 [full of subdirs in my case] into /user/hive/warehouse/abc dir in the remote hdfs cluster)

Logging generated by the command:

13/02/01 18:41:07 INFO tools.DistCp: srcPaths=[hdfs://hdfssvr1:8020/data/abc/stg/load_dt=20121127]
13/02/01 18:41:07 INFO tools.DistCp: destPath=hdfs://hdfssvr2:54310/user/hive/warehouse/abc
13/02/01 18:41:09 INFO tools.DistCp: sourcePathsCount=4901
13/02/01 18:41:09 INFO tools.DistCp: filesToCopyCount=4311
13/02/01 18:41:09 INFO tools.DistCp: bytesToCopyCount=80.1g
13/02/01 18:41:10 INFO mapred.JobClient: Running job: job_201302011505_0006
13/02/01 18:41:11 INFO mapred.JobClient:  map 0% reduce 0%
13/02/01 18:41:26 INFO mapred.JobClient:  map 1% reduce 0%
13/02/01 18:41:27 INFO mapred.JobClient:  map 2% reduce 0%
13/02/01 18:41:28 INFO mapred.JobClient:  map 3% reduce 0%
13/02/01 18:41:29 INFO mapred.JobClient:  map 4% reduce 0%
13/02/01 18:41:30 INFO mapred.JobClient:  map 5% reduce 0%
13/02/01 18:44:05 INFO mapred.JobClient:  map 93% reduce 0%
13/02/01 18:44:07 INFO mapred.JobClient:  map 94% reduce 0%
13/02/01 18:44:10 INFO mapred.JobClient:  map 95% reduce 0%
13/02/01 18:44:12 INFO mapred.JobClient:  map 96% reduce 0%
13/02/01 18:44:14 INFO mapred.JobClient:  map 97% reduce 0%
13/02/01 18:44:17 INFO mapred.JobClient:  map 98% reduce 0%
13/02/01 18:44:20 INFO mapred.JobClient:  map 99% reduce 0%
13/02/01 18:44:31 INFO mapred.JobClient:  map 100% reduce 0%
13/02/01 18:44:32 INFO mapred.JobClient: Job complete: job_201302011505_0006
13/02/01 18:44:32 INFO mapred.JobClient: Counters: 20
13/02/01 18:44:32 INFO mapred.JobClient:   Job Counters 
13/02/01 18:44:32 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=12572408
13/02/01 18:44:32 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/02/01 18:44:32 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/02/01 18:44:32 INFO mapred.JobClient:     Launched map tasks=186
13/02/01 18:44:32 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/02/01 18:44:32 INFO mapred.JobClient:   distcp
13/02/01 18:44:32 INFO mapred.JobClient:     Files copied=4311
13/02/01 18:44:32 INFO mapred.JobClient:     Bytes copied=85969881779
13/02/01 18:44:32 INFO mapred.JobClient:     Bytes expected=85969881779
13/02/01 18:44:32 INFO mapred.JobClient:   FileSystemCounters
13/02/01 18:44:32 INFO mapred.JobClient:     HDFS_BYTES_READ=85971609725
13/02/01 18:44:32 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=8955232
13/02/01 18:44:32 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=85969881779
13/02/01 18:44:32 INFO mapred.JobClient:   Map-Reduce Framework
13/02/01 18:44:32 INFO mapred.JobClient:     Map input records=4900
13/02/01 18:44:32 INFO mapred.JobClient:     Physical memory (bytes) snapshot=25844953088
13/02/01 18:44:32 INFO mapred.JobClient:     Spilled Records=0
13/02/01 18:44:32 INFO mapred.JobClient:     CPU time spent (ms)=3410430
13/02/01 18:44:32 INFO mapred.JobClient:     Total committed heap usage (bytes)=35171532800
13/02/01 18:44:32 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=201811800064
13/02/01 18:44:32 INFO mapred.JobClient:     Map input bytes=1473253
13/02/01 18:44:32 INFO mapred.JobClient:     Map output records=0
13/02/01 18:44:32 INFO mapred.JobClient:     SPLIT_RAW_BYTES=25854

Update - 30th June 2013

Moving data between CDH3u3 and CDH4.3

Run this on the new cluster.

nohup hadoop distcp hftp://mycdh3u3nn:50070/data/xyz/stg/load_dt=YYYYMMDD hdfs://mycdh430nn:8020/data/xyz/stg/ &

This results in the load_dt=YYYYMMDD hdfs directory being copied here:
            hdfs://data/xyz/stg/load_dt=YYYYMMDD 
in the new cluster

Note:

  • The use of the hftp protocol to read-only copy the data from the old cluster and write into the new cluster using hdfs protocol.
  • I used a unix user of the same name on both namenodes. This happened to be the hadoop supergroup user on the old cluster. You might need to work out the permissions to get this to work.