Solr / SOLR-9240

Support parallel ETL with the topic expression

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Resolved
    • Affects Version/s: None
    • Fix Version/s: 6.2
    • Component/s: None
    • Labels:
      None

      Description

      It would be useful for SolrCloud to support large-scale Extract, Transform and Load (ETL) workloads with streaming expressions. Instead of using MapReduce for ETL, the topic expression can be used, which allows SolrCloud to be treated like a distributed message queue filled with data to be processed. The topic expression works in batches and supports retrieval of stored fields, so large-scale text ETL will work well with this approach.

      This ticket makes two changes to the topic() expression that make parallel ETL possible:

      1) Changes the topic expression so it can operate in parallel.
      2) Adds the initialCheckpoint parameter to the topic expression so a topic can start pulling records from anywhere in the queue.

      Daemons can be sent to worker nodes, each of which processes a partition of the data from the same topic. Because daemon() runs its inner expression repeatedly at a set interval, it is a natural fit for calling a topic iteratively until all records in the topic have been processed.

      The sample code below pulls all records from one collection and indexes them into another collection. A Transform function could be wrapped around the topic() to transform the records before loading. Custom functions can also be built to load the data in parallel to any outside system.

      
      parallel(
               workerCollection, 
               workers="2", 
               sort="DaemonOp desc", 
               daemon(
                      update(
                            updateCollection, 
                            batchSize=200, 
                            topic(
                                  checkpointCollection,
                                  topicCollection, 
                                  q=*:*, 
                                  id="topic1",
                                  fl="id, to , from, body", 
                                  partitionKeys="id",
                                  initialCheckpoint="0")), 
                      runInterval="1000", 
                      id="daemon1"))
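
A streaming expression such as the one above is submitted to Solr through a collection's /stream handler using the expr request parameter. The following is a minimal Python sketch of building that request URL. The host, port, and the choice of workerCollection in the URL path are assumptions for illustration, and build_stream_url is a hypothetical helper, not part of Solr.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

# Assumed placeholder for a local SolrCloud node; adjust for a real cluster.
SOLR_BASE_URL = "http://localhost:8983/solr"

# The sample expression from the ticket description, unchanged apart from layout.
EXPRESSION = """parallel(
  workerCollection,
  workers="2",
  sort="DaemonOp desc",
  daemon(
    update(
      updateCollection,
      batchSize=200,
      topic(
        checkpointCollection,
        topicCollection,
        q=*:*,
        id="topic1",
        fl="id, to , from, body",
        partitionKeys="id",
        initialCheckpoint="0")),
    runInterval="1000",
    id="daemon1"))"""


def build_stream_url(base_url, collection, expression):
    """Return a URL that passes `expression` to the collection's /stream handler."""
    # urlencode percent-escapes quotes, colons, asterisks and newlines in the expression.
    query = urlencode({"expr": expression})
    return f"{base_url}/{collection}/stream?{query}"


url = build_stream_url(SOLR_BASE_URL, "workerCollection", EXPRESSION)
```

The sketch only constructs the URL. Sending it (for example with urllib.request.urlopen) requires a running cluster in which the four collections named in the expression exist.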
      
      1. SOLR-9240.patch
        22 kB
        Joel Bernstein
      2. SOLR-9240.patch
        19 kB
        Joel Bernstein

          Activity

          joel.bernstein Joel Bernstein added a comment -

          After reviewing the TopicStream I found that it already supports the partitionKeys parameter because it's using the SolrStream under the covers, where the partitioning logic resides.

          So all that needs to be done on this ticket is to add parallel(topic()) test cases and fix any issues that arise.

          joel.bernstein Joel Bernstein added a comment -

          While working on the test case for this I found that there is a small change that needs to be made to support the topic() function in parallel. Each worker needs to maintain its own set of checkpoints, so each worker needs a separate topic ID. This is easily solved by appending the worker ID to the topic() ID.

          I'll update the ticket accordingly.
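
The idea of giving each worker its own checkpoint ID can be sketched in Python as follows. This is an illustration only: the "_" separator, the zero-based worker numbering, and the function names are assumptions, and the ticket does not state the exact format the patch uses.

```python
def worker_topic_id(topic_id, worker_id):
    """Derive a checkpoint ID that is unique to one worker.

    Assumption: the worker ID is appended after an underscore; the
    real separator and format are not given in the ticket.
    """
    return f"{topic_id}_{worker_id}"


def all_worker_topic_ids(topic_id, num_workers):
    """Return one distinct checkpoint ID per worker, numbered from 0."""
    return [worker_topic_id(topic_id, w) for w in range(num_workers)]


# Two workers sharing the logical topic "topic1" get separate checkpoint IDs,
# so neither overwrites the other's progress.
ids = all_worker_topic_ids("topic1", 2)
checkpoints = {topic_id: 0 for topic_id in ids}
```

With a single shared ID, both workers would read and write the same checkpoint entry, and one worker's progress could cause the other to skip records in its own partition.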

          joel.bernstein Joel Bernstein added a comment -

          Initial patch.

          joel.bernstein Joel Bernstein added a comment -

          New patch that fixes some tests that were broken in the initial patch.

          joel.bernstein Joel Bernstein added a comment - - edited

          This ticket is looking fairly good. I did a round of manual testing with the expression below, which worked as expected.

          parallel(
                   workerCollection,
                   workers="2",
                   sort="DaemonOp desc",
                   daemon(
                          update(
                                 updateCollection,
                                 batchSize=200,
                                 topic(
                                       checkpointCollection,
                                       topicCollection,
                                       q=*:*,
                                       id="topic40",
                                       fl="id, to , from",
                                       partitionKeys="id",
                                       initialCheckpoint="0")),
                          runInterval="1000",
                          id="test3"))
          

          This expression sends a daemon expression to two worker nodes. The daemon is wrapping an update expression which is wrapping a topic() expression. The topic has the new initialCheckpoint parameter so it starts pulling records from checkpoint 0, which includes every record that matches the topic query in the index. The topic also has the partitionKeys parameter so each worker pulls a partition of records that match the topic query.

          The daemon function will run the update() function iteratively. Each run will update the topic checkpoints for each worker.

          The effect of this is that each worker will iterate through its partition of the topic query, reindexing all the records that match the topic in another collection.
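
The behavior described in this comment can be modeled with a small in-memory Python simulation. It is a conceptual sketch, not Solr's implementation: the CRC32-based partitioning, the single integer checkpoint per worker, and every class and function name here are assumptions chosen to keep the example short.

```python
import zlib
from dataclasses import dataclass, field


@dataclass
class Record:
    id: str
    version: int  # stand-in for a monotonically increasing record version


def partition_of(key, num_workers):
    """Map a partition key to a worker index (illustrative hash, not Solr's)."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


@dataclass
class Worker:
    worker_id: int
    num_workers: int
    checkpoint: int = 0  # plays the role of initialCheckpoint="0"
    loaded: list = field(default_factory=list)

    def run_once(self, source, batch_size):
        """One daemon run: pull the next batch for this partition and load it."""
        pending = [
            r for r in sorted(source, key=lambda r: r.version)
            if r.version > self.checkpoint
            and partition_of(r.id, self.num_workers) == self.worker_id
        ]
        batch = pending[:batch_size]
        self.loaded.extend(batch)  # stands in for update() indexing the batch
        if batch:
            self.checkpoint = batch[-1].version  # advance past what was loaded
        return len(batch)


def run_until_drained(workers, source, batch_size):
    """Repeat runs across all workers until a full pass moves no records."""
    runs = 0
    while True:
        runs += 1
        if sum(w.run_once(source, batch_size) for w in workers) == 0:
            return runs


source = [Record(id=f"doc{i}", version=i) for i in range(1, 11)]
workers = [Worker(w, 2) for w in range(2)]
total_runs = run_until_drained(workers, source, batch_size=3)
```

In this model every record belongs to exactly one worker, and each worker's checkpoint only ever moves forward, so repeated runs neither skip nor duplicate records. A real daemon keeps running at its runInterval rather than stopping once the topic is drained.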

          jira-bot ASF subversion and git services added a comment -

          Commit fc3894e837701b78a4704cf27529c34c15666586 in lucene-solr's branch refs/heads/master from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=fc3894e ]

          SOLR-9240:Support running the topic() Streaming Expression in parallel mode.

          jira-bot ASF subversion and git services added a comment -

          Commit c3c1f8d6e6cb57cb30e736d5ff0387400729d216 in lucene-solr's branch refs/heads/master from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=c3c1f8d ]

          SOLR-9240: Added testcase with text field in the fl for topic

          jira-bot ASF subversion and git services added a comment -

          Commit c6df1868a0cdf79b4d4f8b8cd5fc58cf9794d6dc in lucene-solr's branch refs/heads/branch_6x from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=c6df186 ]

          SOLR-9240:Support running the topic() Streaming Expression in parallel mode.

          jira-bot ASF subversion and git services added a comment -

          Commit 10f3700e725edb9793c76ad367edfb70f97b34a8 in lucene-solr's branch refs/heads/master from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=10f3700 ]

          SOLR-9240: Update CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit 75d3243647923c462a345205d08e0fbb6dbe73f3 in lucene-solr's branch refs/heads/branch_6x from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=75d3243 ]

          SOLR-9240: Update CHANGES.txt

          Conflicts:
          solr/CHANGES.txt

          mikemccand Michael McCandless added a comment -

          Bulk close resolved issues after 6.2.0 release.


            People

            • Assignee: Joel Bernstein
            • Reporter: Joel Bernstein
            • Votes: 0
            • Watchers: 5
