Index: contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/scribeFile.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/scribeFile.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/scribeFile.java (revision 0) @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// File generated by hadoop record compiler. Do not edit. +package org.apache.hadoop.hive.contrib.etltools.scribecopier; + +public class scribeFile extends org.apache.hadoop.record.Record { + private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo; + private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter; + private static int[] _rio_rtiFilterFields; + static { + _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("scribeFile"); + _rio_recTypeInfo.addField("year", org.apache.hadoop.record.meta.TypeID.IntTypeID); + _rio_recTypeInfo.addField("month", org.apache.hadoop.record.meta.TypeID.IntTypeID); + _rio_recTypeInfo.addField("day", org.apache.hadoop.record.meta.TypeID.IntTypeID); + _rio_recTypeInfo.addField("part", org.apache.hadoop.record.meta.TypeID.IntTypeID); + } + + private int year; + private int month; + private int day; + private int part; + public scribeFile() { } + public scribeFile( + final int year, + final int month, + final int day, + final int part) { + this.year = year; + this.month = month; + this.day = day; + this.part = part; + } + public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() { + return _rio_recTypeInfo; + } + public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) { + if (null == rti) return; + _rio_rtiFilter = rti; + _rio_rtiFilterFields = null; + } + private static void setupRtiFields() + { + if (null == _rio_rtiFilter) return; + // we may already have done this + if (null != _rio_rtiFilterFields) return; + int _rio_i, _rio_j; + _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()]; + for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) { + _rio_rtiFilterFields[_rio_i] = 0; + } + java.util.Iterator _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator(); + _rio_i=0; + while (_rio_itFilter.hasNext()) { + org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next(); + java.util.Iterator _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator(); + _rio_j=1; + while (_rio_it.hasNext()) { + org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next(); + if (_rio_tInfo.equals(_rio_tInfoFilter)) { + _rio_rtiFilterFields[_rio_i] = _rio_j; + break; + } + _rio_j++; + } + _rio_i++; + } + } + public int getYear() { + 
return year; + } + public void setYear(final int year) { + this.year=year; + } + public int getMonth() { + return month; + } + public void setMonth(final int month) { + this.month=month; + } + public int getDay() { + return day; + } + public void setDay(final int day) { + this.day=day; + } + public int getPart() { + return part; + } + public void setPart(final int part) { + this.part=part; + } + public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag) + throws java.io.IOException { + _rio_a.startRecord(this,_rio_tag); + _rio_a.writeInt(year,"year"); + _rio_a.writeInt(month,"month"); + _rio_a.writeInt(day,"day"); + _rio_a.writeInt(part,"part"); + _rio_a.endRecord(this,_rio_tag); + } + private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag) + throws java.io.IOException { + _rio_a.startRecord(_rio_tag); + year=_rio_a.readInt("year"); + month=_rio_a.readInt("month"); + day=_rio_a.readInt("day"); + part=_rio_a.readInt("part"); + _rio_a.endRecord(_rio_tag); + } + public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag) + throws java.io.IOException { + if (null == _rio_rtiFilter) { + deserializeWithoutFilter(_rio_a, _rio_tag); + return; + } + // if we're here, we need to read based on version info + _rio_a.startRecord(_rio_tag); + setupRtiFields(); + for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) { + if (1 == _rio_rtiFilterFields[_rio_i]) { + year=_rio_a.readInt("year"); + } + else if (2 == _rio_rtiFilterFields[_rio_i]) { + month=_rio_a.readInt("month"); + } + else if (3 == _rio_rtiFilterFields[_rio_i]) { + day=_rio_a.readInt("day"); + } + else if (4 == _rio_rtiFilterFields[_rio_i]) { + part=_rio_a.readInt("part"); + } + else { + java.util.ArrayList typeInfos = (java.util.ArrayList)(_rio_rtiFilter.getFieldTypeInfos()); + org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID()); + } + } + _rio_a.endRecord(_rio_tag); + } + public int compareTo (final Object _rio_peer_) throws ClassCastException { + if (!(_rio_peer_ instanceof scribeFile)) { + throw new ClassCastException("Comparing different types of records."); + } + scribeFile _rio_peer = (scribeFile) _rio_peer_; + int _rio_ret = 0; + _rio_ret = (year == _rio_peer.year)? 0 :((year<_rio_peer.year)?-1:1); + if (_rio_ret != 0) return _rio_ret; + _rio_ret = (month == _rio_peer.month)? 0 :((month<_rio_peer.month)?-1:1); + if (_rio_ret != 0) return _rio_ret; + _rio_ret = (day == _rio_peer.day)? 0 :((day<_rio_peer.day)?-1:1); + if (_rio_ret != 0) return _rio_ret; + _rio_ret = (part == _rio_peer.part)? 
0 :((part<_rio_peer.part)?-1:1); + if (_rio_ret != 0) return _rio_ret; + return _rio_ret; + } + public boolean equals(final Object _rio_peer_) { + if (!(_rio_peer_ instanceof scribeFile)) { + return false; + } + if (_rio_peer_ == this) { + return true; + } + scribeFile _rio_peer = (scribeFile) _rio_peer_; + boolean _rio_ret = false; + _rio_ret = (year==_rio_peer.year); + if (!_rio_ret) return _rio_ret; + _rio_ret = (month==_rio_peer.month); + if (!_rio_ret) return _rio_ret; + _rio_ret = (day==_rio_peer.day); + if (!_rio_ret) return _rio_ret; + _rio_ret = (part==_rio_peer.part); + if (!_rio_ret) return _rio_ret; + return _rio_ret; + } + public Object clone() throws CloneNotSupportedException { + scribeFile _rio_other = new scribeFile(); + _rio_other.year = this.year; + _rio_other.month = this.month; + _rio_other.day = this.day; + _rio_other.part = this.part; + return _rio_other; + } + public int hashCode() { + int _rio_result = 17; + int _rio_ret; + _rio_ret = (int)year; + _rio_result = 37*_rio_result + _rio_ret; + _rio_ret = (int)month; + _rio_result = 37*_rio_result + _rio_ret; + _rio_ret = (int)day; + _rio_result = 37*_rio_result + _rio_ret; + _rio_ret = (int)part; + _rio_result = 37*_rio_result + _rio_ret; + return _rio_result; + } + public static String signature() { + return "LscribeFile(iiii)"; + } + public static class Comparator extends org.apache.hadoop.record.RecordComparator { + public Comparator() { + super(scribeFile.class); + } + static public int slurpRaw(byte[] b, int s, int l) { + try { + int os = s; + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=z; l-=z; + } + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=z; l-=z; + } + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=z; l-=z; + } + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=z; l-=z; + } + return (os - s); + } catch(java.io.IOException e) { + throw new RuntimeException(e); + } + } + static public int compareRaw(byte[] b1, int s1, int l1, + byte[] b2, int s2, int l2) { + try { + int os1 = s1; + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + if (i1 != i2) { + return ((i1-i2) < 0) ? -1 : 0; + } + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + } + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + if (i1 != i2) { + return ((i1-i2) < 0) ? -1 : 0; + } + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + } + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + if (i1 != i2) { + return ((i1-i2) < 0) ? -1 : 0; + } + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + } + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + if (i1 != i2) { + return ((i1-i2) < 0) ? 
-1 : 0; + } + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + } + return (os1 - s1); + } catch(java.io.IOException e) { + throw new RuntimeException(e); + } + } + public int compare(byte[] b1, int s1, int l1, + byte[] b2, int s2, int l2) { + int ret = compareRaw(b1,s1,l1,b2,s2,l2); + return (ret == -1)? -1 : ((ret==0)? 1 : 0);} + } + + static { + org.apache.hadoop.record.RecordComparator.define(scribeFile.class, new Comparator()); + } +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeUtils.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeUtils.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeUtils.java (revision 0) @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.etltools.scribecopier; + +import java.io.*; +import java.util.*; + + +/** + * A set of utilities around using scribe files + * + * @author Joydeep Sen Sarma + */ +public class ScribeUtils { + + public static String getScribeFileName(scribeFile sf, String prefix) { + String s = String.format(prefix+"-%1$4d-%2$02d-%3$02d_%4$05d", + sf.getYear(), sf.getMonth(), sf.getDay(), + sf.getPart()); + return (s); + } + + public static String getDateString(scribeFile sf) { + return (String.format("%1$4d-%2$02d-%3$02d", + sf.getYear(), sf.getMonth(), sf.getDay())); + } + + + public static long pollSize(scribeFile sf, String prefix, File baseDir) { + File sFile = new File(baseDir, getScribeFileName(sf, prefix)); + return sFile.length(); + } + + public static String getScribePrefix(String fileName) { + String [] strvec = fileName.split("-"); + if(strvec.length != 4) + return null; + else + return strvec[0]; + } + + public static scribeFile getScribeFile(String fileName) { + String [] strvec = null; + String [] partvec = null; + + try { + strvec = fileName.split("-"); + if(strvec.length != 4) + return null; + + partvec = strvec[3].split("_"); + if(partvec.length != 2) + return null; + + return (new scribeFile(Integer.parseInt(strvec[1]), Integer.parseInt(strvec[2]), + Integer.parseInt(partvec[0]), Integer.parseInt(partvec[1]))); + } catch (NumberFormatException e) { + System.err.println(strvec[1]+"\t"+strvec[2]+"\t"+partvec[0]+"\t"+partvec[1]); + return null; + } + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeHdfsCopier.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeHdfsCopier.java (revision 0) +++ 
contrib/src/java/org/apache/hadoop/hive/contrib/etltools/scribecopier/ScribeHdfsCopier.java (revision 0) @@ -0,0 +1,1075 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.etltools.scribecopier; + +import java.io.*; +import java.util.*; +import java.text.SimpleDateFormat; +import java.text.ParsePosition; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; + + +public class ScribeHdfsCopier implements Configurable { + + protected File regFile; + protected Path scribeDirFile; + protected String scribeDir; + protected GregorianCalendar targetCal; + protected String scribeFilePrefix; + protected Random randGen = new Random(); + protected int parallelism; + protected Configuration conf; + protected String jobNamePrefix; + protected int compressCopy; + protected String targetDir; + protected FileSystem fs; + protected FileSystem srcFs; + protected boolean batchMode, forceCopy; + protected String instName = null; + protected int maxNumFilesToCopy; + + Vector toWatch= null; + Date minDate = null; + String outputDir; + Set targetFileSet = new HashSet(); + String instance_list=null; + + protected Map> srcFileMap = new TreeMap>(); + protected Map> targetFileMap = new TreeMap>(); + protected Map> copyMap; + + String currentTime =null; + boolean dryrun; + /** + * instailize the dates to be copied. 
+ * @param targetDate
+ * @param compressCopy
+ * @param targetDir
+ * @param batchMode
+ * @param forceCopy
+ * @throws IOException
+ */
+ public ScribeHdfsCopier(Date targetDate, int compressCopy, String targetDir,
+ boolean batchMode, boolean forceCopy, String instance_list, int maxNumFilesToCopy, boolean dryrun) throws IOException {
+ this.compressCopy = compressCopy;
+ this.targetDir = targetDir;
+ this.batchMode = batchMode;
+ this.forceCopy = forceCopy;
+ this.instance_list = instance_list;
+ this.maxNumFilesToCopy = maxNumFilesToCopy;
+ this.dryrun = dryrun;
+ if (targetDate == null) {
+ targetCal = new GregorianCalendar();
+ } else {
+ targetCal = new GregorianCalendar(targetDate.getYear()+1900 , targetDate
+ .getMonth(), targetDate.getDate());
+ }
+
+
+ GregorianCalendar gc = (GregorianCalendar) targetCal.clone();
+
+ GregorianCalendar today = new GregorianCalendar();
+ SimpleDateFormat sd = new SimpleDateFormat("yyyyMMddHHmmss");
+ currentTime = sd.format(today.getTime());
+ toWatch = new Vector(3);
+
+ minDate = null;
+ for (int i = 0; i < 3; i++) {
+ // in batch mode - we only copy the data for the day indicated
+ if (i == 0 && batchMode)
+ continue;
+
+ gc.add(Calendar.DAY_OF_MONTH, i - 1);
+ Date one = new Date(gc.get(Calendar.YEAR)-1900, gc.get(Calendar.MONTH), gc
+ .get(Calendar.DAY_OF_MONTH));
+
+ System.err.println("adding Date " + one.toString());
+ if (minDate == null)
+ minDate = one;
+ else if (minDate.after(one))
+ minDate = one;
+
+ toWatch.add(one);
+ }
+
+
+ System.err.println("Min Date:"+minDate);
+ }
+
+ /**
+ * Main driver; invokes all of the steps:
+ * 1. Get the list of files in the source directory.
+ * 2. Compare this list with the target directory to determine the files to be copied.
+ * 3. Generate input files for the mapper.
+ * 4. Invoke the map-reduce job.
+ * 5. Commit files.
+ * 6. Mark done.
+ *
+ */
+
+ public int run() throws Exception {
+ System.err.println("Start RunId="+currentTime+"time="+ new Date().toString());
+ if (alreadyRunning()) {
+ System.out.print("Copier process already running");
+ return 0;
+ }
+ int ret;
+
+ System.err.println("Getting Src files...");
+
+ getFileList(scribeDirFile, srcFileMap);
+ System.err.println("getFileList RunId="+currentTime+"time="+ new Date().toString());
+ System.err.println("Getting files to Copy...");
+ copyMap = getFilesToCopy(srcFileMap, !batchMode);
+ Random r = new Random();
+ String rdirname = "/tmp/" + r.nextInt();
+ String odirname = "/tmp/" + r.nextInt();
+ outputDir = "/tmp/" + r.nextInt();
+ System.err.println("getFilesToCopy RunId="+currentTime+"time="+ new Date().toString());
+
+
+ if (!fs.mkdirs(new Path(rdirname)))
+ throw new IOException("failed to create scratch dir:" + rdirname);
+
+ if (!fs.mkdirs(new Path(outputDir)))
+ throw new IOException("failed to create output dir:" + outputDir);
+
+ conf.set("outdir", outputDir);
+
+ System.err.println("Output dir="+outputDir);
+ System.err.println("Generating copy file...");
+ int num_files = genFileMap(rdirname, copyMap);
+ System.err.println("genFileMap RunId="+currentTime+"time="+ new Date().toString());
+ if (dryrun)
+ {
+ System.err.println("Dryrun returning...");
+ return 0;
+ }
+ if (num_files != 0){
+ System.err.println("Run Copy process..");
+ ret = runCopyProcess(rdirname, odirname);
+ if (ret != 0){
+ System.err.println("Copy Failed :"+ ret);
+ return ret;
+ }
+ //System.err.println("runCopyProcess RunId="+currentTime+"time="+ new Date().toString());
+ //System.err.println("commit");
+ //commit(odirname);
+ //System.err.println("commit RunId="+currentTime+"time="+ new Date().toString());
+ }
+ System.err.println("Mark Done");
+ markDone();
+ System.err.println("markDone RunId="+currentTime+"time="+ new Date().toString());
+
+ return 0;
+ }
+
+
+
+ /**
+ * Find the list of files in the source directory.
+ * @param scribeDirFile
+ * @param fileMap
+ * @throws IOException
+ */
+ private void getFileList(Path scribeDirFile,
+ Map> fileMap) throws IOException {
+
+ String arr[] = null;
+ FileStatus[] subdirs = srcFs.listStatus(scribeDirFile);
+ fileMap.clear();
+
+ for (FileStatus onedir : subdirs) {
+ Path bdir = onedir.getPath();
+ FileStatus[] files = srcFs.listStatus(bdir);
+ TreeSet fileSet = new TreeSet();
+
+ fileMap.put(bdir.getName(), fileSet);
+ for (FileStatus f : files) {
+
+ String name = f.getPath().getName();
+ if (name.endsWith(".gz")) {
+ name = name.replaceFirst("\\.gz$", "");
+ }
+ //System.err.println("Adding name " +bdir + " "+ name);
+ scribeFile sf = ScribeUtils.getScribeFile(name);
+ String pfx = ScribeUtils.getScribePrefix(name);
+ if (pfx == null || !pfx.equals(scribeFilePrefix)) {
+ //System.err.println("Invalid prefix " + name);
+ continue;
+ }
+ if (sf == null) {
+ //System.err.println("Invalid name " + name);
+ continue;
+ }
+ fileSet.add(sf);
+ }
+ }
+ }
+
+
+ /**
+ * Compare the source files with the target files that have already been copied, and determine the files to be copied.
+ * Copy only those files that do not exist in the target.
+ * @param srcMap + * @param ignoreLastFile + * @return + * @throws IOException + */ + private Map> getFilesToCopy( + Map> srcMap, boolean ignoreLastFile + ) throws IOException { + TreeMap> copyMap = new TreeMap>(); + + GregorianCalendar gc = (GregorianCalendar)targetCal.clone(); + gc.add(Calendar.DAY_OF_MONTH, 1); + Date tom = new Date(gc.get(Calendar.YEAR), gc.get(Calendar.MONTH), gc.get(Calendar.DAY_OF_MONTH)); + + GregorianCalendar today = new GregorianCalendar(); + Date currDate = new Date(today.get(Calendar.YEAR)-1900, today.get(Calendar.MONTH), today.get(Calendar.DAY_OF_MONTH)); + int markWhenDone = getConf().getInt("markWhenDone", -1); + + + getTargetFileList(); + System.err.println("getTargetFileList RunId="+currentTime+"time="+ new Date().toString()); + Iterator bkt_i = srcMap.keySet().iterator(); + while (bkt_i.hasNext()) { + + String bkt = bkt_i.next(); + TreeSet fileSet = new TreeSet(); + copyMap.put(bkt, fileSet); + Iterator file_i = srcMap.get(bkt).iterator(); + while (file_i.hasNext()) { + scribeFile sf = file_i.next(); + //System.err.println("copy: checking file :"+ bkt + " "+ sf); + Date sfd = scribeFileToDate(sf); + if (ignoreLastFile && !file_i.hasNext()) { + //Copy previous days file, if it has passed midnight. + if (markWhenDone != -1 && (today.get(Calendar.HOUR_OF_DAY) > markWhenDone) && currDate.after(sfd)){ + System.err.println("Not skipping last file because the day has passed "+ bkt + " " + sf+ " "+currDate+" "+sfd+" "+minDate); + } + else{ + + System.err.println("copy: skipping last file :"+ bkt + " " + sf); + break; + } + } + if (!toWatch.contains(sfd)) { + //System.err.println("copy: skipping outofdate file :"+ bkt + " " + sf); + continue; + } + if (scribeFileToDate(sf).equals(tom)){ + System.err.println("copy: skipp tomorrows file "+ bkt + " " + sf); + continue; + } + + //Path pt = new Path(targetFileName(sf, bkt)); + if (!isTargetFileExits(sf, bkt)) { + //System.err.println("copy: Adding scribe file :" +bkt + " "+ sf); + fileSet.add(sf); + } + } + } + return copyMap; + } + + /** + * Generate input file for mapper. input file contains names of files to be copied to target. 
+ * @param dir_name + * @param copyMap + * @throws IOException + */ + + + public int genFileMap(String dir_name, Map> copyMap) + throws IOException { + int num_files = 0; + + Iterator bkt_i = copyMap.keySet().iterator(); + Vector vec = new Vector(); + int totalFiles = 0; + + while (bkt_i.hasNext()) { + String bkt = bkt_i.next(); + Set fileSet = copyMap.get(bkt); + totalFiles += fileSet.size(); + } + int maxFilesPerBkt = 100000000; + if (!batchMode){ + if (totalFiles > maxNumFilesToCopy){ + maxFilesPerBkt = maxNumFilesToCopy/copyMap.keySet().size(); + maxFilesPerBkt++; + } + } + bkt_i = copyMap.keySet().iterator(); + System.err.println("maxNumFilesToCopy="+maxNumFilesToCopy+ " maxFilesPerBkt="+ maxFilesPerBkt+ " totalFiles="+totalFiles); + while (bkt_i.hasNext()) { + String bkt = bkt_i.next(); + Set fileSet = copyMap.get(bkt); + Iterator sf_i = fileSet.iterator(); + int filesPerBkt = 0; + while (sf_i.hasNext()) { + scribeFile sf = sf_i.next(); + vec + .add(bkt + "/" + + ScribeUtils.getScribeFileName(sf, scribeFilePrefix)+","+ targetFileName(sf, bkt)); + filesPerBkt++; + if (filesPerBkt >= maxFilesPerBkt) break; + } + num_files += filesPerBkt; + System.err.println("File per bucket bkt="+bkt+ " num_files="+ filesPerBkt); + + } + if (num_files == 0){ + System.err.println("No files to be copied"); + return 0; + } + int num_loader = parallelism; + + if (num_files < parallelism) + num_loader = num_files; + + int files_per_load = num_files / num_loader; + //limit files per load to 1, number of mappers are limited number of allocated slots + if (files_per_load >= 1){ + num_loader = num_files/1; + files_per_load =1; + } + int gidx = 0; + System.err.println("parallelism="+parallelism+ " num_files="+num_files+" num_loader="+num_loader+" files_per_load="+files_per_load); + for (int i = 0; i < num_loader; i++) { + Path fpath = new Path(dir_name + "/" + i); + //System.err.println("Adding files to "+ fpath); + FSDataOutputStream out = fs.create(fpath); + PrintStream ps = new PrintStream(out); + + int splits_in_this_job = files_per_load; + if (i == (num_loader - 1)) { + splits_in_this_job = vec.size() - gidx; + } + + for (int j = 0; j < splits_in_this_job; j++) { + System.err.println("\t files gidx="+gidx+ " "+ vec.get(gidx).toString()); + ps.println(vec.get(gidx).toString()); + + gidx++; + } + + ps.close(); + } + return num_files; + + } + + /** + * + * Set up and run the mapred job. + * @param rdirname + * @param odirname + * @return + */ + + private int runCopyProcess(String rdirname, String odirname) { + + boolean has_failed = false; + try { + JobConf conf = new JobConf(getConf(), ScribeHdfsCopier.class); + + //conf.setOutputPath(new Path(odirname)); + conf.setInputPath(new Path(rdirname)); + + if (compressCopy == 1) { + SequenceFileOutputFormat + .setOutputCompressorClass(conf, GzipCodec.class); + SequenceFileOutputFormat.setOutputCompressionType(conf, + CompressionType.BLOCK); + FileOutputFormat.setCompressOutput(conf, true); + conf.setMapOutputCompressorClass(GzipCodec.class); + conf.setMapOutputCompressionType(CompressionType.BLOCK); + } + + conf.setOutputFormat(org.apache.hadoop.mapred.lib.NullOutputFormat.class); + conf.setInputFormat(TextInputFormat.class); + + + conf.setMapperClass(MapClass.class); + + // no reduce. 
+ conf.setReducerClass(IdentityReducer.class); + conf.setNumReduceTasks(0); + conf.setSpeculativeExecution(false); + + conf.setJobName(jobNamePrefix + " " +targetCal.getTime()); + + System.err.println("Starting job..."); + RunningJob rj = JobClient.runJob(conf); + has_failed = !rj.isSuccessful(); + + } catch (Exception e) { + has_failed = true; + e.printStackTrace(); + } + + if (has_failed) { + System.err.println("Job Failed"); + return 1; + } + return 0; + } + + /** + * mapred job copies the files to a temp location. commit will copy to final location. + * @param odirname + * @throws IOException + */ + + private void commit(String odirname) throws IOException{ + Map> fileMap = new TreeMap>(); + getFileList(new Path(odirname), fileMap); + Iterator bkt_i = copyMap.keySet().iterator(); + Vector vec = new Vector(); + String scribeDir = conf.get("scribeDirPath", ""); + while (bkt_i.hasNext()) { + String bkt = bkt_i.next(); + Set fileSet = copyMap.get(bkt); + Iterator sf_i = fileSet.iterator(); + while (sf_i.hasNext()) { + scribeFile sf = sf_i.next(); + String srcFile = srcFileName(outputDir, sf, bkt) ; + String inFile = srcFileName(scribeDir, sf, bkt) ; + String targetFile = targetFileName(sf, bkt); + + if (compressCopy == 1) + srcFile = srcFile + ".gz"; + + //System.out.println("Moving file "+ srcFile + " to "+ targetFile) ; + + //Path targetPath = new Path(targetFile); + // fs.mkdirs(targetPath.getParent()); + + + //fs.rename(new Path(srcFile), new Path(targetFile)); + + //if (fs.exists(new Path(targetFile))){ + // System.err.println("To be deleted: "+ inFile); + // deleteFile(new Path(inFile)); + //} + + + } + } + } + + /** + * Create the Done file when a day is done. + * @throws IOException + */ + private void markDone() throws IOException { + //look at target files + srcFileMap.clear(); + + getFileList(scribeDirFile, srcFileMap); + copyMap = getFilesToCopy(srcFileMap, false); + boolean left = isAnyFileLeftToCopy(minDate, copyMap); + + if (left){ + System.err.println("More files left to copy"); + return; + } + + SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd"); + String dateStr = sd.format(minDate); + Path doneDir = new Path(getDoneDir(dateStr, instName)); + System.err.println("Done Dir: "+doneDir); + if (batchMode){ + fs.mkdirs(doneDir); + } + + else { + GregorianCalendar today = new GregorianCalendar(); + Date currDate = new Date(today.get(Calendar.YEAR)-1900, today.get(Calendar.MONTH), today.get(Calendar.DAY_OF_MONTH)); + int markWhenDone = getConf().getInt("markWhenDone", -1); + if (markWhenDone != -1 && (today.get(Calendar.HOUR_OF_DAY) > markWhenDone) && currDate.after(minDate)){ + System.err.println("Making done dir Since it is past "+ markWhenDone); + fs.mkdirs(doneDir); + } + } + //Create the final Done File if all files are done + if (instance_list != null) { + System.err.println("Checking other done files"); + String arr[] = instance_list.split(","); + for (String inst : arr) { + Path doneFile = new Path(getDoneDir(dateStr, inst)); + if (!fs.exists(doneFile)) { + System.err.println("Does not exists :" + doneFile); + return; + } + } + Path finalDoneFile = new Path(getDoneDir(dateStr, null)); + System.err.println("Fianl Done file :" + finalDoneFile); + if (!fs.exists(finalDoneFile)) + fs.mkdirs(finalDoneFile); + + } + } + + + public void setConf(Configuration conf) { + this.conf = conf; + scribeFilePrefix = conf.get("scribeFilePrefix", ""); + + try { + fs = FileSystem.get(conf); + + parallelism = conf.getInt("parallelism", 10); + scribeDir = conf.get("scribeDirPath", ""); + 
scribeDirFile = new Path(scribeDir); + srcFs = scribeDirFile.getFileSystem(conf); + instName = ""; + if (conf.get("copierInstanceName") != null) { + instName = conf.get("copierInstanceName"); + } + if (batchMode) { + jobNamePrefix = "Recovery mode " + scribeFilePrefix + " Copier " + instName; + } else { + jobNamePrefix = scribeFilePrefix + " Copier " + instName; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Configuration getConf() { + return conf; + } + + /** + * get target file name. Need to add instance name to target directory. + * @param bkt + * @return + */ + private String getTargetBktName( String bkt){ + String prefix ; + if (instName != null && instName.length() > 0) + return "inst="+instName + "/host="+ bkt; + return "inst=NULL/host="+ bkt; + } + + void getTargetFileList() throws IOException{ + String targetFileName = targetDir + "/ds=*"+ + "/time=*/"+getTargetBktName("*") + "/*"; + + if (compressCopy == 1){ + targetFileName = targetFileName + ".gz"; + } + System.err.println("glob : "+targetFileName); + FileStatus fileStats[] = fs.globStatus(new Path(targetFileName)); + for (FileStatus f : fileStats){ + String name = f.getPath().getParent().getName()+"/"+f.getPath().getName(); + //System.err.println("Adding to set "+ name); + targetFileSet.add(name); + + } + } + + + private boolean isTargetFileExits(scribeFile sf,String bkt ) throws IOException + { + String targetFileName = "host="+bkt+"/"+ + ScribeUtils.getScribeFileName(sf, scribeFilePrefix); + if (compressCopy == 1){ + targetFileName = targetFileName + ".gz"; + } + + + //System.err.println("Checking files exists : "+targetFileName); + + //FileStatus fileStats[] = fs.globStatus(new Path(targetFileName)); + //if (fileStats.length == 0) + // return false; + + if (targetFileSet.contains(targetFileName)){ + //System.err.println("Checked files exists : "+targetFileName); + return true; + } + return false; + } + private String targetFileName(scribeFile sf,String bkt ) { + String targetFileName = targetDir + "/ds="+ + ScribeUtils.getDateString(sf)+ "/time="+ currentTime+"/"+getTargetBktName(bkt) + "/"+ + ScribeUtils.getScribeFileName(sf, scribeFilePrefix); + if (compressCopy == 1){ + targetFileName = targetFileName + ".gz"; + } + return targetFileName; + } + + private String srcFileName(String srcDir,scribeFile sf,String bkt ){ + String srcFile = srcDir + "/"+ bkt + "/"+ + ScribeUtils.getScribeFileName(sf, scribeFilePrefix); + return srcFile; + } + + + protected Date scribeFileToDate(scribeFile sf) { + // Scribe months are 1-12 - shift to 0-11 range + return new Date(sf.getYear()-1900, sf.getMonth() - 1, sf.getDay()); + } + + protected boolean alreadyRunning() throws IOException { + if (batchMode) + return false; + + if (dryrun) + return false; + + JobClient jc = new JobClient(new JobConf(getConf(), ScribeHdfsCopier.class)); + JobStatus[] jobs = jc.jobsToComplete(); + for (JobStatus j : jobs) { + String id = j.getJobId(); + RunningJob rj = jc.getJob(id); + if (rj.getJobName().indexOf(jobNamePrefix) == 0) + return true; + } + return false; + } + + + + private boolean isAnyFileLeftToCopy(Date minDate, Map> copyMap){ + Iterator bkt_i = copyMap.keySet().iterator(); + while (bkt_i.hasNext()) { + String bkt = bkt_i.next(); + Set fileSet = copyMap.get(bkt); + Iterator sf_i = fileSet.iterator(); + while (sf_i.hasNext()) { + scribeFile sf = sf_i.next(); + if (scribeFileToDate(sf).equals(minDate)){ + System.err.println("File not finished "+ bkt + " "+ sf); + return true; + } + } + } + return false; + } + + + 
private String getDoneDir(String dateStr , String instName) { + String common; + if (instName == null || instName.length() == 0) { + common = "ds=" + dateStr + "/" + "done/done/done"; + } else { + common = "ds=" + dateStr + "/" + + instName + "-" + "done/done/done"; + } + return targetDir + "/" + common; + + } + + + /** + * Innner Runner class which parses arguments and calls ScribeHdfsCopier + * @author suresh + * + */ + + public static class ScribeCopierRunner extends Configured implements Tool { + + protected static int printUsage() { + System.out.println("Usage: ScribeCopier -loc -ds -"); + System.out.println("Defaults: date=today"); + System.out.println(" targetDir=/user/facebook/scribe_staging//ds="); + System.out.println("scribe-ds-name ==> file name prefix in scribe"); + System.out.println("scribe-dir ==> folder where this scribe data set is stored. nfs mounted on all hadoop nodes!"); + System.out.println("Advanced Arguments: "); + System.out.println("-m ==> will use specified number of mappers for doing the copy. By default this is capped at 8 and derived automatically from amount of data to be copied"); + System.out.println("-compressCopy 0|1 ==> whether to compress the output data or not. On by default"); + System.out.println("-batchMode ==> When you run for previous day, when you pass --date you must pass in -batchMode"); + System.out.println("-instance_list ==> list of copier instances."); + return (1); + } + + public int run(String[] args) throws Exception { + + String scribeDirName = null, scribeFilePrefix = null; + Date targetDate = null; + SimpleDateFormat dparser = new SimpleDateFormat("yyyy-MM-dd"); + int parallelism = 10; + int compressCopy = 1; + + String copierInstanceName = null; + String targetDir = null; + + String instance_list = null; + boolean batchMode = false; + boolean testMode = false; + boolean forceCopy = false; + int markWhenDone = -1; + int maxNumFilesToCopy = 150; + boolean deleteWhenDone = false; + FileSystem srcFs = null; + boolean dryrun = false; + + String deleteCmd = null; + // reading arguments + for (int i = 0; i < args.length; ++i) { + try { + if ("-loc".equalsIgnoreCase(args[i])) { + scribeDirName = args[++i]; + } else if ("-date".equalsIgnoreCase(args[i])) { + targetDate = dparser.parse(args[++i], new ParsePosition(0)); + } else if ("-ds".equalsIgnoreCase(args[i])) { + scribeFilePrefix = args[++i]; + } else if ("-instance_name".equalsIgnoreCase(args[i])) { + copierInstanceName = args[++i]; + } else if ("-m".equalsIgnoreCase(args[i])) { + parallelism = Integer.parseInt(args[++i]); + } else if ("-deleteCmd".equalsIgnoreCase(args[i])) { + deleteCmd = args[++i]; + } else if ("-compressCopy".equalsIgnoreCase(args[i])) { + compressCopy = Integer.parseInt(args[++i]); + if ((compressCopy != 0) && (compressCopy != 1)) { + System.err.println("Invalid argument for -compressCopy :" + + compressCopy); + return (printUsage()); + } + } else if ("-markWhenDone".equalsIgnoreCase(args[i])) { + markWhenDone = Integer.parseInt(args[++i]); + } else if ("-deleteWhenDone".equalsIgnoreCase(args[i])) { + deleteWhenDone = true; + } else if ("-batchMode".equalsIgnoreCase(args[i])) { + batchMode = true; + } else if ("-maxNumFilesToCopy".equalsIgnoreCase(args[i])) { + maxNumFilesToCopy = Integer.parseInt(args[++i]); + } + else if ("-dryrun".equalsIgnoreCase(args[i])) { + dryrun = true; + } + else if ("-targetDir".equalsIgnoreCase(args[i])) { + targetDir = args[++i]; + } else if ("-instance_list".equalsIgnoreCase(args[i])) { + instance_list = args[++i]; + } else { + 
System.err.println("Unknown argument: " + args[i]); + return (printUsage()); + } + + } catch (ArrayIndexOutOfBoundsException except) { + System.err.println("ERROR:Required parameter missing in " + + args[i - 1]); + return (printUsage()); + } catch (NumberFormatException e) { + System.err.println("ERROR:Incorrect parameter," + + "expecting number: " + args[i - 1]); + return (printUsage()); + } + } + + if (scribeDirName == null || scribeFilePrefix == null) { + System.err.println("Missing argument: " + + ((scribeDirName == null) ? "-loc" : + + ((scribeFilePrefix == null) ? "-ds" : ""))); + return (printUsage()); + } + + if (!scribeDirName.contains(":")) { + scribeDirName = "file://" + scribeDirName; + + } + + srcFs = new Path(scribeDirName).getFileSystem(getConf()); + + if(targetDir == null) { + if (testMode) { + System.out.println("In test mode - writing data to: /user/facebook/test_scribe_staging"); + targetDir = "/user/facebook/test_scribe_staging/"+scribeFilePrefix; + } else { + targetDir = "/user/facebook/scribe_staging/"+scribeFilePrefix; + } + + } + + // set some global variables in the conf so that they are available to all + getConf().set("scribeDirPath", scribeDirName); + getConf().set("scribeFilePrefix", scribeFilePrefix); + getConf().setInt("parallelism", parallelism); + getConf().setBoolean("testMode", testMode); + getConf().setInt("markWhenDone", markWhenDone); + getConf().setBoolean("deleteWhenDone", deleteWhenDone); + if (deleteCmd != null) + getConf().set("deleteCmd", deleteCmd); + + if (copierInstanceName != null) + getConf().set("copierInstanceName", copierInstanceName); + getConf().setInt("mapred.task.timeout", 8 * 60 * 60 * 1000); // make sure + // copiers + // don't + // timeout = + // 8hr + getConf().setInt("mapred.min.split.size", 1000000000); // make sure input + // files are not + // split + + ScribeHdfsCopier sc = new ScribeHdfsCopier(targetDate, compressCopy, + targetDir, batchMode, forceCopy, instance_list, maxNumFilesToCopy, dryrun); + sc.setConf(getConf()); + return (sc.run()); + } + } + + public static void main(String[] args) { + try { + int res = ToolRunner.run(new Configuration(), new ScribeCopierRunner(), + args); + System.exit(res); + } catch (Throwable e) { + e.printStackTrace(); + System.exit(1); + } + } + + + + /** + * Inner Map class which implemnets the map() interface used by scribeCopier to copy files. 
+ * @author suresh + * + * @param + * @param + */ + + public static class MapClass + extends MapReduceBase + implements Mapper { + + public static final String tmpSuffix = ".__tmp"; + protected static final Log LOG = LogFactory + .getLog(MapClass.class.getName()); + byte[] buf; + OutputStream fileOut; + BufferedInputStream bis; + JobConf jc; + Path tempDest; + Path finalDest; + Random randGen = new Random(); + boolean aborted = false; + FileSystem fs; + boolean testMode; + String scribeDir; + String ext; + CompressionCodec codec; + String outdir; + boolean compressed; + String finalFile = null; + + protected enum Counter { + } + + public void deleteFile(Path file) throws IOException { + boolean deleteWhenDone = jc.getBoolean("deleteWhenDone", false); + String deleteCmd = jc.get("deleteCmd", null); + + + if (!deleteWhenDone) return; + if (deleteCmd == null){ + LOG.info("Deleting file: "+ file); + fs.delete(file, false); + } + else { + + URI uri = file.toUri(); + String path = uri.getRawPath(); + String cmd = deleteCmd + " dfs -rmr "+path; + LOG.info("Delete Cmd:"+ cmd); + Process p = Runtime.getRuntime().exec(cmd); + try{ + p.waitFor(); + } + catch (Exception e) { + e.printStackTrace(); + } + int ret =p.exitValue(); + if (ret != 0) + LOG.info("Delete Cmd Failed:"+ ret); + + + } + } + + public void configure(JobConf job) { + try { + jc = job; + buf = new byte[job.getInt("io.file.buffer.size", 4096)]; + testMode = job.getBoolean("testMode", false); + scribeDir = job.get("scribeDirPath"); + fs = FileSystem.get(job); + compressed = FileOutputFormat.getCompressOutput(job); + outdir = jc.get("outdir"); + String scribeDir = jc.get("scribeDirPath"); + + if (compressed) { + LOG.info("Out file compressed"); + Class codecClass = FileOutputFormat + .getOutputCompressorClass(jc, DefaultCodec.class); + codec = (CompressionCodec) ReflectionUtils + .newInstance(codecClass, jc); + ext = codec.getDefaultExtension(); + + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + public void map(LongWritable key, Text value, OutputCollector output, + Reporter reporter) throws IOException { + String arr[] = value.toString().split(","); + String inFile = scribeDir + "/" + arr[0]; + Path fname = new Path(inFile); + finalFile = arr[1]; + try { + final long reportInterval = 1L << 25; + long bytesSinceLastReport = 0L; + long bytesLastReported = 0L; + + finalFile = arr[1]; + + LOG.info("Input File "+ inFile); + LOG.info("File File "+ finalFile); + finalDest = new Path(finalFile); + if (fs.exists(finalDest)){ + LOG.info("final file Already exists:"+finalDest ); + deleteFile(fname); + return; + } + + if (!compressed) { + + tempDest = new Path(outdir, arr[0]+ "." 
+ randGen.nextInt() + + tmpSuffix); + + fs.mkdirs(finalDest.getParent()); + fs.mkdirs(tempDest.getParent()); + if (fs.exists(tempDest)) + fs.delete(tempDest); + fileOut = fs.create(tempDest); + } else { + + + tempDest = new Path(outdir, arr[0] + ext + randGen.nextInt() + + tmpSuffix); + + fs.mkdirs(finalDest.getParent()); + fs.mkdirs(tempDest.getParent()); + if (fs.exists(tempDest)) + fs.delete(tempDest); + + fileOut = codec.createOutputStream(fs.create(tempDest)); + } + + LOG.info("Output file tmp file:"+tempDest ); + LOG.info("Output final file:"+finalDest ); + FileSystem srcFs = fname.getFileSystem(jc); + FSDataInputStream inputFile = srcFs.open(fname); + long flen = srcFs.getFileStatus(fname).getLen(); + + LOG.info("File length :"+flen); + long endBytes = flen; + + long beginBytes = 0; + while (true) { + int bytesRead; + try { + bytesRead = inputFile.read(buf, 0, buf.length); + if (bytesRead == -1) { + break; + } + + fileOut.write(buf, 0, bytesRead); + beginBytes += bytesRead; + + bytesSinceLastReport += bytesRead; + if (bytesSinceLastReport > reportInterval) { + bytesLastReported += bytesSinceLastReport; + reporter.setStatus("Copied " + bytesLastReported + " of " + + endBytes + " from " + fname); + bytesSinceLastReport = 0L; + } + + } catch (IOException e) { + LOG.info("Exception Reached trying to copy " + + Math.min(buf.length, (endBytes - beginBytes + 1)) + + " bytes from offset " + beginBytes); + LOG.info("FD=" + inputFile.toString()); + throw e; + } + + } + fileOut.close(); + + } catch (IOException e) { + aborted = true; + LOG.error("IO Error - deleting temporary output file"); + // try to do as much cleanup as possible + try { + fileOut.close(); + } catch (IOException eprime) { + } + fs.delete(tempDest); + throw e; + } + + LOG.info("Renaming file "+ tempDest + " to "+finalDest ); + boolean ret = fs.rename(tempDest, finalDest) ; + if (!ret){ + LOG.info("Renaming FAILED "+ tempDest + " to "+finalDest ); + } + else + { + deleteFile(fname); + } + + } + } + + + +}
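
Reviewer note (not part of the patch): the copier keys everything off the scribe file-name convention implemented in ScribeUtils, so a small round-trip sketch may help while reviewing. This is only an illustrative snippet; the prefix "adlog" and the date/part values are made up, and it assumes the two contrib classes added above are compiled and on the classpath.

import org.apache.hadoop.hive.contrib.etltools.scribecopier.ScribeUtils;
import org.apache.hadoop.hive.contrib.etltools.scribecopier.scribeFile;

public class ScribeNameDemo {
  public static void main(String[] args) {
    // A scribe file is identified by (year, month, day, part).
    scribeFile sf = new scribeFile(2009, 3, 5, 12);

    // getScribeFileName() formats "<prefix>-YYYY-MM-DD_NNNNN",
    // e.g. "adlog-2009-03-05_00012" for the values above.
    String name = ScribeUtils.getScribeFileName(sf, "adlog");
    System.out.println(name);

    // getScribeFile() parses the same convention back into a scribeFile;
    // it returns null if the name does not split into exactly four '-' pieces
    // with a '_'-separated day/part tail.
    scribeFile parsed = ScribeUtils.getScribeFile(name);
    System.out.println(ScribeUtils.getDateString(parsed)); // prints 2009-03-05
  }
}

This is the same convention getFileList() relies on when it strips a trailing ".gz" and skips names whose prefix does not match scribeFilePrefix.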