Emailing Hive Detail Data with Python

嗨学编程  Published: 2019-08-12 15:06:41, views: 1

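I. Requirements

Every Monday the customer needs a specific set of campaign data exported as an Excel or CSV file and emailed to designated recipients. An initial analysis leads to the following conclusions:

1. The data is not complex and needs no special processing; it can be treated as plain SQL query output.
2. Writing query results to a CSV file and sending email are both mature, widely applicable techniques.
3. The crond service on Linux supports scheduled tasks.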
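
Since the report goes out every Monday, the job's optional start and stop arguments can be set to cover the previous week. The sketch below is only an illustration: `last_week_window` is a hypothetical helper that is not part of the original project. It produces the two timestamps in the `%Y-%m-%d %H:%M:%S` format that main.py parses.

```python
import datetime

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'  # same format that main.py parses


def last_week_window(today):
    """Return (start, stop) strings for the 7 days before this week's Monday."""
    # weekday() is 0 for Monday, so this steps back to the most recent Monday.
    this_monday = today - datetime.timedelta(days=today.weekday())
    stop = datetime.datetime.combine(this_monday, datetime.time.min)
    start = stop - datetime.timedelta(days=7)
    return start.strftime(TIME_FORMAT), stop.strftime(TIME_FORMAT)


if __name__ == '__main__':
    start, stop = last_week_window(datetime.date.today())
    print('%s -> %s' % (start, stop))
```

A cron entry that fires on Mondays could pass these two values to the job after the job file path.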

II. System Requirements
  • Linux CentOS 6.x
  • Hadoop 2.x
  • Python 2.7

III. Python Dependencies
  • PyHive
  • ppytools2
  • thrift
  • thrift-sasl
  • sasl
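
PyHive exposes Hive through the standard Python DB-API 2.0 interface, and the other packages in this list are commonly needed for its Thrift/SASL transport. The `HiveClient` wrapper from ppytools is not shown in this post, so the following is only a sketch of what a query helper returning `(rows, size)` might look like. `exec_query` is a hypothetical name, and the connection values in the comment are the ones from hive.ini.

```python
def exec_query(conn, hql):
    """Run one query on a DB-API 2.0 connection and return (rows, row_count)."""
    cursor = conn.cursor()
    try:
        cursor.execute(hql)
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return rows, len(rows)


# With PyHive installed, a connection could be opened like this
# (adjust host, port and user for your cluster):
#
#     from pyhive import hive
#     conn = hive.connect(host='127.0.0.1', port=10000,
#                         username='hive', database='default')
#     rows, size = exec_query(conn, 'select 1')
#     conn.close()
```

Because the helper depends only on the DB-API, it can be tried against any compliant driver, such as the standard library's `sqlite3`, before pointing it at a Hive server.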

#### 1. Global configuration: settings.py

from ppytools.cfgreader import ConfReader

import logging
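# Log output format.
# basicConfig() in Python 2.7 has no 'encoding' option (the keyword is
# silently ignored), so it is omitted here.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] {%(name)-10s} - %(message)s')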


class ProjectConfig(object):
    """ProjectConfig
    """
    def __init__(self, *conf_paths):
        self.cr = ConfReader(*conf_paths)

    def getHiveConf(self):
        return self.cr.getValues('HiveServer')

    def getEmailServer(self):
        return self.cr.getValues('EmailServer')

    def getJobInfo(self):
        return self.cr.getValues('JobInfo')

    def getCSVFolder(self):
        return self.cr.getValues('CSVFolder')['folder']

    def getCSVFile(self):
        return self.cr.getValues('CSVFile')

    def getCSVHead(self):
        return self.cr.getValues('CSVHead')

    def getHQLScript(self):
        return self.cr.getValues('HQLScript')

    def getEmailInfo(self):
        return self.cr.getValues('EmailInfo')
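
`ConfReader` comes from ppytools and its source is not reproduced here. Assuming `getValues(section)` simply returns one INI section as a dict merged from several files, a rough standard-library equivalent could look like the sketch below. `SimpleConfReader` and `get_values` are hypothetical names used only for illustration.

```python
try:
    from configparser import RawConfigParser   # Python 3
except ImportError:
    from ConfigParser import RawConfigParser   # Python 2.7


class SimpleConfReader(object):
    """Hypothetical stand-in for ppytools' ConfReader."""

    def __init__(self, *conf_paths):
        # RawConfigParser performs no '%' interpolation, so a value such as
        # 'subject=%s ...' is returned unchanged.
        self.parser = RawConfigParser()
        # read() accepts a list of paths and silently skips missing files.
        self.loaded = self.parser.read(list(conf_paths))

    def get_values(self, section):
        """Return all options of one section as a plain dict."""
        return dict(self.parser.items(section))
```

The raw parser is a deliberate choice in this sketch: the job file stores a `%s` placeholder in the email subject, which an interpolating parser would reject.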

#### 2. Core code: main.py

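# -*- coding: utf-8 -*-
# The declaration above is required in Python 2: this file contains a non-ASCII string literal.
from hiveemailjob.settings import ProjectConfig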
from ppytools.csvhelper import write
from ppytools.emailclient import EmailClient
from ppytools.hiveclient import HiveClient
from ppytools.lang.timerhelper import timeMeter

import datetime
import logging
import sys
import time

logger = logging.getLogger(__name__)


def build_rk(ts):
    """
    Build HBase row key value
    :param ts: date time
    :return: row key
    """
    return hex(int(time.mktime(ts.timetuple())*1000))[2:]

def email_att(folder, name):
    return '{}/{}_{}.csv'.format(folder, name, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

@timeMeter()
def run(args):
    """
    Entry point of the email job.
    :param args: command-line arguments
    1. job file path
    2. start time, format: 2018-01-30 17:09:38 (optional)
    3. stop time, same format (optional)
    :return: None
    """
    # Parse command-line arguments: script name plus 1 or 3 values.
    args_len = len(args)
    if args_len not in (2, 4):
        logger.error('Invalid arguments. Please check:')
        logger.error('1: job file path.')
        logger.error('2: start time, format: 2018-01-30 17:09:38 (optional)')
        logger.error('3: stop time (optional)')
        sys.exit(1)
    elif args_len == 4:
        try:
            start_time = datetime.datetime.strptime(args[2], '%Y-%m-%d %H:%M:%S')
            stop_time = datetime.datetime.strptime(args[3], '%Y-%m-%d %H:%M:%S')
        except ValueError as e:
            raise RuntimeError('Parse start or stop time failed: %s' % e)
    else:
        # Default window: from yesterday's date up to today's date.
        stop_time = datetime.date.today()
        start_time = stop_time - datetime.timedelta(days=1)

    job_file = args[1]
    start_rk = build_rk(start_time)
    stop_rk = build_rk(stop_time)

    # System-wide settings files (hard-coded paths).
    hive_conf = '/etc/pythoncfg/hive.ini'
    email_conf = '/etc/pythoncfg/email.ini'
    sets = ProjectConfig(hive_conf, email_conf, job_file)

    job_info = sets.getJobInfo()
    csv_folder = sets.getCSVFolder()
    logger.info('Now running %s Email Job...', job_info['title'])
    logger.info('Start time: %s', start_time)
    logger.info('Stop time: %s', stop_time)

    hc = HiveClient(**sets.getHiveConf())

    csv_file = sorted(sets.getCSVFile().items())
    file_list = []
    logger.info('File name list: ')
    for (k, v) in csv_file:
        logging.info('%s: %s', k, v)
        file_list.append(v)

    csv_head = sorted(sets.getCSVHead().items())
    head_list = []
    logger.info('CSV file head list: ')
    for (k, v) in csv_head:
        logging.info('%s: %s', k, v)
        head_list.append(v)

    hql_scripts = sorted(sets.getHQLScript().items())
    email_atts = []
    for index, (k, hql) in enumerate(hql_scripts):
        logging.info('%s: %s', k, hql)
        # Add your own query logic here.
        result, size = hc.execQuery(hql.format(start_rk, stop_rk))
        if size == 0:
            logging.info('The HQL script above returned no data.')
        else:
            # One timestamped CSV attachment per script that returned rows.
            att_path = email_att(csv_folder, file_list[index])
            email_atts.append(att_path)
            write(att_path, head_list[index].split(','), result)

    # Close the Hive Server connection.
    hc.closeConn()

    email_sub = sets.getEmailInfo()['subject'] % start_time
    email_body = sets.getEmailInfo()['body']
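    # Drop empty entries left by a trailing ';' in the config values.
    email_to = [a for a in sets.getEmailInfo()['to'].split(';') if a]
    email_cc = [a for a in sets.getEmailInfo()['cc'].split(';') if a]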

    if len(email_atts) == 0:
        email_body = '抱歉当前未找到任何数据。\n\n' + email_body


    ec = EmailClient(**sets.getEmailServer())
    ec.send(email_to, email_cc, email_sub, email_body, email_atts, False)
    ec.quit()

    logger.info('Finished %s Email Job.', job_info['title'])
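
In `run()`, each HQL script is passed through `str.format(start_rk, stop_rk)`, so a script can use the positional placeholders `{0}` and `{1}` for the time window. The sample scripts in emailjob.ini contain no placeholders, so formatting leaves them unchanged. The sketch below shows how a filtered query might be built. The table name, the `rk` column, and the assumption that row keys are stored as hex strings are all made up for illustration.

```python
import datetime
import time


def build_rk(ts):
    """Millisecond epoch of ts (interpreted in local time) as a hex string."""
    millis = int(time.mktime(ts.timetuple()) * 1000)
    # '%x' avoids the trailing 'L' that hex() adds to Python 2 long values.
    return '%x' % millis


# Hypothetical template: {0} and {1} are filled by str.format, as in run().
HQL_TEMPLATE = ("select cn_state, count(1) m from some_table "
                "where rk >= '{0}' and rk < '{1}' group by cn_state")

stop_time = datetime.date(2018, 2, 20)
start_time = stop_time - datetime.timedelta(days=1)
hql = HQL_TEMPLATE.format(build_rk(start_time), build_rk(stop_time))
print(hql)
```

The resulting key depends on the machine's local time zone because `time.mktime` interprets the time tuple as local time.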

#### 3. System configuration files: hive.ini and email.ini

# /etc/pythoncfg/hive.ini

[HiveServer]
host=127.0.0.1
port=10000
user=hive
db=default

# /etc/pythoncfg/email.ini

[EmailServer]
server=mail.163.com
port=25
user=elkan1788@gmail.com
passwd=xxxxxx
mode=TSL

Note: both files above must be placed in the /etc/pythoncfg/ directory.
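
`EmailClient` is part of ppytools and its internals are not shown in this post. As a rough picture of what sending a report with CSV attachments involves, the standard library alone can assemble and send such a message. This is a sketch under assumptions: `build_message` and `send_message` are hypothetical names, and using STARTTLS is only a guess at what the `mode` setting is meant to select.

```python
import os
import smtplib
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_message(sender, to, cc, subject, body, attachments):
    """Assemble a multipart message with a UTF-8 body and file attachments."""
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = ', '.join(to)
    if cc:
        msg['Cc'] = ', '.join(cc)
    msg['Subject'] = Header(subject, 'utf-8')
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    for path in attachments:
        name = os.path.basename(path)
        with open(path, 'rb') as fp:
            part = MIMEApplication(fp.read(), Name=name)
        part.add_header('Content-Disposition', 'attachment', filename=name)
        msg.attach(part)
    return msg


def send_message(server, port, user, passwd, msg, recipients):
    """Send msg over SMTP, upgrading the connection with STARTTLS first."""
    client = smtplib.SMTP(server, port)
    try:
        client.starttls()  # assumption: the server supports STARTTLS
        client.login(user, passwd)
        client.sendmail(user, recipients, msg.as_string())
    finally:
        client.quit()
```

When calling `send_message`, the recipient list should include both the To and Cc addresses, since the headers alone do not control delivery.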

#### 4. Sample email job configuration: emailjob.ini

[JobInfo]
title=邮件报表任务测试

[CSVFolder]
folder=/opt/csv_files/

# CSVFile, CSVHead and HQLScript must have the same number of entries.
# Naming the keys as prefix+number is recommended.
[CSVFile]
file1=省份分组统计
file2=城市分组统计

[CSVHead]
head1=省份,累计
head2=省份,城市,累计

[HQLScript]
script1=select cn_state,count(1) m from ext_act_ja1
script2=select cn_state,cn_city,count(1) m from ext_act_ja2

[EmailInfo]
to=elkan1788@gmail.com;
cc=2292706174@qq.com;
# %s is replaced with the start date.
subject=%s区域抽奖统计[测试]
body=此邮件由系统自动发送,请勿回复,谢谢!

Note: CSVFile, CSVHead and HQLScript must contain the same number of entries in the same order; naming the keys with a prefix plus a number is recommended.
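
main.py pairs the three sections purely by position after sorting their keys as plain strings, which means a key such as `file10` would sort before `file2`. The sketch below is a hypothetical helper, not part of the original project. It checks that the lengths match and orders entries by their numeric suffix before zipping them together.

```python
import re


def numeric_suffix(key):
    """Sort key: 'file10' -> 10, so it sorts after 'file2'."""
    match = re.search(r'(\d+)$', key)
    return int(match.group(1)) if match else -1


def pair_sections(files, heads, scripts):
    """Zip the three sections by position after a numeric sort.

    Each argument is a dict such as {'file1': ..., 'file2': ...}.
    Raises ValueError when the sections differ in length.
    """
    if not (len(files) == len(heads) == len(scripts)):
        raise ValueError('CSVFile, CSVHead and HQLScript must have the same '
                         'number of entries: %d, %d, %d'
                         % (len(files), len(heads), len(scripts)))
    ordered = [[d[k] for k in sorted(d, key=numeric_suffix)]
               for d in (files, heads, scripts)]
    return list(zip(*ordered))
```

Failing early on a length mismatch avoids an `IndexError` partway through the job, after some queries have already run.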

#### 5. Bin file: hive-emailjob.py

from hiveemailjob import main

import sys

if __name__ == '__main__':
    main.run(sys.argv)

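#### 6. Execution result

Run python -u bin/hive_email_job.py in a terminal, passing the job file path (for example emailjob.ini) as the first argument, since run() exits with an error without it. The output looks like this: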

2018-02-20 16:28:21,561 [INFO] {__main__  } - Now running 邮件报表任务测试 Email Job...
2018-02-20 16:28:21,561 [INFO] {__main__  } - Start time: 2018-02-22
2018-02-20 16:28:21,562 [INFO] {__main__  } - Stop time: 2018-02-20
2018-02-20 16:28:21,691 [INFO] {pyhive.hive} - USE `default`
2018-02-20 16:28:21,731 [INFO] {ppytools.hive_client} - Hive server connect is ready. Transport open: True
2018-02-20 16:28:31,957 [INFO] {ppytools.email_client} - Email SMTP server connect ready.
2018-02-20 16:28:31,957 [INFO] {root      } - File name list:
2018-02-20 16:28:31,957 [INFO] {root      } - file1: 省份分组统计
2018-02-20 16:28:31,957 [INFO] {root      } - file2: 城市分组统计
2018-02-20 16:28:31,957 [INFO] {root      } - CSV file head list:
2018-02-20 16:28:31,957 [INFO] {root      } - head1: 省份,累计
2018-02-20 16:28:31,957 [INFO] {root      } - head2: 省份,城市,累计
2018-02-20 16:28:31,957 [INFO] {root      } - script1: select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:28:31,958 [INFO] {pyhive.hive} - select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:29:04,258 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 31
2018-02-20 16:29:04,259 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 32.3012499809 seconds.
2018-02-20 16:29:04,261 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/省份分组统计_20180223162904.csv
2018-02-20 16:29:04,262 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 0.00222992897034 seconds.
2018-02-20 16:29:04,262 [INFO] {root      } - script2: select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:04,262 [INFO] {pyhive.hive} - select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:23,462 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 367
2018-02-20 16:29:23,463 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 19.2005498409 seconds.
2018-02-20 16:29:23,465 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/城市分组统计_20180223162923.csv
2018-02-20 16:29:23,465 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 0.00227284431458 seconds.
2018-02-20 16:29:23,669 [INFO] {ppytools.email_client} - Send email[2018-02-22区域抽奖统计[测试]] success. To users: elkan1788@163.com.
2018-02-20 16:29:23,669 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 0.204078912735 seconds.
2018-02-20 16:29:23,714 [INFO] {__main__  } - Finished 邮件报表任务测试 Email Job.
2018-02-20 16:29:23,715 [INFO] {ppytools.lang.timer_helper} - Execute  method cost 62.1566159725 seconds.