From d290a2c4578af77ab5e0edcf6bffc4f4462ecf6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Sat, 22 Jun 2024 21:54:53 +0200 Subject: [PATCH] Update queues, fix patch_selector --- .../components/patch_selector.ex | 11 ++- apps/scrapper/lib/scrapper/application.ex | 4 +- .../lib/scrapper/consumer/match_consumer.ex | 96 +++++++++++++++++++ .../player_consumer.ex} | 4 +- .../lib/scrapper/processor/match_processor.ex | 94 ------------------ .../lib/scrapper/queue/player_queue.ex | 7 +- 6 files changed, 114 insertions(+), 102 deletions(-) create mode 100644 apps/scrapper/lib/scrapper/consumer/match_consumer.ex rename apps/scrapper/lib/scrapper/{processor/player_processor.ex => consumer/player_consumer.ex} (95%) delete mode 100644 apps/scrapper/lib/scrapper/processor/match_processor.ex diff --git a/apps/lol_analytics_web/lib/lol_analytics_web/components/patch_selector.ex b/apps/lol_analytics_web/lib/lol_analytics_web/components/patch_selector.ex index 566528a..986601d 100644 --- a/apps/lol_analytics_web/lib/lol_analytics_web/components/patch_selector.ex +++ b/apps/lol_analytics_web/lib/lol_analytics_web/components/patch_selector.ex @@ -4,11 +4,16 @@ defmodule LolAnalyticsWeb.PatchSelector do def mount(socket) do patches = LolAnalytics.Dimensions.Patch.PatchRepo.list_patches() + |> Enum.map(fn patch -> + %{patch_number: String.split(patch.patch_number, ".") |> Enum.take(2) |> Enum.join(".")} + end) + |> MapSet.new() + |> Enum.to_list() |> Enum.sort(fn %{patch_number: p1}, %{patch_number: p2} -> - [_, minor_1] = String.split(p1, ".") |> Enum.map(&String.to_integer/1) - [_, minor_2] = String.split(p2, ".") |> Enum.map(&String.to_integer/1) + [major_1, minor_1] = String.split(p1, ".") |> Enum.map(&String.to_integer/1) + [major_2, minor_2] = String.split(p2, ".") |> Enum.map(&String.to_integer/1) - p1 > p2 && minor_1 > minor_2 + major_1 > major_2 || (major_1 == major_2 && minor_1 > minor_2) end) patch_numbers = Enum.map(patches, & &1.patch_number) diff --git a/apps/scrapper/lib/scrapper/application.ex b/apps/scrapper/lib/scrapper/application.ex index d0c4b57..e50cb25 100644 --- a/apps/scrapper/lib/scrapper/application.ex +++ b/apps/scrapper/lib/scrapper/application.ex @@ -10,8 +10,8 @@ defmodule Scrapper.Application do children = [ Scrapper.Queue.MatchQueue, Scrapper.Queue.PlayerQueue, - {Scrapper.Processor.MatchProcessor, []}, - {Scrapper.Processor.PlayerProcessor, []} + {Scrapper.Consumer.MatchConsumer, []}, + {Scrapper.Consumer.PlayerConsumer, []} # Starts a worker by calling: Scrapper.Worker.start_link(arg) # {Scrapper.Worker, arg} ] diff --git a/apps/scrapper/lib/scrapper/consumer/match_consumer.ex b/apps/scrapper/lib/scrapper/consumer/match_consumer.ex new file mode 100644 index 0000000..8911d3e --- /dev/null +++ b/apps/scrapper/lib/scrapper/consumer/match_consumer.ex @@ -0,0 +1,96 @@ +defmodule Scrapper.Consumer.MatchConsumer do + require Logger + use Broadway + + def start_link(_opts) do + Broadway.start_link( + __MODULE__, + name: __MODULE__, + producer: [ + module: + {BroadwayRabbitMQ.Producer, + queue: "match", + connection: [ + username: "guest", + password: "guest", + host: "localhost" + ], + on_failure: :reject_and_requeue, + qos: [ + prefetch_count: 1 + ]}, + concurrency: 1, + rate_limiting: [ + interval: 300, + allowed_messages: 1 + ] + ], + processors: [ + default: [ + concurrency: 1 + ] + ] + ) + end + + @impl true + def handle_message(_, message = %Broadway.Message{}, _) do + match_id = message.data + + resp = LoLAPI.MatchApi.get_match_by_id(match_id) + process_resp(resp, match_id) + + message + end + + def process_resp({:ok, raw_match}, match_id) do + Task.start_link(fn -> + decoded_match = Poison.decode!(raw_match, as: %LoLAPI.Model.MatchResponse{}) + + match_url = + case decoded_match.info.queueId do + 420 -> + Logger.info("#{match_id} #{decoded_match.info.gameVersion}") + + Storage.MatchStorage.S3MatchStorage.store_match( + match_id, + raw_match, + "ranked", + "#{decoded_match.info.gameVersion}" + ) + + LolAnalytics.Dimensions.Match.MatchRepo.get_or_create(%{ + match_id: decoded_match.metadata.matchId, + patch_number: decoded_match.info.gameVersion, + queue_id: 420 + }) + + _queue_id -> + Storage.MatchStorage.S3MatchStorage.store_match(match_id, raw_match, "matches") + end + + match = LolAnalytics.Match.MatchRepo.get_match(match_id) + + case match do + nil -> + LolAnalytics.Match.MatchRepo.insert_match(match_id) + + _ -> + LolAnalytics.Match.MatchRepo.update_match(match, %{ + :processed => true, + :match_url => match_url + }) + end + + decoded_match.metadata.participants + # |> Enum.shuffle() + # |> Enum.take(2) + |> Enum.each(fn participant_puuid -> + Scrapper.Queue.PlayerQueue.enqueue_puuid(participant_puuid) + end) + end) + end + + def process_resp({:err, _code}, _match_id) do + end +end diff --git a/apps/scrapper/lib/scrapper/processor/player_processor.ex b/apps/scrapper/lib/scrapper/consumer/player_consumer.ex similarity index 95% rename from apps/scrapper/lib/scrapper/processor/player_processor.ex rename to apps/scrapper/lib/scrapper/consumer/player_consumer.ex index cc70e3d..ab6be65 100644 --- a/apps/scrapper/lib/scrapper/processor/player_processor.ex +++ b/apps/scrapper/lib/scrapper/consumer/player_consumer.ex @@ -1,4 +1,4 @@ -defmodule Scrapper.Processor.PlayerProcessor do +defmodule Scrapper.Consumer.PlayerConsumer do use Broadway def start_link(_opts) do @@ -20,7 +20,7 @@ defmodule Scrapper.Processor.PlayerProcessor do ]}, concurrency: 1, rate_limiting: [ - interval: 1000 * 10, + interval: 6700, allowed_messages: 1 ] ], diff --git a/apps/scrapper/lib/scrapper/processor/match_processor.ex b/apps/scrapper/lib/scrapper/processor/match_processor.ex deleted file mode 100644 index 9e29135..0000000 --- a/apps/scrapper/lib/scrapper/processor/match_processor.ex +++ /dev/null @@ -1,94 +0,0 @@ -defmodule Scrapper.Processor.MatchProcessor do - require Logger - use Broadway - - def start_link(_opts) do - Broadway.start_link( - __MODULE__, - name: __MODULE__, - producer: [ - module: - {BroadwayRabbitMQ.Producer, - queue: "match", - connection: [ - username: "guest", - password: "guest", - host: "localhost" - ], - on_failure: :reject_and_requeue, - qos: [ - prefetch_count: 1 - ]}, - concurrency: 1, - rate_limiting: [ - interval: 333 * 1, - allowed_messages: 1 - ] - ], - processors: [ - default: [ - concurrency: 1 - ] - ] - ) - end - - @impl true - def handle_message(_, message = %Broadway.Message{}, _) do - match_id = message.data - - resp = LoLAPI.MatchApi.get_match_by_id(match_id) - process_resp(resp, match_id) - - message - end - - def process_resp({:ok, raw_match}, match_id) do - decoded_match = Poison.decode!(raw_match, as: %LoLAPI.Model.MatchResponse{}) - - match_url = - case decoded_match.info.queueId do - 420 -> - Logger.info("#{match_id} #{decoded_match.info.gameVersion}") - - Storage.MatchStorage.S3MatchStorage.store_match( - match_id, - raw_match, - "ranked", - "#{decoded_match.info.gameVersion}" - ) - - LolAnalytics.Dimensions.Match.MatchRepo.get_or_create(%{ - match_id: decoded_match.metadata.matchId, - patch_number: decoded_match.info.gameVersion, - queue_id: 420 - }) - - _queue_id -> - Storage.MatchStorage.S3MatchStorage.store_match(match_id, raw_match, "matches") - end - - match = LolAnalytics.Match.MatchRepo.get_match(match_id) - - case match do - nil -> - LolAnalytics.Match.MatchRepo.insert_match(match_id) - - _ -> - LolAnalytics.Match.MatchRepo.update_match(match, %{ - :processed => true, - :match_url => match_url - }) - end - - decoded_match.metadata.participants - |> Enum.shuffle() - |> Enum.take(2) - |> Enum.each(fn participant_puuid -> - Scrapper.Queue.PlayerQueue.enqueue_puuid(participant_puuid) - end) - end - - def process_resp({:err, _code}, _match_id) do - end -end diff --git a/apps/scrapper/lib/scrapper/queue/player_queue.ex b/apps/scrapper/lib/scrapper/queue/player_queue.ex index c4b2b38..370e72b 100644 --- a/apps/scrapper/lib/scrapper/queue/player_queue.ex +++ b/apps/scrapper/lib/scrapper/queue/player_queue.ex @@ -12,7 +12,12 @@ defmodule Scrapper.Queue.PlayerQueue do def init(_opts) do {:ok, connection} = AMQP.Connection.open() {:ok, channel} = AMQP.Channel.open(connection) - AMQP.Queue.declare(channel, "player", durable: true) + + AMQP.Queue.declare(channel, "player", + durable: true, + arguments: [{"x-max-length", :long, 1000}] + ) + {:ok, {channel, connection}} end