Create Player queue and processor

This commit is contained in:
Álvaro 2024-05-02 18:45:02 +02:00
parent 58a1aacd5a
commit dd529133c1
6 changed files with 106 additions and 15 deletions

View File

@ -8,8 +8,10 @@ defmodule Scrapper.Application do
@impl true @impl true
def start(_type, _args) do def start(_type, _args) do
children = [ children = [
Scrapper.MatchQueue, Scrapper.Queue.MatchQueue,
{Scrapper.MatchBroadway, []} Scrapper.Queue.PlayerQueue,
{Scrapper.Processor.MatchProcessor, []},
{Scrapper.Processor.PlayerProcessor, []}
# Starts a worker by calling: Scrapper.Worker.start_link(arg) # Starts a worker by calling: Scrapper.Worker.start_link(arg)
# {Scrapper.Worker, arg} # {Scrapper.Worker, arg}
] ]

View File

@ -7,12 +7,12 @@ defmodule Scrapper.Data.Api.MatchApi do
iex> Scrapper.Data.MatchApi.get_match_by_id("EUW1_6921743825") iex> Scrapper.Data.MatchApi.get_match_by_id("EUW1_6921743825")
""" """
@spec get_match_by_id(String.t()) :: any() @spec get_match_by_id(String.t()) :: %Scrapper.Data.Api.Model.Match.MatchResponse{}
def get_match_by_id(match_id) do def get_match_by_id(match_id) do
url = String.replace(@match_base_endpoint, "%{matchid}", match_id) url = String.replace(@match_base_endpoint, "%{matchid}", match_id)
api_key = System.get_env("RIOT_API_KEY") api_key = System.get_env("RIOT_API_KEY")
headers = [{"X-Riot-Token", api_key}] headers = [{"X-Riot-Token", api_key}]
response = HTTPoison.get!(url, headers, timeout: 5000) response = HTTPoison.get!(url, headers, timeout: 5000)
case response.status_code do case response.status_code do
200 -> 200 ->
@ -26,7 +26,7 @@ defmodule Scrapper.Data.Api.MatchApi do
end end
end end
@spec get_matches_from_player(String.t()) :: any() @spec get_matches_from_player(String.t()) :: list(String.t()) | integer()
def get_matches_from_player(puuid) do def get_matches_from_player(puuid) do
url = String.replace(@puuid_matches_base_endpoint, "%{puuid}", puuid) url = String.replace(@puuid_matches_base_endpoint, "%{puuid}", puuid)
api_key = System.get_env("RIOT_API_KEY") api_key = System.get_env("RIOT_API_KEY")
@ -37,6 +37,7 @@ defmodule Scrapper.Data.Api.MatchApi do
200 -> 200 ->
# process the response here # process the response here
IO.inspect(response.body) IO.inspect(response.body)
Poison.decode!(response.body)
_ -> _ ->
# handle error responses # handle error responses

View File

@ -1,8 +1,6 @@
defmodule Scrapper.MatchBroadway do defmodule Scrapper.Processor.MatchProcessor do
use Broadway use Broadway
alias Broadway.Message
def start_link(_opts) do def start_link(_opts) do
Broadway.start_link( Broadway.start_link(
__MODULE__, __MODULE__,
@ -20,11 +18,15 @@ defmodule Scrapper.MatchBroadway do
qos: [ qos: [
prefetch_count: 3 prefetch_count: 3
]}, ]},
concurrency: 1 concurrency: 1,
rate_limiting: [
interval: 1000 * 90,
allowed_messages: 5
]
], ],
processors: [ processors: [
default: [ default: [
concurrency: 20 concurrency: 5
] ]
] ]
) )
@ -34,8 +36,19 @@ defmodule Scrapper.MatchBroadway do
def handle_message(_, message = %Broadway.Message{}, _) do def handle_message(_, message = %Broadway.Message{}, _) do
match_id = message.data match_id = message.data
IO.inspect(match_id) IO.inspect(match_id)
match = Scrapper.Data.Api.MatchApi.get_match_by_id(match_id) match = Scrapper.Data.Api.MatchApi.get_match_by_id(match_id)
IO.inspect(match)
message.data match.metadata.participants
|> Enum.each(fn participant ->
Scrapper.Data.Api.MatchApi.get_matches_from_player(participant)
|> Enum.each(fn match_id ->
nil
Scrapper.Queue.MatchQueue.queue_match(match_id)
end)
end)
IO.inspect(match.info.participants)
message
end end
end end

View File

@ -0,0 +1,49 @@
defmodule Scrapper.Processor.PlayerProcessor do
use Broadway
def start_link(_opts) do
Broadway.start_link(
__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
queue: "player",
connection: [
username: "guest",
password: "guest",
host: "localhost"
],
on_failure: :reject,
qos: [
prefetch_count: 3
]},
concurrency: 1,
rate_limiting: [
interval: 1000 * 90,
allowed_messages: 2
]
],
processors: [
default: [
concurrency: 2
]
]
)
end
@impl true
def handle_message(_, message = %Broadway.Message{}, _) do
puuid = message.data
IO.inspect(puuid)
Scrapper.Data.Api.MatchApi.get_matches_from_player(puuid)
|> Enum.each(fn match_id ->
IO.inspect(match_id)
Scrapper.Queue.MatchQueue.queue_match(match_id)
end)
message
end
end

View File

@ -1,14 +1,16 @@
defmodule Scrapper.MatchQueue do defmodule Scrapper.Queue.MatchQueue do
use GenServer use GenServer
@spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()}
def start_link(_opts) do def start_link(_opts) do
GenServer.start_link(__MODULE__, {}, name: __MODULE__) GenServer.start_link(__MODULE__, {}, name: __MODULE__)
end end
@spec init({}) :: {:ok, {AMQP.Channel.t(), AMQP.Connection.t()}}
def init({}) do def init({}) do
{:ok, connection} = AMQP.Connection.open() {:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection) {:ok, channel} = AMQP.Channel.open(connection)
{:ok, {channel, connection}} {:ok, %{:channel => channel, :connection => connection}}
end end
@spec queue_match(String.t()) :: any() @spec queue_match(String.t()) :: any()
@ -16,7 +18,7 @@ defmodule Scrapper.MatchQueue do
GenServer.call(__MODULE__, {:queue_match, match_id}) GenServer.call(__MODULE__, {:queue_match, match_id})
end end
def handle_call({:queue_match, match_id}, from, {channel, _} = state) do def handle_call({:queue_match, match_id}, from, %{:channel => channel} = state) do
AMQP.Basic.publish(channel, "", "match", match_id) AMQP.Basic.publish(channel, "", "match", match_id)
{:reply, nil, state} {:reply, nil, state}
end end

View File

@ -0,0 +1,24 @@
defmodule Scrapper.Queue.PlayerQueue do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, {}, name: __MODULE__)
end
@spec init(any()) :: {:ok, {AMQP.Channel.t(), AMQP.Connection.t()}}
def init(_opts) do
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)
{:ok, {channel, connection}}
end
@spec queue_player(String.t()) :: nil
def queue_player(puuid) do
GenServer.call(__MODULE__, {:queue_player, puuid})
end
def handle_call({:queue_player, puuid}, _from, {channel, _} = state) do
AMQP.Basic.publish(channel, "", "player", puuid)
{:reply, nil, state}
end
end