Skip to content

Commit

Permalink
Merge pull request opensearch-project#1054 from gregschohn/Kubernetes…
Browse files Browse the repository at this point in the history
…Experimentation

Description
Longer term, Kubernetes (K8s) will allow us

To unify the single-node developer experience with and on prem experience and the cloud one.
Simplify and speed up deployment, especially around testing.
To enable auto-scaling
Category Enhancement / New feature
Why these changes are required? See above
What is the old behavior before changes and new behavior after changes? CDK deployment should still work as it did before.
For this first cut...

Installing
The k8s directory contains helm charts that can be installed with helm install charts/aggregate/migrationAssistant. Optionally, -n ma can be added to add the helm installation and MA resources to the "ma" namespace (or whatever is specified). aggregate/mockCustomerClusters is a helm chart for testing purposes to bring up the resources external to the migration assistant that are required to facilitate testing all of the migration assistant functionality.

Those aggregate charts pull other helm charts, either local to this repo (charts/components/(captureProxy|migrationConsole...) or 3rd party charts (opensearch, elasticsearch, prometheus, etc) into them. Each chart defines default values in the values.yaml contained within the charts directory, alongside the Chart.yaml manifest file. Those default values can be used to install a working version of the MA solution. Today's functionality is limited and buggy, but doing a reasonable demo install w/out providing any value overrides will be an ongoing requirement.

Before running helm install, you'll likely need to run helm dependency build CHART_DIR, which downloads or copies charts into the CHART_DIR/charts directory. There's a script (linkSubChartsToDependencies.sh), which still has some bugs, that attempts do do some of this manually w/ symlinks to local directories when possible. Symlinks are preferable to tarballs because you don't need to keep rebuilding the dependencies, which can be time consuming.

Configurations
Command line parameters are configured via helm through the values (such as values.yaml). Here's sample contents for the bulkLoad deployment.

parameters:
  initialLeaseDuration:
    value: PT10M
  documentsPerBulkRequest:
    value: 1000
All of Migration Assistant's custom processes use the same paradigm to translate those yaml parameters into the command line parameters. Shared helm charts (helmCommon) provide helper macros that allow consistent handling across our fleet of helm packages. For command line parsing, helm charts create deployments, mostly via the common code that does the following. 1) Create config map stores for each of the parameters specified in the values.yaml file; 2) load the values from those config maps as environment variables into an init container; and then 3) that init container runs shell script commands to construct the arguments that will be passed into the main program. For the last part, those arguments are passed via a file (vars.sh) within a shared mount and 4) the main container loads those variables before running the program.

The migration console has an extra init container that constructs the migration_services.yaml - which, like vars.sh is written into a shared container. However, since pods can't refresh environment variables when config maps update and because we're bundling those into a single services yaml file, we have a separate init container to maintain that. That container uses a custom shell script that uses the k8s client to watch config maps and formats the configured values themselves as yaml.
  • Loading branch information
lewijacn authored Jan 13, 2025
2 parents 38513f9 + a31e9dc commit de5ef3a
Show file tree
Hide file tree
Showing 95 changed files with 3,149 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,93 +76,93 @@ public static class Args {
private boolean help;

@Parameter(required = true,
names = { "--snapshot-name" },
names = { "--snapshot-name", "--snapshotName" },
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(required = false,
names = { "--snapshot-local-dir" },
names = { "--snapshot-local-dir", "--snapshotLocalDir" },
description = ("The absolute path to the directory on local disk where the snapshot exists. " +
"Use this parameter if have a copy of the snapshot disk. Mutually exclusive with " +
"Use this parameter if there is a reachable copy of the snapshot on disk. Mutually exclusive with " +
"--s3-local-dir, --s3-repo-uri, and --s3-region."))
public String snapshotLocalDir = null;

@Parameter(required = false,
names = { "--s3-local-dir" },
names = { "--s3-local-dir", "--s3LocalDir" },
description = ("The absolute path to the directory on local disk to download S3 files to. " +
"If you supply this, you must also supply --s3-repo-uri and --s3-region. " +
"Mutually exclusive with --snapshot-local-dir."))
public String s3LocalDir = null;

@Parameter(required = false,
names = {"--s3-repo-uri" },
names = {"--s3-repo-uri", "--s3RepoUri" },
description = ("The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2. " +
"If you supply this, you must also supply --s3-local-dir and --s3-region. " +
"Mutually exclusive with --snapshot-local-dir."))
public String s3RepoUri = null;

@Parameter(required = false,
names = { "--s3-region" },
names = { "--s3-region", "--s3Region" },
description = ("The AWS Region the S3 bucket is in, like: us-east-2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-repo-uri. Mutually exclusive with --snapshot-local-dir."))
public String s3Region = null;

@Parameter(required = true,
names = { "--lucene-dir" },
names = { "--lucene-dir", "--luceneDir" },
description = "The absolute path to the directory where we'll put the Lucene docs")
public String luceneDir;

@ParametersDelegate
public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

@Parameter(required = false,
names = { "--index-allowlist" },
names = { "--index-allowlist", "--indexAllowlist" },
description = ("Optional. List of index names to migrate (e.g. 'logs_2024_01, logs_2024_02'). " +
"Default: all non-system indices (e.g. those not starting with '.')"))
public List<String> indexAllowlist = List.of();

@Parameter(required = false,
names = { "--max-shard-size-bytes" },
names = { "--max-shard-size-bytes", "--maxShardSizeBytes" },
description = ("Optional. The maximum shard size, in bytes, to allow when " +
"performing the document migration. " +
"Useful for preventing disk overflow. Default: 80 * 1024 * 1024 * 1024 (80 GB)"))
public long maxShardSizeBytes = 80 * 1024 * 1024 * 1024L;

@Parameter(required = false,
names = { "--initial-lease-duration" },
names = { "--initial-lease-duration", "--initialLeaseDuration" },
converter = DurationConverter.class,
description = "Optional. The time that the first attempt to migrate a shard's documents should take. " +
"If a process takes longer than this the process will terminate, allowing another process to " +
"attempt the migration, but with double the amount of time than the last time. Default: PT10M")
public Duration initialLeaseDuration = Duration.ofMinutes(10);

@Parameter(required = false,
names = { "--otel-collector-endpoint" },
names = { "--otel-collector-endpoint", "--otelCollectorEndpoint" },
arity = 1,
description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
+ "forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;

@Parameter(required = false,
names = "--documents-per-bulk-request",
names = {"--documents-per-bulk-request", "--documentsPerBulkRequest"},
description = "Optional. The number of documents to be included within each bulk request sent. " +
"Default no max (controlled by documents size)")
int numDocsPerBulkRequest = Integer.MAX_VALUE;

@Parameter(required = false,
names = "--documents-size-per-bulk-request",
names = { "--documents-size-per-bulk-request", "--documentsSizePerBulkRequest" },
description = "Optional. The maximum aggregate document size to be used in bulk requests in bytes. " +
"Note does not apply to single document requests. Default 10 MiB")
long numBytesPerBulkRequest = 10 * 1024L * 1024L;

@Parameter(required = false,
names = "--max-connections",
names = {"--max-connections", "--maxConnections" },
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(required = true,
names = { "--source-version" },
names = { "--source-version", "--sourceVersion" },
converter = VersionConverter.class,
description = ("Version of the source cluster."))
public Version sourceVersion = Version.fromString("ES 7.10");
Expand Down Expand Up @@ -239,7 +239,8 @@ public static void validateArgs(Args args) {
public static void main(String[] args) throws Exception {
// TODO: Add back arg printing after not consuming plaintext password MIGRATIONS-1915
var workerId = ProcessHelpers.getNodeInstanceName();
log.info("Starting RfsMigrateDocuments with workerId =" + workerId);
System.err.println("Starting program with: " + String.join(" ", args));
log.info("Starting RfsMigrateDocuments with workerId=" + workerId);

Args arguments = new Args();
JCommander jCommander = JCommander.newBuilder().addObject(arguments).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,41 @@ default ConnectionContext toConnectionContext() {

@Getter
public static class TargetArgs implements IParams {
@Parameter(names = {
"--target-host" }, description = "The target host and port (e.g. http://localhost:9200)", required = true)
@Parameter(
names = {"--target-host", "--targetHost" },
description = "The target host and port (e.g. http://localhost:9200)",
required = true)
public String host;

@Parameter(names = {
"--target-username" }, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
@Parameter(
names = {"--target-username", "--targetUsername" },
description = "Optional. The target username; if not provided, will assume no auth on target",
required = false)
public String username = null;

@Parameter(names = {
"--target-password" }, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
@Parameter(
names = {"--target-password", "--targetPassword" },
description = "Optional. The target password; if not provided, will assume no auth on target",
required = false)
public String password = null;

@Parameter(names = {
"--target-aws-region" }, description = "Optional. The target aws region. Required only if sigv4 auth is used", required = false)
@Parameter(
names = {"--target-aws-region", "--targetAwsRegion" },
description = "Optional. The target aws region. Required only if sigv4 auth is used",
required = false)
public String awsRegion = null;

@Parameter(names = {
"--target-aws-service-signing-name" }, description = "Optional. The target aws service signing name, e.g 'es' for Amazon OpenSearch Service and 'aoss' for Amazon OpenSearch Serverless. Required if sigv4 auth is used.", required = false)
@Parameter(
names = {"--target-aws-service-signing-name", "--targetAwsServiceSigningName" },
description = "Optional. The target aws service signing name, e.g 'es' for " +
"Amazon OpenSearch Service and 'aoss' for Amazon OpenSearch Serverless. " +
"Required if sigv4 auth is used.",
required = false)
public String awsServiceSigningName = null;

@Parameter(names = {
"--target-insecure" }, description = "Allow untrusted SSL certificates for target", required = false)
@Parameter(
names = { "--target-insecure", "--targetInsecure" },
description = "Allow untrusted SSL certificates for target", required = false)
public boolean insecure = false;

@ParametersDelegate
Expand All @@ -139,35 +152,50 @@ public boolean isCompressionEnabled() {
// Flags that require more testing and validation before recommendations are made
@Getter
public static class TargetAdvancedArgs {
@Parameter(names = {
"--target-compression" }, description = "**Advanced**. Allow request compression to target", required = false)
@Parameter(names = {"--target-compression", "--targetCompression" },
description = "**Advanced**. Allow request compression to target",
required = false)
public boolean compressionEnabled = false;
}

@Getter
public static class SourceArgs implements IParams {
@Parameter(names = {
"--source-host" }, description = "The source host and port (e.g. http://localhost:9200)", required = false)
@Parameter(
names = {"--source-host", "--sourceHost" },
description = "The source host and port (e.g. http://localhost:9200)",
required = false)
public String host = null;

@Parameter(names = {
"--source-username" }, description = "The source username; if not provided, will assume no auth on source", required = false)
@Parameter(
names = {"--source-username", "--sourceUsername" },
description = "The source username; if not provided, will assume no auth on source",
required = false)
public String username = null;

@Parameter(names = {
"--source-password" }, description = "The source password; if not provided, will assume no auth on source", required = false)
@Parameter(
names = {"--source-password", "--sourcePassword" },
description = "The source password; if not provided, will assume no auth on source",
required = false)
public String password = null;

@Parameter(names = {
"--source-aws-region" }, description = "Optional. The source aws region, e.g. 'us-east-1'. Required if sigv4 auth is used", required = false)
@Parameter(
names = {"--source-aws-region", "--sourceAwsRegion" },
description = "Optional. The source aws region, e.g. 'us-east-1'. Required if sigv4 auth is used",
required = false)
public String awsRegion = null;

@Parameter(names = {
"--source-aws-service-signing-name" }, description = "Optional. The source aws service signing name, e.g 'es' for Amazon OpenSearch Service and 'aoss' for Amazon OpenSearch Serverless. Required if sigv4 auth is used.", required = false)
@Parameter(
names = {"--source-aws-service-signing-name", "--sourceAwsServiceSigningName" },
description = "Optional. The source aws service signing name, e.g 'es' for " +
"Amazon OpenSearch Service and 'aoss' for Amazon OpenSearch Serverless. " +
"Required if sigv4 auth is used.",
required = false)
public String awsServiceSigningName = null;

@Parameter(names = {
"--source-insecure" }, description = "Allow untrusted SSL certificates for source", required = false)
@Parameter(
names = {"--source-insecure", "--sourceInsecure" },
description = "Allow untrusted SSL certificates for source",
required = false)
public boolean insecure = false;

public boolean isCompressionEnabled() {
Expand Down
2 changes: 2 additions & 0 deletions TrafficCapture/dockerSolution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
def dockerFilesForExternalServices = [
"elasticsearch_searchguard": "elasticsearchWithSearchGuard",
"capture_proxy_base": "captureProxyBase",
"k8s_config_map_util_scripts": "k8sConfigMapUtilScripts",
"elasticsearch_client_test_console": "elasticsearchTestConsole",
"migration_console": "migrationConsole",
"otel_collector": "otelCollector",
Expand Down Expand Up @@ -125,6 +126,7 @@ dockerCompose {
}

task buildDockerImages {
dependsOn buildDockerImage_k8s_config_map_util_scripts
dependsOn buildDockerImage_elasticsearch_searchguard
dependsOn buildDockerImage_migration_console
dependsOn buildDockerImage_otel_collector
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
FROM amazonlinux:2023

ENV PIP_ROOT_USER_ACTION ignore
ENV LANG C.UTF-8

RUN dnf install -y \
jq \
less \
python3.11 \
python3.11-devel \
python3.11-pip \
python3.11-wheel \
tar \
unzip \
vim \
wget \
&& \
dnf clean all && \
rm -rf /var/cache/dnf

# Define the virtual environment path to use for all pipenv runs
ENV WORKON_HOME=/
ENV PIPENV_CUSTOM_VENV_NAME=.venv
ENV PIPENV_DEFAULT_PYTHON_VERSION=3.11
ENV PIPENV_MAX_DEPTH=1

RUN python3.11 -m pip install pipenv
WORKDIR /
RUN python3.11 -m venv .venv

WORKDIR /root
COPY Pipfile .
COPY Pipfile.lock .
RUN pipenv install --deploy

COPY configmap2yaml/* /root/
RUN chmod ug+x /root/*.py

ENTRYPOINT ["tail", "-f", "/dev/null"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
kubernetes = ">=30.1.0"
pyyaml = ">=6.0.2"
Jinja2 = ">=3.1.4"

[dev-packages]

[requires]
python_version = "3.11"
Loading

0 comments on commit de5ef3a

Please sign in to comment.