Tuesday 19 June 2012

Pig notes to self


Some commands


# Note SUBSTRING is like a python slice (start index inclusive, end index exclusive)
# so suppose field x has "abcdefgh"
# SUBSTRING(x,3,4) => "d"
# SUBSTRING(x,2,5) => "cde"


Note this code is there for syntax purposes only - it does nothing meaningful ...


comments 


/* .... over multiple lines ...*/


-- Syntax-reference script: parameters, UDF registration, LOAD, DISTINCT,
-- FOREACH/GENERATE, JOIN, FILTER and STORE. It is not a runnable job as written
-- (see the NOTE(review) comments below).
--
-- Parameters are referenced in the script as $name.
-- use -param arg1='abcd' on the command line
-- use -param myvar='xyz' on the command line
-- %default declares the value a parameter takes when no -param is given for it.
-- NOTE(review): arg1 and myvar are declared but not referenced below, while
-- $myfile (LOAD) and $OUTPUTfile (STORE) have no %default here, so presumably
-- they must always be passed with -param - confirm.
%default arg1 'default value'
%default myvar 'default value'


-- Jars holding the UDFs used below (a custom jar and piggybank).
REGISTER myudf.jar;
REGISTER piggybank.jar;


-- Short aliases for the piggybank string UDFs so they can be called unqualified.
DEFINE SUBSTRING org.apache.pig.piggybank.evaluation.string.SUBSTRING();
DEFINE LENGTH  org.apache.pig.piggybank.evaluation.string.LENGTH();


-- Load '|'-delimited input with an explicit three-column schema.
my_file = LOAD '$myfile' USING PigStorage('|') AS (col1:chararray, col2:double, col3:long);
my_file = DISTINCT my_file; -- remove duplicates


-- Project five fields:
--   mycol : SUBSTRING(col1,0,14)
--   col4  : a null chararray placeholder
--   col5  : col1 unchanged when LENGTH(col1) < 3; otherwise col1 with spaces
--           removed and then cut at (length - 2)
--   col2, col3 : passed through
my_recs = FOREACH my_file GENERATE SUBSTRING(col1,0,14) AS mycol, null AS col4:chararray, (LENGTH(col1) < 3 ? col1 : SUBSTRING(REPLACE(col1,' ',''), 0,LENGTH(REPLACE(col1,' ',''))-2)) AS col5:chararray, col2, col3;


-- Example of calling a UDF from the custom jar (left commented out):
-- CONCAT(myudf.ZeroPad6Left(col1), myudf.ZeroPad6Left(col1)) AS col6:chararray


-- Multi-key join syntax.
-- NOTE(review): my_recs is joined with itself under the same alias, and the
-- projection above does not output a field named col1 (it outputs mycol, col4,
-- col5, col2, col3) - this line looks illustrative only; confirm before reuse.
my_joined = JOIN my_recs by (col1, col2), my_recs by (col1,col2);


-- Keep only rows with col3 below 1000.
-- NOTE(review): after a join the field name may need qualifying (alias::col3) - confirm.
my_joined = FILTER my_joined BY (col3 < 1000);


-- Left outer join syntax.
my_joined2 = JOIN my_joined by col1 LEFT OUTER, my_recs by col1;


-- NOTE(review): the GENERATE list is empty - fill in the output fields.
my_fin_rec = FOREACH my_joined2 GENERATE ;


-- Write the result as '|'-delimited text.
STORE my_fin_rec INTO '$OUTPUTfile' USING PigStorage('|');

Saturday 9 June 2012

Transferring Data via SSH - notes to self

Notes re transferring Data via SSH

ssh -c arcfour

If using SSH (scp/sftp/rsync over ssh), you can achieve speed enhancements using "-c arcfour" (sacrificing a little security, which might be acceptable in-house, for example). See the notes on SSH in Charles Martin Reid's wiki.

Example using rsync

rsync can sync entire directory structures but this script needed data positioned in a certain way. rsync can do loads and is a good starting point ...
This script could/should be rewritten to make more use of rsync features.


#!/bin/ksh
#
# Transfer data to a per-day directory on a remote server using rsync over ssh.
#
# Usage:   <script> [START_DAY=YYYYMMDD] [END_DAY=YYYYMMDD]
# Both days default to yesterday. One pass is made for every calendar day
# from START_DAY to END_DAY inclusive.
#
# Requires GNU date (for --date).

# Command-line arguments of the form NAME=value are turned into shell variables.
# NOTE(review): eval executes its arguments as shell code - only run this
# script with trusted arguments.
eval "$@"

PUBKEY=${HOME}/.ssh/mykey.pub
svrname=$(uname -n | cut -c1-8)
srcdir=/mysrcdir
sftpUsr=remuser
prisftpserver=remsvr
remdir=/remdestdir

# Stop here rather than transferring from whatever directory we started in.
cd "${srcdir}" || { echo "cannot cd to ${srcdir}" >&2; exit 1; }

START_DAY=${START_DAY:-$(date --date="1 days ago" +%Y%m%d)}
END_DAY=${END_DAY:-$(date --date="1 days ago" +%Y%m%d)}

