Commit 61669d74 authored by Upliner's avatar Upliner

Transfer mysql->tarantool

parent f4815953
package main package main
import ( import (
"encoding/hex"
"fmt" "fmt"
"github.com/satori/go.uuid"
"github.com/tarantool/go-tarantool" "github.com/tarantool/go-tarantool"
"math/rand"
"time" "time"
"log" "log"
"strings" "strings"
"sync"
"os" "os"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"encoding/json"
) )
func bin2hex(bin []byte) string {
hexData := make([]byte, len(bin)*2)
hex.Encode(hexData, bin)
return string(hexData)
}
func newVid() string {
u, err := uuid.NewV4()
if err != nil {
panic(err)
}
return bin2hex(u[:])
}
func queryItems(conn *tarantool.Connection) []string {
result, err := conn.Select("visitorid_matching", "primary", 1000000, 500000, tarantool.IterAll, []interface{}{})
if err != nil {
panic(err)
}
var items []string
for _, row := range result.Data {
rowa := row.([]interface{})
items = append(items, rowa[2].(string))
}
return items
}
func genItems(items []string, cnt int) []string {
result := make([]string, cnt)
for i := range result {
if rand.Intn(2) == 0 {
result[i] = items[rand.Intn(len(items))]
} else {
result[i] = newVid()
}
}
return result
}
func GetTTConnn(url string) *tarantool.Connection { func GetTTConnn(url string) *tarantool.Connection {
opts := tarantool.Opts{ opts := tarantool.Opts{
Reconnect: time.Second * 10, Reconnect: time.Second * 10,
...@@ -72,6 +34,18 @@ func GetTTConnn(url string) *tarantool.Connection { ...@@ -72,6 +34,18 @@ func GetTTConnn(url string) *tarantool.Connection {
} }
return ttConn return ttConn
} }
func GetMySQLConn() *sqlx.DB {
msConn, err := sqlx.Open("mysql", os.Getenv("MYSQL_URL"))
if err == nil {
err = msConn.Ping()
}
if err != nil {
fmt.Println("MySQL connection error:", err)
return nil
}
return msConn
}
func main() { func main() {
if len(os.Args) < 2 { if len(os.Args) < 2 {
...@@ -79,103 +53,39 @@ func main() { ...@@ -79,103 +53,39 @@ func main() {
} }
conn := GetTTConnn(os.Args[1]) conn := GetTTConnn(os.Args[1])
if conn == nil { if conn == nil {
log.Fatal("connection error") log.Fatal("Tarantool connection error")
return
} }
var tt float64 ms := GetMySQLConn()
var tmx float64 if ms == nil {
existingVids := queryItems(conn) log.Fatal("MySQL connection error")
fmt.Println("Items queried")
cnt := 1000000
fmt.Println("Running writer")
go func() {
conn := GetTTConnn(os.Args[1])
fchan := make(chan *tarantool.Future, 100000)
go func() {
for {
fchan <- conn.InsertAsync("visitorid_matching", []interface{}{"host1", "host2", newVid(), newVid()})
time.Sleep(time.Microsecond* 100)
}
close(fchan)
}()
for f := range fchan {
_, err := f.Get()
if err != nil {
log.Fatal(err)
}
}
}()
fmt.Println("Item count",cnt)
items := genItems(existingVids, cnt)
for _, item := range items {
t1 := time.Now()
_, err := conn.Select("visitorid_matching", "primary", 0, 1, tarantool.IterEq, []interface{}{"host1", "host2", item})
if err != nil {
log.Fatal(err)
}
tm := time.Now().Sub(t1).Seconds()
if tm > tmx {
tmx = tm
}
tt += tm
} }
fmt.Println("sync max =", tmx*1000)
fmt.Println("sync avg =", tt*1000/float64(len(items)))
items = genItems(existingVids, cnt) fchan := make(chan *tarantool.Future, 100000)
tt = 0
tmx = 0
var trmx,trsum float64
type asyncItem struct {
T time.Time
F *tarantool.Future
}
fchan := make(chan asyncItem, 100000)
t1 := time.Now()
go func() { go func() {
for _, item := range items { var rows []struct {
fchan <- asyncItem{time.Now(),conn.SelectAsync("visitorid_matching", "primary", 0, 1, tarantool.IterEq, []interface{}{"host1", "host2", item})} ID uint32 `db:"id" json:"-"`
CatID uint32 `json:"-"`
URL string `json:"url"`
Title string `json:"name"`
Price float64 `json:"price"`
Brand string `json:"vendor"`
ImageURL float64 `json:"picture"`
OldPrice float64 `json:"oldprice"`
CurrencyID string `json:"currencyId"`
Description string `json:"description"`
}
ms.Select(&rows,"SELECT ID,CatID,URL,Title,Price,Brand,ImageURL,OldPrice,CurrencyID,Description from goods")
fmt.Println("Got",len(rows), "rows")
for _, row := range rows{
bs, _ := json.Marshal(row)
fchan <- conn.InsertAsync("retarg_goods", []interface{}{row.ID, row.CatID,string(bs)})
} }
close(fchan) close(fchan)
}() }()
t2 := time.Now() for f := range fchan {
for item := range fchan { _, err := f.Get()
item.F.Get() if err != nil {
t3 := time.Now() log.Fatal(err)
tm := t3.Sub(t2).Seconds()
if tm > tmx {
tmx = tm
}
t2 = t3
tm = t3.Sub(item.T).Seconds()
trsum += tm
if tm > trmx {
trmx = tm
} }
} }
fmt.Println("async max =", tmx*1000)
fmt.Println("async avg =", time.Now().Sub(t1).Seconds()*1000/float64(len(items)))
tt = 0
tmx = 0
items = genItems(existingVids, cnt)
var mu sync.Mutex
for _, item := range items {
go func() {
t1 := time.Now()
_, err := conn.Select("visitorid_matching", "primary", 0, 1, tarantool.IterEq, []interface{}{"host1", "host2", item})
if err != nil {
log.Fatal(err)
}
tm := time.Now().Sub(t1).Seconds()
mu.Lock()
if tm > tmx {
tmx = tm
}
tt += tm
mu.Unlock()
}()
time.Sleep(time.Microsecond * 100)
}
fmt.Println("mixed max =", tmx*1000)
fmt.Println("mixed avg =", tt*1000/float64(len(items)))
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment