Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.9.0
    • Component/s: None
    • Labels:
      None

      Description

      Add a variant of CsvTable that can be streamed. It would serve as an example of how to write stream adapters.

      It would be like the CSV adapter, but watches a file and reports records added to the end of the file (like the tail command).

      You’d have to change CsvTable to implement StreamableTable, and implement the Table stream() method to return a variant of the table that is in “follow” mode.

      It would probably be implemented by a variant of CsvEnumerator, but it is getting its input in bursts, as the file is appended to.

      You would still be able to use this adapter to read historical data from the CSV file. Appending records to the file would make a nice demo.
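A minimal sketch of the "follow" behaviour described above, using only the JDK. `TailFollower` and its methods are hypothetical names for illustration, not part of Calcite's CSV adapter; the sketch assumes UTF-8 text with `\n` line endings and does not handle file truncation.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Calcite class): remembers how far it has read
 * and returns only the complete lines appended since the previous call. */
public class TailFollower {
  private final Path file;
  private long offset; // byte position just past the last complete line consumed

  public TailFollower(Path file) {
    this.file = file;
  }

  /** Returns lines appended since the last call; empty if nothing new. */
  public List<String> poll() throws IOException {
    List<String> lines = new ArrayList<>();
    try (RandomAccessFile in = new RandomAccessFile(file.toFile(), "r")) {
      long length = in.length();
      if (length <= offset) {
        return lines; // no new bytes yet
      }
      byte[] buf = new byte[(int) (length - offset)];
      in.seek(offset);
      in.readFully(buf);
      int start = 0;
      for (int i = 0; i < buf.length; i++) {
        if (buf[i] == '\n') {
          lines.add(new String(buf, start, i - start, StandardCharsets.UTF_8));
          start = i + 1;
        }
      }
      offset += start; // a trailing partial line is left for the next poll
    }
    return lines;
  }

  /** Demo: the first batch is the historical data, the second is the appended row. */
  public static List<List<String>> demo() {
    try {
      Path csv = Files.createTempFile("depts", ".csv");
      TailFollower tail = new TailFollower(csv);
      Files.write(csv, "10,Sales\n20,Marketing\n".getBytes(StandardCharsets.UTF_8));
      List<List<String>> batches = new ArrayList<>();
      batches.add(tail.poll());
      Files.write(csv, "30,Accounts\n".getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.APPEND);
      batches.add(tail.poll());
      Files.delete(csv);
      return batches;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Tracking a byte offset rather than a line count means the first call returns the historical rows and later calls return only what was appended, which is the behaviour the description asks for.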

          Activity

          zhenw zhen wang added a comment -

          I am having some fun with this one here: https://github.com/apache/calcite/compare/master...zinking:csvstream?expand=1
          Current status: the streaming query can capture the file changes and emit output correspondingly.

          One remaining item is how to terminate the stream:
          1. When the query is emitting output, it can be cancelled perfectly.
          2. However, when the query is blocked waiting for more input, I haven't managed to break out from that point.

          >0: jdbc:calcite:model=/Users/awang/workspace/> select stream * from ss.depts;

          ROWTIME              DEPTNO  NAME
          -------------------  ------  ---------
          2016-05-27 05:12:31  10      Sales
          2016-05-27 05:12:31  20      Marketing
          2016-05-27 05:12:31  30      Accounts
          2016-05-27 05:12:31  40      40
          2016-05-27 05:12:31  50      50
          2016-05-27 05:12:31  60      60

          Exception in thread "SIGINT handler" java.lang.UnsupportedOperationException
          at org.apache.calcite.avatica.AvaticaResultSet.cancel(AvaticaResultSet.java:171)
          at org.apache.calcite.avatica.AvaticaStatement.cancel(AvaticaStatement.java:299)
          at sqlline.DispatchCallback.forceKillSqlQuery(DispatchCallback.java:83)
          at sqlline.SunSignalHandler.handle(SunSignalHandler.java:38)
          at sun.misc.Signal$1.run(Signal.java:212)
          at java.lang.Thread.run(Thread.java:745)

          Julian Hyde, would you mind taking a look? Comments are appreciated. Thanks.

          julianhyde Julian Hyde added a comment -

          Looks pretty good. Can you open a formal pull request?

          I think there's a bit too much copy-pasted code. E.g. CsvStreamScannableTable.getRowType does the same as the base method CsvTable.getRowType. Can you make a pass and remove any code that can be removed?

          Please rename CSVStreamReader to CsvStreamReader, to match our naming convention.

          Regarding cancel. I wonder whether the reader could block waiting for data with a timeout of say 1 second, check to see whether the query has been canceled, then try to read again, in a loop.

          zhenw zhen wang added a comment -

          https://github.com/apache/calcite/pull/239/files
          Cleaned up the implementation a bit, but I still haven't figured out how to cancel the query. I guess I shouldn't do something like install a CTRL+C hook?

          julianhyde Julian Hyde added a comment - edited

          A signal handler is not sufficient: the statement may be on a different machine than the place where the Control-C is issued.

          The reader needs to have a reference to the execution context - or a callback to get it - so that it can continually find out the state of the statement. The DataContext parameter passed to Bindable.bind is an example of that context. Use a debugger and see if you can understand the life-cycle of the execution context, and figure out how to get that context into a reader.
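One way to picture the "reference to the execution context" idea is sketched below. Every class here (`ExecContext`, `Statement`, `Reader`) is a hypothetical stand-in written for this example, not a Calcite or Avatica type, and the variable name `"cancelFlag"` is likewise an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-ins for the statement, the execution context and the
 * reader; none of these are Calcite or Avatica classes. */
public class ContextSketch {
  /** Execution context: a bag of named variables, created once per execution. */
  public static class ExecContext {
    private final Map<String, Object> variables = new HashMap<>();

    public void put(String name, Object value) {
      variables.put(name, value);
    }

    public Object get(String name) {
      return variables.get(name);
    }
  }

  /** Statement: allocates a fresh flag for each execution and flips it on cancel. */
  public static class Statement {
    private volatile AtomicBoolean cancelFlag;

    public ExecContext execute() {
      AtomicBoolean flag = new AtomicBoolean();
      cancelFlag = flag;
      ExecContext context = new ExecContext();
      context.put("cancelFlag", flag);
      return context;
    }

    public void cancel() {
      AtomicBoolean flag = cancelFlag;
      if (flag != null) {
        flag.set(true); // visible to whichever thread is running the reader
      }
    }
  }

  /** Reader: receives only the context and looks the flag up by name. */
  public static class Reader {
    private final AtomicBoolean cancelFlag;

    public Reader(ExecContext context) {
      this.cancelFlag = (AtomicBoolean) context.get("cancelFlag");
    }

    public boolean shouldStop() {
      return cancelFlag.get();
    }
  }

  public static void main(String[] args) {
    Statement statement = new Statement();
    Reader reader = new Reader(statement.execute());
    System.out.println(reader.shouldStop()); // false
    statement.cancel();
    System.out.println(reader.shouldStop()); // true
  }
}
```

Because the reader depends only on the context object, the cancel request can originate anywhere that can reach the statement, which a local signal handler cannot guarantee.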

          zhenw zhen wang added a comment -

          Indeed, I didn't quite get how DataContext could be used to help. Will continue ...

          public Enumerable<Object[]> scan(DataContext root) {

          So `DataContext` could easily be injected into `CsvStreamReader`. The problem is: how does the reader get the `Cancel` signal?

          The `order` stream can end because it always has a next result. When CTRL-C is signaled, SQLLine enters Canceled mode, and *upon the next record* it breaks out of the record-reading loop and then closes the statement, result set, etc.

          What is different for CsvStream is that, although SQLLine enters Canceled mode, it needs another (next) input to break out of the record-reading loop, which CsvStream isn't able to give (as it is itself blocked waiting for the next input).

          So my understanding is: when handling cancel in AvaticaStatement, it has to obtain the DataContext and inject the cancel status into the variable map, so that the reader can check it there.

          julianhyde Julian Hyde added a comment -

          I agree about the "upon next record" problem. That is why I suggested, a couple of comments back, that the reader reads using a timeout. I notice that CsvStreamReader has a field Queue<String> contentQueue and calls contentQueue.poll();. Can you convert that to a BlockingQueue and call BlockingQueue.poll(long timeout, TimeUnit unit)?

          You could add

          enum DataContext.Variable {
            CANCEL_FLAG("cancelFlag", CancelFlag.class)
          }
          

          and the enumerator could check its state periodically:

          DataContext root;
          CancelFlag flag = root.get(DataContext.Variable.CANCEL_FLAG.name());
          for (;;) {
            ...
            if (flag.isCancelRequested()) {
              return;
            }
          }
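
The timed-poll loop suggested in this comment could look roughly like the following self-contained sketch. `PollingReader` is a hypothetical class, and an `AtomicBoolean` stands in for the `CancelFlag` type mentioned above; the only library calls used are the JDK's `BlockingQueue.poll(long, TimeUnit)` and `AtomicBoolean.get()`.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical reader: waits for input in short slices so that a cancel
 * request is noticed even when no new line ever arrives. */
public class PollingReader {
  private final BlockingQueue<String> contentQueue;
  private final AtomicBoolean cancelFlag; // assumed stand-in for CancelFlag
  private final long timeoutMillis;

  public PollingReader(BlockingQueue<String> contentQueue,
      AtomicBoolean cancelFlag, long timeoutMillis) {
    this.contentQueue = contentQueue;
    this.cancelFlag = cancelFlag;
    this.timeoutMillis = timeoutMillis;
  }

  /** Returns the next line, or empty once cancellation (or interruption) is seen. */
  public Optional<String> readNext() {
    while (!cancelFlag.get()) {
      try {
        // Block for at most timeoutMillis, then loop round and re-check the flag.
        String line = contentQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (line != null) {
          return Optional.of(line);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return Optional.empty();
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    AtomicBoolean cancelFlag = new AtomicBoolean();
    PollingReader reader = new PollingReader(queue, cancelFlag, 100);

    queue.add("10,Sales");
    System.out.println(reader.readNext()); // a line is available immediately

    Thread canceller = new Thread(() -> {
      try {
        Thread.sleep(300);
      } catch (InterruptedException ignored) {
        // fall through and cancel anyway
      }
      cancelFlag.set(true);
    });
    canceller.start();
    System.out.println(reader.readNext()); // no input; returns empty after cancel
    canceller.join();
  }
}
```

The timeout bounds how long a cancel request can go unnoticed: a shorter timeout reacts faster at the cost of more wake-ups while the queue is idle.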
          
          zhenw zhen wang added a comment -

          I updated the PR with one possible solution. I don't quite like it, though, as the DataContext is hacked into the Statement.
          There are possibly more elegant ways to do this, but that's what I understand.

          The `throw unsupported` in the cancel handling suggests `cancel` hasn't been considered thoroughly throughout the execution stage.

          Another issue is that the enumerator reads one more record after CTRL+C is pressed. This might not be desired.

          julianhyde Julian Hyde added a comment -

          > The `throw unsupported` in the cancel handling suggests `cancel` hasn't been considered thoroughly throughout the execution stage.

          Yes, that's true. CALCITE-849 is related to this, but we never fixed it. We're solving the problem now.

          > Another issue is that the enumerator reads one more record after CTRL+C is pressed. This might not be desired.

          I think that's unavoidable. I don't know any systems that cancel immediately. It's much more efficient to allow some leeway.

          julianhyde Julian Hyde added a comment -

          I took zhen wang's work in https://github.com/apache/calcite/pull/239, fixed it up, and added cancel functionality. I strongly believe that the cancel flag should be allocated at execute time, not prepare time, so I changed the code accordingly. I also added a test for cancel, and tried to reduce the duplication between CsvEnumerator and CsvStreamEnumerator. (It overlaps work going on in CALCITE-884, unfortunately.)

          zhen wang, Can you please review https://github.com/julianhyde/calcite/tree/1227-streaming-csv-table?

          zhenw zhen wang added a comment -

          Julian Hyde, I took a careful look at your work. It all looks good and fits my original target. Two points of comparison with mine:
          1. I was supposed to come up with your `cancel` mechanism design. I think where the `cancel` flag is put now fits my original thoughts. My problem was that, although I knew where it belonged, I couldn't hook up all the critical components the way they are now.

          2. How `cancel` is checked: my implementation used a "null" return to indicate a `cancellation` of the execution, which is a bit confusing.

          All in all, I learned a lot from your fix. I'm all good with your change.

          julianhyde Julian Hyde added a comment -

          Fixed in http://git-wip-us.apache.org/repos/asf/calcite/commit/65c1cec2.
          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Resolved in release 1.9.0 (2016-09-22)


            People

            • Assignee:
              julianhyde Julian Hyde
              Reporter:
              julianhyde Julian Hyde
            • Votes:
              0
              Watchers:
              5
