Uploaded image for project: 'Apache Gearpump'
  1. Apache Gearpump
  2. GEARPUMP-192

Refactor example source tasks to use the DataSource API

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.8.1
    • 0.8.3
    • streaming
    • None

    Attachments

      Issue Links

        Activity

          githubbot ASF GitHub Bot added a comment -

          Github user huafengw commented on a diff in the pull request:

          https://github.com/apache/incubator-gearpump/pull/78#discussion_r77100932

          — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
          @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit

          import org.apache.gearpump.Message
          import org.apache.gearpump.cluster.UserConfig
          -import org.apache.gearpump.streaming.source.Watermark
          +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}

          import org.apache.gearpump.streaming.task.{Task, TaskContext}

          -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

          • import taskContext.output
          • override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - }

            +class Split() extends DataSource {

              • End diff –

          I prefer `class Split extends DataSource`, no braces.

          githubbot ASF GitHub Bot added a comment - Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/78#discussion_r77100932 — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala — @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.source. {DataSource, DataSourceTask, Watermark} import org.apache.gearpump.streaming.task. {Task, TaskContext} -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - } +class Split() extends DataSource { End diff – I prefer `class Split extends DataSource`, no braces.
          githubbot ASF GitHub Bot added a comment -

          Github user huafengw commented on a diff in the pull request:

          https://github.com/apache/incubator-gearpump/pull/78#discussion_r77101042

          — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
          @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit

          import org.apache.gearpump.Message
          import org.apache.gearpump.cluster.UserConfig
          -import org.apache.gearpump.streaming.source.Watermark
          +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}

          import org.apache.gearpump.streaming.task.{Task, TaskContext}

          -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

          • import taskContext.output
          • override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - }

            +class Split() extends DataSource {
            +
            +
            + override def open(context: TaskContext, startTime: Instant): Unit = {}

          • override def onNext(msg: Message): Unit = {
            +
            + override def read(): Message = {
            Split.TEXT_TO_SPLIT.lines.foreach { line =>
            line.split("[\s]+").filter(_.nonEmpty).foreach { msg =>
            - output(new Message(msg, System.currentTimeMillis()))
            + new Message(msg, System.currentTimeMillis())
            }
            }

            -

          • import scala.concurrent.duration._
          • taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
          • Watermark(Instant.now))
            + Message("message")
            }
            +
            + override def close(): Unit = {}
            +
            + override def getWatermark: Instant = Instant.now()
            +
            + Watermark(Instant.now)
              • End diff –

          What's this line of code used for?

          githubbot ASF GitHub Bot added a comment - Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/78#discussion_r77101042 — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala — @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.source. {DataSource, DataSourceTask, Watermark} import org.apache.gearpump.streaming.task. {Task, TaskContext} -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - } +class Split() extends DataSource { + + + override def open(context: TaskContext, startTime: Instant): Unit = {} override def onNext(msg: Message): Unit = { + + override def read(): Message = { Split.TEXT_TO_SPLIT.lines.foreach Unknown macro: { line => line.split("[\s]+").filter(_.nonEmpty).foreach { msg => - output(new Message(msg, System.currentTimeMillis())) + new Message(msg, System.currentTimeMillis()) } } - import scala.concurrent.duration._ taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! Watermark(Instant.now)) + Message("message") } + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + + Watermark(Instant.now) End diff – What's this line of code used for?
          githubbot ASF GitHub Bot added a comment -

          Github user Roshanson commented on a diff in the pull request:

          https://github.com/apache/incubator-gearpump/pull/78#discussion_r77278370

          — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
          @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit

          import org.apache.gearpump.Message
          import org.apache.gearpump.cluster.UserConfig
          -import org.apache.gearpump.streaming.source.Watermark
          +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}

          import org.apache.gearpump.streaming.task.{Task, TaskContext}

          -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

          • import taskContext.output
          • override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - }

            +class Split() extends DataSource {
            +
            +
            + override def open(context: TaskContext, startTime: Instant): Unit = {}

          • override def onNext(msg: Message): Unit = {
            +
            + override def read(): Message = {
            Split.TEXT_TO_SPLIT.lines.foreach { line =>
            line.split("[\s]+").filter(_.nonEmpty).foreach { msg =>
            - output(new Message(msg, System.currentTimeMillis()))
            + new Message(msg, System.currentTimeMillis())
            }
            }

            -

          • import scala.concurrent.duration._
          • taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
          • Watermark(Instant.now))
            + Message("message")
            }
            +
            + override def close(): Unit = {}
            +
            + override def getWatermark: Instant = Instant.now()
            +
            + Watermark(Instant.now)
              • End diff –

          Thanks, I'm still learning how Gearpump works.

          githubbot ASF GitHub Bot added a comment - Github user Roshanson commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/78#discussion_r77278370 — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala — @@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.source. {DataSource, DataSourceTask, Watermark} import org.apache.gearpump.streaming.task. {Task, TaskContext} -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output override def onStart(startTime: Instant): Unit = { - self ! Watermark(Instant.now) - } +class Split() extends DataSource { + + + override def open(context: TaskContext, startTime: Instant): Unit = {} override def onNext(msg: Message): Unit = { + + override def read(): Message = { Split.TEXT_TO_SPLIT.lines.foreach Unknown macro: { line => line.split("[\s]+").filter(_.nonEmpty).foreach { msg => - output(new Message(msg, System.currentTimeMillis())) + new Message(msg, System.currentTimeMillis()) } } - import scala.concurrent.duration._ taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! Watermark(Instant.now)) + Message("message") } + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + + Watermark(Instant.now) End diff – Thanks,i'm learning how gearpump is working now.
          githubbot ASF GitHub Bot added a comment -

          Github user codecov-io commented on the issue:

          https://github.com/apache/incubator-gearpump/pull/78

            1. [Current coverage](https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr) is 69.63% (diff: 100%)
              > Merging 78(https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr) into [master](https://codecov.io/gh/apache/incubator-gearpump/branch/master?src=pr) will increase coverage by *1.02%*

          ```diff
          @@ master #78 diff @@
          ==========================================
          Files 177 177
          Lines 5927 5927
          Methods 5604 5606 +2
          Messages 0 0
          Branches 323 321 -2
          ==========================================
          + Hits 4066 4127 +61
          + Misses 1861 1800 -61
          Partials 0 0
          ```

          ![Sunburst](https://codecov.io/gh/apache/incubator-gearpump/pull/78/graphs/sunburst.svg?src=pr&size=150)

          > Powered by [Codecov](https://codecov.io?src=pr). Last update [529799c...d8f0838](https://codecov.io/gh/apache/incubator-gearpump/compare/529799cc400a72ae9e0d2385044ce1fd5e329bb3...d8f0838ed3e31b844db46e122df56658a0a698a9?src=pr)

          githubbot ASF GitHub Bot added a comment - Github user codecov-io commented on the issue: https://github.com/apache/incubator-gearpump/pull/78 [Current coverage] ( https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr ) is 69.63% (diff: 100%) > Merging 78 ( https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr ) into [master] ( https://codecov.io/gh/apache/incubator-gearpump/branch/master?src=pr ) will increase coverage by * 1.02% * ```diff @@ master #78 diff @@ ========================================== Files 177 177 Lines 5927 5927 Methods 5604 5606 +2 Messages 0 0 Branches 323 321 -2 ========================================== + Hits 4066 4127 +61 + Misses 1861 1800 -61 Partials 0 0 ``` ! [Sunburst] ( https://codecov.io/gh/apache/incubator-gearpump/pull/78/graphs/sunburst.svg?src=pr&size=150 ) > Powered by [Codecov] ( https://codecov.io?src=pr ). Last update [529799c...d8f0838] ( https://codecov.io/gh/apache/incubator-gearpump/compare/529799cc400a72ae9e0d2385044ce1fd5e329bb3...d8f0838ed3e31b844db46e122df56658a0a698a9?src=pr )
          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on a diff in the pull request:

          https://github.com/apache/incubator-gearpump/pull/78#discussion_r77342296

          — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
          @@ -19,34 +19,45 @@
          package org.apache.gearpump.streaming.examples.wordcount

          import java.time.Instant
          -import java.util.concurrent.TimeUnit

          import org.apache.gearpump.Message
          -import org.apache.gearpump.cluster.UserConfig
          -import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}

          -import org.apache.gearpump.streaming.task.{Task, TaskContext}

          +import org.apache.gearpump.streaming.source.DataSource
          +import org.apache.gearpump.streaming.task.TaskContext

          +import scala.collection.mutable.ArrayBuffer

          -class Split() extends DataSource {

          +class Split extends DataSource {
          +
          + val result = ArrayBuffer[Message]()
          + var item = -1
          + Split.TEXT_TO_SPLIT.lines.foreach { line =>
          + line.split("\\s+").filter(_.nonEmpty).foreach { msg => // => 为匿名函数,传入一个msg参数执行右边的操作
          — End diff –

          please remove this comment.

          githubbot ASF GitHub Bot added a comment - Github user manuzhang commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/78#discussion_r77342296 — Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala — @@ -19,34 +19,45 @@ package org.apache.gearpump.streaming.examples.wordcount import java.time.Instant -import java.util.concurrent.TimeUnit import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.source. {DataSource, DataSourceTask, Watermark} -import org.apache.gearpump.streaming.task. {Task, TaskContext} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import scala.collection.mutable.ArrayBuffer -class Split() extends DataSource { +class Split extends DataSource { + + val result = ArrayBuffer [Message] () + var item = -1 + Split.TEXT_TO_SPLIT.lines.foreach { line => + line.split(" \\s +").filter(_.nonEmpty).foreach { msg => // => 为匿名函数,传入一个msg参数执行右边的操作 — End diff – please remove this comment.
          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/incubator-gearpump/pull/78

          +1

          githubbot ASF GitHub Bot added a comment - Github user manuzhang commented on the issue: https://github.com/apache/incubator-gearpump/pull/78 +1
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/incubator-gearpump/pull/78

          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/incubator-gearpump/pull/78
          karol_brejna Karol Brejna added a comment -

          It looks like we could close the issue. The PR has been merged.

          karol_brejna Karol Brejna added a comment - It looks like we could close the issue. The PR has been merged.

          People

            roshanson Kaifang Bao
            mauzhang Manu Zhang
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: