Description
Motivation:
We have ItSqlLogicTest which performs user load with creation and desctruction of the tables, and need to have background load on meta storage to be able to refresh leases, but seems that this additional load causes weird behavior of meta storage's underlying rocksdb storage, which leads to timeouts on DDL operations. Actually, there are too long range scans in rocksdb storage, which don't correlate with number of entries (including tombstones) and amount of lease data written in background.
It can be reproduced on RocksDBKeyValueStorage.
Prerequisites:
RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random tombstones, 100 values with prefix "tables", 5000 tombstones with prefix "tables".
Load profile:
- Thread1: performs storage.invoke with the same key and value of size 500k bytes, once per 100 ms.
- Thread2: performs storage.range for prefix "tables" once per 200 ms, collects all entries from cursor.
- Thread3: performs storage.get with random key once per 3 ms.
Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes (after 5+ minutes of test running) this time spikes up to hundreds of milliseconds (or even seconds): and this lasts for some time (I observed up to half of a minute), after that the time returns to normal values:
2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size 100 2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size 100 2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size 100 2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size 100 2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size 100 2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size 100 2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size 100 2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size 100 2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size 100 2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size 100 2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size 100 2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size 100
On teamcity it was even over 6 seconds:
Reproducer:
package org.apache.ignite.internal.metastorage; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; import java.io.ObjectStreamException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.ConditionType; import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.metastorage.server.AndCondition; import org.apache.ignite.internal.metastorage.server.Condition; import org.apache.ignite.internal.metastorage.server.ExistenceCondition; import org.apache.ignite.internal.metastorage.server.OrCondition; import org.apache.ignite.internal.metastorage.server.RevisionCondition; import org.apache.ignite.internal.metastorage.server.TombstoneCondition; import org.apache.ignite.internal.metastorage.server.ValueCondition; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.ByteArray; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) public class RocksDBLoadTest { private static final IgniteLogger LOG = Loggers.forClass(RocksDBLoadTest.class); private byte[] randomBytes() { return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); } private byte[] randomBytes(String prefix) { return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8); } /** * Increments the last character of the given string. */ private static String incrementLastChar(String str) { char lastChar = str.charAt(str.length() - 1); return str.substring(0, str.length() - 1) + (char) (lastChar + 1); } @Test public void test(@WorkDirectory Path path) throws InterruptedException { System.out.println("start"); HybridClock clock = new HybridClockImpl(); RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd", path.resolve("rocksdbtest")); storage.start(); for (int i = 0; i < 1000; i++) { storage.put(randomBytes(), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes(), TOMBSTONE, clock.now()); } for (int i = 0; i < 100; i++) { storage.put(randomBytes("tables"), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes("tables"), TOMBSTONE, clock.now()); } ByteArray leaseKey = ByteArray.fromString("placementdriver.leases"); AtomicBoolean leasesStopped = new AtomicBoolean(); AtomicBoolean rangeStopped = new AtomicBoolean(); Thread leases = new Thread(() -> { byte[] leaseRaw = new byte[500_000]; byte a = 0; while (!leasesStopped.get()) { byte[] renewedLease = new byte[500_000]; renewedLease[0] = ++a; storage.invoke( toCondition(or(notExists(leaseKey), value(leaseKey).eq(leaseRaw))), List.of(put(leaseKey, renewedLease)), List.of(noop()), clock.now() ); leaseRaw = renewedLease; try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); leases.start(); Thread range = new Thread(() -> { while (!rangeStopped.get()) { long start = System.currentTimeMillis(); Cursor<Entry> cursor = storage.range("tables".getBytes(StandardCharsets.UTF_8), incrementLastChar("tables").getBytes(StandardCharsets.UTF_8)); List<Object> list = new ArrayList<>(); for(Entry e : cursor) { if (!e.tombstone()) { list.add(e.value()); } } LOG.info("time " + (System.currentTimeMillis() - start) + ", size " + list.size()); cursor.close(); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); range.start(); for (int i = 0; i < 180_000; i++) { storage.get(randomBytes()); Thread.sleep(3); } leasesStopped.set(true); rangeStopped.set(true); leases.join(); range.join(); } private static Condition toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) { if (condition instanceof SimpleCondition.ValueCondition) { var valueCondition = (SimpleCondition.ValueCondition) condition; return new ValueCondition( toValueConditionType(valueCondition.type()), valueCondition.key(), valueCondition.value() ); } else if (condition instanceof SimpleCondition.RevisionCondition) { var revisionCondition = (SimpleCondition.RevisionCondition) condition; return new RevisionCondition( toRevisionConditionType(revisionCondition.type()), revisionCondition.key(), revisionCondition.revision() ); } else if (condition instanceof SimpleCondition) { var simpleCondition = (SimpleCondition) condition; switch (simpleCondition.type()) { case KEY_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key()); case KEY_NOT_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key()); case TOMBSTONE: return new TombstoneCondition(simpleCondition.key()); default: throw new IllegalArgumentException("Unexpected simple condition type " + simpleCondition.type()); } } else if (condition instanceof CompoundCondition) { CompoundCondition compoundCondition = (CompoundCondition) condition; Condition leftCondition = toCondition(compoundCondition.leftCondition()); Condition rightCondition = toCondition(compoundCondition.rightCondition()); switch (compoundCondition.type()) { case AND: return new AndCondition(leftCondition, rightCondition); case OR: return new OrCondition(leftCondition, rightCondition); default: throw new IllegalArgumentException("Unexpected compound condition type " + compoundCondition.type()); } } else { throw new IllegalArgumentException("Unknown condition " + condition); } } private static ValueCondition.Type toValueConditionType(ConditionType type) { switch (type) { case VAL_EQUAL: return ValueCondition.Type.EQUAL; case VAL_NOT_EQUAL: return ValueCondition.Type.NOT_EQUAL; case VAL_GREATER: return ValueCondition.Type.GREATER; case VAL_GREATER_OR_EQUAL: return ValueCondition.Type.GREATER_OR_EQUAL; case VAL_LESS: return ValueCondition.Type.LESS; case VAL_LESS_OR_EQUAL: return ValueCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected value condition type " + type); } } private static RevisionCondition.Type toRevisionConditionType(ConditionType type) { switch (type) { case REV_EQUAL: return RevisionCondition.Type.EQUAL; case REV_NOT_EQUAL: return RevisionCondition.Type.NOT_EQUAL; case REV_GREATER: return RevisionCondition.Type.GREATER; case REV_GREATER_OR_EQUAL: return RevisionCondition.Type.GREATER_OR_EQUAL; case REV_LESS: return RevisionCondition.Type.LESS; case REV_LESS_OR_EQUAL: return RevisionCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected revision condition type " + type); } } }
Attachments
Issue Links
- blocks
-
IGNITE-19578 Decrease count of lease messages to meta storage
- Resolved
- links to