I am using Pandas to create a new column in a data frame created from a csv.
[in] DfT_raw = pd.read_csv('./file.csv', index_col = False)
[in] print(DfT_raw)
[out] Region Name dCount ONS CP S Ref E S Ref N Road
0 East Midlands E06000015 14/04/00 00:00 37288 434400 336000 A516
1 East Midlands E06000015 14/04/00 00:00 37288 434400 336000 A516
2 East Midlands E06000015 14/04/00 00:00 37288 434400 336000 A516
3 East Midlands E06000015 14/04/00 00:00 37288 434400 336000 A516
I define a function to strip the time from the datetime field (dCount) and then create a new column 'date':
[in] def date_convert(dCount):
         return dCount.date()

     DfT_raw['date'] = DfT_raw.apply(lambda row: date_convert(row['dCount']), axis=1)
[out] AttributeError: ("'str' object has no attribute 'date'", u'occurred at index 0')
There is some issue with the index_col. I previously used index_col = 1 but got the same error.
When I print ‘dCount’ I get
0 14/04/00 00:00
1 14/04/00 00:00
2 14/04/00 00:00
3 14/04/00 00:00
4 14/04/00 00:00
The index column is causing the error. How do I ensure this isn’t given to the function?
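For context, the error happens because read_csv leaves dCount as plain strings, and a string has no .date() method; the index column is not involved. A minimal sketch of one way to fix it (assuming the day/month/year format shown above) is to parse the column with pd.to_datetime first and then take the date part without any apply:

import pandas as pd

DfT_raw = pd.read_csv('./file.csv', index_col=False)

# Parse the strings into real datetimes; dayfirst=True matches '14/04/00',
# errors='coerce' turns unparseable values into NaT instead of raising.
DfT_raw['dCount'] = pd.to_datetime(DfT_raw['dCount'], dayfirst=True, errors='coerce')

# The .dt accessor now works column-wide.
DfT_raw['date'] = DfT_raw['dCount'].dt.date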
Table of contents
- Error when accessing pandas dataframe index
- Python pandas data frame remove row where index name DOES NOT occur in other data frame
- Pandas KeyError: ‘occurred at index 0’
- Adding an error log message row in pandas dataframe
- How to find the matched indexes of a Dataframe in Python?
- How to find the indices of a Dataframe in pandas?
- How do I get the index of a row in a Dataframe?
- What is the error that pandas pivot produces?
Error when accessing pandas dataframe index
Question:
I get an error when trying to access a single element in a pandas dataframe this way: test_df["LABEL"][0]. Here is a code snippet on how I am loading the data:
print "reading test set"
test_set = pd.read_csv(data_path+"small_test_products.txt", header=0, delimiter="|")
print "shape of the test set", test_set.shape
test_df = pd.DataFrame(test_set)
lengthOfTestSet = len(test_df["LABEL"])
print test_df["LABEL"][0]
Here is the error I am getting:
File "code.py", line 80, in <module>
    print test_df["LABEL"][0]
File "/usr/local/lib/python2.7/dist-packages/pandas/core/series.py", line 521, in __getitem__
    result = self.index.get_value(self, key)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/index.py", line 3562, in get_value
    loc = self.get_loc(k)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/index.py", line 3619, in get_loc
    return super(Float64Index, self).get_loc(key, method=method)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/index.py", line 1572, in get_loc
    return self._engine.get_loc(_values_from_object(key))
File "pandas/index.pyx", line 134, in pandas.index.IndexEngine.get_loc (pandas/index.c:3824)
File "pandas/index.pyx", line 154, in pandas.index.IndexEngine.get_loc (pandas/index.c:3704)
File "pandas/hashtable.pyx", line 541, in pandas.hashtable.Float64HashTable.get_item (pandas/hashtable.c:9914)
File "pandas/hashtable.pyx", line 547, in pandas.hashtable.Float64HashTable.get_item (pandas/hashtable.c:9852)
KeyError: 0.0
What am I missing?
Solution:
Like EdChum said, 0 is probably not in your index.
Try:
df.iloc[0]
or
df['label'].iloc[0]
which select by integer position rather than by index label.
To reset the index if you are having trouble with that:
df.reset_index(drop=True)
Check out pandas' indexing documentation for more information on it.
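One detail worth adding: reset_index returns a new DataFrame rather than modifying the existing one, so the result has to be assigned back. A minimal sketch combining both suggestions (file name and delimiter taken from the question):

import pandas as pd

test_df = pd.read_csv("small_test_products.txt", header=0, delimiter="|")

# Positional access works no matter what the index labels are.
first_label = test_df["LABEL"].iloc[0]

# reset_index returns a new frame; assign it back so the label 0 exists again.
test_df = test_df.reset_index(drop=True)
first_label_again = test_df["LABEL"][0]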
Python pandas data frame remove row where index name DOES NOT occur in other data frame
Question:
I have two data frames. I want to remove rows where the indexes do not occur in both data frames.
Here is an example of the data frames:
import pandas as pd
data = {'Correlation': [1.000000, 0.607340, 0.348844]}
df = pd.DataFrame(data, columns=['Correlation'])
df = df.rename(index={0: 'GINI'})
df = df.rename(index={1: 'Central government debt, total (% of GDP)'})
df = df.rename(index={2: 'Grants and other revenue (% of revenue)'})
data_2 = {'Correlation': [1.000000, 0.607340, 0.348844, 0.309390, -0.661046]}
df_2 = pd.DataFrame(data_2, columns=['Correlation'])
df_2 = df_2.rename(index={0: 'GINI'})
df_2 = df_2.rename(index={1: 'Central government debt, total (% of GDP)'})
df_2 = df_2.rename(index={2: 'Grants and other revenue (% of revenue)'})
df_2 = df_2.rename(index={3: 'Compensation of employees (% of expense)'})
df_2 = df_2.rename(index={4: 'Central government debt, total (current LCU)'})
I have found this question: How to remove rows in a Pandas dataframe if the same row exists in another dataframe? but was unable to use it, since I need to match on the index name rather than on the row values.
I also saw this question: pandas get rows which are NOT in other dataframe, but it removes rows that are equal in both data frames, so I did not find it useful either.
What I have thought to do is to transpose then concat the data frames and remove duplicate columns:
df = df.T
df_2 = df_2.T
df3 = pd.concat([df,df_2],axis = 1)
df3.iloc[: , ~df3.columns.duplicated()]
The problem with this is that it only removes one of the columns that is duplicated but I want it to remove both these columns.
Any help doing this would be much appreciated, cheers.
Solution 1:
You can just compare the indexes and use
.loc
to pull the relevant rows:
In [19]: df1 = pd.DataFrame(list(range(50)), index=range(0, 100, 2))
In [20]: df2 = pd.DataFrame(list(range(34)), index=range(0, 100, 3))
In [21]: df2.loc[df2.index.difference(df1.index)]
Out[21]:
0
3 1
9 3
15 5
21 7
27 9
33 11
39 13
45 15
51 17
57 19
63 21
69 23
75 25
81 27
87 29
93 31
99 33
Solution 2:
You can simply do this to get the rows whose indices are in df_2 but not in df:
df_2[~df_2.index.isin(df.index)]
Correlation
Compensation of employees (% of expense) 0.309390
Central government debt, total (current LCU) -0.661046
Solution 3:
I have managed to work this out by adapting the answers already submitted:
df_2[df_2.index.isin(df.index)]
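If, as the question originally asked, both frames should be trimmed to the index names they share, a minimal sketch using Index.intersection (reusing the df and df_2 built above) would be:

# Labels present in both frames.
common = df.index.intersection(df_2.index)

# Keep only those rows in each frame.
df_common = df.loc[common]
df_2_common = df_2.loc[common]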
Pandas KeyError: ‘occurred at index 0’
Question:
Let's say I have a Pandas dataframe df:
start_time  Event
         0      0
         1      0
         2      0
         3      0
         4      0
         5      0
         6      0
         7      0
         8      0
         9      0
I want to set the value of the Event column to -1 when the corresponding start_time lies between two values, so I define this function:
def test(time):
    if (time['start_time'] >= 5) and (time['start_time'] <= 8):
        return -1
    else:
        return time
To apply this to the event column, I do the following:
df[['Event']] = df[['Event']].apply(test,axis=1)
which yields this error:
KeyError: ('start_time', 'occurred at index 0')
Why is this happening? Should be a simple fix.
Solution 1:
Simply do:
df['Event'] = df.apply(test, axis=1)['Event']
Solution 2:
The function that you are passing to .apply() uses the start_time field of the input argument (in the conditional check if (time['start_time'] >= 5) and (time['start_time'] <= 8)), so it should be applied to a DataFrame or Series that has a start_time column.
However, before you call apply you first take df[['Event']], which returns a DataFrame containing only the Event column. So df[['Event']].apply(test, axis=1) hands the function rows that hold nothing but Event. When the function reaches the expression time['start_time'], it looks for start_time in that row, can't find it (only the Event column was kept), and raises a KeyError.
The solution is to pass a DataFrame or Series that has a start_time column. In your case you want to apply the function to the entire DataFrame, so replace df[['Event']] with the whole DataFrame df:
df = df.apply(test, axis=1)
and change your function to modify the Event column instead of returning a plain value: replace return -1 with time['Event'] = -1, and drop the else so that return time runs unconditionally (i.e., the row is returned unchanged when the condition isn't met).
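For reference, a minimal sketch of the corrected row-wise version described above, followed by a vectorized alternative that skips apply entirely:

def test(time):
    # Flag events whose start_time falls in the closed interval [5, 8].
    if 5 <= time['start_time'] <= 8:
        time['Event'] = -1
    return time  # always return the (possibly modified) row

df = df.apply(test, axis=1)

# Vectorized equivalent using boolean indexing:
df.loc[df['start_time'].between(5, 8), 'Event'] = -1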
Adding an error log message row in pandas dataframe
Question:
Based on this answer, Avoiding KeyError in Dataframe, I am able to do my validations. But I need to keep track of which row is failing due to which validation condition.
Is there a way I can add a new column and provide a fail message?
My code:
valid_dict = {'name': 'WI 80 INDEMNITY 18 OPTION 1 SILVER RX $10/45/90/25%',
'issuer_id': 484,
'service_area_id': 1,
'plan_year': 2018,
'network_url': np.nan,
'formulary_url': np.nan,
'sbc_download_url': np.nan,
'treatment_cost_calculator_url': np.nan,
'promotional_label': np.nan,
'hios_plan_identifier': '99806CAAUSJ-TMP',
'type': 'MetalPlan',
'price_period': 'Monthly',
'is_age_29_plan': False,
'sort_rank_override': np.nan,
'composite_rating': False,
}
data_obj = DataService()
hios_issuer_identifer_list = data_obj.get_hios_issuer_identifer(df)
d1 = {k: v for k, v in valid_dict.items() if k in set(valid_dict.keys()) - set(df.columns)}
df1 = df.assign(**d1)
cols_url = df.columns.intersection(['network_url', 'formulary_url', 'sbc_download_url', 'treatment_cost_calculator_url'])
m1 = (df1[['name', 'issuer_id', 'service_area_id']].notnull().all(axis=1))
m2 = (df1[['promotional_label']].astype(str).apply(lambda x: (x.str.len() <= 65) | x.isin(['nan'])).all(axis=1))
m3 = (df1[cols_url].astype(str).apply(lambda x: (x.str.contains(r'\A(https?://)([a-zA-Z0-9-_])*(.)*([a-zA-Z0-9-]+).([a-zA-Z.]{2,5})(.*.*)?\Z')) | x.isin(['nan'])).all(axis=1))
m4 = ((df1['plan_year'].notnull()) & (df['plan_year'].astype(str).str.isdigit()) & (df['plan_year'].astype(str).str.len() == 4))
m5 = ((df1['hios_plan_identifier'].notnull()) & (df['hios_plan_identifier'].str.len() >= 10) & (df['hios_plan_identifier'].str.contains(r'\A(\d{5}[A-Z]{2}[a-zA-Z0-9]{3,7}-TMP|\d{5}[A-Z]{2}\d{3,7}(-?\d{2})*)\Z')))
m6 = (df1['type'].isin(['MetalPlan', 'MedicarePlan', 'BasicHealthPlan', 'DualPlan', 'MedicaidPlan', 'ChipPlan']))
m7 = (df1['price_period'].isin(['Monthly', 'Yearly']))
m8 = (df1['is_age_29_plan'].astype(str).isin(['True', 'False', 'nan']))
m9 = (df1[['sort_rank_override']].astype(str).apply(lambda x: (x.str.isdigit()) | x.isin(['nan'])).all(axis=1))
m10 = (df1['composite_rating'].astype(str).isin(['True', 'False']))
m11 = (df1['hios_plan_identifier'].astype(str).str[:5].isin(hios_issuer_identifer_list))
df1 = df1[m1 & m2 & m3 & m4 & m5 & m6 & m7 & m8 & m9 & m10 & m11].drop(d1.keys(), axis=1)
merged = df.merge(df1.drop_duplicates(), how='outer', indicator=True)
merged[merged['_merge'] == 'left_only'].to_csv('logs/invalid_plan_data.csv')
return df1
Something like below-
wellthie_issuer_identifier issuer_name ... service_area_id _error
0 UHC99806 Fake Humana ... 1 failed on plan_year
Solution:
With df1 = df1[m1 & m2 & m3 & m4 & m5 & m6 & m7 & m8 & m9 & m10 & m11].drop(d1.keys(), axis=1) you are selecting the rows where none of your conditions failed. So clearly you will not have what you would like here, and that is OK, as this is the validated part, which should not have errors.
You can get the errors by doing another selection before dropping the failed lines:
df_error = df1.copy()
df_error['error_message'] = ~m1
...
If the column had an error, you could define some error text to be displayed in the table:
df_error['failed_on_name'] = np.where(m1, '', your_message_here)
If you want to display the error to a log you can loop over your error table and output your message (considering the first version with boolean values in the columns):
for _, row in df_error.iterrows():
    print(error_message(dict(row)))
So you will be able to process the rows with a function like this:
def error_message(row):
    row_desc = []
    error_msg = []
    for k, v in row.items():
        if isinstance(v, bool):
            if v:
                error_msg.append(k)
        else:
            row_desc.append(str(v))  # non-boolean columns describe the row
    return 'Row ' + ' '.join(row_desc) + ' failed with errors: ' + ' '.join(error_msg)
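Putting the pieces together, a rough sketch (mask names taken from the question, message texts are placeholders) that collects every failed check per row into a single _error column, similar to the example output shown in the question:

# Map a short description to each validation mask defined above.
checks = {
    'failed on name/issuer_id/service_area_id': m1,
    'failed on promotional_label': m2,
    'failed on plan_year': m4,
    # ... add m3 and m5..m11 in the same way
}

failed = df1.copy()
failed['_error'] = ''
for message, mask in checks.items():
    # ~mask marks the rows that did NOT pass this check.
    failed.loc[~mask, '_error'] += message + '; '

# Keep only the rows that failed at least one check and write them to the log.
failed[failed['_error'] != ''].to_csv('logs/invalid_plan_data.csv')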
====== [Postscript] ======
What you ultimately want to achieve
Given only the close values (the left column in the table below), I want a signal column (the right column) that contains 'true' when both the difference between the current day's close and the previous day's, and the difference between the previous day's close and the one two days before, are positive, and '-' otherwise.
#    close signal
# 0   1807    nan
# 1   1805    nan
# 2   1833      -
# 3   1905   true
# 4   1915   true
==================
What I am having trouble with, and the corresponding source code
Given the following (simplified) data, I want to set the signal depending on whether the conditions are met.
# close
# 0 1807
# 1 1805
# 2 1833
# 3 1905
# 4 1915
If the latest number (row 4 in the case above) is greater than the previous number, I want to display 'true', and otherwise '-', so I wrote the following:
def signal(df):
    if df['close'].iloc[-1] > df['close'].iloc[-2]:
        return 'true'
    else:
        return '-'

df['signal'] = df.apply(signal, axis=1)
AttributeError: ("'float' object has no attribute 'iloc'", 'occurred at index 0')
This error was displayed.
If I call df['close'].iloc[-1] on its own, the correct number is extracted, and df['close'].iloc[-1] > df['close'].iloc[-2] evaluates to 'true' when run by itself. But when that conditional is used inside the function above, the error appears.
I don't understand what the error means, so I'm not sure how to deal with it.
My knowledge is still limited, so my understanding may be wrong or incomplete and the question itself may not be well put, but I would appreciate any advice.
Sorry to trouble you, and thank you.
====== [Postscript] ======
After looking into this further,
In:  type(df['close'].iloc[-1])
Out: numpy.float64
is what is displayed, so it seems the cause is that the value pulled out with .iloc is not in pandas form
(* in other, similar programs the type is displayed as "pandas.core.series.Series", and there it works fine).
So in this case I thought one of the following might be possible:
・ change df['close'].iloc[-1] > df['close'].iloc[-2] into something that stays in pandas form, or
・ adjust things at the df.apply step?
I don't know how to solve it, so I would be grateful for a solution. If anything is still unclear, please point it out. Thank you.
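The error itself comes from the df.apply(signal, axis=1) call: with axis=1 the function receives one row at a time, so df['close'] inside signal is a single float, and a float has no .iloc. A minimal sketch of a vectorized way to build the signal described in the postscript, using the sample close values above (this avoids apply altogether):

import numpy as np
import pandas as pd

df = pd.DataFrame({'close': [1807, 1805, 1833, 1905, 1915]})

d1 = df['close'].diff(1)           # today's close minus yesterday's
d2 = df['close'].diff(1).shift(1)  # yesterday's close minus the day before's

# 'true' only when both differences are positive, '-' otherwise.
df['signal'] = np.where((d1 > 0) & (d2 > 0), 'true', '-')

# The first two rows have no history, so mark them as missing.
df.loc[d2.isna(), 'signal'] = np.nan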
Issue
I am running a Spark script that needs to perform a count(*) query 30 times for every row in a dataframe. The dataframe has on average 25000 rows, which means that after completing, the script should have made 750000 requests/queries to the BigQuery table.
For some reason, with a large number of executors I ran into the RuntimeError detailed in the stacktrace below, where it seems the google api core is unable to create a BigQuery client. Is this because my script is creating too many clients? Apologies if my code is incorrect or I am using the client wrong; I am new to BigQuery and have not used this API before. What would be the best way to use the BigQuery Storage API in this use case?
Environment details
- PySpark script running in AWS EMR clusters, with 30+ executors
- BigQuery Storage API (python)
Steps to reproduce
- ?
Code Example Of PySpark Script
def query_cache(query):
    bqclient, bqstorageclient = clients()
    dataframe = (
        bqclient.query(query)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    return dataframe['f0_'][0]

@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
    query = "select count(*) from dataset.table where ...{some column filters}..."
    df['count'] = df.apply(query_cache, args=(query,), axis=1)
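The RuntimeError ("can't start new thread") is raised while gRPC spins up threads for yet another BigQuery Storage client, and query_cache builds a fresh client pair for every row. One possible mitigation, sketched below under the assumption that clients() is the helper used in the snippet above, is to memoize it so each worker process creates the pair once and reuses it:

from functools import lru_cache

@lru_cache(maxsize=1)
def cached_clients():
    # Build the bqclient/bqstorageclient pair once per Python worker process
    # and reuse it, instead of opening new gRPC channels on every row.
    return clients()

def query_cache(query):
    bqclient, bqstorageclient = cached_clients()
    dataframe = (
        bqclient.query(query)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    return dataframe['f0_'][0]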
Stack trace
py4j.protocol.Py4JJavaError: An error occurred while calling o526.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 54, ip-172-31-8-118.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
for series in iterator:
File "<string>", line 1, in <lambda>
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
result = f(pd.concat(value_series, axis=1))
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
return op.get_result()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
return self.apply_standard()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
self.apply_series_generator()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
results[i] = self.f(v)
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
return func(x, *args, **kwds)
File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
File "/home/hadoop/metrics_bq.py", line 652, in query_cache
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
bqstorage_client=bqstorage_client, dtypes=dtypes
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
for item in bqstorage_download():
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
requested_streams=requested_streams,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
request, retry=retry, timeout=timeout, metadata=metadata
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
on_error=on_error,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
return target()
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
wait_for_ready, compression)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
),), self._context)
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
for series in iterator:
File "<string>", line 1, in <lambda>
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
result = f(pd.concat(value_series, axis=1))
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
return op.get_result()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
return self.apply_standard()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
self.apply_series_generator()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
results[i] = self.f(v)
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
return func(x, *args, **kwds)
File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
File "/home/hadoop/metrics_bq.py", line 652, in query_cache
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
bqstorage_client=bqstorage_client, dtypes=dtypes
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
for item in bqstorage_download():
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
requested_streams=requested_streams,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
request, retry=retry, timeout=timeout, metadata=metadata
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
on_error=on_error,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
return target()
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
wait_for_ready, compression)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
),), self._context)
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more