
Commit 8b38478
[AIRFLOW-4199] Remove all sys.version_info[0] == 3 (apache#5019)
zhongjiajie authored and kaxil committed Apr 22, 2019
1 parent: bc0b8a1
Showing 22 changed files with 29 additions and 129 deletions.
airflow/_vendor/slugify/slugify.py (2 changes: 0 additions & 2 deletions)
@@ -130,8 +130,6 @@ def slugify(text, entities=True, decimal=True, hexadecimal=True, max_length=0, w
 
     # translate
     text = unicodedata.normalize('NFKD', text)
-    if sys.version_info < (3,):
-        text = text.encode('ascii', 'ignore')
 
     # make the text lowercase (optional)
     if lowercase:
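
On Python 3 every str is already Unicode, so the bytes round-trip the removed guard performed is unnecessary; NFKD normalization runs on str directly. A minimal standalone sketch of the surviving path (not the vendored module itself):

```python
import unicodedata

text = "Héllo Wörld"
# NFKD splits each accented character into a base character plus a
# combining mark; str is already Unicode on Python 3.
normalized = unicodedata.normalize('NFKD', text)
# Dropping the combining marks leaves plain ASCII, which is what the
# slug ultimately needs.
print(normalized.encode('ascii', 'ignore').decode('ascii'))  # Hello World
```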
airflow/configuration.py (12 changes: 2 additions & 10 deletions)
@@ -25,7 +25,6 @@
 from future import standard_library
 import os
 import shlex
-import six
 from six import iteritems
 import subprocess
 import sys
@@ -97,13 +96,8 @@ def run_command(command):
 def _read_default_config_file(file_name):
     templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
     file_path = os.path.join(templates_dir, file_name)
-    if six.PY2:
-        with open(file_path) as f:
-            config = f.read()
-            return config.decode('utf-8')
-    else:
-        with open(file_path, encoding='utf-8') as f:
-            return f.read()
+    with open(file_path, encoding='utf-8') as f:
+        return f.read()
 
 
 DEFAULT_CONFIG = _read_default_config_file('default_airflow.cfg')
@@ -526,8 +520,6 @@ def parameterized_config(template):
     with open(AIRFLOW_CONFIG, 'w') as f:
         cfg = parameterized_config(DEFAULT_CONFIG)
         cfg = cfg.split(TEMPLATE_START)[-1].strip()
-        if six.PY2:
-            cfg = cfg.encode('utf8')
         f.write(cfg)
 
 log.info("Reading the config from %s", AIRFLOW_CONFIG)
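
The deleted Py2 branch read raw bytes and decoded by hand; on Python 3, open() takes an encoding and converts in both directions. A minimal sketch of the pattern, with hypothetical helper names:

```python
def read_text(path):
    # Text-mode open() decodes for us; no manual .decode('utf-8').
    with open(path, encoding='utf-8') as f:
        return f.read()


def write_text(path, text):
    # A text-mode file accepts str directly, so the Py2-only
    # cfg.encode('utf8') step before f.write() disappears.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)
```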
airflow/contrib/auth/backends/password_auth.py (7 changes: 1 addition & 6 deletions)
@@ -17,8 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from sys import version_info
-
 import base64
 import flask_login
 from flask_login import login_required, current_user, logout_user  # noqa: F401
@@ -42,7 +40,6 @@
 login_manager.login_message = None
 
 log = LoggingMixin().log
-PY3 = version_info[0] == 3
 
 
 client_auth = None
@@ -64,9 +61,7 @@ def password(self):
 
     @password.setter
     def password(self, plaintext):
-        self._password = generate_password_hash(plaintext, 12)
-        if PY3:
-            self._password = str(self._password, 'utf-8')
+        self._password = str(generate_password_hash(plaintext, 12), 'utf-8')
 
     def authenticate(self, plaintext):
         return check_password_hash(self._password, plaintext)
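
The folded line works because the hash helper returns bytes on Python 3, and str(..., 'utf-8') decodes it once for storage. A sketch of the same pattern using the bcrypt package as a stand-in — the actual generate_password_hash import lies outside this hunk, so treat the exact library as an assumption:

```python
import bcrypt

# bcrypt-style hashing returns the hash as bytes on Python 3.
hashed = bcrypt.hashpw(b"s3cret", bcrypt.gensalt(12))  # e.g. b'$2b$12$...'
stored = str(hashed, 'utf-8')  # decode once, store as str
```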
airflow/contrib/hooks/pinot_hook.py (8 changes: 0 additions & 8 deletions)
@@ -17,8 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import six
-
 from pinotdb import connect
 
 from airflow.hooks.dbapi_hook import DbApiHook
@@ -73,9 +71,6 @@ def get_records(self, sql):
             sql statements to execute
         :type sql: str
         """
-        if six.PY2:
-            sql = sql.encode('utf-8')
-
         with self.get_conn() as cur:
             cur.execute(sql)
             return cur.fetchall()
@@ -88,9 +83,6 @@ def get_first(self, sql):
             sql statements to execute
         :type sql: str or list
         """
-        if six.PY2:
-            sql = sql.encode('utf-8')
-
         with self.get_conn() as cur:
             cur.execute(sql)
             return cur.fetchone()
airflow/contrib/kubernetes/kube_client.py (7 changes: 0 additions & 7 deletions)
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 from airflow.configuration import conf
-from six import PY2
 
 try:
     from kubernetes import config, client
@@ -36,12 +35,6 @@ def _load_kube_config(in_cluster, cluster_context, config_file):
         config.load_incluster_config()
     else:
         config.load_kube_config(config_file=config_file, context=cluster_context)
-    if PY2:
-        # For connect_get_namespaced_pod_exec
-        from kubernetes.client import Configuration
-        configuration = Configuration()
-        configuration.assert_hostname = False
-        Configuration.set_default(configuration)
     return client.CoreV1Api()
 
 
airflow/contrib/operators/cassandra_to_gcs.py (10 changes: 3 additions & 7 deletions)
@@ -23,7 +23,7 @@
 from cassandra.util import Date, Time, SortedSet, OrderedMapSerializedKey
 from datetime import datetime
 from decimal import Decimal
-from six import text_type, binary_type, PY3
+from six import text_type, binary_type
 from tempfile import NamedTemporaryFile
 from uuid import UUID
 
@@ -166,9 +166,7 @@ def _write_local_data_files(self, cursor):
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
         for row in cursor:
             row_dict = self.generate_data_dict(row._fields, row)
-            s = json.dumps(row_dict)
-            if PY3:
-                s = s.encode('utf-8')
+            s = json.dumps(row_dict).encode('utf-8')
             tmp_file_handle.write(s)
 
             # Append newline to make dumps BigQuery compatible.
@@ -195,9 +193,7 @@ def _write_local_schema_file(self, cursor):
 
         for name, type in zip(cursor.column_names, cursor.column_types):
             schema.append(self.generate_schema_dict(name, type))
-        json_serialized_schema = json.dumps(schema)
-        if PY3:
-            json_serialized_schema = json_serialized_schema.encode('utf-8')
+        json_serialized_schema = json.dumps(schema).encode('utf-8')
 
         tmp_schema_file_handle.write(json_serialized_schema)
         return {self.schema_filename: tmp_schema_file_handle}
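
The handles here come from NamedTemporaryFile, which opens in binary mode by default, so on Python 3 the JSON string must become bytes before the write — hence the unconditional .encode('utf-8'). A minimal sketch of the pattern:

```python
import json
from tempfile import NamedTemporaryFile

row_dict = {"id": 1, "name": "airflow"}
with NamedTemporaryFile(delete=True) as tmp:
    # NamedTemporaryFile defaults to 'w+b': write bytes, not str.
    tmp.write(json.dumps(row_dict).encode('utf-8'))
    # Newline-delimited JSON is the layout BigQuery loads expect.
    tmp.write(b'\n')
```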
airflow/contrib/operators/mysql_to_gcs.py (19 changes: 5 additions & 14 deletions)
@@ -17,7 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import sys
 import json
 import time
 import base64
@@ -33,8 +32,6 @@
 from six import string_types
 import unicodecsv as csv
 
-PY3 = sys.version_info[0] == 3
-
 
 class MySqlToGoogleCloudStorageOperator(BaseOperator):
     """Copy data from MySQL to Google cloud storage in JSON or CSV format.
@@ -180,9 +177,7 @@ def _write_local_data_files(self, cursor):
             row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            s = json.dumps(row_dict, sort_keys=True)
-            if PY3:
-                s = s.encode('utf-8')
+            s = json.dumps(row_dict, sort_keys=True).encode('utf-8')
             tmp_file_handle.write(s)
 
             # Append newline to make dumps BigQuery compatible.
@@ -225,9 +220,9 @@ def _write_local_schema_file(self, cursor):
         schema_file_mime_type = 'application/json'
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         if self.schema is not None and isinstance(self.schema, string_types):
-            schema_str = self.schema
+            schema_str = self.schema.encode('utf-8')
         elif self.schema is not None and isinstance(self.schema, list):
-            schema_str = json.dumps(self.schema)
+            schema_str = json.dumps(self.schema).encode('utf-8')
         else:
             schema = []
             for field in cursor.description:
@@ -246,9 +241,7 @@
                     'type': field_type,
                     'mode': field_mode,
                 })
-            schema_str = json.dumps(schema, sort_keys=True)
-            if PY3:
-                schema_str = schema_str.encode('utf-8')
+            schema_str = json.dumps(schema, sort_keys=True).encode('utf-8')
         tmp_schema_file_handle.write(schema_str)
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema_str)
@@ -288,9 +281,7 @@ def _convert_types(schema, col_type_dict, row):
             elif isinstance(col_val, Decimal):
                 col_val = float(col_val)
             elif col_type_dict.get(col_name) == "BYTES":
-                col_val = base64.standard_b64encode(col_val)
-                if PY3:
-                    col_val = col_val.decode('ascii')
+                col_val = base64.standard_b64encode(col_val).decode('ascii')
             else:
                 col_val = col_val
             converted_row.append(col_val)
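
base64.standard_b64encode always returns bytes, and the base64 alphabet is pure ASCII, so chaining .decode('ascii') safely yields the str that json.dumps can serialize — no version check needed:

```python
import base64

col_val = b'\x00\xffbinary'
encoded = base64.standard_b64encode(col_val).decode('ascii')
print(encoded)  # AP9iaW5hcnk=
```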
airflow/contrib/operators/postgres_to_gcs_operator.py (11 changes: 2 additions & 9 deletions)
@@ -17,7 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import sys
 import json
 import time
 import datetime
@@ -29,8 +28,6 @@
 from decimal import Decimal
 from tempfile import NamedTemporaryFile
 
-PY3 = sys.version_info[0] == 3
-
 
 class PostgresToGoogleCloudStorageOperator(BaseOperator):
     """
@@ -151,9 +148,7 @@ def _create_new_file():
             row = map(self.convert_types, row)
             row_dict = dict(zip(schema, row))
 
-            s = json.dumps(row_dict, sort_keys=True)
-            if PY3:
-                s = s.encode('utf-8')
+            s = json.dumps(row_dict, sort_keys=True).encode('utf-8')
             tmp_file_handle.write(s)
 
             # Append newline to make dumps BigQuery compatible.
@@ -192,9 +187,7 @@ def _write_local_schema_file(self, cursor):
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        s = json.dumps(schema, sort_keys=True)
-        if PY3:
-            s = s.encode('utf-8')
+        s = json.dumps(schema, sort_keys=True).encode('utf-8')
         tmp_schema_file_handle.write(s)
         return {self.schema_filename: tmp_schema_file_handle}
 
airflow/hooks/hive_hooks.py (6 changes: 1 addition & 5 deletions)
@@ -25,10 +25,8 @@
 from collections import OrderedDict
 from tempfile import NamedTemporaryFile
 
-import six
 import unicodecsv as csv
 from past.builtins import basestring
-from past.builtins import unicode
 from six.moves import zip
 
 from airflow import configuration
@@ -358,9 +356,7 @@ def _infer_field_types_from_df(df):
             field_dict = _infer_field_types_from_df(df)
 
         df.to_csv(path_or_buf=f,
-                  sep=(delimiter.encode(encoding)
-                       if six.PY2 and isinstance(delimiter, unicode)
-                       else delimiter),
+                  sep=delimiter,
                   header=False,
                   index=False,
                   encoding=encoding,
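
On Python 3 the delimiter is always str, so the conditional encode feeding pandas' to_csv collapses into a plain keyword argument. A minimal sketch (hypothetical output path):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
# sep passes straight through; no six.PY2/unicode dance required.
df.to_csv("/tmp/out.csv", sep=",", header=False, index=False,
          encoding="utf-8")
```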
airflow/www/app.py (9 changes: 2 additions & 7 deletions)
@@ -18,11 +18,9 @@
 # under the License.
 #
 import logging
-import os
 import socket
 from typing import Any
 
-import six
 from flask import Flask
 from flask_appbuilder import AppBuilder, SQLA
 from flask_caching import Cache
@@ -191,11 +189,8 @@ def init_plugin_blueprints(app):
         # required for testing purposes otherwise the module retains
         # a link to the default_auth
         if app.config['TESTING']:
-            if six.PY2:
-                reload(e)  # noqa
-            else:
-                import importlib
-                importlib.reload(e)
+            import importlib
+            importlib.reload(e)
 
         app.register_blueprint(e.api_experimental, url_prefix='/api/experimental')
 
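
Python 3 dropped the reload builtin; importlib.reload is the single spelling that now works. A minimal sketch, with json standing in for the blueprint module being reloaded:

```python
import importlib
import json  # stand-in for the module under test

# Re-executes the module's code in place, so tests don't keep a stale
# reference to state created by an earlier import.
importlib.reload(json)
```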
airflow/www/views.py (3 changes: 1 addition & 2 deletions)
@@ -37,7 +37,6 @@
 from flask import (
     redirect, request, Markup, Response, render_template,
     make_response, flash, jsonify, send_file, url_for)
-from flask._compat import PY2
 from flask_appbuilder import BaseView, ModelView, expose, has_access
 from flask_appbuilder.actions import action
 from flask_appbuilder.models.sqla.filters import BaseFilter
@@ -2157,7 +2156,7 @@ def action_varexport(self, items):
     def varimport(self):
         try:
             out = request.files['file'].read()
-            if not PY2 and isinstance(out, bytes):
+            if isinstance(out, bytes):
                 d = json.loads(out.decode('utf-8'))
             else:
                 d = json.loads(out)
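
Flask's FileStorage.read() returns bytes on Python 3, so only the isinstance check survives. json.loads itself accepts bytes from Python 3.6 onward, but the explicit decode also covers older 3.x versions. A sketch of the surviving branch:

```python
import json

out = b'{"key": "value"}'  # what request.files['file'].read() yields
if isinstance(out, bytes):
    d = json.loads(out.decode('utf-8'))
else:
    d = json.loads(out)
print(d)  # {'key': 'value'}
```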
dev/airflow-jira (3 changes: 0 additions & 3 deletions)
@@ -36,9 +36,6 @@ import sys
 TMP_CREDENTIALS = {}
 PROJECT = "AIRFLOW"
 
-# Python 3 compatibility
-if sys.version_info[0] == 3:
-    raw_input = input
 
 try:
     import click
dev/airflow-pr (2 changes: 0 additions & 2 deletions)
@@ -41,8 +41,6 @@ try:
     import urllib2 as urllib
 except ImportError:
     import urllib.request as urllib
-if sys.version_info[0] == 3:
-    raw_input = input
 
 try:
     import click
tests/contrib/minikube/test_kubernetes_executor.py (6 changes: 2 additions & 4 deletions)
@@ -24,7 +24,7 @@
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
 import time
-import six
+
 import re
 
 try:
@@ -43,9 +43,7 @@ def get_minikube_host():
     if "MINIKUBE_IP" in os.environ:
         host_ip = os.environ['MINIKUBE_IP']
     else:
-        host_ip = check_output(['/usr/local/bin/minikube', 'ip'])
-        if six.PY3:
-            host_ip = host_ip.decode('UTF-8')
+        host_ip = check_output(['/usr/local/bin/minikube', 'ip']).decode('UTF-8')
 
     host = '{}:30809'.format(host_ip.strip())
     return host
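
subprocess.check_output returns bytes on Python 3, so a single chained decode replaces the six.PY3 branch. A minimal sketch, with echo standing in for the minikube binary:

```python
from subprocess import check_output

# check_output yields bytes; decode once instead of branching on six.PY3.
host_ip = check_output(['echo', '192.168.99.100']).decode('UTF-8')
print('{}:30809'.format(host_ip.strip()))  # 192.168.99.100:30809
```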
tests/contrib/operators/test_mysql_to_gcs_operator.py (3 changes: 0 additions & 3 deletions)
@@ -17,15 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import sys
 import unittest
 
 from airflow.contrib.operators.mysql_to_gcs import \
     MySqlToGoogleCloudStorageOperator
 from tests.compat import mock
 
-PY3 = sys.version_info[0] == 3
-
 TASK_ID = 'test-mysql-to-gcs'
 MYSQL_CONN_ID = 'mysql_conn_test'
 SQL = 'select 1'
tests/contrib/operators/test_sftp_operator.py (3 changes: 0 additions & 3 deletions)
@@ -20,7 +20,6 @@
 import os
 import unittest
 from base64 import b64encode
-import six
 
 from airflow import configuration
 from airflow import models
@@ -375,8 +374,6 @@ def test_arg_checking(self):
         os.environ['AIRFLOW_CONN_' + conn_id.upper()] = "ssh://test_id@localhost"
 
         # Exception should be raised if neither ssh_hook nor ssh_conn_id is provided
-        if six.PY2:
-            self.assertRaisesRegex = self.assertRaisesRegexp
         with self.assertRaisesRegex(AirflowException,
                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SFTPOperator(
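
On Python 3, assertRaisesRegex is the canonical name and assertRaisesRegexp survives only as a deprecated alias, so the Py2 shim can go. A minimal sketch:

```python
import unittest


class ExampleTest(unittest.TestCase):
    def test_raises(self):
        # The Python 3 spelling; no assertRaisesRegexp alias needed.
        with self.assertRaisesRegex(ValueError, "bad value"):
            raise ValueError("bad value")


if __name__ == '__main__':
    unittest.main()
```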