TBase 应用接入指南
- 2019 年 12 月 3 日
- 筆記
上一篇:TBase Quick Start (请点击文章底部“阅读原文”查看)
介绍了TBase的架构,源码的编译安装,集群运行状态、启动停止等。本篇将介绍应用程序如何连接TBase数据库进行建库、建表、数据导入、查询等操作。
TBase兼容所有支持Postgres协议的客户端连接,这里将详细介绍JAVA、C语言、shell语言、Python、PHP、Golang 这6种最常用的开发语言连接TBase的操作方法。
1、JAVA开发
1.1、创建数据表
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; public class createtable { public static void main( String args[] ) { Connection c = null; Statement stmt = null; try { Class.forName("org.postgresql.Driver"); c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","tbase", "tbase"); System.out.println("Opened database successfully"); stmt = c.createStatement(); String sql = "create table tbase(id int,nickname text) distribute by shard(id) to group default_group" ; stmt.executeUpdate(sql); stmt.close(); c.close(); } catch ( Exception e ) { System.err.println( e.getClass().getName()+": "+ e.getMessage() ); System.exit(0); } System.out.println("Table created successfully"); } }
说明:
- 这里连接的节点为任意CN主节点,后面所有操作,没特别说明,都是连接到CN主节点进行操作。
1.2、使用普通协议插入数据
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; public class insert { public static void main(String args[]) { Connection c = null; Statement stmt = null; try { Class.forName("org.postgresql.Driver"); c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","tbase", "tbase"); c.setAutoCommit(false); System.out.println("Opened database successfully"); stmt = c.createStatement(); String sql = "INSERT INTO tbase (id,nickname) " + "VALUES (1,'tbase');"; stmt.executeUpdate(sql); sql = "INSERT INTO tbase (id,nickname) " + "VALUES (2, 'pgxz' ),(3,'pgxc');"; stmt.executeUpdate(sql); stmt.close(); c.commit(); c.close(); } catch (Exception e) { System.err.println( e.getClass().getName()+": "+ e.getMessage() ); System.exit(0); } System.out.println("Records created successfully"); } }
1.3、使用扩展协议插入数据
import java.sql.Connection; import java.sql.DriverManager; import java.sql.*; import java.util.Random; public class insert_prepared { public static void main(String args[]) { Connection c = null; PreparedStatement stmt; try { Class.forName("org.postgresql.Driver"); c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","tbase", "tbase"); c.setAutoCommit(false); System.out.println("Opened database successfully"); //插入数据 String sql = "INSERT INTO tbase (id,nickname) VALUES (?,?)"; stmt = c.prepareStatement(sql); stmt.setInt(1, 9999); stmt.setString(2, "tbase_prepared"); stmt.executeUpdate(); //插入更新 sql = "INSERT INTO tbase (id,nickname) VALUES (?,?) ON CONFLICT(id) DO UPDATE SET nickname=?"; stmt = c.prepareStatement(sql); stmt.setInt(1, 9999); stmt.setString(2, "tbase_prepared"); stmt.setString(3, "tbase_prepared_update"); stmt.executeUpdate(); stmt.close(); c.commit(); c.close(); } catch (Exception e) { System.err.println( e.getClass().getName()+": "+ e.getMessage() ); System.exit(0); } System.out.println("Records created successfully"); } }
1.4、copy from 加载文件到表
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; import java.io.*; public class copyfrom { public static void main( String args[] ) { Connection c = null; Statement stmt = null; FileInputStream fs = null; try { Class.forName("org.postgresql.Driver"); c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","tbase", "tbase"); System.out.println("Opened database successfully"); CopyManager cm = new CopyManager((BaseConnection) c); fs = new FileInputStream("/data/tbase/tbase.csv"); String sql = "COPY tbase FROM STDIN DELIMITER AS ','"; cm.copyIn(sql, fs); c.close(); fs.close(); } catch ( Exception e ) { System.err.println( e.getClass().getName()+": "+ e.getMessage() ); System.exit(0); } System.out.println("Copy data successfully"); } }
1.5、copy to 导出数据到文件
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; import java.io.*; public class copyto { public static void main( String args[] ) { Connection c = null; Statement stmt = null; FileOutputStream fs = null; try { Class.forName("org.postgresql.Driver"); c = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:15432/postgres?currentSchema=public&binaryTransfer=false","tbase", "tbase"); System.out.println("Opened database successfully"); CopyManager cm = new CopyManager((BaseConnection) c); fs = new FileOutputStream("/data/tbase/tbase.csv"); String sql = "COPY tbase TO STDOUT DELIMITER AS ','"; cm.copyOut(sql, fs); c.close(); fs.close(); } catch ( Exception e ) { System.err.println( e.getClass().getName()+": "+ e.getMessage() ); System.exit(0); } System.out.println("Copy data successfully"); } }
1.6、jdbc包下载地址
https://jdbc.postgresql.org/download.html
2、C程序开发
2.1、连接数据库
#include <stdio.h> #include <stdlib.h> #include "libpq-fe.h" int main(int argc, char **argv){ const char *conninfo; PGconn *conn; if (argc > 1){ conninfo = argv[1]; }else{ conninfo = "dbname = postgres"; } conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK){ fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn)); }else{ printf("连接数据库成功!n"); } PQfinish(conn); return 0; }
编译
gcc -c -I /usr/local/install/tbase_pgxz/include/ conn.c gcc -o conn conn.o -L /usr/local/install/tbase_pgxz/lib/ -lpq
运行
./conn "host=172.16.0.3 dbname=postgres port=11000" 连接数据库成功!
./conn "host=172.16.0.3 dbname=postgres port=15432 user=tbase" 连接数据库成功!
2.2、建立数据表
#include <stdio.h> #include <stdlib.h> #include "libpq-fe.h" int main(int argc, char **argv){ const char *conninfo; PGconn *conn; PGresult *res; const char *sql = "create table tbase(id int,nickname text) distribute by shard(id) to group default_group"; if (argc > 1){ conninfo = argv[1]; }else{ conninfo = "dbname = postgres"; } conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK){ fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn)); }else{ printf("连接数据库成功!n"); } res = PQexec(conn,sql); if(PQresultStatus(res) != PGRES_COMMAND_OK){ fprintf(stderr, "建立数据表失败: %s",PQresultErrorMessage(res)); }else{ printf("建立数据表成功!n"); } PQclear(res); PQfinish(conn); return 0; }
编译
gcc -c -I /usr/local/install/tbase_pgxz/include/ createtable.c gcc -o createtable createtable.o -L /usr/local/install/tbase_pgxz/lib/ -lpq
运行
./createtable "port=11000 dbname=postgres" 连接数据库成功! 建立数据表成功!
2.3、插入数据
#include <stdio.h> #include <stdlib.h> #include "libpq-fe.h" int main(int argc, char **argv){ const char *conninfo; PGconn *conn; PGresult *res; const char *sql = "INSERT INTO tbase (id,nickname) values(1,'tbase'),(2,'pgxz')"; if (argc > 1){ conninfo = argv[1]; }else{ conninfo = "dbname = postgres"; } conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK){ fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn)); }else{ printf("连接数据库成功!n"); } res = PQexec(conn,sql); if(PQresultStatus(res) != PGRES_COMMAND_OK){ fprintf(stderr, "插入数据失败: %s",PQresultErrorMessage(res)); }else{ printf("插入数据成功!n"); } PQclear(res); PQfinish(conn); return 0; }
编译
gcc -c -I /usr/local/install/tbase_pgxz/include/ insert.c gcc -o insert insert.o -L /usr/local/install/tbase_pgxz/lib/ -lpq
运行
./insert "dbname=postgres port=15432"
2.4、查询数据
#include <stdio.h> #include <stdlib.h> #include "libpq-fe.h" int main(int argc, char **argv){ const char *conninfo; PGconn *conn; PGresult *res; const char *sql = "select * from tbase"; if (argc > 1){ conninfo = argv[1]; }else{ conninfo = "dbname = postgres"; } conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK){ fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn)); }else{ printf("连接数据库成功!n"); } res = PQexec(conn,sql); if(PQresultStatus(res) != PGRES_TUPLES_OK){ fprintf(stderr, "插入数据失败: %s",PQresultErrorMessage(res)); }else{ printf("查询数据成功!n"); int rownum = PQntuples(res) ; int colnum = PQnfields(res); for(int j = 0;j< colnum; ++j){ printf("%st",PQfname(res,j)); } printf("n"); for(int i = 0;i< rownum; ++i){ for(int j = 0;j< colnum; ++j){ printf("%st",PQgetvalue(res,i,j)); } printf("n"); } } PQclear(res); PQfinish(conn); return 0; }
编译
gcc -std=c99 -c -I /usr/local/install/tbase_pgxz/include/ select.c gcc -o select select.o -L /usr/local/install/tbase_pgxz/lib/ -lpq
运行
./select "dbname=postgres port=15432" 连接数据库成功! 查询数据成功! id nickname 1 tbase 2 pgxz
2.5、流数据COPY入表
#include <string.h> #include <stdio.h> #include <stdlib.h> #include "libpq-fe.h" int main(int argc, char **argv){ const char *conninfo; PGconn *conn; PGresult *res; const char *buffer = "1,tbasen2,pgxzn3,Tbase牛"; if (argc > 1){ conninfo = argv[1]; }else{ conninfo = "dbname = postgres"; } conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK){ fprintf(stderr, "连接数据库失败: %s",PQerrorMessage(conn)); }else{ printf("连接数据库成功!n"); } res=PQexec(conn,"COPY tbase FROM STDIN DELIMITER ',';"); if(PQresultStatus(res) != PGRES_COPY_IN){ fprintf(stderr, "copy数据出错1: %s",PQresultErrorMessage(res)); }else{ int len = strlen(buffer); if(PQputCopyData(conn,buffer,len) == 1){ if(PQputCopyEnd(conn,NULL) == 1){ res = PQgetResult(conn); if(PQresultStatus(res) == PGRES_COMMAND_OK){ printf("copy数据成功!n"); }else{ fprintf(stderr, "copy数据出错2: %s",PQerrorMessage(conn)); } }else{ fprintf(stderr, "copy数据出错3: %s",PQerrorMessage(conn)); } }else{ fprintf(stderr, "copy数据出错4: %s",PQerrorMessage(conn)); } } PQclear(res); PQfinish(conn); return 0; }
编译
gcc -c -I /usr/local/install/tbase_pgxz/include/ copy.c gcc -o copy copy.o -L /usr/local/install/tbase_pgxz/lib/ -lpq
执行
./copy "dbname=postgres port=15432" 连接数据库成功! copy数据成功!
3、shell脚本开发
#!/bin/sh if [ $# -ne 0 ] then echo "usage: $0 exec_sql" exit 1 fi exec_sql=$1 masters=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_host, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"` port_list=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select string_agg(node_port::text, ' ') from (select * from pgxc_node where node_type = 'D' order by node_name) t"` node_cnt=`psql -h 172.16.0.29 -d postgres -p 15432 -t -c "select count(*) from pgxc_node where node_type = 'D'"` masters=($masters) ports=($port_list) echo $node_cnt flag=0 for((i=0;i<$node_cnt;i++)); do seq=$(($i+1)) master=${masters[$i]} port=${ports[$i]} echo $master echo $port psql -h $master -p $port postgres -c "$exec_sql" done
4、python程序开发
4.1、安装psycopg2模块
[root@VM_0_29_centos ~]# yum install python-psycopg2
4.2、连接数据库
#coding=utf-8 #!/usr/bin/python import psycopg2 try: conn = psycopg2.connect(database="postgres", user="tbase", password="", host="172.16.0.29", port="15432") print "连接数据库成功" conn.close() except psycopg2.Error,msg: print "连接数据库出错,错误详细信息:%s" %(msg.args[0])
运行
[tbase@VM_0_29_centos python]$ python conn.py 连接数据库成功
4.3、创建数据表
#coding=utf-8 #!/usr/bin/python import psycopg2 try: conn = psycopg2.connect(database="postgres", user="tbase", password="", host="172.16.0.29", port="15432") print "连接数据库成功" cur = conn.cursor() sql = """ create table tbase ( id int, nickname varchar(100) )distribute by shard(id) to group default_group """ cur.execute(sql) conn.commit() print "建立数据表成功" conn.close() except psycopg2.Error,msg: print "TBase Error %s" %(msg.args[0])
运行
[tbase@VM_0_29_centos python]$ python createtable.py 连接数据库成功 建立数据表成功
4.4、插入数据
#coding=utf-8 #!/usr/bin/python import psycopg2 try: conn = psycopg2.connect(database="postgres", user="tbase", password="", host="172.16.0.29", port="15432") print "连接数据库成功" cur = conn.cursor() sql = "insert into tbase values(1,'tbase'),(2,'tbase');" cur.execute(sql) sql = "insert into tbase values(%s,%s)" cur.execute(sql,(3,'pg')) conn.commit() print "插入数据成功" conn.close() except psycopg2.Error,msg: print "操作数据库出库 %s" %(msg.args[0])
运行
[tbase@VM_0_29_centos python]$ python insert.py 连接数据库成功 插入数据成功
4.5、查询数据
#coding=utf-8 #!/usr/bin/python import psycopg2 try: conn = psycopg2.connect(database="postgres", user="tbase", password="", host="172.16.0.29", port="15432") print "连接数据库成功" cur = conn.cursor() sql = "select * from tbase" cur.execute(sql) rows = cur.fetchall() for row in rows: print "ID = ", row[0] print "NICKNAME = ", row[1],"n" conn.close() except psycopg2.Error,msg: print "操作数据库出库 %s" %(msg.args[0])
运行
[tbase@VM_0_29_centos python]$ python select.py 连接数据库成功 ID = 1 NICKNAME = tbase ID = 2 NICKNAME = pgxz ID = 3 NICKNAME = pg
4.6、copy from 加载文件到表
#coding=utf-8 #!/usr/bin/python import psycopg2 try: conn = psycopg2.connect(database="postgres", user="tbase", password="", host="172.16.0.29", port="15432") print "连接数据库成功" cur = conn.cursor() filename = "/data/tbase/tbase.txt" cols = ('id','nickname') tablename="public.tbase" cur.copy_from(file=open(filename),table=tablename,columns=cols,sep=',') conn.commit() print "导入数据成功" conn.close() except psycopg2.Error,msg: print "操作数据库出库 %s" %(msg.args[0])
执行
[tbase@VM_0_29_centos python]$ python copy_from.py 连接数据库成功 导入数据成功
5、PHP程序开发
5.1、连接数据库
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n<BR>"; ; exit; }else{ echo "连接数据库成功"."n<BR>"; } //关闭连接 pg_close($conn); ?>
执行
[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/conn.php 连接数据库成功
5.2、创建数据表
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n"; ; exit; }else{ echo "连接数据库成功"."n"; } //建立数据表 $sql="create table public.tbase(id integer,nickname varchar(100)) distribute by shard(id) to group default_group;"; $result = @pg_exec($conn,$sql) ; if (!$result){ $error_msg=@pg_errormessage($conn); echo "创建数据表出错,详情:".$error_msg."n"; ; exit; }else{ echo "创建数据表成功"."n"; } //关闭连接 pg_close($conn); ?>
执行
[root@VM_0_47_centos test]# curl http://127.0.0.1:8080/dbsta/test/createtable.php 连接数据库成功 创建数据表成功
5.3、插入数据
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n"; ; exit; }else{ echo "连接数据库成功"."n"; } //插入数据 $sql="insert into public.tbase values(1,'tbase'),(2,'pgxz');"; $result = @pg_exec($conn,$sql) ; if (!$result){ $error_msg=@pg_errormessage($conn); echo "插入数据出错,详情:".$error_msg."n"; exit; }else{ echo "插入数据成功"."n"; } //关闭连接 pg_close($conn); ?>
执行
[tbase@VM_0_47_centos test]$ curl http://127.0.0.1:8080/dbsta/test/insert.php 连接数据库成功 插入数据成功
5.4、查询数据
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n"; ; exit; }else{ echo "连接数据库成功"."n"; } //查询数据 $sql="select id,nickname from public.tbase"; $result = @pg_exec($conn,$sql) ; if (!$result){ $error_msg=@pg_errormessage($conn); echo "查询数据出错,详情:".$error_msg."n"; exit; }else{ echo "插入数据成功"."n"; } $record_num = pg_numrows($result); echo "返回记录数".$record_num."n"; $rec=pg_fetch_all($result); for($i=0;$i<$record_num;$i++){ echo "记录数#".strval($i+1)."n"; echo "id:".$rec[$i]["id"]."n"; echo "nickname:".$rec[$i]["nickname"]."nn"; } //关闭连接 pg_close($conn); ?>
调用方法
[root@VM_0_47_centos ~]# curl http://127.0.0.1:8080/dbsta/test/select.php 连接数据库成功 插入数据成功 返回记录数2 记录数#1 id:1 nickname:tbase 记录数#2 id:2 nickname:pgxz
5.5、流数据copy 入表
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n"; ; exit; }else{ echo "连接数据库成功"."n"; } $row=ARRAY("1,TBase","2,pgxz"); $flag=pg_copy_from($conn,"public.tbase",$row,","); if (!$flag){ $error_msg=@pg_errormessage($conn); echo "copy出错,详情:".$error_msg."n"; }else{ echo "copy成功"."n"; } //关闭连接 pg_close($conn); ?>
调用方法
curl http://127.0.0.1/dbsta/cron/php_copy_from.php 连接数据库成功 copy成功
5.6、copy to导出数据到一个数组中
<?php $host="172.16.0.29"; $port="15432"; $dbname="postgres"; $user="tbase" ; $password=""; //连接数据库 $conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password"); if (!$conn){ $error_msg=@pg_errormessage($conn); echo "连接数据库出错,详情:".$error_msg."n"; ; exit; }else{ echo "连接数据库成功"."n"; } $row=pg_copy_to($conn,"public.tbase",","); if (!$row){ $error_msg=@pg_errormessage($conn); echo "copy出错,详情:".$error_msg."n"; }else{ print_r($row); } //关闭连接 pg_close($conn); ?>
调用方法
curl http://127.0.0.1/dbsta/cron/php_copy_to.php 连接数据库成功 Array ( [0] => 1,TBase [1] => 2,pgxz )
6、golang程序开发
6.1、连接数据库
package main import ( "fmt" "time" "github.com/jackc/pgx" ) func main() { var error_msg string //连接数据库 conn, err := db_connect() if err != nil { error_msg = "连接数据库失败,详情:" + err.Error() write_log("Error", error_msg) return } //程序运行结束时关闭连接 defer conn.Close() write_log("Log", "连接数据库成功") } /* 功能描述:写入日志处理 参数说明: log_level -- 日志级别,只能是是Error或Log error_msg -- 日志内容 返回值说明:无 */ func write_log(log_level string, error_msg string) { //打印错误信息 fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("日志级别:", log_level) fmt.Println("详细信息:", error_msg) } /* 功能描述:连接数据库 参数说明:无 返回值说明: conn *pgx.Conn -- 连接信息 err error --错误信息 */ func db_connect() (conn *pgx.Conn, err error) { var config pgx.ConnConfig config.Host = "127.0.0.1" //数据库主机host或ip config.User = "tbase" //连接用户 config.Password = "pgsql" //用户密码 config.Database = "postgres" //连接数据库名 config.Port = 15432 //端口号 conn, err = pgx.Connect(config) return conn, err }
[root@VM_0_29_centos tbase]# go run conn.go 访问时间:2018-04-03 20:40:28 日志级别:Log 详细信息:连接数据库成功
编译后运行
[root@VM_0_29_centos tbase]# go build conn.go [root@VM_0_29_centos tbase]# ./conn 访问时间:2018-04-03 20:40:48 日志级别:Log 详细信息:连接数据库成功
6.2、创建数据表
package main import ( "fmt" "time" "github.com/jackc/pgx" ) func main() { var error_msg string var sql string //连接数据库 conn, err := db_connect() if err != nil { error_msg = "连接数据库失败,详情:" + err.Error() write_log("Error", error_msg) return } //程序运行结束时关闭连接 defer conn.Close() write_log("Log", "连接数据库成功") //建立数据表 sql = "create table public.tbase(id varchar(20),nickname varchar(100)) distribute by shard(id) to group default_group;" _, err = conn.Exec(sql) if err != nil { error_msg = "创建数据表失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "创建数据表成功") } } /* 功能描述:写入日志处理 参数说明: log_level -- 日志级别,只能是是Error或Log error_msg -- 日志内容 返回值说明:无 */ func write_log(log_level string, error_msg string) { //打印错误信息 fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("日志级别:", log_level) fmt.Println("详细信息:", error_msg) } /* 功能描述:连接数据库 参数说明:无 返回值说明: conn *pgx.Conn -- 连接信息 err error --错误信息 */ func db_connect() (conn *pgx.Conn, err error) { var config pgx.ConnConfig config.Host = "127.0.0.1" //数据库主机host或ip config.User = "tbase" //连接用户 config.Password = "pgsql" //用户密码 config.Database = "postgres" //连接数据库名 config.Port = 15432 //端口号 conn, err = pgx.Connect(config) return conn, err }
[root@VM_0_29_centos tbase]# go run createtable.go 访问时间:2018-04-03 20:50:24 日志级别:Log 详细信息:连接数据库成功 访问时间:2018-04-03 20:50:24 日志级别:Log 详细信息:创建数据表成功
6.3、插入数据
package main import ( "fmt" "strings" "time" "github.com/jackc/pgx" ) func main() { var error_msg string var sql string var nickname string //连接数据库 conn, err := db_connect() if err != nil { error_msg = "连接数据库失败,详情:" + err.Error() write_log("Error", error_msg) return } //程序运行结束时关闭连接 defer conn.Close() write_log("Log", "连接数据库成功") //插入数据 sql = "insert into public.tbase values('1','tbase'),('2','pgxz');" _, err = conn.Exec(sql) if err != nil { error_msg = "插入数据失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "插入数据成功") } //绑定变量插入数据,不需要做防注入处理 sql = "insert into public.tbase values($1,$2),($1,$3);" _, err = conn.Exec(sql, "3", "postgresql", "postgres") if err != nil { error_msg = "插入数据失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "插入数据成功") } //拼接sql语句插入数据,需要做防注入处理 nickname = "TBase is ' good!" sql = "insert into public.tbase values('1','" + sql_data_encode(nickname) + "')" _, err = conn.Exec(sql) if err != nil { error_msg = "插入数据失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "插入数据成功") } } /* 功能描述:sql查询拼接字符串编码 参数说明: str -- 要编码的字符串 返回值说明: 返回编码过的字符串 */ func sql_data_encode(str string) string { return strings.Replace(str, "'", "''", -1) } /* 功能描述:写入日志处理 参数说明: log_level -- 日志级别,只能是是Error或Log error_msg -- 日志内容 返回值说明:无 */ func write_log(log_level string, error_msg string) { //打印错误信息 fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("日志级别:", log_level) fmt.Println("详细信息:", error_msg) } /* 功能描述:连接数据库 参数说明:无 返回值说明: conn *pgx.Conn -- 连接信息 err error --错误信息 */ func db_connect() (conn *pgx.Conn, err error) { var config pgx.ConnConfig config.Host = "127.0.0.1" //数据库主机host或ip config.User = "tbase" //连接用户 config.Password = "pgsql" //用户密码 config.Database = "postgres" //连接数据库名 config.Port = 15432 //端口号 conn, err = pgx.Connect(config) return conn, err }
[root@VM_0_29_centos tbase]# go run insert.go 访问时间:2018-04-03 21:05:51 日志级别:Log 详细信息:连接数据库成功 访问时间:2018-04-03 21:05:51 日志级别:Log 详细信息:插入数据成功 访问时间:2018-04-03 21:05:51 日志级别:Log 详细信息:插入数据成功 访问时间:2018-04-03 21:05:51 日志级别:Log 详细信息:插入数据成功
6.4、查询数据
package main import ( "fmt" "strings" "time" "github.com/jackc/pgx" ) func main() { var error_msg string var sql string //连接数据库 conn, err := db_connect() if err != nil { error_msg = "连接数据库失败,详情:" + err.Error() write_log("Error", error_msg) return } //程序运行结束时关闭连接 defer conn.Close() write_log("Log", "连接数据库成功") sql = "SELECT id,nickname FROM public.tbase LIMIT 2" rows, err := conn.Query(sql) if err != nil { error_msg = "查询数据失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "查询数据成功") } var nickname string var id string for rows.Next() { err = rows.Scan(&id, &nickname) if err != nil { error_msg = "执行查询失败,详情:" + err.Error() write_log("Error", error_msg) return } error_msg = fmt.Sprintf("id:%s nickname:%s", id, nickname) write_log("Log", error_msg) } rows.Close() nickname = "tbase" sql = "SELECT id,nickname FROM public.tbase WHERE nickname ='" + sql_data_encode(nickname) + "' " rows, err = conn.Query(sql) if err != nil { error_msg = "查询数据失败,详情:" + err.Error() write_log("Error", error_msg) return } else { write_log("Log", "查询数据成功") } defer rows.Close() for rows.Next() { err = rows.Scan(&id, &nickname) if err != nil { error_msg = "执行查询失败,详情:" + err.Error() write_log("Error", error_msg) return } error_msg = fmt.Sprintf("id:%s nickname:%s", id, nickname) write_log("Log", error_msg) } } /* 功能描述:sql查询拼接字符串编码 参数说明: str -- 要编码的字符串 返回值说明: 返回编码过的字符串 */ func sql_data_encode(str string) string { return strings.Replace(str, "'", "''", -1) } /* 功能描述:写入日志处理 参数说明: log_level -- 日志级别,只能是是Error或Log error_msg -- 日志内容 返回值说明:无 */ func write_log(log_level string, error_msg string) { //打印错误信息 fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("日志级别:", log_level) fmt.Println("详细信息:", error_msg) } /* 功能描述:连接数据库 参数说明:无 返回值说明: conn *pgx.Conn -- 连接信息 err error --错误信息 */ func db_connect() (conn *pgx.Conn, err error) { var config pgx.ConnConfig config.Host = "127.0.0.1" //数据库主机host或ip config.User = "tbase" //连接用户 config.Password = "pgsql" //用户密码 config.Database = "postgres" //连接数据库名 config.Port = 15432 //端口号 conn, err = pgx.Connect(config) return conn, err }
[root@VM_0_29_centos tbase]# go run select.go 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:连接数据库成功 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:查询数据成功 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:id:2 nickname:tbase 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:id:3 nickname:postgresql 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:查询数据成功 访问时间:2018-04-09 10:35:50 日志级别:Log 详细信息:id:1 nickname:tbase
6.5、流数据copy from入表
package main import ( "fmt" "math/rand" "time" "github.com/jackc/pgx" ) func main() { var error_msg string //连接数据库 conn, err := db_connect() if err != nil { error_msg = "连接数据库失败,详情:" + err.Error() write_log("Error", error_msg) return } //程序运行结束时关闭连接 defer conn.Close() write_log("Log", "连接数据库成功") //构造5000行数据 inputRows := [][]interface{}{} var id string var nickname string for i := 0; i < 5000; i++ { id = fmt.Sprintf("%d", rand.Intn(10000)) nickname = fmt.Sprintf("%d", rand.Intn(10000)) inputRows = append(inputRows, []interface{}{id, nickname}) } copyCount, err := conn.CopyFrom(pgx.Identifier{"tbase"}, []string{"id", "nickname"}, pgx.CopyFromRows(inputRows)) if err != nil { error_msg = "执行copyFrom失败,详情:" + err.Error() write_log("Error", error_msg) return } if copyCount != len(inputRows) { error_msg = fmt.Sprintf("执行copyFrom失败,copy行数:%d 返回行数为:%d", len(inputRows), copyCount) write_log("Error", error_msg) return } else { error_msg = "Copy 记录成功" write_log("Log", error_msg) } } /* 功能描述:写入日志处理 参数说明: log_level -- 日志级别,只能是是Error或Log error_msg -- 日志内容 返回值说明:无 */ func write_log(log_level string, error_msg string) { //打印错误信息 fmt.Println("访问时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("日志级别:", log_level) fmt.Println("详细信息:", error_msg) } /* 功能描述:连接数据库 参数说明:无 返回值说明: conn *pgx.Conn -- 连接信息 err error --错误信息 */ func db_connect() (conn *pgx.Conn, err error) { var config pgx.ConnConfig config.Host = "127.0.0.1" //数据库主机host或ip config.User = "tbase" //连接用户 config.Password = "pgsql" //用户密码 config.Database = "postgres" //连接数据库名 config.Port = 15432 //端口号 conn, err = pgx.Connect(config) return conn, err }
[root@VM_0_29_centos tbase]# go run copy_from.go 访问时间:2018-04-09 10:36:40 日志级别:Log 详细信息:连接数据库成功 访问时间:2018-04-09 10:36:40 日志级别:Log 详细信息:Copy 记录成功
6.6、golang相关资源包
需要git的资源包: https://github.com/jackc/pgx https://github.com/pkg/errors
