Distributed Lock, SQL Query, and Entry Processor With C# in Apache Ignite

In this example, we get a distributed lock on the cache with a specific key and bulk process the entries on the server side with an entry processor.

Before beginning our example, check out my previous article about the query entity and basic SQL on Apache Ignite.

Let's start our simple example by defining the bean to be cached and queried:

using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache.Configuration;

public class Seat : IBinarizable {
 [QuerySqlField(IsIndexed = true)]
 public int SeatId {
  set;
  get;
 }

 public int ActId {
  set;
  get;
 }

 public int SectionId {
  set;
  get;
 }

 public int BlockId {
  set;
  get;
 }

 public int RowId {
  set;
  get;
 }

 public int SeatNo {
  set;
  get;
 }

 [QuerySqlField(IsIndexed = true)]
 public int SeatStatus {
  set;
  get;
 }

 public Seat(int seatId, int actId, int sectionId, int blockId, int rowId, int seatNo, int seatStatus) {
  SeatId = seatId;
  ActId = actId;
  SectionId = sectionId;
  BlockId = blockId;
  RowId = rowId;
  SeatNo = seatNo;
  SeatStatus = seatStatus;
 }

 public Seat() {}

 // Every property must be written here, otherwise it is lost when the object is serialized.
 public void WriteBinary(IBinaryWriter writer) {
  writer.WriteInt("SeatId", SeatId);
  writer.WriteInt("ActId", ActId);
  writer.WriteInt("SectionId", SectionId);
  writer.WriteInt("BlockId", BlockId);
  writer.WriteInt("RowId", RowId);
  writer.WriteInt("SeatNo", SeatNo);
  writer.WriteInt("SeatStatus", SeatStatus);
 }

 // Field names must match the names used in WriteBinary.
 public void ReadBinary(IBinaryReader reader) {
  SeatId = reader.ReadInt("SeatId");
  ActId = reader.ReadInt("ActId");
  SectionId = reader.ReadInt("SectionId");
  BlockId = reader.ReadInt("BlockId");
  RowId = reader.ReadInt("RowId");
  SeatNo = reader.ReadInt("SeatNo");
  SeatStatus = reader.ReadInt("SeatStatus");
 }

}

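Note the QuerySqlField attribute on the SeatId and SeatStatus properties. It marks them as queryable through SQL, and IsIndexed = true creates an index on each of them. Only these two properties can be used in our SQL queries.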

Then, let's configure the Spring XML file for our Ignite server:

<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
	<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
		<property name="clientMode" value="false"/>
		<!-- Set to true to enable distributed class loading for examples, default is false. -->
		<property name="peerClassLoadingEnabled" value="true"/>
		<property name="cacheConfiguration">
			<list>
				<bean class="org.apache.ignite.configuration.CacheConfiguration">
					<property name="name" value="SeatCache"/>
					<property name="cacheMode" value="REPLICATED"/>
					<property name="atomicityMode" value="TRANSACTIONAL"/>
					<property name="queryEntities">
						<list>
							<bean class="org.apache.ignite.cache.QueryEntity">
								<property name="keyType" value="java.lang.String"/>
								<property name="valueType" value="IgniteTest.Seat"/>
								<property name="fields">
									<map>
										<entry key="SeatId" value="java.lang.Integer"/>
										<entry key="SeatStatus" value="java.lang.Integer"/>
									</map>
								</property>
								<property name="indexes">
									<list>
										<bean class="org.apache.ignite.cache.QueryIndex">
											<constructor-arg value="SeatId"/>
										</bean>
										<bean class="org.apache.ignite.cache.QueryIndex">
											<constructor-arg value="SeatStatus"/>
										</bean>
									</list>
								</property>
							</bean>
						</list>
					</property>
				</bean>
			</list>
		</property>
		<property name="discoverySpi">
			<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
				<property name="ipFinder">
					<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
						<property name="addresses">
							<list>
								<value>127.0.0.1:47500..47509</value>
							</list>
						</property>
					</bean>
				</property>
			</bean>
		</property>
	</bean>
</beans>

It is important to set the cache's atomicity mode to TRANSACTIONAL, because explicit locks are only supported on transactional caches.

The other important property in our configuration is queryEntities. With this property, we configure the fields that are going to be queried and indexed by the H2 database.
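For readers who prefer code over Spring XML, the same cache settings can probably be expressed programmatically in Ignite.NET. The following is only a sketch under assumptions: the class name ServerStartup is hypothetical, the Seat class from above is assumed to be in the IgniteTest namespace, and the peerClassLoadingEnabled setting is left out. The QueryEntity built from typeof(Seat) picks up the properties marked with QuerySqlField.

```csharp
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;

namespace IgniteTest
{
    // Hypothetical helper, not part of the original article.
    public static class ServerStartup
    {
        public static IIgnite Start()
        {
            var cfg = new IgniteConfiguration
            {
                ClientMode = false,
                // Same local discovery range as in the XML file.
                DiscoverySpi = new TcpDiscoverySpi
                {
                    IpFinder = new TcpDiscoveryStaticIpFinder
                    {
                        Endpoints = new[] { "127.0.0.1:47500..47509" }
                    }
                },
                CacheConfiguration = new[]
                {
                    // Query fields and indexes are derived from the
                    // QuerySqlField attributes on the Seat class.
                    new CacheConfiguration("SeatCache",
                        new QueryEntity(typeof(string), typeof(Seat)))
                    {
                        CacheMode = CacheMode.Replicated,
                        // Required for explicit locks.
                        AtomicityMode = CacheAtomicityMode.Transactional
                    }
                }
            };
            return Ignition.Start(cfg);
        }
    }
}
```

The XML file remains the configuration used in the rest of this example; this variant only illustrates which settings matter.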

We will create our EntryProcessor now. For further information about entry processors, see the official documentation on cache operations.

[Serializable]
public class KeyProcessor : ICacheEntryProcessor<string, Seat, int, Seat> {
 // Runs on the node that owns the entry; arg is the new seat status.
 public Seat Process(IMutableCacheEntry<string, Seat> entry, int arg) {
  var newEntry = entry.Value;
  newEntry.SeatStatus = arg;
  // Assigning entry.Value is what writes the change back to the cache.
  entry.Value = newEntry;
  return entry.Value;
 }
}

This processor is deliberately simple: it only updates the SeatStatus field of a cache entry. It is important to assign entry.Value in the Process method, because that assignment is what updates the cache.

We are ready to start our Ignite server:

Environment.SetEnvironmentVariable("IGNITE_H2_DEBUG_CONSOLE", "true");
IIgnite ignite = Ignition.Start("{configuration_xml_file}");

We enabled the H2 debug console so that we can query our in-memory database. After the server starts, the table structure in the H2 console looks like this:

[Image: H2 debug console showing the SeatCache.SEAT table structure]

As seen, two fields are indexed and in a queryable state on the SeatCache.SEAT table.

Let's pump some sample data into our cache. We can write a small piece of client code for this purpose. After all, we already have the bean and the configuration file.
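A loader for the sample data might look like the sketch below. Several details are assumptions, since the original loading code is not shown: the SeatLoader class, the use of the seat ID as the string key, the batch size of 1000, and the values chosen for the act, section, block, row, and seat number fields are all hypothetical. Every seat starts with SeatStatus 1 so that the later query finds rows to update.

```csharp
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;

namespace IgniteTest
{
    // Hypothetical helper, not part of the original article.
    public static class SeatLoader
    {
        public static void Load(IIgnite ignite, int count)
        {
            ICache<string, Seat> cache = ignite.GetCache<string, Seat>("SeatCache");
            var batch = new Dictionary<string, Seat>();

            for (int seatId = 1; seatId <= count; seatId++)
            {
                // Assumed layout: 20 seats per row, one act, section, and block.
                int rowId = (seatId - 1) / 20 + 1;
                int seatNo = (seatId - 1) % 20 + 1;
                batch[seatId.ToString()] = new Seat(seatId, 1, 1, 1, rowId, seatNo, 1);

                // Send entries in batches instead of one Put call per seat.
                if (batch.Count == 1000)
                {
                    cache.PutAll(batch);
                    batch.Clear();
                }
            }

            // Flush the remaining entries.
            if (batch.Count > 0)
            {
                cache.PutAll(batch);
            }
        }
    }
}
```

Calling SeatLoader.Load(ignite, 100000) from a client node would create seats whose IDs cover the range queried later in this example (99995 to 99998).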

After filling our cache, let's run a simple query on our in-memory table with the help of the H2 debug console:

[Image: query result in the H2 debug console]

Now, we are ready to make some changes on our table. Let's select some rows and update their SeatStatus fields with the C# client.

Let's change clientMode to true in our config file and run our C# client code:

Environment.SetEnvironmentVariable("IGNITE_H2_DEBUG_CONSOLE", "true");
IIgnite ignite = Ignition.Start("{configuration_xml_file}");
ICache<string, Seat> seatCache = ignite.GetCache<string, Seat>("SeatCache");

// Acquire an explicit lock on the cache key "myLock".
var cacheLock = seatCache.Lock("myLock");
cacheLock.Enter();
try {
 // Collect the keys of the seats that match the query.
 var sql = new SqlQuery(typeof(Seat), "where SeatId >= ? and SeatId < ? and SeatStatus = ?", 99995, 99998, 1);
 var keys = new List<string>();
 using (IQueryCursor<ICacheEntry<string, Seat>> queryCursor = seatCache.Query(sql)) {
  foreach (ICacheEntry<string, Seat> entry in queryCursor) {
   keys.Add(entry.Key);
  }
 }
 // Update all selected entries on the server side, setting SeatStatus to 0.
 seatCache.InvokeAll(keys, new KeyProcessor(), 0);
} finally {
 // Always release the lock, even if the query or the update throws.
 cacheLock.Exit();
}

We acquire a distributed lock on the key myLock in our cache. While holding the lock, we query the in-memory H2 database and bulk process the matching entries by calling the InvokeAll method of our cache. Our entry processor updates the SeatStatus field of each entry. After processing, we release the lock, and the final state of the table is:

[Image: final state of the table in the H2 debug console]

In this example we achieved two important things:

  1. Get a distributed lock on the cache with a specific key.

  2. Bulk process the entries on the server side with an entry processor.

We can use this logic as a substitute for the "Select For Update Skip Locked" mechanism that some relational databases (for example, Oracle and PostgreSQL) offer.
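To get closer to the "skip locked" behavior, a client can give up instead of blocking when another client already holds the lock. The sketch below uses the TryEnter method of the cache lock for this. The SeatUpdater class, its method name, and the timeout parameter are hypothetical. Note that this is coarser than the database feature: the single myLock key guards the whole batch rather than individual rows.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;

namespace IgniteTest
{
    // Hypothetical helper, not part of the original article.
    public static class SeatUpdater
    {
        // Returns false if the lock could not be acquired within the timeout.
        public static bool TryUpdateSeats(ICache<string, Seat> seatCache, TimeSpan timeout)
        {
            using (ICacheLock cacheLock = seatCache.Lock("myLock"))
            {
                // Skip the work if another client holds the lock.
                if (!cacheLock.TryEnter(timeout))
                {
                    return false;
                }

                try
                {
                    var sql = new SqlQuery(typeof(Seat),
                        "where SeatId >= ? and SeatId < ? and SeatStatus = ?",
                        99995, 99998, 1);

                    var keys = new List<string>();
                    using (var cursor = seatCache.Query(sql))
                    {
                        foreach (var entry in cursor)
                        {
                            keys.Add(entry.Key);
                        }
                    }

                    // Nothing to do when no seat matches.
                    if (keys.Count > 0)
                    {
                        seatCache.InvokeAll(keys, new KeyProcessor(), 0);
                    }

                    return true;
                }
                finally
                {
                    // Release the lock before the using block disposes it.
                    cacheLock.Exit();
                }
            }
        }
    }
}
```

A caller could invoke SeatUpdater.TryUpdateSeats(seatCache, TimeSpan.FromSeconds(1)) and simply move on to other work when it returns false.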


