001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.command.store.amq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.PrintWriter;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collections;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Scanner;
030    
031    import org.apache.activemq.command.ActiveMQBlobMessage;
032    import org.apache.activemq.command.ActiveMQBytesMessage;
033    import org.apache.activemq.command.ActiveMQMapMessage;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ActiveMQObjectMessage;
036    import org.apache.activemq.command.ActiveMQStreamMessage;
037    import org.apache.activemq.command.ActiveMQTextMessage;
038    import org.apache.activemq.command.DataStructure;
039    import org.apache.activemq.command.JournalQueueAck;
040    import org.apache.activemq.command.JournalTopicAck;
041    import org.apache.activemq.command.JournalTrace;
042    import org.apache.activemq.command.JournalTransaction;
043    import org.apache.activemq.kaha.impl.async.Location;
044    import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045    import org.apache.activemq.openwire.OpenWireFormat;
046    import org.apache.activemq.util.ByteSequence;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.apache.velocity.Template;
049    import org.apache.velocity.VelocityContext;
050    import org.apache.velocity.app.Velocity;
051    import org.apache.velocity.app.VelocityEngine;
052    import org.josql.Query;
053    
054    /**
055     * Allows you to view the contents of a Journal.
056     * 
057     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
058     */
059    public class AMQJournalTool {
060    
061            private final ArrayList<File> dirs = new ArrayList<File>();
062            private final WireFormat wireFormat = new OpenWireFormat();
063            private final HashMap<String, String> resources = new HashMap<String, String>();
064    
065            private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
066            private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
067            private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
068            private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
069            private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
070            private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
071            private String where;
072            private VelocityContext context;
073            private VelocityEngine velocity;
074            private boolean help;
075    
076            public static void main(String[] args) throws Exception {
077                    AMQJournalTool consumerTool = new AMQJournalTool();
078                    String[] directories = CommandLineSupport
079                                    .setOptions(consumerTool, args);
080                    if (directories.length < 1) {
081                            System.out
082                                            .println("Please specify the directories with journal data to scan");
083                            return;
084                    }
085                    for (int i = 0; i < directories.length; i++) {
086                            consumerTool.getDirs().add(new File(directories[i]));
087                    }
088                    consumerTool.execute();
089            }
090    
091            public void execute() throws Exception {
092    
093                    if( help ) {
094                            showHelp();
095                            return;
096                    }
097                    
098                    if (getDirs().size() < 1) {
099                            System.out.println("");
100                            System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
101                            System.out.println("");
102                            showHelp();
103                            return;
104                    }
105    
106                    for (File dir : getDirs()) {
107                            if( !dir.exists() ) {
108                                    System.out.println("");
109                                    System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
110                                    System.out.println("");
111                                    showHelp();
112                                    return;
113                            }
114                            if( !dir.isDirectory() ) {
115                                    System.out.println("");
116                                    System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
117                                    System.out.println("");
118                                    showHelp();
119                                    return;
120                            }
121                    }
122                    
123                    
124                    context = new VelocityContext();
125                    List keys = Arrays.asList(context.getKeys());
126    
127                    for (Iterator iterator = System.getProperties().entrySet()
128                                    .iterator(); iterator.hasNext();) {
129                            Map.Entry kv = (Map.Entry) iterator.next();
130                            String name = (String) kv.getKey();
131                            String value = (String) kv.getValue();
132    
133                            if (!keys.contains(name)) {
134                                    context.put(name, value);
135                            }
136                    }
137                    
138                    velocity = new VelocityEngine();
139                    velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
140                    velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
141                    velocity.init();
142    
143    
144                    resources.put("message", messageFormat);
145                    resources.put("topicAck", topicAckFormat);
146                    resources.put("queueAck", queueAckFormat);
147                    resources.put("transaction", transactionFormat);
148                    resources.put("trace", traceFormat);
149                    resources.put("unknown", unknownFormat);
150    
151                    Query query = null;
152                    if (where != null) {
153                            query = new Query();
154                            query.parse("select * from "+Entry.class.getName()+" where "+where);
155    
156                    }
157    
158                    ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
159                    manager.start();
160                    try {
161                            Location curr = manager.getFirstLocation();
162                            while (curr != null) {
163    
164                                    ByteSequence data = manager.read(curr);
165                                    DataStructure c = (DataStructure) wireFormat.unmarshal(data);
166    
167                                    Entry entry = new Entry();
168                                    entry.setLocation(curr);
169                                    entry.setRecord(c);
170                                    entry.setData(data);
171                                    entry.setQuery(query);
172                                    process(entry);
173    
174                                    curr = manager.getNextLocation(curr);
175                            }
176                    } finally {
177                            manager.close();
178                    }
179            }
180    
181            private void showHelp() {
182                    InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
183                    Scanner scanner = new Scanner(is);
184                    while (scanner.hasNextLine()) {
185                            String line = scanner.nextLine();
186                            System.out.println(line);
187                    }
188                    scanner.close();        }
189    
190            private void process(Entry entry) throws Exception {
191    
192                    Location location = entry.getLocation();
193                    DataStructure record = entry.getRecord();
194    
195                    switch (record.getDataStructureType()) {
196                    case ActiveMQMessage.DATA_STRUCTURE_TYPE:
197                            entry.setType("ActiveMQMessage");
198                            entry.setFormater("message");
199                            display(entry);
200                            break;
201                    case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
202                            entry.setType("ActiveMQBytesMessage");
203                            entry.setFormater("message");
204                            display(entry);
205                            break;
206                    case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
207                            entry.setType("ActiveMQBlobMessage");
208                            entry.setFormater("message");
209                            display(entry);
210                            break;
211                    case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
212                            entry.setType("ActiveMQMapMessage");
213                            entry.setFormater("message");
214                            display(entry);
215                            break;
216                    case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
217                            entry.setType("ActiveMQObjectMessage");
218                            entry.setFormater("message");
219                            display(entry);
220                            break;
221                    case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
222                            entry.setType("ActiveMQStreamMessage");
223                            entry.setFormater("message");
224                            display(entry);
225                            break;
226                    case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
227                            entry.setType("ActiveMQTextMessage");
228                            entry.setFormater("message");
229                            display(entry);
230                            break;
231                    case JournalQueueAck.DATA_STRUCTURE_TYPE:
232                            entry.setType("Queue Ack");
233                            entry.setFormater("queueAck");
234                            display(entry);
235                            break;
236                    case JournalTopicAck.DATA_STRUCTURE_TYPE:
237                            entry.setType("Topic Ack");
238                            entry.setFormater("topicAck");
239                            display(entry);
240                            break;
241                    case JournalTransaction.DATA_STRUCTURE_TYPE:
242                            entry.setType(getType((JournalTransaction) record));
243                            entry.setFormater("transaction");
244                            display(entry);
245                            break;
246                    case JournalTrace.DATA_STRUCTURE_TYPE:
247                            entry.setType("Trace");
248                            entry.setFormater("trace");
249                            display(entry);
250                            break;
251                    default:
252                            entry.setType("Unknown");
253                            entry.setFormater("unknown");
254                            display(entry);
255                            break;
256                    }
257            }
258    
259            private String getType(JournalTransaction record) {
260                    switch (record.getType()) {
261                    case JournalTransaction.XA_PREPARE:
262                            return "XA Prepare";
263                    case JournalTransaction.XA_COMMIT:
264                            return "XA Commit";
265                    case JournalTransaction.XA_ROLLBACK:
266                            return "XA Rollback";
267                    case JournalTransaction.LOCAL_COMMIT:
268                            return "Commit";
269                    case JournalTransaction.LOCAL_ROLLBACK:
270                            return "Rollback";
271                    }
272                    return "Unknown Transaction";
273            }
274    
275            private void display(Entry entry) throws Exception {
276    
277                    if (entry.getQuery() != null) {
278                            List list = Collections.singletonList(entry);
279                            List results = entry.getQuery().execute(list).getResults();
280                            if (results.isEmpty()) {
281                                    return;
282                            }
283                    }
284    
285                    CustomResourceLoader.setResources(resources);
286                    try {
287    
288                            context.put("location", entry.getLocation());
289                            context.put("record", entry.getRecord());
290                            context.put("type", entry.getType());
291                            if (entry.getRecord() instanceof ActiveMQMessage) {
292                                    context.put("body", new MessageBodyFormatter(
293                                                    (ActiveMQMessage) entry.getRecord()));
294                            }
295    
296                            Template template = velocity.getTemplate(entry.getFormater());
297                            PrintWriter writer = new PrintWriter(System.out);
298                            template.merge(context, writer);
299                            writer.println();
300                            writer.flush();
301                    } finally {
302                            CustomResourceLoader.setResources(null);
303                    }
304            }
305    
306            public void setMessageFormat(String messageFormat) {
307                    this.messageFormat = messageFormat;
308            }
309    
310            public void setTopicAckFormat(String ackFormat) {
311                    this.topicAckFormat = ackFormat;
312            }
313    
314            public void setTransactionFormat(String transactionFormat) {
315                    this.transactionFormat = transactionFormat;
316            }
317    
318            public void setTraceFormat(String traceFormat) {
319                    this.traceFormat = traceFormat;
320            }
321    
322            public void setUnknownFormat(String unknownFormat) {
323                    this.unknownFormat = unknownFormat;
324            }
325    
326            public void setQueueAckFormat(String queueAckFormat) {
327                    this.queueAckFormat = queueAckFormat;
328            }
329    
330            public String getQuery() {
331                    return where;
332            }
333    
334            public void setWhere(String query) {
335                    this.where = query;
336            }
337    
338            public boolean isHelp() {
339                    return help;
340            }
341    
342            public void setHelp(boolean help) {
343                    this.help = help;
344            }
345    
346            /**
347             * @return the dirs
348             */
349            public ArrayList<File> getDirs() {
350                    return dirs;
351            }
352    
353    }