博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink1.7.2 tableapi批处理示例
阅读量:6241 次
发布时间:2019-06-22

本文共 24826 字,大约阅读时间需要 82 分钟。

flink1.7.2 tableapi批处理示例

源码

概述

  • 本文为flink1.7.2 tableapi批处理示例
  • 主要操作包括: print table,DataSet 转换成table,Scan,select,as,where / filter,groupBy,distinct,join,leftOuterJoin,rightOuterJoin

    union,unionAll,intersect,intersectAll,minus,minusAll,in,orderBy,fetch,offset,Sink csv,insert

print table

  • 功能描述: 打印输出表数据
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: print the contents of a Table created from a DataSet. */
object Run2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Print at most 1000 rows (acts as the sink)
    table.first(1000).print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

DataSet 转换成table

package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: convert a DataSet into a Table, register it, and scan it. */
object Run1 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    // Scan all rows of the registered table
    tableEnv.scan("user1").first(10)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

Scan

  • 功能描述: 查询表中所有数据
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.scan

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: scan — query all rows of a registered table. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    // Scan all rows of the registered table
    tableEnv.scan("user1").first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

select

  • 功能描述: 选择表中需要的字段
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.select

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: select — project the fields you need from a table. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Select the fields we need (default tuple field names _1, _2, _3)
      .select('_1, '_2, '_3)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

as

  • 功能描述: 重命名字段名称
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: as — rename the fields of a table. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the renamed fields
      .select('id, 'name, 'value)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

as

  • 功能描述: 重命名字段名称
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.as

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: as — rename table fields, including an alias inside select. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select fields; 'name is aliased to 'name2 in the projection
      .select('id, 'name as 'name2, 'value)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,30

where / filter (过滤字段,字符串)

  • 功能描述: 条件过滤
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: where / filter with string predicates. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the fields we need
      .select('id, 'name, 'value)
      // Filter rows; chained where calls combine as AND
      .where("value=20")
      .where("id=4")
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 4,c,20
     */
  }
}
  • 输出结果
4,c,20

where / filter (过滤字段,表达式)

  • 功能描述: 过滤数据
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.where

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: where / filter with Scala expression predicates. */
object Run2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Select the fields we need
      .select('id, 'name, 'value)
      // Filter rows with expression syntax; chained where calls combine as AND
      .where('value === 20)
      .where('id === 4)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 4,c,20
     */
  }
}
  • 输出结果
4,c,20

groupBy

  • 功能描述: 分组统计
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.groupBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: groupBy — group by a key column and aggregate with sum. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30), (4, "c", 40))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Rename the fields
      .as('id, 'name, 'value)
      // Group by name and sum values per group
      .groupBy('name)
      .select('name, 'value.sum as 'value)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output (the two "c" rows are summed: 70 = 30 + 40):
     *
     * a,10
     * b,20
     * c,70
     */
  }
}
  • 输出结果
  • 70 = 30 + 40
a,10b,20c,70

distinct

  • 功能描述: 查询记录去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: distinct — remove duplicate rows. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // (1,"a",10) appears twice on purpose
    val dataSet = env.fromElements((1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Remove duplicate rows
      .distinct()
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output (row order may vary):
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,103,c,302,b,20

distinct

  • 功能描述: sum.distinct ,去掉字段重复的再求和
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: sum.distinct — sum a column after removing duplicate values. */
object Run2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Third field has duplicates: 10,10,20,30,20 -> distinct values 10,20,30
    val dataSet = env.fromElements((1, "a", 10), (1, "a", 10), (2, "b", 20), (3, "c", 30), (20, "b", 20))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1")
      // Sum the distinct values of the third field: 10 + 20 + 30 = 60
      .select('_3.sum.distinct)
      .first(100)
      // print acts as the sink
      .print()

    /*
     * Output:
     * 60
     */
  }
}
  • 输出结果
60

join

  • 功能描述: 内连接
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: join — inner join of two tables on a key predicate. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    // Inner join: only rows where a = d are emitted
    table.join(table2).where(" a = d ").first(1000).print()

    /*
     * Output:
     *
     * 1,a,10,1,a,100
     */
  }
}
  • 输出结果
1,a,10,1,a,100

leftOuterJoin

  • 功能描述: 左外连接,用左表中的每一个元素,去连接右表中的元素,如果右表中存在,就匹配值,如果不存在就为空值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: leftOuterJoin — keep every left row; unmatched right side is null. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    // Equivalent string-predicate form:
    //table.leftOuterJoin(table2,"a=d").first(1000).print()
    table.leftOuterJoin(table2, 'a === 'd).first(1000).print()

    /*
     * Output:
     *
     * 2,b,20,null,null,null
     * 1,a,10,1,a,100
     * 3,c,30,null,null,null
     */
  }
}
  • 输出结果
