2022-09-16

include <iostream>

include <string>

include <queue>

include <thread>

include <chrono>

include <limits.h>

include <condition_variable>

include <fstream>

include "json/json.h"

using namespace std;
using namespace chrono;

// couries information
struct Courier {
int64_t id;
int64_t arriveTime;
bool operator < (const Courier &co) const {
return arriveTime < co.arriveTime;
}
bool operator > (const Courier &co) const {
return arriveTime > co.arriveTime;
}
};

// order information
struct Order{
string id;
string name;
int prepareTime; // second
int64_t dispatchTime;
int64_t finishTime; // finish finishTime

Order() {
    prepareTime = 0;
    finishTime = 0;
    id = "";
    name = "";
}
Order(Order *order) {
    this->id = order->id;
    this->name = order->name;
    this->prepareTime = order->prepareTime;
    this->finishTime = order->finishTime;
}
bool operator < (const Order &od) const
{
    return finishTime < od.finishTime; 
}
bool operator > (const Order &od) const
{
    return finishTime > od.finishTime;
}

};

template<typename NODE>
class MyQueue {
public:
virtual int Init(int cap) = 0;
virtual void Deinit() = 0;
virtual int Push(NODE order) = 0;
virtual NODE GetFrontAndPop(void) = 0;
virtual bool IsFull(void) = 0;
virtual bool IsEmpty(void) = 0;
};
template<typename NODE>
class Queue : public MyQueue<NODE> {
public:
Queue() {
this->capicity = INT_MAX;
}
int Init(int cap)
{
this->capicity = cap;
pthread_mutex_init(&(this->mutex), NULL);
}
void Deinit()
{
// lock , free order
pthread_mutex_lock(&(this->mutex));
while (!this->que.empty()) {
this->que.pop();
}

    pthread_mutex_unlock(&(this->mutex));

    //
    pthread_mutex_destroy(&(this->mutex));
    return;
}

int Push(NODE order)
{
    //cout<<"input order id: "<<order.id<<endl;
    if (this->que.size() >= this->capicity) {
        return -1;
    }
    this->que.push(order);
    return 0;
}
bool IsFull() {
    return this->que.size() == this->capicity;
}
bool IsEmpty() {
    return this->que.empty();
}
NODE GetFrontAndPop() {
    while (this->que.empty()) {
        // error
        std::this_thread::sleep_for(1000ms);
        cout<<"que is empty\n";
    }
    NODE order = this->que.front();
    this->que.pop();
    return order;
}

private:
int capicity;
queue<NODE> que;
pthread_mutex_t mutex;
};
template<typename NODE>
class ProQueue : public MyQueue<NODE> {
public:
ProQueue() {
this->capicity = INT_MAX;
}
int Init(int cap)
{
this->capicity = cap;
pthread_mutex_init(&(this->mutex), NULL);
}
void Deinit()
{
// lock , free order
pthread_mutex_lock(&(this->mutex));
while (!this->que.empty()) {
this->que.pop();
}

    pthread_mutex_unlock(&(this->mutex));

    //
    pthread_mutex_destroy(&(this->mutex));
    return;
}

int Push(NODE order)
{
    if (this->que.size() >= this->capicity) {
        return -1;
    }
    this->que.push(order);
    return 0;
}
bool IsFull() {
    return this->que.size() == this->capicity;
}
bool IsEmpty() {
    return this->que.empty();
}
NODE GetFrontAndPop() {
    while (this->que.empty()) {
        // error
        std::this_thread::sleep_for(1000ms);
        cout<<"que is empty\n";
    }
    NODE order = this->que.top();
    this->que.pop();
    return order;
}

private:
int capicity;
priority_queue<NODE, vector<NODE>, greater<NODE>> que;
pthread_mutex_t mutex;
};

class Input {
public:
int Product(MyQueue<Order> *myque, void *args)
{
Order order = (Order)args;
Order od(order);
int ret = myque->Push(order);
return ret;
}
};

//typedef (int*)Rand(int min, int max);

