Libosmium  2.17.1
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  return osmium::config::get_max_queue_size("INPUT", 20);
76  }
77 
78  inline std::size_t get_osmdata_queue_size() noexcept {
79  return osmium::config::get_max_queue_size("OSMDATA", 20);
80  }
81 
82  } // namespace detail
83 
90  class Reader {
91 
92  // The Reader::read() function reads from a queue of buffers which
93  // can contain nested buffers. These nested buffers will be in
94  // here, because read() can only return a single unnested buffer.
95  osmium::memory::Buffer m_back_buffers{};
96 
98 
100 
101  detail::ParserFactory::create_parser_type m_creator;
102 
103  enum class status {
104  okay = 0, // normal reading
105  error = 1, // some error occurred while reading
106  closed = 2, // close() called
107  eof = 3 // eof of file was reached without error
109 
110  int m_childpid = 0;
111 
112  detail::future_string_queue_type m_input_queue;
113 
114  int m_fd;
115 
116  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
117 
118  osmium::io::detail::ReadThreadManager m_read_thread_manager;
119 
120  detail::future_buffer_queue_type m_osmdata_queue;
121  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
122 
123  std::future<osmium::io::Header> m_header_future{};
125 
127 
128  std::size_t m_file_size = 0;
129  std::atomic<std::size_t> m_offset{0};
130 
134 
135  void set_option(osmium::thread::Pool& pool) noexcept {
136  m_pool = &pool;
137  }
138 
140  m_read_which_entities = value;
141  }
142 
143  void set_option(osmium::io::read_meta value) noexcept {
144  // Ignore this setting if we have a history/change file,
145  // because if this is set to "no", we don't see the difference
146  // between visible and deleted objects.
148  m_read_metadata = value;
149  }
150  }
151 
152  void set_option(osmium::io::buffers_type value) noexcept {
153  m_buffers_kind = value;
154  }
155 
156  // This function will run in a separate thread.
158  int fd,
159  const detail::ParserFactory::create_parser_type& creator,
160  detail::future_string_queue_type& input_queue,
161  detail::future_buffer_queue_type& osmdata_queue,
162  std::promise<osmium::io::Header>&& header_promise,
163  std::atomic<std::size_t>* offset_ptr,
164  osmium::osm_entity_bits::type read_which_entities,
165  osmium::io::read_meta read_metadata,
166  osmium::io::buffers_type buffers_kind,
167  bool want_buffered_pages_removed) {
168  std::promise<osmium::io::Header> promise{std::move(header_promise)};
169  osmium::io::detail::parser_arguments args = {
170  pool,
171  fd,
172  input_queue,
173  osmdata_queue,
174  promise,
175  offset_ptr,
176  read_which_entities,
177  read_metadata,
178  buffers_kind,
179  want_buffered_pages_removed
180  };
181  creator(args)->parse();
182  }
183 
184 #ifndef _WIN32
/**
 * Fork a child process running the given command and return a file
 * descriptor from which the parent can read everything the child
 * writes to its stdout.
 *
 * The command is found through the PATH (execlp()) and called as
 * `command -g filename`. In the child stdin and stderr are connected
 * to /dev/null. If the command can not be executed, the child exits
 * with exit code 1.
 *
 * @param command Name of the command to run.
 * @param filename Last argument given to the command.
 * @param childpid Pointer to the variable that will be set to the pid
 *                 of the child process (only set in the parent).
 * @returns File descriptor of the read end of the pipe connected to
 *          the stdout of the child. The caller has to close it.
 * @throws std::system_error if the pipe can not be created or the
 *         fork fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (::pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }

    const pid_t pid = ::fork();
    if (pid < 0) {
        // Do not leak the pipe. Remember errno first, because close()
        // is allowed to change it.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }

    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }

        // In the child always leave with _exit(): exit() would run the
        // atexit handlers of the parent and flush stdio buffers
        // inherited from it a second time.
        if (::dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout
            ::_exit(1);
        }

        // stdout is now the pipe, the original descriptor is not
        // needed any more.
        if (pipefd[1] != 1) {
            ::close(pipefd[1]);
        }

        // These get the lowest free descriptors, which are 0 and 2.
        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr

        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);

        // execlp() only returns if it failed.
        ::_exit(1);
    }

    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
231 #endif
232 
241  static int open_input_file_or_url(const std::string& filename, int* childpid) {
242  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
243  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
244 #ifndef _WIN32
245  return execute("curl", filename, childpid);
246 #else
247  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
248 #endif
249  }
250  const int fd = osmium::io::detail::open_for_reading(filename);
251 #if __linux__
252  if (fd >= 0) {
253  // Tell the kernel we are going to read this file sequentially
254  ::posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
255  }
256 #endif
257  return fd;
258  }
259 
260  std::unique_ptr<Decompressor> make_decompressor(const osmium::io::File& file, int fd) {
261  const auto& factory = osmium::io::CompressionFactory::instance();
262  if (file.buffer()) {
263  return factory.create_decompressor(file.compression(), file.buffer(), file.buffer_size());
264  }
265 
266  if (file.format() == file_format::pbf) {
267  return std::unique_ptr<Decompressor>{new DummyDecompressor{}};
268  }
269 
270  return factory.create_decompressor(file.compression(), fd);
271  }
272 
273  public:
274 
315  template <typename... TArgs>
316  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
317  m_file(file.check()),
318  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
319  m_input_queue(detail::get_input_queue_size(), "raw_input"),
320  m_fd(m_file.buffer() ? -1 : open_input_file_or_url(m_file.filename(), &m_childpid)),
323  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
325  m_file_size(m_fd > 2 ? osmium::file_size(m_fd) : 0) {
326 
327  (void)std::initializer_list<int>{
328  (set_option(args), 0)...
329  };
330 
331  if (!m_pool) {
333  }
334 
335  m_decompressor->set_offset_ptr(&m_offset);
336 
337  std::promise<osmium::io::Header> header_promise;
338  m_header_future = header_promise.get_future();
339 
341  if (cpc >= 0) {
342  m_decompressor->set_want_buffered_pages_removed(true);
343  }
344 
345  const int fd_for_parser = m_decompressor->is_real() ? -1 : m_fd;
346  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), fd_for_parser, std::ref(m_creator),
347  std::ref(m_input_queue), std::ref(m_osmdata_queue),
348  std::move(header_promise), &m_offset, m_read_which_entities,
350  m_decompressor->want_buffered_pages_removed()};
351  }
352 
353  template <typename... TArgs>
354  explicit Reader(const std::string& filename, TArgs&&... args) :
355  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
356  }
357 
358  template <typename... TArgs>
359  explicit Reader(const char* filename, TArgs&&... args) :
360  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
361  }
362 
363  Reader(const Reader&) = delete;
364  Reader& operator=(const Reader&) = delete;
365 
366  Reader(Reader&&) = delete;
367  Reader& operator=(Reader&&) = delete;
368 
369  ~Reader() noexcept {
370  try {
371  close();
372  } catch (...) {
373  // Ignore any exceptions because destructor must not throw.
374  }
375  }
376 
385  void close() {
387 
388  m_read_thread_manager.stop();
389 
390  m_osmdata_queue_wrapper.drain();
391 
392  try {
393  m_read_thread_manager.close();
394  } catch (...) {
395  // Ignore any exceptions.
396  }
397 
398 #ifndef _WIN32
399  if (m_childpid) {
400  int status = 0;
401  const pid_t pid = ::waitpid(m_childpid, &status, 0);
402 #pragma GCC diagnostic push
403 #pragma GCC diagnostic ignored "-Wold-style-cast"
404  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
405  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
406  }
407 #pragma GCC diagnostic pop
408  m_childpid = 0;
409  }
410 #endif
411  }
412 
420  if (m_status == status::error) {
421  throw io_error{"Can not get header from reader when in status 'error'"};
422  }
423 
424  try {
425  if (m_header_future.valid()) {
426  m_header = m_header_future.get();
427  }
428  } catch (...) {
429  close();
431  throw;
432  }
433 
434  return m_header;
435  }
436 
445  osmium::memory::Buffer read() {
446  osmium::memory::Buffer buffer;
447 
448  // If there are buffers on the stack, return those first.
449  if (m_back_buffers) {
450  if (m_back_buffers.has_nested_buffers()) {
451  buffer = std::move(*m_back_buffers.get_last_nested());
452  } else {
453  buffer = std::move(m_back_buffers);
454  m_back_buffers = osmium::memory::Buffer{};
455  }
456  return buffer;
457  }
458 
459  if (m_status != status::okay) {
460  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
461  }
462 
465  return buffer;
466  }
467 
468  try {
469  // m_input_format.read() can return an invalid buffer to signal EOF,
470  // or a valid buffer with or without data. A valid buffer
471  // without data is not an error, it just means we have to
472  // keep getting the next buffer until there is one with data.
473  while (true) {
474  buffer = m_osmdata_queue_wrapper.pop();
475  if (detail::at_end_of_data(buffer)) {
477  m_read_thread_manager.close();
478  return buffer;
479  }
480  if (buffer.has_nested_buffers()) {
481  m_back_buffers = std::move(buffer);
482  buffer = std::move(*m_back_buffers.get_last_nested());
483  }
484  if (buffer.committed() > 0) {
485  return buffer;
486  }
487  }
488  } catch (...) {
489  close();
491  throw;
492  }
493  }
494 
499  bool eof() const {
501  }
502 
507  std::size_t file_size() const noexcept {
508  return m_file_size;
509  }
510 
525  std::size_t offset() const noexcept {
526  return m_offset;
527  }
528 
529  }; // class Reader
530 
539  template <typename... TArgs>
540  osmium::memory::Buffer read_file(TArgs&&... args) {
541  osmium::memory::Buffer buffer{1024 * 1024, osmium::memory::Buffer::auto_grow::yes};
542 
543  Reader reader{std::forward<TArgs>(args)...};
544  while (auto read_buffer = reader.read()) {
545  buffer.add_buffer(read_buffer);
546  buffer.commit();
547  }
548 
549  return buffer;
550  }
551 
552  } // namespace io
553 
554 } // namespace osmium
555 
556 #endif // OSMIUM_IO_READER_HPP
static CompressionFactory & instance()
Definition: compression.hpp:191
Definition: compression.hpp:287
Definition: file.hpp:72
size_t buffer_size() const noexcept
Definition: file.hpp:147
bool has_multiple_object_versions() const noexcept
Definition: file.hpp:303
const char * buffer() const noexcept
Definition: file.hpp:143
file_compression compression() const noexcept
Definition: file.hpp:294
file_format format() const noexcept
Definition: file.hpp:285
Definition: header.hpp:68
Definition: reader.hpp:90
osmium::memory::Buffer read()
Definition: reader.hpp:445
osmium::io::buffers_type m_buffers_kind
Definition: reader.hpp:133
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:112
osmium::memory::Buffer m_back_buffers
Definition: reader.hpp:95
static void parser_thread(osmium::thread::Pool &pool, int fd, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, std::atomic< std::size_t > *offset_ptr, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata, osmium::io::buffers_type buffers_kind, bool want_buffered_pages_removed)
Definition: reader.hpp:157
int m_childpid
Definition: reader.hpp:110
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:143
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:120
std::size_t m_file_size
Definition: reader.hpp:128
Reader & operator=(Reader &&)=delete
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:135
void set_option(osmium::io::buffers_type value) noexcept
Definition: reader.hpp:152
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:196
enum osmium::io::Reader::status m_status
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:241
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:116
status
Definition: reader.hpp:103
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:359
osmium::io::Header m_header
Definition: reader.hpp:124
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:101
Reader & operator=(const Reader &)=delete
osmium::io::Header header()
Definition: reader.hpp:419
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:121
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:316
std::size_t file_size() const noexcept
Definition: reader.hpp:507
Reader(Reader &&)=delete
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:139
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:123
std::unique_ptr< Decompressor > make_decompressor(const osmium::io::File &file, int fd)
Definition: reader.hpp:260
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:118
bool eof() const
Definition: reader.hpp:499
Reader(const Reader &)=delete
std::size_t offset() const noexcept
Definition: reader.hpp:525
std::atomic< std::size_t > m_offset
Definition: reader.hpp:129
osmium::thread::thread_handler m_thread
Definition: reader.hpp:126
void close()
Definition: reader.hpp:385
~Reader() noexcept
Definition: reader.hpp:369
osmium::io::File m_file
Definition: reader.hpp:97
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:131
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:354
osmium::io::read_meta m_read_metadata
Definition: reader.hpp:132
int m_fd
Definition: reader.hpp:114
osmium::thread::Pool * m_pool
Definition: reader.hpp:99
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
int8_t clean_page_cache_after_read() noexcept
Definition: config.hpp:106
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:540
buffers_type
Definition: file_format.hpp:60
read_meta
Definition: file_format.hpp:55
type
Definition: entity_bits.hpp:63
@ all
object or changeset
Definition: entity_bits.hpp:76
@ nothing
Definition: entity_bits.hpp:67
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551
Definition: error.hpp:46