Chukwa / CHUKWA-17

Enhance process information collection

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None
    • Environment: Redhat EL 5.1, Java 6

      Description

      We currently get process information from "top"; a given process looks like:

      28288 gmon 18 0 2225m 277m 8876 S 0 7.0 1:38.56 java

      However, that is still not complete. For it to be truly useful, we would want something like:

      gmon 28288 28244 0 Nov13 ? 00:01:38 /grid/0/java/jdk/bin/java -Xms1000M -Xmx2000M -DAPP=collector
      -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=/gri........
      -DCHUKWA_CONF_DIR=/gri......./../conf -DCHUKWA_LOG_DIR=/grid.........bin/../var/log -classpath
      :/grid..........

      We can get that information with the command below:

      ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd
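      A quick way to see what these fields look like is to pick columns apart with awk. The sketch below uses a fabricated sample (the PID, user, and command line are made up for illustration) rather than live `ps` output, since exact column widths vary by platform:

      ```shell
      # Fabricated sample of `ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd` output.
      sample='  PID USER     VSZ     SZ %CPU %MEM     TIME START  STARTED CMD
      28288 gmon  2278400 283648  0.0  7.0 00:01:38 Nov13 15:32:36 /grid/0/java/jdk/bin/java -DAPP=collector'

      # Skip the header; print PID, USER, and the full command.
      # CMD is the last column and may itself contain spaces, so join fields 10..NF.
      echo "$sample" | awk 'NR > 1 { cmd = $10; for (i = 11; i <= NF; i++) cmd = cmd " " $i; print $1, $2, cmd }'
      # -> 28288 gmon /grid/0/java/jdk/bin/java -DAPP=collector
      ```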

        Activity

        asrabkin Ari Rabkin added a comment -

        Sounds like a good idea

        zhangyongjiang Cheng added a comment -

Index: src/contrib/chukwa/bin/exec-data-loader.sh
===================================================================
--- src/contrib/chukwa/bin/exec-data-loader.sh (revision 751021)
+++ src/contrib/chukwa/bin/exec-data-loader.sh (working copy)
@@ -35,6 +35,9 @@
 if [ "X$PARM" == "Xtop" ]; then
   kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
 fi
+if [ "X$PARM" == "Xps" ]; then
+  kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+fi
 if [ "X$PARM" == "Xdf" ]; then
   kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
 fi
@@ -71,6 +74,10 @@
   ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c &
 fi
 
+if [ "X$PARM" == "Xps" ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
 if [ "X$PARM" == "Xdf" ]; then
   ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -l &
 fi
Index: src/contrib/chukwa/bin/systemDataLoader.sh
===================================================================
--- src/contrib/chukwa/bin/systemDataLoader.sh (revision 751023)
+++ src/contrib/chukwa/bin/systemDataLoader.sh (working copy)
@@ -78,6 +78,23 @@
 fi
 
 EXISTS=0
+pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+  echo "ps data loader is stopped."
+  RESULT=1
+else
+  echo "ps data loader is running."
+fi
+
+EXISTS=0
 pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
@@ -125,6 +142,9 @@
 if [ -f ${CHUKWA_PID_DIR}/Top-data-loader.pid ]; then
   kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
 fi
+if [ -f ${CHUKWA_PID_DIR}/Ps-data-loader.pid ]; then
+  kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+fi
 if [ -f ${CHUKWA_PID_DIR}/Df-data-loader.pid ]; then
   kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
 fi
@@ -200,6 +220,20 @@
 fi
 
 EXISTS=0
+pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
+EXISTS=0
 pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
Index: src/contrib/chukwa/conf/chukwa-demux-conf.xml
===================================================================
--- src/contrib/chukwa/conf/chukwa-demux-conf.xml (revision 751024)
+++ src/contrib/chukwa/conf/chukwa-demux-conf.xml (working copy)
@@ -81,6 +81,12 @@
   </property>
 
   <property>
+    <name>Ps</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps</value>
+    <description>Parser class for </description>
+  </property>
+
+  <property>
     <name>Torque</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Torque</value>
     <description>Parser class for Parsing qstat and tracejob</description>
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java (revision 0)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java (revision 0)
@@ -0,0 +1,47 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LogEntry {
+  private final static SimpleDateFormat sdf = new SimpleDateFormat(
+      "yyyy-MM-dd HH:mm");
+
+  private Date date;
+  private String logLevel;
+  private String className;
+  private String body;
+
+  public LogEntry(String recordEntry) throws ParseException {
+    String dStr = recordEntry.substring(0, 23);
+    date = sdf.parse(dStr);
+    int start = 24;
+    int idx = recordEntry.indexOf(' ', start);
+    logLevel = recordEntry.substring(start, idx);
+    start = idx + 1;
+    idx = recordEntry.indexOf(' ', start);
+    className = recordEntry.substring(start, idx - 1);
+    body = recordEntry.substring(idx + 1);
+  }
+
+  public Date getDate() {
+    return date;
+  }
+
+  public void setDate(Date date) {
+    this.date = date;
+  }
+
+  public String getLogLevel() {
+    return logLevel;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public String getBody() {
+    return body;
+  }
+}
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java (revision 0)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java (revision 0)
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Ps extends AbstractProcessor {
+  static Logger log = Logger.getLogger(Ps.class);
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+      Reporter reporter) throws Throwable {
+    LogEntry log = new LogEntry(recordEntry);
+    PsOutput ps = new PsOutput(log.getBody());
+    for (HashMap<String, String> processInfo : ps.getProcessList()) {
+      key = new ChukwaRecordKey();
+      ChukwaRecord record = new ChukwaRecord();
+      this.buildGenericRecord(record, null, log.getDate().getTime(), "Ps");
+      for (Entry<String, String> entry : processInfo.entrySet()) {
+        record.add(entry.getKey(), entry.getValue());
+      }
+      output.collect(key, record);
+    }
+  }
+
+  public static class PsOutput {
+
+    // processes info
+    private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
+
+    public PsOutput(String psCmdOutput) throws InvalidPsRecord {
+      if (psCmdOutput == null || psCmdOutput.length() == 0)
+        return;
+
+      String[] lines = psCmdOutput.split("[\n\r]+");
+
+      // at least two lines
+      if (lines.length < 2)
+        return;
+
+      // header
+      ArrayList<String> header = new ArrayList<String>();
+      Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
+      while (matcher.find()) {
+        header.add(matcher.group(0));
+      }
+      if (!header.get(header.size() - 1).equals("CMD")) {
+        throw new InvalidPsRecord("CMD must be the last column");
+      }
+
+      // records
+      boolean foundInitCmd = false;
+      for (int line = 1; line < lines.length; line++) {
+        HashMap<String, String> record = new HashMap<String, String>();
+        recordList.add(record);
+
+        matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
+        for (int index = 0; index < header.size(); index++) {
+          String key = header.get(index);
+          matcher.find();
+          if (!key.equals("CMD")) {
+            String value = matcher.group(0);
+            /**
+             * For STARTED column, it could be in two formats: "MMM dd" or
+             * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice
+             * to get the date if it's in "MMM dd" format.
+             */
+            if (key.equals("STARTED")) {
+              char c = value.charAt(0);
+              if (c < '0' || c > '9') {
+                matcher.find();
+                value += matcher.group(0);
+              }
+            }
+            record.put(key, value);
+          } else {
+            // reached the cmd part. all remains should be put
+            // together as the command
+            String value = lines[line].substring(matcher.start());
+            record.put(key, value);
+            if (!foundInitCmd)
+              foundInitCmd = value.startsWith("init");
+            break;
+          }
+        }
+      }
+      if (!foundInitCmd)
+        throw new InvalidPsRecord("Did not find 'init' cmd");
+    }
+
+    public ArrayList<HashMap<String, String>> getProcessList() {
+      return recordList;
+    }
+  }
+
+  public static class InvalidPsRecord extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidPsRecord() {
+    }
+
+    public InvalidPsRecord(String arg0) {
+      super(arg0);
+    }
+
+    public InvalidPsRecord(Throwable arg0) {
+      super(arg0);
+    }
+
+    public InvalidPsRecord(String arg0, Throwable arg1) {
+      super(arg0, arg1);
+    }
+  }
+}
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (revision 749503)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (working copy)
@@ -85,20 +85,6 @@
       this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
       output.collect(key, record);
 
-      StringBuffer buffer = new StringBuffer();
-      //FIXME please validate this
-      while (i < lines.length) {
-        record = null;
-        buffer.append(lines[i]+"\n");
-        i++;
-
-      }
-      record = new ChukwaRecord();
-      key = new ChukwaRecordKey();
-      this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
-      //Output Top info to database
-      output.collect(key, record);
-
       // End of parsing
     } catch (Exception e)
     {
Index: src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
===================================================================
--- src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java (revision 0)
+++ src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java (revision 0)
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ps;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.InvalidPsRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.PsOutput;
+
+import junit.framework.TestCase;
+
+public class PsOutputTest extends TestCase {
+
+  public void testGetRecordListFromPsCmd() throws IOException, InvalidPsRecord {
+    Runtime runtime = Runtime.getRuntime();
+    Process process = runtime.exec("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd");
+    StringBuffer sb = new StringBuffer();
+    InputStream is = process.getInputStream();
+    byte[] buffer = new byte[1024];
+    while (true) {
+      int len = is.read(buffer);
+      if (len == -1)
+        break;
+      sb.append(new String(buffer, 0, len));
+    }
+    String output = sb.toString();
+
+    PsOutput pso = new PsOutput(output);
+
+    // search init process
+    for (HashMap<String, String> processInfo : pso.getProcessList()) {
+      for (Entry<String, String> entry : processInfo.entrySet()) {
+        if (entry.getKey().equals("CMD") && entry.getValue().startsWith("init")) {
+          return;
+        }
+      }
+    }
+
+    throw new InvalidPsRecord(output);
+  }
+
+  public void testGetRecordList() throws IOException, InvalidPsRecord {
+    // below is from command "ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd"
+    String output =
+        " PID USER VSZ SZ %CPU %MEM TIME START STARTED CMD\n"
+        + " 1 root 2064 284 0.0 0.0 00:00:02 2008 Dec 29 init [5]\n"
+        + " 2 root 0 0 0.0 0.0 00:00:01 2008 Dec 29 [migration/0]\n"
+        + "20270 chzhang 4248 588 0.0 0.0 00:00:00 15:32 15:32:36 ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd\n"
+        + "28371 angelac2 7100 1716 0.0 0.0 00:00:00 Feb27 Feb 27 /usr/libexec/gconfd-2 5\n";
+
+    PsOutput pso = new PsOutput(output);
+    ArrayList<HashMap<String, String>> processes = pso.getProcessList();
+    assertEquals(4, processes.size());
+    assertEquals("Dec29", processes.get(0).get("STARTED"));
+    assertEquals("15:32:36", processes.get(2).get("STARTED"));
+    assertEquals("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd",
+        processes.get(2).get("CMD"));
+  }
+
+}
Index: src/contrib/chukwa/tools/service/chukwa-ps/run
===================================================================
--- src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
+++ src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
@@ -0,0 +1,5 @@
+#!/bin/sh
+CHUKWA_CONF_DIR=/usr/local/chukwa/conf
+
+exec setuidgid gmon /usr/local/chukwa/bin/exec-data-loader.sh --config $CHUKWA_CONF_DIR ps
+
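One subtlety the patch's PsOutput handles: the STARTED column is either an "hh:mm:ss" time (one token) or a "MMM dd" date (two whitespace-separated tokens), so a naive whitespace split mis-aligns columns for older processes. The rule can be sketched in shell; this is an illustrative re-implementation using a fabricated record line, not part of the patch:

```shell
# One fabricated record line; columns are PID USER VSZ SZ %CPU %MEM TIME START STARTED CMD.
line=' 1 root 2064 284 0.0 0.0 00:00:02 2008 Dec 29 init [5]'

# STARTED is field 9. If it does not begin with a digit it is a "MMM dd" date,
# so the day number landed in field 10; concatenate the two, as PsOutput does.
echo "$line" | awk '{ started = $9; if (started !~ /^[0-9]/) started = started $10; print started }'
# -> Dec29
```

This matches the unit test's expectation that a "Dec 29" start date is stored as "Dec29".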
        Show
        zhangyongjiang Cheng added a comment - Index: src/contrib/chukwa/bin/exec-data-loader.sh =================================================================== — src/contrib/chukwa/bin/exec-data-loader.sh (revision 751021) +++ src/contrib/chukwa/bin/exec-data-loader.sh (working copy) @@ -35,6 +35,9 @@ if [ "X$PARM" == "Xtop" ]; then kill -9 `cat $ {CHUKWA_PID_DIR}/Top-data-loader.pid` fi + if [ "X$PARM" == "Xps" ]; then + kill -9 `cat ${CHUKWA_PID_DIR} /Ps-data-loader.pid` + fi if [ "X$PARM" == "Xdf" ]; then kill -9 `cat $ {CHUKWA_PID_DIR}/Df-data-loader.pid` fi @@ -71,6 +74,10 @@ ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c & fi +if [ "X$PARM" == "Xps" ]; then + ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd & +fi + if [ "X$PARM" == "Xdf" ]; then ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -l & fi Index: src/contrib/chukwa/bin/systemDataLoader.sh =================================================================== — src/contrib/chukwa/bin/systemDataLoader.sh 
(revision 751023) +++ src/contrib/chukwa/bin/systemDataLoader.sh (working copy) @@ -78,6 +78,23 @@ fi EXISTS=0 + pidFile="${CHUKWA_PID_DIR} /Ps-data-loader.pid" + if [ -f $pidFile ]; then + pid=`head $ {pidFile}` + ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l` + if [ $ChildPIDRunningStatus -ge 1 ]; then + EXISTS=1 + fi + fi + + if [ ${EXISTS} -lt 1 ]; then + echo "ps data loader is stopped." + RESULT=1 + else + echo "ps data loader is running." + fi + + EXISTS=0 pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid" if [ -f $pidFile ]; then pid=`head ${pidFile} ` @@ -125,6 +142,9 @@ if [ -f $ {CHUKWA_PID_DIR}/Top-data-loader.pid ]; then kill -9 `cat ${CHUKWA_PID_DIR} /Top-data-loader.pid` fi + if [ -f $ {CHUKWA_PID_DIR}/Ps-data-loader.pid ]; then + kill -9 `cat ${CHUKWA_PID_DIR} /Ps-data-loader.pid` + fi if [ -f $ {CHUKWA_PID_DIR}/Df-data-loader.pid ]; then kill -9 `cat ${CHUKWA_PID_DIR} /Df-data-loader.pid` fi @@ -200,6 +220,20 @@ fi EXISTS=0 +pidFile="$ {CHUKWA_PID_DIR}/Ps-data-loader.pid" +if [ -f $pidFile ]; then + pid=`head ${pidFile}` + ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l` + if [ $ChildPIDRunningStatus -ge 1 ]; then + EXISTS=1 + fi +fi + +if [ ${EXISTS} -lt 1 ]; then + ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd & +fi + +EXISTS=0 pidFile="${CHUKWA_PID_DIR} /Df-data-loader.pid" if [ -f $pidFile ]; then pid=`head $ {pidFile} ` Index: src/contrib/chukwa/conf/chukwa-demux-conf.xml =================================================================== — src/contrib/chukwa/conf/chukwa-demux-conf.xml (revision 751024) +++ 
src/contrib/chukwa/conf/chukwa-demux-conf.xml (working copy) @@ -81,6 +81,12 @@ </property> <property> + <name>Ps</name> + <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps</value> + <description>Parser class for </description> + </property> + + <property> <name>Torque</name> <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Torque</value> <description>Parser class for Parsing qstat and tracejob</description> Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java =================================================================== — src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java (revision 0) +++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java (revision 0) @@ -0,0 +1,47 @@ +package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class LogEntry { + private final static SimpleDateFormat sdf = new SimpleDateFormat( + "yyyy-MM-dd HH:mm"); + + private Date date; + private String logLevel; + private String className; + private String body; + + public LogEntry(String recordEntry) throws ParseException { + String dStr = recordEntry.substring(0, 23); + date = sdf.parse(dStr); + int start = 24; + int idx = recordEntry.indexOf(' ', start); + logLevel = recordEntry.substring(start, idx); + start = idx + 1; + idx = recordEntry.indexOf(' ', start); + className = recordEntry.substring(start, idx - 1); + body = recordEntry.substring(idx + 1); + } + + public Date getDate() { + return date; + } + + public void setDate(Date date) { + this.date = date; + } + + public String getLogLevel() { + return logLevel; + } + + public String getClassName() { + return className; + } + + public String getBody() { + return body; + } +} Index: 
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java =================================================================== — src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java (revision 0) +++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java (revision 0) @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +public class Ps extends AbstractProcessor { + static Logger log = Logger.getLogger(Ps.class); + + @Override + protected void parse(String recordEntry, + OutputCollector<ChukwaRecordKey, ChukwaRecord> output, + Reporter reporter) throws Throwable { + LogEntry log = new LogEntry(recordEntry); + PsOutput ps = new PsOutput(log.getBody()); + for (HashMap<String, String> processInfo : ps.getProcessList()) { + key = new ChukwaRecordKey(); + ChukwaRecord record = new ChukwaRecord(); + this.buildGenericRecord(record, null, log.getDate().getTime(), "Ps"); + for (Entry<String, String> entry : processInfo.entrySet()) { + record.add(entry.getKey(), entry.getValue()); + } + output.collect(key, record); + } + } + + public static class PsOutput { + + // processes info + private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>(); + + public PsOutput(String psCmdOutput) throws InvalidPsRecord { + if (psCmdOutput == null || psCmdOutput.length() == 0) + return; + + String[] lines = psCmdOutput.split(" [\n\r] +"); + + // at least two lines + if (lines.length < 2) + return; + + // header + ArrayList<String> header = new ArrayList<String>(); + Matcher matcher = Pattern.compile(" [^ ^\t] +").matcher(lines [0] ); + while (matcher.find()) { + header.add(matcher.group(0)); + } + if (!header.get(header.size() - 1).equals("CMD")) { + throw new InvalidPsRecord("CMD must be the last column"); + } + + // records + boolean foundInitCmd = false; + for 
(int line = 1; line < lines.length; line++) {
+        HashMap<String, String> record = new HashMap<String, String>();
+        recordList.add(record);
+
+        matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
+        for (int index = 0; index < header.size(); index++) {
+          String key = header.get(index);
+          matcher.find();
+          if (!key.equals("CMD")) {
+            String value = matcher.group(0);
+            /**
+             * For STARTED column, it could be in two formats: "MMM dd" or
+             * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice
+             * to get the date if it's in "MMM dd" format.
+             */
+            if (key.equals("STARTED")) {
+              char c = value.charAt(0);
+              if (c < '0' || c > '9') {
+                matcher.find();
+                value += matcher.group(0);
+              }
+            }
+            record.put(key, value);
+          } else {
+            // reached the cmd part. all remains should be put
+            // together as the command
+            String value = lines[line].substring(matcher.start());
+            record.put(key, value);
+            if (!foundInitCmd)
+              foundInitCmd = value.startsWith("init");
+            break;
+          }
+        }
+      }
+      if (!foundInitCmd)
+        throw new InvalidPsRecord("Did not find 'init' cmd");
+    }
+
+    public ArrayList<HashMap<String, String>> getProcessList() {
+      return recordList;
+    }
+  }
+
+  public static class InvalidPsRecord extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidPsRecord() {
+    }
+
+    public InvalidPsRecord(String arg0) {
+      super(arg0);
+    }
+
+    public InvalidPsRecord(Throwable arg0) {
+      super(arg0);
+    }
+
+    public InvalidPsRecord(String arg0, Throwable arg1) {
+      super(arg0, arg1);
+    }
+  }
+}
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (revision 749503)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (working copy)
@@ -85,20 +85,6 @@
         this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
         output.collect(key, record);
         StringBuffer buffer = new StringBuffer();
         //FIXME please validate this
         while (i < lines.length) {
-          record = null;
-          buffer.append(lines[i]+"\n");
-          i++;
-
         }
         record = new ChukwaRecord();
         key = new ChukwaRecordKey();
         this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
         //Output Top info to database
         output.collect(key, record);
-        // End of parsing
       } catch (Exception e) {
Index: src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
===================================================================
--- src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java (revision 0)
+++ src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java (revision 0)
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ps;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.InvalidPsRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.PsOutput;
+
+import junit.framework.TestCase;
+
+public class PsOutputTest extends TestCase {
+
+  public void testGetRecordListFromPsCmd() throws IOException, InvalidPsRecord {
+    Runtime runtime = Runtime.getRuntime();
+    Process process = runtime.exec("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd");
+    StringBuffer sb = new StringBuffer();
+    InputStream is = process.getInputStream();
+    byte[] buffer = new byte[1024];
+    while (true) {
+      int len = is.read(buffer);
+      if (len == -1)
+        break;
+      sb.append(new String(buffer, 0, len));
+    }
+    String output = sb.toString();
+
+    PsOutput pso = new PsOutput(output);
+
+    // search init process
+    for (HashMap<String, String> processInfo : pso.getProcessList()) {
+      for (Entry<String, String> entry : processInfo.entrySet()) {
+        if (entry.getKey().equals("CMD") && entry.getValue().startsWith("init")) {
+          return;
+        }
+      }
+    }
+
+    throw new InvalidPsRecord(output);
+  }
+
+  public void testGetRecordList() throws IOException, InvalidPsRecord {
+    // below is from command "ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd"
+    String output =
+        " PID USER VSZ SZ %CPU %MEM TIME START STARTED CMD\n" +
+        " 1 root 2064 284 0.0 0.0 00:00:02 2008 Dec 29 init [5]\n" +
+        " 2 root 0 0 0.0 0.0 00:00:01 2008 Dec 29 [migration/0]\n" +
+        "20270 chzhang 4248 588 0.0 0.0 00:00:00 15:32 15:32:36 ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd\n" +
+        "28371 angelac2 7100 1716 0.0 0.0 00:00:00 Feb27 Feb 27 /usr/libexec/gconfd-2 5\n";
+
+    PsOutput pso = new PsOutput(output);
+    ArrayList<HashMap<String, String>> processes = pso.getProcessList();
+    assertEquals(4, processes.size());
+    assertEquals("Dec29", processes.get(0).get("STARTED"));
+    assertEquals("15:32:36", processes.get(2).get("STARTED"));
+    assertEquals("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd", processes.get(2).get("CMD"));
+  }
+
+}
Index: src/contrib/chukwa/tools/service/chukwa-ps/run
===================================================================
--- src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
+++ src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
@@ -0,0 +1,5 @@
+#!/bin/sh
+CHUKWA_CONF_DIR=/usr/local/chukwa/conf
+
+exec setuidgid gmon /usr/local/chukwa/bin/exec-data-loader.sh --config $CHUKWA_CONF_DIR ps
+
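
For readers skimming the patch, the subtle part of the parser is the STARTED column: `ps` prints it either as a clock time (`15:32:36`, one whitespace token) or as a month-and-day pair (`Dec 29`, two tokens), so the parser joins a second token whenever the first does not begin with a digit. Below is a minimal standalone sketch of just that rule; the class and method names are hypothetical and not part of the patch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the STARTED-column handling in the Ps parser above:
// if the first token starts with a letter (a month name such as "Dec"),
// consume the following token (the day) and concatenate the pair.
public class PsStartedSketch {

    static String parseStarted(Matcher matcher) {
        matcher.find();
        String value = matcher.group(0);
        char c = value.charAt(0);
        if (c < '0' || c > '9') {   // month name, e.g. "Dec": read the day too
            matcher.find();
            value += matcher.group(0);
        }
        return value;
    }

    public static void main(String[] args) {
        // Same whitespace tokenization idea as the patch's matcher
        Pattern token = Pattern.compile("[^ \t]+");
        System.out.println(parseStarted(token.matcher("Dec 29")));   // prints "Dec29"
        System.out.println(parseStarted(token.matcher("15:32:36"))); // prints "15:32:36"
    }
}
```

This is why the unit test expects `Dec29` (joined, no space) for the init process but the untouched `15:32:36` for the ps process itself.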
        asrabkin Ari Rabkin added a comment -

        +1
        eyang Eric Yang added a comment -

        Please regenerate the patch and attach as a file. Thanks
        eyang Eric Yang added a comment -

        Cancel patch until a new patch is attached as a file.
        eyang Eric Yang added a comment -

        +1 the new patch works.
        eyang Eric Yang added a comment -

        I just committed this, thanks Cheng.

          People

          • Assignee:
            zhangyongjiang Cheng
          • Reporter:
            zhangyongjiang Cheng