Approche fonctionnelle des transactions sur Scala ou écrivez votre propre monade utile

Si vous travaillez avec une seule base de données qui prend en charge les transactions, vous ne pensez même pas à la cohérence - la base de données fait tout pour vous. Si vous avez plusieurs bases de données, un système distribué ou même, par exemple, MongoDB jusqu'à la version 4, tout n'est pas si rose.


Prenons un exemple - nous voulons enregistrer le fichier dans le référentiel et ajouter un lien vers celui-ci dans deux documents. Bien sûr, nous voulons l'atomicité - soit le fichier est enregistré et ajouté aux documents, soit aucun (les IO des effets chats sont utilisés ci-après):


saveDataToFile(data) // (1)
  .flatMap { file =>
    addFileRef(documentId, file) // (2)
      .flatMap { result =>
        addFileRef(fileRegistry, file) // (3)
          .flatMap { result =>
            ??? // (4, 5, ...)
          }
          .handleErrorWith { error =>
            // revert (2)
            removeFileRef(documentId, file).attempt >> IO.raiseError(error)
          }
      }
      .handleErrorWith { error =>
        // revert (1)
        removeFile(file).attempt >> IO.raiseError(error)
      }
  }

? Pyramid of doom.


! , .



, , . ( ) "" .


- Saga, . / , . , . — Scala .



— :



final case class Action(
  perform:    ???,
  commit:     ???,
  compensate: ???
)

? - () => Try[T], IO — , cats.


final case class Action(
  perform:    IO[Unit],
  commit:     IO[Unit],
  compensate: IO[Unit]
)

commit perform, compensate perform ?


:


final case class Action[T](
  perform:    IO[T],
  commit:     T => IO[Unit],
  compensate: T => IO[Unit]
)

, T — .. perform.


:


def saveDataToFile(data: Data): IO[File] = ???
def removeFile(file: File): IO[Unit] = ???

def saveDataAction(data: Data): Action[File] = Action(
  perform = saveDataToFile(data),
  compensate = removeFile
)


, , ? Seq[Action[_]] — . — , .


— - ? .


:


final case class ActionChain[A, B](
  first: Action[A],
  next:  A => Action[B]
)

, — . :


sealed trait Transaction[T]

final case class Action[T](...) extends Transaction[T]
final case class ActionChain[A, B](
  first: Transaction[A],
  next:  A => Transaction[B]
) extends Transaction[B]

! , :


ActionChain(
  saveDataAction(data),
  { file => 
    ActionChain(
      addFileRefAction(documentId, file),
      { _ => 
        addFileRefAction(fileRegistry, file)
      }
    )
  }
)

, .


, — , ?



, , IO .


, . "" — .


Transaction case' . , :




private def compile[R](restOfTransaction: T => IO[R]): IO[R] = this match {
  case Action(perform, commit, compensate) => perform.flatMap { t =>
    restOfTransaction(t).redeemWith(
      bind = commit(t).attempt >> IO.pure(_),
      recover = compensate(t).attempt >> IO.raiseError(_)
    )
  }

  case ActionChain(first, next) => ???
}

, Cats

redeemWith , attempt / ( , ), >> " ", IO.pure IO.raiseError continue(t) — .


— , :


private def compile[R](restOfTransaction: T => IO[R]): IO[R] = this match {
  case Action(perform, commit, compensate) => ...

  case ActionChain(first, next) =>
    first.compile { a =>
      next(a).compile { t =>
        restOfTransaction(t)
      }
    }
}

, :


sealed trait Transaction[T] {

  def compile: IO[T] = compile(IO.pure) // "" --   
}

IO , commit/compensate , ( / IO, ).


, . , ? , .


, . :


-, Action.compile(restOfTransaction):


  1. perform , restOfTransaction ( T, perform )
  2. compile: perform, restOfTransaction ( T)
  3. perform , commit compensate restOfTransaction ( redeemWith)

ActionChain.compile(restOfTransaction) , Action' , compile':


transaction.compile(restOfTransaction) 
=== 
action1.compile(t1 => 
  action2.compile(t2 =>
    action3.compile(t3 => ...
      restOfTransaction(tn))))

, ActionChain.first ActionChain:


ActionChain(ActionChain(action1, t1 => action2), t2 => action3).compile(restOfTransaction) >>
ActionChain(action1, t1 => action2).compile(t2 => action3.compile(restOfTransaction)) >>
action1.compile(t1 => action2.compile(t2 => action3.compile(restOfTransaction))) []

Action.compile :


  1. commit compensate

?


:


  1. ( )
  2. ,
  3. ???
  4. PROFIT!

, :


ActionChain(
  saveDataAction(data),
  { file => 
    ActionChain(
      addFileRefAction(documentId, file),
      { _ => 
        addFileRefAction(fileRegistry, file)
      }
    )
  }
).compile

chain ActionChain, :


sealed trait Transaction[T] {
  def chain[R](f: T => Transaction[R]): Transaction[R] = ActionChain(this, f)
}

saveDataAction(data).chain { file =>
  addFileRefAction(documentId, file).chain { _ =>
    addFileRefAction(fileRegistry, file)
  }
}.compile

chain flatMap , , () !


, Scala, for cats, ( ).


, — !


sealed trait Transaction[T] {
  def flatMap[R](f: T => Transaction[R]): Transaction[R] = ActionChain(this, f)

  def map[R](f: T => R): Transaction[R] = flatMap { t => Action(IO(f(t))) }
}

Scala, :


def saveDataAndAddFileRefs(data: Data, documentId: ID): Transaction[Unit] = for {
  file <- saveDataAction(data)
  _    <- addFileRefAction(documentId, file)
  _    <- addFileRefAction(fileRegistry, file)
} yield ()

saveDataAndAddFileRefs(data, documentId).compile

, — . — , .


Car catsvous devez déclarer explicitement la preuve qu'il Transactions'agit d'une monade - une instance d'une classe de types:


object Transaction {
  implicit object MonadInstance extends Monad[Transaction] {
    override def pure[A](x: A): Transaction[A] = Action(IO.pure(x))

    override def flatMap[A, B](fa: Transaction[A])(f: A => Transaction[B]): Transaction[B] = fa.flatMap(f)

    //    cats-effects,       
    override def tailRecM[A, B](a: A)(f: A => Transaction[Either[A, B]]): Transaction[B] = f(a).flatMap {
      case Left(a) => tailRecM(a)(f)
      case Right(b) => pure(b)
    }
  }
}

Qu'est-ce que cela nous donne? La possibilité d'utiliser des méthodes toutes faites cats, par exemple:


val transaction = dataChunks.traverse_ { data =>
  saveDataAndAddFileRefs(data, documentId) //      
}
transaction.compile //      

Tout le code est disponible ici: https://github.com/atamurius/scala-transactions


All Articles