org.apache.uima.collection.impl.cpm.engine
Class ProcessingUnit

java.lang.Object
  extended by java.lang.Thread
      extended by org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
All Implemented Interfaces:
java.lang.Runnable

public class ProcessingUnit
extends java.lang.Thread

This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of CAS from the Work Queue filled by the ArtifactProducer and sends them through the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the order in which the Cas Processors are listed in the CPE descriptor. The results of analysis produced by the Cas Processors are enqueued onto an output queue that is shared with Cas Consumers.
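The flow described above can be pictured with a small, self-contained sketch. It does not use the UIMA classes: java.util.concurrent.ArrayBlockingQueue stands in for the work and output queues, a String[] stands in for a bundle of CAS, and PipelineSketch, runOnce and the POISON end marker are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a processing unit; not the UIMA implementation.
class PipelineSketch extends Thread {
    // Assumed end-of-input marker; the real CPM signals termination differently.
    static final String[] POISON = new String[0];

    private final BlockingQueue<String[]> workQueue;
    private final BlockingQueue<String[]> outputQueue;
    private final List<UnaryOperator<String>> processors;

    PipelineSketch(BlockingQueue<String[]> in, BlockingQueue<String[]> out,
                   List<UnaryOperator<String>> processors) {
        this.workQueue = in;
        this.outputQueue = out;
        this.processors = processors;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String[] bundle = workQueue.take();   // blocks until a bundle arrives
                if (bundle == POISON) {
                    outputQueue.put(POISON);          // pass the end marker downstream
                    return;
                }
                // Apply the processors in their configured order to the whole bundle.
                for (UnaryOperator<String> p : processors) {
                    for (int i = 0; i < bundle.length; i++) {
                        bundle[i] = p.apply(bundle[i]);
                    }
                }
                outputQueue.put(bundle);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // restore the flag and exit
        }
    }

    // Convenience driver: pushes one bundle through the unit and returns the result.
    static String[] runOnce(String[] bundle, List<UnaryOperator<String>> processors) {
        BlockingQueue<String[]> in = new ArrayBlockingQueue<>(4);
        BlockingQueue<String[]> out = new ArrayBlockingQueue<>(4);
        PipelineSketch unit = new PipelineSketch(in, out, processors);
        unit.start();
        try {
            in.put(bundle);
            in.put(POISON);
            String[] result = out.take();
            unit.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Bounded queues give back-pressure in both directions: the unit blocks when no work is available, and it also blocks when downstream consumers fall behind.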


Nested Class Summary
 
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
 
Field Summary
protected  java.lang.Object[] artifact
           
protected  CAS[] casList
           
protected  CPECasPool casPool
           
protected  CAS conversionCas
           
protected  CAS[] conversionCasArray
           
protected  CpeConfiguration cpeConfiguration
           
protected  CPMEngine cpm
           
protected  CasConverter mConverter
           
protected  boolean notifyListeners
           
protected  long numToProcess
           
protected  BoundedWorkQueue outputQueue
           
protected  java.util.LinkedList processContainers
           
protected  ProcessTrace processingUnitProcessTrace
           
protected  boolean releaseCAS
           
protected  java.util.ArrayList statusCbL
           
protected  java.lang.String threadId
           
 int threadState
           
protected  UimaTimer timer
           
 long timer01
           
 long timer02
           
 long timer03
           
 long timer04
           
 long timer05
           
 long timer06
           
protected  BoundedWorkQueue workQueue
           
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Constructor Summary
ProcessingUnit()
           
ProcessingUnit(CPMEngine acpm)
           
ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
          Initialize the PU
 
Method Summary
 void addStatusCallbackListener(BaseStatusCallbackListener aListener)
          Plugs in Listener object used for notifications.
protected  boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp)
          An alternate processing loop designed for the single-threaded CPM.
 void cleanup()
          Null out fields of this object.
 boolean consumeQueue()
          Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
 void disableCasProcessor(int aCasProcessorIndex)
          Disable a CASProcessor in the processing pipeline.
 void disableCasProcessor(java.lang.String aCasProcessorName)
          Alternative method to disable Cas Processor.
protected  void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
          Notifies all configured listeners.
 void enableCasProcessor(java.lang.String aCasProcessorName)
          Enables Cas Processor with a given name.
protected  boolean endOfProcessingReached(long aCount)
          Returns true if the CPM has finished analyzing the collection.
protected  long getBytes(java.lang.Object aCas)
          Returns the size of the CAS object.
 java.util.ArrayList getCallbackListeners()
          Returns list of listeners used by this PU for callbacks.
 boolean isCasConsumerPipeline()
           
protected  boolean isProcessorReady(int aStatus)
          Check if the CASProcessor status is available for processing
 boolean isRunning()
          Returns true if this component is in running state.
protected  void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
          Notifies listeners that the pipeline has finished processing the current set of CASes.
protected  void process(java.lang.Object anArtifact)
           
protected  boolean processNext(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp)
          Executes the processing pipeline.
 void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
          Removes given listener from the list of listeners
 void run()
          Starts the Processing Pipeline thread.
 void setCasConsumerPipelineIdentity()
          Define a CasConsumer Pipeline identity for this instance
 void setCasPool(CPECasPool aPool)
           
 void setContainers(java.util.LinkedList processorList)
          Plugs in a list of Cas Processor containers.
 void setCPMEngine(CPMEngine acpm)
          Alternative method of providing the reference to the component managing the lifecycle of the CPE
 void setInputQueue(BoundedWorkQueue aInputQueue)
          Alternative method of providing a queue from which this PU will read bundles of CAS
 void setNotifyListeners(boolean aDoNotify)
          Set a flag indicating if notifications should be made via configured Listeners
 void setOutputQueue(BoundedWorkQueue aOutputQueue)
          Alternative method of providing a queue where this PU will deposit results of analysis
 void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
          Plugs in ProcessTrace object used to collect statistics
 void setReleaseCASFlag(boolean aFlag)
          Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing.
 void setUimaTimer(UimaTimer aTimer)
          Plugs in custom timer used by the PU for getting time
protected  void showMetadata(java.lang.Object[] aCasList)
           
 void stopCasProcessors(boolean kill)
          Stops all Cas Processors that are part of this PU.
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

threadState

public int threadState

casPool

protected CPECasPool casPool

releaseCAS

protected boolean releaseCAS

cpm

protected CPMEngine cpm

workQueue

protected BoundedWorkQueue workQueue

outputQueue

protected BoundedWorkQueue outputQueue

mConverter

protected CasConverter mConverter

processingUnitProcessTrace

protected ProcessTrace processingUnitProcessTrace

processContainers

protected java.util.LinkedList processContainers

numToProcess

protected long numToProcess

casList

protected CAS[] casList

statusCbL

protected java.util.ArrayList statusCbL

notifyListeners

protected boolean notifyListeners

conversionCas

protected CAS conversionCas

artifact

protected java.lang.Object[] artifact

conversionCasArray

protected CAS[] conversionCasArray

timer

protected UimaTimer timer

threadId

protected java.lang.String threadId

cpeConfiguration

protected CpeConfiguration cpeConfiguration

timer01

public long timer01

timer02

public long timer02

timer03

public long timer03

timer04

public long timer04

timer05

public long timer05

timer06

public long timer06
Constructor Detail

ProcessingUnit

public ProcessingUnit()

ProcessingUnit

public ProcessingUnit(CPMEngine acpm,
                      BoundedWorkQueue aInputQueue,
                      BoundedWorkQueue aOutputQueue)
Initialize the PU

Parameters:
acpm - component managing the life cycle of the CPE
aInputQueue - queue to read from
aOutputQueue - queue to write to

ProcessingUnit

public ProcessingUnit(CPMEngine acpm)
Method Detail

isRunning

public boolean isRunning()
Returns true if this component is in running state.

Returns:
true if running, false otherwise

setCasConsumerPipelineIdentity

public void setCasConsumerPipelineIdentity()
Define a CasConsumer Pipeline identity for this instance

isCasConsumerPipeline

public boolean isCasConsumerPipeline()

setInputQueue

public void setInputQueue(BoundedWorkQueue aInputQueue)
Alternative method of providing a queue from which this PU will read bundles of CAS

Parameters:
aInputQueue - queue to read from

setOutputQueue

public void setOutputQueue(BoundedWorkQueue aOutputQueue)
Alternative method of providing a queue where this PU will deposit results of analysis

Parameters:
aOutputQueue - queue to write to

setCPMEngine

public void setCPMEngine(CPMEngine acpm)
Alternative method of providing the reference to the component managing the lifecycle of the CPE

Parameters:
acpm - reference to the controlling engine

cleanup

public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.


setNotifyListeners

public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners

Parameters:
aDoNotify - true if notification is required, false otherwise

addStatusCallbackListener

public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
Plugs in Listener object used for notifications.

Parameters:
aListener - BaseStatusCallbackListener instance

getCallbackListeners

public java.util.ArrayList getCallbackListeners()
Returns list of listeners used by this PU for callbacks.

Returns:
list of BaseStatusCallbackListener instances

removeStatusCallbackListener

public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Removes given listener from the list of listeners

Parameters:
aListener - object to remove from the list
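
The listener methods above (setNotifyListeners, addStatusCallbackListener, getCallbackListeners, removeStatusCallbackListener) follow a common registry pattern. A minimal sketch of that pattern, assuming a simplified listener contract: StatusListenerSketch, onProcessed and fireProcessed are hypothetical names and do not reflect the actual BaseStatusCallbackListener interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract; the real BaseStatusCallbackListener differs.
interface StatusListenerSketch {
    void onProcessed(String entity);
}

class ListenerRegistrySketch {
    private final List<StatusListenerSketch> listeners = new ArrayList<>();
    private boolean notifyListeners = true;

    void addStatusCallbackListener(StatusListenerSketch l) { listeners.add(l); }

    void removeStatusCallbackListener(StatusListenerSketch l) { listeners.remove(l); }

    void setNotifyListeners(boolean doNotify) { notifyListeners = doNotify; }

    List<StatusListenerSketch> getCallbackListeners() { return listeners; }

    // Called once an entity has passed through the pipeline.
    // Returns the number of listeners that were notified.
    int fireProcessed(String entity) {
        if (!notifyListeners) {
            return 0;                       // notifications are switched off
        }
        int notified = 0;
        // Iterate over a copy so a listener may remove itself during the callback.
        for (StatusListenerSketch l : new ArrayList<>(listeners)) {
            l.onProcessed(entity);
            notified++;
        }
        return notified;
    }
}
```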

setProcessingUnitProcessTrace

public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
Plugs in ProcessTrace object used to collect statistics

Parameters:
aProcessingUnitProcessTrace - object to compile stats

setUimaTimer

public void setUimaTimer(UimaTimer aTimer)
Plugs in custom timer used by the PU for getting time

Parameters:
aTimer - custom timer to use

setContainers

public void setContainers(java.util.LinkedList processorList)
Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is held in a container that manages errors, counts and totals, and restarts.

Parameters:
processorList - list of Cas Processor containers that make up the processing pipeline
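
To illustrate what a container that manages errors and counts might look like, here is a minimal sketch. ContainerSketch, its maxErrors threshold and its fall-through behavior on failure are all assumptions made for this example; the actual container classes and their error policies are configured through the CPE descriptor and are not shown on this page.

```java
import java.util.function.UnaryOperator;

// Hypothetical container wrapping one processor; not the UIMA container class.
class ContainerSketch {
    private final UnaryOperator<String> processor;
    private final int maxErrors;   // assumed threshold before the container gives up
    int processed = 0;             // successful calls
    int errors = 0;                // failed calls

    ContainerSketch(UnaryOperator<String> processor, int maxErrors) {
        this.processor = processor;
        this.maxErrors = maxErrors;
    }

    // Returns the processed value, or the input unchanged if the processor failed.
    // Throws once the number of failures exceeds the threshold.
    String process(String input) {
        try {
            String out = processor.apply(input);
            processed++;
            return out;
        } catch (RuntimeException e) {
            errors++;
            if (errors > maxErrors) {
                throw new IllegalStateException("error threshold exceeded", e);
            }
            return input;
        }
    }
}
```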

disableCasProcessor

public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by the provided index. The disabled Cas Processor remains in the processing pipeline; however, it is not used during processing.

Parameters:
aCasProcessorIndex - location in the pipeline of the Cas Processor to disable

disableCasProcessor

public void disableCasProcessor(java.lang.String aCasProcessorName)
Alternative method to disable a Cas Processor. Uses a name to locate it.

Parameters:
aCasProcessorName - name of the Cas Processor to disable

enableCasProcessor

public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables the Cas Processor with the given name. An enabled Cas Processor immediately begins to receive bundles of CAS.

Parameters:
aCasProcessorName - name of the Cas Processor to enable
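
The disable and enable methods above keep a Cas Processor in the pipeline and only change whether it is used. A minimal sketch of that behavior, assuming each pipeline entry carries a name and an enabled flag; TogglePipelineSketch and ToggleContainerSketch are hypothetical classes, not part of UIMA.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical pipeline entry: one processor plus a name and an enabled flag.
class ToggleContainerSketch {
    final String name;
    final UnaryOperator<String> processor;
    boolean enabled = true;

    ToggleContainerSketch(String name, UnaryOperator<String> processor) {
        this.name = name;
        this.processor = processor;
    }
}

class TogglePipelineSketch {
    private final List<ToggleContainerSketch> containers = new ArrayList<>();

    void add(String name, UnaryOperator<String> p) {
        containers.add(new ToggleContainerSketch(name, p));
    }

    // Disable by position in the pipeline.
    void disable(int index) { containers.get(index).enabled = false; }

    // Disable by name.
    void disable(String name) { setEnabled(name, false); }

    // Re-enable by name; the processor takes part in the next call to process().
    void enable(String name) { setEnabled(name, true); }

    private void setEnabled(String name, boolean value) {
        for (ToggleContainerSketch c : containers) {
            if (c.name.equals(name)) {
                c.enabled = value;
            }
        }
    }

    // Disabled entries stay in the list; they are simply skipped.
    String process(String input) {
        String current = input;
        for (ToggleContainerSketch c : containers) {
            if (c.enabled) {
                current = c.processor.apply(current);
            }
        }
        return current;
    }

    int size() { return containers.size(); }
}
```

Keeping disabled entries in place preserves the pipeline order, so a later enable call restores the processor at its original position.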

run

public void run()
Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.

Specified by:
run in interface java.lang.Runnable
Overrides:
run in class java.lang.Thread

consumeQueue

public boolean consumeQueue()
Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
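
A minimal sketch of such a drain step, using java.util.concurrent.BlockingQueue as a stand-in for the input queue. DrainSketch is a hypothetical class, and the meaning of its boolean result (whether anything was consumed) is an assumption, since this page does not document the return value of consumeQueue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical drain helper; not the UIMA implementation.
class DrainSketch {
    // Processes whatever is still queued without blocking.
    // Assumed return value: true if at least one bundle was found.
    static boolean consumeQueue(BlockingQueue<String> queue, List<String> processed) {
        boolean consumedAny = false;
        String bundle;
        while ((bundle = queue.poll()) != null) {   // poll() returns null when the queue is empty
            processed.add(bundle);                  // stand-in for running the pipeline
            consumedAny = true;
        }
        return consumedAny;
    }
}
```

Using the non-blocking poll() rather than take() matters here: at shutdown no producer will add more work, so a blocking read on an empty queue would never return.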


processNext

protected boolean processNext(java.lang.Object[] aCasObjectList,
                              ProcessTrace pTrTemp)
                       throws ResourceProcessException,
                              java.io.IOException,
                              CollectionException,
                              AbortCPMException,
                              KillPipelineException
Executes the processing pipeline. The given bundle of CAS instances is processed by each Cas Processor in the pipeline. Conversions between different types of Cas Processors are done on the fly. Two types of Cas Processors are currently supported:
  • CasDataProcessor
  • CasObjectProcessor

The first operates on instances of CasData; the second operates on instances of CAS. The results produced by Cas Processors are added to the output queue.

Parameters:
aCasObjectList - bundle of CAS to analyze
pTrTemp - object used to aggregate stats
Throws:
ResourceProcessException
java.io.IOException
CollectionException
AbortCPMException
KillPipelineException
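
The on-the-fly conversion described for processNext can be sketched as follows. The example does not use CasData or CAS: plain text (String) and a token list (List<String>) stand in for the two representations, and ConversionSketch, TextStep and TokenStep are hypothetical names. The point is only that the representation is switched when the next step needs the other form, and left alone otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical illustration of lazy conversion between two representations.
class ConversionSketch {
    // Two processor kinds, each bound to one representation.
    interface Step {}

    static final class TextStep implements Step {
        final UnaryOperator<String> fn;
        TextStep(UnaryOperator<String> fn) { this.fn = fn; }
    }

    static final class TokenStep implements Step {
        final UnaryOperator<List<String>> fn;
        TokenStep(UnaryOperator<List<String>> fn) { this.fn = fn; }
    }

    int conversions = 0;   // how many times the representation was switched

    String run(String input, List<Step> steps) {
        Object current = input;                       // starts in text form
        for (Step s : steps) {
            if (s instanceof TextStep) {
                current = ((TextStep) s).fn.apply(toText(current));
            } else {
                current = ((TokenStep) s).fn.apply(toTokens(current));
            }
        }
        return toText(current);                       // results leave in text form
    }

    private String toText(Object o) {
        if (o instanceof String) {
            return (String) o;                        // already in the right form
        }
        conversions++;
        @SuppressWarnings("unchecked")
        List<String> tokens = (List<String>) o;
        return String.join(" ", tokens);
    }

    private List<String> toTokens(Object o) {
        if (o instanceof List) {
            @SuppressWarnings("unchecked")
            List<String> tokens = (List<String>) o;
            return tokens;                            // already in the right form
        }
        conversions++;
        return new ArrayList<>(Arrays.asList(((String) o).split(" ")));
    }
}
```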

notifyListeners

protected void notifyListeners(java.lang.Object aCas,
                               boolean isCasObject,
                               EntityProcessStatus aEntityProcStatus)
Notifies listeners that the pipeline has finished processing the current set of CASes.

Parameters:
aCas - object containing either an array of CAS or a single instance of CAS
isCasObject - true if the instance is of type CAS, false otherwise
aEntityProcStatus - status object that may contain exceptions and trace

doNotifyListeners

protected void doNotifyListeners(java.lang.Object aCas,
                                 boolean isCasObject,
                                 EntityProcessStatus aEntityProcStatus)
Notifies all configured listeners. Makes sure that the appropriate type of CAS is sent to each listener. Conversions take place to ensure compatibility.

Parameters:
aCas - CAS to pass to the listener
isCasObject - true if the instance is of type CAS
aEntityProcStatus - status object containing exceptions and trace info

setReleaseCASFlag

public void setReleaseCASFlag(boolean aFlag)
Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations that do not use Cas Consumers the processing pipeline may also release the CAS.

Parameters:
aFlag - true if this thread should release a CAS when analysis is complete

stopCasProcessors

public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.

Parameters:
kill - true if the CPE was stopped externally before it finished processing

endOfProcessingReached

protected boolean endOfProcessingReached(long aCount)
Returns true if the CPM has finished analyzing the collection.

Parameters:
aCount - running total of documents processed so far
Returns:
true if CPM has processed all docs, false otherwise
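
A minimal sketch of such a check, comparing the running count against the numToProcess field listed in the Field Summary. Treating a negative limit as "process the whole collection" is an assumption made for this example, and EndOfProcessingSketch is a hypothetical class.

```java
// Hypothetical end-of-processing check; not the UIMA implementation.
class EndOfProcessingSketch {
    // Assumption: a negative limit means "no limit, process the whole collection".
    static boolean endOfProcessingReached(long count, long numToProcess) {
        if (numToProcess < 0) {
            return false;                 // no limit configured
        }
        return count >= numToProcess;     // limit matched or exceeded
    }
}
```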

process

protected void process(java.lang.Object anArtifact)
Parameters:
anArtifact -

showMetadata

protected void showMetadata(java.lang.Object[] aCasList)
Parameters:
aCasList -

isProcessorReady

protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing


getBytes

protected long getBytes(java.lang.Object aCas)
Returns the size of the CAS object. Currently only CASData is supported.

Parameters:
aCas - CAS to get the size for
Returns:

setCasPool

public void setCasPool(CPECasPool aPool)
Parameters:
aPool -

analyze

protected boolean analyze(java.lang.Object[] aCasObjectList,
                          ProcessTrace pTrTemp)
                   throws java.lang.Exception
An alternate processing loop designed for the single-threaded CPM.

Parameters:
aCasObjectList - a list of CASes to analyze
pTrTemp - process trace where statistics are added during analysis
Throws:
java.lang.Exception


Copyright © 2012. All Rights Reserved.