
Commit

criccomini committed Jul 21, 2017
2 parents 173860a + b49986c commit f1f0d23
Showing 25 changed files with 1,198 additions and 617 deletions.
9 changes: 8 additions & 1 deletion UPDATING.md
@@ -5,6 +5,13 @@ assists people when migrating to a new version.

## Master

### SSH Hook updates, along with new SSH Operator & SFTP Operator
SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution used previously (<1.9.0), so this change is backward incompatible.
- Update the SSHHook constructor to the new signature.
- Use the SSHOperator class in place of SSHExecuteOperator, which has now been removed. Refer to test_ssh_operator.py for usage info (see also the sketch after this list).
- SFTPOperator has been added to perform secure file transfer from server A to server B. Refer to test_sftp_operator.py for usage info.
- No updates are required if you are using ftpHook; it will continue to work as is.
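
A minimal usage sketch, not taken from this commit: the module paths and operator names match this change, but the connection id `ssh_default`, the file paths, and the exact argument names are assumptions; the referenced test files are authoritative.

```python
# Sketch only: the connection id, file paths, and argument names here are
# assumptions; see test_ssh_operator.py and test_sftp_operator.py for the
# authoritative usage.
from datetime import datetime

from airflow import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator

dag = DAG('ssh_example', start_date=datetime(2017, 7, 1),
          schedule_interval=None)

# The new Paramiko-backed hook is built from an Airflow connection id
# instead of shelling out to the ssh binary.
hook = SSHHook(ssh_conn_id='ssh_default')

# Replaces the removed SSHExecuteOperator.
run_remote = SSHOperator(
    task_id='run_remote',
    ssh_hook=hook,
    command='echo hello',
    dag=dag)

# Secure file transfer to the remote host over the same connection.
push_file = SFTPOperator(
    task_id='push_file',
    ssh_hook=hook,
    local_filepath='/tmp/local.txt',
    remote_filepath='/tmp/remote.txt',
    dag=dag)

run_remote >> push_file
```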

### New Features

#### Dask Executor
@@ -116,7 +123,7 @@ If you experience problems connecting with your operator make sure you set the c

Also, the old P12 key file type is no longer supported; only the new JSON key files are supported as a service
account.
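
For illustration, a minimal sketch of registering such a JSON-keyed connection programmatically; the `conn_id`, `conn_type`, and extras key name shown are assumptions based on common usage, not part of this commit:

```python
# Sketch only: conn_id, conn_type, and the extras key below are assumptions,
# not part of this commit.
from airflow import settings
from airflow.models import Connection

session = settings.Session()
gcp_conn = Connection(
    conn_id='google_cloud_default',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__key_path": "/path/to/key.json"}')

# Add the connection only if it does not already exist, mirroring the
# `connections` command in cli.py below.
if not (session
        .query(Connection)
        .filter(Connection.conn_id == gcp_conn.conn_id).first()):
    session.add(gcp_conn)
    session.commit()
```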

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0.
81 changes: 43 additions & 38 deletions airflow/bin/cli.py
@@ -59,7 +59,6 @@
from sqlalchemy import func
from sqlalchemy.orm import exc


api.load_auth()
api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
@@ -316,7 +315,7 @@ def run(args, dag=None):
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
conf_dict = json.load(conf_file)
conf_dict = json.load(conf_file)

if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)
@@ -327,6 +326,21 @@ def run(args, dag=None):
settings.configure_vars()
settings.configure_orm()

if not args.pickle and not dag:
dag = get_dag(args)
elif not dag:
session = settings.Session()
logging.info('Loading pickle id {args.pickle}'.format(args=args))
dag_pickle = session.query(
DagPickle).filter(DagPickle.id == args.pickle).first()
if not dag_pickle:
raise AirflowException("Who hid the pickle!? [missing pickle]")
dag = dag_pickle.pickle

task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()

logging.root.handlers = []
if args.raw:
# Output to STDOUT for the parent process to read and log
@@ -350,19 +364,23 @@ def run(args, dag=None):
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
try_number = ti.try_number
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
args.execution_date)
directory = os.path.join(log_base, log_relative_dir)
# Create the log file and give it group writable permissions
# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
if not os.path.exists(directory):
if not os.path.isdir(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o775)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())
log_relative = logging_utils.get_log_filename(
args.dag_id, args.task_id, args.execution_date, try_number)
filename = os.path.join(log_base, log_relative)

if not os.path.exists(filename):
open(filename, "a").close()
@@ -376,21 +394,6 @@ def run(args, dag=None):
hostname = socket.getfqdn()
logging.info("Running on host {}".format(hostname))

if not args.pickle and not dag:
dag = get_dag(args)
elif not dag:
session = settings.Session()
logging.info('Loading pickle id {args.pickle}'.format(**locals()))
dag_pickle = session.query(
DagPickle).filter(DagPickle.id == args.pickle).first()
if not dag_pickle:
raise AirflowException("Who hid the pickle!? [missing pickle]")
dag = dag_pickle.pickle
task = dag.get_task(task_id=args.task_id)

ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()

if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
@@ -424,8 +427,8 @@ def run(args, dag=None):
session.commit()
pickle_id = pickle.id
print((
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals()))
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals()))
except Exception as e:
print('Could not pickle the DAG')
print(e)
@@ -475,7 +478,8 @@ def run(args, dag=None):
with open(filename, 'r') as logfile:
log = logfile.read()

remote_log_location = filename.replace(log_base, remote_base)
remote_log_location = os.path.join(remote_base, log_relative)
logging.debug("Uploading to remote log location {}".format(remote_log_location))
# S3
if remote_base.startswith('s3:/'):
logging_utils.S3Log().write(log, remote_log_location)
@@ -669,10 +673,10 @@ def start_refresh(gunicorn_master_proc):
gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
wait_until_true(lambda: num_workers_expected + excess ==
get_num_workers_running(gunicorn_master_proc))
get_num_workers_running(gunicorn_master_proc))

wait_until_true(lambda: num_workers_expected ==
get_num_workers_running(gunicorn_master_proc))
get_num_workers_running(gunicorn_master_proc))

while True:
num_workers_running = get_num_workers_running(gunicorn_master_proc)
@@ -695,7 +699,7 @@ def start_refresh(gunicorn_master_proc):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
excess -= 1
wait_until_true(lambda: num_workers_expected + excess ==
get_num_workers_running(gunicorn_master_proc))
get_num_workers_running(gunicorn_master_proc))

# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
@@ -887,6 +891,7 @@ def serve_logs(filename):  # noqa
filename,
mimetype="application/json",
as_attachment=False)

WORKER_LOG_SERVER_PORT = \
int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
flask_app.run(
@@ -947,8 +952,8 @@ def initdb(args):  # noqa
def resetdb(args):
print("DB: " + repr(settings.engine.url))
if args.yes or input(
"This will drop existing tables if they exist. "
"Proceed? (y/n)").upper() == "Y":
"This will drop existing tables if they exist. "
"Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
db_utils.resetdb()
@@ -966,7 +971,7 @@ def upgradedb(args):  # noqa
if not ds_rows:
qry = (
session.query(DagRun.dag_id, DagRun.state, func.count('*'))
.group_by(DagRun.dag_id, DagRun.state)
.group_by(DagRun.dag_id, DagRun.state)
)
for dag_id, state, count in qry:
session.add(DagStat(dag_id=dag_id, state=state, count=count))
@@ -1065,8 +1070,8 @@ def connections(args):

session = settings.Session()
if not (session
.query(Connection)
.filter(Connection.conn_id == new_conn.conn_id).first()):
.query(Connection)
.filter(Connection.conn_id == new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
@@ -1168,16 +1173,16 @@ class CLIFactory(object):
'dry_run': Arg(
("-dr", "--dry_run"), "Perform a dry run", "store_true"),
'pid': Arg(
("--pid", ), "PID file location",
("--pid",), "PID file location",
nargs='?'),
'daemon': Arg(
("-D", "--daemon"), "Daemonize instead of running "
"in the foreground",
"store_true"),
'stderr': Arg(
("--stderr", ), "Redirect stderr to this file"),
("--stderr",), "Redirect stderr to this file"),
'stdout': Arg(
("--stdout", ), "Redirect stdout to this file"),
("--stdout",), "Redirect stdout to this file"),
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),

@@ -1333,19 +1338,19 @@ class CLIFactory(object):
"Serialized pickle object of the entire dag (used internally)"),
'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
'cfg_path': Arg(
("--cfg_path", ), "Path to config file to use instead of airflow.cfg"),
("--cfg_path",), "Path to config file to use instead of airflow.cfg"),
# webserver
'port': Arg(
("-p", "--port"),
default=conf.get('webserver', 'WEB_SERVER_PORT'),
type=int,
help="The port on which to run the server"),
'ssl_cert': Arg(
("--ssl_cert", ),
("--ssl_cert",),
default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
help="Path to the SSL certificate for the webserver"),
'ssl_key': Arg(
("--ssl_key", ),
("--ssl_key",),
default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
help="Path to the key to use with the SSL certificate"),
'workers': Arg(
