1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { defer func() { c.mu.RLock() committed := c.mu.committed undetermined := c.mu.undeterminedErr != nil c.mu.RUnlock() if !committed && !undetermined { c.cleanWg.Add(1) go func() { cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey)) err := c.cleanupKeys(NewBackoffer(cleanupKeysCtx, cleanupMaxBackoff).WithVars(c.txn.vars), c.keys) if err != nil { logutil.Logger(ctx).Info("2PC cleanup failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) } else { logutil.Logger(ctx).Info("2PC clean up done", zap.Uint64("txnStartTS", c.startTS)) } c.cleanWg.Done() }() } c.txn.commitTS = c.commitTS }()
prewriteBo := NewBackoffer(ctx, PrewriteMaxBackoff).WithVars(c.txn.vars) err = c.prewriteKeys(prewriteBo, c.keys) if err != nil { logutil.Logger(ctx).Debug("2PC failed on prewrite", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return errors.Trace(err) }
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars)) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return errors.Trace(err) }
if commitTS <= c.startTS { err = errors.Errorf("conn %d Invalid transaction tso with txnStartTS=%v while txnCommitTS=%v", c.connID, c.startTS, commitTS) logutil.BgLogger().Error("invalid transaction", zap.Error(err)) return errors.Trace(err) } c.commitTS = commitTS if err = c.checkSchemaValid(); err != nil { return errors.Trace(err) }
if c.store.oracle.IsExpired(c.startTS, kv.MaxTxnTimeUse) { err = errors.Errorf("conn %d txn takes too much time, txnStartTS: %d, comm: %d", c.connID, c.startTS, c.commitTS) return err }
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars) err = c.commitKeys(commitBo, c.keys) if err != nil { if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { logutil.Logger(ctx).Error("2PC commit result undetermined", zap.Error(err), zap.NamedError("rpcErr", undeterminedErr), zap.Uint64("txnStartTS", c.startTS)) err = errors.Trace(terror.ErrResultUndetermined) } if !c.mu.committed { logutil.Logger(ctx).Debug("2PC failed on commit", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return errors.Trace(err) } logutil.Logger(ctx).Debug("got some exceptions, but 2PC was still successful", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) } return nil }
|