Wednesday 17 June 2015

Part 5. Java Thread Synchronization, Wait notify: Queue, a case study


In the series of Java thread, I have wrote four posts about this topic:
Part 1. Basic Thread in java. How to create them?
Part 2. Thread in java: method revisiting
Part 3. Thread in Java: Thread synchronization (or thread safe)
Part 4: Java thread synchronization and wait/notify mechanism

One case study is writing queue for sharing data between many threads. I have used this queue in many projects like web, restful which requires sharing data between many unknown threads.

/**
*
* @author LuyenCHU
*/
import java.util.Collection;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Queue {

   private int maxQueueSize = 0;
   private LinkedList queueData = new LinkedList();
   private boolean isWating = false;

   public Queue() {
   }

   public Queue(int maxSize) {
       maxQueueSize = maxSize;

   }

   public int size() {
       synchronized (this) {
           return queueData.size();
       }
   }

   public boolean isEmpty() {
       synchronized (this) {
           return queueData.isEmpty();
       }
   }

   public T dequeue() {
       synchronized (this) {
           T first = null;
           if (size() > 0) {
               first = queueData.removeFirst();
           }
           if (first == null) {
               //will wait when no element in the list
               //System.out.println("No Data found in the queue, waiting.......................");
               isWating = true;
               try {
                   this.wait();
               } catch (InterruptedException ex) {
                   Logger.getLogger(Queue.class.getName()).log(Level.SEVERE, null, ex);
               }
           }

           return first;
       }
   }

   public T dequeue(T obj) {
       T found = null;
       synchronized (this) {
           found = find(obj);
           if (found != null) {
               queueData.remove(found);
           }
       }
       return found;
   }

   public void enqueue(T obj) throws IndexOutOfBoundsException {
       //long curr = System.currentTimeMillis();
          //System.out.println("Added one");
       synchronized (this) {
           if ((maxQueueSize > 0) && (size() >= maxQueueSize)) {
               throw new IndexOutOfBoundsException("Queue is full. Element not added.");
           }
           queueData.add(obj);
           if (isWating) {
               this.notify();
               isWating = false;
           }
       }
       //System.out.println("Add Time : " + (System.currentTimeMillis() - curr));
   }
   public T find(T obj) {
       synchronized (this) {
           T current;
           ListIterator iter = queueData.listIterator(0);
           while (iter.hasNext()) {
               current = iter.next();
               if (current.equals(obj)) {
                   return current;
               }
           }
       }
       return null;
   }

   public void addAll(Collection obj) {
       synchronized (this) {
           this.queueData.addAll(obj);
           if (isWating) {
               this.notify();
               isWating = false;
           }
       }
   }

}

No comments:

Post a Comment