The operations covered are: printing a table, converting a DataSet into a Table, scan, select, as, where / filter, groupBy, distinct, join, leftOuterJoin, rightOuterJoin, union, unionAll, intersect, intersectAll, minus, minusAll, in, orderBy, fetch, offset, CSV sink, and insert.
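One note before the individual operations: first(n) and print() are DataSet methods, not Table methods. They compile on a Table here only because org.apache.flink.table.api.scala._ brings in an implicit Table-to-DataSet[Row] conversion in the legacy (pre-Blink) batch Table API this post targets. Below is a minimal sketch of the boilerplate every example shares, with that conversion written out explicitly; the object name Skeleton is only for illustration.

// A minimal sketch of the boilerplate shared by every example below.
// Assumes the legacy (pre-Blink) Flink 1.x batch Table API, where
// TableEnvironment.getTableEnvironment(env) still exists.
package com.opensourceteams.module.bigdata.flink.example.tableapi

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Skeleton {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val table = tableEnv.fromDataSet(
      env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30)))

    // Explicit form of what the implicit conversion does in the examples:
    val rows = tableEnv.toDataSet[Row](table)
    rows.first(1000).print()
  }
}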
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    table.first(1000).print()

    /**
     * Printed table data:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run1 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table under a name
    tableEnv.registerTable("user1", table)

    // Query all data in the table
    tableEnv.scan("user1").first(10)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query all data in the table
    tableEnv.scan("user1").first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.select

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Select the desired fields
      .select('_1, '_2, '_3)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the desired fields
      .select('id, 'name, 'value)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Named Run2 so it can coexist with the previous example in the same package
object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Fields can also be renamed inside select
      .select('id, 'name as 'name2, 'value)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the desired fields
      .select('id, 'name, 'value)
      // Filter with string predicates
      .where("value=20")
      .where("id=4")
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 4,c,20
     */
  }
}
4,c,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the desired fields
      .select('id, 'name, 'value)
      // Filter with expression predicates
      .where('value === 20)
      .where('id === 4)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 4,c,20
     */
  }
}
4,c,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.groupBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 40))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Group by name and sum value within each group
      .groupBy('name)
      .select('name, 'value.sum as 'value)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * a,10
     * b,20
     * c,70
     */
  }
}
a,10
b,20
c,70
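Worked through: "c" is the only name that appears twice, in rows (3,c,30) and (4,c,40), so its group sums to 30 + 40 = 70, while "a" and "b" keep their single values.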
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Remove duplicate rows
      .distinct()
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 1,a,10
     * 3,c,30
     * 2,b,20
     */
  }
}
1,a,10
3,c,30
2,b,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30), (20, "b", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1")
      // Deduplicate the column values, then sum them
      .select('_3.sum.distinct)
      .first(100)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 60
     */
  }
}
60
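Worked through: column _3 holds the values 10, 10, 20, 30, 20; distinct reduces these to {10, 20, 30}, and 10 + 20 + 30 = 60.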
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.join(table2).where(" a = d ").first(1000).print()

    /**
     * Output:
     *
     * 1,a,10,1,a,100
     */
  }
}
1,a,10,1,a,100
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    // String-predicate variant:
    // table.leftOuterJoin(table2, "a=d").first(1000).print()
    table.leftOuterJoin(table2, 'a === 'd).first(1000).print()

    /**
     * Output:
     *
     * 2,b,20,null,null,null
     * 1,a,10,1,a,100
     * 3,c,30,null,null,null
     */
  }
}
1,a,10,1,a,100
2,b,20,null,null,null
3,c,30,null,null,null
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.rightOuterJoin(table2, "a = d").first(1000).print()

    /**
     * Output:
     *
     * null,null,null,20,b,20
     * null,null,null,30,c,30
     * 1,a,10,1,a,100
     */
  }
}
null,null,null,20,b,20
null,null,null,30,c,30
1,a,10,1,a,100
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.union(table2).first(1000).print()

    /**
     * Output (duplicates removed):
     *
     * 30,c,30
     * 1,a,100
     * 2,b,20
     * 20,b,20
     * 1,a,10
     * 3,c,30
     */
  }
}
30,c,30
1,a,100
2,b,20
20,b,20
1,a,10
3,c,30
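Note that union removes duplicates: (2,b,20) exists in both inputs but appears only once in the result. Compare unionAll below, which keeps both copies.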
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.unionAll(table2).first(1000).print()

    /**
     * Output (duplicates kept; 2,b,20 appears twice):
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     * 1,a,100
     * 2,b,20
     * 20,b,20
     * 30,c,30
     */
  }
}
1,a,10
2,b,20
3,c,30
1,a,100
2,b,20
20,b,20
30,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersect(table2).first(1000).print()

    /**
     * Output:
     *
     * 2,b,20
     */
  }
}
2,b,20
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersectAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersectAll(table2).first(1000).print()

    /**
     * Output:
     *
     * 2,b,20
     * 2,b,20
     */
  }
}
2,b,20
2,b,20
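Worked through: (2,b,20) occurs three times on the left and twice on the right, so intersectAll emits it min(3, 2) = 2 times; every other row exists on only one side and is dropped.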
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minus

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    /**
     * Rows of the left table that do not exist in the right table;
     * duplicates are removed.
     */
    table.minus(table2).first(1000).print()

    /**
     * Output:
     *
     * 1,a,10
     * 3,c,30
     */
  }
}
1,a,10
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minusAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    /**
     * Rows of the left table that do not exist in the right table,
     * without deduplication: if an element occurs n times on the left
     * and m times on the right, it appears n - m times in the result.
     */
    table.minusAll(table2).first(1000).print()

    /**
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 2,b,20
     * 3,c,30
     */
  }
}
1,a,10
2,b,20
2,b,20
3,c,30
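Worked through: (2,b,20) occurs n = 4 times on the left and m = 2 times on the right, so n - m = 2 copies survive; (1,a,10) and (3,c,30) have no match on the right and pass through unchanged.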
If a row's value exists in the sub-query's result, the predicate evaluates to true and that row is returned.
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Field names of the two tables must not clash
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd)

    /**
     * Relationship between the table and the sub-query:
     * the sub-query must consist of a single column, and the column used
     * in the where condition must have the same type as that column.
     * If the value exists in the sub-query's result, the predicate is
     * true and the row is returned.
     */
    table.where('a.in(table2))
      .first(1000).print()

    /**
     * Output:
     *
     * 1,a,10
     */
  }
}
1,a,10
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc)    // sort ascending by id (note: the sort is per partition)
      .orderBy('id.desc)
      //.orderBy('value1.asc)
      .first(1000)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 20,f,200
     * 3,c,30
     * 2,b,20
     * 1,a,10
     */
  }
}
20,f,200
3,c,30
2,b,20
1,a,10
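As the comment in the code warns, orderBy sorts within each partition. That is why these sorting examples call env.setParallelism(1): with a single partition, the per-partition sort is effectively global. At higher parallelism, each output partition would be sorted independently rather than the result as a whole.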
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.fetch

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      .orderBy('id.desc)
      .fetch(2) // only valid on a sorted result; keeps the first 2 rows
      .first(1000)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 20,f,200
     * 3,c,30
     */
  }
}
20,f,200
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.offset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the table
    tableEnv.registerTable("user1", table)

    // Query the table
    tableEnv.scan("user1").as('id, 'name, 'value1)
      .orderBy('id.desc)
      .offset(2) // only valid on a sorted result; skips the first 2 rows
      .first(1000)
      // print acts as the sink
      .print()

    /**
     * Output:
     *
     * 2,b,20
     * 1,a,10
     */
  }
}
2,b,20
1,a,10
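offset and fetch can also be combined to paginate a sorted result. As a sketch (an assumed combination, not a verified run), the same pipeline with both operators should skip the top two rows and keep one, returning only 2,b,20:

tableEnv.scan("user1").as('id, 'name, 'value1)
  .orderBy('id.desc)
  .offset(2) // skip 20,f,200 and 3,c,30
  .fetch(1)  // then keep a single row: 2,b,20
  .first(1000)
  .print()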
package com.opensourceteams.module.bigdata.flink.example.tableapi.sink.csv

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // path, field delimiter, number of output files, write mode
    val csvTableSink = new CsvTableSink("sink-data/csv/a.csv", ",", 1, WriteMode.OVERWRITE)

    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("csvTableSink", fieldNames, fieldTypes, csvTableSink)

    table.insertInto("csvTableSink")

    env.execute()
  }
}
1,a,10
2,b,20
3,c,30
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.insert

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // path, field delimiter, number of output files, write mode
    val csvTableSink = new CsvTableSink(
      "/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data/csv/a.csv",
      ",", 1, WriteMode.OVERWRITE)

    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("csvTableSink", fieldNames, fieldTypes, csvTableSink)

    table.insertInto("csvTableSink")

    env.execute()
  }
}
1,a,10
2,b,20
3,c,30
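The two sink examples differ only in the output path (relative in the first, absolute in the second). In the CsvTableSink constructor used here, the third argument (1) is the number of output files, so the result lands in a single CSV file, and WriteMode.OVERWRITE replaces any existing file. Also note that insertInto only declares the write; nothing is emitted until env.execute() runs.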
Reprinted from: http://zwcia.baihongyu.com/