typedef int (*RANDRANGE)(int min, int max);

// rand from min to max, [min, max]
static int Randrange(int min, int max)
{
if (max <= min) {
return -1;
}
return min + rand() % (max - min);
}

class Dispatch {
public:
Dispatch() {
range = nullptr;
}
virtual int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers) = 0;
int RegistRand(RANDRANGE rg) {
range = rg;
}
int CariesArrivedTime(int min, int max)
{
if (range != nullptr) {
return range(min, max);
}
return Randrange(min, max);
}
void AddFoodWait(int prepareTime)
{
this->foodwait += prepareTime;
}
int64_t GetFoodWait()
{
return foodwait;
}
void AddCariesWait(int wait)
{
this->carieswait += wait;
}
int64_t GetCariesWait()
{
return carieswait;
}
void AddOrders() {
orders++;
}
int64_t GetOrders() {
return orders;
}
int64_t GetAvgFoodWaitTime() {
if (orders == 0) {
return 0;
}
return foodwait / orders;
}
int64_t GetAvgCariesWaitTime() {
if (orders == 0) {
return 0;
}
return carieswait / orders;
}
private:
RANDRANGE range;
int64_t foodwait;
int64_t carieswait;
int64_t orders;
};

class Matched : public Dispatch
{
public:
int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers)
{
Queue<Order> *q = (Queue<Order> *)que;
Queue<Courier> *cou = (Queue<Courier> *)couriers;
if (cou->IsEmpty()) {
cout<<"couries is empty!\n";
return -3;
}
Order order = q->GetFrontAndPop();
Courier courier = cou->GetFrontAndPop();
if (order.finishTime >= courier.arriveTime) {
AddCariesWait(order.finishTime - courier.arriveTime);
} else {
AddFoodWait(courier.arriveTime - order.finishTime);
}
AddOrders();
// match : 不需要等待腥泥,可以派送,算出來

    cout<<"Match Dispatch "<<order.name<<" "<<order.finishTime<<" couries arrived time: "<<courier.arriveTime<<endl;

}

};

class FIFO : public Dispatch
{
public:
int DispatchOrder(MyQueue<Order> *que, MyQueue<Courier> *couriers)
{
ProQueue<Order> q = (ProQueue<Order>)que;
Order order = q->GetFrontAndPop();
ProQueue<Courier> cou = (ProQueue<Courier>)couriers;
Courier cour = cou->GetFrontAndPop();
auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (now >= order.finishTime && now >= cour.arriveTime) {
if (order.finishTime >= cour.arriveTime) {
AddCariesWait(order.finishTime - cour.arriveTime);
} else {
AddFoodWait(cour.arriveTime - order.finishTime);
}
AddOrders();
cout<<"FIFO Dispatch "<<order.name<<" "<<order.prepareTime<<" "<<order.finishTime<<" "<<now<<endl;
return 0;
}

    q->Push(order);
    cou->Push(cour);
    return -2;
}

};

class Notify {
public:
int FilePath(const char *path) {

}
void Print(string s) {
    cout<<s<<endl;
}
void Event(string s) {

}
void Log(string) {

}

};

enum STRATEGIE {
STRATEGIE_MATCH = 0x01, // match strategie
STRATEGIE_FIFO // fifo strategie
};

struct Config {
STRATEGIE st;
int32_t cap;
};

class Server{
private:
std::condition_variable full;
std::condition_variable emp;
std::mutex mtx; // 保護隊列
int64_t capicity;
int64_t nums; // 隊列中的數(shù)量
int64_t emptys; // 空閑數(shù)量
int stragegie; // 策略
Dispatch *dispatch;
MyQueue<Order> *que;
MyQueue<Courier> *couriers;
Notify notify;
bool dispatching;

public:
int InitServer(Config *conf);
void DeInitServer();

int ReadData(const char* file);

int DispatchOrder();
int Statistics();

};

