Attachments
Issue Links
- links to
Activity
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/78#discussion_r77101042
— Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
@@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.output
- override def onStart(startTime: Instant): Unit =
{
- self ! Watermark(Instant.now)
- }
+class Split() extends DataSource {
+
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
- override def onNext(msg: Message): Unit = {
+
+ override def read(): Message = {
Split.TEXT_TO_SPLIT.lines.foreach { line =>
line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
- output(new Message(msg, System.currentTimeMillis()))
+ new Message(msg, System.currentTimeMillis())
}
}
-
- import scala.concurrent.duration._
- taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
- Watermark(Instant.now))
+ Message("message")
}
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = Instant.now()
+
+ Watermark(Instant.now)-
- End diff –
-
What's this line of code used for?
Github user Roshanson commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/78#discussion_r77278370
— Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
@@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.output
- override def onStart(startTime: Instant): Unit =
{
- self ! Watermark(Instant.now)
- }
+class Split() extends DataSource {
+
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
- override def onNext(msg: Message): Unit = {
+
+ override def read(): Message = {
Split.TEXT_TO_SPLIT.lines.foreach { line =>
line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
- output(new Message(msg, System.currentTimeMillis()))
+ new Message(msg, System.currentTimeMillis())
}
}
-
- import scala.concurrent.duration._
- taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
- Watermark(Instant.now))
+ Message("message")
}
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = Instant.now()
+
+ Watermark(Instant.now)-
- End diff –
-
Thanks, I'm still learning how Gearpump works.
Github user codecov-io commented on the issue:
https://github.com/apache/incubator-gearpump/pull/78
-
- [Current coverage](https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr) is 69.63% (diff: 100%)
> Merging [#78](https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr) into [master](https://codecov.io/gh/apache/incubator-gearpump/branch/master?src=pr) will increase coverage by *1.02%*
- [Current coverage](https://codecov.io/gh/apache/incubator-gearpump/pull/78?src=pr) is 69.63% (diff: 100%)
```diff
@@ master #78 diff @@
==========================================
Files 177 177
Lines 5927 5927
Methods 5604 5606 +2
Messages 0 0
Branches 323 321 -2
==========================================
+ Hits 4066 4127 +61
+ Misses 1861 1800 -61
Partials 0 0
```
![Sunburst](https://codecov.io/gh/apache/incubator-gearpump/pull/78/graphs/sunburst.svg?src=pr&size=150)
> Powered by [Codecov](https://codecov.io?src=pr). Last update [529799c...d8f0838](https://codecov.io/gh/apache/incubator-gearpump/compare/529799cc400a72ae9e0d2385044ce1fd5e329bb3...d8f0838ed3e31b844db46e122df56658a0a698a9?src=pr)
Github user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/78#discussion_r77342296
— Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
@@ -19,34 +19,45 @@
package org.apache.gearpump.streaming.examples.wordcount
import java.time.Instant
-import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.source.
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import scala.collection.mutable.ArrayBuffer
-class Split() extends DataSource {
+class Split extends DataSource {
+
+ val result = ArrayBuffer[Message]()
+ var item = -1
+ Split.TEXT_TO_SPLIT.lines.foreach { line =>
+ line.split("\\s+").filter(_.nonEmpty).foreach { msg => // => 为匿名函数,传入一个msg参数执行右边的操作
— End diff –
please remove this comment.
Github user manuzhang commented on the issue:
https://github.com/apache/incubator-gearpump/pull/78
+1
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/78#discussion_r77100932
— Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala —
@@ -23,29 +23,34 @@ import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask, Watermark}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Split() extends DataSource {
I prefer `class Split extends DataSource`, without the empty parentheses.