diff --git a/.gitignore b/.gitignore index 553a077..028f415 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.vagrant dist *classes target/ diff --git a/grid/mesos/Vagrantfile b/grid/mesos/Vagrantfile new file mode 100644 index 0000000..a364dea --- /dev/null +++ b/grid/mesos/Vagrantfile @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" + +# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered. +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + config.vm.box = "precise64" + + # The url from where the 'config.vm.box' box will be fetched if it + # doesn't already exist on the user's system. + config.vm.box_url = "http://files.vagrantup.com/precise64.box" + + config.vm.define "zookeeper" do |zookeeper| + zookeeper.vm.network :private_network, ip: "192.168.57.5" + zookeeper.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "1524"] + end + zookeeper.vm.provision "shell", path: "vagrant/zk.sh" + end + + config.vm.define "brokerOne" do |brokerOne| + brokerOne.vm.network :private_network, ip: "192.168.57.10" + brokerOne.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "1524"] + end + brokerOne.vm.provision "shell", path: "vagrant/broker.sh", :args => "1" + end + +end diff --git a/grid/mesos/build.sbt b/grid/mesos/build.sbt new file mode 100644 index 0000000..c5fbcd6 --- /dev/null +++ b/grid/mesos/build.sbt @@ -0,0 +1,18 @@ + +name := "kafka-mesos" + +version := "0.1.0.0" + +scalaVersion := "2.10.3" + +mainClass := Some("Stub") + +resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" + +libraryDependencies ++= Seq( + "org.specs2" %% "specs2" % "2.2.2" % "test", + "org.apache.kafka" % "kafka_2.10" % "0.8.0" intransitive(), + "log4j" % "log4j" % "1.2.17", + "org.apache.mesos" % "mesos" % "0.15.0", + "com.google.protobuf" % "protobuf-java" % "2.4.1" +) diff --git a/grid/mesos/project/.keep b/grid/mesos/project/.keep new file mode 100644 index 0000000..e69de29 diff --git a/grid/mesos/src/main/scala/KafkaExecutor.scala b/grid/mesos/src/main/scala/KafkaExecutor.scala new file mode 100644 index 0000000..8b0b7a4 --- /dev/null +++ b/grid/mesos/src/main/scala/KafkaExecutor.scala @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.mesos + +import org.apache.mesos.{MesosExecutorDriver, ExecutorDriver, Executor} +import org.apache.mesos.Protos._ +import kafka.utils.{Logging => AppLogging} + +class SampleExecutor extends Executor with AppLogging { + + /** + * Invoked when the executor becomes "disconnected" from the slave + * (e.g., the slave is being restarted due to an upgrade). + */ + def disconnected(executorDriver: ExecutorDriver) = { + info("disconnected") + } + + /** + * Invoked when a fatal error has occured with the executor and/or + * executor driver. The driver will be aborted BEFORE invoking this + * callback. + */ + def error(executorDriver: ExecutorDriver,message: String) = { + super.error(message) + } + + /** + * Invoked when a framework message has arrived for this + * executor. These messages are best effort; do not expect a + * framework message to be retransmitted in any reliable fashion. + */ + def frameworkMessage(executorDriver: ExecutorDriver, data: Array[Byte]) = { + info("frameworkMessage data bytes") + } + + /** + * Invoked when a task running within this executor has been killed + * (via SchedulerDriver::killTask). Note that no status update will + * be sent on behalf of the executor, the executor is responsible + * for creating a new TaskStatus (i.e., with TASK_KILLED) and + * invoking ExecutorDriver::sendStatusUpdate. + */ + def killTask(executorDriver: ExecutorDriver,taskId: TaskID) = { + info("kill task") + } + + /** + * Invoked when a task has been launched on this executor (initiated + * via Scheduler::launchTasks). Note that this task can be realized + * with a thread, a process, or some simple computation, however, no + * other callbacks will be invoked on this executor until this + * callback has returned. + */ + def launchTask(executorDriver: ExecutorDriver,taskInfo: TaskInfo) = { + info("launching task") + } + + /** + * Invoked once the executor driver has been able to successfully + * connect with Mesos. In particular, a scheduler can pass some + * data to it's executors through the FrameworkInfo.ExecutorInfo's + * data field. + */ + def registered(executorDriver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo) = { + info("registered") + } + + /** + * Invoked when the executor re-registers with a restarted slave. + */ + def reregistered(x$1: ExecutorDriver,x$2: org.apache.mesos.Protos.SlaveInfo) = { + info("registered with a restarted slave") + } + + /** + * Invoked when the executor should terminate all of it's currently + * running tasks. Note that after a Mesos has determined that an + * executor has terminated any tasks that the executor did not send + * terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, + * TASK_FAILED, etc) a TASK_LOST status update will be created. + */ + def shutdown(x$1: ExecutorDriver) = { + info("shutting down") + } + + def main(args: Array[String]) = { + args.foreach(a => info(a)) + } +} \ No newline at end of file diff --git a/grid/mesos/src/main/scala/KafkaScheduler.scala b/grid/mesos/src/main/scala/KafkaScheduler.scala new file mode 100644 index 0000000..f5939a7 --- /dev/null +++ b/grid/mesos/src/main/scala/KafkaScheduler.scala @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.mesos + +import org.apache.mesos.{SchedulerDriver, Scheduler} +import java.util +import org.apache.mesos.Protos._ +import org.apache.mesos.state.State +import kafka.utils.{Logging => AppLogging} + +// Mesos Framework API http://mesos.apache.org/documentation/latest/app-framework-development-guide/ + +class KafkaScheduler(command: String, numInstances: Int, state: State) extends Scheduler with AppLogging { + + /** + * Invoked when the scheduler becomes "disconnected" from the master + * (e.g., the master fails and another is taking over). + */ + def disconnected(schedulerDriver: SchedulerDriver) = { + info("disconnected started") + } + + /** + * Invoked when there is an unrecoverable error in the scheduler or + * scheduler driver. The driver will be aborted BEFORE invoking this + * callback. + */ + def error(schedulerDriver: SchedulerDriver,message: String) = { + super.error(message) + } + + /** + * Invoked when an executor has exited/terminated. Note that any + * tasks running will have TASK_LOST status updates automagically + * generated. + */ + def executorLost(schedulerDriver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) = { + info("executorLost status = " + status.toString) + } + + /** + * Invoked when a framework message has arrived for this + * executor. These messages are best effort; do not expect a + * framework message to be retransmitted in any reliable fashion. + */ + def frameworkMessage(schedulerDriver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]) = { + info("frameworkMessage received") + } + + /** + * Invoked when an offer is no longer valid (e.g., the slave was + * lost or another framework used resources in the offer). If for + * whatever reason an offer is never rescinded (e.g., dropped + * message, failing over framework, etc.), a framwork that attempts + * to launch tasks using an invalid offer will receive TASK_LOST + * status updats for those tasks (see Scheduler::resourceOffers). + */ + def offerRescinded(schedulerDriver: SchedulerDriver, executorId: OfferID) = { + info("the offer we had was taken away from us") + } + + /** + * Invoked once the executor driver has been able to successfully + * connect with Mesos. In particular, a scheduler can pass some + * data to it's executors through the FrameworkInfo.ExecutorInfo's + * data field. + */ + def registered(schedulerDriver: SchedulerDriver, framworkId: FrameworkID, masterInfo: MasterInfo) = { + info("registered with FrameworkID") + } + + + /** + * Invoked when the executor re-registers with a restarted slave. + */ + def reregistered(schedulerDriver: SchedulerDriver, masterInfo: MasterInfo) = { + info("registered without FrameworkID") + } + + /** + * Invoked when resources have been offered to this framework. A + * single offer will only contain resources from a single slave. + * Resources associated with an offer will not be re-offered to + * _this_ framework until either (a) this framework has rejected + * those resources (see SchedulerDriver::launchTasks) or (b) those + * resources have been rescinded (see Scheduler::offerRescinded). + * Note that resources may be concurrently offered to more than one + * framework at a time (depending on the allocator being used). In + * that case, the first framework to launch tasks using those + * resources will be able to use them while the other frameworks + * will have those resources rescinded (or if a framework has + * already launched tasks with those resources then those tasks will + * fail with a TASK_LOST status and a message saying as much). + */ + def resourceOffers(schedulerDriver: SchedulerDriver, offer: util.List[Offer]) = { + info("resource offers") + } + + /** + * Invoked when a slave has been determined unreachable (e.g., + * machine failure, network partition). Most frameworks will need to + * reschedule any tasks launched on this slave on a new slave. + */ + def slaveLost(schedulerDriver: SchedulerDriver, slaveId: SlaveID) = { + info("slave lost") + } + + /** + * Invoked when the status of a task has changed (e.g., a slave is + * lost and so the task is lost, a task finishes and an executor + * sends a status update saying so, etc). Note that returning from + * this callback _acknowledges_ receipt of this status update! If + * for whatever reason the scheduler aborts during this callback (or + * the process exits) another status update will be delivered (note, + * however, that this is currently not true if the slave sending the + * status update is lost/fails during that time). + */ + def statusUpdate(schedulerDriver: SchedulerDriver, taskStatus: TaskStatus) = { + info("status update") + } + + def main(args: Array[String]) = { + args.foreach(a => info(a)) + } +} \ No newline at end of file diff --git a/grid/mesos/vagrant/README.md b/grid/mesos/vagrant/README.md new file mode 100644 index 0000000..e707f02 --- /dev/null +++ b/grid/mesos/vagrant/README.md @@ -0,0 +1,29 @@ +# Apache Kafka # + +Using Vagrant to get up and running. + +1) Install Vagrant [http://www.vagrantup.com/](http://www.vagrantup.com/) +2) Install Virtual Box [https://www.virtualbox.org/](https://www.virtualbox.org/) + +In the main kafka folder + +1) ./sbt update +2) ./sbt package +3) ./sbt assembly-package-dependency +4) vagrant up + +once this is done +* Zookeeper will be running 192.168.57.5 +* Broker 1 on 192.168.57.10 + +When you are all up and running you will be back at a command brompt. + +If you want you can login to the machines using vagrant ssh but you don't need to. + +You can access the brokers and zookeeper by their IP + +e.g. + +bin/kafka-console-producer.sh --broker-list 192.168.57.10:9092 --topic sandbox + +bin/kafka-console-consumer.sh --zookeeper 192.168.57.5:2181 --topic sandbox --from-beginning diff --git a/grid/mesos/vagrant/broker.sh b/grid/mesos/vagrant/broker.sh new file mode 100755 index 0000000..b71e18a --- /dev/null +++ b/grid/mesos/vagrant/broker.sh @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash -x +apt-get -y update +apt-get install -y software-properties-common python-software-properties +add-apt-repository -y ppa:webupd8team/java +apt-get -y update +/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections +apt-get -y install oracle-java7-installer oracle-java7-set-default + +chmod a+rw /opt +cd /opt +ln -s /vagrant kafka +cd kafka +IP=$(ifconfig | grep 'inet addr:'| grep 168 | grep 192|cut -d: -f2 | awk '{ print $1}') +sed 's/broker.id=0/'broker.id=$1'/' /opt/kafka/config/server.properties > /tmp/prop1.tmp +sed 's/#advertised.host.name=/'advertised.host.name=$IP'/' /tmp/prop1.tmp > /tmp/prop2.tmp +sed 's/#host.name=localhost/'host.name=$IP'/' /tmp/prop2.tmp > /tmp/prop3.tmp +sed 's/zookeeper.connect=localhost:2181/'zookeeper.connect=192.168.57.5:2181'/' /tmp/prop3.tmp > /opt/server.properties + +bin/kafka-server-start.sh /opt/server.properties 1>> /tmp/broker.log 2>> /tmp/broker.log & diff --git a/grid/mesos/vagrant/mesos.sh b/grid/mesos/vagrant/mesos.sh new file mode 100755 index 0000000..38461ec --- /dev/null +++ b/grid/mesos/vagrant/mesos.sh @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/sh -Eux + +# Trap non-normal exit signals: 1/HUP, 2/INT, 3/QUIT, 15/TERM, ERR +trap founderror 1 2 3 15 ERR + +founderror() +{ + exit 1 +} + +exitscript() +{ + #remove lock file + #rm $lockfile + exit 0 +} + +apt-get update +apt-get install -y vim git wget screen curl + +####################################################################################################################### +sudo apt-get -y update +sudo apt-get install -y software-properties-common python-software-properties +sudo add-apt-repository -y ppa:webupd8team/java +sudo apt-get -y update +sudo /bin/echo debconf shared/accepted-oracle-license-v1-1 select true | sudo /usr/bin/debconf-set-selections +sudo apt-get -y install oracle-java7-installer oracle-java7-set-default +####################################################################################################################### + +sudo chmod a+rw -R /opt +sudo mkdir -p /opt/apache +sudo chmod a+rw -R /opt/apache +sudo apt-get install -y g++ python2.7-dev libcppunit-dev libunwind7-dev git +sudo apt-get update && sudo apt-get install -y libsasl2-modules libsasl2-dev autotools-dev libltdl-dev libtool +sudo apt-get update && sudo apt-get install -y autoconf autopoint make libcurl4-openssl-dev +cd /opt/apache +git clone git://git.apache.org/mesos.git +cd /opt/apache/mesos +sudo ./bootstrap +sudo ./configure +sudo make +sudo chmod a+rw /opt/apache +curl -fL https://raw.github.com/mesosphere/mesos-docker/master/bin/mesos-docker-setup | sudo bash + +curl -s https://get.docker.io/ubuntu/ | sudo sh + +exitscript diff --git a/grid/mesos/vagrant/zk.sh b/grid/mesos/vagrant/zk.sh new file mode 100755 index 0000000..a05e494 --- /dev/null +++ b/grid/mesos/vagrant/zk.sh @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash -x +apt-get -y update +apt-get install -y software-properties-common python-software-properties +add-apt-repository -y ppa:webupd8team/java +apt-get -y update +/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections +apt-get -y install oracle-java7-installer oracle-java7-set-default + +chmod a+rw /opt +cd /opt +ln -s /vagrant kafka +cd kafka +bin/zookeeper-server-start.sh config/zookeeper.properties 1>> /tmp/zk.log 2>> /tmp/zk.log & \ No newline at end of file