Compare commits
No commits in common. "4dfae60875d05f51759e2e45203bbde748ca329d" and "3993f97de195018109cff9ccaae0e9bfb5801e48" have entirely different histories.
4dfae60875
...
3993f97de1
@ -11,9 +11,7 @@ defmodule LoLAnalytics.Application do
|
||||
LoLAnalytics.Repo,
|
||||
{DNSCluster, query: Application.get_env(:lol_analytics, :dns_cluster_query) || :ignore},
|
||||
{Phoenix.PubSub, name: LoLAnalytics.PubSub},
|
||||
{Task.Supervisor, name: LoLAnalytics.TaskSupervisor},
|
||||
{LolAnalytics.MatchProcessor.MatchesBroadwayProcessor, []},
|
||||
{LolAnalytics.MatchProcessor.MatchesProducer, []}
|
||||
{Task.Supervisor, name: LoLAnalytics.TaskSupervisor}
|
||||
# Start a worker by calling: LoLAnalytics.Worker.start_link(arg)
|
||||
# {LoLAnalytics.Worker, arg}
|
||||
]
|
||||
|
@ -61,25 +61,10 @@ defmodule LolAnalytics.Dimensions.Match.MatchRepo do
|
||||
Repo.all(MatchSchema)
|
||||
end
|
||||
|
||||
def list_unprocessed_matches(limit, queue \\ 420) do
|
||||
query =
|
||||
from m in MatchSchema,
|
||||
where:
|
||||
(m.fact_champion_picked_item_status == 0 or
|
||||
m.fact_champion_picked_summoner_spell_status == 0 or
|
||||
m.fact_champion_played_game_status == 0) and
|
||||
m.queue_id == ^queue,
|
||||
order_by: [desc: m.updated_at],
|
||||
limit: ^limit
|
||||
|
||||
Repo.all(query)
|
||||
end
|
||||
|
||||
@type process_status :: :not_processed | :processed | :error
|
||||
defp process_status_atom_to_db(:not_processed), do: 0
|
||||
defp process_status_atom_to_db(:enqueued), do: 1
|
||||
defp process_status_atom_to_db(:processed), do: 2
|
||||
defp process_status_atom_to_db(:error), do: 3
|
||||
defp process_status_atom_to_db(:error_match_not_found), do: 4
|
||||
defp process_status_atom_to_db(_), do: raise("Invalid processing status")
|
||||
end
|
||||
|
@ -1,9 +1,8 @@
|
||||
defmodule LolAnalytics.Facts.ChampionPickedItem.FactProcessor do
|
||||
require Logger
|
||||
|
||||
alias LolAnalytics.Dimensions.Match.MatchSchema
|
||||
alias LolAnalytics.Dimensions.Match.MatchRepo
|
||||
alias LolAnalytics.Facts.ChampionPickedItem.Repo
|
||||
require Logger
|
||||
@behaviour LolAnalytics.Facts.FactBehaviour
|
||||
|
||||
@doc """
|
||||
|
||||
@ -22,24 +21,6 @@ defmodule LolAnalytics.Facts.ChampionPickedItem.FactProcessor do
|
||||
end
|
||||
end
|
||||
|
||||
@spec process_match(%MatchSchema{}) :: :ok | {:error, String.t()}
|
||||
def process_match(match) do
|
||||
match_url = "http://192.168.1.55:9000/ranked/#{match.patch_number}/#{match.match_id}.json"
|
||||
|
||||
with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <-
|
||||
HTTPoison.get(match_url),
|
||||
{:ok, decoded_match} <- Poison.decode(body, as: %LoLAPI.Model.MatchResponse{}) do
|
||||
process_game_data(decoded_match)
|
||||
MatchRepo.update(match, %{fact_champion_picked_item_status: :processed})
|
||||
:ok
|
||||
else
|
||||
_ ->
|
||||
MatchRepo.update(match, fact_champion_picked_item_status: :error_match_not_found)
|
||||
Logger.error("Could not process data from #{match_url} for ChampionPickedItem")
|
||||
{:error, "Could not process data from #{match_url}"}
|
||||
end
|
||||
end
|
||||
|
||||
defp process_game_data(decoded_match) do
|
||||
participants = decoded_match.info.participants
|
||||
version = extract_game_version(decoded_match)
|
||||
@ -76,6 +57,8 @@ defmodule LolAnalytics.Facts.ChampionPickedItem.FactProcessor do
|
||||
end)
|
||||
end
|
||||
end)
|
||||
|
||||
MatchRepo.update(match, %{fact_champion_picked_item_status: :processed})
|
||||
end
|
||||
|
||||
defp extract_game_version(game_data) do
|
||||
|
@ -1,25 +1,22 @@
|
||||
defmodule LolAnalytics.Facts.ChampionPickedSummonerSpell.FactProcessor do
|
||||
@behaviour LolAnalytics.Facts.FactBehaviour
|
||||
|
||||
require Logger
|
||||
|
||||
alias LolAnalytics.Dimensions.Match.MatchSchema
|
||||
alias LolAnalytics.Dimensions.Match.MatchRepo
|
||||
alias LolAnalytics.Facts.ChampionPickedSummonerSpell
|
||||
|
||||
@spec process_match(%MatchSchema{}) :: :ok | {:error, String.t()}
|
||||
def process_match(match) do
|
||||
match_url = "http://192.168.1.55:9000/ranked/#{match.patch_number}/#{match.match_id}.json"
|
||||
|
||||
@impl true
|
||||
@spec process_game_at_url(String.t()) :: any()
|
||||
def process_game_at_url(url) do
|
||||
with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <-
|
||||
HTTPoison.get(match_url),
|
||||
HTTPoison.get(url),
|
||||
{:ok, decoded_match} <- Poison.decode(body, as: %LoLAPI.Model.MatchResponse{}) do
|
||||
process_game_data(decoded_match)
|
||||
MatchRepo.update(match, %{fact_champion_picked_summoner_spell_status: :processed})
|
||||
:ok
|
||||
else
|
||||
_ ->
|
||||
MatchRepo.update(match, fact_champion_picked_summoner_spell_status: :error_match_not_found)
|
||||
Logger.error("Could not process data from #{match_url} for ChampionPickedItem")
|
||||
{:error, "Could not process data from #{match_url}"}
|
||||
Logger.error("Could not process data from #{url} for ChampionPickedSummonerSpell")
|
||||
{:error, "Could not process data from #{url}"}
|
||||
end
|
||||
end
|
||||
|
||||
@ -67,6 +64,8 @@ defmodule LolAnalytics.Facts.ChampionPickedSummonerSpell.FactProcessor do
|
||||
ChampionPickedSummonerSpell.Repo.insert(attrs_spell_2)
|
||||
end
|
||||
end)
|
||||
|
||||
MatchRepo.update(match, %{fact_champion_picked_summoner_spell_status: :processed})
|
||||
end
|
||||
|
||||
defp extract_game_version(game_data) do
|
||||
|
@ -1,24 +1,20 @@
|
||||
defmodule LolAnalytics.Facts.ChampionPlayedGame.FactProcessor do
|
||||
alias LolAnalytics.Dimensions.Match.MatchRepo
|
||||
require Logger
|
||||
|
||||
alias LolAnalytics.Dimensions.Match.MatchSchema
|
||||
alias LolAnalytics.Dimensions.Match.MatchRepo
|
||||
|
||||
@spec process_match(%MatchSchema{}) :: :ok | {:error, String.t()}
|
||||
def process_match(match) do
|
||||
match_url = "http://192.168.1.55:9000/ranked/#{match.patch_number}/#{match.match_id}.json"
|
||||
@behaviour LolAnalytics.Facts.FactBehaviour
|
||||
|
||||
@impl true
|
||||
@spec process_game_at_url(String.t()) :: none()
|
||||
def process_game_at_url(url) do
|
||||
with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <-
|
||||
HTTPoison.get(match_url),
|
||||
HTTPoison.get(url),
|
||||
{:ok, decoded_match} <- Poison.decode(body, as: %LoLAPI.Model.MatchResponse{}) do
|
||||
process_game_data(decoded_match)
|
||||
MatchRepo.update(match, %{fact_champion_played_game_status: :processed})
|
||||
:ok
|
||||
else
|
||||
_ ->
|
||||
MatchRepo.update(match, fact_champion_played_game_status: :error_match_not_found)
|
||||
Logger.error("Could not process data from #{match_url} for ChampionPickedItem")
|
||||
{:error, "Could not process data from #{match_url}"}
|
||||
Logger.error("Could not process data from #{url} for ChampionPlayedGame")
|
||||
{:error, "Could not process data from #{url}"}
|
||||
end
|
||||
end
|
||||
|
||||
@ -52,6 +48,8 @@ defmodule LolAnalytics.Facts.ChampionPlayedGame.FactProcessor do
|
||||
LolAnalytics.Facts.ChampionPlayedGame.Repo.insert(attrs)
|
||||
end
|
||||
end)
|
||||
|
||||
MatchRepo.update(match, %{fact_champion_played_game_status: :processed})
|
||||
end
|
||||
|
||||
defp extract_game_version(game_data) do
|
||||
|
@ -0,0 +1,3 @@
|
||||
defmodule LolAnalytics.Facts.FactBehaviour do
|
||||
@callback process_game_at_url(String.t()) :: any()
|
||||
end
|
@ -1,18 +1,34 @@
|
||||
defmodule LolAnalytics.Facts.FactsRunner do
|
||||
alias LolAnalytics.Facts
|
||||
|
||||
def analyze_match(match) do
|
||||
get_facts()
|
||||
|> Enum.each(fn fact_runner ->
|
||||
apply(fact_runner, [match])
|
||||
def analyze_by_patch(patch) do
|
||||
Storage.MatchStorage.S3MatchStorage.stream_files("ranked", patch: patch)
|
||||
|> peach(fn %{key: path} ->
|
||||
get_facts()
|
||||
|> Enum.each(fn fact_runner ->
|
||||
apply(fact_runner, ["http://192.168.1.55:9000/ranked/#{path}"])
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
def analyze_all_matches do
|
||||
Storage.MatchStorage.S3MatchStorage.stream_files("ranked")
|
||||
|> peach(fn %{key: path} ->
|
||||
get_facts()
|
||||
|> Enum.each(fn fact_runner ->
|
||||
apply(fact_runner, ["http://192.168.1.55:9000/ranked/#{path}"])
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
def analyze_match() do
|
||||
end
|
||||
|
||||
def get_facts() do
|
||||
[
|
||||
&Facts.ChampionPickedSummonerSpell.FactProcessor.process_match/1,
|
||||
&Facts.ChampionPlayedGame.FactProcessor.process_match/1,
|
||||
&Facts.ChampionPickedItem.FactProcessor.process_match/1
|
||||
&Facts.ChampionPickedSummonerSpell.FactProcessor.process_game_at_url/1,
|
||||
&Facts.ChampionPlayedGame.FactProcessor.process_game_at_url/1,
|
||||
&Facts.ChampionPickedItem.FactProcessor.process_game_at_url/1
|
||||
]
|
||||
end
|
||||
|
||||
|
@ -1,34 +0,0 @@
|
||||
defmodule LolAnalytics.MatchProcessor.MatchesBroadwayProcessor do
|
||||
alias LolAnalytics.Facts.FactsRunner
|
||||
use Broadway
|
||||
|
||||
def start_link(opts) do
|
||||
Broadway.start_link(__MODULE__,
|
||||
name: __MODULE__,
|
||||
processors: [default: []],
|
||||
producer: [
|
||||
module: {LolAnalytics.MatchProcessor.MatchesProducer, []},
|
||||
rate_limiting: [
|
||||
interval: 1000,
|
||||
allowed_messages: 40
|
||||
]
|
||||
]
|
||||
)
|
||||
end
|
||||
|
||||
@impl Broadway
|
||||
def handle_message(_processor, message, _context) do
|
||||
message.data
|
||||
# build_match_url(message.data.queue_id, message.data.patch_number, message.data.match_id)
|
||||
|> FactsRunner.analyze_match()
|
||||
|
||||
message
|
||||
end
|
||||
|
||||
defp build_match_url(queue, patch_id, match_id) do
|
||||
"http://192.168.1.55:9000/#{queue_to_dir(queue)}/#{patch_id}/#{match_id}.json"
|
||||
end
|
||||
|
||||
defp queue_to_dir(420), do: "ranked"
|
||||
defp queue_to_dir(_), do: "ranked"
|
||||
end
|
@ -1,33 +0,0 @@
|
||||
defmodule LolAnalytics.MatchProcessor.MatchesProducer do
|
||||
use GenStage
|
||||
|
||||
@impl GenStage
|
||||
def init(opts) do
|
||||
{:producer, opts}
|
||||
end
|
||||
|
||||
def start_link(opts) do
|
||||
GenStage.start_link(__MODULE__, :ok)
|
||||
end
|
||||
|
||||
@impl GenStage
|
||||
def handle_demand(demand, state) do
|
||||
matches = query_unprocessed_matches(demand)
|
||||
|
||||
{:noreply, matches, state}
|
||||
end
|
||||
|
||||
defp query_unprocessed_matches(demand) when demand <= 0, do: []
|
||||
|
||||
defp query_unprocessed_matches(demand) do
|
||||
LolAnalytics.Dimensions.Match.MatchRepo.list_unprocessed_matches(demand)
|
||||
|> Enum.map(&broadway_transform/1)
|
||||
end
|
||||
|
||||
defp broadway_transform(match) do
|
||||
%Broadway.Message{
|
||||
data: match,
|
||||
acknowledger: Broadway.NoopAcknowledger.init()
|
||||
}
|
||||
end
|
||||
end
|
26
apps/lol_analytics/lib/lol_analytics/matches_processor.ex
Normal file
26
apps/lol_analytics/lib/lol_analytics/matches_processor.ex
Normal file
@ -0,0 +1,26 @@
|
||||
defmodule LolAnalytics.MatchesProcessor do
|
||||
use GenServer
|
||||
|
||||
def init(init_args) do
|
||||
{:ok, init_args}
|
||||
end
|
||||
|
||||
@doc """
|
||||
iex> LolAnalytics.MatchesProcessor.process_for_patch "14.12.593.5894"
|
||||
"""
|
||||
def process_for_patch(patch) do
|
||||
Task.Supervisor.async(LoLAnalytics.TaskSupervisor, fn ->
|
||||
LolAnalytics.Facts.FactsRunner.analyze_by_patch(patch)
|
||||
end)
|
||||
end
|
||||
|
||||
def process_all_matches() do
|
||||
Task.Supervisor.async(LoLAnalytics.TaskSupervisor, fn ->
|
||||
LolAnalytics.Facts.FactsRunner.analyze_all_matches()
|
||||
end)
|
||||
end
|
||||
|
||||
def get_running_processes() do
|
||||
Task.Supervisor.children(LoLAnalytics.TaskSupervisor)
|
||||
end
|
||||
end
|
@ -44,9 +44,7 @@ defmodule LoLAnalytics.MixProject do
|
||||
{:lol_api, in_umbrella: true},
|
||||
{:storage, in_umbrella: true},
|
||||
{:httpoison, "~> 2.2"},
|
||||
{:poison, "~> 5.0"},
|
||||
{:gen_stage, "~> 1.2.1"},
|
||||
{:broadway, "~> 1.1"}
|
||||
{:poison, "~> 5.0"}
|
||||
]
|
||||
end
|
||||
|
||||
|
4
mix.lock
4
mix.lock
@ -2,7 +2,7 @@
|
||||
"amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"},
|
||||
"amqp_client": {:hex, :amqp_client, "3.12.13", "6fc6a7c681e53fed4cbd3f5bcdda342a2b46976345e460ef85414c63698cfe70", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.13", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "76f41bff0792193f00e0062128db51eb68bcee0eb8236139247a7d1866438d03"},
|
||||
"bandit": {:hex, :bandit, "1.5.0", "3bc864a0da7f013ad3713a7f550c6a6ec0e19b8d8715ec678256a0dc197d5539", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "92d18d9a7228a597e0d4661ef69a874ea82d63ff49c7d801a5c68cb18ebbbd72"},
|
||||
"broadway": {:hex, :broadway, "1.1.0", "8ed3aea01fd6f5640b3e1515b90eca51c4fc1fac15fb954cdcf75dc054ae719c", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "25e315ef1afe823129485d981dcc6d9b221cea30e625fd5439e9b05f44fb60e4"},
|
||||
"broadway": {:hex, :broadway, "1.0.7", "7808f9e3eb6f53ca6d060f0f9d61012dd8feb0d7a82e62d087dd517b9b66fa53", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e76cfb0a7d64176c387b8b1ddbfb023e2ee8a63e92f43664d78e6d5d0b1177c6"},
|
||||
"broadway_rabbitmq": {:hex, :broadway_rabbitmq, "0.8.1", "6d68a480b2e49694e4f3836dcbbf8e621bb97b34e84787a2093d5cc3078a4d87", [:mix], [{:amqp, "~> 1.3 or ~> 2.0 or ~> 3.0", [hex: :amqp, repo: "hexpm", optional: false]}, {:broadway, "~> 1.0", [hex: :broadway, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.5 or ~> 0.4.0 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eebe9b03c9673cbda790430389c47e4ca867f9904418cff1b71a74a59c2a986"},
|
||||
"castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
|
||||
"certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"},
|
||||
@ -29,7 +29,7 @@
|
||||
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
|
||||
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
|
||||
"mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"},
|
||||
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
|
||||
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
|
||||
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
|
||||
"phoenix": {:hex, :phoenix, "1.7.12", "1cc589e0eab99f593a8aa38ec45f15d25297dd6187ee801c8de8947090b5a9d3", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "d646192fbade9f485b01bc9920c139bfdd19d0f8df3d73fd8eaf2dfbe0d2837c"},
|
||||
"phoenix_ecto": {:hex, :phoenix_ecto, "4.5.1", "6fdbc334ea53620e71655664df6f33f670747b3a7a6c4041cdda3e2c32df6257", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ebe43aa580db129e54408e719fb9659b7f9e0d52b965c5be26cdca416ecead28"},
|
||||
|
Loading…
x
Reference in New Issue
Block a user