案例:將大量資料取出並加以運算後傳到其它系統。
解決方式,有二個關鍵。
一是不論多協程,多線程或多行程,都必需加上lock,以確保不會有同時query的狀況。
二是,查詢出來後加上標記,以確保不會被其它查詢取出。
lock:
1.若只是multi-thread的程式,可以採用程式用mutex lock再加上一些技巧來完成。
2.但是若是multi-process 甚至corss-platform的多程式,就要使用database的lock機制。此例,建議採用user lock(mysql: select get_lock), app resource lock(mssql: sp_getapplock)。不建議採用row lock或table lock,這會導致其它應用也無法取得此資料。
加上已讀取標記
1.multi-thread 或 coroutine (goroutine):
程式只會在一個平台執行單一行程(process),而行程中可以多執行緒併發執行。而此標記可以做在程式裡,以減少db io,提供執行效率。
2.若是multi-process行程 或corss-platform跨平台,就只能在每次查詢出來後,對每一筆update加上標記。若考慮執行效率,可以採用redis記錄讀取標記,以減少db io。
mysql user lock
defer func() {
if tx != nil {
if getLock == 1 && releaseLock == 0 {
releaseLock, _ = tx.ReleaseSessionLock(SESSION_LOCK_KEY)
}
if err != nil {
if errRollback := tx.Rollback(); errRollback != nil {
err = errors.New(err.Error() + "\n" + errRollback.Error())
}
}
}
}()
if tx, err = dao.Begin(); err == nil {
if getLock, err = tx.GetSessionLock(SESSION_LOCK_KEY, mysqlLockTimeout); err == nil && getLock == 1 {
myJobs.IncWorkingJob()
defer myJobs.DecWorkingJob()
if datas, err = tx.GetData(procLimit, realmLocate); err == nil && kwds != nil {
if releaseLock, err = tx.ReleaseSessionLock(SESSION_LOCK_KEY); err == nil {
rIDs, nIDs := DataProcess(datas)
if rIDs != "" { //完成
removeIdx(rIDs)
}
if err == nil && nIDs != "" { //未完成
removeIdx(nIDs)
}
}
}
}
if err == nil {
if getLock == 1 && releaseLock == 0 {
releaseLock, _ = tx.ReleaseSessionLock(SESSION_LOCK_KEY)
}
err = tx.Commit()
}
}
func (t *MyTx) GetData(l, r int) (datas []Data, err error) {
sqlStr := "SELECT idx, f1, f2 FROM " + dataTb + " WHERE dt is null"
if lenIdx() > 0 {
sqlStr += " AND idx not in " + whereIdx()
}
sqlStr += " ORDER BY RAND() LIMIT ?"
if rows, e := t.Query(sqlStr, l); e != nil {
errLog.Println(tools.GoID(), dataTb, e)
err = e
} else {
for rows.Next() {
data := Data{}
if err = rows.Scan(tools.Strut2Slice(&data)...); err == nil {
kwds = append(datas, data)
addIdx(strconv.FormatUint(data.IDx, 10))
} else {
errLog.Println(tools.GoID(), dataTb, err)
}
}
}
return
}
func whereIdx() (s string) {
selectedIdxLock.Lock()
defer selectedIdxLock.Unlock()
s = ""
for k := range selectedIdx {
s += k + ","
}
s = strings.Trim(s, ",")
if s != "" {
s = "(" + s + ")"
}
return
}
func lenIdx() int {
selectedIdxLock.Lock()
defer selectedIdxLock.Unlock()
return len(selectedIdx)
}
func addIdx(s string) {
selectedIdxLock.Lock()
defer selectedIdxLock.Unlock()
selectedIdx[s] = true
}
func resetIdx() {
selectedIdxLock.Lock()
defer selectedIdxLock.Unlock()
selectedIdx = make(map[string]bool)
}
func removeIdx(s string) {
selectedIdxLock.Lock()
defer selectedIdxLock.Unlock()
if s != "" {
ids := strings.Split(s, ",")
for _, id := range ids {
delete(selectedIdx, id)
}
}
}
沒有留言:
張貼留言