2020年3月2日 星期一

如何對sql 查詢select不重覆的資料

不論concurrency併發 或 parallelism平行,  coroutine協程、multi-thread線程 或multi-process行程, 甚至corss-platform跨平台(可視為多行程),的向同一資料表以相同條件query select 不重複資料。也就是單一應用情境取出不重複資料。

案例:將大量資料取出並加以運算後傳到其它系統。

解決方式,有二個關鍵。
一是不論多協程,多線程或多行程,都必需加上lock,以確保不會有同時query的狀況。
二是,查詢出來後加上標記,以確保不會被其它查詢取出。

lock:
1.若只是multi-thread的程式,可以採用程式用mutex lock再加上一些技巧來完成。
2.但是若是multi-process 甚至corss-platform的多程式,就要使用database的lock機制。此例,建議採用user lock(mysql: select get_lock), app resource lock(mssql: sp_getapplock)。不建議採用row lock或table lock,這會導致其它應用也無法取得此資料。

加上已讀取標記
1.multi-thread 或 coroutine (goroutine):
程式只會在一個平台執行單一行程(process),而行程中可以多執行緒併發執行。而此標記可以做在程式裡,以減少db io,提供執行效率。
2.若是multi-process行程 或corss-platform跨平台,就只能在每次查詢出來後,對每一筆update加上標記。若考慮執行效率,可以採用redis記錄讀取標記,以減少db io。

mysql user lock

    defer func() {
        if tx != nil {
            if getLock == 1 && releaseLock == 0 {
                releaseLock, _ = tx.ReleaseSessionLock(SESSION_LOCK_KEY)
            }
            if err != nil {
                if errRollback := tx.Rollback(); errRollback != nil {
                    err = errors.New(err.Error() + "\n" + errRollback.Error())
                }
            }
        }
    }()
    if tx, err = dao.Begin(); err == nil {
        if getLock, err = tx.GetSessionLock(SESSION_LOCK_KEY, mysqlLockTimeout); err == nil && getLock == 1 {
            myJobs.IncWorkingJob()
            defer myJobs.DecWorkingJob()
            if datas, err = tx.GetData(procLimit, realmLocate); err == nil && kwds != nil {
                if releaseLock, err = tx.ReleaseSessionLock(SESSION_LOCK_KEY); err == nil {
                    rIDs, nIDs := DataProcess(datas)
                    if rIDs != "" { //完成
                        removeIdx(rIDs)
                    }
                    if err == nil && nIDs != "" { //未完成
                        removeIdx(nIDs)
                    }
                }
            }
        }
        if err == nil {
            if getLock == 1 && releaseLock == 0 {
                releaseLock, _ = tx.ReleaseSessionLock(SESSION_LOCK_KEY)
            }
            err = tx.Commit()
        }
    }


func (t *MyTx) GetData(l, r int) (datas []Data, err error) {
    sqlStr := "SELECT idx, f1, f2 FROM " + dataTb + " WHERE dt is null"
    if lenIdx() > 0 {
        sqlStr += " AND idx not in " + whereIdx()
    }
    sqlStr += " ORDER BY RAND() LIMIT ?"
    if rows, e := t.Query(sqlStr, l); e != nil {
        errLog.Println(tools.GoID(), dataTb, e)
        err = e
    } else {
        for rows.Next() {
            data := Data{}
            if err = rows.Scan(tools.Strut2Slice(&data)...); err == nil {
                kwds = append(datas, data)
                addIdx(strconv.FormatUint(data.IDx, 10))
            } else {
                errLog.Println(tools.GoID(), dataTb, err)
            }
        }
    }
    return
}
func whereIdx() (s string) {
    selectedIdxLock.Lock()
    defer selectedIdxLock.Unlock()
    s = ""
    for k := range selectedIdx {
        s += k + ","
    }
    s = strings.Trim(s, ",")
    if s != "" {
        s = "(" + s + ")"
    }
    return
}
func lenIdx() int {
    selectedIdxLock.Lock()
    defer selectedIdxLock.Unlock()
    return len(selectedIdx)
}
func addIdx(s string) {
    selectedIdxLock.Lock()
    defer selectedIdxLock.Unlock()
    selectedIdx[s] = true
}
func resetIdx() {
    selectedIdxLock.Lock()
    defer selectedIdxLock.Unlock()
    selectedIdx = make(map[string]bool)
}
func removeIdx(s string) {
    selectedIdxLock.Lock()
    defer selectedIdxLock.Unlock()
    if s != "" {
        ids := strings.Split(s, ",")
        for _, id := range ids {
            delete(selectedIdx, id)
        }
    }
}

沒有留言:

張貼留言