diff --git a/app/models/affiliation_identifier.rb b/app/models/affiliation_identifier.rb index 58e813a6..59a8aac7 100644 --- a/app/models/affiliation_identifier.rb +++ b/app/models/affiliation_identifier.rb @@ -1,6 +1,8 @@ class AffiliationIdentifier < Base LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze + include Queueable + def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month @@ -102,43 +104,28 @@ def self.push_item(item) # there can be one or more affiliation_identifier per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } - - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") - - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - Rails.logger.error data.inspect - end - end + }, + } + + send_event_import_message(data) + + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." end push_items.length diff --git a/app/models/concerns/importable.rb b/app/models/concerns/importable.rb index 0be551db..d0f1c53e 100644 --- a/app/models/concerns/importable.rb +++ b/app/models/concerns/importable.rb @@ -217,9 +217,10 @@ def import_from_api def parse_record(sqs_msg: nil, data: nil) id = "https://doi.org/#{data['id']}" response = get_datacite_json(id) - related_identifiers = Array.wrap(response.fetch("relatedIdentifiers", - nil)).select do |r| - ["DOI", "URL"].include?(r["relatedIdentifierType"]) + + related_identifiers = Array.wrap( + response.fetch("relatedIdentifiers", nil)).select do |r| + ["DOI", "URL"].include?(r["relatedIdentifierType"]) end if related_identifiers.any? { |r| r["relatedIdentifierType"] == "DOI" } @@ -240,10 +241,15 @@ def parse_record(sqs_msg: nil, data: nil) RelatedUrl.push_item(item) end + Rails.logger.info("[Event Import Worker]: Funding references #{response.fetch("fundingReferences", [])}") + funding_references = Array.wrap(response.fetch("fundingReferences", nil)).select do |f| f.fetch("funderIdentifierType", nil) == "Crossref Funder ID" end + + Rails.logger.info("[Event Import Worker]: Funding references count #{funding_references.length}") + if funding_references.present? item = { "doi" => data["id"], @@ -304,13 +310,6 @@ def parse_record(sqs_msg: nil, data: nil) OrcidAffiliation.push_item(item) end - Rails.logger.info "[Event Data] #{related_identifiers.length} related_identifiers found for DOI #{data['id']}" if related_identifiers.present? - Rails.logger.info "[Event Data] #{name_identifiers.length} name_identifiers found for DOI #{data['id']}" if name_identifiers.present? - Rails.logger.info "[Event Data] #{affiliation_identifiers.length} affiliation_identifiers found for DOI #{data['id']}" if affiliation_identifiers.present? - Rails.logger.info "[Event Data] #{orcid_affiliation.length} orcid_affiliations found for DOI #{data['id']}" if affiliation_identifiers.present? - Rails.logger.info "[Event Data] #{funding_references.length} funding_references found for DOI #{data['id']}" if funding_references.present? - Rails.logger.info "No events found for DOI #{data['id']}" if related_identifiers.blank? && name_identifiers.blank? && funding_references.blank? && affiliation_identifiers.blank? - related_identifiers + name_identifiers + funding_references + affiliation_identifiers + orcid_affiliation end diff --git a/app/models/concerns/queueable.rb b/app/models/concerns/queueable.rb new file mode 100644 index 00000000..bffbca41 --- /dev/null +++ b/app/models/concerns/queueable.rb @@ -0,0 +1,40 @@ +module Queueable + extend ActiveSupport::Concern + + require "aws-sdk-sqs" + + class_methods do + def send_event_import_message(data) + send_message(data, shoryuken_class: "EventImportWorker", queue_name: "events") + end + + private + + def send_message(body, options = {}) + sqs = get_sqs_client + queue_name_prefix = ENV["SQS_PREFIX"].present? ? ENV["SQS_PREFIX"] : Rails.env + queue_url = sqs.get_queue_url(queue_name: "#{queue_name_prefix}_#{options[:queue_name]}").queue_url + + options = { + queue_url: queue_url, + message_attributes: { + "shoryuken_class" => { + string_value: options[:shoryuken_class], + data_type: "String" + }, + }, + message_body: body.to_json, + } + + sqs.send_message(options) + end + + def get_sqs_client() + if Rails.env.development? + Aws::SQS::Client.new(endpoint: ENV["AWS_ENDPOINT"]) + else + Aws::SQS::Client.new + end + end + end +end diff --git a/app/models/funder_identifier.rb b/app/models/funder_identifier.rb index 7c884f6d..a4747bf4 100644 --- a/app/models/funder_identifier.rb +++ b/app/models/funder_identifier.rb @@ -1,6 +1,8 @@ class FunderIdentifier < Base LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze + include Queueable + def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month @@ -92,43 +94,28 @@ def self.push_item(item) # there can be one or more funder_identifier per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data Query API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } - - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") - - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - Rails.logger.error data.inspect - end - end + }, + } + + send_event_import_message(data) + + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." end push_items.length diff --git a/app/models/name_identifier.rb b/app/models/name_identifier.rb index 385518ce..0038e324 100644 --- a/app/models/name_identifier.rb +++ b/app/models/name_identifier.rb @@ -1,6 +1,8 @@ class NameIdentifier < Base LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze + include Queueable + def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month @@ -113,43 +115,28 @@ def self.push_item(item) # there can be one or more name_identifier per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } + }, + } - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") + send_event_import_message(data) - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - Rails.logger.error data.inspect - end - end + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." # send to Profiles service, which then pushes to ORCID if ENV["STAFF_PROFILES_ADMIN_TOKEN"].present? diff --git a/app/models/orcid_affiliation.rb b/app/models/orcid_affiliation.rb index 82ac800d..4e503f6a 100644 --- a/app/models/orcid_affiliation.rb +++ b/app/models/orcid_affiliation.rb @@ -1,6 +1,8 @@ class OrcidAffiliation < Base LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze + include Queueable + def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month @@ -51,8 +53,7 @@ def push_data(result, _options = {}) def self.push_item(item) attributes = item.fetch("attributes", {}) - related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers", - nil)) + related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers", nil)) skip_doi = related_identifiers.any? do |related_identifier| ["IsIdenticalTo", "IsPartOf", "IsPreviousVersionOf", "IsVersionOf"].include?(related_identifier["relatedIdentifierType"]) @@ -105,45 +106,34 @@ def self.push_item(item) ssum end + Rails.logger.info("[Event Import Worker]: send orcid aff push_items = #{push_items.inspect}") + # there can be one or more affiliation_identifier per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } - - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") - - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - Rails.logger.error data.inspect - end - end + }, + } + + Rails.logger.info("[Event Import Worker]: send orcid aff to events") + Rails.logger.info("[Event Import Worker]: orcid data = #{data.inspect}") + send_event_import_message(data) + + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." end total_push_items += push_items diff --git a/app/models/related_identifier.rb b/app/models/related_identifier.rb index 32ade414..7566d326 100644 --- a/app/models/related_identifier.rb +++ b/app/models/related_identifier.rb @@ -4,6 +4,7 @@ class RelatedIdentifier < Base include Helpable include Cacheable + include Queueable def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month @@ -60,7 +61,11 @@ def self.push_item(item) attributes = item.fetch("attributes", {}) doi = attributes.fetch("doi", nil) - return nil unless doi.present? && cached_doi_ra(doi) == "DataCite" + # remove this when done + cdr = "DataCite" + + # return nil unless doi.present? && cached_doi_ra(doi) == "DataCite" + return nil unless doi.present? && cdr == "DataCite" pid = normalize_doi(doi) @@ -119,44 +124,29 @@ def self.push_item(item) # there can be one or more related_identifier per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data Query API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "id" => iiitem["id"], - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "id" => iiitem["id"], + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } + }, + } - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") + send_event_import_message(data) - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - Rails.logger.error data.inspect - end - end + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." # send to Event Data Bus # we only send datacite_crossref events to the bus diff --git a/app/models/related_url.rb b/app/models/related_url.rb index 40701355..c46b1e18 100644 --- a/app/models/related_url.rb +++ b/app/models/related_url.rb @@ -1,6 +1,8 @@ class RelatedUrl < Base LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze + include Queueable + def self.import_by_month(options = {}) from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month @@ -87,42 +89,28 @@ def self.push_item(item) # there can be one or more related_url per DOI Array.wrap(push_items).each do |iiitem| - # send to DataCite Event Data Query API - if ENV["STAFF_ADMIN_TOKEN"].present? - push_url = "#{ENV['LAGOTTINO_URL']}/events" - - data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => iiitem["message_action"], - "subjId" => iiitem["subj_id"], - "objId" => iiitem["obj_id"], - "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, - "sourceId" => iiitem["source_id"].to_s.dasherize, - "sourceToken" => iiitem["source_token"], - "occurredAt" => iiitem["occurred_at"], - "timestamp" => iiitem["timestamp"], - "license" => iiitem["license"], - "subj" => iiitem["subj"], - "obj" => iiitem["obj"], - }, + data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => iiitem["message_action"], + "subjId" => iiitem["subj_id"], + "objId" => iiitem["obj_id"], + "relationTypeId" => iiitem["relation_type_id"].to_s.dasherize, + "sourceId" => iiitem["source_id"].to_s.dasherize, + "sourceToken" => iiitem["source_token"], + "occurredAt" => iiitem["occurred_at"], + "timestamp" => iiitem["timestamp"], + "license" => iiitem["license"], + "subj" => iiitem["subj"], + "obj" => iiitem["obj"], }, - } - - response = Maremma.post(push_url, data: data.to_json, - bearer: ENV["STAFF_ADMIN_TOKEN"], - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2") - - if [200, 201].include?(response.status) - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service." - elsif response.status == 409 - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service." - elsif response.body["errors"].present? - Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}" - end - end + }, + } + + send_event_import_message(data) + + Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue." end push_items.length diff --git a/app/workers/event_import_worker.rb b/app/workers/event_import_worker.rb new file mode 100644 index 00000000..b3830649 --- /dev/null +++ b/app/workers/event_import_worker.rb @@ -0,0 +1,47 @@ +class EventImportWorker + include Shoryuken::Worker + + shoryuken_options queue: -> { "#{ENV['RAILS_ENV']}_events" }, auto_delete: true + + def perform(sqs_msg=nil, data=nil) + if data.blank? + Rails.logger.info("[EventImportWorker] data object is blank.") + return + end + + response = post_to_event_service(data) + data = JSON.parse(data) + prefix = log_prefix(data) + handle_logging(data, response, prefix) + end + + private + + def post_to_event_service(data) + Maremma.post( + "#{ENV["LAGOTTINO_URL"]}/events", + data: data, + bearer: ENV["STAFF_ADMIN_TOKEN"], + content_type: "application/vnd.api+json", + accept: "application/vnd.api+json; version=2") + end + + def log_prefix(data) + subj_id = data["data"]["attributes"]["subjId"] + relation_type_id = data["data"]["attributes"]["relationTypeId"] + obj_id = data["data"]["attributes"]["objId"] + + "[EventImportWorker] #{subj_id} #{relation_type_id} #{obj_id}" + end + + def handle_logging(data, response, prefix) + if response.status == 200 || response.status == 201 + Rails.logger.info("#{prefix} pushed to the Event Data service.") + elsif response.status == 409 + Rails.logger.info("#{prefix} already pushed to the Event Data service.") + elsif response.body["errors"].present? + Rails.logger.error("#{prefix} had an error: #{response.body["errors"]}") + Rails.logger.error(data.inspect) + end + end +end diff --git a/config/initializers/_shoryuken.rb b/config/initializers/_shoryuken.rb index c872e467..99f822d3 100644 --- a/config/initializers/_shoryuken.rb +++ b/config/initializers/_shoryuken.rb @@ -1,5 +1,13 @@ # frozen_string_literal: true +if Rails.env.development? + Aws.config.update({ + endpoint: ENV["AWS_ENDPOINT"], + region: "us-east-1", + credentials: Aws::Credentials.new("test", "test") + }) +end + # Shoryuken middleware to capture worker errors and send them on to Sentry.io module Shoryuken module Middleware diff --git a/config/shoryuken.dev.yml b/config/shoryuken.dev.yml new file mode 100644 index 00000000..24482186 --- /dev/null +++ b/config/shoryuken.dev.yml @@ -0,0 +1,5 @@ +concurrency: 1 +delay: 0 +pidfile: tmp/pids/shoryuken.pid +queues: + - events diff --git a/config/shoryuken.yml b/config/shoryuken.yml index 9a90dfe9..14cf83e3 100644 --- a/config/shoryuken.yml +++ b/config/shoryuken.yml @@ -1,3 +1,13 @@ +# - the global concurrency is 50 threads +# - the only queues within the global context are doi and levriero i.e. we process, +# at most, 50 messages from the doi and levriero queues in parallel + +# - the events group defines concurrency for the events queue i.e. we process, +# at most, 10 events in parallel + +# - the usage group defines concurrency for the usage and levriero_usage queues +# i.e. we process, at most, 4 messages in parallel + concurrency: 50 delay: 0 pidfile: tmp/pids/shoryuken.pid @@ -6,12 +16,11 @@ queues: - levriero groups: - levriero_usage: + events: concurrency: 10 queues: - - levriero_usage + - events usage: concurrency: 4 queues: - usage - # - levriero_usage diff --git a/docker-compose.localstack.yml b/docker-compose.localstack.yml new file mode 100644 index 00000000..a0f8b1bf --- /dev/null +++ b/docker-compose.localstack.yml @@ -0,0 +1,33 @@ +services: + web: + container_name: levriero_web + platform: linux/amd64 + env_file: .env + environment: + - LOG_LEVEL=info + - AWS_ENDPOINT=http://localstack:4566 + # - LAGOTTINO_URL=http://lupo_web # use this to to connect to localstack network + - LAGOTTINO_URL=https://api.stage.datacite.org # use this value to run specs locally + image: datacite/levriero + ports: + - "8045:80" + - "2245:22" + volumes: + - ./app:/home/app/webapp/app + - ./config:/home/app/webapp/config + - ./lib:/home/app/webapp/lib + - ./spec:/home/app/webapp/spec + dns: + - 10.0.2.20 + networks: + - localstack_network + + memcached: + image: memcached:1.6.32 + container_name: levriero_memcached + networks: + - localstack_network + +networks: + localstack_network: + external: true diff --git a/docker-compose.yml b/docker-compose.yml index d2c5d5b0..38c343fc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: env_file: .env environment: - LOG_LEVEL=info + - LAGOTTINO_URL=https://api.stage.datacite.org image: datacite/levriero build: . ports: @@ -27,5 +28,5 @@ networks: ipam: driver: default config: - - subnet: 10.0.70.0/24 - gateway: 10.0.70.1 + - subnet: 10.0.70.0/24 + gateway: 10.0.70.1 diff --git a/lib/tasks/event_import_worker.rake b/lib/tasks/event_import_worker.rake new file mode 100644 index 00000000..6c6bf380 --- /dev/null +++ b/lib/tasks/event_import_worker.rake @@ -0,0 +1,13 @@ +namespace :event_import_worker do + desc "Import for a single doi" + task import_doi: :environment do + data = {id: "10.82608/4ds0-vv20"}.to_json + response = Doi.parse_record(sqs_msg: nil, data: JSON.parse(data)) + puts response + end + + desc "Process a message" + task process_message: :environment do + RelatedUrl.receive_event_message + end +end diff --git a/spec/lib/tasks/funder_identifier_rake_spec.rb b/spec/lib/tasks/funder_identifier_rake_spec.rb index 111b1d7d..2ab4e6a8 100644 --- a/spec/lib/tasks/funder_identifier_rake_spec.rb +++ b/spec/lib/tasks/funder_identifier_rake_spec.rb @@ -58,17 +58,17 @@ describe "when STAFF_ADMIN_TOKEN" do before(:each) do - allow(ENV).to(receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN")) allow(ENV).to(receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com")) allow(ENV).to(receive(:[]).with("DATACITE_FUNDER_SOURCE_TOKEN").and_return("DATACITE_FUNDER_SOURCE_TOKEN")) allow(Base).to(receive(:cached_datacite_response).and_return({ "foo" => "bar" })) allow(Base).to(receive(:cached_funder_response).and_return({ "bar" => "foo" })) allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 200))) allow(Time).to(receive_message_chain(:zone, :now, :iso8601).and_return("2023-11-15T12:17:47Z")) + allow(FunderIdentifier).to(receive(:send_event_import_message).and_return(nil)) end describe "is valid" do - it "makes request to lagottino for those funder identifiers with funder identifier type 'Crossref Funder Id'" do + it "sends to events queue for those funder identifiers with funder identifier type 'Crossref Funder Id'" do item = { "attributes" => { "doi" => "https://doi.org/10.0001/foo.bar", @@ -91,8 +91,7 @@ } expect(FunderIdentifier.push_item(item)).to(eq(2)) - - expect(Maremma).to(have_received(:post).twice) + expect(FunderIdentifier).to(have_received(:send_event_import_message).twice) end it "passes the expected values to lagottino" do @@ -126,40 +125,10 @@ "obj" => { "bar" => "foo" }, }, }, - }.to_json - - expect(FunderIdentifier.push_item(item)).to(eq(1)) - - expect(Maremma).to(have_received(:post).with( - "https://fake.lagattino.com/events", - data: json_data, - bearer: "STAFF_ADMIN_TOKEN", - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2", - )) - end - end - - describe "is invalid" do - it "will not make request to lagottino" do - allow(ENV).to(receive(:[]).with("STAFF_ADMIN_TOKEN").and_return(nil)) - - item = { - "attributes" => { - "doi" => "https://doi.org/10.0001/foo.bar", - "updated" => "2023-11-15", - "fundingReferences" => [ - { - "funderIdentifier" => "https://doi.org/10.0001/example.one", - "funderIdentifierType" => "Crossref Funder ID", - }, - ], - }, } expect(FunderIdentifier.push_item(item)).to(eq(1)) - - expect(Maremma).not_to(have_received(:post)) + expect(FunderIdentifier).to(have_received(:send_event_import_message).with(json_data).once) end end end diff --git a/spec/models/affiliation_identifier_spec.rb b/spec/models/affiliation_identifier_spec.rb index 803f58d9..0522356c 100644 --- a/spec/models/affiliation_identifier_spec.rb +++ b/spec/models/affiliation_identifier_spec.rb @@ -35,25 +35,11 @@ end describe "#push_data" do - it "pushes data to the Event Data service" do - # Mock a successful result from the Event Data service + it "pushes data to the events queue" do successful_result = double("result", body: { "data" => [{ "attributes" => {} }] }) - - allow(Maremma).to receive(:post).and_return(successful_result) - result = AffiliationIdentifier.new.push_data(successful_result) expect(result).to eq(1) end - - it "handles errors from the Event Data service" do - # Mock an error result from the Event Data service - error_result = double("result", body: { "errors" => "Error message" }) - - allow(Maremma).to receive(:post).and_return(error_result) - - result = AffiliationIdentifier.new.push_data(error_result) - expect(result).to eq("Error message") - end end describe ".push_item" do @@ -83,138 +69,18 @@ } end - context "when STAFF_ADMIN_TOKEN is present" do + context "when can add to events queue" do before do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("example_admin_token") allow(AffiliationIdentifier).to receive(:cached_datacite_response).and_return({}) allow(AffiliationIdentifier).to receive(:cached_ror_response).and_return({}) - allow(ENV).to receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com") allow(ENV).to receive(:[]).with("DATACITE_AFFILIATION_SOURCE_TOKEN").and_return("DATACITE_AFFILIATION_SOURCE_TOKEN") allow(Rails.logger).to receive(:info) + allow(AffiliationIdentifier).to receive(:send_event_import_message).and_return(nil) end - it "pushes affiliation identifiers to the Event Data service" do - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 200, - body: { "data" => { "id" => "example_id" } })) - push_url = "https://fake.lagattino.com/events" - expected_data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => "create", - "subjId" => "https://doi.org/10.1234/example-doi", - "objId" => "https://ror.org/02catss52", - "relationTypeId" => "is_authored_at", - "sourceId" => "datacite_affiliation", - "sourceToken" => "DATACITE_AFFILIATION_SOURCE_TOKEN", - "occurredAt" => "2023-01-05T12:00:00Z", - "timestamp" => Time.zone.now.iso8601, - "license" => "https://creativecommons.org/publicdomain/zero/1.0/", - "subj" => {}, - "obj" => {}, - }, - }, - } - - expect(Rails.logger).to receive(:info).with("[Event Data] https://doi.org/10.1234/example-doi is_authored_at https://ror.org/02catss52 pushed to Event Data service.") - - stub_request(:post, push_url). - with( - body: expected_data.to_json, - headers: { - "Authorization" => "Bearer example_admin_token", - "Content-Type" => "application/vnd.api+json", - "Accept" => "application/vnd.api+json; version=2", - }, - ). - to_return(status: 200, body: { "data" => { "id" => "example_id" } }.to_json, headers: {}) - - AffiliationIdentifier.push_item(item) - end - - it "skips pushing if STAFF_ADMIN_TOKEN is not present" do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return(nil) - expect(Maremma).not_to receive(:post) - - AffiliationIdentifier.push_item(item) - end - - it "returns 409 for already pushed events" do - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 409, - body: { "data" => { "id" => "example_id" } })) - push_url = "https://fake.lagattino.com/events" - expected_data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => "create", - "subjId" => "https://doi.org/10.1234/example-doi", - "objId" => "https://ror.org/02catss52", - "relationTypeId" => "is_authored_at", - "sourceId" => "datacite_affiliation", - "sourceToken" => "DATACITE_AFFILIATION_SOURCE_TOKEN", - "occurredAt" => "2023-01-05T12:00:00Z", - "timestamp" => Time.zone.now.iso8601, - "license" => "https://creativecommons.org/publicdomain/zero/1.0/", - "subj" => {}, - "obj" => {}, - }, - }, - } - - expect(Rails.logger).to receive(:info).with("[Event Data] https://doi.org/10.1234/example-doi is_authored_at https://ror.org/02catss52 already pushed to Event Data service.") - - stub_request(:post, push_url). - with( - body: expected_data.to_json, - headers: { - "Authorization" => "Bearer example_admin_token", - "Content-Type" => "application/vnd.api+json", - "Accept" => "application/vnd.api+json; version=2", - }, - ). - to_return(status: 200, body: { "data" => { "id" => "example_id" } }.to_json, headers: {}) - - AffiliationIdentifier.push_item(item) - end - - it "returns 500 when there is error while pushing an event" do - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 500, - body: { "errors" => "An error occurred during the put request." })) - allow(Rails.logger).to receive(:error) - - push_url = "https://fake.lagattino.com/events" - expected_data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => "create", - "subjId" => "https://doi.org/10.1234/example-doi", - "objId" => "https://ror.org/02catss52", - "relationTypeId" => "is_authored_at", - "sourceId" => "datacite_affiliation", - "sourceToken" => "DATACITE_AFFILIATION_SOURCE_TOKEN", - "occurredAt" => "2023-01-05T12:00:00Z", - "timestamp" => Time.zone.now.iso8601, - "license" => "https://creativecommons.org/publicdomain/zero/1.0/", - "subj" => {}, - "obj" => {}, - }, - }, - } - - expect(Rails.logger).to receive(:error).with("[Event Data] https://doi.org/10.1234/example-doi is_authored_at https://ror.org/02catss52 had an error: An error occurred during the put request.") - - stub_request(:post, push_url). - with( - body: expected_data.to_json, - headers: { - "Authorization" => "Bearer example_admin_token", - "Content-Type" => "application/vnd.api+json", - "Accept" => "application/vnd.api+json; version=2", - }, - ). - to_return(status: 200, body: { "data" => { "id" => "example_id" } }.to_json, headers: {}) + it "pushes affiliation identifiers to the events queue" do + expect(Rails.logger).to receive(:info).with("[Event Data] https://doi.org/10.1234/example-doi is_authored_at https://ror.org/02catss52 sent to the events queue.") + expect(AffiliationIdentifier).to receive(:send_event_import_message).once AffiliationIdentifier.push_item(item) end diff --git a/spec/models/funder_identifier_spec.rb b/spec/models/funder_identifier_spec.rb index f16d40ba..7f53b0b3 100644 --- a/spec/models/funder_identifier_spec.rb +++ b/spec/models/funder_identifier_spec.rb @@ -28,12 +28,5 @@ "London School of Economics", "The London School of Economics and Political Science", "LSE"]) expect(response["dateModified"]).to eq("2018-07-11T00:00:00Z") end - - # it "push_item" do - # doi = "10.15125/bath-00708" - # attributes = FunderIdentifier.get_datacite_json(doi) - # response = FunderIdentifier.push_item({ "id" => doi, "type" => "dois", "attributes" => attributes }) - # expect(response).to eq(1) - # end end end diff --git a/spec/models/name_identifier_spec.rb b/spec/models/name_identifier_spec.rb index adb78d9b..695191e2 100644 --- a/spec/models/name_identifier_spec.rb +++ b/spec/models/name_identifier_spec.rb @@ -19,8 +19,6 @@ end describe "#push_item" do - let(:staff_admin_token) { "STAFF_ADMIN_TOKEN" } - let(:staff_profiles_admin_token) { "STAFF_PROFILES_ADMIN_TOKEN" } let(:lagottino_json) do @@ -56,11 +54,6 @@ end before(:each) do - allow(ENV). - to(receive(:[]). - with(staff_admin_token). - and_return(staff_admin_token)) - allow(ENV). to(receive(:[]). with("LAGOTTINO_URL"). @@ -89,15 +82,6 @@ to(receive(:cached_orcid_response). and_return("bar" => "foo")) - allow(Maremma). - to(receive(:post). - with("https://fake.lagattino.com/events", - data: lagottino_json, - accept: "application/vnd.api+json; version=2", - content_type: "application/vnd.api+json", - bearer: staff_admin_token). - and_return(OpenStruct.new(status: 200))) - allow(Maremma). to(receive(:post). with("https://fake.volpino.com/claims", @@ -109,6 +93,8 @@ allow(Time). to(receive_message_chain(:zone, :now, :iso8601). and_return("2023-11-15T12:17:47Z")) + + allow(NameIdentifier).to(receive(:send_event_import_message).and_return(nil)) end describe "returns nil" do @@ -220,9 +206,9 @@ end end - describe "when STAFF_ADMIN_TOKEN" do - describe "is valid" do - it "makes request to lagottino for the first name identifier with scheme 'ORCID'" do + describe "when values" do + describe "are valid" do + it "send message to events for the first name identifier with scheme 'ORCID'" do item = { "attributes" => { "doi" => "https://doi.org/10.0001/foo.bar", @@ -247,22 +233,12 @@ } expect(NameIdentifier.push_item(item)).to(eq(1)) - - expect(Maremma). - to(have_received(:post). - with( - "https://fake.lagattino.com/events", - data: lagottino_json, - bearer: staff_admin_token, - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2", - )) + expect(NameIdentifier).to(have_received(:send_event_import_message).once) end end describe "is invalid" do - it "will not make request to lagottino" do - allow(ENV).to(receive(:[]).with(staff_admin_token).and_return(nil)) + it "will not send a message the events queue" do allow(ENV).to(receive(:[]).with("DATACITE_ORCID_AUTO_UPDATE_SOURCE_TOKEN").and_return("DATACITE_ORCID_AUTO_UPDATE_SOURCE_TOKEN")) item = { @@ -281,16 +257,7 @@ } expect(NameIdentifier.push_item(item)).to(eq(1)) - - expect(Maremma). - not_to(have_received(:post). - with( - "https://fake.lagattino.com/events", - data: lagottino_json, - bearer: staff_admin_token, - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2", - )) + expect(NameIdentifier).not_to(receive(:send_event_import_message)) end end end diff --git a/spec/models/orcid_affiliation_spec.rb b/spec/models/orcid_affiliation_spec.rb index 058c7088..531eb165 100644 --- a/spec/models/orcid_affiliation_spec.rb +++ b/spec/models/orcid_affiliation_spec.rb @@ -91,7 +91,6 @@ describe "#push_item" do before do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("example_admin_token") allow(ENV).to receive(:[]).with("ORCID_AFFILIATION_SOURCE_TOKEN").and_return("ORCID_AFFILIATION_SOURCE_TOKEN") allow(ENV).to receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com") allow(ENV).to receive(:[]).with("API_URL").and_return("https://fake.api.com") @@ -107,12 +106,11 @@ allow(ENV).to receive(:[]).with("SSL_CERT_FILE").and_return("https://fake.orcidapiurl.com") allow(ENV).to receive(:[]).with("SSL_CERT_DIR").and_return("https://fake.orcidapiurl.com") allow(Rails.logger).to receive(:info) + allow(OrcidAffiliation).to receive(:send_event_import_message).and_return(nil) end it "push_item with valid data" do - # Mocking a valid item with an ORCID name identifier and ROR affiliation identifier - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 201, - body: { "data" => { "id" => "example_id" } })) + allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 201, body: { "data" => { "id" => "example_id" } })) item = { "attributes" => { @@ -141,149 +139,11 @@ expect(OrcidAffiliation).to receive(:normalize_orcid).with("0000-0001-2345-6789").and_return("https://orcid.org/0000-0001-2345-6789") expect(OrcidAffiliation).to receive(:normalize_ror).with("https://ror.org/02catss52").and_return("https://ror.org/normalized-ror-id") - - expect(Rails.logger).to receive(:info).with("[Event Data] https://orcid.org/0000-0001-2345-6789 is_affiliated_with https://ror.org/normalized-ror-id pushed to Event Data service.") - - response = OrcidAffiliation.push_item(item) - expect(response).to eq(1) - end - - it "push_item with valid already pushed data" do - # Mocking a valid item with an ORCID name identifier and ROR affiliation identifier - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 409, - body: { "data" => { "id" => "example_id" } })) - - item = { - "attributes" => { - "relatedIdentifiers" => [{ "relatedIdentifierType" => "IsSupplementTo", - "relatedIdentifier" => "10.5678/some-related-doi" }], - "creators" => [ - { - "nameIdentifiers" => [ - { - "nameIdentifierScheme" => "ORCID", - "nameIdentifier" => "0000-0001-2345-6789", - }, - ], - "affiliation" => [ - { - "affiliationIdentifierScheme" => "ROR", - "affiliationIdentifier" => "https://ror.org/02catss52", - }, - ], - }, - ], - "updated" => "2023-01-05T12:00:00Z", - }, - "sourceId" => "orcid_affiliation", - } - - expect(OrcidAffiliation).to receive(:normalize_orcid).with("0000-0001-2345-6789").and_return("https://orcid.org/0000-0001-2345-6789") - expect(OrcidAffiliation).to receive(:normalize_ror).with("https://ror.org/02catss52").and_return("https://ror.org/normalized-ror-id") - - expect(Rails.logger).to receive(:info).with("[Event Data] https://orcid.org/0000-0001-2345-6789 is_affiliated_with https://ror.org/normalized-ror-id already pushed to Event Data service.") - - response = OrcidAffiliation.push_item(item) - expect(response).to eq(1) - end - - it "push_item with valid with error" do - # Mocking a valid item with an ORCID name identifier and ROR affiliation identifier - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 500, - body: { "errors" => "An error occurred during the put request." })) - allow(Rails.logger).to receive(:error) - - item = { - "attributes" => { - "relatedIdentifiers" => [{ "relatedIdentifierType" => "IsSupplementTo", - "relatedIdentifier" => "10.5678/some-related-doi" }], - "creators" => [ - { - "nameIdentifiers" => [ - { - "nameIdentifierScheme" => "ORCID", - "nameIdentifier" => "0000-0001-2345-6789", - }, - ], - "affiliation" => [ - { - "affiliationIdentifierScheme" => "ROR", - "affiliationIdentifier" => "https://ror.org/02catss52", - }, - ], - }, - ], - "updated" => "2023-01-05T12:00:00Z", - }, - "sourceId" => "orcid_affiliation", - } - - expect(OrcidAffiliation).to receive(:normalize_orcid).with("0000-0001-2345-6789").and_return("https://orcid.org/0000-0001-2345-6789") - expect(OrcidAffiliation).to receive(:normalize_ror).with("https://ror.org/02catss52").and_return("https://ror.org/normalized-ror-id") - - expect(Rails.logger).to receive(:error).with("[Event Data] https://orcid.org/0000-0001-2345-6789 is_affiliated_with https://ror.org/normalized-ror-id had an error: An error occurred during the put request.") + expect(Rails.logger).to receive(:info).with("[Event Data] https://orcid.org/0000-0001-2345-6789 is_affiliated_with https://ror.org/normalized-ror-id sent to the events queue.") response = OrcidAffiliation.push_item(item) expect(response).to eq(1) end - - it "push_item with missing ORCID data" do - # Mocking an item with missing ORCID data - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 201, - body: { "data" => { "id" => "example_id" } })) - item = { - "attributes" => { - "relatedIdentifiers" => [{ "relatedIdentifierType" => "IsSupplementTo", - "relatedIdentifier" => "10.5678/some-related-doi" }], - "creators" => [ - { - "affiliation" => [ - { - "affiliationIdentifierScheme" => "ROR", - "affiliationIdentifier" => "https://ror.org/02catss52", - }, - ], - }, - ], - "updated" => "2023-01-05T12:00:00Z", - }, - "sourceId" => "orcid_affiliation", - } - - response = OrcidAffiliation.push_item(item) - expect(response).to eq(nil) - end - - it "push_item with related identifier type to skip" do - # Mocking an item with a related identifier type to skip - item = { - "attributes" => { - "relatedIdentifiers" => [{ "relatedIdentifierType" => "IsIdenticalTo", - "relatedIdentifier" => "10.5678/some-related-doi" }], - "creators" => [ - { - "nameIdentifiers" => [ - { - "nameIdentifierScheme" => "ORCID", - "nameIdentifier" => "0000-0001-2345-6789", - }, - ], - "affiliation" => [ - { - "affiliationIdentifierScheme" => "ROR", - "affiliationIdentifier" => "https://ror.org/02catss52", - }, - ], - }, - ], - "updated" => "2023-01-05T12:00:00Z", - }, - "sourceId" => "orcid_affiliation", - } - - response = OrcidAffiliation.push_item(item) - expect(response).to eq(nil) - end end end end diff --git a/spec/models/related_identifier_spec.rb b/spec/models/related_identifier_spec.rb index 1e776954..dd4e978f 100644 --- a/spec/models/related_identifier_spec.rb +++ b/spec/models/related_identifier_spec.rb @@ -82,7 +82,6 @@ context "when the DOI and related identifiers are valid" do before do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN") allow(ENV).to receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com") allow(ENV).to receive(:[]).with("DATACITE_RELATED_SOURCE_TOKEN").and_return("DATACITE_RELATED_SOURCE_TOKEN") allow(ENV).to receive(:[]).with("USER_AGENT").and_return("default_user_agent") @@ -90,10 +89,11 @@ allow(ENV).to receive(:[]).with("EVENTDATA_URL").and_return("https://fake.eventdataurl.com") allow(Base).to receive(:cached_datacite_response).and_return({ "foo" => "bar" }) allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 201)) + allow(RelatedIdentifier).to receive(:send_event_import_message).and_return(nil) allow(Time.zone).to receive(:now).and_return(Time.zone.parse("2023-11-15T12:17:47Z")) end - it "queues jobs and pushes to Event Data services" do + it "queues jobs and pushes to the events queue" do related_identifier = RelatedIdentifier.new allow(related_identifier).to receive(:normalize_doi).with(valid_doi).and_return("normalized_doi") allow(related_identifier).to receive(:normalize_doi).with(valid_related_identifier).and_return("normalized_related_identifier") @@ -104,9 +104,9 @@ allow(Rails.logger).to receive(:info) expect(RelatedIdentifier.push_item(item)).to eq(1) - expect(Maremma).to have_received(:post).with("https://fake.lagattino.com/events", anything).once + expect(RelatedIdentifier).to have_received(:send_event_import_message).once - expect(Rails.logger).to have_received(:info).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related pushed to Event Data service.") + expect(Rails.logger).to have_received(:info).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related sent to the events queue.") end it "does push the event to the event data bus when source_id is datacite_crossref" do @@ -123,96 +123,16 @@ allow(Rails.logger).to receive(:info) expect(RelatedIdentifier.push_item(item)).to eq(1) - expect(Maremma).to have_received(:post).with("https://fake.lagattino.com/events", anything).once + expect(RelatedIdentifier).to have_received(:send_event_import_message).once expect(Maremma).to have_received(:post).with("https://fake.eventdataurl.com/events", anything).once - expect(Rails.logger).to have_received(:info).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related pushed to Event Data service.") + expect(Rails.logger).to have_received(:info).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related sent to the events queue.") expect(Rails.logger).to have_received(:info).with("[Event Data Bus] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related pushed to Event Data service.") end end - context "when the DOI and related identifiers are valid and already pushed" do - before do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN") - allow(ENV).to receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com") - allow(ENV).to receive(:[]).with("DATACITE_RELATED_SOURCE_TOKEN").and_return("DATACITE_RELATED_SOURCE_TOKEN") - allow(ENV).to receive(:[]).with("USER_AGENT").and_return("default_user_agent") - allow(ENV).to receive(:[]).with("EVENTDATA_TOKEN").and_return("EVENTDATA_TOKEN") - allow(ENV).to receive(:[]).with("EVENTDATA_URL").and_return("https://fake.eventdataurl.com") - allow(Base).to receive(:cached_datacite_response).and_return({ "foo" => "bar" }) - allow(Maremma).to receive(:post).and_return(OpenStruct.new(status: 409)) - allow(Time.zone).to receive(:now).and_return(Time.zone.parse("2023-11-15T12:17:47Z")) - end - - it "queues jobs and pushes to Event Data services" do - related_identifier = RelatedIdentifier.new - allow(related_identifier).to receive(:normalize_doi).with(valid_doi).and_return("normalized_doi") - allow(related_identifier).to receive(:normalize_doi).with(valid_related_identifier).and_return("normalized_related_identifier") - allow(related_identifier).to receive(:validate_prefix).with(valid_related_identifier).and_return("datacite") - allow(RelatedIdentifier).to receive(:cached_doi_ra).and_return("DataCite") - allow(RelatedIdentifier).to receive(:cached_datacite_response).and_return({}) - allow(related_identifier).to receive(:set_event_for_bus).and_return({}) - allow(Rails.logger).to receive(:info) - - expect(RelatedIdentifier.push_item(item)).to eq(1) - expect(Maremma).to have_received(:post).with("https://fake.lagattino.com/events", anything).once - - expect(Rails.logger).to have_received(:info).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related already pushed to Event Data service.") - end - end - - context "when there is error while creating the event" do - let(:error_message) { "An error occurred during the put request." } - before do - allow(ENV).to receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN") - allow(ENV).to receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com") - allow(ENV).to receive(:[]).with("DATACITE_RELATED_SOURCE_TOKEN").and_return("DATACITE_RELATED_SOURCE_TOKEN") - allow(ENV).to receive(:[]).with("USER_AGENT").and_return("default_user_agent") - allow(ENV).to receive(:[]).with("EVENTDATA_TOKEN").and_return("EVENTDATA_TOKEN") - allow(ENV).to receive(:[]).with("EVENTDATA_URL").and_return("https://fake.eventdataurl.com") - allow(Base).to receive(:cached_datacite_response).and_return({ "foo" => "bar" }) - allow(Maremma).to receive(:post) do |_, _options| - OpenStruct.new(status: 500, body: { "errors" => error_message }) - end - allow(Time.zone).to receive(:now).and_return(Time.zone.parse("2023-11-15T12:17:47Z")) - end - - it "queues jobs and pushes to Event Data services" do - related_identifier = RelatedIdentifier.new - allow(related_identifier).to receive(:normalize_doi).with(valid_doi).and_return("normalized_doi") - allow(related_identifier).to receive(:normalize_doi).with(valid_related_identifier).and_return("normalized_related_identifier") - allow(related_identifier).to receive(:validate_prefix).with(valid_related_identifier).and_return("datacite") - allow(RelatedIdentifier).to receive(:cached_doi_ra).and_return("DataCite") - allow(RelatedIdentifier).to receive(:cached_datacite_response).and_return({}) - allow(related_identifier).to receive(:set_event_for_bus).and_return({}) - allow(Rails.logger).to receive(:error) - - expect(RelatedIdentifier.push_item(item)).to eq(1) - expect(Maremma).to have_received(:post).with("https://fake.lagattino.com/events", anything).once - - expect(Rails.logger).to have_received(:error).with("[Event Data] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related had an error: An error occurred during the put request.") - end - - it "Does not sent an event to Event Data Bus." do - allow(ENV).to receive(:[]).with("EVENTDATA_TOKEN").and_return(nil) - related_identifier = RelatedIdentifier.new - allow(related_identifier).to receive(:normalize_doi).with(valid_doi).and_return("normalized_doi") - allow(related_identifier).to receive(:normalize_doi).with(valid_related_identifier).and_return("normalized_related_identifier") - allow(related_identifier).to receive(:validate_prefix).with(valid_related_identifier).and_return("datacite") - allow(RelatedIdentifier).to receive(:cached_doi_ra).and_return("DataCite") - allow(RelatedIdentifier).to receive(:cached_datacite_response).and_return({}) - allow(related_identifier).to receive(:set_event_for_bus).and_return({}) - allow(Rails.logger).to receive(:info) - - expect(RelatedIdentifier.push_item(item)).to eq(1) - expect(Maremma).to have_received(:post).with("https://fake.lagattino.com/events", anything).once - - expect(Rails.logger).to have_received(:info).with("[Event Data Bus] https://doi.org/10.1234/example example_type https://doi.org/10.5678/related was not sent to Event Data Bus.") - end - end - context "when the DOI is blank" do it "returns nil" do - allow(Maremma).to receive(:post) + allow(RelatedIdentifier).to receive(:send_event_import_message).and_return(nil) item = { "attributes" => { "doi" => nil, @@ -227,7 +147,7 @@ } expect(RelatedIdentifier.push_item(item)).to eq(nil) - expect(Maremma).not_to have_received(:post) + expect(RelatedIdentifier).not_to have_received(:send_event_import_message) end end end diff --git a/spec/models/related_url_spec.rb b/spec/models/related_url_spec.rb index 7c82b8a5..54c67385 100644 --- a/spec/models/related_url_spec.rb +++ b/spec/models/related_url_spec.rb @@ -36,130 +36,83 @@ end describe "push_item" do + before(:each) do + allow(ENV).to(receive(:[]).with("DATACITE_URL_SOURCE_TOKEN").and_return("DATACITE_URL_SOURCE_TOKEN")) + allow(Base).to(receive(:cached_datacite_response).and_return({ "foo" => "bar" })) + allow(RelatedUrl).to(receive(:send_event_import_message).and_return(nil)) + allow(Time).to(receive_message_chain(:zone, :now, :iso8601).and_return("2023-11-15T12:17:47Z")) + allow(RelatedUrl).to(receive(:send_event_import_message).and_return(nil)) + end + it "returns nil if the doi is blank" do expect(RelatedUrl.push_item("doi" => nil)).to(eq(nil)) end - describe "when STAFF_ADMIN_TOKEN" do - before(:each) do - allow(ENV).to(receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN")) - allow(ENV).to(receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com")) - allow(ENV).to(receive(:[]).with("DATACITE_URL_SOURCE_TOKEN").and_return("DATACITE_URL_SOURCE_TOKEN")) - allow(Base).to(receive(:cached_datacite_response).and_return({ "foo" => "bar" })) - allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 200))) - allow(Time).to(receive_message_chain(:zone, :now, :iso8601).and_return("2023-11-15T12:17:47Z")) - end - - describe "is valid" do - it "makes request to lagottino for those related identifiers with type 'URL'" do - item = { - "attributes" => { - "doi" => "https://doi.org/10.0001/foo.bar", - "updated" => "2023-11-15", - "relatedIdentifiers" => [ - { - "relatedIdentifierType" => "URL", - "relatedIdentifier" => "https://doi.org/10.0001/example.one", - "relationType" => "example-one", - }, - { - "relatedIdentifierType" => "DOI", - "relatedIdentifier" => "https://doi.org/10.0001/example.two", - "relationType" => "example-two", - }, - { - "relatedIdentifierType" => "URL", - "relatedIdentifier" => "https://doi.org/10.0001/example.three", - "relationType" => "example-three", - }, - ], + it "sends to the events queue for those related identifiers with type 'URL'" do + item = { + "attributes" => { + "doi" => "https://doi.org/10.0001/foo.bar", + "updated" => "2023-11-15", + "relatedIdentifiers" => [ + { + "relatedIdentifierType" => "URL", + "relatedIdentifier" => "https://doi.org/10.0001/example.one", + "relationType" => "example-one", }, - } - - expect(RelatedUrl.push_item(item)).to(eq(2)) - - expect(Maremma).to(have_received(:post).twice) - end - - it "passes the expected values to lagottino" do - item = { - "attributes" => { - "doi" => "https://doi.org/10.0001/foo.bar", - "updated" => "2023-11-15", - "relatedIdentifiers" => [ - { - "relatedIdentifierType" => "URL", - "relatedIdentifier" => "https://doi.org/10.0001/example.one", - "relationType" => "example-one", - }, - ], + { + "relatedIdentifierType" => "DOI", + "relatedIdentifier" => "https://doi.org/10.0001/example.two", + "relationType" => "example-two", }, - } - - json_data = { - "data" => { - "type" => "events", - "attributes" => { - "messageAction" => "create", - "subjId" => "https://doi.org/10.0001/foo.bar", - "objId" => "https://doi.org/10.0001/example.one", - "relationTypeId" => "example-one", - "sourceId" => "datacite-url", - "sourceToken" => "DATACITE_URL_SOURCE_TOKEN", - "occurredAt" => "2023-11-15", - "timestamp" => "2023-11-15T12:17:47Z", - "license" => "https://creativecommons.org/publicdomain/zero/1.0/", - "subj" => { "foo" => "bar" }, - "obj" => {}, - }, + { + "relatedIdentifierType" => "URL", + "relatedIdentifier" => "https://doi.org/10.0001/example.three", + "relationType" => "example-three", }, - }.to_json - - expect(RelatedUrl.push_item(item)).to(eq(1)) - - expect(Maremma).to(have_received(:post).with( - "https://fake.lagattino.com/events", - data: json_data, - bearer: "STAFF_ADMIN_TOKEN", - content_type: "application/vnd.api+json", - accept: "application/vnd.api+json; version=2", - )) - end - end + ], + }, + } - describe "is invalid" do - it "will not make request to lagottino" do - allow(ENV).to(receive(:[]).with("STAFF_ADMIN_TOKEN").and_return(nil)) + expect(RelatedUrl.push_item(item)).to(eq(2)) + expect(RelatedUrl).to(have_received(:send_event_import_message).twice) + end - item = { - "attributes" => { - "doi" => "https://doi.org/10.0001/foo.bar", - "updated" => "2023-11-15", - "relatedIdentifiers" => [ - { - "relatedIdentifierType" => "URL", - "relatedIdentifier" => "https://doi.org/10.0001/example.one", - "relationType" => "example-one", - }, - { - "relatedIdentifierType" => "DOI", - "relatedIdentifier" => "https://doi.org/10.0001/example.two", - "relationType" => "example-two", - }, - { - "relatedIdentifierType" => "URL", - "relatedIdentifier" => "https://doi.org/10.0001/example.three", - "relationType" => "example-three", - }, - ], + it "passes the expected values to events queue" do + item = { + "attributes" => { + "doi" => "https://doi.org/10.0001/foo.bar", + "updated" => "2023-11-15", + "relatedIdentifiers" => [ + { + "relatedIdentifierType" => "URL", + "relatedIdentifier" => "https://doi.org/10.0001/example.one", + "relationType" => "example-one", }, - } - - expect(RelatedUrl.push_item(item)).to(eq(2)) - - expect(Maremma).not_to(have_received(:post)) - end - end + ], + }, + } + + json_data = { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => "create", + "subjId" => "https://doi.org/10.0001/foo.bar", + "objId" => "https://doi.org/10.0001/example.one", + "relationTypeId" => "example-one", + "sourceId" => "datacite-url", + "sourceToken" => "DATACITE_URL_SOURCE_TOKEN", + "occurredAt" => "2023-11-15", + "timestamp" => "2023-11-15T12:17:47Z", + "license" => "https://creativecommons.org/publicdomain/zero/1.0/", + "subj" => { "foo" => "bar" }, + "obj" => {}, + }, + }, + } + + expect(RelatedUrl.push_item(item)).to(eq(1)) + expect(RelatedUrl).to(have_received(:send_event_import_message).with(json_data).once) end end end diff --git a/spec/workers/event_import_worker_spec.rb b/spec/workers/event_import_worker_spec.rb new file mode 100644 index 00000000..784ac007 --- /dev/null +++ b/spec/workers/event_import_worker_spec.rb @@ -0,0 +1,110 @@ +require "rails_helper" + +describe EventImportWorker do + describe "#perform" do + context "when data is blank" do + before do + allow(Rails.logger).to(receive(:info)) + allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 200))) + end + + it "a POST request is not made to the event data service" do + expect(Maremma).not_to(receive(:post)) + EventImportWorker.new.perform(nil, nil) + end + + it "logs 'blank data message'" do + expect(Rails.logger).to(receive(:info).with("[EventImportWorker] data object is blank.")) + EventImportWorker.new.perform(nil, nil) + end + end + + context "when processing can occur" do + let(:data) { + { + "data" => { + "type" => "events", + "attributes" => { + "messageAction" => "create", + "subjId" => "https://doi.org/10.0001/foo.bar", + "objId" => "https://doi.org/10.0001/example.one", + "relationTypeId" => "example-one", + "sourceId" => "datacite-url", + "sourceToken" => "DATACITE_URL_SOURCE_TOKEN", + "occurredAt" => "2023-11-15", + "timestamp" => "2023-11-15T12:17:47Z", + "license" => "https://creativecommons.org/publicdomain/zero/1.0/", + "subj" => { "foo" => "bar" }, + "obj" => {}, + }, + }, + }.to_json + } + + let(:subj_id) { "https://doi.org/10.0001/foo.bar" } + let(:relation_type_id) { "example-one" } + let(:obj_id) { "https://doi.org/10.0001/example.one" } + + before do + allow(ENV).to(receive(:[]).with("STAFF_ADMIN_TOKEN").and_return("STAFF_ADMIN_TOKEN")) + allow(ENV).to(receive(:[]).with("LAGOTTINO_URL").and_return("https://fake.lagattino.com")) + end + + it "a POST request is made to the event data service" do + allow(Rails.logger).to(receive(:info)) + allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 200))) + expect(Maremma).to(receive(:post)) + EventImportWorker.new.perform(nil, data) + end + + context "and response is 200" do + it "logs pushed to event data service message" do + allow(Rails.logger).to(receive(:info)) + allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 200))) + expected_log = "[EventImportWorker] #{subj_id} #{relation_type_id} #{obj_id} pushed to the Event Data service." + expect(Rails.logger).to(receive(:info).with(expected_log)) + EventImportWorker.new.perform(nil, data) + end + end + + context "and response is 201" do + it "logs pushed to event data service message" do + allow(Rails.logger).to(receive(:info)) + allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 201))) + expected_log = "[EventImportWorker] #{subj_id} #{relation_type_id} #{obj_id} pushed to the Event Data service." + expect(Rails.logger).to(receive(:info).with(expected_log)) + EventImportWorker.new.perform(nil, data) + end + end + + context "when response is 409" do + it "logs pushed to event data service message" do + allow(Rails.logger).to(receive(:info)) + allow(Maremma).to(receive(:post).and_return(OpenStruct.new(status: 409))) + expected_log = "[EventImportWorker] #{subj_id} #{relation_type_id} #{obj_id} already pushed to the Event Data service." + expect(Rails.logger).to(receive(:info).with(expected_log)) + EventImportWorker.new.perform(nil, data) + end + end + + context "when response body contains a non-empty error object value" do + let(:response) { + OpenStruct.new( + status: 500, + body: { + "errors" => { + "message" => "foo" + } + }) + } + + it "logs response had an error message" do + allow(Rails.logger).to(receive(:error)) + allow(Maremma).to(receive(:post).and_return(response)) + expect(Rails.logger).to(receive(:error)).twice + EventImportWorker.new.perform(nil, data) + end + end + end + end +end diff --git a/vendor/docker/shoryuken.sh b/vendor/docker/shoryuken.sh index 5edc62c9..e83fc3e0 100755 --- a/vendor/docker/shoryuken.sh +++ b/vendor/docker/shoryuken.sh @@ -3,4 +3,4 @@ cd /home/app/webapp exec 2>&1 if [ "$AWS_REGION" ]; then exec /sbin/setuser app bundle exec shoryuken -R -C config/shoryuken.yml -fi +fi \ No newline at end of file