Update queues, fix patch_selector
Some checks failed
ci / docker (push) Failing after 4m14s

This commit is contained in:
Álvaro 2024-06-22 21:54:53 +02:00
parent 078682fd48
commit d290a2c457
6 changed files with 114 additions and 102 deletions

View File

@ -4,11 +4,16 @@ defmodule LolAnalyticsWeb.PatchSelector do
def mount(socket) do def mount(socket) do
patches = patches =
LolAnalytics.Dimensions.Patch.PatchRepo.list_patches() LolAnalytics.Dimensions.Patch.PatchRepo.list_patches()
|> Enum.map(fn patch ->
%{patch_number: String.split(patch.patch_number, ".") |> Enum.take(2) |> Enum.join(".")}
end)
|> MapSet.new()
|> Enum.to_list()
|> Enum.sort(fn %{patch_number: p1}, %{patch_number: p2} -> |> Enum.sort(fn %{patch_number: p1}, %{patch_number: p2} ->
[_, minor_1] = String.split(p1, ".") |> Enum.map(&String.to_integer/1) [major_1, minor_1] = String.split(p1, ".") |> Enum.map(&String.to_integer/1)
[_, minor_2] = String.split(p2, ".") |> Enum.map(&String.to_integer/1) [major_2, minor_2] = String.split(p2, ".") |> Enum.map(&String.to_integer/1)
p1 > p2 && minor_1 > minor_2 major_1 > major_2 || (major_1 == major_2 && minor_1 > minor_2)
end) end)
patch_numbers = Enum.map(patches, & &1.patch_number) patch_numbers = Enum.map(patches, & &1.patch_number)

View File

@ -10,8 +10,8 @@ defmodule Scrapper.Application do
children = [ children = [
Scrapper.Queue.MatchQueue, Scrapper.Queue.MatchQueue,
Scrapper.Queue.PlayerQueue, Scrapper.Queue.PlayerQueue,
{Scrapper.Processor.MatchProcessor, []}, {Scrapper.Consumer.MatchConsumer, []},
{Scrapper.Processor.PlayerProcessor, []} {Scrapper.Consumer.PlayerConsumer, []}
# Starts a worker by calling: Scrapper.Worker.start_link(arg) # Starts a worker by calling: Scrapper.Worker.start_link(arg)
# {Scrapper.Worker, arg} # {Scrapper.Worker, arg}
] ]

View File

@ -0,0 +1,96 @@
defmodule Scrapper.Consumer.MatchConsumer do
require Logger
use Broadway
def start_link(_opts) do
Broadway.start_link(
__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
queue: "match",
connection: [
username: "guest",
password: "guest",
host: "localhost"
],
on_failure: :reject_and_requeue,
qos: [
prefetch_count: 1
]},
concurrency: 1,
rate_limiting: [
interval: 300,
allowed_messages: 1
]
],
processors: [
default: [
concurrency: 1
]
]
)
end
@impl true
def handle_message(_, message = %Broadway.Message{}, _) do
match_id = message.data
resp = LoLAPI.MatchApi.get_match_by_id(match_id)
process_resp(resp, match_id)
message
end
def process_resp({:ok, raw_match}, match_id) do
Task.start_link(fn ->
decoded_match = Poison.decode!(raw_match, as: %LoLAPI.Model.MatchResponse{})
match_url =
case decoded_match.info.queueId do
420 ->
Logger.info("#{match_id} #{decoded_match.info.gameVersion}")
Storage.MatchStorage.S3MatchStorage.store_match(
match_id,
raw_match,
"ranked",
"#{decoded_match.info.gameVersion}"
)
LolAnalytics.Dimensions.Match.MatchRepo.get_or_create(%{
match_id: decoded_match.metadata.matchId,
patch_number: decoded_match.info.gameVersion,
queue_id: 420
})
_queue_id ->
Storage.MatchStorage.S3MatchStorage.store_match(match_id, raw_match, "matches")
end
match = LolAnalytics.Match.MatchRepo.get_match(match_id)
case match do
nil ->
LolAnalytics.Match.MatchRepo.insert_match(match_id)
_ ->
LolAnalytics.Match.MatchRepo.update_match(match, %{
:processed => true,
:match_url => match_url
})
end
decoded_match.metadata.participants
# |> Enum.shuffle()
# |> Enum.take(2)
|> Enum.each(fn participant_puuid ->
Scrapper.Queue.PlayerQueue.enqueue_puuid(participant_puuid)
end)
end)
end
def process_resp({:err, _code}, _match_id) do
end
end

View File

@ -1,4 +1,4 @@
defmodule Scrapper.Processor.PlayerProcessor do defmodule Scrapper.Consumer.PlayerConsumer do
use Broadway use Broadway
def start_link(_opts) do def start_link(_opts) do
@ -20,7 +20,7 @@ defmodule Scrapper.Processor.PlayerProcessor do
]}, ]},
concurrency: 1, concurrency: 1,
rate_limiting: [ rate_limiting: [
interval: 1000 * 10, interval: 6700,
allowed_messages: 1 allowed_messages: 1
] ]
], ],

View File

@ -1,94 +0,0 @@
defmodule Scrapper.Processor.MatchProcessor do
require Logger
use Broadway
def start_link(_opts) do
Broadway.start_link(
__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
queue: "match",
connection: [
username: "guest",
password: "guest",
host: "localhost"
],
on_failure: :reject_and_requeue,
qos: [
prefetch_count: 1
]},
concurrency: 1,
rate_limiting: [
interval: 333 * 1,
allowed_messages: 1
]
],
processors: [
default: [
concurrency: 1
]
]
)
end
@impl true
def handle_message(_, message = %Broadway.Message{}, _) do
match_id = message.data
resp = LoLAPI.MatchApi.get_match_by_id(match_id)
process_resp(resp, match_id)
message
end
def process_resp({:ok, raw_match}, match_id) do
decoded_match = Poison.decode!(raw_match, as: %LoLAPI.Model.MatchResponse{})
match_url =
case decoded_match.info.queueId do
420 ->
Logger.info("#{match_id} #{decoded_match.info.gameVersion}")
Storage.MatchStorage.S3MatchStorage.store_match(
match_id,
raw_match,
"ranked",
"#{decoded_match.info.gameVersion}"
)
LolAnalytics.Dimensions.Match.MatchRepo.get_or_create(%{
match_id: decoded_match.metadata.matchId,
patch_number: decoded_match.info.gameVersion,
queue_id: 420
})
_queue_id ->
Storage.MatchStorage.S3MatchStorage.store_match(match_id, raw_match, "matches")
end
match = LolAnalytics.Match.MatchRepo.get_match(match_id)
case match do
nil ->
LolAnalytics.Match.MatchRepo.insert_match(match_id)
_ ->
LolAnalytics.Match.MatchRepo.update_match(match, %{
:processed => true,
:match_url => match_url
})
end
decoded_match.metadata.participants
|> Enum.shuffle()
|> Enum.take(2)
|> Enum.each(fn participant_puuid ->
Scrapper.Queue.PlayerQueue.enqueue_puuid(participant_puuid)
end)
end
def process_resp({:err, _code}, _match_id) do
end
end

View File

@ -12,7 +12,12 @@ defmodule Scrapper.Queue.PlayerQueue do
def init(_opts) do def init(_opts) do
{:ok, connection} = AMQP.Connection.open() {:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection) {:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "player", durable: true)
AMQP.Queue.declare(channel, "player",
durable: true,
arguments: [{"x-max-length", :long, 1000}]
)
{:ok, {channel, connection}} {:ok, {channel, connection}}
end end