微信二维码

二维码 扫二维码马上关注
扫码咨询
动态DNS使用阿里云DNS API

        对于任务调度系统中的任务状态管理,数据库通常存储任务调度的过程和状态,并控制任务的锁。咨询锁是非常容易实现的一小部分任务。但是我们如何设计一个任务状态管理数据库来处理每小时数亿甚至数十亿个任务呢?在面向多个用户的任务调度平台(例如面向所有租户的云任务调度平台)中,一个主要的挑战是编写任务数据。更新任务状态(大量任务,每个任务至少更新一次)是另一个挑战。

 

一、大规模任务调度数据库设计

        云任务调度具有以下特点:

        1、用户的任务之间没有关系。在调度期间,可能依赖于单个用户的任务。

        2、它有大量的数据。

        3、任务处于最终稳定状态。任务稳定之后,它们的记录保持不变。

        示例PostgreSQL设计具有以下特性:

        1、生成任务数据后,我们将其写入任务处理表。

        2、我们可以通过旋转来设计任务处理表(例如,每小时旋转一个表)。由于数据处理完成后是直接清洗的,我们不需要对其进行真空处理。

        3、 在分区方面,任务处理表对用户级分区进行了采样,当我们得到一个要处理的任务时,这个分区更加细化(减少冗余扫描)。

        4、当任务达到最终状态时,从任务运行表中删除它,并将其写入历史表。

 

        早期的历史表从RDS PG中删除,并写入阿里巴巴云操作系统。我们可以通过RDS PG OSS外部表接口访问这些历史数据。


二、演示设计

        1.用于存储用户生成的任务的初始任务表。

create table task_init (    -- Initial task table  
  uid  int,        -- User ID  
  ptid serial8,    -- Parent task ID  
  tid serial,      -- Subtask ID    
  state int default 1,    -- Task state: 1 indicates the initial state; -1 indicates being processed; 0 indicates processing completed    
  retry int default -1,   -- Number of retries    
  info text,              -- Other information    
  ts timestamp            -- Time     
);     

        2.任务历史表,用于存储任务的最终状态。

create table task_hist (    -- Task history table  
  uid  int,   -- User ID  
  ptid int8,  -- Parent task ID  
  tid int,    -- Subtask ID    
  state int default 1,    -- Task state: 1 indicates the initial state; -1 indicates being processed; 0 indicates processing completed    
  retry int default -1,   -- Number of retries    
  info text,              -- Other information    
  ts timestamp      -- Time      
);     

        3.为了简化测试,我们将根据用户ID创建分区。

(For the rotate design and multi-level partition design mentioned above, refer to the articles mentioned at the end of this document.)
do language plpgsql 
$$
  
declare  
begin  
  for i in 1..1000 loop  
    execute 'create table task_init_'||i||' ( like task_init including all)';  
    execute 'create table task_hist_'||i||' ( like task_hist including all)';  
  end loop;  
end;  

$$
;  

        4.为了方便测试,可以使用无模式设计,生成用户任务的初始数据,并将其写入plpgsql逻辑中。

create or replace function ins_task_init(  
  uid int,  
  info text,  
  ts timestamp  
)  returns void as 
$$
  
declare  
  target name;  
begin  
  target := format('%I', 'task_init_'||uid);  
  execute format('insert into %I (uid,info,ts) values (%L,%L,%L)', target, uid,info,ts);  
end;  

$$
 language plpgsql strict;  

        5.按照以下步骤运行任务。

        a、从任务表中读取任务。

        b、用户执行任务。

        c、反馈执行结果,为未成功执行的任务更新表task_init,并为成功执行(和结束)的任务将数据从task_init迁移到task_hist。

        为了测试数据库的性能,我们需要将这三个步骤的逻辑编写到plpgsql中。同时,我们必须在批处理中使用delete limit特性来同时执行多个任务。

        我们将CTID行号用于此处的一个位置,以获得最佳性能。这样,我们就不必使用索引,可以获得更好的性能。

        我们使用了咨询锁,这样单个用户就没有并行任务(在实际业务中允许并行任务)。

        我们还没有在这里测试更新状态。当任务失败时,task_init的一部分会被更新(与插入和删除相比,它的比例很小,可以忽略)。

        禁用task_init表的自动真空,并通过旋转进行处理。

