Uploaded image for project: 'Cassandra'
  1. Cassandra
  2. CASSANDRA-10237

CFS.loadNewSSTables() broken for pre-3.0 sstables

    XML | Word | Printable | JSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Normal
    • Resolution: Fixed
    • Fix Version/s: 3.0.0 rc1
    • Component/s: None
    • Labels:
      None
    • Severity:
      Normal

      Description

      While working on CASSANDRA-10236, I discovered that CFS.loadNewSSTables() doesn't work for pre-3.0 sstables; it only works for sstables of version ma.

      To be clear: starting C* with 2.0, 2.1 or 2.2 sstables works, but loading new sstables at runtime doesn't.

      Issues with CFS.loadNewSSTables() discovered so far:

      1. MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet) returns null for MetadataType.HEADER, which results in an NPE later when MetadataSerializer.serialize executes Collections.sort.
      2. After working around the previous issue, it turned out that the digest file could not be loaded, since Component.DIGEST is a singleton which refers to CRC32, but pre-3.0 sstables use Adler32.
      3. After working around that one, loading fails in StreamingHistogram$StreamingHistogramSerializer.deserialize because maxBinSize == Integer.MAX_VALUE.

      As loading legacy sstables works fine during startup, I assume my workarounds are not correct.

      For reference, this commit contains a ton of legacy sstables (simple, counter, clustered and clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTableTest to read these tables using CFS.loadNewSSTables().

      LegacySSTablesTest.txt
      diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
      index d2922cc..1be6450 100644
      --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
      +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
      @@ -18,6 +18,9 @@
       package org.apache.cassandra.io.sstable;
       
       import java.io.File;
      +import java.io.FileInputStream;
      +import java.io.FileOutputStream;
      +import java.io.IOException;
       import java.nio.ByteBuffer;
       import java.util.ArrayList;
       import java.util.HashSet;
      @@ -27,10 +30,15 @@ import java.util.Set;
       import org.junit.BeforeClass;
       import org.junit.Test;
       
      +import org.slf4j.Logger;
      +import org.slf4j.LoggerFactory;
      +
       import org.apache.cassandra.SchemaLoader;
       import org.apache.cassandra.Util;
       import org.apache.cassandra.config.CFMetaData;
      +import org.apache.cassandra.cql3.QueryProcessor;
       import org.apache.cassandra.db.ColumnFamilyStore;
      +import org.apache.cassandra.db.ConsistencyLevel;
       import org.apache.cassandra.db.DeletionTime;
       import org.apache.cassandra.db.Keyspace;
       import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
      @@ -43,6 +51,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
       import org.apache.cassandra.io.sstable.format.SSTableFormat;
       import org.apache.cassandra.io.sstable.format.SSTableReader;
       import org.apache.cassandra.io.sstable.format.Version;
      +import org.apache.cassandra.io.sstable.format.big.BigFormat;
       import org.apache.cassandra.schema.KeyspaceParams;
       import org.apache.cassandra.service.StorageService;
       import org.apache.cassandra.streaming.StreamPlan;
      @@ -57,6 +66,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
        */
       public class LegacySSTableTest
       {
      +    private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
      +
           public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
           public static final String KSNAME = "Keyspace1";
           public static final String CFNAME = "Standard1";
      @@ -64,6 +75,8 @@ public class LegacySSTableTest
           public static Set<String> TEST_DATA;
           public static File LEGACY_SSTABLE_ROOT;
       
      +    public static final String[] legacyVersions = {"jb", "ka", "la"};
      +
           @BeforeClass
           public static void defineSchema() throws ConfigurationException
           {
      @@ -208,4 +221,65 @@ public class LegacySSTableTest
                   throw e;
               }
           }
      +
      +    @Test
      +    public void testLegacyCqlTables() throws Exception
      +    {
      +        QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
      +
      +        loadLegacyTables();
      +    }
      +
      +    private void loadLegacyTables() throws IOException
      +    {
      +        for (String legacyVersion : legacyVersions)
      +        {
      +            logger.info("Preparing legacy version {}", legacyVersion);
      +
      +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
      +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
      +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
      +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
      +
      +            loadLegacyTable("legacy_%s_simple", legacyVersion);
      +            loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
      +            loadLegacyTable("legacy_%s_clust", legacyVersion);
      +            loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
      +
      +        }
      +    }
      +
      +    private void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
      +    {
      +        String table = String.format(tablePattern, legacyVersion);
      +
      +        logger.info("Loading legacy table {}", table);
      +
      +        ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table);
      +
      +        for (File cfDir : cfs.getDirectories().getCFDirectories())
      +        {
      +            copySstables(legacyVersion, table, cfDir);
      +        }
      +
      +        cfs.loadNewSSTables();
      +    }
      +
      +    private static void copySstables(String legacyVersion, String table, File cfDir) throws IOException
      +    {
      +        byte[] buf = new byte[65536];
      +
      +        for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
      +        {
      +            if (file.isFile())
      +            {
      +                File target = new File(cfDir, file.getName());
      +                int rd;
      +                FileInputStream is = new FileInputStream(file);
      +                FileOutputStream os = new FileOutputStream(target);
      +                while ((rd = is.read(buf)) >= 0)
      +                    os.write(buf, 0, rd);
      +            }
      +        }
      +    }
       }
      
      broken-workaround
      diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
      index 554c782..e953b1d 100644
      --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
      +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
      @@ -96,7 +96,7 @@ public class Verifier implements Closeable
               {
                   validator = null;
       
      -            if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
      +            if (new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists())
                   {
                       validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
                       validator.validate();
      diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
      index 54dd35b..d0405e4 100644
      --- a/src/java/org/apache/cassandra/io/sstable/Component.java
      +++ b/src/java/org/apache/cassandra/io/sstable/Component.java
      @@ -34,6 +34,7 @@ public class Component
           public static final char separator = '-';
       
           final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
      +
           public enum Type
           {
               // the base data for an sstable: the remaining components can be regenerated
      @@ -79,13 +80,17 @@ public class Component
               }
           }
       
      +    private static final String DIGEST_CRC32_NAME = "Digest.crc32";
      +    private static final String DIGEST_ADLER32_NAME = "Digest.adler32";
      +
           // singleton components for types that don't need ids
           public final static Component DATA = new Component(Type.DATA);
           public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
           public final static Component FILTER = new Component(Type.FILTER);
           public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
           public final static Component STATS = new Component(Type.STATS);
      -    public final static Component DIGEST = new Component(Type.DIGEST);
      +    public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, DIGEST_CRC32_NAME);
      +    public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, DIGEST_ADLER32_NAME);
           public final static Component CRC = new Component(Type.CRC);
           public final static Component SUMMARY = new Component(Type.SUMMARY);
           public final static Component TOC = new Component(Type.TOC);
      @@ -138,11 +143,23 @@ public class Component
                   case FILTER:            component = Component.FILTER;                       break;
                   case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
                   case STATS:             component = Component.STATS;                        break;
      -            case DIGEST:            component = Component.DIGEST;                       break;
                   case CRC:               component = Component.CRC;                          break;
                   case SUMMARY:           component = Component.SUMMARY;                      break;
                   case TOC:               component = Component.TOC;                          break;
                   case CUSTOM:            component = new Component(Type.CUSTOM, path.right); break;
      +            case DIGEST:
      +                switch (path.right)
      +                {
      +                    case DIGEST_CRC32_NAME:
      +                        component = Component.DIGEST_CRC32;
      +                        break;
      +                    case DIGEST_ADLER32_NAME:
      +                        component = Component.DIGEST_ADLER32;
      +                        break;
      +                    default:
      +                        throw new IllegalStateException();
      +                }
      +                break;
                   default:
                        throw new IllegalStateException();
               }
      diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
      index 38829df..0db6f00 100644
      --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
      +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
      @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
       import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
       import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
       import org.apache.cassandra.utils.Pair;
      +import org.apache.hadoop.mapred.JobTracker;
       
       import static org.apache.cassandra.io.sstable.Component.separator;
       
      @@ -344,4 +345,16 @@ public class Descriptor
           {
               return hashCode;
           }
      +
      +    public Component digestComponent()
      +    {
      +        switch (version.compressedChecksumType())
      +        {
      +            case Adler32:
      +                return Component.DIGEST_ADLER32;
      +            case CRC32:
      +                return Component.DIGEST_CRC32;
      +        }
      +        throw new IllegalStateException();
      +    }
       }
      diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
      index bd21536..74e4b56 100644
      --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
      +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
      @@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                       Component.STATS,
                       Component.SUMMARY,
                       Component.TOC,
      -                Component.DIGEST));
      +                Component.DIGEST_CRC32));
       
               if (metadata.params.bloomFilterFpChance < 1.0)
                   components.add(Component.FILTER);
      diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
      index 9a5eae8..a40c37a 100644
      --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
      +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
      @@ -122,7 +122,12 @@ public class MetadataSerializer implements IMetadataSerializer
                       in.seek(offset);
                       component = type.serializer.deserialize(descriptor.version, in);
                   }
      -            components.put(type, component);
      +            if (component == null)
      +            {
      +                assert type != MetadataType.HEADER || !descriptor.version.storeRows();
      +            }
      +            else
      +                components.put(type, component);
               }
               return components;
           }
      diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
      index 70cd860..b88f4f2 100644
      --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
      +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
      @@ -109,7 +109,7 @@ public class DataIntegrityMetadata
               {
                   this.descriptor = descriptor;
                   checksum = descriptor.version.uncompressedChecksumType().newInstance();
      -            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
      +            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(descriptor.digestComponent())));
                   dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
                   try
                   {
      @@ -211,7 +211,7 @@ public class DataIntegrityMetadata
       
               public void writeFullChecksum(Descriptor descriptor)
               {
      -            File outFile = new File(descriptor.filenameFor(Component.DIGEST));
      +            File outFile = new File(descriptor.filenameFor(descriptor.digestComponent()));
                   try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
                   {
                       out.write(String.valueOf(fullChecksum.getValue()));
      diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
      index 920eee0..1420cae 100644
      --- a/src/java/org/apache/cassandra/io/util/FileUtils.java
      +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
      @@ -173,7 +173,7 @@ public class FileUtils
       
           public static void renameWithConfirm(File from, File to)
           {
      -        assert from.exists();
      +        assert from.exists() : String.format("File to rename does not exist: %s", from.getPath());
               if (logger.isDebugEnabled())
                   logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
               // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
      diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
      index 13ce0c1..0233169 100644
      --- a/test/unit/org/apache/cassandra/db/VerifyTest.java
      +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
      @@ -275,11 +275,11 @@ public class VerifyTest
               SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
       
       
      -        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw");
      +        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), "rw");
               Long correctChecksum = Long.parseLong(file.readLine());
               file.close();
       
      -        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST));
      +        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
       
               try (Verifier verifier = new Verifier(cfs, sstable, false))
               {
      @@ -315,7 +315,7 @@ public class VerifyTest
               file.close();
       
               // Update the Digest to have the right Checksum
      -        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST));
      +        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
       
               try (Verifier verifier = new Verifier(cfs, sstable, false))
               {
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                snazy Robert Stupp
                Reporter:
                snazy Robert Stupp
                Authors:
                Robert Stupp
                Reviewers:
                Blake Eggleston
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: