Skip to content

Commit

Permalink
refactor process request updates into their own functions
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
  • Loading branch information
jsternberg committed Oct 7, 2024
1 parent c3a67d4 commit 1ef471b
Showing 1 changed file with 150 additions and 127 deletions.
277 changes: 150 additions & 127 deletions solver/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,152 +392,30 @@ func (e *edge) processUpdates(updates []pipe.Receiver[*edgeRequest, any]) {
// processUpdate processes a single status update from one of the pipes
// this edge is waiting on and dispatches it to the handler for that
// request kind. It reports whether the update may have changed state in
// a way that requires the edge's current state to be recomputed.
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
	// response for cachemap request
	if upt == e.cacheMapReq && upt.Status().Completed {
		e.processCacheMapReq()
		return true
	}

	// response for exec request
	if upt == e.execReq && upt.Status().Completed {
		e.processExecReq()
		return true
	}

	// response for requests to dependencies
	if dep, ok := e.depRequests[upt]; ok {
		return e.processDepReq(dep)
	}

	// response for result based cache function
	for i, dep := range e.deps {
		if upt == dep.slowCacheReq && upt.Status().Completed {
			e.processDepSlowCacheReq(i, dep)
			return true
		}
	}

	return false
}

// recalcCurrentState is called by unpark to recompute internal state after
Expand Down Expand Up @@ -690,6 +568,151 @@ func (e *edge) recalcCurrentState() {
}
}

// processCacheMapReq handles the completed response of e.cacheMapReq.
// On error it clears the in-flight request and records the error on the
// edge (ignoring cancellations and keeping only the first error). On
// success it stores the new cache map, queries the cache for matching
// keys when the edge has no dependencies, and re-probes dependency keys
// that arrived before the cache map did.
func (e *edge) processCacheMapReq() {
	upt := e.cacheMapReq
	if err := upt.Status().Err; err != nil {
		// Drop the failed request; only keep the first non-cancellation
		// error seen by this edge.
		e.cacheMapReq = nil
		if !upt.Status().Canceled && e.err == nil {
			e.err = err
		}
		return
	}

	resp := upt.Status().Value.(*cacheMapResp)
	e.cacheMap = resp.CacheMap
	e.cacheMapDone = resp.complete
	e.cacheMapIndex++
	if len(e.deps) == 0 {
		// No dependencies: the cache map digest alone identifies this
		// edge, so the cache can be queried for matches immediately.
		e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
		if !e.op.IgnoreCache() {
			keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
			if err != nil {
				bklog.G(context.TODO()).Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
			} else {
				for _, k := range keys {
					k.vtx = e.edge.Vertex.Digest()
					records, err := e.op.Cache().Records(context.Background(), k)
					if err != nil {
						// A failed record lookup only skips this key; the
						// other keys are still registered.
						bklog.G(context.TODO()).Errorf("error receiving cache records: %v", err)
						continue
					}

					for _, r := range records {
						e.cacheRecords[r.ID] = r
					}

					e.keys = append(e.keys, e.makeExportable(k, records))
				}
			}
		}
		e.state = edgeStatusCacheSlow
	}
	if e.allDepsHaveKeys(false) {
		e.keysDidChange = true
	}
	// probe keys that were loaded before cache map
	for i, dep := range e.deps {
		e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
		e.checkDepMatchPossible(dep)
	}
	if !e.cacheMapDone {
		// More cache map results are still expected; clear the request so
		// a new one can be issued.
		e.cacheMapReq = nil
	}
}

// processExecReq handles the completed response of e.execReq. A
// successful response marks the edge complete with a shared cached
// result. A failure clears the in-flight request and either rolls back
// the cache records registered for a cache-load attempt, or records the
// error on the edge (ignoring cancellations, keeping the first error).
func (e *edge) processExecReq() {
	status := e.execReq.Status()

	if status.Err == nil {
		// Success: publish the result and finish the edge.
		e.result = NewSharedCachedResult(status.Value.(CachedResult))
		e.state = edgeStatusComplete
		return
	}

	e.execReq = nil
	if e.execCacheLoad {
		// The failed request was loading cached records; forget the
		// records that were registered for this load attempt.
		for id := range e.cacheRecordsLoaded {
			delete(e.cacheRecords, id)
		}
	} else if !status.Canceled && e.err == nil {
		e.err = status.Err
	}
}

// processDepReq handles a status update from the request made for a
// single dependency, and reports whether the dependency changed in a
// way that requires this edge's state to be recomputed.
func (e *edge) processDepReq(dep *dep) (depChanged bool) {
	upt := dep.req
	// A completed, non-canceled error is recorded on both the dependency
	// and (if none is set yet) the edge itself.
	if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
		if e.err == nil {
			e.err = err
		}
		dep.err = err
	}

	if upt.Status().Value == nil {
		return false
	}

	state, isEdgeState := upt.Status().Value.(*edgeState)
	if !isEdgeState {
		bklog.G(context.TODO()).Warnf("invalid edgeState value for update: %T", state)
		return false
	}

	// New cache keys arrived for this dependency: probe only the newly
	// appended keys against this dependency's selector. The probe is
	// skipped until the cache map response has arrived.
	if len(dep.keys) < len(state.keys) {
		newKeys := state.keys[len(dep.keys):]
		if e.cacheMap != nil {
			e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
			dep.edgeState.keys = state.keys
			if e.allDepsHaveKeys(false) {
				e.keysDidChange = true
			}
		}
		depChanged = true
	}

	// The dependency just transitioned to complete; its key set is final
	// now, so the edge's keys may change.
	if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
		e.keysDidChange = true
	}

	// NOTE: recheck must be computed before dep.edgeState is overwritten
	// below, since it compares the old state against the new one.
	recheck := state.state != dep.state

	dep.edgeState = *state

	if recheck && e.cacheMap != nil {
		e.checkDepMatchPossible(dep)
		depChanged = true
	}
	return depChanged
}

// processDepSlowCacheReq handles the completed response of a
// dependency's result-based (slow) cache function. index is the
// dependency's position in e.deps / e.cacheMap.Deps.
func (e *edge) processDepSlowCacheReq(index int, dep *dep) {
	upt := dep.slowCacheReq
	if err := upt.Status().Err; err != nil {
		// Drop the failed request; keep only the first non-cancellation
		// error on the edge.
		dep.slowCacheReq = nil
		if !upt.Status().Canceled && e.err == nil {
			e.err = upt.Status().Err
		}
	} else if !dep.slowCacheComplete {
		dgst := upt.Status().Value.(digest.Digest)
		// NOTE(review): this line indexes with dep.index while the loop
		// below uses the index parameter — presumably they are always
		// equal; confirm at the call site.
		if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
			// Build a standalone cache key from the content digest and
			// probe the cache with it.
			k := NewCacheKey(dgst, "", -1)
			dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
			slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
			defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
			for _, dk := range dep.result.CacheKeys() {
				defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[index].Selector})
			}
			dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})

			// connect def key to slow key
			e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
		}

		dep.slowCacheComplete = true
		e.keysDidChange = true
		e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
	}
}

// respondToIncoming responds to all incoming requests. completing or
// updating them when possible
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
Expand Down

0 comments on commit 1ef471b

Please sign in to comment.