Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sql
    • Labels:

Description

      Discussion about high-level languages to define Samza queries. Queries are written in such a language and transformed into a dataflow graph whose nodes are Samza jobs.

      Attachments

          • StreamSQLforSAMZA-v0.1.docx.docx (15 kB, Yi Pan (Data Infrastructure))

          Activity

          criccomini Chris Riccomini added a comment -

          if the schema registry from Confluent is available shortly, we are more than happy to pick it up.

          +1

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jay Kreps and Milinda Lakmal Pathirage, if the schema registry from Confluent is available shortly, we are more than happy to pick it up.

          jkreps Jay Kreps added a comment -

          Milinda Lakmal Pathirage it's a standalone system/project.

          milinda Milinda Lakmal Pathirage added a comment -

          Jay Kreps, is this going to be a part of Kafka, or a completely separate project?

          Yi Pan (Data Infrastructure) & Chris Riccomini, I think we can wait for the Kafka schema registry release, because I am still working on the translation from a Calcite plan to an OperatorRouter instance. I'll create a sub-task to track this.

          jkreps Jay Kreps added a comment -

          WRT the schema registry: yeah, we built and are about to open source a system that we hope will be the default for this kind of thing. I think it is a bit better than the existing GitHub project, which dates back to that AVRO patch that has been floating around forever. The things that may be a bit better include:
          1. Can use Kafka for schema storage, so no external database dependency
          2. Supports multi-DC operation
          3. Has a single global ID space, so a message can be linked back to the registry irrespective of the topic. This is required to support mixed schemas.
          4. A better notion of compatibility

          We intend to add a UI for this to browse your schemas and diff them between versions to see what changed and really try to build out the full workflow for people to interact with it.

          We'd be interested in really making it work well for the Samza use case--I think there may be a fair amount of metadata that is needed to really be an effective catalog for streaming. We'd be happy to take on the related tickets to help integrate it here. The plan is to release this middle of next week.

          julianhyde Julian Hyde added a comment -

          Milinda Lakmal Pathirage Streaming support will be on a branch for maybe a week or two. Then I hope to merge it into the master branch. It should be in the next release (calcite-1.1). Of course, if you are using snapshots of Calcite it would be unwise to put your code into Samza's main branch, because of the rule that you can't make a release that uses snapshot versions of other components.

          milinda Milinda Lakmal Pathirage added a comment -

          Julian Hyde, I'll post my issues and questions to Calcite. Are there any plans to move streaming support to the main repo?

          Yi Pan (Data Infrastructure), I'll have a look at the schema-repo project.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Milinda Lakmal Pathirage, I just talked to Felix GV regarding the schema repo. It seems to me that we can start with the https://github.com/schema-repo/schema-repo project, since it is quite flexible about plugging in different modules like the ID generator and metadata storage. One thing we might want to check with Jay Kreps is how different the schema registry API will be in the new Kafka schema registry. But the worst case is that we need to write another adapter to connect w/ Kafka's schema registry.

          julianhyde Julian Hyde added a comment - - edited

          Milinda Lakmal Pathirage By the way, a simple way to make progress: log issues on Calcite with either: (a) a simple valid query that the validator thinks invalid, or (b) a simple invalid query that the validator thinks is valid, the error message it should throw, and use carets ^ to indicate the region of the SQL string where the error occurred.

          I'll shortly have a sample schema of ORDERS and SHIPMENTS streams and PRODUCTS table so that you can write those queries.
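
          For instance, a hypothetical report against that ORDERS stream might look like the following (the query, caret placement, and error text here are purely illustrative, not actual Calcite behavior or output):

            SELECT STREAM productId, quantity
            FROM Orders
            WHERE quantity > 'ten'
                             ^^^^^
            Expected error: Cannot apply '>' to arguments of type '<INTEGER> > <CHAR(3)>'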

          julianhyde Julian Hyde added a comment -

          Milinda Lakmal Pathirage I agree, you should not skip validation. We have adapted Calcite's type system for various flavors of "late schema" (e.g. Drill, Mongo, Splunk, Phoenix have their own particular requirements). At the end of validation each expression in the query has been assigned a type, but it might be a very loose type (e.g. in a JavaScript-like type system, emp.deptno might be translated via syntactic sugar to emp._MAP['deptno'] and have type ANY).
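
          A minimal sketch of that rewrite, restating the emp example from the previous paragraph as queries (illustrative only, not actual Calcite output):

            -- what the user writes against a "late schema" table
            SELECT deptno FROM emp WHERE deptno = 10;

            -- what it might become after the syntactic-sugar expansion;
            -- each expression ends up with the loose type ANY
            SELECT emp._MAP['deptno'] FROM emp WHERE emp._MAP['deptno'] = 10;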

          It is a principle of Calcite that if a query cannot be implemented, it should fail at validate time. Therefore there should be no user errors in the SQL-to-rel translation.

          I am very open to adding new validation modes to Calcite, in particular for streaming. I think the work can be done generically, i.e. knowing about streaming, but not knowing the specifics of Samza's implementation.

          felixgv Felix GV added a comment -

          Hi Milinda Lakmal Pathirage, Yi Pan (Data Infrastructure),

          The schema-repo project is basically the latest patch in AVRO-1124, with a bit of cleanup, bug fixes, and improvements added. It has been published to Maven Central for easier dependency management, though it may need a new release, as there are a fair number of unreleased updates in the current master, like a new strongly-typed API for easier use of Avro.

          There is also a mailing list for the schema-repo if you have specific questions about it...

          -F

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          IMO, we shouldn't skip all validations. I think we need to validate the structure of the query to check whether the query is executable, but not access to the fields.

          Yes, I agree with you that we should still perform basic validation, except for data-type validation when the schema registry is missing.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi, Milinda Lakmal Pathirage, we were referring to the Avro schema registry that LinkedIn used internally. The first link you referred to looks more like a complete project. Chris Riccomini and Felix GV, could you comment on that?

          milinda Milinda Lakmal Pathirage added a comment -

          I tried disabling validation in Calcite, but query preparation didn't work without the output from the parser going through the validator. So we need to clarify the possibility of disabling/relaxing validation in Calcite with Julian Hyde. IMO, we shouldn't skip all validations. I think we need to validate the structure of the query to check whether the query is executable, but not access to the fields. But I'm not sure whether this is possible with the current Calcite implementation.

          There is a simple TableFactory implementation in the latest patch that I have attached to SAMZA-483. I used Avro-schema-like syntax (string and int types in that implementation) in a Calcite JSON model to define the stream's type. And Calcite has support for SQL arrays, maps, sets, records, objects and some notion of JavaRecord, so I believe that we can implement support for Avro schemas (at least partial support).

          Yi Pan (Data Infrastructure), I found two schema registries which support Avro:

            https://github.com/schema-repo/schema-repo
            https://github.com/linkedin/camus/tree/master/camus-schema-registry-avro

          But I am not sure which one you were referring to in the above comment. Can you please let me know where I can find more information about the Avro schema registry? I can try to implement a Calcite SchemaFactory based on the Avro schema registry.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          We had a discussion on some remaining SQL-language-related issues last Friday and here is my summary:

          1. Support for PARTITION
            1. Samza needs to know the PARTITION key and count passed down by the SQL parser/planner
            2. The PARTITION key can be added as an extension to SQL in Calcite. If missing, Samza will choose a random partition (a rough sketch of this extension follows this list)
            3. The PARTITION count is a system property and should not be enforced in the SQL grammar. There are three cases we need to handle:
              1. the topic already exists in Kafka. Samza will only need to read it from Kafka metadata.
              2. the topic does not exist and we allow auto-creation of topics. Samza will auto-create the topic w/ a default partition count
              3. the topic does not exist and auto-creation is not allowed. It will require the user to perform an admin op to create the topics first. Then, Samza can get the PARTITION count from Kafka
          2. Schema and Metadata support
            1. Schema definition and DDL
              1. We have decided that a metadata registry to store schema definitions from DDL is optional. The impact is whether we can do compile-time or runtime validation: compile-time validation is possible when schema metadata is supplied.
              2. Two examples: with an Avro schema registry, we can implement a schema metadata interface s.t. Calcite's validation module can be applied to perform compile-time validation; with JSON, validation would be skipped and we opt to get runtime validation errors.
            2. Tuple schema
              1. If we define a tuple schema for a stream, should we support multiple schemas in a single stream? There seem to be possible non-SQL use cases for multiple schemas in a single stream, e.g. splitting a stream into multiple streams according to different schemas. It seems reasonable to ask the Samza physical operator to support multiple schemas in a single stream (i.e. the schema is associated w/ the tuple) while no SQL language support is needed. The feature can potentially be used by other DSLs that may implement multiple schemas in a single stream.
          3. Window syntax and semantics
            1. How much syntax support do we need from the SQL language? I opened a ticket to track that: SAMZA-551
            2. Tuple- vs time-based windows. I opened a ticket to track that as well: SAMZA-552
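
          As a rough sketch of item 1.2 above, the PARTITION key extension might look something like the following (hypothetical syntax only; neither Calcite nor Samza defines it today, and it simply echoes the PARTITION BY straw man Chris Riccomini raises in an earlier comment further down this page):

            -- hypothetical: write hourly totals into a new stream, declaring the partition key
            INSERT INTO HourlyProductCounts
            SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
            FROM Orders
            GROUP BY product, trunc(rowtime to hour)
            PARTITION BY product;
            -- no partition count appears here: per item 1.3, it comes from Kafka metadata,
            -- topic auto-creation, or a prior admin operation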
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi, Jay Kreps, just refreshed my memory on the tuple vs time window comparison in http://cs.brown.edu/~ugur/streamsql.pdf

          Key difference is the "tuple-driven" vs "time-driven" distinction. Personally I thought tuple driven is a much closer fit to the underlying Kafka concepts (an ordered stream of tuples).

          I agree that from the ordering point of view, tuple-driven is a natural fit w/ the Kafka concept. Revisiting the problems presented by the paper, there are mainly the following two issues:

          1. in a time-based window, when events happen with the same timestamp, there is no defined ordering.
          2. in a tuple-based window, events in the same window (by number of tuples) may actually span a long physical time and break the "simultaneity" assumption.

          In the Samza/Kafka world, we don't need to worry about ordering in a stream if:

          1. ordering in the same stream is always defined by Kafka partitioned ordering
          2. ordering between different input streams is always defined by a consistent MessageSelector among the input streams

          Now, the only issue to be resolved is how to maintain the "simultaneity" semantics among different streams. Here are a few things to consider:

          1. what's the definition of "simultaneity"? Based on the time both events arrive at the Samza consumer, or based on the actual application timestamp? Ideally it should be the latter, but that requires the producer to explicitly tag the events, and also requires producers to follow the same wall clock. Short of that, we can only inject the system timestamp at the ingress point of the Samza job, at best.
          2. how to maintain the ordering between the streams? The paper proposed a total order of events in the input streams of a single job: a) order by the timestamp of the event to maintain "simultaneity"; b) define a total order for events w/ the same timestamp from all input streams. I believe that we have a way to define the total order for all input streams in Samza. The tricky part is to maintain the "simultaneity" semantics, which requires sorting the events by timestamp first, then by the ordering provided by the Kafka system for "same-time events".

          Here is what I am thinking as the first step implementation:

          1. define "simultaneous event" by the timestamp of the ingress Samza consumer. Hence, w/ only one Samza job per query, we don't need to re-sort the received events from Kafka streams since they are already sorted based on "arrival timestamp"

          In the future, the extension to application timestamp could be the following:

          1. Each event will be tagged by (app-ts, strm-order-no, offset) and all events from all input streams can be sorted into a single ordered sequence of events.
          2. There will be a termination condition for the last event with a timestamp before a certain app_ts that can still be accepted by the job (i.e. it may or may not be the heart-beat method). Upon closing this window, the ordered sequence of events from all input streams is determined and should be processed.
            1. I have not come up with a good idea for a "window-closing condition" that can reliably work in a distributed environment yet. Some LinkedIn jobs use a timeout method to close a window, like the call-graph jobs.
          jkreps Jay Kreps added a comment -

          Read a little bit on the plane. I found a few comparisons of CQL and StreamSQL that were helpful. Key difference is the "tuple-driven" vs "time-driven" distinction. Personally I thought tuple driven is a much closer fit to the underlying Kafka concepts (an ordered stream of tuples).

          Some links:
          Basic overview of CQL:
          http://www.it.uu.se/research/group/udbl/Theses/RobertKajicBSc.pdf
          This paper dives into the tuple/time distinction and proposes a fix:
          http://cs.brown.edu/~ugur/streamsql.pdf

          I also think the heartbeat approach that the CQL people take (http://citeseerx.ist.psu.edu/viewdoc/download;jsessionid=78B3CFA375CA62DD600C8A7705D17FD8?doi=10.1.1.90.1199&rep=rep1&type=pdf) actually doesn't work well in a modern environment that is geographically distributed. Just because the Chicago datacenter can't heartbeat to the processing cluster doesn't mean it isn't recording data. I think in practice you have to model the concept of late data directly.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Posting the proposal from Julian Hyde:
          Julian Hyde julianhyde@gmail.com via samza.apache.org, to dev:
          Let me propose an alternative approach. The deliverables and the technology stack would be different, but I think we still fulfill the spirit of the proposal, and there are benefits from better interoperability, standards compliance, and building on existing code that already works.

          First, I propose that we implement not a SQL-like language, but standard SQL with streaming extensions. The extensions should be minimal, and should be consistent with the look and feel of the language (e.g. SQL tends to use keywords like OVER rather than punctuation marks like '['). If there is a way to achieve something within standard SQL, we should use it. And any extensions we make should preserve SQL's principles of being a closed language (you can nest structures to arbitrary depth, and you can re-use queries as if they were relations) and of having a sound semantics.

          The language would allow queries involving streams, mixtures of streams and relations, and only relations. A query that only used relations would be 100% standard SQL, and a query that used a stream S would, with luck, be very similar to one that used a table T with the same contents as S.

          Second, I propose that we use Calcite's SQL parser, validator, and logical algebra. We could also use its JDBC driver infrastructure.

          I agree with the consensus that CQL is a good basis for extending relational algebra to streams. The question is, can we shoehorn CQL's algebra extensions into SQL? I believe we can, as follows:

          • The ISTREAM operator is represented by the STREAM keyword after SELECT (DSTREAM and RSTREAM are much less important, so can be deferred)
          • Streams included in the FROM clause implicitly become relations (but they are “streamable” relations, and the planner will very likely leverage this when finding a viable implementation)
          • To use a particular window of a stream, not its entire history, follow it with an OVER clause

          Use SQL standard constructs for datetime literals (TIMESTAMP ‘2015-01-29 12:18:34.123’, DATE ‘2015-01-29’), interval literals (INTERVAL ‘2:30’ HOUR TO MINUTE, INTERVAL ‘5’ MONTH), windowed aggregates (“AVG(price) OVER (PARTITION BY productId RANGE BETWEEN 10 ROWS PRECEDING AND 5 ROWS FOLLOWING)”).

          Some examples.

          1. Identity
            SELECT STREAM *
            FROM Orders;
          2. Filter and project
            SELECT STREAM state, quantity
            FROM Orders
            WHERE state = ‘CA’;
          3. Windowed aggregation
            SELECT STREAM product,
            AVG(price) OVER this_week AS avg_price
            FROM Orders
            WINDOW this_week AS (PARTITION BY product ORDER BY rowtime RANGE INTERVAL ‘7’ DAY PRECEDING);
          4. Aggregation
            At the top of each hour, emit totals for each product
            SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
            FROM Orders
            GROUP BY product, trunc(rowtime to hour)
          5. Relational query on recent history of a stream
            SELECT product, COUNT(*)
            FROM Orders OVER (ORDER BY rowtime RANGE ‘1’ HOUR PRECEDING)
            GROUP BY product;

            or alternatively

            SELECT product, COUNT(*)
            FROM Orders
            WHERE rowtime BETWEEN now - INTERVAL ‘1’ HOUR AND now
            GROUP BY product;

          6. Stream-table join producing a stream
            SELECT STREAM *
            FROM Orders AS o
            JOIN Products AS p ON o.productId = p.productId
            WHERE o.price < p.list_price / 2;
          7. Stream-stream join producing a stream
            SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime
            FROM Orders AS o
            JOIN Shipments AS s
            ON o.orderId = s.orderId
            AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL ‘1’ HOUR
            ORDER BY o.rowtime;
          8. Union
            SELECT STREAM rowtime, customerId FROM Orders
            UNION ALL
            SELECT STREAM rowtime, customerId FROM SupportCalls;

          Note that, as in standard SQL, windowed aggregation emits the same number of rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation and GROUP BY examples both leverage the fact that “rowtime” is sorted (and “trunc(rowtime to hour)” can be deduced to be sorted). If it were not, the system would not allow the queries.

          Calcite already has a SQL parser, validator, metadata SPI (that you can use to declare what schemas, tables, streams, columns are available), and a logical algebra. The logical algebra consists of TableScan, Filter, Project, Union, Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to define transformation rules that combine operator calls to produce semantically equivalent operator calls, and has an engine that applies lots of transformation rules, optionally guided by a cost model.

          I am working on a prototype of Calcite that adds streaming [ https://github.com/julianhyde/incubator-calcite/tree/chi <https://github.com/julianhyde/incubator-calcite/tree/chi> ]. Just two new operators are needed: Delta (converts a relation to a stream) and Chi (converts a stream to a relation). And a few rules, such as one that maps Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to transform the logical algebra into something that could be implemented. My prototype can parse

          SELECT STREAM * From Orders

          convert it to

          LogicalDelta
            LogicalProject(id=[$0], product=[$1], quantity=[$2])
              EnumerableTableScan(table=[[STREAMS, ORDERS]])

          and simplify to

          EnumerableStreamScan(table=[[STREAMS, ORDERS]])

          The next step would be to write rules to convert EnumerableStreamScan (and several other operators) to physical algebra operators SamzaStreamScan etc.

          In summary, I think this approach should be given serious consideration. An extended standard SQL is much more useful than a SQL-like language, and I believe I have shown that we can add the necessary extensions to SQL without destroying it.

          Building a SQL parser, validator, relational algebra, JDBC driver and planning framework is a massive amount of work and 90% of the functionality is identical in a streaming and non-streaming system.

          Lastly, building a stack based on extended standard SQL does not preclude adding other high-level languages on top of the algebra at a later date.

          bernhardttom Thomas Bernhardt added a comment -

          Full Esper reference is http://esper.codehaus.org/esper-5.1.0/doc/reference/en-US/html_single/index.html
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Milinda Lakmal Pathirage, sorry I forgot to put the draft SQL docs online. I am attaching it here s.t. you can comment/edit on it.

          Thanks!

          -Yi

          milinda Milinda Lakmal Pathirage added a comment -

          Yi Pan (Data Infrastructure), Chris Riccomini, how about we start working on documenting the language and the design, if you guys haven't started already? I can help with this.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          My notes on Spark StreamSQL after a quick check on: https://issues.apache.org/jira/secure/attachment/12637803/StreamSQLDesignDoc.pdf

          1. Spark Stream SQL adopts SQLstream's syntax. Some of the extensions on stream operators in SQLstream are not as SQL-ish as StreamSQL syntax, and the online doc claims this is done "deliberately".
          2. It seems that Spark StreamSQL does not have a time-window syntax implemented yet. From SPARK-1363, time-window syntax is planned for phase two.
          3. It is not clear to me how the windowing technique works across the RDD boundaries in DStream. Mini-batches of RDDs are not exactly the same as a continuous stream where the atomic unit of computation is a single tuple from the stream.
          criccomini Chris Riccomini added a comment -

          SPARK-1363 documents Spark's approach. It includes a design doc.

          martinkl Martin Kleppmann added a comment -

          To throw another idea into the mix, a dataflow language called Juttle was announced yesterday: http://www.jut.io/blog/2014/welcome-to-jut-devops-analytics-platform — I've only looked at it briefly, but I like what I've seen so far. It looks more like a shell script than SQL (IMHO that's a good thing, but it's definitely debatable).

          criccomini Chris Riccomini added a comment -

          Yi Pan (Data Infrastructure), I agree with pretty much everything you've described re: StreamSQL. Here are my notes:

          1. StreamSQL grammar reference is here. SELECT docs are here.
          2. Grammar supports SELECT ... INTO, which seems very desirable. With Samza, this would allow you to SELECT, and have the results go into a second stream. This translates nicely, since it allows us to express output partition keys: `SELECT foo, bar FROM baz INTO stream2 PARTITION BY foo;` Note that the PARTITION BY syntax is something that I just made up, but seems to fit well. Also supports ERROR INTO, which can be used to shunt failed rows into another stream (e.g. serialization errors).
          3. Oddly, they also have CREATE STREAM AS syntax (in addition to INTO syntax).
          4. Noticed that no other grammars seem to have an ORDER BY; StreamSQL seems to be the first one that I've come across with this operator. Seems useful.
          5. Grammar borrows range syntax from CQL: `FROM TicksWithTime [SIZE 1 ON LocalTime PARTITION BY FeedName]` The expressiveness of this syntax seems pretty powerful. In this case, LocalTime values are in seconds, and SIZE 1 means a window of just 1 second. It's unclear exactly what PARTITION BY does, since there is also a GROUP BY in the statement. I wonder if this defines the partitioning of the input stream (and repartitions if the stream is not physically partitioned this way already). Here's another one that is a sliding 1s window over 20 seconds: `[SIZE 20 ADVANCE 1 ON StartOfTimeSlice PARTITION BY FeedName]`
          6. Highly recommend having a look at the tutorial.
          7. Stream SQL brings up the concept of hierarchical data, which is also useful since we support arbitrary serdes in Samza, which might include hierarchical JSON/AVRO/Protobuf schemas. See the wild card page for a nice abstraction on how to expand a hierarchical field into a flat set of field names.
          8. Has the concept of a metronome/heartbeat, so tuples can be injected into the stream based on some fixed interval.

          Overall StreamSQL seems to be a candidate that is very close to what we want, in terms of SQL syntax (maybe more than needed)

          Totally. Maybe we can project down to just what we need as an MVP?

          Note that I am not a big fan of a strong-typed schema here, since field names may be added or deleted over time.

          Yea, the DDL stuff seems a bit strange to me. To fully support DDL, we'd need some metadata repository. A random straw man idea would be to have no DDL and just allow developers to say `SELECT foo, bar FROM baz;`. If foo and bar exist in incoming messages, then the query works. If bar doesn't exist for a row, then the query would fail. Two ideas for getting around the failing query would be to 1) ERROR INTO a new stream 2) provide a DEFAULT command (e.g. SELECT foo, bar DEFAULT NULL FROM baz).
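
          Putting those two straw-man ideas together, a hypothetical query might read as follows (made-up syntax throughout, not something any existing grammar accepts):

            SELECT foo, bar DEFAULT NULL
            FROM baz
            INTO stream2 PARTITION BY foo
            ERROR INTO baz_errors;
            -- rows missing 'bar' would get NULL instead of failing the query;
            -- rows that still fail (e.g. serialization errors) would be shunted to baz_errors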

          It has a heart beat build in the SQL syntax. It might be useful if we allow user to inject timer tokens.

          The more I look at these grammars, the more I realize how dependent they are on strong timing. I think this is going to be one of the major challenges. MillWheel's approach to time seems the most viable, but it's annoying that you have to lose data to do it (they claim exactly once messaging, but that's only for messages that don't get dropped due to lag, which they claim is ~0.001%).

          Milinda Lakmal Pathirage, I think most of what Yi Pan and I are discussing tends to agree with your last comment, WRT partitioning, joins, windows, etc. Also, regarding your link on distributed CQL, this is very interesting. I think the most notable thing I got from their write-up was the need for an intermediate layer, which we discussed supporting via an intermediate relational layer. It very much matches what their presentation/write-up describe, except that they use IBM's System S as the runtime instead of Samza, and they refer to the relational layer as the IL (intermediate layer).

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          My notes on StreamSQL:

          1. Defined a rich DDL to
            1. define schemas and allow hierarchical inheritance of schemas
            2. define input/output streams and associate a schema with the input stream (why not the output stream?). I don’t quite see how the WITH PARAMETERS clause is used. Also, it seems a bit redundant with another CREATE STREAM as a general definition of input and output streams. Partitions and parallel sub-streams?
            3. it has a very interesting concept of “error streams” that is sort of an escape channel for all unhandled exceptions in the system. It could be very useful for debugging in a distributed system, i.e. if we have a Kafka topic as a distributed-system error log, it can be used for this purpose.
            4. Note that I am not a big fan of a strongly-typed schema here, since field names may be added or deleted over time. In a distributed system, updating the software throughout the system takes time, and the data in the states of various components also takes time to be updated (i.e. messages with the old schema may co-exist with messages with the new schema in a Kafka topic). A DDL to specify a data model should still be useful. But the schema itself should only contain the absolutely required minimum fields and leave all other fields as optional (they may or may not be there).
            5. DDL in StreamSQL also defines window specifications, which is a convenient tool when the same window specification is used again and again. However, it may not be necessary as a first-class citizen in our implementation, since the window specification should be simple and short in our first version. What I like is that the window specification seems very comprehensive:

              SIZE size ADVANCE increment {TIME | TUPLES | ON field_identifier_w PREDICATE OPEN ON open_expr CLOSE ON close_expr EMIT ON emit_expr}
              [PARTIAL {TRUE | FALSE}]
              [PARTITION BY field_identifier_p[,...]]
              [VALID ALWAYS]
              [OFFSET offset]
              [TIMEOUT timeout]

            6. I don’t see a big use case for the materialized window definition, and StreamBase also plan to deprecate it in the future.
          2. StreamBase also introduce lock concept to control the flow of streams: when locked, buffer the tuples; release the buffered tuples on unlock. I don’t see a big use case on that, except for dealing the case of arrival of tuples with out-of-order timestamps.
          3. Merge seems to be a commonly used operator to combine two streams w/ the same schema together. We can use the same to mark up the merge of two or more partitions of the same stream, s.t. FROM <stream> [MERGE [ALL | EVERY <m_partitions>]]
          4. StreamSQL has defined a HEARTBEAT mechanism to add timer tuples to the same stream as the data tuples. This would be useful for performing synchronization in a distributed system like SAMZA. I would think that SAMZA as a system should provide a HEARTBEAT token by default, and this SQL extension can be optional, giving the user some control over specific query output streams.
          5. APPLY has a PARALLEL <num_instances> BY <int_field> clause to specify the parallelism of modules working on the same input stream. This is necessary if the default model is sequential execution of tasks on a stream. My take on this is: SAMZA should default to a distributed parallel model s.t. by default, there are tasks running on all partitions of every input stream. Hence, there is no need to introduce this statement. We do need some way of specifying the number of tasks and the distribution of partitions in all input streams to the tasks, i.e. if the input streams to a SQL query have N1 and N2 partitions, how many tasks do we need and how do we distribute and match the partitions to the tasks? Besides, if joining two streams, what if the join key is not the key used for partitioning? How do we get the result through a merge?

          Overall StreamSQL seems to be a candidate that is very close to what we want, in terms of SQL syntax (maybe more than needed). I like the following points:

          1. It has a DDL to define tables (i.e. relations) and streams separately
          2. The grammar seems very mature and follows the SQL standard
          3. It has a rich window specification
          4. It has a heartbeat built into the SQL syntax. It might be useful if we allow users to inject timer tokens.

          The biggest gap IMO is:

          1. Lack of a convenient way of specifying the parallelism in distributing the partitions of input streams. The APPLY clause seems too convoluted. We need a simple and clear way to specify the parallel tasks that can take M streams, each with a (potentially) different number of partitions, following the FROM clause.
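          To make the gap concrete, here is a purely hypothetical sketch (none of this is StreamSQL syntax, and PageViews, view_time and PageViewCounts are invented names) of how input partition handling and output parallelism could be declared inline; the MERGE ALL and PARTITION BY ... TO clauses are the invented extensions discussed above:

            -- Hypothetical syntax, not StreamSQL: input partition handling and
            -- output partitioning/parallelism declared directly in the query.
            SELECT   member_id, COUNT(*) AS views
            FROM     PageViews [SIZE 60 ON view_time PARTITION BY member_id]
                     MERGE ALL                  -- consume all input partitions of the stream
            GROUP BY member_id
            INTO     PageViewCounts
            PARTITION BY member_id TO 8;        -- spread the output over 8 partitions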
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Chris Riccomini, yes! That's a great point! This does apply to all window operators, given that all tuples in the stream arrive in the order of the "timestamp" used to determine the window operation. The tricky issue is: when we have re-partitioning in between the tasks, the original order on the timestamp field may be lost.

          criccomini Chris Riccomini added a comment -

          Yi Pan, regarding this comment:

          if a query result has a temporal field as one of its GROUP By or JOIN keys, and the incoming messages can be expected to flow in-order-of the timestamps, we can perform incremental computation of query results.

          Isn't this true with any window operation? If we are doing a windowed join, and a new row comes in, we can look at the buffer for the other incoming stream, and do the join immediately, and output right away, correct? I believe the same holds true for something like SELECT member_id, count FROM PageViews WINDOW(30 seconds) GROUP BY member_id. It seems to me that any windowing implicitly means that it's part of the GROUP BY, and therefore all window operations can be done incrementally.
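          To make that concrete, here is a hedged sketch of both shapes, reusing the ad-hoc WINDOW() notation from the example above (AdClicks and campaign_id are invented names); in both cases, if messages arrive in timestamp order, results can be emitted incrementally as each window closes:

            -- Windowed aggregation: one count per member_id as each 30-second window closes.
            SELECT member_id, COUNT(*) AS views
            FROM   PageViews WINDOW(30 seconds)
            GROUP BY member_id;

            -- Windowed join: when a row arrives, probe the other stream's 30-second buffer and emit immediately.
            SELECT pv.member_id, ad.campaign_id
            FROM   PageViews pv WINDOW(30 seconds)
            JOIN   AdClicks  ad WINDOW(30 seconds)
            ON     pv.member_id = ad.member_id;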

          criccomini Chris Riccomini added a comment -

          Also found this:

          http://www.sqlstream.com/stream-processing-with-sql/

          And also:

          https://github.com/epfldata/squall

          The latter is a Storm SQL grammar.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          My notes on Esper

          1. SELECT has istream|rstream|irstream operators to identify the type of output stream
            1. istream as default? We may want it to be explicit, since there is a chance that our output is to a relation (i.e. a remote DB). We could potentially use INSERT INTO for that purpose
          2. the window operator is attached to the stream source, e.g. streamA.win:time(30), and does not seem to use an application timestamp from the stream; all timestamps seem to be the local system time (a small EPL-style sketch follows at the end of this list)
          3. complicated stream operators
            1. introduces pattern/filter constructs that apply as filters directly on a stream (an explicit stream-level optimization). I don't think we need this in the initial draft.
            2. introduces a combined window view operator. I am not quite a fan of that, since it makes the windowing operation more complicated than necessary. IMO, the windowing operation should just do the stream-to-relation conversion, not other functions.
          4. tightly coupled with the programming language (i.e. some Java flavor in the syntax). I prefer a more high-level language that is agnostic to the implementation.
          5. hierarchical aggregation keywords in GROUP BY. Not sure how useful this is for the 80% of use cases in SQL?
          6. defines output and order by: allows specifying output intervals and re-ordering of the output stream. This may be useful when we process streams that contain out-of-order tuples.
          7. defined complex multi-row and multi-column selections: http://esper.codehaus.org/esper-5.1.0/doc/reference/en-US/html/epl_clauses.html#epl-subqueries-multicolumn
            1. however, the two examples of sub-queries are both some kind of JOIN between two streams and could be re-written with an aggregation function of assemble() and Group By to create some nested properties in the final query result. It seems more intuitive to me. Chris Riccomini mentioned AGG/JOIN/EXPLODE method, which seems more intuitive than the examples here.
          8. defines an abstraction to use sql:myDB(“select…from…”) as a source of relations, making it easy to plug in external DBs as needed
          9. defines UDFs to access non-relational data as a relation: e.g. select * from StreamA, udf.getData(“my-NRD”). Esper defines one function per UDF to return the metadata of the returned relation, and another function to actually evaluate the values of the rows, e.g. udf.getMeta(“my-NRD”) and udf.getData(“my-NRD”)
          10. Create Schema is used to define the event data schema: I am thinking of a semi-schema model that only specifies fields that cannot be null. All other fields can be optional.
          11. defines a method to split the output and define the input partitions
            1. e.g. on event_type insert into <insert_into_def> select … where … insert into <insert_into_def> is used to split the output stream
            2. Esper introduces a concept of Context on a stream, and Partition is used to create contexts. Then, all queries run within a context, e.g. create context SegmentedByCustomer partition by custId from BankTxn.
            3. To me, the definition of context is too complex. It would be easier to split the output using PARTITION BY keyA TO n_parts in the SELECT result. Tasks that need to combine multiple partitions could do so with FROM <stream> (ALL|<n>) PARTITIONS.
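          For reference, a minimal EPL-flavoured sketch of points 1 and 2 above (the window view attached to the stream source, and istream/rstream selection); this follows the Esper 5.x reference linked earlier in the thread, but treat the exact syntax as approximate:

            -- Esper EPL (approximate): the window "view" hangs off the stream source, and
            -- irstream asks for both insert-stream and remove-stream events of the derived relation.
            select irstream symbol, avg(price) as avg_price
            from   StockTick.win:time(30 sec)
            group by symbol;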
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          In addition to Chris Riccomini's comments, here are a few more additional notes on tigon.io from my side:

          1. concept of “temporal field”
          2. has a FILTER_JOIN to allow joining two streams within a time duration, which mimics a window in a JOIN
          3. GROUP BY on a temporal field means the group is closed and can be computed incrementally
          4. has an output_spec.cfg to allow configuration of partition fields and n_partitions. I don't quite like this; I would like to see it specified via the grammar

          So far, my only take-away from tigon is:

          1. if a query result has a temporal field as one of its GROUP BY or JOIN keys, and the incoming messages can be expected to flow in order of the timestamps, we can perform incremental computation of query results.
          criccomini Chris Riccomini added a comment -

          Notes on Tigon:

          1. Tigon SQL reference manual is here.
          2. Data model is only slightly more sophisticated than Azure's. Unsigned ints and longs, floats, booleans, and strings.
          3. Tigon SQL has a bunch of odd network-related operators and data types for bit shifting, IPv4, etc. This is because it's written by AT&T research, and the first use case was network packet analysis.
          4. There doesn't seem to be a WINDOW operator in Tigon SQL. Instead, windows are denoted through WHERE clauses. So, a 5-minute window for a join would be written as `FROM a JOIN b WHERE ABS(a.timestamp_seconds - b.timestamp_seconds) < 300;` There seem to be some restrictions on this, but abstractly, I think this is how it works. This is interesting.
          5. As far as I can tell, Tigon seems to rely on exact ordering of messages (timestamps never go backwards).
          6. Tigon SQL supports only 2-way joins.
          7. There is a CUBE operation, which allows you to execute a group by against multiple aggregations within a single query: `SELECT sourceIP, destIP, count FROM eth0.IPV4 T GROUP BY T.time/60 as tb, Cube(sourceIP, destIP);` This seems kind of interesting. It sounds like it's mostly for efficiency.
          8. Lack of a WINDOW in GSQL's GROUP BY actually works fairly intuitively. `SELECT sourceIP, tb, count, max(offset) FROM eth0.IPV4 T WHERE T.protocol=1 GROUP BY T.sourceIP, T.time/60 as tb` seems to make a lot of sense to me. GSQL just rolls over the window every time T.time/60 changes. Again, this relies on strongly ordered message arrival.
          9. In general, I find GSQL to be somewhat strange. Some of the operators such as CLOSING_WHEN, CLEANING_BY, etc feel a bit hacky. Similar to CQL, it introduces enough new concepts to make it diverge from standard SQL in a way that breaks with my expectations (as opposed to Azure streaming analytics, which feels very natural and SQL-ish).
          milinda Milinda Lakmal Pathirage added a comment -

          Thanks Chris Riccomini for the great summary.

          I really like the TIMESTAMP BY clause in the Azure query language. It allows us to control how the timestamp is extracted at query time. I was thinking of adding this to the stream definition in Freshet, but this method is better than adding a timestamp column to the stream definition. We can do something like the following regarding the default timestamp:

          • We can add a system timestamp every time a tuple is introduced to Samza. This system timestamp will always be present in a tuple. If TIMESTAMP BY is not given, we can use this default timestamp. The system timestamp may also come in handy when handling out-of-order events, etc.

          I am not exactly sure whether partitioning based on the JOIN column (item [9] from Chris Riccomini's discussion of Azure Streams) will always work for JOIN scenarios. As I remember, one user described a scenario on the Samza user list where this will not work. But I think this is okay for the first iteration.

          The other thing is that the window language described in the CQL paper is very limited (for example, [Row 30] or [Range 30 seconds] always means a sliding window which drops the oldest elements, with no way to specify different sliding parameters or tumbling windows), so we need to extend it to suit our needs, as discussed earlier.

          One important thing about CQL is that the concept of stream-to-relation, and then operating over the relation, allows us to use most of the available SQL constructs without conflicting semantics. For example, I assume blocking SQL constructs like NOT IN and ALL can be used in the context of CQL because we are theoretically operating over a (time-varying) relation.

          I like the PARTITION BY concept and the extensions Yi Pan (Data Infrastructure) proposed. This gives us more control, and in the case of round-robin-like partitioning it allows us to control how partitioning is done at query time.

          I think we need two different constructs for data definitions: one for STREAM and another for TABLE, because we are going to support both streams and tables. Another option is to extend/re-use CREATE TABLE (like in Azure) to support streams.
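          As a purely illustrative sketch (none of this is settled syntax, and all names are invented), separate stream/table DDL plus query-time TIMESTAMP BY could look roughly like this:

            -- Hypothetical DDL: streams and tables declared separately; only required fields are typed,
            -- everything else is optional, per the semi-schema idea discussed above.
            CREATE STREAM PageViews (member_id BIGINT NOT NULL) FROM SYSTEM kafka TOPIC "page-views";
            CREATE TABLE  MemberProfiles (member_id BIGINT NOT NULL, region VARCHAR) FROM STORE rocksdb;

            -- Query-time timestamp extraction, borrowing Azure's TIMESTAMP BY:
            SELECT pv.member_id, mp.region
            FROM   PageViews pv TIMESTAMP BY pv.view_time
            JOIN   MemberProfiles mp ON pv.member_id = mp.member_id;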

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Chris Riccomini, good summary on Azure. Here are some of my comments:

          I like the TIMESTAMP BY syntax in Azure. It seems more flexible than a rigid timestamp field enforced in the data model. It also means a single stream can have multiple timestamp fields, rather than having to re-materialize messages every time a new field should be used as the timestamp.

          Good point on the option to have multiple instantiations of the time-sequence out of the same single stream. I would still argue for a default system-injected timestamp when TIMESTAMP BY is not specified, to avoid always relying on an application field that a publisher has to fill.

          Azure's SELECT has an explicit PARTITION BY clause (http://msdn.microsoft.com/en-us/library/dn835022.aspx).

          Yes, we can borrow this. However, this still misses how many partitions the output of SELECT should be spread across. I would like to add that as an option, something like PARTITION BY <col_name> TO <num_partitions>.

          It's interesting that you can't SELECT * in a join (http://msdn.microsoft.com/en-us/library/dn835026.aspx). I haven't thought about why.

          There is also a very interesting point that Azure made on the join: every join is time-bounded, which is in line with what we have discussed above.

          I totally agree w/ the point that Azure should have some way of indicating a stream vs. a relation (i.e. table). In addition, Azure seems to implement only RStream output, from the few examples and experiments I tried.

          criccomini Chris Riccomini added a comment -

          Thanks for the write-up Yi Pan (Data Infrastructure). I'll have a look at Tigon.

          There seem to be three main layers:

          • SQL grammar.
          • Relational algebra.
          • Actual implementation of relational operators.

          I agree with you that CQL's most interesting contribution seems to be its stream-relation model. I'm not crazy about its grammar, and it only provides a basic STREAM single-node unreplicated/unpartitioned reference implementation. If we buy that we want to use CQL's relational model, then the next questions I want to look at are:

          1. Can we find a better SQL grammar that still fits the same underlying relational model?
          2. Can we find streaming implementations that are distributed/partitioned, but provide the strong timing guarantees that CQL's relational model requires?

          For (1), I've taken a look at Azure, and will also have a look at Tigon/StreamSQL.

          For (2), MillWheel seems interesting. Will have to dig around.

          Here are my notes on Azure:

          1. I like the TIMESTAMP BY syntax in Azure. It seems more flexible than a rigid timestamp field enforced in the data model. It also means a single stream can have multiple timestamp fields, rather than having to re-materialize messages every time a new field should be used as the timestamp. (A small sketch follows after this list.)
          2. Azure again uses the linear road example. Seems to be standard practice.
          3. Azure's SELECT has an explicit PARTITION BY clause (http://msdn.microsoft.com/en-us/library/dn835022.aspx).
          4. There seems to be a fully defined formal grammar for Azure in their reference docs (http://msdn.microsoft.com/en-us/library/dn835022.aspx).
          5. It's interesting that you can't SELECT * in a join (http://msdn.microsoft.com/en-us/library/dn835026.aspx). I haven't thought about why.
          6. The data types supported seem to be quite primitive (http://msdn.microsoft.com/en-us/library/dn835065.aspx). The grid at the bottom of the page hints that our thinking about having a single data model and translating to/from underlying Serdes (SAMZA-484) might be a reasonable approach. They outline how their data types are converted to/from Avro, JSON, etc.
          7. This page describes how Azure's partitioning works.
          8. The parallelism model for Azure is manual. You have to manually define how many input stream partitions and "streaming units" (somewhat like containers) you want.
          9. Like Samza, Azure requires a user to be explicitly aware of partition count and key information: "If you are joining two streams, please ensure that the streams are partitioned by the partition key of the column that you do the joins, and you have the same number of partitions in both streams."
          10. Azure doesn't really have a concept of a row-based sliding window (e.g. last 50 rows, or [Rows 50] in CQL). The closest I can find is the Sliding window, which operates at a 100ns interval, but still could theoretically jump from 4 rows to 6 during the epsilon hop.
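          Tying items 1 and 10 together, here is a small sketch in the Azure Stream Analytics flavour (based on the reference pages linked above; names like EntryStream, EntryTime and TollId are illustrative, and the exact window function signatures may differ):

            -- Application time comes from TIMESTAMP BY; the window lives in the GROUP BY.
            SELECT   TollId, COUNT(*) AS vehicle_count
            FROM     EntryStream TIMESTAMP BY EntryTime
            GROUP BY TollId, TumblingWindow(second, 30);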

          Things I like:

          • As expected, I found the grammar to be much more intuitive/approachable than CQL. Azure's grammar is a variation of T-SQL that introduces a few extra stream-related operators.
          • TIMESTAMP BY seems like a nice way to support timestamps.

          Things I don't like:

          • Unlike CQL, Azure seems to have no concept of tables, which seems to make joining a stream against a table impossible. Given Samza's state management, supporting tables explicitly in the grammar would be nice.
          • Lack of row-based sliding window.
          • Inability to specify a partition key from within the SELECT ... FROM statement. You're only allowed to partition by "PartitionId" right now. This is a missing feature that's yet to be implemented, but I presume will be shortly.
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          I had a discussion with Chris Riccomini this morning. To me, the main takeaway from the CQL paper is the modeling from stream to relation and from relation to stream. I quickly checked Esper and found that Esper EPL also implements the concept of IStream and DStream, albeit under different names: istream = IStream and rstream = DStream. From the paper describing the unified window definition between Oracle and StreamBase, it is obvious that both take the windowing approach to convert streams to relations as well.

          The one big concern Chris Riccomini mentioned is the "unbounded" window in the paper. I found that it might not be difficult to implement a single-stream "unbounded" IStream, as mentioned in the CQL paper (i.e. converting to RStream with window "Now"). However, if a SQL query is a join between two unbounded streams, it does not seem to work: by semantic definition, IStream now needs to keep the ever-growing R(t) for both streams and perform the join between them, while RStream on two streams with window "Now" only generates the immediate joins between tuples that arrive "simultaneously", not the join between a tuple from stream A at Now and a tuple from stream B at time t=0. Given that we have a set of big use cases that join two or more continuous streams, it is a big issue if we cannot keep unbounded history of a stream in a local box. I took a quick look into tigon.io and noticed one important tradeoff they have made in their implementation of GSQL:

          In general, all joins must have a predicate which matches the timestamps of its sources, and every aggregation query must have a timestamp in its list of group-by fields.
          Requiring pipelined query plans has three advantages. First, Tigon SQL can continually produce output. Second, internal tables (for joins and aggregations) remain small as they are continually cleaned of obsolete data. Third, queries can be connected together into complex data processing systems because the output of every stream is another data stream, suitable as the input to another query.

          The inspiration here is: the join between two streams should also be time-bounded. This actually makes sense in many use cases in time-sensitive stream processing: the join is to find correlations between two sets of events that happened "simultaneously". Hence, if we can agree on the assumption that tuples from two streams whose timestamps do not fall in the same time window should not appear in (or are meaningless to) the join result, we can focus on implementing a bounded window in our Samza tasks and generate both IStream and DStream as the window moves. In the ultimate store (i.e. a persistent database or persistent log) that is the sink of the streamed results, we can opt to ignore the DStream, s.t. a user who queries the ultimate store sees a time-varying sequence of join results based on the tuples that fall within the same time window (the definition of "simultaneity"). I have a slightly different point of view on the aggregation function mentioned in tigon.io's document, since there is a use case where you want all cumulative aggregated results from time t=0, as long as the total size of the relation generated by the aggregation function in the query is bounded over time.
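          As a hedged sketch of what such a time-bounded join looks like in query form (the predicate style is borrowed from Tigon's GSQL; stream and field names are invented), each task only has to retain a bounded window of history per input stream:

            -- Only tuples whose timestamps fall within the same 5-minute bound are join candidates.
            SELECT a.user_id, a.page, b.ad_id
            FROM   Impressions a JOIN Clicks b
            ON     a.user_id = b.user_id
            WHERE  ABS(a.event_time - b.event_time) < 300;   -- both timestamps in seconds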

          A few more comments following the discussion with Chris Riccomini:
          Comments 4 and 5 from Chris Riccomini are actually missing pieces from CQL as well as Esper. We are looking into how to enhance this, hoping to get something from MillWheel as well.
          Comment 6 may be addressed by having the query planner as a centralized entity that identifies potentially shared state (i.e. "synopses") and identifies the task boundaries in the execution graph. If related synopses can be kept within the same task boundary, they can be merged into a single one; if not, the state will need to be replicated. We can pursue this optimization further in SAMZA-483

          jonbringhurst Jon Bringhurst added a comment - - edited

          Here's a few more papers that might be of interest.

          Some are listed here only because they share an author with another streaming system (i.e. MillWheel) – I thought there might be a chance they have an interesting tidbit or two.

          SPL: An Extensible Language for Distributed Stream Processing
          http://hirzels.com/martin/papers/tr14-rc25486-spl.pdf

          Semantics of (Resilient) X10
          http://www.math.unipd.it/~crafa/Pubblicazioni/SemResX10.pdf

          A Survey of the Stream Processing Landscape
          http://sfb876.tu-dortmund.de/PublicPublicationFiles/bockermann_2014b.pdf

          Comparing Distributed Online Stream Processing Systems Considering Fault Tolerance Issues
          http://ojs.academypublisher.com/index.php/jetwi/article/viewFile/jetwi0602174179/9462

          Hailstorm: Distributed Stream Processing with Exactly Once Semantics
          http://www.scs.stanford.edu/14sp-cs240h/projects/dimson_ganjoo.pdf

          Discretized Streams: Fault-Tolerant Streaming Computation at Scale
          http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf

          Tutorial: Cloud-based Data Stream Processing (Figure 1 in this paper is a nice overview of streaming systems)
          http://www.dis.uniroma1.it/~midlab/articoli/paper%2066.pdf

          criccomini Chris Riccomini added a comment -

          Full Esper reference is here.

          criccomini Chris Riccomini added a comment -

          I took a look at the CQL paper. Here are my notes:

          1. The IStream/RStream/DStream model seems very elegant from an academic perspective, but I am a bit worried about how easy it will be for the average user to pick up and use. I found the linear-road example to be somewhat convoluted. It's certainly not the case that you could say, "If you know SQL, this is a breeze to pick up." I'm mostly comparing this against my mental model of what I expected. I want to compare the CQL syntax to the syntax of StreamBase, Azure Stream Analytics, tigon.io, and Esper. I have a feeling that some of the alternatives might be much more intuitive.
          2. I really like having relations (tables) baked in as a first class citizen. In addition to the 3 "pros" listed in the paper, I think it's really compelling for doing joins against tables. If you were to bolt on external key-value stores (e.g. Voldemort, Cassandra, etc) for joins, it would fit naturally into this model as well. I think Jay had a similar comment above.
          3. I have some concerns about performance with [Range Unbounded], and RStreams over any range other than Now. It seems to me that this state could get prohibitively large.
          4. There's not much of a partitioning model in CQL. We'd need to bolt something on top for Samza/Kafka.
          5. Requirements on time seem very strong. We should compare with Millwheel to see how they handle time. I think Millwheel defines some heuristics that give CQL's requirements (i.e. once you've moved past time T, you'll never get a message for time <= T again). It might be possible to follow Millwheel's strategy when using CQL in order to get strong time guarantees on top of a distributed system. I also found Ben's comments on a time topic to be interesting. I'll need to think more about that.
          6. The state model in STREAM is not nearly as isolated as it is in Samza. Operators may reference other operators' state to reduce state size/duplication. Exposing task state for other tasks within a query topology seems like another use case that we should add to SAMZA-316.

          I find the CQL language to be pretty well thought out, but a bit cumbersome to reason about/implement in a distributed system. I'm going to dig into some of the other systems (listed above) to see how their grammars compare.
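          For comparison, a small CQL-style query in the spirit of the paper's linear-road examples (reconstructed from memory, so treat the exact syntax as approximate): the window turns the stream into a relation, and Istream turns the changing relation back into a stream of inserts.

            SELECT Istream(vehicle_id, speed)
            FROM   SpeedPosition [Range 30 Seconds]
            WHERE  speed > 65;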

          bkirwi Ben Kirwin added a comment - - edited

          I'm a bit late to the conversation, so excuse me if I jump right in.

          I've been developing coast, another high-level streaming project that compiles down to Samza jobs. (https://github.com/bkirwi/coast) The implementation and 'frontend DSL' for this project is in Scala, and there's also some support code for unit testing and job-graph visualization. Unlike most of what's discussed above, this project is not directly SQL-inspired; by analogy to the Hadoop ecosystem, it's more of a Cascading than a Hive. (I suspect you could throw a SQLish frontend on top of it, in the same way Spark SQL does for Spark, but nobody's actually tried this yet.) coast's exactly-once semantics are largely inspired by Kafka's log model, and many of the design choices are intended to make it a 'good citizen' in a larger Kafka-based infrastructure.

          Since we're taking fairly different approaches, I'm not sure how well my conclusions translate over. I'm still trying to plow through some of the papers in this thread (thanks for these!) so for the moment, I'll leave some notes on managing time.

          Yi Pan and others have mentioned some difficulties with reconciling the framework- and application-level views of time: machines may disagree about the current time, messages may be processed arbitrarily long after they're sent, timestamps in a stream might not be monotonically increasing, and messages in different partitions might be ordered at the application level but not within Kafka / another backing store. I regard all these issues as pretty fundamental in a distributed context. I'm also pessimistic about any framework's ability to 'paper over' them; there are a bunch of viable solutions, none of which seem to work seamlessly for all problems.

          So far, like Freshet, coast has basically punted on this: the only notion of time / ordering comes from Kafka's ordering within a partition. It turns out you can get pretty far with this, but not all the way – for example, you can window by message count but not by time. To bridge that gap in expressiveness, I've been playing with the idea of a 'clock stream', which just produces a 'tick' message every n seconds. It turns out that this type of stream has essentially the same semantics as a Kafka-backed stream does: every task sees the same messages in the same order, but possibly skewed slightly in time depending on the local clock; and it's possible to 'checkpoint' your position in the stream by remembering the time of the last successfully-processed 'tick'. For coast, this turns out to be very useful – I already have a rich language for manipulating streams, so by making clock streams 'first class', it's possible to implement most (all?) of the time-based windowing strategies discussed above in library code.

          In a SQL-type API, it would probably be awkward to manipulate streams of ticks directly; but it might still be useful as a implementation mechanism or the basis for a semantic model.
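          Purely as an illustration of that semantic model (Ticks, tick_id and tick_time are invented names, and this is not coast or any existing grammar), a SQL-type frontend could expose the tick stream like any other joinable stream:

            -- Assumes the framework stamps each message with the id of the most recent tick it was
            -- observed under; grouping by tick yields per-second counts whose ordering comes from
            -- the tick stream, not from local wall clocks.
            SELECT   t.tick_time, COUNT(*) AS views
            FROM     PageViews pv JOIN Ticks t ON pv.tick_id = t.tick_id
            GROUP BY t.tick_time;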

          As noted, this is all somewhat speculative for me at the moment; I'll leave a note here once I get around to implementing this.

          milinda Milinda Lakmal Pathirage added a comment -

          Another interesting paper I found was "Query Languages and Data Models for Database Sequences and Data Streams" [1], which proposes a different way of handling window queries over streams using 'user-defined aggregates' (UDAs). They first introduce the notion of nonblocking (NB) queries and NB-completeness. They also show that relational algebra is not NB-complete (it's well known that we can't support blocking operations like ALL, EXCEPT, or NOT IN over a stream without a window operator). Instead of using a window operator like 'S [Rows 5]', they propose using UDAs like the following to do window computations.

          AGGREGATE tumble avg(Next Int) : Real
          {
            TABLE state(tsum Int, cnt Int);
            INITIALIZE : {
              INSERT INTO state VALUES (Next, 1)
            }
            ITERATE : {
              UPDATE state SET tsum = tsum + Next, cnt = cnt + 1;
              INSERT INTO RETURN SELECT tsum/cnt FROM state WHERE cnt % 200 = 0;
              UPDATE state SET tsum = 0, cnt = 0 WHERE cnt % 200 = 0
            }
            TERMINATE : { }
          }

          Emitting tuples downstream is done with 'INSERT INTO RETURN'. If you have 'INSERT INTO RETURN' in the TERMINATE block, your aggregate is blocking and cannot be executed over a stream. There are some interesting samples, like finding patterns over a stream, in Section 5 of the paper [1]. They even show an implementation of a Turing machine using UDAs. They also use 'union' and UDAs to implement stream joins instead of a blocking join operator. A sample can be found in [2].
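
          To make the call pattern concrete, here is a small usage sketch in the same style as the paper's examples. The aggregate name tumble_avg and the stream/column names are my assumptions for illustration, not taken verbatim from the paper.

          -- Apply the UDA defined above to a stream of bids; one average is
          -- emitted for every 200 input tuples, per the cnt % 200 logic in ITERATE.
          SELECT tumble_avg(price)
          FROM bids;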

          I was interested in this paper mainly because:

          • It looks like we can even do pattern-matching style queries over streams using UDAs. I am not sure how complicated this would be using general SQL.
          • It looks like we can use this as the intermediate model that other languages, DSLs, and APIs are transformed into. I have yet to understand how well this will work, but the concept of UDAs seems pretty interesting to me given that we can even model a Turing machine.

          I found several other references in this paper which explain/motivate some of the concepts here. I'll let you know if I find anything interesting in those.

          [1] http://www.cs.ucla.edu/~zaniolo/papers/vldb04cr.pdf
          [2] http://wis.cs.ucla.edu/wis/stream-mill/examples/nexmark.html

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          Milinda Lakmal Pathirage Great discussion! Here are my current thoughts:

          1. Lack of tuple-based sliding windows in CQL: Tuple-based sliding windows are there in CQL. In addition to tuple- and time-based windows, it introduces a partitioned window, where we partition the stream during window creation.

          Agreed.

          2. If we are going for SQL-like semantics, it doesn't matter whether we decide to go with an embedded DSL or plain SQL. We can decouple the operator layer and the language layer. That's how I have designed Freshet, mentioned in an earlier comment. In my implementation I have several Samza tasks which implement different operators such as window, select, project and aggregate. What the DSL layer does is generate a relational-algebra-like expression which is converted to a Samza job graph. Each node in this job graph is a Samza job with its properties, such as input stream, output stream and other operator-specific configuration parameters. In the final phase, these nodes get converted into properties files which describe the Samza jobs. IMHO, we should first decide what type of semantics we are going to support and then design the operator layer based on those semantics.

          Great! We are thinking in the same direction. My thought is to identify the set of primitives (i.e. operators) for popular SQL use cases first and implement them at the operator layer. The SQL parser layer can be built on top of it, generating the same relational algebra expression as your DSL parser and converting it to the Samza job graph. Hence, I think that in addition to agreeing on the set of operators in the operator layer, we should also define a common relational algebra expression so that we can re-use the translation to the Samza job graph from your work.
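
          As a rough illustration of the last step described above (each node in the job graph becoming a Samza job described by a properties file), a window-operator node might compile down to something like the sketch below. The job name, task class and input stream are made-up placeholders; the configuration keys themselves are the standard Samza ones.

          # Illustrative sketch only -- job name, task class and input stream are hypothetical.
          job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
          job.name=query1-window-operator
          task.class=org.apache.samza.sql.operators.WindowOperatorTask
          task.inputs=kafka.orders
          systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory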

          3. I am not sure whether we should add the concept of time to Samza. We can implement the concept of time in our operator layer and query layer without integrating the concept of time into Samza. Currently in Freshet [1], I don't have the time concept implemented. But I have designed the DSL in a way that the query writer can specify how to timestamp individual tuples. For example, the timestamp can be a field in the tuple in some scenarios, so in the Freshet DSL the user can specify which field contains the timestamp. Otherwise, Freshet uses the time when the tuple first appeared in Freshet as the tuple's timestamp.

          The nuances appear when actually implementing the windowing technique in a distributed environment where: a) different tasks running on different machines may see different times; b) in a stream of tuples w/ timestamps, there is no explicit termination of a time window, since streams can be paused for arbitrary time periods. IMO, the only meaningful solution to this seems to be: i) embed the original timestamp (if missing, the injection timestamp as in Freshet) in the data model of the tuple in the stream (i.e. some envelope info tagged to the tuple) and propagate this timestamp downstream through the whole stream processing pipeline. CQL [1] talked about some interesting points regarding how to propagate the input tuple's timestamp to the output tuple; ii) adopt some explicit window termination token in the stream for time-based windows, similar to MillWheel [2]. Leaving it to users to specify the timestamp field may not be necessary and may complicate the logic.

          4. I am not sure I completely understood Raul Castro Fernandez's comment about moving the concept of windows out of the query. But if we are doing that, what we can do alternatively is keep the concept of windows in the query but implement the operator layer/physical query plan in a way that separates the windowing and query logic. In my experience with CQL, the same thing happens in CQL to some extent. In most/all scenarios, what happens first in a CQL query is window generation as an insert/delete stream. The insert/delete stream assumes each tuple can be uniquely identified. When a tuple is added to the window, it gets emitted to the output stream as an 'insert' tuple, and when a tuple gets removed from the window, it gets emitted to the output stream as a 'delete' tuple. Downstream operators should handle these insert/delete tuples according to their logic. I found this concept of insert/delete streams really simplifies the window handling. The "CQL Implementation in STREAM" section in [2] contains some interesting details about stream query execution and how sliding windows can be implemented as I described above.

          I personally like the concept of a separate window operator, due to the point you mentioned regarding computation on insert/delete streams in CQL (i.e. incremental processing). However, the implementation of the window operator can be such that we are running a query on top of the Samza stores hosting the timestamped tuples (e.g. insert/delete tuples are generated by queries based on the difference between the previous and the current windows). Generalizing the storage model for the internal data model of tuples can help address the memory and debugging issues Raul Castro Fernandez mentioned.
          The key issue mentioned in the comparison of Oracle vs. StreamBase in [3] is a different one, and it does not matter whether we implement the window operator directly as STREAM does, or implement it as a query. The key issue in [3] seems to be the ambiguity of ordering when incoming tuples have the same timestamp, which messes up the ordering of the tuples in/across the streams and hence causes non-deterministic results in different types of windows. Whether or not we embed the timestamp into the data model, we have to make sure that: a) a window operation on tuples in the same stream w/ the same timestamps is deterministic; b) the ordering between tuples from different streams w/ the same timestamps is deterministic. [3] addressed those two points via a) a batchId and b) a SPREAD operator implementing a consistent ordering policy in a stream group (i.e. it can be implemented as a consistent policy in MessageChooser). What I am still not sure about is how this ordering info can be kept downstream throughout the whole stream processing pipeline.

          In addition, I want to bring up for discussion the following item:
          1) Parallelism. We should incorporate the partitioned window in the input streams to help generate partitions and parallel Samza tasks in the execution graph. The partitioning of the output stream should also be considered. I have been thinking of introducing some syntax like IStream(SELECT * FROM InStream [partitioned by A into 100 rows 1]) PARTITIONED BY B INTO 200 that allows us to specify the partition keys and the number of partitions in the input and output streams (see the sketch below). The tricky part is that the downstream tasks in the execution graph may now get out-of-order arrival of tuples w.r.t. the original timestamp. There is an interesting research paper on this out-of-order arrival and skewed window time from Stanford [4].
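
          Laid out on multiple lines, the proposed syntax would look roughly like the sketch below; this is only an illustration of the idea, not a settled grammar.

          -- Read InStream partitioned by key A into 100 partitions, with a 1-row window
          -- per partition, and emit the result re-partitioned by key B into 200 partitions.
          IStream(
            SELECT * FROM InStream [PARTITIONED BY A INTO 100 ROWS 1]
          ) PARTITIONED BY B INTO 200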

          [1] CQL: https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
          [2] MillWheel: http://research.google.com/pubs/pub41378.html
          [3] "Towards a Streaming SQL Standard": http://cs.brown.edu/~ugur/streamsql.pdf
          [4] Flexible time management in data stream systems: http://dl.acm.org/citation.cfm?id=1055596

          milinda Milinda Lakmal Pathirage added a comment -

          Below are my thoughts about several things discussed above.

          1. Lack of tuple-based sliding windows in CQL: Tuple-based sliding windows are there in CQL. In addition to tuple- and time-based windows, it introduces a partitioned window, where we partition the stream during window creation.
          2. If we are going for SQL-like semantics, it doesn't matter whether we decide to go with an embedded DSL or plain SQL. We can decouple the operator layer and the language layer. That's how I have designed Freshet, mentioned in an earlier comment. In my implementation I have several Samza tasks which implement different operators such as window, select, project and aggregate. What the DSL layer does is generate a relational-algebra-like expression which is converted to a Samza job graph. Each node in this job graph is a Samza job with its properties, such as input stream, output stream and other operator-specific configuration parameters. In the final phase, these nodes get converted into properties files which describe the Samza jobs. IMHO, we should first decide what type of semantics we are going to support and then design the operator layer based on those semantics.
          3. I am not sure whether we should add the concept of time to Samza. We can implement the concept of time in our operator layer and query layer without integrating the concept of time into Samza. Currently in Freshet [1], I don't have the time concept implemented. But I have designed the DSL in a way that the query writer can specify how to timestamp individual tuples. For example, the timestamp can be a field in the tuple in some scenarios, so in the Freshet DSL the user can specify which field contains the timestamp. Otherwise, Freshet uses the time when the tuple first appeared in Freshet as the tuple's timestamp.
          4. I am not sure I completely understood Raul Castro Fernandez's comment about moving the concept of windows out of the query. But if we are doing that, what we can do alternatively is keep the concept of windows in the query but implement the operator layer/physical query plan in a way that separates the windowing and query logic. In my experience with CQL, the same thing happens in CQL to some extent. In most/all scenarios, what happens first in a CQL query is window generation as an insert/delete stream. The insert/delete stream assumes each tuple can be uniquely identified. When a tuple is added to the window, it gets emitted to the output stream as an 'insert' tuple, and when a tuple gets removed from the window, it gets emitted to the output stream as a 'delete' tuple. Downstream operators should handle these insert/delete tuples according to their logic. I found this concept of insert/delete streams really simplifies the window handling. The "CQL Implementation in STREAM" section in [2] contains some interesting details about stream query execution and how sliding windows can be implemented as I described above.
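
          To make the insert/delete (plus/minus) stream idea in point 4 concrete, here is a tiny trace for a 3-row sliding window; the window syntax is CQL-style and the tuple names are only illustrative:

          -- Window spec (CQL-style):  S [Rows 3]
          -- arrival order:            s1     s2        s3           s4           s5
          -- window contents:          {s1}   {s1,s2}   {s1,s2,s3}   {s2,s3,s4}   {s3,s4,s5}
          -- emitted insert/delete:    +s1    +s2       +s3          +s4, -s1     +s5, -s2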

          [1] https://github.com/milinda/Freshet
          [2] https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Milinda Lakmal Pathirage that sounds great! Chris and I are both looking into SQL on streams in Samza now. We should align our efforts.

          milinda Milinda Lakmal Pathirage added a comment - - edited

          I am developing a Clojure DSL based on CQL [2] on top of Samza. You can find it over here [1]. Currently I am in the process of implementing the translation from the DSL to a Samza job graph. I would like to provide a document explaining the project if you are interested in this. Also, I would like to contribute this back to Samza, or to a different implementation the Samza developers decide on. This was inspired by Martin's talk "TURNING THE DATABASE INSIDE OUT WITH APACHE SAMZA".

          I mainly chose CQL because of its simplicity. It has a concept called a time-varying relation, which makes it easy to model queries over windows as well as to integrate normal SQL relations into streaming queries. The CQL paper also describes some implementation details, like the use of synopses to model time-varying relations as plus/minus streams. I found Samza's local storage really useful when implementing this.

          I mainly chose Clojure because I am planning to go beyond a declarative language and allow people to use Clojure to implement complex filtering/aggregation logic, etc.

          The Clojure DSL is similar to [3].

          [1] https://github.com/milinda/Freshet
          [2] http://research.microsoft.com/apps/pubs/default.aspx?id=77607
          [3] http://sqlkorma.com

          criccomini Chris Riccomini added a comment -

          Some more reading list items:

          http://www.vldb.org/conf/2004/DEMP26.PDF
          http://web.cs.wpi.edu/~mukherab/i/DCAPE.pdf
          https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
          http://www.eecs.harvard.edu/~mdw/course/cs260r/papers/aurora-cidr03.pdf
          http://pdf.aminer.org/000/109/510/a_temporal_foundation_for_continuous_queries_over_data_streams.pdf
          http://archiv.ub.uni-marburg.de/diss/z2007/0671/pdf/djk.pdf
          https://files.ifi.uzh.ch/dbtg/ndbs/HS12usfduj/KS09.pdf
          http://davis.wpi.edu/dsrg/PROJECTS/CAPE/publications.htm
          http://davis.wpi.edu/dsrg/PROJECTS/CAPE/slides.htm

          criccomini Chris Riccomini added a comment -

          [1] http://dbs.mathematik.uni-marburg.de/teaching/seminare/09WSGProzesse/06Thema_rtm/a4-kramer.pdf

          This link is dead. Can be found here: https://files.ifi.uzh.ch/dbtg/ndbs/HS12usfduj/KS09.pdf

          raulcf Raul Castro Fernandez added a comment -

          Cool, so it seems this converges towards the SQL-like solution.

          I think this is a good opportunity to enrich the data model. It is important to understand why people like SQL and keep those features; however, there are other things that can change. I think the data model is probably one thing you can change without annoying people too much.

          This is what I was thinking:
          One idea is to remove the window semantics from the query and instead push them into the data model. Instead of specifying the window in a query, you would actually be querying a special table/stream that represents the window (think of it as some sort of materialized view). For users, I believe this is natural: all data is accessed the same way. In particular, they do not need to worry about specifying windows in the query, only in the data model.
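
          A purely hypothetical sketch of what this could look like at the language level; the CREATE WINDOW statement and all names are invented for illustration, not a concrete proposal.

          -- The window is declared once as part of the data model...
          CREATE WINDOW recent_views AS pageviews [RANGE 5 MINUTES];

          -- ...and queries then read it like any other table/stream, with no window
          -- clause of their own.
          SELECT page_key, COUNT(*) FROM recent_views GROUP BY page_key;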

          From a system implementation, performance and usability point of view, these are some advantages that come to my mind:

          • Easier to reason about windows. There is an opportunity to remove the constraints of the Oracle-vs-StreamBase model explained above. When I say remove, I might mean hide it; not sure yet.
          • Performance. If someone defines a huge window, all data in the window needs to live in the memory of the node processing that window. This can be wasteful or impossible, requiring materializing partial data to disk and all sorts of ugly things (I started discussing some of these problems with Chris Riccomini). Instead, I think that by leaving this responsibility to the underlying storage, you can get rid of this problem by evaluating the window lazily, once it's full. This seems to make it more difficult to process windows incrementally, but I think it is better to have that problem than the problem of: I don't know if all the data will fit in the memory of my machine.
          • Debugging. While debugging your query, you can ask the engine to save all the windows you are processing (remember windows are now just another data structure, such as a table, etc.). That way one should be able to just replay them, factoring out time, which in general is quite difficult.

          In general, all this would be about defining a robust and richer data model, i.e. not only tables, but also streams, windows, and probably some other stuff to enable incremental processing. What I mean is that users would still have a SQL-like interface. This probably has some limitations, but I just wanted to keep the ball rolling.

          jkreps Jay Kreps added a comment -

          I agree that the overwhelming advantage of something SQL-like is the fact that 95+% of engineers know it. Since we are trying to popularize a streaming model of computation that is unfamiliar to perhaps 98% of people, having something that is simple to understand and has a close analogy to something people already know would be ideal.

          I think another aspect to consider is access to state. I think having a notion of tables and streams would be good. So say you have a changelog stream for your user database and want to augment page views with the company of the user viewing the page; you would do something like:

          create table user_companies as select user_id, company_id from users keyed by user_id;
          create stream as select u.company_id, p.* from user_companies u join pageviews p on u.user_id = p.user_id;
          
          criccomini Chris Riccomini added a comment -

          We should also have a look at StreamSQL from StreamBase. They have a full DDL and DML for a streaming SQL implementation.

          renato2099 Renato Javier Marroquín Mogrovejo added a comment -

          I find this paper very comprehensive in defining temporal features over streams [1].
          Another interesting open-source project is Esper [2]; it has a very good SQL-ish syntax. It is worth looking at for sure.

          [1] http://dbs.mathematik.uni-marburg.de/teaching/seminare/09WSGProzesse/06Thema_rtm/a4-kramer.pdf
          [2] http://esper.codehaus.org/

          criccomini Chris Riccomini added a comment -

          Raul Castro Fernandez, something related to this, as you've said, is defining time. Currently Samza has no concept of time at all. Do you have any papers you can recommend on how different systems handle time? I recall MillWheel had some concept of time built in. Are there any good papers on this?

          criccomini Chris Riccomini added a comment -

          Raul Castro Fernandez, thanks for writing this up!

          I am more in favor of a declarative DSL closer to SQL. I think this will make Samza more appealing to a wider group of users since most analysts prefer SQL.

          I agree with David about the argument for a SQL-like language, vs. a DSL. As much as SQL might be annoying, it's very intuitive and popular. You can see this playing out in the Hadoop world, where analysts are using SQL almost exclusively, whether that be in Hive, Impala, Shark, Tajo, etc.

          Just as for windowing semantics, choosing SQL vs. DSL does not mean the other can't be done. Hadoop ended up with several versions of both, as well. In an ideal world though, we'd at least have a very good polished SQL abstraction for Samza.

          davidzchen David Chen added a comment -

          I'm really excited about this too.

          +1 on using a parser generator such as ANTLR3 for implementing the parser for the external DSL. It may be useful to have both an external DSL as well as a programmatic interface that in some way mirrors the DSL. If we decide to go with an imperative style for the external DSL, we can implement the internal DSL as a fluent API similar to jOOQ (http://www.jooq.org/).

          I am more in favor of a declarative DSL closer to SQL. I think this will make Samza more appealing to a wider group of users since most analysts prefer SQL. If we do decide to go with an imperative style, I would rather move away from using a Pig-like syntax, since we are not offering any compatibility with Pig (and IMO Pig looks like COBOL), and towards a syntax that is closer to Scala or even Gradle.

          jonbringhurst Jon Bringhurst added a comment -

          It might be a good idea to consider an ANTLR parser for maintainability if the external DSL path is taken (LL* vs. bison/flex's LALR).

          raulcf Raul Castro Fernandez added a comment -

          Wrapping up early discussions with Chris about high-level languages for Samza:

          Some options for High-Level Languages:
          ----------------------------------------------------
          We discussed two main options for writing a high-level language for Samza:

          • SQL-like
          • Pig/Imperative style

          Below, I quickly summarize both approaches:

          #SQL-like

          SQL is intended primarily to access static data. Because Samza reads and processes streams, and these are infinite, plain SQL is not enough. For processing streams, there are mainly two families of streaming-SQL variants: CQL and StreamBase. The fundamental differences between these two are related to their window semantics:

          ###CQL
          In this model, tuples (events, messages) are assumed to have a timestamp. The processing engine thinks of windows as time-based windows, that is: "Count(A.b) in a window of 30sec", where the window will be defined according to the timestamp included in the events.

          ###StreamSQL
          In this model, tuples (events, messages) also include a timestamp. However, this model is more suitable for tuple-based windows, of the form: "Count(A.b) in a window of 1000 tuples".
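
          In concrete (and only indicative) syntax, the same aggregate would look roughly like this in the two styles:

          -- Time-based window (CQL flavour):
          SELECT COUNT(A.b) FROM A [RANGE 30 SECONDS];

          -- Tuple-based window (StreamSQL flavour):
          SELECT COUNT(A.b) FROM A [ROWS 1000];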

          I think it is important to make the distinction, because it basically changes how one should think about processing streams when windows are required. There is a good paper that explains the differences between the two models in detail, and that I think would be worth reading before deciding on one, once there are clear use cases for Samza:

          "Towards a Streaming SQL Standard": http://cs.brown.edu/~ugur/streamsql.pdf

          #Pig/Imperative style

          In this case, the idea would be to offer an interface similar to Pig: an imperative-style way of writing transformations on streams. This model could include a fixed set of operators of different categories. There would be operators to extract/read data from the underlying system (e.g. Kafka topics), stateless operators to perform filtering/mapping/etc., and there could be stateful operators or stateful constructs (basic operators that take 2 inputs, the input stream and the state) that would operate on state defined by users.

          Implementation:
          ---------------------

          #SQL

          A declarative representation of a query would demand 3 things:

          1- Parse the query
          2- Build an execution tree (that would roughly map to the dataflow graph)
          3- Optimize (rewriting the query to preserve the semantics but make it cheaper)

          Apache Optiq gives us these three; however, there are a bunch of things that should be addressed before assuming it is a solution:

          1- Parse the query
          Queries for Samza would include windows; therefore, the parser would need to understand window semantics as well.
          2- Execution tree
          This would probably stay more or less the same
          3- Optimization
          This changes a lot. First of all, windows introduce new challenges when one wants to rewrite the query, as pushing an operator upstream can change the semantics, for example. There are a bunch of theoretical and more practically oriented papers on this, but it seems like a big problem. All I am saying is that before we even start thinking about optimizing this, it would be good to have a clear understanding of the requirements. On top of that, the nature of stream processing may lead to network bottlenecks, which would also need to be included in the optimization algorithm. And if you start thinking about joins... very interesting!

          #Imperative style

          In this case we would need to write a language, plus a parser and compiler for that language. This alone is a bunch of work. There is one alternative: writing a DSL. By using a DSL, we get rid of many of the problems of the parser/optimizer. One could think that the problem with DSLs is efficiency, but that is not so important in this case, where we will have an "optimization" phase anyway.

          So these are the two options that I see for DSL:

          ##Internal DSL
          Meaning that it will basically be part of the parent language, i.e. a library. This is good for fast prototyping and trying new operators, etc. I think it can easily be used by building the appropriate tooling around the DSL.

          ##External DSL
          In this case the language has its own syntax. This means it requires a lexer/parser. Luckily enough, there are a bunch of tools for this: the YACC/JavaCC family of tools for JVM languages. There is also a more interesting option (never tried it, though) in Scala, with parser combinators.

          So I guess that a good first step towards this would be to really think about which of the two abstractions is preferable, i.e. stream-SQL or an imperative-style DSL. Note that regardless, once there is a vertical solution for one of these--from query to run--it will be possible to write one for the other.


            People

            • Assignee:
              nickpan47 Yi Pan (Data Infrastructure)
              Reporter:
              raulcf Raul Castro Fernandez
             • Votes:
               3
               Watchers:
               30
