Git
イベント
マガジン
技術ブログ
みなさんこんにちは!ワンキャリアでOCIDチームに所属しているエンジニアの吉田(X: @yoshida_baystar )です。 今回はワンキャリアエンジニアのターミナルをご紹介します!
Elastic がバージョン 9.4 をリリースしました。セキュリティ自動化エンジン「Workflows」の正式版リリースを筆頭に、AI によるルール生成・専門知識モジュール(Skills)・クエリ言語 ES|QL の大幅強化など、幅広い領域にわたるアップデートが含まれています。本記事では注目機能を速報でまとめます。 目次 1. Elastic Workflows が正式版(GA)に 2. Elastic AI Agent に専門知識モジュール「Skills」が登場 3. AI による ES|QL 検知ルール自動生成 4. ES|QL クエリ言語の強化 5. ベクトル検索の高速化 6. Kibana の強化 7. Observability:メトリクス・時系列分析の強化 8. セキュリティ:エンティティ管理の深化 まとめ:9.4 の位置付け 参照リンク 1. Elastic Workflows が正式版(GA)に セキュリティ運用の自動化エンジン「Elastic Workflows」が 9.3 の試験公開(Tech Preview)から正式版に昇格しました。 アラートの受信から、ケース作成・担当者割当・Slack 通知・外部システム連携までを YAML または自然言語で定義し、Kibana 内で完結して実行できます。データ移動が不要な点が他の SOAR ツールとの大きな違いです。 主な追加機能: ケース管理ステップが 4 種類 → 25 種類に拡張 :担当者割当・証拠添付・重要度設定・タグ管理など、インシデントの全ライフサイクルに対応 Human-in-the-Loop( waitForInput ) :AI が調査・分類を行い、エスカレーション判断だけをアナリストに委ねる仕組みを正式サポート ワークフローの組み合わせ( workflow.execute ) :マルウェア対応・フィッシング対応などを独立したモジュールとして再利用可能に 制御フロー強化 :switch(多方向分岐)・while(ループ)・loop.break/continue を追加 自然言語でのワークフロー作成 (Tech Preview):攻撃対応シナリオを日本語・英語で説明するだけで YAML を自動生成 最小構成のワークフロー例(YAML): name: Critical アラート対応ワークフロー enabled: true triggers: - type: alert # アラート発火を起点に起動 steps: - name: create_case type: cases.createCase with: owner: securitySolution title: "{{ event.rule.name }} - {{ event.alerts[0].host.name }}" severity: high tags: - auto-triage 📎 参照: Elastic Workflows GA: automation where your security data already lives 2. Elastic AI Agent に専門知識モジュール「Skills」が登場 従来の AI アシスタントは「すべての知識を1つのプロンプトに詰め込む」設計でした。9.4 では、タスクに応じて専門知識モジュール(スキル)をオンデマンドで切り替える「Skills」アーキテクチャが導入されました。 コンテキストウィンドウを知識ではなく実データに使える分、各スキルの回答精度が向上します。スキルを追加しても既存スキルの品質が落ちない設計が特徴です。 9.4 で搭載された 5 つのセキュリティスキル: スキル 役割 Detection Rule Edit 自然言語から ES|QL 検知ルールを生成・編集 Alert Analysis アラートのトリアージ・関連アラート探索・脅威インテル照合 Threat Hunting 仮説駆動型の脅威ハンティング(横移動・C2 テンプレート内蔵) Entity Analytics ユーザー・ホストのリスクスコア・行動パターンのプロファイリング Security ML Jobs 機械学習ジョブの異常をエンティティコンテキストと相関させて調査 このほか、Kibana ダッシュボード作成・ワークフロー YAML 生成・エンティティ関係グラフ作成の 3 つのプラットフォームスキルも追加されました。 実際の使い方の例(自然言語プロンプトを送るだけ): プロンプト例 起動するスキル 「アラート 82a1f を分析して。認証情報窃取キャンペーンと関連してる?」 Alert Analysis 「過去7日間で侵害されたホストからの横移動をハントして」 Threat Hunting 「今週最もリスクの高いユーザーとその要因を教えて」 Entity Analytics 「svc-backup-prod に関連する異常は何?」 Security ML Jobs 📎 参照: One agent, the right skills: Elastic Security 9.4 brings domain expertise on demand to every SOC workflow 3. AI による ES|QL 検知ルール自動生成 攻撃の挙動を自然言語で記述するだけで、本番対応の ES|QL 検知ルールを AI が自動生成します。ルール作成画面から「AI rule creation」を選択するだけで使用できます。 生成されるもの: ES|QL クエリ(構文検証済み) MITRE ATT&CK マッピング 重要度・リスクスコア タグ・スケジュール設定 生成後はチャット形式で反復的に調整でき、有効化前に自環境の実ログでプレビュー確認が可能です。 実際のプロンプト例(公式ブログより): 「Okta で、同一ユーザー&同一 IP から:3 回以上の認証失敗、1 回以上の MFA 失敗、ログイン成功、その後に権限付与またはポリシー更新。この全シーケンスが揃った場合に、認証情報窃取の成功攻撃として検知して」 これだけで、STATS で各段階をカウントして閾値判定する複雑な ES|QL クエリ・MITRE ATT&CK マッピング(T1110.004 Credential Stuffing・T1078 Valid Accounts)・重要度設定までが一括生成されます。 ⚠️ Enterprise ライセンスが必要です。 📎 参照: From plain English to production rule: AI-native Elasticsearch ES|QL detection in Elastic Security 4. ES|QL クエリ言語の強化 Elastic のパイプライン型クエリ言語 ES|QL に 4 つの大きな機能が追加されました。 サブクエリ(Subqueries) :異なるインデックスの複数クエリを 1 文で結合。例えばビジネスデータとサーバーログを時系列でまとめて分析できます。 FROM (FROM ecommerce | EVAL domain = "business" | KEEP ts, domain, summary), (FROM logs | EVAL domain = "ops" | KEEP ts, domain, summary) | SORT ts ロジカルビュー(Logical Views) :複雑な集計ロジックを名前付きで保存し、ダッシュボード・アラート全体で再利用できます。SQL の VIEW に相当する概念で、ビュー定義を更新すれば参照する全ダッシュボードに即時反映されます。 -- ビューを定義(一度だけ) CREATE VIEW high_risk_users AS FROM .alerts-security* | WHERE risk_score > 70 | STATS ... -- 以降、複数のダッシュボードから FROM view_name で参照可能 FROM high_risk_users -- 事前定義したビューを通常のインデックスのように利用 | WHERE @timestamp > NOW() - 24 hours | KEEP user.name, risk_score JSON 関数抽出 :JSON 形式で格納されたフィールドの値を、再インデックスなしに直接クエリで取り出せます。スキーマが不定形なログデータの分析に有効です。 ROW raw = "{ \"user\": { \"name\": \"yamada\", \"id\": 42 }, \"source\": { \"ip\": \"10.0.0.1\" } }" | EVAL user_name = JSON_EXTRACT(raw, "user.name") | EVAL user_id = JSON_EXTRACT(raw, "user.id") | EVAL src_ip = JSON_EXTRACT(raw, "source.ip") | KEEP user_name, user_id, src_ip 実行結果: user_name | user_id | src_ip ----------+---------+----------- yamada | 42 | 10.0.0.1 パスは user.name のドット記法、 groups[0] のブラケット記法のどちらも利用できます。キーに . が含まれる場合は ['event.id'] のブラケット記法が必須です。 マッピング外フィールドへのアクセス ( SET unmapped_fields="load" ) :インデックス定義に漏れていたフィールドも後から遡ってクエリ可能になりました(従来の「無知の崖」問題を解消)。 9.3のバージョン -- 従来:マッピング外のフィールドは結果に含まれない FROM my_unmapped_test | KEEP @timestamp, user, amount, department -- 9.4:SET ディレクティブで未マッピングフィールドも _source から取り出せる SET unmapped_fields="load"; FROM my_unmapped_test | KEEP @timestamp, user, amount, department 実行結果( SET unmapped_fields="load" を指定した場合): @timestamp | user | amount | department ---------------------+--------+--------+------------- 2026-05-06T13:00:00Z | tanaka | 320 | sales 2026-05-06T12:00:00Z | yamada | 150 | engineering "load" のほかに "nullify" (明示的に null を返す)も指定可能です。性能面では _source からの読み出しになるため、マッピング済みフィールドのクエリよりやや遅くなる点に注意が必要です。 📎 参照: What’s new in Elastic 9.4(公式ブログ) ・ ES|QL Logical Views 詳細ブログ ・ Elasticsearch リリースノート 5. ベクトル検索の高速化 AI ワークロード(RAG など)の基盤となるベクトル検索が大幅に強化されました。 DiskBBQ 改善 :フィルター付きクエリのレイテンシを最低 3 倍改善 GPU 加速インデックス作成が GA 昇格 :NVIDIA cuVS 統合により、セルフマネージド環境でインデックス速度が最大 12 倍、force merge が 7 倍高速化 量子化ビット数の選択肢拡大 :1 ビットのみだったところ、2・4・7 ビットが追加。精度とサイズのバランスを調整可能に 量子化ビットの設定例(インデックス作成時): PUT bbq_disk-index { "mappings": { "properties": { "my_vector": { "type": "dense_vector", "dims": 64, "index": true, "index_options": { "type": "bbq_disk", "bits": 2 // 1, 2, 4, 7 から選択(数値が大きいほど高精度・大容量) } } } } } ⚠️ GPU 機能はセルフマネージド環境(NVIDIA GPU 搭載サーバー)限定です。 📎 参照: What’s new in Elastic 9.4(公式ブログ) ・ DiskBBQ アーキテクチャ解説ブログ ・ GPU ベクトルインデックス加速ブログ ・ GPU ベクトルインデックス ドキュメント 6. Kibana の強化 AI によるダッシュボード作成 :自然言語でダッシュボードのレイアウトや可視化を説明すると、AI が反復的に構築します。 Dashboards as Code :ダッシュボードを API・JSON で定義・管理できます。Git による差分管理や CI/CD パイプラインでの自動デプロイが可能になりました。 API 経由でダッシュボードを作成する例: POST kbn:/api/dashboards { "title": "My first API dashboard", "panels": [ { "grid": { "x": 0, "y": 0, "w": 24, "h": 10 }, "type": "vis", "config": { "type": "xy", "title": "Total log entries over time", "layers": [ { "type": "line", "data_source": { "type": "esql", "query": "FROM kibana_sample_data_logs | STATS count = COUNT() BY BUCKET(@timestamp, 75, ?_tstart, ?_tend)" } } ] } } ] } Agent Builder の拡張 :Skills・Attachments・Connectors・Plugins の 4 プリミティブが追加。長い会話でのコンテキスト管理(クエリ結果オフロード・コンパクション・要約)が改善され、マルチターン対話のコスト効率も向上しました。 📎 参照: What’s new in Elastic 9.4(公式ブログ) ・ Elastic AI Agent ドキュメント 7. Observability:メトリクス・時系列分析の強化 TSDB パフォーマンス改善 :時系列メトリクスの保存効率が Prometheus 比 2.6 倍向上、クエリ速度は Prometheus/Mimir 比で最大 25 倍高速化。 ES|QL 時系列関数の追加 :rate(変化率)・changes(変化量)・cumulative(累積)・trange(時間範囲)・clamp(値クリップ)が利用可能に。 PromQL のネイティブサポート :Prometheus メトリクスを Elasticsearch に直接取り込み、Kibana 上で PromQL を実行できます。PromQL の結果を ES|QL パイプラインに流し込むことも可能です。 PromQL → ES|QL を組み合わせる例: # PromQL の結果を ES|QL コマンドにパイプして集計 PROMQL index=k8s step=1h bytes=(max by (cluster) (network.bytes_in)) | STATS max_bytes=MAX(bytes) BY cluster | SORT cluster 📎 参照: What’s new in Elastic 9.4(公式ブログ) ・ Elasticsearch リリースノート(TSDB・PromQL 詳細) 8. セキュリティ:エンティティ管理の深化 エンティティ統合(Entity Resolution) :Okta・Microsoft Entra など複数の IdP に存在するアカウントを 1 つの従業員レコードに名寄せ。「同一人物が複数 ID を持っている」問題を解消し、横断的なリスク評価が可能になります。 統合のイメージ: 従来(バラバラに存在) 9.4(1つのレコードに統合) ───────────────────────── ──────────────────────────── Okta: t.yamada@co.jp ┐ Entra: tyamada ├──→ 山田 剛(Takeshi Yamada) GitHub: yamada-t │ └─ リスクスコアを統合計算 Slack: takeshi.yamada ┘ └─ クロスシステムのアラートを集約 動的ウォッチリスト(Dynamic Watchlists) :組織的な重要度(例:役員・特権アカウント)をリスクスコアへの乗数として反映できます。 エンティティ起点のハンティングリード :アラート起点の受動的調査から、リスクの高い行動パターンを事前に洗い出す能動的なハンティングへのシフトを支援します。 エンドポイント調査の拡張 : Runscript Response Action + Script Library:感染端末へのリモートスクリプト実行とライブラリ管理 Memory Dump Response Action(Linux 対応):マルウェア解析用のメモリ取得 Osquery 強化・Jumplists テーブル拡張:最近使用したファイル履歴のフォレンジック照会 📎 参照: What’s new in Elastic 9.4(公式ブログ) ・ Elastic Security リリースノート ・ Entity Analytics AI Skill ブログ まとめ:9.4 の位置付け カテゴリ 主なアップデート セキュリティ自動化 Workflows GA・25 種ケースステップ・Human-in-the-Loop AI エージェント Skills 5 種・プラットフォームスキル 3 種・Agent Builder 拡張 検知エンジニアリング AI による ES|QL ルール自動生成・MITRE 自動マッピング クエリ言語 ES|QL サブクエリ・ビュー・JSON 抽出・マッピング外フィールド ベクトル検索 3〜12 倍高速化・量子化ビット選択肢拡大 可観測性 TSDB 効率化・PromQL ネイティブ対応・時系列集計関数 エンティティ管理 名寄せ・動的ウォッチリスト・能動的ハンティングリード 9.4 はセキュリティチームだけでなく、インフラ・SRE・AI 開発チームにも広く関わるリリースです。特に Workflows と Skills の組み合わせによる「Agentic SOC(自律型 SOC)」への移行は、今後の Elastic Security の方向性を示すものといえます。 参照リンク 記事・ドキュメント URL What’s new in Elastic 9.4(公式メインブログ) https://www.elastic.co/blog/whats-new-elastic-9-4-0 Elastic Workflows GA(9.4) https://www.elastic.co/security-labs/elastic-workflows-ga-9-4 Workflows 入門(9.3 Tech Preview) https://www.elastic.co/security-labs/security-automation-with-elastic-workflows Skills in Elastic Security 9.4 https://www.elastic.co/security-labs/skills-elastic-security-9-4 AI による ES|QL 検知ルール生成 https://www.elastic.co/security-labs/ai-esql-detection-rule-creation Entity Analytics AI Skill ブログ https://www.elastic.co/security-labs/entity-analytics-agent-builder ES|QL Logical Views 詳細ブログ https://www.elastic.co/search-labs/blog/elasticsearch-esql-logical-views DiskBBQ アーキテクチャ解説ブログ https://www.elastic.co/search-labs/blog/diskbbq-elasticsearch-introduction GPU ベクトルインデックス加速ブログ https://www.elastic.co/search-labs/blog/elasticsearch-gpu-accelerated-vector-indexing-nvidia GPU ベクトルインデックス ドキュメント https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/gpu-vector-indexing Elastic AI Agent ドキュメント https://www.elastic.co/docs/explore-analyze/ai-features/elastic-agent-builder Workflows ドキュメント https://www.elastic.co/docs/explore-analyze/workflows Elastic Workflow Library(GitHub) https://github.com/elastic/workflows Elasticsearch リリースノート(9.4) https://www.elastic.co/docs/release-notes/elasticsearch Elastic Security リリースノート(9.4) https://www.elastic.co/docs/release-notes/security Elastic リリースノート(全製品) https://www.elastic.co/docs/release-notes The post Elastic 9.4 リリースまとめ:AI・自動化・クエリ言語が一斉強化 first appeared on Elastic Portal .
本記事は 2026 年 5 月 7 日に公開された “ Announcing aggregations on Amazon ElastiCache ” を翻訳したものです。 Amazon ElastiCache が集計クエリをサポートするようになり、単一のクエリでキャッシュ内のデータを直接フィルタリング、グループ化、変換、集計することがより簡単になりました。集計クエリを使用することで、テラバイト規模のデータに対してマイクロ秒単位の低レイテンシーで、最新の書き込みが反映された結果を返す、リアルタイムなアプリケーション体験を構築できます。データがすでに存在する場所で、アプリケーションが要求する速度で分析を行うことができ、別途分析レイヤーを用意する必要はありません。集計機能を使用すると、ElastiCache に保存済みのデータに対して、リアルタイムのリーダーボード、ファセット検索によるカタログブラウジング、運用レポート、探索的な分析クエリなどを構築できます。 ElastiCache 内のメモリ上で集約処理を直接実行することで、アーキテクチャの複雑さを軽減し、応答時間を改善できます。集約クエリはサーバー側で計算を実行するため、アプリケーションはデータをその場で分析し、最終的なサマリーのみを返すことができます。例えば、1 つの集約クエリで、製品カタログをフィルタリングして特定のカテゴリのデータを取得し、結果をブランドごとにグループ化し、各ブランドの平均評価を計算し、パフォーマンス上位 10 件のみを返すといったことが可能です。これらのクエリは、GROUPBY、REDUCE、APPLY、FILTER、SORTBY、LIMIT などのステージをパイプラインに連結することで構築でき、各ステージの出力が次のステージへの入力になります。これらのステージを任意の順序で組み合わせ、繰り返し使用することで、1 つのコマンドで複数ステップの分析ワークフローを構築できます。集約はプライマリ上で Read-after-Write 整合性 (書き込み後読み取り整合性) を提供するため、結果には最新の書き込みが反映され、クライアントコードを変更することなくシャード間でスケールします。本投稿では、集約によって実現できるユースケースを紹介し、ElastiCache for Valkey を使ってファセットブラウジングエンジンを構築しながら、その仕組みを解説します。 これらの集約機能は、ElastiCache version 9.0 for Valkey で、全文検索、完全一致検索、範囲検索、ベクトル検索機能 ( Amazon ElastiCache での全文検索、完全一致検索、範囲検索、ハイブリッド検索 を参照) と並んで利用可能です。ElastiCache version 9.0 for Valkey ではまた、個々のフィールドに対するきめ細やかな TTL 制御を可能にするハッシュフィールドの有効期限機能や、最大 40% 向上したパイプラインスループットも導入されています。リリースの詳細については、 Amazon ElastiCache 向け Valkey 9.0 のお知らせ をご覧ください。 集約の使用が適している場面 アプリケーションは、リアルタイムでフィルタリング、グループ化、集計する必要があるデータを ElastiCache に保存することがよくあります。例えば、E コマースプラットフォームでは、カタログ全体にわたってカテゴリ別の平均評価や商品数を計算します。ストリーミングサービスでは、ジャンル別の総視聴数、平均視聴時間、再生数上位の作品を算出し、トレンドフィードやレコメンデーションランキングを構築します。金融サービスでは、ユーザーや時間枠ごとに取引をグループ化して合計を計算し、しきい値違反を検出し、コンプライアンスレポートを生成します。アプリケーションは、ユーザー向けのエクスペリエンス、ライブ分析、運用レポートを支えるためにこのデータをリアルタイムに分析する必要があり、古いデータや遅い結果はユーザーエクスペリエンスを低下させます。デバッグや単発の調査のためにアドホックなクエリが必要な開発者は、別の分析レイヤーを設定したり、データをアプリケーションレイヤーにエクスポートしたりすることなく、ライブデータに対して直接集計を実行することもできます。集計は、次の 3 つの一般的なユースケースをサポートします。 カタログフィルタリングのためのファセット検索: E コマースプラットフォームでは、買い物客がブラウジングする際に、各フィルタの組み合わせに一致する商品数を表示します。買い物客がカテゴリや価格帯を選択すると、UI は残りのすべてのフィルタ値のカウントを即座に更新します。1 つの集計クエリで、一致するカタログをブランド、色、または評価ごとにグループ化し、グループごとのカウントを返すため、アプリケーションは事前計算や古いカウントのキャッシュなしに正確なファセット数を表示できます。集計はインメモリで直接実行されるため、数百万の商品にまたがる場合でも、これらのカウントはマイクロ秒のレイテンシで返されます。 リアルタイムのトレンドとランキング: ゲームプラットフォーム、ストリーミングサービス、マーケットプレイスでは、ライブのエンゲージメント指標に基づいてトレンドコンテンツや上位ランカーを表示します。従来、これにはスケジュールに従ってランキングを再計算するバックグラウンドジョブが必要で、データの陳腐化を招いていました。あるいは、大量の結果セットに対するアプリケーション側でのソートが必要で、レイテンシーが増加していました。単一の集計クエリで、コンテンツをカテゴリーごとにグループ化し、総視聴回数、エンゲージメントスコア、プレイヤーランクを計算して、上位の結果を返すことができます。インデックスは書き込み時に同期的に更新されるため、集計クエリはポーリング、キャッシュ無効化、定期的な再計算を行うことなく最新のデータを反映します。 運用レポートと分析: ElastiCache を高速アクセスのために利用するアプリケーションでは、同じデータに対する運用分析やレポートが必要になることがよくあります。例えば、セッションストアではデバイスごとの平均セッション時間を計算し、E コマースプラットフォームではステータスやフルフィルメント段階ごとの注文量を計算します。Aggregations は、別途の分析用クラスターをプロビジョニングしたり ETL パイプラインを維持したりすることなく、スケジュールに基づいて、またはオンデマンドで、これらのクエリをインメモリデータに対して直接実行します。 ElastiCache を使ったファセット検索とリアルタイム分析の構築 これらの機能を組み合わせて実演するために、メディアストリーミングプラットフォームである AnyOrganization 向けに、ファセットブラウジングと分析エンジンを構築します。AnyOrganization はコンテンツカタログを ElastiCache にハッシュキーとして保存しており、各映画タイトルにはジャンル、言語、スタジオ、評価、リアルタイムの視聴回数といったメタデータが含まれています。以下のコードでは、このデータに対して 3 つのクエリパターンを構築します。ファセットフィルタリング、ライブトレンドアイテム、そしてスタジオレベルのエンゲージメントレポートです。 前提条件 この記事の例では、Python と valkey-py クライアントライブラリを使用します。手順を実行するには、以下が必要です (所要時間の目安: 30 分): AWS アカウント および AWS Command Line Interface (AWS CLI) ElastiCache レプリケーショングループを作成する権限を持つ AWS IAM ロール Amazon ElastiCache クラスターと同じ VPC 内にある Amazon EC2 インスタンス (または Amazon ElastiCache に接続可能な 任意のアプリケーション) Python 3.9 以降、および valkey-py バージョン 6.1.1 以降 (pip install valkey) この投稿の完全なサンプルコードは、 Amazon ElastiCache samples GitHub リポジトリで入手できます。 git clone https://github.com/aws-samples/amazon-elasticache-samples.git cd amazon-elasticache-samples/blogs/aggregations-blog ElastiCache for Valkey クラスターのセットアップ 集計機能を利用するための ElastiCache クラスターは、 AWS Management Console または AWS CLI で作成できます。集計機能は ElastiCache version 9.0 for Valkey 以降で利用可能です。以下の例では CLI を使用しています。 aws elasticache create-replication-group \ --replication-group-id AnyOrganization-cache \ --replication-group-description "AnyOrganization Valkey cluster" \ --engine valkey \ --engine-version 9.0 \ --transit-encryption-enabled \ --cache-node-type cache.r7g.large \ --num-node-groups 2 \ --replicas-per-node-group 1 \ --multi-az-enabled \ --automatic-failover-enabled # --transit-encryption-enabled が設定されている場合、Python クライアント接続に # ssl=True を追加します: # client = valkey.ValkeyCluster(..., ssl=True) データへのインデックス作成 ElastiCache に保存されているデータに対して、 catalog_index というインデックスを作成します。 Genre 、 language 、 studio は、ファセットフィルタリング用の完全一致タグとしてインデックス化されます。 Release_year 、 rating 、 views_24h は、範囲フィルタやソート用の数値フィールドとしてインデックス化されます。タイトルは、キーワード、プレフィックス、あいまい一致をサポートする全文検索可能なフィールドとしてインデックス化されます。 以下のコードは、valkey-py の search モジュールを使用して Valkey Search コマンドを構築し送信します。各 Python メソッド呼び出しは、ネットワーク越しに送信される Valkey コマンドに直接対応しています。例えば、 client.ft("catalog_index").create_index(...) は FT.CREATE コマンドを送信し、 client.ft("catalog_index").aggregate(req) は FT.AGGREGATE コマンドを送信します。各コードブロックの横に、対応する Valkey コマンドを示しています。 import valkey from valkey.commands.search.field import TextField, TagField, NumericField from valkey.commands.search.indexDefinition import IndexDefinition, IndexType # : Insert your ElastiCache cluster's discovery endpoint VALKEY_CLUSTER_ENDPOINT = "placeholder_cluster.cnxa6h.clustercfg.use1.cache.amazonaws.com" client = valkey.ValkeyCluster( host=VALKEY_CLUSTER_ENDPOINT, port=6379, decode_responses=True, ssl=True) client.ft("catalog_index").create_index(fields=[ TextField("title"), TagField("genre"), TagField("language"), TagField("studio"), NumericField("release_year"), NumericField("rating"), NumericField("views_24h")], definition=IndexDefinition(prefix=["title:"],index_type=IndexType.HASH)) 同等の Valkey コマンド: FT.CREATE catalog_index ON HASH PREFIX 1 title: SCHEMA title TEXT genre TAG language TAG studio TAG release_year NUMERIC rating NUMERIC views_24h NUMERIC ElastiCache for Valkey ストアにカタログデータを投入します。本記事では、ElastiCache GitHub リポジトリのサンプルデータを使用しますが、他のデータソースを使用することもできます。 import csv import urllib.request import io import time response = urllib.request.urlopen( "https://raw.githubusercontent.com/aws-samples/amazon-elasticache-samples/main/blogs/aggregations-blog/catalog_data.csv") reader = csv.DictReader(io.TextIOWrapper(response)) count = 0 for row in reader: key = row.pop("id") client.hset(key, mapping=row) count += 1 print(f"Loaded {count} records") インデックスはデータをロードする前でも後でも作成できます。プレフィックスに一致するキーが既に存在する場合、Valkey Search はそれらを自動的にインデックスにバックフィルします。 ファセットフィルター 以下の集計は、ユーザーが選択したフィルターを受け取り、一致する結果をジャンル、言語、評価、公開年でグループ化し、各グループのタイトル数を返します。これにより、UI は結果と並べてファセットの件数を表示できます。 from valkey.commands.search.aggregation import AggregateRequest, Desc from valkey.commands.search import reducers def get_facet_counts(filters): # Build query string from user-selected filters clauses = [] if "genre" in filters: clauses.append(f"@genre:{{{filters['genre']}}}") if "language" in filters: clauses.append(f"@language:{{{filters['language']}}}") if "min_rating" in filters: clauses.append(f"@rating:[{filters['min_rating']} + inf]") query = " ".join(clauses) if clauses else "@rating:[-inf + inf]" # Run an aggregation for each facet dimension dimensions = ["genre", "language", "rating"] facets = {} for dim in dimensions: req = AggregateRequest(query) \ .load(f"@{dim}") \ .group_by(f"@{dim}", reducers.count().alias("count")) facets[dim] = client.ft("catalog_index").aggregate(req).rows return facets # Example: user filters for dramas in english, get counts for each dimension facets = get_facet_counts({"genre": "drama", "language": "english"}) # Example output: # {'genre': [{'genre': 'drama', 'count': '6'}], # 'language': [{'language': 'english', 'count': '6'}], # 'rating': [{'rating': '4', 'count': '4'}, # {'rating': '5', 'count': '2'}]} 同等の Valkey コマンド (1 つのファセットディメンション、英語のドラマでフィルタリングする場合): FT.AGGREGATE catalog_index "@genre:{drama} @language:{english}" LOAD 1 @genre GROUPBY 1 @genre REDUCE COUNT 0 AS count リアルタイムで急上昇中のアイテム 以下のコードは、ジャンルごとのトップトレンドタイトルを取得します。これは、ユーザーがコンテンツを視聴するとリアルタイムで更新される views_24h フィールドに対する集計によって実現されています。 def get_trending_by_genre(limit=10): # Get the highest view count per genre # sorted by most popular genre first req = AggregateRequest("@rating:[-inf + inf]") \ .load("@genre", "@views_24h") \ .group_by("@genre", reducers.max("@views_24h").alias("max_views")) \ .sort_by(Desc("@max_views"), max=limit) return client.ft("catalog_index").aggregate(req).rows trending_by_genre = get_trending_by_genre() # Example output: # [{'genre': 'action', 'max_views': '4500'}, # {'genre': 'comedy', 'max_views': '3800'}, # {'genre': 'thriller', 'max_views': '3600'}, # {'genre': 'sci-fi', 'max_views': '3400'}, # {'genre': 'drama', 'max_views': '3200'}, # {'genre': 'animation', 'max_views': '3100'}, # {'genre': 'romance', 'max_views': '2800'}, # {'genre': 'horror', 'max_views': '2600'}, # {'genre': 'documentary', 'max_views': '1900'}] 同等の Valkey コマンド (1 つのファセットディメンションで、英語のドラマをフィルタリングする場合): FT.AGGREGATE catalog_index "@rating:[-inf + inf]" LOAD 2 @genre @views_24h GROUPBY 1 @genre REDUCE MAX 1 @views_24h AS max_views SORTBY 2 @max_views DESC MAX 10 リアルタイムトレンドアイテム 以下のコードは、ジャンルごとにトレンドの上位タイトルを取得するもので、ユーザーがコンテンツを視聴するとリアルタイムに更新される views_24h フィールドに対する集計によって実現されています。 def get_trending_by_genre(limit=10): # Get the highest view count per genre # sorted by most popular genre first req = AggregateRequest("@rating:[-inf + inf]") \ .load("@genre", "@views_24h") \ .group_by("@genre", reducers.max("@views_24h").alias("max_views")) \ .sort_by(Desc("@max_views"), max=limit) return client.ft("catalog_index").aggregate(req).rows trending_by_genre = get_trending_by_genre() # Example output: # [{'genre': 'action', 'max_views': '4500'}, # {'genre': 'comedy', 'max_views': '3800'}, # {'genre': 'thriller', 'max_views': '3600'}, # {'genre': 'sci-fi', 'max_views': '3400'}, # {'genre': 'drama', 'max_views': '3200'}, # {'genre': 'animation', 'max_views': '3100'}, # {'genre': 'romance', 'max_views': '2800'}, # {'genre': 'horror', 'max_views': '2600'}, # {'genre': 'documentary', 'max_views': '1900'}] 同等の Valkey コマンド: FT.AGGREGATE catalog_index "@rating:[-inf + inf]" LOAD 2 @genre @views_24h GROUPBY 1 @genre REDUCE MAX 1 @views_24h AS max_views SORTBY 2 @max_views DESC MAX 10 オンデマンドエンゲージメントレポート AnyOrganization は、制作スタジオ別のコンテンツパフォーマンスを測定するために、日次のレポートジョブを実行しています。次のコードは、同じインデックスに対する集計を使用して、タイトル数、平均評価、総エンゲージメントなどのスタジオレベルのメトリクスを計算します。 def get_studio_report(): # Studio performance: title count, average rating, total 24h views req = AggregateRequest("@rating:[-inf + inf]") \ .load("@studio", "@rating", "@views_24h") \ .group_by("@studio", reducers.count().alias("title_count"), reducers.avg("@rating").alias("avg_rating"), reducers.sum("@views_24h").alias("total_views")) \ .sort_by(Desc("@total_views")) return client.ft("catalog_index").aggregate(req).rows studio_report = get_studio_report() # Example output: # [{'studio': 'StreamFlix Originals', 'title_count': '18', # 'avg_rating': '4.3333333333', 'total_views': '46200'}, # {'studio': 'Summit Pictures', 'title_count': '13', # 'avg_rating': '3.8461538462', 'total_views': '30000'}, # {'studio': 'Crimson Studios', 'title_count': '11', # 'avg_rating': '4.4545454545', 'total_views': '23100'}, # {'studio': 'Emerald Films', 'title_count': '8', # 'avg_rating': '4', 'total_views': '13600'}] 同等の Valkey コマンド: FT.AGGREGATE catalog_index "@rating:[-inf + inf]" LOAD 3 @studio @rating @views_24h GROUPBY 1 @studio REDUCE COUNT 0 AS title_count REDUCE AVG 1 @rating AS avg_rating REDUCE SUM 1 @views_24h AS total_views SORTBY 2 @total_views DESC ベストプラクティス 集計クエリのレイテンシーとスループットを改善するには、各パイプラインステージを通過するドキュメント数を減らすために早い段階でフィルタリングします。マッチする範囲が広いクエリは、パイプラインに入るキーの数が増え、初期スキャンと初期ステージのコストが増加します。例えば、上記のファセットフィルタリングの例では、ユーザーのアクティブなフィルターをクエリ文字列で渡すことで、マッチするドキュメントのみが GROUPBY ステージに入ります。また、しきい値を満たさないグループを削除するために GROUPBY ステージの後に FILTER を追加することもできます。例えば、結果を返す前にタイトル数が 5 未満のジャンルを除外する場合などです。さらに、上位の結果のみが必要な場合は、 SORTBY に MAX を追加することで、トレンドアイテムの例で示すように、エンジンはワーキングセット全体をソートするのではなく、上位の結果のみを追跡します。 LOAD を使うと、インデックスに含まれていないフィールドであっても、基となるハッシュデータから直接フィールドを取得して集約パイプラインに取り込むことができます。例えば、ハッシュに actors フィールドを保存しているがインデックス化していない場合、クエリ実行時に LOAD で読み込み、それを使ってグループ化やソートを行うことができます。ただし、 LOAD は一致するドキュメントごとに基となるキーから生データを取得する必要があるため、結果セットのサイズに応じてレイテンシーが増加します。このオーバーヘッドを避けるため、ロードするフィールドの数は最小限に抑えてください。 クリーンアップ このウォークスルーのために ElastiCache クラスターを作成し、不要になった場合は、今後の課金を避けるために、次の AWS CLI コマンドを使用してクラスターを削除してください。 aws elasticache delete-replication-group --replication-group-id AnyOrganization-cache はじめに この記事では、ElastiCache のアグリゲーションについて、ファセットフィルタリング、ライブトレンドのレコメンデーション、オンデマンドのエンゲージメントレポートを取り上げ、これらすべてを単一の Valkey Search インデックス上に構築する方法を解説しました。アグリゲーションは、すべての商用 AWS リージョン、AWS GovCloud (US) リージョン、および中国リージョンにおいて、ElastiCache for Valkey バージョン 9.0 を実行するノードベースのクラスターで追加費用なしでご利用いただけます。Valkey は、Redis に代わる最も寛容なライセンスのオープンソースかつベンダー中立な選択肢であり、ElastiCache で推奨されるエンジンです。使い始めるには、AWS Management Console、AWS SDK、または AWS CLI を使用して、Valkey 9.0 以降の新しいクラスターを作成するか、 既存のクラスターをアップグレード してください。詳細については、 ElastiCache のドキュメント をご覧ください。質問やフィードバックがある場合は、 AWS re:Post for ElastiCache をご覧ください。 著者について Chaitanya Nuthalapati Chaitanya は AWS インメモリデータベースサービスのシニアテクニカルプロダクトマネージャーで、Amazon ElastiCache for Valkey に注力しています。以前は、生成 AI、機械学習、グラフネットワークを活用したソリューションを構築していました。仕事以外の時間では、Chaitanya は趣味を集めるのに忙しく、現在はテニス、スケートボード、パドルボードを楽しんでいます。 Karthik Subbarao Karthik は Amazon ElastiCache のシニアソフトウェアエンジニアであり、オープンソースの Valkey プロジェクトに積極的に貢献しています。分散システム、データベース、Rust、そして全般的にソフトウェア開発/テクノロジーを通じたイノベーションに情熱を注いでいます。 Allen Samuels Allen は AWS のプリンシパルエンジニアです。分散型で高性能なシステムに情熱を注いでいます。世界中を旅したりデュプリケートブリッジをプレイしたりしていない時は、カリフォルニア州サンノゼで過ごしています。 Siva Subramaniam Siva は AWS のシニアソリューションアーキテクトで、技術リーダーシップとデータベースアーキテクチャにおいて 20 年の経験を持っています。お客様が AWS の専用データベースを使用して移行とイノベーションを実現できるよう支援しています。仕事以外では、Siva はクリケット、農作業、そして妻から料理を学ぶことを楽しんでいます。 ご指導いただいた Ian Childress 氏、そしてプロジェクト全体を通じて実装面で貢献いただいた Miles Song 氏に特に感謝いたします。 本記事は、 Announcing aggregations on Amazon ElastiCache を翻訳したものです。翻訳は Solutions Architect の Hayato Tsutsumi が担当しました。