workflow Study Notes (3): Task Flows
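A task flow is the actual business logic: protocols and algorithms are placed in a flow graph and chained together. A typical task flow is a closed series-parallel graph, while complex business logic may be a non-closed directed acyclic graph (DAG). Three principles hold in the workflow world:
- A series is made up of tasks;
- A parallel is made up of series;
- A parallel is itself a kind of task.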
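The three principles can be sketched with a small toy model in plain C++. This is only an illustration of the composition rules, not workflow's implementation: the names ToyTask, ToyLeaf, ToySeries, ToyParallel and run_toy_graph are hypothetical, and the toy runs the parallel branches one after another instead of concurrently.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy model only: these types are hypothetical and are NOT workflow's classes.
struct ToyTask {
    virtual ~ToyTask() = default;
    virtual void run(std::vector<std::string> &log) = 0;
};

// A leaf task that just records its name when it runs.
struct ToyLeaf : ToyTask {
    explicit ToyLeaf(std::string n) : name(std::move(n)) {}
    void run(std::vector<std::string> &log) override { log.push_back(name); }
    std::string name;
};

// Principle 1: a series is made up of tasks, run one after another.
struct ToySeries {
    std::vector<std::unique_ptr<ToyTask>> tasks;
    void push_back(std::unique_ptr<ToyTask> t) { tasks.push_back(std::move(t)); }
    void run(std::vector<std::string> &log) {
        for (auto &t : tasks) t->run(log);
    }
};

// Principle 2: a parallel is made up of series.
// Principle 3: a parallel is itself a task, so it can sit inside a series.
struct ToyParallel : ToyTask {
    std::vector<ToySeries> branches;
    void add_series(ToySeries s) { branches.push_back(std::move(s)); }
    void run(std::vector<std::string> &log) override {
        // The toy visits branches sequentially; a real engine would run them concurrently.
        for (auto &b : branches) b.run(log);
    }
};

static std::unique_ptr<ToyTask> leaf(const char *name) {
    return std::unique_ptr<ToyTask>(new ToyLeaf(name));
}

// Builds the closed series-parallel graph A -> (B1 -> B2 || C) -> D
// and returns the order in which the toy visits the leaf tasks.
std::vector<std::string> run_toy_graph() {
    ToySeries left;
    left.push_back(leaf("B1"));
    left.push_back(leaf("B2"));
    ToySeries right;
    right.push_back(leaf("C"));

    std::unique_ptr<ToyParallel> par(new ToyParallel);
    par->add_series(std::move(left));
    par->add_series(std::move(right));

    ToySeries root;
    root.push_back(leaf("A"));
    root.push_back(std::move(par));  // the parallel is pushed like any other task
    root.push_back(leaf("D"));

    std::vector<std::string> log;
    root.run(log);
    return log;
}
```

In this toy, run_toy_graph() visits A, B1, B2, C, D in that order. With a real concurrent engine, only the constraints "A first, D last, B1 before B2" would be guaranteed.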
1. Simple HTTP request
/*
* @Author: chenjingyu
* @Date: 2025-03-28 22:15:58
* @Contact: 2458006466@qq.com
* @Description: test.cc
*/
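#include <cstdio>
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <signal.h>
#include <string>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

void http_callback(WFHttpTask *task) {
    // Stop early if the request itself failed (DNS, connect, timeout, ...).
    if (task->get_state() != WFT_STATE_SUCCESS) {
        std::cerr << "Http task failed, state: " << task->get_state()
                  << " error: " << task->get_error() << std::endl;
        return;
    }
    auto *resp = task->get_resp();
    std::cout << "Http status: " << resp->get_status_code() << std::endl;
    const void *body;
    size_t body_len;
    resp->get_parsed_body(&body, &body_len);
    // Save the response body to resp.txt.
    FILE *fp = fopen("resp.txt", "w");
    if (fp == nullptr) {
        perror("fopen resp.txt");
        return;
    }
    fwrite(body, 1, body_len, fp);
    fclose(fp);
    std::cout << "Write file done." << std::endl;
}

static WFFacilities::WaitGroup wg(1);

// Signal handler, used to stop the program
void sig_handler(int signo) {
    wg.done();
}

int main(int argc, char *argv[]) {
    // 1. Wait for the Ctrl+C signal
    signal(SIGINT, sig_handler);
    std::string url = "http://www.baidu.com";
    // 2. Create the HTTP task
    WFHttpTask *task = WFTaskFactory::create_http_task(
        url, REDIRECT_MAX, RETRY_MAX, http_callback
    );
    // 3. Start the task
    task->start();
    // 4. Block until the signal handler releases the wait group
    wg.wait();
    return 0;
}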
The contents of CMakeLists.txt are as follows:
cmake_minimum_required(VERSION 3.1...3.26)
project(Server LANGUAGES C CXX)
find_library(LIBRT rt)
find_package(OpenSSL REQUIRED)
find_package(workflow REQUIRED CONFIG HINTS ..)
include_directories(${OPENSSL_INCLUDE_DIR} ${WORKFLOW_INCLUDE_DIR})
link_directories(${WORKFLOW_LIB_DIR})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")
set(TEST_LIST
test
)
set(WORKFLOW_LIB workflow pthread OpenSSL::SSL OpenSSL::Crypto ${LIBRT})
foreach(test ${TEST_LIST})
add_executable(${test} ${test}.cc)
target_link_libraries(${test} ${WORKFLOW_LIB})
endforeach()
After building and running, press Ctrl+C to stop the program; the response body can then be found in resp.txt.
2. Serial task flow
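workflow provides a SeriesWork class for running serial task flows. After creating a SeriesWork instance, append the tasks that should run serially through the push_back interface, and they are executed one after another in the order they were added.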
/*
* @Author: chenjingyu
* @Date: 2025-03-28 22:15:58
* @Contact: 2458006466@qq.com
* @Description: test
*/
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <signal.h>
#include <string>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

void http_callback(WFHttpTask *task) {
    auto *resp = task->get_resp();
    auto *req = task->get_req();
    // Only read the response if the request succeeded.
    if (task->get_state() != WFT_STATE_SUCCESS) {
        std::cerr << "req uri: " << req->get_request_uri()
                  << " failed, error: " << task->get_error() << std::endl;
        return;
    }
    std::cout << "req uri: " << req->get_request_uri()
              << " resp status: " << resp->get_status_code() << std::endl;
}

WFHttpTask *create_http_task(const std::string &url) {
    return WFTaskFactory::create_http_task(
        url, REDIRECT_MAX, RETRY_MAX, http_callback
    );
}

int main(int argc, char *argv[]) {
    std::string url = "http://www.baidu.com";
    auto *first_task = create_http_task(url);
    WFFacilities::WaitGroup wg(1);
    // Called once, after every task in the series has finished.
    auto series_callback = [&wg](const SeriesWork *series) {
        std::cout << "All Task Finished." << std::endl;
        wg.done();
    };
    // Create the SeriesWork with its first task, then append the rest
    auto *series = Workflow::create_series_work(first_task, series_callback);
    series->push_back(create_http_task("http://www.biying.com"));
    series->push_back(create_http_task("http://www.sougou.com"));
    series->start();
    wg.wait();
    return 0;
}
Build and run:
>> ./series
req uri: / resp status: 200
req uri: / resp status: 200
req uri: / resp status: 403
All Task Finished.
3. Parallel task flow
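workflow provides a ParallelWork class for running parallel task flows. After creating an instance, wrap each task that should run in parallel in its own series and add that series through the add_series interface; the added series are then executed in parallel.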
/*
* @Author: chenjingyu
* @Date: 2025-03-28 22:15:58
* @Contact: 2458006466@qq.com
* @Description: test
*/
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <string>
#include <vector>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

void http_callback(WFHttpTask *task) {
    auto *resp = task->get_resp();
    auto *req = task->get_req();
    // Only read the response if the request succeeded.
    if (task->get_state() != WFT_STATE_SUCCESS) {
        std::cerr << "req uri: " << req->get_request_uri()
                  << " failed, error: " << task->get_error() << std::endl;
        return;
    }
    std::cout << "req uri: " << req->get_request_uri()
              << " resp status: " << resp->get_status_code() << std::endl;
}

WFHttpTask *create_http_task(const std::string &url) {
    return WFTaskFactory::create_http_task(
        url, REDIRECT_MAX, RETRY_MAX, http_callback
    );
}

int main(int argc, char *argv[]) {
    std::vector<std::string> urls = {
        "http://www.baidu.com",
        "http://www.bing.com",
        "http://www.sougou.com"
    };
    WFFacilities::WaitGroup wg(1);
    // Create the parallel work; its callback runs after every branch has finished
    auto *parallel = Workflow::create_parallel_work([&wg](const ParallelWork *work) {
        wg.done();
    });
    for (const auto &url : urls) {
        // Each branch of a parallel is a series, here holding a single HTTP task
        auto *task = create_http_task(url);
        SeriesWork *series = Workflow::create_series_work(task, nullptr);
        parallel->add_series(series);
    }
    parallel->start();
    wg.wait();
    return 0;
}
Build and run:
>> ./parallel
req uri: / resp status: 403
req uri: / resp status: 200
req uri: / resp status: 200