diff --git a/Gopkg.lock b/Gopkg.lock index e3f3c0a..8e67689 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -112,9 +112,10 @@ revision = "0967fc9aceab2ce9da34061253ac10fb99bba5b2" [[projects]] + branch = "master" name = "github.com/streadway/amqp" packages = ["."] - revision = "1c63b162b69cf6834e0b30e496c4cd5d6ee2a1d1" + revision = "70e15c650864f4fc47f5d3c82ea117285480895d" [[projects]] name = "golang.org/x/crypto" @@ -139,10 +140,10 @@ revision = "19e51611da83d6be54ddafce4a4af510cb3e9ea4" [[projects]] - name = "gopkg.in/cyverse-de/messaging.v4" + name = "gopkg.in/cyverse-de/messaging.v6" packages = ["."] - revision = "bef7609a2fe6e9418eeb36906929abee2298cf93" - version = "v4" + revision = "75e35ff8f1eb7027ead7ac9f1fdb5d0a51ed2bf4" + version = "v6" [[projects]] name = "gopkg.in/cyverse-de/model.v2" @@ -150,6 +151,12 @@ revision = "b6fae6c0f89c0d1ee641eef0016db8d54ee3ca7a" version = "v2.11" +[[projects]] + name = "gopkg.in/cyverse-de/model.v4" + packages = ["."] + revision = "51f67b05d6a692a14a233651ef2f5d49d1323133" + version = "v4.1" + [[projects]] branch = "v2" name = "gopkg.in/yaml.v2" @@ -159,6 +166,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "2b05889b3fed964ecd0fb2a0ca83eaa57440356066a39d7f82990cba2e92c9b6" + inputs-digest = "0ba31fd2cfc30f0929844a40753e501aee05b4980cd4147b2a35e4dc2229d667" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 65d6654..1570d25 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -38,12 +38,12 @@ version = "1.0.0" [[constraint]] - name = "gopkg.in/cyverse-de/messaging.v4" - version = "4.0.0" + name = "gopkg.in/cyverse-de/messaging.v6" + version = "6.0.0" [[constraint]] - name = "gopkg.in/cyverse-de/model.v2" - version = "2.11.0" + name = "gopkg.in/cyverse-de/model.v4" + version = "4.1.0" [[constraint]] branch = "v2" diff --git a/dcompose/dcompose.go b/dcompose/dcompose.go index c926bc6..30236c8 100644 --- a/dcompose/dcompose.go +++ b/dcompose/dcompose.go @@ -9,7 +9,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/viper" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/model.v4" ) // WORKDIR is the path to the working directory inside all of the containers @@ -83,6 +83,7 @@ type Service struct { CapDrop []string `yaml:"cap_drop,flow"` Command []string `yaml:",omitempty"` ContainerName string `yaml:"container_name,omitempty"` + CPUs string `yaml:"cpus,omitempty"` CPUSet string `yaml:"cpuset,omitempty"` CPUShares int64 `yaml:"cpu_shares,omitempty"` CPUQuota string `yaml:"cpu_quota,omitempty"` @@ -132,7 +133,7 @@ func New(ld string, pathprefix string) (*JobCompose, error) { } return &JobCompose{ - Version: "2.1", + Version: "2.2", Volumes: make(map[string]*Volume), Networks: make(map[string]*Network), Services: make(map[string]*Service), @@ -200,7 +201,7 @@ func (j *JobCompose) InitFromJob(job *model.Job, cfg *viper.Viper, workingdir st strings.Join([]string{excludesPath, excludesMount, "ro"}, ":"), ) - j.Services["upload_outputs"] = uploadOutputsSvc; + j.Services["upload_outputs"] = uploadOutputsSvc } func NewPorklockService(containertype int, invocationID, workingVolumeHostPath, porklockImageName string, porklockCommand []string) *Service { @@ -311,7 +312,9 @@ func (j *JobCompose) ConvertStep(step *model.Step, index int, user, invID, worki svc.MemLimit = strconv.FormatInt(stepContainer.MemoryLimit, 10) } - if stepContainer.CPUShares > 0 { + if stepContainer.MaxCPUCores > 0.0 { + svc.CPUs = fmt.Sprintf("%f", stepContainer.MaxCPUCores) + } else if stepContainer.CPUShares > 0 { 
svc.CPUShares = stepContainer.CPUShares } diff --git a/dcompose/dcompose_test.go b/dcompose/dcompose_test.go index 2450032..c63061b 100644 --- a/dcompose/dcompose_test.go +++ b/dcompose/dcompose_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/model.v4" yaml "gopkg.in/yaml.v2" ) @@ -62,10 +62,11 @@ var testJob = &model.Job{ }, Component: model.StepComponent{ Container: model.Container{ - ID: "container-id-1", - Name: "container-name-1", - CPUShares: 0, - PIDsLimit: 64, + ID: "container-id-1", + Name: "container-name-1", + CPUShares: 0, + MaxCPUCores: 2, + PIDsLimit: 64, Image: model.ContainerImage{ ID: "container-image-1", Name: "container-image-name-1", diff --git a/exit.go b/exit.go index aafcffc..7a4c388 100644 --- a/exit.go +++ b/exit.go @@ -6,7 +6,7 @@ import ( "strings" "github.com/spf13/viper" - "gopkg.in/cyverse-de/messaging.v4" + "gopkg.in/cyverse-de/messaging.v6" ) func cleanup(cfg *viper.Viper) { diff --git a/fs/fs.go b/fs/fs.go index b5912cc..6a05e66 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -9,7 +9,7 @@ import ( "path" "github.com/pkg/errors" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/model.v4" ) // adapted from https://talks.golang.org/2012/10things.slide#8 diff --git a/fs/fs_test.go b/fs/fs_test.go index fce26d2..35897e0 100644 --- a/fs/fs_test.go +++ b/fs/fs_test.go @@ -10,7 +10,7 @@ import ( "reflect" "testing" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/model.v4" ) type testFS struct { diff --git a/main.go b/main.go index e8fa1a4..c51f25f 100644 --- a/main.go +++ b/main.go @@ -27,8 +27,8 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/streadway/amqp" - "gopkg.in/cyverse-de/messaging.v4" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/messaging.v6" + "gopkg.in/cyverse-de/model.v4" ) var ( diff --git a/run.go b/run.go index f074371..15bb1f3 100644 --- a/run.go +++ b/run.go @@ -17,8 +17,8 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/viper" - "gopkg.in/cyverse-de/messaging.v4" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/messaging.v6" + "gopkg.in/cyverse-de/model.v4" ) // logrusProxyWriter will prevent diff --git a/status.go b/status.go index 9929b5c..3a27414 100644 --- a/status.go +++ b/status.go @@ -3,8 +3,8 @@ package main import ( "os" - "gopkg.in/cyverse-de/messaging.v4" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/messaging.v6" + "gopkg.in/cyverse-de/model.v4" ) func hostname() string { diff --git a/status_test.go b/status_test.go index 2a61830..573e6d9 100644 --- a/status_test.go +++ b/status_test.go @@ -4,8 +4,8 @@ import ( "errors" "testing" - "gopkg.in/cyverse-de/messaging.v4" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/messaging.v6" + "gopkg.in/cyverse-de/model.v4" ) type TestJobUpdatePublisher struct { diff --git a/vendor/github.com/streadway/amqp/.travis.yml b/vendor/github.com/streadway/amqp/.travis.yml index dffd66a..7166964 100644 --- a/vendor/github.com/streadway/amqp/.travis.yml +++ b/vendor/github.com/streadway/amqp/.travis.yml @@ -1,10 +1,8 @@ language: go go: - - 1.6 - - 1.7 - - 1.8 - - tip + - 1.9.x + - 1.10.x services: - rabbitmq diff --git a/vendor/github.com/streadway/amqp/README.md b/vendor/github.com/streadway/amqp/README.md index d7454e4..099db27 100644 --- a/vendor/github.com/streadway/amqp/README.md +++ b/vendor/github.com/streadway/amqp/README.md @@ -13,6 +13,11 @@ Future API changes are unlikely but possible. 
They will be discussed on [Github issues](https://github.com/streadway/amqp/issues) along with any bugs or enhancements. +## Supported Go Versions + +This library supports two most recent Go release series, currently 1.8 and 1.9. + + ## Supported RabbitMQ Versions This project supports RabbitMQ versions starting with `2.0` but primarily tested diff --git a/vendor/github.com/streadway/amqp/channel.go b/vendor/github.com/streadway/amqp/channel.go index c27b685..dd2552c 100644 --- a/vendor/github.com/streadway/amqp/channel.go +++ b/vendor/github.com/streadway/amqp/channel.go @@ -114,7 +114,7 @@ func (ch *Channel) shutdown(e *Error) { ch.errors <- e } - ch.consumers.closeAll() + ch.consumers.close() for _, c := range ch.closes { close(c) @@ -143,6 +143,7 @@ func (ch *Channel) shutdown(e *Error) { ch.confirms.Close() } + close(ch.errors) ch.noNotify = true }) } @@ -173,8 +174,11 @@ func (ch *Channel) call(req message, res ...message) error { if req.wait() { select { - case e := <-ch.errors: - return e + case e, ok := <-ch.errors: + if ok { + return e + } + return ErrClosed case msg := <-ch.rpc: if msg != nil { @@ -270,8 +274,13 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: - ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText)) + // lock before sending connection.close-ok + // to avoid unexpected interleaving with basic.publish frames if + // publishing is happening concurrently + ch.m.Lock() ch.send(&channelCloseOk{}) + ch.m.Unlock() + ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText)) case *channelFlow: ch.notifyM.RLock() @@ -287,7 +296,7 @@ func (ch *Channel) dispatch(msg message) { c <- m.ConsumerTag } ch.notifyM.RUnlock() - ch.consumers.close(m.ConsumerTag) + ch.consumers.cancel(m.ConsumerTag) case *basicReturn: ret := newReturn(*m) @@ -447,8 +456,8 @@ func (ch *Channel) NotifyClose(c chan *Error) chan *Error { /* NotifyFlow registers a listener for basic.flow methods sent by the server. -When `true` is sent on one of the listener channels, all publishers should -pause until a `false` is sent. +When `false` is sent on one of the listener channels, all publishers should +pause until a `true` is sent. The server may ask the producer to pause or restart the flow of Publishings sent by on a channel. This is a simple flow-control mechanism that a server can @@ -615,7 +624,11 @@ started with noAck. When global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel. -RabbitMQ does not implement the global flag. + +Please see the RabbitMQ Consumer Prefetch documentation for an explanation of +how the global flag is implemented in RabbitMQ, as it differs from the +AMQP 0.9.1 specification in that global Qos settings are limited in scope to +channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html). 
To get round-robin behavior between consumers consuming from the same queue on different connections, set the prefetch count to 1, and the next available @@ -670,10 +683,10 @@ func (ch *Channel) Cancel(consumer string, noWait bool) error { } if req.wait() { - ch.consumers.close(res.ConsumerTag) + ch.consumers.cancel(res.ConsumerTag) } else { // Potentially could drop deliveries in flight - ch.consumers.close(consumer) + ch.consumers.cancel(consumer) } return nil @@ -807,7 +820,7 @@ func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusi QueueInspect passively declares a queue by name to inspect the current message count and consumer count. -Use this method to check how many unacknowledged messages reside in the queue, +Use this method to check how many messages ready for delivery reside in the queue, how many consumers are receiving deliveries, and whether a queue by this name already exists. @@ -1000,16 +1013,18 @@ included in every Delivery in the ConsumerTag field When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When -autoAck is true, the consumer should not call Delivery.Ack. Automatically +autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. +See http://www.rabbitmq.com/confirms.html for more details. When exclusive is true, the server will ensure that this is the sole consumer -from this queue. When exclusive is false, the server will fairly distribute +from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers. -When noLocal is true, the server will not deliver publishing sent from the same -connection to this consumer. It's advisable to use separate connections for +The noLocal flag is not supported by RabbitMQ. + +It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness. @@ -1021,11 +1036,14 @@ exception will be raised and the channel will be closed. Optional arguments can be provided that have specific semantics for the queue or server. -When the channel or connection closes, all delivery chans will also close. +Inflight messages, limited by Channel.Qos will be buffered until received from +the returned chan. -Deliveries on the returned chan will be buffered indefinitely. To limit memory -of this buffer, use the Channel.Qos method to limit the amount of -unacknowledged/buffered deliveries the server will deliver on this Channel. +When the Channel or Connection is closed, all buffered and inflight messages will +be dropped. + +When the consumer tag is cancelled, all inflight messages will be delivered until +the returned chan is closed. */ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { @@ -1057,7 +1075,7 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, ch.consumers.add(consumer, deliveries) if err := ch.call(req, res); err != nil { - ch.consumers.close(consumer) + ch.consumers.cancel(consumer) return nil, err } @@ -1429,12 +1447,12 @@ func (ch *Channel) TxRollback() error { /* Flow pauses the delivery of messages to consumers on this channel. 
Channels -are opened with flow control not active, to open a channel with paused -deliveries immediately call this method with true after calling +are opened with flow control active, to open a channel with paused +deliveries immediately call this method with `false` after calling Connection.Channel. -When active is true, this method asks the server to temporarily pause deliveries -until called again with active as false. +When active is `false`, this method asks the server to temporarily pause deliveries +until called again with active as `true`. Channel.Get methods will not be affected by flow control. @@ -1443,7 +1461,7 @@ the number of unacknowledged messages or bytes in flight instead. The server may also send us flow methods to throttle our publishings. A well behaving publishing client should add a listener with Channel.NotifyFlow and -pause its publishings when true is sent on that channel. +pause its publishings when `false` is sent on that channel. Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate @@ -1527,6 +1545,9 @@ is true. See also Delivery.Ack */ func (ch *Channel) Ack(tag uint64, multiple bool) error { + ch.m.Lock() + defer ch.m.Unlock() + return ch.send(&basicAck{ DeliveryTag: tag, Multiple: multiple, @@ -1541,6 +1562,9 @@ it must be redelivered or dropped. See also Delivery.Nack */ func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error { + ch.m.Lock() + defer ch.m.Unlock() + return ch.send(&basicNack{ DeliveryTag: tag, Multiple: multiple, diff --git a/vendor/github.com/streadway/amqp/connection.go b/vendor/github.com/streadway/amqp/connection.go index 8a84fd2..ca1372d 100644 --- a/vendor/github.com/streadway/amqp/connection.go +++ b/vendor/github.com/streadway/amqp/connection.go @@ -25,8 +25,10 @@ const ( defaultConnectionTimeout = 30 * time.Second defaultProduct = "https://github.com/streadway/amqp" defaultVersion = "β" - defaultChannelMax = maxChannelMax - defaultLocale = "en_US" + // Safer default that makes channel leaks a lot easier to spot + // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. + defaultChannelMax = (2 << 10) - 1 + defaultLocale = "en_US" ) // Config is used in DialConfig and Open to specify the desired tuning @@ -71,7 +73,7 @@ type Config struct { // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. All RPC methods and -// asyncronous Publishing, Delivery, Ack, Nack and Return messages are +// asynchronous Publishing, Delivery, Ack, Nack and Return messages are // multiplexed on this channel. There must always be active receivers for // every asynchronous message on this connection. type Connection struct { @@ -256,7 +258,7 @@ func (c *Connection) ConnectionState() tls.ConnectionState { /* NotifyClose registers a listener for close events either initiated by an error -accompaning a connection.close method or by a normal shutdown. +accompanying a connection.close method or by a normal shutdown. On normal shutdowns, the chan will be closed. 
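(For reference, a minimal sketch of the connection-level behavior the vendored amqp changes above touch: the lower channel-max default, close notification, and per-channel Qos. The broker URL and prefetch value are placeholders, not part of this change.)

```go
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Placeholder broker URL; any reachable RabbitMQ instance works.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// On an abnormal shutdown an *amqp.Error arrives here; on a normal
	// shutdown the chan is simply closed, as the doc comment above notes.
	closes := conn.NotifyClose(make(chan *amqp.Error, 1))
	go func() {
		if e, ok := <-closes; ok {
			log.Printf("connection closed: %v", e)
		}
	}()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	// global=false scopes the prefetch count to consumers on this channel,
	// per the consumer-prefetch documentation referenced above.
	if err := ch.Qos(10, 0, false); err != nil {
		log.Fatal(err)
	}
}
```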
diff --git a/vendor/github.com/streadway/amqp/consumers.go b/vendor/github.com/streadway/amqp/consumers.go index 097a15b..887ac74 100644 --- a/vendor/github.com/streadway/amqp/consumers.go +++ b/vendor/github.com/streadway/amqp/consumers.go @@ -6,16 +6,30 @@ package amqp import ( - "fmt" "os" + "strconv" "sync" "sync/atomic" ) var consumerSeq uint64 +const consumerTagLengthMax = 0xFF // see writeShortstr + func uniqueConsumerTag() string { - return fmt.Sprintf("ctag-%s-%d", os.Args[0], atomic.AddUint64(&consumerSeq, 1)) + return commandNameBasedUniqueConsumerTag(os.Args[0]) +} + +func commandNameBasedUniqueConsumerTag(commandName string) string { + tagPrefix := "ctag-" + tagInfix := commandName + tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10) + + if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax { + tagInfix = "streadway/amqp" + } + + return tagPrefix + tagInfix + tagSuffix } type consumerBuffers map[string]chan *Delivery @@ -23,42 +37,48 @@ type consumerBuffers map[string]chan *Delivery // Concurrent type that manages the consumerTag -> // ingress consumerBuffer mapping type consumers struct { - sync.Mutex - chans consumerBuffers + sync.WaitGroup // one for buffer + closed chan struct{} // signal buffer + + sync.Mutex // protects below + chans consumerBuffers } func makeConsumers() *consumers { - return &consumers{chans: make(consumerBuffers)} + return &consumers{ + closed: make(chan struct{}), + chans: make(consumerBuffers), + } } -func bufferDeliveries(in chan *Delivery, out chan Delivery) { +func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { + defer close(out) + defer subs.Done() + + var inflight = in var queue []*Delivery - var queueIn = in for delivery := range in { - select { - case out <- *delivery: - // delivered immediately while the consumer chan can receive - default: - queue = append(queue, delivery) - } + queue = append(queue, delivery) for len(queue) > 0 { select { - case out <- *queue[0]: - queue = queue[1:] - case delivery, open := <-queueIn: - if open { + case <-subs.closed: + // closed before drained, drop in-flight + return + + case delivery, consuming := <-inflight: + if consuming { queue = append(queue, delivery) } else { - // stop receiving to drain the queue - queueIn = nil + inflight = nil } + + case out <- *queue[0]: + queue = queue[1:] } } } - - close(out) } // On key conflict, close the previous channel. @@ -71,12 +91,13 @@ func (subs *consumers) add(tag string, consumer chan Delivery) { } in := make(chan *Delivery) - go bufferDeliveries(in, consumer) - subs.chans[tag] = in + + subs.Add(1) + go subs.buffer(in, consumer) } -func (subs *consumers) close(tag string) (found bool) { +func (subs *consumers) cancel(tag string) (found bool) { subs.Lock() defer subs.Unlock() @@ -90,15 +111,18 @@ func (subs *consumers) close(tag string) (found bool) { return found } -func (subs *consumers) closeAll() { +func (subs *consumers) close() { subs.Lock() defer subs.Unlock() - for _, ch := range subs.chans { + close(subs.closed) + + for tag, ch := range subs.chans { + delete(subs.chans, tag) close(ch) } - subs.chans = make(consumerBuffers) + subs.Wait() } // Sends a delivery to a the consumer identified by `tag`. 
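(Likewise, a small consumer sketch reflecting the reworked delivery buffering and cancellation semantics above: the deliveries chan now closes once the consumer is cancelled or the channel/connection shuts down, and unreceived in-flight deliveries are dropped. Queue name and consumer tag are placeholders.)

```go
package consumerexample

import (
	"log"

	"github.com/streadway/amqp"
)

// drain consumes from a queue until the consumer is cancelled or the
// channel/connection closes; at that point the deliveries chan is closed
// and the range loop ends.
func drain(ch *amqp.Channel, queue string) error {
	deliveries, err := ch.Consume(
		queue,
		"ctag-demo", // placeholder consumer tag
		false,       // autoAck: ack manually below
		false,       // exclusive
		false,       // noLocal: not supported by RabbitMQ
		false,       // noWait
		nil,         // args
	)
	if err != nil {
		return err
	}

	for d := range deliveries {
		log.Printf("got %d bytes via %s", len(d.Body), d.RoutingKey)
		if err := d.Ack(false); err != nil {
			return err
		}
	}
	return nil
}
```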
diff --git a/vendor/github.com/streadway/amqp/delivery.go b/vendor/github.com/streadway/amqp/delivery.go index 7a4e5a6..304c834 100644 --- a/vendor/github.com/streadway/amqp/delivery.go +++ b/vendor/github.com/streadway/amqp/delivery.go @@ -36,7 +36,7 @@ type Delivery struct { DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) Priority uint8 // queue implementation use - 0 to 9 CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) + ReplyTo string // application use - address to reply to (ex: RPC) Expiration string // implementation use - message expiration spec MessageId string // application use - message identifier Timestamp time.Time // application use - message timestamp diff --git a/vendor/github.com/streadway/amqp/uri.go b/vendor/github.com/streadway/amqp/uri.go index ee006b3..35fefdc 100644 --- a/vendor/github.com/streadway/amqp/uri.go +++ b/vendor/github.com/streadway/amqp/uri.go @@ -7,7 +7,7 @@ package amqp import ( "errors" - "fmt" + "net" "net/url" "strconv" "strings" @@ -72,7 +72,8 @@ func ParseURI(uri string) (URI, error) { return builder, errURIScheme } - host, port := splitHostPort(u.Host) + host := u.Hostname() + port := u.Port() if host != "" { builder.Host = host @@ -115,29 +116,6 @@ func ParseURI(uri string) (URI, error) { return builder, nil } -// Splits host:port, host, [ho:st]:port, or [ho:st]. Unlike net.SplitHostPort -// which splits :port, host:port or [host]:port -// -// Handles hosts that have colons that are in brackets like [::1]:http -func splitHostPort(addr string) (host, port string) { - i := strings.LastIndex(addr, ":") - - if i >= 0 { - host, port = addr[:i], addr[i+1:] - - if len(port) > 0 && port[len(port)-1] == ']' && addr[0] == '[' { - // we've split on an inner colon, the port was missing outside of the - // brackets so use the full addr. We could assert that host should not - // contain any colons here - host, port = addr, "" - } - } else { - host = addr - } - - return -} - // PlainAuth returns a PlainAuth structure based on the parsed URI's // Username and Password fields. func (uri URI) PlainAuth() *PlainAuth { @@ -148,28 +126,42 @@ func (uri URI) PlainAuth() *PlainAuth { } func (uri URI) String() string { - var authority string + authority, err := url.Parse("") + if err != nil { + return err.Error() + } + + authority.Scheme = uri.Scheme if uri.Username != defaultURI.Username || uri.Password != defaultURI.Password { - authority += uri.Username + authority.User = url.User(uri.Username) if uri.Password != defaultURI.Password { - authority += ":" + uri.Password + authority.User = url.UserPassword(uri.Username, uri.Password) } - - authority += "@" } - authority += uri.Host + authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) if defaultPort, found := schemePorts[uri.Scheme]; !found || defaultPort != uri.Port { - authority += ":" + strconv.FormatInt(int64(uri.Port), 10) + authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) + } else { + // JoinHostPort() automatically add brackets to the host if it's + // an IPv6 address. + // + // If not port is specified, JoinHostPort() return an IP address in the + // form of "[::1]:", so we use TrimSuffix() to remove the extra ":". + authority.Host = strings.TrimSuffix(net.JoinHostPort(uri.Host, ""), ":") } - var vhost string if uri.Vhost != defaultURI.Vhost { - vhost = uri.Vhost + // Make sure net/url does not double escape, e.g. + // "%2F" does not become "%252F". 
+ authority.Path = uri.Vhost + authority.RawPath = url.QueryEscape(uri.Vhost) + } else { + authority.Path = "/" } - return fmt.Sprintf("%s://%s/%s", uri.Scheme, authority, url.QueryEscape(vhost)) + return authority.String() } diff --git a/vendor/gopkg.in/cyverse-de/messaging.v4/Dockerfile b/vendor/gopkg.in/cyverse-de/messaging.v4/Dockerfile deleted file mode 100644 index 89c16e1..0000000 --- a/vendor/gopkg.in/cyverse-de/messaging.v4/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM golang:1.7-alpine - -RUN apk update && apk add git - -RUN go get github.com/jstemmer/go-junit-report - -RUN go get github.com/olebedev/config -RUN go get github.com/cyverse-de/logcabin -RUN go get github.com/cyverse-de/model -RUN go get github.com/cyverse-de/configurate -RUN go get github.com/streadway/amqp - -COPY . /go/src/github.com/cyverse-de/messaging - -CMD go test -v github.com/cyverse-de/messaging | tee /dev/stderr | go-junit-report diff --git a/vendor/gopkg.in/cyverse-de/messaging.v6/.gitignore b/vendor/gopkg.in/cyverse-de/messaging.v6/.gitignore new file mode 100644 index 0000000..48b8bf9 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/messaging.v6/.gitignore @@ -0,0 +1 @@ +vendor/ diff --git a/vendor/gopkg.in/cyverse-de/messaging.v6/Dockerfile b/vendor/gopkg.in/cyverse-de/messaging.v6/Dockerfile new file mode 100644 index 0000000..1b27667 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/messaging.v6/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:1.10.2 + +COPY . /go/src/github.com/cyverse-de/messaging + +RUN curl -LO https://raw.githubusercontent.com/golang/dep/master/install.sh && \ + chmod +x install.sh && \ + ./install.sh + +WORKDIR /go/src/github.com/cyverse-de/messaging + +RUN go get github.com/jstemmer/go-junit-report + +RUN dep ensure + +CMD go test -v github.com/cyverse-de/messaging | tee /dev/stderr | go-junit-report diff --git a/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.lock b/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.lock new file mode 100644 index 0000000..847a3ab --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.lock @@ -0,0 +1,126 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. 
+ + +[[projects]] + name = "github.com/cyverse-de/model" + packages = ["submitfile"] + revision = "9435ba8c8862b1ced3af7d1bdb8b07ed6316a2aa" + version = "v3.0" + +[[projects]] + name = "github.com/fsnotify/fsnotify" + packages = ["."] + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/hcl" + packages = [ + ".", + "hcl/ast", + "hcl/parser", + "hcl/printer", + "hcl/scanner", + "hcl/strconv", + "hcl/token", + "json/parser", + "json/scanner", + "json/token" + ] + revision = "ef8a98b0bbce4a65b5aa4c368430a80ddc533168" + +[[projects]] + name = "github.com/magiconair/properties" + packages = ["."] + revision = "c2353362d570a7bfa228149c62842019201cfb71" + version = "v1.8.0" + +[[projects]] + branch = "master" + name = "github.com/mitchellh/mapstructure" + packages = ["."] + revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" + +[[projects]] + name = "github.com/pelletier/go-toml" + packages = ["."] + revision = "acdc4509485b587f5e675510c4f2c63e90ff68a8" + version = "v1.1.0" + +[[projects]] + name = "github.com/spf13/afero" + packages = [ + ".", + "mem" + ] + revision = "63644898a8da0bc22138abf860edaf5277b6102e" + version = "v1.1.0" + +[[projects]] + name = "github.com/spf13/cast" + packages = ["."] + revision = "8965335b8c7107321228e3e3702cab9832751bac" + version = "v1.2.0" + +[[projects]] + branch = "master" + name = "github.com/spf13/jwalterweatherman" + packages = ["."] + revision = "7c0cea34c8ece3fbeb2b27ab9b59511d360fb394" + +[[projects]] + name = "github.com/spf13/pflag" + packages = ["."] + revision = "583c0c0531f06d5278b7d917446061adc344b5cd" + version = "v1.0.1" + +[[projects]] + name = "github.com/spf13/viper" + packages = ["."] + revision = "b5e8006cbee93ec955a89ab31e0e3ce3204f3736" + version = "v1.0.2" + +[[projects]] + branch = "master" + name = "github.com/streadway/amqp" + packages = ["."] + revision = "e5adc2ada8b8efff032bf61173a233d143e9318e" + +[[projects]] + branch = "master" + name = "golang.org/x/sys" + packages = ["unix"] + revision = "c11f84a56e43e20a78cee75a7c034031ecf57d1f" + +[[projects]] + name = "golang.org/x/text" + packages = [ + "internal/gen", + "internal/triegen", + "internal/ucd", + "transform", + "unicode/cldr", + "unicode/norm" + ] + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" + version = "v0.3.0" + +[[projects]] + name = "gopkg.in/cyverse-de/model.v4" + packages = ["."] + revision = "36b13d37bb4126f4b525a393512b0f420fea5ab4" + version = "v4.0" + +[[projects]] + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" + version = "v2.2.1" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "208c136ad16df30386037404a0d8a7c050fcc8a18d40157230f154027390fcc8" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.toml b/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.toml new file mode 100644 index 0000000..edc1ab1 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/messaging.v6/Gopkg.toml @@ -0,0 +1,38 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. 
+# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + branch = "master" + name = "github.com/streadway/amqp" + +[prune] + go-tests = true + unused-packages = true + +[[constraint]] + name = "gopkg.in/cyverse-de/model.v4" + version = "4.0.0" diff --git a/vendor/gopkg.in/cyverse-de/messaging.v4/Jenkinsfile b/vendor/gopkg.in/cyverse-de/messaging.v6/Jenkinsfile similarity index 100% rename from vendor/gopkg.in/cyverse-de/messaging.v4/Jenkinsfile rename to vendor/gopkg.in/cyverse-de/messaging.v6/Jenkinsfile diff --git a/vendor/gopkg.in/cyverse-de/messaging.v4/LICENSE b/vendor/gopkg.in/cyverse-de/messaging.v6/LICENSE similarity index 100% rename from vendor/gopkg.in/cyverse-de/messaging.v4/LICENSE rename to vendor/gopkg.in/cyverse-de/messaging.v6/LICENSE diff --git a/vendor/gopkg.in/cyverse-de/messaging.v4/amqp.go b/vendor/gopkg.in/cyverse-de/messaging.v6/amqp.go similarity index 99% rename from vendor/gopkg.in/cyverse-de/messaging.v4/amqp.go rename to vendor/gopkg.in/cyverse-de/messaging.v6/amqp.go index 5276633..f53e402 100644 --- a/vendor/gopkg.in/cyverse-de/messaging.v4/amqp.go +++ b/vendor/gopkg.in/cyverse-de/messaging.v6/amqp.go @@ -12,7 +12,7 @@ import ( "strconv" "time" - "gopkg.in/cyverse-de/model.v2" + "gopkg.in/cyverse-de/model.v4" "github.com/streadway/amqp" ) diff --git a/vendor/gopkg.in/cyverse-de/messaging.v4/service.properties b/vendor/gopkg.in/cyverse-de/messaging.v6/service.properties similarity index 100% rename from vendor/gopkg.in/cyverse-de/messaging.v4/service.properties rename to vendor/gopkg.in/cyverse-de/messaging.v6/service.properties diff --git a/vendor/gopkg.in/cyverse-de/model.v4/.gitignore b/vendor/gopkg.in/cyverse-de/model.v4/.gitignore new file mode 100644 index 0000000..f1160bf --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/.gitignore @@ -0,0 +1,3 @@ +*.swp +*~ +*.iml diff --git a/vendor/gopkg.in/cyverse-de/model.v4/Dockerfile b/vendor/gopkg.in/cyverse-de/model.v4/Dockerfile new file mode 100644 index 0000000..13852e3 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.7-alpine + +RUN apk update && apk add git + +RUN go get github.com/jstemmer/go-junit-report + +RUN go get github.com/spf13/viper +RUN go get github.com/cyverse-de/configurate + +COPY . /go/src/github.com/cyverse-de/model + +CMD go test -v github.com/cyverse-de/model | tee /dev/stderr | go-junit-report diff --git a/vendor/gopkg.in/cyverse-de/model.v4/Jenkinsfile b/vendor/gopkg.in/cyverse-de/model.v4/Jenkinsfile new file mode 100644 index 0000000..2f35d98 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/Jenkinsfile @@ -0,0 +1,52 @@ +#!groovy +node('docker') { + slackJobDescription = "job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' (${env.BUILD_URL})" + try { + stage "Build" + checkout scm + + service = readProperties file: 'service.properties' + + descriptive_version = sh(returnStdout: true, script: 'git describe --long --tags --dirty --always').trim() + echo descriptive_version + + dockerRepo = "test-${env.BUILD_TAG}" + + sh "docker build --rm -t ${dockerRepo} ." 
+ + dockerTestRunner = "test-${env.BUILD_TAG}" + dockerTestCleanup = "test-cleanup-${env.BUILD_TAG}" + try { + stage "Test" + try { + sh "docker run --name ${dockerTestRunner} --rm ${dockerRepo} > test-results.xml" + } finally { + junit 'test-results.xml' + + sh "docker run --rm --name ${dockerTestCleanup} -v \$(pwd):/build -w /build alpine rm -r test-results.xml" + } + } finally { + sh returnStatus: true, script: "docker kill ${dockerTestRunner}" + sh returnStatus: true, script: "docker rm ${dockerTestRunner}" + + sh returnStatus: true, script: "docker kill ${dockerTestCleanup}" + sh returnStatus: true, script: "docker rm ${dockerTestCleanup}" + + sh returnStatus: true, script: "docker rmi ${dockerRepo}" + + step([$class: 'hudson.plugins.jira.JiraIssueUpdater', + issueSelector: [$class: 'hudson.plugins.jira.selector.DefaultIssueSelector'], + scm: scm, + labels: [ "${service.repo}-${descriptive_version}" ]]) + } + } catch (InterruptedException e) { + currentBuild.result = "ABORTED" + slackSend color: 'warning', message: "ABORTED: ${slackJobDescription}" + throw e + } catch (e) { + currentBuild.result = "FAILED" + sh "echo ${e}" + slackSend color: 'danger', message: "FAILED: ${slackJobDescription}" + throw e + } +} diff --git a/vendor/gopkg.in/cyverse-de/model.v4/LICENSE b/vendor/gopkg.in/cyverse-de/model.v4/LICENSE new file mode 100644 index 0000000..8cfdf58 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/LICENSE @@ -0,0 +1,37 @@ +Copyright (c) 2011, The Arizona Board of Regents on behalf of +The University of Arizona + +All rights reserved. + +Developed by: iPlant Collaborative as a collaboration between +participants at BIO5 at The University of Arizona (the primary hosting +institution), Cold Spring Harbor Laboratory, The University of Texas at +Austin, and individual contributors. Find out more at +http://www.iplantcollaborative.org/. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the iPlant Collaborative, BIO5, The University + of Arizona, Cold Spring Harbor Laboratory, The University of Texas at + Austin, nor the names of other contributors may be used to endorse or + promote products derived from this software without specific prior + written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/vendor/gopkg.in/cyverse-de/model.v4/container.go b/vendor/gopkg.in/cyverse-de/model.v4/container.go new file mode 100644 index 0000000..8f1d8d5 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/container.go @@ -0,0 +1,89 @@ +package model + +// Volume describes how a local path is mounted into a container. +type Volume struct { + HostPath string `json:"host_path"` + ContainerPath string `json:"container_path"` + ReadOnly bool `json:"read_only"` + Mode string `json:"mode"` +} + +// Ports contains port mapping information for a container. The ports should be +// parseable as an integer. Callers should not provide interface information, +// that will be handled by the services. +type Ports struct { + HostPort int `json:"host_port"` + ContainerPort int `json:"container_port"` + BindToHost bool `json:"bind_to_host"` +} + +// Device describes the mapping between a host device and the container device. +type Device struct { + HostPath string `json:"host_path"` + ContainerPath string `json:"container_path"` + CgroupPermissions string `json:"cgroup_permissions"` +} + +// VolumesFrom describes a container that volumes are imported from. +type VolumesFrom struct { + Tag string `json:"tag"` + Name string `json:"name"` + Auth string `json:"auth"` + NamePrefix string `json:"name_prefix"` + URL string `json:"url"` + HostPath string `json:"host_path"` + ContainerPath string `json:"container_path"` + ReadOnly bool `json:"read_only"` +} + +// ContainerImage describes a docker container image. +type ContainerImage struct { + ID string `json:"id"` + Name string `json:"name"` + Tag string `json:"tag"` + Auth string `json:"auth"` + URL string `json:"url"` + OSGImagePath string `json:"osg_image_path"` +} + +// Container describes a container used as part of a DE job. +type Container struct { + ID string `json:"id"` + Volumes []Volume `json:"container_volumes"` + Devices []Device `json:"container_devices"` + VolumesFrom []VolumesFrom `json:"container_volumes_from"` + Name string `json:"name"` + NetworkMode string `json:"network_mode"` + CPUShares int64 `json:"cpu_shares"` + InteractiveApps InteractiveApps `json:"interactive_apps"` + MemoryLimit int64 `json:"memory_limit"` // The maximum the container is allowed to have. + MinMemoryLimit int64 `json:"min_memory_limit"` // The minimum the container needs. + MaxCPUCores float32 `json:"max_cpu_cores"` // The maximum number of cores the container needs. + MinCPUCores float32 `json:"min_cpu_cores"` // The minimum number of cores the container needs. + MinDiskSpace int64 `json:"min_disk_space"` // The minimum amount of disk space that the container needs. + PIDsLimit int64 `json:"pids_limit"` + Image ContainerImage `json:"image"` + EntryPoint string `json:"entrypoint"` + WorkingDir string `json:"working_directory"` + Ports []Ports `json:"ports"` + SkipTmpMount bool `json:"skip_tmp_mount"` + UID int `json:"uid"` +} + +// WorkingDirectory returns the container's working directory. Defaults to +// /de-app-work if the job submission didn't specify one. Use this function +// rather than accessing the field directly. 
+func (c *Container) WorkingDirectory() string { + if c.WorkingDir == "" { + return "/de-app-work" + } + return c.WorkingDir +} + +// UsesVolumes returns a boolean value which indicates if a container uses host-mounted volumes +func (c *Container) UsesVolumes() bool { + if len(c.Volumes) > 0 { + return true + } + return false +} diff --git a/vendor/gopkg.in/cyverse-de/model.v4/interapps.go b/vendor/gopkg.in/cyverse-de/model.v4/interapps.go new file mode 100644 index 0000000..0c31476 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/interapps.go @@ -0,0 +1,43 @@ +package model + +// InteractiveApps contains the settings needed for interactive apps across all +// steps in a Job. +type InteractiveApps struct { + // The docker image for the reverse proxy that runs on the cluster with the + // job steps. + ProxyImage string `json:"proxy_image"` + + // The name of the container for the reverse proxy. + ProxyName string `json:"proxy_name"` + + // The URL for the frontend of the application. Will get prefixed with the job + // id. + FrontendURL string `json:"frontend_url"` + + // The base URL for the CAS server. + CASURL string `json:"cas_url"` + + // The path to the validate endpoint on the CAS server. + CASValidate string `json:"cas_validate"` + + // The path to the SSL cert file on the Condor nodes. + SSLCertPath string `json:"ssl_cert_path"` + + // The path to the SSL key file on the Condor nodes. + SSLKeyPath string `json:"ssl_key_path"` + + // If websocket handling requires a special path in the app. The default is to + // have this be empty. + WebsocketPath string `json:"websocket_path"` + + // If websocket handling requires a special port in the app. The default is to + // use the same port as the backend URL. + WebsocketPort string `json:"websocket_port"` + + // If websocket handling requires a protocol other than ws://. + WebsocketProto string `json:"websocket_proto"` + + // Only used if you need to override the default backendURL, which should be + // http://. + BackendURL string `json:"backend_url"` +} diff --git a/vendor/gopkg.in/cyverse-de/model.v4/io.go b/vendor/gopkg.in/cyverse-de/model.v4/io.go new file mode 100644 index 0000000..ea6d6a5 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/io.go @@ -0,0 +1,118 @@ +package model + +import ( + "fmt" + "path" + "strings" +) + +// StepInput describes a single input for a job step. +type StepInput struct { + ID string `json:"id"` + Ticket string `json:"ticket"` + Multiplicity string `json:"multiplicity"` + Name string `json:"name"` + Property string `json:"property"` + Retain bool `json:"retain"` + Type string `json:"type"` + Value string `json:"value"` +} + +// IRODSPath returns a string containing the iRODS path to an input file. +func (i *StepInput) IRODSPath() string { + if i.Multiplicity == "collection" { + if !strings.HasSuffix(i.Value, "/") { + return fmt.Sprintf("%s/", i.Value) + } + } + return i.Value +} + +// Identifier returns a string containing the input job's identifier in the +// format "input-" +func (i *StepInput) Identifier(suffix string) string { + return fmt.Sprintf("input-%s", suffix) +} + +// Stdout returns a string containing the path to the input job's stdout file. +// It should be a relative path in the format "logs/logs-stdout-" +func (i *StepInput) Stdout(suffix string) string { + return path.Join("logs", fmt.Sprintf("logs-stdout-%s", i.Identifier(suffix))) +} + +// Stderr returns a string containing the path to the input job's stderr file. 
+// It should be a relative path in the format "logs/logs-stderr-" +func (i *StepInput) Stderr(suffix string) string { + return path.Join("logs", fmt.Sprintf("logs-stderr-%s", i.Identifier(suffix))) +} + +// LogPath returns the path to the Condor log file for the input job. The returned +// path will be in the format "/logs/logs-condor-" +func (i *StepInput) LogPath(parent, suffix string) string { + return path.Join(parent, "logs", fmt.Sprintf("logs-condor-%s", i.Identifier(suffix))) +} + +// Source returns the path to the local filename of the input file. +func (i *StepInput) Source() string { + value := path.Base(i.Value) + if i.Multiplicity == "collection" { + if !strings.HasSuffix(value, "/") { + return fmt.Sprintf("%s/", value) + } + } + return value +} + +// InputSourceListArguments returns the porklock settings needed for a get command with an input path list. +func (j *Job) InputSourceListArguments(sourceListPath string) []string { + args := []string{ + "get", + "--user", j.Submitter, + "--source-list", sourceListPath, + } + + for _, m := range MetadataArgs(j.FileMetadata).FileMetadataArguments() { + args = append(args, m) + } + + return args +} + +// Arguments returns the porklock settings needed for the input operation. +func (i *StepInput) Arguments(username string, metadata []FileMetadata) []string { + args := []string{ + "get", + "--user", username, + "--source", i.IRODSPath(), + } + + for _, m := range MetadataArgs(metadata).FileMetadataArguments() { + args = append(args, m) + } + + return args +} + +// StepOutput describes a single output for a job step. +type StepOutput struct { + Multiplicity string `json:"multiplicity"` + Name string `json:"name"` + Property string `json:"property"` + QualID string `json:"qual-id"` + Retain bool `json:"retain"` + Type string `json:"type"` +} + +// Source returns the path to the local filename for the output file. +func (o *StepOutput) Source() string { + value := o.Name + if o.Multiplicity == "collection" { + if !path.IsAbs(value) { + value = fmt.Sprintf("/de-app-work/%s", value) + } + if !strings.HasSuffix(value, "/") { + value = fmt.Sprintf("%s/", value) + } + } + return value +} diff --git a/vendor/gopkg.in/cyverse-de/model.v4/jobs.go b/vendor/gopkg.in/cyverse-de/model.v4/jobs.go new file mode 100644 index 0000000..8f36c73 --- /dev/null +++ b/vendor/gopkg.in/cyverse-de/model.v4/jobs.go @@ -0,0 +1,470 @@ +package model + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "path" + "regexp" + "strings" + "time" + + "github.com/cyverse-de/model/submitfile" + "github.com/spf13/viper" +) + +var ( + validName = regexp.MustCompile(`-\d{4}(?:-\d{2}){5}\.\d+$`) // this isn't included in the Dirname() function so it isn't re-evaluated a lot + quoteStr = regexp.MustCompile(`^''|''$`) +) + +const ( + nowfmt = "2006-01-02-15-04-05.000" // appears in file and directory names. + + //DockerLabelKey is the key for the labels applied to all containers associated with a job. + DockerLabelKey = "org.iplantc.analysis" +) + +// naivelyquote single-quotes a string that will be placed on the command line +// using plain string substitution. This works, but may leave extra pairs +// of leading or trailing quotes if there was a leading or trailing quote +// in the original string, which is valid, but may be confusing to human +// readers. 
+func naivelyquote(s string) string { + return fmt.Sprintf("'%s'", strings.Replace(s, "'", "''", -1)) +} + +// quote quotes and escapes a string that is supposed to be passed in to a tool on +// the command line. +func quote(s string) string { + return quoteStr.ReplaceAllString(naivelyquote(s), "") +} + +// ExtractJobID pulls the job id from the given []byte, if it exists. Returns +// an empty []byte if it doesn't. +func ExtractJobID(output []byte) []byte { + extractor := regexp.MustCompile(`submitted to cluster ((\d+)+)`) + matches := extractor.FindAllSubmatch(output, -1) + var thematch []byte + if len(matches) > 0 { + if len(matches[0]) > 1 { + thematch = matches[0][1] + } + } + return thematch +} + +// Job is a type that contains info that goes into the jobs table. +type Job struct { + AppDescription string `json:"app_description"` + AppID string `json:"app_id"` + AppName string `json:"app_name"` + ArchiveLogs bool `json:"archive_logs"` + ID string `json:"id"` + BatchID string `json:"batch_id"` + CondorID string `json:"condor_id"` + CondorLogPath string `json:"condor_log_path"` //comes from config, not upstream service + CreateOutputSubdir bool `json:"create_output_subdir"` + DateSubmitted time.Time `json:"date_submitted"` + DateStarted time.Time `json:"date_started"` + DateCompleted time.Time `json:"date_completed"` + Description string `json:"description"` + Email string `json:"email"` + ExecutionTarget string `json:"execution_target"` + ExitCode int `json:"exit_code"` + FailureCount int64 `json:"failure_count"` + FailureThreshold int64 `json:"failure_threshold"` + FileMetadata []FileMetadata `json:"file-metadata"` + FilterFiles []string `json:"filter_files"` //comes from config, not upstream service + Group string `json:"group"` //untested for now + InputPathListFile string `json:"input_path_list"` //path to a list of inputs (not from upstream). + InputTicketsFile string `json:"input_ticket_list"` //path to a list of inputs with tickets (not from upstream). + InvocationID string `json:"uuid"` + IRODSBase string `json:"irods_base"` + Name string `json:"name"` + NFSBase string `json:"nfs_base"` + Notify bool `json:"notify"` + NowDate string `json:"now_date"` + OutputDir string `json:"output_dir"` //the value parsed out of the JSON. Use OutputDirectory() instead. + OutputDirTicket string `json:"output_dir_ticket"` //the write ticket for output_dir (assumes output_dir is set correctly). + OutputTicketFile string `json:"output_ticket_list"` //path to the file of the output dest with ticket (not from upstream). + RequestType string `json:"request_type"` + RunOnNFS bool `json:"run-on-nfs"` + SkipParentMetadata bool `json:"skip-parent-meta"` + Steps []Step `json:"steps"` + SubmissionDate string `json:"submission_date"` + Submitter string `json:"username"` + Type string `json:"type"` + UserID string `json:"user_id"` + UserGroups []string `json:"user_groups"` + WikiURL string `json:"wiki_url"` + ConfigFile string `json:"config_file"` //path to the job configuration file (not from upstream) +} + +// New returns a pointer to a newly instantiated Job with NowDate set. 
+// Accesses the following configuration settings: +// * condor.log_path +// * condor.filter_files +// * irods.base +func New(cfg *viper.Viper) *Job { + n := time.Now().Format(nowfmt) + lp := cfg.GetString("condor.log_path") + var paths []string + filterFiles := cfg.GetString("condor.filter_files") + for _, filter := range strings.Split(filterFiles, ",") { + paths = append(paths, filter) + } + irodsBase := cfg.GetString("irods.base") + return &Job{ + NowDate: n, + SubmissionDate: n, + ArchiveLogs: true, + CondorLogPath: lp, + FilterFiles: paths, + IRODSBase: irodsBase, + } +} + +// NewFromData creates a new submission and populates it by parsing the passed +// in []byte as JSON. +func NewFromData(cfg *viper.Viper, data []byte) (*Job, error) { + var err error + s := New(cfg) + err = json.Unmarshal(data, s) + if err != nil { + return nil, err + } + s.Sanitize() + s.AddRequiredMetadata() + return s, err +} + +// sanitize replaces @ and spaces with _, making a string safe to use as a +// part of a path. Mostly to keep things from getting really confusing when +// a path is passed to Condor. +func sanitize(s string) string { + step := strings.Replace(s, "@", "_", -1) + step = strings.Replace(step, " ", "_", -1) + return step +} + +// Sanitize makes sure the fields in a submission are ready to be used in things +// like file names. +func (job *Job) Sanitize() { + job.Submitter = sanitize(job.Submitter) + + if job.Type == "" { + job.Type = "analysis" + } + + job.Name = sanitize(job.Name) + + for i, step := range job.Steps { + step.Component.Container.Image.Name = strings.TrimSpace(step.Component.Container.Image.Name) + step.Component.Container.Image.Tag = strings.TrimSpace(step.Component.Container.Image.Tag) + step.Component.Container.Image.OSGImagePath = strings.TrimSpace(step.Component.Container.Image.OSGImagePath) + step.Component.Container.Name = strings.TrimSpace(step.Component.Container.Name) + + for j, vf := range step.Component.Container.VolumesFrom { + vf.Name = strings.TrimSpace(vf.Name) + vf.Tag = strings.TrimSpace(vf.Tag) + vf.NamePrefix = strings.TrimSpace(vf.NamePrefix) + vf.HostPath = strings.TrimSpace(vf.HostPath) + vf.ContainerPath = strings.TrimSpace(vf.ContainerPath) + step.Component.Container.VolumesFrom[j] = vf + } + job.Steps[i] = step + } +} + +// DirectoryName creates a directory name for an analysis. Used when the submission +// doesn't specify an output directory. Some types of jobs, for example +// Foundational API jobs, include a timestamp in the job name, so a timestamp +// will not be appended to the directory name in those cases. +func (job *Job) DirectoryName() string { + if validName.MatchString(job.Name) { + return job.Name + } + return fmt.Sprintf("%s-%s", job.Name, job.NowDate) +} + +// UserIDForSubmission returns the cleaned up user ID for use in the iplant.cmd file. This +// is dumb. Very, very dumb. +func (job *Job) UserIDForSubmission() string { + var retval string + if job.UserID == "" { + hash := sha256.New() + hash.Write([]byte(job.Submitter)) + md := hash.Sum(nil) + retval = hex.EncodeToString(md) + } else { + retval = job.UserID + } + return fmt.Sprintf("_%s", strings.Replace(retval, "-", "", -1)) +} + +// CondorLogDirectory returns the path to the directory containing condor logs on the +// submission node. This a computed value, so it isn't in the struct. 
+func (job *Job) CondorLogDirectory() string { + return fmt.Sprintf("%s/", path.Join(job.CondorLogPath, job.Submitter, job.DirectoryName())) +} + +// IRODSConfig returns the path to iRODS config inside the working directory. +func (job *Job) IRODSConfig() string { + return path.Join("logs", "irods-config") +} + +// OutputDirectory returns the path to the output directory in iRODS. It's +// computed, which is why it isn't in the struct. Use this instead of directly +// accessing the OutputDir field. +func (job *Job) OutputDirectory() string { + if job.OutputDir == "" { + return path.Join(job.IRODSBase, job.Submitter, "analyses", job.DirectoryName()) + } else if job.OutputDir != "" && job.CreateOutputSubdir { + return path.Join(job.OutputDir, job.DirectoryName()) + } else if job.OutputDir != "" && !job.CreateOutputSubdir { + return strings.TrimSuffix(job.OutputDir, "/") + } + //probably won't ever reach this, but just in case... + return path.Join(job.IRODSBase, job.Submitter, "analyses", job.DirectoryName()) +} + +// DataContainers returns a list of VolumesFrom that describe the data +// containers associated with the job submission. +func (job *Job) DataContainers() []VolumesFrom { + var vfs []VolumesFrom + for _, step := range job.Steps { + for _, vf := range step.Component.Container.VolumesFrom { + vfs = append(vfs, vf) + } + } + return vfs +} + +// ContainerImages returns a []ContainerImage of all of the images associated +// with this submission. +func (job *Job) ContainerImages() []ContainerImage { + var ci []ContainerImage + for _, step := range job.Steps { + ci = append(ci, step.Component.Container.Image) + } + return ci +} + +// Inputs returns all of the StepInputs associated with the submission, +// regardless of what step they're associated with. +func (job *Job) Inputs() []StepInput { + var inputs []StepInput + for _, step := range job.Steps { + for _, input := range step.Config.Inputs { + inputs = append(inputs, input) + } + } + return inputs +} + +// Outputs returns all of the StepOutputs associated with the submission, +// regardless of what step they're associated with. +func (job *Job) Outputs() []StepOutput { + var outputs []StepOutput + for _, step := range job.Steps { + for _, output := range step.Config.Outputs { + outputs = append(outputs, output) + } + } + return outputs +} + +// ExcludeArguments returns a list of paths that should not upload as outputs. +func (job *Job) ExcludeArguments() []string { + var paths []string + for _, input := range job.Inputs() { + if !input.Retain && input.Value != "" { + paths = append(paths, input.Source()) + } + } + for _, output := range job.Outputs() { + if !output.Retain { + paths = append(paths, output.Source()) + } + } + for _, ff := range job.FilterFiles { + paths = append(paths, ff) + } + if !job.ArchiveLogs { + paths = append(paths, "logs") + } + + return paths +} + +// AddRequiredMetadata adds any required AVUs that are required but are missing +// from Job.FileMetadata. This should be called after both of the New*() +// functions and after the Job has been initialized from JSON. 
+func (job *Job) AddRequiredMetadata() { + foundAnalysis := false + foundExecution := false + for _, md := range job.FileMetadata { + if md.Attribute == "ipc-analysis-id" { + foundAnalysis = true + } + if md.Attribute == "ipc-execution-id" { + foundExecution = true + } + } + if !foundAnalysis { + job.FileMetadata = append( + job.FileMetadata, + FileMetadata{ + Attribute: "ipc-analysis-id", + Value: job.AppID, + Unit: "UUID", + }, + ) + } + if !foundExecution { + job.FileMetadata = append( + job.FileMetadata, + FileMetadata{ + Attribute: "ipc-execution-id", + Value: job.InvocationID, + Unit: "UUID", + }, + ) + } +} + +// FinalOutputArguments returns a string containing the arguments passed to +// porklock for the final output operation, which transfers all files back into +// iRODS. +func (job *Job) FinalOutputArguments(excludeFilePath string) []string { + dest := job.OutputDirectory() + retval := []string{ + "put", + "--user", job.Submitter, + "--destination", dest, + } + for _, m := range MetadataArgs(job.FileMetadata).FileMetadataArguments() { + retval = append(retval, m) + } + if excludeFilePath != "" { + retval = append(retval, "--exclude", excludeFilePath) + } + if job.SkipParentMetadata { + retval = append(retval, "--skip-parent-meta") + } + return retval +} + +// FormatUserGroups converts the list of user groups to the list format used by the +// HTCondor job submission file. +func (job *Job) FormatUserGroups() string { + return submitfile.FormatList(job.UserGroups) +} + +// UsesVolumes returns a boolean value which indicates if any step of a job uses host-mounted volumes +func (job *Job) UsesVolumes() bool { + for _, step := range job.Steps { + if step.UsesVolumes() { + return true + } + } + return false +} + +// FilterInputsWithoutTickets returns a list of inputs that do not have download tickets. +func (job *Job) FilterInputsWithoutTickets() []StepInput { + var inputs []StepInput + for _, input := range job.Inputs() { + if input.Ticket == "" { + inputs = append(inputs, input) + } + } + return inputs +} + +// FilterInputsWithTickets returns a list of inputs that have download tickets. +func (job *Job) FilterInputsWithTickets() []StepInput { + var inputs []StepInput + for _, input := range job.Inputs() { + if input.Ticket != "" { + inputs = append(inputs, input) + } + } + return inputs +} + +// CPURequest calculates the highest maximum CPU among the steps of a job (i.e. +// the largest slot size the job will need), or 0 if no steps have maximum CPUs +// set +func (job *Job) CPURequest() float32 { + var cpu float32 + + for _, step := range job.Steps { + if step.Component.Container.MaxCPUCores > cpu { + cpu = step.Component.Container.MaxCPUCores + } + } + + return cpu +} + +// MemoryRequest calculates the highest maximum memory among the steps of a job +// (i.e. the largest slot size the job will need), or 0 if no steps have +// maximum memory set +func (job *Job) MemoryRequest() int64 { + var mem int64 + + for _, step := range job.Steps { + if step.Component.Container.MemoryLimit > mem { + mem = step.Component.Container.MemoryLimit + } + } + + return mem +} + +// DiskRequest calculates the highest disk need among the steps of a job +// (i.e. the largest slot size the job will need), or 0 if no steps have +// disk set. As we only track minimum disk space, it uses that number. 
+
+// DiskRequest calculates the highest disk need among the steps of a job
+// (i.e. the largest slot size the job will need), or 0 if no steps have
+// disk set. As we only track minimum disk space, it uses that number.
+func (job *Job) DiskRequest() int64 {
+	var disk int64
+
+	for _, step := range job.Steps {
+		if step.Component.Container.MinDiskSpace > disk {
+			disk = step.Component.Container.MinDiskSpace
+		}
+	}
+
+	return disk
+}
+
+// FileMetadata describes a unit of metadata that should get associated with
+// all of the files associated with the job submission.
+type FileMetadata struct {
+	Attribute string `json:"attr"`
+	Value     string `json:"value"`
+	Unit      string `json:"unit"`
+}
+
+// Argument returns the command-line arguments for the
+// file transfer tool that set this metadata triple.
+func (m *FileMetadata) Argument() []string {
+	return []string{"-m", fmt.Sprintf("%s,%s,%s", m.Attribute, m.Value, m.Unit)}
+}
+
+// MetadataArgs is a list of FileMetadata.
+type MetadataArgs []FileMetadata
+
+// FileMetadataArguments returns the command-line arguments
+// for porklock that set all of the metadata triples.
+func (m MetadataArgs) FileMetadataArguments() []string {
+	retval := []string{}
+	for _, fm := range m {
+		for _, a := range fm.Argument() {
+			retval = append(retval, a)
+		}
+	}
+	return retval
+}
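The metadata helpers above flatten each AVU into a repeated `-m attr,value,unit` flag. A short sketch with hypothetical values:

```go
package main

import (
	"fmt"

	"gopkg.in/cyverse-de/model.v4"
)

func main() {
	// Hypothetical AVUs; AddRequiredMetadata would append the analysis and
	// execution IDs automatically if they were missing.
	md := model.MetadataArgs{
		{Attribute: "ipc-analysis-id", Value: "app-uuid", Unit: "UUID"},
		{Attribute: "ipc-execution-id", Value: "invocation-uuid", Unit: "UUID"},
	}

	// Prints: [-m ipc-analysis-id,app-uuid,UUID -m ipc-execution-id,invocation-uuid,UUID]
	fmt.Println(md.FileMetadataArguments())
}
```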
diff --git a/vendor/gopkg.in/cyverse-de/model.v4/service.properties b/vendor/gopkg.in/cyverse-de/model.v4/service.properties
new file mode 100644
index 0000000..c9f138e
--- /dev/null
+++ b/vendor/gopkg.in/cyverse-de/model.v4/service.properties
@@ -0,0 +1 @@
+repo = model
diff --git a/vendor/gopkg.in/cyverse-de/model.v4/step.go b/vendor/gopkg.in/cyverse-de/model.v4/step.go
new file mode 100644
index 0000000..5f96823
--- /dev/null
+++ b/vendor/gopkg.in/cyverse-de/model.v4/step.go
@@ -0,0 +1,188 @@
+package model
+
+import (
+	"bytes"
+	"fmt"
+	"path"
+	"sort"
+	"strings"
+)
+
+// StepComponent is where the settings for a tool in a job step are located.
+type StepComponent struct {
+	Container     Container `json:"container"`
+	Type          string    `json:"type"`
+	Name          string    `json:"name"`
+	Location      string    `json:"location"`
+	Description   string    `json:"description"`
+	TimeLimit     int       `json:"time_limit_seconds"`
+	Restricted    bool      `json:"restricted"`
+	IsInteractive bool      `json:"interactive"`
+}
+
+// StepEnvironment defines the environment variables that should be set for a
+// step.
+type StepEnvironment map[string]string
+
+// Step describes a single step in a job. All jobs contain multiple steps.
+type Step struct {
+	Component   StepComponent
+	Config      StepConfig
+	Type        string          `json:"type"`
+	StdinPath   string          `json:"stdin"`
+	StdoutPath  string          `json:"stdout"`
+	StderrPath  string          `json:"stderr"`
+	LogFile     string          `json:"log-file"`
+	Environment StepEnvironment `json:"environment"`
+	Input       []StepInput     `json:"input"`
+	Output      []StepOutput    `json:"output"`
+}
+
+// EnvOptions returns the docker command-line options
+// that set the environment variables listed in the Environment field.
+func (s *Step) EnvOptions() []string {
+	retval := []string{}
+	for k, v := range s.Environment {
+		retval = append(retval, fmt.Sprintf("--env=\"%s=%s\"", k, v))
+	}
+	return retval
+}
+
+// IsBackwardsCompatible returns true if the job submission uses the container
+// image(s) put together to maintain compatibility with non-dockerized versions
+// of the DE.
+func (s *Step) IsBackwardsCompatible() bool {
+	img := s.Component.Container.Image.Name
+	return strings.HasPrefix(img, "discoenv/backwards-compat") ||
+		strings.HasPrefix(img, "gims.iplantcollaborative.org:5000/backwards-compat") ||
+		strings.HasPrefix(img, "docker.cyverse.org/backwards-compat")
+}
+
+// UsesVolumes returns a boolean value which indicates if a step uses host-mounted volumes.
+func (s *Step) UsesVolumes() bool {
+	return s.Component.Container.UsesVolumes()
+}
+
+// Executable returns a string containing the executable path as it gets placed
+// inside the docker command-line.
+func (s *Step) Executable() string {
+	if s.IsBackwardsCompatible() {
+		return path.Join(s.Component.Location, s.Component.Name)
+	}
+	return ""
+}
+
+// Arguments returns a []string containing all of the options passed to the
+// docker run command for this step in the submission.
+func (s *Step) Arguments() []string {
+	allLines := []string{strings.TrimSpace(s.Executable())}
+	for _, p := range s.Config.Parameters() {
+		if p.Name != "" {
+			allLines = append(allLines, strings.TrimSpace(p.Name))
+		}
+		if p.Value != "" {
+			allLines = append(allLines, strings.TrimSpace(p.Value))
+		}
+	}
+	var cmdLine []string
+	for _, l := range allLines {
+		if l != "" {
+			cmdLine = append(cmdLine, l)
+		}
+	}
+	return cmdLine
+}
+
+// Stdin returns a quoted version of s.StdinPath or an empty string if it's
+// not set.
+func (s *Step) Stdin() string {
+	if s.StdinPath != "" {
+		return quote(s.StdinPath)
+	}
+	return s.StdinPath
+}
+
+// Stdout returns s.StdoutPath or a default value located in
+// the logs directory of the working directory. 'suffix' is appended to the
+// filename in the logs directory, but only if s.StdoutPath isn't set.
+func (s *Step) Stdout(suffix string) string {
+	if s.StdoutPath != "" {
+		return s.StdoutPath
+	}
+	return path.Join("logs", fmt.Sprintf("%s%s", "condor-stdout-", suffix))
+}
+
+// Stderr returns s.StderrPath or a default value located in
+// the logs directory of the working directory. 'suffix' is appended to the
+// filename in the logs directory, but only if s.StderrPath isn't set.
+func (s *Step) Stderr(suffix string) string {
+	if s.StderrPath != "" {
+		return s.StderrPath
+	}
+	return path.Join("logs", fmt.Sprintf("%s%s", "condor-stderr-", suffix))
+}
+
+// LogPath uses the value of step.LogFile and params to generate a path to a
+// log file. If Step.LogFile isn't empty, it's placed inside the directory
+// specified by parent. If it is empty, a path like
+// "<parent>/logs/condor-log-<suffix>" is returned.
+func (s *Step) LogPath(parent, suffix string) string {
+	if s.LogFile != "" {
+		return path.Join(parent, s.LogFile)
+	}
+	return path.Join(parent, "logs", fmt.Sprintf("condor-log-%s", suffix))
+}
+
+// StepConfig is where configuration settings for a job step are located.
+type StepConfig struct {
+	Params  []StepParam  `json:"params"`
+	Inputs  []StepInput  `json:"input"`
+	Outputs []StepOutput `json:"output"`
+}
+
+// Parameters returns the StepParams associated with a Step in the correct order.
+// Use this to get the list of Params rather than accessing the field directly.
+func (c *StepConfig) Parameters() []StepParam {
+	sort.Sort(ByOrder(c.Params))
+	return c.Params
+}
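A sketch of how a step's pieces end up on the command line. The component, environment, and parameter values are invented, and the real docker invocation is assembled elsewhere in road-runner; StepParam is defined just below.

```go
package main

import (
	"fmt"
	"strings"

	"gopkg.in/cyverse-de/model.v4"
)

func main() {
	// Hypothetical step; a real one arrives as part of the job JSON.
	step := model.Step{
		Component: model.StepComponent{
			Name:     "wc_wrapper.sh",
			Location: "/usr/local/bin",
		},
		Environment: model.StepEnvironment{"FOO": "bar"},
		Config: model.StepConfig{
			Params: []model.StepParam{
				{Name: "-l", Order: 1},
				{Value: "input.txt", Order: 2},
			},
		},
	}

	fmt.Println(step.EnvOptions())                   // [--env="FOO=bar"]
	fmt.Println(strings.Join(step.Arguments(), " ")) // -l input.txt
	fmt.Println(step.Stdout("0"))                    // logs/condor-stdout-0
	fmt.Println(step.LogPath("/tmp/work", "0"))      // /tmp/work/logs/condor-log-0
}
```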
+
+// StepParam is where the params for a step are located.
+type StepParam struct {
+	ID    string `json:"id"`
+	Name  string `json:"name"`
+	Value string `json:"value"`
+	Order int    `json:"order"`
+}
+
+// ByOrder implements the sort interface for a []StepParam based on the Order
+// field.
+type ByOrder []StepParam
+
+// Len returns the number of elements in a ByOrder
+func (o ByOrder) Len() int {
+	return len(o)
+}
+
+// Swap swaps two positions in a []StepParam
+func (o ByOrder) Swap(i, j int) {
+	o[i], o[j] = o[j], o[i]
+}
+
+// Less returns true if position i is less than position j.
+func (o ByOrder) Less(i, j int) bool {
+	return o[i].Order < o[j].Order
+}
+
+// PreviewableStepParam is a list of StepParams that can be returned as a string
+// that previews the command-line for a submission.
+type PreviewableStepParam []StepParam
+
+func (p PreviewableStepParam) String() string {
+	var buffer bytes.Buffer
+	sort.Sort(ByOrder(p))
+	for _, param := range p {
+		buffer.WriteString(fmt.Sprintf("%s %s ", param.Name, param.Value))
+	}
+	return strings.TrimSpace(buffer.String())
+}
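Finally, a sketch of the ordering helpers on their own; the parameter values are hypothetical.

```go
package main

import (
	"fmt"
	"sort"

	"gopkg.in/cyverse-de/model.v4"
)

func main() {
	params := []model.StepParam{
		{Name: "--output", Value: "out.txt", Order: 2},
		{Name: "--input", Value: "in.txt", Order: 1},
	}

	// ByOrder sorts the underlying slice in place by the Order field.
	sort.Sort(model.ByOrder(params))
	fmt.Println(params[0].Name) // --input

	// PreviewableStepParam renders the sorted params as a preview string.
	fmt.Println(model.PreviewableStepParam(params)) // --input in.txt --output out.txt
}
```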