seda.sandStorm.internal
Class SinkProxy

java.lang.Object
  extended by seda.sandStorm.internal.SinkProxy
All Implemented Interfaces:
ProfilableIF, SinkIF

public class SinkProxy
extends java.lang.Object
implements SinkIF, ProfilableIF

Used as a proxy to observe and measure communication behavior between stages. By handing out a SinkProxy instead of a FiniteQueue, it is possible to gather statistics on event communication between stages. This is used by StageGraph to construct a graph of the communication patterns between stages.
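
The counting behavior described above can be illustrated with a minimal sketch. It does not use the real Sandstorm classes: SimpleSink, BoundedSink and CountingSinkProxy are hypothetical stand-ins invented for this example, and the actual SinkProxy implementation may differ. Only the field names enqueueCount and enqueueSuccessCount are taken from this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SinkIF, reduced to one method for illustration.
interface SimpleSink {
    void enqueue(Object element) throws Exception;
}

// Hypothetical bounded queue that rejects elements once it is full.
class BoundedSink implements SimpleSink {
    private final List<Object> items = new ArrayList<>();
    private final int capacity;

    BoundedSink(int capacity) { this.capacity = capacity; }

    @Override
    public void enqueue(Object element) throws Exception {
        if (items.size() >= capacity) {
            throw new Exception("sink full");
        }
        items.add(element);
    }
}

// Wraps another sink, counts attempts and successes, and forwards each call.
class CountingSinkProxy implements SimpleSink {
    private final SimpleSink target;
    int enqueueCount;         // every enqueue attempt
    int enqueueSuccessCount;  // attempts the wrapped sink accepted

    CountingSinkProxy(SimpleSink target) { this.target = target; }

    @Override
    public void enqueue(Object element) throws Exception {
        enqueueCount++;
        target.enqueue(element);  // throws on rejection, skipping the next line
        enqueueSuccessCount++;
    }
}

public class CountingSinkProxyDemo {
    // Pushes `attempts` elements through a proxy; returns {attempts, successes}.
    static int[] run(int capacity, int attempts) {
        CountingSinkProxy proxy = new CountingSinkProxy(new BoundedSink(capacity));
        for (int i = 0; i < attempts; i++) {
            try {
                proxy.enqueue("event-" + i);
            } catch (Exception rejected) {
                // A rejected element counts as an attempt only.
            }
        }
        return new int[] { proxy.enqueueCount, proxy.enqueueSuccessCount };
    }

    public static void main(String[] args) {
        int[] counts = run(2, 5);
        System.out.println("attempted=" + counts[0] + " succeeded=" + counts[1]);
    }
}
```

Because the proxy implements the same interface as the sink it wraps, callers do not need to know whether they hold the queue or the proxy.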


Field Summary
private  java.lang.Thread client
           
private  java.util.Hashtable clientTbl
           
private static boolean DEBUG
           
 int enqueueCount
          Maintains a running sum of the number of elements enqueued onto this sink.
 int enqueueSuccessCount
          Maintains a running sum of the number of elements successfully enqueued onto this sink (that is, not rejected by the enqueue predicate).
private  ManagerIF mgr
           
private  StageGraph stageGraph
           
 SinkIF thesink
           
 long timer
          Used to maintain a timer for statistics gathering.
private  StageWrapperIF toStage
           
 
Constructor Summary
SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage)
          Create a SinkProxy for the given sink.
 
Method Summary
 void enqueue_abort(java.lang.Object key)
          Abort a previously prepared provisional enqueue operation (from the enqueue_prepare() method).
 void enqueue_commit(java.lang.Object key)
          Commit a previously prepared provisional enqueue operation (from the enqueue_prepare() method).
 boolean enqueue_lossy(QueueElementIF enqueueMe)
          Enqueues the given element onto the queue.
 void enqueue_many(QueueElementIF[] enqueueMe)
          Given an array of elements, atomically enqueues all of the elements in the array.
 java.lang.Object enqueue_prepare(QueueElementIF[] enqueueMe)
          Support for transactional enqueue.
 void enqueue(QueueElementIF enqueueMe)
          Enqueues the given element onto the queue.
 EnqueuePredicateIF getEnqueuePredicate()
          Return the enqueue predicate for this sink.
 int profileSize()
          Return the profile size of the queue.
private  void recordUse()
           
 void setEnqueuePredicate(EnqueuePredicateIF pred)
          Set the enqueue predicate for this sink.
 int size()
          Return the size of the queue.
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

DEBUG

private static final boolean DEBUG
See Also:
Constant Field Values

mgr

private ManagerIF mgr

toStage

private StageWrapperIF toStage

stageGraph

private StageGraph stageGraph

thesink

public SinkIF thesink

client

private java.lang.Thread client

clientTbl

private java.util.Hashtable clientTbl

enqueueCount

public int enqueueCount
Maintains a running sum of the number of elements enqueued onto this sink.


enqueueSuccessCount

public int enqueueSuccessCount
Maintains a running sum of the number of elements successfully enqueued onto this sink (that is, not rejected by the enqueue predicate).


timer

public long timer
Used to maintain a timer for statistics gathering.

Constructor Detail

SinkProxy

public SinkProxy(SinkIF sink,
                 ManagerIF mgr,
                 StageWrapperIF toStage)
Create a SinkProxy for the given sink.

Parameters:
sink - The sink to create a proxy for.
mgr - The associated manager.
toStage - The stage which this sink pushes events to.
Method Detail

size

public int size()
Return the size of the queue.

Specified by:
size in interface SinkIF

enqueue

public void enqueue(QueueElementIF enqueueMe)
             throws SinkException
Description copied from interface: SinkIF
Enqueues the given element onto the queue.

Specified by:
enqueue in interface SinkIF
Parameters:
enqueueMe - The QueueElementIF to enqueue
Throws:
SinkFullException - Indicates that the sink is temporarily full.
SinkClosedException - Indicates that the sink is no longer being serviced.
SinkException

enqueue_lossy

public boolean enqueue_lossy(QueueElementIF enqueueMe)
Description copied from interface: SinkIF
Enqueues the given element onto the queue. This is lossy in that this method drops the element if the element could not be enqueued, rather than throwing a SinkFullException or SinkClosedException. This is meant as a convenience interface for "low priority" enqueue events which can be safely dropped.

Specified by:
enqueue_lossy in interface SinkIF
Parameters:
enqueueMe - The QueueElementIF to enqueue
Returns:
true if the element was enqueued, false otherwise.
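
As a rough sketch of the lossy contract, the example below expresses a lossy enqueue in terms of a throwing one. TinySink and FullException are hypothetical stand-ins for a SinkIF implementation and SinkFullException; this is not how SinkProxy itself is necessarily written.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class LossyEnqueueSketch {
    // Hypothetical stand-in for SinkFullException.
    static class FullException extends Exception {}

    // Hypothetical bounded sink with a throwing and a lossy enqueue.
    static class TinySink {
        private final Queue<Object> items = new ArrayDeque<>();
        private final int capacity;

        TinySink(int capacity) { this.capacity = capacity; }

        void enqueue(Object element) throws FullException {
            if (items.size() >= capacity) {
                throw new FullException();
            }
            items.add(element);
        }

        // Drops the element instead of throwing; reports the outcome.
        boolean enqueueLossy(Object element) {
            try {
                enqueue(element);
                return true;
            } catch (FullException dropped) {
                return false;
            }
        }

        int size() { return items.size(); }
    }

    public static void main(String[] args) {
        TinySink sink = new TinySink(1);
        System.out.println(sink.enqueueLossy("first"));
        System.out.println(sink.enqueueLossy("second"));
    }
}
```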

enqueue_many

public void enqueue_many(QueueElementIF[] enqueueMe)
                  throws SinkException
Description copied from interface: SinkIF
Given an array of elements, atomically enqueues all of the elements in the array. This guarantees that no other thread can interleave its own elements with those being inserted from this array. The implementation must enqueue all of the elements or none of them; if a SinkFullException or SinkClosedException is thrown, none of the elements will have been enqueued. This implies that the enqueue predicate (if any) must accept all elements in the array for the enqueue to proceed.

Specified by:
enqueue_many in interface SinkIF
Parameters:
enqueueMe - The element array to enqueue
Throws:
SinkFullException - Indicates that the sink is temporarily full.
SinkClosedException - Indicates that the sink is no longer being serviced.
SinkException
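
The all-or-none guarantee can be sketched as follows, assuming a simple capacity limit as the only reason for rejection. BatchSink and FullException are hypothetical stand-ins; a real sink would also consult its enqueue predicate before accepting the batch.

```java
import java.util.ArrayList;
import java.util.List;

public class AtomicEnqueueManySketch {
    // Hypothetical stand-in for SinkFullException.
    static class FullException extends Exception {}

    // Hypothetical sink whose batch enqueue accepts all elements or none.
    static class BatchSink {
        private final List<Object> items = new ArrayList<>();
        private final int capacity;

        BatchSink(int capacity) { this.capacity = capacity; }

        // synchronized keeps other threads from interleaving their elements.
        synchronized void enqueueMany(Object[] elements) throws FullException {
            // Check the whole batch first so nothing is added on failure.
            if (items.size() + elements.length > capacity) {
                throw new FullException();
            }
            for (Object element : elements) {
                items.add(element);
            }
        }

        synchronized int size() { return items.size(); }
    }

    public static void main(String[] args) throws FullException {
        BatchSink sink = new BatchSink(3);
        sink.enqueueMany(new Object[] { "a", "b" });
        System.out.println("size=" + sink.size());
    }
}
```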

profileSize

public int profileSize()
Return the profile size of the queue.

Specified by:
profileSize in interface ProfilableIF

enqueue_prepare

public java.lang.Object enqueue_prepare(QueueElementIF[] enqueueMe)
                                 throws SinkException
Description copied from interface: SinkIF
Support for transactional enqueue.

This method allows a client to provisionally enqueue a number of elements onto the queue, and then later commit the enqueue (with an enqueue_commit() call) or abort it (with an enqueue_abort() call). This mechanism can be used to perform "split-phase" enqueues, where a client first enqueues a set of elements on the queue and then performs some work to "fill in" those elements before performing a commit. It can also be used to perform multi-queue transactional enqueue operations, with an "all-or-nothing" strategy for enqueueing events on multiple queues.

This method would generally be used in the following manner:

   Object key = sink.enqueue_prepare(someElements);
   if (can_commit) {
     sink.enqueue_commit(key);
   } else {
     sink.enqueue_abort(key);
   }
 

Note that this method does not protect against "dangling prepares" -- that is, a prepare without an associated commit or abort operation. This method should be used with care. In particular, be sure that all code paths (such as exceptions) after a prepare include either a commit or an abort.

Like enqueue_many, enqueue_prepare is an "all or none" operation: the enqueue predicate must accept all elements for enqueue, or none of them will be enqueued.
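
One way to avoid a dangling prepare is to pair the prepare with a try/finally block, so that an exception between prepare and commit still triggers an abort. The sketch below assumes this pattern; TxSink is a hypothetical stand-in for the transactional part of SinkIF, and enqueueAfter is a helper invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PrepareCommitSketch {
    // Hypothetical stand-in for the transactional methods of SinkIF.
    static class TxSink {
        final List<Object> committed = new ArrayList<>();
        private final Map<Object, Object[]> pending = new HashMap<>();

        Object enqueuePrepare(Object[] elements) {
            Object key = new Object();  // opaque transaction key
            pending.put(key, elements.clone());
            return key;
        }

        void enqueueCommit(Object key) {
            Object[] elements = pending.remove(key);
            if (elements != null) {
                committed.addAll(Arrays.asList(elements));
            }
        }

        void enqueueAbort(Object key) { pending.remove(key); }

        int pendingCount() { return pending.size(); }
    }

    // Prepares, runs `work`, then commits; aborts if `work` throws.
    static boolean enqueueAfter(TxSink sink, Object[] elements, Runnable work) {
        Object key = sink.enqueuePrepare(elements);
        boolean done = false;
        try {
            work.run();
            sink.enqueueCommit(key);
            done = true;
        } finally {
            if (!done) {
                sink.enqueueAbort(key);  // no dangling prepare on any path
            }
        }
        return done;
    }

    public static void main(String[] args) {
        TxSink sink = new TxSink();
        enqueueAfter(sink, new Object[] { "a", "b" }, () -> { });
        System.out.println("committed=" + sink.committed.size());
    }
}
```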

Specified by:
enqueue_prepare in interface SinkIF
Parameters:
enqueueMe - The element array to provisionally enqueue
Returns:
A "transaction key" that may be used to commit or abort the provisional enqueue
Throws:
SinkFullException - Indicates that the sink is temporarily full and that the requested elements could not be provisionally enqueued.
SinkClosedException - Indicates that the sink is no longer being serviced.
SinkException
See Also:
enqueue_commit, enqueue_abort

enqueue_commit

public void enqueue_commit(java.lang.Object key)
Description copied from interface: SinkIF
Commit a previously prepared provisional enqueue operation (from the enqueue_prepare() method). Causes the provisionally enqueued elements to appear on the queue for future dequeue operations. Note that once enqueue_prepare() has returned an enqueue key, the queue cannot reject the entries.

Specified by:
enqueue_commit in interface SinkIF

enqueue_abort

public void enqueue_abort(java.lang.Object key)
Description copied from interface: SinkIF
Abort a previously prepared provisional enqueue operation (from the enqueue_prepare() method). Causes the queue to discard the provisionally enqueued elements.

Specified by:
enqueue_abort in interface SinkIF

setEnqueuePredicate

public void setEnqueuePredicate(EnqueuePredicateIF pred)
Description copied from interface: SinkIF
Set the enqueue predicate for this sink. This mechanism allows users to define a method that will 'screen' QueueElementIF objects during the enqueue procedure to either accept or reject them. The enqueue predicate runs in the context of the caller of enqueue(), which means it must be simple and fast. This can be used to implement many queue-thresholding policies, such as a simple count threshold or credit-based mechanisms.

Specified by:
setEnqueuePredicate in interface SinkIF
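
A simple count threshold, one of the policies mentioned above, might look like the following sketch. The Predicate interface and PredicatedSink class are hypothetical stand-ins for EnqueuePredicateIF and a SinkIF implementation, and countThreshold is a helper invented for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ThresholdPredicateSketch {
    // Hypothetical stand-in for EnqueuePredicateIF.
    interface Predicate {
        boolean accept(Object element);
    }

    // Hypothetical sink that screens elements through an optional predicate.
    static class PredicatedSink {
        private final Queue<Object> items = new ArrayDeque<>();
        private Predicate predicate;

        void setEnqueuePredicate(Predicate pred) { predicate = pred; }

        int size() { return items.size(); }

        // Runs the predicate in the caller's thread before adding the element.
        boolean enqueueLossy(Object element) {
            if (predicate != null && !predicate.accept(element)) {
                return false;
            }
            items.add(element);
            return true;
        }
    }

    // Accepts elements only while the sink holds fewer than `limit` entries.
    static Predicate countThreshold(PredicatedSink sink, int limit) {
        return element -> sink.size() < limit;
    }

    public static void main(String[] args) {
        PredicatedSink sink = new PredicatedSink();
        sink.setEnqueuePredicate(countThreshold(sink, 2));
        for (int i = 0; i < 3; i++) {
            System.out.println("accepted=" + sink.enqueueLossy("event-" + i));
        }
    }
}
```

The predicate does only a size comparison, in keeping with the requirement that it be simple and fast.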

getEnqueuePredicate

public EnqueuePredicateIF getEnqueuePredicate()
Description copied from interface: SinkIF
Return the enqueue predicate for this sink.

Specified by:
getEnqueuePredicate in interface SinkIF

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

recordUse

private void recordUse()