Sophie

Sophie

distrib > Mandriva > 2010.1 > x86_64 > by-pkgid > 198ec980a14476b3528ef36124bdce0e > files > 664

python-gdata-2.0.9-1mdv2010.1.noarch.rpm

#!/usr/bin/python
#
# Copyright (C) 2009 Tan Swee Heng
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__author__ = 'thesweeheng@gmail.com'


from gdata.finance.service import \
    FinanceService, PortfolioQuery, PositionQuery
from gdata.finance import \
    PortfolioEntry, PortfolioData, TransactionEntry, TransactionData, \
    Price, Commission, Money
import datetime
import sys


def PrintReturns(pfx, d):
  """Print returns."""
  print pfx, '%1.5f(1w)  %1.5f(4w)  %1.5f(3m)  %1.5f(YTD)' % tuple(
    float(i) for i in (d.return1w, d.return4w, d.return3m, d.returnYTD))
  pfx = ' ' * len(pfx)
  print pfx, '%1.5f(1y)  %1.5f(3y)  %1.5f(5y)  %1.5f(overall)' % tuple(
    float(i) for i in (d.return1y, d.return3y, d.return5y, d.return_overall))


PrRtn = PrintReturns


def PrintTransactions(transactions):
  """Print transactions."""
  print "    Transactions:"
  fmt = '    %4s %-23s %-10s %6s %-11s %-11s'
  print fmt % ('ID','Date','Type','Shares','Price','Commission')
  for txn in transactions:
    d = txn.transaction_data
    print fmt % (txn.transaction_id, d.date or '----', d.type,
        d.shares, d.price.money[0], d.commission.money[0])
    if d.notes:
        print "         Notes:", d.notes
  print


def PrintPosition(pos, with_returns=False):
  """Print single position."""
  print '    Position      :', pos.position_title
  print '    Ticker ID     :', pos.ticker_id
  print '    Symbol        :', pos.symbol
  print '    Last updated  :', pos.updated.text
  d = pos.position_data
  print '    Shares        :', d.shares
  if with_returns:
    print '    Gain %        :', d.gain_percentage
    PrRtn('    Returns       :', d)
    print '    Cost basis    :', d.cost_basis
    print '    Days gain     :', d.days_gain
    print '    Gain          :', d.gain
    print '    Market value  :', d.market_value
  print
  if pos.transactions:
    print "    <inlined transactions>\n"
    PrintTransactions(pos.transactions)
    print "    </inlined transactions>\n"


def PrintPositions(positions, with_returns=False):
  """Print every position in `positions` via PrintPosition.

  Args:
    positions: Iterable of position entries.
    with_returns: Forwarded unchanged to PrintPosition.
  """
  for position in positions:
    PrintPosition(position, with_returns)


def PrintPortfolio(pfl, with_returns=False):
  """Print single portfolio."""
  print 'Portfolio Title   :', pfl.portfolio_title
  print 'Portfolio ID      :', pfl.portfolio_id
  print '  Last updated    :', pfl.updated.text
  d = pfl.portfolio_data
  print '  Currency        :', d.currency_code
  if with_returns:
    print '  Gain %          :', d.gain_percentage
    PrRtn('  Returns         :', d)
    print '  Cost basis      :', d.cost_basis
    print '  Days gain       :', d.days_gain
    print '  Gain            :', d.gain
    print '  Market value    :', d.market_value
  print
  if pfl.positions:
    print "  <inlined positions>\n"
    PrintPositions(pfl.positions, with_returns)
    print "  </inlined positions>\n"


def PrintPortfolios(portfolios, with_returns=False):
  """Print every portfolio in `portfolios` via PrintPortfolio.

  Args:
    portfolios: Iterable of portfolio entries.
    with_returns: Forwarded unchanged to PrintPortfolio.
  """
  for portfolio in portfolios:
    PrintPortfolio(portfolio, with_returns)


