Wednesday 17 June 2015

Part 5. Java Thread Synchronization, Wait notify: Queue, a case study


In this series on Java threads, I have written four posts so far:
Part 1. Basic Thread in java. How to create them?
Part 2. Thread in java: method revisiting
Part 3. Thread in Java: Thread synchronization (or thread safe)
Part 4: Java thread synchronization and wait/notify mechanism

One case study is writing a queue for sharing data between many threads. I have used this queue in many projects, such as web applications and RESTful services, that require sharing data between an unknown number of threads.

/**
*
* @author LuyenCHU
*/
import java.util.Collection;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A FIFO queue for handing elements from producer threads to consumer threads.
 *
 * <p>Every operation synchronizes on the queue instance itself, and consumers
 * blocked in {@link #dequeue()} are woken with {@code notifyAll()} whenever
 * elements are added.
 *
 * <p>A maximum size of {@code 0} (the default) means the queue is unbounded.
 *
 * @param <T> the type of the elements held in the queue
 */
public class Queue<T> {

   /** Maximum number of elements accepted by {@link #enqueue}; 0 or less means unbounded. */
   private final int maxQueueSize;
   private final LinkedList<T> queueData = new LinkedList<T>();

   /** Creates an unbounded queue. */
   public Queue() {
       this(0);
   }

   /**
    * Creates a queue that rejects {@link #enqueue} calls once it holds
    * {@code maxSize} elements.
    *
    * @param maxSize the capacity; a value of 0 or less means unbounded
    */
   public Queue(int maxSize) {
       maxQueueSize = maxSize;
   }

   /**
    * @return the number of elements currently in the queue
    */
   public int size() {
       synchronized (this) {
           return queueData.size();
       }
   }

   /**
    * @return {@code true} if the queue currently holds no elements
    */
   public boolean isEmpty() {
       synchronized (this) {
           return queueData.isEmpty();
       }
   }

   /**
    * Removes and returns the element at the head of the queue, blocking while
    * the queue is empty.
    *
    * @return the head element, or {@code null} if the calling thread was
    *         interrupted while waiting (the interrupt status is restored)
    */
   public T dequeue() {
       synchronized (this) {
           // Re-check the condition in a loop: another consumer may have taken
           // the element first, and wait() is allowed to wake up spuriously.
           while (queueData.isEmpty()) {
               try {
                   this.wait();
               } catch (InterruptedException ex) {
                   Logger.getLogger(Queue.class.getName()).log(Level.SEVERE, null, ex);
                   // Keep the interrupt visible to the caller instead of losing it.
                   Thread.currentThread().interrupt();
                   return null;
               }
           }
           return queueData.removeFirst();
       }
   }

   /**
    * Removes the first element that is equal to {@code obj}, without blocking.
    *
    * @param obj the element to look for
    * @return the removed element, or {@code null} if no element matched
    */
   public T dequeue(T obj) {
       synchronized (this) {
           ListIterator<T> iter = queueData.listIterator(0);
           while (iter.hasNext()) {
               T current = iter.next();
               if (current != null && current.equals(obj)) {
                   iter.remove();
                   return current;
               }
           }
           return null;
       }
   }

   /**
    * Appends an element to the tail of the queue and wakes up waiting consumers.
    *
    * @param obj the element to add
    * @throws IndexOutOfBoundsException if the queue is bounded and already full
    */
   public void enqueue(T obj) throws IndexOutOfBoundsException {
       synchronized (this) {
           if ((maxQueueSize > 0) && (queueData.size() >= maxQueueSize)) {
               throw new IndexOutOfBoundsException("Queue is full. Element not added.");
           }
           queueData.add(obj);
           // Wake every waiter; each one re-checks the queue under the lock, so
           // none is left asleep while elements are available.
           this.notifyAll();
       }
   }

   /**
    * Returns the first element that is equal to {@code obj} without removing it.
    *
    * @param obj the element to look for
    * @return the matching element, or {@code null} if no element matched
    */
   public T find(T obj) {
       synchronized (this) {
           for (T current : queueData) {
               // Null-safe: a stored null element never matches.
               if (current != null && current.equals(obj)) {
                   return current;
               }
           }
       }
       return null;
   }

   /**
    * Appends all elements of the given collection and wakes up waiting
    * consumers. Note that the maximum queue size is not enforced here.
    *
    * @param obj the elements to add
    */
   public void addAll(Collection<? extends T> obj) {
       synchronized (this) {
           this.queueData.addAll(obj);
           // Several elements may have arrived, so several consumers may proceed.
           this.notifyAll();
       }
   }

}

Part 4: Java thread synchronization and wait/notify mechanism

Wait notify mechanism with synchronization
In this series on threads in Java, I have written four posts so far:
Part 1. Basic Thread in java. How to create them?
Part 2. Thread in java: method revisiting
Part 3. Thread in Java: Thread synchronization (or thread safe)
Part 4: Java thread synchronization and wait/notify mechanism

As mentioned in the previous post [Part 3: Java Thread], a thread never knows whether the shared resource has been set yet, and we cannot loop forever just to check the state of that resource. One solution to this problem is the wait/notify mechanism in Java. A thread that needs the data calls [object.wait()] to wait until another thread sets it. The thread that sets the data calls [object.notify()] to wake up one thread waiting on that object (or [object.notifyAll()] to wake up all of them), so that it can carry on with the necessary processing. Both calls must be made while holding the object's lock, i.e. inside a synchronized block on that object. Look at the example below to see how wait/notify works.

import java.util.LinkedList;

public class SynchronizeDemo {
       public static void main(String[] args) {
              LinkedList resources = new LinkedList();
              PusherThread pt = new PusherThread(resources, 100);
              pt.start();

              PrinterThread print1 = new PrinterThread(resources, "Print1", 100);
              PrinterThread print2 = new PrinterThread(resources, "Print2", 50);
              print1.start();
              print2.start();
              System.out.println("All threads are started!!!!");
       }

}

/**
 * Producer thread: appends the numbers {@code 0 .. no-1} to a shared list,
 * pausing 300 ms after each one, and notifies threads waiting on the list.
 */
class PusherThread extends Thread {
       /** Shared list; also used as the monitor for synchronization and notification. */
       private final LinkedList<Integer> rs;

       /** How many numbers this thread adds before finishing. */
       private final int noOfElement;

       /**
        * @param rs the shared list to append to and notify on
        * @param no the number of elements to add
        */
       public PusherThread(LinkedList<Integer> rs, int no) {
              this.noOfElement = no;
              this.rs = rs;
       }

       /**
        * Adds the numbers one by one. Stops early, with the interrupt status
        * restored, if the thread is interrupted while pausing.
        */
       @Override
       public void run() {
              for (int i = 0; i < noOfElement; i++) {
                     synchronized (rs) {
                            rs.add(i);
                            // More than one consumer may be waiting on the list.
                            rs.notifyAll();
                     }
                     // Pause outside the synchronized block so the consumers can
                     // take the lock and remove the element in the meantime.
                     try {
                            Thread.sleep(300);
                     } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                     }
              }
              System.out.println(noOfElement + " numbers are added to the list!");
       }
}

