001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.sql.PreparedStatement;
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    import java.util.ArrayList;
025    import java.util.HashSet;
026    import java.util.LinkedList;
027    import java.util.Set;
028    import java.util.concurrent.locks.ReadWriteLock;
029    import java.util.concurrent.locks.ReentrantReadWriteLock;
030    
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.MessageId;
033    import org.apache.activemq.command.ProducerId;
034    import org.apache.activemq.command.SubscriptionInfo;
035    import org.apache.activemq.store.jdbc.JDBCAdapter;
036    import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
037    import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
038    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
039    import org.apache.activemq.store.jdbc.Statements;
040    import org.apache.activemq.store.jdbc.TransactionContext;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    /**
045     * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
046     * encouraged to override the default implementation of methods to account for differences in JDBC Driver
047     * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
048     * The databases/JDBC drivers that use this adapter are:
049     * <ul>
050     * <li></li>
051     * </ul>
052     * 
053     * @org.apache.xbean.XBean element="defaultJDBCAdapter"
054     * 
055     * 
056     */
057    public class DefaultJDBCAdapter implements JDBCAdapter {
058        private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
059        public static final int MAX_ROWS = 10000;
060        protected Statements statements;
061        protected boolean batchStatments = true;
062        protected boolean prioritizedMessages;
063        protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
064        // needs to be min twice the prefetch for a durable sub and large enough for selector range
065        protected int maxRows = MAX_ROWS;
066    
067        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
068            s.setBytes(index, data);
069        }
070    
071        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
072            return rs.getBytes(index);
073        }
074    
075        public void doCreateTables(TransactionContext c) throws SQLException, IOException {
076            Statement s = null;
077            cleanupExclusiveLock.writeLock().lock();
078            try {
079                // Check to see if the table already exists. If it does, then don't
080                // log warnings during startup.
081                // Need to run the scripts anyways since they may contain ALTER
082                // statements that upgrade a previous version
083                // of the table
084                boolean alreadyExists = false;
085                ResultSet rs = null;
086                try {
087                    rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
088                            new String[] { "TABLE" });
089                    alreadyExists = rs.next();
090                } catch (Throwable ignore) {
091                } finally {
092                    close(rs);
093                }
094                s = c.getConnection().createStatement();
095                String[] createStatments = this.statements.getCreateSchemaStatements();
096                for (int i = 0; i < createStatments.length; i++) {
097                    // This will fail usually since the tables will be
098                    // created already.
099                    try {
100                        LOG.debug("Executing SQL: " + createStatments[i]);
101                        s.execute(createStatments[i]);
102                    } catch (SQLException e) {
103                        if (alreadyExists) {
104                            LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
105                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
106                                    + " Vendor code: " + e.getErrorCode());
107                        } else {
108                            LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
109                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110                                    + " Vendor code: " + e.getErrorCode());
111                            JDBCPersistenceAdapter.log("Failure details: ", e);
112                        }
113                    }
114                }
115                c.getConnection().commit();
116            } finally {
117                cleanupExclusiveLock.writeLock().unlock();
118                try {
119                    s.close();
120                } catch (Throwable e) {
121                }
122            }
123        }
124    
125        public void doDropTables(TransactionContext c) throws SQLException, IOException {
126            Statement s = null;
127            cleanupExclusiveLock.writeLock().lock();
128            try {
129                s = c.getConnection().createStatement();
130                String[] dropStatments = this.statements.getDropSchemaStatements();
131                for (int i = 0; i < dropStatments.length; i++) {
132                    // This will fail usually since the tables will be
133                    // created already.
134                    try {
135                        LOG.debug("Executing SQL: " + dropStatments[i]);
136                        s.execute(dropStatments[i]);
137                    } catch (SQLException e) {
138                        LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
139                                + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
140                                + e.getErrorCode());
141                        JDBCPersistenceAdapter.log("Failure details: ", e);
142                    }
143                }
144                c.getConnection().commit();
145            } finally {
146                cleanupExclusiveLock.writeLock().unlock();
147                try {
148                    s.close();
149                } catch (Throwable e) {
150                }
151            }
152        }
153    
154        public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
155            PreparedStatement s = null;
156            ResultSet rs = null;
157            cleanupExclusiveLock.readLock().lock();
158            try {
159                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
160                rs = s.executeQuery();
161                long seq1 = 0;
162                if (rs.next()) {
163                    seq1 = rs.getLong(1);
164                }
165                rs.close();
166                s.close();
167                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
168                rs = s.executeQuery();
169                long seq2 = 0;
170                if (rs.next()) {
171                    seq2 = rs.getLong(1);
172                }
173                long seq = Math.max(seq1, seq2);
174                return seq;
175            } finally {
176                cleanupExclusiveLock.readLock().unlock();
177                close(rs);
178                close(s);
179            }
180        }
181        
182        public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
183            PreparedStatement s = null;
184            ResultSet rs = null;
185            cleanupExclusiveLock.readLock().lock();
186            try {
187                s = c.getConnection().prepareStatement(
188                        this.statements.getFindMessageByIdStatement());
189                s.setLong(1, storeSequenceId);
190                rs = s.executeQuery();
191                if (!rs.next()) {
192                    return null;
193                }
194                return getBinaryData(rs, 1);
195            } finally {
196                cleanupExclusiveLock.readLock().unlock();
197                close(rs);
198                close(s);
199            }
200        }
201        
202    
203        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
204                long expiration, byte priority) throws SQLException, IOException {
205            PreparedStatement s = c.getAddMessageStatement();
206            cleanupExclusiveLock.readLock().lock();
207            try {
208                if (s == null) {
209                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
210                    if (this.batchStatments) {
211                        c.setAddMessageStatement(s);
212                    }
213                }
214                s.setLong(1, sequence);
215                s.setString(2, messageID.getProducerId().toString());
216                s.setLong(3, messageID.getProducerSequenceId());
217                s.setString(4, destination.getQualifiedName());
218                s.setLong(5, expiration);
219                s.setLong(6, priority);
220                setBinaryData(s, 7, data);
221                if (this.batchStatments) {
222                    s.addBatch();
223                } else if (s.executeUpdate() != 1) {
224                    throw new SQLException("Failed add a message");
225                }
226            } finally {
227                cleanupExclusiveLock.readLock().unlock();
228                if (!this.batchStatments) {
229                    if (s != null) {
230                        s.close();
231                    }
232                }
233            }
234        }
235    
236        public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
237                long expirationTime, String messageRef) throws SQLException, IOException {
238            PreparedStatement s = c.getAddMessageStatement();
239            cleanupExclusiveLock.readLock().lock();
240            try {
241                if (s == null) {
242                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
243                    if (this.batchStatments) {
244                        c.setAddMessageStatement(s);
245                    }
246                }
247                s.setLong(1, messageID.getBrokerSequenceId());
248                s.setString(2, messageID.getProducerId().toString());
249                s.setLong(3, messageID.getProducerSequenceId());
250                s.setString(4, destination.getQualifiedName());
251                s.setLong(5, expirationTime);
252                s.setString(6, messageRef);
253                if (this.batchStatments) {
254                    s.addBatch();
255                } else if (s.executeUpdate() != 1) {
256                    throw new SQLException("Failed add a message");
257                }
258            } finally {
259                cleanupExclusiveLock.readLock().unlock();
260                if (!this.batchStatments) {
261                    s.close();
262                }
263            }
264        }
265    
266        public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
267            PreparedStatement s = null;
268            ResultSet rs = null;
269            cleanupExclusiveLock.readLock().lock();
270            try {
271                s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
272                s.setString(1, messageID.getProducerId().toString());
273                s.setLong(2, messageID.getProducerSequenceId());
274                s.setString(3, destination.getQualifiedName());
275                rs = s.executeQuery();
276                if (!rs.next()) {
277                    return new long[]{0,0};
278                }
279                return new long[]{rs.getLong(1), rs.getLong(2)};
280            } finally {
281                cleanupExclusiveLock.readLock().unlock();
282                close(rs);
283                close(s);
284            }
285        }
286    
287        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
288            PreparedStatement s = null;
289            ResultSet rs = null;
290            cleanupExclusiveLock.readLock().lock();
291            try {
292                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
293                s.setString(1, id.getProducerId().toString());
294                s.setLong(2, id.getProducerSequenceId());
295                rs = s.executeQuery();
296                if (!rs.next()) {
297                    return null;
298                }
299                return getBinaryData(rs, 1);
300            } finally {
301                cleanupExclusiveLock.readLock().unlock();
302                close(rs);
303                close(s);
304            }
305        }
306    
307        public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
308            PreparedStatement s = null;
309            ResultSet rs = null;
310            cleanupExclusiveLock.readLock().lock();
311            try {
312                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
313                s.setLong(1, seq);
314                rs = s.executeQuery();
315                if (!rs.next()) {
316                    return null;
317                }
318                return rs.getString(1);
319            } finally {
320                cleanupExclusiveLock.readLock().unlock();
321                close(rs);
322                close(s);
323            }
324        }
325    
326        public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
327            PreparedStatement s = c.getRemovedMessageStatement();
328            cleanupExclusiveLock.readLock().lock();
329            try {
330                if (s == null) {
331                    s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
332                    if (this.batchStatments) {
333                        c.setRemovedMessageStatement(s);
334                    }
335                }
336                s.setLong(1, seq);
337                if (this.batchStatments) {
338                    s.addBatch();
339                } else if (s.executeUpdate() != 1) {
340                    throw new SQLException("Failed to remove message");
341                }
342            } finally {
343                cleanupExclusiveLock.readLock().unlock();
344                if (!this.batchStatments && s != null) {
345                    s.close();
346                }
347            }
348        }
349    
350        public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
351                throws Exception {
352            PreparedStatement s = null;
353            ResultSet rs = null;
354            cleanupExclusiveLock.readLock().lock();
355            try {
356                s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
357                s.setString(1, destination.getQualifiedName());
358                rs = s.executeQuery();
359                if (this.statements.isUseExternalMessageReferences()) {
360                    while (rs.next()) {
361                        if (!listener.recoverMessageReference(rs.getString(2))) {
362                            break;
363                        }
364                    }
365                } else {
366                    while (rs.next()) {
367                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
368                            break;
369                        }
370                    }
371                }
372            } finally {
373                cleanupExclusiveLock.readLock().unlock();
374                close(rs);
375                close(s);
376            }
377        }
378    
379        public void doMessageIdScan(TransactionContext c, int limit, 
380                JDBCMessageIdScanListener listener) throws SQLException, IOException {
381            PreparedStatement s = null;
382            ResultSet rs = null;
383            cleanupExclusiveLock.readLock().lock();
384            try {
385                s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
386                s.setMaxRows(limit);
387                rs = s.executeQuery();
388                // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
389                LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
390                while (rs.next()) {
391                    reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
392                }
393                if (LOG.isDebugEnabled()) {
394                    LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
395                }
396                for (MessageId id : reverseOrderIds) {
397                    listener.messageId(id);
398                }
399            } finally {
400                cleanupExclusiveLock.readLock().unlock();
401                close(rs);
402                close(s);
403            }
404        }
405        
406        public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
407                String subscriptionName, long seq, long prio) throws SQLException, IOException {
408            PreparedStatement s = c.getUpdateLastAckStatement();
409            cleanupExclusiveLock.readLock().lock();
410            try {
411                if (s == null) {
412                    s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
413                    if (this.batchStatments) {
414                        c.setUpdateLastAckStatement(s);
415                    }
416                }
417                s.setLong(1, seq);
418                s.setString(2, destination.getQualifiedName());
419                s.setString(3, clientId);
420                s.setString(4, subscriptionName);
421                s.setLong(5, prio);
422                if (this.batchStatments) {
423                    s.addBatch();
424                } else if (s.executeUpdate() != 1) {
425                    throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
426                }
427            } finally {
428                cleanupExclusiveLock.readLock().unlock();
429                if (!this.batchStatments) {
430                    close(s);
431                }
432            }
433        }
434    
435    
436        public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
437                                            String subscriptionName, long seq, long priority) throws SQLException, IOException {
438            PreparedStatement s = c.getUpdateLastAckStatement();
439            cleanupExclusiveLock.readLock().lock();
440            try {
441                if (s == null) {
442                    s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
443                    if (this.batchStatments) {
444                        c.setUpdateLastAckStatement(s);
445                    }
446                }
447                s.setLong(1, seq);
448                s.setString(2, destination.getQualifiedName());
449                s.setString(3, clientId);
450                s.setString(4, subscriptionName);
451    
452                if (this.batchStatments) {
453                    s.addBatch();
454                } else if (s.executeUpdate() != 1) {
455                    throw new IOException("Could not update last ack seq : "
456                                + seq + ", for sub: " + subscriptionName);
457                }
458            } finally {
459                cleanupExclusiveLock.readLock().unlock();
460                if (!this.batchStatments) {
461                    close(s);
462                }            
463            }
464        }
465    
466        public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
467                String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
468            // dumpTables(c,
469            // destination.getQualifiedName(),clientId,subscriptionName);
470            PreparedStatement s = null;
471            ResultSet rs = null;
472            cleanupExclusiveLock.readLock().lock();
473            try {
474                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
475                s.setString(1, destination.getQualifiedName());
476                s.setString(2, clientId);
477                s.setString(3, subscriptionName);
478                rs = s.executeQuery();
479                if (this.statements.isUseExternalMessageReferences()) {
480                    while (rs.next()) {
481                        if (!listener.recoverMessageReference(rs.getString(2))) {
482                            break;
483                        }
484                    }
485                } else {
486                    while (rs.next()) {
487                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
488                            break;
489                        }
490                    }
491                }
492            } finally {
493                cleanupExclusiveLock.readLock().unlock();
494                close(rs);
495                close(s);
496            }
497        }
498    
499        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
500                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
501            
502            PreparedStatement s = null;
503            ResultSet rs = null;
504            cleanupExclusiveLock.readLock().lock();
505            try {
506                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
507                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
508                s.setString(1, destination.getQualifiedName());
509                s.setString(2, clientId);
510                s.setString(3, subscriptionName);
511                s.setLong(4, seq);
512                rs = s.executeQuery();
513                int count = 0;
514                if (this.statements.isUseExternalMessageReferences()) {
515                    while (rs.next() && count < maxReturned) {
516                        if (listener.recoverMessageReference(rs.getString(1))) {
517                            count++;
518                        }
519                    }
520                } else {
521                    while (rs.next() && count < maxReturned) {
522                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
523                            count++;
524                        }
525                    }
526                }
527            } finally {
528                cleanupExclusiveLock.readLock().unlock();
529                close(rs);
530                close(s);
531            }
532        }
533    
534        public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
535                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
536    
537            PreparedStatement s = null;
538            ResultSet rs = null;
539            cleanupExclusiveLock.readLock().lock();
540            try {
541                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
542                s.setMaxRows(maxRows);
543                s.setString(1, destination.getQualifiedName());
544                s.setString(2, clientId);
545                s.setString(3, subscriptionName);
546                s.setLong(4, seq);
547                s.setLong(5, priority);
548                rs = s.executeQuery();
549                int count = 0;
550                if (this.statements.isUseExternalMessageReferences()) {
551                    while (rs.next() && count < maxReturned) {
552                        if (listener.recoverMessageReference(rs.getString(1))) {
553                            count++;
554                        }
555                    }
556                } else {
557                    while (rs.next() && count < maxReturned) {
558                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
559                            count++;
560                        }
561                    }
562                }
563            } finally {
564                cleanupExclusiveLock.readLock().unlock();
565                close(rs);
566                close(s);
567            }
568        }
569    
570        public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
571                String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
572            PreparedStatement s = null;
573            ResultSet rs = null;
574            int result = 0;
575            cleanupExclusiveLock.readLock().lock();
576            try {
577                if (isPrioritizedMessages) {
578                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
579                } else {
580                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
581                }
582                s.setString(1, destination.getQualifiedName());
583                s.setString(2, clientId);
584                s.setString(3, subscriptionName);
585                rs = s.executeQuery();
586                if (rs.next()) {
587                    result = rs.getInt(1);
588                }
589            } finally {
590                cleanupExclusiveLock.readLock().unlock();
591                close(rs);
592                close(s);
593            }
594            return result;
595        }
596    
597        /**
598         * @param c 
599         * @param info 
600         * @param retroactive 
601         * @throws SQLException 
602         * @throws IOException 
603         */
604        public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
605                throws SQLException, IOException {
606            // dumpTables(c, destination.getQualifiedName(), clientId,
607            // subscriptionName);
608            PreparedStatement s = null;
609            cleanupExclusiveLock.readLock().lock();
610            try {
611                long lastMessageId = -1;
612                if (!retroactive) {
613                    s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
614                    ResultSet rs = null;
615                    try {
616                        rs = s.executeQuery();
617                        if (rs.next()) {
618                            lastMessageId = rs.getLong(1);
619                        }
620                    } finally {
621                        close(rs);
622                        close(s);
623                    }
624                }
625                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
626                int maxPriority = 1;
627                if (isPrioritizedMessages) {
628                    maxPriority = 10;
629                }
630    
631                for (int priority = 0; priority < maxPriority; priority++) {
632                    s.setString(1, info.getDestination().getQualifiedName());
633                    s.setString(2, info.getClientId());
634                    s.setString(3, info.getSubscriptionName());
635                    s.setString(4, info.getSelector());
636                    s.setLong(5, lastMessageId);
637                    s.setString(6, info.getSubscribedDestination().getQualifiedName());
638                    s.setLong(7, priority);
639    
640                    if (s.executeUpdate() != 1) {
641                        throw new IOException("Could not create durable subscription for: " + info.getClientId());
642                    }
643                }
644    
645            } finally {
646                cleanupExclusiveLock.readLock().unlock();
647                close(s);
648            }
649        }
650    
651        public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
652                String clientId, String subscriptionName) throws SQLException, IOException {
653            PreparedStatement s = null;
654            ResultSet rs = null;
655            cleanupExclusiveLock.readLock().lock();
656            try {
657                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
658                s.setString(1, destination.getQualifiedName());
659                s.setString(2, clientId);
660                s.setString(3, subscriptionName);
661                rs = s.executeQuery();
662                if (!rs.next()) {
663                    return null;
664                }
665                SubscriptionInfo subscription = new SubscriptionInfo();
666                subscription.setDestination(destination);
667                subscription.setClientId(clientId);
668                subscription.setSubscriptionName(subscriptionName);
669                subscription.setSelector(rs.getString(1));
670                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
671                        ActiveMQDestination.QUEUE_TYPE));
672                return subscription;
673            } finally {
674                cleanupExclusiveLock.readLock().unlock();
675                close(rs);
676                close(s);
677            }
678        }
679    
680        public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
681                throws SQLException, IOException {
682            PreparedStatement s = null;
683            ResultSet rs = null;
684            cleanupExclusiveLock.readLock().lock();
685            try {
686                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
687                s.setString(1, destination.getQualifiedName());
688                rs = s.executeQuery();
689                ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
690                while (rs.next()) {
691                    SubscriptionInfo subscription = new SubscriptionInfo();
692                    subscription.setDestination(destination);
693                    subscription.setSelector(rs.getString(1));
694                    subscription.setSubscriptionName(rs.getString(2));
695                    subscription.setClientId(rs.getString(3));
696                    subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
697                            ActiveMQDestination.QUEUE_TYPE));
698                    rc.add(subscription);
699                }
700                return rc.toArray(new SubscriptionInfo[rc.size()]);
701            } finally {
702                cleanupExclusiveLock.readLock().unlock();
703                close(rs);
704                close(s);
705            }
706        }
707    
708        public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
709                IOException {
710            PreparedStatement s = null;
711            cleanupExclusiveLock.readLock().lock();
712            try {
713                s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
714                s.setString(1, destinationName.getQualifiedName());
715                s.executeUpdate();
716                s.close();
717                s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
718                s.setString(1, destinationName.getQualifiedName());
719                s.executeUpdate();
720            } finally {
721                cleanupExclusiveLock.readLock().unlock();
722                close(s);
723            }
724        }
725    
726        public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
727                String subscriptionName) throws SQLException, IOException {
728            PreparedStatement s = null;
729            cleanupExclusiveLock.readLock().lock();
730            try {
731                s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
732                s.setString(1, destination.getQualifiedName());
733                s.setString(2, clientId);
734                s.setString(3, subscriptionName);
735                s.executeUpdate();
736            } finally {
737                cleanupExclusiveLock.readLock().unlock();
738                close(s);
739            }
740        }
741    
742        public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
743            PreparedStatement s = null;
744            cleanupExclusiveLock.writeLock().lock();
745            try {
746                if (isPrioritizedMessages) {
747                    LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
748                    s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
749                } else {
750                    LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
751                    s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
752                }
753                s.setLong(1, System.currentTimeMillis());
754                int i = s.executeUpdate();
755                LOG.debug("Deleted " + i + " old message(s).");
756            } finally {
757                cleanupExclusiveLock.writeLock().unlock();
758                close(s);
759            }
760        }
761    
762        public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
763                String clientId, String subscriberName) throws SQLException, IOException {
764            PreparedStatement s = null;
765            ResultSet rs = null;
766            long result = -1;
767            cleanupExclusiveLock.readLock().lock();
768            try {
769                s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
770                s.setString(1, destination.getQualifiedName());
771                s.setString(2, clientId);
772                s.setString(3, subscriberName);
773                rs = s.executeQuery();
774                if (rs.next()) {
775                    result = rs.getLong(1);
776                }
777            } finally {
778                cleanupExclusiveLock.readLock().unlock();
779                close(rs);
780                close(s);
781            }
782            return result;
783        }
784    
785        private static void close(PreparedStatement s) {
786            try {
787                s.close();
788            } catch (Throwable e) {
789            }
790        }
791    
792        private static void close(ResultSet rs) {
793            try {
794                rs.close();
795            } catch (Throwable e) {
796            }
797        }
798    
799        public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
800            HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
801            PreparedStatement s = null;
802            ResultSet rs = null;
803            cleanupExclusiveLock.readLock().lock();
804            try {
805                s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
806                rs = s.executeQuery();
807                while (rs.next()) {
808                    rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
809                }
810            } finally {
811                cleanupExclusiveLock.readLock().unlock();
812                close(rs);
813                close(s);
814            }
815            return rc;
816        }
817    
818        /**
819         * @return true if batchStements
820         */
821        public boolean isBatchStatments() {
822            return this.batchStatments;
823        }
824    
825        /**
826         * @param batchStatments
827         */
828        public void setBatchStatments(boolean batchStatments) {
829            this.batchStatments = batchStatments;
830        }
831    
832        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
833            this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
834        }
835    
836        /**
837         * @return the statements
838         */
839        public Statements getStatements() {
840            return this.statements;
841        }
842    
843        public void setStatements(Statements statements) {
844            this.statements = statements;
845        }
846    
847        public int getMaxRows() {
848            return maxRows;
849        }
850    
851        public void setMaxRows(int maxRows) {
852            this.maxRows = maxRows;
853        }    
854    
855        /**
856         * @param c
857         * @param destination
858         * @param clientId
859         * @param subscriberName
860         * @return
861         * @throws SQLException
862         * @throws IOException
863         */
864        public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
865                String clientId, String subscriberName) throws SQLException, IOException {
866            PreparedStatement s = null;
867            ResultSet rs = null;
868            cleanupExclusiveLock.readLock().lock();
869            try {
870                s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
871                s.setString(1, destination.getQualifiedName());
872                s.setString(2, clientId);
873                s.setString(3, subscriberName);
874                rs = s.executeQuery();
875                if (!rs.next()) {
876                    return null;
877                }
878                return getBinaryData(rs, 1);
879            } finally {
880                close(rs);
881                cleanupExclusiveLock.readLock().unlock();
882                close(s);
883            }
884        }
885    
886        public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
887                IOException {
888            PreparedStatement s = null;
889            ResultSet rs = null;
890            int result = 0;
891            cleanupExclusiveLock.readLock().lock();
892            try {
893                s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
894                s.setString(1, destination.getQualifiedName());
895                rs = s.executeQuery();
896                if (rs.next()) {
897                    result = rs.getInt(1);
898                }
899            } finally {
900                cleanupExclusiveLock.readLock().unlock();
901                close(rs);
902                close(s);
903            }
904            return result;
905        }
906    
907        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
908                long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
909            PreparedStatement s = null;
910            ResultSet rs = null;
911            cleanupExclusiveLock.readLock().lock();
912            try {
913                if (isPrioritizedMessages) {
914                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
915                } else {
916                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
917                }
918                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
919                s.setString(1, destination.getQualifiedName());
920                s.setLong(2, nextSeq);
921                if (isPrioritizedMessages) {
922                    s.setLong(3, priority);
923                    s.setLong(4, priority);
924                }
925                rs = s.executeQuery();
926                int count = 0;
927                if (this.statements.isUseExternalMessageReferences()) {
928                    while (rs.next() && count < maxReturned) {
929                        if (listener.recoverMessageReference(rs.getString(1))) {
930                            count++;
931                        } else {
932                            LOG.debug("Stopped recover next messages");
933                            break;
934                        }
935                    }
936                } else {
937                    while (rs.next() && count < maxReturned) {
938                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
939                            count++;
940                        } else {
941                            LOG.debug("Stopped recover next messages");
942                            break;
943                        }
944                    }
945                }
946            } catch (Exception e) {
947                e.printStackTrace();
948            } finally {
949                cleanupExclusiveLock.readLock().unlock();
950                close(rs);
951                close(s);
952            }
953        }
954        
955    /*    public void dumpTables(Connection c, String destinationName, String clientId, String
956          subscriptionName) throws SQLException { 
957            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
958            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
959            PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
960                    + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
961                    + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
962                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
963                    + " ORDER BY M.ID");
964          s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
965          printQuery(s,System.out); }
966    
967        public void dumpTables(Connection c) throws SQLException {
968            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
969            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
970        }
971    
972        private void printQuery(Connection c, String query, PrintStream out)
973                throws SQLException {
974            printQuery(c.prepareStatement(query), out);
975        }
976    
977        private void printQuery(PreparedStatement s, PrintStream out)
978                throws SQLException {
979    
980            ResultSet set = null;
981            try {
982                set = s.executeQuery();
983                ResultSetMetaData metaData = set.getMetaData();
984                for (int i = 1; i <= metaData.getColumnCount(); i++) {
985                    if (i == 1)
986                        out.print("||");
987                    out.print(metaData.getColumnName(i) + "||");
988                }
989                out.println();
990                while (set.next()) {
991                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
992                        if (i == 1)
993                            out.print("|");
994                        out.print(set.getString(i) + "|");
995                    }
996                    out.println();
997                }
998            } finally {
999                try {
1000                    set.close();
1001                } catch (Throwable ignore) {
1002                }
1003                try {
1004                    s.close();
1005                } catch (Throwable ignore) {
1006                }
1007            }
1008        }  */  
1009    
1010        public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1011                throws SQLException, IOException {
1012            PreparedStatement s = null;
1013            ResultSet rs = null;
1014            cleanupExclusiveLock.readLock().lock();
1015            try {
1016                s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1017                s.setString(1, id.toString());
1018                rs = s.executeQuery();
1019                long seq = -1;
1020                if (rs.next()) {
1021                    seq = rs.getLong(1);
1022                }
1023                return seq;
1024            } finally {
1025                cleanupExclusiveLock.readLock().unlock();
1026                close(rs);
1027                close(s);
1028            }
1029        }
1030    
1031    }