MirrorYuChen
Published on 2025-03-29

workflow Study Notes (3): Task Flows

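A task flow is the actual business logic: protocols and algorithms are placed in a flow graph and chained together. A typical task flow is a closed series-parallel graph, while more complex business logic may form a non-closed directed acyclic graph (DAG). In workflow, three principles apply: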

  • A series is made up of tasks;
  • A parallel is made up of series;
  • A parallel is itself a task.
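
The three principles can be pictured as a small type hierarchy. The sketch below is a toy model under stated assumptions, not workflow's own classes: `Node`, `Leaf`, `Series`, `Parallel` and `example_graph` are hypothetical names invented for illustration, and the code only renders the shape of a graph as text; it does not schedule or run anything.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy model only: these types are hypothetical and are NOT workflow's classes.
struct Node {
  virtual ~Node() = default;
  virtual std::string describe() const = 0;
};

// A leaf task, identified by a name.
struct Leaf : Node {
  explicit Leaf(std::string n) : name(std::move(n)) {}
  std::string describe() const override { return name; }
  std::string name;
};

// Principle 1: a series is made up of tasks, one after another.
struct Series {
  std::vector<std::shared_ptr<Node>> tasks;
  std::string describe() const {
    std::string out;
    for (std::size_t i = 0; i < tasks.size(); ++i) {
      if (i > 0) out += " -> ";
      out += tasks[i]->describe();
    }
    return out;
  }
};

// Principle 2: a parallel is made up of series.
// Principle 3: a parallel is itself a task, so it derives from Node.
struct Parallel : Node {
  std::vector<Series> branches;
  std::string describe() const override {
    std::string out = "(";
    for (std::size_t i = 0; i < branches.size(); ++i) {
      if (i > 0) out += " | ";
      out += branches[i].describe();
    }
    return out + ")";
  }
};

// Builds a closed series-parallel graph: fetch, two branches, then merge.
std::string example_graph() {
  Series left;
  left.tasks.push_back(std::make_shared<Leaf>("parse"));
  left.tasks.push_back(std::make_shared<Leaf>("store"));
  Series right;
  right.tasks.push_back(std::make_shared<Leaf>("log"));

  auto parallel = std::make_shared<Parallel>();
  parallel->branches.push_back(left);
  parallel->branches.push_back(right);

  Series root;
  root.tasks.push_back(std::make_shared<Leaf>("fetch"));
  root.tasks.push_back(parallel);  // allowed because a parallel is a task
  root.tasks.push_back(std::make_shared<Leaf>("merge"));
  return root.describe();
}
```

Calling `example_graph()` yields `fetch -> (parse -> store | log) -> merge`. The parallel can sit in the middle of a series only because it is itself a task.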

1. Simple HTTP request

/*
 * @Author: chenjingyu
 * @Date: 2025-03-28 22:15:58
 * @Contact: 2458006466@qq.com
 * @Description: test.cc
 */
#include <cstdio>
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <signal.h>
#include <string>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

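void http_callback(WFHttpTask *task) {
  // On failure (DNS, connect, timeout, ...) the response is not usable.
  if (task->get_state() != WFT_STATE_SUCCESS) {
    std::cerr << "Http task failed, state: " << task->get_state()
              << ", error: " << task->get_error() << std::endl;
    return;
  }

  auto *resp = task->get_resp();
  std::cout << "Http status: " << resp->get_status_code() << std::endl;

  const void *body;
  size_t body_len;
  resp->get_parsed_body(&body, &body_len);

  // fopen can fail (e.g. read-only directory), so check before writing.
  FILE *fp = fopen("resp.txt", "w");
  if (fp == nullptr) {
    perror("fopen resp.txt");
    return;
  }
  fwrite(body, 1, body_len, fp);
  fclose(fp);

  std::cout << "Write file done." << std::endl;
}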

static WFFacilities::WaitGroup wg(1);

// Signal handler, used to stop the program
void sig_handler(int signo) {
  wg.done();
}

int main(int argc, char *argv[]) {
  // 1. Register the Ctrl+C (SIGINT) handler
  signal(SIGINT, sig_handler);
  std::string url = "http://www.baidu.com";

  // 2. Create the HTTP task
  WFHttpTask *task = WFTaskFactory::create_http_task(
    url, REDIRECT_MAX, RETRY_MAX, http_callback
  );

  // 3. Start the task (asynchronous, returns immediately)
  task->start();

  // 4. Block until Ctrl+C; the callback runs in the meantime
  wg.wait();

  return 0;
}

The CMakeLists.txt file is as follows:

cmake_minimum_required(VERSION 3.1...3.26)

project(Server LANGUAGES C CXX)

find_library(LIBRT rt)
find_package(OpenSSL REQUIRED)
find_package(workflow REQUIRED CONFIG HINTS ..)

include_directories(${OPENSSL_INCLUDE_DIR} ${WORKFLOW_INCLUDE_DIR})
link_directories(${WORKFLOW_LIB_DIR})

set(CMAKE_C_FLAGS   "${CMAKE_C_FLAGS}   -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")

set(TEST_LIST
  test
)

set(WORKFLOW_LIB workflow pthread OpenSSL::SSL OpenSSL::Crypto ${LIBRT})

foreach(test ${TEST_LIST})
	add_executable(${test} ${test}.cc)
	target_link_libraries(${test} ${WORKFLOW_LIB})
endforeach()

After building and running, press Ctrl+C to stop the program; the response body can then be found in resp.txt.

2. Series task flow

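workflow provides a SeriesWork class for running a series task flow. After creating a SeriesWork instance, add the tasks that should run one after another through the push_back interface, and they are executed in that order.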

/*
 * @Author: chenjingyu
 * @Date: 2025-03-28 22:15:58
 * @Contact: 2458006466@qq.com
 * @Description: test
 */
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <signal.h>
#include <string>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

void http_callback(WFHttpTask *task) {
  auto *resp = task->get_resp();
  auto *req = task->get_req();
  std::cout << "req uri: " << req->get_request_uri() << " resp status: " << resp->get_status_code() << std::endl;
}

WFHttpTask *create_http_task(const std::string &url) {
  return WFTaskFactory::create_http_task(
    url, REDIRECT_MAX, RETRY_MAX, http_callback
  );
}

int main(int argc, char *argv[]) {
  std::string url = "http://www.baidu.com";
  auto *first_task = create_http_task(url);
  WFFacilities::WaitGroup wg(1);
  auto series_callback = [&wg](const SeriesWork *series) {
    std::cout << "All Task Finished." << std::endl;
    wg.done();
  };
  // Create the SeriesWork
  auto *series = Workflow::create_series_work(first_task, series_callback);
  series->push_back(create_http_task("http://www.biying.com"));
  series->push_back(create_http_task("http://www.sougou.com"));
  series->start();

  wg.wait();

  return 0;
}

Build and run:

>> ./series    
req uri: / resp status: 200
req uri: / resp status: 200
req uri: / resp status: 403
All Task Finished.
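
The output above reflects two properties of a series: tasks run in the order they were queued, and the series callback fires once after the last one. A minimal sketch of that ordering follows. It is a toy model, not workflow's SeriesWork: `ToySeries` and `run_toy_series` are hypothetical names, and the toy runs synchronously inside `start()`, whereas the real `series->start()` returns immediately and runs the tasks asynchronously.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model only: ToySeries is hypothetical and is NOT workflow's SeriesWork.
class ToySeries {
 public:
  using Task = std::function<void()>;
  using Callback = std::function<void()>;

  ToySeries(Task first, Callback on_finish)
      : on_finish_(std::move(on_finish)) {
    queue_.push_back(std::move(first));
  }

  // Append a task; it runs after everything already queued.
  void push_back(Task task) { queue_.push_back(std::move(task)); }

  // Run tasks strictly in FIFO order, then fire the series callback once.
  void start() {
    while (!queue_.empty()) {
      Task task = std::move(queue_.front());
      queue_.pop_front();
      task();
    }
    if (on_finish_) on_finish_();
  }

 private:
  std::deque<Task> queue_;
  Callback on_finish_;
};

// Same shape as the listing: one first task, two pushed tasks, one callback.
std::vector<std::string> run_toy_series() {
  std::vector<std::string> log;
  ToySeries series([&log] { log.push_back("first"); },
                   [&log] { log.push_back("all finished"); });
  series.push_back([&log] { log.push_back("second"); });
  series.push_back([&log] { log.push_back("third"); });
  series.start();
  return log;
}
```

`run_toy_series()` returns the entries `first`, `second`, `third`, `all finished`, in that order, matching the three request lines followed by "All Task Finished." in the real run.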

3. Parallel task flow

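workflow provides a ParallelWork class for running a parallel task flow. After creating an instance, wrap each task that should run in parallel in its own series and add that series through the add_series interface; the series then run in parallel.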

/*
 * @Author: chenjingyu
 * @Date: 2025-03-28 22:15:58
 * @Contact: 2458006466@qq.com
 * @Description: test
 */
#include <iostream>
#include <workflow/Workflow.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <string>
#include <vector>

#define REDIRECT_MAX 4
#define RETRY_MAX 2

void http_callback(WFHttpTask *task) {
  auto *resp = task->get_resp();
  auto *req = task->get_req();
  std::cout << "req uri: " << req->get_request_uri() << " resp status: " << resp->get_status_code() << std::endl;
}

WFHttpTask *create_http_task(const std::string &url) {
  return WFTaskFactory::create_http_task(
    url, REDIRECT_MAX, RETRY_MAX, http_callback
  );
}

int main(int argc, char *argv[]) {
  std::vector<std::string> urls = {
    "http://www.baidu.com",
    "http://www.bing.com",
    "http://www.sougou.com"
  };

  WFFacilities::WaitGroup wg(1);
  // Create the parallel task; its callback fires after every series has finished
  auto *parallel = Workflow::create_parallel_work([&wg](const ParallelWork *work) {
    wg.done();
  });

  for (const auto &url : urls) {
    auto *task = create_http_task(url);
    SeriesWork *series = Workflow::create_series_work(task, nullptr);
    parallel->add_series(series);
  }
  parallel->start();

  wg.wait();

  return 0;
}

Build and run:

>> ./parallel 
req uri: / resp status: 403
req uri: / resp status: 200
req uri: / resp status: 200
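
In the run above the 403 response is printed first even though its URL was added last: branches of a parallel finish in no particular order, yet the parallel callback (and so `wg.done()`) runs once, after the last branch. One way to picture this is a countdown that each branch decrements. The sketch below is a toy under stated assumptions, not how workflow implements ParallelWork: `ToyJoin` and `fire_counts` are hypothetical names, and the demo drives the counter from a single thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Toy model only: ToyJoin is hypothetical and is NOT workflow's ParallelWork.
class ToyJoin {
 public:
  ToyJoin(std::size_t branches, std::function<void()> on_all_done)
      : remaining_(branches), on_all_done_(std::move(on_all_done)) {}

  // Called once by each branch when its series finishes, in any order.
  void branch_done() {
    std::size_t previous = remaining_.fetch_sub(1);
    assert(previous > 0 && "more completions than branches");
    // Only the branch that takes the counter from 1 to 0 fires the callback.
    if (previous == 1 && on_all_done_) on_all_done_();
  }

 private:
  std::atomic<std::size_t> remaining_;
  std::function<void()> on_all_done_;
};

// Returns {callbacks fired before the last branch, callbacks fired in total}.
std::pair<int, int> fire_counts() {
  int fired = 0;
  ToyJoin join(3, [&fired] { ++fired; });
  join.branch_done();  // e.g. the last-added URL answers first
  join.branch_done();
  int before_last = fired;  // one branch is still outstanding
  join.branch_done();
  return std::make_pair(before_last, fired);
}
```

`fire_counts()` returns `{0, 1}`: nothing fires while a branch is outstanding, and exactly one callback fires when the last branch reports in.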
