HepLib
Loading...
Searching...
No Matches
PoolPipe.cpp
Go to the documentation of this file.
1
#include "PoolPipe.h"
#include "unistd.h"
#include <arpa/inet.h>
#include <cerrno>
#include <iostream>
#include <stdexcept>
11
12namespace HepLib {
13
14 Pool::Pool(const std::vector<void*> & vec) {
15 for (auto item : vec) ava.push_back(item);
16 }
17
18 void* Pool::acquire() {
19 std::unique_lock<std::mutex> lock(mtx);
20 while (ava.empty()) cv.wait(lock);
21 void* obj = ava.back();
22 ava.pop_back();
23 return obj;
24 }
25
26 void Pool::release(void* obj) {
27 {
28 std::lock_guard<std::mutex> lock(mtx);
29 ava.push_back(obj);
30 }
31 cv.notify_one();
32 }
33
34 Pipe::Pipe(const std::function<std::string(const std::string &)> & fi) : f(fi) {
35 if (pipe(p2c)==-1 || pipe(c2p)==-1) {
36 perror("pipe");
37 exit(EXIT_FAILURE);
38 }
39
40 pid = fork();
41 if (pid == -1) {
42 perror("fork");
43 exit(EXIT_FAILURE);
44 }
45
46 if (pid==0) { // child process
47 close(p2c[1]); // close the write side
48 close(c2p[0]); // close the read side
49 char length_buffer[4];
50 uint32_t length;
51
52 while (true) {
53 ssize_t bytesRead = read(p2c[0], length_buffer, sizeof(length_buffer));
54 if (bytesRead <= 0) break;
55 length = ntohl(*(uint32_t*)length_buffer);
56
57 char* buffer = new char[length+1]; // +1 for '\0'
58 bytesRead = read(p2c[0], buffer, length);
59 buffer[bytesRead] = '\0';
60
61 std::string cmd(buffer);
62 delete[] buffer;
63
64 auto res = f(cmd); // cmd -> res
65
66 length = htonl(res.size());
67 write(c2p[1], &length, sizeof(length));
68 write(c2p[1], res.c_str(), res.size());
69 }
70
71 close(p2c[0]);
72 close(c2p[1]);
73 exit(EXIT_SUCCESS);
74 } else {
75 close(p2c[0]); // close the read
76 close(c2p[1]); // close the write
77 }
78 }
79
81 close(p2c[1]); // close the write
82 close(c2p[0]); // close the read
83 waitpid(pid, NULL, 0);
84 }
85
86 std::string Pipe::run(const std::string &code) {
87 uint32_t length = htonl(code.size());
88 write(p2c[1], &length, sizeof(length));
89 write(p2c[1], code.c_str(), code.size());
90
91 char length_buffer[4];
92 read(c2p[0], length_buffer, sizeof(length_buffer));
93 length = ntohl(*(uint32_t*)length_buffer);
94
95 char* buffer = new char[length+1]; // +1 for '\0'
96 ssize_t bytesRead = read(c2p[0], buffer, length);
97 buffer[bytesRead] = '\0';
98 std::string result(buffer);
99 delete[] buffer;
100
101 return result;
102 }
103
104 PipePool::PipePool(int size, const std::function<std::string(const std::string &)> & f) : pipe_ptr_vec(size) {
105 std::vector<void*> void_ptr_vec(size);
106 for(int i=0; i<size; i++) {
107 pipe_ptr_vec[i] = new Pipe(f);
108 void_ptr_vec[i] = pipe_ptr_vec[i];
109 }
110 pool = new Pool(void_ptr_vec);
111 }
112
114 for(int i=pipe_ptr_vec.size()-1; i>=0; i--) delete pipe_ptr_vec[i];
115 delete pool;
116 }
117
118 std::string PipePool::run(const std::string &code) {
119 auto worker = (Pipe*)(pool->acquire());
120 auto res = worker->run(code);
121 pool->release(worker);
122 return res;
123 }
124
125 std::vector<std::string> PipePool::run_all(const std::string &code) {
126 std::vector<std::string> res_vec(pipe_ptr_vec.size());
127 for(int i=0; i<res_vec.size(); i++) {
128 res_vec[i] = pipe_ptr_vec[i]->run(code);
129 }
130 return std::move(res_vec);
131 }
132
133}
PoolPipe header file.
std::string run(const std::string &code)
Definition PoolPipe.cpp:118
std::vector< std::string > run_all(const std::string &code)
Definition PoolPipe.cpp:125
PipePool(int size, const std::function< std::string(const std::string &)> &f)
Definition PoolPipe.cpp:104
Pipe(const std::function< std::string(const std::string &)> &fi)
Definition PoolPipe.cpp:34
std::string run(const std::string &code)
Definition PoolPipe.cpp:86
void * acquire()
Definition PoolPipe.cpp:18
void release(void *obj)
Definition PoolPipe.cpp:26
Pool(const std::vector< void * > &vec)
Definition PoolPipe.cpp:14
HepLib namespace.
Definition BASIC.cpp:17