MongoDB 上的计算库
MongoDB内置json风格的查询表达式,但有时候用起来不太方面,这种情况下我们要把数据从MongoDB取出来,用外部的第三方库函数完成计算。下面将对比MongoDB上的几种计算库,尤其是语法表达和部署配置方面的区别。
MongoDB Connector
这是MongoDB官方提供的计算库,主要功能是模拟MySQL服务,负责SQL到json查询表达式的翻译,对上接收ODBC或JDBC的SQL请求,对下用json查询表达式访问MongoDB。
Connector支持基本的SQL语法,下面举例说明。MongoDB有名为test1的collection,大多数字段为简单类型,用来存储员工信息,Orders字段为数组类型,用来存储当前员工的多个订单。部分数据如下:
[{ "_id": {"$oid": "6074f6c7e85e8d46400dc4a7"}, "EId": 7,"State": "Illinois","Dept": "Sales","Name": "Alexis","Gender": "F","Salary": 9000,"Birthday": "1972-08-16", "Orders": [ {"OrderID": 70,"Client": "DSG","SellerId": 7,"Amount": 288,"OrderDate": "2009-09-30"}, {"OrderID": 131,"Client": "FOL","SellerId": 7,"Amount": 103.2,"OrderDate": "2009-12-10"} ] } { "_id": {"$oid": "6074f6c7e85e8d46400dc4a8"}, "EId": 8,"State": "California", ... }]
下面用SQL嵌入JAVA代码,实现针对订单表的条件查询。
package mon; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class Main { public static void main(String[] args)throws Exception { Class.forName("com.mysql.jdbc.Driver"); Connection connection =DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307?source=mongo&mechanism=PLAIN&useSSL=false&authenticationPlugins=org.mongodb.mongosql.auth.plugin.MongoSqlAuthenticationPlugin"); Statement statement = connection.createStatement(); String str="SELECT * FROM mongo.test1_orders where Orders.Amount>1000 and Orders.Amount<=3000 and Orders.Client like'%S%' "; ResultSet result = statement.executeQuery(str); … if(connection != null) connection.close(); } }
类似地,只需修改SQL语句,还可以实现分组汇总和条件查询:
str="SELECT year( Orders.Orderdate) y,sum( Orders.Amount) s FROM mongo.test1_orders group by year( Orders.Orderdate)"; str= "SELECT o.Orders.OrderID,o.Orders.Client,o.Orders.Sel≤rId,o.Orders.Amount,o.Orders.OrderDate,e.Name,e.Gender,e.Dept from mongo.test1_Orders o, mongo. test1 e where o.Orders.Sel≤rId=e.EId";
上面代码中,Orders.Orderdate是子文档的默认字段名,虽然里面用到了点号,但实际上SQL不支持多层数据类型,所以Orders.Orderdate只是外观像主子关系(可在元数据文件中重定义),但实际并不会分别解析。事实上,Connector把collection test1识别为2个独立的表,一个是不含子文档的table test1,另一个是只有子文档的table test1_Orders。这就导致SQL必须再此(额外)建立关联关系,而不能利用collection原有的天然主子关系。显然,这样的计算效率并不高。
除了不支持多层结构这种通用的SQL缺点之外,Connector 本身也是各类SQL中表达能力较弱的,比如不支持窗口函数。事实上,官网已经明确说MongoDB Connector只适合一些BI工具的基本需求。
由于是官方产品,所以MongoDB Connector的集成和配置都很简单。安装本计算库后,只需在命令行执行如下命令,即可启动数据库服务:
mongosqld --mongo-uri "mongodb://localhost:27017/?connect=direct" --addr "127.0.0.1:3307"
Calcite
Calcite的理想是用SQL语言计算任意数据源,其中就包括MongoDB。遗憾的是,Calcite on MongoDB的文档少且粗,有些功能没有找到具体说明,这导致下面的描述可能不准确。
Calcite只能以collection为单位取数,如果colleciton较大,就很容易内存溢出(Calcite只支持内存计算)。Calcite不能从多层collection中取数,比如查询前面collection test1里的子文档。为了迁就Calcite,这里将test重整成2个单层collection,即Employees和Orders。
使用Calcite对collection Orders进行条件查询时,代码如下:
package org.example; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.Properties; public class App { public static void main(String[] args ) throws Exception{ Properties config = new Properties(); config.put("model", "d:\\mongo-model.json"); config.put("lex", "MYSQL"); Connection con = DriverManager.getConnection("jdbc:calcite:", config); Statement stmt = con.createStatement(); String sql ="select * from orders where Amount>1000 and Amount<=3000"; ResultSet rs = stmt.executeQuery(sql); … if(con!= null) con.close(); } }
应该注意到,这里的条件查询语句简化了,没有模糊查询部分(其他计算库都有),这是因为Calcite还不支持模糊查询。类似地,Calcite也不支持取年份的函数,或字符串和日期的转换函数,所以前面的分组汇总无法实现,只能改写成下面这样:
sql="select Client, sum(Amount) from orders group by Client";
Calciteshe对关联计算的支持也不好,比如不能取部分字段,只能用*号取全部字段,所以前面的关联查询无法实现,只能改写成下面这样:
sql="SELECT * from Orders,Employees where Orders.SellerId=Employees.EId";
Calcite的配置分两部分,首先在Maven中引入calcite-mongodb,之后建立元数据文件mongo_model.json,具体内容如下:
{ "version": "1.0", "defaultSchema": "dSchema", "schemas": [ { "type": "custom", "name": "alias", "factory": "org.apache.calcite.adapter.mongodb.MongoSchemaFactory", "operand": { "host": "localhost:27017", "database": "mongo" } }, { "name": "dSchema", "tables": [ { "name": "orders", "type": "view", "sql": "select cast(_MAP['OrderID'] AS integer)AS OrderID,cast(_MAP['Client'] AS varchar(40)) AS Client,cast(_MAP['SellerId'] AS integer)AS SellerId,cast(_MAP['Amount'] AS float)AS Amount,cast(_MAP['OrderDate'] AS varchar(20)) AS OrderDate from \"alias\".\"Orders\"" }, { "name": "employees", "type": "view", "sql": "select cast(_MAP['EId'] AS integer)AS EId,cast(_MAP['State'] AS varchar(40)) AS State,cast(_MAP['Dept'] AS varchar(40)) AS Dept,cast(_MAP['Name'] AS varchar(40)) AS Name,cast(_MAP['Gender'] AS varchar(40)) AS Gender,cast(_MAP['Salary'] AS float)AS Salary,cast(_MAP['Birthday'] AS varchar(20)) AS Birthday from \"alias\".\"Employees\"" } ] } ] }
上面配置中,\"alias\".\"Orders\"是物理表名,orders是对应的视图名。理论上不用配置视图,只需在代码中直接查物理表即可,但实际上直接查物理表会导致很多SQL错误(比如分组汇总),这很可能是Calcite不够完善导致的。
Scala
Scala是常用的结构化计算语言,对MongoDB支持较早,其原理是:先从MongoDB读取collection,存储为Scala的DataFrame数据对象(或RDD),再用DataFrame的通用计算能力完成计算。
Scala计算库存在一些先天缺点。Scala只能以collection为单位取数,不支持用mongoDB的json查询表达式取数,如果colleciton数据量较大,则取数会花费大量时间。Scala不能从多层collection中取数,如果想计算MongoDB中的多层collection,则必须改造成多个单层collection。比如前面例子中的test1必须拆成2个单层的Orders和Employees。
使用Scala对collection Orders进行条件查询的代码如下:
package test import org.apache.spark.sql.SparkSession import com.mongodb.spark.config._ import com.mongodb.spark.sql.toSparkSessionFunctions object Mon { def main(args: Array[String]): Unit = { val warehouseLocation = "file:${system:user.dir}/spark-warehouse" val spark = SparkSession.builder() .master("local") .appName("MongoDB Test") .getOrCreate() val Orders = spark.loadFromMongoDB(ReadConfig( Map("uri" -> "mongodb://127.0.0.1:27017/mongo.Orders") )) val condtion=Orders.where("Amount>1000 and Amount<=3000 and Client like'%S%' ") condtion.show() } }
类似地,还可以实现分组汇总和关联计算:
//分组汇总 val groupBy=Orders.groupBy(year(Orders("OrderDate"))).agg(sum("Amount")) //关联计算 val Employees = spark.loadFromMongoDB(ReadConfig( Map("uri" -> "mongodb://127.0.0.1:27017/mongo.employees") )) val join=Orders.join(Employees,Orders("SellerId")===Employees("EId"),"Inner") .select("OrderID","Client","SellerId","Amount","OrderDate","Name","Gender","Dept")
应该注意到,虽然必须拆分多层的collection才能取数,但只要取到数据,DataFrame的计算能力还是非常强的,这便是通用数据结构的好处。
配置方面,对于程序员来说非常简单,只需在Maven加入org.mongodb.spark即可。
集算器 SPL
集算器 SPL也是专业的开源结构化计算引擎,原理和Calcite类似,可以用统一的语法和数据结构计算各类数据源,其中就包括MongoDB。但集算器 SPL更“轻”,层次更少,语法更简单,对MongoDB的支持也更成熟。
比如对多层collection test1进行条件查询,SPL代码写作:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/mongo") |
2 | =mongo_shell(A1,"test1.find()") |
3 | =A2.conj(Orders) |
4 | =A3.select(Amount>1000 && Amount<=3000 && like@c(Client,"*s*")).fetch() |
5 | =mongo_close(A1) |
从A2可以看出来,SPL支持MongoDB的json查询表达式(find、count、distinct和aggregate),比如区间查询写作:=mongo_shell(A2,"test1.find({Orders.mount:{gt:1000,lt:3000}})")。
在collection数据较多且json表达式较简单的时候,可以通过这种方式减少取到的数据,以防内存溢出;也可以加快查询速度,比如针对索引的查询。如果取到的数据依旧很多,SPL也能轻松处理,因为A2返回的是游标类型,可以计算超出内存的数据。
上述代码可在IDE中执行,也可以存为脚本文件(比如select.dfx),通过JDBC接口在JAVA中调用,具体如下:
package Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class test1 { public static void main(String[] args)throws Exception { Class.forName("com.esproc.jdbc.InternalDriver"); Connection connection =DriverManager.getConnection("jdbc:esproc:local://"); Statement statement = connection.createStatement(); ResultSet result = statement.executeQuery("call select()"); …… if(connection != null) connection.close(); } }
类似地,分组汇总代码如下:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/mongo") |
2 | =mongo_shell(A1,"test1.find()") |
3 | =A2.conj(Orders).groups(year(OrderDate);sum(Amount)) |
4 | =mongo_close(A1) |
关联查询代码如下:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/mongo") |
2 | =mongo_shell(A1,"test1.find()") |
3 | =A2.new(Orders.OrderID,Orders.Client, Name,Gender,Dept).fetch() |
4 | =mongo_close(A1) |
这里要注意的是,SPL的关联代码要比其他计算库都要简单(实际没有关联动作),甚至比MongoDB官方产品Connector简单。这是因为SPL的数据结构本身就是多层的,可以直接对应test1这种多层的collection,可以天然表达主子关系,如此一来就不必再额外进行关联。而其他计算库都是单层数据结构,难以对应多层collection。
当然,SPL也支持2个单层collection的关联:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/mongo") |
2 | =mongo_shell(A1,"Orders.find()").fetch() |
3 | =mongo_shell(A1,"Employees.find()").fetch() |
4 | =mongo_close(A1) |
5 | =join(A2,SellerId;A3,EId) |
6 | =A5.new(_1.OrderID,_1.Client,_2.Name,_2.Gender,_2.Dept) |
SPL表达形式多样,除了本身的过程化语法,还支持SQL语法。因为SQL数据类型不支持多层数据(参考Calcite和Connector),所以只支持2个单层collection的关联,代码如下:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/mongo") |
2 | =mongo_shell(A34,"Orders.find()").fetch() |
3 | =mongo_shell(A34,"Employees.find()").fetch() |
4 | =mongo_close(A34) |
5 | $select o.OrderId,o.Client,e.Name,e.Gender,e.Dept from {A35} o join {A36} e on o.SellerId=e.EId |
MongoDB的特色是多层数据,用json表达式计算多层数据会遇到很多困难,这种情况下SPL做计算库经常可以简化计算。比如:统计下面每条记录中 income,output 的数量之和。
_id | income | output |
1 | {"cpu":1000, "mem":500, "mouse":"100"} | {"cpu":1000, "mem":600 ,"mouse":"120"} |
2 | {"cpu":2000,"mem":1000, "mouse":"50","mainboard":500 } | {"cpu":1500, "mem":300} |
用json表达式计算时,代码很繁琐:
var fields = [ "income", "output"]; db.computer.aggregate([ { $project:{ "values":{ $filter:{ input:{ "$objectToArray":"$$ROOT" }, cond:{ $in:[ "$$this.k", fields ] } } } } }, { $unwind:"$values" }, { $project:{ key:"$values.k", values:{ "$sum":{ "$let":{ "vars":{ "item":{ "$objectToArray":"$values.v" } }, "in":"$$item.v" } } } } }, {$sort: {"_id":-1}}, { "$group": { "_id": "$_id", 'income':{"$first": "$values"}, "output":{"$last": "$values"} }}, ]);
用SPL计算就简单多了:
A | |
1 | =mongo_open("mongodb://127.0.0.1:27017/raqdb") |
2 | =mongo_shell(A1,"computer.find()").fetch() |
3 | =A2.new(_id:ID,income.array().sum():INCOME,output.array().sum():OUTPUT) |
4 | >A1.close() |
最后说下集算器 SPL的配置。在Extend library中启用MongoCli即可完成配置,配置时可用图形界面。
通过上述比较可以看出:在语法表达方面,集算器 SPL对多层数据支持较完美;Connector可以将多层数据识别为多个表,基本能用;scala要将多层数据改造成单层,成本非常高;Calcite不仅改造成本高,而且不够成熟稳定。在配置部署方面,Connector较为方便,集算器 SPL和Scala也比较简单,Calcite依旧垫底。