def ShowCallDetails(meth):
  """Decorate a method so every call is announced on stdout before it runs.

  The announcement shows the method name, the positional arguments after the
  first one (the instance, when used on a method) and the keyword arguments.

  Args:
    meth: The callable to wrap.

  Returns:
    A wrapper that prints the call details and then returns whatever `meth`
    returns.
  """
  import functools  # local import: only this decorator needs it

  @functools.wraps(meth)  # keep meth's __name__ and docstring on the wrapper
  def wrap(*args, **kwargs):
    # One parenthesised expression produces the same text as printing the
    # four items separately: '@ name (args...) {kwargs}'.
    print('@ %s %s %s' % (meth.__name__, args[1:], kwargs))
    # Hand the result back to the caller instead of dropping it.
    return meth(*args, **kwargs)
  return wrap


class FinanceTester(object):

  def __init__(self, email, password):
    self.client = FinanceService(source='gdata-finance-test')
    self.client.ClientLogin(email, password)
  
  def GetPortfolios(self, with_returns=False, inline_positions=False): 
    query = PortfolioQuery()
    query.returns = with_returns
    query.positions = inline_positions
    return self.client.GetPortfolioFeed(query=query).entry

  def GetPositions(self, portfolio, with_returns=False, inline_transactions=False):
    query = PositionQuery()
    query.returns = with_returns
    query.transactions = inline_transactions
    return self.client.GetPositionFeed(portfolio, query=query).entry

  def GetTransactions(self, position=None, portfolio=None, ticker=None):
    if position:
      feed = self.client.GetTransactionFeed(position)
    elif portfolio and ticker:
      feed = self.client.GetTransactionFeed(
          portfolio_id=portfolio.portfolio_id, ticker_id=ticker)
    return feed.entry

  @ShowCallDetails
  def TestShowDetails(self, with_returns=False, inline_positions=False,
      inline_transactions=False):
    portfolios = self.GetPortfolios(with_returns, inline_positions)
    for pfl in portfolios:
      PrintPortfolio(pfl, with_returns)
      positions = self.GetPositions(pfl, with_returns, inline_transactions)
      for pos in positions:
        PrintPosition(pos, with_returns)
        PrintTransactions(self.GetTransactions(pos))

  def DeletePortfoliosByName(self, portfolio_titles):
    for pfl in self.GetPortfolios():
      if pfl.portfolio_title in portfolio_titles:
        self.client.DeletePortfolio(pfl)

  def AddPortfolio(self, portfolio_title, currency_code):
    pfl = PortfolioEntry(portfolio_data=PortfolioData(
        currency_code=currency_code))
    pfl.portfolio_title = portfolio_title
    return self.client.AddPortfolio(pfl)

  def UpdatePortfolio(self, portfolio,
      portfolio_title=None, currency_code=None):
    if portfolio_title:
      portfolio.portfolio_title = portfolio_title
    if currency_code:
      portfolio.portfolio_data.currency_code = currency_code
    return self.client.UpdatePortfolio(portfolio)

  def DeletePortfolio(self, portfolio):
    self.client.DeletePortfolio(portfolio)

  @ShowCallDetails
  def TestManagePortfolios(self):
    pfl_one = 'Portfolio Test: Emerging Markets 12345'
    pfl_two = 'Portfolio Test: Renewable Energy 31415'

    print '---- Deleting portfolios ----'
    self.DeletePortfoliosByName([pfl_one, pfl_two])
    PrintPortfolios(self.GetPortfolios())
    print '---- Adding new portfolio ----'
    pfl = self.AddPortfolio(pfl_one, 'SGD')
    PrintPortfolios(self.GetPortfolios())
    print '---- Changing portfolio title and currency code ----'
    pfl = self.UpdatePortfolio(pfl, pfl_two, 'USD')
    PrintPortfolios(self.GetPortfolios())
    print '---- Deleting portfolio ----'
    self.DeletePortfolio(pfl)
    PrintPortfolios(self.GetPortfolios())

  def Transact(self, type, portfolio, ticker, date=None, shares=None,
      notes=None, price=None, commission=None, currency_code=None):
    if price is not None:
      price = Price(money=[Money(amount=str(price),
          currency_code=currency_code or
          portfolio.portfolio_data.currency_code)])
    if commission is not None:
      commission = Commission(money=[Money(amount=str(comission),
          currency_code=currency_code or
          portfolio.portfolio_data.currency_code)])
    if date is not None and isinstance(date, datetime.datetime):
      date = date.isoformat()
    if shares is not None:
      shares = str(shares)
    txn = TransactionEntry(transaction_data=TransactionData(type=type,
        date=date, shares=shares, notes=notes, price=price,
        commission=commission))
    return self.client.AddTransaction(txn,
        portfolio_id=portfolio.portfolio_id, ticker_id=ticker)
  
  def Buy(self, portfolio, ticker, **kwargs):
    return self.Transact('Buy', portfolio, ticker, **kwargs)
  
  def Sell(self, portfolio, ticker, **kwargs):
    return self.Transact('Sell', portfolio, ticker, **kwargs)

  def GetPosition(self, portfolio, ticker, with_returns=False, inline_transactions=False):
    query = PositionQuery()
    query.returns = with_returns
    query.transactions = inline_transactions
    return self.client.GetPosition(
        portfolio_id=portfolio.portfolio_id, ticker_id=ticker, query=query)

  def DeletePosition(self, position):
    self.client.DeletePosition(position_entry=position)

  def UpdateTransaction(self, transaction):
    self.client.UpdateTransaction(transaction)

  def DeleteTransaction(self, transaction):
    self.client.DeleteTransaction(transaction)

  @ShowCallDetails
  def TestManageTransactions(self):
    pfl_title = 'Transaction Test: Technology 27182'
    self.DeletePortfoliosByName([pfl_title])

    print '---- Adding new portfolio ----'
    pfl = self.AddPortfolio(pfl_title, 'USD')
    PrintPortfolios(self.GetPortfolios())

    print '---- Adding buy transactions ----'
    tkr1 = 'NASDAQ:GOOG'
    date = datetime.datetime(2009,04,01)
    days = datetime.timedelta(1)
    txn1 = self.Buy(pfl, tkr1, shares=500, price=321.00, date=date)
    txn2 = self.Buy(pfl, tkr1, shares=150, price=312.00, date=date+15*days)
    pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
    PrintPosition(pos, with_returns=True)
    PrintTransactions(self.GetTransactions(pos))

    print '---- Adding sell transactions ----'
    txn3 = self.Sell(pfl, tkr1, shares=400, price=322.00, date=date+30*days)
    txn4 = self.Sell(pfl, tkr1, shares=200, price=330.00, date=date+45*days)
    pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
    PrintPosition(pos, with_returns=True)
    PrintTransactions(self.GetTransactions(pos))

    print "---- Modifying first and deleting third ----"
    txn1.transaction_data.shares = '400.0'
    self.UpdateTransaction(txn1)
    self.DeleteTransaction(txn3)
    pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
    PrintPosition(pos, with_returns=True)
    PrintTransactions(self.GetTransactions(pos))

    print "---- Deleting position ----"
    print "Number of positions (before):", len(self.GetPositions(pfl))
    self.DeletePosition(pos)
    print "Number of positions (after) :", len(self.GetPositions(pfl))

    print '---- Deleting portfolio ----'
    self.DeletePortfolio(pfl)
    PrintPortfolios(self.GetPortfolios())


if __name__ == '__main__':
  try:
    email = sys.argv[1]
    password = sys.argv[2]
    cases = sys.argv[3:]
  except IndexError:
    print "Usage: test_finance account@google.com password [0 1 2...]"
    sys.exit(1)

  tester = FinanceTester(email, password)
  tests = [
      tester.TestShowDetails,
      lambda: tester.TestShowDetails(with_returns=True),
      tester.TestManagePortfolios,
      tester.TestManageTransactions,
      lambda: tester.TestShowDetails(with_returns=True, inline_positions=True),
      lambda: tester.TestShowDetails(with_returns=True, inline_positions=True,
          inline_transactions=True),]
  if not cases:
    cases = range(len(tests))
  for i in cases:
    print "===== TEST CASE", i, "="*50
    tests[int(i)]()