Home  >  Q&A  >  body text

golang - go 语言中mysql操作200万数据时应该如何写?

在写一个将 discuzx 的 post 数据的 bbcode 转换成 html 的功能。
但是转换过程中,越到后面,越卡了。
本来想学学并发的,无奈不会啊。。。太菜了。
注释掉的是想要弄的。。。

求个解决方案。代码如下。

package main

import (
    "database/sql"
    "fmt"
    "github.com/frustra/bbcode"
    _ "github.com/go-sql-driver/mysql"
    //"time"
    //"runtime"
    "strconv"
)

// Table names used by the conversion. DxFromPost is the Discuz! X post table
// whose BBCode is read; XnTopost is the table the generated HTML is written to.
// XnFromPost is only referenced from commented-out code in this file.
const (
	DxFromPost = "pre_forum_post"
	XnFromPost = "bbs_post"
	XnTopost   = "bbs_post"
)

// Hostinfo holds the pieces connMysql assembles into a MySQL DSN.
// SetDB fills it with positional literals, so the field order matters.
type Hostinfo struct {
	DBUser     string // account name
	DBPassword string // account password
	DBname     string // database (schema) name
	DBHost     string // server host; when empty no address is put in the DSN
	DBPort     string // server port; only used when DBHost is non-empty
	DBChar     string // value of the DSN "charset" parameter
}

// post pairs a post ID with its message text.
type post struct {
	pid     int    // value of the "pid" column
	message string // value of the "message" column
}

// Package-level database handles, assigned in main from SetDB.
// updatePost reads source posts through NewDB and writes converted
// posts through OldDB.
var (
	OldDB *sql.DB
	NewDB *sql.DB
)

// SetDB builds the connection settings for both databases and connects to them.
//
// olddata is the "xiuno" database and newdata is the "gxvtc" database. In this
// program updatePost reads the Discuz! posts through the second handle (NewDB)
// and writes the converted posts through the first one (OldDB).
//
// SetDB panics if either connection fails. Both handles are required, and
// returning a nil handle (as the error-ignoring version did) only leads to a
// less useful nil-pointer panic later on.
//
// NOTE(review): credentials are hard-coded; consider reading them from flags
// or the environment.
func SetDB() (olddata, newdata *sql.DB) {
	// Both entries leave DBHost empty, so both connect to the driver's default
	// address despite the "remote" name.
	localhost := Hostinfo{
		"root",
		"123456",
		"xiuno",
		"",
		"3306",
		"utf8",
	}

	remotehost := Hostinfo{
		"root",
		"123456",
		"gxvtc",
		"",
		"3306",
		"utf8",
	}

	var err error
	if olddata, err = connMysql(&localhost); err != nil {
		panic(fmt.Sprintf("connect to database %q: %v", localhost.DBname, err))
	}
	if newdata, err = connMysql(&remotehost); err != nil {
		olddata.Close() // don't leave the first pool open when bailing out
		panic(fmt.Sprintf("connect to database %q: %v", remotehost.DBname, err))
	}

	return olddata, newdata
}

// connMysql opens a MySQL handle described by host and verifies it with Ping.
//
// When host.DBHost is empty the DSN has no address part ("user:pass@/db"), so
// the driver falls back to its default address and host.DBPort is ignored.
// host is not modified. On failure the returned *sql.DB is nil.
func connMysql(host *Hostinfo) (*sql.DB, error) {
	// Build the address in a local instead of overwriting host.DBHost: the old
	// code mutated the caller's struct, so a second call wrapped it twice.
	addr := ""
	if host.DBHost != "" {
		addr = "tcp(" + host.DBHost + ":" + host.DBPort + ")"
		fmt.Println(addr)
	}

	dsn := host.DBUser + ":" + host.DBPassword + "@" + addr + "/" + host.DBname + "?charset=" + host.DBChar
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}

	// sql.Open only validates its arguments and does not connect; Ping makes
	// misconfiguration show up here rather than at the first query.
	if err := db.Ping(); err != nil {
		db.Close()
		return nil, fmt.Errorf("ping database %q: %v", host.DBname, err)
	}
	return db, nil
}

// main opens both database handles, runs the BBCode-to-HTML conversion, and
// closes the handles once it returns (NewDB first, then OldDB).
func main() {
	oldHandle, newHandle := SetDB()
	OldDB, NewDB = oldHandle, newHandle

	// Deferred calls run last-in-first-out: newHandle closes before oldHandle.
	defer oldHandle.Close()
	defer newHandle.Close()

	updatePost()
}

func updatePost() {
    const total = 100

    //selectPost := "SELECT pid,message FROM " + XnFromPost + " ORDER BY pid ASC LIMIT 10"
    selectPost := "SELECT pid,message FROM " + DxFromPost + " ORDER BY pid ASC "
    /*
        Data, err := OldDB.Query(selectPost)
        if err != nil {
            fmt.Println(err.Error())
        }*/
    fmt.Println(selectPost)

    updatePost := "UPDATE " + XnTopost + " SET message = ? WHERE pid = ? limit 1"
    fmt.Println(updatePost)
    stmt, err := OldDB.Prepare(updatePost)
    //insertPost := "INSERT INTO " + XnTopost + " (message,pid) VALUES (?,?)"
    //fmt.Println(insertPost)
    //stmt, err := OldDB.Prepare(insertPost)

    if err != nil {
        fmt.Println(err.Error())
    }

    //mypost := make(map[int]post)

    i := 0
    for {
        tmpNum := i * total

        i++
        tmpSQL := selectPost + " LIMIT " + strconv.Itoa(tmpNum) + "," + strconv.Itoa(total)
        fmt.Println(tmpSQL)
        Data, err := NewDB.Query(tmpSQL)
        //Data, err := NewDB.Query(tmpSQL)
        if err != nil {
            fmt.Println(err.Error())
        }

        tag := false
        //使用并发的方式
        for Data.Next() {
            tag = true
            var pid int
            var msg string
            Data.Scan(&pid, &msg)
            fmt.Println(pid)

            //bbcode转码html
            compiler := bbcode.NewCompiler(true, true)
            msg = compiler.Compile(msg)
            //mypost[pid] = post{pid, msg}
            _, err = stmt.Exec(msg, pid)
            if err != nil {
                fmt.Println("pid: ", pid, err.Error())
            }
        }

        if tag == false {
            fmt.Println("没有数据了...")
            break
        }

    }

    /*
       //直接查找并更新
           for Data.Next() {
               var pid int
               var msg string
               Data.Scan(&pid, &msg)

               //bbcode转码html
               compiler := bbcode.NewCompiler(true, true)
               msg = compiler.Compile(msg)

               mypost[pid] = post{pid, msg}
               //fmt.Println(mypost)
               fmt.Println(pid)

               _, err = stmt.Exec(msg, pid)
               if err != nil {
                   fmt.Println("pid: ", pid, err.Error())
               }
           }
    */

    /*
            //使用并发的方式
            for Data.Next() {
                var pid int
                var msg string
                Data.Scan(&pid, &msg)

                //bbcode转码html
                compiler := bbcode.NewCompiler(true, true)
                msg = compiler.Compile(msg)
                mypost[pid] = post{pid, msg}
            }

        runtime.GOMAXPROCS(runtime.NumCPU())
        c := make(chan post)
        for _, v := range mypost {
            go ShowMsg(c, v)
            /*
                go func() {
                    fmt.Println(v.pid)
                    c <- v.pid
                }()
                <-c
            //time.Sleep(2 * time.Second)
            //fmt.Println(v.pid)
        }

        for _, _ = range mypost {
            fmt.Println(&c)
            <-c
        } */

}

// ShowMsg sends mypost on c and returns once the send completes. On an
// unbuffered channel that means a receiver has taken the value.
// Despite its name, it prints nothing.
func ShowMsg(c chan post, mypost post) {
	c <- mypost // blocks until the channel accepts the value
}
天蓬老师天蓬老师2743 days ago695

reply all(3)I'll reply

  • PHP中文网

    PHP中文网2017-04-17 13:57:14

    Since I really don’t have the patience to read all of your code, and there are many things you will have to practice yourself, I will briefly describe my approach here, hoping it gives you some direction. Of course, I may have misunderstood something; if anyone finds a mistake, please point it out.

    First explain some important concepts and prerequisite knowledge:

    • Goroutines are lightweight routines scheduled by the Go runtime and multiplexed onto a smaller number of operating-system threads; each goroutine runs independently

    • Calling a function with the go keyword starts it in a new goroutine (not necessarily a new OS thread)

    • GOMAXPROCS sets how many OS threads may execute Go code at the same time; when it is greater than 1 (since Go 1.5 the default is the number of CPUs), goroutines can actually run in parallel on multiple cores;

    • Using an unbuffered channel is like a handshake, there must be writing and reading at the same time before execution can continue

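    • Close a channel from the sending side once no more values will be sent, so that receivers (for example a range loop) know to stop; an unclosed channel is not itself a memory leak, but goroutines left blocked on it are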

    • Channel can be read using range

    Okay, the above is a bit boring. Here I will start to explain to you the issues you need to pay attention to based on my code:

    /* 正确创建合适数量的goroutine */
    
    #建议创建routine的方式类似这样, 根据CPU数量创建合适的线程
    runtime.GOMAXPROCS(runtime.NumCPU())
    for i:=0; i < runtime.NumCPU()+2; i++ {
        go func(id int) {
            fmt.Printf("new routine id(%d)\n", i)
        }(i)
    }
    
    #你的创建方式是错误的, 根据你的数据数量创建等数量的线程, 每个线程只处理一个数据
    for _, v := range mypost {
        go ShowMsg(c, v)
    }

    Okay, after creating an appropriate number of routines, let's analyze how they execute. The routine running the for loop above can be called the main routine, because it is the first one; the anonymous functions started with go then run their logic concurrently. The main routine cannot know which routine finishes first, let alone when all of them have finished, so as a qualified programmer you must think about the ordering between them;

    import "sync"
    /* 正确管理routine的生命周期, 防止死锁 */
    var barrier sync.WaitGroup /* 使用WaitGroup管理自己启动的多个线程 */
    
    for i:=0; i < runtime.NumCPU()+2; i++ {
        barrier.Add(1)      /* 总数加1 */
        go func(id int) {
            fmt.Printf("new routine id(%d)\n", i)
            barrier.Done()  /* 总数减1 */
        }(i)
    }
    
    go func() {
        barrier.Wait() /* 等待上面的所有routine结束(总数变为0) */
        #由于Wait会卡死, 为了不影响main routine的执行, 这里将它单独放在一个routine中
    }()

    OK, now that the routines are all under control, we can make them do real work: use a channel to pass the data in, and let the worker routines receive it from the channel and process it concurrently.

    #假想的任务: 多线程执行1到100得加法!
    var barrier sync.WaitGroup /* 使用WaitGroup管理自己启动的多个线程 */
    cin  := make(chan int)
    cout := make(chan int)
    #因为cin无缓冲的(有缓冲也不可能容纳全部100个数据), 所以增加新的routine往其中写数据, 保证main routine不卡死, 可以调用起来后面的for循环, 消耗数据.
    go func() {
        for i:=1; i <= 100; i++ {
            cin<-i
        }
        Close(cin)
    }
    
    for i:=0; i < runtime.NumCPU()+2; i++ {
        barrier.Add(1)      /* 总数加1 */
        go func() {
            sum := 0
            # for循环, 每个routine处理cin中读取的多个(不是全部100个)数据处理
            for x := range cin {
                sum += x
            }
            cout<-sum
            barrier.Done()  /* 总数减1 */
        }()
    }
    
    go func() {
        #由于Wait会卡死, 为了不影响main routine,这里将它单独放在一个routine中.
        barrier.Wait() /* 等待上面的所有routine结束(总数变为0) */
        Close(cout) #全部运算routine都结束了, 再也不会写cout, 关闭channel
    }()
    
    var totalSum int := 0
    for s := cout { #main routine读取各个routine的结果,做最终的统计
        totalSum += s
    }
    fmt.Println(totalSum)

    Note: the code above has not been thoroughly tested; it is only meant to illustrate my approach. Haha, I'm exhausted — please give me some credit for the effort...

    reply
    0
  • 高洛峰

    高洛峰2017-04-17 13:57:14

    To process 2 million rows, my approach is to create a pool of worker threads and have each one handle a fixed number of records at a time. Having several workers cooperate this way can also ease the pressure on database I/O.

    reply
    0
  • 巴扎黑

    巴扎黑2017-04-17 13:57:14

    First, be clear about what this question is really about: the database's processing capacity, or how to use multiple cores for concurrency in Go?

    The key question is: it gets slower the further it goes, but where is the slowness? In MySQL, or in your Go program?

    Is it Go's use of the CPU that is slow (impossible), or MySQL's disk I/O?

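    It is almost certainly MySQL disk I/O that is holding things up. One concrete cause in your code: `LIMIT offset, n` makes MySQL read and discard `offset` rows for every page, so each page is slower than the previous one; paging with `WHERE pid > last_pid ORDER BY pid LIMIT n` avoids this.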

    reply
    0
  • Cancelreply