Saturday, 14 April 2012

Hadoop Hive Partitioned External Table - notes to self

Hive Partitioned External Table 

I had the same issue as Chris Zheng (see http://zhengdong.me/2012/02/22/hive-external-table-with-partitions/) re not being able to select anything out of my Hive external table partitions.
In fact, I had solved this problem several weeks ago without realising it, when I was moving data from one directory to another and altering the partition definitions for the move.

My data is being loaded in a simplistic way into the following HDFS directory structure - i.e. each day's data loads into load_dt=YYYYMMDD:

/data/myfeed/stg/load_dt=YYYYMMDD

E.g. given the following files:

cat 20120301_myfeed.dat

20120301|001500|test|A|35
20120301|003000|test|B|85
20120301|004500|test|A|25
20120301|010000|test|C|35
20120301|011500|test|A|95
20120301|013000|test|D|55
cat 20120302_myfeed.dat
20120302|001500|test|A|35
20120302|003000|test|B|85
20120302|004500|test|A|25
20120302|010000|test|C|35
20120302|011500|test|A|95
20120302|013000|test|D|55

Load them like this:

hadoop fs -put 20120301_myfeed.dat /data/myfeed/stg/load_dt=20120301
hadoop fs -put 20120302_myfeed.dat /data/myfeed/stg/load_dt=20120302
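
N.B. If the load_dt directories don't already exist, hadoop fs -put will create load_dt=YYYYMMDD as a file rather than a directory, so it may be safer to create them first - something like this (older hadoop fs versions create parent dirs automatically; newer ones need -mkdir -p):

hadoop fs -mkdir /data/myfeed/stg/load_dt=20120301
hadoop fs -mkdir /data/myfeed/stg/load_dt=20120302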


Create an external table (with load_dt partition) as follows:

set myfeed_stg_location=/data/myfeed/stg;
set myfeed_stg_location;

set myfeed_stg=myfeed_stg_ext;
set myfeed_stg;

-- Suppose myfeed stg data had records like this
-- 20120301|001500|test|A|35
CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:myfeed_stg}
    ( event_dt            STRING
    , event_tm            STRING
    , category            STRING
    , code                STRING
    , num_hits            INT
     )
    COMMENT 'Table for myfeed staging data'
    PARTITIONED BY( load_dt STRING )
    ROW FORMAT DELIMITED FIELDS TERMINATED by '|'
    STORED AS TEXTFILE
    LOCATION '${hiveconf:myfeed_stg_location}';

I found, as Chris Zheng did, that I got nothing when I selected anything from my myfeed_stg_ext table.

Turns out you need to add the partitions explicitly :( like so:

hive> alter table myfeed_stg_ext add partition (load_dt=20120301);
OK
Time taken: 0.751 seconds
hive> alter table myfeed_stg_ext add partition (load_dt=20120302);
OK
Time taken: 0.279 seconds
hive> select * from myfeed_stg_ext;
OK
20120301 001500 test A 35 20120301
20120301 003000 test B 85 20120301
20120301 004500 test A 25 20120301
20120301 010000 test C 35 20120301
20120301 011500 test A 95 20120301
20120301 013000 test D 55 20120301
20120302 001500 test A 35 20120302
20120302 003000 test B 85 20120302
20120302 004500 test A 25 20120302
20120302 010000 test C 35 20120302
20120302 011500 test A 95 20120302
20120302 013000 test D 55 20120302
Time taken: 0.501 seconds
hive> select * from myfeed_stg_ext where load_dt = 20120301;     
OK
20120301 001500 test A 35 20120301
20120301 003000 test B 85 20120301
20120301 004500 test A 25 20120301
20120301 010000 test C 35 20120301
20120301 011500 test A 95 20120301
20120301 013000 test D 55 20120301
Time taken: 0.314 seconds
hive> 
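
Depending on your Hive version, an alternative to adding each partition by hand is to let Hive discover the load_dt=... directories itself - roughly:

hive -e "MSCK REPAIR TABLE myfeed_stg_ext;"
hive -e "SHOW PARTITIONS myfeed_stg_ext;"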



Here's a simple shell script to move the data from an existing directory structure /data/myfeed/stg/YYYYMMDD to /data/myfeed/stg/load_dt=YYYYMMDD. Make sure it only runs within a single month, or change it to handle month/year boundaries.



#!/bin/bash


day=20120301
while [ $day -le 20120331 ]
do


  echo "hadoop fs -mv /data/myfeed/stg/${day} /data/myfeed/stg/load_dt=${day}"
  hadoop fs -mv /data/myfeed/stg/${day} /data/myfeed/stg/load_dt=${day}
  if [ $? -ne 0 ]
  then
    echo "ERROR: hadoop mv failed"
    exit 1
  fi
  sleep 1  # don't need these sleeps - used during testing


  hive -e "ALTER TABLE myfeed_stg_pext ADD PARTITION (load_dt=${day}); select * from myfeed_stg_pext where load_dt = '$day' limit 10;"
  sleep 2  # don't need these sleeps - used during testing


  day=$(($day+1))


done


TBC ...

Read up on dynamic partitions ... could this be a more elegant approach?
And compression - lzo, others?
(http://www.mrbalky.com/2011/02/24/hive-tables-partitions-and-lzo-compression/)
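
For the dynamic partitions idea, a rough sketch of what the load might look like (myfeed_raw here is a made-up, non-partitioned table holding the raw rows - not something from the setup above - and the two SET lines are the standard Hive dynamic partition switches):

hive -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE myfeed_stg_ext PARTITION (load_dt)
SELECT event_dt, event_tm, category, code, num_hits, event_dt AS load_dt
FROM myfeed_raw;
"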

Monday, 23 January 2012

Hadoop notes to self

Hadoop Tools Ecosystem

Useful link re the Hadoop tools ecosystem
http://nosql.mypopescu.com/post/1541593207/quick-reference-hadoop-tools-ecosystem

Hadoop standalone rebuild

See useful link

rm -rf /data/hdfs
rm -rf /data/tmpd_hdfs
hadoop namenode -format
start-all.sh
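
N.B. this wipes everything in HDFS, and any daemons still running from the previous build should be stopped first:

stop-all.sh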

Hadoop install on CentOS - JDK + Cloudera distribution

# copy CDH tarfiles and jdk somewhere - say /tmp/downloads


cd /opt # or wherever you decide to install hadoop
# install jdk
/tmp/downloads/jdk-6u25-linux-x64.bin

# install hadoop apps
for f in /tmp/downloads/*cdh3*gz
do
  tar -xvzf $f
done
# build soft links to current CDH version
for f in `ls -d *cdh3u3`; do   g=`echo $f| cut -d'-' -f 1`; ln -s $f $g; done
# check permissions and chown -R hadoop:hadoop if reqd


# edit /etc/profile and add the necessary entries to the PATH etc., e.g.:

export JAVA_HOME="/opt/jdk"
PATH="$PATH:$JAVA_HOME/bin:/opt/hadoop/bin:/opt/hive/bin:/opt/pig/bin"
export HIVE_HOME=/opt/hive
export HADOOP_HOME=/opt/hadoop
export PIG_HOME=/opt/pig
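
Quick sanity check once the new profile is sourced (output will vary with your versions):

. /etc/profile
hadoop version              # should report the CDH3 build
hadoop fs -ls /             # confirms the client can reach HDFS
hive -e "show databases;"   # confirms hive is on the PATH and working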

Good Installation/config notes

Great notes re setting up a cluster


To be aware of

Small files in HDFS problem

Configuring

Read this article re configuration notes as a starter
Jobtracker hanging - memory issues - read here
Also read misconfiguration article
Transparent HugePages - see Linux reference and Greg Rahn's expose of THP issue on Hadoop

architectural notes here
Troubleshooting


$ hadoop fsck / 2>&1 | grep -v '\.\.\.'
FSCK started by hadoop (auth:SIMPLE) from /10.1.2.5 for path / at Tue Jun 19 08:16:59 BST 2012
 Total size: 14476540835550 B
 Total dirs: 6780
 Total files: 1040334 (Files currently being written: 3678)
 Total blocks (validated): 1207343 (avg. block size 11990412 B)
 Minimally replicated blocks: 1207343 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 0 (0.0 %)
 Mis-replicated blocks: 0 (0.0 %)
 Default replication factor: 3
 Average block replication: 3.0023208
 Corrupt blocks: 0
 Missing replicas: 0 (0.0 %)
 Number of data-nodes: 9
 Number of racks: 1
FSCK ended at Tue Jun 19 08:17:15 BST 2012 in 15878 milliseconds


The filesystem under path '/' is HEALTHY


$ hadoop dfsadmin -report
Configured Capacity: 65158503579648 (59.26 TB)
Present Capacity: 61841246261419 (56.24 TB)
DFS Remaining: 18049941311488 (16.42 TB)
DFS Used: 43791304949931 (39.83 TB)
DFS Used%: 70.81%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0


Decommissioning nodes from cluster


Decommissioning a node
  • Add the node to the exclude file
  • Run hadoop dfsadmin -refreshNodes on the name node
This causes the data node listed in the exclude file to move all of its data to other nodes in the cluster.
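
Roughly what that looks like on the name node, assuming dfs.hosts.exclude in hdfs-site.xml already points at the exclude file (the path below is just an example):

echo "mydecommnode" >> /opt/hadoop/conf/excludes   # the file named by dfs.hosts.exclude
hadoop dfsadmin -refreshNodes                      # node then shows "Decommission in progress" in dfsadmin -report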

[hadoop@mynamenode conf]$ hadoop dfsadmin -report | more
Configured Capacity: 86201191694336 (78.4 TB)
Present Capacity: 81797299645664 (74.39 TB)
DFS Remaining: 32416639721472 (29.48 TB)
DFS Used: 49380659924192 (44.91 TB)
DFS Used%: 60.37%
Under replicated blocks: 315421
Blocks with corrupt replicas: 0
Missing blocks: 0

Name: mydecommnode:50010
Rack: /dc1/r16
Decommission Status : Decommission in progress
Configured Capacity: 7239833731072 (6.58 TB)
DFS Used: 6869802995712 (6.25 TB)
Non DFS Used: 369725145088 (344.33 GB)
DFS Remaining: 305590272(291.43 MB)
DFS Used%: 94.89%
DFS Remaining%: 0%
Last contact: Wed May 08 11:37:15 BST 2013

Removing the decommissioned datanode
  • Kill hadoop java processes still running on the node
  • Remove from slaves file on the name nodes
  • Leave in the exclude node file until a cluster reboot (changed in newer versions of Hadoop TBC?)
Fsck of hdfs shows replication violations/issues

The solution is to toggle the replication factor down by one and then back to where it should be.
See the example below.

$ ./fsck_all.sh | head -20
fsck'ing fs [/data]
FSCK started by hadoop (auth:SIMPLE) from / for path /data at Tue May 14 08:33:37 BST 2013
/data/myfeed/load_dt=20130430/batch=202/myfile1_20130430110747+0100_20130430110848+0100_src1_0224180.dat.lzo:  Replica placement policy is violated for blk_-1157106589514956189_3885959. Block should be additionally replicated on 1 more rack(s)
.......................Status: HEALTHY

So run the following:


$ hadoop fs -setrep 2 /data/myfeed/load_dt=20130430/batch=202/myfile1_20130430110747+0100_20130430110848+0100_src1_0224180.dat.lzo

Replication 2 set: hdfs://mynamenode/data/myfeed/load_dt=20130430/batch=202/myfile1_20130430110747+0100_20130430110848+0100_src1_0224180.dat.lzo

$ hadoop fs -setrep 3 /data/myfeed/load_dt=20130430/batch=202/myfile1_20130430110747+0100_20130430110848+0100_src1_0224180.dat.lzo

Replication 3 set: hdfs://mynamenode/data/myfeed/load_dt=20130430/batch=202/myfile1_20130430110747+0100_20130430110848+0100_src1_0224180.dat.lzo


And the replica violations disappear.

Also use this approach to solve underreplication issues.
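
For more than a handful of files the same toggle can be scripted - a rough sketch only, since the fsck output format varies between versions (check the grep/awk against your own output first):

hadoop fsck /data 2>/dev/null | grep 'Under replicated' | awk -F: '{print $1}' |
while read f
do
  hadoop fs -setrep 2 "$f"
  hadoop fs -setrep 3 "$f"
done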


Sunday, 8 January 2012

Managing Teradata TTU patch versions on RHE Linux clients

Teradata provides base CDs to install the TTU client software on various platforms.
Select the CD for the platform you need to install on and run the setup script. (I ran the install twice as dependency errors were reported the first time, but this was probably not necessary.) This installs the base versions of the software.
You then need to download the latest versions of the TTU modules from the Teradata-at-your-Service (T@YS) website.
Unfortunately, not all patch downloads follow the same install procedure, so you need to read the README notes to determine the best order for the install. It also appears that the initial installer installed some modules that weren't necessary. I have been meaning to write an automatic upgrader (something Teradata should provide, imho), but these upgrades are infrequent so I haven't got round to it. I will post it if I do.

This small script is used to show the current Teradata TTU versions running on our RHE Linux clients.


#!/bin/ksh


PATCH_LIST="bteq-12 cliv2_64-12 cliv2-12 mqaxsmod-12 npaxsmod-12 papi_x8664_c000-12 papic000-12 pddlc000-12 pdtcc000-12 pexp_x664_c000-12 pexpc000-12 picu_x8664_c000-12 picuc000-12 pinsc000-12 piom-12 plod_x8664_c000-12 plodc000-12 podbcc000-12 poscc000-12 posl_x8664_c000-12 poslc00-12 pselc000-12 pstm_x8664_c000-12 pstmc000-12 pupd_x8664_c000-12 pupdc000-12 tbldc000-12 tdicu_64-12 tdicu-12 tdodbc-12 TeraGSS_redhatlinux-i386-12 TeraGS_suselinux-x8664-12"


for p in `echo $PATCH_LIST`
do
  #echo "Check for patch $p"
  rpm -qa | grep $p
  if [ $? -ne 0 ]
  then
    echo "Patch missing: $p"
  fi
done

Friday, 30 December 2011

Korn shell wrapper script for a Teradata FastExport


Thanks to Euan Murray for this improvement on my slow bteq extract.
It uses Teradata FastExport (fexp) and reduced the extract time from hours to minutes.
Posted here as an example wrapper shell script for a Teradata FastExport.
N.B. Caution: remember to coalesce any fields that can yield NULL. Also, the comma can cause problems.


#!/bin/ksh
# set -x
# Program: td_fexp_shell_wrapper_eg.ksh
# Description: Example shell wrapper script to fast export lots of data from Teradata
# Version: 0.1 glour    15-Jul-2011 Initial version
# Version: 0.2 emurray1 14-Oct-2011 Altered to be fexp
# -------------------------------------------------------------------------


PRG=`basename $0 .ksh` # i.e. PRG="td_fexp_shell_wrapper_eg"
eval $@ # evaluate any command line args


LOGON_STR="prodtd/user,passwd;" # the TD username and passwd
LOCATION=/output_path/ # the location of the output file
DATAFILE=${LOCATION}/${PRG}.dat # the output file
DATA_DATE=$(date +%Y-%m-%d -d "$(date +%d) days ago") # gets the last day of previous month
echo ${DATA_DATE}
DEBUG=0 # for debugging set to 1


>$DATAFILE # empty the output file


fexp > /dev/null 2>&1 << EOF
.logon ${LOGON_STR}
.LOGTABLE   DATABASE.FExp_Log;
.BEGIN EXPORT  SESSIONS 48;
.EXPORT OUTFILE ${DATAFILE};
SELECT ',' ||
a.aaaa1 || '|' ||
trim(a.aaaa2) ||'|'||
coalesce(b.bbbb1,'??') ||'|'||
coalesce(c.cccc1,'??') ||'|'||
        ... more fields here ...
cast (c.cccc2_dt as date  format 'YYYYMMDD') (TITLE '')
FROM db1.table_a a, db2.table_b b, db3.table_c c
WHERE a.aaaa1 = b.bbbb1
AND b.bbbb4 = c.cccc3
AND     '${DATA_DATE}' between a.aaaa_from_Dt and a.aaaa_to_Dt
AND     ... other clauses ...;
.END EXPORT;
.LOGOFF;
EOF
RET_CODE=$?
if [[ ${RET_CODE} -ne 0 ]]
then
        echo "ERROR: fast export failed - investigate before proceeding"
        exit 1
fi


# clean up the extra control chars in fast export output file
echo "removing fast export extra chars"
sed -r 's@^[[:cntrl:][:print:]]*,@@g' ${DATAFILE} > ${LOCATION}/${PRG}.${DATA_DATE}.dat
RET_CODE=$?
if [[ ${RET_CODE} -ne 0 ]]
then
        echo "ERROR: cleanse failed - investigate before proceeding"
        exit 1
else
        echo "Cleanse complete - removing original file"
        rm ${DATAFILE}
fi

Tuesday, 13 December 2011

Perl TCP Socket Programs


Three small pieces of Perl code, kept here as a syntax reference:

  1. listener.pl - server socket script to listen and consume a stream of ASCII lines from a socket
  2. producer.pl - client socket script to produce/write a stream of XML (ASCII lines) supplied in a file onto a socket
  3. consumer.pl - client socket script to consume a stream of XML ASCII lines from a socket 
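
To try them out, start the server script first and then point a client at it - note the ports in the scripts as posted don't match (the listener listens on 5555, the producer writes to 5577, the consumer reads from 1099), so set the client's PeerPort to the server's LocalPort before running:

perl listener.pl &     # server: accepts a connection and writes gzipped chunks under /data/
perl producer.pl       # client: streams the lines of xml.dump onto the socket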





listener.pl - simple server



#!/usr/bin/perl
use IO::Socket;
use POSIX qw/strftime/;


$|=1;


my $sock = new IO::Socket::INET (
                                 LocalHost => 'localhost',  # rem to change to server ip addr if not running on same machine as the client
                                 LocalPort => '5555',
                                 Proto => 'tcp',
                                 Listen => 1,
                                 Reuse => 1,
                                );
die "Could not create socket: $!\n" unless $sock;


$count = 0;
$num_recs = 0;
$num_files = 1;
$max_recs = 30000;
$path = "/data/";
$file_root = "test";
$datetime = strftime('%Y%m%d%H%M%S',localtime);
open OUT, "|gzip -c > ${path}${file_root}_${datetime}_${num_files}.gz" or die "unable to create OUT";


my $new_sock = $sock->accept();


while(<$new_sock>) {
        $line = $_;
        #print "line: $count: $num_recs : $line";
        $count++;
        if (m#</record>#)   # placeholder pattern - match whatever marks the end of a record in your stream
        {
                print OUT $line;
                $num_recs++;
                if (($num_recs % $max_recs) == 0)
                {
                        #print "in reset : $count : $num_recs \n";
                        close(OUT);
                        $num_files++;
                        $datetime = strftime('%Y%m%d%H%M%S',localtime);
                        open OUT, "|gzip -c > ${path}${file_root}_${datetime}_${num_files}.gz" or die "unable to create OUT";
                }
        }
        else
        {
                #print "$count : $num_recs : in else\n";
                print OUT $line;
        }
}
close (OUT);
close($sock);




producer.pl - simple producer client



#!/usr/bin/perl


use IO::Socket;


my $sock = new IO::Socket::INET (
                                 PeerAddr => 'localhost',   # rem to chg to server ip addr if not running socket client and server on same server
                                 PeerPort => '5577',
                                 Proto => 'tcp',
                                );
die "Could not create socket: $!\n" unless $sock;


open IN, "xml.dump" or die "unable to open IN";


$i = 0;
print "before while\n";
while (<IN>)
{
        $i++;
        #print "in loop: $i \n";
        print $sock "$_";
}


close(IN);
close($sock);




consumer.pl - client consumer/reader of socket



#!/usr/bin/perl


use IO::Socket;
use POSIX qw/strftime/;
use File::Path;


my $sock = new IO::Socket::INET (
                                 PeerAddr => 'localhost', # rem to replace with svr ip if not on same machine as socket server
                                 PeerPort => '1099',
                                 Proto => 'tcp',
                                );
die "Could not create socket: $!\n" unless $sock;




$|=1;


$count = 0;
$num_recs = 0;
$num_files = 1;
$max_recs = 60000;
$path = "/data";
$file_root = "test";
$datetime = strftime('%Y%m%d%H%M%S',localtime);
$yyyymmdd = strftime('%Y%m%d',localtime);
unless (-d "${path}${yyyymmdd}")
{
        mkpath("${path}/${yyyymmdd}") or die "Unable to mkpath(${path}${yyyymmdd}) ($!)\n";
}
open OUT, "|gzip -c > ${path}${yyyymmdd}/${file_root}_${datetime}_${num_files}.gz" or die "unable to create OUT";


# client read from socket
while(<$sock>) {
        $line = $_;
        #print "line: $count: $num_recs : $line";
        $count++;
        if (m#</record>#)   # placeholder pattern - match whatever marks the end of a record in your stream
        {
                print OUT $line;
                $num_recs++;
                if (($num_recs % $max_recs) == 0)
                {
                        #print "in reset : $count : $num_recs \n";
                        close(OUT);
                        $num_files++;
                        $datetime = strftime('%Y%m%d%H%M%S',localtime);
                        $yyyymmdd = strftime('%Y%m%d',localtime);
                        unless (-d "${path}${yyyymmdd}")
                        {
                                mkpath("${path}${yyyymmdd}") or die "Unable to mkpath(${path}${yyyymmdd}) ($!)\n";
                        }
                        open OUT, "|gzip -c > ${path}${yyyymmdd}/${file_root}_${datetime}_${num_files}.gz" or die "unable to create OUT";
                }
        }
        else
        {
                #print "$count : $num_recs : in else\n";
                print OUT $line;
        }
}
close (OUT);
close($sock);

Friday, 2 December 2011

Useful links etc

TBC - just a bunch of useful links

Port Forwarding

ssh -L localport:localhost:remoteport remotehost
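
e.g. to reach a web UI that's only listening on the remote box (host/ports here are just an example - 50070 is the usual namenode web UI port):

ssh -L 8888:localhost:50070 hadoop@mynamenode   # then browse http://localhost:8888 locally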

Hadoop rebuild


rm -rf /data/hdfs
rm -rf /data/tmpd_hdfs
hadoop namenode -format
start-all.sh


Emailing attachment on Linux (CentOS 6.1)


mailx -s "Example send PDF file" -a mypdf.pdf myemailaddress@mydomain.com <
pdf test mail
EOF


Other info

Solaris

Check for FC

# fcinfo hba-port
No Adapters Found
or


# fcinfo hba-port|grep -i wwn
HBA Port WWN: 2100001b321c25ba
        Node WWN: 2000001b321c25ba
HBA Port WWN: 2101001b323c25ba
        Node WWN: 2001001b323c25ba
HBA Port WWN: 2100001b321c08b9
        Node WWN: 2000001b321c08b9
HBA Port WWN: 2101001b323c08b9
        Node WWN: 2001001b323c08b9


pgp (Network Associates Freeware version)

To view keys on keyring
/opt/PGP/pgp -kv


To add a key to the keyring
/opt/PGP/pgp -ka <keyfile>


To edit the trust level of a key on the keyring
/opt/PGP/pgp -ke <userid> [keyring]

To pgp encrypt a bunch of files (in this example, a directory full of *.gz files):
userid=xxxxxx    # the userid associated with the recipient's public key
for f in `ls *.gz`
do
  echo $f
  if [ ! -f ${f}.pgp ]
  then
    /opt/PGP/pgp -e $f $userid
    if [ $? -ne 0 ]
    then
      echo "ERROR: Unable to pgp encrypt file: $f"
      exit 1
    fi
  fi
done

Stop/start Solaris service - e.g. httpd

svcadm -v enable /network/http:apache2
svcadm -v disable /network/http:apache2

Swappiness in Linux

See Scott Alan Miller's (SAM's) article on swappiness
He says ...
"On a latency sensitive system or a system where disk IO is at a premium, lowering this number is generally advisable".
So for Hadoop, which is typically disk-IO centric, you want to lower this - even set it to 0.
To check the current value on a Linux system:
sysctl vm.swappiness
or
grep vm.swappiness /etc/sysctl.conf
To set it to 0:
sysctl vm.swappiness=0
or

echo "vm.swappiness = 0" >> /etc/sysctl.conf

For virtualised systems he recommends setting it to 10, and profiling performance before and after the change.