001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import javax.jms.Connection;
020    import javax.jms.ConnectionConsumer;
021    import javax.jms.ConnectionMetaData;
022    import javax.jms.Destination;
023    import javax.jms.ExceptionListener;
024    import javax.jms.JMSException;
025    import javax.jms.Queue;
026    import javax.jms.QueueConnection;
027    import javax.jms.QueueSession;
028    import javax.jms.ServerSessionPool;
029    import javax.jms.Session;
030    import javax.jms.Topic;
031    import javax.jms.TopicConnection;
032    import javax.jms.TopicSession;
033    
034    import org.apache.activemq.ActiveMQConnection;
035    import org.apache.activemq.ActiveMQSession;
036    import org.apache.activemq.AlreadyClosedException;
037    import org.apache.activemq.EnhancedConnection;
038    import org.apache.activemq.advisory.DestinationSource;
039    
040    /**
041     * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
042     * {@link QueueConnection} which is pooled and on {@link #close()} will return
043     * itself to the sessionPool.
044     * 
045     * <b>NOTE</b> this implementation is only intended for use when sending
046     * messages. It does not deal with pooling of consumers; for that look at a
047     * library like <a href="http://jencks.org/">Jencks</a> such as in <a
048     * href="http://jencks.org/Message+Driven+POJOs">this example</a>
049     * 
050     * 
051     */
052    public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
053    
054        private ConnectionPool pool;
055        private boolean stopped;
056    
057        public PooledConnection(ConnectionPool pool) {
058            this.pool = pool;
059            this.pool.incrementReferenceCount();
060        }
061    
062        /**
063         * Factory method to create a new instance.
064         */
065        public PooledConnection newInstance() {
066            return new PooledConnection(pool);
067        }
068    
069        public void close() throws JMSException {
070            if (this.pool != null) {
071                this.pool.decrementReferenceCount();
072                this.pool = null;
073            }
074        }
075    
076        public void start() throws JMSException {
077            assertNotClosed();
078            pool.start();
079        }
080    
081        public void stop() throws JMSException {
082            stopped = true;
083        }
084    
085        public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
086                                                           ServerSessionPool serverSessionPool, int maxMessages)
087            throws JMSException {
088            return getConnection()
089                .createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
090        }
091    
092        public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
093                                                           ServerSessionPool serverSessionPool, int maxMessages)
094            throws JMSException {
095            return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
096        }
097    
098        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1,
099                                                                  ServerSessionPool serverSessionPool, int i)
100            throws JMSException {
101            return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
102        }
103    
104        public String getClientID() throws JMSException {
105            return getConnection().getClientID();
106        }
107    
108        public ExceptionListener getExceptionListener() throws JMSException {
109            return getConnection().getExceptionListener();
110        }
111    
112        public ConnectionMetaData getMetaData() throws JMSException {
113            return getConnection().getMetaData();
114        }
115    
116        public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
117            getConnection().setExceptionListener(exceptionListener);
118        }
119    
120        public void setClientID(String clientID) throws JMSException {
121            getConnection().setClientID(clientID);
122        }
123    
124        public ConnectionConsumer createConnectionConsumer(Queue queue, String selector,
125                                                           ServerSessionPool serverSessionPool, int maxMessages)
126            throws JMSException {
127            return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
128        }
129    
130        // Session factory methods
131        // -------------------------------------------------------------------------
132        public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
133            return (QueueSession)createSession(transacted, ackMode);
134        }
135    
136        public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
137            return (TopicSession)createSession(transacted, ackMode);
138        }
139    
140        public Session createSession(boolean transacted, int ackMode) throws JMSException {
141            return pool.createSession(transacted, ackMode);
142        }
143    
144        // EnhancedCollection API
145        // -------------------------------------------------------------------------
146        
147        public DestinationSource getDestinationSource() throws JMSException {
148            return getConnection().getDestinationSource();
149        }
150    
151        // Implementation methods
152        // -------------------------------------------------------------------------
153    
154        public ActiveMQConnection getConnection() throws JMSException {
155            assertNotClosed();
156            return pool.getConnection();
157        }
158    
159        protected void assertNotClosed() throws AlreadyClosedException {
160            if (stopped || pool == null) {
161                throw new AlreadyClosedException();
162            }
163        }
164    
165        protected ActiveMQSession createSession(SessionKey key) throws JMSException {
166            return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode());
167        }
168    
169        public String toString() {
170            return "PooledConnection { " + pool + " }";
171        }
172    
173    }