#include <chrono>
#include <future>
#include <ios>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
namespace bench {

using clock_t = std::chrono::system_clock;

struct SensorSample
{
    unsigned sensor_id;
    clock_t::time_point timestamp;
    std::string payload;
};

struct SensorState
{
    std::vector<SensorSample> samples;
    std::ios::pos_type next_read_pos;
    bool eof;
};
class CsvFile
{
public:
    /**
     * Reads several sensor messages, if available
     *
     * Will be called from multiple threads, should parallelize well.
     * Batch size (number of messages) should be tuned for efficient
     * inter-thread communication. Maybe 64 kiB worth of raw data is a
     * reasonable starting point. But be careful: 64 kiB for 60,000 sensors
     * is already about 3.6 GiB (a quick back-of-envelope follows below).
     */
    SensorState read_batch(unsigned sensor_id, std::ios::pos_type read_pos);
};
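/*
 * Back-of-envelope for the sizing note above (illustrative figures, not part
 * of the original interface): 64 KiB per sensor across 60,000 sensors is
 * 65,536 * 60,000 = 3,932,160,000 bytes, roughly 3.66 GiB, if every sensor
 * has a full batch buffered at once.
 */
[[maybe_unused]] constexpr unsigned long long example_batch_bytes = 64ull * 1024ull;
[[maybe_unused]] constexpr unsigned long long example_sensor_count = 60'000ull;
static_assert(example_batch_bytes * example_sensor_count == 3'932'160'000ull,
              "matches the ~3.6 GiB warning in CsvFile::read_batch");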
class SensorReader
{
    std::shared_ptr<CsvFile> infile;
    unsigned id;
    SensorState current_state;
    std::vector<SensorSample>::iterator next_pos;
    std::future<SensorState> next_state;

    std::optional<SensorSample> next_slowpath();

public:
    SensorReader(const std::shared_ptr<CsvFile>& infile, unsigned id,
                 std::ios::pos_type first_read_pos)
        : infile(infile),
          id(id),
          current_state({{}, first_read_pos, false}),
          next_pos(current_state.samples.end())
    {}

    /**
     * Next sample for this sensor or empty result on EOF
     */
    std::optional<SensorSample> next()
    {
        if(next_pos != current_state.samples.end())
            return std::move(*(next_pos++));
        return next_slowpath();
    }
};
std::optional<SensorSample> SensorReader::next_slowpath()
{
    if(current_state.eof) // no more samples for this sensor in file
        return {};

    /*
     * Normally, the next batch should already have been queued up; the
     * exception is the very first read, where nothing is in flight yet.
     */
    if(next_state.valid())
        current_state = next_state.get();
    else
        current_state = infile->read_batch(
            id, current_state.next_read_pos);
    next_pos = current_state.samples.begin();

    /*
     * Queue up prefetching of the next batch.
     *
     * Potential optimizations:
     * 1. If the batch contains a lot of dynamic memory allocations,
     *    it might be worth moving the old state into the async function so
     *    that it can be deallocated by a thread other than the one doing the
     *    sorting and merging (a sketch of this follows after this function).
     * 2. A custom work queue with worker threads allows threads to keep
     *    more state persistent between calls. This can improve locality of
     *    data if memory or other resources like file descriptors are
     *    associated with read operations.
     *    However, it won't help with per-sensor state, since we don't want
     *    60k threads.
     */
    if(!current_state.eof)
        next_state = std::async(std::launch::async,
            [infile = this->infile, id = this->id,
             read_pos = current_state.next_read_pos]() {
                return infile->read_batch(id, read_pos); });

    /*
     * Return result
     */
    if(next_pos != current_state.samples.end())
        return std::move(*(next_pos++));
    return {};
}
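/*
 * Sketch of optimization (1) above, under the assumption that deallocating a
 * batch is expensive: pass the previous batch into the async task so its heap
 * memory (sample payloads) is released on the worker thread instead of the
 * thread doing the sorting and merging. Hypothetical helper, not used by
 * SensorReader as written.
 */
inline std::future<SensorState> prefetch_and_discard(
    std::shared_ptr<CsvFile> infile, unsigned id,
    std::ios::pos_type read_pos, SensorState old_state)
{
    return std::async(std::launch::async,
        [infile = std::move(infile), id, read_pos,
         old_state = std::move(old_state)]() mutable {
            // Freeing the old samples here keeps the destructor work off the
            // consumer thread's critical path.
            old_state.samples.clear();
            old_state.samples.shrink_to_fit();
            return infile->read_batch(id, read_pos);
        });
}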
} /* namespace bench */
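/*
 * Usage sketch (not part of the original paste): one way a caller might drive
 * several readers and merge their streams into timestamp order with a
 * min-heap. The CsvFile instance and the (sensor id, start offset) pairs are
 * assumed to come from elsewhere.
 */
#include <cstddef>
#include <queue>

namespace bench_example {

inline std::vector<bench::SensorSample> merge_all(
    const std::shared_ptr<bench::CsvFile>& infile,
    const std::vector<std::pair<unsigned, std::ios::pos_type>>& sensors)
{
    std::vector<bench::SensorReader> readers;
    readers.reserve(sensors.size());
    for(const auto& [id, pos] : sensors)
        readers.emplace_back(infile, id, pos);

    // Min-heap ordered by sample timestamp; each entry remembers which reader
    // produced it so the next sample can be pulled from the same reader.
    struct Entry { bench::SensorSample sample; std::size_t reader; };
    auto later = [](const Entry& a, const Entry& b) {
        return a.sample.timestamp > b.sample.timestamp;
    };
    std::priority_queue<Entry, std::vector<Entry>, decltype(later)> heap(later);

    for(std::size_t i = 0; i < readers.size(); ++i)
        if(auto s = readers[i].next())
            heap.push({std::move(*s), i});

    std::vector<bench::SensorSample> merged;
    while(!heap.empty()) {
        Entry e = heap.top();   // copy: priority_queue::top() is const
        heap.pop();
        merged.push_back(std::move(e.sample));
        if(auto s = readers[e.reader].next())
            heap.push({std::move(*s), e.reader});
    }
    return merged;
}

} /* namespace bench_example */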