libpqxx
The C++ client library for PostgreSQL
stream_query.hxx
1 /* Definition of the pqxx::internal::stream_query class.
2  *
3  * Enables optimized batch reads from a database table.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/stream_query instead.
6  *
7  * Copyright (c) 2000-2025, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_H_STREAM_QUERY
14 #define PQXX_H_STREAM_QUERY
15 
16 #if !defined(PQXX_HEADER_PRE)
17 # error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18 #endif
19 
20 #include <cassert>
21 #include <functional>
22 #include <variant>
23 
24 #include "pqxx/connection.hxx"
25 #include "pqxx/except.hxx"
26 #include "pqxx/internal/concat.hxx"
27 #include "pqxx/internal/encoding_group.hxx"
28 #include "pqxx/internal/encodings.hxx"
29 #include "pqxx/internal/gates/connection-stream_from.hxx"
30 #include "pqxx/internal/stream_iterator.hxx"
31 #include "pqxx/separated_list.hxx"
32 #include "pqxx/transaction_base.hxx"
33 #include "pqxx/transaction_focus.hxx"
34 #include "pqxx/util.hxx"
35 
36 
37 namespace pqxx
38 {
39 class transaction_base;
40 } // namespace pqxx
41 
42 
43 namespace pqxx::internal
44 {
47 {};
48 
49 
50 // C++20: Can we use generators, and maybe get speedup from HALO?
52 
79 template<typename... TYPE> class stream_query : transaction_focus
80 {
81 public:
82  using line_handle = std::unique_ptr<char, void (*)(void const *)>;
83 
85  inline stream_query(transaction_base &tx, std::string_view query);
87  inline stream_query(
88  transaction_base &tx, std::string_view query, params const &);
89 
90  stream_query(stream_query &&) = delete;
91  stream_query &operator=(stream_query &&) = delete;
92 
93  ~stream_query() noexcept
94  {
95  try
96  {
97  close();
98  }
99  catch (std::exception const &e)
100  {
101  reg_pending_error(e.what());
102  }
103  }
104 
106  bool done() const & noexcept { return m_char_finder == nullptr; }
107 
109  inline auto begin() &;
111 
115  auto end() const & { return stream_query_end_iterator{}; }
116 
118  std::tuple<TYPE...> parse_line(zview line) &
119  {
120  assert(not done());
121 
122  auto const line_size{std::size(line)};
123 
124  // This function uses m_row as a buffer, across calls. The only reason for
125  // it to carry over across calls is to avoid reallocation.
126 
127  // Make room for unescaping the line. It's a pessimistic size.
128  // Unusually, we're storing terminating zeroes *inside* the string.
129  // This is the only place where we modify m_row. MAKE SURE THE BUFFER DOES
130  // NOT GET RESIZED while we're working, because we're working with views
131  // into its buffer.
132  m_row.resize(line_size + 1);
133 
134  std::size_t offset{0u};
135  char *write{m_row.data()};
136 
137  // DO NOT shrink m_row to fit. We're carrying views pointing into the
138  // buffer. (Also, how useful would shrinking really be?)
139 
140  // Folding expression: scan and unescape each field, and convert it to its
141  // requested type.
142  std::tuple<TYPE...> data{parse_field<TYPE>(line, offset, write)...};
143 
144  assert(offset == line_size + 1u);
145  return data;
146  }
147 
149  std::pair<line_handle, std::size_t> read_line() &;
150 
151 private:
153 
156  static inline char_finder_func *get_finder(transaction_base const &tx);
157 
159 
173  std::tuple<std::size_t, char *, zview>
174  read_field(zview line, std::size_t offset, char *write)
175  {
176 #if !defined(NDEBUG)
177  auto const line_size{std::size(line)};
178 #endif
179 
180  assert(offset <= line_size);
181 
182  char const *lp{std::data(line)};
183 
184  // The COPY line now ends in a tab. (We replace the trailing newline with
185  // that to simplify the loop here.)
186  assert(lp[line_size] == '\t');
187  assert(lp[line_size + 1] == '\0');
188 
189  if ((lp[offset] == '\\') and (lp[offset + 1] == 'N'))
190  {
191  // Null field. Consume the "\N" and the field separator.
192  offset += 3;
193  assert(offset <= (line_size + 1));
194  assert(lp[offset - 1] == '\t');
195  // Return a null value. There's nothing to write into m_row.
196  return {offset, write, {}};
197  }
198 
199  // Beginning of the field text in the row buffer.
200  char const *const field_begin{write};
201 
202  // We're relying on several assumptions just for making the main loop
203  // condition work:
204  // * The COPY line ends in a newline.
205  // * Multibyte characters never start with an ASCII-range byte.
206  // * We can index a view beyond its bounds (but within its address space).
207  //
208  // Effectively, the newline acts as a final field separator.
209  while (lp[offset] != '\t')
210  {
211  assert(lp[offset] != '\0');
212 
213  // Beginning of the next character of interest (or the end of the line).
214  auto const stop_char{m_char_finder(line, offset)};
215  PQXX_ASSUME(stop_char > offset);
216  assert(stop_char < (line_size + 1));
217 
218  // Copy the text we have so far. It's got no special characters in it.
219  std::memcpy(write, &lp[offset], stop_char - offset);
220  write += (stop_char - offset);
221  offset = stop_char;
222 
223  // We're still within the line.
224  char const special{lp[offset]};
225  if (special == '\\')
226  {
227  // Escape sequence.
228  // Consume the backslash.
229  ++offset;
230  assert(offset < line_size);
231 
232  // The database will only escape ASCII characters, so we assume that
233  // we're dealing with a single-byte character.
234  char const escaped{lp[offset]};
235  assert((escaped >> 7) == 0);
236  ++offset;
237  *write++ = unescape_char(escaped);
238  }
239  else
240  {
241  // Field separator. Fall out of the loop.
242  assert(special == '\t');
243  }
244  }
245 
246  // Hit the end of the field.
247  assert(lp[offset] == '\t');
248  *write = '\0';
249  ++write;
250  ++offset;
251  return {offset, write, {field_begin, write - field_begin - 1}};
252  }
253 
255 
268  template<typename TARGET>
269  TARGET parse_field(zview line, std::size_t &offset, char *&write)
270  {
271  using field_type = strip_t<TARGET>;
272  using nullity = nullness<field_type>;
273 
274  assert(offset <= std::size(line));
275 
276  auto [new_offset, new_write, text]{read_field(line, offset, write)};
277  PQXX_ASSUME(new_offset > offset);
278  PQXX_ASSUME(new_write >= write);
279  offset = new_offset;
280  write = new_write;
281  if constexpr (nullity::always_null)
282  {
283  if (std::data(text) != nullptr)
284  throw conversion_error{concat(
285  "Streaming a non-null value into a ", type_name<field_type>,
286  ", which must always be null.")};
287  }
288  else if (std::data(text) == nullptr)
289  {
290  if constexpr (nullity::has_null)
291  return nullity::null();
292  else
293  internal::throw_null_conversion(type_name<field_type>);
294  }
295  else
296  {
297  // Don't ever try to convert a non-null value to nullptr_t!
298  return from_string<field_type>(text);
299  }
300  }
301 
303  void close() noexcept
304  {
305  if (not done())
306  {
307  m_char_finder = nullptr;
308  unregister_me();
309  }
310  }
311 
313 
317  char_finder_func *m_char_finder;
318 
320 
324  std::string m_row;
325 };
326 } // namespace pqxx::internal
327 #endif
std::string concat(TYPE...item)
Efficiently combine a bunch of items into one big string.
Definition: concat.hxx:31
Marker-type wrapper: zero-terminated std::string_view.
Definition: zview.hxx:37
auto begin()&
Begin iterator. Only for use by "range for.".
Definition: stream_query_impl.hxx:150
Internal items for libpqxx' own use. Do not use these yourself.
Definition: encodings.cxx:32
The end() iterator for a stream_query.
Definition: stream_query.hxx:46
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:28
auto end() const &
End iterator. Only for use by "range for.".
Definition: stream_query.hxx:115
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:79
std::tuple< TYPE... > parse_line(zview line)&
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:118
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:106
constexpr char unescape_char(char escaped) noexcept
Return original byte for escaped character.
Definition: util.hxx:633
Build a parameter list for a parameterised or prepared statement.
Definition: params.hxx:32
Value conversion failed, e.g. when converting "Hello" to int.
Definition: except.hxx:282
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
The home of all libpqxx classes, functions, templates, etc.
Definition: array.cxx:26
std::pair< line_handle, std::size_t > read_line()&
Read a COPY line from the server.
Definition: stream_query_impl.hxx:158
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find first occurrence of any of a specific set of ASCII characters."
Definition: encoding_group.hxx:71
Traits describing a type's "null value," if any.
Definition: strconv.hxx:90
void throw_null_conversion(std::string const &type)
Throw exception for attempt to convert SQL NULL to given type.
Definition: strconv.cxx:256
std::remove_cv_t< std::remove_reference_t< TYPE >> strip_t
Remove any constness, volatile, and reference-ness from a type.
Definition: types.hxx:80
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:150