Details
Description
Ignite fails to restore snapshot created under streamed load:
Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=148] Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]] Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=146] Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
Test (attached):
public void testClusterSnapshotConsistencyWithStreamer() throws Exception { int grids = 2; CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000); AtomicBoolean stopLoading = new AtomicBoolean(false); dfltCacheCfg = null; Class.forName("org.apache.ignite.IgniteJdbcDriver"); String tableName = "TEST_TBL1"; startGrids(grids); grid(0).cluster().state(ACTIVE); IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot); loadNumberBeforeSnapshot.await(); grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(); stopLoading.set(true); load1.get(); grid(0).cache("SQL_PUBLIC_" + tableName).destroy(); grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get(); } /** */ private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming, AtomicBoolean stop, CountDownLatch startSnp) { return GridTestUtils.runMultiThreadedAsync(() -> { if(useCache) { String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase(); IgniteCache<Integer, Object> cache = grid(0) .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups) .setCacheMode(CacheMode.REPLICATED)); try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) { for (int i = 0; !stop.get(); ++i) { if (streaming) ds.addData(i, new Account(i, i - 1)); else cache.put(i, new Account(i, i - 1)); if (startSnp.getCount() > 0) startSnp.countDown(); Thread.yield(); } } } else { try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) { createTable(conn, tblName, backups); try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName + "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) { if (streaming) conn.prepareStatement("SET STREAMING ON;").execute(); int leftLimit = 97; // letter 'a' int rightLimit = 122; // letter'z' int targetStringLength = 15; Random rand = new Random(); // for (int i = 0; !stop.get(); ++i) { int orgid = rand.ints(1, 0, 5).findFirst().getAsInt(); String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength) .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) .toString(); stmt.setInt(1, i); stmt.setString(2, val); stmt.setInt(3, orgid); stmt.setInt(4, 0); stmt.executeUpdate(); if (startSnp.getCount() > 0) startSnp.countDown(); Thread.yield(); } } } catch (Exception e) { while (startSnp.getCount() > 0) startSnp.countDown(); throw new IgniteException("Unable to load.", e); } } }, 1, "load-thread-" + tblName); }
Attachments
Attachments
Issue Links
- Dependency
-
IGNITE-18075 Improve snapshot creation check of the streaming updates.
-
- Resolved
-
-
IGNITE-18076 Store snapshot creation warnings.
-
- Resolved
-
- is related to
-
IGNITE-19465 Log message of warn-result of snapshot creation might be more accurate.
-
- Resolved
-
- relates to
-
IGNITE-18310 Add cache ids to the DataStreamer-while-snapshot warning.
-
- Open
-
- links to
1.
|
Fix Datastreamer documentation. |
|
Resolved | Vladimir Steshin |