001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    import java.util.Collections;
024    import java.util.Set;
025    import java.util.concurrent.ScheduledFuture;
026    import java.util.concurrent.ScheduledThreadPoolExecutor;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.TimeUnit;
029    
030    import javax.sql.DataSource;
031    
032    import org.apache.activemq.ActiveMQMessageAudit;
033    import org.apache.activemq.broker.BrokerService;
034    import org.apache.activemq.broker.BrokerServiceAware;
035    import org.apache.activemq.broker.ConnectionContext;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ActiveMQQueue;
038    import org.apache.activemq.command.ActiveMQTopic;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.command.ProducerId;
042    import org.apache.activemq.openwire.OpenWireFormat;
043    import org.apache.activemq.store.MessageStore;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.TopicMessageStore;
046    import org.apache.activemq.store.TransactionStore;
047    import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048    import org.apache.activemq.store.memory.MemoryTransactionStore;
049    import org.apache.activemq.usage.SystemUsage;
050    import org.apache.activemq.util.ByteSequence;
051    import org.apache.activemq.util.FactoryFinder;
052    import org.apache.activemq.util.IOExceptionSupport;
053    import org.apache.activemq.util.LongSequenceGenerator;
054    import org.apache.activemq.wireformat.WireFormat;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    /**
059     * A {@link PersistenceAdapter} implementation using JDBC for persistence
060     * storage.
061     * 
062     * This persistence adapter will correctly remember prepared XA transactions,
063     * but it will not keep track of local transaction commits so that operations
064     * performed against the Message store are done as a single uow.
065     * 
066     * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067     * 
068     * 
069     */
070    public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071        BrokerServiceAware {
072    
073        private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
074        private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
075                                                                       "META-INF/services/org/apache/activemq/store/jdbc/");
076        private static FactoryFinder lockFactoryFinder = new FactoryFinder(
077                                                                        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
078    
079        private WireFormat wireFormat = new OpenWireFormat();
080        private BrokerService brokerService;
081        private Statements statements;
082        private JDBCAdapter adapter;
083        private MemoryTransactionStore transactionStore;
084        private ScheduledThreadPoolExecutor clockDaemon;
085        private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
086        private int cleanupPeriod = 1000 * 60 * 5;
087        private boolean useExternalMessageReferences;
088        private boolean useDatabaseLock = true;
089        private long lockKeepAlivePeriod = 1000*30;
090        private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
091        private DatabaseLocker databaseLocker;
092        private boolean createTablesOnStartup = true;
093        private DataSource lockDataSource;
094        private int transactionIsolation;
095        
096        protected int maxProducersToAudit=1024;
097        protected int maxAuditDepth=1000;
098        protected boolean enableAudit=false;
099        protected int auditRecoveryDepth = 1024;
100        protected ActiveMQMessageAudit audit;
101        
102        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
103        protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
104    
105        public JDBCPersistenceAdapter() {
106        }
107    
108        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
109            super(ds);
110            this.wireFormat = wireFormat;
111        }
112    
113        public Set<ActiveMQDestination> getDestinations() {
114            // Get a connection and insert the message into the DB.
115            TransactionContext c = null;
116            try {
117                c = getTransactionContext();
118                return getAdapter().doGetDestinations(c);
119            } catch (IOException e) {
120                return emptyDestinationSet();
121            } catch (SQLException e) {
122                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123                return emptyDestinationSet();
124            } finally {
125                if (c != null) {
126                    try {
127                        c.close();
128                    } catch (Throwable e) {
129                    }
130                }
131            }
132        }
133    
134        @SuppressWarnings("unchecked")
135        private Set<ActiveMQDestination> emptyDestinationSet() {
136            return Collections.EMPTY_SET;
137        }
138        
139        protected void createMessageAudit() {
140            if (enableAudit && audit == null) {
141                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142                TransactionContext c = null;
143                
144                try {
145                    c = getTransactionContext();
146                    getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147                        public void messageId(MessageId id) {
148                            audit.isDuplicate(id);
149                        }
150                    });
151                } catch (Exception e) {
152                    LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153                } finally {
154                    if (c != null) {
155                        try {
156                            c.close();
157                        } catch (Throwable e) {
158                        }
159                    }
160                }
161            }
162        }
163        
164        public void initSequenceIdGenerator() {
165            TransactionContext c = null;
166            try {
167                c = getTransactionContext();
168                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169                    public void messageId(MessageId id) {
170                        audit.isDuplicate(id);
171                    }
172                });
173            } catch (Exception e) {
174                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175            } finally {
176                if (c != null) {
177                    try {
178                        c.close();
179                    } catch (Throwable e) {
180                    }
181                }
182            }
183            
184        }
185    
186        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187            MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188            if (transactionStore != null) {
189                rc = transactionStore.proxy(rc);
190            }
191            return rc;
192        }
193    
194        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195            TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196            if (transactionStore != null) {
197                rc = transactionStore.proxy(rc);
198            }
199            return rc;
200        }
201    
202        /**
203         * Cleanup method to remove any state associated with the given destination
204         * No state retained.... nothing to do
205         *
206         * @param destination Destination to forget
207         */
208        public void removeQueueMessageStore(ActiveMQQueue destination) {
209        }
210    
211        /**
212         * Cleanup method to remove any state associated with the given destination
213         * No state retained.... nothing to do
214         *
215         * @param destination Destination to forget
216         */
217        public void removeTopicMessageStore(ActiveMQTopic destination) {
218        }
219    
220        public TransactionStore createTransactionStore() throws IOException {
221            if (transactionStore == null) {
222                transactionStore = new MemoryTransactionStore(this);
223            }
224            return this.transactionStore;
225        }
226    
227        public long getLastMessageBrokerSequenceId() throws IOException {
228            TransactionContext c = getTransactionContext();
229            try {
230                long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
231                sequenceGenerator.setLastSequenceId(seq);
232                long brokerSeq = 0;
233                if (seq != 0) {
234                    byte[] msg = getAdapter().doGetMessageById(c, seq);
235                    if (msg != null) {
236                        Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
237                        brokerSeq = last.getMessageId().getBrokerSequenceId();
238                    } else {
239                       LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
240                    }
241                }
242                return brokerSeq;
243            } catch (SQLException e) {
244                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
245                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
246            } finally {
247                c.close();
248            }
249        }
250        
251        public long getLastProducerSequenceId(ProducerId id) throws IOException {
252            TransactionContext c = getTransactionContext();
253            try {
254                return getAdapter().doGetLastProducerSequenceId(c, id);
255            } catch (SQLException e) {
256                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
257                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
258            } finally {
259                c.close();
260            }
261        }
262    
263    
264        public void start() throws Exception {
265            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
266    
267            if (isCreateTablesOnStartup()) {
268                TransactionContext transactionContext = getTransactionContext();
269                transactionContext.begin();
270                try {
271                    try {
272                        getAdapter().doCreateTables(transactionContext);
273                    } catch (SQLException e) {
274                        LOG.warn("Cannot create tables due to: " + e);
275                        JDBCPersistenceAdapter.log("Failure Details: ", e);
276                    }
277                } finally {
278                    transactionContext.commit();
279                }
280            }
281    
282            if (isUseDatabaseLock()) {
283                DatabaseLocker service = getDatabaseLocker();
284                if (service == null) {
285                    LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
286                } else {
287                    service.start();
288                    if (lockKeepAlivePeriod > 0) {
289                        keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
290                            public void run() {
291                                databaseLockKeepAlive();
292                            }
293                        }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
294                    }
295                    if (brokerService != null) {
296                        brokerService.getBroker().nowMasterBroker();
297                    }
298                }
299            }
300    
301            cleanup();
302    
303            // Cleanup the db periodically.
304            if (cleanupPeriod > 0) {
305                cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
306                    public void run() {
307                        cleanup();
308                    }
309                }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
310            }
311            
312            createMessageAudit();
313        }
314    
315        public synchronized void stop() throws Exception {
316            if (cleanupTicket != null) {
317                cleanupTicket.cancel(true);
318                cleanupTicket = null;
319            }
320            if (keepAliveTicket != null) {
321                keepAliveTicket.cancel(false);
322                keepAliveTicket = null;
323            }
324            
325            // do not shutdown clockDaemon as it may kill the thread initiating shutdown
326            DatabaseLocker service = getDatabaseLocker();
327            if (service != null) {
328                service.stop();
329            }
330        }
331    
332        public void cleanup() {
333            TransactionContext c = null;
334            try {
335                LOG.debug("Cleaning up old messages.");
336                c = getTransactionContext();
337                getAdapter().doDeleteOldMessages(c, false);
338                getAdapter().doDeleteOldMessages(c, true);
339            } catch (IOException e) {
340                LOG.warn("Old message cleanup failed due to: " + e, e);
341            } catch (SQLException e) {
342                LOG.warn("Old message cleanup failed due to: " + e);
343                JDBCPersistenceAdapter.log("Failure Details: ", e);
344            } finally {
345                if (c != null) {
346                    try {
347                        c.close();
348                    } catch (Throwable e) {
349                    }
350                }
351                LOG.debug("Cleanup done.");
352            }
353        }
354    
355        public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
356            this.clockDaemon = clockDaemon;
357        }
358    
359        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
360            if (clockDaemon == null) {
361                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
362                    public Thread newThread(Runnable runnable) {
363                        Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
364                        thread.setDaemon(true);
365                        return thread;
366                    }
367                });
368            }
369            return clockDaemon;
370        }
371    
372        public JDBCAdapter getAdapter() throws IOException {
373            if (adapter == null) {
374                setAdapter(createAdapter());
375            }
376            return adapter;
377        }
378    
379        public DatabaseLocker getDatabaseLocker() throws IOException {
380            if (databaseLocker == null && isUseDatabaseLock()) {
381                setDatabaseLocker(loadDataBaseLocker());
382            }
383            return databaseLocker;
384        }
385    
386        /**
387         * Sets the database locker strategy to use to lock the database on startup
388         * @throws IOException 
389         */
390        public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
391            databaseLocker = locker;
392            databaseLocker.setPersistenceAdapter(this);
393            databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
394        }
395    
396        public DataSource getLockDataSource() throws IOException {
397            if (lockDataSource == null) {
398                lockDataSource = getDataSource();
399                if (lockDataSource == null) {
400                    throw new IllegalArgumentException(
401                            "No dataSource property has been configured");
402                }
403            } else {
404                LOG.info("Using a separate dataSource for locking: "
405                        + lockDataSource);
406            }
407            return lockDataSource;
408        }
409        
410        public void setLockDataSource(DataSource dataSource) {
411            this.lockDataSource = dataSource;
412        }
413    
414        public BrokerService getBrokerService() {
415            return brokerService;
416        }
417    
418        public void setBrokerService(BrokerService brokerService) {
419            this.brokerService = brokerService;
420        }
421    
422        /**
423         * @throws IOException
424         */
425        protected JDBCAdapter createAdapter() throws IOException {
426           
427            adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
428           
429            // Use the default JDBC adapter if the
430            // Database type is not recognized.
431            if (adapter == null) {
432                adapter = new DefaultJDBCAdapter();
433                LOG.debug("Using default JDBC Adapter: " + adapter);
434            }
435            return adapter;
436        }
437    
438        private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
439            Object adapter = null;
440            TransactionContext c = getTransactionContext();
441            try {
442                try {
443                    // Make the filename file system safe.
444                    String dirverName = c.getConnection().getMetaData().getDriverName();
445                    dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
446    
447                    try {
448                        adapter = finder.newInstance(dirverName);
449                        LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
450                    } catch (Throwable e) {
451                        LOG.info("Database " + kind + " driver override not found for : [" + dirverName
452                                 + "].  Will use default implementation.");
453                    }
454                } catch (SQLException e) {
455                    LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
456                              + e.getMessage());
457                    JDBCPersistenceAdapter.log("Failure Details: ", e);
458                }
459            } finally {
460                c.close();
461            }
462            return adapter;
463        }
464    
465        public void setAdapter(JDBCAdapter adapter) {
466            this.adapter = adapter;
467            this.adapter.setStatements(getStatements());
468            this.adapter.setMaxRows(getMaxRows());
469        }
470    
471        public WireFormat getWireFormat() {
472            return wireFormat;
473        }
474    
475        public void setWireFormat(WireFormat wireFormat) {
476            this.wireFormat = wireFormat;
477        }
478    
479        public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
480            if (context == null) {
481                return getTransactionContext();
482            } else {
483                TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
484                if (answer == null) {
485                    answer = getTransactionContext();
486                    context.setLongTermStoreContext(answer);
487                }
488                return answer;
489            }
490        }
491    
492        public TransactionContext getTransactionContext() throws IOException {
493            TransactionContext answer = new TransactionContext(this);
494            if (transactionIsolation > 0) {
495                answer.setTransactionIsolation(transactionIsolation);
496            }
497            return answer;
498        }
499    
500        public void beginTransaction(ConnectionContext context) throws IOException {
501            TransactionContext transactionContext = getTransactionContext(context);
502            transactionContext.begin();
503        }
504    
505        public void commitTransaction(ConnectionContext context) throws IOException {
506            TransactionContext transactionContext = getTransactionContext(context);
507            transactionContext.commit();
508        }
509    
510        public void rollbackTransaction(ConnectionContext context) throws IOException {
511            TransactionContext transactionContext = getTransactionContext(context);
512            transactionContext.rollback();
513        }
514    
515        public int getCleanupPeriod() {
516            return cleanupPeriod;
517        }
518    
519        /**
520         * Sets the number of milliseconds until the database is attempted to be
521         * cleaned up for durable topics
522         */
523        public void setCleanupPeriod(int cleanupPeriod) {
524            this.cleanupPeriod = cleanupPeriod;
525        }
526    
527        public void deleteAllMessages() throws IOException {
528            TransactionContext c = getTransactionContext();
529            try {
530                getAdapter().doDropTables(c);
531                getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
532                getAdapter().doCreateTables(c);
533                LOG.info("Persistence store purged.");
534            } catch (SQLException e) {
535                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
536                throw IOExceptionSupport.create(e);
537            } finally {
538                c.close();
539            }
540        }
541    
542        public boolean isUseExternalMessageReferences() {
543            return useExternalMessageReferences;
544        }
545    
546        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
547            this.useExternalMessageReferences = useExternalMessageReferences;
548        }
549    
550        public boolean isCreateTablesOnStartup() {
551            return createTablesOnStartup;
552        }
553    
554        /**
555         * Sets whether or not tables are created on startup
556         */
557        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
558            this.createTablesOnStartup = createTablesOnStartup;
559        }
560    
561        public boolean isUseDatabaseLock() {
562            return useDatabaseLock;
563        }
564    
565        /**
566         * Sets whether or not an exclusive database lock should be used to enable
567         * JDBC Master/Slave. Enabled by default.
568         */
569        public void setUseDatabaseLock(boolean useDatabaseLock) {
570            this.useDatabaseLock = useDatabaseLock;
571        }
572    
573        public static void log(String msg, SQLException e) {
574            String s = msg + e.getMessage();
575            while (e.getNextException() != null) {
576                e = e.getNextException();
577                s += ", due to: " + e.getMessage();
578            }
579            LOG.warn(s, e);
580        }
581    
582        public Statements getStatements() {
583            if (statements == null) {
584                statements = new Statements();
585            }
586            return statements;
587        }
588    
589        public void setStatements(Statements statements) {
590            this.statements = statements;
591        }
592    
593        /**
594         * @param usageManager The UsageManager that is controlling the
595         *                destination's memory usage.
596         */
597        public void setUsageManager(SystemUsage usageManager) {
598        }
599    
600        protected void databaseLockKeepAlive() {
601            boolean stop = false;
602            try {
603                DatabaseLocker locker = getDatabaseLocker();
604                if (locker != null) {
605                    if (!locker.keepAlive()) {
606                        stop = true;
607                    }
608                }
609            } catch (IOException e) {
610                LOG.error("Failed to get database when trying keepalive: " + e, e);
611            }
612            if (stop) {
613                stopBroker();
614            }
615        }
616    
617        protected void stopBroker() {
618            // we can no longer keep the lock so lets fail
619            LOG.info("No longer able to keep the exclusive lock so giving up being a master");
620            try {
621                brokerService.stop();
622            } catch (Exception e) {
623                LOG.warn("Failure occurred while stopping broker");
624            }
625        }
626    
627        protected DatabaseLocker loadDataBaseLocker() throws IOException {
628            DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
629            if (locker == null) {
630                locker = new DefaultDatabaseLocker();
631                LOG.debug("Using default JDBC Locker: " + locker);
632            }
633            return locker;
634        }
635    
636        public void setBrokerName(String brokerName) {
637        }
638    
639        public String toString() {
640            return "JDBCPersistenceAdapter(" + super.toString() + ")";
641        }
642    
643        public void setDirectory(File dir) {
644        }
645    
646        // interesting bit here is proof that DB is ok
647        public void checkpoint(boolean sync) throws IOException {
648            // by pass TransactionContext to avoid IO Exception handler
649            Connection connection = null;
650            try {
651                connection = getDataSource().getConnection();
652            } catch (SQLException e) {
653                LOG.debug("Could not get JDBC connection for checkpoint: " + e);
654                throw IOExceptionSupport.create(e);
655            } finally {
656                if (connection != null) {
657                    try {
658                        connection.close();
659                    } catch (Throwable ignored) {
660                    }
661                }
662            }
663        }
664    
665        public long size(){
666            return 0;
667        }
668    
669        public long getLockKeepAlivePeriod() {
670            return lockKeepAlivePeriod;
671        }
672    
673        public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
674            this.lockKeepAlivePeriod = lockKeepAlivePeriod;
675        }
676    
677        public long getLockAcquireSleepInterval() {
678            return lockAcquireSleepInterval;
679        }
680    
681        /**
682         * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
683         * not applied if DataBaseLocker is injected.
684         */
685        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
686            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
687        }
688        
689        /**
690         * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
691         * This allowable dirty isolation level may not be achievable in clustered DB environments
692         * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
693         * see isolation level constants in {@link java.sql.Connection}
694         * @param transactionIsolation the isolation level to use
695         */
696        public void setTransactionIsolation(int transactionIsolation) {
697            this.transactionIsolation = transactionIsolation;
698        }
699    
700            public int getMaxProducersToAudit() {
701                    return maxProducersToAudit;
702            }
703    
704            public void setMaxProducersToAudit(int maxProducersToAudit) {
705                    this.maxProducersToAudit = maxProducersToAudit;
706            }
707    
708            public int getMaxAuditDepth() {
709                    return maxAuditDepth;
710            }
711    
712            public void setMaxAuditDepth(int maxAuditDepth) {
713                    this.maxAuditDepth = maxAuditDepth;
714            }
715    
716            public boolean isEnableAudit() {
717                    return enableAudit;
718            }
719    
720            public void setEnableAudit(boolean enableAudit) {
721                    this.enableAudit = enableAudit;
722            }
723    
724        public int getAuditRecoveryDepth() {
725            return auditRecoveryDepth;
726        }
727    
728        public void setAuditRecoveryDepth(int auditRecoveryDepth) {
729            this.auditRecoveryDepth = auditRecoveryDepth;
730        }
731    
732        public long getNextSequenceId() {
733            synchronized(sequenceGenerator) {
734                return sequenceGenerator.getNextSequenceId();
735            }
736        }
737    
738        public int getMaxRows() {
739            return maxRows;
740        }
741    
742        /*
743         * the max rows return from queries, with sparse selectors this may need to be increased
744         */
745        public void setMaxRows(int maxRows) {
746            this.maxRows = maxRows;
747        }
748    }