int Server::InitServer(Config *conf)
{
// default conf
if (conf == NULL) {
stragegie = STRATEGIE_FIFO;
capicity = 10;
} else {
stragegie = conf->st;
capicity = conf->cap;
}

if (stragegie == STRATEGIE_MATCH) {
    Matched *match = new Matched();
    dispatch = (Dispatch*)match;
    Queue<Order> *que = new Queue<Order>();
    que->Init(capicity);
    this->que = (Queue<Order>*)que;
    this->couriers = new Queue<Courier>();
} else if (stragegie == STRATEGIE_FIFO) {
    FIFO *fifo = new FIFO();
    dispatch = (Dispatch*)fifo;
    ProQueue<Order> *que = new ProQueue<Order>();
    que->Init(capicity);
    this->que = (ProQueue<Order>*)que;
    this->couriers = new ProQueue<Courier>();
}
dispatching = true;
return 0;

}

int Server::ReadData(const char* filename)
{
Json::Reader reader;
Json::Value root;

std::ifstream is;

is.open(filename, std::ios::binary);
int64_t cid = 100;
if (reader.parse(is, root, false)) {
    int size = root.size();
    cout<<"size "<<size<<endl;
    for (int i = 0; i < size;) {
        if (i % 2 == 1) {
            std::this_thread::sleep_for(2000ms);
        }
        std::unique_lock<std::mutex> lock(mtx);
        this->emp.wait(lock, [this]{return !que->IsFull();});
        // 不足兩個空間怎么辦啃匿?
        int j = 0;
        for (; j < 2 && i + j < size;) {
            Order order;
            Courier courier;
            courier.id = cid++;
            Json::Value tmp = root[i + j];
            order.id = tmp["id"].asString();
            order.name = tmp["name"].asString();
            order.prepareTime = tmp["prepTime"].asInt() * 1000;
            order.dispatchTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            order.finishTime = order.dispatchTime + order.prepareTime;
            courier.arriveTime = order.dispatchTime + dispatch->CariesArrivedTime(3, 15) * 1000;
            que->Push(order);
            notify.Print("order recived: " + order.id +"time "+ to_string(order.dispatchTime));
            couriers->Push(courier);
            notify.Print("courier dispatched: " + to_string(courier.id) + "time " + to_string(order.dispatchTime));
            //notify.Print("push " + order.name + " " + to_string(order.finishTime));
            j++;
        }
        i += j;
        emp.notify_all();
    }
    while (true) {
        std::this_thread::sleep_for(2000ms);
        std::unique_lock<std::mutex> lock(mtx);
        this->emp.wait(lock, [this]{return que->IsEmpty();});
        this->Statistics();
        cout<<" que is empty()\n";
        this->dispatching = false;
        break;
    }

} else {
    cout<<"open file "<<filename<<" failed\n";
}
cout<<"close file \n";
is.close();

return 0;

}
/*
int Server::ReadData(const char* file)
{
auto start = system_clock::now();
auto end = system_clock::now();

// fen duan duqu?
static int times = 0;
while (times < 20) {
    std::this_thread::sleep_for(2000ms);
    std::unique_lock<std::mutex> lock(mtx);

    this->emp.wait(lock, [this]{return !que->IsFull();});

    static int64_t id = 0;
    for (int i = 0; i < 2; i++) {
        Order order;
        order.name = "rndNO" + to_string(id) + "*";
        order.id = to_string(id);
        id++;
        order.prepareTime = rand() % 15000; // 0-15s
        order.dispatchTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        order.finishTime = order.dispatchTime + order.prepareTime;
        que->Push(order);
        Courier courier;
        courier.arriveTime = order.dispatchTime + dispatch->CariesArrivedTime(3, 15) * 1000;
        if(couriers->Push(courier) != 0) {
            cout<<"couriers push error\n";
        }
        notify.Print("push " + order.name + " " + to_string(order.finishTime));
    }
    times++;
    if (times >= 20) {
        int64_t fw = dispatch->GetAvgFoodWaitTime();
        int64_t cw = dispatch->GetAvgCariesWaitTime();
        cout.flush();
        notify.Print("food wait time: " + to_string(fw) + "ms couries wait time: " + to_string(cw));
        //std::quick_exit();
        exit(0);
    }
    emp.notify_all();
}

}*/

