Consolidating the Data Preprocessing Steps
Summary
The CBOE published revised historical data, so I'm using this change to consolidate the data preprocessing scripts I've discussed in earlier posts into several small functions. I've created a module called cboe, and I've tried to streamline the loops used in those functions.
I now have two directories where I store the historical data, based on when they were downloaded (1905_SPX, 1911_SPX). Below I will echo the cboe module, then in the next post I will run a preprocessing script against the two sets of data. It turns out that there are some significant differences in the two historical extracts.
In [1]:
# The cboe module
# Import module dependencies
import os
import csv
# CBOE Option Chain Column Names
col_names = (
'underlying_symbol',
'quote_date',
'root',
'expiration',
'strike',
'option_type',
'open',
'high',
'low',
'close',
'trade_volume',
'bid_size_1545',
'bid_1545',
'ask_size_1545',
'ask_1545',
'underlying_bid_1545',
'underlying_ask_1545',
'implied_underlying_price_1545',
'active_underlying_price_1545',
'implied_volatility_1545',
'delta_1545',
'gamma_1545',
'theta_1545',
'vega_1545',
'rho_1545',
'bid_size_eod',
'bid_eod',
'ask_size_eod',
'ask_eod',
'underlying_bid_eod',
'underlying_ask_eod',
'vwap',
'open_interest',
'delivery_code',
)
# CBOE Option Chain Column Types
col_types = {
'underlying_symbol': str,
'quote_date': str,
'root': str,
'expiration': str,
'strike': float,
'option_type': str,
'open': float,
'high': float,
'low': float,
'close': float,
'trade_volume': float,
'bid_size_1545': float,
'bid_1545': float,
'ask_size_1545': float,
'ask_1545': float,
'underlying_bid_1545': float,
'underlying_ask_1545': float,
'implied_underlying_price_1545': float,
'active_underlying_price_1545': float,
'implied_volatility_1545': float,
'delta_1545': float,
'gamma_1545': float,
'theta_1545': float,
'vega_1545': float,
'rho_1545': float,
'bid_size_eod': float,
'bid_eod': float,
'ask_size_eod': float,
'ask_eod': float,
'underlying_bid_eod': float,
'underlying_ask_eod': float,
'vwap': float,
'open_interest': float,
'delivery_code': str,
}
def get_cboe_csv_file_list(path, suffix='csv'):
"""
Purpose:
Takes an input path, and extracts all of the csv files in
that directory
Input:
path: The path to the serach directory
Ouput:
A list of csv files in the directory. Each file in the
list is prepended with the full qualified path name.
"""
file_list = [''.join([path, '/', dir_file]) for dir_file in
sorted(os.listdir(path)) if dir_file.endswith(suffix)]
return file_list
def count_cboe_csv_file_lines(file_list):
"""
Purpose:
Takes a list of files as an input and counts the number of lines
in each file.
Input:
file_list: A list of input files
Output:
A list of file line counts. The lenght of the list equals the
number of scanned files.
"""
num_lines = []
num_files = len(file_list)
for i, file_name in enumerate(file_list):
num_lines.append(len(open(file_name).readlines()))
if (i % 10 == 0):
print('Counted lines for {} files of {}'.format(i, num_files))
print("Total lines in {} files = {}".format(num_files, sum(num_lines)))
return num_lines
def count_cboe_csv_file_lines_v02(file_list):
"""
Purpose:
Takes a list of files as an input and counts the number of lines
in each file.
Input:
file_list: A list of input files
Output:
A list of file line counts. The lenght of the list equals the
number of scanned files.
Note:
This is an alternate version of count_cboe_csv_file_lines() that
uses a list comprehension as a counting procedure
"""
num_lines = []
num_files = len(file_list)
for i, file_name in enumerate(file_list):
num_lines.append(sum([1 for line in open(file_name, 'r')]))
if ((i+1) % 10 == 0):
print('Counted lines for {} files of {}'.format(i, num_files))
print("Total lines in {} files = {}".format(num_files, sum(num_lines)))
return num_lines
def concat_cboe_csv_files(csv_list, output_file=''):
"""
Purpose:
Takes a list of files as an input and concatenates the files,
taking care to not repeat the header.
Input:
csv_list: A list of csv files to concatenate
Output:
output_file: The concatenated csv file
"""
if (not output_file):
print('Error: Enter an output file')
print('Usage: concat_cboe_csv_files(csv_list, output_file)')
return
else:
print('Concatenating {} files ...'.format(len(csv_list)))
with open(output_file, 'w') as outfile:
for i, file_name in enumerate(csv_list):
# print('Concat file {} of {}'.format(i+1, len(csv_list)))
if (i == 0):
with open(file_name) as infile:
for j, line in enumerate(infile):
outfile.write(line)
else:
with open(file_name) as infile:
for j, line in enumerate(infile):
if (j > 0):
outfile.write(line)
def check_line(s, n, cnames, ctypes):
"""
Purpose:
Takes comma delimited line from a CBOE historical option chain data
.csv file, splits it, and then tries to check the column type based
on a passed dictionary of the format:
{ <column_name> : <column_type> }
The returned value consists of N 4-tuples embedded in a list. Each
4-tuple contains the location information regarding the non-standard
column entry (i.e., the row number, column number, column name) and it
also contains the value of the non-standard field. The value of N can
range from 0 (i.e., no non-standard fields) up to the total number of
integer and float columns in the column data type dictionary.
Input:
s: A line read from a csv file (string)
n: The line number of the string passed as s
cnames: A tuple of column names
ctypes: A dictionary of column types, with column names as keys
Output:
bad_value: A list of 4-tuples, where each 4-tuple contains (i) the
row number with the bad field, (ii) the column number of the bad
field, (iii) the column name of the bad field, and (iv) the bad
string that appeared in the field.
"""
# split the input line
s = s.rstrip().split(sep=',')
# number of string elements, number of column names
ns = len(s)
nc = len(cnames)
# lists to capture the value and location of unexpected input
bad_value = []
# use an index to loop over all split elements
if (ns == nc):
for i in range(ns):
tmp_s = s[i]
tmp_typ = ctypes[cnames[i]]
# only test columns that are not expected to house a string,
# but if there is a type error in that column, then capture the
# location, column name, and the offending value as a tuple
if (tmp_typ != str):
if (tmp_typ == float):
try:
float(tmp_s)
except ValueError:
bad_value.append((n, i, cnames[i], tmp_s))
elif (tmp_typ == int):
try:
int(tmp_s)
except ValueError:
bad_value.append((n, i, cnames[i], tmp_s))
else:
print('Warning: Column type not recognized\n')
exit()
else:
print('Warning: Unequal string length and column length\n')
exit()
# Default return is an empty list, else it houses info on the bad value
return bad_value
def check_cboe_file(cboe_csv_file, cnames, ctypes):
"""
Purpose:
Takes as input a CBOE data file as well as the accompanying column
names and column types. It then steps through each lnon-header ine in
the file, checks the contents of each assumed-to-be float or integer
column, and then identifies the location and content of any input
that differs from pure numeric. For example, in various greek fields
there are the occassional "NAN" type fields. It returns a dictionary
housing the unique bad row/values
Input:
cboe_csv_file: The name of the CBOE file to process
cnames: A tuple of column names
ctypes: A dictionary of column types, with column names as keys
Output:
bad_values: A list housing all of the identified bad values in the
file. Each list entry is a 4-tuple returned from check_line()
bad_dict: A dictionary that includes the unique set of (i) bad rows,
(ii) bad columns, and (iii) bad text located in the input file.
"""
bad_values = []
with open(cboe_csv_file, 'r') as infile:
for j, line in enumerate(infile):
# skip the header line
if (j > 0):
# Echo output
if (j % 500000 == 0):
print('Scanned {} rows ...'.format(j))
# Check a line; Return information on bad values detected
tmp_bad = check_line(s=line, n=j, cnames=cnames, ctypes=ctypes)
# check to see if tmp_bad is not empty
if (tmp_bad != []):
# Note the use of extend() ... the output of
# cboe.check_line() can be N 4-tuples, so we use
# extend() to iterate over each tuple element and
# append each to the output object
bad_values.extend(tmp_bad)
# collect unique items
uniq_bad_rows = set([item[0] for item in bad_values])
uniq_bad_cols = set([item[2] for item in bad_values])
uniq_bad_text = set([item[3] for item in bad_values])
# capture unique items in a dictionary of sets
bad_dict = {'bad_rows': uniq_bad_rows,
'bad_cols': uniq_bad_cols,
'bad_text': uniq_bad_text}
# Echo results
print('-' * 80)
print('Number of unique affected rows = ', len(bad_dict['bad_rows']))
print('Unique offending columns = ', bad_dict['bad_cols'])
print('Unique offending field entries = ', bad_dict['bad_text'])
print('-' * 80)
return bad_values, bad_dict
def clean_cboe_file(input_file, output_file, bad_rows, bad_text):
"""
Purpose:
Takes as input a CBOE csv file, the name of a cleaned version of the
CBOE csv file, and a set of known "bad values" that need to be
replaced in the file.
Input:
input_file: The name of the csv file to clean
output_file: The name of the cleaned csv file
bad_rows: A list of the rows that contain text to correct
bad_text: A list of the actual text that needs to be replaced
Output:
A file that has the identified bad text values replaced with
a special value (here: -999999) that is numeric and unlikely
to collide with known good values.
"""
# Define a counter
bad_count = 0
# Open the file to write via the csv module
with open(output_file, 'w', newline='') as outfile:
csv_writer = csv.writer(outfile)
# Open the input file to read / process
with open(input_file, 'r') as infile:
for j, line in enumerate(infile):
s = line.rstrip().split(sep=',')
# Echo progress
if (j % 500000 == 0):
print('Reviewed / cleaned {} rows ...'.format(j))
# if this is a known bad row, then replace the bad text with
# a dummy value ... otherwise just write the row to a file
if (j in bad_rows):
s_set = set(s)
if (s_set.isdisjoint(bad_text)):
# Echo the original line on a no-hit
csv_writer.writerow(s)
else:
# do processing here b/c there was a bad value; use a
# list comprehension approach where each element
# (s_i_ of the split string (s) is tested for
# membership in the bad_text
new_s = [-999999 if (s_i in bad_text)
else s_i for s_i in s]
csv_writer.writerow(new_s)
bad_count += 1
else:
csv_writer.writerow(s)
# Echo results
print('-' * 80)
print('Number of modified rows {}'.format(bad_count))
print('-' * 80)
return bad_count
No comments:
Post a Comment