/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.ignite.scenario; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.scenario.internal.AbstractTask; import org.apache.ignite.scenario.internal.PocTesterArguments; import org.apache.ignite.scenario.internal.TaskProperties; import org.apache.ignite.scenario.internal.events.EventFileStorageImpl; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.scenario.internal.utils.PocTesterUtils.println; import static org.apache.ignite.scenario.internal.utils.PocTesterUtils.sleep; /** * Activates cluster. */ public class RegisterTask extends AbstractTask { /** */ private static final Logger LOG = LogManager.getLogger(RegisterTask.class.getName()); private IgniteCache cacheToPut; private Random rand; /** */ private static final String SEQ_NUM_FLD = "f0"; /** */ private static final int UPDATES_COUNT = 50; /** */ private static final String BINARY_TYPE_NAME = "TestBinaryType"; /** * * @param args Arguments. * @param props Task properties. */ public RegisterTask(PocTesterArguments args, TaskProperties props){ super(args, props); } /** * * @throws Exception */ @Override public void setUp() throws Exception { super.setUp(); boolean done = false; int cnt = 0; while (!done && cnt++ < 300) { try { cacheToPut = ignite().createCache("cacheToPut"); } catch (Exception e){ LOG.error(String.format("Failed to set cache 'cacheToPut'. Will try again. Cnt = %d", cnt), e); sleep(100L); } } rand = new Random(); } /** {@inheritDoc} */ @Override public void body0() { AtomicBoolean ab = new AtomicBoolean(); ab.set(false); boolean done = false; int cnt = 0; while (!done && cnt++ < 300) { try { ignite().compute().withAsync().call(new BinaryObjectAdder((IgniteEx)ignite(), 5000, ab)); } catch (Exception e){ LOG.error(String.format("Failed to send compute job. Will try again. Cnt = %d", cnt), e); sleep(100L); } } } /** {@inheritDoc} */ @Nullable public String getTaskReport() { //TODO: avoid null result. return null; } /** */ @Override protected void addPropsToMap(TaskProperties props){ super.addPropsToMap(props); Map hdrMap = (Map) propMap.get("headersMap"); hdrMap.put("unit", "boolean"); hdrMap.put("data", "status"); propMap.put("reportDir", "reports"); } /** * Instruction for node to perform add new binary object action on cache in keepBinary mode. * * Instruction includes id the object should be added under, new field to add to binary schema * and {@link FieldType type} of the field. */ private static final class BinaryUpdateDescription { /** */ private int itemId; /** */ private String fieldName; /** */ private FieldType fieldType; /** * @param itemId Item id. * @param fieldName Field name. * @param fieldType Field type. */ private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType) { this.itemId = itemId; this.fieldName = fieldName; this.fieldType = fieldType; } } /** * */ private enum FieldType { /** */ NUMBER, /** */ STRING, /** */ ARRAY, /** */ OBJECT } /** * Generates random number to use when creating binary object with field of numeric {@link FieldType type}. */ private static int getNumberFieldVal() { return ThreadLocalRandom.current().nextInt(100); } /** * Generates random string to use when creating binary object with field of string {@link FieldType type}. */ private static String getStringFieldVal() { return "str" + (100 + ThreadLocalRandom.current().nextInt(9)); } /** * Generates random array to use when creating binary object with field of array {@link FieldType type}. */ private static byte[] getArrayFieldVal() { byte[] res = new byte[3]; ThreadLocalRandom.current().nextBytes(res); return res; } /** * @param builder Builder. * @param desc Descriptor with parameters of BinaryObject to build. * @return BinaryObject built by provided description */ private static BinaryObject newBinaryObject(BinaryObjectBuilder builder, BinaryUpdateDescription desc) { builder.setField(SEQ_NUM_FLD, desc.itemId + 1); switch (desc.fieldType) { case NUMBER: builder.setField(desc.fieldName, getNumberFieldVal()); break; case STRING: builder.setField(desc.fieldName, getStringFieldVal()); break; case ARRAY: builder.setField(desc.fieldName, getArrayFieldVal()); break; case OBJECT: builder.setField(desc.fieldName, new Object()); } return builder.build(); } /** * Compute job executed on each node in cluster which constantly adds new entries to ignite cache * according to {@link BinaryUpdateDescription descriptions} it reads from shared queue. */ private static final class BinaryObjectAdder implements IgniteCallable { /** */ private final IgniteEx ignite; /** */ private final long timeout; /** */ private final AtomicBoolean stopFlag; /** */ private Random r = new Random(); /** * @param ignite Ignite. * @param timeout Timeout. * @param stopFlag Stop flag. */ BinaryObjectAdder(IgniteEx ignite, long timeout, AtomicBoolean stopFlag) { this.ignite = ignite; this.timeout = timeout; this.stopFlag = stopFlag; } /** {@inheritDoc} */ @Override public Object call() throws Exception { // START_LATCH.await(); boolean done = false; int cnt = 0; IgniteCache cache; while(!done && cnt++ < 300) { try { cache = ignite.cache("cacheToPut").withKeepBinary(); BinaryUpdateDescription[] updatesArr = new BinaryUpdateDescription[50]; for (int i = 0; i < 50; i++) { FieldType fType = null; int f = r.nextInt(4); switch (f) { case 0: fType = FieldType.NUMBER; break; case 1: fType = FieldType.STRING; break; case 2: fType = FieldType.ARRAY; break; case 3: fType = FieldType.OBJECT; } updatesArr[i] = new BinaryUpdateDescription(i, "f" + (f + 1), fType); } int i = 0; while (i<1000) { BinaryUpdateDescription desc = updatesArr[r.nextInt(50)]; if (desc == null) break; BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME + i + r.nextInt()); i++; BinaryObject bo = newBinaryObject(builder, desc); cache.put(desc.itemId, bo); // LOG.info(String.format("Cache size = %d", cache.size())); // LOG.info(String.format("Queue size = %d", updatesQueue.size())); } } catch (Exception e){ LOG.error(String.format("Failed to get cache instance. Will try again. Cnt = %d", cnt), e); sleep(1000L); } } // if (updatesQueue.isEmpty()) // FINISH_LATCH_NO_CLIENTS.countDown(); stopFlag.set(false); return null; } } }