001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.sql.Blob;
023    import java.sql.Connection;
024    import java.sql.PreparedStatement;
025    import java.sql.ResultSet;
026    import java.sql.SQLException;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.store.jdbc.TransactionContext;
031    import org.apache.activemq.util.ByteArrayOutputStream;
032    
033    /**
034     * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
035     * operations. This is a little more involved since to insert a blob you have
036     * to:
037     * 
038     * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
039     * value.
040     * 
041     * The databases/JDBC drivers that use this adapter are:
042     * <ul>
043     * <li></li>
044     * </ul>
045     * 
046     * @org.apache.xbean.XBean element="blobJDBCAdapter"
047     * 
048     * 
049     */
050    public class BlobJDBCAdapter extends DefaultJDBCAdapter {
051    
052        public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
053            throws SQLException, JMSException {
054            PreparedStatement s = null;
055            ResultSet rs = null;
056            try {
057    
058                // Add the Blob record.
059                s = c.prepareStatement(statements.getAddMessageStatement());
060                s.setLong(1, seq);
061                s.setString(2, destinationName);
062                s.setString(3, messageID);
063                s.setString(4, " ");
064    
065                if (s.executeUpdate() != 1) {
066                    throw new JMSException("Failed to broker message: " + messageID + " in container.");
067                }
068                s.close();
069    
070                // Select the blob record so that we can update it.
071                s = c.prepareStatement(statements.getFindMessageStatement());
072                s.setLong(1, seq);
073                rs = s.executeQuery();
074                if (!rs.next()) {
075                    throw new JMSException("Failed to broker message: " + messageID + " in container.");
076                }
077    
078                // Update the blob
079                Blob blob = rs.getBlob(1);
080                OutputStream stream = blob.setBinaryStream(data.length);
081                stream.write(data);
082                stream.close();
083                s.close();
084    
085                // Update the row with the updated blob
086                s = c.prepareStatement(statements.getUpdateMessageStatement());
087                s.setBlob(1, blob);
088                s.setLong(2, seq);
089    
090            } catch (IOException e) {
091                throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
092            } finally {
093                try {
094                    rs.close();
095                } catch (Throwable ignore) {
096                }
097                try {
098                    s.close();
099                } catch (Throwable ignore) {
100                }
101            }
102        }
103    
104        public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
105            PreparedStatement s = null;
106            ResultSet rs = null;
107            try {
108    
109                s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
110                s.setLong(1, seq);
111                rs = s.executeQuery();
112    
113                if (!rs.next()) {
114                    return null;
115                }
116                Blob blob = rs.getBlob(1);
117                InputStream is = blob.getBinaryStream();
118    
119                ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
120                int ch;
121                while ((ch = is.read()) >= 0) {
122                    os.write(ch);
123                }
124                is.close();
125                os.close();
126    
127                return os.toByteArray();
128    
129            } catch (IOException e) {
130                throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
131            } finally {
132                try {
133                    rs.close();
134                } catch (Throwable ignore) {
135                }
136                try {
137                    s.close();
138                } catch (Throwable ignore) {
139                }
140            }
141        }
142    
143    }