1 year ago
#309173
eleftheria15
Chrono library multithreading time derivation limitations?
I am trying to solve the problem with a time derivation in a multithreaded setup. I have 3 threads, all pinned to different cores. The first two threads (reader_threads.cc) run in the infinite while loop inside the run()
function. They finish their execution and send the current time window they are into the third thread.
The current time window is calculated based on the value from chrono time
/ Ti
The third thread is running at its own pace, and it's checking only the request when the flag has been raised, which is also sent via Message
to the third thread.
I was able to get the desired behavior of all three threads in the same epoch if one epoch is at least 20000us. In the results, you can find more info.
Reader threads
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>
#include "control_thread.h"
#define INTERNAL_THREAD
#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#endif
using namespace std;
atomic<bool> thread_active[2];
atomic<bool> go;
pthread_barrier_t barrier;
template <typename T>
void send(Message volatile * m, unsigned int epoch, bool flag) {
for (int i = 0 ; i < sizeof(T); i++){
m->epoch = epoch;
m->flag = flag;
}
}
ControlThread * ct;
// Main run for threads
void run(unsigned int threadID){
// Put message into incoming buffer
Message volatile * m1 = &(ct->incoming_requests[threadID - 1]);
thread_active[threadID] = true;
std::atomic<bool> flag;
// this thread is done initializing stuff
thread_active[threadID] = true;
while (!go);
while(true){
using namespace std::chrono;
// Get current time with precision of microseconds
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_microseconds is type time_point<system_clock, microseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto duration = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{duration}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
auto epoch = duration / Ti;
pthread_barrier_wait(&barrier);
flag = true;
// send current time to the control thread
send<int>(m1, epoch, flag);
auto current_position = duration % Ti;
std::chrono::duration<double, micro> multi_thread_sleep = chrono::microseconds(Ti) - chrono::microseconds(current_position);
if(multi_thread_sleep > chrono::microseconds::zero()){
this_thread::sleep_for(multi_thread_sleep);
}
}
}
int threads_num = 3;
void server() {
// Don't start control thread until reader threds finish init
for (int i=1; i < threads_num; i++){
while (!thread_active[i]);
}
go = true;
while (go) {
for (int i = 0; i < threads_num; i++) {
ct->current_requests(i);
}
// Arbitrary sleep to ensure that locking is accurate
std::this_thread::sleep_for(50us);
}
}
class Thread {
public:
#if defined INTERNAL_THREAD
thread execution_handle;
#endif
unsigned int id;
Thread(unsigned int i) : id(i) {}
};
void init(){
ct = new ControlThread();
}
int main (int argc, char * argv[]){
Thread * r[4];
pthread_barrier_init(&barrier, NULL, 2);
init();
/* start threads
*================*/
for (unsigned int i = 0; i < threads_num; i++) {
r[i] = new Thread(i);
#if defined INTERNAL_THREAD
if(i==0){
r[0]->execution_handle = std::thread([] {server();});
}else if(i == 1){
r[i]->execution_handle = std::thread([i] {run(i);});
}else if(i == 2){
r[i]->execution_handle = std::thread([i] {run(i);});
}
/* pin to core i */
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
#endif
}
// wait for threads to end
for (unsigned int i = 0; i < threads_num + 1; i++) {
#if defined INTERNAL_THREAD
r[i]->execution_handle.join();
#endif
}
pthread_barrier_destroy(&barrier);
return 0;
}
Control Thread
#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__
// Global vars
const auto Ti = std::chrono::microseconds(15000);
std::mutex m;
int count;
class Message{
public:
std::atomic<bool> flag;
unsigned long epoch;
};
class ControlThread {
public:
/* rw individual threads */
Message volatile incoming_requests[4];
void current_requests(unsigned long current_thread) {
using namespace std::chrono;
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_milliseconds is type time_point<system_clock, milliseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto time = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{time}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
long contol_thread_epoch = time / Ti;
// Only check request when flag is raised
if(incoming_requests[current_thread].flag){
m.lock();
incoming_requests[current_thread].flag = false;
m.unlock();
// If reader thread epoch and control thread matches
if(incoming_requests[current_thread].epoch == contol_thread_epoch){
// printf("Successful desired behaviour\n");
}else{
count++;
if(count > 0){
printf("Missed %d\n", count);
}
}
}
}
};
#endif
RUN
g++ -std=c++2a -pthread -lrt -lm -lcrypt reader_threads.cc -o run
sudo ./run
Results
The following missed epochs are with one loop iteration (single Ti) equal to 1000us. Also, by increasing Ti, the less number of epochs have been skipped. Finally, if Ti is set to the 20000 us , no skipped epochs are detected. Does anyone have an idea whether I am making a mistake in casting or in communication between threads? Why the threads are not in sync if epoch is i.e. 5000us?
Missed 1
Missed 2
Missed 3
Missed 4
Missed 5
Missed 6
Missed 7
Missed 8
Missed 9
Missed 10
Missed 11
Missed 12
Missed 13
Missed 14
Missed 15
Missed 16
c++
multithreading
real-time
c++-chrono
0 Answers
Your Answer