1,a,10,1,a,1002,b,20,null,null,null3,c,30,null,null,null

rightOuterJoin

  • 功能描述: 右外连接,用右表中的每一个元素,去连接左表中的元素,如果左表中存在,就匹配值,如果不存在就为空值
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: rightOuterJoin — keep every right row; unmatched left side is null. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.rightOuterJoin(table2, "a = d").first(1000).print()

    /*
     * Output:
     *
     * null,null,null,20,b,20
     * null,null,null,30,c,30
     * 1,a,10,1,a,100
     */
  }
}
  • 输出结果
null,null,null,20,b,20null,null,null,30,c,301,a,10,1,a,100

union

  • 功能描述: 两个表串连,取并集(会去重)
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: union — concatenate two tables, removing duplicate rows. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    // (2,"b",20) appears in both inputs; union keeps it once
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.union(table2).first(1000).print()

    /*
     * Output (row order may vary):
     *
     * 30,c,30
     * 1,a,100
     * 2,b,20
     * 20,b,20
     * 1,a,10
     * 3,c,30
     */
  }
}
  • 输出结果
30,c,301,a,1002,b,2020,b,201,a,103,c,30

unionAll 两个表串连,取并集(不会去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.unionAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: unionAll — concatenate two tables, keeping duplicate rows. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    // (2,"b",20) appears in both inputs; unionAll keeps both copies
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.unionAll(table2).first(1000).print()

    /*
     * Output (duplicates retained):
     *
     * 1,a,10
     * 2,b,20
     * 3,c,30
     * 1,a,100
     * 2,b,20
     * 20,b,20
     * 30,c,30
     */
  }
}
  • 输出结果
1,a,102,b,203,c,301,a,1002,b,2020,b,2030,c,30

intersect,两个表相连接,取交集 (会去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersect

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: intersect — rows present in both tables, deduplicated. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersect(table2).first(1000).print()

    /*
     * Output:
     *
     * 2,b,20
     */
  }
}
  • 输出结果
2,b,20

intersectAll,两个表相连接,取交集 (不会去重)

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.intersectAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: intersectAll — rows present in both tables, duplicates kept (min of the two counts). */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // (2,"b",20) appears 3 times on the left and 2 times on the right
    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    table.intersectAll(table2).first(1000).print()

    /*
     * Output (min(3, 2) = 2 copies):
     *
     * 2,b,20
     * 2,b,20
     */
  }
}
  • 输出结果
2,b,202,b,20

minus

  • 功能描述: 左表不存在于右表中的数据,会去重
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minus

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: minus — left rows not present in the right table, deduplicated. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    // Rows of the left table that do not exist in the right table (deduplicated)
    table.minus(table2).first(1000).print()

    /*
     * Output:
     * 1,a,10
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,103,c,30

minusAll

  • 功能描述: 左表不存在于右表中的数据,不会去重,如果左表某个元素有n次,右表中有m次,那这个元素出现的是n - m次
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.minusAll

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: minusAll — left rows minus right rows, keeping duplicates (n - m copies). */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    // (2,"b",20) appears 4 times on the left and 2 times on the right
    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (2, "b", 20), (2, "b", 20), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (2, "b", 20), (2, "b", 20), (20, "b", 20), (30, "c", 30))

    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    val table2 = tableEnv.fromDataSet(dataSet2, 'd, 'e, 'f)

    // A row occurring n times on the left and m times on the right is emitted n - m times
    table.minusAll(table2).first(1000).print()

    /*
     * Output (4 - 2 = 2 copies of 2,b,20):
     *
     * 1,a,10
     * 2,b,20
     * 2,b,20
     * 3,c,30
     */
  }
}
  • 输出结果
1,a,102,b,202,b,203,c,30

in

  • 功能描述:表和子表的关系,子查询只能由一列组成,
    表的查询条件的列类型需要和子查询保持一致,

如果子查询中的值在表中存在就返回真,这个元素就满足条件可以被返回来

  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.in

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: in — filter rows whose key appears in a single-column sub-query table. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))
    val dataSet2 = env.fromElements((1, "a", 100), (20, "b", 20), (30, "c", 30))

    // Column names of the two tables must not overlap
    val table = tableEnv.fromDataSet(dataSet, 'a, 'b, 'c)
    // The sub-query table consists of a single column
    val table2 = tableEnv.fromDataSet(dataSet2, 'd)

    // The sub-query must have exactly one column whose type matches the
    // queried column; a row passes the filter when its 'a value exists in table2
    table.where('a.in(table2))
      .first(1000).print()

    /*
     * Output:
     *
     * 1,a,10
     */
  }
}
  • 输出结果
1,a,10

orderBy

  • 功能描述: 按指定列的升序或降序排序(是按分区来排序的)
  • 经测试只能按一列进行排序
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: orderBy — sort by a column (per partition; parallelism 1 gives a global order). */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Single partition so the sort is globally ordered
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc)  // ascending by id (note: sorting is per partition)
      .orderBy('id.desc)
      //.orderBy('value1.asc)
      .first(1000)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 20,f,200
     * 3,c,30
     * 2,b,20
     * 1,a,10
     */
  }
}
  • 输出结果
