diff --git a/build.sbt b/build.sbt index 137b0751..7defaf08 100644 --- a/build.sbt +++ b/build.sbt @@ -46,6 +46,20 @@ lazy val core = project ) ) +lazy val api = project + .in(file("modules/api")) + .enablePlugins(Smithy4sCodegenPlugin) + .settings( + name := "api", + commonSettings, + smithy4sWildcardArgument := "?", + libraryDependencies ++= Seq( + catsCore, + smithy4sCore + ) + ) + .dependsOn(core) + lazy val elastic = project .in(file("modules/elastic")) .settings( @@ -61,21 +75,7 @@ lazy val elastic = project otel4sCore ) ) - .dependsOn(core) - -lazy val api = project - .in(file("modules/api")) - .enablePlugins(Smithy4sCodegenPlugin) - .settings( - name := "api", - commonSettings, - smithy4sWildcardArgument := "?", - libraryDependencies ++= Seq( - catsCore, - smithy4sCore - ) - ) - .dependsOn(core) + .dependsOn(api, core) // elastic now needs api: ublog.scala imports spec.SortBlogsBy lazy val ingestor = project .in(file("modules/ingestor")) @@ -84,8 +84,9 @@ lazy val ingestor = project name := "ingestor", commonSettings, buildInfoSettings, - publish := {}, - publish / skip := true, + dockerBaseImage := "docker.io/library/eclipse-temurin:21-jdk", + publish := {}, + publish / skip := true, libraryDependencies ++= Seq( chess, catsCore, @@ -141,8 +142,9 @@ lazy val app = project name := "lila-search", commonSettings, buildInfoSettings, - publish := {}, - publish / skip := true, + dockerBaseImage := "docker.io/library/eclipse-temurin:21-jdk", + publish := {}, + publish / skip := true, libraryDependencies ++= Seq( smithy4sHttp4s, jsoniterCore, diff --git a/modules/api/src/main/smithy/search.smithy b/modules/api/src/main/smithy/search.smithy index 0dd78674..bc72cad6 100644 --- a/modules/api/src/main/smithy/search.smithy +++ b/modules/api/src/main/smithy/search.smithy @@ -70,6 +70,15 @@ structure Forum { troll: Boolean = false } +structure Ublog { + @required + queryText: String + @required + by: SortBlogsBy + minQuality: Integer // optional: keep posts whose automod quality is at least this + language: String // optional: exact match on the post language +} + structure Team { @required text: String @@ -133,9 +142,17 @@ list Perfs {
member: Integer } +enum SortBlogsBy { // every order sorts by quality desc before date; score/likes add a leading key + newest + oldest + score + likes +} + @adt union Query { forum: Forum + ublog: Ublog game: Game study: Study team: Team diff --git a/modules/app/src/main/scala/service.search.scala b/modules/app/src/main/scala/service.search.scala index 6198ac4a..f6e5db79 100644 --- a/modules/app/src/main/scala/service.search.scala +++ b/modules/app/src/main/scala/service.search.scala @@ -8,6 +8,7 @@ import lila.search.game.Game import lila.search.spec.* import lila.search.study.Study import lila.search.team.Team +import lila.search.ublog.Ublog import org.typelevel.log4cats.{ Logger, LoggerFactory } import org.typelevel.otel4s.metrics.{ Histogram, Meter } import org.typelevel.otel4s.{ Attribute, AttributeKey, Attributes } @@ -82,6 +83,7 @@ object SearchServiceImpl: def searchDef(from: From, size: Size) = query match case q: Query.Forum => q.to[Forum].searchDef(from, size) + case q: Query.Ublog => q.to[Ublog].searchDef(from, size) case q: Query.Game => q.to[Game].searchDef(from, size) case q: Query.Study => q.to[Study].searchDef(from, size) case q: Query.Team => q.to[Team].searchDef(from, size) @@ -89,12 +91,14 @@ object SearchServiceImpl: def countDef = query match case q: Query.Forum => q.to[Forum].countDef + case q: Query.Ublog => q.to[Ublog].countDef case q: Query.Game => q.to[Game].countDef case q: Query.Study => q.to[Study].countDef case q: Query.Team => q.to[Team].countDef def index = query match case _: Query.Forum => Index.Forum + case _: Query.Ublog => Index.Ublog case _: Query.Game => Index.Game case _: Query.Study => Index.Study case _: Query.Team => Index.Team diff --git a/modules/core/src/main/scala/models.scala b/modules/core/src/main/scala/models.scala index 403ca382..b99a7c43 100644 --- a/modules/core/src/main/scala/models.scala +++ b/modules/core/src/main/scala/models.scala @@ -29,6 +29,7 @@ object Id: enum Index(val value: String): case Forum extends Index("forum") + case Ublog extends Index("ublog") case Game extends
Index("game") case Study extends Index("study") case Team extends Index("team") @@ -37,6 +38,7 @@ object Index: def fromString(value: String): Either[String, Index] = value match case "forum" => Index.Forum.asRight + case "ublog" => Index.Ublog.asRight case "game" => Index.Game.asRight case "study" => Index.Study.asRight case "team" => Index.Team.asRight diff --git a/modules/e2e/src/test/scala/IntegrationSuite.scala b/modules/e2e/src/test/scala/IntegrationSuite.scala index c74f6711..337131d4 100644 --- a/modules/e2e/src/test/scala/IntegrationSuite.scala +++ b/modules/e2e/src/test/scala/IntegrationSuite.scala @@ -74,6 +74,28 @@ object IntegrationSuite extends IOSuite: y <- service.search(Query.forum("nt9", false), from, size) yield expect(x.hitIds.size == 1 && x == y) + test("ublog"): res => + Clients + .search(uri) + .use: service => + for + _ <- res.esClient.putMapping(Index.Ublog) + _ <- res.esClient.store( + Index.Ublog, + Id("abcdefgh"), + ingestor.UblogSource( + text = "lil bubber, hayo!", + language = "en", + likes = 0, + date = Instant.now().toEpochMilli(), + quality = 1.some + ) + ) + _ <- res.esClient.refreshIndex(Index.Ublog) + x <- service.search(Query.ublog("lil bubber", SortBlogsBy.score, 1.some), from, size) + y <- service.search(Query.ublog("hayo", SortBlogsBy.newest, 2.some), from, size) // expected empty: stored quality 1 is below minQuality 2 + yield expect(x.hitIds.size == 1 && y.hitIds.isEmpty) + test("team"): res => Clients .search(uri) diff --git a/modules/elastic/src/main/scala/package.scala b/modules/elastic/src/main/scala/package.scala index 3fa3fecd..e791c479 100644 --- a/modules/elastic/src/main/scala/package.scala +++ b/modules/elastic/src/main/scala/package.scala @@ -21,6 +21,7 @@ extension (index: Index) def mapping = index match case Index.Forum => forum.Mapping.fields + case Index.Ublog => ublog.Mapping.fields case Index.Game => game.Mapping.fields case Index.Study => study.Mapping.fields case Index.Team => team.Mapping.fields diff --git a/modules/elastic/src/main/scala/study.scala
b/modules/elastic/src/main/scala/study.scala index db7c1ae6..d7ec6b66 100644 --- a/modules/elastic/src/main/scala/study.scala +++ b/modules/elastic/src/main/scala/study.scala @@ -9,7 +9,7 @@ case class Study(text: String, userId: Option[String]): def searchDef(from: From, size: Size) = search(Study.index) - .query(makeQuery) + .query(makeQuery()) .fetchSource(false) .sortBy( fieldSort("_score").order(SortOrder.DESC), @@ -18,9 +18,9 @@ case class Study(text: String, userId: Option[String]): .start(from.value) .size(size.value) - def countDef = count(Study.index).query(makeQuery) + def countDef = count(Study.index).query(makeQuery()) - private def makeQuery = { + private def makeQuery() = { val parsed = QueryParser(text, List("owner", "member")) val matcher: Query = if parsed.terms.isEmpty then matchAllQuery() diff --git a/modules/elastic/src/main/scala/ublog.scala b/modules/elastic/src/main/scala/ublog.scala new file mode 100644 index 00000000..0f0ee927 --- /dev/null +++ b/modules/elastic/src/main/scala/ublog.scala @@ -0,0 +1,76 @@ +package lila.search +package ublog + +import com.sksamuel.elastic4s.ElasticDsl.* +import com.sksamuel.elastic4s.requests.searches.sort.SortOrder + +import spec.SortBlogsBy + +case class Ublog( + queryText: String, + by: SortBlogsBy, + minQuality: Option[Int], + language: Option[String] +): + + val sanitized = queryText + .trim() + .toLowerCase() + .replaceAll("""([\-=&|> s + case s => s.replace(":", " ") // devs can use the query string until we get a ui for lang/quality + .mkString(" ") + + def searchDef(from: From, size: Size) = // optional score/likes key, then quality desc, then date (asc only for oldest) + val sortFields = + (if by == SortBlogsBy.score then Seq(scoreSort().order(SortOrder.DESC)) + else if by == SortBlogsBy.likes then Seq(fieldSort(Fields.likes).order(SortOrder.DESC)) + else Nil) ++ Seq( + fieldSort(Fields.quality).order(SortOrder.DESC).missing("_last"), + fieldSort(Fields.date) + .order(if by == SortBlogsBy.oldest then SortOrder.ASC else SortOrder.DESC) + .missing("_last") + ) + search(Ublog.index)
.query(makeQuery()) .fetchSource(false) .sortBy(sortFields*) .start(from.value) .size(size.value) + + def countDef = count(Ublog.index).query(makeQuery()) + + private def makeQuery() = // query_string on text, narrowed by the optional quality/language filters + boolQuery() + .must(queryStringQuery(sanitized).defaultField(Fields.text)) + .filter( + List( + minQuality.map(f => rangeQuery(Fields.quality).gte(f)), + language.map(l => termQuery(Fields.language, l)) + ).flatten + ) + +object Ublog: + val index = "ublog" + +object Fields: + val text = "text" + val likes = "likes" + val quality = "quality" + val language = "language" + val date = "date" + +object Mapping: + import Fields.* + def fields = + Seq( + textField(text), + shortField(quality).copy(docValues = Some(true)), + keywordField(language).copy(docValues = Some(false)), // only used in term filters here, so doc values are off + shortField(likes).copy(docValues = Some(true)), + dateField(date).copy(docValues = Some(true)) + ) diff --git a/modules/ingestor/src/main/scala/app.config.scala b/modules/ingestor/src/main/scala/app.config.scala index e50f8516..1f4f4e16 100644 --- a/modules/ingestor/src/main/scala/app.config.scala +++ b/modules/ingestor/src/main/scala/app.config.scala @@ -51,6 +51,7 @@ object ElasticConfig: case class IngestorConfig( forum: IngestorConfig.Forum, + ublog: IngestorConfig.Ublog, team: IngestorConfig.Team, study: IngestorConfig.Study, game: IngestorConfig.Game @@ -58,6 +59,7 @@ case class IngestorConfig( object IngestorConfig: case class Forum(batchSize: Int, timeWindows: Int, startAt: Option[Instant], maxPostLength: Int) + case class Ublog(batchSize: Int, timeWindows: Int, startAt: Option[Instant]) case class Team(batchSize: Int, timeWindows: Int, startAt: Option[Instant]) case class Study(batchSize: Int, startAt: Option[Instant], interval: FiniteDuration, databaseName: String) case class Game(batchSize: Int, timeWindows: Int, startAt: Option[Instant]) @@ -73,6 +75,15 @@ object IngestorConfig: env("INGESTOR_FORUM_MAX_POST_LENGTH").or(prop("ingestor.forum.max.post.length")).as[Int].default(5_000) def config =
(batchSize, timeWindows, startAt, maxPostLength).parMapN(Forum.apply) + private object Ublog: // env INGESTOR_UBLOG_* wins over the ingestor.ublog.* property + private def batchSize = + env("INGESTOR_UBLOG_BATCH_SIZE").or(prop("ingestor.ublog.batch.size")).as[Int].default(100) + private def timeWindows = + env("INGESTOR_UBLOG_TIME_WINDOWS").or(prop("ingestor.ublog.time.windows")).as[Int].default(10) + private def startAt = + env("INGESTOR_UBLOG_START_AT").or(prop("ingestor.ublog.start.at")).as[Instant].option + def config = (batchSize, timeWindows, startAt).parMapN(Ublog.apply) + private object Team: private def batchSize = env("INGESTOR_TEAM_BATCH_SIZE").or(prop("ingestor.team.batch.size")).as[Int].default(100) @@ -104,7 +115,7 @@ object IngestorConfig: env("INGESTOR_GAME_START_AT").or(prop("ingestor.game.start.at")).as[Instant].option def config = (batchSize, timeWindows, startAt).mapN(Game.apply) - def config = (Forum.config, Team.config, Study.config, Game.config).mapN(IngestorConfig.apply) + def config = (Forum.config, Ublog.config, Team.config, Study.config, Game.config).mapN(IngestorConfig.apply) object CirisCodec: given ConfigDecoder[String, Instant] = ConfigDecoder[String] diff --git a/modules/ingestor/src/main/scala/cli.scala b/modules/ingestor/src/main/scala/cli.scala index dac0a3a1..c512c2f2 100644 --- a/modules/ingestor/src/main/scala/cli.scala +++ b/modules/ingestor/src/main/scala/cli.scala @@ -54,6 +54,8 @@ object cli opts.index match case Index.Forum => ingestor.forum.run(opts.since, opts.until, opts.dry) + case Index.Ublog => + ingestor.ublog.run(opts.since, opts.until, opts.dry) case Index.Study => ingestor.study.run(opts.since, opts.until, opts.dry) case Index.Game => @@ -62,6 +64,7 @@ object cli ingestor.team.run(opts.since, opts.until, opts.dry) case _ => ingestor.forum.run(opts.since, opts.until, opts.dry) *> + ingestor.ublog.run(opts.since, opts.until, opts.dry) *> ingestor.study.run(opts.since, opts.until, opts.dry) *> ingestor.game.run(opts.since, opts.until, opts.dry) *> ingestor.team.run(opts.since,
opts.until, opts.dry) @@ -72,12 +75,15 @@ object cli ingestor.game.watch(opts.since.some, opts.dry) case Index.Forum => ingestor.forum.watch(opts.since.some, opts.dry) + case Index.Ublog => + ingestor.ublog.watch(opts.since.some, opts.dry) case Index.Team => ingestor.team.watch(opts.since.some, opts.dry) case Index.Study => ingestor.study.watch(opts.since.some, opts.dry) case _ => ingestor.forum.watch(opts.since.some, opts.dry) *> + ingestor.ublog.watch(opts.since.some, opts.dry) *> // NOTE(review): if watch never completes, *> keeps every index after forum from starting; confirm ingestor.team.watch(opts.since.some, opts.dry) *> ingestor.study.watch(opts.since.some, opts.dry) *> ingestor.game.watch(opts.since.some, opts.dry) @@ -95,7 +101,7 @@ object opts: long = "index", help = "Target index", short = "i", - metavar = "forum|team|study|game" + metavar = "forum|ublog|team|study|game" ) val allIndexOpt = diff --git a/modules/ingestor/src/main/scala/ingestors.scala b/modules/ingestor/src/main/scala/ingestors.scala index 9a36057a..55f63f32 100644 --- a/modules/ingestor/src/main/scala/ingestors.scala +++ b/modules/ingestor/src/main/scala/ingestors.scala @@ -8,12 +8,13 @@ import org.typelevel.log4cats.LoggerFactory class Ingestors( val forum: Ingestor, + val ublog: Ingestor, val study: Ingestor, val game: Ingestor, val team: Ingestor ): def run(): IO[Unit] = - List(forum.watch, team.watch, study.watch, game.watch).parSequence_ + List(forum.watch, ublog.watch, team.watch, study.watch, game.watch).parSequence_ object Ingestors: @@ -27,12 +28,14 @@ object Ingestors: )(using LoggerFactory[IO]): IO[Ingestors] = ( ForumRepo(lichess, config.forum), + UblogRepo(lichess, config.ublog), StudyRepo(study, local, config.study), GameRepo(lichess, config.game), TeamRepo(lichess, config.team) - ).mapN: (forums, studies, games, teams) => + ).mapN: (forums, ublogs, studies, games, teams) => new Ingestors( Ingestor(Index.Forum, forums, store, elastic, config.forum.startAt), + Ingestor(Index.Ublog, ublogs, store, elastic, config.ublog.startAt), Ingestor(Index.Study, studies, store,
elastic, config.study.startAt), Ingestor(Index.Game, games, store, elastic, config.game.startAt), Ingestor(Index.Team, teams, store, elastic, config.team.startAt) diff --git a/modules/ingestor/src/main/scala/mongo.ublog.scala b/modules/ingestor/src/main/scala/mongo.ublog.scala new file mode 100644 index 00000000..981d165e --- /dev/null +++ b/modules/ingestor/src/main/scala/mongo.ublog.scala @@ -0,0 +1,132 @@ +package lila.search +package ingestor + +import cats.effect.IO +import cats.syntax.all.* +import com.mongodb.client.model.changestream.FullDocument +import com.mongodb.client.model.changestream.OperationType.* +import mongo4cats.bson.Document +import mongo4cats.database.MongoDatabase +import mongo4cats.models.collection.ChangeStreamDocument +import mongo4cats.operations.{ Aggregate, Filter, Projection } +import org.typelevel.log4cats.syntax.* +import org.typelevel.log4cats.{ Logger, LoggerFactory } + +import java.time.Instant +import scala.concurrent.duration.* + +import Repo.{ *, given } + +object UblogRepo: + + private val interestedOperations = List(DELETE, INSERT, REPLACE, UPDATE).map(_.getValue) // change events passed through by aggregate() + + private val interestedFields = + List( + _id, + F.markdown, + F.title, + F.intro, + F.topics, + F.blog, + F.live, + F.livedAt, + F.likes, + F.language, + F.quality + ) + private val postProjection = Projection.include(interestedFields) // fields read by toSource/isLive, plus _id + + private val interestedEventFields = + List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument."
+ _) + private val eventProjection = Projection.include(interestedEventFields) + + private def aggregate() = + Aggregate + .matchBy(Filter.in("operationType", interestedOperations)) + .combinedWith(Aggregate.project(eventProjection)) + + def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Ublog)(using + LoggerFactory[IO] + ): IO[Repo[UblogSource]] = + given Logger[IO] = LoggerFactory[IO].getLogger + mongo.getCollection("ublog_post").map(apply(config)) + + def apply(config: IngestorConfig.Ublog)( + posts: MongoCollection + )(using Logger[IO]): Repo[UblogSource] = new: + + def fetch(since: Instant, until: Instant) = + val filter = range(F.livedAt)(since, until.some) + fs2.Stream.eval(info"Fetching blog posts from $since to $until") *> + posts + .find(filter) + .projection(postProjection) + .boundedStream(config.batchSize) + .chunkN(config.batchSize) + .map(_.toList) + .metered(1.second) // throttle: at most one batch per second + .map: docs => + val (toDelete, toIndex) = docs.partition(!_.isLive) // non-live posts are deleted from the index + Result(toIndex.toSources, toDelete.flatten(using _.id.map(Id.apply)), none) + + def watch(since: Option[Instant]): fs2.Stream[IO, Result[UblogSource]] = + val builder = posts.watch(aggregate()) + // skip the first event if we're starting from a specific timestamp + // since the event at that timestamp is already indexed + val skip = since.fold(0)(_ => 1) + since + .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp)) + .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event + .batchSize(config.batchSize) + .boundedStream(config.batchSize) + .drop(skip) + .evalTap(x => debug"Ublog event: $x") + .groupWithin(config.batchSize, config.timeWindows.second) + .map(_.toList.distincByDocId) + .map: docs => + val lastEventTimestamp = docs.flatten(using _.clusterTime.flatMap(_.asInstant)).maxOption + val (toDelete, toIndex) = docs.partition(_.isDelete) + Result( + toIndex.flatten(using _.fullDocument).toSources, + toDelete.flatten(using _.docId.map(Id.apply)), + lastEventTimestamp
+ ) + + extension (docs: List[Document]) + private def toSources: List[(String, UblogSource)] = + docs.flatten(using doc => (doc.id, doc.toSource).mapN(_ -> _)) + + extension (doc: Document) + private def toSource: Option[UblogSource] = // None when any required field is missing; quality stays optional + for + title <- doc.getString(F.title) + intro <- doc.getString(F.intro) + body <- doc.getString(F.markdown) + author <- doc.getString(F.blog).flatMap(_.split(":").lift(1)) // second ':' segment; a malformed blog id skips the doc instead of throwing + language <- doc.getString(F.language) + likes <- doc.getAs[Int](F.likes) + topics <- doc.getAs[List[String]](F.topics).map(_.mkString(" ").replaceAll("Chess", "")) + text = s"$title\n$topics\n$author\n$intro\n$body" + date <- doc.getNested(F.livedAt).flatMap(_.asInstant).map(_.toEpochMilli) + quality = doc.getNestedAs[Int](F.quality) + yield UblogSource(text, language, likes, date, quality) + + private def isLive: Boolean = + doc.getBoolean(F.live).contains(true) && !doc.getNestedAs[Int](F.quality).contains(0) // quality 0 is treated as not live + + extension (event: ChangeStreamDocument[Document]) + private def isDelete: Boolean = + event.operationType == DELETE || event.fullDocument.exists(!_.isLive) + + object F: + val markdown = "markdown" + val title = "title" + val intro = "intro" + val blog = "blog" + val language = "language" + val likes = "likes" + val live = "live" + val livedAt = "lived.at" + val quality = "automod.quality" + val topics = "topics" diff --git a/modules/ingestor/src/main/smithy/model.smithy b/modules/ingestor/src/main/smithy/model.smithy index 26ae628f..1bf01669 100644 --- a/modules/ingestor/src/main/smithy/model.smithy +++ b/modules/ingestor/src/main/smithy/model.smithy @@ -111,3 +111,15 @@ structure TeamSource { @jsonName("nbm") nbMembers: Integer } + +structure UblogSource { + @required + text: String + @required + language: String + @required + likes: Integer + @required + date: Long + quality: Integer +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 22002d49..19cd409c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@
-8,7 +8,7 @@ object Dependencies { val ourResolvers = Seq(lilaMaven, jitpack) object V { - val catsEffect = "3.6.1" + val catsEffect = "3.6.2" val chess = "17.8.5" val ciris = "3.9.0" val decline = "2.5.0" @@ -17,7 +17,7 @@ object Dependencies { val http4s = "0.23.30" val iron = "2.5.0" val mongo4cats = "0.7.13" - val otel4s = "0.13.0" + val otel4s = "0.13.1" } def http4s(artifact: String) = "org.http4s" %% s"http4s-$artifact" % V.http4s @@ -45,8 +45,8 @@ object Dependencies { lazy val smithy4sHttp4sSwagger = smithy4s("http4s-swagger") lazy val smithy4sJson = smithy4s("json") - val jsoniterCore = "com.github.plokhotnyuk.jsoniter-scala" %% "jsoniter-scala-core" % "2.36.6" - val jsoniterMacro = "com.github.plokhotnyuk.jsoniter-scala" %% "jsoniter-scala-macros" % "2.36.6" + val jsoniterCore = "com.github.plokhotnyuk.jsoniter-scala" %% "jsoniter-scala-core" % "2.36.7" + val jsoniterMacro = "com.github.plokhotnyuk.jsoniter-scala" %% "jsoniter-scala-macros" % "2.36.7" val playWS = "com.typesafe.play" %% "play-ahc-ws-standalone" % "2.2.11" diff --git a/project/build.properties b/project/build.properties index bbb0b608..c02c575f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.11.2 +sbt.version=1.11.3 diff --git a/version.sbt b/version.sbt index 06b00f3c..e451539c 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "3.1.10" +ThisBuild / version := "3.2.0"