1 year ago

#309173

test-img

eleftheria15

Chrono library multithreading time derivation limitations?

I am trying to solve a problem with time derivation in a multithreaded setup. I have 3 threads, all pinned to different cores. The first two threads (reader_threads.cc) run in an infinite while loop inside the run() function. On each iteration they finish their work and send the current time window they are in to the third thread.

The current time window (epoch) is calculated as the chrono time since the clock's epoch divided by Ti, i.e. time / Ti.

The third thread runs at its own pace and only checks a request when its flag has been raised; the flag is also sent to the third thread via Message.

I was only able to get the desired behavior, with all three threads in the same epoch, when one epoch is at least 20000us. You can find more information in the results below.

Reader threads

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>

#include "control_thread.h"

#define INTERNAL_THREAD 

#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#endif

using namespace std;

// Per-thread "finished initializing" flags, indexed directly by thread ID.
// The reader threads run with IDs 1 and 2 (ID 0 is the control thread), and
// both run() and server() index this array with those IDs, so it needs three
// slots; with only two, index 2 was an out-of-bounds access.
atomic<bool> thread_active[3];
// Start signal: set by server() once the readers are ready; run() spins on it.
atomic<bool> go;

// Barrier the reader threads wait on once per loop iteration
// (initialized in main() with a count of 2).
pthread_barrier_t barrier;

// Publishes one message into the slot `m`.
//
// @tparam T     Unused; kept only so existing `send<int>(...)` call sites
//               keep compiling.
// @param m      Destination slot, shared with the control thread.
// @param epoch  Time-window index to report. Taken as `unsigned long`, the
//               same type as Message::epoch, so the value is not truncated
//               to 32 bits on the way in.
// @param flag   Value stored into the slot's flag.
//
// The fields are written exactly once, epoch first: the control thread only
// reads `epoch` after it has observed the flag, so the flag must be the last
// thing written. The previous loop rewrote `epoch` after the flag had
// already been raised.
template <typename T> 
void send(Message volatile * m, unsigned long epoch, bool flag) {
  m->epoch = epoch;
  m->flag = flag;
}

ControlThread * ct;

// Main run for threads
void run(unsigned int threadID){

    // Put message into incoming buffer
    Message volatile * m1 = &(ct->incoming_requests[threadID - 1]);

    thread_active[threadID] = true;
    std::atomic<bool> flag;

    // this thread is done initializing stuff 
    thread_active[threadID] = true;

    while (!go);

    while(true){

        using namespace std::chrono;

        // Get current time with precision of microseconds
        auto now = time_point_cast<microseconds>(steady_clock::now());
        // sys_microseconds is type time_point<system_clock, microseconds>
        using sys_microseconds = decltype(now);
        // Convert time_point to signed integral type
        auto duration = now.time_since_epoch();
        // Convert signed integral type to time_point
        sys_microseconds dt{microseconds{duration}};

        // test
        if (dt != now){
          std::cout << "Failure." << std::endl;
        }else{
          // std::cout << "Success." << std::endl;
        }
        
        auto epoch = duration / Ti;

        pthread_barrier_wait(&barrier);

        flag = true;
        // send current time to the control thread
        send<int>(m1, epoch, flag);

        auto current_position = duration % Ti;

        std::chrono::duration<double, micro> multi_thread_sleep = chrono::microseconds(Ti) - chrono::microseconds(current_position);

        if(multi_thread_sleep > chrono::microseconds::zero()){
          this_thread::sleep_for(multi_thread_sleep);
        }
    }
}

int threads_num = 3; 