20,f,2003,c,302,b,201,a,10

fetch

  • 功能描述: 先进行排序后,取前几个元素
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.fetch

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: fetch — sort, then take only the first n rows. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Single partition so the sort is globally ordered
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc)  // ascending by id (note: sorting is per partition)
      .orderBy('id.desc)
      // fetch requires a sorted table; takes only the first 2 rows
      .fetch(2)
      .first(1000)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 20,f,200
     * 3,c,30
     */
  }
}
  • 输出结果
20,f,2003,c,30

offset

  • 功能描述: 只有有序的才能用,偏移了2个元素
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.offset

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/** Example: offset — sort, then skip the first n rows. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Single partition so the sort is globally ordered
    env.setParallelism(1)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (20, "f", 200), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // Register the Table under a name in the catalog
    tableEnv.registerTable("user1", table)

    tableEnv.scan("user1").as('id, 'name, 'value1)
      //.orderBy('id.asc)  // ascending by id (note: sorting is per partition)
      .orderBy('id.desc)
      // offset requires a sorted table; skips the first 2 rows
      .offset(2)
      .first(1000)
      // print acts as the sink
      .print()

    /*
     * Output:
     *
     * 2,b,20
     * 1,a,10
     */
  }
}
  • 输出结果
2,b,201,a,10

Sink csv

  • 功能描述:
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.sink.csv

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

/** Example: sink a Table to a CSV file via a registered CsvTableSink. */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // CSV sink: output path, field delimiter, one output file, overwrite existing file
    val cvsTableSink = new CsvTableSink("sink-data/csv/a.csv",
      ",",
      1,
      WriteMode.OVERWRITE
    )

    // Schema of the sink table
    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("cvsTableSink", fieldNames, fieldTypes, cvsTableSink)

    // Emit the table into the registered sink
    table.insertInto("cvsTableSink")

    // insertInto is lazy; execute() triggers the job
    env.execute()
  }
}
  • 输出结果
1,a,102,b,203,c,30

insert

  • 功能描述: 往一个表中插入数据,相当于sink
  • scala 程序
package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.insert

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

/** Example: insert — write a Table into a registered sink table (here a CSV sink). */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataSet = env.fromElements((1, "a", 10), (2, "b", 20), (3, "c", 30))

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(dataSet)

    // CSV sink: output path, field delimiter, one output file, overwrite existing file
    val cvsTableSink = new CsvTableSink("/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data/csv/a.csv",
      ",",
      1,
      WriteMode.OVERWRITE
    )

    // Schema of the sink table
    val fieldNames: Array[String] = Array("id", "name", "value")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)

    tableEnv.registerTableSink("cvsTableSink", fieldNames, fieldTypes, cvsTableSink)

    // Insert the table's rows into the registered sink (acts like a SQL INSERT INTO)
    table.insertInto("cvsTableSink")

    // insertInto is lazy; execute() triggers the job
    env.execute()
  }
}
  • 输出结果
  • a.csv
1,a,102,b,203,c,30

转载地址:http://zwcia.baihongyu.com/

你可能感兴趣的文章
LDA主题模型简介
查看>>
可拖动的DIV续
查看>>
关于“类型初始值设定项引发异常”
查看>>
MySql 小表驱动大表
查看>>
Redis 数据结构的底层实现 (一) RealObject,embstr,sds,ziplist,quicklist
查看>>
SQL语句注入的问题
查看>>
jQueryEasyUI Messager基本使用
查看>>
【C语言学习趣事】_33_关于C语言和C++语言中的取余数(求模)的计算_有符号和无符号数的相互转换问题...
查看>>
Tensorboard教程:显示计算图中节点信息
查看>>
java 线程基本概念 可见性 同步
查看>>
Java:JUnit包
查看>>
unity_快捷键
查看>>
洛谷P3358 最长k可重区间集问题(费用流)
查看>>
洛谷P1251 餐巾计划问题(费用流)
查看>>
Beta冲刺(2/5)(麻瓜制造者)
查看>>
vs2012编码的UI测试使用教程
查看>>
android 在非UI线程更新UI仍然成功原因深入剖析
查看>>
清北NOIP训练营集训笔记——图论
查看>>
oracle ORA-00060死锁查询、表空间扩容
查看>>
转载自https://github.com/jsfront/src/blob/master/css.md
查看>>