diff --git a/solver/edge.go b/solver/edge.go
index c9e7e3fca43f..27aa54ec0bda 100644
--- a/solver/edge.go
+++ b/solver/edge.go
@@ -329,17 +329,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
 // incoming requests
 func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
 	// process all incoming changes
-	depChanged := false
-	for _, upt := range updates {
-		if changed := e.processUpdate(upt); changed {
-			depChanged = true
-		}
-	}
-
-	if depChanged {
-		// the dep responses had changes. need to reevaluate edge state
-		e.recalcCurrentState()
-	}
+	e.processUpdates(updates)
 
 	desiredState, done := e.respondToIncoming(incoming, allPipes)
 	if done {
@@ -387,156 +377,45 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
 	})
 }
 
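+// processUpdates is called by unpark to apply every received pipe update to
+// the edge, recomputing the edge state when any dependency reported a change.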
+func (e *edge) processUpdates(updates []pipe.Receiver[*edgeRequest, any]) {
+	depChanged := false
+	for _, upt := range updates {
+		depChanged = e.processUpdate(upt) || depChanged
+	}
+
+	if depChanged {
+		e.recalcCurrentState()
+	}
+}
+
 // processUpdate is called by unpark for every updated pipe request
 func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
 	// response for cachemap request
 	if upt == e.cacheMapReq && upt.Status().Completed {
-		if err := upt.Status().Err; err != nil {
-			e.cacheMapReq = nil
-			if !upt.Status().Canceled && e.err == nil {
-				e.err = err
-			}
-		} else {
-			resp := upt.Status().Value.(*cacheMapResp)
-			e.cacheMap = resp.CacheMap
-			e.cacheMapDone = resp.complete
-			e.cacheMapIndex++
-			if len(e.deps) == 0 {
-				e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
-				if !e.op.IgnoreCache() {
-					keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
-					if err != nil {
-						bklog.G(context.TODO()).Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
-					} else {
-						for _, k := range keys {
-							k.vtx = e.edge.Vertex.Digest()
-							records, err := e.op.Cache().Records(context.Background(), k)
-							if err != nil {
-								bklog.G(context.TODO()).Errorf("error receiving cache records: %v", err)
-								continue
-							}
-
-							for _, r := range records {
-								e.cacheRecords[r.ID] = r
-							}
-
-							e.keys = append(e.keys, e.makeExportable(k, records))
-						}
-					}
-				}
-				e.state = edgeStatusCacheSlow
-			}
-			if e.allDepsHaveKeys(false) {
-				e.keysDidChange = true
-			}
-			// probe keys that were loaded before cache map
-			for i, dep := range e.deps {
-				e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
-				e.checkDepMatchPossible(dep)
-			}
-			if !e.cacheMapDone {
-				e.cacheMapReq = nil
-			}
-		}
+		e.processCacheMapReq()
 		return true
 	}
 
 	// response for exec request
 	if upt == e.execReq && upt.Status().Completed {
-		if err := upt.Status().Err; err != nil {
-			e.execReq = nil
-			if e.execCacheLoad {
-				for k := range e.cacheRecordsLoaded {
-					delete(e.cacheRecords, k)
-				}
-			} else if !upt.Status().Canceled && e.err == nil {
-				e.err = err
-			}
-		} else {
-			e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
-			e.state = edgeStatusComplete
-		}
+		e.processExecReq()
 		return true
 	}
 
 	// response for requests to dependencies
 	if dep, ok := e.depRequests[upt]; ok {
-		if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
-			if e.err == nil {
-				e.err = err
-			}
-			dep.err = err
-		}
-
-		if upt.Status().Value == nil {
-			return
-		}
-		state, isEdgeState := upt.Status().Value.(*edgeState)
-		if !isEdgeState {
-			bklog.G(context.TODO()).Warnf("invalid edgeState value for update: %T", state)
-			return
-		}
-
-		if len(dep.keys) < len(state.keys) {
-			newKeys := state.keys[len(dep.keys):]
-			if e.cacheMap != nil {
-				e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
-				dep.edgeState.keys = state.keys
-				if e.allDepsHaveKeys(false) {
-					e.keysDidChange = true
-				}
-			}
-			depChanged = true
-		}
-		if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
-			e.keysDidChange = true
-		}
-
-		recheck := state.state != dep.state
-
-		dep.edgeState = *state
-
-		if recheck && e.cacheMap != nil {
-			e.checkDepMatchPossible(dep)
-			depChanged = true
-		}
-
-		return
+		return e.processDepReq(dep)
 	}
 
 	// response for result based cache function
 	for i, dep := range e.deps {
 		if upt == dep.slowCacheReq && upt.Status().Completed {
-			if err := upt.Status().Err; err != nil {
-				dep.slowCacheReq = nil
-				if !upt.Status().Canceled && e.err == nil {
-					e.err = upt.Status().Err
-				}
-			} else if !dep.slowCacheComplete {
-				dgst := upt.Status().Value.(digest.Digest)
-				if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
-					k := NewCacheKey(dgst, "", -1)
-					dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
-					slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
-					defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
-					for _, dk := range dep.result.CacheKeys() {
-						defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector})
-					}
-					dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})
-
-					// connect def key to slow key
-					e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
-				}
-
-				dep.slowCacheComplete = true
-				e.keysDidChange = true
-				e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
-			}
+			e.processDepSlowCacheReq(i, dep)
 			return true
 		}
 	}
 
-	return
+	return false
 }
 
 // recalcCurrentState is called by unpark to recompute internal state after
@@ -689,6 +568,151 @@ func (e *edge) recalcCurrentState() {
 	}
 }
 
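+// processCacheMapReq handles a completed cachemap request: it stores the new
+// cache map, queries the cache for matching keys on edges with no
+// dependencies, and probes dependency keys that arrived before the cache map.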
+func (e *edge) processCacheMapReq() {
+	upt := e.cacheMapReq
+	if err := upt.Status().Err; err != nil {
+		e.cacheMapReq = nil
+		if !upt.Status().Canceled && e.err == nil {
+			e.err = err
+		}
+		return
+	}
+
+	resp := upt.Status().Value.(*cacheMapResp)
+	e.cacheMap = resp.CacheMap
+	e.cacheMapDone = resp.complete
+	e.cacheMapIndex++
+	if len(e.deps) == 0 {
+		e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
+		if !e.op.IgnoreCache() {
+			keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
+			if err != nil {
+				bklog.G(context.TODO()).Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
+			} else {
+				for _, k := range keys {
+					k.vtx = e.edge.Vertex.Digest()
+					records, err := e.op.Cache().Records(context.Background(), k)
+					if err != nil {
+						bklog.G(context.TODO()).Errorf("error receiving cache records: %v", err)
+						continue
+					}
+
+					for _, r := range records {
+						e.cacheRecords[r.ID] = r
+					}
+
+					e.keys = append(e.keys, e.makeExportable(k, records))
+				}
+			}
+		}
+		e.state = edgeStatusCacheSlow
+	}
+	if e.allDepsHaveKeys(false) {
+		e.keysDidChange = true
+	}
+	// probe keys that were loaded before cache map
+	for i, dep := range e.deps {
+		e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
+		e.checkDepMatchPossible(dep)
+	}
+	if !e.cacheMapDone {
+		e.cacheMapReq = nil
+	}
+}
+
+func (e *edge) processExecReq() {
+	upt := e.execReq
+	if err := upt.Status().Err; err != nil {
+		e.execReq = nil
+		if e.execCacheLoad {
+			for k := range e.cacheRecordsLoaded {
+				delete(e.cacheRecords, k)
+			}
+		} else if !upt.Status().Canceled && e.err == nil {
+			e.err = err
+		}
+		return
+	}
+
+	e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
+	e.state = edgeStatusComplete
+}
+
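+// processDepReq handles a completed request to a dependency: it merges the
+// dependency's new edge state into dep and probes the cache with any newly
+// received keys, reporting whether the dependency changed.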
+func (e *edge) processDepReq(dep *dep) (depChanged bool) {
+	upt := dep.req
+	if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
+		if e.err == nil {
+			e.err = err
+		}
+		dep.err = err
+	}
+
+	if upt.Status().Value == nil {
+		return false
+	}
+
+	state, isEdgeState := upt.Status().Value.(*edgeState)
+	if !isEdgeState {
+		bklog.G(context.TODO()).Warnf("invalid edgeState value for update: %T", state)
+		return false
+	}
+
+	if len(dep.keys) < len(state.keys) {
+		newKeys := state.keys[len(dep.keys):]
+		if e.cacheMap != nil {
+			e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
+			dep.edgeState.keys = state.keys
+			if e.allDepsHaveKeys(false) {
+				e.keysDidChange = true
+			}
+		}
+		depChanged = true
+	}
+
+	if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
+		e.keysDidChange = true
+	}
+
+	recheck := state.state != dep.state
+
+	dep.edgeState = *state
+
+	if recheck && e.cacheMap != nil {
+		e.checkDepMatchPossible(dep)
+		depChanged = true
+	}
+	return depChanged
+}
+
+func (e *edge) processDepSlowCacheReq(index int, dep *dep) {
+	upt := dep.slowCacheReq
+	if err := upt.Status().Err; err != nil {
+		dep.slowCacheReq = nil
+		if !upt.Status().Canceled && e.err == nil {
+			e.err = upt.Status().Err
+		}
+	} else if !dep.slowCacheComplete {
+		dgst := upt.Status().Value.(digest.Digest)
+		if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
+			k := NewCacheKey(dgst, "", -1)
+			dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
+			slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
+			defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
+			for _, dk := range dep.result.CacheKeys() {
+				defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[index].Selector})
+			}
+			dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})
+
+			// connect def key to slow key
+			e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
+		}
+
+		dep.slowCacheComplete = true
+		e.keysDidChange = true
+		e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
+	}
+}
+
 // respondToIncoming responds to all incoming requests. completing or
 // updating them when possible
 func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
@@ -768,89 +792,119 @@ func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver)
 
 // createInputRequests creates new requests for dependencies or async functions
 // that need to complete to continue processing the edge
-func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) bool {
-	addedNew := false
-
+func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) (addedNew bool) {
 	// initialize deps state
-	if e.deps == nil {
-		e.depRequests = make(map[pipeReceiver]*dep)
-		e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
-		for i := range e.edge.Vertex.Inputs() {
-			e.deps = append(e.deps, newDep(Index(i)))
-		}
-	}
+	e.ensureDepsInitialized()
 
 	// cycle all dependencies. set up outgoing requests if needed
 	for _, dep := range e.deps {
-		desiredStateDep := dep.state
-
-		if e.noCacheMatchPossible || force {
-			desiredStateDep = edgeStatusComplete
-		} else if dep.state == edgeStatusInitial && desiredState > dep.state {
-			desiredStateDep = edgeStatusCacheFast
-		} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
-			// wait all deps to complete cache fast before continuing with slow cache
-			if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) {
-				if !e.skipPhase2FastCache(dep) && e.cacheMap != nil {
-					desiredStateDep = edgeStatusCacheSlow
-				}
-			}
-		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
-			// if all deps have completed cache-slow or content based cache for input is available
-			if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
-				if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
-					desiredStateDep = edgeStatusComplete
-				}
-			}
-		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
-			if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
-				desiredStateDep = edgeStatusComplete
-			}
-		}
+		desiredStateDep := e.desiredStateDep(dep, desiredState, force)
 
 		// outgoing request is needed
 		if dep.state < desiredStateDep {
-			addNew := true
-			if dep.req != nil && !dep.req.Status().Completed {
-				if dep.req.Request().desiredState != desiredStateDep {
-					debugSchedulerCancelInputRequest(e, dep, desiredStateDep)
-					dep.req.Cancel()
-				} else {
-					debugSchedulerSkipInputRequestBasedOnExistingRequest(e, dep, desiredStateDep)
-					addNew = false
-				}
-			}
-			if addNew {
-				debugSchedulerAddInputRequest(e, dep, desiredStateDep)
-				req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
-					currentState: dep.edgeState,
-					desiredState: desiredStateDep,
-					currentKeys:  len(dep.keys),
-				})
-				e.depRequests[req] = dep
-				dep.req = req
-				addedNew = true
-			}
+			addedNew = e.createOutgoingRequest(dep, desiredStateDep, f) || addedNew
 		} else {
 			debugSchedulerSkipInputRequestBasedOnDepState(e, dep, desiredStateDep)
 		}
+
 		// initialize function to compute cache key based on dependency result
-		if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
-			pfn := e.preprocessFunc(dep)
-			fn := e.slowCacheFunc(dep)
-			res := dep.result
-			func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) {
-				dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
-					v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
-					return v, errors.Wrap(err, "failed to compute cache key")
-				})
-			}(pfn, fn, res, dep.index)
-			addedNew = true
-		}
+		addedNew = e.computeCacheKeyFromDep(dep, f) || addedNew
 	}
 	return addedNew
 }
 
+func (e *edge) ensureDepsInitialized() {
+	if e.deps != nil {
+		return
+	}
+
+	e.depRequests = make(map[pipeReceiver]*dep)
+	e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
+	for i := range e.edge.Vertex.Inputs() {
+		e.deps = append(e.deps, newDep(Index(i)))
+	}
+}
+
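+// desiredStateDep returns the next state to request from dep, advancing it
+// one cache phase at a time based on the edge's desired state and how far
+// cache matching has progressed across the other dependencies.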
+func (e *edge) desiredStateDep(dep *dep, desiredState edgeStatusType, force bool) edgeStatusType {
+	if e.noCacheMatchPossible || force {
+		return edgeStatusComplete
+	}
+
+	if dep.state == edgeStatusInitial && desiredState > dep.state {
+		return edgeStatusCacheFast
+	}
+
+	if dep.state == edgeStatusCacheFast && desiredState > dep.state {
+		// wait all deps to complete cache fast before continuing with slow cache
+		if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) {
+			if !e.skipPhase2FastCache(dep) && e.cacheMap != nil {
+				return edgeStatusCacheSlow
+			}
+		}
+		return dep.state
+	}
+
+	if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
+		// if all deps have completed cache-slow or content based cache for input is available
+		if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
+			if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
+				return edgeStatusComplete
+			}
+		}
+		return dep.state
+	}
+
+	if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
+		if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
+			return edgeStatusComplete
+		}
+		return dep.state
+	}
+
+	return dep.state
+}
+
+func (e *edge) createOutgoingRequest(dep *dep, desiredStateDep edgeStatusType, f *pipeFactory) (addedNew bool) {
+	if dep.req != nil && !dep.req.Status().Completed {
+		if dep.req.Request().desiredState == desiredStateDep {
+			debugSchedulerSkipInputRequestBasedOnExistingRequest(e, dep, desiredStateDep)
+			return false
+		}
+		debugSchedulerCancelInputRequest(e, dep, desiredStateDep)
+		dep.req.Cancel()
+	}
+
+	debugSchedulerAddInputRequest(e, dep, desiredStateDep)
+	req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
+		currentState: dep.edgeState,
+		desiredState: desiredStateDep,
+		currentKeys:  len(dep.keys),
+	})
+	e.depRequests[req] = dep
+	dep.req = req
+	return true
+}
+
+func (e *edge) computeCacheKeyFromDep(dep *dep, f *pipeFactory) (addedNew bool) {
+	if dep.state != edgeStatusComplete || dep.slowCacheReq != nil || e.cacheMap == nil {
+		return false
+	}
+
+	pfn := e.preprocessFunc(dep)
+	fn := e.slowCacheFunc(dep)
+	if pfn == nil && fn == nil {
+		return false
+	}
+
+	res := dep.result
+	index := dep.index
+	dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
+		v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
+		return v, errors.Wrap(err, "failed to compute cache key")
+	})
+	return true
+}
+
 // execIfPossible creates a request for getting the edge result if there is
 // enough state
 func (e *edge) execIfPossible(f *pipeFactory) bool {