// Entry point of the control thread.
// Waits until every reader has set its thread_active flag, raises `go`, and
// then polls all request slots in rounds for as long as `go` stays true.
void server() {

  // Hold back the control loop until the reader threads (IDs 1 and up)
  // report that they have finished initializing.
  int reader = 1;
  while (reader < threads_num) {
    while (!thread_active[reader]) { }
    ++reader;
  }

  // Release the readers.
  go = true;

  while (go) {
    // One polling round over the request slots.
    for (int slot = 0; slot != threads_num; ++slot) {
      ct->current_requests(slot);
    }

    // Pause 50 microseconds before the next round.
    std::this_thread::sleep_for(std::chrono::microseconds(50));
  }
}
    
    // Record for one worker: its numeric ID and, when INTERNAL_THREAD is
    // defined, the std::thread that executes it.
    struct Thread {
    #if defined INTERNAL_THREAD
      std::thread execution_handle;   // assigned by main() after construction
    #endif
      unsigned int id;                // thread ID given at construction

      // Stores the ID; the execution handle starts out empty.
      Thread(unsigned int i) : id(i) {}
    };
    
    
    // Allocates the ControlThread instance and stores it in the global `ct`,
    // which run() and server() dereference. The `()` requests
    // value-initialization of the new object. No matching `delete` is
    // visible in this section.
    void init(){
      ct = new ControlThread();
    }
    
    // Program entry point.
    // Creates the control thread (ID 0, runs server()) and the reader threads
    // (IDs 1 and up, run run(id)), pins thread i to core i, then joins them.
    // Returns 0 on normal completion, 1 if threads_num does not fit the
    // bookkeeping array.
    int main (int argc, char * argv[]){
    
      constexpr unsigned int kMaxThreads = 4;
      // Value-initialized so that unused slots are null rather than garbage.
      Thread * r[kMaxThreads] = {};

      const unsigned int thread_count = static_cast<unsigned int>(threads_num);
      if (thread_count > kMaxThreads) {
        fprintf(stderr, "threads_num (%u) exceeds the supported maximum (%u)\n",
                thread_count, kMaxThreads);
        return 1;
      }

      // The reader threads meet at this barrier once per loop iteration.
      pthread_barrier_init(&barrier, NULL, 2);

      init();
    
      /* start threads
       *================*/
    
      for (unsigned int i = 0; i < thread_count; i++) {
        r[i] = new Thread(i);
      
    #if defined INTERNAL_THREAD
        if(i==0){
          r[i]->execution_handle = std::thread([] {server();});
        }else{
          r[i]->execution_handle = std::thread([i] {run(i);});
        }
    
        /* pin to core i */
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(i, &cpuset);
        // pthread_setaffinity_np reports failure through its return value;
        // an unpinned thread undermines the timing experiment, so say so.
        int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
        if (rc != 0) {
          fprintf(stderr, "pthread_setaffinity_np failed for thread %u (error %d)\n", i, rc);
        }
    #endif
      }

      // Wait for threads to end. Only the threads that were actually created
      // are joined: the previous bound of threads_num + 1 dereferenced an
      // uninitialized pointer.
      for (unsigned int i = 0; i < thread_count; i++) {
    #if defined INTERNAL_THREAD
        r[i]->execution_handle.join();
    #endif
        delete r[i];
        r[i] = nullptr;
      }
      pthread_barrier_destroy(&barrier);
      return 0;
    }

Control Thread

#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__

// Standard headers this file uses itself, so that it no longer depends on
// the including source file having pulled them in first.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <iostream>
#include <mutex>

// Global vars
// Declared `inline` so that including this header from more than one
// translation unit still yields a single definition of each variable.

// Length of one epoch (time window).
inline constexpr auto Ti = std::chrono::microseconds(15000);
// Mutex taken by ControlThread::current_requests() while it clears a flag.
inline std::mutex m;
// Number of epoch mismatches reported so far.
inline int count = 0;

// One request slot exchanged between a reader thread and the control thread.
// Both members are atomics with initializers: a freshly constructed slot is
// "no request pending, epoch 0", and `epoch` can be written by a reader
// while the control thread reads it without a data race. Assignment and
// comparison against plain integers keep working through std::atomic's
// assignment and conversion operators, including on volatile objects.
class Message{
  public:
  // Raised by the sender when a new epoch value has been written.
  std::atomic<bool> flag{false};
  // Epoch (time-window index) reported by the sender.
  std::atomic<unsigned long> epoch{0};
};


class ControlThread {
public:

  /* rw individual threads */
  Message volatile incoming_requests[4];

  void current_requests(unsigned long current_thread) {
    
    using namespace std::chrono;

    auto now = time_point_cast<microseconds>(steady_clock::now());
    // sys_milliseconds is type time_point<system_clock, milliseconds>
    using sys_microseconds = decltype(now);
    // Convert time_point to signed integral type
    auto time = now.time_since_epoch();
    // Convert signed integral type to time_point
    sys_microseconds dt{microseconds{time}};
    
    // test
    if (dt != now){
      std::cout << "Failure." << std::endl;
    }else{
      // std::cout << "Success." << std::endl;
    }

    long contol_thread_epoch = time / Ti;

      // Only check request when flag is raised
      if(incoming_requests[current_thread].flag){

        m.lock();

        incoming_requests[current_thread].flag = false;

        m.unlock();

        // If reader thread epoch and control thread matches
        if(incoming_requests[current_thread].epoch == contol_thread_epoch){

        // printf("Successful desired behaviour\n");

        }else{
          count++;
          if(count > 0){
            printf("Missed %d\n", count);
          }
        }
      }
    }
  };
#endif 

RUN

g++ -std=c++2a -pthread -lrt -lm -lcrypt reader_threads.cc -o run
sudo ./run 

Results

The following missed epochs were recorded with one loop iteration (a single Ti) equal to 1000us. As Ti is increased, fewer epochs are skipped, and if Ti is set to 20000us, no skipped epochs are detected. Does anyone have an idea whether I am making a mistake in the casting or in the communication between threads? Why are the threads not in sync if the epoch is, e.g., 5000us?

Missed 1
Missed 2
Missed 3
Missed 4
Missed 5
Missed 6
Missed 7
Missed 8
Missed 9
Missed 10
Missed 11
Missed 12
Missed 13
Missed 14
Missed 15
Missed 16 

c++

multithreading

real-time

c++-chrono

0 Answers

Your Answer

Accepted video resources