/**
 * Consumer thread: repeatedly removes the head of a shared list and prints it,
 * waiting on the list while it is empty.
 */
class PrinterThread extends Thread {
       /** Shared list; also used as the monitor for synchronization and waiting. */
       private final LinkedList<Integer> rs;

       /**
        * Loop flag: the thread keeps running while this is {@code true}.
        * Volatile because it is written by other threads through {@link #stop(boolean)}.
        */
       private volatile boolean start = true;

       /** Pause in milliseconds between two loop iterations. */
       private volatile long sleepTime = 100;

       /** Label printed in front of every removed value. */
       private volatile String threadName;

       /**
        * @param rs        the shared list to consume from and wait on
        * @param name      the label used in the printed messages
        * @param sleepTime the pause in milliseconds between two iterations
        */
       public PrinterThread(LinkedList<Integer> rs, String name, long sleepTime) {
              this.rs = rs;
              this.sleepTime = sleepTime;
              this.threadName = name;
       }

       /**
        * Sets the loop flag. Despite the method name, {@code true} keeps the
        * thread running and {@code false} asks it to finish. A thread blocked
        * in {@code wait()} only sees the new value after it is notified or
        * interrupted.
        *
        * @param val the new value of the loop flag
        */
       public void stop(boolean val) {
              this.start = val;
       }

       /**
        * @param val the new pause in milliseconds between two iterations
        */
       public void sleepIn(long val) {
              this.sleepTime = val;
       }

       /**
        * @param val the new label used in the printed messages
        */
       public void nameMe(String val) {
              this.threadName = val;
       }

       /**
        * Consumes values until the loop flag is cleared or the thread is
        * interrupted; on interruption the interrupt status is restored.
        */
       @Override
       public void run() {
              while (start) {
                     // Holding the lock guarantees each value is removed by only one thread.
                     synchronized (rs) {
                            if (!rs.isEmpty()) {
                                   int val = rs.remove();// remove a value
                                   System.out.println("[" + threadName + "], Removed :" + val);
                            } else {
                                   System.out.println("No more data, waiting....");
                                   try {
                                          // Releases the lock until a producer notifies;
                                          // the outer loop re-checks the list afterwards.
                                          rs.wait();
                                   } catch (InterruptedException e) {
                                          Thread.currentThread().interrupt();
                                          return;
                                   }
                            }
                     }
                     try {
                            Thread.sleep(sleepTime);
                     } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                     }
              }
       }

}