Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels: None

      Description

      Current implementation of Hedwig and BookKeeper is designed to scale to hundreds of thousands of topics, but now we are looking at scaling them to tens to hundreds of millions of topics, using a scalable key/value store such as HBase.

      1. hedwigscale.pdf
        86 kB
        Sijie Guo
      2. hedwigscale.pdf
        84 kB
        Sijie Guo

        Issue Links

          Activity

          Sijie Guo added a comment -

           Attaching a draft design document for scaling Hedwig/BookKeeper for discussion.

           I think this jira should be the master task for this work. It would be better to divide it into several subtasks:

           • Provide a MetaStore interface, and a possible implementation such as HBase.
           • BookKeeper
             • Provide a clean LedgerManager interface, moving ZooKeeper-related metadata operations into a zk-based ledger manager.
             • Implement a MetaStore-based ledger manager to leverage the scalability of a scalable MetaStore.
           • Hedwig
             • Provide a metadata operation interface, HedwigMetaManager, for Hedwig, similar to what LedgerManager does in BookKeeper.
             • Implement a MetaStore-based Hedwig meta manager.
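           The proposed MetaStore split could be sketched roughly as below. All names here are illustrative, not the final API; an in-memory map stands in for the real backend (HBase or another scalable K/V store) so only the shape of the interface is shown.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a minimal MetaStore abstraction (illustrative names, not the final API).
public class MetaStoreSketch {

    interface MetaStore {
        byte[] get(String key);
        void put(String key, byte[] value);
        // Returns true and swaps only if the stored value equals expected.
        boolean compareAndSet(String key, byte[] expected, byte[] update);
    }

    // In-memory stand-in for a scalable backend such as HBase.
    static class InMemoryMetaStore implements MetaStore {
        private final Map<String, byte[]> cells = new HashMap<>();

        public synchronized byte[] get(String key) { return cells.get(key); }
        public synchronized void put(String key, byte[] value) { cells.put(key, value); }
        public synchronized boolean compareAndSet(String key, byte[] expected, byte[] update) {
            byte[] current = cells.get(key);
            if (!java.util.Arrays.equals(current, expected)) {
                return false; // value changed underneath us; caller must retry
            }
            cells.put(key, update);
            return true;
        }
    }

    public static void main(String[] args) {
        MetaStore store = new InMemoryMetaStore();
        store.put("ledger-1", "v0".getBytes());
        boolean ok = store.compareAndSet("ledger-1", "v0".getBytes(), "v1".getBytes());
        boolean stale = store.compareAndSet("ledger-1", "v0".getBytes(), "v2".getBytes());
        if (!ok || stale) throw new AssertionError("CAS semantics violated");
        System.out.println(new String(store.get("ledger-1"))); // prints "v1"
    }
}
```

           A zk-based LedgerManager and a MetaStore-based one would then be interchangeable behind such an interface.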
          Flavio Junqueira added a comment -

          Hi Sijie, I agree with creating sub-tasks for this umbrella jira. For bookkeeper, we need to access ledger metadata both from clients and bookies, right? We may want to have a separate task for each, even though they will most likely rely upon the same interface to the metadata repository.

           I'd like to clarify one point. In my understanding, we still plan to use ZooKeeper for node availability, while we move all metadata to another scalable data store, such as HBase. Is this correct? If so, the pluggable interface will allow the use of different repositories for the metadata part, but we will still rely upon ZooKeeper to monitor node availability.

          I have a few other comments about the design doc:

           1. In the definition of the compare-and-swap operation, the comparison is performed using the key and value itself. This might be expensive, so I was wondering whether it would be a better approach to use versions instead. The drawback is relying upon a backend that provides versioned data. That seems fine to me, though.
          2. Related to the previous comment, it might be a better idea to state somewhere what properties we require from the backend store.
          3. I'm not entirely sure I understand the implementation of leader election in 5.1. What happens if a hub is incorrectly suspected of crashing and it loses ownership over a topic? Does it find out via session expiration? Also, I suppose that if the hub has crashed but the list of hubs hasn't changed, then multiple iterations of 1 may have to happen.
          Sijie Guo added a comment -

           Thanks for the comments, Flavio.

          > For bookkeeper, we need to access ledger metadata both from clients and bookies, right?

           Yes. The implementation of the MetaStore-based ledger manager should be two tasks: one for the client side, the other for the server side. The server part would depend on the client part, because the client part handles how ledger metadata is stored, while the server part handles how ledgers are garbage collected.

          > Is this correct? If so, the plugable interface will allow the use of different repositories for the metadata part, but we will still rely upon zookeeper to monitor node availability.

           Yes. We still use ZooKeeper for node availability, while moving metadata operations to a different store.

          > In the definition of the compare-and-swap operation, the comparison is performed using the key and value itself. This might be expensive, so I was wondering if it is a better approach to use versions instead. The drawback is relying upon a backend that provides versioned data. It seems fine for me, though.

           In the proposal, the comparison is applied only to a single cell (located by (key, family, qualifier)), while the set operation can be applied to multiple cells.
           For example, suppose we have two columns: a data column, which stores the actual data, and a version column, which stores an incrementing number. The initial value is (oldData, 0). When we want to update the data column, we execute CAS(key, 0, key, (newData, 1)). The comparison is applied only to the version column, not to the data column, so it is not expensive.
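           A rough simulation of this version-column scheme (names are hypothetical; a plain Java object plays the role of the HBase row):

```java
import java.util.concurrent.atomic.AtomicLong;

// Rough simulation of the version-column CAS described above (hypothetical names).
// The comparison touches only the small version cell; the data cell is written
// without ever being compared, which is the point of the scheme.
public class VersionColumnCas {

    static class Row {
        volatile byte[] data;
        final AtomicLong version = new AtomicLong(0);

        // Models CAS(key, expectedVersion, (newData, expectedVersion + 1)).
        synchronized boolean compareVersionAndSet(long expectedVersion, byte[] newData) {
            if (version.get() != expectedVersion) {
                return false; // someone else updated the row first
            }
            data = newData;
            version.incrementAndGet();
            return true;
        }
    }

    public static void main(String[] args) {
        Row row = new Row();
        row.data = "oldData".getBytes();

        // First writer, expecting version 0, wins.
        if (!row.compareVersionAndSet(0, "newData".getBytes()))
            throw new AssertionError("first CAS should succeed");
        // A stale writer still expecting version 0 loses.
        if (row.compareVersionAndSet(0, "otherData".getBytes()))
            throw new AssertionError("stale CAS should fail");

        System.out.println(new String(row.data) + "@" + row.version.get()); // newData@1
    }
}
```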

           As far as I know, zk#setData provides a conditional set over the version: the operation succeeds only when the given version matches the version of the znode, which is a kind of CAS. A generic CAS would make it easier to support more K/V stores.

          > Related to the previous comment, it might be a better idea to state somewhere what properties we require from the backend store.

           I think I have covered them in Section 3, the operations required from a MetaStore.

          > I'm not entirely sure I understand the implementation of leader election in 5.1. What happens if a hub is incorrectly suspected of crashing and it loses ownership over a topic? Does it find out via session expiration? Also, I suppose that if the hub has crashed but the list of hubs hasn't changed, then multiple iterations of 1 may have to happen.

          >> I suppose that if the hub has crashed but the list of hubs hasn't changed, then multiple iterations of 1 may have to happen.

           Doesn't this case also exist with ZooKeeper? It seems there is still a gap between the hub crashing and the znode being deleted (session expiry). In the MetaStore-based topic manager, this gap becomes the interval between the hub crashing and the other hub servers being notified of the crash.

          >> What happens if a hub is incorrectly suspected of crashing and it loses ownership over a topic?

           If a hub server has not crashed, the other hub servers will not receive a notification from ZooKeeper that it crashed (can ZooKeeper guarantee this?). So ownership would not change, since the other hub servers still see the same zxid for that hub server.

          Flavio Junqueira added a comment -

           > in the proposal, the comparison operation is just applied in a cell (located by (key,family,qualifier), while the set operation can be applied on multiple cells.

           > for example, suppose we have two columns, one column is data column, which is used to store actual data; while the other one is version column, which is used to store a incremented number. the initial value is (oldData, 0). when we want to update data column, we executed by CAS (key, 0, key, (newData, 1)). the comparison is applied only on version column, is not on data column, which is not expensive.

          I was thinking that in the case of hbase, we have versions for the rows already, so we could use those to perform the comparison: if the version passed as input is not current, then we don't swap. I agree that it restricts the kv-stores we could use, though. The approach you describe is more general.

           > As my knowledge, zk#setData provides a conditional set over version, the set operation succeeds only when the given matches the version of the znode, which is a kind of CAS. CAS would be better to support more K/V stores.

          Exactly, the comparison is performed using versions (int).

           > I think I have put them in section 3, the operations required by a MetaStore.

          That's right, the beginning of Section 3 talks about it. I read it as the set of operations that we execute against the metadata store as opposed to the semantics we expect from the store.

           > doesn't this case exit using zookeeper? it seems that there is still a gap between hub crashed and znode deletion (session expired). in metastore-based topic manager, this gap becomes hub crashed and other hub server got notified about hub crashed.

          Yes, it exists with zookeeper, but the current design document does not reflect it. You may consider updating it.

           > if a hub server is not crashed, other hub server would not receive the notification from zookeeper about that hub crashed (can zookeeper guarantee it?). so ownership would not change, since other hub server still see a same zxid about that hub server.

          It was not entirely clear to me that you're assuming zookeeper in the document for this part. If that's the assumption, then it seems fine to me.

          Sijie Guo added a comment -

          > I was thinking that in the case of hbase, we have versions for the rows already, so we could use those to perform the comparison: if the version passed as input is not current, then we don't swap.

           Hmm, HBase does provide versions for rows, but I am not sure it provides the functionality to check the current version and swap. I think it only supports setting a value associated with a version, not a CAS by version.

          > Yes, it exists with zookeeper, but the current design document does not reflect it. You may consider updating it.

           Will update the document.

          > It was not entirely clear to me that you're assuming zookeeper in the document for this part. If that's the assumption, then it seems fine to me.

           Will modify the document to make this part clearer.

          Flavio Junqueira added a comment -

           > hmm, hbase did provide versions for rows. but I not sure that HBase provide such functionality to check current version and swap. I think it just provide set a value associate with a version, not a CAS by version.

          You're right, checkAndPut does not allow us to use the internal version to perform the check. I actually found a jira discussing it: HBASE-4527. The recommended way described there is to have our own version column.

          Sijie Guo added a comment -

           Attaching a new version of the design doc, which improves the leader election section.

          Sijie Guo added a comment -

           Regarding the offline discussion about whether to use a byte array or a String as a key: I prefer a byte array, because a byte array is a more generic representation of data that does not need to care about the string character set. In Hedwig, the topic name is provided via a protobuf ByteString, so if we want to use the topic name or subscriber name as a key, a byte array is the better choice.

           A byte array is also more flexible, letting the application decide how to serialize/form a key.
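           A toy illustration of the byte-array argument (the key layout below is invented for the example, not anything from the design doc): topic names can contain bytes that are not valid in any particular charset, and byte[] keys carry them through unchanged.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy illustration of byte[] keys vs String keys (illustrative key layout).
// A String key forces a charset decision at the store boundary; a byte[] key
// lets the application serialize whatever it likes (e.g. a protobuf
// ByteString's bytes) with no lossy decode/encode round trip.
public class ByteArrayKeys {

    // An application-chosen layout: a fixed prefix plus opaque topic bytes.
    static byte[] topicKey(byte[] topicNameBytes) {
        byte[] prefix = "topic/".getBytes(StandardCharsets.UTF_8);
        byte[] key = new byte[prefix.length + topicNameBytes.length];
        System.arraycopy(prefix, 0, key, 0, prefix.length);
        System.arraycopy(topicNameBytes, 0, key, prefix.length, topicNameBytes.length);
        return key;
    }

    public static void main(String[] args) {
        // Topic bytes that are not valid UTF-8; a String round trip would mangle them.
        byte[] rawTopic = new byte[] { 't', (byte) 0xFF, (byte) 0xFE, '1' };
        byte[] key = topicKey(rawTopic);

        // The opaque bytes survive key construction unchanged.
        if (!Arrays.equals(Arrays.copyOfRange(key, 6, key.length), rawTopic))
            throw new AssertionError("topic bytes must survive key construction");
        System.out.println("key length = " + key.length); // prefix (6) + topic (4) = 10
    }
}
```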

          Flavio Junqueira added a comment -

          I'm marking this feature to be available in 4.2.0 for now, but please feel free to disagree.

          Roger Bush added a comment -

          I'd like to weigh in on the concept of accommodating more store types through having a more abstract interface, without sacrificing the integrity of BK/Hedwig.

           Thinking about the basics that all KV stores have, the first concept is "table". All KV stores can support a table (even if it is only a filename in an MDBM). The table concept is such a common feature that we should clearly have something that maps to it.

           A related question is, do we want to encode the concept of "column families" into the interface? This is a relatively new concept that not all kv stores have. In fact, we should be careful to understand the different concepts here, as there are a few that are usually bound together. One is that you can have, in principle, an infinite number of columns. A second is that stores may provide different levels of transactional semantics around the row object (and thus may or may not provide multi-column transactional updates). In the future, NoSQL vendors will probably standardize on a few important concepts in this area, but in my opinion we are a ways off from this happening (to the level of providing a reasonable choice among mature candidate technologies).

           However, the current implementations of BK and Hedwig do not need these concepts (since they do without them today). I would argue that adding column families unnecessarily restricts the types of K/V stores you can use.

           A simple workaround, which provides the ability to work with both store types, is to instead rely on non-transactional, unbounded rows (which many kv stores have). Using this more widely available feature, we would map column families to tables: for example, for each column family, have a table whose name is TABLE_NAME::COLUMN_NAME.
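           The mapping could be sketched as follows (class and method names are illustrative; a map of maps stands in for a store that only has flat tables):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the workaround above: emulate column families on a store that only
// has flat tables, by folding the family name into the table name
// ("TABLE::FAMILY"). Names and the in-memory store are illustrative.
public class FamilyAsTable {

    // One flat table = one key/value map; the store is just a map of tables.
    private final Map<String, Map<String, byte[]>> tables = new HashMap<>();

    private static String mangle(String table, String family) {
        return table + "::" + family;
    }

    void put(String table, String family, String key, byte[] value) {
        tables.computeIfAbsent(mangle(table, family), t -> new HashMap<>())
              .put(key, value);
    }

    byte[] get(String table, String family, String key) {
        Map<String, byte[]> t = tables.get(mangle(table, family));
        return t == null ? null : t.get(key);
    }

    public static void main(String[] args) {
        FamilyAsTable store = new FamilyAsTable();
        // Same row key, different "families" -> physically separate flat tables.
        store.put("ledgers", "data", "ledger-1", "payload".getBytes());
        store.put("ledgers", "version", "ledger-1", "0".getBytes());

        if (store.get("ledgers", "data", "ledger-1") == null
                || store.get("ledgers", "version", "ledger-1") == null)
            throw new AssertionError("both family tables should hold the key");
        System.out.println(new String(store.get("ledgers", "data", "ledger-1"))); // payload
    }
}
```

           The trade-off, as noted above, is that updates across "families" are no longer atomic, which matters only if the design comes to rely on multi-column transactional updates.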

           Concerning the possible transactional row-update semantics: since BK and Hedwig currently don't rely on them, should they start, unnecessarily? Not assuming column families implicitly communicates through the interface that there is no need for transactional row-update semantics.

           Relying on less (since Hedwig/BK prove they don't need it) lets our components interoperate with a wider range of stores. It does require a tiny bit of ingenuity on the plugin implementer's part, but not much. And this can be laid out as implementation notes somewhere with the source.

           Another thing I've been thinking about is SCAN. Scan is extremely useful for offline administrative tasks (like garbage collection). It is far less useful for serving operations, at least if you plan on using it without bounds. The simple example that caught my eye as I was reading Sijie's document was bookie recovery. Are we really going to scan through all of the ledgers to recover? While this seems like a simple, generic scheme, it may not be the one we want to use. Do we really want the recovery process to be tied to a possibly very large scan time? Do we really need scan for recovery? Or is there an implementation that does not use scan and that brings bookies up quickly? Scanning big tables is usually one of those application mistakes that needs to get fixed, regardless of whether you are using a NoSQL or SQL database (I'm thinking of the many times I have heard someone say derisively "they were scanning the biggest DB table", as the "problem" that had to get fixed). I have to look more at the BK and Hedwig code to understand this.

          Roger Bush added a comment -

           Another point I think is useful is to think of this feature not just as "scaling hedwig", but as the ability to dynamically choose among many different kv stores. I can see this has some importance for possible future uses of BookKeeper. We already know that, with a tiny bit of ingenuity (already being demonstrated by Hedwig), BookKeeper becomes a scalable, highly available record logger.

          Such a scalable, highly available logger is a very nice piece of technology, which has many applications.

          However, there are many applications where people will not trust any part of their data to something they don't understand, like ZooKeeper. They would want their metadata to be in something more conventional, like a (gasp) Oracle database. In fact, storing important data in ZooKeeper might not even be allowed by their compliance department. We could debate the extent to whether this is a valid concern, but there are definitely lots of places that this would kill the deal.

          In this case, the Pluggable Metadata API comes to the rescue, allowing a simple generic SQL adapter layer (someone else will write this) to work with all SQL databases, thus making BK relevant in new places.

          Roger Bush added a comment -

           In my last point I failed to mention that SQL stores, while not providing unlimited scalability, provide different qualities that are useful. This is also an argument for kv stores without column families.

          Roger Bush added a comment -

          I have some code which demonstrates an abstract plugin kv store. It shows how a plugin can be dynamically loaded and configured using a classname string and config string. A plugin is essentially the "abstract factory" pattern (it currently creates "Table" objects). There is also a "usage Test" which shows all the important idiomatic use cases for the current API. Finally there is a "Mock implementation" that shows the basics of making a kv store implementation.

          To see it work, run the Test.

          What it does not currently do:

          1. Async - this seems easy enough to add once the correct behavior is determined. It's also easier to figure out the properties using synchronous calls first since the code is easier to read.

          2. SCAN - I need to think more about scan to see how it's used. This doesn't really affect how it will look in the API, but I want to know a little more before adding it.

          3. Strings - it uses Strings instead of byte []. This is another easy transform, so it's strings for now.

          4. Implement a real kv store in Java - I actually wrote this in Perl first and wrote MDBM and Riak adapters as proof-of-concept of the dynamic loading.

          I've got it on GitHub as a "scratch pad", to develop the idea before inclusion to BK. The link is public.

          https://github.com/rogerbush8/BookKeeperMetadataPlugin
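
          As a rough illustration of the dynamic-loading idea above (the interface and class names here are assumptions for this sketch, not the actual code in the GitHub repository), a plugin can be instantiated from a classname string and handed a config string:

```java
import java.util.HashMap;
import java.util.Map;

public class PluginLoaderDemo {
    // Minimal Table and plugin ("abstract factory") interfaces, illustrative only.
    public interface Table {
        void put(String key, String value);
        String get(String key);
    }
    public interface MetastorePlugin {
        void configure(String config);
        Table createTable(String name);
    }
    // A mock in-memory implementation, standing in for a real kv-store adapter.
    public static class MockPlugin implements MetastorePlugin {
        private final Map<String, Map<String, String>> tables = new HashMap<>();
        public void configure(String config) { /* parse the config string here */ }
        public Table createTable(String name) {
            final Map<String, String> data =
                    tables.computeIfAbsent(name, k -> new HashMap<>());
            return new Table() {
                public void put(String key, String value) { data.put(key, value); }
                public String get(String key) { return data.get(key); }
            };
        }
    }
    // Dynamically load a plugin by classname string, then configure it.
    public static MetastorePlugin load(String className, String config)
            throws Exception {
        MetastorePlugin plugin = (MetastorePlugin)
                Class.forName(className).getDeclaredConstructor().newInstance();
        plugin.configure(config);
        return plugin;
    }
}
```

          Swapping in a real backend then only requires shipping a class implementing `MetastorePlugin` and naming it in the configuration.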

          Ivan Kelly added a comment -

          One concern I have with the direction of this is that we seem to be looking far ahead, trying to preempt all possible future requirements and accommodate everything in one go. The problem with this is that it creates a lot of work upfront, which may never be necessary. I think it's better to do something simple which meets our requirements now, and if more requirements come in later, simply change the interface. This interface isn't going to be set in stone once it's in. Also, creating a smaller/simpler interface now means smaller patches, which will make it much easier for us to review things and get them into trunk.

          I have a couple of comments about the prototype also. I think Versioned should be called VersionedValue or similar. Occurred should be an inner class of Version. Version should even be an inner class of VersionedValue.

          Regarding the sync/async thing, we have two options here. BookKeeper and hedwig clients both assume asyncness in the data store, so for backends with sync apis, such as HBase, there will have to be some sort of adapter in place. We have to decide whether we put this adapter in (1) the metastore layer or in (2) the LedgerManager layer (TopicManager in Hedwig). For (1) the metastore API itself would be completely async. For (2) the MetaStoreLedgerManager would implement an async->sync adapter, and then use the metastore API which would be completely synchronous. The metastore api should be completely async or completely sync to keep the size of implementation down. Personally I prefer option (1). It means backends which already have async APIs work very simply.

          For scan, there are a couple of usecases where we would need to scan everything in a "table". This may have many many thousands of entries, so the api should be cursor based. We have had problems in the past with ZooKeeper's getChildren() api precisely because of this. Since it wasn't cursor based, it would try to pull down the whole list at once, which exceeded the max packet size for ZooKeeper.
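
          To make the cursor-based scan idea concrete, here is a minimal in-memory sketch (the names are assumptions, not the final API): the store returns one bounded page per call plus an opaque cursor, so a caller never has to pull the whole listing down at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CursorScanDemo {
    /** A page of keys plus a cursor; a null cursor means the scan is complete. */
    public static class ScanResult {
        public final List<String> keys;
        public final String nextCursor;
        ScanResult(List<String> keys, String nextCursor) {
            this.keys = keys;
            this.nextCursor = nextCursor;
        }
    }
    private final TreeMap<String, String> table = new TreeMap<>();
    public void put(String key, String value) { table.put(key, value); }
    /** Return at most 'limit' keys strictly after 'cursor' (null = from start). */
    public ScanResult scan(String cursor, int limit) {
        List<String> page = new ArrayList<>();
        Map<String, String> tail =
                (cursor == null) ? table : table.tailMap(cursor, false);
        for (String k : tail.keySet()) {
            if (page.size() == limit) break;
            page.add(k);
        }
        // If the page is short, we know we reached the end of the table.
        String next = (page.size() < limit) ? null : page.get(page.size() - 1);
        return new ScanResult(page, next);
    }
}
```

          The caller loops, feeding each `nextCursor` back in, which avoids the max-packet-size failure mode of a single bulk `getChildren()`-style call.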

          Roger Bush added a comment -

          Ivan, thanks for your comments.

          You bring up a very good point on the incremental approach, that is my thought as well. We can always accommodate additional requirements using, for example, additional mixin interfaces. Better to do something simple, with a very well understood purpose to solve today's problems, and do it the right way. Let's solve the problems we know we have to solve now, in a simple, understandable way. That was my goal with the prototype, so hopefully that shows (I tried to make it simple, minimal and complete).

          VersionedValue sounds good to me. Concerning Occurred, there is also the -1, 0, +1 metaphor that has been in existence a long time which we could use. As far as making things inner classes, I'm all for hiding details where possible. Keep in mind Version needs to be implemented by the Metadata Store implementation. I'll look at your suggestions in context with the code today and get back to you on that in a separate comment.

          Concerning async/sync, it seems like the Metadata Store API can be made simply async since each of its calls is 1-to-1 with the async store calls. Converting the sync to async (and thus eliminating sync) would be done by using the same callback style as BookKeeper, to make things cohesive. I'm in agreement with you on this one. However, I would also include in the tests a way of making the calls synchronous, for some simple tests (they are easier to follow). This doesn't change the need for async tests, but is rather an easy way to write simple, linear tests that are understandable.

          Once you get to something that is not 1-to-1 (api calls to internal async calls), you get some very distasteful looking "continuation-passing style" code, that lacks some sort of framework to make it look nice. Does the adapter you mention use threads or some other technique? It would be good to explore this a little.

          Note we ran into this issue (async/sync) with the ManagedLedger (not to be confused with the LedgerManager you speak of above). ManagedLedger needs to make several async calls and do different things based on the result (in addition to doing error recovery, of course). When we tried to make this pure async, we created some very interesting code (interesting in a way we want to avoid). I'm of the opinion that ManagedLedger needs to be threaded, or have some async framework that makes the code more readable (e.g. continuation passing style framework or otherwise).

          As far as where we put this async->sync adapter, it seems like it needs to be available to the Metadata Store implementer, so this could be a utility library we supply with some notes (it would be nice if we could use this for the ManagedLedger too).

          Concerning scan, there definitely needs to be cursors to break this up into chunks; that is a certainty for scan. However, in the case of BookKeeper, it seems like the use case is simply for deleting ledgers that have been marked delete by Hedwig. In that case, couldn't we just use a simple "dequeue abstraction"? This would avoid the need for scan.

          Here's a sketch of how I envision this working: each bookie has its own "table" for this. This would let the trimmer do some nice scheduling of deletes that maps to the physical layer (we'd want to remove ledgers just a little bit faster than they are marked delete - and perhaps there is a knob to make this go faster, in emergency situations when we need to recover space). The table for the bookie would have an expanding list (the dequeue) where entries are added at the tail, and removed from the head. There would be another table that would hold a reference to both the head and tail. These would be updated in such a fashion that any system crash simply results in a blank entry just after/below the head/tail. The entries would have keys "1", "2", ... and the value would be the ledger id. The trimmer would remove the earliest entries (and update its cursor), and you would add new mark-deletes at the tail. You'd need to use CAS on the cursor update, and do things in such a way that failure doesn't strand an entry outside the bounds of the head/tail cursors. There is wraparound to think of (tail cursor going too high). This must be a technique that already exists, no?

          So if scan can be elided from the BookKeeper requirements by this simple (and hopefully workable) technique, what we could do is have an additional scan java interface for Hedwig, which may need it. The underlying implementation would have to also implement this interface to support Hedwig. This can be checked at load time. We can make some code to show how everything cooperates nicely (to show: some usage code that shows nice, elegant extended requirements, that doesn't need any code duplication).
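
          On the adapter question raised above: one thread-based approach, sketched here with assumed interface names (a real HBase adapter would look different), is to make the metastore API callback-based and have the adapter run each blocking backend call on an executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncAdapterDemo {
    // BookKeeper-style completion callback; names are illustrative.
    public interface GetCallback {
        void complete(int rc, String value, Object ctx);
    }
    // Stand-in for a blocking backend call (e.g. a synchronous HBase get).
    public interface SyncStore {
        String get(String key);
    }
    private final SyncStore store;
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    public AsyncAdapterDemo(SyncStore store) { this.store = store; }
    // Wrap the blocking call so callers see a purely async API.
    public void asyncGet(final String key, final GetCallback cb, final Object ctx) {
        executor.submit(() -> {
            try {
                cb.complete(0, store.get(key), ctx);   // rc 0 = OK
            } catch (Exception e) {
                cb.complete(-1, null, ctx);            // rc -1 = error
            }
        });
    }
    public void shutdown() { executor.shutdown(); }
}
```

          Backends that are already async would skip the executor and invoke the callback from their own completion path, which is the appeal of option (1).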

          Flavio Junqueira added a comment -

          I really like the idea of revisiting the way we garbage-collect ledgers. I'm not sure I fully grasp the dequeue idea, but I can see that there might be some simple, more efficient ways to do it without having to scan all ledger metadata; for example, by keeping a table of deleted ledgers.

          Different schemes might require multiple operations against the data store, which may lead to higher latency. It is probably worth keeping it in mind.

          Sijie Guo added a comment -

          @Flavio

          > for example, by keeping a table of deleted ledgers.

          I had considered keeping a table of deleted ledgers when I implemented a metastore-based ledger manager before. It seems easy, but actually it isn't. The critical problem is how to remove items from the deleted-ledgers table: if we don't remove them, the table will grow and we run into the same issue as with SCAN.

          Removing an item requires coordination between the bookie servers that own the deleted ledger. One possible solution is a reference-counting mechanism: each deleted ledger has a list of the bookies that owned it; each bookie deletes its reference from the list when it finishes garbage-collecting that ledger, and the last one to delete its reference also deletes the ledger item from the table.

          Even if we have a mechanism to remove items from the delete table, we still need SCAN to get items from the deleted-ledgers table.
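
          The reference-counting scheme can be sketched in memory like this (names are illustrative; a real store would update the row with CAS rather than relying on in-process state):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeletedLedgerTableDemo {
    // ledgerId -> set of bookies that still hold data for that ledger
    private final Map<Long, Set<String>> deleted = new HashMap<>();
    /** Record a deleted ledger along with the bookies that own its data. */
    public void addDeleted(long ledgerId, Set<String> owningBookies) {
        deleted.put(ledgerId, new HashSet<>(owningBookies));
    }
    /**
     * Called by a bookie after it has garbage-collected the ledger locally.
     * Returns true if this was the last reference and the row was removed.
     */
    public boolean releaseReference(long ledgerId, String bookie) {
        Set<String> refs = deleted.get(ledgerId);
        if (refs == null) return false;
        refs.remove(bookie);
        if (refs.isEmpty()) {
            deleted.remove(ledgerId);   // last reference deletes the row itself
            return true;
        }
        return false;
    }
    public boolean contains(long ledgerId) { return deleted.containsKey(ledgerId); }
}
```

          As noted, this keeps the table from growing without bound, but bookies still need some form of scan to discover which rows concern them.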

          @Roger,

          I am not sure that I understood the idea of dequeue. could you explain more about it?

          Sijie Guo added a comment -

          > Regarding the sync/async thing, we have two options here.

          I would prefer option (1), in the metastore layer. BookKeeper & Hedwig should use async calls to talk to the metastore, which makes implementing the metastore-based ledger manager & topic manager much easier. In my previous work, I used the asynchbase library, which provides async calls.

          BTW, I am not sure a versioned API is enough for Hedwig leader election. Maybe we need a CAS method for it. So I would suggest prototyping the leader election algorithm using the proposed API.
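
          To make the CAS suggestion concrete, here is a minimal in-memory model of topic ownership claimed via compare-and-swap (names and semantics are assumptions for discussion, not Hedwig's actual hub election code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class CasElectionDemo {
    // topic -> owning hub; the map stands in for a CAS-capable metastore row.
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();
    /** CAS-style claim: acquire 'topic' for 'hub' only if no owner exists yet. */
    public boolean tryAcquire(String topic, String hub) {
        return owners.putIfAbsent(topic, hub) == null;
    }
    /** Release only if we actually own it (CAS on the expected value). */
    public boolean release(String topic, String hub) {
        return owners.remove(topic, hub);
    }
    public String ownerOf(String topic) {
        return owners.get(topic);
    }
}
```

          A version-conditioned put can emulate the same thing, but only if every writer goes through the versioned path; a first-class CAS primitive makes the intent explicit.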

          Flavio Junqueira added a comment -

          > One possible solution is a reference-counting mechanism: each deleted ledger has a list of the bookies that owned it; each bookie deletes its reference from the list when it finishes garbage-collecting that ledger, and the last one to delete its reference also deletes the ledger item from the table.

          This is what I had in mind.

          > Even if we have a mechanism to remove items from the delete table, we still need SCAN to get items from the deleted-ledgers table.

          Agreed, I'm not saying that we don't need scans at all, but possibly we can do it with more efficient scans.

          Ivan Kelly added a comment -

          > Concerning scan, there definitely needs to be cursors to break this up into chunks; that is a certainty for scan. However, in the case of BookKeeper, it seems like the use case is simply for deleting ledgers that have been marked delete by Hedwig. In that case, couldn't we just use a simple "dequeue abstraction"? This would avoid the need for scan. Here's a sketch of how I envision this working: each bookie has its own "table" for this. This would let the trimmer do some nice scheduling of deletes that maps to the physical layer (we'd want to remove ledgers just a little bit faster than they are marked delete - and perhaps there is a knob to make this go faster, in emergency situations when we need to recover space). The table for the bookie would have an expanding list (the dequeue) where entries are added at the tail, and removed from the head. There would be another table that would hold a reference to both the head and tail. These would be updated in such a fashion that any system crash simply results in a blank entry just after/below the head/tail. The entries would have keys "1", "2", ... and the value would be the ledger id. The trimmer would remove the earliest entries (and update its cursor), and you would add new mark-deletes at the tail. You'd need to use CAS on the cursor update, and do things in such a way that failure doesn't strand an entry outside the bounds of the head/tail cursors. There is wraparound to think of (tail cursor going too high). This must be a technique that already exists, no?

          This is adding another abstraction into the metastore interface which only makes sense for the bookkeeper delete ledger scenario. For hedwig this makes no sense. Moreover, Hedwig does require scan. To use a deque like this, it would be better to skip the metastore interface, and instead implement the LedgerManager interface directly.

          It could also create problems with consistency: if, after taking from the queue, the deleter crashes, a ledger could be left dangling, so we would still require garbage collection to make sure it's cleaned up.

          Not having scan also has other implications for BK. For 4.2.0 we want to implement a "fsck" functionality, which checks that each bookie contains every ledger entry it should. This requires that we be able to get a list of ledgers.

          Roger Bush added a comment -

          @ivan

          >> This is adding another abstraction into the metastore interface which only makes sense for the bookkeeper delete ledger scenario. For hedwig this makes no sense.

          I don't think I was being clear enough. Actually this is not what I was saying. My point was that ledger garbage collection can be implemented without scan, simplifying the kv API. Nor would I want to put the dequeue into the API (as you point out this makes no sense, and I agree). The dequeue is an application-specific implementation technique for implementing ledger deletion which doesn't rely on SCAN but only on get/set/delete/CAS.

          >> Moreover, Hedwig does require scan.

          Sure, but this could be handled by adding a scan interface. You'd have BK relying on the abstract interface that doesn't include scan, and Hedwig using the scan. BK would be ignorant of the fact that the implementation provides scan. Hedwig, since it needs it, would use it. If an underlying store had a natural SCAN, then the implementer would also implement the scan api (a single function), if it didn't, the implementer would not. Thus, you'd have the best of all worlds: BK can use a larger set of stores, Hedwig uses a smaller set of stores (with SCAN), and there is no additional work required (no duplication of code, or things that are almost the same - we just have a single, mixin scan interface).

          In other words the Metastore API can be thought of as representing the needs of the two separate applications BK and Hedwig. Since (if?) BK doesn't need scan, why require it? The scan would exclude many kv stores.

          There is a simple and elegant way to not have code duplication, and have BK have the API it needs (no scan) and Hedwig have the API it needs (+ scan). You simply need a mixin interface that has only the scan related API. Therefore hbase would implement scan, Hedwig would use scan and BK would not. A different store that had no ability to do scan could still be used for BK. Also, there is no cost to maintain or implement.

          >> Not having scan also has other implications for BK. For 4.2.0 we want to implement a "fsck" functionallity, which checks that each bookie contains every ledger entry it should. This requires that we be able to get a list of ledgers.

          It is not clear that our group will be able to use HBase. This is why I'm pushing to not require scan as part of the base BK requirements, as it gives the largest possible choice of kv stores. fsck could still be implemented using scan, but perhaps as an external tool which needs scan (again this can use the mixin approach).
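
          The mixin approach described here could look roughly like the following (interface names are hypothetical): BK codes against the base interface only, while Hedwig demands the scan mixin and checks for it at load time.

```java
public class MixinDemo {
    // Base interface: the minimal contract BK needs from any kv store.
    public interface MetastoreTable {
        String get(String key);
        void put(String key, String value);
        void delete(String key);
    }
    // Scan mixin: only stores with a natural scan implement this as well.
    public interface ScannableTable extends MetastoreTable {
        Iterable<String> scan(String startKey, int limit);
    }
    /** Hedwig-side load-time check: refuse stores that cannot scan. */
    public static ScannableTable requireScan(MetastoreTable table) {
        if (!(table instanceof ScannableTable)) {
            throw new IllegalArgumentException(
                "This metastore does not support scan; Hedwig requires it");
        }
        return (ScannableTable) table;
    }
}
```

          With this split, a store without scan still satisfies BK, and the check fails fast at startup rather than at first use.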

          Roger Bush added a comment -

          @flavio

          >> I really like the idea of revisiting the way we garbage-collect ledgers. I'm not sure I fully grasp the dequeue idea, but I can see that there might be some simple, more efficient ways to do it without having to scan all ledger metadata; for example, by keeping a table of deleted ledgers.

          @sijie

          >> I had considered the idea to keeping a table of deleted ledgers, when I implemented metastore-based ledger manager before. It seems easy, but actually it doesn't. The critical problem is how to remove the items from deleted ledgers table. if we don't remove them, the table will grow and encounter the same issue as SCAN.

          Dequeue
          -------
          A dequeue is a queue that you can add to at one end and remove from at the other end. This is an abstract concept that we would implement using a table (the place where the entries in the queue go, which are ledgers to be deleted) and two "pointers" (one to the head where you add, and one to the tail from which you remove). For this to "work" it needs to be implementable using set/get/delete/CAS (and no scan), and we need to show how you do not get "dangling ledgers", since we have at least two separate things to update (head/tail pointer and list) and we can fail after the first is done.

          The basic representation works like this: we have a table for each bookie, representing the list of ledgers which may be deleted. We do this per-bookie to make things easy on ourselves (easy to balance the overall load on a per-bookie basis). The entries of each table are (k, v) pairs where the key is an incrementing number starting from 1, and the value is the ledger id (and anything else we want to put).

          Failures are handled by performing the writes to head and tail in the right order; a failure will cause a "blank entry" (conceptually) within the boundaries of our head/tail pointers (which are just long integers). Adding to the head is done by reading the head pointer, incrementing it, writing the new head pointer, and inserting the new ledger item at the previous pointer position. A failure will result in a "blank space", and you can have multiple blank spaces (multiple failures). Deleting an item requires the reverse order of operations (delete the item first, then move the pointer). We are doing things in such a way that the list can contain blanks, but no ledger will fall outside the head/tail boundaries. The thing that deletes will have to know that a null item is OK (and silently skip it).

          This will keep the list trimmed and you won't run out of space. Further, it is a minimal list and you could do some nice things like make sure the list is trimmed at a certain rate, per bookie. I won't delve into those details now.

          That's how I'd do this without having scan for BK.

          What is not clear from this description is how to handle "wraparound". But I think this can be easily done (this is just the classic "dequeue abstraction"). I need to think a little and present this clearly.
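As a sanity check, the head/tail scheme described above can be modelled in a few lines. This is a hypothetical Python sketch over an in-memory stand-in for the store; the real thing would sit behind the metastore interface in Java, with CAS guarding the pointer updates against concurrent writers, and names like `KV` and `DeleteQueue` are invented here:

```python
class KV:
    """Minimal in-memory stand-in for the metastore. Only get/put/delete
    are used below; in a multi-writer setting, CAS would guard the
    head/tail pointer updates."""
    def __init__(self):
        self.d = {}
    def get(self, k):
        return self.d.get(k)
    def put(self, k, v):
        self.d[k] = v
    def delete(self, k):
        self.d.pop(k, None)

class DeleteQueue:
    """Per-bookie to-delete list: entries keyed by an incrementing counter,
    with head/tail pointers stored as ordinary keys. Writes are ordered so a
    crash leaves at most a 'blank' slot inside [tail, head), never a ledger
    outside those boundaries."""
    def __init__(self, kv, bookie):
        self.kv, self.p = kv, f"dq/{bookie}/"
        if kv.get(self.p + "head") is None:
            kv.put(self.p + "head", 1)
            kv.put(self.p + "tail", 1)

    def enqueue(self, ledger_id):
        head = self.kv.get(self.p + "head")
        self.kv.put(self.p + "head", head + 1)      # move the pointer first...
        self.kv.put(self.p + str(head), ledger_id)  # ...then write the entry

    def dequeue(self):
        tail = self.kv.get(self.p + "tail")
        head = self.kv.get(self.p + "head")
        while tail < head:
            entry = self.kv.get(self.p + str(tail))
            self.kv.delete(self.p + str(tail))      # delete the entry first...
            self.kv.put(self.p + "tail", tail + 1)  # ...then move the pointer
            tail += 1
            if entry is not None:                   # silently skip blank slots
                return entry
        return None
```

A crash between the two writes in `enqueue` leaves a blank slot inside the boundaries, which `dequeue` skips; a crash between the two writes in `dequeue` likewise only produces a blank, never a ledger outside the head/tail range.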

          Also there may be some details that I'm not thinking about (yi is going to present some details for the SNV team on BK internals today, so maybe this will be clearer).

          Hopefully this is clear enough for someone to say why, from a BK standpoint, this wouldn't work or would work (if that is obvious at this point - it may not be).

          Roger Bush added a comment -

          @flavio

          We've done a first pass walkthrough of the BK code. In looking at the current BK code, and how ledgers are garbage collected, it looks like it does the following:

          1. The garbage collector gets a list of active nodes. The active nodes are determined by ephemeral nodes (one per reader on each ledger?).
          2. The garbage collector then gets all the nodes. If a node is not in the active list, its corresponding ledger is deleted.

          Note that the dequeue idea is simply a way of having a deletion list without a scan. It does not solve the reference counting problem (something has to determine when something can be deleted and put it on the list). The only thing it does is decouples the determination of whether something can be deleted from the timing. A simpler model could, for example, immediately delete when the reference count goes to zero. So for now, let's table the dequeue idea, as there are more pressing issues to solve (if it's necessitated we can trot the idea out later).

          How would we go about replacing the current garbage collection scheme with something that uses the Metastore interface? As an aside, I don't think the above scheme scales since you'll have to continuously scan through millions of ledgers to find the few that can be reaped. There will be many calls to scan (returning X items at a time), which fail to bring back any ledgers to delete. Here's one measurement from "real-life": Tribble adds about 10 volumes (ledgers) per second. However there are something like 200,000 ledgers (volumes). So in steady state we are deleting 10 volumes per second out of 200,000 (we keep 1000 volumes of backlog per each of 200 logs). So for this use-case, we need to scan 200,000 records to find 10. I'd imagine SNP would be even worse than this (1M to find X ledgers).
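To make the cost concrete, the two-step scheme above amounts to something like the following (a simplified Python sketch with invented names; this is not the actual bookie code):

```python
def gc_pass(all_ledger_ids, active_ledger_ids):
    """Scan-based GC, simplified: inspect every known ledger id and collect
    the ones no longer in the active set. Each pass costs
    O(len(all_ledger_ids)), even when almost nothing is deletable."""
    return [lid for lid in all_ledger_ids if lid not in active_ledger_ids]
```

With the Tribble numbers quoted above, each pass would inspect roughly 200,000 ids to find about 10 deletable ledgers, and the ratio only worsens as the total ledger count grows.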

          It would be more thrifty to keep a list of ledgers we can delete if we can discover those. It seems that ref counting might be a way to accomplish this.

          Could we talk a little bit about flavio's idea of "revisiting garbage collection"?

          Ivan Kelly added a comment -

          >> I don't think I was being clear enough. Actually this is not what I was saying. My point was that ledger garbage collection can be implemented without scan, simplifying the kv API. Nor would I want to put the dequeue into the API (as you point out this makes no sense, and I agree). The dequeue is an application-specific implementation technique for implementing ledger deletion which doesn't rely on SCAN but only on get/set/delete/CAS.

          My underlying point was that if the metastore interface cannot be used, unmodified, for both bookkeeper and hedwig, then the motivation for metastore goes away. Sijie came up with metastore as a means to scale hedwig+bk by implementing a single interface. What you propose makes this no longer the case. If the plugin developer has to implement both the metastore and the metastore+scan interfaces, they may as well be implementing the LedgerManager (bk) and TopicManager (hw) interfaces.

          What's more, if scan isn't available, it will be possible to use a deque for GC on BK, but this would be more comfortably done at the level of LedgerManager. If it is done at the level of metastore, then MetaStoreLedgerManager will need to be implemented using the deque. Then, even if an implementation of metastore does support scan (such as HBase or an SQL backend), the scan can no longer be used, as MetaStoreLedgerManager uses the deque.

          Flavio Junqueira added a comment -

          @roger It seems that there are two separate concerns with respect to scalability, one that originally generated this issue and another that you're raising about scans. Both sound important.

          Our initial concern was about the amount of state a zookeeper ensemble can handle. A zookeeper server keeps its state in memory, so all zookeeper state must fit into the memory of a single server. For an application with a large number of ledgers (tens to hundreds of millions), we can't use zookeeper. Consequently, we proposed to enable options that can have essentially an unbounded amount of storage.

          The concern you're raising about the cost of scans for garbage-collection purposes growing with the number of ledgers is valid, and it sounds like a good idea to rethink the way we are garbage-collecting ledgers in bookies.

          Matteo Merli added a comment -

          We've been going through the delete ledger operation

          Deletion sequence

          1. Client calls bookkeeperClient.deleteLedger(id)
            1. We know the N bookies that have this ledger (bk1, ..., bkn)
            2. Add id to the list of to_delete ledgers for bk1
            3. ...
            4. Add id to the list of to_delete ledgers for bkn
            5. Remove ledger id from main list (eg. '/ledgers/Lxxxxxx')
            6. Return success to the client
          2. Each bookie, in its GC thread, will scan its own list of ledgers from the to_delete queue and remove them from the list when the deletion has been finalized.

          Handling failures

          When the client deletes a ledger, it has to do N+1 separate steps. Each of them can potentially fail, and the client will receive an exception in that case.

          If the client receives an exception, it has to assume that the ledger has not been deleted properly and retry the deleteLedger() call, although the operation may have actually succeeded (or partially succeeded). In the subsequent deleteLedger() call, the same steps will be re-executed.

          The worst-case scenario is that a bookie could see the same ledger more than once in its own to_delete queue, and hence try to delete it twice, but this should be easy to handle.
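The sequence is restartable because each step is idempotent. A minimal sketch, as hypothetical Python over a plain dict standing in for the metastore (the key layout and the `delete_ledger` name are invented for illustration):

```python
def delete_ledger(store, ledger_id):
    """Simplified version of the sequence above: add the id to each bookie's
    to_delete set, then remove the main metadata record last. On any failure
    the client just calls delete_ledger again; the worst case is a bookie
    seeing the same id twice in its to_delete set, which set-add absorbs."""
    meta = store.get(f"/ledgers/L{ledger_id}")
    if meta is None:
        return True                                   # already deleted (e.g. a retry)
    for bookie in meta["bookies"]:                    # steps 2..4: one write per bookie
        todel = store.get(f"/bookies/{bookie}/to_delete", set())
        todel.add(ledger_id)                          # idempotent set-add
        store[f"/bookies/{bookie}/to_delete"] = todel
    store.pop(f"/ledgers/L{ledger_id}")               # step 5: drop the main record last
    return True                                       # step 6: success to the client
```

Dropping the main record last means a retried call can always rediscover the bookie list; once the record is gone, the deletion is complete and further retries are no-ops.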

          Sijie Guo added a comment -

          Thanks, Matteo, for providing such an interesting idea to improve garbage collection. I just have one concern about this solution: it may introduce inconsistency between the deleted ledgers, which might make deleteLedger confusing.

          For example, ledger l has bookie list (bk1, bk2, bk3). The application called deleteLedger(l); it added l to bk1's deleted ledger list, but failed to add l to bk2's deleted ledger list. The application then receives an exception, which means the deletion failed.

          If the application doesn't call deleteLedger(l) again, ledger l is not deleted. But the data of ledger l on bk1 might be garbage collected, which might cause data loss.

          Matteo Merli added a comment -

          Yes, you're right, with the above scheme you can end up with half-deleted ledgers. I think it could be fixed by adding another step to the sequence. Although I would not call this a data loss, since the client explicitly asked for deletion, meaning the data is no longer relevant.

          Anyway, the client must keep track of its own ledgers somewhere, and it is its responsibility to make sure the ledgers are deleted (and to retry when deleteLedger() fails). Otherwise, even with the currently implemented solution, there is a resource leak in the system, with unused dangling ledgers that stick around forever.

          Roger Bush added a comment -

          @ivank - just to resolve for this thread, I realized scan is necessary for any administrative functions. Since we want to add administrative functionality for BK, it makes sense to have this (you would add it eventually anyway). So I think this is a non-issue at this point.

          Roger Bush added a comment -

          @matteo, @sijie, @flavio - concerning garbage collection: There is always the possibility that a newly created ledger does not get recorded by the client, and is thus left dangling. There is no ability to solve this using the API, so from a manageability standpoint, there must be a way of finding these empty, orphaned ledgers. We wouldn't expect many of these, but we'd need to periodically clean them up. I'm not sure of the implementation details (or perhaps you already do this in BK).

          Roger Bush added a comment -

          I added scan, appropriate test code, and a Mock implementation.

          I also made the changes ivank recommended concerning cleaning things up a bit (renamed Versioned => VersionedValue, made some things inner interfaces/enums).

          What still remains:

          1. Add license header (I'll do today).
          2. String versus byte [] - wondering how ugly this will make the code, so I was holding off.
          3. Convert to async versions, and have a sync adapter so MainTest can be written in synchronous fashion. Adapter can just be a test utility.
          4. Metadata Qualifying Tests – should be possible to write a simple metadata acceptance test for functional correctness. Would handle corner cases and check invariants using the abstract interfaces. Could be done later.

          Seems like 1-3 have to be done before it's ready to use.

          I'm guessing integration into the existing codebase is simply a matter of putting com.yahoo.bookkeeper.metadata.plugin into its own top-level directory.

          https://github.com/rogerbush8/BookKeeperMetadataPlugin

          Flavio Junqueira added a comment -

          Hi Roger.

          >> There is always the possibility that a newly created ledger does not get recorded by the client, and is thus left dangling.

          As part of a create ledger call, we currently create a metadata znode for the ledger. Consequently, there is a record of the ledger being created even before the client starts writing to it. Whether any client ever deletes that ledger is a separate issue.

          Roger Bush added a comment -

          Whoops, I should be supplying versioned URLs: https://github.com/rogerbush8/BookKeeperMetadataPlugin/commit/7d3c55fc9e3b1a6ae08ba60b2a62a050aff40087
          Roger Bush added a comment -

          @sijie - If you failed to delete it from a bookie, then it is still the client's responsibility to call deleteLedger until it succeeds. Once it's being deleted, it isn't going to be used anyway. You could always return an exception that said it was partially deleted and can be retried (although I'm not sure how this information would be used).

          But at the end of the day, once the application decides the ledger is not going to be used, it will keep deleting it until the delete succeeds. What is important is that the delete must be restartable.

          So I'm wondering if this inconsistency matters, or if it's an inconsistent state that doesn't matter.

          Sijie Guo added a comment -

          > So I'm wondering if this inconsistency matters, or if it's an inconsistent state that doesn't matter.

          Currently I don't have any use case to say whether it matters or not. I just want to raise my concern that the behavior of deleting a ledger might confuse users.

          Roger Bush added a comment -

          Just a thought on "autoincrement": I think this should be omitted. It is a special case of creating a unique id, and any of these could be implemented using client-driven CAS. Collisions/retries should be low/reasonable both for bk and for hedwig, I would think.

          Sijie Guo added a comment -

          @Roger

          If there is autoincrement provided by the backend meta store, it would be better to use it. I am not sure client-driven CAS could do it perfectly.

          Flavio Junqueira added a comment -

          @Sijie With CAS, one can read, increment, and replace if the value is still the same. Otherwise the operation fails and the client needs to try again. There is the potential for starvation (a client continuously tries and fails), but I suppose we can consider that risk small.
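The read/increment/replace loop can be sketched as follows, assuming a store with a compare-and-set primitive (a hypothetical Python sketch; `CasStore` and `next_id` are invented names, and a real store would make the CAS genuinely atomic):

```python
class CasStore(dict):
    """Dict with a compare-and-set primitive. Atomic enough for this
    single-threaded sketch; a real metastore would provide atomicity."""
    def compare_and_set(self, key, expected, new):
        if self.get(key, 0) != expected:
            return False
        self[key] = new
        return True

def next_id(store, key="ledger-id-counter"):
    """Read the counter and try to CAS it to value+1, retrying on contention."""
    while True:
        current = store.get(key, 0)
        if store.compare_and_set(key, current, current + 1):
            return current + 1
```

If another client wins the race, the CAS fails and the loop simply re-reads and retries; per-namespace counters would keep the contention (and thus the retry rate) low.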

          @All I'm not entirely convinced that the identification of ledgers should be delegated to the metadata store. I agree that we can't keep using sequential znodes as we are currently doing for a large deployment, but we could introduce for example the notion of namespaces and have a counter for each namespace. The value of the counter for each namespace is kept by zookeeper, and we use one znode per namespace. This solution only works if the number of namespaces is not large.

          Other people have mentioned namespaces, so it might be an interesting feature to consider.

          Sijie Guo added a comment -

          @Flavio, I know that CAS could do it by retrying when a failure happens. But if a metastore has auto-increment support, why not use it? If a metastore doesn't support auto-increment, we could fall back to using CAS.

          > I'm not entirely convinced that the identification of ledgers should be delegated to the metadata store.

          Yes, we don't need to delegate it to the metastore. Just as HierarchicalLedgerManager does, we could still use zookeeper as the id generator.

          Also, introducing a counter per namespace is a very nice idea. I like it.

          Flavio Junqueira added a comment -

          Hi Sijie, This umbrella issue is marked for 4.2.0, but I can see a number of open jiras here. Are we really going to be able to wrap up all jiras for 4.2.0? If not, which seems to be the case, what are the ones we can bump to 4.3.0?

          Sijie Guo added a comment -

          Flavio Junqueira, for some issues Jiannan already has the patches; I would suggest including them in 4.2.0. For the two documentation jiras, I will take them and add the documentation. The other jiras I would move out from this parent jira and mark to be fixed in 4.3.0.

          Flavio Junqueira added a comment -

          My understanding of the status of this work is:

          1. Tasks 3 and 7 have patches, but there have been review comments, and Jiannan Wang said he will update the patches shortly.
          2. Task 4 is included in 7.
          3. Tasks 9, 10, and 13 are documentation tasks and Sijie Guo is working on them.
          Sijie Guo added a comment -

          The patches are ready for tasks 10 and 13. After they are committed, this jira can be resolved as 'Implemented'.

          Sijie Guo added a comment -

          Marked it as Implemented, since all subtasks are done. Thanks to Fangmin Lv, Jiannan Wang, Ivan Kelly, and Flavio Junqueira for their work on it.


            People

            • Assignee:
              Sijie Guo
              Reporter:
              Sijie Guo
            • Votes:
              1
              Watchers:
              5
