Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 3.7.0
    • Component/s: None
    • Labels:

      Description

      This feature provides the spillable, checkpoint-aware storage needed to support windowing.

      The storage needs to have the following features:
      1. Spillable key value storage (integrate with APEXMALHAR-2026)
      2. Upon checkpoint, it saves a snapshot of the entire data set, tagged with the checkpointing window ID. This should be done incrementally (e.g. using ManagedState) to avoid wasting space on unchanged data.
      3. When recovering, it takes the recovery window ID and restores the snapshot for that window.
      4. When a window is committed, all windows with a lower ID should be purged from the store.
      5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces. Because of 2 and 3, we may want to add methods to the WindowedStorage interface so that the WindowedOperator implementation can notify the storage when a window is checkpointed, recovered, or committed.
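Items 2 to 5 can be illustrated with a small in-memory sketch. It assumes a hypothetical `WindowLifecycleAware` interface (not part of the existing WindowedStorage API) whose `checkpointed`, `recovered` and `committed` methods the operator would call. Each checkpoint stores only the entries written since the previous one, recovery replays the deltas up to the recovery window, and commit folds all older deltas into one base snapshot so they can be dropped. It is a toy that ignores spilling, deletions and keyed storage; a real implementation would sit on top of ManagedState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical lifecycle hooks; the names are illustrative, not the actual
// Malhar WindowedStorage API.
interface WindowLifecycleAware
{
  void checkpointed(long windowId);

  void recovered(long windowId);

  void committed(long windowId);
}

// Toy in-memory store: each checkpoint records only the entries written since
// the previous checkpoint (a delta), so unchanged data is not copied again.
class IncrementalSnapshotStore<K, V> implements WindowLifecycleAware
{
  private final Map<K, V> current = new HashMap<>();
  private final Map<K, V> dirty = new HashMap<>();
  private final TreeMap<Long, Map<K, V>> deltas = new TreeMap<>();

  public void put(K key, V value)
  {
    current.put(key, value);
    dirty.put(key, value);
  }

  public V get(K key)
  {
    return current.get(key);
  }

  public int snapshotCount()
  {
    return deltas.size();
  }

  @Override
  public void checkpointed(long windowId)
  {
    // Save only what changed since the last checkpoint.
    deltas.put(windowId, new HashMap<>(dirty));
    dirty.clear();
  }

  @Override
  public void recovered(long windowId)
  {
    // Drop snapshots newer than the recovery window, then replay the
    // remaining deltas in window-ID order to rebuild the state.
    deltas.tailMap(windowId, false).clear();
    current.clear();
    dirty.clear();
    for (Map<K, V> delta : deltas.values()) {
      current.putAll(delta);
    }
  }

  @Override
  public void committed(long windowId)
  {
    // Windows below the committed one can no longer be recovered to, so fold
    // their deltas into one base delta stored under the newest checkpoint <= windowId.
    Long base = deltas.floorKey(windowId);
    if (base == null) {
      return;
    }
    NavigableMap<Long, Map<K, V>> older = deltas.headMap(base, true);
    Map<K, V> merged = new HashMap<>();
    for (Map<K, V> delta : older.values()) {
      merged.putAll(delta); // later windows overwrite earlier ones
    }
    older.clear();
    deltas.put(base, merged);
  }
}
```

Folding instead of deleting is what keeps purging compatible with incremental snapshots: the older deltas still hold values that were never rewritten, so simply dropping them would lose data.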

        Issue Links

          Activity

          thw Thomas Weise added a comment -

          What's the use case? I think this needs a clearer explanation as to what the purpose of the proposed feature is.

          We already have managed state and, on top of it, abstractions to plug it into operators:

          https://github.com/apache/apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/

          brightchen bright chen added a comment -

          Here are some thoughts on implementing it based on the state manager:

          • Use only one bucket; generate the bucket key from the event window and the key, and use the streaming window as the time bucket. The problem with this approach is getting the whole map for one event window, since we lack a key for that lookup. Is it possible for the client code to get a value by event window and key instead of getting the whole map?
          • Map one event window to one bucket, with the streaming window ID as the time bucket. The problem is that event window sizes vary and can be huge, so this approach is probably not feasible.
          • Map one streaming window to one bucket. This makes it hard to map event windows, and also to get the whole map for an event window, since we don't know which bucket to read the data from.
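The first option can be sketched with a composite key. In this toy, the class names `WindowKey` and `SingleBucketWindowStore` are hypothetical, a plain `HashMap` stands in for the managed-state bucket, and streaming-window time buckets are left out. A lookup by event window and key is a single get, while fetching the whole map of one event window has to scan the bucket.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key combining the event window ID with the user key.
final class WindowKey<K>
{
  final long windowId;
  final K key;

  WindowKey(long windowId, K key)
  {
    this.windowId = windowId;
    this.key = key;
  }

  @Override
  public boolean equals(Object o)
  {
    if (!(o instanceof WindowKey)) {
      return false;
    }
    WindowKey<?> other = (WindowKey<?>)o;
    return windowId == other.windowId && Objects.equals(key, other.key);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(windowId, key);
  }
}

// One-bucket layout: every (event window, key) pair lives in the same map.
class SingleBucketWindowStore<K, V>
{
  private final Map<WindowKey<K>, V> bucket = new HashMap<>();

  void put(long windowId, K key, V value)
  {
    bucket.put(new WindowKey<>(windowId, key), value);
  }

  // Cheap point lookup when the caller knows both the window and the key.
  V get(long windowId, K key)
  {
    return bucket.get(new WindowKey<>(windowId, key));
  }

  // Fetching a whole event window has no direct index: it scans every entry
  // in the bucket, which is the drawback raised in the first bullet.
  Map<K, V> getWindow(long windowId)
  {
    Map<K, V> result = new HashMap<>();
    for (Map.Entry<WindowKey<K>, V> e : bucket.entrySet()) {
      if (e.getKey().windowId == windowId) {
        result.put(e.getKey().key, e.getValue());
      }
    }
    return result;
  }
}
```

If client code can always supply both the event window and the key, only `get` is needed and the scan never happens, which is the trade-off the question in the first bullet points at.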
          davidyan David Yan added a comment -

          Review-only PR is open: https://github.com/apache/apex-malhar/pull/345
          Implementation is complete, but unit tests are pending.
          This depends on the completion of this PR: https://github.com/apache/apex-malhar/pull/324

          githubbot ASF GitHub Bot added a comment -

          Github user ilooner commented on a diff in the pull request:

          https://github.com/apache/apex-malhar/pull/345#discussion_r72389548

          — Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java —
          @@ -0,0 +1,188 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing,
          + * software distributed under the License is distributed on an
          + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
          + * KIND, either express or implied. See the License for the
          + * specific language governing permissions and limitations
          + * under the License.
          + */
          +package org.apache.apex.malhar.lib.window.impl;
          +
          +import java.util.Iterator;
          +import java.util.Map;
          +
          +import org.apache.apex.malhar.lib.state.spillable.Spillable;
          +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
          +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
          +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
          +import org.apache.apex.malhar.lib.utils.serde.Serde;
          +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
          +import org.apache.apex.malhar.lib.window.Window;
          +import org.apache.apex.malhar.lib.window.WindowedStorage;
          +
          +import com.datatorrent.api.Context;
          +import com.datatorrent.netlet.util.Slice;
          +
          +/**
          + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
          + *
          + * @param <T> Type of the value per window
          + */
          +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
          +{
          +  private SpillableStateStore store;
          +  private transient SpillableComplexComponentImpl sccImpl;
          +  private long bucket;
          +  private Serde<Window, Slice> windowSerde;
          +  private Serde<T, Slice> valueSerde;
          +
          +  protected transient Spillable.SpillableByteMap<Window, T> internMap;
          +
          +  public SpillableWindowedPlainStorage()
          +  {
          +  }
          +
          +  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
          +  {
          +    this.bucket = bucket;
          +    this.windowSerde = windowSerde;
          +    this.valueSerde = valueSerde;
          +  }
          +
          +  public void setStore(SpillableStateStore store)
          +  {
          +    this.store = store;
          +  }
          +
          +  public void setBucket(long bucket)
          +  {
          +    this.bucket = bucket;
          +  }
          +
          +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
          +  {
          +    this.windowSerde = windowSerde;
          +  }
          +
          +  public void setValueSerde(Serde<T, Slice> valueSerde)
          +  {
          +    this.valueSerde = valueSerde;
          +  }
          +
          +  @Override
          +  public void put(Window window, T value)
          +  {
          +    internMap.put(window, value);
          +  }
          +
          +  @Override
          +  public T get(Window window)
          +  {
          +    return internMap.get(window);
          +  }
          +
          +  @Override
          +  public Iterable<Map.Entry<Window, T>> entrySet()
          +  {
          +    return internMap.entrySet();
          +  }
          +
          +  @Override
          +  public Iterator<Map.Entry<Window, T>> iterator()
          +  {
          +    return internMap.entrySet().iterator();
          +  }
          +
          +  @Override
          +  public boolean containsWindow(Window window)
          +  {
          +    return internMap.containsKey(window);
          +  }
          +
          +  @Override
          +  public long size()
          +  {
          +    return internMap.size();
          +  }
          +
          +  @Override
          +  public void remove(Window window)
          +  {
          +    internMap.remove(window);
          +  }
          +
          +  @Override
          +  public void migrateWindow(Window fromWindow, Window toWindow)
          +  {
          +    internMap.put(toWindow, internMap.remove(fromWindow));
          +  }
          +
          +  @Override
          +  public void setup(Context.OperatorContext context)
          +  {
          +    if (store == null) {
          +      // provide a default store
          +      store = new ManagedStateSpillableStateStore();
          +    }
          +    if (bucket == 0) {
          +      // choose a bucket that is almost guaranteed to be unique
          +      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
          +    }
          +    // set default serdes
          +    if (windowSerde == null) {
          +      windowSerde = new SerdeKryoSlice<>();
          +    }
          +    if (valueSerde == null) {
          +      valueSerde = new SerdeKryoSlice<>();
          +    }
          +    sccImpl = new SpillableComplexComponentImpl(store);
          +    sccImpl.setup(context);
          +    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
          — End diff –

          @davidyan74 I think you are getting a size of zero because you are allocating a new spillable data structure here. You should check whether sccImpl is null and, only if it is, initialize it and all the spillable data structures.
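A minimal sketch of the suggested create-once pattern, assuming Java serialization as a stand-in for Kryo checkpointing and a plain `HashMap` in place of the spillable map (`LazyInitStorage` is a hypothetical name). The null check only preserves data if the field itself survives the checkpoint, which means it must not be `transient`; if `internMap` were marked `transient`, it would come back null after the restore, be re-allocated in `setup()`, and report a size of zero.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Toy model of the suggestion above. Java serialization stands in for the
// Kryo-based checkpoint; it is not how Apex actually checkpoints operators.
class LazyInitStorage implements Serializable
{
  private static final long serialVersionUID = 1L;

  // Deliberately NOT transient, so its contents survive the round trip.
  private HashMap<String, Integer> internMap;

  void setup()
  {
    // Allocate only on the very first setup(); after a restore the field is
    // already populated and must not be replaced with an empty map.
    if (internMap == null) {
      internMap = new HashMap<>();
    }
  }

  void put(String window, int value)
  {
    internMap.put(window, value);
  }

  int size()
  {
    return internMap.size();
  }

  // Serialize and deserialize, simulating checkpoint followed by restore.
  static LazyInitStorage roundTrip(LazyInitStorage storage)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(storage);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazyInitStorage)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```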

          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 commented on a diff in the pull request:

          https://github.com/apache/apex-malhar/pull/345#discussion_r72494611

          — Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java —
          + sccImpl = new SpillableComplexComponentImpl(store);
          + sccImpl.setup(context);
          + internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
          — End diff –

          @ilooner Wouldn't that mean we should make SpillableByteMapImpl non-transient and serializable by Kryo? When I tried that, I got this error:

          ```
          com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
          Serialization trace:
          internMap (org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage)

          at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
          at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
          at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:125)
          at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:145)
          at org.apache.apex.malhar.lib.window.SpillableWindowedStorageTest.testWindowedPlainStorage(SpillableWindowedStorageTest.java:62)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
          at org.junit.rules.RunRules.evaluate(RunRules.java:20)
          at com.intellij.junit4.JUnit4TestRunnerUtil$IgnoreIgnoredTestJUnit4ClassRunner.runChild(JUnit4TestRunnerUtil.java:365)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
          at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
          at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
          Caused by: java.lang.UnsupportedOperationException
          at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.entrySet(SpillableByteMapImpl.java:161)
          at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
          at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
          at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
          at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
          ... 33 more
          ```
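The `Caused by` frames show the mechanism: Kryo writes `internMap` through its `MapSerializer`, which calls `entrySet()`, and `SpillableByteMapImpl.entrySet()` throws `UnsupportedOperationException`. The toy below reproduces that failure without Kryo; `NoEntrySetMap` and `GenericMapCopier` are hypothetical stand-ins for the spillable map and for a generic map serializer that can only see the `Map` interface.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in for a spillable map whose entrySet() is unsupported, mirroring
// the "Caused by" frame in the trace above. Hypothetical class.
class NoEntrySetMap<K, V> extends AbstractMap<K, V>
{
  @Override
  public Set<Map.Entry<K, V>> entrySet()
  {
    throw new UnsupportedOperationException();
  }
}

// Simplified stand-in for a generic Map serializer: it only sees the Map
// interface, so it has to walk entrySet() to write the contents.
final class GenericMapCopier
{
  private GenericMapCopier()
  {
  }

  static <K, V> Map<K, V> copyViaEntrySet(Map<K, V> source)
  {
    Map<K, V> copy = new HashMap<>();
    for (Map.Entry<K, V> e : source.entrySet()) {
      copy.put(e.getKey(), e.getValue());
    }
    return copy;
  }
}
```

One direction, not verified against this code base, is to have Kryo serialize such a class field by field (for example via Kryo's `@DefaultSerializer(FieldSerializer.class)` annotation), so that only the map's configuration, such as its bucket ID and serdes, is checkpointed rather than its spilled contents.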

          Show
          githubbot ASF GitHub Bot added a comment - Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/345#discussion_r72494611 — Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java — @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures + * + * @param <T> Type of the value per window + */ +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T> +{ + private SpillableStateStore store; + private transient SpillableComplexComponentImpl sccImpl; + private long bucket; + private Serde<Window, Slice> windowSerde; + private Serde<T, Slice> valueSerde; + + protected transient Spillable.SpillableByteMap<Window, T> internMap; + + public SpillableWindowedPlainStorage() + { + } + + public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde) + { + this.bucket = bucket; + this.windowSerde = windowSerde; + this.valueSerde = valueSerde; + } + + public void setStore(SpillableStateStore store) + { + this.store = store; + } + + public void setBucket(long bucket) + { + this.bucket = bucket; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { + this.windowSerde = windowSerde; + } + + public void setValueSerde(Serde<T, Slice> valueSerde) + { + this.valueSerde = valueSerde; + } + + @Override + public void put(Window window, T value) + { + internMap.put(window, value); + } + + @Override 
+ public T get(Window window) + { + return internMap.get(window); + } + + @Override + public Iterable<Map.Entry<Window, T>> entrySet() + { + return internMap.entrySet(); + } + + @Override + public Iterator<Map.Entry<Window, T>> iterator() + { + return internMap.entrySet().iterator(); + } + + @Override + public boolean containsWindow(Window window) + { + return internMap.containsKey(window); + } + + @Override + public long size() + { + return internMap.size(); + } + + @Override + public void remove(Window window) + { + internMap.remove(window); + } + + @Override + public void migrateWindow(Window fromWindow, Window toWindow) + { + internMap.put(toWindow, internMap.remove(fromWindow)); + } + + @Override + public void setup(Context.OperatorContext context) + { + if (store == null) { + // provide a default store + store = new ManagedStateSpillableStateStore(); + } + if (bucket == 0) { + // choose a bucket that is almost guaranteed to be unique + bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode(); + } + // set default serdes + if (windowSerde == null) { + windowSerde = new SerdeKryoSlice<>(); + } + if (valueSerde == null) { + valueSerde = new SerdeKryoSlice<>(); + } + sccImpl = new SpillableComplexComponentImpl(store); + sccImpl.setup(context); + internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde); — End diff – @ilooner Wouldn't that mean we should make the SpillableByteMapImpl non transient and make it serializable by kryo? 
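The Kryo failure reported in this thread (an UnsupportedOperationException thrown from SpillableByteMapImpl.entrySet inside MapSerializer.write, while serializing the internMap field of SpillableWindowedPlainStorage) follows from how the serializer is chosen. The field's type implements java.util.Map, so Kryo uses its MapSerializer, which writes a map by walking entrySet(). The stdlib-only sketch below reproduces that failure mode with a hypothetical StoreBackedMap whose entrySet() throws, and contrasts it with writing only the object's own fields. serializeAsMap and serializeFields are illustrative helpers, not Kryo APIs.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;

class EntrySetFailureDemo
{
  /** Hypothetical stand-in for a spillable map: entries live in an external store, not on the heap. */
  static class StoreBackedMap extends AbstractMap<String, String>
  {
    final long bucketId; // the only state this object itself holds

    StoreBackedMap(long bucketId)
    {
      this.bucketId = bucketId;
    }

    @Override
    public Set<Map.Entry<String, String>> entrySet()
    {
      throw new UnsupportedOperationException(); // same behavior as in the reported trace
    }
  }

  /** Mimics a map-oriented serializer: it must visit every entry. */
  static String serializeAsMap(Map<?, ?> map)
  {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<?, ?> e : map.entrySet()) {
      sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
    }
    return sb.toString();
  }

  /** Mimics a field-oriented serializer: it writes only the object's declared fields. */
  static String serializeFields(StoreBackedMap map)
  {
    return "bucketId=" + map.bucketId;
  }

  public static void main(String[] args)
  {
    StoreBackedMap map = new StoreBackedMap(7L);
    try {
      serializeAsMap(map);
    } catch (UnsupportedOperationException e) {
      System.out.println("map-style serialization failed: " + e);
    }
    System.out.println(serializeFields(map)); // prints bucketId=7
  }
}
```

In Kryo itself, a class can be steered away from MapSerializer, for example with the @DefaultSerializer(FieldSerializer.class) annotation; whether field-based serialization is enough here depends on which state the spillable map must carry across a checkpoint.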
          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 commented on a diff in the pull request:

          https://github.com/apache/apex-malhar/pull/345#discussion_r72521168

          — Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java —
          @@ -0,0 +1,122 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing,
          + * software distributed under the License is distributed on an
          + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
          + * KIND, either express or implied. See the License for the
          + * specific language governing permissions and limitations
          + * under the License.
          + */
          +package org.apache.apex.malhar.lib.window;
          +
          +import org.junit.Assert;
          +import org.junit.Ignore;
          +import org.junit.Rule;
          +import org.junit.Test;
          +
          +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
          +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
          +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
          +
          +import com.datatorrent.lib.util.KryoCloneUtils;
          +
          +/**
          + * Unit tests for Spillable Windowed Storage
          + */
          +public class SpillableWindowedStorageTest
          +{
          +  @Rule
          +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
          +
          +  @Ignore
          +  @Test
          +  public void testWindowedPlainStorage()
          +  {
          +    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
          +    Window window1 = new Window.TimeWindow<>(1000, 10);
          +    Window window2 = new Window.TimeWindow<>(1010, 10);
          +    Window window3 = new Window.TimeWindow<>(1020, 10);
          +    storage.setStore(testMeta.store);
          +    storage.setup(testMeta.operatorContext);
          +    storage.beginApexWindow(1000);
          +    storage.put(window1, 1);
          +    storage.put(window2, 2);
          +    storage.put(window3, 3);
          +    storage.endApexWindow();
          +    storage.beginApexWindow(1001);
          +    storage.put(window1, 4);
          +    storage.put(window2, 5);
          +    storage.endApexWindow();
          +    storage.beforeCheckpoint(1001);
          +    storage.checkpointed(1001);
          +
          +    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
          +
          +    storage.beginApexWindow(1002);
          +    storage.put(window1, 6);
          +    storage.put(window2, 7);
          +    storage.endApexWindow();
          +
          +    Assert.assertEquals(6L, storage.get(window1).longValue());
          +    Assert.assertEquals(7L, storage.get(window2).longValue());
          +    Assert.assertEquals(3L, storage.get(window3).longValue());
          +
          +    storage.beginApexWindow(1003);
          +    storage.put(window1, 8);
          +    storage.put(window2, 9);
          +    storage.endApexWindow();
          +
          +    // simulating crash here
          +    storage.teardown();
          +
          +    storage = clonedStorage;
          +    storage.setup(testMeta.operatorContext);
          +
          — End diff –

          @ilooner I made both SpillableComplexComponentImpl and SpillableByteMap non-transient and made the changes necessary for Kryo to serialize them. Take a look at commit a71d2cc. However, it looks like the data is still not fetched from disk after recovery.
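The test in the diff clones the storage with KryoCloneUtils right after checkpoint 1001, keeps writing in Apex windows 1002 and 1003, then tears the original down and resumes from the clone. From the puts shown, the state as of that checkpoint is window1 = 4, window2 = 5, window3 = 3. The heap-only sketch below models that contract with a hypothetical InMemoryWindowedStorage whose windows are reduced to their start times and whose copy() method stands in for KryoCloneUtils.cloneObject. In the spillable implementation the clone holds only references into the managed-state store, so after recovery these values would have to be read back from the checkpointed data files during setup.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointRestoreSketch
{
  /** Hypothetical in-memory storage keyed by the start time of an event-time window. */
  static class InMemoryWindowedStorage
  {
    private final Map<Long, Integer> data = new HashMap<>();

    void put(long windowStart, int value)
    {
      data.put(windowStart, value);
    }

    Integer get(long windowStart)
    {
      return data.get(windowStart);
    }

    /** Stands in for KryoCloneUtils.cloneObject(storage): an independent copy of the state. */
    InMemoryWindowedStorage copy()
    {
      InMemoryWindowedStorage c = new InMemoryWindowedStorage();
      c.data.putAll(data);
      return c;
    }
  }

  static InMemoryWindowedStorage runScenario()
  {
    InMemoryWindowedStorage storage = new InMemoryWindowedStorage();
    // Apex window 1000
    storage.put(1000L, 1);
    storage.put(1010L, 2);
    storage.put(1020L, 3);
    // Apex window 1001, followed by the checkpoint
    storage.put(1000L, 4);
    storage.put(1010L, 5);
    InMemoryWindowedStorage checkpoint = storage.copy();
    // Apex windows 1002 and 1003 happen after the checkpoint
    storage.put(1000L, 6);
    storage.put(1010L, 7);
    storage.put(1000L, 8);
    storage.put(1010L, 9);
    // simulated crash: discard the live object and resume from the checkpoint copy
    return checkpoint;
  }

  public static void main(String[] args)
  {
    InMemoryWindowedStorage recovered = runScenario();
    System.out.println(recovered.get(1000L) + " " + recovered.get(1010L) + " " + recovered.get(1020L)); // 4 5 3
  }
}
```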

          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 closed the pull request at:

          https://github.com/apache/apex-malhar/pull/345

          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 reopened a pull request:

          https://github.com/apache/apex-malhar/pull/345

          APEXMALHAR-2130 REVIEW ONLY (WindowedOperator): incorporating Spillable data structures

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar windowedSpillable

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/345.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #345


          commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-06-05T00:11:20Z

          • Intermediate commit.

          commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-06-13T06:03:21Z

          Intermediate commit

          commit 60acf68b96f2145af3d90b410c6b20613347f881
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-06-21T06:58:09Z

          Intermediate commit

          commit 5b9ca5e7e1e8139cc5c8ab6dd96b8a60553e38fc
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-13T18:57:57Z

          Merge branch 'APEXMALHAR-2048_pull' of github.com:ilooner/incubator-apex-malhar

          commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-17T21:32:34Z

          Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

          commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-18T01:56:41Z

          Added SpillableComplexComponentImpl

          commit 9f17b4ba9233e3f46746505941a378236193f719
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-18T03:29:07Z

          Added propagating callbacks to store

          commit 9e637a899e6aedb4ca6495a676654bb636616267
          Author: devtagare <devtagare@gmail.com>
          Date: 2016-05-18T22:25:56Z

          APEXMALHAR-2066 JdbcPolling,idempotent,partitionable

          commit 10fe7a14c7294c54c5d18e4d0e94882778266ac6
          Author: sandeshh <sandesh.hegde@gmail.com>
          Date: 2016-05-25T15:56:56Z

          Kafka 0.9.0 output operators and unit tests.

          1. Abstract Base class
          2. Kafka Output operator
          3. Exactly Once output operator
          Key in the Kafka message is used by the operator to track the tuples written by it.

          commit 8df0de73f58c69ac457e5a28d51a7b56b5549859
          Author: Chaitanya <chaitanya@datatorrent.com>
          Date: 2016-07-13T10:16:13Z

          APEXMALHAR-2019 Implemented S3 Input Module

          commit 2ed0a102d434ace34978c6e940e04c998133fd1e
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-18T01:56:41Z

          Added SpillableComplexComponentImpl

          commit c60e76eee8ae5cf034d4d9984b840d3219f52594
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-18T03:29:07Z

          Added propagating callbacks to store

          commit 452db2feee7cc6e186df3faf384147b41eef40ea
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-14T00:04:20Z

          Split WindowedStorage interface into two interfaces for plain and key data

          commit be96f667238cce47957324eac9979ebafc013fc6
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-15T21:40:59Z

          check null for retractionStorage

          commit b7471a5df5e883b23b5a6051b974d3521ce46151
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-15T21:48:13Z

          Removed unused imports

          commit 361d0e152f4709dd8fda864ce94727161807a98e
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-16T01:00:20Z

          added first draft implementation for spillable data structures

          commit ad65865ed9e5ba25d44aacaf76372e2338ef6377
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-18T21:04:24Z

          Implemented some of Tim's suggestion

          commit 32c2660409b3c85872002aba6b84c5c5e748f203
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-20T01:17:43Z

          moved storage initialization to setup

          commit 955f2d6c66be26f8f205a743eb9c49d4e8d2bdd6
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-20T17:50:05Z

          removed identifier as per tim's suggestion

          commit ec4e7508e2fd70030ecab256bba32be4cad8ad3a
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-21T00:05:51Z

          choose a bucket automatically and added entrySet implementation for SpillableWindowedKeyedStorage

          commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-21T06:38:48Z

          Added checkpoint callbacks to spillable complex components
          Added some half completed tests

          commit dfc84a7847b4e71ec12ca8e8e1e0127732d880c0
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-21T18:07:47Z

          intermediate commit

          commit fe209fb72e169ae9605c826e61adfabda670c74d
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-21T18:08:15Z

          Merge branch 'APEXMALHAR-2048_pull' of github.com:ilooner/incubator-apex-malhar into windowedSpillable

          commit 11fb1ddd24705ed215ecbe4b9da72a39a3e8a9e1
          Author: David Yan <david@datatorrent.com>
          Date: 2016-07-21T19:59:24Z

          intermediate commit

          commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T04:09:54Z

          Finished unit test for SpillableArrayListMultimap

          commit dc258b8900688264f349307737b59b096dbc3d2b
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T05:19:37Z

          Added unit test which uses managed state

          commit ed9924b810a76c39404701da81f4753ab68af5a5
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T06:55:18Z

          Finished adding managed state tests for SpillableByteMap

          commit 43da17d9633dc2405b596b25697b6b4b0baef69f
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T16:31:53Z

          Added ManagedStateTests For SpillableArrayList

          commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T16:49:00Z

          Added managed state tests for SpillableArrayListMultimap

          commit 57c4e5e3c3e019613f31753be5052c32ba762e53
          Author: Timothy Farkas <tim@datatorrent.com>
          Date: 2016-07-24T16:57:35Z

          Added ManagedStateTest for SpillableComplexComponent


          davidyan David Yan added a comment -

          bright chen Please take a look at the subtasks I created for this JIRA. Those are the things that are needed for the spillable version of windowed storage to function properly. We should probably divide the work between the two of us.

          davidyan David Yan added a comment -

          We are in the process of using Timothy Farkas's spillable data structures to implement the state storage for the WindowedOperator. We need something that supports the equivalent of Map<Window, Map<K, V>> and Map<Window, V>, where Window is an event-time window and recovery is based on Apex windows.

          There are some gaps between the current state and what we need, most notably:

          1. Getting all keys for a given window from Map<Window, Map<K, V>>
          2. Getting all windows from Map<Window, V>
          3. Deleting a window from Map<Window, Map<K, V>> and from Map<Window, V>
          4. Deleting a key within a given window from Map<Window, Map<K, V>>

          Because of these requirements, implementing Map<Window, Map<K, V>> with a SpillableByteMap<Pair<Window, K>, V> in conjunction with a SpillableArrayListMultimap<Window, K> will not work.
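Setting spillability aside, the four operations are straightforward on heap-resident maps; the hard part is offering them over spillable storage. A minimal in-memory reference that spells out the intended semantics, assuming Window is reduced to its start timestamp (a long); PlainState and KeyedState are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class WindowedStateReference
{
  /** Equivalent of Map<Window, V>. */
  static class PlainState<V>
  {
    final Map<Long, V> byWindow = new HashMap<>();

    /** Requirement 2: all windows. */
    Set<Long> windows()
    {
      return Collections.unmodifiableSet(byWindow.keySet());
    }

    /** Requirement 3 (plain variant): delete a window. */
    void removeWindow(long window)
    {
      byWindow.remove(window);
    }
  }

  /** Equivalent of Map<Window, Map<K, V>>. */
  static class KeyedState<K, V>
  {
    final Map<Long, Map<K, V>> byWindow = new HashMap<>();

    void put(long window, K key, V value)
    {
      byWindow.computeIfAbsent(window, w -> new HashMap<>()).put(key, value);
    }

    /** Requirement 1: all keys for a given window. */
    Set<K> keys(long window)
    {
      Map<K, V> m = byWindow.get(window);
      return m == null ? Collections.<K>emptySet() : Collections.unmodifiableSet(m.keySet());
    }

    /** Requirement 3 (keyed variant): delete a window together with all its keys. */
    void removeWindow(long window)
    {
      byWindow.remove(window);
    }

    /** Requirement 4: delete one key within a window. */
    void removeKey(long window, K key)
    {
      Map<K, V> m = byWindow.get(window);
      if (m != null) {
        m.remove(key);
        if (m.isEmpty()) {
          byWindow.remove(window); // drop the window once it has no keys left
        }
      }
    }
  }
}
```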

          We are considering the following:

          To support 1 and 2:

          • Add support for getting all keys >= a given key by taking advantage of the FileAccess.FileReader.seek() and next() methods, and expose this functionality in the Bucket interface.
          • seek() and next() need to take a time bucket. So, to support 1, we need to be able to derive the time bucket from the event-time window, and SpillableByteMap needs to support a user-provided mapping from key to time bucket. (If such a mapping is provided, the time bucket will no longer be assumed to be -1.)
          • To support 2, we also need to add a way to get the list of all time buckets.
          • Since event time is arbitrary, unlike processing time, the actual key representing the timebucket cannot be assumed a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential starting from 0. We want to make the actual timebucket key based on the actual event window timestamp. Chandni Singh Will this break anything?
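The first two bullets can be illustrated with an in-memory sorted map standing in for a bucket file. If entries are ordered by window first and key second, the keys of one window are contiguous, so a seek to the first entry >= (window, smallest key) followed by repeated next() calls enumerates them, and distinct windows can be listed by seeking past each window in turn. In this sketch TreeMap.tailMap and ceilingKey play the roles of FileReader.seek() and next(); WindowKey and the helper methods are hypothetical, not Managed State APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowKeyRangeScan
{
  /** Hypothetical composite key ordered by window first, then key (TreeMap relies only on compareTo). */
  static final class WindowKey implements Comparable<WindowKey>
  {
    final long window;
    final String key;

    WindowKey(long window, String key)
    {
      this.window = window;
      this.key = key;
    }

    @Override
    public int compareTo(WindowKey o)
    {
      int c = Long.compare(window, o.window);
      return c != 0 ? c : key.compareTo(o.key);
    }
  }

  /** Requirement 1: seek to (window, "") and read entries until the window changes. */
  static List<String> keysForWindow(TreeMap<WindowKey, Integer> sorted, long window)
  {
    List<String> keys = new ArrayList<>();
    for (Map.Entry<WindowKey, Integer> e : sorted.tailMap(new WindowKey(window, ""), true).entrySet()) {
      if (e.getKey().window != window) {
        break; // past the last key of this window
      }
      keys.add(e.getKey().key);
    }
    return keys;
  }

  /** Requirement 2: list distinct windows by seeking past each one. */
  static List<Long> windows(TreeMap<WindowKey, Integer> sorted)
  {
    List<Long> result = new ArrayList<>();
    WindowKey k = sorted.isEmpty() ? null : sorted.firstKey();
    while (k != null) {
      result.add(k.window);
      k = sorted.ceilingKey(new WindowKey(k.window + 1, ""));
    }
    return result;
  }
}
```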

          To support 3 and 4:

          • We are thinking of a special valueSlice that denotes a deleted key. When a key is deleted, we just set its value to the special valueSlice, and the get methods will handle it accordingly.
          • Expiring and purging are done very differently and should be based on 3. Managed State should determine whether to purge a timebucket based on whether an Apex window is committed and whether all event windows that belong to that timebucket are marked "deleted" for that Apex window.
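A rough sketch of the deletion-marker idea, assuming two layers of byte[] values: recent writes that have not been flushed yet, and older data already in bucket files. A delete is recorded as a marker in the recent layer instead of a removal, because a plain removal would let the older copy of the key show through again; reads treat the marker as absent. The empty byte array is used as the marker here, which a later comment in this thread says SpillableByteMap already does; one consequence is that a genuinely empty value cannot be stored. TombstoneSketch and its members are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class TombstoneSketch
{
  /** Deletion marker: an empty value (so real values must be non-empty). */
  static final byte[] DELETED = new byte[0];

  final Map<String, byte[]> recent = new HashMap<>();  // not yet flushed
  final Map<String, byte[]> flushed = new HashMap<>(); // stands in for immutable bucket files

  void put(String key, byte[] value)
  {
    recent.put(key, value);
  }

  /** Record the delete as a marker; removing from 'recent' would expose the flushed value. */
  void delete(String key)
  {
    recent.put(key, DELETED);
  }

  /** Newest layer wins; a marker reads as "no value". */
  byte[] get(String key)
  {
    byte[] value = recent.containsKey(key) ? recent.get(key) : flushed.get(key);
    return (value == null || value.length == 0) ? null : value;
  }
}
```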

          As you can see, going ahead with this will require some surgery on the existing ManagedState and Spillable data structures.
          This is based on my limited knowledge of Managed State, so please correct me if any of my statements don't make sense.

          Chandni Singh Timothy Farkas Please comment.

          timothyfarkas Timothy Farkas added a comment -

          David Yan As far as I can tell, what you have implemented on top of spillable data structures is 95% of the way there. I'm not sure what has changed.

          1. SpillableByteMap already supports deleting a key by setting its value to an empty byte array, so that part is already there. This can easily be extended to delete all the values in SpillableArrayListMultimap.

          2. The only missing piece is the ability to iterate over the set of keys in a map. This can be done with another SpillableByteMap; let's call it the linkedListMap. Each key of the linkedListMap represents the current node, and its value represents the next node. You keep track of the "head" key and iterate by looking up the value for the current node; that value becomes the new current node, and so on. When the value for the current node is null, you are done traversing the list. This can be wrapped in an iterator.
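The linkedListMap idea can be sketched with a plain java.util.Map standing in for the second SpillableByteMap: each key maps to the next key, the head is tracked separately, and iteration stops when the lookup returns null. New keys are prepended here so that only the head has to be tracked, which means iteration runs in reverse insertion order; removal and duplicate keys are left out. LinkedKeyList is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

class LinkedKeyList<K> implements Iterable<K>
{
  private final Map<K, K> successor = new HashMap<>(); // the "linkedListMap": node -> next node
  private K head;

  /** Prepends a key; assumes the key is not already in the list. */
  void add(K key)
  {
    if (head != null) {
      successor.put(key, head);
    }
    head = key;
  }

  @Override
  public Iterator<K> iterator()
  {
    return new Iterator<K>()
    {
      private K current = head;

      @Override
      public boolean hasNext()
      {
        return current != null;
      }

      @Override
      public K next()
      {
        if (current == null) {
          throw new NoSuchElementException();
        }
        K result = current;
        current = successor.get(current); // null once the last node is reached
        return result;
      }
    };
  }
}
```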

          csingh Chandni Singh added a comment - - edited

          Note: The main change required in ManagedState here is that time buckets (the Window time in your example) are now computed outside ManagedState. Time buckets used to be computed by TimeBucketAssigner within ManagedState, but now they will be provided to it.

          >>>>
          Since event time is arbitrary, unlike processing time, the actual key representing the timebucket cannot be assumed a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential starting from 0. We want to make the actual timebucket key based on the actual event window timestamp. Chandni Singh Will this break anything?

          Answer: No, it will not break anything. The time here is event time, and this does NOT assume that events are received in order; the method creates the time bucket based on event time. In your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach it:

          • create a special TimeBucketAssigner that just returns the input Window for the event, without computing a time bucket from it.
          • make TimeBucketAssigner an optional property of AbstractManagedStateImpl. If it is null, the time argument is used as the time bucket saved in Bucket.
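The two options can be contrasted in a few lines. BucketAssigner below is a hypothetical single-method stand-in, not Malhar's TimeBucketAssigner API, and fixedWidth is a simplified illustration of sequential bucket ids starting from 0, not necessarily how TimeBucketAssigner computes them.

```java
class TimeBucketChoice
{
  /** Hypothetical stand-in for a time bucket assigner. */
  interface BucketAssigner
  {
    long assign(long time);
  }

  /** Option 1: a special assigner that returns the event-window time unchanged. */
  static final BucketAssigner PASS_THROUGH = time -> time;

  /** Simplified sequential bucketing, for contrast; assumes time >= start. */
  static BucketAssigner fixedWidth(long start, long bucketSpanMillis)
  {
    return time -> (time - start) / bucketSpanMillis;
  }

  /** Option 2: the assigner is optional; without one, the time argument is the bucket. */
  static long resolveBucket(BucketAssigner assigner, long time)
  {
    return assigner == null ? time : assigner.assign(time);
  }

  public static void main(String[] args)
  {
    long windowStart = 1020L;
    System.out.println(resolveBucket(null, windowStart));                   // 1020
    System.out.println(resolveBucket(PASS_THROUGH, windowStart));           // 1020
    System.out.println(resolveBucket(fixedWidth(1000L, 10L), windowStart)); // 2
  }
}
```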

          >>>
          Expiring and purging are done very differently and should be based on 3. Managed State should determine whether to purge a timebucket based on whether an Apex window is committed and whether all event windows that belong to that timebucket are marked "deleted" for that Apex window.

          Answer: This is again handled by TimeBucketAssigner, so I don't think much change is needed here. TimeBucketAssigner computes a time bucket (in your case, this corresponds to the Window time) and checks whether the oldest buckets need to be purged (lines 132-133), which gives it the lowest purgeable time bucket. In endWindow, it informs IncrementalCheckpointManager that it can delete all time buckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager deletes the data up to that time bucket only when the window in which the purge was requested gets committed. So this will remain the same for you as well.

          I think this can also be achieved by creating a special TimeBucketAssigner and overriding a few methods.
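A simplified, heap-only model of the purge flow described in this answer, with hypothetical names (it is not the IncrementalCheckpointManager code): endWindow only records the lowest purgeable time bucket under the current Apex window id, and buckets <= that value are deleted once that window is committed. A TreeMap of bucket id to data stands in for the bucket files. Deferring the delete presumably ensures that recovery to any checkpoint that is not yet committed still finds the data it needs.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class DeferredPurgeSketch
{
  /** Time bucket -> data; stands in for bucket files. */
  final TreeMap<Long, String> buckets = new TreeMap<>();

  /** Apex window id -> lowest purgeable time bucket requested in that window. */
  private final TreeMap<Long, Long> pendingPurges = new TreeMap<>();

  /** Called from endWindow: only record the request. */
  void requestPurge(long apexWindowId, long lowestPurgeableTimeBucket)
  {
    pendingPurges.put(apexWindowId, lowestPurgeableTimeBucket);
  }

  /** Called when an Apex window is committed: apply requests made in any window <= committedWindowId. */
  void committed(long committedWindowId)
  {
    NavigableMap<Long, Long> ready = pendingPurges.headMap(committedWindowId, true);
    for (long lowestPurgeable : ready.values()) {
      buckets.headMap(lowestPurgeable, true).clear(); // delete buckets <= lowestPurgeableTimeBucket
    }
    ready.clear(); // headMap is a view, so this drops the applied requests from pendingPurges
  }
}
```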

          davidyan David Yan added a comment - - edited

          Timothy Farkas Yes, in theory we can do what you said with two spillable data structures, using a LinkedList instead of an ArrayList. But that is not ideal: TFile and DTFile already support returning an iterator over the entries that are greater than or equal to a given key. We should use that to get the list of keys for a given window from the equivalent of a Map<Pair<Window, K>, V>. We just need to expose that functionality in managed state and assign the timebucket based on the event Window.
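          The seek-then-iterate pattern described here can be sketched with `java.util.TreeMap` standing in for a sorted TFile/DTFile. `WindowKeyScan`, `WindowKey` and `keysInWindow` are hypothetical names, not Malhar APIs, and a window is simplified to its start time. Entries are ordered by window first and then by key. Seeking to the smallest possible key of a window and iterating until the window changes therefore yields exactly that window's keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowKeyScan {
  /** Hypothetical stand-in for Pair<Window, K>; the window is reduced to its start time. */
  static final class WindowKey {
    final long windowStart;
    final String key;

    WindowKey(long windowStart, String key) {
      this.windowStart = windowStart;
      this.key = key;
    }
  }

  // Order by window first, then by key, as a sorted file keyed on <window><key> would be.
  static final Comparator<WindowKey> ORDER =
      Comparator.comparingLong((WindowKey wk) -> wk.windowStart).thenComparing(wk -> wk.key);

  /** Seeks to the first entry >= (window, "") and iterates until the window changes. */
  static List<String> keysInWindow(TreeMap<WindowKey, Integer> store, long windowStart) {
    List<String> keys = new ArrayList<>();
    // "" is the smallest String, so this is the lowest possible key in the window
    WindowKey from = new WindowKey(windowStart, "");
    for (Map.Entry<WindowKey, Integer> e : store.tailMap(from, true).entrySet()) {
      if (e.getKey().windowStart != windowStart) {
        break; // sorted order: no later entry belongs to this window
      }
      keys.add(e.getKey().key);
    }
    return keys;
  }
}
```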

          davidyan David Yan added a comment -

          Chandni Singh Thanks. I will try your suggestions and will have questions for you along the way.

          thw Thomas Weise added a comment -

          Which keys belong to which window can be seen as derived information when the key in managed state is <window><key>. We can then retrieve all keys for a given window by doing a prefix scan for <window>. This becomes difficult, however, when we also have to include state that has not yet been flushed to the data files (they are compacted asynchronously).
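          The sketch below shows the prefix scan and why unflushed state complicates it. It assumes <window><key> is encoded as a zero-padded, non-negative window start followed by the key. Two `TreeMap`s act as hypothetical stand-ins for the flushed data files and the not-yet-flushed in-memory state. `MergedPrefixScan` and its methods are illustrative names, and deletes (tombstones) are not handled.

```java
import java.util.Map;
import java.util.TreeMap;

public class MergedPrefixScan {
  /** Hypothetical prefix: 19-digit zero-padded window start, so string order matches numeric order. */
  static String windowPrefix(long windowStart) {
    return String.format("%019d|", windowStart);
  }

  static String encode(long windowStart, String key) {
    return windowPrefix(windowStart) + key;
  }

  /** Prefix scan over both sources; newer unflushed entries override flushed ones. */
  static Map<String, String> scanWindow(TreeMap<String, String> flushed,
                                        TreeMap<String, String> unflushed,
                                        long windowStart) {
    String prefix = windowPrefix(windowStart);
    TreeMap<String, String> result = new TreeMap<>();
    copyPrefix(flushed, prefix, result);   // older data from the data files
    copyPrefix(unflushed, prefix, result); // newer writes replace older values for the same key
    return result;
  }

  private static void copyPrefix(TreeMap<String, String> source, String prefix,
                                 Map<String, String> target) {
    for (Map.Entry<String, String> e : source.tailMap(prefix, true).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break; // sorted order: no later key can share the prefix
      }
      target.put(e.getKey().substring(prefix.length()), e.getValue());
    }
  }
}
```

A scan that only read the flushed map would miss key `b` and return a stale value for `a` in the test data. That gap is what makes relying on the data files alone insufficient.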

          Using another spillable data structure to store the keys for each window is possible, but it also comes with drawbacks, since it duplicates information. One of them is obviously performance, since it increases HDFS usage. It would also need to use the same store; otherwise there would be two resources that have to be updated atomically.

          Assuming that both collections can use the same store (distinguished by key prefix) and share the same WAL and data files, we then need to see whether time-based purging will still work as intended. David?
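          Here is a minimal sketch of the arrangement being asked about, assuming time buckets are keyed by event-window start. The data entries and the per-window key index share one store and are told apart by a key prefix. Both are written as a single WAL record and land in the same bucket, so purging a bucket removes both together. `SharedBucketStore`, the `d|`/`i|` prefixes and the WAL representation are all hypothetical, not managed state internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SharedBucketStore {
  static final String DATA = "d|";  // hypothetical prefix for Pair<Window, K> -> V entries
  static final String INDEX = "i|"; // hypothetical prefix for the per-window key index

  /** One WAL record: every entry written by a single logical update, tagged with its bucket. */
  static final class WalRecord {
    final long bucket;
    final Map<String, String> entries;

    WalRecord(long bucket, Map<String, String> entries) {
      this.bucket = bucket;
      this.entries = entries;
    }
  }

  private final TreeMap<Long, TreeMap<String, String>> buckets = new TreeMap<>();
  private final List<WalRecord> wal = new ArrayList<>();

  public void put(long windowStart, String key, String value) {
    Map<String, String> entries = new TreeMap<>();
    entries.put(DATA + key, value);
    entries.put(INDEX + key, "");
    WalRecord record = new WalRecord(windowStart, entries);
    wal.add(record); // a single append covers both collections
    buckets.computeIfAbsent(record.bucket, b -> new TreeMap<>()).putAll(record.entries);
  }

  /** Time-based purge drops whole buckets, so data and index entries disappear together. */
  public void purgeUpTo(long windowStart) {
    buckets.headMap(windowStart, true).clear();
  }

  public List<String> keysForWindow(long windowStart) {
    List<String> keys = new ArrayList<>();
    TreeMap<String, String> bucket = buckets.get(windowStart);
    if (bucket == null) {
      return keys;
    }
    for (String k : bucket.tailMap(INDEX, true).keySet()) {
      if (!k.startsWith(INDEX)) {
        break;
      }
      keys.add(k.substring(INDEX.length()));
    }
    return keys;
  }

  public String get(long windowStart, String key) {
    TreeMap<String, String> bucket = buckets.get(windowStart);
    return bucket == null ? null : bucket.get(DATA + key);
  }

  public int walSize() {
    return wal.size();
  }
}
```

Under these assumptions purging keeps working. The cost Thomas mentions remains visible, though: every key is stored twice in the same bucket.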

          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 opened a pull request:

          https://github.com/apache/apex-malhar/pull/405

          APEXMALHAR-2130 #resolve Added a spillable map that takes two keys with support of iterating through all entries with a given first key

          @siyuanh @tweise @ilooner Please review. This is mostly for the scalable implementation of a WindowedKeyedStorage.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar APEXMALHAR-2130

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/405.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #405


          commit 183c9ff01e3f197c9088cdcf460827023d414ce1
          Author: David Yan <david@datatorrent.com>
          Date: 2016-09-08T20:59:50Z

          APEXMALHAR-2130 #resolve Added a spillable map that takes two keys with support of iterating through all entries with a given first key


          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 opened a pull request:

          https://github.com/apache/apex-malhar/pull/407

          APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures

          @siyuanh please review and merge
          @tweise @ShunxinLu FYI

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar windowedStorageInterfaceChange

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/407.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #407


          commit 103f7e39d4d855a720d1f5e9b76152cfe9b79d8d
          Author: David Yan <david@datatorrent.com>
          Date: 2016-09-11T07:30:20Z

          APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures


          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 opened a pull request:

          https://github.com/apache/apex-malhar/pull/408

          APEXMALHAR-2130 Added SpillableSet and SpillableSetMultimap implementations

          @tweise @siyuanh @ilooner please review

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar spillableSet

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/408.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #408


          commit 13a29e3f6e8424b11dff075fdcc306f171f1a446
          Author: David Yan <david@datatorrent.com>
          Date: 2016-09-12T08:00:52Z

          APEXMALHAR-2130 Added SpillableSet and SpillableSetMultimap implementations


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/apex-malhar/pull/407

          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 closed the pull request at:

          https://github.com/apache/apex-malhar/pull/405

          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 closed the pull request at:

          https://github.com/apache/apex-malhar/pull/345

          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 opened a pull request:

          https://github.com/apache/apex-malhar/pull/424

          APEXMALHAR-2130 Spillable implementation for WindowedOperator

          @tweise @brightchen @siyuanh Please review and merge

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar windowedSpillable-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/424.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #424


          commit ede7f056919ba11c607b06d302153a7498e5e3e6
          Author: David Yan <david@datatorrent.com>
          Date: 2016-08-15T21:19:08Z

          APEXMALHAR-2130 Spillable implementation for WindowedOperator


          githubbot ASF GitHub Bot added a comment -

          Github user davidyan74 closed the pull request at:

          https://github.com/apache/apex-malhar/pull/424

          githubbot ASF GitHub Bot added a comment -

          GitHub user davidyan74 reopened a pull request:

          https://github.com/apache/apex-malhar/pull/424

          APEXMALHAR-2130 Spillable implementation for WindowedOperator

          @tweise @brightchen @siyuanh Please review and merge

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/davidyan74/apex-malhar windowedSpillable-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/apex-malhar/pull/424.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #424



          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/apex-malhar/pull/424

          brightchen bright chen added a comment -

          I tested the purge function using KeyedWindowedOperatorBenchmarkApp for about 20 minutes on my local machine. It seems fine most of the time: only the 2 most recent files are kept. But there were 4 files after I killed the application. Here are the results:

          MacBook-Pro-2:2 bright$ ll
          total 88344
          -rw-r--r-- 1 bright staff 43516867 Jan 23 14:47 1485211620000
          -rw-r--r-- 1 bright staff 1707578 Jan 23 14:47 1485211680000
          -rw-r--r-- 1 bright staff 300 Jan 23 14:47 _META

          MacBook-Pro-2:2 bright$ ll
          total 93888
          -rw-r--r-- 1 bright staff 33381148 Jan 23 14:59 1485212340000
          -rw-r--r-- 1 bright staff 14681911 Jan 23 14:59 1485212400000
          -rw-r--r-- 1 bright staff 1906 Jan 23 14:59 _META

          MacBook-Pro-2:2 bright$ ll
          total 114536
          -rw-r--r-- 1 bright staff 22219720 Jan 23 15:02 1485212460000
          -rw-r--r-- 1 bright staff 20585272 Jan 23 15:03 1485212520000
          -rw-r--r-- 1 bright staff 15794686 Jan 23 15:03 1485212580000
          -rw-r--r-- 1 bright staff 29718 Jan 23 15:03 1485212640000
          -rw-r--r-- 1 bright staff 2490 Jan 23 15:03 _META
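          The bucket file names in these listings look like epoch-millisecond timestamps spaced 60,000 ms apart (for example, 1485211680000 - 1485211620000 = 60000). That suggests one-minute time buckets. Below is a small, hypothetical helper (`BucketFileCheck` is not part of Malhar) that decodes a listing under that assumption and prints each bucket's start time and the gap to the previous one.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketFileCheck {
  /** Parses bucket file names as epoch millis, skipping metadata files such as _META. */
  static List<Long> bucketMillis(List<String> fileNames) {
    List<Long> millis = new ArrayList<>();
    for (String name : fileNames) {
      if (!name.startsWith("_")) {
        millis.add(Long.parseLong(name));
      }
    }
    return millis;
  }

  public static void main(String[] args) {
    // File names from the third listing, taken after the application was killed.
    List<String> listing = Arrays.asList(
        "1485212460000", "1485212520000", "1485212580000", "1485212640000", "_META");
    List<Long> millis = bucketMillis(listing);
    for (int i = 0; i < millis.size(); i++) {
      long gap = i == 0 ? 0 : millis.get(i) - millis.get(i - 1);
      System.out.println(Instant.ofEpochMilli(millis.get(i)) + " gap=" + gap + "ms");
    }
  }
}
```

Read this way, the last listing holds four consecutive one-minute buckets rather than the two retained during normal operation.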


            People

            • Assignee:
              davidyan David Yan
              Reporter:
              brightchen bright chen
            • Votes:
              0
              Watchers:
              7

              Dates

              • Created:
                Updated:
                Resolved:

                Development