DAY=${START_DAY}
while [ "${DAY}" -le "${END_DAY}" ]
do
  echo "Starting DAY=$DAY ..."

  echo "$(date +'%Y/%m/%d %H:%M:%S')|Start|${DAY}"

  # Create the remote directory unless it already exists (avoids a spurious
  # "File exists" error on reruns), then open up its permissions.
  ssh -i "${PUBKEY}" -q "${sftpUsr}@${prisftpserver}" "test -d ${remdir}/${DAY} || mkdir ${remdir}/${DAY}; chmod 777 ${remdir}/${DAY}"

  # TEMPLATE: no source argument is given below - insert the pattern matching
  # the files you want rsync'd in front of the remote destination.
  rsync -av --rsync-path=/opt/sfw/bin/rsync --rsh="ssh -i ${PUBKEY}"   "${sftpUsr}@${prisftpserver}:${remdir}/${DAY}/${svrname}"

  echo "$(date +'%Y/%m/%d %H:%M:%S')|Complete|${DAY}"

  # Advance by one calendar day. Plain integer arithmetic on YYYYMMDD would
  # step through invalid dates at month/year ends (e.g. 20120630 -> 20120631).
  DAY=$(date --date="${DAY} +1 day" +%Y%m%d) || exit 1
done


Example not using rsync


#!/bin/ksh
# script built by several hence slightly different formatting stds used :(
#
# Copy files from ${srcdir} to a per-day directory on a remote server using
# scp, making up to three attempts per file.
#
# Usage:   <script> [START_DAY=YYYYMMDD] [END_DAY=YYYYMMDD]
# Both days default to yesterday. One pass is made for every calendar day
# from START_DAY to END_DAY inclusive.
#
# Output:  pipe-delimited progress lines on stdout:
#            timestamp|Started (attempt)|file|remote base name
#            timestamp|Complete|file|remote base name
#            timestamp|Failed|file|remote base name
#
# Requires GNU date (for --date).

# Command-line arguments of the form NAME=value are turned into shell variables.
# NOTE(review): eval executes its arguments as shell code - only run this
# script with trusted arguments.
eval "$@"

PUBKEY=${HOME}/.ssh/mykey.pub
svrname=$(uname -n | cut -c1-8)   # local server
srcdir=/src_logs        # replace with location of source data files
sftpUsr=remuser          # replace with remote user
prisftpserver=remserver  # replace with remote server
remdir=/rem_logs         # replace with location of destination directory

# Stop here rather than copying from whatever directory we started in.
cd "${srcdir}" || { echo "cannot cd to ${srcdir}" >&2; exit 1; }

# this example caters for daily logfiles
START_DAY=${START_DAY:-$(date --date="1 days ago" +%Y%m%d)}
END_DAY=${END_DAY:-$(date --date="1 days ago" +%Y%m%d)}

DAY=${START_DAY}
while [ "${DAY}" -le "${END_DAY}" ]
do
  echo "Starting DAY=$DAY ..."

  # Create the remote directory unless it already exists (avoids a spurious
  # "File exists" error on reruns), then open up its permissions.
  ssh -i "${PUBKEY}" -q "${sftpUsr}@${prisftpserver}" "test -d ${remdir}/${DAY} || mkdir ${remdir}/${DAY}; chmod 777 ${remdir}/${DAY}"

  # TEMPLATE: replace * with the pattern matching the files to copy.
  # A glob is used instead of parsing ls output so unusual names survive.
  for filename in *
  do
    # An unmatched glob is left as the literal pattern; skip it.
    [ -e "${filename}" ] || continue

    base_filename=$(basename "${filename}" .gz)
    dir_filename=$(dirname "${filename}")

    scp_count=0
    scp_error=1

    while [ "${scp_error}" -ne 0 ] && [ "${scp_count}" -le 2 ] # give up after 3 scp attempts
    do
      scp_count=$((scp_count+1))
      echo "$(date +'%Y/%m/%d %H:%M:%S')|Started (${scp_count})|$filename|${base_filename}.gz"

      # Alternative: use arcfour cipher which is faster but less secure
      # NOTE(review): arcfour has been dropped from recent OpenSSH releases - confirm availability.
      #scp -i ${PUBKEY} -c arcfour -o ConnectTimeout=120 -q ${filename} ${sftpUsr}@${prisftpserver}:${remdir}/${DAY}/${svrname}_${dir_filename}_${base_filename}.gz

      # Bandwidth-limited copy. scp -l takes Kbit/s, so 100000 is roughly
      # 100 Mbit/s; lower the value for a tighter throttle.
      # ConnectTimeout bounds connection setup at 120 seconds.
      scp -i "${PUBKEY}" -l100000 -o ConnectTimeout=120 -q "${filename}" "${sftpUsr}@${prisftpserver}:${remdir}/${DAY}/${svrname}_${dir_filename}_${base_filename}.gz"
      scp_error=$?
    done

    # Report the real outcome: previously "Complete" was logged even when
    # every attempt had failed. Kept on stdout so the progress log is whole.
    if [ "${scp_error}" -eq 0 ]
    then
      echo "$(date +'%Y/%m/%d %H:%M:%S')|Complete|${filename}|${base_filename}.gz"
    else
      echo "$(date +'%Y/%m/%d %H:%M:%S')|Failed|${filename}|${base_filename}.gz"
    fi
  done

  # Advance by one calendar day. Plain integer arithmetic on YYYYMMDD would
  # step through invalid dates at month/year ends (e.g. 20120630 -> 20120631).
  DAY=$(date --date="${DAY} +1 day" +%Y%m%d) || exit 1
done


Streaming data


Flume 
Scribe
Storm
S4


TBC

Saturday 2 June 2012

Dumbo python - links and notes to self

https://github.com/klbostee/dumbo/wiki/Short-tutorial

https://raw.github.com/klbostee/dumbo/dbeae6c939cf7ef84ac81996041fc368df054c52/examples/join.py

http://dumbotics.com/category/examples/

https://github.com/klbostee/dumbo/wiki/Example-programs