


2019-09-20 17:23 Source: unknown

A FindObservable can be the result of a chain of applications: since each transformation has the shape FindObservable[A] => FindObservable[A], we can serialize/deserialize a list of FindObservable[Document] transformations and then re-chain and re-apply them to obtain the final FindObservable. The protobuf structure corresponding to FindObservable is as follows:
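The chaining idea can be sketched without the MongoDB driver: each deserialized ResultTransformer becomes a function of shape A => A, and the list of functions is re-applied with foldLeft. Query and its fields here are illustrative stand-ins for FindObservable[Document] and its operators.

```scala
// Minimal sketch (plain Scala, no MongoDB driver): a query type Q and a list of
// transformations Q => Q, re-chained with foldLeft the same way a serialized list
// of ResultTransformer messages is re-applied to a FindObservable.
case class Query(filter: Option[String] = None, limit: Int = 0, skip: Int = 0)

val transforms: List[Query => Query] = List(
  q => q.copy(filter = Some("state = 'NY'")),   // like find.filter(bson)
  q => q.copy(skip = 3),                        // like find.skip(3)
  q => q.copy(limit = 10)                       // like find.limit(10)
)

// re-chain the deserialized transformations onto the initial query
val finalQuery = transforms.foldLeft(Query())((q, f) => f(q))
```

Order matters only in the sense that later transformations see the result of earlier ones, exactly as with chained FindObservable calls.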

gRPC supports the following four interaction modes:

jdbc/JDBCEngine.scala

The parties in a gRPC streaming operation are a server and a client. In a cluster environment containing several different services, one service may need to call a service provided by another server; the calling server then becomes a client of the providing server. If we use streaming mode to submit service requests and obtain the computed results, this is a stream computation with one server as the Source and the other as a pass-through Flow. In more detail: the requesting side builds a Source from its requests and passes them to the provider by connecting a Flow; the provider processes the requests inside the Flow and returns the results; the requester then runs this connected stream to obtain the results it needs. Below we demonstrate stream connection and computation between services in a cluster environment composed of three gRPC services: JDBC, Cassandra and MongoDB.
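The topology can be sketched with plain functions standing in for the gRPC stream Flows. The service behavior and reply strings below are made up purely for illustration.

```scala
// Hypothetical sketch: each "service" is a plain function (a pass-through Flow);
// the requester's stream of requests is a List (the Source). jdbcService forwards
// each element to cqlService, which forwards to mgoService, mirroring the
// JDBCService -> CQLService -> MGOService chaining over gRPC streams.
val mgoService: String => String  = req => req + ", hello from mgo"
val cqlService: String => String  = req => mgoService(req + ", via cql")
val jdbcService: String => String = req => cqlService(req + ", via jdbc")

val requests  = List("r1", "r2")
val responses = requests.map(jdbcService)   // "run" the connected stream
```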

With a connection to MongoDB established, we can select a Database:

In the previous two posts we covered the gRPC streaming implementations for JDBC and Cassandra. Unlike MongoDB, JDBC and Cassandra support character-based query languages (SQL and CQL), so converting query statements into protobuf structures is simple and direct. MongoDB does not provide character-based queries, so we must implement bidirectional conversion between all the types involved in a MongoDB query and protobuf types, which makes implementing the gRPC functionality considerably more complex. In this post we first cover the protobuf conversion of MongoDB queries.

2. Compiling the .proto file generates the related Java data types and the abstract service skeleton

main/resources/application.conf

A streaming-mode gRPC service is essentially an akka-stream Flow[R1,R2,M]: it transforms the received data R1 and emits R2. Processing R1 may require the computation results of other services. In the example above, CQLService transforms the received message, passes it to MGOService, and waits for the further-processed result that MGOService returns, so sayHelloTo is still a Flow with two nodes: the first node processes the received message, and the second passes the processed message to another service and connects that service's result as its own final output. Calling a service on another cluster node must go through that service's gRPC client; here we call the MGOClient:

Below are some of the corresponding MongoClient construction functions:

Finally, the FindObservable: the usage scenario for this type is as follows:

In the Scala programming world we can use scalaPB to work with gRPC and protobuf.

build.sbt

JDBCService连接CQLService, CQLService连接MGOService:

With the tupleToCanBeBsonElement implicit conversion function above, we can construct a Document as follows:
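Without the BSON library, the same implicit-conversion trick looks like this. Element and Doc are simplified stand-ins for the driver's BsonElement and Document types, introduced only for illustration.

```scala
import scala.language.implicitConversions

// Simplified stand-ins for the BSON element and Document types
case class Element(key: String, value: Any)
case class Doc(elements: List[Element] = Nil) {
  def +(e: Element): Doc = Doc(elements :+ e)
}

// the implicit conversion lets a (key, value) tuple be used where an Element is expected
implicit def tupleToElement(t: (String, Any)): Element = Element(t._1, t._2)

val doc = Doc() + ("name" -> "Peter") + ("age" -> 18)
```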

message ResultTransformer { //FindObservable
   int32 optType = 1;
   MGOBson bsonParam = 2;
   int32 valueParam = 3;
}

  type FOD_TYPE       = Int
  val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
  val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
  val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
  val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
  val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
                           //Sets a document describing the fields to return for all matching documents
  val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
  val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
                           //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                           //Sets the cursor type
  val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
                           //Sets the hint for which index to use. A null value means no hint is set
  val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
                           //Sets the exclusive upper bound for a specific index. A null value means no max is set
  val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
                           //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
  val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
                           //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  val FOD_SHOWRECORDID= 12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                           //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents

  case class ResultOptions(
                          optType: FOD_TYPE,
                          bsonParam: Option[Bson] = None,
                          valueParam: Int = 0 ) {
    def toProto = new sdp.grpc.services.ResultTransformer(
      optType = this.optType,
      bsonParam = this.bsonParam.map(b => sdp.grpc.services.MGOBson(marshal(b))),
      valueParam = this.valueParam
    )
    def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case  FOD_FIRST        => find
        case  FOD_FILTER       => find.filter(bsonParam.get)
        case  FOD_LIMIT        => find.limit(valueParam)
        case  FOD_SKIP         => find.skip(valueParam)
        case  FOD_PROJECTION   => find.projection(bsonParam.get)
        case  FOD_SORT         => find.sort(bsonParam.get)
        case  FOD_PARTIAL      => find.partial(valueParam != 0)
        case  FOD_CURSORTYPE   => find
        case  FOD_HINT         => find.hint(bsonParam.get)
        case  FOD_MAX          => find.max(bsonParam.get)
        case  FOD_MIN          => find.min(bsonParam.get)
        case  FOD_RETURNKEY    => find.returnKey(valueParam != 0)
        case  FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
      }
    }
  }
  object ResultOptions {
    def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
      optType = msg.optType,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      valueParam = msg.valueParam
    )
  }

3. Client-Streaming: the client sends a stream of requests to the server and then receives a single response from the server

package sdp.jdbc.engine

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import scala.util._
import java.time._
import scala.concurrent.duration._
import sdp.jdbc.FileStreaming._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL = 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

case class JDBCQueryContext[M](
                                dbName: Symbol,
                                statement: String,
                                parameters: Seq[Any] = Nil,
                                fetchSize: Int = 100,
                                autoCommit: Boolean = false,
                                queryTimeout: Option[Int] = None)

case class JDBCContext(
                        dbName: Symbol,
                        statements: Seq[String] = Nil,
                        parameters: Seq[Seq[Any]] = Nil,
                        fetchSize: Int = 100,
                        queryTimeout: Option[Int] = None,
                        queryTags: Seq[String] = Nil,
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                        batch: Boolean = false,
                        returnGeneratedKey: Seq[Option[Any]] = Nil,
                        // no return: None, return by index: Some(1), by name: Some("colname")
                        preAction: Option[PreparedStatement => Unit] = None,
                        postAction: Option[PreparedStatement => Unit] = None) { ctx =>

  //helper functions
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)
  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)
  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)
  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy, mm, dd)
  def jdbcSetNow = LocalDateTime.now()

  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName, timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) = InputStreamToFile(blob, fileName)
}

object JDBCEngine {
  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A], extractor: WrappedResultSet => A)
                       (implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {
    val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
                                                     extractor: WrappedResultSet => A)(
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
    sql.collection.apply[C]()
  }

  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    }
    else {
      NamedDB(ctx.dbName) localTx { implicit session =>
        Try {
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})
            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }
      } else {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
    }
  }

  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result).to[C]
      }
    }
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val before = ctx.preAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(_) => false
      }
    } else true
  }

  def noActon: PreparedStatement => Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k @ _ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil).apply().toLong
            case Some(_) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
    }
  }

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) { jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R) = {
      import scala.concurrent._
      val params = prepareParams(r)
      NamedDB(dbName) autoCommit { session =>
        session.execute(statement, params: _*)
      }
      Future.successful(r)
    }

    def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
  }
}
syntax = "proto3";

package sdp.grpc.services;

message HelloMsg {
  string hello = 1;
}

message DataRow {
    string countyname = 1;
    string statename = 2;
    int32 reportyear = 3;
    int32 value = 4;
}

This key/value relationship corresponds to the field-name/field-value pairs in an ordinary database table. Below we try building several Documents of different shapes and inserting them into the same collection:

MgoProtoConversion.scala

Some time ago we explored a cluster-based integrated data platform solution for SDP, composed of several databases including JDBC, Cassandra and MongoDB. Cassandra and MongoDB are distributed databases and can be called from any deployment node in the cluster, while a JDBC database is non-distributed and cannot be deployed on multiple nodes. If we expose each database's data-processing functionality as microservices, any call from other cluster nodes to the JDBC database microservice requires data serialization. Although Cassandra and MongoDB are distributed, free data exchange is still limited to the nodes where they are deployed. It now appears that interaction between different data microservices must still be implemented through serialization: data has to be marshalled before it can be sent, and unmarshalled after receipt to restore it. I once spent some time studying akka-http, my initial choice of so-called system-integration tool; it integrates systems through JSON-format data exchange. JSON is a standard data format, so it enables data exchange between heterogeneous systems. But I kept thinking: if integration between SDP data-platform microservices were implemented through akka-http data exchange, the interaction efficiency of the integrated system would be rather low, because: 1. JSON is character-based data that occupies a lot of space, so transmission efficiency is naturally low; 2. constrained by the http 1.0 interaction pattern, downstream data flows easily but upstream data is limited to request commands. At the system level this style of interaction is very limited, unnatural and inconvenient, which troubled me for a while. In fact, the bar for using akka-http is quite high: although akka-http already provides many types supporting http operations, merely understanding the http protocol and the details, construction and usage of HttpRequest and HttpResponse took me several weeks, and I have only reached a shallow level of understanding; using it fluently in practice would require far more effort.

        val ctx = JDBCQueryContext[JDBCDataRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toRow)

Finally we use DemoApp to demonstrate the whole process:

This BsonString is an eyesore; we use an implicit conversion to turn it into a String:
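As a minimal illustration, with a stand-in BsonString case class (an assumption: the real org.mongodb.scala.bson.BsonString exposes its value via getValue):

```scala
import scala.language.implicitConversions

// stand-in for the driver's BsonString
case class BsonString(getValue: String)

// the implicit conversion unwraps a BsonString wherever a String is expected
implicit def bsonStringToString(bs: BsonString): String = bs.getValue

val s: String = BsonString("hello")   // conversion applied automatically
```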

 case class MGOContext(
                        dbName: String,
                        collName: String,
                        actionType: MGO_ACTION_TYPE = MGO_QUERY,
                        action: Option[MGOCommands] = None
                      ) { ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  }

1. Unary: a standalone client-request/server-response pair, the http interaction mode we commonly use

Note that we map the Seq[Any] type to bytes. This avoids the complexity of working with the protobuf Any type. scalaPB automatically maps bytes to ByteString, as follows:
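A plausible marshal/unmarshal pair based on Java serialization follows. This is an assumption about how the post's helpers work, but it shows why a bytes field can carry a whole Seq[Any]:

```scala
import java.io._

// serialize any Serializable value (e.g. a Seq[Any]) into the bytes that
// scalaPB would carry as a protobuf `bytes` field (ByteString)
def marshal(obj: Any): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try { oos.writeObject(obj); bos.toByteArray } finally oos.close()
}

// restore the value on the receiving side
def unmarshal[A](bytes: Array[Byte]): A = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[A] finally ois.close()
}

val params: Seq[Any] = Seq("NY", 2005, 3.14)
val restored = unmarshal[Seq[Any]](marshal(params))
```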

七个劳务共用了protobuf新闻类型HelloMsg。大家把共用的音讯统一置于一个common.proto文件里:

We can also use insertMany to insert in batches:

First, Document is a serializable class and can be serialized/deserialized directly:

I think the fourth mode best fits interaction at the program-flow level. That is, it can represent a natural program flow, even though the client must still initiate the connection. Since the generated source code involves no http-protocol-related types or operations, it is much easier to pick up.

jdbc/JDBCConfig.scala

Below are the gRPC IDL definitions for JDBC, Cassandra and MongoDB respectively:

name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
    "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
    "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
)
message MGODocument {
  bytes document = 1;
}

message MGOBson {
  bytes bson = 1;
}

message ResultTransformer { //FindObservable
  int32 optType = 1;
  MGOBson bsonParam = 2;
  int32 valueParam = 3;
}

message MGOAdminOptons {
  string tarName = 1;
  repeated MGOBson bsonParam = 2;
  OptionAny options = 3;
  string objName = 4;
}

message MGOOperations { //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated MGOBson bsonParam = 4;
  repeated ResultTransformer resultOptions = 5;
  OptionAny options = 6;
  repeated MGODocument documents = 7;
  google.protobuf.BoolValue only = 8;
  MGOAdminOptons adminOptions = 9;
}

2. Server-Streaming: the client sends a single request and then receives a stream of responses from the server

main/protobuf/jdbc.proto

import sdp.grpc.cassandra.client.CQLClient

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor)
  extends JdbcGrpcAkkaStream.JDBCServices with LogSupport {
  val cassandraClient = new CQLClient
  val stub = cassandraClient.stub

  def sayHelloTo(msg: String): Flow[HelloMsg, HelloMsg, NotUsed] =
    Flow[HelloMsg]
      .map { r => HelloMsg(r.hello + msg) }
      .via(stub.greeting)

  override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =
    Flow[HelloMsg]
      .via(sayHelloTo(",from jdbc to cassandra"))
}

Now it is much more readable. find() unconditionally selects all Documents. MongoDB-Scala provides a complete set of query-condition construction functions via the Filters object, such as equal:
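The effect of such filter combinators can be sketched over plain Maps. The equal function here is a hand-rolled stand-in for Filters.equal, not the driver's API:

```scala
// documents modeled as plain Maps instead of BSON
type Doc = Map[String, Any]

// stand-in for Filters.equal: build a predicate from a field name and value
def equal(field: String, value: Any): Doc => Boolean =
  doc => doc.get(field).contains(value)

val docs: List[Doc] = List(
  Map("state" -> "NY", "year" -> 2005),
  Map("state" -> "CA", "year" -> 2005)
)

val ny = docs.filter(equal("state", "NY"))
```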

BytesConverter.scala

4. Bidirectional-Streaming: the client still initiates the connection by sending the first request, and then client and server can continuously exchange messages over this connection.
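The four modes differ only in whether the request and response sides are single values or streams. A minimal pure-Scala sketch, with streams modeled as Lists and all names and reply strings purely illustrative:

```scala
type Req  = String
type Resp = String

val unary: Req => Resp                     = r  => s"resp($r)"                 // 1. Unary
val serverStreaming: Req => List[Resp]     = r  => List(s"$r-1", s"$r-2")      // 2. Server-Streaming
val clientStreaming: List[Req] => Resp     = rs => s"agg(${rs.mkString(",")})" // 3. Client-Streaming
val bidirectional: List[Req] => List[Resp] = rs => rs.map(r => s"echo($r)")    // 4. Bidirectional
```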

The above also implements the mapping between the JDBCDataRow type and the JDBC table's fields. The client needs to construct a JDBCQuery type as follows:

Then, in the demo application's .proto file, we use import to bring all the protobuf and gRPC service definitions together:

Or convert to a Future and then run it with Future methods such as Await:
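In this sketch a plain Future stands in for the driver's coll.find().toFuture(); the Await pattern is the same:

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// stand-in for coll.find().toFuture()
val futureDocs: Future[Seq[String]] = Future { Seq("doc1", "doc2") }

// block (only in demos!) until the result arrives, with a timeout
val docs = Await.result(futureDocs, 2.seconds)
```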

package sdp.mongo.engine

import java.text.SimpleDateFormat
import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendar
import scala.collection.JavaConverters._
import sdp.file.Streaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import scala.concurrent._
import scala.concurrent.duration._
import sdp.logging.LogSupport

object MGOContext {
  type MGO_ACTION_TYPE = Int
  val MGO_QUERY        = 0
  val MGO_UPDATE       = 1
  val MGO_ADMIN        = 2

  trait MGOCommands

  object MGOCommands {
    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
    /*  org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan
      .limit
      .sort
      .project */
    case class Find(filter: Option[Bson] = None,
                    andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                    firstOnly: Boolean = false) extends MGOCommands
    case class DocumentStream(filter: Option[Bson] = None,
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
                             ) extends MGOCommands
    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  }

  object MGOAdmins {
    case class DropCollection(collName: String) extends MGOCommands
    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    case class ListCollection(dbName: String) extends MGOCommands
    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  }

  case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None
                       ) { ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
  }

  case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) { ctxs =>
    def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
    def appendContext(ctx: MGOContext): MGOBatContext =
      ctxs.copy(contexts = contexts :+ ctx)
  }

  object MGOBatContext {
    def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs, tx)
  }

  type MGODate = java.util.Date
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy, mm, dd)
    ca.getTime()
  }
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy, mm, dd, hr, min, sec)
    ca.getTime()
  }
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }
  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt = new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToByteArray(fileName, timeOut)
  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = ByteArrayToFile(blob.getData, fileName)

  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getString(fieldName))
    else None
  }
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getInteger(fieldName))
    else None
  }
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getLong(fieldName))
    else None
  }
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDouble(fieldName))
    else None
  }
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getBoolean(fieldName))
    else None
  }
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDate(fieldName))
    else None
  }
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
    else None
  }
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOArray]]
    else None
  }

  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    (arr.getValues.asScala.toList)
      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }

  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}

object MGOEngine extends LogSupport {
  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  object TxUpdateMode {
    private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
      implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
      log.info(s"mgoTxUpdate> calling ...")
      observable.map(clientSession => {
        val transactionOptions =
          TransactionOptions.builder()
            .readConcern(ReadConcern.SNAPSHOT)
            .writeConcern(WriteConcern.MAJORITY).build()
        clientSession.startTransaction(transactionOptions)
        val fut = Future.traverse(ctxs.contexts) { ctx =>
          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
        }
        Await.ready(fut, 3 seconds)
        clientSession
      })
    }

    private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"commitAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
          log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
          commitAndRetry(observable)
        }
        case e: Exception => {
          log.error(s"commitAndRetry> Exception during commit ...: $e")
          throw e
        }
      })
    }

    private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"runTransactionAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
          log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
          runTransactionAndRetry(observable)
        }
      })
    }

    def mgoTxBatch(ctxs: MGOBatContext)(
      implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
      log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")
      val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
      val commitTransactionObservable: SingleObservable[Completed] =
        updateObservable.flatMap(clientSession => clientSession.commitTransaction())
      val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
      runTransactionAndRetry(commitAndRetryObservable).toFuture()
    }
  }

  def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
    log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
    if (ctxs.tx) {
      TxUpdateMode.mgoTxBatch(ctxs)
    } else {
      val fut = Future.traverse(ctxs.contexts) { ctx =>
        mgoUpdate[Completed](ctx).map(identity)
      }
      Await.ready(fut, 3 seconds)
      Future.successful(new Completed)
    }
  }

  // T => FindIterable  e.g List[Document]
  def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
    log.info(s"mgoQuery>  MGOContext: ${ctx}")
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if (ctx.action == None) {
      log.error(s"mgoQuery> query action cannot be null!")
      throw new IllegalArgumentException("query action cannot be null!")
    }
    ctx.action.get match {
      /* count */
      case Count(Some(filter), Some(opt)) => //SingleObservable
        coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(Some(filter), None) => //SingleObservable
        coll.countDocuments(filter).toFuture()
          .asInstanceOf[Future[T]]
      case Count(None, None) => //SingleObservable
        coll.countDocuments().toFuture()
          .asInstanceOf[Future[T]]
      /* distinct */
      case Distict(field, Some(filter)) => //DistinctObservable
        coll.distinct(field, filter).toFuture()
          .asInstanceOf[Future[T]]
      case Distict(field, None) => //DistinctObservable
        coll.distinct(field).toFuture()
          .asInstanceOf[Future[T]]
      /* find */
      case Find(None, None, false) => //FindObservable
        if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
        else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
      case Find(None, None, true) => //FindObservable
        if (Converter == None) coll.find().head().asInstanceOf[Future[T]]
        else coll.find().map(Converter.get).head().asInstanceOf[Future[T]]
      case Find(Some(filter), None, false) => //FindObservable
        if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
        else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter), None, true) => //FindObservable
        if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
        else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
      case Find(None, Some(next), _) => //FindObservable
        if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter), Some(next), _) => //FindObservable
        if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
      /* aggregate AggregateObservable*/
      case Aggregate(pline) =>
        coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
      /* mapReduce MapReduceObservable*/
      case MapReduce(mf, rf) =>
        coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
      /* list collection */
      case ListCollection(dbName) => //ListConllectionObservable
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
    }
  }

  //T => Completed, result.UpdateResult, result.DeleteResult
  def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] =
    mgoUpdateObservable[T](ctx).toFuture()

  def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
    log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if (ctx.action == None) {
      log.error(s"mgoUpdateObservable> query action cannot be null!")
      throw new IllegalArgumentException("query action cannot be null!")
    }
    ctx.action.get match {
      /* insert */
      case Insert(docs, Some(opt)) => //SingleObservable[Completed]
        if (docs.size > 1)
          coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
        else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
      case Insert(docs, None) => //SingleObservable
        if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
        else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
      /* delete */
      case Delete(filter, None, onlyOne) => //SingleObservable
        if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
        else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
      case Delete(filter, Some(opt), onlyOne) => //SingleObservable
        if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
        else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
/* replace */      case Replace(filter, replacement, None) =>        //SingleObservable        coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]      case Replace(filter, replacement, Some =>    //SingleObservable        coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]      /* update */      case Update(filter, update, None, onlyOne) =>      //SingleObservable        if  coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]        else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]      case Update(filter, update, Some, onlyOne) => //SingleObservable        if  coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]        else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]      /* bulkWrite */      case BulkWrite(commands, None) =>                  //SingleObservable        coll.bulkWrite.asInstanceOf[SingleObservable[T]]      case BulkWrite(commands, Some =>             //SingleObservable        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]    }  }  def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {    log.info(s"mgoAdmin>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mgoAdmin> uery action cannot be null!")      throw new IllegalArgumentException("query action cannot be null!")    }    ctx.action.get match {      /* drop collection */      case DropCollection =>                   //SingleObservable        val coll = db.getCollection        coll.drop()      /* create collection */      case CreateCollection(collName, None) =>           //SingleObservable        db.createCollection      case CreateCollection(collName, Some =>      //SingleObservable        
db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])      /* list collection      case ListCollection =>   //ListConllectionObservable        client.getDatabase.listCollections().toFuture().asInstanceOf[Future[T]]        */      /* create view */      case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable        db.createView(viewName, viewOn, pline)      case CreateView(viewName, viewOn, pline, Some =>  //SingleObservable        db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])      /* create index */      case CreateIndex(key, None) =>                     //SingleObservable        coll.createIndex.asInstanceOf[SingleObservable[Completed]]      case CreateIndex(key, Some =>                //SingleObservable        coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]      /* drop index */      case DropIndexByName(indexName, None) =>           //SingleObservable        coll.dropIndex(indexName)      case DropIndexByName(indexName, Some =>      //SingleObservable        coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])      case DropIndexByKey(key, None) =>                  //SingleObservable        coll.dropIndex      case DropIndexByKey(key, Some =>             //SingleObservable        coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])      case DropAllIndexes =>                       //SingleObservable        coll.dropIndexes()      case DropAllIndexes) =>                  //SingleObservable        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])    }  }/*    def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    ctx.action match {      /* count */      case Count(Some, Some =>   //SingleObservable        coll.countDocuments(filter, opt.asInstanceOf[CountOptions])          .toFuture().asInstanceOf[Future[T]]      case 
Count(Some, None) =>        //SingleObservable        coll.countDocuments.toFuture()          .asInstanceOf[Future[T]]      case Count(None, None) =>                //SingleObservable        coll.countDocuments().toFuture()          .asInstanceOf[Future[T]]      /* distinct */      case Distict(field, Some =>     //DistinctObservable        coll.distinct(field, filter).toFuture()          .asInstanceOf[Future[T]]      case Distict(field, None) =>             //DistinctObservable        coll.distinct.toFuture()          .asInstanceOf[Future[T]]      /* find */      case Find(None, None, optConv, false) =>  //FindObservable        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(None, None, optConv, true) =>   //FindObservable        if (optConv == None) coll.find.head().asInstanceOf[Future[T]]        else coll.find.map(optConv.get).head().asInstanceOf[Future[T]]      case Find(Some, None, optConv, false) =>   //FindObservable        if (optConv == None) coll.find.toFuture().asInstanceOf[Future[T]]        else coll.find.map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(Some, None, optConv, true) =>   //FindObservable        if (optConv == None) coll.find.first.asInstanceOf[Future[T]]        else coll.find.first().map(optConv.get).head().asInstanceOf[Future[T]]      case Find(None, Some, optConv, _) =>   //FindObservable        if (optConv == None) next(coll.find[Document].toFuture().asInstanceOf[Future[T]]        else next(coll.find[Document].map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(Some, Some, optConv, _) =>  //FindObservable        if (optConv == None) next(coll.find[Document].toFuture().asInstanceOf[Future[T]]        else next(coll.find[Document].map(optConv.get).toFuture().asInstanceOf[Future[T]]      /* aggregate AggregateObservable*/      case Aggregate => coll.aggregate.toFuture().asInstanceOf[Future[T]]      
/* mapReduce MapReduceObservable*/      case MapReduce => coll.mapReduce.toFuture().asInstanceOf[Future[T]]      /* insert */      case Insert(docs, Some =>                  //SingleObservable[Completed]        if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()          .asInstanceOf[Future[T]]        else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()          .asInstanceOf[Future[T]]      case Insert(docs, None) =>                       //SingleObservable        if (docs.size > 1) coll.insertMany.toFuture().asInstanceOf[Future[T]]        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]      /* delete */      case Delete(filter, None, onlyOne) =>            //SingleObservable        if  coll.deleteOne.toFuture().asInstanceOf[Future[T]]        else coll.deleteMany.toFuture().asInstanceOf[Future[T]]      case Delete(filter, Some, onlyOne) =>       //SingleObservable        if  coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]        else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]      /* replace */      case Replace(filter, replacement, None) =>        //SingleObservable        coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]      case Replace(filter, replacement, Some =>    //SingleObservable        coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]      /* update */      case Update(filter, update, None, onlyOne) =>      //SingleObservable        if  coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]        else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]      case Update(filter, update, Some, onlyOne) => //SingleObservable        if  coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]        else coll.updateMany(filter, update, 
opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]      /* bulkWrite */      case BulkWrite(commands, None) =>                  //SingleObservable        coll.bulkWrite.toFuture().asInstanceOf[Future[T]]      case BulkWrite(commands, Some =>             //SingleObservable        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]      /* drop collection */      case DropCollection =>                   //SingleObservable        val coll = db.getCollection        coll.drop().toFuture().asInstanceOf[Future[T]]      /* create collection */      case CreateCollection(collName, None) =>           //SingleObservable        db.createCollection.toFuture().asInstanceOf[Future[T]]      case CreateCollection(collName, Some =>      //SingleObservable        db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]      /* list collection */      case ListCollection =>   //ListConllectionObservable        client.getDatabase.listCollections().toFuture().asInstanceOf[Future[T]]      /* create view */      case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable        db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]      case CreateView(viewName, viewOn, pline, Some =>  //SingleObservable        db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]      /* create index */      case CreateIndex(key, None) =>                     //SingleObservable        coll.createIndex.toFuture().asInstanceOf[Future[T]]      case CreateIndex(key, Some =>                //SingleObservable        coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]      /* drop index */      case DropIndexByName(indexName, None) =>           //SingleObservable        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]      case DropIndexByName(indexName, Some =>    
  //SingleObservable        coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]      case DropIndexByKey(key, None) =>                  //SingleObservable        coll.dropIndex.toFuture().asInstanceOf[Future[T]]      case DropIndexByKey(key, Some =>             //SingleObservable        coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]      case DropAllIndexes =>                       //SingleObservable        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]      case DropAllIndexes) =>                  //SingleObservable        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]    }  }*/  def mongoStream(ctx: MGOContext)(    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {    log.info(s"mongoStream>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mongoStream> uery action cannot be null!")      throw new IllegalArgumentException("query action cannot be null!")    }    ctx.action.get match {      case DocumentStream(None, None) =>        MongoSource(coll.find      case DocumentStream(Some, None) =>        MongoSource(coll.find      case DocumentStream(None, Some =>        MongoSource(next(coll.find      case DocumentStream(Some, Some =>        MongoSource(next(coll.find    }  }}object MongoActionStream {  import MGOContext._  case class StreamingInsert[A](dbName: String,                                collName: String,                                converter: A => Document,                                parallelism: Int = 1                               ) extends MGOCommands  case class StreamingDelete[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                parallelism: Int = 1,                    
            justOne: Boolean = false                               ) extends MGOCommands  case class StreamingUpdate[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                toUpdate: A => Bson,                                parallelism: Int = 1,                                justOne: Boolean = false                               ) extends MGOCommands  case class InsertAction[A](ctx: StreamingInsert[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =      Flow[A].map(ctx.converter)        .mapAsync(ctx.parallelism)(doc => collection.insertOne.toFuture().map(_ => doc))  }  case class UpdateAction[A](ctx: StreamingUpdate[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateOne(ctx.toFilter, ctx.toUpdate.toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateMany(ctx.toFilter, ctx.toUpdate.toFuture().map(_ => a))  }  case class DeleteAction[A](ctx: StreamingDelete[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.deleteOne(ctx.toFilter.toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            
collection.deleteMany(ctx.toFilter.toFuture().map(_ => a))  }}object MGOHelpers {  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {    override val converter:  => String =  => doc.toJson  }  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {    override val converter:  => String =  => doc.toString  }  trait ImplicitObservable[C] {    val observable: Observable[C]    val converter:  => String    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)    def headResult() = Await.result(observable.head(), 10 seconds)    def printResults(initial: String = ""): Unit = {      if (initial.length > 0) print      results().foreach(res => println(converter    }    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult}")  }  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {    Await.result(fut, timeOut)  }  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {    Await.result(fut, timeOut)  }  import monix.eval.Task  import monix.execution.Scheduler.Implicits.global  final class FutureToTask[A](x: => Future[A]) {    def asTask: Task[A] = Task.deferFuture[A]  }  final class TaskToFuture[A](x: => Task[A]) {    def asFuture: Future[A] = x.runAsync  }}
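The commitAndRetry/runTransactionAndRetry pair above retries an asynchronous operation as long as it fails with an error labelled transient. The same control flow can be sketched with nothing but standard-library Futures; TransientError, flakyCommit and retryOnTransient below are hypothetical stand-ins for MongoException, the real commit operation and the driver's recoverWith loop.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// stand-in for MongoException with a transient-error label
final case class TransientError(msg: String) extends Exception(msg)

// keep retrying while the failure is transient; propagate anything else
def retryOnTransient[A](op: () => Future[A]): Future[A] =
  op().recoverWith {
    case _: TransientError => retryOnTransient(op)  // like runTransactionAndRetry
    case e: Exception      => Future.failed(e)      // like commitAndRetry's rethrow
  }

// a commit that fails twice with a transient error, then succeeds
var attempts = 0
def flakyCommit(): Future[String] = Future {
  attempts += 1
  if (attempts < 3) throw TransientError(s"attempt $attempts failed")
  "committed"
}

val result = Await.result(retryOnTransient(() => flakyCommit()), 5.seconds)
println(s"$result after $attempts attempts")  // committed after 3 attempts
```

The real code differs in one respect: it retries on the driver's Observable rather than on a Future, so the retry re-subscribes to the observable instead of re-invoking a thunk.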

The workflow for using Google gRPC is as follows:


syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

package grpc.jdbc.services;

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

message JDBCDataRow {
  string year = 1;
  string state = 2;
  string county = 3;
  string value = 4;
}

message JDBCQuery {
  string dbName = 1;
  string statement = 2;
  bytes parameters = 3;
  google.protobuf.Int32Value fetchSize = 4;
  google.protobuf.BoolValue autoCommit = 5;
  google.protobuf.Int32Value queryTimeout = 6;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
}
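The JDBCQuery message above carries its query parameters as a raw protobuf `bytes` field, so the caller has to serialize the parameter list before building the message and deserialize it on the service side. A minimal sketch, assuming plain Java serialization; the marshal/unmarshal names are illustrative, not part of the generated API:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// pack a parameter list into the bytes that go into JDBCQuery.parameters
def marshal(params: Seq[Any]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try { oos.writeObject(params); oos.flush(); bos.toByteArray }
  finally oos.close()
}

// restore the parameter list on the receiving side
def unmarshal(bytes: Array[Byte]): Seq[Any] = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[Seq[Any]]
  finally ois.close()
}

val bytes = marshal(Seq(2018, "ASTHMA"))
println(unmarshal(bytes))  // List(2018, ASTHMA)
```

Java serialization keeps the example short; in a polyglot cluster a language-neutral encoding (e.g. nested protobuf messages) would be the safer choice.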
package demo.sdp.grpc

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import sdp.grpc.jdbc.client.JDBCClient

object DemoApp extends App {
  implicit val system = ActorSystem("jdbcClient")
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  val jdbcClient = new JDBCClient

  jdbcClient.sayHello.runForeach(r => println(r))

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}
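DemoApp above runs the service's response stream as a Source and prints each element. The service chaining described earlier (JDBCService calling CQLService calling MGOService) is, at its core, just composition of passthrough transformations: each service turns a request into an enriched response, and akka-stream's Flow plays the role of a function here. A dependency-free sketch with hypothetical stand-ins for the three services:

```scala
// each "service" is a passthrough transformation from request to response
val jdbcService: String => String = req => s"jdbc($req)"
val cqlService:  String => String = req => s"cql($req)"
val mgoService:  String => String = req => s"mgo($req)"

// JDBCService forwards to CQLService, which forwards to MGOService,
// exactly as the Flows are joined via gRPC in the real cluster
val pipeline = jdbcService andThen cqlService andThen mgoService

println(pipeline("hello"))  // mgo(cql(jdbc(hello)))
```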

The constructors of the Document class in the collection package are as follows:
