Set up airflow variable defaults with descriptions automatically #4297

Automatically set up airflow variable defaults with descriptions.
madewithkode committed May 14, 2024
commit d45461f385547ec9d5cb3a407aadaf17914ca377
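This commit seeds Airflow variable defaults from a three-column TSV (key, default value, description), skipping any key that already exists. For each new row, the entrypoint runs a command of the following shape — a sketch only, with the key and default taken from variables.tsv below and the description shortened:

airflow variables set \
    --description "Configuration for silencing Slack notifications from a DAG" \
    SILENCED_SLACK_NOTIFICATIONS '{}'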
1 change: 1 addition & 0 deletions catalog/Dockerfile
@@ -69,5 +69,6 @@ ARG CONSTRAINTS_FILE="https://raw.githubusercontent.com/apache/airflow/constrain
RUN pip install -r ${REQUIREMENTS_FILE} -c ${CONSTRAINTS_FILE}

COPY entrypoint.sh /opt/airflow/entrypoint.sh
COPY variables.tsv /opt/airflow/variables.tsv

ENTRYPOINT ["/usr/bin/dumb-init", "--", "/opt/airflow/entrypoint.sh"]
117 changes: 60 additions & 57 deletions catalog/entrypoint.sh
@@ -61,71 +61,74 @@ while read -r var_string; do
# only include Slack airflow connections
done < <(env | grep "^AIRFLOW_CONN_SLACK*")

# Set up Airflow Variable defaults with descriptions automatically
# List all existing airflow variables
output=$(airflow variables list -o plain)
found_existing_vars=true

# if there are no existing variables, print this notification and continue
if [[ -z $output || $output == "No data found" ]]; then
header "No existing variables found, proceeding to set all variables"
found_existing_vars=false
fi
if [[ $* == "webserver" ]]; then
# Wait for the database to initialize; this will time out if it does not
airflow db check-migrations
# Set up Airflow Variable defaults with descriptions automatically
header "SETTING VARIABLE DEFAULTS"

# List all existing airflow variables, dropping the descriptive "key" header row
output=$(airflow variables list -o plain | tail -n +2)
found_existing_vars=true

# if there are no existing variables, print this notification and continue
if [[ -z $output || $output == "No data found" ]]; then
echo "No existing variables found, proceeding to set all variables"
found_existing_vars=false
fi
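For reference, the plain-format listing begins with a header row named key, which is why the script drops the first line with tail -n +2. A hypothetical session (variable names illustrative):

$ airflow variables list -o plain
key
SILENCED_SLACK_NOTIFICATIONS
SKIPPED_INGESTION_ERRORS
$ airflow variables list -o plain | tail -n +2
SILENCED_SLACK_NOTIFICATIONS
SKIPPED_INGESTION_ERRORS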

# Initialize an empty array to store the variables from the output
existing_variables=()
# Initialize an empty array to store the variables from the output
existing_variables=()

# Iterate through each variable and add it to $existing_variables
while IFS= read -r variable; do
# skip airflow's default descriptive 'key' output
if [[ $variable == "key" ]]; then
continue
fi
# Append the current variable to the array
existing_variables+=("$variable")
done <<<"$output"

if $found_existing_vars; then
header "Found the following existing variables(The values of these will not be overwritten):"
for variable in "${existing_variables[@]}"; do
echo "$variable"
done
fi
# Iterate through each variable and add it to $existing_variables
while IFS= read -r variable; do
# Append the current variable to the array
existing_variables+=("$variable")
done <<<"$output"

# now iterate through each row of variables.tsv and only
# run airflow variables set --description <description> <key> <value>
# if the key doesn't already exist in the database, i.e. not found in
# $existing_variables
while IFS=$'\t' read -r column1 column2 column3; do
# skip the first meta row or a row with empty data
if [[ $column3 == "description" ]] || [[ -z $column2 ]]; then
continue
if $found_existing_vars; then
echo -e "Found the following existing variables(the values of these will not be overwritten):\n"
obulat marked this conversation as resolved.
Show resolved Hide resolved
for variable in "${existing_variables[@]}"; do
echo "$variable"
done
fi

# check if current key already exists
matched=false
for variable in "${existing_variables[@]}"; do
if [[ $variable == "$column1" ]]; then
matched=true
# now iterate through each row of variables.tsv and only
# run airflow variables set --description <description> <key> <value>
# if the key doesn't already exist in the database, i.e. not found in
# $existing_variables
while IFS=$'\t' read -r column1 column2 column3; do
# skip the first meta row or a row with empty data
if [[ $column3 == "description" ]] || [[ -z $column2 ]]; then
continue
fi
done

if [ "$column1" != "Key" ] && ! $matched; then
airflow variables set --description "$column3" "$column1" "$column2"
# check if current key already exists
matched=false
for variable in "${existing_variables[@]}"; do
if [[ $variable == "$column1" ]]; then
matched=true
fi
done

if ! $matched; then
airflow variables set --description "$column3" "$column1" "$column2"
fi
done <"variables.tsv"

# Print the new variables list
new_variables_list=$(airflow variables list -o plain | tail -n +2)
echo -e "The following variables are now set:\n"
echo "$new_variables_list"

# if the last line in variables.tsv does not terminate with a newline
# character, this variable will be non-empty, which means the last line
# was not read correctly by the loop.
if [ -n "$column1" ]; then
echo -e "WARNING: Missing new line character detected!!!\n"
echo -e "Last variable added to variables.tsv might not be picked up,\nEnsure it ends with a new line character and retry."
fi
done <"variables.tsv"

# Print the new variables list
new_variables_list=$(airflow variables list -o plain)
header "The following variables are now set:"
echo "$new_variables_list"

# if the last line in variables.tsv does not terminate with a newline
# character, this variable will be non-empty, which means the last line
# was not read correctly by the loop.
if [ -n "$column1" ]; then
header "Missing new line character detected!!!"
echo -e "Last variable added to variables.tsv might not be picked up,\nensure it ends with a new line character and retry."

fi

exec /entrypoint "$@"
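The final check relies on a bash detail: read returns non-zero at end of file but still populates its variables, so an unterminated last row ends the loop without being processed while leaving $column1 set. A small demonstration of that behavior (file path hypothetical):

printf 'LAST_KEY\tvalue\tdesc' > /tmp/no_newline.tsv   # note: no trailing \n
while IFS=$'\t' read -r column1 column2 column3; do
    echo "row: $column1"                               # never runs for the last row
done < /tmp/no_newline.tsv
echo "after loop: column1=$column1"                    # -> after loop: column1=LAST_KEY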
1 change: 0 additions & 1 deletion catalog/variables.tsv
@@ -2,4 +2,3 @@ key default description
SILENCED_SLACK_NOTIFICATIONS {} Configuration for silencing Slack notifications from a DAG. Mapping of DAG ID to a list of dictionaries containing the following keys: "issue" (a link to a GitHub issue which describes why the notification is silenced and tracks resolving the problem), "predicate" (Slack notifications whose text or username contain the predicate will be silenced, matching is case-insensitive), and "task_id_pattern" (a regex pattern that matches the task_id of the task that triggered the notification, optional). Declaration: https://github.com/WordPress/openverse/blob/d500b7764c411f7d228ae12c57dce519c8709610/catalog/dags/common/slack.py#L72-L86 Example: { "finnish_museums_workflow": [ { "issue": "https://github.com/WordPress/openverse/issues/1605", "predicate": "AirflowTaskTimeout", "task_id_pattern": "clean_data" } ] }
SKIPPED_INGESTION_ERRORS {} Configuration for silencing an ingestion error and preventing a Slack message from being sent. Mapping of DAG ID to a list of dictionaries containing the following keys: "issue" (a link to a GitHub issue which describes why the error is silenced and tracks resolving the problem), and "predicate" (errors whose classname or message contain the predicate will be skipped, matching is case-insensitive). Declaration: https://github.com/WordPress/openverse/blob/6636dcfbb57abca19ef32027975f78548e10411f/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py#L53-L64 Example: { "science_museum_workflow": [ { "issue": "https://github.com/WordPress/openverse/issues/4013", "predicate": "Service unavailable for url" } ] }
CONFIGURATION_OVERRIDES {} DAG configuration overrides for the provider ingestion workflows. Currently only supports overriding the execution timeout for certain tasks, but allows dynamic overrides at DAG run time. Mapping of DAG ID to a list of dictionaries containing the following keys: "task_id" (a regex pattern that matches the task_id of the task to be modified), and "timeout" (str in "%d:%H:%M:%S" format giving the amount of time the task may take, example: 6d:10h:30m). Declaration: https://github.com/WordPress/openverse/blob/2cffcb9f8da6961e84a00854a3cd472fd0f9dad8/catalog/dags/providers/provider_workflows.py#L42-L58 Example: { "brooklyn_museum_workflow": [ { "task_id_pattern": "pull_image_data", "timeout": "10h" } ] }
TESTING_CONFIGURATION_OVERRIDES {} DAG configuration overrides for the provider ingestion workflows. Currently only supports overriding the execution timeout for certain tasks, but allows dynamic overrides at DAG run time. Mapping of DAG ID to a list of dictionaries containing the following keys: "task_id" (a regex pattern that matches the task_id of the task to be modified), and "timeout" (str in "%d:%H:%M:%S" format giving the amount of time the task may take, example: 6d:10h:30m). Declaration: https://github.com/WordPress/openverse/blob/2cffcb9f8da6961e84a00854a3cd472fd0f9dad8/catalog/dags/providers/provider_workflows.py#L42-L58 Example: { "brooklyn_museum_workflow": [ { "task_id_pattern": "pull_image_data", "timeout": "10h" } ] }
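Once seeded, any of these defaults can be inspected from the CLI. A hypothetical session:

$ airflow variables get SILENCED_SLACK_NOTIFICATIONS
{}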