Process event creation through SQS queue #195

Open

This pull request wants to merge 64 commits into base: master.

Commits (64)

782491e
add events to shoryuken config
wendelfabianchinsamy Nov 27, 2024
279b0e8
add events to shoryuken config
wendelfabianchinsamy Nov 28, 2024
4d4169d
include Queueable
wendelfabianchinsamy Nov 28, 2024
36cb616
rework specs
wendelfabianchinsamy Nov 28, 2024
3e0b150
creator EventImportWorker
wendelfabianchinsamy Nov 29, 2024
7a96452
move event import processing to a new Event model
wendelfabianchinsamy Nov 29, 2024
a06893a
add event import worker specs
wendelfabianchinsamy Nov 29, 2024
9717922
add localstack setup in order to run and test workflows locally
wendelfabianchinsamy Dec 3, 2024
c8ec789
refactor
wendelfabianchinsamy Dec 13, 2024
8897492
merge main
wendelfabianchinsamy Jan 6, 2025
4d9a864
fix specs
wendelfabianchinsamy Jan 6, 2025
13176bb
fix specs...this sucked
wendelfabianchinsamy Jan 8, 2025
33df62f
add env variable back
wendelfabianchinsamy Jan 9, 2025
3a40932
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
67a3bf3
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
a090d8d
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
8ef6ef2
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
8646e50
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
151466f
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
8cb4334
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
3c130a5
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
16fc129
add logging for debugging
wendelfabianchinsamy Jan 10, 2025
a03d7f8
add logging
wendelfabianchinsamy Jan 10, 2025
4cb73ac
add spec back
wendelfabianchinsamy Jan 10, 2025
89b433b
fix logging issue with spec
wendelfabianchinsamy Jan 10, 2025
92a8d57
hardcode subj_id
wendelfabianchinsamy Jan 10, 2025
08e71fc
comment out expect line temporarily
wendelfabianchinsamy Jan 10, 2025
284ef48
exclude test expection
wendelfabianchinsamy Jan 10, 2025
de7a987
rework spec for staging testing
wendelfabianchinsamy Jan 10, 2025
b343811
log event service response
wendelfabianchinsamy Jan 13, 2025
e001fac
rework specs temporarily
wendelfabianchinsamy Jan 13, 2025
9259487
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
32edcfd
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
3ae0afd
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
502bee8
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
7d4aac3
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
5e60deb
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
a38927b
add logging for debugging
wendelfabianchinsamy Jan 13, 2025
693c22e
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
6e1a02a
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
ef8fc54
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
1eaebcb
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
9133119
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
4ebd413
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
e7876ab
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
074bef1
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
82c5180
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
8250c18
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
9c44d1b
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
9b8829d
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
b2a0f55
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
83e2ca1
add logging for debugging
wendelfabianchinsamy Jan 14, 2025
06934e6
remove debug logging since funders has been tested successfully
wendelfabianchinsamy Jan 15, 2025
a9150ba
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
963eec8
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
970d307
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
efaa2b7
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
08e26bd
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
48c41b9
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
a0169e3
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
9d12874
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
8f0d2eb
add logging for debugging
wendelfabianchinsamy Jan 15, 2025
68c7668
add logging for debugging
wendelfabianchinsamy Jan 16, 2025
fb93250
revert change as test was successful
wendelfabianchinsamy Jan 16, 2025
59 changes: 23 additions & 36 deletions app/models/affiliation_identifier.rb
@@ -1,6 +1,8 @@
 class AffiliationIdentifier < Base
   LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze
 
+  include Queueable
+
   def self.import_by_month(options = {})
     from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
     until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
@@ -102,43 +104,28 @@ def self.push_item(item)
 
     # there can be one or more affiliation_identifier per DOI
     Array.wrap(push_items).each do |iiitem|
-      # send to DataCite Event Data API
-      if ENV["STAFF_ADMIN_TOKEN"].present?
-        push_url = "#{ENV['LAGOTTINO_URL']}/events"
-
-        data = {
-          "data" => {
-            "type" => "events",
-            "attributes" => {
-              "messageAction" => iiitem["message_action"],
-              "subjId" => iiitem["subj_id"],
-              "objId" => iiitem["obj_id"],
-              "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
-              "sourceId" => iiitem["source_id"].to_s.dasherize,
-              "sourceToken" => iiitem["source_token"],
-              "occurredAt" => iiitem["occurred_at"],
-              "timestamp" => iiitem["timestamp"],
-              "license" => iiitem["license"],
-              "subj" => iiitem["subj"],
-              "obj" => iiitem["obj"],
-            },
-          },
-        }
-
-        response = Maremma.post(push_url, data: data.to_json,
-                                bearer: ENV["STAFF_ADMIN_TOKEN"],
-                                content_type: "application/vnd.api+json",
-                                accept: "application/vnd.api+json; version=2")
-
-        if [200, 201].include?(response.status)
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
-        elsif response.status == 409
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
-        elsif response.body["errors"].present?
-          Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
-          Rails.logger.error data.inspect
-        end
-      end
+      data = {
+        "data" => {
+          "type" => "events",
+          "attributes" => {
+            "messageAction" => iiitem["message_action"],
+            "subjId" => iiitem["subj_id"],
+            "objId" => iiitem["obj_id"],
+            "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
+            "sourceId" => iiitem["source_id"].to_s.dasherize,
+            "sourceToken" => iiitem["source_token"],
+            "occurredAt" => iiitem["occurred_at"],
+            "timestamp" => iiitem["timestamp"],
+            "license" => iiitem["license"],
+            "subj" => iiitem["subj"],
+            "obj" => iiitem["obj"],
+          },
+        },
+      }
+
+      send_event_import_message(data)
+
+      Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
     end
 
     push_items.length
19 changes: 9 additions & 10 deletions app/models/concerns/importable.rb
@@ -217,9 +217,10 @@ def import_from_api
   def parse_record(sqs_msg: nil, data: nil)
     id = "https://doi.org/#{data['id']}"
     response = get_datacite_json(id)
-    related_identifiers = Array.wrap(response.fetch("relatedIdentifiers",
-                                                    nil)).select do |r|
-      ["DOI", "URL"].include?(r["relatedIdentifierType"])
+
+    related_identifiers = Array.wrap(
+      response.fetch("relatedIdentifiers", nil)).select do |r|
+        ["DOI", "URL"].include?(r["relatedIdentifierType"])
     end
 
     if related_identifiers.any? { |r| r["relatedIdentifierType"] == "DOI" }
@@ -240,10 +241,15 @@ def parse_record(sqs_msg: nil, data: nil)
       RelatedUrl.push_item(item)
     end
 
+    Rails.logger.info("[Event Import Worker]: Funding references #{response.fetch("fundingReferences", [])}")
+
     funding_references = Array.wrap(response.fetch("fundingReferences",
                                                    nil)).select do |f|
       f.fetch("funderIdentifierType", nil) == "Crossref Funder ID"
     end
 
+    Rails.logger.info("[Event Import Worker]: Funding references count #{funding_references.length}")
+
     if funding_references.present?
       item = {
         "doi" => data["id"],
@@ -304,13 +310,6 @@ def parse_record(sqs_msg: nil, data: nil)
       OrcidAffiliation.push_item(item)
     end
 
-    Rails.logger.info "[Event Data] #{related_identifiers.length} related_identifiers found for DOI #{data['id']}" if related_identifiers.present?
-    Rails.logger.info "[Event Data] #{name_identifiers.length} name_identifiers found for DOI #{data['id']}" if name_identifiers.present?
-    Rails.logger.info "[Event Data] #{affiliation_identifiers.length} affiliation_identifiers found for DOI #{data['id']}" if affiliation_identifiers.present?
-    Rails.logger.info "[Event Data] #{orcid_affiliation.length} orcid_affiliations found for DOI #{data['id']}" if affiliation_identifiers.present?
-    Rails.logger.info "[Event Data] #{funding_references.length} funding_references found for DOI #{data['id']}" if funding_references.present?
-    Rails.logger.info "No events found for DOI #{data['id']}" if related_identifiers.blank? && name_identifiers.blank? && funding_references.blank? && affiliation_identifiers.blank?
-
     related_identifiers + name_identifiers + funding_references + affiliation_identifiers + orcid_affiliation
   end
 
40 changes: 40 additions & 0 deletions app/models/concerns/queueable.rb
@@ -0,0 +1,40 @@
module Queueable
extend ActiveSupport::Concern

require "aws-sdk-sqs"

class_methods do
def send_event_import_message(data)
send_message(data, shoryuken_class: "EventImportWorker", queue_name: "events")
end

private

def send_message(body, options = {})
sqs = get_sqs_client
queue_name_prefix = ENV["SQS_PREFIX"].present? ? ENV["SQS_PREFIX"] : Rails.env
queue_url = sqs.get_queue_url(queue_name: "#{queue_name_prefix}_#{options[:queue_name]}").queue_url

options = {
queue_url: queue_url,
message_attributes: {
"shoryuken_class" => {
string_value: options[:shoryuken_class],
data_type: "String"
},
},
message_body: body.to_json,
}

sqs.send_message(options)
end

def get_sqs_client()
if Rails.env.development?
Aws::SQS::Client.new(endpoint: ENV["AWS_ENDPOINT"])
else
Aws::SQS::Client.new
end
end
end
end
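
The concern above only covers the publishing side. The consuming side is the EventImportWorker referenced in the commit history and in the shoryuken_class message attribute, and it is not part of this section, so the following is only a rough sketch of how a conventional Shoryuken worker could be wired to the same prefixed events queue; the queue lambda, option values, and the commented-out Event hand-off are assumptions, not the PR's actual implementation.

# Sketch only: illustrative consumer for messages published by Queueable.
class EventImportWorker
  include Shoryuken::Worker

  # Mirror the queue naming used by send_message: "<SQS_PREFIX or Rails.env>_events".
  shoryuken_options queue: -> { "#{ENV['SQS_PREFIX'].presence || Rails.env}_events" },
                    auto_delete: true,
                    body_parser: :json

  def perform(_sqs_msg, body)
    # body is the parsed JSON payload built in push_item: { "data" => { "type" => "events", ... } }
    attributes = body.dig("data", "attributes") || {}
    Rails.logger.info "[Event Import Worker] received #{attributes['subjId']} #{attributes['relationTypeId']} #{attributes['objId']}"

    # Hypothetical hand-off to the Event model mentioned in the commit history;
    # the real method name is not shown in this diff.
    # Event.import_from_queue(body)
  end
end

Because send_message sets the shoryuken_class message attribute to "EventImportWorker", Shoryuken can resolve messages on a shared queue to a worker class with that name.
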
59 changes: 23 additions & 36 deletions app/models/funder_identifier.rb
@@ -1,6 +1,8 @@
 class FunderIdentifier < Base
   LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze
 
+  include Queueable
+
   def self.import_by_month(options = {})
     from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
     until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
@@ -92,43 +94,28 @@ def self.push_item(item)
 
     # there can be one or more funder_identifier per DOI
     Array.wrap(push_items).each do |iiitem|
-      # send to DataCite Event Data Query API
-      if ENV["STAFF_ADMIN_TOKEN"].present?
-        push_url = "#{ENV['LAGOTTINO_URL']}/events"
-
-        data = {
-          "data" => {
-            "type" => "events",
-            "attributes" => {
-              "messageAction" => iiitem["message_action"],
-              "subjId" => iiitem["subj_id"],
-              "objId" => iiitem["obj_id"],
-              "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
-              "sourceId" => iiitem["source_id"].to_s.dasherize,
-              "sourceToken" => iiitem["source_token"],
-              "occurredAt" => iiitem["occurred_at"],
-              "timestamp" => iiitem["timestamp"],
-              "license" => iiitem["license"],
-              "subj" => iiitem["subj"],
-              "obj" => iiitem["obj"],
-            },
-          },
-        }
-
-        response = Maremma.post(push_url, data: data.to_json,
-                                bearer: ENV["STAFF_ADMIN_TOKEN"],
-                                content_type: "application/vnd.api+json",
-                                accept: "application/vnd.api+json; version=2")
-
-        if [200, 201].include?(response.status)
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
-        elsif response.status == 409
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
-        elsif response.body["errors"].present?
-          Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
-          Rails.logger.error data.inspect
-        end
-      end
+      data = {
+        "data" => {
+          "type" => "events",
+          "attributes" => {
+            "messageAction" => iiitem["message_action"],
+            "subjId" => iiitem["subj_id"],
+            "objId" => iiitem["obj_id"],
+            "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
+            "sourceId" => iiitem["source_id"].to_s.dasherize,
+            "sourceToken" => iiitem["source_token"],
+            "occurredAt" => iiitem["occurred_at"],
+            "timestamp" => iiitem["timestamp"],
+            "license" => iiitem["license"],
+            "subj" => iiitem["subj"],
+            "obj" => iiitem["obj"],
+          },
+        },
+      }
+
+      send_event_import_message(data)
+
+      Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
     end
 
     push_items.length
55 changes: 21 additions & 34 deletions app/models/name_identifier.rb
@@ -1,6 +1,8 @@
 class NameIdentifier < Base
   LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze
 
+  include Queueable
+
   def self.import_by_month(options = {})
     from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
     until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
@@ -113,43 +115,28 @@ def self.push_item(item)
 
     # there can be one or more name_identifier per DOI
     Array.wrap(push_items).each do |iiitem|
-      # send to DataCite Event Data API
-      if ENV["STAFF_ADMIN_TOKEN"].present?
-        push_url = "#{ENV['LAGOTTINO_URL']}/events"
-
-        data = {
-          "data" => {
-            "type" => "events",
-            "attributes" => {
-              "messageAction" => iiitem["message_action"],
-              "subjId" => iiitem["subj_id"],
-              "objId" => iiitem["obj_id"],
-              "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
-              "sourceId" => iiitem["source_id"].to_s.dasherize,
-              "sourceToken" => iiitem["source_token"],
-              "occurredAt" => iiitem["occurred_at"],
-              "timestamp" => iiitem["timestamp"],
-              "license" => iiitem["license"],
-              "subj" => iiitem["subj"],
-              "obj" => iiitem["obj"],
-            },
-          },
-        }
-
-        response = Maremma.post(push_url, data: data.to_json,
-                                bearer: ENV["STAFF_ADMIN_TOKEN"],
-                                content_type: "application/vnd.api+json",
-                                accept: "application/vnd.api+json; version=2")
-
-        if [200, 201].include?(response.status)
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
-        elsif response.status == 409
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
-        elsif response.body["errors"].present?
-          Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
-          Rails.logger.error data.inspect
-        end
-      end
+      data = {
+        "data" => {
+          "type" => "events",
+          "attributes" => {
+            "messageAction" => iiitem["message_action"],
+            "subjId" => iiitem["subj_id"],
+            "objId" => iiitem["obj_id"],
+            "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
+            "sourceId" => iiitem["source_id"].to_s.dasherize,
+            "sourceToken" => iiitem["source_token"],
+            "occurredAt" => iiitem["occurred_at"],
+            "timestamp" => iiitem["timestamp"],
+            "license" => iiitem["license"],
+            "subj" => iiitem["subj"],
+            "obj" => iiitem["obj"],
+          },
+        },
+      }
+
+      send_event_import_message(data)
+
+      Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
 
       # send to Profiles service, which then pushes to ORCID
       if ENV["STAFF_PROFILES_ADMIN_TOKEN"].present?
66 changes: 28 additions & 38 deletions app/models/orcid_affiliation.rb
@@ -1,6 +1,8 @@
 class OrcidAffiliation < Base
   LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze
 
+  include Queueable
+
   def self.import_by_month(options = {})
     from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
     until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
@@ -51,8 +53,7 @@ def push_data(result, _options = {})
 
   def self.push_item(item)
     attributes = item.fetch("attributes", {})
-    related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers",
-                                                      nil))
+    related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers", nil))
     skip_doi = related_identifiers.any? do |related_identifier|
       ["IsIdenticalTo", "IsPartOf", "IsPreviousVersionOf",
        "IsVersionOf"].include?(related_identifier["relatedIdentifierType"])
@@ -105,45 +106,34 @@ def self.push_item(item)
       ssum
     end
 
+    Rails.logger.info("[Event Import Worker]: send orcid aff push_items = #{push_items.inspect}")
+
     # there can be one or more affiliation_identifier per DOI
    Array.wrap(push_items).each do |iiitem|
-      # send to DataCite Event Data API
-      if ENV["STAFF_ADMIN_TOKEN"].present?
-        push_url = "#{ENV['LAGOTTINO_URL']}/events"
-
-        data = {
-          "data" => {
-            "type" => "events",
-            "attributes" => {
-              "messageAction" => iiitem["message_action"],
-              "subjId" => iiitem["subj_id"],
-              "objId" => iiitem["obj_id"],
-              "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
-              "sourceId" => iiitem["source_id"].to_s.dasherize,
-              "sourceToken" => iiitem["source_token"],
-              "occurredAt" => iiitem["occurred_at"],
-              "timestamp" => iiitem["timestamp"],
-              "license" => iiitem["license"],
-              "subj" => iiitem["subj"],
-              "obj" => iiitem["obj"],
-            },
-          },
-        }
-
-        response = Maremma.post(push_url, data: data.to_json,
-                                bearer: ENV["STAFF_ADMIN_TOKEN"],
-                                content_type: "application/vnd.api+json",
-                                accept: "application/vnd.api+json; version=2")
-
-        if [200, 201].include?(response.status)
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
-        elsif response.status == 409
-          Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
-        elsif response.body["errors"].present?
-          Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
-          Rails.logger.error data.inspect
-        end
-      end
+      data = {
+        "data" => {
+          "type" => "events",
+          "attributes" => {
+            "messageAction" => iiitem["message_action"],
+            "subjId" => iiitem["subj_id"],
+            "objId" => iiitem["obj_id"],
+            "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
+            "sourceId" => iiitem["source_id"].to_s.dasherize,
+            "sourceToken" => iiitem["source_token"],
+            "occurredAt" => iiitem["occurred_at"],
+            "timestamp" => iiitem["timestamp"],
+            "license" => iiitem["license"],
+            "subj" => iiitem["subj"],
+            "obj" => iiitem["obj"],
+          },
+        },
+      }
+
+      Rails.logger.info("[Event Import Worker]: send orcid aff to events")
+      Rails.logger.info("[Event Import Worker]: orcid data = #{data.inspect}")
+      send_event_import_message(data)
+
+      Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
     end
 
     total_push_items += push_items
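
One of the commits adds a LocalStack setup for running these workflows locally, and Queueable's get_sqs_client already points at ENV["AWS_ENDPOINT"] in development. The snippet below is a minimal sketch for exercising the queue locally; the endpoint, region, credentials, and prefix values are placeholders, not settings taken from this PR.

# Sketch only: create the development "events" queue on LocalStack and verify the
# publish path the models use. Dummy credentials are accepted by LocalStack.
require "aws-sdk-sqs"

sqs = Aws::SQS::Client.new(
  endpoint: ENV.fetch("AWS_ENDPOINT", "http://localhost:4566"),
  region: ENV.fetch("AWS_REGION", "us-east-1"),
  access_key_id: "test",
  secret_access_key: "test"
)

# Create the queue that send_message will later resolve via get_queue_url.
queue_name = "#{ENV.fetch('SQS_PREFIX', 'development')}_events"
sqs.create_queue(queue_name: queue_name)

# Any model that includes Queueable can then enqueue an event payload, e.g.
# AffiliationIdentifier.send_event_import_message({ "data" => { "type" => "events" } })
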