From dd529133c17e168087d670f0c08ac27f74ed8e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro?= Date: Thu, 2 May 2024 18:45:02 +0200 Subject: [PATCH] Create Player queue and processor --- apps/scrapper/lib/scrapper/application.ex | 6 ++- .../lib/scrapper/data/api/match_api.ex | 7 +-- .../match_processor.ex} | 27 +++++++--- .../scrapper/processor/player_processor.ex | 49 +++++++++++++++++++ .../{MatchQueue.ex => queue/match_queue.ex} | 8 +-- .../lib/scrapper/queue/player_queue.ex | 24 +++++++++ 6 files changed, 106 insertions(+), 15 deletions(-) rename apps/scrapper/lib/scrapper/{MatchBroadway.ex => processor/match_processor.ex} (58%) create mode 100644 apps/scrapper/lib/scrapper/processor/player_processor.ex rename apps/scrapper/lib/scrapper/{MatchQueue.ex => queue/match_queue.ex} (63%) create mode 100644 apps/scrapper/lib/scrapper/queue/player_queue.ex diff --git a/apps/scrapper/lib/scrapper/application.ex b/apps/scrapper/lib/scrapper/application.ex index 654bc58..d0c4b57 100644 --- a/apps/scrapper/lib/scrapper/application.ex +++ b/apps/scrapper/lib/scrapper/application.ex @@ -8,8 +8,10 @@ defmodule Scrapper.Application do @impl true def start(_type, _args) do children = [ - Scrapper.MatchQueue, - {Scrapper.MatchBroadway, []} + Scrapper.Queue.MatchQueue, + Scrapper.Queue.PlayerQueue, + {Scrapper.Processor.MatchProcessor, []}, + {Scrapper.Processor.PlayerProcessor, []} # Starts a worker by calling: Scrapper.Worker.start_link(arg) # {Scrapper.Worker, arg} ] diff --git a/apps/scrapper/lib/scrapper/data/api/match_api.ex b/apps/scrapper/lib/scrapper/data/api/match_api.ex index bed1019..1f6e2ab 100644 --- a/apps/scrapper/lib/scrapper/data/api/match_api.ex +++ b/apps/scrapper/lib/scrapper/data/api/match_api.ex @@ -7,12 +7,12 @@ defmodule Scrapper.Data.Api.MatchApi do iex> Scrapper.Data.MatchApi.get_match_by_id("EUW1_6921743825") """ - @spec get_match_by_id(String.t()) :: any() + @spec get_match_by_id(String.t()) :: %Scrapper.Data.Api.Model.Match.MatchResponse{} def get_match_by_id(match_id) do url = String.replace(@match_base_endpoint, "%{matchid}", match_id) api_key = System.get_env("RIOT_API_KEY") headers = [{"X-Riot-Token", api_key}] - response = HTTPoison.get!(url, headers, timeout: 5000) + response = HTTPoison.get!(url, headers, timeout: 5000) case response.status_code do 200 -> @@ -26,7 +26,7 @@ defmodule Scrapper.Data.Api.MatchApi do end end - @spec get_matches_from_player(String.t()) :: any() + @spec get_matches_from_player(String.t()) :: list(String.t()) | integer() def get_matches_from_player(puuid) do url = String.replace(@puuid_matches_base_endpoint, "%{puuid}", puuid) api_key = System.get_env("RIOT_API_KEY") @@ -37,6 +37,7 @@ defmodule Scrapper.Data.Api.MatchApi do 200 -> # process the response here IO.inspect(response.body) + Poison.decode!(response.body) _ -> # handle error responses diff --git a/apps/scrapper/lib/scrapper/MatchBroadway.ex b/apps/scrapper/lib/scrapper/processor/match_processor.ex similarity index 58% rename from apps/scrapper/lib/scrapper/MatchBroadway.ex rename to apps/scrapper/lib/scrapper/processor/match_processor.ex index 7bcca38..527b324 100644 --- a/apps/scrapper/lib/scrapper/MatchBroadway.ex +++ b/apps/scrapper/lib/scrapper/processor/match_processor.ex @@ -1,8 +1,6 @@ -defmodule Scrapper.MatchBroadway do +defmodule Scrapper.Processor.MatchProcessor do use Broadway - alias Broadway.Message - def start_link(_opts) do Broadway.start_link( __MODULE__, @@ -20,11 +18,15 @@ defmodule Scrapper.MatchBroadway do qos: [ prefetch_count: 3 ]}, - concurrency: 1 + concurrency: 1, + rate_limiting: [ + interval: 1000 * 90, + allowed_messages: 5 + ] ], processors: [ default: [ - concurrency: 20 + concurrency: 5 ] ] ) @@ -34,8 +36,19 @@ defmodule Scrapper.MatchBroadway do def handle_message(_, message = %Broadway.Message{}, _) do match_id = message.data IO.inspect(match_id) + match = Scrapper.Data.Api.MatchApi.get_match_by_id(match_id) - IO.inspect(match) - message.data + + match.metadata.participants + |> Enum.each(fn participant -> + Scrapper.Data.Api.MatchApi.get_matches_from_player(participant) + |> Enum.each(fn match_id -> + nil + Scrapper.Queue.MatchQueue.queue_match(match_id) + end) + end) + + IO.inspect(match.info.participants) + message end end diff --git a/apps/scrapper/lib/scrapper/processor/player_processor.ex b/apps/scrapper/lib/scrapper/processor/player_processor.ex new file mode 100644 index 0000000..fc0dfe5 --- /dev/null +++ b/apps/scrapper/lib/scrapper/processor/player_processor.ex @@ -0,0 +1,49 @@ +defmodule Scrapper.Processor.PlayerProcessor do + use Broadway + + def start_link(_opts) do + Broadway.start_link( + __MODULE__, + name: __MODULE__, + producer: [ + module: + {BroadwayRabbitMQ.Producer, + queue: "player", + connection: [ + username: "guest", + password: "guest", + host: "localhost" + ], + on_failure: :reject, + qos: [ + prefetch_count: 3 + ]}, + concurrency: 1, + rate_limiting: [ + interval: 1000 * 90, + allowed_messages: 2 + ] + ], + processors: [ + default: [ + concurrency: 2 + ] + ] + ) + end + + @impl true + def handle_message(_, message = %Broadway.Message{}, _) do + puuid = message.data + + IO.inspect(puuid) + + Scrapper.Data.Api.MatchApi.get_matches_from_player(puuid) + |> Enum.each(fn match_id -> + IO.inspect(match_id) + Scrapper.Queue.MatchQueue.queue_match(match_id) + end) + + message + end +end diff --git a/apps/scrapper/lib/scrapper/MatchQueue.ex b/apps/scrapper/lib/scrapper/queue/match_queue.ex similarity index 63% rename from apps/scrapper/lib/scrapper/MatchQueue.ex rename to apps/scrapper/lib/scrapper/queue/match_queue.ex index 922e768..29ab45b 100644 --- a/apps/scrapper/lib/scrapper/MatchQueue.ex +++ b/apps/scrapper/lib/scrapper/queue/match_queue.ex @@ -1,14 +1,16 @@ -defmodule Scrapper.MatchQueue do +defmodule Scrapper.Queue.MatchQueue do use GenServer + @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()} def start_link(_opts) do GenServer.start_link(__MODULE__, {}, name: __MODULE__) end + @spec init({}) :: {:ok, {AMQP.Channel.t(), AMQP.Connection.t()}} def init({}) do {:ok, connection} = AMQP.Connection.open() {:ok, channel} = AMQP.Channel.open(connection) - {:ok, {channel, connection}} + {:ok, %{:channel => channel, :connection => connection}} end @spec queue_match(String.t()) :: any() @@ -16,7 +18,7 @@ defmodule Scrapper.MatchQueue do GenServer.call(__MODULE__, {:queue_match, match_id}) end - def handle_call({:queue_match, match_id}, from, {channel, _} = state) do + def handle_call({:queue_match, match_id}, from, %{:channel => channel} = state) do AMQP.Basic.publish(channel, "", "match", match_id) {:reply, nil, state} end diff --git a/apps/scrapper/lib/scrapper/queue/player_queue.ex b/apps/scrapper/lib/scrapper/queue/player_queue.ex new file mode 100644 index 0000000..18086bc --- /dev/null +++ b/apps/scrapper/lib/scrapper/queue/player_queue.ex @@ -0,0 +1,24 @@ +defmodule Scrapper.Queue.PlayerQueue do + use GenServer + + def start_link(_opts) do + GenServer.start_link(__MODULE__, {}, name: __MODULE__) + end + + @spec init(any()) :: {:ok, {AMQP.Channel.t(), AMQP.Connection.t()}} + def init(_opts) do + {:ok, connection} = AMQP.Connection.open() + {:ok, channel} = AMQP.Channel.open(connection) + {:ok, {channel, connection}} + end + + @spec queue_player(String.t()) :: nil + def queue_player(puuid) do + GenServer.call(__MODULE__, {:queue_player, puuid}) + end + + def handle_call({:queue_player, puuid}, _from, {channel, _} = state) do + AMQP.Basic.publish(channel, "", "player", puuid) + {:reply, nil, state} + end +end