int Server::Statistics()
{
int64_t fw = dispatch->GetAvgFoodWaitTime();
int64_t cw = dispatch->GetAvgCariesWaitTime();
cout.flush();
notify.Print("food wait time: " + to_string(fw) + "ms couries wait time: " + to_string(cw));
return 0;
}

int Server::DispatchOrder()
{
while (this->dispatching) {
std::this_thread::sleep_for(100ms);
std::unique_lock<std::mutex> lock(mtx);
this->emp.wait(lock, [this]{return !que->IsEmpty();});
if (dispatch->DispatchOrder(que, couriers) == -2) { // order not prepared, couries not arrived

    }
    emp.notify_all();
}

}

int ProductOrder(Server* server, const char* file)
{
pthread_setname_np(pthread_self(), "product");
return server->ReadData(file);
}

int DispatchOrder(Server* server)
{
pthread_setname_np(pthread_self(), "dispatch");
return server->DispatchOrder();
}

int main()
{
/* Queue myque;
myque.Init(10);
Input input;
//myque.Init(100);
Matched match;

thread read(Read, "/home/err", &myque);
thread disp(DispatchMeth, &myque, &match);
read.join();
disp.join(); */

Server server;
server.InitServer(NULL);

thread read(ProductOrder, &server, "dispatch_orders.json");

thread disp(DispatchOrder, &server);
//thread disp1(DispatchOrder, &server);
read.join();
disp.join();
//disp1.join();
server.Statistics();

}

cmake_minimum_required(VERSION 3.16)
project(SERVER)

include_directories(
    ${CMAKE_SOURCE_DIR}/include
)

link_directories(
    ${CMAKE_SOURCE_DIR}/lib
)
aux_source_directory(. DIR_SRCS)


add_executable(Server ${DIR_SRCS})

target_link_libraries(Server PUBLIC -lpthread -ljsoncpp)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蛔外,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子溯乒,更是在濱河造成了極大的恐慌夹厌,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裆悄,死亡現(xiàn)場離奇詭異矛纹,居然都是意外死亡,警方通過查閱死者的電腦和手機光稼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進店門或南,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人艾君,你說我怎么就攤上這事迎献。” “怎么了腻贰?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵吁恍,是天一觀的道長。 經(jīng)常有香客問我播演,道長冀瓦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任写烤,我火速辦了婚禮翼闽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘洲炊。我一直安慰自己感局,他們只是感情好,可當我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布暂衡。 她就那樣靜靜地躺著询微,像睡著了一般。 火紅的嫁衣襯著肌膚如雪狂巢。 梳的紋絲不亂的頭發(fā)上撑毛,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天,我揣著相機與錄音唧领,去河邊找鬼藻雌。 笑死雌续,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的胯杭。 我是一名探鬼主播驯杜,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼做个!你這毒婦竟也來了艇肴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤叁温,失蹤者是張志新(化名)和其女友劉穎再悼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體膝但,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡冲九,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了跟束。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片莺奸。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖冀宴,靈堂內(nèi)的尸體忽然破棺而出灭贷,到底是詐尸還是另有隱情,我是刑警寧澤略贮,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布甚疟,位于F島的核電站,受9級特大地震影響逃延,放射性物質(zhì)發(fā)生泄漏览妖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一揽祥、第九天 我趴在偏房一處隱蔽的房頂上張望讽膏。 院中可真熱鬧,春花似錦拄丰、人聲如沸府树。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奄侠。三九已至,卻和暖如春站绪,著一層夾襖步出監(jiān)牢的瞬間遭铺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工恢准, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留魂挂,地道東北人。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓馁筐,卻偏偏與公主長得像涂召,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子敏沉,可洞房花燭夜當晚...
    茶點故事閱讀 45,446評論 2 359