Skip to content

Commit

Permalink
Proper support for archiveFields in policy files
Browse files Browse the repository at this point in the history
This is https://github.com/slacmshankar/epicsarchiverap/issues/69.
Please see issue for more details.

This change fixes the archive PV workflow such that
1) Only those archiveFields that are in
getFieldsArchivedAsPartOfStream are added to the extraFields in the .VAL's PVTypeInfo
2) The other fields are converted to archive requests so that they
become their own PVTypeInfos.

There's a new unit test to test this - ArchiveFieldsNotInStreamTest.

There's no BPL to fix existing PVTypeInfo's as of yet.
Per Eric, he is OK with deleting these PVs and then re-adding the PVs
back to exercise the policy again. That seems to be a decent solution.
  • Loading branch information
slacmshankar committed Nov 15, 2018
1 parent 6dc1140 commit 66e3d53
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.net.URLEncoder;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;
import org.epics.archiverappliance.common.TimeUtils;
Expand Down Expand Up @@ -36,6 +37,7 @@ public enum ArchivePVStateMachine { START, METAINFO_REQUESTED, METAINFO_OBTAINED
private String pvName;
private String abortReason = "";
private ConfigService configService;
private List<String> fieldsArchivedAsPartOfStream;
private String applianceIdentityAfterCapacityPlanning;
private Timestamp startOfWorkflow = TimeUtils.now();
private Timestamp metaInfoRequestedSubmitted = null;
Expand All @@ -46,6 +48,12 @@ public ArchivePVState(String pvName, ConfigService configService) {
this.pvName = pvName;
this.configService = configService;
this.myIdentity = this.configService.getMyApplianceInfo().getIdentity();
try {
this.fieldsArchivedAsPartOfStream = configService.getFieldsArchivedAsPartOfStream();
} catch(IOException ex) {
logger.error("Cannot determine fields that are to be archived as part of stream", ex);
this.fieldsArchivedAsPartOfStream = null;
}
}

public synchronized void nextStep() {
Expand Down Expand Up @@ -127,6 +135,27 @@ public synchronized void nextStep() {
typeInfo.setUsePVAccess(userSpec.isUsePVAccess());
typeInfo.setPolicyName(thePolicy.getPolicyName());

if(!isField) {
for(String field : thePolicy.getArchiveFields()) {
if(fieldsArchivedAsPartOfStream != null && fieldsArchivedAsPartOfStream.contains(field)) {
typeInfo.addArchiveField(field);
} else {
initiateWorkFlowForNonStreamField(PVNames.normalizePVNameWithField(pvName, field), userSpec);
}
}

// Copy over any archive fields from the user spec
if(userSpec != null && userSpec.wereAnyFieldsSpecified()) {
for(String fieldName : userSpec.getArchiveFields()) {
if(fieldsArchivedAsPartOfStream != null && fieldsArchivedAsPartOfStream.contains(fieldName)) {
typeInfo.addArchiveField(fieldName);
} else {
initiateWorkFlowForNonStreamField(PVNames.normalizePVNameWithField(pvName, fieldName), userSpec);
}
}
}
}

String aliasFieldName = "NAME";
if(typeInfo.hasExtraField(aliasFieldName)) {
if(userSpec.isSkipAliasCheck()) {
Expand All @@ -152,20 +181,7 @@ public synchronized void nextStep() {
}
}
}

if(!isField) {
for(String field : thePolicy.getArchiveFields()) {
typeInfo.addArchiveField(field);
}

// Copy over any archive fields from the user spec
if(userSpec != null && userSpec.wereAnyFieldsSpecified()) {
for(String fieldName : userSpec.getArchiveFields()) {
typeInfo.addArchiveField(fieldName);
}
}
}


ApplianceInfo applianceInfoForPV = null;

if(userSpec.isSkipCapacityPlanning()) {
Expand Down Expand Up @@ -383,6 +399,24 @@ private void convertAliasToRealWorkflow(UserSpecifiedSamplingParams userSpec, St
logger.error("Exception archiving alias " + realName + " in workflow for " + pvName, ex);
}
}

/**
 * Add a request for a field that is not to be stored with the stream (aka .VAL).
 * This will convert the request to archive this field into a separate pvName.fieldName request after making the necessary checks.
 * @param pvWithFieldName The fully qualified name of the field, for example <code>somePV.C</code>
 * @param userSpec The user specified sampling parameters carried over from the main PV's archive request
 * @throws IOException If the typeinfo lookup or the archive request submission fails
 */
private void initiateWorkFlowForNonStreamField(String pvWithFieldName, UserSpecifiedSamplingParams userSpec) throws IOException {
	logger.debug("Converting " + pvWithFieldName + " into a separate request separate from the main PV as it is not to be archived as part of the stream.");
	// Only initiate a new workflow if we neither have a typeinfo for this field nor a pending archive request for it.
	if(PVNames.determineAppropriatePVTypeInfo(pvWithFieldName, configService) == null && !configService.doesPVHaveArchiveRequestInWorkflow(pvWithFieldName)) {
		// archivePV writes its JSON response into a writer; we do not need the response here, so we discard it.
		// try-with-resources guarantees the writer is flushed/closed even if archivePV throws.
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try(PrintWriter out = new PrintWriter(bos)) {
			ArchivePVAction.archivePV(out, pvWithFieldName, userSpec.isUserOverrideParams(), userSpec.getUserSpecifedsamplingMethod(), userSpec.getUserSpecifedSamplingPeriod(), userSpec.getControllingPV(), userSpec.getPolicyName(), pvName, true, configService, ArchivePVAction.getFieldsAsPartOfStream(configService));
		}
	} else {
		logger.debug("Already have " + pvWithFieldName + " in typeinfo or in pending requests");
	}
}


/**
* @return The current archiving state machine state
Expand Down
1 change: 1 addition & 0 deletions src/test/org/epics/archiverappliance/TomcatSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private HashMap<String, String> createEnvironment(String testName, String applia
overrideEnvWithSystemProperty(environment, "ARCHAPPL_SHORT_TERM_FOLDER");
overrideEnvWithSystemProperty(environment, "ARCHAPPL_MEDIUM_TERM_FOLDER");
overrideEnvWithSystemProperty(environment, "ARCHAPPL_LONG_TERM_FOLDER");
overrideEnvWithSystemProperty(environment, "ARCHAPPL_POLICIES");

if(logger.isDebugEnabled()) {
for(String key : environment.keySet()) {
Expand Down
20 changes: 20 additions & 0 deletions src/test/org/epics/archiverappliance/UnitTestPVs.db
Original file line number Diff line number Diff line change
Expand Up @@ -180156,3 +180156,23 @@ record(ai, "ArchUnitTest:manual") {
field(ADEL, "0")
field(MDEL, "0")
}

# Counter record used by ArchiveFieldsNotInStreamTest. Once kicked (caput 0.0),
# it ramps its value from 0 to 4 in steps of B (0.5) once per second and, on
# every change, pushes the value into the C field of ArchUnitTest:fieldtst.
record(calcout, "ArchUnitTest:fieldtst:cnt") {
	field(SCAN, "1 second")
	# '#' is the calc "not equal" operator: while A != C, output A+B; else hold at C.
	field(CALC, "A#C ? A+B : C")
	field(INPA, "ArchUnitTest:fieldtst:cnt.VAL NPP")
	field(INPB, "0.5")
	field(INPC, "4.0")
	field(OCAL, "A")
	field(DOPT, "Use OCAL")
	field(OOPT, "On Change")
	field(OUT , "ArchUnitTest:fieldtst.C PP")
}

# Target record. Its .C field is the archiveField that is NOT part of
# getFieldsArchivedAsPartOfStream, so it should get its own PVTypeInfo.
# The alias lets the test also exercise EPICS alias resolution.
record(calc, "ArchUnitTest:fieldtst") {
	alias("ArchUnitTest:fieldtstalias")
	field(MDEL, "0.0")
	field(ADEL, "0.0")
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/python

# policies.py for testing archive fields not in stream


import sys
import os

# Generate a list of policy names. This is used to feed the dropdown in the UI.
def getPolicyList():
    """Return a mapping of policy name -> human readable description for the UI dropdown."""
    return {'Default': 'The default policy'}

# Define a list of fields that will be archived as part of every PV.
def getFieldsArchivedAsPartOfStream():
    """Fields that are stored together with the .VAL stream rather than as separate PVs."""
    fields = ['HIHI', 'HIGH', 'LOW', 'LOLO', 'LOPR', 'HOPR', 'DRVH', 'DRVL']
    return fields


# We use the environment variables ARCHAPPL_SHORT_TERM_FOLDER and ARCHAPPL_MEDIUM_TERM_FOLDER to determine the location of the STS and the MTS in the appliance
# NOTE(review): the ${...} tokens are presumably expanded from the environment by the appliance at runtime — confirm against the plugin URL parser.
shorttermstore_plugin_url = 'pb://localhost?name=STS&rootFolder=${ARCHAPPL_SHORT_TERM_FOLDER}&partitionGranularity=PARTITION_HOUR&consolidateOnShutdown=true'
mediumtermstore_plugin_url = 'pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}&partitionGranularity=PARTITION_DAY&hold=2&gather=1'
longtermstore_plugin_url = 'pb://localhost?name=LTS&rootFolder=${ARCHAPPL_LONG_TERM_FOLDER}&partitionGranularity=PARTITION_YEAR'
# Swap in the blackhole plugin below to discard long term data when debugging.
#longtermstore_plugin_url = 'blackhole://localhost'

def determinePolicy(pvInfoDict):
    """Compute the archiving policy for a PV.

    pvInfoDict is accepted per the policies API but unused here; every PV gets
    the 'Default' policy with a 1 Hz MONITOR and the STS/MTS/LTS store chain.
    """
    return {
        'samplingPeriod': 1.0,
        'samplingMethod': 'MONITOR',
        'dataStores': [
            shorttermstore_plugin_url,
            mediumtermstore_plugin_url,
            longtermstore_plugin_url
        ],
        'policyName': 'Default',
        # C is not part of getFieldsArchivedAsPartOfStream
        'archiveFields': ['C', 'HIHI', 'HIGH', 'LOW', 'LOLO', 'LOPR', 'HOPR', 'DRVH', 'DRVL'],
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.epics.archiverappliance.mgmt;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;

import org.apache.log4j.Logger;
import org.epics.archiverappliance.Event;
import org.epics.archiverappliance.EventStream;
import org.epics.archiverappliance.SIOCSetup;
import org.epics.archiverappliance.TomcatSetup;
import org.epics.archiverappliance.common.TimeUtils;
import org.epics.archiverappliance.config.ConfigServiceForTests;
import org.epics.archiverappliance.retrieval.client.RawDataRetrievalAsEventStream;
import org.epics.archiverappliance.utils.ui.GetUrlContent;
import org.json.simple.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.openqa.selenium.By;
import org.openqa.selenium.JavascriptExecutor;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.firefox.FirefoxDriver;

/**
* This relates to issue - https://github.com/slacmshankar/epicsarchiverap/issues/69
* We want to create a policy that has extra fields that are not in getFieldsArchivedAsPartOfStream
* When archiving PV's with this policy, we should check
* <ol>
* <li>Fields that are part of getFieldsArchivedAsPartOfStream should be in the .VAL's archiveFields</li>
* <li>Fields that are NOT part of getFieldsArchivedAsPartOfStream should NOT be in the .VAL's archiveFields</li>
* <li>Fields that are NOT part of getFieldsArchivedAsPartOfStream should have separate PVTypeInfo's.</li>
* <li>Just to make things interesting, let's throw in EPICS aliases as well.</li>
* </ol>
*
* The best RTYP to use to test this is the MOTOR record; however, it has a lot of dependencies.
* So, we approximate this using a couple of CALC records in the UnitTestPVs
* If we <code>caput ArchUnitTest:fieldtst:cnt 0.0</code>, we should see...
* <pre><code>
* $ camonitor ArchUnitTest:fieldtst ArchUnitTest:fieldtst.C
* ArchUnitTest:fieldtst 2018-11-14 15:36:26.730758 0
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:26.730758 3.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:26.730758 0
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:57.730752 0
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:57.730752 0.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:58.730694 0.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:58.730694 1
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:59.730766 1
* ArchUnitTest:fieldtst.C 2018-11-14 15:36:59.730766 1.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:00.730725 1.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:00.730725 2
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:01.730730 2
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:01.730730 2.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:02.730723 2.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:02.730723 3
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:03.730723 3
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:03.730723 3.5
* ArchUnitTest:fieldtst.C 2018-11-14 15:37:04.730761 3.5
* </code></pre>
* Rather unfortunate about the duplicate timestamps; but we should at least be able to test to make sure we capture all the data.
*
* So, we archive ArchUnitTest:fieldtstalias from within a special policies file which adds the .C archiveField
* Make sure all the listed conditions are true for ArchUnitTest:fieldtst and ArchUnitTest:fieldtst.C.
* Make sure we can get the data for ArchUnitTest:fieldtst and ArchUnitTest:fieldtst.C (and for the alias as well).
*
*
* @author mshankar
*
*/
public class ArchiveFieldsNotInStreamTest {
	private static Logger logger = Logger.getLogger(ArchiveFieldsNotInStreamTest.class.getName());
	// Test harness: embedded Tomcat instances for the appliance webapps and a soft IOC serving UnitTestPVs.db.
	TomcatSetup tomcatSetup = new TomcatSetup();
	SIOCSetup siocSetup = new SIOCSetup();
	WebDriver driver;

	@Before
	public void setUp() throws Exception {
		// Point the appliance at the special policies file whose archiveFields include C,
		// a field that is NOT in getFieldsArchivedAsPartOfStream.
		System.getProperties().put("ARCHAPPL_POLICIES", System.getProperty("user.dir") + "/src/test/org/epics/archiverappliance/mgmt/ArchiveFieldsNotInStream.py");
		siocSetup.startSIOCWithDefaultDB();
		tomcatSetup.setUpWebApps(this.getClass().getSimpleName());
		driver = new FirefoxDriver();
	}

	@After
	public void tearDown() throws Exception {
		driver.quit();
		tomcatSetup.tearDown();
		siocSetup.stopSIOC();
	}

	@Test
	public void testArchiveFieldsPV() throws Exception {
		// Submit the alias name through the mgmt UI, exactly as a user would.
		driver.get("http://localhost:17665/mgmt/ui/index.html");
		((JavascriptExecutor)driver).executeScript("window.skipAutoRefresh = true;");
		WebElement pvstextarea = driver.findElement(By.id("archstatpVNames"));
		String[] fieldsToArchive = new String[] {
				"ArchUnitTest:fieldtstalias"
		};
		pvstextarea.sendKeys(String.join("\n", fieldsToArchive));
		WebElement archiveButton = driver.findElement(By.id("archstatArchive"));
		logger.debug("About to submit");
		archiveButton.click();
		// Give the archive PV workflow time to complete for the PV and its fields.
		Thread.sleep(4*60*1000);
		logger.debug("Checking for archive status");
		WebElement checkStatusButton = driver.findElement(By.id("archstatCheckStatus"));
		checkStatusButton.click();
		Thread.sleep(17*1000);
		// Every submitted name should show up as "Being archived" in the status table.
		for(int i = 0; i < fieldsToArchive.length; i++) {
			int rowWithInfo = i+1;
			WebElement statusPVName = driver.findElement(By.cssSelector("#archstatsdiv_table tr:nth-child(" + rowWithInfo + ") td:nth-child(1)"));
			String pvNameObtainedFromTable = statusPVName.getText();
			assertTrue("PV Name is not " + fieldsToArchive[i] + "; instead we get " + pvNameObtainedFromTable, fieldsToArchive[i].equals(pvNameObtainedFromTable));
			WebElement statusPVStatus = driver.findElement(By.cssSelector("#archstatsdiv_table tr:nth-child(" + rowWithInfo + ") td:nth-child(2)"));
			String pvArchiveStatusObtainedFromTable = statusPVStatus.getText();
			String expectedPVStatus = "Being archived";
			assertTrue("Expecting PV archive status to be " + expectedPVStatus + "; instead it is " + pvArchiveStatusObtainedFromTable + " for field " + fieldsToArchive[i], expectedPVStatus.equals(pvArchiveStatusObtainedFromTable));
		}

		// Check that we have PVTypeInfo's for the main PV. Also check the archiveFields.
		// Stream fields (HIHI/LOLO) must be in the .VAL's archiveFields; C must not be —
		// it should become its own PVTypeInfo instead.
		JSONObject valInfo = GetUrlContent.getURLContentAsJSONObject("http://localhost:17665/mgmt/bpl/getPVTypeInfo?pv=ArchUnitTest:fieldtst", true);
		logger.debug(valInfo.toJSONString());
		@SuppressWarnings("unchecked")
		List<String> archiveFields = (List<String>) valInfo.get("archiveFields");
		assertTrue("TypeInfo should contain the HIHI field but it does not", archiveFields.contains("HIHI"));
		assertTrue("TypeInfo should contain the LOLO field but it does not", archiveFields.contains("LOLO"));
		assertTrue("TypeInfo should not contain the DESC field but it does", !archiveFields.contains("DESC"));
		assertTrue("TypeInfo should not contain the C field but it does", !archiveFields.contains("C"));

		// The non-stream field should have been converted into its own archive request/typeinfo.
		JSONObject C_Info = GetUrlContent.getURLContentAsJSONObject("http://localhost:17665/mgmt/bpl/getPVTypeInfo?pv=ArchUnitTest:fieldtst.C", true);
		assertTrue("Did not find a typeinfo for ArchUnitTest:fieldtst.C", C_Info != null);
		logger.debug(C_Info.toJSONString());

		// Kick the counter record, wait for data to flow, then verify retrieval
		// both by real name and by EPICS alias (for the .VAL and the .C field).
		testRetrievalCount("ArchUnitTest:fieldtst", new double[] { 0.0 } );
		siocSetup.caput("ArchUnitTest:fieldtst:cnt", "0.0");
		Thread.sleep(2*60*1000);
		testRetrievalCount("ArchUnitTest:fieldtst", new double[] { 0.0 } );
		testRetrievalCount("ArchUnitTest:fieldtst.C", new double[] { 3.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5 } );
		testRetrievalCount("ArchUnitTest:fieldtstalias", new double[] { 0.0 } );
		testRetrievalCount("ArchUnitTest:fieldtstalias.C", new double[] { 3.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5 } );
	}

	/**
	 * Fetch raw data for pvName over a window around now and assert we get exactly
	 * the expected values, in order, with strictly increasing timestamps.
	 */
	private void testRetrievalCount(String pvName, double[] expectedValues) throws IOException {
		RawDataRetrievalAsEventStream rawDataRetrieval = new RawDataRetrievalAsEventStream("http://localhost:" + ConfigServiceForTests.RETRIEVAL_TEST_PORT+ "/retrieval/data/getData.raw");
		Timestamp end = TimeUtils.plusDays(TimeUtils.now(), 1);
		Timestamp start = TimeUtils.minusDays(end, 2);
		try(EventStream stream = rawDataRetrieval.getDataForPVS(new String[] { pvName }, start, end, null)) {
			long previousEpochSeconds = 0;
			int eventCount = 0;
			assertTrue("Got a null event stream for PV " + pvName, stream != null);
			for(Event e : stream) {
				long actualSeconds = e.getEpochSeconds();
				logger.debug("For " + pvName + " got value " + e.getSampleValue().getValue().doubleValue());
				// Samples must be strictly ordered in time (no duplicate epoch seconds).
				assertTrue("Got a sample at or before the previous sample " + actualSeconds + " ! >= " + previousEpochSeconds, actualSeconds > previousEpochSeconds);
				previousEpochSeconds = actualSeconds;
				assertTrue("Got " + e.getSampleValue().getValue().doubleValue() + " expecting " + expectedValues[eventCount] + " at " + eventCount,
						Math.abs(Math.abs(e.getSampleValue().getValue().doubleValue()) - Math.abs(expectedValues[eventCount])) < 0.001);
				eventCount++;
			}

			assertTrue("Expecting " + expectedValues.length + " got " + eventCount + " for pv " + pvName, eventCount == expectedValues.length);
		}
	}
}

0 comments on commit 66e3d53

Please sign in to comment.