

SuperSQL(或写作 SuperSQL/Super-SQL)是一个跨数据库联邦查询系统,旨在通过扩展 SQL 语法实现对多种异构数据源的统一查询和集成分析。它允许用户通过单一的 SQL 语句同时操作多个不同类型的数据库、数据湖或文件系统,而无需手动整合数据或切换工具。
1.SuperSQL 解决的问题
SuperSQL 说白了就是一个“万能查询工具”,专门解决公司里数据乱七八糟放的问题。比如公司有的数据在 MySQL 数据库里,有的在 MongoDB 这种 NoSQL 里,还有些报表存在 AWS 云上或者本地文件里。以前要分析这些数据,得分别写代码查不同的系统,搞不好还得把数据搬来搬去,特别麻烦。
而 SuperSQL 直接让你用一句 SQL(就是大家最熟悉的那种数据库查询语言)同时查所有这些地方的数据。比如说,你可以直接写一句:“把 MySQL 里的订单表,和 AWS 上存的客户日志 CSV 文件,还有 MongoDB 里的用户信息,按用户 ID 拼在一起,统计最近一个月消费超过 1000 块的用户。” 它会在背后自动帮你搞定不同数据库的沟通、数据格式转换这些脏活累活。
SuperSQL 解决的问题:
- 数据孤岛(Data Silos)
- 多数据源查询的复杂性
- 实时分析需求
- 技术栈碎片化
- 资源与成本优化
2.SuperSQL复杂场景
1.需求:从MySQL、MongoDB和S3的CSV文件中,查询过去7天订单金额超过1万的用户信息(含用户画像和地理位置)。
难点:多数据源关联、时间过滤、嵌套JSON解析。
SELECT
mysql://prod_db/orders.user_id AS uid,
mongo://analytics/users.profile.name AS username,
s3://bucket/customer_locations.csv.city AS city,
SUM(mysql://prod_db/orders.amount) AS total_spent
FROM
mysql://prod_db/orders
JOIN mongo://analytics/users ON orders.user_id = users.id
JOIN s3://bucket/customer_locations.csv ON orders.user_id = csv.user_id
WHERE
mysql://prod_db/orders.order_date >= NOW() - INTERVAL '7 days'
AND mysql://prod_db/orders.amount > 10000
GROUP BY uid, username, city;
2.需求:将Kafka实时流数据(JSON格式)与HDFS历史日志(Parquet格式)关联,找出异常请求模式。
难点:流批混合查询、JSON解析、时间范围匹配。
SELECT
kafka://logs_stream/ip AS client_ip,
hdfs:///logs/history.parquet.user_agent AS user_agent,
COUNT(*) AS error_count
FROM
kafka://logs_stream -- 实时Kafka流
JOIN hdfs:///logs/history.parquet -- 历史Parquet文件
ON kafka://logs_stream.user_id = hdfs:///logs/history.parquet.user_id
WHERE
kafka://logs_stream.timestamp BETWEEN '2023-10-01 00:00:00' AND NOW()
AND kafka://logs_stream.status_code = '500'
GROUP BY client_ip, user_agent
HAVING error_count > 10;
3.需求:联合查询AWS Redshift(云数仓)和本地PostgreSQL,计算跨云销售占比。
难点:跨网络延迟优化、云与本地鉴权、数据类型转换。
SELECT
redshift://sales.cloud_orders.region AS region,
(SUM(redshift://sales.cloud_orders.amount) +
SUM(postgresql://local_db/onprem_orders.amount)) AS total_sales,
SUM(postgresql://local_db/onprem_orders.amount) / total_sales AS onprem_ratio
FROM
redshift://sales.cloud_orders
FULL OUTER JOIN postgresql://local_db/onprem_orders
ON cloud_orders.order_id = onprem_orders.order_id
GROUP BY region;
4.需求:从MongoDB的嵌套JSON订单中提取商品列表,关联Elasticsearch的商品库存。
难点:JSON数组展开、NoSQL与搜索引擎联合查询。
SELECT
mongo://orders.order_id AS order_id,
UNNEST(mongo://orders.items[*].product_id) AS product_id, -- 展开JSON数组
elasticsearch://inventory/products.stock AS stock
FROM
mongo://orders
JOIN elasticsearch://inventory/products
ON UNNEST(mongo://orders.items[*].product_id) = elasticsearch://inventory/products.id
WHERE
mongo://orders.status = 'shipped'
AND elasticsearch://inventory/products.stock < 10; -- 关联低库存商品
5.需求:从Cassandra(用户行为)、S3(用户画像CSV)、PostgreSQL(订单)中提取特征,供模型训练。
难点:特征拼接、稀疏数据处理、分布式执行优化。
SELECT
cassandra://user_behavior.user_id AS user_id,
-- 从Cassandra计算行为频率
COUNT_IF(cassandra://user_behavior.event_type = 'click') AS click_count,
-- 从S3的CSV加载用户画像
s3://profiles/user_details.csv.age AS age,
-- 从PostgreSQL计算历史订单均值
AVG(postgresql://orders.amount) OVER (PARTITION BY user_id) AS avg_order_amount
FROM
cassandra://user_behavior
LEFT JOIN s3://profiles/user_details.csv
ON user_behavior.user_id = user_details.csv.user_id
LEFT JOIN postgresql://orders
ON user_behavior.user_id = orders.user_id
WHERE
cassandra://user_behavior.timestamp >= '2023-01-01';
6.需求:实时监控可疑交易,要求:
实时流:从 Kafka 获取交易流水(包含用户ID、金额、时间戳)。
图数据库:从 Neo4j 查询用户关联网络(例如:用户是否在“高风险群体”的子图中)。
外部API:调用银行内部 API 验证用户账户状态。
历史数据:从 PostgreSQL 读取用户过去30天的交易均值,判断当前交易是否偏离正常范围。
输出:实时触发警报,并写入 Elasticsearch 供风控团队分析。
-- 定义流式输入(Kafka)与动态外部API调用
CREATE STREAM risk_alerts AS
SELECT
t.user_id AS uid,
t.amount AS current_amount,
-- 调用内部API验证账户状态(JSON响应解析)
EXTERNAL_API('POST', 'https://bank-api/validate', {'user_id': t.user_id})->>'status' AS account_status,
-- 查询图数据库判断用户是否在高风险子图
(CALL neo4j://risk_graph/cypher
WITH "MATCH (u:User {id: $uid})-[:KNOWS*2..5]-(r:RiskyGroup) RETURN COUNT(r) > 0 AS is_risky",
PARAMS => {'uid': t.user_id}
) AS is_risky_group,
-- 关联历史数据计算交易偏离度
(t.amount - AVG(pg://history/user_transactions.amount)
OVER (PARTITION BY t.user_id RANGE 30 DAYS PRECEDING)) AS deviation
FROM
kafka://transactions_stream t -- 实时交易流水
JOIN pg://history/user_transactions -- 历史交易表
ON t.user_id = pg://history/user_transactions.user_id
WHERE
t.amount > 100000 -- 大额交易阈值
AND account_status = 'active' -- API返回状态过滤
AND is_risky_group = TRUE -- 图数据库风险标记
AND deviation > 3.0 -- 偏离历史均值3倍标准差
AND t.timestamp >= NOW() - INTERVAL '5 minutes'; -- 仅处理近5分钟数据
-- 将结果实时写入Elasticsearch
SINK INTO elasticsearch://risk_alerts_index
SELECT * FROM risk_alerts;
