diff --git a/docs/sources/setup/install/helm/_index.md b/docs/sources/setup/install/helm/_index.md index 5392838e2ca74..05def151c5ca7 100644 --- a/docs/sources/setup/install/helm/_index.md +++ b/docs/sources/setup/install/helm/_index.md @@ -22,12 +22,22 @@ This guide references the Loki Helm chart version 3.0 or greater and contains th If you are installing Grafana Enterprise Logs, follow the [GEL Helm installation](https://grafana.com/docs/enterprise-logs//setup/helm/). +## Deployment Recommendations + +Loki is designed to be run in two states: +* [Monolithic](https://grafana.com/docs/loki//get-started/deployment-modes/#monolithic-mode): Recommended when you are running Loki as part of a small meta monitoring stack. +* [Microservices](https://grafana.com/docs/loki//get-started/deployment-modes/#microservices-mode): For workloads that require high availability and scalability. Loki is deployed in this mode internally at Grafana Labs. + +{{< admonition type="tip" >}} +Loki can also be deployed in [Simple Scalable mode](https://grafana.com/docs/loki//get-started/deployment-modes/#simple-scalable). For the best possible experience in production, we recommend deploying Loki in *microservices* mode. +{{< /admonition >}} ## Cloud Deployment Guides The following guides provide step-by-step instructions for deploying Loki on cloud providers: - [Amazon EKS](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/aws/) +- [Azure AKS](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/azure/) ## Reference diff --git a/docs/sources/setup/install/helm/deployment-guides/_index.md b/docs/sources/setup/install/helm/deployment-guides/_index.md index 4ff7d5dcaa983..3e408257f9afe 100644 --- a/docs/sources/setup/install/helm/deployment-guides/_index.md +++ b/docs/sources/setup/install/helm/deployment-guides/_index.md @@ -10,4 +10,5 @@ keywords: The following guides provide step-by-step instructions for deploying Loki on cloud providers: -- [Deploy Loki on AWS](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/aws/) \ No newline at end of file +- [Deploy Loki on AWS](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/aws/) +- [Deploy Loki on Azure](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/azure/) \ No newline at end of file diff --git a/docs/sources/setup/install/helm/deployment-guides/aws.md b/docs/sources/setup/install/helm/deployment-guides/aws.md index 82123318f8bb4..29ead8dfbfcd4 100644 --- a/docs/sources/setup/install/helm/deployment-guides/aws.md +++ b/docs/sources/setup/install/helm/deployment-guides/aws.md @@ -531,9 +531,6 @@ k6 is one of the fastest ways to test your Loki deployment. This will allow you iterations: 10, }; - **It is important to create a namespace called `loki` as our trust policy is set to allow the IAM role to be used by the `loki` service account in the `loki` namespace. 
This is configurable but make sure to update your service account.** - * "main" function for each VU iteration - */ export default () => { // Push request with 10 streams and uncompressed logs between 800KB and 2MB var res = client.pushParameterized(10, 800 * KB, 2 * MB); diff --git a/docs/sources/setup/install/helm/deployment-guides/azure.md b/docs/sources/setup/install/helm/deployment-guides/azure.md new file mode 100644 index 0000000000000..7f4a2fdd85095 --- /dev/null +++ b/docs/sources/setup/install/helm/deployment-guides/azure.md @@ -0,0 +1,590 @@ +--- +title: Deploy the Loki Helm chart on Azure +menuTitle: Deploy on Azure +description: Installing the Loki Helm chart on Azure. +keywords: +--- + +# Deploy the Loki Helm chart on Azure + +This guide shows how to deploy a minimally viable Loki in **microservice** mode on Azure using the Helm chart. In order to successfully complete this guide, you must have the necessary tools and permissions to deploy resources on Azure, such as: + +- Full access to AKS (Azure Kubernetes Service) +- Full access to Azure Blob Storage +- Sufficient permissions to create federated credentials and roles in Azure AD (Active Directory) + +There are three primary ways to authenticate Loki with Azure: + +- Hard coding a connection string - this is the simplest method but is not recommended for production environments. +- Manged identity +- Federated token + +In this guide, we will use the federated token method to deploy Loki on Azure. This method is more secure than hard coding a connection string and is more suitable for production environments. + +## Considerations + +{{< admonition type="caution" >}} +This guide was accurate at the time it was last updated on **8th of January, 2025**. As cloud providers frequently update their services and offerings, as a best practice, you should refer to the [Azure documentation](https://learn.microsoft.com/en-us/azure/) before creating your storage account and assigning roles. +{{< /admonition >}} + +- **AD Role:** In this tutorial we will create a role in Azure Active Directory (Azure AD) to allow Loki to read and write from Azure Blob Storage. This role will be assigned to the Loki service account. You may want to adjust the permissions based on your requirements. + +- **Authentication:** Grafana Loki comes with a basic authentication layer. The Loki gateway (NGINX) is exposed to the internet using basic authentication in this example. NGINX can also be replaced with other open-source reverse proxies. Refer to [Authentication](https://grafana.com/docs/loki//operations/authentication/) for more information. + +- **Retention:** The retention period is set to 28 days in the `values.yaml` file. You may wish to adjust this based on your requirements. + +- **Costs:** Running Loki on Azure will incur costs. Make sure to monitor your usage and costs to avoid any unexpected bills. In this guide we have used a simple AKS cluster with 3 nodes and `Standard_E2ds_v5` instances. You may wish to adjust the instance types and number of nodes based on your workload. + +## Prerequisites + +- Helm 3 or above. Refer to [Installing Helm](https://helm.sh/docs/intro/install/). This should be installed on your local machine. +- Kubectl installed on your local machine. Refer to [Install and Set Up kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). +- Azure CLI installed on your local machine. Refer to [Installing the Azure CLI](https://learn.microsoft.com/en-us/cli/azure/install-azure-cli). 
This is a requirement for following this guide as all resources will be created using the Azure CLI. + +### AKS minimum requirements + +Before creating an AKS cluster in Azure you need to create a resource group. You can create a resource group using the Azure CLI: + +```bash +az group create --name -location +``` + +{{< admonition type="caution" >}} +These AKS requirements are the minimum specification needed to deploy Loki using this guide. You may wish to adjust the instance types based on your Azure environment and workload. **If you choose to do so, we cannot guarantee that this sample configuration will still meet your needs.** + +In this guide, we deploy Loki using `Standard_E2ds_v5` instances. This is to make sure we remain within the free tier limits for Azure. Which allows us to deploy up to 10 vCPUs within a region. We recommend for large production workloads to scale these nodes up to `Standard_D4_v5`. +{{< /admonition >}} + +The minimum requirements for deploying Loki on AKS are: + +- Kubernetes version `1.30` or above. +- `3` nodes for the AKS cluster. +- Instance type depends on your workload. A good starting point for a production cluster in the free tier is `Standard_E2ds_v5` instances and for large production workloads `Standard_D4_v5` instances. + +Here is how to create an AKS cluster with the minimum requirements: + +```bash +az aks create \ + --resource-group \ + --name \ + --node-count 3 \ + --node-vm-size Standard_E2ds_v5 \ + --generate-ssh-keys \ + --enable-workload-identity \ + --enable-oidc-issuer +``` + +Note in the above command we have enabled workload identity and OIDC issuer. This is required for the Loki service account to authenticate with Azure AD. If you have already created an AKS cluster, you can enable these features using the following command: + +```bash +az aks update \ + --resource-group \ + --name \ + --enable-workload-identity \ + --enable-oidc-issuer +``` + +The Azure CLI also lets you bind the AKS cluster to kubectl. You can do this by running the following command: + +```bash +az aks get-credentials --resource-group --name +``` + +## Configuring Azure Blob Storage + +{{< admonition type="tip" >}} + Consider using unique bucket names rather than: `chunk`, `ruler`, and `admin`. Although Azure Blog Storage is not directly affected by this [security update](https://grafana.com/blog/2024/06/27/grafana-security-update-grafana-loki-and-unintended-data-write-attempts-to-amazon-s3-buckets/) it is a best practice to use unique container names for buckets. +{{< /admonition >}} + +Before deploying Loki, you need to create two Azure storage containers; one to store logs (chunks), the second to store alert rules. You can create the containers using the Azure CLI. Containers must exist inside a storage account. + +{{< admonition type="note" >}} +GEL customers will require a third container to store the admin data. This container is not required for OSS users. +{{< /admonition >}} + +1. Create a storage account: + + ```bash + az storage account create \ + --name \ + --location \ + --sku Standard_ZRS \ + --encryption-services blob \ + --resource-group + ``` + Replace the placeholders with your desired values. + +1. 
Create the containers for chunks and ruler: + + ```bash + az storage container create --account-name --name --auth-mode login && \ + az storage container create --account-name --name --auth-mode login + ``` + Make sure `--account-name` matches the account you just created + + +With the storage account and containers created, you can now proceed to creating the Azure AD role and federated credentials. + +## Creating the Azure AD role and federated credentials + +The recommended way to authenticate Loki with Azure Blob Storage is to use federated credentials. This method is more secure than hard coding a connection string directly into the Loki configuration. In this next section, we will create an Azure AD role and federated credentials for Loki to allow it to read and write from Azure Blob Storage: + +1. Locate the OpenID Connect (OIDC) issuer URL: + + ```bash + az aks show \ + --resource-group \ + --name \ + --query "oidcIssuerProfile.issuerUrl" \ + -o tsv + ``` + This command will return the OIDC issuer URL. You will need this URL to create the federated credentials. + +1. Generate a `credentials.json` file with the following content: + ```json + { + "name": "LokiFederatedIdentity", + "issuer": "", + "subject": "system:serviceaccount:loki:loki", + "description": "Federated identity for Loki accessing Azure resources", + "audiences": [ + "api://AzureADTokenExchange" + ] + } + ``` + Replace `` with the OIDC issuer URL you found in the previous step. + +1. Make sure you to save the `credentials.json` file before continuing. + +1. Next generate an Azure directory `app`. We will use this to assign our federated credentials to: + ```bash + az ad app create \ + --display-name loki \ + --query appId \ + -o tsv + ``` + This will return the app ID. Save this for later use. If you need to find the app ID later you can run the following command: + ```bash + az ad app list --display-name loki --query "[].appId" -o tsv + ``` + +1. The app requires a service principal to authenticate with Azure AD. Create a service principal for the app: + + ```bash + az ad sp create --id + ``` + Replace `` with the app ID you generated in the previous step. + +1. Next assign the federated credentials to the app: + + ```bash + az ad app federated-credential create \ + --id \ + --parameters credentials.json + ``` + Replace `` with the app ID you generated in the previous step. + +1. Lastly add a role assignment to the app: + + ```bash + az role assignment create \ + --role "Storage Blob Data Contributor" \ + --assignee \ + --scope /subscriptions//resourceGroups//providers/Microsoft.Storage/storageAccounts/ + ``` + Replace the placeholders with your actual values. + +Now that you have created the Azure AD role and federated credentials, you can proceed to deploying Loki using the Helm chart. + + +## Deploying the Helm chart + +The following steps require the use of `helm` and `kubectl`. Make sure you have run the `az` command to bind your AKS cluster to `kubectl`: + +```bash +az aks get-credentials --resource-group --name +``` + +Before we can deploy the Loki Helm chart, we need to add the Grafana chart repository to Helm. This repository contains the Loki Helm chart. + +1. Add the Grafana chart repository to Helm: + + ```bash + helm repo add grafana https://grafana.github.io/helm-charts + ``` +1. Update the chart repository: + + ```bash + helm repo update + ``` +1. 
Create a new namespace for Loki: + + ```bash + kubectl create namespace loki + ``` +### Loki basic authentication + +Loki by default does not come with any authentication. Since we will be deploying Loki to Azure and exposing the gateway to the internet, we recommend adding at least basic authentication. In this guide we will give Loki a username and password: + +1. To start we will need create a `.htpasswd` file with the username and password. You can use the `htpasswd` command to create the file: + + {{< admonition type="tip" >}} + If you don't have the `htpasswd` command installed, you can install it using `brew` or `apt-get` or `yum` depending on your OS. + {{< /admonition >}} + + ```bash + htpasswd -c .htpasswd + ``` + This will create a file called `auth` with the username `loki`. You will be prompted to enter a password. + + 1. Create a Kubernetes secret with the `.htpasswd` file: + + ```bash + kubectl create secret generic loki-basic-auth --from-file=.htpasswd -n loki + ``` + + This will create a secret called `loki-basic-auth` in the `loki` namespace. We will reference this secret in the Loki Helm chart configuration. + +1. Create a `canary-basic-auth` secret for the canary: + + ```bash + kubectl create secret generic canary-basic-auth \ + --from-literal=username= \ + --from-literal=password= \ + -n loki + ``` + We create a literal secret with the username and password for Loki canary to authenticate with the Loki gateway. Make sure to replace the placeholders with your desired username and password. + +### Loki Helm chart configuration + +Create a `values.yaml` file choosing the configuration options that best suit your requirements. Below there is an example of `values.yaml` files for the Loki Helm chart in [microservices](https://grafana.com/docs/loki//get-started/deployment-modes/#microservices-mode) mode. + +```yaml +loki: + podLabels: + "azure.workload.identity/use": "true" # Add this label to the Loki pods to enable workload identity + schemaConfig: + configs: + - from: "2024-04-01" + store: tsdb + object_store: azure + schema: v13 + index: + prefix: loki_index_ + period: 24h + storage_config: + azure: + account_name: "" + container_name: "" # Your actual Azure Blob Storage container name (loki-azure-dev-chunks) + use_federated_token: true # Use federated token for authentication + ingester: + chunk_encoding: snappy + pattern_ingester: + enabled: true + limits_config: + allow_structured_metadata: true + volume_enabled: true + retention_period: 672h # 28 days retention + compactor: + retention_enabled: true + delete_request_store: azure + ruler: + enable_api: true + storage: + type: azure + azure: + account_name: + container_name: # Your actual Azure Blob Storage container name (loki-azure-dev-ruler) + use_federated_token: true # Use federated token for authentication + alertmanager_url: http://prom:9093 # The URL of the Alertmanager to send alerts (Prometheus, Mimir, etc.) 
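+    # Note: http://prom:9093 above is only an example address. Replace it with
+    # the URL of the Alertmanager in your environment if you want the ruler to
+    # deliver alerts.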
+ + querier: + max_concurrent: 4 + + storage: + type: azure + bucketNames: + chunks: "" # Your actual Azure Blob Storage container name (loki-azure-dev-chunks) + ruler: "" # Your actual Azure Blob Storage container name (loki-azure-dev-ruler) + # admin: "admin-loki-devrel" # Your actual Azure Blob Storage container name (loki-azure-dev-admin) + azure: + accountName: + useFederatedToken: true # Use federated token for authentication + +# Define the Azure workload identity +serviceAccount: + name: loki + annotations: + "azure.workload.identity/client-id": "" # The app ID of the Azure AD app + labels: + "azure.workload.identity/use": "true" + +deploymentMode: Distributed + +ingester: + replicas: 3 + zoneAwareReplication: + enabled: false + +querier: + replicas: 3 + maxUnavailable: 2 + +queryFrontend: + replicas: 2 + maxUnavailable: 1 + +queryScheduler: + replicas: 2 + +distributor: + replicas: 3 + maxUnavailable: 2 +compactor: + replicas: 1 + +indexGateway: + replicas: 2 + maxUnavailable: 1 + +ruler: + replicas: 1 + maxUnavailable: 1 + + +# This exposes the Loki gateway so it can be written to and queried externaly +gateway: + service: + type: LoadBalancer + basicAuth: + enabled: true + existingSecret: loki-basic-auth + +# Since we are using basic auth, we need to pass the username and password to the canary +lokiCanary: + extraArgs: + - -pass=$(LOKI_PASS) + - -user=$(LOKI_USER) + extraEnv: + - name: LOKI_PASS + valueFrom: + secretKeyRef: + name: canary-basic-auth + key: password + - name: LOKI_USER + valueFrom: + secretKeyRef: + name: canary-basic-auth + key: username + +# Enable minio for storage +minio: + enabled: false + +backend: + replicas: 0 +read: + replicas: 0 +write: + replicas: 0 + +singleBinary: + replicas: 0 +``` + +{{< admonition type="caution" >}} +Make sure to replace the placeholders with your actual values. +{{< /admonition >}} + +It is critical to define a valid `values.yaml` file for the Loki deployment. To remove the risk of misconfiguration, let's break down the configuration options to keep in mind when deploying to Azure: + +- **Loki Config vs. Values Config:** + - The `values.yaml` file contains a section called `loki`, which contains a direct representation of the Loki configuration file. + - This section defines the Loki configuration, including the schema, storage, and querier configuration. + - The key configuration to focus on for chunks is the `storage_config` section, where you define the Azure container name and storage account. This tells Loki where to store the chunks. + - The `ruler` section defines the configuration for the ruler, including the Azure container name and storage account. This tells Loki where to store the alert and recording rules. + - For the full Loki configuration, refer to the [Loki Configuration](https://grafana.com/docs/loki//configure/) documentation. + +- **Storage:** + - Defines where the Helm chart stores data. + - Set the type to `azure` since we are using Azure Blob Storage. + - Configure the container names for the chunks and ruler to match the containers created earlier. + - The `azure` section specifies the storage account name and also sets `useFederatedToken` to `true`. This tells Loki to use federated credentials for authentication. + +- **Service Account:** + - The `serviceAccount` section is used to define the federated workload identity Loki will use to authenticate with Azure AD. + - We set the `azure.workload.identity/client-id` annotation to the app ID of the Azure AD app. 
+ +- **Gateway:** + - Defines how the Loki gateway will be exposed. + - We are using a `LoadBalancer` service type in this configuration. + + +### Deploy Loki + +Now that you have created the `values.yaml` file, you can deploy Loki using the Helm chart. + +1. Deploy using the newly created `values.yaml` file: + + ```bash + helm install --values values.yaml loki grafana/loki -n loki --create-namespace + ``` + It is important to create a namespace called `loki` as our federated credentials were generated with the value `system:serviceaccount:loki:loki`. This translates to the `loki` service account in the `loki` namespace. This is configurable but make sure to update the federated credentials file first. + +1. Verify the deployment: + + ```bash + kubectl get pods -n loki + ``` + You should see the Loki pods running. + + ```console + NAME READY STATUS RESTARTS AGE + loki-canary-crqpg 1/1 Running 0 10m + loki-canary-hm26p 1/1 Running 0 10m + loki-canary-v9wv9 1/1 Running 0 10m + loki-chunks-cache-0 2/2 Running 0 10m + loki-compactor-0 1/1 Running 0 10m + loki-distributor-78ccdcc9b4-9wlhl 1/1 Running 0 10m + loki-distributor-78ccdcc9b4-km6j2 1/1 Running 0 10m + loki-distributor-78ccdcc9b4-ptwrb 1/1 Running 0 10m + loki-gateway-5f97f78755-hm6mx 1/1 Running 0 10m + loki-index-gateway-0 1/1 Running 0 10m + loki-index-gateway-1 1/1 Running 0 10m + loki-ingester-zone-a-0 1/1 Running 0 10m + loki-ingester-zone-b-0 1/1 Running 0 10m + loki-ingester-zone-c-0 1/1 Running 0 10m + loki-querier-89d4ff448-4vr9b 1/1 Running 0 10m + loki-querier-89d4ff448-7nvrf 1/1 Running 0 10m + loki-querier-89d4ff448-q89kh 1/1 Running 0 10m + loki-query-frontend-678899db5-n5wc4 1/1 Running 0 10m + loki-query-frontend-678899db5-tf69b 1/1 Running 0 10m + loki-query-scheduler-7d666bf759-9xqb5 1/1 Running 0 10m + loki-query-scheduler-7d666bf759-kpb5q 1/1 Running 0 10m + loki-results-cache-0 2/2 Running 0 10m + loki-ruler-0 1/1 Running 0 10m + ``` + +### Find the Loki gateway service + +The Loki gateway service is a load balancer service that exposes the Loki gateway to the internet. This is where you will write logs to and query logs from. By default NGINX is used as the gateway. + +{{< admonition type="caution" >}} +The Loki gateway service is exposed to the internet. We provide basic authentication using a username and password in this tutorial. Refer to the [Authentication](https://grafana.com/docs/loki//operations/authentication/) documentation for more information. +{{< /admonition >}} + +To find the Loki gateway service, run the following command: + +```bash +kubectl get svc -n loki +``` +You should see the Loki gateway service with an external IP address. This is the address you will use to write to and query Loki. + +```console + NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE +loki-gateway LoadBalancer 10.100.201.74 134.236.21.145 80:30707/TCP 46m +``` + +Congratulations! You have successfully deployed Loki on Azure using the Helm chart. Before we finish, let's test the deployment. + +## Testing Your Loki Deployment + +k6 is one of the fastest ways to test your Loki deployment. This will allow you to both write and query logs to Loki. To get started with k6, follow the steps below: + +1. Install k6 with the Loki extension on your local machine. Refer to [Installing k6 and the xk6-loki extension](https://grafana.com/docs/loki//send-data/k6/). + +2. 
Create a `azure-test.js` file with the following content: + + ```javascript + import {sleep, check} from 'k6'; + import loki from 'k6/x/loki'; + + /** + * URL used for push and query requests + * Path is automatically appended by the client + * @constant {string} + */ + + const username = ''; + const password = ''; + const external_ip = ''; + + const credentials = `${username}:${password}`; + + const BASE_URL = `http://${credentials}@${external_ip}`; + + /** + * Helper constant for byte values + * @constant {number} + */ + const KB = 1024; + + /** + * Helper constant for byte values + * @constant {number} + */ + const MB = KB * KB; + + /** + * Instantiate config and Loki client + */ + + const conf = new loki.Config(BASE_URL); + const client = new loki.Client(conf); + + /** + * Define test scenario + */ + export const options = { + vus: 10, + iterations: 10, + }; + + export default () => { + // Push request with 10 streams and uncompressed logs between 800KB and 2MB + var res = client.pushParameterized(10, 800 * KB, 2 * MB); + // Check for successful write + check(res, { 'successful write': (res) => res.status == 204 }); + + // Pick a random log format from label pool + let format = randomChoice(conf.labels["format"]); + + // Execute instant query with limit 1 + res = client.instantQuery(`count_over_time({format="${format}"}[1m])`, 1) + // Check for successful read + check(res, { 'successful instant query': (res) => res.status == 200 }); + + // Execute range query over last 5m and limit 1000 + res = client.rangeQuery(`{format="${format}"}`, "5m", 1000) + // Check for successful read + check(res, { 'successful range query': (res) => res.status == 200 }); + + // Wait before next iteration + sleep(1); + } + + /** + * Helper function to get random item from array + */ + function randomChoice(items) { + return items[Math.floor(Math.random() * items.length)]; + } + ``` + + Replace `` with the external IP address of the Loki Gateway service. + + This script will write logs to Loki and query logs from Loki. It will write logs in a random format between 800KB and 2MB and query logs in a random format over the last 5 minutes. + +3. Run the test: + + ```bash + ./k6 run azure-test.js + ``` + + This will run the test and output the results. You should see the test writing logs to Loki and querying logs from Loki. + +Now that you have successfully deployed Loki in microservices mode on Microsoft Azure, you may wish to explore the following: + +- [Sending data to Loki](https://grafana.com/docs/loki//query/) +- [Manage Loki](https://grafana.com/docs/loki/}} We recommend running Loki at scale within a cloud environment like AWS, Azure, or GCP. The below guides will show you how to deploy a minimally viable production environment. -- [Deploy Loki on AWS]({{< relref "../deployment-guides/aws" >}}) +- [Deploy Loki on AWS](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/aws/) +- [Deploy Loki on Azure](https://grafana.com/docs/loki//setup/install/helm/deployment-guides/azure/) ## Next Steps * Configure an agent to [send log data to Loki](/docs/loki//send-data/). diff --git a/pkg/dataobj/internal/dataset/column.go b/pkg/dataobj/internal/dataset/column.go new file mode 100644 index 0000000000000..73c63c3512984 --- /dev/null +++ b/pkg/dataobj/internal/dataset/column.go @@ -0,0 +1,25 @@ +package dataset + +import "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + +// Helper types. +type ( + // ColumnInfo describes a column. 
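+	// The row count and size fields describe the column as a whole; they are
+	// aggregated from the column's pages when the column is built by a
+	// [ColumnBuilder].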
+ ColumnInfo struct { + Name string // Name of the column, if any. + Type datasetmd.ValueType // Type of values in the column. + Compression datasetmd.CompressionType // Compression used for the column. + + RowsCount int // Total number of rows in the column. + CompressedSize int // Total size of all pages in the column after compression. + UncompressedSize int // Total size of all pages in the column before compression. + + Statistics *datasetmd.Statistics // Optional statistics for the column. + } +) + +// MemColumn holds a set of pages of a common type. +type MemColumn struct { + Info ColumnInfo // Information about the column. + Pages []*MemPage // The set of pages in the column. +} diff --git a/pkg/dataobj/internal/dataset/column_builder.go b/pkg/dataobj/internal/dataset/column_builder.go new file mode 100644 index 0000000000000..83476f6f04c92 --- /dev/null +++ b/pkg/dataobj/internal/dataset/column_builder.go @@ -0,0 +1,173 @@ +package dataset + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +) + +// BuilderOptions configures common settings for building pages. +type BuilderOptions struct { + // PageSizeHint is the soft limit for the size of the page. Builders try to + // fill pages as close to this size as possible, but the actual size may be + // slightly larger or smaller. + PageSizeHint int + + // Value is the value type of data to write. + Value datasetmd.ValueType + + // Encoding is the encoding algorithm to use for values. + Encoding datasetmd.EncodingType + + // Compression is the compression algorithm to use for values. + Compression datasetmd.CompressionType +} + +// A ColumnBuilder builds a sequence of [Value] entries of a common type into a +// column. Values are accumulated into a buffer and then flushed into +// [MemPage]s once the size of data exceeds a configurable limit. +type ColumnBuilder struct { + name string + opts BuilderOptions + + rows int // Total number of rows in the column. + + pages []*MemPage + builder *pageBuilder +} + +// NewColumnBuilder creates a new ColumnBuilder from the optional name and +// provided options. NewColumnBuilder returns an error if the options are +// invalid. +func NewColumnBuilder(name string, opts BuilderOptions) (*ColumnBuilder, error) { + builder, err := newPageBuilder(opts) + if err != nil { + return nil, fmt.Errorf("creating page builder: %w", err) + } + + return &ColumnBuilder{ + name: name, + opts: opts, + + builder: builder, + }, nil +} + +// Append adds a new value into cb with the given zero-indexed row number. If +// the row number is higher than the current number of rows in cb, null values +// are added up to the new row. +// +// Append returns an error if the row number is out-of-order. +func (cb *ColumnBuilder) Append(row int, value Value) error { + if row < cb.rows { + return fmt.Errorf("row %d is older than current row %d", row, cb.rows) + } + + // We give two attempts to append the data to the buffer; if the buffer is + // full, we cut a page and then append to the newly reset buffer. + // + // The second iteration should never fail, as the buffer will always be empty + // then. + for range 2 { + if cb.append(row, value) { + cb.rows = row + 1 + return nil + } + + cb.flushPage() + } + + panic("ColumnBuilder.Append: failed to append value to fresh buffer") +} + +// Backfill adds NULLs into cb up to (but not including) the provided row +// number. If values exist up to the provided row number, Backfill does +// nothing. 
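+//
+// Backfill can be used when building several columns in lockstep: columns
+// with no value for a given row are padded with NULLs so that every column
+// ends up with the same number of rows.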
+func (cb *ColumnBuilder) Backfill(row int) { + // We give two attempts to append the data to the buffer; if the buffer is + // full, we cut a page and then append again to the newly reset buffer. + // + // The second iteration should never fail, as the buffer will always be + // empty. + for range 2 { + if cb.backfill(row) { + return + } + cb.flushPage() + } + + panic("ColumnBuilder.Backfill: failed to backfill buffer") +} + +func (cb *ColumnBuilder) backfill(row int) bool { + for row > cb.rows { + if !cb.builder.AppendNull() { + return false + } + cb.rows++ + } + + return true +} + +func (cb *ColumnBuilder) append(row int, value Value) bool { + // Backfill up to row. + if !cb.backfill(row) { + return false + } + return cb.builder.Append(value) +} + +// Flush converts data in cb into a [MemColumn]. Afterwards, cb is reset to a +// fresh state and can be reused. +func (cb *ColumnBuilder) Flush() (*MemColumn, error) { + cb.flushPage() + + info := ColumnInfo{ + Name: cb.name, + Type: cb.opts.Value, + + Compression: cb.opts.Compression, + } + + // TODO(rfratto): Should we compute column-wide statistics if they're + // available in pages? + // + // That would potentially work for min/max values, but not for count + // distinct, unless we had a way to pass sketches around. + + for _, page := range cb.pages { + info.RowsCount += page.Info.RowCount + info.CompressedSize += page.Info.CompressedSize + info.UncompressedSize += page.Info.UncompressedSize + } + + column := &MemColumn{ + Info: info, + Pages: cb.pages, + } + + cb.Reset() + return column, nil +} + +func (cb *ColumnBuilder) flushPage() { + if cb.builder.Rows() == 0 { + return + } + + page, err := cb.builder.Flush() + if err != nil { + // Flush should only return an error when it's empty, which we already + // ensure it's not in the lines above. + panic(fmt.Sprintf("failed to flush page: %s", err)) + } + cb.pages = append(cb.pages, page) +} + +// Reset clears all data in cb and resets it to a fresh state. 
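+//
+// Reset is also invoked by [ColumnBuilder.Flush] once a column has been
+// built, so an explicit call is only needed to discard buffered data without
+// producing a column.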
+func (cb *ColumnBuilder) Reset() { + cb.rows = 0 + cb.pages = nil + cb.builder.Reset() +} diff --git a/pkg/dataobj/internal/dataset/column_iter.go b/pkg/dataobj/internal/dataset/column_iter.go new file mode 100644 index 0000000000000..53b83f159575a --- /dev/null +++ b/pkg/dataobj/internal/dataset/column_iter.go @@ -0,0 +1,20 @@ +package dataset + +import "github.com/grafana/loki/v3/pkg/dataobj/internal/result" + +func iterMemColumn(col *MemColumn) result.Seq[Value] { + return result.Iter(func(yield func(Value) bool) error { + for _, page := range col.Pages { + for result := range iterMemPage(page, col.Info.Type, col.Info.Compression) { + val, err := result.Value() + if err != nil { + return err + } else if !yield(val) { + return nil + } + } + } + + return nil + }) +} diff --git a/pkg/dataobj/internal/dataset/column_test.go b/pkg/dataobj/internal/dataset/column_test.go new file mode 100644 index 0000000000000..545c9a26e357c --- /dev/null +++ b/pkg/dataobj/internal/dataset/column_test.go @@ -0,0 +1,61 @@ +package dataset + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +) + +func TestColumnBuilder_ReadWrite(t *testing.T) { + in := []string{ + "hello, world!", + "", + "this is a test of the emergency broadcast system", + "this is only a test", + "if this were a real emergency, you would be instructed to panic", + "but it's not, so don't", + "", + "this concludes the test", + "thank you for your cooperation", + "goodbye", + } + + opts := BuilderOptions{ + // Set the size to 0 so each column has exactly one value. + PageSizeHint: 0, + Value: datasetmd.VALUE_TYPE_STRING, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + } + b, err := NewColumnBuilder("", opts) + require.NoError(t, err) + + for i, s := range in { + require.NoError(t, b.Append(i, StringValue(s))) + } + + col, err := b.Flush() + require.NoError(t, err) + require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type) + require.Greater(t, len(col.Pages), 1) + + t.Log("Uncompressed size: ", col.Info.UncompressedSize) + t.Log("Compressed size: ", col.Info.CompressedSize) + t.Log("Pages: ", len(col.Pages)) + + var actual []string + for result := range iterMemColumn(col) { + val, err := result.Value() + require.NoError(t, err) + + if val.IsNil() || val.IsZero() { + actual = append(actual, "") + } else { + require.Equal(t, datasetmd.VALUE_TYPE_STRING, val.Type()) + actual = append(actual, val.String()) + } + } + require.Equal(t, in, actual) +} diff --git a/pkg/dataobj/internal/dataset/page.go b/pkg/dataobj/internal/dataset/page.go new file mode 100644 index 0000000000000..31cae969a9c76 --- /dev/null +++ b/pkg/dataobj/internal/dataset/page.go @@ -0,0 +1,99 @@ +package dataset + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +) + +// Helper types. +type ( + // PageData holds the raw data for a page. Data is formatted as: + // + // + // + // The presence-bitmap is a bitmap-encoded sequence of booleans, where values + // describe which rows are present (1) or nil (0). The presence bitmap is + // always stored uncompressed. + // + // values-data is then the encoded and optionally compressed sequence of + // non-NULL values. + PageData []byte + + // PageInfo describes a page. 
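+	// Both size fields cover the full page data: the bitmap-size prefix, the
+	// presence bitmap, and the values.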
+ PageInfo struct { + UncompressedSize int // UncompressedSize is the size of a page before compression. + CompressedSize int // CompressedSize is the size of a page after compression. + CRC32 uint32 // CRC32 checksum of the page after encoding and compression. + RowCount int // RowCount is the number of rows in the page, including NULLs. + + Encoding datasetmd.EncodingType // Encoding used for values in the page. + Stats *datasetmd.Statistics // Optional statistics for the page. + } +) + +// MemPage holds an encoded (and optionally compressed) sequence of [Value] +// entries of a common type. Use [ColumnBuilder] to construct sets of pages. +type MemPage struct { + Info PageInfo // Information about the page. + Data PageData // Data for the page. +} + +var checksumTable = crc32.MakeTable(crc32.Castagnoli) + +// reader returns a reader for decompressed page data. Reader returns an error +// if the CRC32 fails to validate. +func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Reader, values io.ReadCloser, err error) { + if actual := crc32.Checksum(p.Data, checksumTable); p.Info.CRC32 != actual { + return nil, nil, fmt.Errorf("invalid CRC32 checksum %x, expected %x", actual, p.Info.CRC32) + } + + bitmapSize, n := binary.Uvarint(p.Data) + if n <= 0 { + return nil, nil, fmt.Errorf("reading presence bitmap size: %w", err) + } + + var ( + bitmapReader = bytes.NewReader(p.Data[n : n+int(bitmapSize)]) + compressedDataReader = bytes.NewReader(p.Data[n+int(bitmapSize):]) + ) + + switch compression { + case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE: + return bitmapReader, io.NopCloser(compressedDataReader), nil + + case datasetmd.COMPRESSION_TYPE_SNAPPY: + sr := snappy.NewReader(compressedDataReader) + return bitmapReader, io.NopCloser(sr), nil + + case datasetmd.COMPRESSION_TYPE_ZSTD: + zr, err := zstd.NewReader(compressedDataReader) + if err != nil { + return nil, nil, fmt.Errorf("opening zstd reader: %w", err) + } + return bitmapReader, newZstdReader(zr), nil + } + + panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", compression.String())) +} + +// zstdReader implements [io.ReadCloser] for a [zstd.Decoder]. +type zstdReader struct{ *zstd.Decoder } + +// newZstdReader returns a new [io.ReadCloser] for a [zstd.Decoder]. +func newZstdReader(dec *zstd.Decoder) io.ReadCloser { + return &zstdReader{Decoder: dec} +} + +// Close implements [io.Closer]. +func (r *zstdReader) Close() error { + r.Decoder.Close() + return nil +} diff --git a/pkg/dataobj/internal/dataset/page_builder.go b/pkg/dataobj/internal/dataset/page_builder.go new file mode 100644 index 0000000000000..34a63a2181fb5 --- /dev/null +++ b/pkg/dataobj/internal/dataset/page_builder.go @@ -0,0 +1,240 @@ +package dataset + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +// pageBuilder accumulates sequences of [Value] in memory until reaching a +// configurable size limit. A [MemPage] can then be created from a PageBuiler +// by calling [pageBuilder.Flush]. +type pageBuilder struct { + // Each pageBuilder writes two sets of data. + // + // The first set of data is a presence bitmap which tells readers which rows + // are present. Use use 1 to indicate presence and 0 to indicate absence + // (NULL). This bitmap always uses bitmap encoding regardless of the encoding + // type used for values. 
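+	// For example, the rows [A, NULL, B] produce the presence bits 1 0 1, and
+	// only A and B are written to the values data.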
+ // + // The second set of data is the encoded set of non-NULL values. As an + // optimization, the zero value is treated as NULL. + // + // The two sets of data are accmumulated into separate buffers, with the + // presence bitmap being written uncompresed and the values being written + // with the configured compression type, if any. + // + // To orchestrate building two sets of data, we have a few components: + // + // * The final buffers which hold encoded and potentially compressed data. + // * The writer performing compression for values. + // * The encoders that write values. + + opts BuilderOptions + + presenceBuffer *bytes.Buffer // presenceBuffer holds the encoded presence bitmap. + valuesBuffer *bytes.Buffer // valuesBuffer holds encoded and optionally compressed values. + + valuesWriter *compressWriter // Compresses data and writes to valuesBuffer. + + presenceEnc *bitmapEncoder + valuesEnc valueEncoder + + rows int // Number of rows appended to the builder. +} + +// newPageBuilder creates a new pageBuilder that stores a sequence of [Value]s. +// newPageBuilder returns an error if there is no encoder available for the +// combination of opts.Value and opts.Encoding. +func newPageBuilder(opts BuilderOptions) (*pageBuilder, error) { + var ( + presenceBuffer = bytes.NewBuffer(nil) + valuesBuffer = bytes.NewBuffer(make([]byte, 0, opts.PageSizeHint)) + + valuesWriter = newCompressWriter(valuesBuffer, opts.Compression) + ) + + presenceEnc := newBitmapEncoder(presenceBuffer) + valuesEnc, ok := newValueEncoder(opts.Value, opts.Encoding, valuesWriter) + if !ok { + return nil, fmt.Errorf("no encoder available for %s/%s", opts.Value, opts.Encoding) + } + + return &pageBuilder{ + opts: opts, + + presenceBuffer: presenceBuffer, + valuesBuffer: valuesBuffer, + + valuesWriter: valuesWriter, + + presenceEnc: presenceEnc, + valuesEnc: valuesEnc, + }, nil +} + +// Append appends value into the pageBuilder. Append returns true if the data +// was appended; false if the pageBuilder is full. +func (b *pageBuilder) Append(value Value) bool { + if value.IsNil() || value.IsZero() { + return b.AppendNull() + } + + // We can't accurately know whether adding value would tip us over the page + // size: we don't know the current state of the encoders and we don't know + // for sure how much space value will fill. + // + // We use a rough estimate which will tend to overshoot the page size, making + // sure we rarely go over. + if sz := b.estimatedSize(); sz > 0 && sz+valueSize(value) > b.opts.PageSizeHint { + return false + } + + // The following calls won't fail; they only return errors when the + // underlying writers fail, which ours cannot. + if err := b.presenceEnc.Encode(Uint64Value(1)); err != nil { + panic(fmt.Sprintf("pageBuilder.Append: encoding presence bitmap entry: %v", err)) + } + if err := b.valuesEnc.Encode(value); err != nil { + panic(fmt.Sprintf("pageBuilder.Append: encoding value: %v", err)) + } + + b.rows++ + return true +} + +// AppendNull appends a NULL value to the Builder. AppendNull returns true if +// the NULL was appended, or false if the Builder is full. +func (b *pageBuilder) AppendNull() bool { + // See comment in Append for why we can only estimate the cost of appending a + // value. + // + // Here we assume appending a NULL costs one byte, but in reality most NULLs + // have no cost depending on the state of our bitmap encoder. 
+ if sz := b.estimatedSize(); sz > 0 && sz+1 > b.opts.PageSizeHint { + return false + } + + // The following call won't fail; it only returns an error when the + // underlying writer fails, which ours cannot. + if err := b.presenceEnc.Encode(Uint64Value(0)); err != nil { + panic(fmt.Sprintf("Builder.AppendNull: encoding presence bitmap entry: %v", err)) + } + + b.rows++ + return true +} + +func valueSize(v Value) int { + switch v.Type() { + case datasetmd.VALUE_TYPE_INT64: + // Assuming that int64s are written as varints. + return streamio.VarintSize(v.Int64()) + + case datasetmd.VALUE_TYPE_UINT64: + // Assuming that uint64s are written as uvarints. + return streamio.UvarintSize(v.Uint64()) + + case datasetmd.VALUE_TYPE_STRING: + // Assuming that strings are PLAIN encoded using their length and bytes. + str := v.String() + return binary.Size(len(str)) + len(str) + } + + return 0 +} + +// estimatedSize returns the estimated uncompressed size of the builder in +// bytes. +func (b *pageBuilder) estimatedSize() int { + // This estimate doesn't account for any values in encoders which haven't + // been flushed yet. However, encoder buffers are usually small enough that + // we wouldn't massively overshoot our estimate. + return b.presenceBuffer.Len() + b.valuesWriter.BytesWritten() +} + +// Rows returns the number of rows appended to the pageBuilder. +func (b *pageBuilder) Rows() int { return b.rows } + +// Flush converts data in pageBuilder into a [MemPage], and returns it. +// Afterwards, pageBuilder is reset to a fresh state and can be reused. Flush +// returns an error if the pageBuilder is empty. +// +// To avoid computing useless stats, the Stats field of the returned Page is +// unset. If stats are needed for a page, callers should compute them by +// iterating over the returned Page. +func (b *pageBuilder) Flush() (*MemPage, error) { + if b.rows == 0 { + return nil, fmt.Errorf("no data to flush") + } + + // Before we can build the page we need to finish flushing our encoders and writers. + if err := b.presenceEnc.Flush(); err != nil { + return nil, fmt.Errorf("flushing presence encoder: %w", err) + } else if err := b.valuesEnc.Flush(); err != nil { + return nil, fmt.Errorf("flushing values encoder: %w", err) + } else if err := b.valuesWriter.Flush(); err != nil { + return nil, fmt.Errorf("flushing values writer: %w", err) + } + + // The final data of our page is the combination of the presence bitmap and + // the values. To denote when one ends and the other begins, we prepend the + // data with the size of the presence bitmap as a uvarint. See the doc + // comment of [PageData] for more information. 
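+	//
+	//   <uvarint(len(presence bitmap))> <presence bitmap> <values data>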
+ var ( + headerSize = streamio.UvarintSize(uint64(b.presenceBuffer.Len())) + presenceSize = b.presenceBuffer.Len() + valuesSize = b.valuesBuffer.Len() + + finalData = bytes.NewBuffer(make([]byte, 0, headerSize+presenceSize+valuesSize)) + ) + + if err := streamio.WriteUvarint(finalData, uint64(b.presenceBuffer.Len())); err != nil { + return nil, fmt.Errorf("writing presence buffer size: %w", err) + } else if _, err := b.presenceBuffer.WriteTo(finalData); err != nil { + return nil, fmt.Errorf("writing presence buffer: %w", err) + } else if _, err := b.valuesBuffer.WriteTo(finalData); err != nil { + return nil, fmt.Errorf("writing values buffer: %w", err) + } + + checksum := crc32.Checksum(finalData.Bytes(), checksumTable) + + page := MemPage{ + Info: PageInfo{ + UncompressedSize: headerSize + presenceSize + b.valuesWriter.BytesWritten(), + CompressedSize: finalData.Len(), + CRC32: checksum, + RowCount: b.rows, + + Encoding: b.opts.Encoding, + + // TODO(rfratto): At the moment we don't compute stats because they're + // not going to be valuable in every scenario: the min/max values for log + // lines is less useful compared to the min/max values for timestamps. + // + // In the future, we may wish to add more options to pageBuilder to tell + // it to compute a subset of stats to avoid needing a second iteration + // over the page to compute them. + Stats: nil, + }, + + Data: finalData.Bytes(), + } + + b.Reset() // Reset state before returning. + return &page, nil +} + +// Reset resets the pageBuilder to a fresh state, allowing it to be reused. +func (b *pageBuilder) Reset() { + b.presenceBuffer.Reset() + b.valuesBuffer.Reset() + b.valuesWriter.Reset(b.valuesBuffer) + b.presenceBuffer.Reset() + b.valuesEnc.Reset(b.valuesWriter) + b.rows = 0 +} diff --git a/pkg/dataobj/internal/dataset/page_compress_writer.go b/pkg/dataobj/internal/dataset/page_compress_writer.go new file mode 100644 index 0000000000000..3fad4a0edfe0f --- /dev/null +++ b/pkg/dataobj/internal/dataset/page_compress_writer.go @@ -0,0 +1,123 @@ +package dataset + +import ( + "bufio" + "fmt" + "io" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +// A compressWriter is a [streamio.Writer] that compresses data passed to it. +type compressWriter struct { + // To be able to implmeent [io.ByteWriter], we always write directly to buf, + // which then flushes to w once it's full. + + w io.WriteCloser // Compressing writer. + buf *bufio.Writer // Buffered writer in front of w to be able to call WriteByte. + + compression datasetmd.CompressionType // Compression type being used. + rawBytes int // Number of uncompressed bytes written. +} + +var _ streamio.Writer = (*compressWriter)(nil) + +func newCompressWriter(w io.Writer, ty datasetmd.CompressionType) *compressWriter { + c := compressWriter{compression: ty} + c.Reset(w) + return &c +} + +// Write writes p to c. +func (c *compressWriter) Write(p []byte) (n int, err error) { + n, err = c.buf.Write(p) + c.rawBytes += n + return +} + +// WriteByte writes a single byte to c. +func (c *compressWriter) WriteByte(b byte) error { + if err := c.buf.WriteByte(b); err != nil { + return err + } + c.rawBytes++ + return nil +} + +// Flush compresses any pending uncompressed data in the buffer. +func (c *compressWriter) Flush() error { + // Flush our buffer first so c.w is up to date. 
+ if err := c.buf.Flush(); err != nil { + return fmt.Errorf("flushing buffer: %w", err) + } + + // c.w may not support Flush (such as when using no compression), so we check + // first. + if f, ok := c.w.(interface{ Flush() error }); ok { + if err := f.Flush(); err != nil { + return fmt.Errorf("flushing compressing writer: %w", err) + } + } + + return nil +} + +// Reset discards the writer's state and switches the compressor to write to w. +// This permits reusing a compressWriter rather than allocating a new one. +func (c *compressWriter) Reset(w io.Writer) { + resetter, ok := c.w.(interface{ Reset(io.Writer) }) + switch ok { + case true: + resetter.Reset(w) + default: + // c.w is unset or doesn't support Reset; build a new writer. + var compressedWriter io.WriteCloser + + switch c.compression { + case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE: + compressedWriter = nopCloseWriter{w} + + case datasetmd.COMPRESSION_TYPE_SNAPPY: + compressedWriter = snappy.NewBufferedWriter(w) + + case datasetmd.COMPRESSION_TYPE_ZSTD: + zw, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedBestCompression)) + if err != nil { + panic(fmt.Sprintf("compressWriter.Reset: creating zstd writer: %v", err)) + } + compressedWriter = zw + + default: + panic(fmt.Sprintf("compressWriter.Reset: unknown compression type %v", c.compression)) + } + + c.w = compressedWriter + } + + if c.buf != nil { + c.buf.Reset(c.w) + } else { + c.buf = bufio.NewWriter(c.w) + } + c.rawBytes = 0 +} + +// BytesWritten returns the number of uncompressed bytes written to c. +func (c *compressWriter) BytesWritten() int { return c.rawBytes } + +// Close flushes and then closes c. +func (c *compressWriter) Close() error { + if err := c.Flush(); err != nil { + return err + } + return c.w.Close() +} + +type nopCloseWriter struct{ w io.Writer } + +func (w nopCloseWriter) Write(p []byte) (n int, err error) { return w.w.Write(p) } +func (w nopCloseWriter) Close() error { return nil } diff --git a/pkg/dataobj/internal/dataset/page_iter.go b/pkg/dataobj/internal/dataset/page_iter.go new file mode 100644 index 0000000000000..442e104135344 --- /dev/null +++ b/pkg/dataobj/internal/dataset/page_iter.go @@ -0,0 +1,53 @@ +package dataset + +import ( + "bufio" + "errors" + "fmt" + "io" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType datasetmd.CompressionType) result.Seq[Value] { + return result.Iter(func(yield func(Value) bool) error { + presenceReader, valuesReader, err := p.reader(compressionType) + if err != nil { + return fmt.Errorf("opening page for reading: %w", err) + } + defer valuesReader.Close() + + presenceDec := newBitmapDecoder(bufio.NewReader(presenceReader)) + valuesDec, ok := newValueDecoder(valueType, p.Info.Encoding, bufio.NewReader(valuesReader)) + if !ok { + return fmt.Errorf("no decoder available for %s/%s", valueType, p.Info.Encoding) + } + + for { + var value Value + + present, err := presenceDec.Decode() + if errors.Is(err, io.EOF) { + return nil + } else if err != nil { + return fmt.Errorf("decoding presence bitmap: %w", err) + } else if present.Type() != datasetmd.VALUE_TYPE_UINT64 { + return fmt.Errorf("unexpected presence type %s", present.Type()) + } + + // value is currently nil. If the presence bitmap says our row has a + // value, we decode it into value. 
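+			// Rows marked absent keep the zero Value, which represents NULL to
+			// callers.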
+ if present.Uint64() == 1 { + value, err = valuesDec.Decode() + if err != nil { + return fmt.Errorf("decoding value: %w", err) + } + } + + if !yield(value) { + return nil + } + } + }) +} diff --git a/pkg/dataobj/internal/dataset/page_test.go b/pkg/dataobj/internal/dataset/page_test.go new file mode 100644 index 0000000000000..1cbd025576a4d --- /dev/null +++ b/pkg/dataobj/internal/dataset/page_test.go @@ -0,0 +1,83 @@ +package dataset + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +) + +func Test_pageBuilder_WriteRead(t *testing.T) { + in := []string{ + "hello, world!", + "", + "this is a test of the emergency broadcast system", + "this is only a test", + "if this were a real emergency, you would be instructed to panic", + "but it's not, so don't", + "", + "this concludes the test", + "thank you for your cooperation", + "goodbye", + } + + opts := BuilderOptions{ + PageSizeHint: 1024, + Value: datasetmd.VALUE_TYPE_STRING, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + } + b, err := newPageBuilder(opts) + require.NoError(t, err) + + for _, s := range in { + require.True(t, b.Append(StringValue(s))) + } + + page, err := b.Flush() + require.NoError(t, err) + + t.Log("Uncompressed size: ", page.Info.UncompressedSize) + t.Log("Compressed size: ", page.Info.CompressedSize) + + var actual []string + for result := range iterMemPage(page, opts.Value, opts.Compression) { + val, err := result.Value() + require.NoError(t, err) + + if val.IsNil() || val.IsZero() { + actual = append(actual, "") + } else { + require.Equal(t, datasetmd.VALUE_TYPE_STRING, val.Type()) + actual = append(actual, val.String()) + } + } + require.Equal(t, in, actual) +} + +func Test_pageBuilder_Fill(t *testing.T) { + opts := BuilderOptions{ + PageSizeHint: 1_500_000, + Value: datasetmd.VALUE_TYPE_INT64, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + } + buf, err := newPageBuilder(opts) + require.NoError(t, err) + + ts := time.Now().UTC() + for buf.Append(Int64Value(ts.UnixNano())) { + ts = ts.Add(time.Duration(rand.Intn(5000)) * time.Millisecond) + } + + page, err := buf.Flush() + require.NoError(t, err) + require.Equal(t, page.Info.UncompressedSize, page.Info.CompressedSize) + + t.Log("Uncompressed size: ", page.Info.UncompressedSize) + t.Log("Compressed size: ", page.Info.CompressedSize) + t.Log("Row count: ", page.Info.RowCount) +} diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go index 52f2a35c53e9d..4121ca928c598 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go @@ -4,10 +4,15 @@ package datasetmd import ( + bytes "bytes" fmt "fmt" proto "github.com/gogo/protobuf/proto" + io "io" math "math" + math_bits "math/bits" + reflect "reflect" strconv "strconv" + strings "strings" ) // Reference imports to suppress errors if they are not otherwise used. @@ -88,9 +93,99 @@ func (EncodingType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_7ab9d5b21b743868, []int{1} } +// CompressionType represents the valid compression types that can be used for +// compressing values in a page. +type CompressionType int32 + +const ( + // Invalid compression type. + COMPRESSION_TYPE_UNSPECIFIED CompressionType = 0 + // No compression. 
+ COMPRESSION_TYPE_NONE CompressionType = 1 + // Snappy compression. + COMPRESSION_TYPE_SNAPPY CompressionType = 2 + // Zstd compression. + COMPRESSION_TYPE_ZSTD CompressionType = 3 +) + +var CompressionType_name = map[int32]string{ + 0: "COMPRESSION_TYPE_UNSPECIFIED", + 1: "COMPRESSION_TYPE_NONE", + 2: "COMPRESSION_TYPE_SNAPPY", + 3: "COMPRESSION_TYPE_ZSTD", +} + +var CompressionType_value = map[string]int32{ + "COMPRESSION_TYPE_UNSPECIFIED": 0, + "COMPRESSION_TYPE_NONE": 1, + "COMPRESSION_TYPE_SNAPPY": 2, + "COMPRESSION_TYPE_ZSTD": 3, +} + +func (CompressionType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_7ab9d5b21b743868, []int{2} +} + +// Statistics about a column or a page. All statistics are optional and are +// conditionally set depending on the column type. +type Statistics struct { + // Minimum value. + MinValue []byte `protobuf:"bytes,1,opt,name=min_value,json=minValue,proto3" json:"min_value,omitempty"` + // Maximum value. + MaxValue []byte `protobuf:"bytes,2,opt,name=max_value,json=maxValue,proto3" json:"max_value,omitempty"` +} + +func (m *Statistics) Reset() { *m = Statistics{} } +func (*Statistics) ProtoMessage() {} +func (*Statistics) Descriptor() ([]byte, []int) { + return fileDescriptor_7ab9d5b21b743868, []int{0} +} +func (m *Statistics) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Statistics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Statistics.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Statistics) XXX_Merge(src proto.Message) { + xxx_messageInfo_Statistics.Merge(m, src) +} +func (m *Statistics) XXX_Size() int { + return m.Size() +} +func (m *Statistics) XXX_DiscardUnknown() { + xxx_messageInfo_Statistics.DiscardUnknown(m) +} + +var xxx_messageInfo_Statistics proto.InternalMessageInfo + +func (m *Statistics) GetMinValue() []byte { + if m != nil { + return m.MinValue + } + return nil +} + +func (m *Statistics) GetMaxValue() []byte { + if m != nil { + return m.MaxValue + } + return nil +} + func init() { proto.RegisterEnum("dataobj.metadata.dataset.v1.ValueType", ValueType_name, ValueType_value) proto.RegisterEnum("dataobj.metadata.dataset.v1.EncodingType", EncodingType_name, EncodingType_value) + proto.RegisterEnum("dataobj.metadata.dataset.v1.CompressionType", CompressionType_name, CompressionType_value) + proto.RegisterType((*Statistics)(nil), "dataobj.metadata.dataset.v1.Statistics") } func init() { @@ -98,27 +193,34 @@ func init() { } var fileDescriptor_7ab9d5b21b743868 = []byte{ - // 310 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2f, 0xc8, 0x4e, 0xd7, - 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, - 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0x82, 0x65, 0x8a, 0x53, 0x4b, 0x72, 0x53, 0x10, 0x2c, - 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x69, 0xa8, 0x26, 0x3d, 0x98, 0x5a, 0x3d, 0xa8, 0x0a, - 0xbd, 0x32, 0x43, 0xad, 0x6c, 0x2e, 0xce, 0xb0, 0xc4, 0x9c, 0xd2, 0xd4, 0x90, 0xca, 0x82, 0x54, - 0x21, 0x29, 0x2e, 0xb1, 0x30, 0x47, 0x9f, 0x50, 0xd7, 0xf8, 0x90, 0xc8, 0x00, 0xd7, 0xf8, 0x50, - 0xbf, 0xe0, 0x00, 0x57, 0x67, 0x4f, 0x37, 0x4f, 0x57, 0x17, 0x01, 0x06, 0x21, 0x11, 0x2e, 0x01, - 0x24, 0x39, 0x4f, 0xbf, 0x10, 0x33, 0x13, 0x01, 0x46, 0x21, 0x51, 0x2e, 0x41, 0x64, 0x1d, 0x10, - 0x61, 0x26, 0x34, 0xe1, 0xe0, 0x90, 0x20, 
0x4f, 0x3f, 0x77, 0x01, 0x66, 0xad, 0x4a, 0x2e, 0x1e, - 0xd7, 0xbc, 0xe4, 0xfc, 0x94, 0xcc, 0xbc, 0x74, 0xb0, 0x7d, 0xb2, 0x5c, 0x92, 0xae, 0x7e, 0xce, - 0xfe, 0x2e, 0x9e, 0x7e, 0xee, 0xd8, 0xac, 0x14, 0xe7, 0x12, 0x46, 0x95, 0x0e, 0xf0, 0x71, 0xf4, - 0xf4, 0x13, 0x60, 0xc4, 0x94, 0x70, 0x71, 0xf5, 0x09, 0x71, 0x14, 0x60, 0x12, 0x92, 0xe0, 0x12, - 0x41, 0x95, 0x70, 0xf2, 0x0c, 0xf1, 0x75, 0x0c, 0x10, 0x60, 0x76, 0xaa, 0xb8, 0xf0, 0x50, 0x8e, - 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, - 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, - 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, - 0x78, 0x2c, 0xc7, 0x10, 0xe5, 0x94, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, 0x9f, 0xab, - 0x9f, 0x5e, 0x94, 0x98, 0x96, 0x98, 0x97, 0xa8, 0x9f, 0x93, 0x9f, 0x9d, 0xa9, 0x5f, 0x66, 0xac, - 0x4f, 0x64, 0x74, 0x24, 0xb1, 0x81, 0x63, 0xc1, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xee, 0xfd, - 0x13, 0xfe, 0xc0, 0x01, 0x00, 0x00, + // 420 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x31, 0x6f, 0xd3, 0x40, + 0x1c, 0xc5, 0x7d, 0x89, 0x84, 0xe8, 0x5f, 0x95, 0x38, 0x4c, 0x4b, 0x5b, 0x02, 0xa7, 0x8a, 0x09, + 0x65, 0xb0, 0x85, 0x8a, 0x60, 0x76, 0x92, 0x6b, 0x75, 0x52, 0x7a, 0xb1, 0x62, 0xb7, 0x52, 0xbb, + 0x44, 0x97, 0xc4, 0x84, 0x23, 0xf1, 0x9d, 0x65, 0x5f, 0xa3, 0x74, 0x63, 0x62, 0xe6, 0x63, 0xf0, + 0x51, 0x18, 0x33, 0x76, 0x24, 0xce, 0xc2, 0xd8, 0x8f, 0x80, 0x6a, 0x8c, 0x68, 0x48, 0x86, 0x6e, + 0xff, 0x7b, 0xbf, 0xf7, 0xf4, 0x6e, 0x78, 0xf0, 0x21, 0x19, 0x8f, 0xdc, 0xa1, 0x30, 0x42, 0xf7, + 0x3f, 0xbb, 0x52, 0x99, 0x28, 0x55, 0x62, 0xe2, 0xc6, 0x91, 0x11, 0x77, 0x62, 0x41, 0xb2, 0xc8, + 0xc4, 0xc3, 0x7f, 0x97, 0x93, 0xa4, 0xda, 0x68, 0xbb, 0x56, 0x86, 0x9c, 0xbf, 0x5e, 0xa7, 0x74, + 0x38, 0xd3, 0xb7, 0xaf, 0x8f, 0x01, 0x02, 0x23, 0x8c, 0xcc, 0x8c, 0x1c, 0x64, 0x76, 0x0d, 0xb6, + 0x62, 0xa9, 0x7a, 0x53, 0x31, 0xb9, 0x8a, 0xf6, 0xd1, 0x21, 0x7a, 0xb3, 0xdd, 0x7d, 0x1c, 0x4b, + 0x75, 0x7e, 0xf7, 0x2e, 0xa0, 0x98, 0x95, 0xb0, 0x52, 0x42, 0x31, 0x2b, 0x60, 0x7d, 0x0c, 0x5b, + 0xc5, 0x11, 0x5e, 0x27, 0x91, 0xfd, 0x02, 0x9e, 0x9f, 0x7b, 0xed, 0x33, 0xda, 0x0b, 0x2f, 0x7c, + 0xda, 0x3b, 0xe3, 0x81, 0x4f, 0x9b, 0xec, 0x98, 0xd1, 0x16, 0xb6, 0xec, 0x1d, 0xc0, 0xf7, 0x18, + 0xe3, 0xe1, 0xfb, 0x77, 0x18, 0xd9, 0xbb, 0xf0, 0xf4, 0x7e, 0xe2, 0x8f, 0x5c, 0xf9, 0x4f, 0x0e, + 0xc2, 0x2e, 0xe3, 0x27, 0xb8, 0x5a, 0xbf, 0x86, 0x6d, 0xaa, 0x06, 0x7a, 0x28, 0xd5, 0xa8, 0xe8, + 0x7b, 0x05, 0x07, 0x94, 0x37, 0x3b, 0x2d, 0xc6, 0x4f, 0x36, 0x55, 0xee, 0xc1, 0xb3, 0x55, 0xec, + 0xb7, 0x3d, 0xc6, 0x31, 0x5a, 0x07, 0x2d, 0xda, 0x0e, 0x3d, 0x5c, 0xb1, 0xf7, 0x61, 0x67, 0x15, + 0x34, 0x58, 0x78, 0xea, 0xf9, 0xb8, 0x5a, 0xff, 0x8a, 0xe0, 0x49, 0x53, 0xc7, 0x49, 0x1a, 0x65, + 0x99, 0xd4, 0xaa, 0xa8, 0x3f, 0x84, 0x97, 0xcd, 0xce, 0xa9, 0xdf, 0xa5, 0x41, 0xc0, 0x3a, 0x7c, + 0xd3, 0x0f, 0x0e, 0x60, 0x77, 0xcd, 0xc1, 0x3b, 0x9c, 0x62, 0x64, 0xd7, 0x60, 0x6f, 0x0d, 0x05, + 0xdc, 0xf3, 0xfd, 0x0b, 0x5c, 0xd9, 0x98, 0xbb, 0x0c, 0xc2, 0x16, 0xae, 0x36, 0x66, 0xf3, 0x05, + 0xb1, 0x6e, 0x16, 0xc4, 0xba, 0x5d, 0x10, 0xf4, 0x25, 0x27, 0xe8, 0x7b, 0x4e, 0xd0, 0x8f, 0x9c, + 0xa0, 0x79, 0x4e, 0xd0, 0xcf, 0x9c, 0xa0, 0x5f, 0x39, 0xb1, 0x6e, 0x73, 0x82, 0xbe, 0x2d, 0x89, + 0x35, 0x5f, 0x12, 0xeb, 0x66, 0x49, 0xac, 0xcb, 0xc6, 0x48, 0x9a, 0x4f, 0x57, 0x7d, 0x67, 0xa0, + 0x63, 0x77, 0x94, 0x8a, 0x8f, 0x42, 0x09, 0x77, 0xa2, 0xc7, 0xd2, 0x9d, 0x1e, 
0xb9, 0x0f, 0xdc, + 0x57, 0xff, 0x51, 0x31, 0xab, 0xa3, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x79, 0x22, 0x11, 0xce, + 0x91, 0x02, 0x00, 0x00, } func (x ValueType) String() string { @@ -135,3 +237,377 @@ func (x EncodingType) String() string { } return strconv.Itoa(int(x)) } +func (x CompressionType) String() string { + s, ok := CompressionType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (this *Statistics) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Statistics) + if !ok { + that2, ok := that.(Statistics) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !bytes.Equal(this.MinValue, that1.MinValue) { + return false + } + if !bytes.Equal(this.MaxValue, that1.MaxValue) { + return false + } + return true +} +func (this *Statistics) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&datasetmd.Statistics{") + s = append(s, "MinValue: "+fmt.Sprintf("%#v", this.MinValue)+",\n") + s = append(s, "MaxValue: "+fmt.Sprintf("%#v", this.MaxValue)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringDatasetmd(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Statistics) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Statistics) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Statistics) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.MaxValue) > 0 { + i -= len(m.MaxValue) + copy(dAtA[i:], m.MaxValue) + i = encodeVarintDatasetmd(dAtA, i, uint64(len(m.MaxValue))) + i-- + dAtA[i] = 0x12 + } + if len(m.MinValue) > 0 { + i -= len(m.MinValue) + copy(dAtA[i:], m.MinValue) + i = encodeVarintDatasetmd(dAtA, i, uint64(len(m.MinValue))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintDatasetmd(dAtA []byte, offset int, v uint64) int { + offset -= sovDatasetmd(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Statistics) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.MinValue) + if l > 0 { + n += 1 + l + sovDatasetmd(uint64(l)) + } + l = len(m.MaxValue) + if l > 0 { + n += 1 + l + sovDatasetmd(uint64(l)) + } + return n +} + +func sovDatasetmd(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozDatasetmd(x uint64) (n int) { + return sovDatasetmd(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Statistics) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Statistics{`, + `MinValue:` + fmt.Sprintf("%v", this.MinValue) + `,`, + `MaxValue:` + fmt.Sprintf("%v", this.MaxValue) + `,`, + `}`, + }, "") + return s +} +func valueToStringDatasetmd(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Statistics) Unmarshal(dAtA []byte) error { + l := 
len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Statistics: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Statistics: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MinValue", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDatasetmd + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDatasetmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MinValue = append(m.MinValue[:0], dAtA[iNdEx:postIndex]...) + if m.MinValue == nil { + m.MinValue = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxValue", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDatasetmd + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDatasetmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MaxValue = append(m.MaxValue[:0], dAtA[iNdEx:postIndex]...) 
+ if m.MaxValue == nil { + m.MaxValue = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDatasetmd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDatasetmd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDatasetmd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipDatasetmd(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthDatasetmd + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthDatasetmd + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipDatasetmd(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthDatasetmd + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthDatasetmd = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowDatasetmd = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto index 478f74037871f..e396f9a627f49 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto @@ -37,3 +37,29 @@ enum EncodingType { // integers using a combination of run-length encoding and bitpacking. ENCODING_TYPE_BITMAP = 3; } + +// CompressionType represents the valid compression types that can be used for +// compressing values in a page. +enum CompressionType { + // Invalid compression type. + COMPRESSION_TYPE_UNSPECIFIED = 0; + + // No compression. + COMPRESSION_TYPE_NONE = 1; + + // Snappy compression. + COMPRESSION_TYPE_SNAPPY = 2; + + // Zstd compression. + COMPRESSION_TYPE_ZSTD = 3; +} + +// Statistics about a column or a page. All statistics are optional and are +// conditionally set depending on the column type. +message Statistics { + // Minimum value. 
+ bytes min_value = 1; + + // Maximum value. + bytes max_value = 2; +} diff --git a/pkg/dataobj/internal/result/result.go b/pkg/dataobj/internal/result/result.go new file mode 100644 index 0000000000000..7712e7a46ff7c --- /dev/null +++ b/pkg/dataobj/internal/result/result.go @@ -0,0 +1,101 @@ +// Package result provides utilities for dealing with iterators that can fail +// during iteration. +// +// Result is useful to make it harder for callers to ignore errors. Using +// iter.Seq2[V, error] can make it easy to accidentally ignore errors: +// +// func myIter() iter.Seq2[V, error] { ... } +// +// func main() { +// for v := range myIter() { /* errors are ignored! */ } +// } +package result + +import ( + "errors" + "iter" +) + +// Result is a type used for representing a result from an operation that can +// fail. +type Result[V any] struct { + value V // Valid only if err is nil. + err error +} + +// Value returns a successful result with the given value. +func Value[V any](v V) Result[V] { + return Result[V]{value: v} +} + +// Error returns a failed result with the given error. +func Error[V any](err error) Result[V] { + return Result[V]{err: err} +} + +// Value returns r's value and error. +func (r Result[V]) Value() (V, error) { + return r.value, r.err +} + +// MustValue returns r's value. If r is an error, MustValue panics. +func (r Result[V]) MustValue() V { + if r.err != nil { + panic(r.err) + } + return r.value +} + +// Err returns r's error, if any. +func (r Result[V]) Err() error { + return r.err +} + +// Seq is an iterator over sequences of result values. When called as +// seq(yield), seq calls yield(r) for each value r in the sequence, stopping +// early if yield returns false. +// +// See the [iter] package for more information on iterators. +type Seq[V any] func(yield func(Result[V]) bool) + +// Iter produces a new Seq[V] from a given function that can fail. Values +// passed to yield are wrapped in a call to [Value], while a non-nil error is +// wrapped in a call to [Error]. +// +// Iter makes it easier to write failable iterators and removes the need to +// manually wrap values and errors into a [Result]. +func Iter[V any](seq func(yield func(V) bool) error) Seq[V] { + return func(yield func(Result[V]) bool) { + err := seq(func(v V) bool { return yield(Value(v)) }) + if err != nil { + yield(Error[V](err)) + } + } +} + +// Pull converts the "push-style" Result iterator sequence seq into a +// "pull-style" iterator accessed by the two functions next and stop. +// +// Pull is a wrapper around [iter.Pull]. +func Pull[V any](seq Seq[V]) (next func() (Result[V], bool), stop func()) { + iseq := iter.Seq[Result[V]](seq) + return iter.Pull(iseq) +} + +// Collect collects values from seq into a new slice and returns it. Any errors +// from seq are joined and returned as the second value. +func Collect[V any](seq Seq[V]) ([]V, error) { + var ( + vals []V + errs []error + ) + for res := range seq { + val, err := res.Value() + if err != nil { + errs = append(errs, err) + } else { + vals = append(vals, val) + } + } + return vals, errors.Join(errs...) 
+} diff --git a/pkg/dataobj/internal/streamio/varint.go b/pkg/dataobj/internal/streamio/varint.go index e8420605e8c45..832a0cf4eb1e4 100644 --- a/pkg/dataobj/internal/streamio/varint.go +++ b/pkg/dataobj/internal/streamio/varint.go @@ -3,6 +3,7 @@ package streamio import ( "encoding/binary" "io" + "math/bits" ) // [binary] does not have an implementation to write varints directly @@ -10,6 +11,23 @@ import ( // encoders to stream values, we provide equivalent implementations of // [binary.AppendUvarint] and [binary.AppendVarint] which accept a ByteWriter. +// VarintSize returns the number of bytes needed to encode x as a signed, zig-zag encoded varint. +func VarintSize(x int64) int { + ux := uint64(x) << 1 + if x < 0 { + ux = ^ux + } + return UvarintSize(ux) +} + +// UvarintSize returns the number of bytes needed to encode x as an unsigned varint. +func UvarintSize(x uint64) int { + if x == 0 { + return 1 + } + return 1 + (63-bits.LeadingZeros64(x))/7 +} + // WriteVarint writes an encoded signed integer to w. func WriteVarint(w io.ByteWriter, x int64) error { // Like [binary.AppendVarint], we use zig-zag encoding so small negative