Libosmium  2.15.2
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2019 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  return osmium::config::get_max_queue_size("INPUT", 20);
76  }
77 
78  inline std::size_t get_osmdata_queue_size() noexcept {
79  return osmium::config::get_max_queue_size("OSMDATA", 20);
80  }
81 
82  } // namespace detail
83 
90  class Reader {
91 
92  // The Reader::read() function reads from a queue of buffers which
93  // can contain nested buffers. These nested buffers will be in
94  // here, because read() can only return a single unnested buffer.
95  osmium::memory::Buffer m_back_buffers{};
96 
98 
99  osmium::thread::Pool* m_pool = nullptr;
100 
101  detail::ParserFactory::create_parser_type m_creator;
102 
103  enum class status {
104  okay = 0, // normal reading
105  error = 1, // some error occurred while reading
106  closed = 2, // close() called
107  eof = 3 // eof of file was reached without error
108  } m_status = status::okay;
109 
110  int m_childpid = 0;
111 
112  detail::future_string_queue_type m_input_queue;
113 
114  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
115 
116  osmium::io::detail::ReadThreadManager m_read_thread_manager;
117 
118  detail::future_buffer_queue_type m_osmdata_queue;
119  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
120 
121  std::future<osmium::io::Header> m_header_future{};
122  osmium::io::Header m_header{};
123 
125 
126  std::size_t m_file_size = 0;
127 
130 
131  void set_option(osmium::thread::Pool& pool) noexcept {
132  m_pool = &pool;
133  }
134 
136  m_read_which_entities = value;
137  }
138 
139  void set_option(osmium::io::read_meta value) noexcept {
140  m_read_metadata = value;
141  }
142 
143  // This function will run in a separate thread.
145  const detail::ParserFactory::create_parser_type& creator,
146  detail::future_string_queue_type& input_queue,
147  detail::future_buffer_queue_type& osmdata_queue,
148  std::promise<osmium::io::Header>&& header_promise,
149  osmium::osm_entity_bits::type read_which_entities,
150  osmium::io::read_meta read_metadata) {
151  std::promise<osmium::io::Header> promise{std::move(header_promise)};
152  osmium::io::detail::parser_arguments args = {
153  pool,
154  input_queue,
155  osmdata_queue,
156  promise,
157  read_which_entities,
158  read_metadata
159  };
160  creator(args)->parse();
161  }
162 
163 #ifndef _WIN32
164 
175  static int execute(const std::string& command, const std::string& filename, int* childpid) {
176  int pipefd[2];
177  if (pipe(pipefd) < 0) {
178  throw std::system_error{errno, std::system_category(), "opening pipe failed"};
179  }
180  const pid_t pid = fork();
181  if (pid < 0) {
182  throw std::system_error{errno, std::system_category(), "fork failed"};
183  }
184  if (pid == 0) { // child
185  // close all file descriptors except one end of the pipe
186  for (int i = 0; i < 32; ++i) {
187  if (i != pipefd[1]) {
188  ::close(i);
189  }
190  }
191  if (dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout/stdin
192  exit(1);
193  }
194 
195  ::open("/dev/null", O_RDONLY); // stdin
196  ::open("/dev/null", O_WRONLY); // stderr
197  // hack: -g switches off globbing in curl which allows [] to be used in file names
198  // this is important for XAPI URLs
199  // in theory this execute() function could be used for other commands, but it is
200  // only used for curl at the moment, so this is okay.
201  if (::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr) < 0) {
202  exit(1);
203  }
204  }
205  // parent
206  *childpid = pid;
207  ::close(pipefd[1]);
208  return pipefd[0];
209  }
210 #endif
211 
220  static int open_input_file_or_url(const std::string& filename, int* childpid) {
221  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
222  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
223 #ifndef _WIN32
224  return execute("curl", filename, childpid);
225 #else
226  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
227 #endif
228  }
229  return osmium::io::detail::open_for_reading(filename);
230  }
231 
232  public:
233 
256  template <typename... TArgs>
257  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
258  m_file(file.check()),
259  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
260  m_input_queue(detail::get_input_queue_size(), "raw_input"),
261  m_decompressor(m_file.buffer() ?
262  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
263  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
264  m_read_thread_manager(*m_decompressor, m_input_queue),
265  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
266  m_osmdata_queue_wrapper(m_osmdata_queue),
267  m_file_size(m_decompressor->file_size()) {
268 
269  (void)std::initializer_list<int>{
270  (set_option(args), 0)...
271  };
272 
273  if (!m_pool) {
274  m_pool = &thread::Pool::default_instance();
275  }
276 
277  std::promise<osmium::io::Header> header_promise;
278  m_header_future = header_promise.get_future();
279  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), std::ref(m_creator), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), m_read_which_entities, m_read_metadata};
280  }
281 
282  template <typename... TArgs>
283  explicit Reader(const std::string& filename, TArgs&&... args) :
284  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
285  }
286 
287  template <typename... TArgs>
288  explicit Reader(const char* filename, TArgs&&... args) :
289  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
290  }
291 
292  Reader(const Reader&) = delete;
293  Reader& operator=(const Reader&) = delete;
294 
295  Reader(Reader&&) = delete;
296  Reader& operator=(Reader&&) = delete;
297 
298  ~Reader() noexcept {
299  try {
300  close();
301  } catch (...) {
302  // Ignore any exceptions because destructor must not throw.
303  }
304  }
305 
314  void close() {
315  m_status = status::closed;
316 
317  m_read_thread_manager.stop();
318 
319  m_osmdata_queue_wrapper.drain();
320 
321  try {
322  m_read_thread_manager.close();
323  } catch (...) {
324  // Ignore any exceptions.
325  }
326 
327 #ifndef _WIN32
328  if (m_childpid) {
329  int status;
330  const pid_t pid = ::waitpid(m_childpid, &status, 0);
331 #pragma GCC diagnostic push
332 #pragma GCC diagnostic ignored "-Wold-style-cast"
333  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
334  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
335  }
336 #pragma GCC diagnostic pop
337  m_childpid = 0;
338  }
339 #endif
340  }
341 
349  if (m_status == status::error) {
350  throw io_error{"Can not get header from reader when in status 'error'"};
351  }
352 
353  try {
354  if (m_header_future.valid()) {
355  m_header = m_header_future.get();
356  }
357  } catch (...) {
358  close();
359  m_status = status::error;
360  throw;
361  }
362 
363  return m_header;
364  }
365 
375  osmium::memory::Buffer buffer;
376 
377  // If there are buffers on the stack, return those first.
378  if (m_back_buffers) {
379  if (m_back_buffers.has_nested_buffers()) {
380  buffer = std::move(*m_back_buffers.get_last_nested());
381  } else {
382  buffer = std::move(m_back_buffers);
383  m_back_buffers = osmium::memory::Buffer{};
384  }
385  return buffer;
386  }
387 
388  if (m_status != status::okay) {
389  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
390  }
391 
392  if (m_read_which_entities == osmium::osm_entity_bits::nothing) {
393  m_status = status::eof;
394  return buffer;
395  }
396 
397  try {
398  // m_input_format.read() can return an invalid buffer to signal EOF,
399  // or a valid buffer with or without data. A valid buffer
400  // without data is not an error, it just means we have to
401  // keep getting the next buffer until there is one with data.
402  while (true) {
403  buffer = m_osmdata_queue_wrapper.pop();
404  if (detail::at_end_of_data(buffer)) {
405  m_status = status::eof;
406  m_read_thread_manager.close();
407  return buffer;
408  }
409  if (buffer.has_nested_buffers()) {
410  m_back_buffers = std::move(buffer);
411  buffer = std::move(*m_back_buffers.get_last_nested());
412  }
413  if (buffer.committed() > 0) {
414  return buffer;
415  }
416  }
417  } catch (...) {
418  close();
419  m_status = status::error;
420  throw;
421  }
422  }
423 
428  bool eof() const {
429  return m_status == status::eof || m_status == status::closed;
430  }
431 
436  std::size_t file_size() const noexcept {
437  return m_file_size;
438  }
439 
454  std::size_t offset() const noexcept {
455  return m_decompressor->offset();
456  }
457 
458  }; // class Reader
459 
468  template <typename... TArgs>
471 
472  Reader reader{std::forward<TArgs>(args)...};
473  while (auto read_buffer = reader.read()) {
474  buffer.add_buffer(read_buffer);
475  buffer.commit();
476  }
477 
478  return buffer;
479  }
480 
481  } // namespace io
482 
483 } // namespace osmium
484 
485 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:119
status
Definition: reader.hpp:103
osmium::memory::Buffer read()
Definition: reader.hpp:374
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:257
type
Definition: entity_bits.hpp:63
std::size_t file_size(int fd)
Definition: file.hpp:109
std::size_t file_size() const noexcept
Definition: reader.hpp:436
bool eof() const
Definition: reader.hpp:428
static Pool & default_instance()
Definition: pool.hpp:180
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:288
Definition: location.hpp:550
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:114
std::size_t committed() const noexcept
Definition: buffer.hpp:356
object or changeset
Definition: entity_bits.hpp:76
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:112
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:469
osmium::io::File m_file
Definition: reader.hpp:97
Definition: file.hpp:72
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:135
Definition: attr.hpp:333
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:116
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:175
static void parser_thread(osmium::thread::Pool &pool, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata)
Definition: reader.hpp:144
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:283
Definition: reader.hpp:90
~Reader() noexcept
Definition: reader.hpp:298
Definition: error.hpp:44
std::size_t offset() const noexcept
Definition: reader.hpp:454
Definition: pool.hpp:90
osmium::io::Header header()
Definition: reader.hpp:348
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:139
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
bool has_nested_buffers() const noexcept
Definition: buffer.hpp:437
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:131
Definition: buffer.hpp:97
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:220
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:118
read_meta
Definition: file_format.hpp:54
Definition: entity_bits.hpp:67
Definition: compression.hpp:137
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:101
Definition: header.hpp:68
void close()
Definition: reader.hpp:314
Definition: util.hpp:85