create or replace function run_task(  
  uid int,  
  batch int  
) returns void as 
$$
  
declare  
  target1 name;  
  target2 name;  
begin  
  target1 := format('%I', 'task_init_'||uid);  
  target2 := format('%I', 'task_hist_'||uid);  
  execute format('with t1 as (select ctid from %I where pg_try_advisory_xact_lock(%L) limit %s) , t2 as (delete from %I where ctid = any (array(select ctid from t1)) returning *)  insert into %I select * from t2;', target1, uid, batch, target1, target2);  
end;  

$$
 language plpgsql strict;  

        测试分解的操作。

 

        1.编写初始任务

postgres=# select ins_task_init(1,'test',now()::timestamp);  
 ins_task_init   
---------------  
   
(1 row)  
  
postgres=# select ins_task_init(1,'test',now()::timestamp);  
 ins_task_init   
---------------  
   
(1 row)  

        2.运行的任务

postgres=# select run_task(1,100);  
 run_task   
----------  
   
(1 row)  

        3.查看任务是否结束并将其迁移到历史表

postgres=# select * from task_init_1;  
 uid | ptid | tid | state | retry | info | ts   
-----+------+-----+-------+-------+------+----  
(0 rows)  
  
postgres=# select * from task_hist_1;  
 uid | ptid | tid | state | retry | info |             ts               
-----+------+-----+-------+-------+------+----------------------------  
   1 |    1 |   1 |     1 |    -1 | test | 2017-07-20 15:26:32.739766  
   1 |    2 |   2 |     1 |    -1 | test | 2017-07-20 15:26:33.233469  
(2 rows)  

三、性能压力测试

 

        1、生成任务的性能

vi ins.sql  
\set uid random(1,1000)  
select ins_task_init(:uid,'test',now()::timestamp);   
  
pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 32 -j 32 -T 120  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 360 s  
number of transactions actually processed: 86074880  
latency average = 0.268 ms  
latency stddev = 0.295 ms  
tps = 239079.558174 (including connections establishing)  
tps = 239088.708200 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set uid random(1,1000)  
         0.267  select ins_task_init(:uid,'test',now()::timestamp);  
  
postgres=# select count(*) from task_init_1;  
 count   
-------  
 88861  
(1 row)  
  
postgres=# select count(*) from task_init_2;  
 count   
-------  
 88196  
(1 row)  
  
....  
  
postgres=# select count(*) from task_init_1000;  
 count   
-------  
 88468  
(1 row)  
  •         2、运行任务的性能

(obtain 10,000 tasks at once in a batch)
vi run.sql  
\set uid random(1,1000)  
select run_task(:uid,10000);  
  
pgbench -M prepared -n -r -P 1 -f ./run.sql -c 32 -j 32 -T 120  
  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 3294  
latency average = 1171.228 ms  
latency stddev = 361.056 ms  
tps = 27.245606 (including connections establishing)  
tps = 27.247560 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.003  \set uid random(1,1000)  
      1171.225  select run_task(:uid,10000);  
  
postgres=# select count(*) from task_init_1000;  
 count   
-------  
 18468  
(1 row)  
  
postgres=# select count(*) from task_hist_1000;  
 count    
--------  
 224207  
(1 row)  

        四、单独的数据测试

        1.每秒生成239,000个任务

        2.每秒消耗272,000个任务

        在任务生成和使用期间运行的测试数据

        每秒生成16.8万个任务

        每秒消耗168,000多个任务

        不积累任何任务。

 

结论:PostgreSQL在云海量任务调度系统中扮演着重要的角色。一个PostgreSQL实例就足以每小时处理任务生成和消耗。任务调度系统比MQ更复杂。与MQ超集类似,如果需要MQ,可以使用RDS PostgreSQL。其性能指标优于上述试验。
最后,阿里云推出了很多优惠活动,领取阿里云代金券阿里云学生优惠券获得更多优惠!

更多精彩内容,请关注元吉优惠券网:专注阿里云代金券阿里云服务器报价腾讯云代金券的免费领取!
更多精彩内容推荐:
阿里云服务器个人网站搭建 
阿里云代金券的查询和使用方法

阿里云服务器学生优惠购买和配置方法
阿里云常见问题解答
阿里云服务器操作方法(新手图文教程)

































































 


在线客服
热线电话

扫一扫 微信加好友