CeresDB

CeresDB is a timeseries database that aims to handle both timeseries and analytic workloads efficiently.

Motivation

In traditional timeseries databases, the Tag columns (InfluxDB calls them Tags and Prometheus calls them Labels) are normally indexed by building an inverted index. However, the cardinality of Tags varies across scenarios, and when it is very high, storing and retrieving the inverted index becomes very expensive. On the other hand, the scan-and-prune approach commonly used by analytical databases handles such scenarios well.

The basic design idea of CeresDB is to adopt a hybrid storage format and a corresponding query method for better performance when processing both timeseries and analytic workloads.

How does CeresDB work?

  • See Quick Start to learn about how to get started
  • For data model of CeresDB, see Data Model
  • For the supported SQL data types, operators, and commands, please navigate to SQL reference

Quick Start

This page shows you how to get started with CeresDB quickly. You'll start a standalone CeresDB server, and then insert and read some sample data using SQL.

Start server

The CeresDB Docker image is the easiest way to get started. If you haven't installed Docker yet, install it first.

You can use the command below to start a standalone server:

docker run -d --name ceresdb-server \
  -p 8831:8831 \
  -p 3307:3307 \
  -p 5440:5440 \
  ceresdb/ceresdb-server:v0.3.1

CeresDB listens on three ports when it starts:

  • 8831, gRPC port
  • 3307, MySQL port
  • 5440, HTTP port

HTTP is the easiest to use, so the sections below use it for the demo. For production environments, gRPC or MySQL is recommended.

Write and read data

Create table

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--data-raw '
CREATE TABLE `demo` (
    `name` string TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    timestamp KEY (t))
ENGINE=Analytic with (enable_ttl="false")
'

Write data

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--data-raw '
INSERT INTO demo (t, name, value)
    VALUES (1651737067000, "ceresdb", 100)
'

Read data

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--data-raw '
SELECT
    *
FROM
    `demo`
'

Show create table

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--data-raw '
SHOW CREATE TABLE `demo`
'

Drop table

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--data-raw '
DROP TABLE `demo`
'

Using the SDKs

See sdk

Next Step

Congrats, you have finished this tutorial. For more information about CeresDB, see the following:

SQL Syntax

This chapter introduces the SQL interface of CeresDB.

Data Model

This chapter introduces the data model of CeresDB.

Data Types

CeresDB implements a table model, and the supported data types are similar to MySQL's. The following table lists the mapping between MySQL and CeresDB types.

Supported Data Types (case-insensitive)

| SQL            | CeresDB   |
| -------------- | --------- |
| null           | Null      |
| timestamp      | Timestamp |
| double         | Double    |
| float          | Float     |
| string         | String    |
| varbinary      | Varbinary |
| uint64         | UInt64    |
| uint32         | UInt32    |
| uint16         | UInt16    |
| uint8          | UInt8     |
| int64/bigint   | Int64     |
| int32/int      | Int32     |
| int16/smallint | Int16     |
| int8/tinyint   | Int8      |
| boolean        | Boolean   |
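
For example, a table definition may combine several of these types. The table and column names below are illustrative only:

CREATE TABLE `type_demo` (
    `t` timestamp NOT NULL,
    `name` string TAG,
    `flag` boolean,
    `count` uint64,
    `temperature` double,
    `payload` varbinary,
    TIMESTAMP KEY(t)
) ENGINE=Analytic;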

Special Columns

Tables in CeresDB have the following constraints:

  • Primary key is required
  • The primary key must contain a time column, and can only contain one time column
  • The primary key must be non-null, so all columns in the primary key must be non-null.

Timestamp Column

Tables in CeresDB must have one timestamp column that maps to the timestamp in timeseries data, such as the timestamp in OpenTSDB/Prometheus. The timestamp column is declared with the timestamp key keyword, like TIMESTAMP KEY(ts).

Tag column

The tag keyword defines a column as a tag column, similar to the tag in timeseries data, such as the tag in OpenTSDB and the label in Prometheus.

Primary key

The primary key is used for data deduplication and sorting. It is composed of some columns and one time column. The primary key can be set in the following ways:

  • use the primary key keyword
  • use tag columns to auto-generate a TSID; CeresDB will use (timestamp, TSID) as the primary key
  • only set the timestamp column; CeresDB will use (timestamp) as the primary key

Notice: If the primary key and tags are specified at the same time, the tag columns are just additional information and do not affect the primary key logic.

CREATE TABLE with_primary_key(
  ts TIMESTAMP NOT NULL,
  c1 STRING NOT NULL,
  c2 STRING NULL,
  c4 STRING NULL,
  c5 STRING NULL,
  TIMESTAMP KEY(ts),
  PRIMARY KEY(c1, ts)
) ENGINE=Analytic WITH (ttl='7d');
  
CREATE TABLE with_tag(
    ts TIMESTAMP NOT NULL,
    c1 STRING TAG NOT NULL,
    c2 STRING TAG NULL,
    c3 STRING TAG NULL,
    c4 DOUBLE NULL,
    c5 STRING NULL,
    c6 STRING NULL,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic WITH (ttl='7d');

CREATE TABLE with_timestamp(
    ts TIMESTAMP NOT NULL,
    c1 STRING NOT NULL,
    c2 STRING NULL,
    c3 STRING NULL,
    c4 DOUBLE NULL,
    c5 STRING NULL,
    c6 STRING NULL,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic WITH (ttl='7d');

TSID

If the primary key is not set and tag columns are provided, a TSID is auto-generated from the hash of the tag columns. In essence, this is also a mechanism for automatically generating an ID.
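
For example, the with_tag table shown above declares no explicit primary key, so CeresDB adds a tsid column and uses (ts, tsid) as the primary key. This can be checked with SHOW CREATE TABLE; the output sketched here is based on the SHOW CREATE TABLE example later in this document:

SHOW CREATE TABLE `with_tag`;
-- the generated DDL is expected to contain:
--   `tsid` uint64 NOT NULL,
--   ...
--   PRIMARY KEY(ts,tsid),
--   TIMESTAMP KEY(ts)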

Identifier

An identifier in CeresDB can be used as a table name, column name, etc. It cannot be a reserved keyword or start with a number or punctuation symbol. CeresDB allows quoting identifiers with backquotes (`). In that case it can be any string, such as 00_table or select.

Note: wrapping a column or table name in backquotes is required to keep it case-sensitive, so the following statements refer to two different tables:

show create table `demo`;
show create table `DEMO`;

Data Definition Statements

CREATE TABLE

Basic syntax

Basic syntax (parts between [] are optional):

CREATE TABLE [IF NOT EXISTS]
    table_name ( column_definitions ) 
    ENGINE = engine_type 
    [WITH ( table_options )];

Column definition syntax:

column_name column_type [[NOT] NULL] {[TAG] | [TIMESTAMP KEY] | [PRIMARY KEY]}

Table options are key-value pairs. Values should be quoted with single quotation marks ('). E.g.:

... WITH ( enable_ttl='false' )

IF NOT EXISTS

Add IF NOT EXISTS to tell CeresDB to ignore the error if a table with the same name already exists.
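
For example, re-running the quick start's create statement with IF NOT EXISTS does not fail when the table already exists:

CREATE TABLE IF NOT EXISTS `demo` (
    `name` string TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE=Analytic WITH (enable_ttl='false');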

Define Column

A column's definition should contain at least the name and type. All supported types are listed here.

Columns are nullable by default, i.e. the NULL keyword is implied. Add the NOT NULL constraint to make a column required.

-- this definition
a_nullable int
-- equals to
a_nullable int NULL

-- add NOT NULL to make it required
b_not_null int NOT NULL

A column can be marked as a special column with the related keyword.

Engine

Specifies which engine this table belongs to. CeresDB currently supports only the Analytic engine type. This attribute is immutable.

ALTER TABLE

ALTER TABLE can change the schema or options of a table.

CeresDB currently supports ADD COLUMN.

-- create a table and add a column to it
CREATE TABLE `t`(a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
ALTER TABLE `t` ADD COLUMN (b string);

It now becomes

-- DESCRIBE TABLE `t`;

name    type        is_primary  is_nullable is_tag

t       timestamp   true        false       false
tsid    uint64      true        false       false
a       int         false       true        false
b       string      false       true        false

Data Manipulation Statements

INSERT
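
A minimal example, reusing the demo table and the statement from the quick start:

INSERT INTO demo (t, name, value)
    VALUES (1651737067000, "ceresdb", 100)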

SELECT
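
Likewise, a minimal query against the demo table from the quick start:

SELECT * FROM `demo`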

Utility Statements

There are several utility SQL statements in CeresDB that can help with table manipulation or query inspection.

SHOW CREATE TABLE

SHOW CREATE TABLE table_name;

SHOW CREATE TABLE returns a CREATE TABLE DDL that creates the same table as the given one, including columns, table engine and options. The schema and options shown in the CREATE TABLE are based on the current version of the table. An example:

-- create one table
CREATE TABLE `t` (a bigint, b int default 3, c string default 'x', d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
-- Result: affected_rows: 0

-- show how one table should be created.
SHOW CREATE TABLE `t`;

-- Result DDL:
CREATE TABLE `t` (
    `t` timestamp NOT NULL,
    `tsid` uint64 NOT NULL,
    `a` bigint,
    `b` int,
    `c` string,
    `d` smallint,
    PRIMARY KEY(t,tsid),
    TIMESTAMP KEY(t)
) ENGINE=Analytic WITH (
    arena_block_size='2097152',
    compaction_strategy='default',
    compression='ZSTD',
    enable_ttl='true',
    num_rows_per_row_group='8192',
    segment_duration='',
    ttl='7d',
    update_mode='OVERWRITE',
    write_buffer_size='33554432'
)

DESCRIBE

DESCRIBE table_name;

DESCRIBE shows the detailed schema of a table. The attributes include the column name and type, whether the column is a tag or part of the primary key, and whether it is nullable. The auto-created tsid column is also included.

Example:

CREATE TABLE `t`(a int, b string, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

DESCRIBE TABLE `t`;

The result is:

name    type        is_primary  is_nullable is_tag

t       timestamp   true        false       false
tsid    uint64      true        false       false
a       int         false       true        false
b       string      false       true        false

EXPLAIN

EXPLAIN query;

EXPLAIN shows how a query will be executed. Add it to the beginning of a query like

EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `t` GROUP BY name;

will give

logical_plan
Projection: #MAX(07_optimizer_t.value) AS c1, #AVG(07_optimizer_t.value) AS c2
  Aggregate: groupBy=[[#07_optimizer_t.name]], aggr=[[MAX(#07_optimizer_t.value), AVG(#07_optimizer_t.value)]]
    TableScan: 07_optimizer_t projection=Some([name, value])

physical_plan
ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]
  AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 6)
        AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
          ScanTable: table=07_optimizer_t, parallelism=8, order=None

Options

The options below can be used when creating a table for the analytic engine:

  • enable_ttl, bool. When TTL is enabled on a table, rows older than ttl will be deleted and can't be queried. Default is true.

  • ttl, duration. The lifetime of a row; only used when enable_ttl is true. Default is 7d.

  • storage_format, string. The format of the underlying columns. Available values:

    • columnar, the default
    • hybrid

    The meaning of these two values is explained in the Storage Format section; a usage example follows this list.
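
A sketch of a table using these options; the table name and the 30d ttl value are illustrative, while the option names are the ones listed above:

CREATE TABLE `options_demo` (
    `name` string TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE=Analytic WITH (
    enable_ttl='true',
    ttl='30d',
    storage_format='columnar'
);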

Storage Format

There are mainly two formats supported in analytic engine. One is columnar, which is the traditional columnar format, with one table column in one physical column:

| Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
| --------- |---------- | ----------- | ----- | ----- |
| 12:01     | A         | 0           | v1    | v1    |
| 12:01     | B         | 0           | v2    | v2    |
| 12:02     | A         | 0           | v1    | v1    |
| 12:02     | B         | 1           | v2    | v2    |
| 12:03     | A         | 0           | v1    | v1    |
| 12:03     | B         | 0           | v2    | v2    |
| .....     |           |             |       |       |

The other one is hybrid, an experimental format used to simulate row-oriented storage in columnar storage to accelerate traditional time-series query.

In traditional time-series use cases like IoT or DevOps, queries typically first group their results by series id (or device id), then by timestamp. In order to achieve good performance in those scenarios, the physical data layout should match this style, so the hybrid format is proposed like this:

 | Device ID | Timestamp           | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
 |-----------|---------------------|-------------|-------|-------|---------|---------|
 | A         | [12:01,12:02,12:03] | [0,0,0]     | v1    | v1    | 12:01   | 12:03   |
 | B         | [12:01,12:02,12:03] | [0,1,0]     | v2    | v2    | 12:01   | 12:03   |
 | ...       |                     |             |       |       |         |         |

  • Within one file, rows belonging to the same primary key (e.g. series/device id) are collapsed into one row
  • The columns other than the primary key are divided into two categories:
    • collapsible: these columns are collapsed into a list and are used to encode fields in a time-series table
      • Note: only fixed-length types are supported now
    • non-collapsible: these columns may only contain one distinct value and are used to encode tags in a time-series table
      • Note: only the string type is supported now
  • Two more columns, minTime and maxTime, are added. They are used to prune unnecessary rows during queries.
    • Note: not implemented yet.

Example

CREATE TABLE `device` (
    `ts` timestamp NOT NULL,
    `tag1` string tag,
    `tag2` string tag,
    `value1` double,
    `value2` int,
    timestamp KEY (ts)) ENGINE=Analytic
  with (
    enable_ttl = 'false',
    storage_format = 'hybrid'
);

This creates a table in hybrid format. Users can inspect the data format with parquet-tools; the table above should have the following parquet schema:

message arrow_schema {
  optional group ts (LIST) {
    repeated group list {
      optional int64 item (TIMESTAMP(MILLIS,false));
    }
  }
  required int64 tsid (INTEGER(64,false));
  optional binary tag1 (STRING);
  optional binary tag2 (STRING);
  optional group value1 (LIST) {
    repeated group list {
      optional double item;
    }
  }
  optional group value2 (LIST) {
    repeated group list {
      optional int32 item;
    }
  }
}

Deployment

CeresDB is a distributed timeseries database; that is to say, multiple CeresDB instances can be deployed as a cluster to provide high availability and scalability.

For now, CeresDB only supports one deployment form, which features static, rule-based routing. In the future (maybe in version 1.0.0), a powerful cluster deployment will be supported, and the CeresDB cluster will then gain more great features, including dynamic expansion, data reliability, etc.

As an open-source, cloud-native database, CeresDB can be deployed on Intel/ARM-based servers and in major virtualization environments.

| OS                                                 | status |
| -------------------------------------------------- | ------ |
| Ubuntu LTS 16.04 or later                          |        |
| CentOS 7.3 or later                                |        |
| Red Hat Enterprise Linux 7.3 or later 7.x releases |        |
| macOS 11 or later                                  |        |
| Windows                                            |        |

  • For production workloads, Linux is the preferred platform.
  • macOS is mainly used for development.

Static Routing

This guide shows how to deploy a CeresDB cluster with static, rule-based routing.

The crucial point here is that the CeresDB server provides a configurable routing function based on table names, so all we need is a valid config containing routing rules, which will be shipped to every CeresDB instance in the cluster.

Target

First, let's assume that our target is to deploy a cluster consisting of two CeresDB instances on the same machine. A larger cluster with more CeresDB instances can be deployed by following this two-instance example.

Prepare Config

Basic

Suppose the basic config of CeresDB is:

bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831
log_level = "info"
enable_cluster = true

[analytic]
wal_path = "/tmp/ceresdb"

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb"

In order to deploy two CeresDB instances on the same machine, the two configs must use different service ports and data directories.

Say the CeresDB_0's config is:

bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831
log_level = "info"
enable_cluster = true

[analytic]
wal_path = "/tmp/ceresdb_0"

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb_0"

Then the CeresDB_1's config is:

bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831
log_level = "info"
enable_cluster = true

[analytic]
wal_path = "/tmp/ceresdb_1"

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb_1"

Schema&Shard declaration

Then we should define the common part -- schema&shard declaration and routing rules.

Here is the config for schema&shard declaration:

[[static_route.topology.schema_shards]]
schema = 'public_0'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[static_route.topology.schema_shards]]
schema = 'public_1'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831

In the config above, two schemas are declared:

  • public_0 has two shards served by CeresDB_0.
  • public_1 has two shards served by both CeresDB_0 and CeresDB_1.

Routing rules

Provided with the schema&shard declaration, routing rules can be defined. Here is an example of a prefix rule:

[[route_rules.prefix_rules]]
schema = 'public_0'
prefix = 'prod_'
shard = 0

This rule means that all tables with the prod_ prefix belonging to public_0 are routed to shard_0 of public_0, that is to say, CeresDB_0. Other tables, whose names are not prefixed with prod_, are routed by hash to both shard_0 and shard_1 of public_0.

Besides prefix rule, we can also define a hash rule:

[[route_rules.hash_rules]]
schema = 'public_1'
shards = [0, 1]

This rule tells CeresDB to route public_1's tables to both shard_0 and shard_1 of public_1, that is to say, CeresDB_0 and CeresDB_1. This is actually the default routing behavior if no such rule is provided for schema public_1.

Now we can provide the full example configs for CeresDB_0 and CeresDB_1:

# config_0.toml
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831
log_level = "info"
enable_cluster = true

[analytic]
wal_path = "/tmp/ceresdb_0"

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb_0"

[[static_route.topology.schema_shards]]
schema = 'public_0'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[static_route.topology.schema_shards]]
schema = 'public_1'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831

[[static_route.rules.prefix_rules]]
schema = 'public_0'
prefix = 'prod_'
shard = 0

[[static_route.rules.hash_rules]]
schema = 'public_1'
shards = [0, 1]

# config_1.toml
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831
log_level = "info"
enable_cluster = true

[analytic]
wal_path = "/tmp/ceresdb_1"

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb_1"

[[static_route.topology.schema_shards]]
schema = 'public_0'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[static_route.topology.schema_shards]]
schema = 'public_1'
[[static_route.topology.schema_shards.shard_views]]
shard_id = 0
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[static_route.topology.schema_shards.shard_views]]
shard_id = 1
[static_route.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831

[[static_route.rules.prefix_rules]]
schema = 'public_0'
prefix = 'prod_'
shard = 0

[[static_route.rules.hash_rules]]
schema = 'public_1'
shards = [0, 1]

Let's call these two config files config_0.toml and config_1.toml. Note that in a real environment the CeresDB instances would be deployed across different machines, so there would be no need to choose different ports and data directories for different instances, and all the CeresDB instances could share exactly the same config file.

Start CeresDBs

After the configs are prepared, all we need to do is start each CeresDB container with its specific config.

Just run the commands below:

sudo docker run -d -t --name ceresdb_0 -p 5440:5440 -p 8831:8831 -v $(pwd)/config_0.toml:/etc/ceresdb/ceresdb.toml ceresdb/ceresdb-server:v0.1.0-alpha
sudo docker run -d -t --name ceresdb_1 -p 15440:15440 -p 18831:18831 -v $(pwd)/config_1.toml:/etc/ceresdb/ceresdb.toml ceresdb/ceresdb-server:v0.1.0-alpha

After the two containers are created and running, read and write requests can be served by the two-instance CeresDB cluster.

Dynamic Routing

This guide shows how to deploy a CeresDB cluster with CeresMeta.

Target

First, let's assume that our target is to deploy a cluster consisting of two CeresDB instances on the same machine. A larger cluster with more CeresDB instances can be deployed by following this two-instance example.

Start CeresDBs

You can use the following commands to create a CeresDB cluster with two instances.

  1. Start CeresMeta first. Refer to CeresMeta.

  2. Prepare config of CeresDB

# {project_path}/docs/example-cluster-0.toml
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831
mysql_port = 3307
log_level = "info"
deploy_mode = "Cluster"

[analytic]
wal_path = "/tmp/ceresdb0"

[analytic.storage]
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0

[analytic.storage.object_store]
type = "Local"
data_path = "/tmp/ceresdb0"

[cluster]
cmd_channel_buffer_size = 10

[cluster.node]
addr = "127.0.0.1"
port = 8831

[cluster.meta_client]
# Only support "defaultCluster" currently.
cluster_name = "defaultCluster"
meta_addr = "http://127.0.0.1:2379"
lease = "10s"
timeout = "5s"

[limiter]
write_block_list = ['mytable1']
read_block_list = ['mytable1']

# {project_path}/docs/example-cluster-1.toml
bind_addr = "0.0.0.0"
http_port = 5441
grpc_port = 8832
mysql_port = 13307
log_level = "info"
deploy_mode = "Cluster"

[analytic]
wal_path = "/tmp/ceresdb1"

[analytic.storage]
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0

[analytic.storage.object_store]
type = "Local"
data_path = "/tmp/ceresdb1"

[cluster]
cmd_channel_buffer_size = 10

[cluster.node]
addr = "127.0.0.1"
port = 8832

[cluster.meta_client]
# Only support "defaultCluster" currently.
cluster_name = "defaultCluster"
meta_addr = "http://127.0.0.1:2379"
lease = "10s"
timeout = "5s"

[limiter]
write_block_list = ['mytable1']
read_block_list = ['mytable1']

  3. Start CeresDB instances

  • You need to replace {project_path} with the actual project path

# Update address of CeresMeta in CeresDB config.
docker run -d --name ceresdb-server \
  -p 8831:8831 \
  -p 3307:3307 \
  -p 5440:5440 \
  -v {project_path}/docs/example-cluster-0.toml:/etc/ceresdb/ceresdb.toml \
  ceresdb/ceresdb-server:v0.3.1
  
docker run -d --name ceresdb-server2 \
  -p 8832:8832 \
  -p 13307:13307 \
  -p 5441:5441 \
  -v {project_path}/docs/example-cluster-1.toml:/etc/ceresdb/ceresdb.toml \
  ceresdb/ceresdb-server:v0.3.1

Develop Kits

Java

Go

Python

Rust

Operation and Maintenance

This guide introduces the operation and maintenance of CeresDB, including cluster installation, database&table operations, fault tolerance, disaster recovery, data import and export, etc.

Table Operation

CeresDB supports standard SQL protocols and allows you to create tables and read/write data via http requests.

Create Table

Example

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='\''false'\'')"
}'

Write Data

Example

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "INSERT INTO demo(t, name, value) VALUES(1651737067000, '\''ceresdb'\'', 100)"
}'

Read Data

Example

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "select * from demo"
}'

Query Table Info

Example

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "show create table demo"
}'

Drop Table

Example

curl --location --request POST 'http://127.0.0.1:5440/sql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "DROP TABLE demo"
}'

Table Operation

Query Table Information

Like MySQL's information_schema.tables, CeresDB provides system.public.tables to store table information. Columns:

  • timestamp([TimeStamp])
  • catalog([String])
  • schema([String])
  • table_name([String])
  • table_id([Uint64])
  • engine([String])

Example

Query table information via table_name like this:

curl --location --request POST 'http://localhost:5000/sql' \
--header 'Content-Type: application/json' \
--header 'x-ceresdb-access-schema: my_schema' \
-d '{
    "query": "select * from system.public.tables where `table_name`=\"my_table\""
}'

Response

{
    "rows":[
        {
            "timestamp":0,
            "catalog":"ceresdb",
            "schema":"monitor_trace",
            "table_name":"my_table",
            "table_id":3298534886446,
            "engine":"Analytic"
        }
    ]
}

Block List

Add block list

If you want to reject queries for a table, you can add the table name to 'read_block_list'.

Example

curl --location --request POST 'http://localhost:5000/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Add",
    "write_block_list":[],
    "read_block_list":["my_table"]
}'

Response

{
  "write_block_list":[

  ],
  "read_block_list":[
    "my_table"
  ]
}

Set block list

You can use the Set operation to clear the existing tables and set new tables in 'read_block_list', as in the following example.

Example

curl --location --request POST 'http://localhost:5000/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Set",
    "write_block_list":[],
    "read_block_list":["my_table1","my_table2"]
}'

Response

{
  "write_block_list":[

  ],
  "read_block_list":[
    "my_table1",
    "my_table2"
  ]
}

Remove block list

You can remove tables from 'read_block_list' as in the following example.

Example

curl --location --request POST 'http://localhost:5000/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Remove",
    "write_block_list":[],
    "read_block_list":["my_table1"]
}'

Response

{
  "write_block_list":[

  ],
  "read_block_list":[
    "my_table2"
  ]
}

Observability

CeresDB is observable with Prometheus and Grafana.

Supported Platforms

| target                    | OS                | status         |
| ------------------------- | ----------------- | -------------- |
| x86_64-unknown-linux-gnu  | kernel 4.9+       |                |
| x86_64-apple-darwin       | 10.15+, Catalina+ |                |
| aarch64-apple-darwin      | 11+, Big Sur+     |                |
| aarch64-unknown-linux-gnu | TBD               | tracked on #63 |
| *-windows                 | *                 |                |

In order to compile CeresDB, some relevant dependencies (including the Rust toolchain) should be installed.

Dependencies (Ubuntu 20.04)

Assuming the development environment is Ubuntu 20.04, execute the following command to install the required dependencies:

apt install git curl gcc g++ libssl-dev pkg-config cmake

It should be noted that the compilation of the project has version requirements for dependencies such as cmake, gcc, g++, etc. If your development environment is an old Linux distribution, it is necessary to manually install these dependencies of a higher version.

Dependencies (macOS)

If the development environment is macOS, execute the following commands to install the required dependencies.

  1. Install command line tools:

xcode-select --install

  2. Install cmake:

brew install cmake

  3. Install protobuf:

brew install protobuf

Rust

Rust can be installed by rustup. After installing rustup, when entering the CeresDB project, the specified Rust version will be automatically downloaded according to the rust-toolchain file.

After execution, you need to add environment variables to use the Rust toolchain. Basically, just put the following commands into your ~/.bashrc or ~/.bash_profile:

source $HOME/.cargo/env

Compile and run

Compile CeresDB by the following command:

cargo build --release

Then you can run CeresDB using the default configuration file provided in the codebase.

./target/release/ceresdb-server --config ./docs/minimal.toml

Conventional Commit Guide

This document describes how we use conventional commit in our development.

Structure

We would like to structure our commit message like this:

<type>[optional scope]: <description>

There are three parts. type is used to classify which kind of work this commit does. scope is an optional field that provides additional contextual information. And the last field is your description of this commit.

Type

Here we list some common types and their meanings.

  • feat: Implement a new feature.
  • fix: Patch a bug.
  • docs: Add document or comment.
  • build: Change the build script or configuration.
  • style: Style change (only). No logic involved.
  • refactor: Refactor an existing module for performance, structure, or other reasons.
  • test: Enhance test coverage or sqlness.
  • chore: None of the above.

Scope

The scope is more flexible than type. And it may have different values under different types.

For example, in a feat or build commit we may use the code module to define the scope, like

feat(cluster):
feat(server):

build(ci):
build(image):

And in docs or refactor commits, the motivation is preferred for labeling the scope, like

docs(comment):
docs(post):

refactor(perf):
refactor(usability):

But you don't need to add a scope every time. This isn't mandatory. It's just a way to help describe the commit.

After all

There are many other rules and scenarios on the Conventional Commits website. We are still exploring a better and more friendly workflow. Please do let us know by opening an issue if you have any suggestions ❤️

Rationale and Goals

As every Rust programmer knows, the language has many powerful features, and there are often several patterns which can express the same idea. Also, as every professional programmer comes to discover, code is almost always read far more than it is written.

Thus, we choose to use a consistent set of idioms throughout our code so that it is easier to read and understand for both existing and new contributors.

Unsafe and Platform-Dependent conditional compilation

Avoid unsafe Rust

One of the main reasons to use Rust as an implementation language is its strong memory safety guarantees; almost all of these guarantees are voided by the use of unsafe. Thus, unless there is an excellent reason and the use is discussed beforehand, it is unlikely that CeresDB will accept patches with unsafe code.

We may consider taking unsafe code given:

  • performance benchmarks showing a very compelling improvement
  • a compelling explanation of why the same performance can not be achieved using safe code
  • tests showing how it works safely across threads

Avoid platform-specific conditional compilation cfg

We hope that CeresDB is usable across many different platforms and operating systems, which means we put a high value on standard Rust.

While some performance-critical code may require architecture-specific instructions (e.g. AVX512), most of the code should not.

Errors

All errors should follow the SNAFU crate philosophy and use SNAFU functionality.

Good:

  • Derives Snafu and Debug functionality
  • Has a useful, end-user-friendly display message
#![allow(unused)]
fn main() {
#[derive(Snafu, Debug)]
pub enum Error {
    #[snafu(display(r#"Conversion needs at least one line of data"#))]
    NeedsAtLeastOneLine,
    // ...
}
}

Bad:

#![allow(unused)]
fn main() {
pub enum Error {
    NeedsAtLeastOneLine,
    // ...
}
}

Use the ensure! macro to check a condition and return an error

Good:

  • Reads more like an assert!
  • Is more concise
#![allow(unused)]
fn main() {
ensure!(!self.schema_sample.is_empty(), NeedsAtLeastOneLine);
}

Bad:

#![allow(unused)]
fn main() {
if self.schema_sample.is_empty() {
    return Err(Error::NeedsAtLeastOneLine {});
}
}

Errors should be defined in the module they are instantiated

Good:

  • Groups related error conditions together most closely with the code that produces them
  • Reduces the need to match on unrelated errors that would never happen
#![allow(unused)]
fn main() {
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Not implemented: {}", operation_name))]
    NotImplemented { operation_name: String }
}
// ...
ensure!(foo.is_implemented(), NotImplemented {
    operation_name: "foo",
});
}

Bad:

#![allow(unused)]
fn main() {
use crate::errors::NotImplemented;
// ...
ensure!(foo.is_implemented(), NotImplemented {
    operation_name: "foo",
});
}

The Result type alias should be defined in each module

Good:

  • Reduces repetition
#![allow(unused)]
fn main() {
pub type Result<T, E = Error> = std::result::Result<T, E>;
...
fn foo() -> Result<bool> { Ok(true) }
}

Bad:

#![allow(unused)]
fn main() {
...
fn foo() -> Result<bool, Error> { Ok(true) }
}

Err variants should be returned with fail()

Good:

#![allow(unused)]
fn main() {
return NotImplemented {
    operation_name: "Parquet format conversion",
}.fail();
}

Bad:

#![allow(unused)]
fn main() {
return Err(Error::NotImplemented {
    operation_name: String::from("Parquet format conversion"),
});
}

Use context to wrap underlying errors into module specific errors

Good:

  • Reduces boilerplate
#![allow(unused)]
fn main() {
input_reader
    .read_to_string(&mut buf)
    .context(UnableToReadInput {
        input_filename,
    })?;
}

Bad:

#![allow(unused)]
fn main() {
input_reader
    .read_to_string(&mut buf)
    .map_err(|e| Error::UnableToReadInput {
        name: String::from(input_filename),
        source: e,
    })?;
}

Hint for Box<dyn std::error::Error> in Snafu:

If your error contains a trait object (e.g. Box<dyn std::error::Error + Send + Sync>), in order to use context() you need to wrap the error in a Box:

#![allow(unused)]
fn main() {
#[derive(Debug, Snafu)]
pub enum Error {

    #[snafu(display("gRPC planner got error listing partition keys: {}", source))]
    ListingPartitions {
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

...

  // Wrap error in a box prior to calling context()
database
  .partition_keys()
  .await
  .map_err(|e| Box::new(e) as _)
  .context(ListingPartitions)?;
}

Note the as _ in the map_err call. Without it, you may get an error such as:

error[E0271]: type mismatch resolving `<ListingPartitions as IntoError<influxrpc::Error>>::Source == Box<<D as Database>::Error>`
  --> query/src/frontend/influxrpc.rs:63:14
   |
63 |             .context(ListingPartitions)?;
   |              ^^^^^^^ expected trait object `dyn snafu::Error`, found associated type
   |
   = note: expected struct `Box<(dyn snafu::Error + Send + Sync + 'static)>`
              found struct `Box<<D as Database>::Error>`
   = help: consider constraining the associated type `<D as Database>::Error` to `(dyn snafu::Error + Send + Sync + 'static)`
   = note: for more information, visit https://doc.rust-lang.org/book/ch19-03-advanced-traits.html

Each error cause in a module should have a distinct Error enum variant

Specific error types are preferred over a generic error with a message or kind field.

Good:

  • Makes it easier to track down the offending code based on a specific failure
  • Reduces the size of the error enum (String is 3x 64-bit vs no space)
  • Makes it easier to remove vestigial errors
  • Is more concise
#![allow(unused)]
fn main() {
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Error writing remaining lines {}", source))]
    UnableToWriteGoodLines { source: IngestError },

    #[snafu(display("Error while closing the table writer {}", source))]
    UnableToCloseTableWriter { source: IngestError },
}

// ...

write_lines.context(UnableToWriteGoodLines)?;
close_writer.context(UnableToCloseTableWriter)?;
}

Bad:

#![allow(unused)]
fn main() {
pub enum Error {
    #[snafu(display("Error {}: {}", message, source))]
    WritingError {
        source: IngestError,
        message: String,
    },
}

write_lines.context(WritingError {
    message: String::from("Error while writing remaining lines"),
})?;
close_writer.context(WritingError {
    message: String::from("Error while closing the table writer"),
})?;
}

Tests

Don't return Result from test functions

At the time of this writing, if you return Result from test functions to use ? in the test function body and an Err value is returned, the test failure message is not particularly helpful. Therefore, prefer not having a return type for test functions and instead using expect or unwrap in test function bodies.

Good:

#![allow(unused)]
fn main() {
#[test]
fn google_cloud() {
    let config = Config::new();
    let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
        config.service_account,
        config.bucket,
    ));

    put_get_delete_list(&integration).unwrap();
    list_with_delimiter(&integration).unwrap();
}
}

Bad:

#![allow(unused)]
fn main() {
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = TestError> = std::result::Result<T, E>;

#[test]
fn google_cloud() -> Result<()> {
    let config = Config::new();
    let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
        config.service_account,
        config.bucket,
    ));

    put_get_delete_list(&integration)?;
    list_with_delimiter(&integration)?;
    Ok(())
}
}

Thanks

Forked from influxdb_iox.

RoadMap

v0.1.0

  • Standalone version, local storage
  • Analytical storage format
  • Support SQL

v0.2.0

  • Distributed version supports static topology defined in config file.
  • The underlying storage supports Aliyun OSS.
  • WAL implementation based on OBKV.

v0.3.0

  • Release multi-language clients, including Java, Rust and Python.
  • Static cluster mode with CeresMeta.
  • Basic implementation of hybrid storage format.

v0.4.0

  • Implement more sophisticated cluster solution that enhances reliability and scalability of CeresDB.
  • Set up nightly benchmark with TSBS.

v1.0.0-alpha (Released)

  • Implement Distributed WAL based on Apache Kafka.
  • Release Golang client.
  • Improve the query performance for traditional time series workloads.
  • Support dynamic migration of tables in cluster mode.

v1.0.0

  • Formally release CeresDB and its SDKs with all breaking changes finished.
  • Finish the majority of work related to Table Partitioning.
  • Various efforts to improve query performance, especially for cloud-native cluster mode. This work includes:
    • Multi-tier cache.
    • Introduce various methods to reduce the data fetched from remote storage (improve the accuracy of SST data filtering).
    • Increase the parallelism while fetching data from remote object-store.
  • Improve data ingestion performance by introducing resource control over compaction.

Afterwards

With an in-depth understanding of the time-series database and its various use cases, the majority of our work will focus on performance, reliability, scalability, ease of use, and collaborations with open-source communities.

  • Add utilities that support PromQL, InfluxQL, OpenTSDB protocol, and so on.
  • Provide basic utilities for operation and maintenance. Specifically, the following are included:
    • Deployment tools that fit well for cloud infrastructures like Kubernetes.
    • Enhance self-observability, especially critical logs and metrics should be supplemented.
  • Develop various tools that ease the use of CeresDB. For example, data import and export tools.
  • Explore new storage formats that will improve performance on hybrid workloads (analytical and traditional time-series workloads).

Introduction to CeresDB's Architecture

Target

  • Provide an overview of CeresDB for developers who want to know more about it but have no idea where to start.
  • Give a brief introduction to the important modules of CeresDB and the connections between these modules; details of their implementations are not covered.

Motivation

CeresDB is a timeseries database. However, unlike traditional timeseries databases, which usually perform poorly on analytic workloads, CeresDB aims to handle both timeseries and analytic workloads well.

In traditional timeseries databases, the Tag columns (InfluxDB calls them Tags and Prometheus calls them Labels) are normally indexed by building an inverted index. However, the cardinality of Tags varies across scenarios, and when it is very high, storing and retrieving the inverted index becomes very expensive. On the other hand, the scan-and-prune approach commonly used by analytical databases handles such scenarios well.

The basic design idea of CeresDB is to adopt a hybrid storage format and a corresponding query method for better performance when processing both timeseries and analytic workloads.

Architecture

┌──────────────────────────────────────────┐
│       RPC Layer (HTTP/gRPC/MySQL)        │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│                 SQL Layer                │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │     Parser      │  │     Planner     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌───────────────────┐  ┌───────────────────┐
│    Interpreter    │  │      Catalog      │
└───────────────────┘  └───────────────────┘
┌──────────────────────────────────────────┐
│               Query Engine               │
│ ┌─────────────────┐  ┌─────────────────┐ │
│ │    Optimizer    │  │    Executor     │ │
│ └─────────────────┘  └─────────────────┘ │
└──────────────────────────────────────────┘
┌──────────────────────────────────────────┐
│         Pluggable Table Engine           │
│  ┌────────────────────────────────────┐  │
│  │              Analytic              │  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││      Wal       ││    Memtable    ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││     Flush      ││   Compaction   ││  │
│  │└────────────────┘└────────────────┘│  │
│  │┌────────────────┐┌────────────────┐│  │
│  ││    Manifest    ││  Object Store  ││  │
│  │└────────────────┘└────────────────┘│  │
│  └────────────────────────────────────┘  │
│  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           Another Table Engine        │  │
│  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
└──────────────────────────────────────────┘

The figure above shows the architecture of CeresDB stand-alone service and the details of some important modules will be described in the following part.

RPC Layer

module path: https://github.com/CeresDB/ceresdb/tree/main/server

The RPC layer currently supports multiple protocols, including HTTP, gRPC and MySQL.

Basically, HTTP and MySQL are used to debug CeresDB, query manually and perform DDL operations (such as creating and dropping tables). The gRPC protocol can be regarded as a customized protocol for high performance, suitable for massive read and write operations.

SQL Layer

module path: https://github.com/CeresDB/ceresdb/tree/main/sql

The SQL layer is responsible for parsing SQL and generating the query plan.

Based on sqlparser, a SQL dialect is provided for processing timeseries data, introducing some key concepts including Tag and Timestamp. By utilizing DataFusion, the planner can generate not only normal logical plans but also custom ones, such as plans for PromQL.

Interpreter

module path: https://github.com/CeresDB/ceresdb/tree/main/interpreters

The Interpreter module encapsulates the SQL CRUD operations. A SQL statement received by CeresDB is parsed, converted into a query plan and then executed in a specific interpreter, such as SelectInterpreter, InsertInterpreter, etc.

Catalog

module path: https://github.com/CeresDB/ceresdb/tree/main/catalog_impls

Catalog is the module managing metadata, and the metadata hierarchy adopted by CeresDB is similar to PostgreSQL: Catalog > Schema > Table. However, they are only used as namespaces.

At present, Catalog and Schema have two different implementations for stand-alone and distributed mode, because the strategies for generating IDs and the ways to persist metadata differ between the two modes.

Query Engine

module path: https://github.com/CeresDB/ceresdb/tree/main/query_engine

The Query Engine is responsible for optimizing and executing the query plan given the basic SQL plan provided by the SQL layer; this work is currently mainly delegated to DataFusion.

In addition to the basic functions of SQL, CeresDB also defines some customized query protocols and optimization rules for specific query plans by utilizing the extensibility provided by DataFusion. For example, PromQL support is implemented in this way; read the code if you are interested.

Pluggable Table Engine

module path: https://github.com/CeresDB/ceresdb/tree/main/table_engine

Table Engine is the storage engine for managing tables in CeresDB, and the pluggability of the Table Engine is a core design of CeresDB which matters in achieving our target (processing both timeseries and analytic workloads well). CeresDB will have multiple kinds of Table Engines for different workloads, and the most appropriate one should be chosen as the storage engine according to the workload pattern.

Now the requirements for a Table Engine are:

  • Manage all the shared resources under the engine:
    • Memory
    • Storage
    • CPU
  • Manage metadata of tables such as table schema and table options;
  • Provide Table instances which provide read and write methods;
  • Take responsibility for creating, opening, dropping and closing Table instances;
  • ....

Actually, the things that a Table Engine needs to handle are a little complicated. At present only one Table Engine, called Analytic, is provided in CeresDB; it does a good job of processing analytical workloads but is not yet ready to handle timeseries workloads (we plan to enhance it by adding indexes that help handle timeseries workloads).

The following part describes the details of the Analytic Table Engine.

WAL

module path: https://github.com/CeresDB/ceresdb/tree/main/wal

CeresDB processes data with a WAL + MemTable model: newly written data is written to the WAL first and then to the MemTable, and after a certain amount of data has accumulated in the MemTable, the data is organized in a query-friendly form and persisted to storage.

Now two implementations of WAL are provided for stand-alone and distributed mode:

  • For stand-alone mode, WAL is based on RocksDB and data is persisted on the local disk.
  • For distributed mode, the WAL is required to be a distributed component responsible for the reliability of the newly written data, so we currently provide an implementation based on OceanBase, and a more lightweight implementation is planned on our roadmap.

Besides, the WAL's trait definition includes the concept of a Region. Each table is assigned to a Region, so isolation between tables is gained, and such isolation is convenient for table-level operations (such as different TTLs for different tables).

MemTable

module path: https://github.com/CeresDB/ceresdb/tree/main/analytic_engine/src/memtable

Memtable is used to store the newly written data and after a certain amount of data is accumulated, CeresDB organizes the data in MemTable into a query-friendly storage format (SST) and stores it to the persistent device. MemTable is readable before it gets persisted (flushed).

The current implementation of MemTable is based on agatedb's skiplist. It allows concurrent reads and writes and can control memory usage based on Arena.

Flush

module path: https://github.com/CeresDB/ceresdb/blob/main/analytic_engine/src/instance/flush_compaction.rs

When the memory usage of the MemTables reaches a threshold, Flush selects some MemTables and flushes them into query-friendly SSTs saved on the persistent device.

During the flushing procedure, the data is divided by a certain time range (configured by the table option segment_duration), and no SST spans the segment duration. This is also a common approach in most timeseries databases: organizing data in the time dimension speeds up subsequent time-related operations, such as querying data over a time range and purging data outside the TTL.
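
For instance, the segment duration is a table option, so it can presumably be set in the WITH clause when creating a table; the statement below is only a sketch, and the 2h value is an assumed example:

CREATE TABLE `segment_demo` (
    `name` string TAG,
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE=Analytic WITH (segment_duration='2h');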

At present, the control process of Flush is a little complicated, so the details will be explained in another document.

Compaction

module path: https://github.com/CeresDB/ceresdb/tree/main/analytic_engine/src/compaction

The data of a MemTable is flushed as SSTs, but the file size of a recently flushed SST may be very small, and too many small SSTs lead to poor query performance. Therefore, Compaction is introduced to rearrange the SSTs so that multiple smaller SST files can be compacted into a larger one.

The detailed strategy of Compaction will also be described with Flush in subsequent documents.

Manifest

module path: https://github.com/CeresDB/ceresdb/tree/main/analytic_engine/src/meta

Manifest takes responsibilities for managing tables' metadata of Analytic Engine including:

  • Table schema and table options;
  • The sequence number where the newest flush finishes;
  • The information of SST, such as SST path.

The Manifest is currently based on the WAL (a different instance from the WAL mentioned above for newly written data). In order to avoid infinite expansion of the metadata (every Flush leads to an update of the SST information), Snapshot is also introduced to clean up the history of metadata updates.

Object Store

module path: https://github.com/CeresDB/ceresdb/tree/main/components/object_store

The SST generated by Flush needs to be persisted, and the abstraction of the persistent storage device is the ObjectStore, which has multiple implementations.

The distributed architecture of CeresDB separates storage and computing, which requires the Object Store to be a highly available and reliable service independent of CeresDB. Therefore, storage systems like Amazon S3 and Alibaba Cloud OSS are good choices, and implementations for the storage systems of other cloud service providers are planned.

SST

module path: https://github.com/CeresDB/ceresdb/tree/main/analytic_engine/src/sst

Both Flush and Compaction involve SSTs, and in the codebase SST itself is an abstraction that can have multiple implementations. The current implementation is based on Parquet, a column-oriented data file format designed for efficient data storage and retrieval.

The format of SST is very critical for retrieving data and is also the most important part to perform well in handling both timeseries and analytic workloads. At present, our Parquet-based implementation is good at processing analytic workload but is poor at processing timeseries workload. In our roadmap, we will explore more storage formats in order to achieve a good performance in both workloads.

Space

module path: https://github.com/CeresDB/ceresdb/blob/main/analytic_engine/src/space.rs

In the Analytic Engine there is a concept called space; here is an explanation to resolve some ambiguities when reading the source code. The Analytic Engine does not have the concept of catalog and schema and only provides two levels of relationship: space and table. In the implementation, the schema id on the upper layer (which should be unique across all catalogs) is mapped to the space id.

The space in Analytic Engine serves mainly for isolation of resources for different tenants, such as the usage of memory.

Critical Path

After a brief introduction to some important modules of CeresDB, we will describe some critical paths in the code, hoping to provide interested developers with a guide for reading it.

Query

┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─10──│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   9│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
                           │    ▲
                          5│   8│
                           │    │
                           ▼    │
                   ┌──────────────────┐
                   │                  │
                   │   Query Engine   │
                   │                  │
                   └──────────────────┘
                           │    ▲
                          6│   7│
                           │    │
                           ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

Take a SELECT SQL statement as an example. The figure above shows the query procedure, and the numbers in it indicate the order of calls between the modules.

Here are the details:

  • The Server module chooses a proper RPC module (HTTP, gRPC or MySQL) to process the request according to the protocol it uses;
  • Parse the SQL in the request with the parser;
  • With the parsed SQL and the catalog/schema module, DataFusion can generate the logical plan;
  • With the logical plan, the corresponding Interpreter is created and the logical plan is executed by it;
  • For the logical plan of normal Select SQL, it will be executed through SelectInterpreter;
  • In the SelectInterpreter the specific query logic is executed by the Query Engine:
    • Optimize the logical plan;
    • Generate the physical plan;
    • Optimize the physical plan;
    • Execute the physical plan;
  • The execution of physical plan involves Analytic Engine:
    • Data is obtained by read method of Table instance provided by Analytic Engine;
    • The source of the table data is SST and Memtable, and the data can be filtered by the pushed down predicates;
    • After retrieving the table data, Query Engine will complete the specific computation and generate the final results;
  • SelectInterpreter gets the results and feeds them to the protocol module;
  • After the protocol layer converts the results, the server module responds to the client with them.

Write

┌───────┐      ┌───────┐      ┌───────┐
│       │──1──▶│       │──2──▶│       │
│Server │      │  SQL  │      │Catalog│
│       │◀─8───│       │◀─3───│       │
└───────┘      └───────┘      └───────┘
                │    ▲
               4│   7│
                │    │
                ▼    │
┌─────────────────────────────────────┐
│                                     │
│             Interpreter             │
│                                     │
└─────────────────────────────────────┘
      │    ▲
      │    │
      │    │
      │    │
      │    │       ┌──────────────────┐
      │    │       │                  │
     5│   6│       │   Query Engine   │
      │    │       │                  │
      │    │       └──────────────────┘
      │    │
      │    │
      │    │
      ▼    │
 ┌─────────────────────────────────────┐
 │                                     │
 │            Table Engine             │
 │                                     │
 └─────────────────────────────────────┘

Take an INSERT SQL statement as an example. The figure above shows the write procedure, and the numbers in it indicate the order of calls between the modules.

Here are the details:

  • The Server module chooses a proper RPC module (HTTP, gRPC or MySQL) to process the request according to the protocol it uses;
  • Parse SQL in the request by the parser;
  • With the parsed sql and the catalog/schema module, DataFusion can generate the logical plan;
  • With the logical plan, the corresponding Interpreter is created and logical plan will be executed by it;
  • For the logical plan of normal INSERT SQL, it will be executed through InsertInterpreter;
  • In the InsertInterpreter, the write method of the Table provided by the Analytic Engine is called:
    • Write the data into WAL first;
    • Write the data into MemTable then;
  • Before writing to MemTable, the memory usage will be checked. If the memory usage is too high, the flush process will be triggered:
    • Persist some old MemTables as SSTs;
    • Delete the corresponding WAL entries;
    • Update the manifest for the new SSTs and the sequence number of the WAL;
  • Server module responds to the client with the execution result.

Storage

Query

Wal

Table Partitioning