libpqxx
The C++ client library for PostgreSQL
stream_query_impl.hxx
1 /* Code for parts of pqxx::internal::stream_query.
2  *
3  * These definitions needs to be in a separate file in order to iron out
4  * circular dependencies between headers.
5  */
6 #if !defined(PQXX_H_STREAM_QUERY_IMPL)
7 # define PQXX_H_STREAM_QUERY_IMPL
8 
9 namespace pqxx::internal
10 {
11 template<typename... TYPE>
13  transaction_base &tx, std::string_view query) :
14  transaction_focus{tx, "stream_query"}, m_char_finder{get_finder(tx)}
15 {
16  auto const r{tx.exec(internal::concat("COPY (", query, ") TO STDOUT"))};
17  r.expect_columns(sizeof...(TYPE));
18  r.expect_rows(0);
19  register_me();
20 }
21 
22 
23 template<typename... TYPE>
24 inline char_finder_func *
25 stream_query<TYPE...>::get_finder(transaction_base const &tx)
26 {
27  auto const group{enc_group(tx.conn().encoding_id())};
28  return get_s_char_finder<'\t', '\\'>(group);
29 }
30 
31 
32 // C++20: Replace with generator? Could be faster (local vars vs. members).
34 
37 template<typename... TYPE> class stream_query_input_iterator
38 {
39  using stream_t = stream_query<TYPE...>;
40 
41 public:
42  using value_type = std::tuple<TYPE...>;
43  using difference_type = long;
44 
45  explicit stream_query_input_iterator(stream_t &home) :
46  m_home(&home),
47  m_line{typename stream_query<TYPE...>::line_handle(
49  {
50  consume_line();
51  }
54 
57  {
58  assert(not done());
59  consume_line();
60  return *this;
61  }
62 
64 
67  {
68  ++*this;
69  return {};
70  }
71 
73  value_type operator*() const
74  {
75  return m_home->parse_line(zview{m_line.get(), m_line_size});
76  }
77 
79  bool operator==(stream_query_end_iterator) const noexcept { return done(); }
82  {
83  return not done();
84  }
85 
87  operator=(stream_query_input_iterator &&rhs) noexcept
88  {
89  if (&rhs != this)
90  {
91  m_line = std::move(rhs.m_line);
92  m_home = rhs.m_home;
93  m_line_size = rhs.m_line_size;
94  }
95  return *this;
96  }
97 
98 private:
99  stream_query_input_iterator() {}
100 
102  bool done() const noexcept { return m_home->done(); }
103 
105 
108  void consume_line() &
109  {
110  auto [line, size]{m_home->read_line()};
111  m_line = std::move(line);
112  m_line_size = size;
113  if (m_line)
114  {
115  // We know how many fields to expect. Replace the newline at the end
116  // with the field separator, so the parsing loop only needs to scan for a
117  // tab, not a tab or a newline.
118  char *const ptr{m_line.get()};
119  assert(ptr[size] == '\n');
120  ptr[size] = '\t';
121  }
122  }
123 
124  stream_t *m_home;
125 
127  typename stream_t::line_handle m_line;
128 
130  std::size_t m_line_size;
131 };
132 
133 
134 template<typename... TYPE>
135 inline bool operator==(
136  stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
137 {
138  return i.done();
139 }
140 
141 
142 template<typename... TYPE>
143 inline bool operator!=(
144  stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
145 {
146  return not i.done();
147 }
148 
149 
150 template<typename... TYPE> inline auto stream_query<TYPE...>::begin() &
151 {
152  return stream_query_input_iterator<TYPE...>{*this};
153 }
154 
155 
156 template<typename... TYPE>
157 inline std::pair<typename stream_query<TYPE...>::line_handle, std::size_t>
159 {
160  assert(not done());
161 
162  internal::gate::connection_stream_from gate{m_trans->conn()};
163  try
164  {
165  auto line{gate.read_copy_line()};
166  // Check for completion.
167  if (not line.first)
168  PQXX_UNLIKELY close();
169  return line;
170  }
171  catch (std::exception const &)
172  {
173  close();
174  throw;
175  }
176 }
177 } // namespace pqxx::internal
178 #endif
result exec(std::string_view query, std::string_view desc)
Execute a command.
Definition: transaction_base.cxx:249
bool operator!=(stream_query_end_iterator) const noexcept
Comparison only works for comparing to end().
Definition: stream_query_impl.hxx:81
std::string concat(TYPE...item)
Efficiently combine a bunch of items into one big string.
Definition: concat.hxx:31
void pqfreemem(void const *ptr) noexcept
Wrapper for PQfreemem(), with C++ linkage.
Definition: util.cxx:205
Marker-type wrapper: zero-terminated std::string_view.
Definition: zview.hxx:37
auto begin()&
Begin iterator. Only for use by "range for.".
Definition: stream_query_impl.hxx:150
stream_query_input_iterator operator++(int)
Post-increment. Only here to satisfy input_iterator concept.
Definition: stream_query_impl.hxx:66
Internal items for libpqxx' own use. Do not use these yourself.
Definition: encodings.cxx:32
bool operator==(stream_query_end_iterator) const noexcept
Are we at the end?
Definition: stream_query_impl.hxx:79
The end() iterator for a stream_query.
Definition: stream_query.hxx:46
Definition: connection-stream_from.hxx:12
Input iterator for stream_query.
Definition: stream_query_impl.hxx:37
result expect_columns(row_size_type cols) const
Expect that result consists of exactly cols columns.
Definition: result.hxx:363
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:28
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:79
result expect_rows(size_type n) const
Check that result contains exactly n rows.
Definition: result.hxx:321
std::tuple< TYPE... > parse_line(zview line)&
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:118
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:106
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
std::pair< line_handle, std::size_t > read_line()&
Read a COPY line from the server.
Definition: stream_query_impl.hxx:158
value_type operator*() const
Dereference. There's no caching in here, so don't repeat calls.
Definition: stream_query_impl.hxx:73
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find first occurrence of specific any of ASCII characters.".
Definition: encoding_group.hxx:71
pqxx::internal::encoding_group enc_group(std::string_view encoding_name)
Convert libpq encoding name to its libpqxx encoding group.
Definition: encodings.cxx:35
stream_query_input_iterator & operator++()&
Pre-increment. This is what you'd normally want to use.
Definition: stream_query_impl.hxx:56
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:150